Skip to content

Commit

Permalink
fix: [2.4] Check resource when loading deltalogs (#37195) (#37263)
Browse files Browse the repository at this point in the history
Cherry pick from master
pr: #37195
Related to #36887

`LoadDeltaLogs` API did not check memory usage. When system is under
high delete load pressure, this could result into OOM quit.

This PR add resource check for `LoadDeltaLogs` actions and separate
internal deltalog loading function with public one.

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Oct 30, 2024
1 parent ce7fbb9 commit a2a51c4
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
}
}
}
if err = loader.LoadDeltaLogs(ctx, segment, loadInfo.GetDeltalogs()); err != nil {
if err = loader.loadDeltalogs(ctx, segment, loadInfo.GetDeltalogs()); err != nil {
return errors.Wrap(err, "At LoadDeltaLogs")
}

Expand Down Expand Up @@ -1369,7 +1369,9 @@ func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int6
return nil
}

func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
// loadDeltalogs performs the internal actions of `LoadDeltaLogs`
// this function does not perform resource check and is meant be used among other load APIs.
func (loader *segmentLoader) loadDeltalogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadDeltalogs-%d", segment.ID()))
defer sp.End()
log := log.Ctx(ctx).With(
Expand Down Expand Up @@ -1429,6 +1431,24 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
return nil
}

// LoadDeltaLogs load deltalog and write delta data into provided segment.
// it also executes resource protection logic in case of OOM.
func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
loadInfo := &querypb.SegmentLoadInfo{
SegmentID: segment.ID(),
CollectionID: segment.Collection(),
Deltalogs: deltaLogs,
}
// Check memory & storage limit
requestResourceResult, err := loader.requestResource(ctx, loadInfo)
if err != nil {
log.Warn("request resource failed", zap.Error(err))
return err
}
defer loader.freeRequest(requestResourceResult.Resource)
return loader.loadDeltalogs(ctx, segment, deltaLogs)
}

func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo) error {
var needReset bool

Expand Down

0 comments on commit a2a51c4

Please sign in to comment.