-
Notifications
You must be signed in to change notification settings - Fork 825
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add enforcement of namespace match for async completion tokens #1086
Changes from 7 commits
1e15e93
2afe06a
d1ce9f8
6ef9abe
8e6ee5a
63f0cfb
fb3f899
9861ffe
b950a5f
4f17621
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ package frontend | |
import ( | ||
"context" | ||
"fmt" | ||
"strings" | ||
"sync/atomic" | ||
"time" | ||
|
||
|
@@ -883,7 +884,9 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w | |
// event in the history for that session. Use the 'taskToken' provided as response of PollWorkflowTaskQueue API call | ||
// for completing the WorkflowTask. | ||
// The response could contain a new workflow task if there is one or if the request asking for one. | ||
func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(ctx context.Context, request *workflowservice.RespondWorkflowTaskCompletedRequest) (_ *workflowservice.RespondWorkflowTaskCompletedResponse, retError error) { | ||
func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(ctx context.Context, | ||
request *workflowservice.RespondWorkflowTaskCompletedRequest) (_ *workflowservice.RespondWorkflowTaskCompletedResponse, | ||
retError error) { | ||
defer log.CapturePanic(wh.GetLogger(), &retError) | ||
|
||
scope := wh.getDefaultScope(metrics.FrontendRespondWorkflowTaskCompletedScope) | ||
|
@@ -916,15 +919,21 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(ctx context.Context, req | |
return nil, wh.error(err, scope) | ||
} | ||
|
||
namespaceName := namespaceEntry.GetInfo().Name | ||
scope, sw := wh.startRequestProfileWithNamespace( | ||
metrics.FrontendRespondWorkflowTaskCompletedScope, namespaceEntry.GetInfo().Name, | ||
metrics.FrontendRespondWorkflowTaskCompletedScope, | ||
namespaceName, | ||
) | ||
defer sw.Stop() | ||
|
||
if wh.isStopped() { | ||
return nil, errShuttingDown | ||
} | ||
|
||
if err := wh.checkNamespaceMatch(request.Namespace, namespaceName, scope); err != nil { | ||
return nil, err | ||
} | ||
|
||
histResp, err := wh.GetHistoryClient().RespondWorkflowTaskCompleted(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{ | ||
NamespaceId: namespaceId, | ||
CompleteRequest: request}, | ||
|
@@ -967,7 +976,9 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(ctx context.Context, req | |
// WorkflowTaskFailedEvent written to the history and a new WorkflowTask created. This API can be used by client to | ||
// either clear sticky taskqueue or report any panics during WorkflowTask processing. Temporal will only append first | ||
// WorkflowTaskFailed event to the history of workflow execution for consecutive failures. | ||
func (wh *WorkflowHandler) RespondWorkflowTaskFailed(ctx context.Context, request *workflowservice.RespondWorkflowTaskFailedRequest) (_ *workflowservice.RespondWorkflowTaskFailedResponse, retError error) { | ||
func (wh *WorkflowHandler) RespondWorkflowTaskFailed(ctx context.Context, | ||
request *workflowservice.RespondWorkflowTaskFailedRequest) (_ *workflowservice.RespondWorkflowTaskFailedResponse, | ||
retError error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same |
||
defer log.CapturePanic(wh.GetLogger(), &retError) | ||
|
||
scope := wh.getDefaultScope(metrics.FrontendRespondWorkflowTaskFailedScope) | ||
|
@@ -999,15 +1010,21 @@ func (wh *WorkflowHandler) RespondWorkflowTaskFailed(ctx context.Context, reques | |
return nil, wh.error(err, scope) | ||
} | ||
|
||
namespaceName := namespaceEntry.GetInfo().Name | ||
scope, sw := wh.startRequestProfileWithNamespace( | ||
metrics.FrontendRespondWorkflowTaskFailedScope, namespaceEntry.GetInfo().Name, | ||
metrics.FrontendRespondWorkflowTaskFailedScope, | ||
namespaceName, | ||
) | ||
defer sw.Stop() | ||
|
||
if wh.isStopped() { | ||
return nil, errShuttingDown | ||
} | ||
|
||
if err := wh.checkNamespaceMatch(request.Namespace, namespaceName, scope); err != nil { | ||
return nil, err | ||
} | ||
|
||
if len(request.GetIdentity()) > wh.config.MaxIDLengthLimit() { | ||
return nil, wh.error(errIdentityTooLong, scope) | ||
} | ||
|
@@ -1349,7 +1366,9 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatById(ctx context.Context, | |
// created for the workflow so new commands could be made. Use the 'taskToken' provided as response of | ||
// PollActivityTaskQueue API call for completion. It fails with 'NotFoundFailure' if the taskToken is not valid | ||
// anymore due to activity timeout. | ||
func (wh *WorkflowHandler) RespondActivityTaskCompleted(ctx context.Context, request *workflowservice.RespondActivityTaskCompletedRequest) (_ *workflowservice.RespondActivityTaskCompletedResponse, retError error) { | ||
func (wh *WorkflowHandler) RespondActivityTaskCompleted(ctx context.Context, | ||
request *workflowservice.RespondActivityTaskCompletedRequest) (_ *workflowservice.RespondActivityTaskCompletedResponse, | ||
retError error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same, |
||
defer log.CapturePanic(wh.GetLogger(), &retError) | ||
|
||
scope := wh.getDefaultScope(metrics.FrontendRespondActivityTaskCompletedScope) | ||
|
@@ -1384,16 +1403,21 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted(ctx context.Context, req | |
return nil, wh.error(errIdentityTooLong, scope) | ||
} | ||
|
||
namespaceName := namespaceEntry.GetInfo().Name | ||
scope, sw := wh.startRequestProfileWithNamespace( | ||
metrics.FrontendRespondActivityTaskCompletedScope, | ||
namespaceEntry.GetInfo().Name, | ||
namespaceName, | ||
) | ||
defer sw.Stop() | ||
|
||
if wh.isStopped() { | ||
return nil, errShuttingDown | ||
} | ||
|
||
if err := wh.checkNamespaceMatch(request.Namespace, namespaceName, scope); err != nil { | ||
return nil, err | ||
} | ||
|
||
sizeLimitError := wh.config.BlobSizeLimitError(namespaceEntry.GetInfo().Name) | ||
sizeLimitWarn := wh.config.BlobSizeLimitWarn(namespaceEntry.GetInfo().Name) | ||
|
||
|
@@ -1554,7 +1578,9 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedById(ctx context.Context, | |
// created for the workflow instance so new commands could be made. Use the 'taskToken' provided as response of | ||
// PollActivityTaskQueue API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid | ||
// anymore due to activity timeout. | ||
func (wh *WorkflowHandler) RespondActivityTaskFailed(ctx context.Context, request *workflowservice.RespondActivityTaskFailedRequest) (_ *workflowservice.RespondActivityTaskFailedResponse, retError error) { | ||
func (wh *WorkflowHandler) RespondActivityTaskFailed(ctx context.Context, | ||
request *workflowservice.RespondActivityTaskFailedRequest) (_ *workflowservice.RespondActivityTaskFailedResponse, | ||
retError error) { | ||
defer log.CapturePanic(wh.GetLogger(), &retError) | ||
|
||
scope := wh.getDefaultScope(metrics.FrontendRespondActivityTaskFailedScope) | ||
|
@@ -1590,16 +1616,21 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(ctx context.Context, reques | |
return nil, wh.error(errFailureMustHaveApplicationFailureInfo, scope) | ||
} | ||
|
||
namespaceName := namespaceEntry.GetInfo().Name | ||
scope, sw := wh.startRequestProfileWithNamespace( | ||
metrics.FrontendRespondActivityTaskFailedScope, | ||
namespaceEntry.GetInfo().Name, | ||
namespaceName, | ||
) | ||
defer sw.Stop() | ||
|
||
if wh.isStopped() { | ||
return nil, errShuttingDown | ||
} | ||
|
||
if err := wh.checkNamespaceMatch(request.Namespace, namespaceName, scope); err != nil { | ||
return nil, err | ||
} | ||
|
||
if len(request.GetIdentity()) > wh.config.MaxIDLengthLimit() { | ||
return nil, wh.error(errIdentityTooLong, scope) | ||
} | ||
|
@@ -1775,16 +1806,21 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(ctx context.Context, requ | |
return nil, wh.error(err, scope) | ||
} | ||
|
||
namespaceName := namespaceEntry.GetInfo().Name | ||
scope, sw := wh.startRequestProfileWithNamespace( | ||
metrics.FrontendRespondActivityTaskCanceledScope, | ||
namespaceEntry.GetInfo().Name, | ||
namespaceName, | ||
) | ||
defer sw.Stop() | ||
|
||
if wh.isStopped() { | ||
return nil, errShuttingDown | ||
} | ||
|
||
if err := wh.checkNamespaceMatch(request.Namespace, namespaceName, scope); err != nil { | ||
return nil, err | ||
} | ||
|
||
if len(request.GetIdentity()) > wh.config.MaxIDLengthLimit() { | ||
return nil, wh.error(errIdentityTooLong, scope) | ||
} | ||
|
@@ -2817,7 +2853,9 @@ func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context, _ *workflows | |
// RespondQueryTaskCompleted is called by application worker to complete a QueryTask (which is a WorkflowTask for query) | ||
// as a result of 'PollWorkflowTaskQueue' API call. Completing a QueryTask will unblock the client call to 'QueryWorkflow' | ||
// API and return the query result to client as a response to 'QueryWorkflow' API call. | ||
func (wh *WorkflowHandler) RespondQueryTaskCompleted(ctx context.Context, request *workflowservice.RespondQueryTaskCompletedRequest) (_ *workflowservice.RespondQueryTaskCompletedResponse, retError error) { | ||
func (wh *WorkflowHandler) RespondQueryTaskCompleted(ctx context.Context, | ||
request *workflowservice.RespondQueryTaskCompletedRequest) (_ *workflowservice.RespondQueryTaskCompletedResponse, | ||
retError error) { | ||
defer log.CapturePanic(wh.GetLogger(), &retError) | ||
|
||
scope := wh.getDefaultScope(metrics.FrontendRespondQueryTaskCompletedScope) | ||
|
@@ -2848,16 +2886,21 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted(ctx context.Context, reques | |
return nil, wh.error(err, scope) | ||
} | ||
|
||
namespaceName := namespaceEntry.GetInfo().Name | ||
scope, sw := wh.startRequestProfileWithNamespace( | ||
metrics.FrontendRespondQueryTaskCompletedScope, | ||
namespaceEntry.GetInfo().Name, | ||
namespaceName, | ||
) | ||
defer sw.Stop() | ||
|
||
if wh.isStopped() { | ||
return nil, errShuttingDown | ||
} | ||
|
||
if err := wh.checkNamespaceMatch(request.Namespace, namespaceName, scope); err != nil { | ||
return nil, err | ||
} | ||
|
||
sizeLimitError := wh.config.BlobSizeLimitError(namespaceEntry.GetInfo().Name) | ||
sizeLimitWarn := wh.config.BlobSizeLimitWarn(namespaceEntry.GetInfo().Name) | ||
|
||
|
@@ -3785,3 +3828,13 @@ func (wh *WorkflowHandler) validateSignalWithStartWorkflowTimeouts( | |
|
||
return nil | ||
} | ||
|
||
func (wh *WorkflowHandler) checkNamespaceMatch(requestNamespace string, tokenNamespace string, scope metrics.Scope) error { | ||
if !wh.config.EnableTokenNamespaceEnforcement() { | ||
return nil | ||
} | ||
if !strings.EqualFold(requestNamespace, tokenNamespace) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Namespaces are case sensitive. At least I was able to create two namespaces which are different in case only. It means someone else might already have such namespaces created and this check will be backward compatible for them. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. namespace is case sensitive |
||
return wh.error(errTokenNamespaceMismatch, scope) | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't want to start this, but yes, let's make some standart here. Seems like nor
gofmt
neitherGo Land
are not care much about it. So there are two cases: