fix: [2.4] L0 compactor may cause DN OOM
See also: milvus-io#33547
pr: milvus-io#33554

Signed-off-by: yangxuan <[email protected]>
XuanYang-cn committed Jun 3, 2024
1 parent 30fd4a9 commit f6eac6d
Showing 5 changed files with 274 additions and 373 deletions.
internal/datanode/l0_compactor.go: 270 changes (107 additions, 163 deletions)
@@ -20,7 +20,6 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"time"

 	"github.com/samber/lo"
 	"go.opentelemetry.io/otel"
@@ -29,19 +28,15 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus/internal/datanode/allocator"
 	"github.com/milvus-io/milvus/internal/datanode/io"
-	iter "github.com/milvus-io/milvus/internal/datanode/iterators"
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
 	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
 	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/storage"
-	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
-	"github.com/milvus-io/milvus/pkg/util/merr"
-	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -122,9 +117,6 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
 		return nil, errContext
 	}

-	ctxTimeout, cancelAll := context.WithTimeout(ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
-	defer cancelAll()
-
 	l0Segments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
 		return s.Level == datapb.SegmentLevel_L0
 	})
@@ -162,13 +154,8 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
 		}
 	}

-	var resultSegments []*datapb.CompactionSegment
-
-	if float64(hardware.GetFreeMemoryCount())*paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat() < float64(totalSize) {
-		resultSegments, err = t.linearProcess(ctxTimeout, targetSegIDs, totalDeltalogs)
-	} else {
-		resultSegments, err = t.batchProcess(ctxTimeout, targetSegIDs, lo.Values(totalDeltalogs)...)
-	}
+	maxConcurrency := getMaxConcurrentSegmentCount(totalSize)
+	resultSegments, err := t.process(ctx, maxConcurrency, targetSegIDs, lo.Values(totalDeltalogs)...)
 	if err != nil {
 		return nil, err
 	}
@@ -188,91 +175,71 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
 	return result, nil
 }

-func (t *levelZeroCompactionTask) linearProcess(ctx context.Context, targetSegments []int64, totalDeltalogs map[int64][]string) ([]*datapb.CompactionSegment, error) {
-	log := log.Ctx(t.ctx).With(
-		zap.Int64("planID", t.plan.GetPlanID()),
-		zap.String("type", t.plan.GetType().String()),
-		zap.Int("target segment counts", len(targetSegments)),
-	)
-	var (
-		resultSegments  = make(map[int64]*datapb.CompactionSegment)
-		alteredSegments = make(map[int64]*storage.DeleteData)
-	)
-	for segID, deltaLogs := range totalDeltalogs {
-		log := log.With(zap.Int64("levelzero segment", segID))
-
-		log.Info("Linear L0 compaction start processing segment")
-		allIters, err := t.loadDelta(ctx, deltaLogs)
-		if err != nil {
-			log.Warn("Linear L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
-			return nil, err
-		}
-
-		t.splitDelta(ctx, allIters, alteredSegments, targetSegments)
-
-		err = t.uploadByCheck(ctx, true, alteredSegments, resultSegments)
-		if err != nil {
-			log.Warn("Linear L0 compaction upload buffer fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
-			return nil, err
-		}
-	}
-
-	err := t.uploadByCheck(ctx, false, alteredSegments, resultSegments)
-	if err != nil {
-		log.Warn("Linear L0 compaction upload all buffer fail", zap.Int64s("target segment", targetSegments), zap.Error(err))
-		return nil, err
-	}
-	log.Info("Linear L0 compaction finished", zap.Duration("elapse", t.tr.RecordSpan()))
-	return lo.Values(resultSegments), nil
-}
-
-func (t *levelZeroCompactionTask) batchProcess(ctx context.Context, targetSegments []int64, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
-	log := log.Ctx(t.ctx).With(
-		zap.Int64("planID", t.plan.GetPlanID()),
-		zap.String("type", t.plan.GetType().String()),
-		zap.Int("target segment counts", len(targetSegments)),
-	)
-	log.Info("Batch L0 compaction start processing")
-	resultSegments := make(map[int64]*datapb.CompactionSegment)
-
-	iters, err := t.loadDelta(ctx, lo.Flatten(deltaLogs))
-	if err != nil {
-		log.Warn("Batch L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
-		return nil, err
-	}
-
-	alteredSegments := make(map[int64]*storage.DeleteData)
-	t.splitDelta(ctx, iters, alteredSegments, targetSegments)
-
-	err = t.uploadByCheck(ctx, false, alteredSegments, resultSegments)
-	if err != nil {
-		log.Warn("Batch L0 compaction upload fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
-		return nil, err
-	}
-	log.Info("Batch L0 compaction finished", zap.Duration("elapse", t.tr.RecordSpan()))
-	return lo.Values(resultSegments), nil
-}
-
-func (t *levelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]string) ([]*iter.DeltalogIterator, error) {
-	allIters := make([]*iter.DeltalogIterator, 0)
-
-	for _, paths := range deltaLogs {
-		blobs, err := t.Download(ctx, paths)
-		if err != nil {
-			return nil, err
-		}
-
-		allIters = append(allIters, iter.NewDeltalogIterator(blobs, nil))
-	}
-	return allIters, nil
+// control the written deltalogs count within a batch, make sure that
+// totalSize * COUNT < memLimit
+func getMaxConcurrentSegmentCount(totalSize int64) int {
+	max := 1
+	memLimit := float64(hardware.GetFreeMemoryCount()) * paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat()
+	if memLimit > float64(totalSize) {
+		max = int(memLimit / float64(totalSize))
+	}
+
+	return max
+}
+
+func (t *levelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) {
+	allBlobs := make(map[string][]byte)
+	results := make([]*datapb.CompactionSegment, 0)
+	for segID, writer := range segmentWriters {
+		blob, tr, err := writer.Finish()
+		if err != nil {
+			log.Warn("L0 compaction serializeUpload serialize failed", zap.Error(err))
+			return nil, err
+		}
+
+		logID, err := t.allocator.AllocOne()
+		if err != nil {
+			log.Warn("L0 compaction serializeUpload alloc failed", zap.Error(err))
+			return nil, err
+		}
+
+		blobKey, _ := binlog.BuildLogPath(storage.DeleteBinlog, writer.collectionID, writer.partitionID, writer.segmentID, -1, logID)
+
+		allBlobs[blobKey] = blob.GetValue()
+		deltalog := &datapb.Binlog{
+			EntriesNum:    writer.GetRowNum(),
+			LogSize:       int64(len(blob.GetValue())),
+			MemorySize:    blob.GetMemorySize(),
+			LogPath:       blobKey,
+			LogID:         logID,
+			TimestampFrom: tr.GetMinTimestamp(),
+			TimestampTo:   tr.GetMaxTimestamp(),
+		}
+
+		results = append(results, &datapb.CompactionSegment{
+			SegmentID: segID,
+			Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{deltalog}}},
+			Channel:   t.plan.GetChannel(),
+		})
+	}
+
+	if len(allBlobs) == 0 {
+		return nil, nil
+	}
+
+	if err := t.Upload(ctx, allBlobs); err != nil {
+		log.Warn("L0 compaction serializeUpload upload failed", zap.Error(err))
+		return nil, err
+	}
+
+	return results, nil
 }
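The comment above getMaxConcurrentSegmentCount states the invariant behind this OOM fix: the number of segments whose delete buffers are written concurrently is capped so that totalSize * COUNT stays below the memory budget, i.e. free memory scaled by L0BatchMemoryRatio. A minimal standalone sketch of that arithmetic, assuming hypothetical values (the 8 GiB free memory, the 0.05 ratio, and the 150 MiB deltalog total are not numbers from this commit):

package main

import (
	"fmt"
	"math"
)

// Same shape as getMaxConcurrentSegmentCount above: the largest COUNT with
// totalSize*COUNT under the budget, but never fewer than one segment.
func maxConcurrentSegmentCount(totalSize int64, memLimit float64) int {
	max := 1
	if memLimit > float64(totalSize) {
		max = int(memLimit / float64(totalSize))
	}
	return max
}

func main() {
	memLimit := 8 * 1024 * 1024 * 1024 * 0.05 // ~410 MiB budget (free mem * ratio), hypothetical
	totalSize := int64(150 * 1024 * 1024)     // 150 MiB of L0 delete data, hypothetical

	conc := maxConcurrentSegmentCount(totalSize, memLimit)
	fmt.Println("max concurrent segments:", conc) // 2

	// With 7 target segments, process below runs ceil(7/2) = 4 batches.
	fmt.Println("batches:", int(math.Ceil(7/float64(conc)))) // 4
}

Note the floor of one: when the budget is smaller than totalSize, compaction still proceeds, just one target segment at a time.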

 func (t *levelZeroCompactionTask) splitDelta(
 	ctx context.Context,
-	allIters []*iter.DeltalogIterator,
-	targetSegBuffer map[int64]*storage.DeleteData,
+	allDelta []*storage.DeleteData,
 	targetSegIDs []int64,
-) {
+) map[int64]*SegmentDeltaWriter {
 	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
 	defer span.End()

@@ -286,112 +253,89 @@ func (t *levelZeroCompactionTask) splitDelta(
 	}

 	// split all delete data to segments
-	for _, deltaIter := range allIters {
-		for deltaIter.HasNext() {
-			// checked by HasNext, no error here
-			labeled, _ := deltaIter.Next()
-
-			predicted := split(labeled.GetPk())
-
-			for _, gotSeg := range predicted {
-				delBuffer, ok := targetSegBuffer[gotSeg]
-				if !ok {
-					delBuffer = &storage.DeleteData{}
-					targetSegBuffer[gotSeg] = delBuffer
-				}
-
-				delBuffer.Append(labeled.GetPk(), labeled.GetTimestamp())
-			}
-		}
-	}
+	targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
+	for _, delta := range allDelta {
+		for i, pk := range delta.Pks {
+			predicted := split(pk)
+
+			for _, gotSeg := range predicted {
+				writer, ok := targetSegBuffer[gotSeg]
+				if !ok {
+					segment, _ := lo.Find(segments, func(seg *metacache.SegmentInfo) bool {
+						return seg.SegmentID() == gotSeg
+					})
+					writer = NewSegmentDeltaWriter(gotSeg, segment.PartitionID(), t.getCollection())
+					targetSegBuffer[gotSeg] = writer
+				}
+				writer.Write(pk, delta.Tss[i])
+			}
+		}
+	}
+
+	return targetSegBuffer
 }
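The rewritten splitDelta routes each delete PK to a per-segment buffer, lazily creating a SegmentDeltaWriter the first time a target segment is predicted to contain that PK. A simplified, self-contained sketch of the routing pattern follows; the deltaWriter type and the parity-based split predicate are toy stand-ins for SegmentDeltaWriter and the real predicate (defined in the collapsed part of this hunk), not Milvus APIs:

package main

import "fmt"

// deltaWriter is a toy stand-in for SegmentDeltaWriter: it buffers the
// (pk, timestamp) pairs routed to one target segment.
type deltaWriter struct {
	pks []int64
	tss []uint64
}

func (w *deltaWriter) write(pk int64, ts uint64) {
	w.pks = append(w.pks, pk)
	w.tss = append(w.tss, ts)
}

func main() {
	// Hypothetical predicate: a pk "may hit" the segments sharing its parity.
	split := func(pk int64, targets []int64) []int64 {
		var hits []int64
		for _, seg := range targets {
			if seg%2 == pk%2 {
				hits = append(hits, seg)
			}
		}
		return hits
	}

	targets := []int64{200, 201}
	pks := []int64{7, 8, 9}
	tss := []uint64{11, 12, 13}

	// Create one writer per predicted segment lazily, as splitDelta does.
	writers := make(map[int64]*deltaWriter)
	for i, pk := range pks {
		for _, seg := range split(pk, targets) {
			w, ok := writers[seg]
			if !ok {
				w = &deltaWriter{}
				writers[seg] = w
			}
			w.write(pk, tss[i])
		}
	}
	for seg, w := range writers {
		fmt.Println("segment", seg, "pks:", w.pks, "tss:", w.tss)
	}
}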

-func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storage.DeleteData) (map[string][]byte, *datapb.Binlog, error) {
-	var (
-		collID   = t.metacache.Collection()
-		uploadKv = make(map[string][]byte)
-	)
-
-	seg, ok := t.metacache.GetSegmentByID(segmentID)
-	if !ok {
-		return nil, nil, merr.WrapErrSegmentLack(segmentID)
-	}
-	blob, err := storage.NewDeleteCodec().Serialize(collID, seg.PartitionID(), segmentID, dData)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	logID, err := t.allocator.AllocOne()
-	if err != nil {
-		return nil, nil, err
-	}
-
-	blobKey := metautil.JoinIDPath(collID, seg.PartitionID(), segmentID, logID)
-	blobPath := t.BinlogIO.JoinFullPath(common.SegmentDeltaLogPath, blobKey)
-
-	uploadKv[blobPath] = blob.GetValue()
-
-	minTs := uint64(math.MaxUint64)
-	maxTs := uint64(0)
-	for _, ts := range dData.Tss {
-		if ts > maxTs {
-			maxTs = ts
-		}
-		if ts < minTs {
-			minTs = ts
-		}
-	}
-
-	deltalog := &datapb.Binlog{
-		EntriesNum:    dData.RowCount,
-		LogSize:       int64(len(blob.GetValue())),
-		LogPath:       blobPath,
-		LogID:         logID,
-		TimestampFrom: minTs,
-		TimestampTo:   maxTs,
-		MemorySize:    dData.Size(),
-	}
-
-	return uploadKv, deltalog, nil
+func (t *levelZeroCompactionTask) process(ctx context.Context, maxConcurrency int, targetSegments []int64, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
+	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact process")
+	defer span.End()
+
+	results := make([]*datapb.CompactionSegment, 0)
+	iterCount := int(math.Ceil(float64(len(targetSegments)) / float64(maxConcurrency)))
+	log := log.Ctx(t.ctx).With(
+		zap.Int64("planID", t.plan.GetPlanID()),
+		zap.Int("max conc segment counts", maxConcurrency),
+		zap.Int("total segment counts", len(targetSegments)),
+		zap.Int("total batch", iterCount),
+	)
+
+	log.Info("L0 compaction process start")
+	allDelta, err := t.loadDelta(ctx, lo.Flatten(deltaLogs))
+	if err != nil {
+		log.Warn("L0 compaction loadDelta fail", zap.Error(err))
+		return nil, err
+	}
+
+	for i := 0; i < iterCount; i++ {
+		safeRight := (i + 1) * maxConcurrency
+		if (i+1)*maxConcurrency > len(targetSegments) {
+			safeRight = len(targetSegments)
+		}
+		batchSegments := targetSegments[i*maxConcurrency : safeRight]
+		batchSegWriter := t.splitDelta(ctx, allDelta, batchSegments)
+		batchResults, err := t.serializeUpload(ctx, batchSegWriter)
+		if err != nil {
+			log.Warn("L0 compaction serialize upload fail", zap.Error(err))
+			return nil, err
+		}
+
+		log.Info("L0 compaction finished one batch", zap.Int("batch no.", i), zap.Int("batch segment count", len(batchResults)))
+		results = append(results, batchResults...)
+	}
+
+	log.Info("L0 compaction process done")
+	return results, nil
 }
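process walks targetSegments in windows of maxConcurrency, running ceil(N / maxConcurrency) batches and clamping the right bound so the final, possibly short, batch never slices past the end. A runnable sketch of just that slicing arithmetic, with hypothetical segment IDs:

package main

import (
	"fmt"
	"math"
)

func main() {
	targetSegments := []int64{100, 101, 102, 103, 104, 105, 106} // hypothetical IDs
	maxConcurrency := 3

	iterCount := int(math.Ceil(float64(len(targetSegments)) / float64(maxConcurrency)))
	for i := 0; i < iterCount; i++ {
		// Clamp the right bound exactly as the loop above does.
		safeRight := (i + 1) * maxConcurrency
		if safeRight > len(targetSegments) {
			safeRight = len(targetSegments)
		}
		fmt.Println("batch", i, "=", targetSegments[i*maxConcurrency:safeRight])
	}
	// batch 0 = [100 101 102]
	// batch 1 = [103 104 105]
	// batch 2 = [106]
}

Each window goes through splitDelta and serializeUpload before the next one starts, so only maxConcurrency segments' delta writers are buffered in memory at any time.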

-func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireCheck bool, alteredSegments map[int64]*storage.DeleteData, resultSegments map[int64]*datapb.CompactionSegment) error {
-	allBlobs := make(map[string][]byte)
-	tmpResults := make(map[int64]*datapb.CompactionSegment)
-	for segID, dData := range alteredSegments {
-		if !requireCheck || (dData.Size() >= paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()) {
-			blobs, binlog, err := t.composeDeltalog(segID, dData)
-			if err != nil {
-				log.Warn("L0 compaction composeDelta fail", zap.Int64("segmentID", segID), zap.Error(err))
-				return err
-			}
-			allBlobs = lo.Assign(blobs, allBlobs)
-			tmpResults[segID] = &datapb.CompactionSegment{
-				SegmentID: segID,
-				Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{binlog}}},
-				Channel:   t.plan.GetChannel(),
-			}
-			delete(alteredSegments, segID)
-		}
-	}
-
-	if len(allBlobs) == 0 {
-		return nil
-	}
-
-	if err := t.Upload(ctx, allBlobs); err != nil {
-		log.Warn("L0 compaction upload blobs fail", zap.Error(err))
-		return err
-	}
-
-	for segID, compSeg := range tmpResults {
-		if _, ok := resultSegments[segID]; !ok {
-			resultSegments[segID] = compSeg
-		} else {
-			binlog := compSeg.Deltalogs[0].Binlogs[0]
-			resultSegments[segID].Deltalogs[0].Binlogs = append(resultSegments[segID].Deltalogs[0].Binlogs, binlog)
-		}
-	}
-
-	return nil
+func (t *levelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]string) ([]*storage.DeleteData, error) {
+	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadDelta")
+	defer span.End()
+	allData := make([]*storage.DeleteData, 0, len(deltaLogs))
+	for _, paths := range deltaLogs {
+		blobBytes, err := t.Download(ctx, paths)
+		if err != nil {
+			return nil, err
+		}
+		blobs := make([]*storage.Blob, 0, len(blobBytes))
+		for _, blob := range blobBytes {
+			blobs = append(blobs, &storage.Blob{Value: blob})
+		}
+		_, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs)
+		if err != nil {
+			return nil, err
+		}
+		allData = append(allData, dData)
+	}
+	return allData, nil
 }
