From 1e15e93d90445d66005bdcbf9ede732d034ae7c1 Mon Sep 17 00:00:00 2001 From: Sergey Bykov Date: Fri, 11 Dec 2020 17:31:28 -0800 Subject: [PATCH 1/9] Enforce matching of namespace names in token and request for * RespondActivityTaskCompleted * RespondActivityTaskFailed * RespondActivityTaskCanceled --- service/frontend/errors.go | 1 + service/frontend/workflowHandler.go | 33 +++++++++++++++++++++++++---- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/service/frontend/errors.go b/service/frontend/errors.go index 298f7f9af7d..92332334817 100644 --- a/service/frontend/errors.go +++ b/service/frontend/errors.go @@ -78,6 +78,7 @@ var ( errDLQTypeIsNotSupported = serviceerror.NewInvalidArgument("The DLQ type is not supported.") errFailureMustHaveApplicationFailureInfo = serviceerror.NewInvalidArgument("Failure must have ApplicationFailureInfo.") errStatusFilterMustBeNotRunning = serviceerror.NewInvalidArgument("StatusFilter must be specified and must be not Running.") + errTokenNamespaceMismatch = serviceerror.NewInvalidArgument("Operation requested with a token from a different namespace.") errShuttingDown = serviceerror.NewInternal("Shutting down") errFailedUpdateDynamicConfig = serviceerror.NewInternal("Failed to update dynamic config, err: %v.") diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 7fd13546ced..1329548f6b6 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -27,6 +27,7 @@ package frontend import ( "context" "fmt" + "strings" "sync/atomic" "time" @@ -1349,7 +1350,9 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatById(ctx context.Context, // created for the workflow so new commands could be made. Use the 'taskToken' provided as response of // PollActivityTaskQueue API call for completion. It fails with 'NotFoundFailure' if the taskToken is not valid // anymore due to activity timeout. -func (wh *WorkflowHandler) RespondActivityTaskCompleted(ctx context.Context, request *workflowservice.RespondActivityTaskCompletedRequest) (_ *workflowservice.RespondActivityTaskCompletedResponse, retError error) { +func (wh *WorkflowHandler) RespondActivityTaskCompleted(ctx context.Context, + request *workflowservice.RespondActivityTaskCompletedRequest) (_ *workflowservice.RespondActivityTaskCompletedResponse, + retError error) { defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(metrics.FrontendRespondActivityTaskCompletedScope) @@ -1384,9 +1387,10 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted(ctx context.Context, req return nil, wh.error(errIdentityTooLong, scope) } + namespaceName := namespaceEntry.GetInfo().Name scope, sw := wh.startRequestProfileWithNamespace( metrics.FrontendRespondActivityTaskCompletedScope, - namespaceEntry.GetInfo().Name, + namespaceName, ) defer sw.Stop() @@ -1394,6 +1398,10 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted(ctx context.Context, req return nil, errShuttingDown } + if err := wh.checkNamespaceMatch(request.Namespace, namespaceName, scope); err != nil { + return nil, err + } + sizeLimitError := wh.config.BlobSizeLimitError(namespaceEntry.GetInfo().Name) sizeLimitWarn := wh.config.BlobSizeLimitWarn(namespaceEntry.GetInfo().Name) @@ -1590,9 +1598,10 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(ctx context.Context, reques return nil, wh.error(errFailureMustHaveApplicationFailureInfo, scope) } + namespaceName := namespaceEntry.GetInfo().Name scope, sw := wh.startRequestProfileWithNamespace( metrics.FrontendRespondActivityTaskFailedScope, - namespaceEntry.GetInfo().Name, + namespaceName, ) defer sw.Stop() @@ -1600,6 +1609,10 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(ctx context.Context, reques return nil, errShuttingDown } + if err := wh.checkNamespaceMatch(request.Namespace, namespaceName, scope); err != nil { + return nil, err + } + if len(request.GetIdentity()) > wh.config.MaxIDLengthLimit() { return nil, wh.error(errIdentityTooLong, scope) } @@ -1775,9 +1788,10 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(ctx context.Context, requ return nil, wh.error(err, scope) } + namespaceName := namespaceEntry.GetInfo().Name scope, sw := wh.startRequestProfileWithNamespace( metrics.FrontendRespondActivityTaskCanceledScope, - namespaceEntry.GetInfo().Name, + namespaceName, ) defer sw.Stop() @@ -1785,6 +1799,10 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(ctx context.Context, requ return nil, errShuttingDown } + if err := wh.checkNamespaceMatch(request.Namespace, namespaceName, scope); err != nil { + return nil, err + } + if len(request.GetIdentity()) > wh.config.MaxIDLengthLimit() { return nil, wh.error(errIdentityTooLong, scope) } @@ -3785,3 +3803,10 @@ func (wh *WorkflowHandler) validateSignalWithStartWorkflowTimeouts( return nil } + +func (wh *WorkflowHandler) checkNamespaceMatch(requestNamespace string, tokenNamespace string, scope metrics.Scope) error { + if !strings.EqualFold(requestNamespace, tokenNamespace) { + return wh.error(errTokenNamespaceMismatch, scope) + } + return nil +} From 2afe06afc372ce1bd0acbacb0dce6630de0bd028 Mon Sep 17 00:00:00 2001 From: Sergey Bykov Date: Fri, 11 Dec 2020 17:50:14 -0800 Subject: [PATCH 2/9] Add a config flag for enabling token namespace enforcement --- common/service/dynamicconfig/constants.go | 3 +++ service/frontend/service.go | 4 ++++ service/frontend/workflowHandler.go | 3 +++ 3 files changed, 10 insertions(+) diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 4435e91a5a2..3e5ae96512a 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -112,6 +112,7 @@ var keys = map[Key]string{ VisibilityArchivalQueryMaxRangeInDays: "frontend.visibilityArchivalQueryMaxRangeInDays", VisibilityArchivalQueryMaxQPS: "frontend.visibilityArchivalQueryMaxQPS", EnableServerVersionCheck: "frontend.enableServerVersionCheck", + EnableTokenNamespaceEnforcement: "frontend.enableTokenNamespaceEnforcement", // matching settings MatchingRPS: "matching.rps", @@ -427,6 +428,8 @@ const ( VisibilityArchivalQueryMaxQPS // EnableServerVersionCheck is a flag that controls whether or not periodic version checking is enabled EnableServerVersionCheck + // EnableTokenNamespaceEnforcement enables enforcement that namespace in completion token matches namespace of the request + EnableTokenNamespaceEnforcement // key for matching diff --git a/service/frontend/service.go b/service/frontend/service.go index 952ed2f7b9f..16e204cd49b 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -112,6 +112,9 @@ type Config struct { // EnableServerVersionCheck disables periodic version checking performed by the frontend EnableServerVersionCheck dynamicconfig.BoolPropertyFn + + // EnableTokenNamespaceEnforcement enables enforcement that namespace in completion token matches namespace of the request + EnableTokenNamespaceEnforcement dynamicconfig.BoolPropertyFn } // NewConfig returns new service config with default values @@ -153,6 +156,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, enableReadF DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()), DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout), EnableServerVersionCheck: dc.GetBoolProperty(dynamicconfig.EnableServerVersionCheck, os.Getenv("TEMPORAL_VERSION_CHECK_DISABLED") == ""), + EnableTokenNamespaceEnforcement: dc.GetBoolProperty(dynamicconfig.EnableTokenNamespaceEnforcement, false), } } diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 1329548f6b6..cee3a86f10f 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -3805,6 +3805,9 @@ func (wh *WorkflowHandler) validateSignalWithStartWorkflowTimeouts( } func (wh *WorkflowHandler) checkNamespaceMatch(requestNamespace string, tokenNamespace string, scope metrics.Scope) error { + if !wh.config.EnableTokenNamespaceEnforcement() { + return nil + } if !strings.EqualFold(requestNamespace, tokenNamespace) { return wh.error(errTokenNamespaceMismatch, scope) } From d1ce9f806edd63d3c36b71e5fa02891d4fb2e373 Mon Sep 17 00:00:00 2001 From: Sergey Bykov Date: Fri, 11 Dec 2020 19:17:24 -0800 Subject: [PATCH 3/9] Add tests for enforcement of token namespace match --- service/frontend/workflowHandler_test.go | 57 +++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index ea3c14388f7..3282e4d8489 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -47,6 +47,7 @@ import ( "go.temporal.io/server/api/historyservicemock/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + tokenspb "go.temporal.io/server/api/token/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/archiver/provider" @@ -61,6 +62,7 @@ import ( "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/service/dynamicconfig" dc "go.temporal.io/server/common/service/dynamicconfig" ) @@ -94,6 +96,8 @@ type ( mockHistoryArchiver *archiver.HistoryArchiverMock mockVisibilityArchiver *archiver.VisibilityArchiverMock + tokenSerializer common.TaskTokenSerializer + testNamespace string testNamespaceID string } @@ -134,9 +138,10 @@ func (s *workflowHandlerSuite) SetupTest() { s.mockHistoryArchiver = &archiver.HistoryArchiverMock{} s.mockVisibilityArchiver = &archiver.VisibilityArchiverMock{} + s.tokenSerializer = common.NewProtoTaskTokenSerializer() + mockMonitor := s.mockResource.MembershipMonitor mockMonitor.EXPECT().GetMemberCount(common.FrontendServiceName).Return(5, nil).AnyTimes() - } func (s *workflowHandlerSuite) TearDownTest() { @@ -1382,10 +1387,60 @@ func (s *workflowHandlerSuite) TestVerifyHistoryIsComplete() { } } +func (s *workflowHandlerSuite) TestTokenNamespaceEnforcementDisabled() { + wh := s.setupTokenNamespaceTest("wrong-namespace", false) + req := s.newRespondActivityTaskCompletedRequest(uuid.New()) + resp, err := wh.RespondActivityTaskCompleted(context.Background(), req) + s.NoError(err) + s.NotNil(resp) +} + +func (s *workflowHandlerSuite) TestTokenNamespaceEnforcementEnabledMismatch() { + wh := s.setupTokenNamespaceTest("wrong-namespace", true) + req := s.newRespondActivityTaskCompletedRequest(uuid.New()) + resp, err := wh.RespondActivityTaskCompleted(context.Background(), req) + s.Error(err) + s.Nil(resp) +} + +func (s *workflowHandlerSuite) TestTokenNamespaceEnforcementEnabledMatch() { + wh := s.setupTokenNamespaceTest(s.testNamespace, true) + req := s.newRespondActivityTaskCompletedRequest(uuid.New()) + resp, err := wh.RespondActivityTaskCompleted(context.Background(), req) + s.NoError(err) + s.NotNil(resp) +} + func (s *workflowHandlerSuite) newConfig() *Config { return NewConfig(dc.NewCollection(dc.NewNopClient(), s.mockResource.GetLogger()), numHistoryShards, false) } +func (s *workflowHandlerSuite) newRespondActivityTaskCompletedRequest(tokenNamespaceId string) *workflowservice.RespondActivityTaskCompletedRequest { + token, _ := s.tokenSerializer.Serialize(&tokenspb.Task{ + NamespaceId: tokenNamespaceId, + }) + + return &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.testNamespace, + TaskToken: token, + } +} + +func newNamespaceCacheEntry(namespaceName string) *cache.NamespaceCacheEntry { + info := &persistencespb.NamespaceInfo{ + Name: namespaceName, + } + return cache.NewLocalNamespaceCacheEntryForTest(info, nil, "", nil) +} + +func (s *workflowHandlerSuite) setupTokenNamespaceTest(tokenNamespace string, enforce bool) *WorkflowHandler { + s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(newNamespaceCacheEntry(tokenNamespace), nil).AnyTimes() + s.mockHistoryClient.EXPECT().RespondActivityTaskCompleted(context.Background(), gomock.Any()).Return(nil, nil).AnyTimes() + cfg := s.newConfig() + cfg.EnableTokenNamespaceEnforcement = dynamicconfig.GetBoolPropertyFn(enforce) + return s.getWorkflowHandler(cfg) +} + func updateRequest( historyArchivalURI string, historyArchivalState enumspb.ArchivalState, From 6ef9abe31f511dc63c4dc7b4b364f7c6ac41b73e Mon Sep 17 00:00:00 2001 From: Sergey Bykov Date: Sat, 12 Dec 2020 13:38:36 -0800 Subject: [PATCH 4/9] Add tests for RespondActivityTaskFailed and RespondActivityTaskFailed --- service/frontend/workflowHandler_test.go | 71 +++++++++++++++++++----- 1 file changed, 57 insertions(+), 14 deletions(-) diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index 3282e4d8489..82e2e129bf3 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -1389,26 +1389,50 @@ func (s *workflowHandlerSuite) TestVerifyHistoryIsComplete() { func (s *workflowHandlerSuite) TestTokenNamespaceEnforcementDisabled() { wh := s.setupTokenNamespaceTest("wrong-namespace", false) - req := s.newRespondActivityTaskCompletedRequest(uuid.New()) - resp, err := wh.RespondActivityTaskCompleted(context.Background(), req) + req1 := s.newRespondActivityTaskCompletedRequest(uuid.New()) + resp1, err := wh.RespondActivityTaskCompleted(context.Background(), req1) s.NoError(err) - s.NotNil(resp) + s.NotNil(resp1) + req2 := s.newRespondActivityTaskFailedRequest(uuid.New()) + resp2, err := wh.RespondActivityTaskFailed(context.Background(), req2) + s.NoError(err) + s.NotNil(resp2) + req3 := s.newRespondActivityTaskCanceledRequest(uuid.New()) + resp3, err := wh.RespondActivityTaskCanceled(context.Background(), req3) + s.NoError(err) + s.NotNil(resp3) } func (s *workflowHandlerSuite) TestTokenNamespaceEnforcementEnabledMismatch() { wh := s.setupTokenNamespaceTest("wrong-namespace", true) - req := s.newRespondActivityTaskCompletedRequest(uuid.New()) - resp, err := wh.RespondActivityTaskCompleted(context.Background(), req) + req1 := s.newRespondActivityTaskCompletedRequest(uuid.New()) + resp1, err := wh.RespondActivityTaskCompleted(context.Background(), req1) s.Error(err) - s.Nil(resp) + s.Nil(resp1) + req2 := s.newRespondActivityTaskFailedRequest(uuid.New()) + resp2, err := wh.RespondActivityTaskFailed(context.Background(), req2) + s.Error(err) + s.Nil(resp2) + req3 := s.newRespondActivityTaskCanceledRequest(uuid.New()) + resp3, err := wh.RespondActivityTaskCanceled(context.Background(), req3) + s.Error(err) + s.Nil(resp3) } func (s *workflowHandlerSuite) TestTokenNamespaceEnforcementEnabledMatch() { wh := s.setupTokenNamespaceTest(s.testNamespace, true) - req := s.newRespondActivityTaskCompletedRequest(uuid.New()) - resp, err := wh.RespondActivityTaskCompleted(context.Background(), req) + req1 := s.newRespondActivityTaskCompletedRequest(uuid.New()) + resp1, err := wh.RespondActivityTaskCompleted(context.Background(), req1) s.NoError(err) - s.NotNil(resp) + s.NotNil(resp1) + req2 := s.newRespondActivityTaskFailedRequest(uuid.New()) + resp2, err := wh.RespondActivityTaskFailed(context.Background(), req2) + s.NoError(err) + s.NotNil(resp2) + req3 := s.newRespondActivityTaskCanceledRequest(uuid.New()) + resp3, err := wh.RespondActivityTaskCanceled(context.Background(), req3) + s.NoError(err) + s.NotNil(resp3) } func (s *workflowHandlerSuite) newConfig() *Config { @@ -1416,16 +1440,33 @@ func (s *workflowHandlerSuite) newConfig() *Config { } func (s *workflowHandlerSuite) newRespondActivityTaskCompletedRequest(tokenNamespaceId string) *workflowservice.RespondActivityTaskCompletedRequest { - token, _ := s.tokenSerializer.Serialize(&tokenspb.Task{ - NamespaceId: tokenNamespaceId, - }) - return &workflowservice.RespondActivityTaskCompletedRequest{ Namespace: s.testNamespace, - TaskToken: token, + TaskToken: s.newSerializedToken(tokenNamespaceId), } } +func (s *workflowHandlerSuite) newRespondActivityTaskFailedRequest(tokenNamespaceId string) *workflowservice.RespondActivityTaskFailedRequest { + return &workflowservice.RespondActivityTaskFailedRequest{ + Namespace: s.testNamespace, + TaskToken: s.newSerializedToken(tokenNamespaceId), + } +} + +func (s *workflowHandlerSuite) newRespondActivityTaskCanceledRequest(tokenNamespaceId string) *workflowservice.RespondActivityTaskCanceledRequest { + return &workflowservice.RespondActivityTaskCanceledRequest{ + Namespace: s.testNamespace, + TaskToken: s.newSerializedToken(tokenNamespaceId), + } +} + +func (s *workflowHandlerSuite) newSerializedToken(namespaceId string) []byte { + token, _ := s.tokenSerializer.Serialize(&tokenspb.Task{ + NamespaceId: namespaceId, + }) + return token +} + func newNamespaceCacheEntry(namespaceName string) *cache.NamespaceCacheEntry { info := &persistencespb.NamespaceInfo{ Name: namespaceName, @@ -1436,6 +1477,8 @@ func newNamespaceCacheEntry(namespaceName string) *cache.NamespaceCacheEntry { func (s *workflowHandlerSuite) setupTokenNamespaceTest(tokenNamespace string, enforce bool) *WorkflowHandler { s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(newNamespaceCacheEntry(tokenNamespace), nil).AnyTimes() s.mockHistoryClient.EXPECT().RespondActivityTaskCompleted(context.Background(), gomock.Any()).Return(nil, nil).AnyTimes() + s.mockHistoryClient.EXPECT().RespondActivityTaskFailed(context.Background(), gomock.Any()).Return(nil, nil).AnyTimes() + s.mockHistoryClient.EXPECT().RespondActivityTaskCanceled(context.Background(), gomock.Any()).Return(nil, nil).AnyTimes() cfg := s.newConfig() cfg.EnableTokenNamespaceEnforcement = dynamicconfig.GetBoolPropertyFn(enforce) return s.getWorkflowHandler(cfg) From 8e6ee5a4edef7d3e51c20d924731059f6f7b3db3 Mon Sep 17 00:00:00 2001 From: Sergey Bykov Date: Sat, 12 Dec 2020 13:47:28 -0800 Subject: [PATCH 5/9] Add namespace match check to more APIs * RespondWorkflowTaskCompleted * RespondWorkflowTaskFailed * RespondQueryTaskCompleted --- service/frontend/workflowHandler.go | 39 +++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index cee3a86f10f..fd33279968f 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -884,7 +884,9 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w // event in the history for that session. Use the 'taskToken' provided as response of PollWorkflowTaskQueue API call // for completing the WorkflowTask. // The response could contain a new workflow task if there is one or if the request asking for one. -func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(ctx context.Context, request *workflowservice.RespondWorkflowTaskCompletedRequest) (_ *workflowservice.RespondWorkflowTaskCompletedResponse, retError error) { +func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(ctx context.Context, + request *workflowservice.RespondWorkflowTaskCompletedRequest) (_ *workflowservice.RespondWorkflowTaskCompletedResponse, + retError error) { defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(metrics.FrontendRespondWorkflowTaskCompletedScope) @@ -917,8 +919,10 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(ctx context.Context, req return nil, wh.error(err, scope) } + namespaceName := namespaceEntry.GetInfo().Name scope, sw := wh.startRequestProfileWithNamespace( - metrics.FrontendRespondWorkflowTaskCompletedScope, namespaceEntry.GetInfo().Name, + metrics.FrontendRespondWorkflowTaskCompletedScope, + namespaceName, ) defer sw.Stop() @@ -926,6 +930,10 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(ctx context.Context, req return nil, errShuttingDown } + if err := wh.checkNamespaceMatch(request.Namespace, namespaceName, scope); err != nil { + return nil, err + } + histResp, err := wh.GetHistoryClient().RespondWorkflowTaskCompleted(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{ NamespaceId: namespaceId, CompleteRequest: request}, @@ -968,7 +976,9 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(ctx context.Context, req // WorkflowTaskFailedEvent written to the history and a new WorkflowTask created. This API can be used by client to // either clear sticky taskqueue or report any panics during WorkflowTask processing. Temporal will only append first // WorkflowTaskFailed event to the history of workflow execution for consecutive failures. -func (wh *WorkflowHandler) RespondWorkflowTaskFailed(ctx context.Context, request *workflowservice.RespondWorkflowTaskFailedRequest) (_ *workflowservice.RespondWorkflowTaskFailedResponse, retError error) { +func (wh *WorkflowHandler) RespondWorkflowTaskFailed(ctx context.Context, + request *workflowservice.RespondWorkflowTaskFailedRequest) (_ *workflowservice.RespondWorkflowTaskFailedResponse, + retError error) { defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(metrics.FrontendRespondWorkflowTaskFailedScope) @@ -1000,8 +1010,10 @@ func (wh *WorkflowHandler) RespondWorkflowTaskFailed(ctx context.Context, reques return nil, wh.error(err, scope) } + namespaceName := namespaceEntry.GetInfo().Name scope, sw := wh.startRequestProfileWithNamespace( - metrics.FrontendRespondWorkflowTaskFailedScope, namespaceEntry.GetInfo().Name, + metrics.FrontendRespondWorkflowTaskFailedScope, + namespaceName, ) defer sw.Stop() @@ -1009,6 +1021,10 @@ func (wh *WorkflowHandler) RespondWorkflowTaskFailed(ctx context.Context, reques return nil, errShuttingDown } + if err := wh.checkNamespaceMatch(request.Namespace, namespaceName, scope); err != nil { + return nil, err + } + if len(request.GetIdentity()) > wh.config.MaxIDLengthLimit() { return nil, wh.error(errIdentityTooLong, scope) } @@ -1562,7 +1578,9 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedById(ctx context.Context, // created for the workflow instance so new commands could be made. Use the 'taskToken' provided as response of // PollActivityTaskQueue API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid // anymore due to activity timeout. -func (wh *WorkflowHandler) RespondActivityTaskFailed(ctx context.Context, request *workflowservice.RespondActivityTaskFailedRequest) (_ *workflowservice.RespondActivityTaskFailedResponse, retError error) { +func (wh *WorkflowHandler) RespondActivityTaskFailed(ctx context.Context, + request *workflowservice.RespondActivityTaskFailedRequest) (_ *workflowservice.RespondActivityTaskFailedResponse, + retError error) { defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(metrics.FrontendRespondActivityTaskFailedScope) @@ -2835,7 +2853,9 @@ func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context, _ *workflows // RespondQueryTaskCompleted is called by application worker to complete a QueryTask (which is a WorkflowTask for query) // as a result of 'PollWorkflowTaskQueue' API call. Completing a QueryTask will unblock the client call to 'QueryWorkflow' // API and return the query result to client as a response to 'QueryWorkflow' API call. -func (wh *WorkflowHandler) RespondQueryTaskCompleted(ctx context.Context, request *workflowservice.RespondQueryTaskCompletedRequest) (_ *workflowservice.RespondQueryTaskCompletedResponse, retError error) { +func (wh *WorkflowHandler) RespondQueryTaskCompleted(ctx context.Context, + request *workflowservice.RespondQueryTaskCompletedRequest) (_ *workflowservice.RespondQueryTaskCompletedResponse, + retError error) { defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(metrics.FrontendRespondQueryTaskCompletedScope) @@ -2866,9 +2886,10 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted(ctx context.Context, reques return nil, wh.error(err, scope) } + namespaceName := namespaceEntry.GetInfo().Name scope, sw := wh.startRequestProfileWithNamespace( metrics.FrontendRespondQueryTaskCompletedScope, - namespaceEntry.GetInfo().Name, + namespaceName, ) defer sw.Stop() @@ -2876,6 +2897,10 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted(ctx context.Context, reques return nil, errShuttingDown } + if err := wh.checkNamespaceMatch(request.Namespace, namespaceName, scope); err != nil { + return nil, err + } + sizeLimitError := wh.config.BlobSizeLimitError(namespaceEntry.GetInfo().Name) sizeLimitWarn := wh.config.BlobSizeLimitWarn(namespaceEntry.GetInfo().Name) From 63f0cfb2a7787e3e900acaa5fc26596b4f2e1bf2 Mon Sep 17 00:00:00 2001 From: Sergey Bykov Date: Sat, 12 Dec 2020 14:00:02 -0800 Subject: [PATCH 6/9] Refactor tests --- service/frontend/workflowHandler_test.go | 61 +++++++++++------------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index 82e2e129bf3..03f8e4566cd 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -1388,51 +1388,46 @@ func (s *workflowHandlerSuite) TestVerifyHistoryIsComplete() { } func (s *workflowHandlerSuite) TestTokenNamespaceEnforcementDisabled() { - wh := s.setupTokenNamespaceTest("wrong-namespace", false) - req1 := s.newRespondActivityTaskCompletedRequest(uuid.New()) - resp1, err := wh.RespondActivityTaskCompleted(context.Background(), req1) - s.NoError(err) - s.NotNil(resp1) - req2 := s.newRespondActivityTaskFailedRequest(uuid.New()) - resp2, err := wh.RespondActivityTaskFailed(context.Background(), req2) - s.NoError(err) - s.NotNil(resp2) - req3 := s.newRespondActivityTaskCanceledRequest(uuid.New()) - resp3, err := wh.RespondActivityTaskCanceled(context.Background(), req3) - s.NoError(err) - s.NotNil(resp3) + s.executeTokenTestCases("wrong-namespace", false, false, false) } func (s *workflowHandlerSuite) TestTokenNamespaceEnforcementEnabledMismatch() { - wh := s.setupTokenNamespaceTest("wrong-namespace", true) - req1 := s.newRespondActivityTaskCompletedRequest(uuid.New()) - resp1, err := wh.RespondActivityTaskCompleted(context.Background(), req1) - s.Error(err) - s.Nil(resp1) - req2 := s.newRespondActivityTaskFailedRequest(uuid.New()) - resp2, err := wh.RespondActivityTaskFailed(context.Background(), req2) - s.Error(err) - s.Nil(resp2) - req3 := s.newRespondActivityTaskCanceledRequest(uuid.New()) - resp3, err := wh.RespondActivityTaskCanceled(context.Background(), req3) - s.Error(err) - s.Nil(resp3) + s.executeTokenTestCases("wrong-namespace", true, true, true) } func (s *workflowHandlerSuite) TestTokenNamespaceEnforcementEnabledMatch() { - wh := s.setupTokenNamespaceTest(s.testNamespace, true) + s.executeTokenTestCases(s.testNamespace, true, true, true) +} + +func (s *workflowHandlerSuite) executeTokenTestCases(tokenNamesoace string, enforceNamespaceMatch bool, + isErrorExpected bool, isNilExpected bool) { + wh := s.setupTokenNamespaceTest(tokenNamesoace, enforceNamespaceMatch) + req1 := s.newRespondActivityTaskCompletedRequest(uuid.New()) resp1, err := wh.RespondActivityTaskCompleted(context.Background(), req1) - s.NoError(err) - s.NotNil(resp1) + s.checkResponse(err, resp1, isErrorExpected, isNilExpected) + req2 := s.newRespondActivityTaskFailedRequest(uuid.New()) resp2, err := wh.RespondActivityTaskFailed(context.Background(), req2) - s.NoError(err) - s.NotNil(resp2) + s.checkResponse(err, resp2, isErrorExpected, isNilExpected) + req3 := s.newRespondActivityTaskCanceledRequest(uuid.New()) resp3, err := wh.RespondActivityTaskCanceled(context.Background(), req3) - s.NoError(err) - s.NotNil(resp3) + s.checkResponse(err, resp3, isErrorExpected, isNilExpected) +} + +func (s *workflowHandlerSuite) checkResponse(err error, response interface{}, + isErrorExpected bool, isNilExpected bool) { + if isErrorExpected { + s.Error(err) + } else { + s.NoError(err) + } + if isNilExpected { + s.Nil(response) + } else { + s.NotNil(response) + } } func (s *workflowHandlerSuite) newConfig() *Config { From fb3f8997a00820189549a885b1a5f5347ebf8945 Mon Sep 17 00:00:00 2001 From: Sergey Bykov Date: Sat, 12 Dec 2020 19:06:30 -0800 Subject: [PATCH 7/9] Add tests for three more workflow handler methods * RespondWorkflowTaskCompleted * RespondWorkflowTaskFailed * RespondQueryTaskCompleted --- service/frontend/workflowHandler_test.go | 68 ++++++++++++++++++++---- 1 file changed, 59 insertions(+), 9 deletions(-) diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index 03f8e4566cd..0582a06d8be 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -46,6 +46,7 @@ import ( "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/api/historyservicemock/v1" + "go.temporal.io/server/api/matchingservicemock/v1" persistencespb "go.temporal.io/server/api/persistence/v1" tokenspb "go.temporal.io/server/api/token/v1" "go.temporal.io/server/common" @@ -85,6 +86,7 @@ type ( mockNamespaceCache *cache.MockNamespaceCache mockHistoryClient *historyservicemock.MockHistoryServiceClient mockClusterMetadata *cluster.MockMetadata + mockMatchingClient *matchingservicemock.MockMatchingServiceClient mockProducer *mocks.KafkaProducer mockMessagingClient messaging.Client @@ -132,6 +134,7 @@ func (s *workflowHandlerSuite) SetupTest() { s.mockVisibilityMgr = s.mockResource.VisibilityMgr s.mockArchivalMetadata = s.mockResource.ArchivalMetadata s.mockArchiverProvider = s.mockResource.ArchiverProvider + s.mockMatchingClient = s.mockResource.MatchingClient s.mockProducer = &mocks.KafkaProducer{} s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil) @@ -1396,24 +1399,37 @@ func (s *workflowHandlerSuite) TestTokenNamespaceEnforcementEnabledMismatch() { } func (s *workflowHandlerSuite) TestTokenNamespaceEnforcementEnabledMatch() { - s.executeTokenTestCases(s.testNamespace, true, true, true) + s.executeTokenTestCases(s.testNamespace, true, false, false) } -func (s *workflowHandlerSuite) executeTokenTestCases(tokenNamesoace string, enforceNamespaceMatch bool, +func (s *workflowHandlerSuite) executeTokenTestCases(tokenNamespace string, enforceNamespaceMatch bool, isErrorExpected bool, isNilExpected bool) { - wh := s.setupTokenNamespaceTest(tokenNamesoace, enforceNamespaceMatch) + ctx := context.Background() + wh := s.setupTokenNamespaceTest(tokenNamespace, enforceNamespaceMatch) req1 := s.newRespondActivityTaskCompletedRequest(uuid.New()) - resp1, err := wh.RespondActivityTaskCompleted(context.Background(), req1) + resp1, err := wh.RespondActivityTaskCompleted(ctx, req1) s.checkResponse(err, resp1, isErrorExpected, isNilExpected) req2 := s.newRespondActivityTaskFailedRequest(uuid.New()) - resp2, err := wh.RespondActivityTaskFailed(context.Background(), req2) + resp2, err := wh.RespondActivityTaskFailed(ctx, req2) s.checkResponse(err, resp2, isErrorExpected, isNilExpected) req3 := s.newRespondActivityTaskCanceledRequest(uuid.New()) - resp3, err := wh.RespondActivityTaskCanceled(context.Background(), req3) + resp3, err := wh.RespondActivityTaskCanceled(ctx, req3) s.checkResponse(err, resp3, isErrorExpected, isNilExpected) + + req4 := s.newRespondWorkflowTaskCompletedRequest(uuid.New()) + resp4, err := wh.RespondWorkflowTaskCompleted(ctx, req4) + s.checkResponse(err, resp4, isErrorExpected, isNilExpected) + + req5 := s.newRespondWorkflowTaskFailedRequest(uuid.New()) + resp5, err := wh.RespondWorkflowTaskFailed(ctx, req5) + s.checkResponse(err, resp5, isErrorExpected, isNilExpected) + + req6 := s.newRespondQueryTaskCompletedRequest(uuid.New()) + resp6, err := wh.RespondQueryTaskCompleted(ctx, req6) + s.checkResponse(err, resp6, isErrorExpected, isNilExpected) } func (s *workflowHandlerSuite) checkResponse(err error, response interface{}, @@ -1455,6 +1471,27 @@ func (s *workflowHandlerSuite) newRespondActivityTaskCanceledRequest(tokenNamesp } } +func (s *workflowHandlerSuite) newRespondWorkflowTaskCompletedRequest(tokenNamespaceId string) *workflowservice.RespondWorkflowTaskCompletedRequest { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Namespace: s.testNamespace, + TaskToken: s.newSerializedToken(tokenNamespaceId), + } +} + +func (s *workflowHandlerSuite) newRespondWorkflowTaskFailedRequest(tokenNamespaceId string) *workflowservice.RespondWorkflowTaskFailedRequest { + return &workflowservice.RespondWorkflowTaskFailedRequest{ + Namespace: s.testNamespace, + TaskToken: s.newSerializedToken(tokenNamespaceId), + } +} + +func (s *workflowHandlerSuite) newRespondQueryTaskCompletedRequest(tokenNamespaceId string) *workflowservice.RespondQueryTaskCompletedRequest { + return &workflowservice.RespondQueryTaskCompletedRequest{ + Namespace: s.testNamespace, + TaskToken: s.newSerializedQueryTaskToken(tokenNamespaceId), + } +} + func (s *workflowHandlerSuite) newSerializedToken(namespaceId string) []byte { token, _ := s.tokenSerializer.Serialize(&tokenspb.Task{ NamespaceId: namespaceId, @@ -1462,6 +1499,15 @@ func (s *workflowHandlerSuite) newSerializedToken(namespaceId string) []byte { return token } +func (s *workflowHandlerSuite) newSerializedQueryTaskToken(namespaceId string) []byte { + token, _ := s.tokenSerializer.SerializeQueryTaskToken(&tokenspb.QueryTask{ + NamespaceId: namespaceId, + TaskQueue: "some-task-queue", + TaskId: "some-task-id", + }) + return token +} + func newNamespaceCacheEntry(namespaceName string) *cache.NamespaceCacheEntry { info := &persistencespb.NamespaceInfo{ Name: namespaceName, @@ -1471,9 +1517,13 @@ func newNamespaceCacheEntry(namespaceName string) *cache.NamespaceCacheEntry { func (s *workflowHandlerSuite) setupTokenNamespaceTest(tokenNamespace string, enforce bool) *WorkflowHandler { s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(newNamespaceCacheEntry(tokenNamespace), nil).AnyTimes() - s.mockHistoryClient.EXPECT().RespondActivityTaskCompleted(context.Background(), gomock.Any()).Return(nil, nil).AnyTimes() - s.mockHistoryClient.EXPECT().RespondActivityTaskFailed(context.Background(), gomock.Any()).Return(nil, nil).AnyTimes() - s.mockHistoryClient.EXPECT().RespondActivityTaskCanceled(context.Background(), gomock.Any()).Return(nil, nil).AnyTimes() + ctx := context.Background() + s.mockHistoryClient.EXPECT().RespondActivityTaskCompleted(ctx, gomock.Any()).Return(nil, nil).AnyTimes() + s.mockHistoryClient.EXPECT().RespondActivityTaskFailed(ctx, gomock.Any()).Return(nil, nil).AnyTimes() + s.mockHistoryClient.EXPECT().RespondActivityTaskCanceled(ctx, gomock.Any()).Return(nil, nil).AnyTimes() + s.mockHistoryClient.EXPECT().RespondWorkflowTaskCompleted(ctx, gomock.Any()).Return(nil, nil).AnyTimes() + s.mockHistoryClient.EXPECT().RespondWorkflowTaskFailed(ctx, gomock.Any()).Return(nil, nil).AnyTimes() + s.mockMatchingClient.EXPECT().RespondQueryTaskCompleted(ctx, gomock.Any()).Return(nil, nil).AnyTimes() cfg := s.newConfig() cfg.EnableTokenNamespaceEnforcement = dynamicconfig.GetBoolPropertyFn(enforce) return s.getWorkflowHandler(cfg) From 9861ffe98acfd9e49dd8af1e76a04e67a333319f Mon Sep 17 00:00:00 2001 From: Sergey Bykov Date: Tue, 15 Dec 2020 08:58:24 -0800 Subject: [PATCH 8/9] Address review feedback --- service/frontend/workflowHandler.go | 35 +++++++++++++++++------------ 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index fd33279968f..783a0fe0596 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -27,7 +27,6 @@ package frontend import ( "context" "fmt" - "strings" "sync/atomic" "time" @@ -884,9 +883,11 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w // event in the history for that session. Use the 'taskToken' provided as response of PollWorkflowTaskQueue API call // for completing the WorkflowTask. // The response could contain a new workflow task if there is one or if the request asking for one. -func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(ctx context.Context, - request *workflowservice.RespondWorkflowTaskCompletedRequest) (_ *workflowservice.RespondWorkflowTaskCompletedResponse, - retError error) { +func (wh *WorkflowHandler) RespondWorkflowTaskCompleted( + ctx context.Context, + request *workflowservice.RespondWorkflowTaskCompletedRequest, +) (_ *workflowservice.RespondWorkflowTaskCompletedResponse, retError error) { + defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(metrics.FrontendRespondWorkflowTaskCompletedScope) @@ -976,9 +977,11 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(ctx context.Context, // WorkflowTaskFailedEvent written to the history and a new WorkflowTask created. This API can be used by client to // either clear sticky taskqueue or report any panics during WorkflowTask processing. Temporal will only append first // WorkflowTaskFailed event to the history of workflow execution for consecutive failures. -func (wh *WorkflowHandler) RespondWorkflowTaskFailed(ctx context.Context, - request *workflowservice.RespondWorkflowTaskFailedRequest) (_ *workflowservice.RespondWorkflowTaskFailedResponse, - retError error) { +func (wh *WorkflowHandler) RespondWorkflowTaskFailed( + ctx context.Context, + request *workflowservice.RespondWorkflowTaskFailedRequest, +) (_ *workflowservice.RespondWorkflowTaskFailedResponse, retError error) { + defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(metrics.FrontendRespondWorkflowTaskFailedScope) @@ -1578,9 +1581,11 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedById(ctx context.Context, // created for the workflow instance so new commands could be made. Use the 'taskToken' provided as response of // PollActivityTaskQueue API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid // anymore due to activity timeout. -func (wh *WorkflowHandler) RespondActivityTaskFailed(ctx context.Context, - request *workflowservice.RespondActivityTaskFailedRequest) (_ *workflowservice.RespondActivityTaskFailedResponse, - retError error) { +func (wh *WorkflowHandler) RespondActivityTaskFailed( + ctx context.Context, + request *workflowservice.RespondActivityTaskFailedRequest, +) (_ *workflowservice.RespondActivityTaskFailedResponse, retError error) { + defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(metrics.FrontendRespondActivityTaskFailedScope) @@ -2853,9 +2858,11 @@ func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context, _ *workflows // RespondQueryTaskCompleted is called by application worker to complete a QueryTask (which is a WorkflowTask for query) // as a result of 'PollWorkflowTaskQueue' API call. Completing a QueryTask will unblock the client call to 'QueryWorkflow' // API and return the query result to client as a response to 'QueryWorkflow' API call. -func (wh *WorkflowHandler) RespondQueryTaskCompleted(ctx context.Context, - request *workflowservice.RespondQueryTaskCompletedRequest) (_ *workflowservice.RespondQueryTaskCompletedResponse, - retError error) { +func (wh *WorkflowHandler) RespondQueryTaskCompleted( + ctx context.Context, + request *workflowservice.RespondQueryTaskCompletedRequest, +) (_ *workflowservice.RespondQueryTaskCompletedResponse, retError error) { + defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(metrics.FrontendRespondQueryTaskCompletedScope) @@ -3833,7 +3840,7 @@ func (wh *WorkflowHandler) checkNamespaceMatch(requestNamespace string, tokenNam if !wh.config.EnableTokenNamespaceEnforcement() { return nil } - if !strings.EqualFold(requestNamespace, tokenNamespace) { + if requestNamespace != tokenNamespace { return wh.error(errTokenNamespaceMismatch, scope) } return nil From b950a5fb368c54479d6705b2c89b0d9d988117a8 Mon Sep 17 00:00:00 2001 From: Sergey Bykov Date: Tue, 15 Dec 2020 10:29:14 -0800 Subject: [PATCH 9/9] Address review feedback --- service/frontend/workflowHandler.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 783a0fe0596..53c941c73e5 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -1369,9 +1369,11 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatById(ctx context.Context, // created for the workflow so new commands could be made. Use the 'taskToken' provided as response of // PollActivityTaskQueue API call for completion. It fails with 'NotFoundFailure' if the taskToken is not valid // anymore due to activity timeout. -func (wh *WorkflowHandler) RespondActivityTaskCompleted(ctx context.Context, - request *workflowservice.RespondActivityTaskCompletedRequest) (_ *workflowservice.RespondActivityTaskCompletedResponse, - retError error) { +func (wh *WorkflowHandler) RespondActivityTaskCompleted( + ctx context.Context, + request *workflowservice.RespondActivityTaskCompletedRequest, +) (_ *workflowservice.RespondActivityTaskCompletedResponse, retError error) { + defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(metrics.FrontendRespondActivityTaskCompletedScope)