Skip to content

Commit

Permalink
Do not extend activity expiration time (#3489)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Sep 26, 2020
1 parent d066d27 commit 1b00d51
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
19 changes: 13 additions & 6 deletions host/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ func (s *integrationSuite) TestActivityTimeouts() {
s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

workflowComplete := false
workflowFailed := false
activitiesScheduled := false
activitiesMap := map[int64]*historypb.HistoryEvent{}
failWorkflow := false
Expand Down Expand Up @@ -718,7 +719,7 @@ func (s *integrationSuite) TestActivityTimeouts() {
ActivityId: "B",
ActivityType: &commonpb.ActivityType{Name: activityName},
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Input: payloads.EncodeString("ScheduleClose"),
Input: payloads.EncodeString("ScheduleToClose"),
ScheduleToCloseTimeout: timestamp.DurationPtr(7 * time.Second), // ActivityID B is expected to timeout using ScheduleClose
ScheduleToStartTimeout: timestamp.DurationPtr(5 * time.Second),
StartToCloseTimeout: timestamp.DurationPtr(10 * time.Second),
Expand All @@ -735,7 +736,11 @@ func (s *integrationSuite) TestActivityTimeouts() {
ScheduleToStartTimeout: timestamp.DurationPtr(1 * time.Second),
StartToCloseTimeout: timestamp.DurationPtr(5 * time.Second), // ActivityID C is expected to timeout using StartToClose
HeartbeatTimeout: timestamp.DurationPtr(0 * time.Second),
},
RetryPolicy: &commonpb.RetryPolicy{
InitialInterval: timestamp.DurationPtr(1 * time.Second),
MaximumInterval: timestamp.DurationPtr(1 * time.Second),
BackoffCoefficient: 1,
}},
}}, {
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
Expand Down Expand Up @@ -803,6 +808,7 @@ func (s *integrationSuite) TestActivityTimeouts() {

if failWorkflow {
s.Logger.Error("Failing workflow")
workflowFailed = true
workflowComplete = true
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION,
Expand Down Expand Up @@ -836,11 +842,11 @@ func (s *integrationSuite) TestActivityTimeouts() {
switch timeoutType {
case "ScheduleToStart":
s.Fail("Activity A not expected to be started")
case "ScheduleClose":
s.Logger.Info("Sleeping activityB for 6 seconds")
case "ScheduleToClose":
s.Logger.Info("Sleeping activityB for 7 seconds")
time.Sleep(7 * time.Second)
case "StartToClose":
s.Logger.Info("Sleeping activityC for 6 seconds")
s.Logger.Info("Sleeping activityC for 8 seconds")
time.Sleep(8 * time.Second)
case "Heartbeat":
s.Logger.Info("Starting hearbeat activity")
Expand Down Expand Up @@ -888,12 +894,13 @@ func (s *integrationSuite) TestActivityTimeouts() {
_, err := poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err, "Poll for workflow task failed")

if workflowComplete {
if workflowComplete || workflowFailed {
break
}
}

s.True(workflowComplete)
s.False(workflowFailed)
}

func (s *integrationSuite) TestActivityHeartbeatTimeouts() {
Expand Down
2 changes: 0 additions & 2 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2144,8 +2144,6 @@ func (e *mutableStateBuilder) ReplicateActivityTaskScheduledEvent(
HasRetryPolicy: attributes.RetryPolicy != nil,
Attempt: 1,
}
retryTime := timestamp.TimeValue(ai.ScheduledTime).Add(timestamp.DurationValue(scheduleToCloseTimeout))
ai.RetryExpirationTime = &retryTime
if ai.HasRetryPolicy {
ai.RetryInitialInterval = attributes.RetryPolicy.GetInitialInterval()
ai.RetryBackoffCoefficient = attributes.RetryPolicy.GetBackoffCoefficient()
Expand Down

0 comments on commit 1b00d51

Please sign in to comment.