Skip to content

Commit

Permalink
feat: record the duration waiting in the proxy queue
Browse files Browse the repository at this point in the history
Signed-off-by: longjiquan <[email protected]>
  • Loading branch information
longjiquan committed Jul 22, 2024
1 parent d294fdd commit 348bc2f
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 81 deletions.
1 change: 1 addition & 0 deletions internal/proxy/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func newMockIDAllocatorInterface() allocator.Interface {
}

type mockTask struct {
baseTask
*TaskCondition
id UniqueID
name string
Expand Down
107 changes: 62 additions & 45 deletions internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"time"

"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -118,14 +119,26 @@ type task interface {
WaitToFinish() error
Notify(err error)
CanSkipAllocTimestamp() bool
SetOnEnqueueTime()
GetDurationInQueue() time.Duration
}

type baseTask struct{}
type baseTask struct {
onEnqueueTime time.Time
}

func (bt *baseTask) CanSkipAllocTimestamp() bool {
return false
}

func (bt *baseTask) SetOnEnqueueTime() {
bt.onEnqueueTime = time.Now()
}

func (bt *baseTask) GetDurationInQueue() time.Duration {
return time.Since(bt.onEnqueueTime)
}

type dmlTask interface {
task
setChannels() error
Expand Down Expand Up @@ -440,12 +453,12 @@ func (t *dropCollectionTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_DropCollection
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *dropCollectionTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_DropCollection
t.Base.SourceID = paramtable.GetNodeID()

if err := validateCollectionName(t.CollectionName); err != nil {
return err
Expand Down Expand Up @@ -506,12 +519,12 @@ func (t *hasCollectionTask) SetTs(ts Timestamp) {

func (t *hasCollectionTask) OnEnqueue() error {
t.Base = commonpbutil.NewMsgBase()
t.Base.MsgType = commonpb.MsgType_HasCollection
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *hasCollectionTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_HasCollection
t.Base.SourceID = paramtable.GetNodeID()

if err := validateCollectionName(t.CollectionName); err != nil {
return err
Expand Down Expand Up @@ -572,12 +585,12 @@ func (t *describeCollectionTask) SetTs(ts Timestamp) {

func (t *describeCollectionTask) OnEnqueue() error {
t.Base = commonpbutil.NewMsgBase()
t.Base.MsgType = commonpb.MsgType_DescribeCollection
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *describeCollectionTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_DescribeCollection
t.Base.SourceID = paramtable.GetNodeID()

if t.CollectionID != 0 && len(t.CollectionName) == 0 {
return nil
Expand Down Expand Up @@ -712,12 +725,12 @@ func (t *showCollectionsTask) SetTs(ts Timestamp) {

func (t *showCollectionsTask) OnEnqueue() error {
t.Base = commonpbutil.NewMsgBase()
t.Base.MsgType = commonpb.MsgType_ShowCollections
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *showCollectionsTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_ShowCollections
t.Base.SourceID = paramtable.GetNodeID()
if t.GetType() == milvuspb.ShowType_InMemory {
for _, collectionName := range t.CollectionNames {
if err := validateCollectionName(collectionName); err != nil {
Expand Down Expand Up @@ -868,6 +881,8 @@ func (t *alterCollectionTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_AlterCollection
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

Expand Down Expand Up @@ -916,8 +931,6 @@ func validatePartitionKeyIsolation(colName string, isPartitionKeyEnabled bool, p
}

func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_AlterCollection
t.Base.SourceID = paramtable.GetNodeID()

collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName)
if err != nil {
Expand Down Expand Up @@ -1049,12 +1062,12 @@ func (t *createPartitionTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_CreatePartition
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *createPartitionTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_CreatePartition
t.Base.SourceID = paramtable.GetNodeID()

collName, partitionTag := t.CollectionName, t.PartitionName

Expand Down Expand Up @@ -1132,12 +1145,12 @@ func (t *dropPartitionTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_DropPartition
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *dropPartitionTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_DropPartition
t.Base.SourceID = paramtable.GetNodeID()

collName, partitionTag := t.CollectionName, t.PartitionName

Expand Down Expand Up @@ -1238,12 +1251,12 @@ func (t *hasPartitionTask) SetTs(ts Timestamp) {

func (t *hasPartitionTask) OnEnqueue() error {
t.Base = commonpbutil.NewMsgBase()
t.Base.MsgType = commonpb.MsgType_HasPartition
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *hasPartitionTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_HasPartition
t.Base.SourceID = paramtable.GetNodeID()

collName, partitionTag := t.CollectionName, t.PartitionName

Expand Down Expand Up @@ -1310,12 +1323,12 @@ func (t *showPartitionsTask) SetTs(ts Timestamp) {

func (t *showPartitionsTask) OnEnqueue() error {
t.Base = commonpbutil.NewMsgBase()
t.Base.MsgType = commonpb.MsgType_ShowPartitions
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *showPartitionsTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_ShowPartitions
t.Base.SourceID = paramtable.GetNodeID()

if err := validateCollectionName(t.CollectionName); err != nil {
return err
Expand Down Expand Up @@ -1460,12 +1473,12 @@ func (t *flushTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_Flush
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *flushTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_Flush
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

Expand Down Expand Up @@ -1563,14 +1576,14 @@ func (t *loadCollectionTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_LoadCollection
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *loadCollectionTask) PreExecute(ctx context.Context) error {
log.Ctx(ctx).Debug("loadCollectionTask PreExecute",
zap.String("role", typeutil.ProxyRole))
t.Base.MsgType = commonpb.MsgType_LoadCollection
t.Base.SourceID = paramtable.GetNodeID()

collName := t.CollectionName

Expand Down Expand Up @@ -1715,12 +1728,12 @@ func (t *releaseCollectionTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_ReleaseCollection
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *releaseCollectionTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_ReleaseCollection
t.Base.SourceID = paramtable.GetNodeID()

collName := t.CollectionName

Expand Down Expand Up @@ -1809,12 +1822,12 @@ func (t *loadPartitionsTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_LoadPartitions
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *loadPartitionsTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_LoadPartitions
t.Base.SourceID = paramtable.GetNodeID()

collName := t.CollectionName

Expand Down Expand Up @@ -1959,12 +1972,12 @@ func (t *releasePartitionsTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_ReleasePartitions
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *releasePartitionsTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_ReleasePartitions
t.Base.SourceID = paramtable.GetNodeID()

collName := t.CollectionName

Expand Down Expand Up @@ -2064,12 +2077,12 @@ func (t *CreateResourceGroupTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_CreateResourceGroup
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *CreateResourceGroupTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_CreateResourceGroup
t.Base.SourceID = paramtable.GetNodeID()

return nil
}
Expand Down Expand Up @@ -2129,12 +2142,12 @@ func (t *UpdateResourceGroupsTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_UpdateResourceGroups
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *UpdateResourceGroupsTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_UpdateResourceGroups
t.Base.SourceID = paramtable.GetNodeID()

return nil
}
Expand Down Expand Up @@ -2197,12 +2210,12 @@ func (t *DropResourceGroupTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_DropResourceGroup
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *DropResourceGroupTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_DropResourceGroup
t.Base.SourceID = paramtable.GetNodeID()

return nil
}
Expand Down Expand Up @@ -2259,13 +2272,15 @@ func (t *DescribeResourceGroupTask) SetTs(ts Timestamp) {
}

func (t *DescribeResourceGroupTask) OnEnqueue() error {
t.Base = commonpbutil.NewMsgBase()
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_DescribeResourceGroup
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *DescribeResourceGroupTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_DescribeResourceGroup
t.Base.SourceID = paramtable.GetNodeID()

return nil
}
Expand Down Expand Up @@ -2386,12 +2401,12 @@ func (t *TransferNodeTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_TransferNode
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *TransferNodeTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_TransferNode
t.Base.SourceID = paramtable.GetNodeID()

return nil
}
Expand Down Expand Up @@ -2451,12 +2466,12 @@ func (t *TransferReplicaTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_TransferReplica
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *TransferReplicaTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_TransferReplica
t.Base.SourceID = paramtable.GetNodeID()

return nil
}
Expand Down Expand Up @@ -2522,13 +2537,15 @@ func (t *ListResourceGroupsTask) SetTs(ts Timestamp) {
}

func (t *ListResourceGroupsTask) OnEnqueue() error {
t.Base = commonpbutil.NewMsgBase()
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_ListResourceGroups
t.Base.SourceID = paramtable.GetNodeID()
return nil
}

func (t *ListResourceGroupsTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_ListResourceGroups
t.Base.SourceID = paramtable.GetNodeID()

return nil
}
Expand Down
Loading

0 comments on commit 348bc2f

Please sign in to comment.