Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Optimize DescribeIndex to reduce lock contention #30975

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 63 additions & 37 deletions internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,12 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
}

indexInfo := &indexpb.IndexInfo{}
s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
}), false, indexes[0].CreateTime)
// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})

s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime)
ret.State = indexInfo.State
ret.FailReason = indexInfo.IndexStateFailReason

Expand Down Expand Up @@ -325,35 +328,38 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
return ret, nil
}

func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*SegmentInfo) int64 {
func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments map[int64]*indexStats) int64 {
unIndexed, indexed := typeutil.NewSet[int64](), typeutil.NewSet[int64]()
for _, seg := range segments {
segIdx, ok := seg.segmentIndexes[indexInfo.IndexID]
for segID, seg := range segments {
if seg.state != commonpb.SegmentState_Flushed && seg.state != commonpb.SegmentState_Flushing {
continue
}
segIdx, ok := seg.indexStates[indexInfo.IndexID]
if !ok {
unIndexed.Insert(seg.GetID())
unIndexed.Insert(segID)
continue
}
switch segIdx.IndexState {
switch segIdx.GetState() {
case commonpb.IndexState_Finished:
indexed.Insert(seg.GetID())
indexed.Insert(segID)
default:
unIndexed.Insert(seg.GetID())
unIndexed.Insert(segID)
}
}
retrieveContinue := len(unIndexed) != 0
for retrieveContinue {
for segID := range unIndexed {
unIndexed.Remove(segID)
segment := s.meta.GetSegment(segID)
if segment == nil || len(segment.CompactionFrom) == 0 {
segment := segments[segID]
if segment == nil || len(segment.compactionFrom) == 0 {
continue
}
for _, fromID := range segment.CompactionFrom {
fromSeg := s.meta.GetSegment(fromID)
for _, fromID := range segment.compactionFrom {
fromSeg := segments[fromID]
if fromSeg == nil {
continue
}
if segIndex, ok := fromSeg.segmentIndexes[indexInfo.IndexID]; ok && segIndex.IndexState == commonpb.IndexState_Finished {
if segIndex, ok := fromSeg.indexStates[indexInfo.IndexID]; ok && segIndex.GetState() == commonpb.IndexState_Finished {
indexed.Insert(fromID)
continue
}
Expand All @@ -364,9 +370,9 @@ func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*Segm
}
indexedRows := int64(0)
for segID := range indexed {
segment := s.meta.GetSegment(segID)
segment := segments[segID]
if segment != nil {
indexedRows += segment.GetNumOfRows()
indexedRows += segment.numRows
}
}
return indexedRows
Expand All @@ -375,7 +381,7 @@ func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*Segm
// completeIndexInfo gets the index row count and the index task state.
// If realTime, it calculates the current statistics.
// If not realTime, which means getting the info of the prior `CreateIndex` action, it skips segments created after the index's create time.
func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.Index, segments []*SegmentInfo, realTime bool, ts Timestamp) {
func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.Index, segments map[int64]*indexStats, realTime bool, ts Timestamp) {
var (
cntNone = 0
cntUnissued = 0
Expand All @@ -388,42 +394,45 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In
pendingIndexRows = int64(0)
)

for _, seg := range segments {
totalRows += seg.NumOfRows
segIdx, ok := seg.segmentIndexes[index.IndexID]
for segID, seg := range segments {
if seg.state != commonpb.SegmentState_Flushed && seg.state != commonpb.SegmentState_Flushing {
continue
}
totalRows += seg.numRows
segIdx, ok := seg.indexStates[index.IndexID]

if !ok {
if seg.GetLastExpireTime() <= ts {
if seg.lastExpireTime <= ts {
cntUnissued++
}
pendingIndexRows += seg.GetNumOfRows()
pendingIndexRows += seg.numRows
continue
}
if segIdx.IndexState != commonpb.IndexState_Finished {
pendingIndexRows += seg.GetNumOfRows()
if segIdx.GetState() != commonpb.IndexState_Finished {
pendingIndexRows += seg.numRows
}

// if realTime, calculate current statistics
// if not realTime, skip segments created after index create
if !realTime && seg.GetLastExpireTime() > ts {
if !realTime && seg.lastExpireTime > ts {
continue
}

switch segIdx.IndexState {
switch segIdx.GetState() {
case commonpb.IndexState_IndexStateNone:
// should never get here
log.Warn("receive unexpected index state: IndexStateNone", zap.Int64("segmentID", segIdx.SegmentID))
log.Warn("receive unexpected index state: IndexStateNone", zap.Int64("segmentID", segID))
cntNone++
case commonpb.IndexState_Unissued:
cntUnissued++
case commonpb.IndexState_InProgress:
cntInProgress++
case commonpb.IndexState_Finished:
cntFinished++
indexedRows += seg.NumOfRows
indexedRows += seg.numRows
case commonpb.IndexState_Failed:
cntFailed++
failReason += fmt.Sprintf("%d: %s;", segIdx.SegmentID, segIdx.FailReason)
failReason += fmt.Sprintf("%d: %s;", segID, segIdx.FailReason)
}
}

Expand Down Expand Up @@ -491,9 +500,13 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
PendingIndexRows: 0,
State: 0,
}
s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
}), false, indexes[0].CreateTime)

// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})

s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime)
log.Info("GetIndexBuildProgress success", zap.Int64("collectionID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()))
return &indexpb.GetIndexBuildProgressResponse{
Expand All @@ -504,6 +517,17 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
}, nil
}

// indexStats is a lightweight, per-segment snapshot that carries only the
// fields needed for indexing statistics (row counts and index task states).
// It is built by meta.SelectSegmentIndexes; please use it judiciously.
type indexStats struct {
// ID is the segment ID.
ID int64
// numRows is the number of rows in the segment.
numRows int64
// compactionFrom lists the IDs of the segments this segment was compacted from.
compactionFrom []int64
// indexStates maps an index ID to the state of that index on this segment.
indexStates map[int64]*indexpb.SegmentIndexState
// state is the segment state; the statistics code only counts segments
// that are Flushed or Flushing.
state commonpb.SegmentState
// lastExpireTime is the segment's last expire time; it is compared against
// the index create timestamp when the statistics are not real-time.
lastExpireTime uint64
}

// DescribeIndex describe the index info of the collection.
func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
log := log.Ctx(ctx).With(
Expand Down Expand Up @@ -531,9 +555,10 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
}

// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})

indexInfos := make([]*indexpb.IndexInfo, 0)
for _, index := range indexes {
indexInfo := &indexpb.IndexInfo{
Expand Down Expand Up @@ -589,9 +614,10 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
}

// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})

indexInfos := make([]*indexpb.IndexInfo, 0)
for _, index := range indexes {
indexInfo := &indexpb.IndexInfo{
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/index_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ func TestServer_DescribeIndex(t *testing.T) {
},
segID - 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
ID: segID - 1,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
Expand Down
28 changes: 28 additions & 0 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/pkg/common"
Expand Down Expand Up @@ -918,6 +919,33 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
return ret
}

// SelectSegmentIndexes returns a snapshot of the index-related statistics of
// every segment accepted by selector, keyed by segment ID.
//
// The snapshot is built while holding the meta read lock, so callers can
// aggregate over the returned map afterwards without querying the meta (and
// taking its lock) once per segment.
func (m *meta) SelectSegmentIndexes(selector SegmentInfoSelector) map[int64]*indexStats {
	m.RLock()
	defer m.RUnlock()

	ret := make(map[int64]*indexStats)
	for _, info := range m.segments.segments {
		if !selector(info) {
			continue
		}

		// Copy the slice so that the snapshot does not alias memory owned by
		// the segment meta after the lock has been released.
		compactionFrom := append([]int64(nil), info.GetCompactionFrom()...)

		stats := &indexStats{
			ID:             info.GetID(),
			numRows:        info.GetNumOfRows(),
			compactionFrom: compactionFrom,
			// The number of entries is known up front; pre-size to avoid rehashing.
			indexStates:    make(map[int64]*indexpb.SegmentIndexState, len(info.segmentIndexes)),
			state:          info.GetState(),
			lastExpireTime: info.GetLastExpireTime(),
		}
		for indexID, segIndex := range info.segmentIndexes {
			stats.indexStates[indexID] = &indexpb.SegmentIndexState{
				SegmentID:  segIndex.SegmentID,
				State:      segIndex.IndexState,
				FailReason: segIndex.FailReason,
			}
		}
		ret[info.GetID()] = stats
	}
	return ret
}

// AddAllocation add allocation in segment
func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
log.Debug("meta update: add allocation",
Expand Down
Loading