Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Routing logic to take both NamespaceID and workflowID into account #629

Merged
merged 10 commits into from
Jul 31, 2020
302 changes: 179 additions & 123 deletions api/adminservice/v1/request_response.pb.go

Large diffs are not rendered by default.

560 changes: 308 additions & 252 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

68 changes: 35 additions & 33 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *clientImpl) StartWorkflowExecution(
ctx context.Context,
request *historyservice.StartWorkflowExecutionRequest,
opts ...grpc.CallOption) (*historyservice.StartWorkflowExecutionResponse, error) {
client, err := c.getClientForWorkflowID(request.StartRequest.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.StartRequest.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -98,7 +98,7 @@ func (c *clientImpl) GetMutableState(
ctx context.Context,
request *historyservice.GetMutableStateRequest,
opts ...grpc.CallOption) (*historyservice.GetMutableStateResponse, error) {
client, err := c.getClientForWorkflowID(request.Execution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.Execution.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -122,7 +122,7 @@ func (c *clientImpl) PollMutableState(
ctx context.Context,
request *historyservice.PollMutableStateRequest,
opts ...grpc.CallOption) (*historyservice.PollMutableStateResponse, error) {
client, err := c.getClientForWorkflowID(request.Execution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.Execution.WorkflowId)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (c *clientImpl) DescribeHistoryHost(
if request.GetShardIdForHost() != 0 {
client, err = c.getClientForShardID(int(request.GetShardIdForHost()))
} else if request.ExecutionForHost != nil {
client, err = c.getClientForWorkflowID(request.ExecutionForHost.GetWorkflowId())
client, err = c.getClientForWorkflowID(request.GetNamespaceId(), request.ExecutionForHost.GetWorkflowId())
} else {
ret, err := c.clients.GetClientForClientKey(request.GetHostAddress())
if err != nil {
Expand Down Expand Up @@ -241,7 +241,7 @@ func (c *clientImpl) DescribeMutableState(
ctx context.Context,
request *historyservice.DescribeMutableStateRequest,
opts ...grpc.CallOption) (*historyservice.DescribeMutableStateResponse, error) {
client, err := c.getClientForWorkflowID(request.Execution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.Execution.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -265,7 +265,7 @@ func (c *clientImpl) ResetStickyTaskQueue(
ctx context.Context,
request *historyservice.ResetStickyTaskQueueRequest,
opts ...grpc.CallOption) (*historyservice.ResetStickyTaskQueueResponse, error) {
client, err := c.getClientForWorkflowID(request.Execution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.Execution.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -289,7 +289,7 @@ func (c *clientImpl) DescribeWorkflowExecution(
ctx context.Context,
request *historyservice.DescribeWorkflowExecutionRequest,
opts ...grpc.CallOption) (*historyservice.DescribeWorkflowExecutionResponse, error) {
client, err := c.getClientForWorkflowID(request.Request.Execution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.Request.Execution.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -313,7 +313,7 @@ func (c *clientImpl) RecordWorkflowTaskStarted(
ctx context.Context,
request *historyservice.RecordWorkflowTaskStartedRequest,
opts ...grpc.CallOption) (*historyservice.RecordWorkflowTaskStartedResponse, error) {
client, err := c.getClientForWorkflowID(request.WorkflowExecution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.WorkflowExecution.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -337,7 +337,7 @@ func (c *clientImpl) RecordActivityTaskStarted(
ctx context.Context,
request *historyservice.RecordActivityTaskStartedRequest,
opts ...grpc.CallOption) (*historyservice.RecordActivityTaskStartedResponse, error) {
client, err := c.getClientForWorkflowID(request.WorkflowExecution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.WorkflowExecution.WorkflowId)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -365,7 +365,7 @@ func (c *clientImpl) RespondWorkflowTaskCompleted(
if err != nil {
return nil, err
}
client, err := c.getClientForWorkflowID(taskToken.GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId, taskToken.GetWorkflowId())
if err != nil {
return nil, err
}
Expand All @@ -389,7 +389,7 @@ func (c *clientImpl) RespondWorkflowTaskFailed(
if err != nil {
return nil, err
}
client, err := c.getClientForWorkflowID(taskToken.GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId, taskToken.GetWorkflowId())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -418,7 +418,7 @@ func (c *clientImpl) RespondActivityTaskCompleted(
if err != nil {
return nil, err
}
client, err := c.getClientForWorkflowID(taskToken.GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId, taskToken.GetWorkflowId())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -447,7 +447,7 @@ func (c *clientImpl) RespondActivityTaskFailed(
if err != nil {
return nil, err
}
client, err := c.getClientForWorkflowID(taskToken.GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId, taskToken.GetWorkflowId())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -476,7 +476,7 @@ func (c *clientImpl) RespondActivityTaskCanceled(
if err != nil {
return nil, err
}
client, err := c.getClientForWorkflowID(taskToken.GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId, taskToken.GetWorkflowId())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -505,7 +505,7 @@ func (c *clientImpl) RecordActivityTaskHeartbeat(
if err != nil {
return nil, err
}
client, err := c.getClientForWorkflowID(taskToken.GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId, taskToken.GetWorkflowId())
if err != nil {
return nil, err
}
Expand All @@ -529,7 +529,7 @@ func (c *clientImpl) RequestCancelWorkflowExecution(
ctx context.Context,
request *historyservice.RequestCancelWorkflowExecutionRequest,
opts ...grpc.CallOption) (*historyservice.RequestCancelWorkflowExecutionResponse, error) {
client, err := c.getClientForWorkflowID(request.CancelRequest.WorkflowExecution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.CancelRequest.WorkflowExecution.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -553,7 +553,7 @@ func (c *clientImpl) SignalWorkflowExecution(
ctx context.Context,
request *historyservice.SignalWorkflowExecutionRequest,
opts ...grpc.CallOption) (*historyservice.SignalWorkflowExecutionResponse, error) {
client, err := c.getClientForWorkflowID(request.SignalRequest.WorkflowExecution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.SignalRequest.WorkflowExecution.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -578,7 +578,7 @@ func (c *clientImpl) SignalWithStartWorkflowExecution(
ctx context.Context,
request *historyservice.SignalWithStartWorkflowExecutionRequest,
opts ...grpc.CallOption) (*historyservice.SignalWithStartWorkflowExecutionResponse, error) {
client, err := c.getClientForWorkflowID(request.SignalWithStartRequest.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.SignalWithStartRequest.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -602,7 +602,7 @@ func (c *clientImpl) RemoveSignalMutableState(
ctx context.Context,
request *historyservice.RemoveSignalMutableStateRequest,
opts ...grpc.CallOption) (*historyservice.RemoveSignalMutableStateResponse, error) {
client, err := c.getClientForWorkflowID(request.WorkflowExecution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.WorkflowExecution.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -625,7 +625,7 @@ func (c *clientImpl) TerminateWorkflowExecution(
ctx context.Context,
request *historyservice.TerminateWorkflowExecutionRequest,
opts ...grpc.CallOption) (*historyservice.TerminateWorkflowExecutionResponse, error) {
client, err := c.getClientForWorkflowID(request.TerminateRequest.WorkflowExecution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.TerminateRequest.WorkflowExecution.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -650,7 +650,7 @@ func (c *clientImpl) ResetWorkflowExecution(
ctx context.Context,
request *historyservice.ResetWorkflowExecutionRequest,
opts ...grpc.CallOption) (*historyservice.ResetWorkflowExecutionResponse, error) {
client, err := c.getClientForWorkflowID(request.ResetRequest.WorkflowExecution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.ResetRequest.WorkflowExecution.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -673,7 +673,7 @@ func (c *clientImpl) ScheduleWorkflowTask(
ctx context.Context,
request *historyservice.ScheduleWorkflowTaskRequest,
opts ...grpc.CallOption) (*historyservice.ScheduleWorkflowTaskResponse, error) {
client, err := c.getClientForWorkflowID(request.WorkflowExecution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.WorkflowExecution.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -698,7 +698,7 @@ func (c *clientImpl) RecordChildExecutionCompleted(
ctx context.Context,
request *historyservice.RecordChildExecutionCompletedRequest,
opts ...grpc.CallOption) (*historyservice.RecordChildExecutionCompletedResponse, error) {
client, err := c.getClientForWorkflowID(request.WorkflowExecution.WorkflowId)
client, err := c.getClientForWorkflowID(request.NamespaceId, request.WorkflowExecution.WorkflowId)
if err != nil {
return nil, err
}
Expand All @@ -723,7 +723,7 @@ func (c *clientImpl) ReplicateEvents(
ctx context.Context,
request *historyservice.ReplicateEventsRequest,
opts ...grpc.CallOption) (*historyservice.ReplicateEventsResponse, error) {
client, err := c.getClientForWorkflowID(request.WorkflowExecution.GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId, request.WorkflowExecution.GetWorkflowId())
if err != nil {
return nil, err
}
Expand All @@ -748,7 +748,7 @@ func (c *clientImpl) ReplicateRawEvents(
ctx context.Context,
request *historyservice.ReplicateRawEventsRequest,
opts ...grpc.CallOption) (*historyservice.ReplicateRawEventsResponse, error) {
client, err := c.getClientForWorkflowID(request.WorkflowExecution.GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId, request.WorkflowExecution.GetWorkflowId())
if err != nil {
return nil, err
}
Expand All @@ -773,7 +773,7 @@ func (c *clientImpl) ReplicateEventsV2(
ctx context.Context,
request *historyservice.ReplicateEventsV2Request,
opts ...grpc.CallOption) (*historyservice.ReplicateEventsV2Response, error) {
client, err := c.getClientForWorkflowID(request.WorkflowExecution.GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId, request.WorkflowExecution.GetWorkflowId())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -826,7 +826,7 @@ func (c *clientImpl) SyncActivity(
request *historyservice.SyncActivityRequest,
opts ...grpc.CallOption) (*historyservice.SyncActivityResponse, error) {

client, err := c.getClientForWorkflowID(request.GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId, request.GetWorkflowId())
if err != nil {
return nil, err
}
Expand All @@ -852,7 +852,7 @@ func (c *clientImpl) QueryWorkflow(
request *historyservice.QueryWorkflowRequest,
opts ...grpc.CallOption,
) (*historyservice.QueryWorkflowResponse, error) {
client, err := c.getClientForWorkflowID(request.GetRequest().GetExecution().GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId, request.GetRequest().GetExecution().GetWorkflowId())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -932,8 +932,9 @@ func (c *clientImpl) GetDLQReplicationMessages(
opts ...grpc.CallOption,
) (*historyservice.GetDLQReplicationMessagesResponse, error) {
// All workflow IDs are in the same shard per request
namespaceID := request.GetTaskInfos()[0].GetNamespaceId()
workflowID := request.GetTaskInfos()[0].GetWorkflowId()
client, err := c.getClientForWorkflowID(workflowID)
client, err := c.getClientForWorkflowID(namespaceID, workflowID)
if err != nil {
return nil, err
}
Expand All @@ -950,7 +951,8 @@ func (c *clientImpl) ReapplyEvents(
request *historyservice.ReapplyEventsRequest,
opts ...grpc.CallOption,
) (*historyservice.ReapplyEventsResponse, error) {
client, err := c.getClientForWorkflowID(request.GetRequest().GetWorkflowExecution().GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId,
request.GetRequest().GetWorkflowExecution().GetWorkflowId())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1015,7 +1017,7 @@ func (c *clientImpl) RefreshWorkflowTasks(
request *historyservice.RefreshWorkflowTasksRequest,
opts ...grpc.CallOption,
) (*historyservice.RefreshWorkflowTasksResponse, error) {
client, err := c.getClientForWorkflowID(request.GetRequest().GetExecution().GetWorkflowId())
client, err := c.getClientForWorkflowID(request.NamespaceId, request.GetRequest().GetExecution().GetWorkflowId())
var response *historyservice.RefreshWorkflowTasksResponse
op := func(ctx context.Context, client historyservice.HistoryServiceClient) error {
var err error
Expand All @@ -1035,8 +1037,8 @@ func (c *clientImpl) createContext(parent context.Context) (context.Context, con
return context.WithTimeout(parent, c.timeout)
}

func (c *clientImpl) getClientForWorkflowID(workflowID string) (historyservice.HistoryServiceClient, error) {
key := common.WorkflowIDToHistoryShard(workflowID, c.numberOfShards)
func (c *clientImpl) getClientForWorkflowID(namespaceID, workflowID string) (historyservice.HistoryServiceClient, error) {
key := common.WorkflowIDToHistoryShard(namespaceID, workflowID, c.numberOfShards)
return c.getClientForShardID(key)
}

Expand Down
7 changes: 4 additions & 3 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,10 @@ func IsWhitelistServiceTransientError(err error) bool {
return false
}

// WorkflowIDToHistoryShard is used to map workflowID to a shardID
func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int {
hash := farm.Fingerprint32([]byte(workflowID))
// WorkflowIDToHistoryShard is used to map namespaceID-workflowID pair to a shardID
func WorkflowIDToHistoryShard(namespaceID, workflowID string, numberOfShards int) int {
idBytes := []byte(namespaceID + "_" + workflowID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does separator provides? We are hashing it right away.

hash := farm.Hash32(idBytes)
return int(hash%uint32(numberOfShards)) + 1 // ShardID starts with 1
}

Expand Down
4 changes: 3 additions & 1 deletion host/archival_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ func (s *integrationSuite) isHistoryArchived(namespace string, execution *common
}

func (s *integrationSuite) isHistoryDeleted(execution *commonpb.WorkflowExecution) bool {
shardID := common.WorkflowIDToHistoryShard(execution.GetWorkflowId(), s.testClusterConfig.HistoryConfig.NumHistoryShards)
namespaceID := s.getNamespaceID(s.archivalNamespace)
shardID := common.WorkflowIDToHistoryShard(namespaceID, execution.GetWorkflowId(),
s.testClusterConfig.HistoryConfig.NumHistoryShards)
request := &persistence.GetHistoryTreeRequest{
TreeID: execution.GetRunId(),
ShardID: convert.IntPtr(shardID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ message DescribeHistoryHostRequest {
//ip:port
string host_address = 1;
int32 shard_id_for_host = 2;
temporal.api.common.v1.WorkflowExecution execution_for_host = 3;
string namespace = 3;
temporal.api.common.v1.WorkflowExecution execution_for_host = 4;
}

message DescribeHistoryHostResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ message DescribeHistoryHostRequest {
//ip:port
string host_address = 1;
int32 shard_id_for_host = 2;
temporal.api.common.v1.WorkflowExecution execution_for_host = 3;
string namespace_id = 3;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it is namespace above and namespace_id here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess for consistency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They represent 2 different identifiers for namespace. namespace is used whenever API takes namespace name and namespace_id is used whenever API takes internal uuid which represents the namespace. Internally within the system we use namespace_id as the identifier.

temporal.api.common.v1.WorkflowExecution execution_for_host = 4;
}

message DescribeHistoryHostResponse {
Expand Down
18 changes: 14 additions & 4 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ func (adh *AdminHandler) DescribeWorkflowExecution(ctx context.Context, request
return nil, adh.error(err, scope)
}

shardID := common.WorkflowIDToHistoryShard(request.Execution.WorkflowId, adh.numberOfHistoryShards)
namespaceID, err := adh.GetNamespaceCache().GetNamespaceID(request.GetNamespace())

shardID := common.WorkflowIDToHistoryShard(namespaceID, request.Execution.WorkflowId, adh.numberOfHistoryShards)
shardIDstr := string(shardID)
shardIDForOutput := strconv.Itoa(shardID)

Expand All @@ -212,8 +214,6 @@ func (adh *AdminHandler) DescribeWorkflowExecution(ctx context.Context, request
return nil, adh.error(err, scope)
}

namespaceID, err := adh.GetNamespaceCache().GetNamespaceID(request.GetNamespace())

historyAddr := historyHost.GetAddress()
resp2, err := adh.GetHistoryClient().DescribeMutableState(ctx, &historyservice.DescribeMutableStateRequest{
NamespaceId: namespaceID,
Expand Down Expand Up @@ -275,7 +275,14 @@ func (adh *AdminHandler) DescribeHistoryHost(ctx context.Context, request *admin
return nil, adh.error(errRequestNotSet, scope)
}

var err error
namespaceID := ""
if request.ExecutionForHost != nil {
namespaceID, err = adh.GetNamespaceCache().GetNamespaceID(request.Namespace)
if err != nil {
return nil, adh.error(err, scope)
}

if err := validateExecution(request.ExecutionForHost); err != nil {
return nil, adh.error(err, scope)
}
Expand All @@ -284,6 +291,7 @@ func (adh *AdminHandler) DescribeHistoryHost(ctx context.Context, request *admin
resp, err := adh.GetHistoryClient().DescribeHistoryHost(ctx, &historyservice.DescribeHistoryHostRequest{
HostAddress: request.GetHostAddress(),
ShardIdForHost: request.GetShardIdForHost(),
NamespaceId: namespaceID,
ExecutionForHost: request.GetExecutionForHost(),
})

Expand Down Expand Up @@ -398,7 +406,8 @@ func (adh *AdminHandler) GetWorkflowExecutionRawHistory(ctx context.Context, req

// TODO need to deal with transient workflow task if to be used by client getting history
var historyBatches []*historypb.History
shardID := common.WorkflowIDToHistoryShard(execution.GetWorkflowId(), adh.numberOfHistoryShards)
shardID := common.WorkflowIDToHistoryShard(namespaceID, execution.GetWorkflowId(),
adh.numberOfHistoryShards)
_, historyBatches, continuationToken.PersistenceToken, size, err = history.PaginateHistory(
adh.GetHistoryManager(),
true, // this means that we are getting history by batch
Expand Down Expand Up @@ -532,6 +541,7 @@ func (adh *AdminHandler) GetWorkflowExecutionRawHistoryV2(ctx context.Context, r
}
pageSize := int(request.GetMaximumPageSize())
shardID := common.WorkflowIDToHistoryShard(
namespaceID,
execution.GetWorkflowId(),
adh.numberOfHistoryShards,
)
Expand Down
Loading