diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 1f15d98ad9f..ec750271659 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -90,7 +90,7 @@ func (r *reducer) run() error { func (r *reducer) createBadger(i int) *badger.DB { opt := badger.DefaultOptions(r.opt.shardOutputDirs[i]).WithSyncWrites(false). WithTableLoadingMode(bo.MemoryMap).WithValueThreshold(1 << 10 /* 1 KB */). - WithLogger(nil) + WithLogger(nil).WithMaxCacheSize(1 << 20) db, err := badger.OpenManaged(opt) x.Check(err) r.dbs = append(r.dbs, db) @@ -159,19 +159,37 @@ func (r *reducer) encodeAndWrite( streamId = atomic.AddUint32(&r.streamId, 1) preds[pk.Attr] = streamId } - // TODO: Having many stream ids can cause memory issues with StreamWriter. So, we - // should build a way in StreamWriter to indicate that the stream is over, so the - // table for that stream can be flushed and memory released. + kv.StreamId = streamId } + // Once we have processed all records from a single stream, we can mark that stream as done. + // This will close the underlying table builder in Badger for that stream. Since we preallocate 1 MB + // of memory for each table builder, this can result in memory savings when we have a large + // number of streams. + // This change limits the maximum number of open streams to the number of streams created in a single + // write call. This can also be optimised if required. 
+ addDone := func(doneStreams []uint32, l *bpb.KVList) { + for _, streamId := range doneStreams { + l.Kv = append(l.Kv, &bpb.KV{StreamId: streamId, StreamDone: true}) + } + } + + var doneStreams []uint32 + var prevSID uint32 for batch := range entryCh { listSize += r.toList(batch, list) if listSize > 4<<20 { for _, kv := range list.Kv { setStreamId(kv) + if prevSID != 0 && (prevSID != kv.StreamId) { + doneStreams = append(doneStreams, prevSID) + } + prevSID = kv.StreamId } + addDone(doneStreams, list) x.Check(writer.Write(list)) + doneStreams = doneStreams[:0] list = &bpb.KVList{} listSize = 0 }