diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 680aea7ce57a1..344f280d337c1 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -273,9 +273,12 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe } indexInfo := &indexpb.IndexInfo{} - s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool { - return isFlush(info) && info.CollectionID == req.GetCollectionID() - }), false, indexes[0].CreateTime) + // The total rows of all indexes should be based on the current perspective + segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool { + return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) + }) + + s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime) ret.State = indexInfo.State ret.FailReason = indexInfo.IndexStateFailReason @@ -325,35 +328,38 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme return ret, nil } -func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*SegmentInfo) int64 { +func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments map[int64]*indexStats) int64 { unIndexed, indexed := typeutil.NewSet[int64](), typeutil.NewSet[int64]() - for _, seg := range segments { - segIdx, ok := seg.segmentIndexes[indexInfo.IndexID] + for segID, seg := range segments { + if seg.state != commonpb.SegmentState_Flushed && seg.state != commonpb.SegmentState_Flushing { + continue + } + segIdx, ok := seg.indexStates[indexInfo.IndexID] if !ok { - unIndexed.Insert(seg.GetID()) + unIndexed.Insert(segID) continue } - switch segIdx.IndexState { + switch segIdx.GetState() { case commonpb.IndexState_Finished: - indexed.Insert(seg.GetID()) + indexed.Insert(segID) default: - unIndexed.Insert(seg.GetID()) + unIndexed.Insert(segID) } } retrieveContinue := len(unIndexed) != 0 for retrieveContinue { for segID := range unIndexed { unIndexed.Remove(segID) - segment := s.meta.GetSegment(segID) - if segment == nil || len(segment.CompactionFrom) == 0 { + segment := segments[segID] + if segment == nil || len(segment.compactionFrom) == 0 { continue } - for _, fromID := range segment.CompactionFrom { - fromSeg := s.meta.GetSegment(fromID) + for _, fromID := range segment.compactionFrom { + fromSeg := segments[fromID] if fromSeg == nil { continue } - if segIndex, ok := fromSeg.segmentIndexes[indexInfo.IndexID]; ok && segIndex.IndexState == commonpb.IndexState_Finished { + if segIndex, ok := fromSeg.indexStates[indexInfo.IndexID]; ok && segIndex.GetState() == commonpb.IndexState_Finished { indexed.Insert(fromID) continue } @@ -364,9 +370,9 @@ func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*Segm } indexedRows := int64(0) for segID := range indexed { - segment := s.meta.GetSegment(segID) + segment := segments[segID] if segment != nil { - indexedRows += segment.GetNumOfRows() + indexedRows += segment.numRows } } return indexedRows @@ -375,7 +381,7 @@ func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*Segm // completeIndexInfo get the index row count and index task state // if realTime, calculate current statistics // if not realTime, which means get info of the prior `CreateIndex` action, skip segments created after index's create time -func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.Index, segments []*SegmentInfo, realTime bool, ts Timestamp) { +func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.Index, segments map[int64]*indexStats, realTime bool, ts Timestamp) { var ( cntNone = 0 cntUnissued = 0 @@ -388,31 +394,34 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In pendingIndexRows = int64(0) ) - for _, seg := range segments { - totalRows += seg.NumOfRows - segIdx, ok := seg.segmentIndexes[index.IndexID] + for segID, seg := range segments { + if seg.state != commonpb.SegmentState_Flushed && seg.state != commonpb.SegmentState_Flushing { + continue + } + totalRows += seg.numRows + segIdx, ok := seg.indexStates[index.IndexID] if !ok { - if seg.GetLastExpireTime() <= ts { + if seg.lastExpireTime <= ts { cntUnissued++ } - pendingIndexRows += seg.GetNumOfRows() + pendingIndexRows += seg.numRows continue } - if segIdx.IndexState != commonpb.IndexState_Finished { - pendingIndexRows += seg.GetNumOfRows() + if segIdx.GetState() != commonpb.IndexState_Finished { + pendingIndexRows += seg.numRows } // if realTime, calculate current statistics // if not realTime, skip segments created after index create - if !realTime && seg.GetLastExpireTime() > ts { + if !realTime && seg.lastExpireTime > ts { continue } - switch segIdx.IndexState { + switch segIdx.GetState() { case commonpb.IndexState_IndexStateNone: // can't to here - log.Warn("receive unexpected index state: IndexStateNone", zap.Int64("segmentID", segIdx.SegmentID)) + log.Warn("receive unexpected index state: IndexStateNone", zap.Int64("segmentID", segID)) cntNone++ case commonpb.IndexState_Unissued: cntUnissued++ @@ -420,10 +429,10 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In cntInProgress++ case commonpb.IndexState_Finished: cntFinished++ - indexedRows += seg.NumOfRows + indexedRows += seg.numRows case commonpb.IndexState_Failed: cntFailed++ - failReason += fmt.Sprintf("%d: %s;", segIdx.SegmentID, segIdx.FailReason) + failReason += fmt.Sprintf("%d: %s;", segID, segIdx.FailReason) } } @@ -491,9 +500,13 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde PendingIndexRows: 0, State: 0, } - s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool { - return isFlush(info) && info.CollectionID == req.GetCollectionID() - }), false, indexes[0].CreateTime) + + // The total rows of all indexes should be based on the current perspective + segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool { + return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) + }) + + s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime) log.Info("GetIndexBuildProgress success", zap.Int64("collectionID", req.GetCollectionID()), zap.String("indexName", req.GetIndexName())) return &indexpb.GetIndexBuildProgressResponse{ @@ -504,6 +517,17 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde }, nil } +// indexStats just for indexing statistics. +// Please use it judiciously. +type indexStats struct { + ID int64 + numRows int64 + compactionFrom []int64 + indexStates map[int64]*indexpb.SegmentIndexState + state commonpb.SegmentState + lastExpireTime uint64 +} + // DescribeIndex describe the index info of the collection. func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) { log := log.Ctx(ctx).With( @@ -531,9 +555,10 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe } // The total rows of all indexes should be based on the current perspective - segments := s.meta.SelectSegments(func(info *SegmentInfo) bool { - return isFlush(info) && info.CollectionID == req.GetCollectionID() + segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool { + return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) }) + indexInfos := make([]*indexpb.IndexInfo, 0) for _, index := range indexes { indexInfo := &indexpb.IndexInfo{ @@ -589,9 +614,10 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt } // The total rows of all indexes should be based on the current perspective - segments := s.meta.SelectSegments(func(info *SegmentInfo) bool { - return isFlush(info) && info.CollectionID == req.GetCollectionID() + segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool { + return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) }) + indexInfos := make([]*indexpb.IndexInfo, 0) for _, index := range indexes { indexInfo := &indexpb.IndexInfo{ diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index 194fab8c715ad..b730d9d1310f0 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -996,7 +996,7 @@ func TestServer_DescribeIndex(t *testing.T) { }, segID - 1: { SegmentInfo: &datapb.SegmentInfo{ - ID: segID, + ID: segID - 1, CollectionID: collID, PartitionID: partID, NumOfRows: 10000, diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index e45286b182d29..201409feafd16 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -38,6 +38,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/segmentutil" "github.com/milvus-io/milvus/pkg/common" @@ -918,6 +919,33 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo { return ret } +func (m *meta) SelectSegmentIndexes(selector SegmentInfoSelector) map[int64]*indexStats { + m.RLock() + defer m.RUnlock() + ret := make(map[int64]*indexStats) + for _, info := range m.segments.segments { + if selector(info) { + s := &indexStats{ + ID: info.GetID(), + numRows: info.GetNumOfRows(), + compactionFrom: info.GetCompactionFrom(), + indexStates: make(map[int64]*indexpb.SegmentIndexState), + state: info.GetState(), + lastExpireTime: info.GetLastExpireTime(), + } + for indexID, segIndex := range info.segmentIndexes { + s.indexStates[indexID] = &indexpb.SegmentIndexState{ + SegmentID: segIndex.SegmentID, + State: segIndex.IndexState, + FailReason: segIndex.FailReason, + } + } + ret[info.GetID()] = s + } + } + return ret +} + // AddAllocation add allocation in segment func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error { log.Debug("meta update: add allocation",