diff --git a/service/worker/scheduler/testdata/replay_v1.22.0.json.gz b/service/worker/scheduler/testdata/replay_v1.22.0.json.gz new file mode 100644 index 00000000000..97592fa0f7b Binary files /dev/null and b/service/worker/scheduler/testdata/replay_v1.22.0.json.gz differ diff --git a/service/worker/scheduler/testdata/replay_v1.23-pre.json.gz b/service/worker/scheduler/testdata/replay_v1.23-pre.json.gz new file mode 100644 index 00000000000..066f0ae359b Binary files /dev/null and b/service/worker/scheduler/testdata/replay_v1.23-pre.json.gz differ diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index 136740dbded..63bcd9a2a8c 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -65,6 +65,9 @@ const ( BatchAndCacheTimeQueries = 1 // use cache v2, and include ids in jitter NewCacheAndJitter = 2 + // Don't put possibly-overlapping runs (from SCHEDULE_OVERLAP_POLICY_ALLOW_ALL) in + // RunningWorkflows. + DontTrackOverlapping = 3 ) const ( @@ -191,8 +194,8 @@ var ( MaxBufferSize: 1000, AllowZeroSleep: true, ReuseTimer: true, - NextTimeCacheV2Size: 14, // see note below - Version: NewCacheAndJitter, + NextTimeCacheV2Size: 14, // see note below + Version: NewCacheAndJitter, // TODO: switch to DontTrackOverlapping } // Note on NextTimeCacheV2Size: This value must be > FutureActionCountForList. Each @@ -962,7 +965,8 @@ func (s *scheduler) processBuffer() bool { continue } metricsWithTag.Counter(metrics.ScheduleActionSuccess.GetMetricName()).Inc(1) - s.recordAction(result) + nonOverlapping := start == action.nonOverlappingStart + s.recordAction(result, nonOverlapping) } // Terminate or cancel if required (terminate overrides cancel if both are present) @@ -991,10 +995,11 @@ func (s *scheduler) processBuffer() bool { return tryAgain } -func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult) { +func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult, nonOverlapping bool) { s.Info.ActionCount++ s.Info.RecentActions = util.SliceTail(append(s.Info.RecentActions, result), s.tweakables.RecentActionCount) - if result.StartWorkflowResult != nil { + canTrack := nonOverlapping || !s.hasMinVersion(DontTrackOverlapping) + if canTrack && result.StartWorkflowResult != nil { s.Info.RunningWorkflows = append(s.Info.RunningWorkflows, result.StartWorkflowResult) } } @@ -1027,6 +1032,13 @@ func (s *scheduler) startWorkflow( } ctx := workflow.WithLocalActivityOptions(s.ctx, options) + lastCompletionResult, continuedFailure := s.State.LastCompletionResult, s.State.ContinuedFailure + if start.OverlapPolicy == enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL && s.hasMinVersion(DontTrackOverlapping) { + // ALLOW_ALL runs don't participate in lastCompletionResult/continuedFailure at all + lastCompletionResult = nil + continuedFailure = nil + } + req := &schedspb.StartWorkflowRequest{ Request: &workflowservice.StartWorkflowExecutionRequest{ WorkflowId: workflowID, @@ -1043,8 +1055,8 @@ func (s *scheduler) startWorkflow( Memo: newWorkflow.Memo, SearchAttributes: s.addSearchAttributes(newWorkflow.SearchAttributes, nominalTimeSec), Header: newWorkflow.Header, - LastCompletionResult: s.State.LastCompletionResult, - ContinuedFailure: s.State.ContinuedFailure, + LastCompletionResult: lastCompletionResult, + ContinuedFailure: continuedFailure, }, } for { diff --git a/service/worker/scheduler/workflow_test.go b/service/worker/scheduler/workflow_test.go index 993bb321a25..73d37881b44 100644 --- a/service/worker/scheduler/workflow_test.go +++ b/service/worker/scheduler/workflow_test.go @@ -738,7 +738,6 @@ func (s *workflowSuite) TestOverlapTerminate() { } func (s *workflowSuite) TestOverlapAllowAll() { - // also contains tests for RunningWorkflows and refresh, since it's convenient to do here s.runAcrossContinue( []workflowRun{ { @@ -766,70 +765,7 @@ func (s *workflowSuite) TestOverlapAllowAll() { result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, }, }, - []delayedCallback{ - { - at: time.Date(2022, 6, 1, 0, 6, 0, 0, time.UTC), - f: func() { s.Equal([]string{"myid-2022-06-01T00:05:00Z"}, s.runningWorkflows()) }, - }, - { - at: time.Date(2022, 6, 1, 0, 11, 0, 0, time.UTC), - f: func() { - s.Equal([]string{"myid-2022-06-01T00:05:00Z", "myid-2022-06-01T00:10:00Z"}, s.runningWorkflows()) - }, - }, - { - at: time.Date(2022, 6, 1, 0, 15, 30, 0, time.UTC), - f: func() { - s.Equal([]string{"myid-2022-06-01T00:05:00Z", "myid-2022-06-01T00:10:00Z", "myid-2022-06-01T00:15:00Z"}, s.runningWorkflows()) - }, - }, - { - at: time.Date(2022, 6, 1, 0, 16, 30, 0, time.UTC), - f: func() { - // :15 has ended here, but we won't know until we refresh since we don't have a long-poll watcher - s.Equal([]string{"myid-2022-06-01T00:05:00Z", "myid-2022-06-01T00:10:00Z", "myid-2022-06-01T00:15:00Z"}, s.runningWorkflows()) - // poke it to refresh - s.env.SignalWorkflow(SignalNameRefresh, nil) - }, - }, - { - at: time.Date(2022, 6, 1, 0, 16, 31, 0, time.UTC), - f: func() { - // now we'll see it end - s.Equal([]string{"myid-2022-06-01T00:05:00Z", "myid-2022-06-01T00:10:00Z"}, s.runningWorkflows()) - }, - }, - { - at: time.Date(2022, 6, 1, 0, 18, 0, 0, time.UTC), - f: func() { - // :05 has ended, but we won't see it yet - s.Equal([]string{"myid-2022-06-01T00:05:00Z", "myid-2022-06-01T00:10:00Z"}, s.runningWorkflows()) - }, - }, - { - at: time.Date(2022, 6, 1, 0, 21, 0, 0, time.UTC), - f: func() { - // we'll see :05 ended because :20 started and did an implicit refresh - s.Equal([]string{"myid-2022-06-01T00:10:00Z", "myid-2022-06-01T00:20:00Z"}, s.runningWorkflows()) - }, - }, - { - at: time.Date(2022, 6, 1, 0, 23, 0, 0, time.UTC), - f: func() { - // we won't see these ended yet - s.Equal([]string{"myid-2022-06-01T00:10:00Z", "myid-2022-06-01T00:20:00Z"}, s.runningWorkflows()) - // poke it to refresh - s.env.SignalWorkflow(SignalNameRefresh, nil) - }, - }, - { - at: time.Date(2022, 6, 1, 0, 23, 1, 0, time.UTC), - f: func() { - // now we will - s.Equal([]string(nil), s.runningWorkflows()) - }, - }, - }, + nil, &schedpb.Schedule{ Spec: &schedpb.ScheduleSpec{ Interval: []*schedpb.IntervalSpec{{ @@ -840,7 +776,7 @@ func (s *workflowSuite) TestOverlapAllowAll() { OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL, }, }, - 7, + 5, ) } @@ -958,6 +894,45 @@ func (s *workflowSuite) TestLastCompletionResultAndContinuedFailure() { s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError())) } +func (s *workflowSuite) TestOnlyStartForAllowAll() { + if currentTweakablePolicies.Version < DontTrackOverlapping { + s.T().Skip("test will run after Version updated") + } + // written using low-level mocks so we can check fields of start workflow requests + + s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) { + s.Equal("myid-2022-06-01T00:05:00Z", req.Request.WorkflowId) + s.Nil(req.Request.LastCompletionResult) + s.Nil(req.Request.ContinuedFailure) + return nil, nil + }) + s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) { + s.Equal("myid-2022-06-01T00:10:00Z", req.Request.WorkflowId) + s.Nil(req.Request.LastCompletionResult) + s.Nil(req.Request.ContinuedFailure) + return nil, nil + }) + s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) { + s.Equal("myid-2022-06-01T00:15:00Z", req.Request.WorkflowId) + s.Nil(req.Request.LastCompletionResult) + s.Nil(req.Request.ContinuedFailure) + return nil, nil + }) + + s.run(&schedpb.Schedule{ + Spec: &schedpb.ScheduleSpec{ + Interval: []*schedpb.IntervalSpec{{ + Interval: timestamp.DurationPtr(5 * time.Minute), + }}, + }, + Policies: &schedpb.SchedulePolicies{ + OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL, + }, + }, 4) + s.True(s.env.IsWorkflowCompleted()) + s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError())) +} + func (s *workflowSuite) TestPauseOnFailure() { // written using low-level mocks so we can return failures @@ -1398,7 +1373,7 @@ func (s *workflowSuite) TestLimitedActions() { RemainingActions: 2, }, Policies: &schedpb.SchedulePolicies{ - OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL, + OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_SKIP, }, }, 4) s.True(s.env.IsWorkflowCompleted())