Skip to content

Commit

Permalink
fix: Check resource when loading deltalogs (milvus-io#37195)
Browse files Browse the repository at this point in the history
Related to milvus-io#36887

`LoadDeltaLogs` API did not check memory usage. When system is under
high delete load pressure, this could result into OOM quit.

This PR add resource check for `LoadDeltaLogs` actions and separate
internal deltalog loading function with public one.

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Oct 29, 2024
1 parent 7134526 commit 2ff5a27
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type Loader interface {
// NOTE: make sure the ref count of the corresponding collection will never go down to 0 during this
Load(ctx context.Context, collectionID int64, segmentType SegmentType, version int64, segments ...*querypb.SegmentLoadInfo) ([]Segment, error)

// LoadDeltaLogs load deltalog and write delta data into provided segment.
// it also executes resource protection logic in case of OOM.
LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error

// LoadBloomFilterSet loads needed statslog for RemoteSegment.
Expand Down Expand Up @@ -334,7 +336,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
}
}
}
if err = loader.LoadDeltaLogs(ctx, segment, loadInfo.GetDeltalogs()); err != nil {
if err = loader.loadDeltalogs(ctx, segment, loadInfo.GetDeltalogs()); err != nil {
return errors.Wrap(err, "At LoadDeltaLogs")
}

Expand Down Expand Up @@ -1158,7 +1160,9 @@ func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int6
return nil
}

func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
// loadDeltalogs performs the internal actions of `LoadDeltaLogs`
// this function does not perform resource check and is meant be used among other load APIs.
func (loader *segmentLoader) loadDeltalogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadDeltalogs-%d", segment.ID()))
defer sp.End()
log := log.Ctx(ctx).With(
Expand Down Expand Up @@ -1254,6 +1258,24 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
return nil
}

// LoadDeltaLogs load deltalog and write delta data into provided segment.
// it also executes resource protection logic in case of OOM.
func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
loadInfo := &querypb.SegmentLoadInfo{
SegmentID: segment.ID(),
CollectionID: segment.Collection(),
Deltalogs: deltaLogs,
}
// Check memory & storage limit
requestResourceResult, err := loader.requestResource(ctx, loadInfo)
if err != nil {
log.Warn("request resource failed", zap.Error(err))
return err
}
defer loader.freeRequest(requestResourceResult.Resource)
return loader.loadDeltalogs(ctx, segment, deltaLogs)
}

func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo) error {
var needReset bool

Expand Down

0 comments on commit 2ff5a27

Please sign in to comment.