Introduce StreamDone in bulk loader (#4297)
During the reduce phase, we insert predicates into the Badger instances in sorted order. Each predicate has one underlying stream in the stream writer, and each stream has one underlying table builder. Each table builder holds at least 1 MB of buffer. Previously we had no way to close a stream, so its table builder stayed in memory until Flush was called on the stream writer. With StreamDone support in Badger v2.0, we can close a stream as soon as we are done writing a predicate (a usage sketch follows below).
ashish-goswami authored Nov 28, 2019
1 parent 4d2897e commit ae679e6
Showing 1 changed file with 22 additions and 4 deletions.
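
For context, here is a minimal sketch of the Badger v2.0 API this commit relies on — my illustration, not code from the commit. A caller ends a stream by writing a KV that carries only StreamId and StreamDone: true; Badger can then close that stream's table builder instead of holding its buffer until Flush. The directory, keys, values, and stream id below are hypothetical:

    package main

    import (
    	badger "github.com/dgraph-io/badger/v2"
    	bpb "github.com/dgraph-io/badger/v2/pb"
    )

    // writeOneStream writes two entries on stream 1, then marks the stream
    // done so its table builder can be closed before Flush.
    func writeOneStream(db *badger.DB) error {
    	sw := db.NewStreamWriter()
    	if err := sw.Prepare(); err != nil {
    		return err
    	}
    	list := &bpb.KVList{Kv: []*bpb.KV{
    		// Ordinary entries for stream 1 (hypothetical keys and values).
    		{Key: []byte("key-1"), Value: []byte("v1"), Version: 1, StreamId: 1},
    		{Key: []byte("key-2"), Value: []byte("v2"), Version: 1, StreamId: 1},
    		// Marker entry: no key or value, just "stream 1 is finished".
    		{StreamId: 1, StreamDone: true},
    	}}
    	if err := sw.Write(list); err != nil {
    		return err
    	}
    	return sw.Flush() // finalizes any streams not already marked done
    }

    func main() {
    	opt := badger.DefaultOptions("/tmp/streamdone-sketch").WithLogger(nil)
    	db, err := badger.OpenManaged(opt) // the bulk loader uses a managed DB
    	if err != nil {
    		panic(err)
    	}
    	defer db.Close()
    	if err := writeOneStream(db); err != nil {
    		panic(err)
    	}
    }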
26 changes: 22 additions & 4 deletions dgraph/cmd/bulk/reduce.go
@@ -90,7 +90,7 @@ func (r *reducer) run() error {
 func (r *reducer) createBadger(i int) *badger.DB {
 	opt := badger.DefaultOptions(r.opt.shardOutputDirs[i]).WithSyncWrites(false).
 		WithTableLoadingMode(bo.MemoryMap).WithValueThreshold(1 << 10 /* 1 KB */).
-		WithLogger(nil)
+		WithLogger(nil).WithMaxCacheSize(1 << 20)
 	db, err := badger.OpenManaged(opt)
 	x.Check(err)
 	r.dbs = append(r.dbs, db)
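
A note on the option change above (my inference; the commit message does not discuss it): Badger v2.0 introduced a block cache, and WithMaxCacheSize(1 << 20) caps it at 1 MB. The bulk loader opens one Badger instance per output shard, so leaving every instance at the much larger default cache size would multiply memory use during the reduce phase.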
@@ -159,19 +159,37 @@ func (r *reducer) encodeAndWrite(
 			streamId = atomic.AddUint32(&r.streamId, 1)
 			preds[pk.Attr] = streamId
 		}
-		// TODO: Having many stream ids can cause memory issues with StreamWriter. So, we
-		// should build a way in StreamWriter to indicate that the stream is over, so the
-		// table for that stream can be flushed and memory released.

 		kv.StreamId = streamId
 	}

+	// Once we have processed all records from a single stream, we can mark that stream as
+	// done. This closes the stream's underlying table builder in Badger. Since we preallocate
+	// 1 MB of memory for each table builder, this can yield significant memory savings when
+	// there are many streams.
+	// This change limits the maximum number of open streams to the number of streams created
+	// in a single write call. This can be optimised further if required.
+	addDone := func(doneStreams []uint32, l *bpb.KVList) {
+		for _, streamId := range doneStreams {
+			l.Kv = append(l.Kv, &bpb.KV{StreamId: streamId, StreamDone: true})
+		}
+	}
+
+	var doneStreams []uint32
+	var prevSID uint32
 	for batch := range entryCh {
 		listSize += r.toList(batch, list)
 		if listSize > 4<<20 {
 			for _, kv := range list.Kv {
 				setStreamId(kv)
+				if prevSID != 0 && (prevSID != kv.StreamId) {
+					doneStreams = append(doneStreams, prevSID)
+				}
+				prevSID = kv.StreamId
 			}
+			addDone(doneStreams, list)
 			x.Check(writer.Write(list))
+			doneStreams = doneStreams[:0]
 			list = &bpb.KVList{}
 			listSize = 0
 		}
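
The new loop works because list.Kv is sorted by predicate, so a stream ends exactly when the stream id changes between consecutive KVs. Below is a standalone sketch of that boundary-detection pattern — simplified to plain stream ids, and without carrying prevSID across batches as the real code does:

    package main

    import "fmt"

    // doneBetween reports which stream ids finished inside one sorted batch:
    // because entries are sorted by predicate, a stream is over exactly when
    // the id changes between consecutive entries. Like the loop above, it
    // cannot tell whether the *last* stream in the batch is finished; that one
    // is only closed later (ultimately by the StreamWriter's Flush).
    func doneBetween(ids []uint32) []uint32 {
    	var done []uint32
    	var prev uint32
    	for _, id := range ids {
    		if prev != 0 && prev != id {
    			done = append(done, prev)
    		}
    		prev = id
    	}
    	return done
    }

    func main() {
    	// Hypothetical stream ids of one batch, already sorted by predicate.
    	fmt.Println(doneBetween([]uint32{1, 1, 2, 2, 2, 3})) // prints [1 2]
    }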
