Skip to content

Commit

Permalink
Don't track overlapping scheduled workflows (#4911)
Browse files Browse the repository at this point in the history
**What changed?**
For workflows started by a schedule with "allow all" overlap policy,
don't track them in running workflows in schedule info, and don't pass
last completion result or continued failure.

This means that "pause on failure", "last completion result", and
"continued failure" won't be available for those workflows and their
result/failure won't be tracked. Those features have somewhat confusing
semantics in this case anyway since there's not an obvious "last" run
when runs overlap.

Note that the change is not enabled yet, this logic change will be
included in a patch release to allow for downgrades, and enabled in
1.23.0.

**Why?**
The current implementation of schedules requires polling to get the
open/closed status of tracked running workflows. With allow all, the
number of running workflows is not bounded, and if the number grows very
high, it would cause various system instability including overloading
history and the schedule workflow itself being terminated due to event
count.

Ideally, we'd be able to offer bounded concurrent executions, but that
will take more work so this is more of a quick fix.

**How did you test it?**
Updated existing tests

**Potential risks**
Users using that specific combination of features will have to find a
new solution.
  • Loading branch information
dnr authored and rodrigozhou committed Oct 6, 2023
1 parent 1f01c72 commit 4a70d62
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 74 deletions.
Binary file not shown.
Binary file not shown.
26 changes: 19 additions & 7 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ const (
BatchAndCacheTimeQueries = 1
// use cache v2, and include ids in jitter
NewCacheAndJitter = 2
// Don't put possibly-overlapping runs (from SCHEDULE_OVERLAP_POLICY_ALLOW_ALL) in
// RunningWorkflows.
DontTrackOverlapping = 3
)

const (
Expand Down Expand Up @@ -191,8 +194,8 @@ var (
MaxBufferSize: 1000,
AllowZeroSleep: true,
ReuseTimer: true,
NextTimeCacheV2Size: 14, // see note below
Version: NewCacheAndJitter,
NextTimeCacheV2Size: 14, // see note below
Version: NewCacheAndJitter, // TODO: switch to DontTrackOverlapping
}

// Note on NextTimeCacheV2Size: This value must be > FutureActionCountForList. Each
Expand Down Expand Up @@ -962,7 +965,8 @@ func (s *scheduler) processBuffer() bool {
continue
}
metricsWithTag.Counter(metrics.ScheduleActionSuccess.GetMetricName()).Inc(1)
s.recordAction(result)
nonOverlapping := start == action.nonOverlappingStart
s.recordAction(result, nonOverlapping)
}

// Terminate or cancel if required (terminate overrides cancel if both are present)
Expand Down Expand Up @@ -991,10 +995,11 @@ func (s *scheduler) processBuffer() bool {
return tryAgain
}

func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult) {
func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult, nonOverlapping bool) {
s.Info.ActionCount++
s.Info.RecentActions = util.SliceTail(append(s.Info.RecentActions, result), s.tweakables.RecentActionCount)
if result.StartWorkflowResult != nil {
canTrack := nonOverlapping || !s.hasMinVersion(DontTrackOverlapping)
if canTrack && result.StartWorkflowResult != nil {
s.Info.RunningWorkflows = append(s.Info.RunningWorkflows, result.StartWorkflowResult)
}
}
Expand Down Expand Up @@ -1027,6 +1032,13 @@ func (s *scheduler) startWorkflow(
}
ctx := workflow.WithLocalActivityOptions(s.ctx, options)

lastCompletionResult, continuedFailure := s.State.LastCompletionResult, s.State.ContinuedFailure
if start.OverlapPolicy == enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL && s.hasMinVersion(DontTrackOverlapping) {
// ALLOW_ALL runs don't participate in lastCompletionResult/continuedFailure at all
lastCompletionResult = nil
continuedFailure = nil
}

req := &schedspb.StartWorkflowRequest{
Request: &workflowservice.StartWorkflowExecutionRequest{
WorkflowId: workflowID,
Expand All @@ -1043,8 +1055,8 @@ func (s *scheduler) startWorkflow(
Memo: newWorkflow.Memo,
SearchAttributes: s.addSearchAttributes(newWorkflow.SearchAttributes, nominalTimeSec),
Header: newWorkflow.Header,
LastCompletionResult: s.State.LastCompletionResult,
ContinuedFailure: s.State.ContinuedFailure,
LastCompletionResult: lastCompletionResult,
ContinuedFailure: continuedFailure,
},
}
for {
Expand Down
109 changes: 42 additions & 67 deletions service/worker/scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,6 @@ func (s *workflowSuite) TestOverlapTerminate() {
}

func (s *workflowSuite) TestOverlapAllowAll() {
// also contains tests for RunningWorkflows and refresh, since it's convenient to do here
s.runAcrossContinue(
[]workflowRun{
{
Expand Down Expand Up @@ -766,70 +765,7 @@ func (s *workflowSuite) TestOverlapAllowAll() {
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
},
[]delayedCallback{
{
at: time.Date(2022, 6, 1, 0, 6, 0, 0, time.UTC),
f: func() { s.Equal([]string{"myid-2022-06-01T00:05:00Z"}, s.runningWorkflows()) },
},
{
at: time.Date(2022, 6, 1, 0, 11, 0, 0, time.UTC),
f: func() {
s.Equal([]string{"myid-2022-06-01T00:05:00Z", "myid-2022-06-01T00:10:00Z"}, s.runningWorkflows())
},
},
{
at: time.Date(2022, 6, 1, 0, 15, 30, 0, time.UTC),
f: func() {
s.Equal([]string{"myid-2022-06-01T00:05:00Z", "myid-2022-06-01T00:10:00Z", "myid-2022-06-01T00:15:00Z"}, s.runningWorkflows())
},
},
{
at: time.Date(2022, 6, 1, 0, 16, 30, 0, time.UTC),
f: func() {
// :15 has ended here, but we won't know until we refresh since we don't have a long-poll watcher
s.Equal([]string{"myid-2022-06-01T00:05:00Z", "myid-2022-06-01T00:10:00Z", "myid-2022-06-01T00:15:00Z"}, s.runningWorkflows())
// poke it to refresh
s.env.SignalWorkflow(SignalNameRefresh, nil)
},
},
{
at: time.Date(2022, 6, 1, 0, 16, 31, 0, time.UTC),
f: func() {
// now we'll see it end
s.Equal([]string{"myid-2022-06-01T00:05:00Z", "myid-2022-06-01T00:10:00Z"}, s.runningWorkflows())
},
},
{
at: time.Date(2022, 6, 1, 0, 18, 0, 0, time.UTC),
f: func() {
// :05 has ended, but we won't see it yet
s.Equal([]string{"myid-2022-06-01T00:05:00Z", "myid-2022-06-01T00:10:00Z"}, s.runningWorkflows())
},
},
{
at: time.Date(2022, 6, 1, 0, 21, 0, 0, time.UTC),
f: func() {
// we'll see :05 ended because :20 started and did an implicit refresh
s.Equal([]string{"myid-2022-06-01T00:10:00Z", "myid-2022-06-01T00:20:00Z"}, s.runningWorkflows())
},
},
{
at: time.Date(2022, 6, 1, 0, 23, 0, 0, time.UTC),
f: func() {
// we won't see these ended yet
s.Equal([]string{"myid-2022-06-01T00:10:00Z", "myid-2022-06-01T00:20:00Z"}, s.runningWorkflows())
// poke it to refresh
s.env.SignalWorkflow(SignalNameRefresh, nil)
},
},
{
at: time.Date(2022, 6, 1, 0, 23, 1, 0, time.UTC),
f: func() {
// now we will
s.Equal([]string(nil), s.runningWorkflows())
},
},
},
nil,
&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Expand All @@ -840,7 +776,7 @@ func (s *workflowSuite) TestOverlapAllowAll() {
OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
},
},
7,
5,
)
}

Expand Down Expand Up @@ -958,6 +894,45 @@ func (s *workflowSuite) TestLastCompletionResultAndContinuedFailure() {
s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError()))
}

func (s *workflowSuite) TestOnlyStartForAllowAll() {
if currentTweakablePolicies.Version < DontTrackOverlapping {
s.T().Skip("test will run after Version updated")
}
// written using low-level mocks so we can check fields of start workflow requests

s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
s.Equal("myid-2022-06-01T00:05:00Z", req.Request.WorkflowId)
s.Nil(req.Request.LastCompletionResult)
s.Nil(req.Request.ContinuedFailure)
return nil, nil
})
s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
s.Equal("myid-2022-06-01T00:10:00Z", req.Request.WorkflowId)
s.Nil(req.Request.LastCompletionResult)
s.Nil(req.Request.ContinuedFailure)
return nil, nil
})
s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
s.Equal("myid-2022-06-01T00:15:00Z", req.Request.WorkflowId)
s.Nil(req.Request.LastCompletionResult)
s.Nil(req.Request.ContinuedFailure)
return nil, nil
})

s.run(&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Interval: timestamp.DurationPtr(5 * time.Minute),
}},
},
Policies: &schedpb.SchedulePolicies{
OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
},
}, 4)
s.True(s.env.IsWorkflowCompleted())
s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError()))
}

func (s *workflowSuite) TestPauseOnFailure() {
// written using low-level mocks so we can return failures

Expand Down Expand Up @@ -1398,7 +1373,7 @@ func (s *workflowSuite) TestLimitedActions() {
RemainingActions: 2,
},
Policies: &schedpb.SchedulePolicies{
OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_SKIP,
},
}, 4)
s.True(s.env.IsWorkflowCompleted())
Expand Down

0 comments on commit 4a70d62

Please sign in to comment.