Skip to content

Commit

Permalink
enhance: add more metrics (milvus-io#31271)
Browse files Browse the repository at this point in the history
Signed-off-by: longjiquan <[email protected]>
  • Loading branch information
longjiquan committed Mar 22, 2024
1 parent 0bf595a commit 2e67866
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 18 deletions.
2 changes: 2 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ proxy:
serverMaxRecvSize: 67108864
clientMaxSendSize: 268435456
clientMaxRecvSize: 67108864
# query whose executed time exceeds the `slowQuerySpanInSeconds` can be considered slow, in seconds.
slowQuerySpanInSeconds: 5

# Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments.
queryCoord:
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/milvus-io/milvus

go 1.18
go 1.21

toolchain go1.21.4

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0
Expand Down
22 changes: 18 additions & 4 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,7 @@ func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegm
return result
}

// GetNumRowsOfCollection returns total rows count of segments belongs to provided collection
func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
m.RLock()
defer m.RUnlock()
func (m *meta) getNumRowsOfCollectionUnsafe(collectionID UniqueID) int64 {
var ret int64
segments := m.segments.GetSegments()
for _, segment := range segments {
Expand All @@ -261,6 +258,13 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
return ret
}

// GetNumRowsOfCollection returns total rows count of segments belongs to provided collection
func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
m.RLock()
defer m.RUnlock()
return m.getNumRowsOfCollectionUnsafe(collectionID)
}

// GetCollectionBinlogSize returns the total binlog size and binlog size of collections.
func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64) {
m.RLock()
Expand Down Expand Up @@ -290,6 +294,16 @@ func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64) {
return total, collectionBinlogSize
}

func (m *meta) GetAllCollectionNumRows() map[int64]int64 {
m.RLock()
defer m.RUnlock()
ret := make(map[int64]int64, len(m.collections))
for collectionID := range m.collections {
ret[collectionID] = m.getNumRowsOfCollectionUnsafe(collectionID)
}
return ret
}

// AddSegment records segment info, persisting info into kv store
func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
log := log.Ctx(ctx)
Expand Down
44 changes: 41 additions & 3 deletions internal/datacoord/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/hardware"
Expand All @@ -43,6 +44,42 @@ func (s *Server) getQuotaMetrics() *metricsinfo.DataCoordQuotaMetrics {
}
}

func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoordCollectionMetrics {
totalNumRows := s.meta.GetAllCollectionNumRows()
ret := &metricsinfo.DataCoordCollectionMetrics{
Collections: make(map[int64]*metricsinfo.DataCoordCollectionInfo, len(totalNumRows)),
}
for collectionID, total := range totalNumRows {
if _, ok := ret.Collections[collectionID]; !ok {
ret.Collections[collectionID] = &metricsinfo.DataCoordCollectionInfo{
NumEntitiesTotal: 0,
IndexInfo: make([]*metricsinfo.DataCoordIndexInfo, 0),
}
}
ret.Collections[collectionID].NumEntitiesTotal = total
indexInfo, err := s.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
CollectionID: collectionID,
IndexName: "",
Timestamp: 0,
})
if err := merr.CheckRPCCall(indexInfo, err); err != nil {
log.Ctx(ctx).Warn("failed to describe index, ignore to report index metrics",
zap.Int64("collection", collectionID),
zap.Error(err),
)
continue
}
for _, info := range indexInfo.GetIndexInfos() {
ret.Collections[collectionID].IndexInfo = append(ret.Collections[collectionID].IndexInfo, &metricsinfo.DataCoordIndexInfo{
NumEntitiesIndexed: info.GetIndexedRows(),
IndexName: info.GetIndexName(),
FieldID: info.GetIndexID(),
})
}
}
return ret
}

// getSystemInfoMetrics composes data cluster metrics
func (s *Server) getSystemInfoMetrics(
ctx context.Context,
Expand All @@ -53,7 +90,7 @@ func (s *Server) getSystemInfoMetrics(
// get datacoord info
nodes := s.cluster.GetSessions()
clusterTopology := metricsinfo.DataClusterTopology{
Self: s.getDataCoordMetrics(),
Self: s.getDataCoordMetrics(ctx),
ConnectedDataNodes: make([]metricsinfo.DataNodeInfos, 0, len(nodes)),
ConnectedIndexNodes: make([]metricsinfo.IndexNodeInfos, 0),
}
Expand Down Expand Up @@ -103,7 +140,7 @@ func (s *Server) getSystemInfoMetrics(
}

// getDataCoordMetrics composes datacoord infos
func (s *Server) getDataCoordMetrics() metricsinfo.DataCoordInfos {
func (s *Server) getDataCoordMetrics(ctx context.Context) metricsinfo.DataCoordInfos {
ret := metricsinfo.DataCoordInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()),
Expand All @@ -125,7 +162,8 @@ func (s *Server) getDataCoordMetrics() metricsinfo.DataCoordInfos {
SystemConfigurations: metricsinfo.DataCoordConfiguration{
SegmentMaxSize: Params.DataCoordCfg.SegmentMaxSize.GetAsFloat(),
},
QuotaMetrics: s.getQuotaMetrics(),
QuotaMetrics: s.getQuotaMetrics(),
CollectionMetrics: s.getCollectionMetrics(ctx),
}

metricsinfo.FillDeployMetricsWithEnv(&ret.BaseComponentInfos.SystemInfo)
Expand Down
15 changes: 13 additions & 2 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2370,6 +2370,9 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
receiveSize := proto.Size(dr.req)
rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize))

successCnt := dr.result.GetDeleteCnt()
metrics.ProxyDeleteVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt))

metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.SuccessLabel).Inc()
metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
Expand Down Expand Up @@ -2576,8 +2579,12 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)

defer func() {
span := tr.ElapseSpan()
if span >= SlowReadSpan {
if span >= paramtable.Get().ProxyCfg.SlowQuerySpanInSeconds.GetAsDuration(time.Second) {
log.Info(rpcSlow(method), zap.Int64("nq", qt.SearchRequest.GetNq()), zap.Duration("duration", span))
metrics.ProxySlowQueryCount.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.SearchLabel,
).Inc()
}
}()

Expand Down Expand Up @@ -2837,14 +2844,18 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes

defer func() {
span := tr.ElapseSpan()
if span >= SlowReadSpan {
if span >= paramtable.Get().ProxyCfg.SlowQuerySpanInSeconds.GetAsDuration(time.Second) {
log.Info(
rpcSlow(method),
zap.String("expr", request.Expr),
zap.Strings("OutputFields", request.OutputFields),
zap.Uint64("travel_timestamp", request.TravelTimestamp),
zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp),
zap.Duration("duration", span))
metrics.ProxySlowQueryCount.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
).Inc()
}
}()

Expand Down
25 changes: 23 additions & 2 deletions internal/querynodev2/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,18 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
}, nil
}

func getCollectionMetrics(node *QueryNode) (*metricsinfo.QueryNodeCollectionMetrics, error) {
allSegments := node.manager.Segment.GetBy()
ret := &metricsinfo.QueryNodeCollectionMetrics{
CollectionRows: make(map[int64]int64),
}
for _, segment := range allSegments {
collectionID := segment.Collection()
ret.CollectionRows[collectionID] += segment.RowNum()
}
return ret, nil
}

// getSystemInfoMetrics returns metrics info of QueryNode
func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, node *QueryNode) (*milvuspb.GetMetricsResponse, error) {
usedMem := hardware.GetUsedMemoryCount()
Expand All @@ -163,7 +175,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: merr.Status(err),
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, paramtable.GetNodeID()),
ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, paramtable.GetNodeID()),
}, nil
}
hardwareInfos := metricsinfo.HardwareMetrics{
Expand All @@ -177,6 +189,14 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
}
quotaMetrics.Hms = hardwareInfos

collectionMetrics, err := getCollectionMetrics(node)
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: merr.Status(err),
ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, paramtable.GetNodeID()),
}, nil
}

nodeInfos := metricsinfo.QueryNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, paramtable.GetNodeID()),
Expand All @@ -190,7 +210,8 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
SystemConfigurations: metricsinfo.QueryNodeConfiguration{
SimdType: paramtable.Get().CommonCfg.SimdType.GetValue(),
},
QuotaMetrics: quotaMetrics,
QuotaMetrics: quotaMetrics,
CollectionMetrics: collectionMetrics,
}
metricsinfo.FillDeployMetricsWithEnv(&nodeInfos.SystemInfo)

Expand Down
85 changes: 83 additions & 2 deletions internal/rootcoord/quota_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package rootcoord
import (
"context"
"fmt"
"github.com/milvus-io/milvus/internal/metastore/model"
"math"
"strconv"
"sync"
Expand Down Expand Up @@ -175,6 +176,59 @@ func (q *QuotaCenter) clearMetrics() {
q.proxyMetrics = make(map[UniqueID]*metricsinfo.ProxyQuotaMetrics, 0)
}

func updateNumEntitiesLoaded(current map[int64]int64, qn *metricsinfo.QueryNodeCollectionMetrics) map[int64]int64 {
for collectionID, rowNum := range qn.CollectionRows {
current[collectionID] += rowNum
}
return current
}

func (q *QuotaCenter) reportNumEntitiesLoaded(numEntitiesLoaded map[int64]int64) {
for collectionID, num := range numEntitiesLoaded {
info, err := q.meta.GetCollectionByID(context.Background(), "", collectionID, typeutil.MaxTimestamp, false)
if err != nil {
log.Warn("failed to get collection info by its id, ignore to report loaded num entities",
zap.Int64("collection", collectionID),
zap.Int64("num_entities_loaded", num),
zap.Error(err),
)
continue
}
metrics.RootCoordNumEntities.WithLabelValues(info.Name, metrics.LoadedLabel).Set(float64(num))
}
}

func (q *QuotaCenter) reportDataCoordCollectionMetrics(dc *metricsinfo.DataCoordCollectionMetrics) {
for collectionID, collection := range dc.Collections {
info, err := q.meta.GetCollectionByID(context.Background(), "", collectionID, typeutil.MaxTimestamp, false)
if err != nil {
log.Warn("failed to get collection info by its id, ignore to report total_num_entities/indexed_entities",
zap.Int64("collection", collectionID),
zap.Int64("num_entities_total", collection.NumEntitiesTotal),
zap.Int("lenOfIndexedInfo", len(collection.IndexInfo)),
zap.Error(err),
)
continue
}
metrics.RootCoordNumEntities.WithLabelValues(info.Name, metrics.TotalLabel).Set(float64(collection.NumEntitiesTotal))
fields := lo.KeyBy(info.Fields, func(v *model.Field) int64 { return v.FieldID })
for _, indexInfo := range collection.IndexInfo {
if _, ok := fields[indexInfo.FieldID]; !ok {
log.Warn("field id not found, ignore to report indexed num entities",
zap.Int64("collection", collectionID),
zap.Int64("field", indexInfo.FieldID),
)
continue
}
field := fields[indexInfo.FieldID]
metrics.RootCoordIndexedNumEntities.WithLabelValues(
info.Name,
indexInfo.IndexName,
strconv.FormatBool(typeutil.IsVectorType(field.DataType))).Set(float64(indexInfo.NumEntitiesIndexed))
}
}
}

// syncMetrics sends GetMetrics requests to DataCoord and QueryCoord to sync the metrics in DataNodes and QueryNodes.
func (q *QuotaCenter) syncMetrics() error {
oldDataNodes := typeutil.NewSet(lo.Keys(q.dataNodeMetrics)...)
Expand All @@ -189,6 +243,8 @@ func (q *QuotaCenter) syncMetrics() error {
return err
}

numEntitiesLoaded := make(map[int64]int64)

// get Query cluster metrics
group.Go(func() error {
rsp, err := q.queryCoord.GetMetrics(ctx, req)
Expand All @@ -208,8 +264,12 @@ func (q *QuotaCenter) syncMetrics() error {
q.queryNodeMetrics[queryNodeMetric.ID] = queryNodeMetric.QuotaMetrics
collections.Insert(queryNodeMetric.QuotaMetrics.Effect.CollectionIDs...)
}
if queryNodeMetric.CollectionMetrics != nil {
numEntitiesLoaded = updateNumEntitiesLoaded(numEntitiesLoaded, queryNodeMetric.CollectionMetrics)
}
}
q.readableCollections = collections.Collect()
q.reportNumEntitiesLoaded(numEntitiesLoaded)
return nil
})
// get Data cluster metrics
Expand All @@ -224,6 +284,10 @@ func (q *QuotaCenter) syncMetrics() error {
return err
}

if dataCoordTopology.Cluster.Self.CollectionMetrics != nil {
q.reportDataCoordCollectionMetrics(dataCoordTopology.Cluster.Self.CollectionMetrics)
}

collections := typeutil.NewUniqueSet()
for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes {
if dataNodeMetric.QuotaMetrics != nil {
Expand Down Expand Up @@ -842,14 +906,31 @@ func (q *QuotaCenter) setRates() error {
func (q *QuotaCenter) recordMetrics() {
record := func(errorCode commonpb.ErrorCode) {
var hasException float64 = 0
for _, states := range q.quotaStates {
for collectionID, states := range q.quotaStates {
info, err := q.meta.GetCollectionByID(context.Background(), "", collectionID, typeutil.MaxTimestamp, false)
if err != nil {
log.Warn("failed to get collection info by its id, ignore to report quota states",
zap.Int64("collection", collectionID),
zap.Error(err),
)
continue
}
dbm, err := q.meta.GetDatabaseByID(context.Background(), info.DBID, typeutil.MaxTimestamp)
if err != nil {
log.Warn("failed to get database name info by its id, ignore to report quota states",
zap.Int64("collection", collectionID),
zap.Error(err),
)
continue
}

for _, state := range states {
if state == errorCode {
hasException = 1
}
}
metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String(), dbm.Name).Set(hasException)
}
metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String()).Set(hasException)
}
record(commonpb.ErrorCode_MemoryQuotaExhausted)
record(commonpb.ErrorCode_DiskQuotaExhausted)
Expand Down
6 changes: 6 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ const (
functionLabelName = "function_name"
queryTypeLabelName = "query_type"
collectionName = "collection_name"
indexName = "index_name"
isVectorIndex = "is_vector_index"
segmentStateLabelName = "segment_state"
segmentIDLabelName = "segment_id"
usernameLabelName = "username"
Expand All @@ -88,6 +90,10 @@ const (
lockSource = "lock_source"
lockType = "lock_type"
lockOp = "lock_op"

// entities label
LoadedLabel = "loaded"
NumEntitiesAllLabel = "all"
)

var (
Expand Down
Loading

0 comments on commit 2e67866

Please sign in to comment.