diff --git a/internal/proxy/mock_test.go b/internal/proxy/mock_test.go index 836ad42cf40c0..3dc101ced7797 100644 --- a/internal/proxy/mock_test.go +++ b/internal/proxy/mock_test.go @@ -96,6 +96,7 @@ func newMockIDAllocatorInterface() allocator.Interface { } type mockTask struct { + baseTask *TaskCondition id UniqueID name string diff --git a/internal/proxy/task.go b/internal/proxy/task.go index b7c034b3f2db1..c547b4868ad7d 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math" + "time" "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" @@ -107,6 +108,20 @@ type task interface { PostExecute(ctx context.Context) error WaitToFinish() error Notify(err error) + SetOnEnqueueTime() + GetDurationInQueue() time.Duration +} + +type baseTask struct { + onEnqueueTime time.Time +} + +func (bt *baseTask) SetOnEnqueueTime() { + bt.onEnqueueTime = time.Now() +} + +func (bt *baseTask) GetDurationInQueue() time.Duration { + return time.Since(bt.onEnqueueTime) } type dmlTask interface { @@ -118,6 +133,7 @@ type dmlTask interface { type BaseInsertTask = msgstream.InsertMsg type createCollectionTask struct { + baseTask Condition *milvuspb.CreateCollectionRequest ctx context.Context @@ -317,6 +333,7 @@ func (t *createCollectionTask) PostExecute(ctx context.Context) error { } type dropCollectionTask struct { + baseTask Condition *milvuspb.DropCollectionRequest ctx context.Context @@ -362,12 +379,12 @@ func (t *dropCollectionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DropCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *dropCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DropCollection - t.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(t.CollectionName); err != nil { return err @@ -386,6 +403,7 @@ func (t *dropCollectionTask) PostExecute(ctx context.Context) error { } type hasCollectionTask struct { + baseTask Condition *milvuspb.HasCollectionRequest ctx context.Context @@ -426,13 +444,15 @@ func (t *hasCollectionTask) SetTs(ts Timestamp) { } func (t *hasCollectionTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_HasCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *hasCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_HasCollection - t.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(t.CollectionName); err != nil { return err @@ -460,6 +480,7 @@ func (t *hasCollectionTask) PostExecute(ctx context.Context) error { } type describeCollectionTask struct { + baseTask Condition *milvuspb.DescribeCollectionRequest ctx context.Context @@ -500,13 +521,15 @@ func (t *describeCollectionTask) SetTs(ts Timestamp) { } func (t *describeCollectionTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_DescribeCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *describeCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DescribeCollection - t.Base.SourceID = paramtable.GetNodeID() if t.CollectionID != 0 && len(t.CollectionName) == 0 { return nil @@ -594,6 +617,7 @@ func (t *describeCollectionTask) PostExecute(ctx context.Context) error { } type showCollectionsTask struct { + baseTask Condition *milvuspb.ShowCollectionsRequest ctx context.Context @@ -636,12 +660,12 @@ func (t *showCollectionsTask) SetTs(ts Timestamp) { func (t *showCollectionsTask) OnEnqueue() error { t.Base = commonpbutil.NewMsgBase() + t.Base.MsgType = commonpb.MsgType_ShowCollections + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *showCollectionsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ShowCollections - t.Base.SourceID = paramtable.GetNodeID() if t.GetType() == milvuspb.ShowType_InMemory { for _, collectionName := range t.CollectionNames { if err := validateCollectionName(collectionName); err != nil { @@ -753,6 +777,7 @@ func (t *showCollectionsTask) PostExecute(ctx context.Context) error { } type alterCollectionTask struct { + baseTask Condition *milvuspb.AlterCollectionRequest ctx context.Context @@ -796,12 +821,12 @@ func (t *alterCollectionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_AlterCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *alterCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_AlterCollection - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -817,6 +842,7 @@ func (t *alterCollectionTask) PostExecute(ctx context.Context) error { } type createPartitionTask struct { + baseTask Condition *milvuspb.CreatePartitionRequest ctx context.Context @@ -860,12 +886,12 @@ func (t *createPartitionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_CreatePartition + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *createPartitionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_CreatePartition - t.Base.SourceID = paramtable.GetNodeID() collName, partitionTag := t.CollectionName, t.PartitionName @@ -904,6 +930,7 @@ func (t *createPartitionTask) PostExecute(ctx context.Context) error { } type dropPartitionTask struct { + baseTask Condition *milvuspb.DropPartitionRequest ctx context.Context @@ -948,12 +975,12 @@ func (t *dropPartitionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DropPartition + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *dropPartitionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DropPartition - t.Base.SourceID = paramtable.GetNodeID() collName, partitionTag := t.CollectionName, t.PartitionName @@ -1018,6 +1045,7 @@ func (t *dropPartitionTask) PostExecute(ctx context.Context) error { } type hasPartitionTask struct { + baseTask Condition *milvuspb.HasPartitionRequest ctx context.Context @@ -1058,13 +1086,15 @@ func (t *hasPartitionTask) SetTs(ts Timestamp) { } func (t *hasPartitionTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_HasPartition + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *hasPartitionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_HasPartition - t.Base.SourceID = paramtable.GetNodeID() collName, partitionTag := t.CollectionName, t.PartitionName @@ -1094,6 +1124,7 @@ func (t *hasPartitionTask) PostExecute(ctx context.Context) error { } type showPartitionsTask struct { + baseTask Condition *milvuspb.ShowPartitionsRequest ctx context.Context @@ -1135,13 +1166,15 @@ func (t *showPartitionsTask) SetTs(ts Timestamp) { } func (t *showPartitionsTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_ShowPartitions + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *showPartitionsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ShowPartitions - t.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(t.CollectionName); err != nil { return err @@ -1256,6 +1289,7 @@ func (t *showPartitionsTask) PostExecute(ctx context.Context) error { } type flushTask struct { + baseTask Condition *milvuspb.FlushRequest ctx context.Context @@ -1301,12 +1335,12 @@ func (t *flushTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_Flush + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *flushTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_Flush - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -1360,6 +1394,7 @@ func (t *flushTask) PostExecute(ctx context.Context) error { } type loadCollectionTask struct { + baseTask Condition *milvuspb.LoadCollectionRequest ctx context.Context @@ -1407,14 +1442,14 @@ func (t *loadCollectionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_LoadCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *loadCollectionTask) PreExecute(ctx context.Context) error { log.Ctx(ctx).Debug("loadCollectionTask PreExecute", zap.String("role", typeutil.ProxyRole)) - t.Base.MsgType = commonpb.MsgType_LoadCollection - t.Base.SourceID = paramtable.GetNodeID() collName := t.CollectionName @@ -1512,6 +1547,7 @@ func (t *loadCollectionTask) PostExecute(ctx context.Context) error { } type releaseCollectionTask struct { + baseTask Condition *milvuspb.ReleaseCollectionRequest ctx context.Context @@ -1558,12 +1594,12 @@ func (t *releaseCollectionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_ReleaseCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *releaseCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ReleaseCollection - t.Base.SourceID = paramtable.GetNodeID() collName := t.CollectionName @@ -1605,6 +1641,7 @@ func (t *releaseCollectionTask) PostExecute(ctx context.Context) error { } type loadPartitionsTask struct { + baseTask Condition *milvuspb.LoadPartitionsRequest ctx context.Context @@ -1652,12 +1689,12 @@ func (t *loadPartitionsTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_LoadPartitions + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *loadPartitionsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_LoadPartitions - t.Base.SourceID = paramtable.GetNodeID() collName := t.CollectionName @@ -1755,6 +1792,7 @@ func (t *loadPartitionsTask) PostExecute(ctx context.Context) error { } type releasePartitionsTask struct { + baseTask Condition *milvuspb.ReleasePartitionsRequest ctx context.Context @@ -1801,12 +1839,12 @@ func (t *releasePartitionsTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_ReleasePartitions + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *releasePartitionsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ReleasePartitions - t.Base.SourceID = paramtable.GetNodeID() collName := t.CollectionName @@ -1862,6 +1900,7 @@ func (t *releasePartitionsTask) PostExecute(ctx context.Context) error { } type CreateResourceGroupTask struct { + baseTask Condition *milvuspb.CreateResourceGroupRequest ctx context.Context @@ -1905,12 +1944,12 @@ func (t *CreateResourceGroupTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_CreateResourceGroup + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *CreateResourceGroupTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_CreateResourceGroup - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -1926,6 +1965,7 @@ func (t *CreateResourceGroupTask) PostExecute(ctx context.Context) error { } type DropResourceGroupTask struct { + baseTask Condition *milvuspb.DropResourceGroupRequest ctx context.Context @@ -1969,12 +2009,12 @@ func (t *DropResourceGroupTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DropResourceGroup + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *DropResourceGroupTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DropResourceGroup - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -1990,6 +2030,7 @@ func (t *DropResourceGroupTask) PostExecute(ctx context.Context) error { } type DescribeResourceGroupTask struct { + baseTask Condition *milvuspb.DescribeResourceGroupRequest ctx context.Context @@ -2030,13 +2071,15 @@ func (t *DescribeResourceGroupTask) SetTs(ts Timestamp) { } func (t *DescribeResourceGroupTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_DescribeResourceGroup + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *DescribeResourceGroupTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DescribeResourceGroup - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2111,6 +2154,7 @@ func (t *DescribeResourceGroupTask) PostExecute(ctx context.Context) error { } type TransferNodeTask struct { + baseTask Condition *milvuspb.TransferNodeRequest ctx context.Context @@ -2154,12 +2198,12 @@ func (t *TransferNodeTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_TransferNode + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *TransferNodeTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_TransferNode - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2175,6 +2219,7 @@ func (t *TransferNodeTask) PostExecute(ctx context.Context) error { } type TransferReplicaTask struct { + baseTask Condition *milvuspb.TransferReplicaRequest ctx context.Context @@ -2218,12 +2263,12 @@ func (t *TransferReplicaTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_TransferReplica + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *TransferReplicaTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_TransferReplica - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2248,6 +2293,7 @@ func (t *TransferReplicaTask) PostExecute(ctx context.Context) error { } type ListResourceGroupsTask struct { + baseTask Condition *milvuspb.ListResourceGroupsRequest ctx context.Context @@ -2288,13 +2334,15 @@ func (t *ListResourceGroupsTask) SetTs(ts Timestamp) { } func (t *ListResourceGroupsTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_ListResourceGroups + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *ListResourceGroupsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ListResourceGroups - t.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_alias.go b/internal/proxy/task_alias.go index 863f91e4d2251..a80c8c14986bd 100644 --- a/internal/proxy/task_alias.go +++ b/internal/proxy/task_alias.go @@ -28,6 +28,7 @@ import ( // CreateAliasTask contains task information of CreateAlias type CreateAliasTask struct { + baseTask Condition *milvuspb.CreateAliasRequest ctx context.Context @@ -80,13 +81,13 @@ func (t *CreateAliasTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_CreateAlias + t.Base.SourceID = paramtable.GetNodeID() return nil } // PreExecute defines the tion before task execution func (t *CreateAliasTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_CreateAlias - t.Base.SourceID = paramtable.GetNodeID() collAlias := t.Alias // collection alias uses the same format as collection name @@ -115,6 +116,7 @@ func (t *CreateAliasTask) PostExecute(ctx context.Context) error { // DropAliasTask is the task to drop alias type DropAliasTask struct { + baseTask Condition *milvuspb.DropAliasRequest ctx context.Context @@ -162,12 +164,12 @@ func (t *DropAliasTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DropAlias + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *DropAliasTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DropAlias - t.Base.SourceID = paramtable.GetNodeID() collAlias := t.Alias if err := ValidateCollectionAlias(collAlias); err != nil { return err @@ -187,6 +189,7 @@ func (t *DropAliasTask) PostExecute(ctx context.Context) error { // AlterAliasTask is the task to alter alias type AlterAliasTask struct { + baseTask Condition *milvuspb.AlterAliasRequest ctx context.Context @@ -230,12 +233,12 @@ func (t *AlterAliasTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_AlterAlias + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *AlterAliasTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_AlterAlias - t.Base.SourceID = paramtable.GetNodeID() collAlias := t.Alias // collection alias uses the same format as collection name @@ -263,6 +266,7 @@ func (t *AlterAliasTask) PostExecute(ctx context.Context) error { // DescribeAliasTask is the task to describe alias type DescribeAliasTask struct { + baseTask Condition nodeID UniqueID *milvuspb.DescribeAliasRequest @@ -305,12 +309,12 @@ func (a *DescribeAliasTask) SetTs(ts Timestamp) { func (a *DescribeAliasTask) OnEnqueue() error { a.Base = commonpbutil.NewMsgBase() + a.Base.MsgType = commonpb.MsgType_DescribeAlias + a.Base.SourceID = a.nodeID return nil } func (a *DescribeAliasTask) PreExecute(ctx context.Context) error { - a.Base.MsgType = commonpb.MsgType_DescribeAlias - a.Base.SourceID = a.nodeID // collection alias uses the same format as collection name if err := ValidateCollectionAlias(a.GetAlias()); err != nil { return err @@ -330,6 +334,7 @@ func (a *DescribeAliasTask) PostExecute(ctx context.Context) error { // ListAliasesTask is the task to list aliases type ListAliasesTask struct { + baseTask Condition nodeID UniqueID *milvuspb.ListAliasesRequest @@ -372,12 +377,12 @@ func (a *ListAliasesTask) SetTs(ts Timestamp) { func (a *ListAliasesTask) OnEnqueue() error { a.Base = commonpbutil.NewMsgBase() + a.Base.MsgType = commonpb.MsgType_ListAliases + a.Base.SourceID = a.nodeID return nil } func (a *ListAliasesTask) PreExecute(ctx context.Context) error { - a.Base.MsgType = commonpb.MsgType_ListAliases - a.Base.SourceID = a.nodeID if len(a.GetCollectionName()) > 0 { if err := validateCollectionName(a.GetCollectionName()); err != nil { diff --git a/internal/proxy/task_database.go b/internal/proxy/task_database.go index fc8bb33711ff7..fb370fb81c2af 100644 --- a/internal/proxy/task_database.go +++ b/internal/proxy/task_database.go @@ -12,6 +12,7 @@ import ( ) type createDatabaseTask struct { + baseTask Condition *milvuspb.CreateDatabaseRequest ctx context.Context @@ -80,6 +81,7 @@ func (cdt *createDatabaseTask) PostExecute(ctx context.Context) error { } type dropDatabaseTask struct { + baseTask Condition *milvuspb.DropDatabaseRequest ctx context.Context @@ -150,6 +152,7 @@ func (ddt *dropDatabaseTask) PostExecute(ctx context.Context) error { } type listDatabaseTask struct { + baseTask Condition *milvuspb.ListDatabasesRequest ctx context.Context diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index bcab2295857fb..b070eeb3efeec 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -34,6 +34,7 @@ import ( type BaseDeleteTask = msgstream.DeleteMsg type deleteTask struct { + baseTask Condition ctx context.Context tr *timerecord.TimeRecorder @@ -95,6 +96,11 @@ func (dt *deleteTask) SetTs(ts Timestamp) { } func (dt *deleteTask) OnEnqueue() error { + if dt.req.Base == nil { + dt.req.Base = commonpbutil.NewMsgBase() + } + dt.req.Base.MsgType = commonpb.MsgType_Delete + dt.req.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index 1e908ed8342a3..541765d0b0c58 100644 --- a/internal/proxy/task_index.go +++ b/internal/proxy/task_index.go @@ -52,6 +52,7 @@ const ( ) type createIndexTask struct { + baseTask Condition req *milvuspb.CreateIndexRequest ctx context.Context @@ -106,6 +107,8 @@ func (cit *createIndexTask) OnEnqueue() error { if cit.req.Base == nil { cit.req.Base = commonpbutil.NewMsgBase() } + cit.req.Base.MsgType = commonpb.MsgType_CreateIndex + cit.req.Base.SourceID = paramtable.GetNodeID() return nil } @@ -359,8 +362,6 @@ func checkTrain(field *schemapb.FieldSchema, indexParams map[string]string) erro } func (cit *createIndexTask) PreExecute(ctx context.Context) error { - cit.req.Base.MsgType = commonpb.MsgType_CreateIndex - cit.req.Base.SourceID = paramtable.GetNodeID() collName := cit.req.GetCollectionName() @@ -422,6 +423,7 @@ func (cit *createIndexTask) PostExecute(ctx context.Context) error { } type describeIndexTask struct { + baseTask Condition *milvuspb.DescribeIndexRequest ctx context.Context @@ -465,12 +467,12 @@ func (dit *describeIndexTask) SetTs(ts Timestamp) { func (dit *describeIndexTask) OnEnqueue() error { dit.Base = commonpbutil.NewMsgBase() + dit.Base.MsgType = commonpb.MsgType_DescribeIndex + dit.Base.SourceID = paramtable.GetNodeID() return nil } func (dit *describeIndexTask) PreExecute(ctx context.Context) error { - dit.Base.MsgType = commonpb.MsgType_DescribeIndex - dit.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(dit.CollectionName); err != nil { return err @@ -545,6 +547,7 @@ func (dit *describeIndexTask) PostExecute(ctx context.Context) error { } type getIndexStatisticsTask struct { + baseTask Condition *milvuspb.GetIndexStatisticsRequest ctx context.Context @@ -589,12 +592,12 @@ func (dit *getIndexStatisticsTask) SetTs(ts Timestamp) { func (dit *getIndexStatisticsTask) OnEnqueue() error { dit.Base = commonpbutil.NewMsgBase() + dit.Base.MsgType = commonpb.MsgType_GetIndexStatistics + dit.Base.SourceID = paramtable.GetNodeID() return nil } func (dit *getIndexStatisticsTask) PreExecute(ctx context.Context) error { - dit.Base.MsgType = commonpb.MsgType_GetIndexStatistics - dit.Base.SourceID = dit.nodeID if err := validateCollectionName(dit.CollectionName); err != nil { return err @@ -661,6 +664,7 @@ func (dit *getIndexStatisticsTask) PostExecute(ctx context.Context) error { } type dropIndexTask struct { + baseTask Condition ctx context.Context *milvuspb.DropIndexRequest @@ -709,12 +713,12 @@ func (dit *dropIndexTask) OnEnqueue() error { if dit.Base == nil { dit.Base = commonpbutil.NewMsgBase() } + dit.Base.MsgType = commonpb.MsgType_DropIndex + dit.Base.SourceID = paramtable.GetNodeID() return nil } func (dit *dropIndexTask) PreExecute(ctx context.Context) error { - dit.Base.MsgType = commonpb.MsgType_DropIndex - dit.Base.SourceID = paramtable.GetNodeID() collName, fieldName := dit.CollectionName, dit.FieldName @@ -781,6 +785,7 @@ func (dit *dropIndexTask) PostExecute(ctx context.Context) error { // Deprecated: use describeIndexTask instead type getIndexBuildProgressTask struct { + baseTask Condition *milvuspb.GetIndexBuildProgressRequest ctx context.Context @@ -825,12 +830,12 @@ func (gibpt *getIndexBuildProgressTask) SetTs(ts Timestamp) { func (gibpt *getIndexBuildProgressTask) OnEnqueue() error { gibpt.Base = commonpbutil.NewMsgBase() + gibpt.Base.MsgType = commonpb.MsgType_GetIndexBuildProgress + gibpt.Base.SourceID = paramtable.GetNodeID() return nil } func (gibpt *getIndexBuildProgressTask) PreExecute(ctx context.Context) error { - gibpt.Base.MsgType = commonpb.MsgType_GetIndexBuildProgress - gibpt.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(gibpt.CollectionName); err != nil { return err @@ -870,6 +875,7 @@ func (gibpt *getIndexBuildProgressTask) PostExecute(ctx context.Context) error { // Deprecated: use describeIndexTask instead type getIndexStateTask struct { + baseTask Condition *milvuspb.GetIndexStateRequest ctx context.Context @@ -914,12 +920,12 @@ func (gist *getIndexStateTask) SetTs(ts Timestamp) { func (gist *getIndexStateTask) OnEnqueue() error { gist.Base = commonpbutil.NewMsgBase() + gist.Base.MsgType = commonpb.MsgType_GetIndexState + gist.Base.SourceID = paramtable.GetNodeID() return nil } func (gist *getIndexStateTask) PreExecute(ctx context.Context) error { - gist.Base.MsgType = commonpb.MsgType_GetIndexState - gist.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(gist.CollectionName); err != nil { return err diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index 36dbe61174981..7cfcec27aaf2f 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -15,6 +15,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" @@ -22,6 +23,7 @@ import ( ) type insertTask struct { + baseTask // req *milvuspb.InsertRequest Condition insertMsg *BaseInsertTask @@ -90,6 +92,11 @@ func (it *insertTask) getChannels() []pChan { } func (it *insertTask) OnEnqueue() error { + if it.insertMsg.Base == nil { + it.insertMsg.Base = commonpbutil.NewMsgBase() + } + it.insertMsg.Base.MsgType = commonpb.MsgType_Insert + it.insertMsg.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 0cdc3bfb207d1..333459616ae3f 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -23,6 +23,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -42,6 +43,7 @@ const ( ) type queryTask struct { + baseTask Condition *internalpb.RetrieveRequest @@ -666,6 +668,10 @@ func (t *queryTask) SetTs(ts Timestamp) { } func (t *queryTask) OnEnqueue() error { + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } t.Base.MsgType = commonpb.MsgType_Retrieve + t.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index f28a3a7e81e38..70b65df22ed52 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -19,6 +19,7 @@ package proxy import ( "container/list" "context" + "strconv" "sync" "time" @@ -26,6 +27,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/merr" @@ -179,6 +181,7 @@ func (queue *baseTaskQueue) Enqueue(t task) error { // we always use same msg id and ts for now. t.SetID(UniqueID(ts)) + t.SetOnEnqueueTime() return queue.addUnissuedTask(t) } @@ -440,6 +443,11 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) { }() span.AddEvent("scheduler process PreExecute") + waitDuration := t.GetDurationInQueue() + metrics.ProxyReqInQueueLatency. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), t.Type().String()). + Observe(float64(waitDuration.Milliseconds())) + err := t.PreExecute(ctx) defer func() { diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 47532f323d0a7..e6c8765d0a256 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -46,6 +46,7 @@ const ( ) type searchTask struct { + baseTask Condition *internalpb.SearchRequest ctx context.Context diff --git a/internal/proxy/task_statistic.go b/internal/proxy/task_statistic.go index 16a2bca039042..f6f16fb1fa67b 100644 --- a/internal/proxy/task_statistic.go +++ b/internal/proxy/task_statistic.go @@ -32,6 +32,7 @@ const ( ) type getStatisticsTask struct { + baseTask request *milvuspb.GetStatisticsRequest result *milvuspb.GetStatisticsResponse Condition @@ -93,6 +94,9 @@ func (g *getStatisticsTask) OnEnqueue() error { g.GetStatisticsRequest = &internalpb.GetStatisticsRequest{ Base: commonpbutil.NewMsgBase(), } + + g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics + g.Base.SourceID = paramtable.GetNodeID() return nil } @@ -106,10 +110,6 @@ func (g *getStatisticsTask) PreExecute(ctx context.Context) error { ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetStatistics-PreExecute") defer sp.End() - // TODO: Maybe we should create a new MsgType: GetStatistics? - g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics - g.Base.SourceID = paramtable.GetNodeID() - collID, err := globalMetaCache.GetCollectionID(ctx, g.request.GetDbName(), g.collectionName) if err != nil { // err is not nil if collection not exists return err @@ -589,6 +589,7 @@ func reduceStatisticResponse(results []map[string]string) ([]*commonpb.KeyValueP // old version of get statistics // please remove it after getStatisticsTask below is stable type getCollectionStatisticsTask struct { + baseTask Condition *milvuspb.GetCollectionStatisticsRequest ctx context.Context @@ -632,12 +633,12 @@ func (g *getCollectionStatisticsTask) SetTs(ts Timestamp) { func (g *getCollectionStatisticsTask) OnEnqueue() error { g.Base = commonpbutil.NewMsgBase() + g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics + g.Base.SourceID = paramtable.GetNodeID() return nil } func (g *getCollectionStatisticsTask) PreExecute(ctx context.Context) error { - g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics - g.Base.SourceID = paramtable.GetNodeID() return nil } @@ -674,6 +675,7 @@ func (g *getCollectionStatisticsTask) PostExecute(ctx context.Context) error { } type getPartitionStatisticsTask struct { + baseTask Condition *milvuspb.GetPartitionStatisticsRequest ctx context.Context @@ -717,12 +719,12 @@ func (g *getPartitionStatisticsTask) SetTs(ts Timestamp) { func (g *getPartitionStatisticsTask) OnEnqueue() error { g.Base = commonpbutil.NewMsgBase() + g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics + g.Base.SourceID = paramtable.GetNodeID() return nil } func (g *getPartitionStatisticsTask) PreExecute(ctx context.Context) error { - g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics - g.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 6f041faa66fba..413c343c287ac 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -914,6 +914,7 @@ func TestHasCollectionTask(t *testing.T) { rootCoord: rc, result: nil, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_HasCollection, task.Type()) @@ -976,6 +977,7 @@ func TestDescribeCollectionTask(t *testing.T) { rootCoord: rc, result: nil, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DescribeCollection, task.Type()) @@ -1224,6 +1226,7 @@ func TestCreatePartitionTask(t *testing.T) { rootCoord: rc, result: nil, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_CreatePartition, task.Type()) @@ -1299,6 +1302,7 @@ func TestDropPartitionTask(t *testing.T) { queryCoord: qc, result: nil, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DropPartition, task.Type()) @@ -1416,6 +1420,7 @@ func TestHasPartitionTask(t *testing.T) { rootCoord: rc, result: nil, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_HasPartition, task.Type()) @@ -1463,6 +1468,7 @@ func TestShowPartitionsTask(t *testing.T) { rootCoord: rc, result: nil, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_ShowPartitions, task.Type()) @@ -2549,6 +2555,7 @@ func TestCreateResourceGroupTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_CreateResourceGroup, task.Type()) @@ -2588,6 +2595,7 @@ func TestDropResourceGroupTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DropResourceGroup, task.Type()) @@ -2629,6 +2637,7 @@ func TestTransferNodeTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_TransferNode, task.Type()) @@ -2671,6 +2680,7 @@ func TestTransferReplicaTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_TransferReplica, task.Type()) @@ -2710,6 +2720,7 @@ func TestListResourceGroupsTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_ListResourceGroups, task.Type()) @@ -2762,6 +2773,7 @@ func TestDescribeResourceGroupTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DescribeResourceGroup, task.Type()) @@ -2808,6 +2820,7 @@ func TestDescribeResourceGroupTaskFailed(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DescribeResourceGroup, task.Type()) diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index e85ced6f810e3..0ec3432fe89fc 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -41,6 +41,7 @@ import ( ) type upsertTask struct { + baseTask Condition upsertMsg *msgstream.UpsertMsg @@ -133,6 +134,11 @@ func (it *upsertTask) getChannels() []pChan { } func (it *upsertTask) OnEnqueue() error { + if it.req.Base == nil { + it.req.Base = commonpbutil.NewMsgBase() + } + it.req.Base.MsgType = commonpb.MsgType_Upsert + it.req.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go index 66a8ee01d7d47..850110160e93d 100644 --- a/pkg/metrics/proxy_metrics.go +++ b/pkg/metrics/proxy_metrics.go @@ -322,6 +322,16 @@ var ( Name: "slow_query_count", Help: "count of slow query executed", }, []string{nodeIDLabelName, msgTypeLabelName}) + + // ProxyReqInQueueLatency records the latency that requests wait in the queue, like "CreateCollection". + ProxyReqInQueueLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "req_in_queue_latency", + Help: "latency which request waits in the queue", + Buckets: buckets, // unit: ms + }, []string{nodeIDLabelName, functionLabelName}) ) // RegisterProxy registers Proxy metrics @@ -370,6 +380,7 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxyRateLimitReqCount) registry.MustRegister(ProxySlowQueryCount) + registry.MustRegister(ProxyReqInQueueLatency) } func CleanupCollectionMetrics(nodeID int64, collection string) {