Skip to content

Commit

Permalink
Refactor TaskPoller.PollAndProcessWorkflowTask (#4903)
Browse files Browse the repository at this point in the history
**What changed?**
- Refactored PollAndProcessWorkflowTask and all variations to use an
options pattern.
- Added WithNoDumpCommands to disable printing commands.

**Why?**
- The stack of variations was getting annoying.
- And I wanted to add WithNoDumpCommands to make
TestTransientWorkflowTaskHistorySize less flaky.

**How did you test it?**
is tests
  • Loading branch information
dnr authored and rodrigozhou committed Oct 6, 2023
1 parent 4a70d62 commit 759b608
Show file tree
Hide file tree
Showing 27 changed files with 449 additions and 460 deletions.
26 changes: 13 additions & 13 deletions tests/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (s *integrationSuite) TestActivityHeartBeatWorkflow_Success() {
T: s.T(),
}

_, err := poller.PollAndProcessWorkflowTask(false, false)
_, err := poller.PollAndProcessWorkflowTask()
s.True(err == nil || err == errNoTasks)

err = poller.PollAndProcessActivityTask(false)
Expand All @@ -164,7 +164,7 @@ func (s *integrationSuite) TestActivityHeartBeatWorkflow_Success() {
s.Logger.Info("Waiting for workflow to complete", tag.WorkflowRunID(we.RunId))

s.False(workflowComplete)
_, err = poller.PollAndProcessWorkflowTask(true, false)
_, err = poller.PollAndProcessWorkflowTask(WithDumpHistory)
s.NoError(err)
s.True(workflowComplete)
s.Equal(1, activityExecutedCount)
Expand Down Expand Up @@ -338,7 +338,7 @@ func (s *integrationSuite) TestActivityRetry() {
})
}

_, err := poller.PollAndProcessWorkflowTask(false, false)
_, err := poller.PollAndProcessWorkflowTask()
s.NoError(err)

err = poller.PollAndProcessActivityTask(false)
Expand Down Expand Up @@ -376,7 +376,7 @@ func (s *integrationSuite) TestActivityRetry() {
s.False(workflowComplete)

s.Logger.Info("Processing workflow task:", tag.Counter(i))
_, err := poller.PollAndProcessWorkflowTaskWithoutRetry(false, false)
_, err := poller.PollAndProcessWorkflowTask(WithRetries(1))
if err != nil {
s.printWorkflowHistory(s.namespace, &commonpb.WorkflowExecution{
WorkflowId: id,
Expand Down Expand Up @@ -486,15 +486,15 @@ func (s *integrationSuite) TestActivityRetry_Infinite() {
T: s.T(),
}

_, err := poller.PollAndProcessWorkflowTask(false, false)
_, err := poller.PollAndProcessWorkflowTask()
s.NoError(err)

for i := 0; i <= activityExecutedLimit; i++ {
err = poller.PollAndProcessActivityTask(false)
s.NoError(err)
}

_, err = poller.PollAndProcessWorkflowTaskWithoutRetry(false, false)
_, err = poller.PollAndProcessWorkflowTask(WithRetries(1))
s.NoError(err)
s.True(workflowComplete)
}
Expand Down Expand Up @@ -587,7 +587,7 @@ func (s *integrationSuite) TestActivityHeartBeatWorkflow_Timeout() {
T: s.T(),
}

_, err := poller.PollAndProcessWorkflowTask(false, false)
_, err := poller.PollAndProcessWorkflowTask()
s.True(err == nil || err == errNoTasks)

err = poller.PollAndProcessActivityTask(false)
Expand All @@ -598,7 +598,7 @@ func (s *integrationSuite) TestActivityHeartBeatWorkflow_Timeout() {
s.Logger.Info("Waiting for workflow to complete", tag.WorkflowRunID(we.RunId))

s.False(workflowComplete)
_, err = poller.PollAndProcessWorkflowTask(true, false)
_, err = poller.PollAndProcessWorkflowTask(WithDumpHistory)
s.NoError(err)
s.True(workflowComplete)
}
Expand Down Expand Up @@ -712,7 +712,7 @@ func (s *integrationSuite) TestTryActivityCancellationFromWorkflow() {
T: s.T(),
}

_, err := poller.PollAndProcessWorkflowTask(false, false)
_, err := poller.PollAndProcessWorkflowTask()
s.True(err == nil || err == errNoTasks, err)

cancelCh := make(chan struct{})
Expand All @@ -733,7 +733,7 @@ func (s *integrationSuite) TestTryActivityCancellationFromWorkflow() {

scheduleActivity = false
requestCancellation = true
_, err2 := poller.PollAndProcessWorkflowTask(false, false)
_, err2 := poller.PollAndProcessWorkflowTask()
s.NoError(err2)
close(cancelCh)
}()
Expand Down Expand Up @@ -841,7 +841,7 @@ func (s *integrationSuite) TestActivityCancellationNotStarted() {
T: s.T(),
}

_, err := poller.PollAndProcessWorkflowTask(false, false)
_, err := poller.PollAndProcessWorkflowTask()
s.True(err == nil || err == errNoTasks)

// Send signal so that worker can send an activity cancel
Expand All @@ -862,12 +862,12 @@ func (s *integrationSuite) TestActivityCancellationNotStarted() {
// Process signal in workflow and send request cancellation
scheduleActivity = false
requestCancellation = true
_, err = poller.PollAndProcessWorkflowTask(true, false)
_, err = poller.PollAndProcessWorkflowTask(WithDumpHistory)
s.NoError(err)

scheduleActivity = false
requestCancellation = false
_, err = poller.PollAndProcessWorkflowTask(false, false)
_, err = poller.PollAndProcessWorkflowTask()
s.True(err == nil || err == errNoTasks)
}

Expand Down
130 changes: 57 additions & 73 deletions tests/advanced_visibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,16 +305,14 @@ func (s *advancedVisibilitySuite) TestListWorkflow_SearchAttribute() {
Logger: s.Logger,
T: s.T(),
}
_, newTask, err := poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(
false,
false,
true,
true,
0,
1,
true,
nil)
res, err := poller.PollAndProcessWorkflowTask(
WithPollSticky,
WithRespondSticky,
WithExpectedAttemptCount(0),
WithRetries(1),
WithForceNewWorkflowTask)
s.NoError(err)
newTask := res.NewTask
s.NotNil(newTask)
s.NotNil(newTask.WorkflowTask)

Expand Down Expand Up @@ -1364,16 +1362,14 @@ func (s *advancedVisibilitySuite) TestUpsertWorkflowExecutionSearchAttributes()
}

// process 1st workflow task and assert workflow task is handled correctly.
_, newTask, err := poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(
false,
false,
true,
true,
0,
1,
true,
nil)
res, err := poller.PollAndProcessWorkflowTask(
WithPollSticky,
WithRespondSticky,
WithExpectedAttemptCount(0),
WithRetries(1),
WithForceNewWorkflowTask)
s.NoError(err)
newTask := res.NewTask
s.NotNil(newTask)
s.NotNil(newTask.WorkflowTask)
s.Equal(int64(3), newTask.WorkflowTask.GetPreviousStartedEventId())
Expand Down Expand Up @@ -1414,16 +1410,14 @@ func (s *advancedVisibilitySuite) TestUpsertWorkflowExecutionSearchAttributes()
s.True(verified)

// process 2nd workflow task and assert workflow task is handled correctly.
_, newTask, err = poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(
false,
false,
true,
true,
0,
1,
true,
nil)
res, err = poller.PollAndProcessWorkflowTask(
WithPollSticky,
WithRespondSticky,
WithExpectedAttemptCount(0),
WithRetries(1),
WithForceNewWorkflowTask)
s.NoError(err)
newTask = res.NewTask
s.NotNil(newTask)
s.NotNil(newTask.WorkflowTask)
s.Equal(4, len(newTask.WorkflowTask.History.Events))
Expand All @@ -1438,16 +1432,14 @@ func (s *advancedVisibilitySuite) TestUpsertWorkflowExecutionSearchAttributes()
s.testListResultForUpsertSearchAttributes(listRequest)

// process 3rd workflow task and assert workflow task is handled correctly.
_, newTask, err = poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(
false,
false,
true,
true,
0,
1,
true,
nil)
res, err = poller.PollAndProcessWorkflowTask(
WithPollSticky,
WithRespondSticky,
WithExpectedAttemptCount(0),
WithRetries(1),
WithForceNewWorkflowTask)
s.NoError(err)
newTask = res.NewTask
s.NotNil(newTask)
s.NotNil(newTask.WorkflowTask)
s.Equal(4, len(newTask.WorkflowTask.History.Events))
Expand Down Expand Up @@ -1531,16 +1523,14 @@ func (s *advancedVisibilitySuite) TestUpsertWorkflowExecutionSearchAttributes()
}

// process close workflow task and assert search attributes is correct after workflow is closed
_, newTask, err = poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(
false,
false,
true,
true,
0,
1,
true,
nil)
res, err = poller.PollAndProcessWorkflowTask(
WithPollSticky,
WithRespondSticky,
WithExpectedAttemptCount(0),
WithRetries(1),
WithForceNewWorkflowTask)
s.NoError(err)
newTask = res.NewTask
s.NotNil(newTask)
s.Nil(newTask.WorkflowTask)

Expand Down Expand Up @@ -1667,16 +1657,14 @@ func (s *advancedVisibilitySuite) TestModifyWorkflowExecutionProperties() {
}

// process 1st workflow task and assert workflow task is handled correctly.
_, newTask, err := poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(
false,
false,
true,
true,
0,
1,
true,
nil)
res, err := poller.PollAndProcessWorkflowTask(
WithPollSticky,
WithRespondSticky,
WithExpectedAttemptCount(0),
WithRetries(1),
WithForceNewWorkflowTask)
s.NoError(err)
newTask := res.NewTask
s.NotNil(newTask)
s.NotNil(newTask.WorkflowTask)
s.Equal(int64(3), newTask.WorkflowTask.GetPreviousStartedEventId())
Expand Down Expand Up @@ -1717,16 +1705,14 @@ func (s *advancedVisibilitySuite) TestModifyWorkflowExecutionProperties() {
s.True(verified)

// process 2nd workflow task and assert workflow task is handled correctly.
_, newTask, err = poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(
false,
false,
true,
true,
0,
1,
true,
nil)
res, err = poller.PollAndProcessWorkflowTask(
WithPollSticky,
WithRespondSticky,
WithExpectedAttemptCount(0),
WithRetries(1),
WithForceNewWorkflowTask)
s.NoError(err)
newTask = res.NewTask
s.NotNil(newTask)
s.NotNil(newTask.WorkflowTask)
s.Equal(4, len(newTask.WorkflowTask.History.Events))
Expand Down Expand Up @@ -1765,16 +1751,14 @@ func (s *advancedVisibilitySuite) TestModifyWorkflowExecutionProperties() {
s.True(verified)

// process close workflow task and assert workflow task is handled correctly.
_, newTask, err = poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(
false,
false,
true,
true,
0,
1,
true,
nil)
res, err = poller.PollAndProcessWorkflowTask(
WithPollSticky,
WithRespondSticky,
WithExpectedAttemptCount(0),
WithRetries(1),
WithForceNewWorkflowTask)
s.NoError(err)
newTask = res.NewTask
s.NotNil(newTask)
s.Nil(newTask.WorkflowTask)

Expand Down Expand Up @@ -1906,7 +1890,7 @@ func (s *advancedVisibilitySuite) TestUpsertWorkflowExecution_InvalidKey() {
T: s.T(),
}

_, err := poller.PollAndProcessWorkflowTask(false, false)
_, err := poller.PollAndProcessWorkflowTask()
s.Error(err)
s.IsType(&serviceerror.InvalidArgument{}, err)
if s.isElasticsearchEnabled {
Expand Down
4 changes: 2 additions & 2 deletions tests/archival_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (s *archivalSuite) startAndFinishWorkflow(id, wt, tq, namespace, namespaceI
}
for run := 0; run < numRuns; run++ {
for i := 0; i < numActivities; i++ {
_, err := poller.PollAndProcessWorkflowTask(false, false)
_, err := poller.PollAndProcessWorkflowTask()
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)
if i%2 == 0 {
Expand All @@ -435,7 +435,7 @@ func (s *archivalSuite) startAndFinishWorkflow(id, wt, tq, namespace, namespaceI
s.NoError(err)
}

_, err = poller.PollAndProcessWorkflowTask(true, false)
_, err = poller.PollAndProcessWorkflowTask(WithDumpHistory)
s.NoError(err)
}

Expand Down
Loading

0 comments on commit 759b608

Please sign in to comment.