From 917c8bc737acc71947405b2d667757b9686e65b7 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 22 Feb 2024 15:43:26 -0800 Subject: [PATCH] Add test that repros TS crash due to proto changes (#423) --- .../basic_no_workflow_timeout/README.md | 9 ++++ .../basic_no_workflow_timeout/feature.cs | 54 +++++++++++++++++++ .../basic_no_workflow_timeout/feature.go | 40 ++++++++++++++ .../basic_no_workflow_timeout/feature.java | 41 ++++++++++++++ .../basic_no_workflow_timeout/feature.py | 32 +++++++++++ .../basic_no_workflow_timeout/feature.ts | 26 +++++++++ .../successful_start/feature.go | 9 ++-- features/features.go | 2 + features/schedule/cron/feature.go | 8 +-- features/update/self/feature.go | 10 ++-- harness/go/cmd/run.go | 5 ++ harness/go/harness/feature.go | 16 +++--- harness/go/harness/runner.go | 10 ++-- harness/go/harness/util.go | 10 ++-- .../temporal/sdkfeatures/PreparedFeature.java | 1 + 15 files changed, 241 insertions(+), 32 deletions(-) create mode 100644 features/activity/basic_no_workflow_timeout/README.md create mode 100644 features/activity/basic_no_workflow_timeout/feature.cs create mode 100644 features/activity/basic_no_workflow_timeout/feature.go create mode 100644 features/activity/basic_no_workflow_timeout/feature.java create mode 100644 features/activity/basic_no_workflow_timeout/feature.py create mode 100644 features/activity/basic_no_workflow_timeout/feature.ts diff --git a/features/activity/basic_no_workflow_timeout/README.md b/features/activity/basic_no_workflow_timeout/README.md new file mode 100644 index 00000000..89a1e512 --- /dev/null +++ b/features/activity/basic_no_workflow_timeout/README.md @@ -0,0 +1,9 @@ +# Basic activity +The most basic workflow which just runs an activity and returns its result. +Importantly, without setting a workflow execution timeout. + + +# Detailed spec +It's important that the workflow execution timeout is not set here, because server will propagate that to all un-set +activity timeouts. We had a bug where TS would crash (after proto changes from gogo to google) because it was expecting +timeouts to be set to zero rather than null. \ No newline at end of file diff --git a/features/activity/basic_no_workflow_timeout/feature.cs b/features/activity/basic_no_workflow_timeout/feature.cs new file mode 100644 index 00000000..f029c8ff --- /dev/null +++ b/features/activity/basic_no_workflow_timeout/feature.cs @@ -0,0 +1,54 @@ +namespace activity.basic_no_workflow_timeout; + +using Temporalio.Activities; +using Temporalio.Client; +using Temporalio.Exceptions; +using Temporalio.Features.Harness; +using Temporalio.Worker; +using Temporalio.Workflows; + +class Feature : IFeature +{ + public void ConfigureWorker(Runner runner, TemporalWorkerOptions options) => + options.AddWorkflow().AddAllActivities(new MyActivities(runner.Client)); + + [Workflow] + class MyWorkflow + { + private string? activityResult; + + [WorkflowRun] + public async Task RunAsync() + { + await Workflow.ExecuteActivityAsync( + (MyActivities act) => act.Echo(), + new() + { + ScheduleToCloseTimeout = TimeSpan.FromMinutes(1) + }); + + await Workflow.ExecuteActivityAsync( + (MyActivities act) => act.Echo(), + new() + { + StartToCloseTimeout = TimeSpan.FromMinutes(1) + }); + } + + [WorkflowSignal] + public async Task SetActivityResultAsync(string res) => activityResult = res; + } + + class MyActivities + { + private readonly ITemporalClient client; + + public MyActivities(ITemporalClient client) => this.client = client; + + [Activity] + public async Task Echo() + { + return "hi"; + } + } +} \ No newline at end of file diff --git a/features/activity/basic_no_workflow_timeout/feature.go b/features/activity/basic_no_workflow_timeout/feature.go new file mode 100644 index 00000000..2d45607b --- /dev/null +++ b/features/activity/basic_no_workflow_timeout/feature.go @@ -0,0 +1,40 @@ +package retry_on_error + +import ( + "context" + "time" + + "github.com/temporalio/features/harness/go/harness" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/workflow" +) + +var Feature = harness.Feature{ + Workflows: Workflow, + Activities: Echo, + StartWorkflowOptionsMutator: func(o *client.StartWorkflowOptions) { + o.WorkflowExecutionTimeout = 0 + }, +} + +func Workflow(ctx workflow.Context) (string, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Minute, + }) + + var result string + err := workflow.ExecuteActivity(ctx, Echo).Get(ctx, &result) + if err != nil { + return "", err + } + + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ScheduleToCloseTimeout: 1 * time.Minute, + }) + err = workflow.ExecuteActivity(ctx, Echo).Get(ctx, &result) + return result, err +} + +func Echo(_ context.Context) (string, error) { + return "echo", nil +} diff --git a/features/activity/basic_no_workflow_timeout/feature.java b/features/activity/basic_no_workflow_timeout/feature.java new file mode 100644 index 00000000..309041b4 --- /dev/null +++ b/features/activity/basic_no_workflow_timeout/feature.java @@ -0,0 +1,41 @@ +package activity.basic_no_workflow_timeout; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.client.WorkflowOptions; +import io.temporal.sdkfeatures.Feature; +import io.temporal.sdkfeatures.SimpleWorkflow; +import java.time.Duration; + +@ActivityInterface +public interface feature extends Feature, SimpleWorkflow { + @ActivityMethod + String echo(); + + class Impl implements feature { + @Override + public void workflow() { + var activities = + activities( + feature.class, builder -> builder.setStartToCloseTimeout(Duration.ofMinutes(1))); + + activities.echo(); + + var activitiesSched2Close = + activities( + feature.class, builder -> builder.setScheduleToCloseTimeout(Duration.ofMinutes(1))); + + activitiesSched2Close.echo(); + } + + @Override + public String echo() { + return "hi"; + } + } + + @Override + default void workflowOptions(WorkflowOptions.Builder builder) { + builder.setWorkflowExecutionTimeout(Duration.ZERO); + } +} diff --git a/features/activity/basic_no_workflow_timeout/feature.py b/features/activity/basic_no_workflow_timeout/feature.py new file mode 100644 index 00000000..412a9631 --- /dev/null +++ b/features/activity/basic_no_workflow_timeout/feature.py @@ -0,0 +1,32 @@ +from datetime import timedelta + +from temporalio import activity, workflow + +from harness.python.feature import register_feature + + +@workflow.defn +class Workflow: + @workflow.run + async def run(self) -> str: + await workflow.execute_activity( + echo, + schedule_to_close_timeout=timedelta(minutes=1), + ) + return await workflow.execute_activity( + echo, + start_to_close_timeout=timedelta(minutes=1), + ) + + +@activity.defn +async def echo() -> str: + return "echo" + + +register_feature( + workflows=[Workflow], + activities=[echo], + expect_activity_error="activity attempt 5 failed", + start_options={"execution_timeout": None}, +) diff --git a/features/activity/basic_no_workflow_timeout/feature.ts b/features/activity/basic_no_workflow_timeout/feature.ts new file mode 100644 index 00000000..8631d698 --- /dev/null +++ b/features/activity/basic_no_workflow_timeout/feature.ts @@ -0,0 +1,26 @@ +import { Feature } from '@temporalio/harness'; +import * as wf from '@temporalio/workflow'; + +const activities = wf.proxyActivities({ + startToCloseTimeout: '1 minute', +}); +const activitiesSched2Close = wf.proxyActivities({ + scheduleToCloseTimeout: '1 minute', +}); + +export async function workflow(): Promise { + await activitiesSched2Close.echo('hello'); + return await activities.echo('hello'); +} + +const activitiesImpl = { + async echo(input: string): Promise { + return input; + }, +}; + +export const feature = new Feature({ + workflow, + workflowStartOptions: { workflowExecutionTimeout: undefined }, + activities: activitiesImpl, +}); diff --git a/features/eager_workflow/successful_start/feature.go b/features/eager_workflow/successful_start/feature.go index 7c029b0b..5ad79c65 100644 --- a/features/eager_workflow/successful_start/feature.go +++ b/features/eager_workflow/successful_start/feature.go @@ -18,9 +18,12 @@ const expectedResult = "Hello World" var numEagerlyStarted atomic.Uint64 var Feature = harness.Feature{ - Workflows: Workflow, - StartWorkflowOptions: client.StartWorkflowOptions{EnableEagerStart: true, WorkflowTaskTimeout: 1 * time.Hour}, - CheckResult: CheckResult, + Workflows: Workflow, + StartWorkflowOptionsMutator: func(o *client.StartWorkflowOptions) { + o.EnableEagerStart = true + o.WorkflowTaskTimeout = 1 * time.Hour + }, + CheckResult: CheckResult, ClientOptions: client.Options{ ConnectionOptions: client.ConnectionOptions{ DialOptions: []grpc.DialOption{grpc.WithUnaryInterceptor(EagerDetector(&numEagerlyStarted))}, diff --git a/features/features.go b/features/features.go index d4877fb9..789c38e4 100644 --- a/features/features.go +++ b/features/features.go @@ -1,6 +1,7 @@ package features import ( + activity_basic_no_workflow_timeout "github.com/temporalio/features/features/activity/basic_no_workflow_timeout" activity_cancel_try_cancel "github.com/temporalio/features/features/activity/cancel_try_cancel" activity_retry_on_error "github.com/temporalio/features/features/activity/retry_on_error" bugs_go_activity_start_race "github.com/temporalio/features/features/bugs/go/activity_start_race" @@ -52,6 +53,7 @@ import ( func init() { // Please keep list in alphabetical order harness.MustRegisterFeatures( + activity_basic_no_workflow_timeout.Feature, activity_cancel_try_cancel.Feature, activity_retry_on_error.Feature, bugs_go_activity_start_race.Feature, diff --git a/features/schedule/cron/feature.go b/features/schedule/cron/feature.go index a58709b6..5b7cf8da 100644 --- a/features/schedule/cron/feature.go +++ b/features/schedule/cron/feature.go @@ -13,9 +13,11 @@ import ( ) var Feature = harness.Feature{ - Workflows: Workflow, - StartWorkflowOptions: client.StartWorkflowOptions{CronSchedule: "@every 2s"}, - CheckResult: CheckResult, + Workflows: Workflow, + StartWorkflowOptionsMutator: func(o *client.StartWorkflowOptions) { + o.CronSchedule = "@every 2s" + }, + CheckResult: CheckResult, // Disable history check because we can't guarantee cron execution times CheckHistory: harness.NoHistoryCheck, } diff --git a/features/update/self/feature.go b/features/update/self/feature.go index 7e043c26..a0ad75f6 100644 --- a/features/update/self/feature.go +++ b/features/update/self/feature.go @@ -33,13 +33,11 @@ var Feature = harness.Feature{ if reason := updateutil.CheckServerSupportsUpdate(ctx, runner.Client); reason != "" { return nil, runner.Skip(reason) } - opts := runner.Feature.StartWorkflowOptions - if opts.TaskQueue == "" { - opts.TaskQueue = runner.TaskQueue - } - if opts.WorkflowExecutionTimeout == 0 { - opts.WorkflowExecutionTimeout = 1 * time.Minute + opts := client.StartWorkflowOptions{ + TaskQueue: runner.TaskQueue, + WorkflowExecutionTimeout: 1 * time.Minute, } + runner.Feature.StartWorkflowOptionsMutator(&opts) return runner.Client.ExecuteWorkflow(ctx, opts, SelfUpdateWorkflow, ConnMaterial{ HostPort: runner.Feature.ClientOptions.HostPort, Namespace: runner.Feature.ClientOptions.Namespace, diff --git a/harness/go/cmd/run.go b/harness/go/cmd/run.go index 31befb2f..47cf122a 100644 --- a/harness/go/cmd/run.go +++ b/harness/go/cmd/run.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "go.temporal.io/sdk/client" "io" "net" "net/url" @@ -204,6 +205,10 @@ func (r *Runner) Run(ctx context.Context, run *Run) error { return nil } + if feature.StartWorkflowOptionsMutator == nil { + feature.StartWorkflowOptionsMutator = func(opts *client.StartWorkflowOptions) {} + } + runnerConfig := harness.RunnerConfig{ ServerHostPort: r.config.Server, Namespace: r.config.Namespace, diff --git a/harness/go/harness/feature.go b/harness/go/harness/feature.go index 79d9f0b4..14893dd1 100644 --- a/harness/go/harness/feature.go +++ b/harness/go/harness/feature.go @@ -41,16 +41,16 @@ type Feature struct { // overridden internally. ClientOptions client.Options - // Worker options for worker creation. Some values like WorkflowPanicPolicy - // are always overridden internally. + // Worker options for worker creation. Some values like WorkflowPanicPolicy are always + // overridden internally. By default, the harness sets the WorkflowPanicPolicy to FailWorkflow - + // in order to set that one option here you must *also* set the + // DisableWorkflowPanicPolicyOverride field to true. WorkerOptions worker.Options - // Start workflow options that are used by the default executor. Some values - // such as task queue and workflow execution timeout, are set by default if - // not already set. By default the harness sets the WorkflowPanicPolicy to - // FailWorkflow - in order to set that one option here you must *also* set the - // DisableWorkflowPanicPolicyOverride field to true. - StartWorkflowOptions client.StartWorkflowOptions + // Can modify the workflow options that are used by the default executor. Some values such as + // task queue and workflow execution timeout, are set by default (but may be overridden by this + // mutator). + StartWorkflowOptionsMutator func(*client.StartWorkflowOptions) // The harness will override the WorkflowPanicPolicy to be FailWorkflow // unless this field is set to true, in which case the WorkflowPanicPolicy diff --git a/harness/go/harness/runner.go b/harness/go/harness/runner.go index dfb242e1..be3f5445 100644 --- a/harness/go/harness/runner.go +++ b/harness/go/harness/runner.go @@ -149,13 +149,11 @@ func (r *Runner) Run(ctx context.Context) error { // ExecuteDefault is the default execution that just runs the first workflow and // assumes it takes no parameters. func (r *Runner) ExecuteDefault(ctx context.Context) (client.WorkflowRun, error) { - opts := r.Feature.StartWorkflowOptions - if opts.TaskQueue == "" { - opts.TaskQueue = r.TaskQueue - } - if opts.WorkflowExecutionTimeout == 0 { - opts.WorkflowExecutionTimeout = 1 * time.Minute + opts := client.StartWorkflowOptions{ + TaskQueue: r.TaskQueue, + WorkflowExecutionTimeout: 1 * time.Minute, } + r.Feature.StartWorkflowOptionsMutator(&opts) firstWorkflow, err := r.Feature.GetPrimaryWorkflow() if err != nil { return nil, err diff --git a/harness/go/harness/util.go b/harness/go/harness/util.go index 80775ca7..6e8874be 100644 --- a/harness/go/harness/util.go +++ b/harness/go/harness/util.go @@ -45,13 +45,11 @@ func FindEvent(history client.HistoryEventIterator, cond func(*historypb.History // ExecuteWithArgs runs a workflow with default arguments func ExecuteWithArgs(workflow interface{}, args ...interface{}) func(ctx context.Context, r *Runner) (client.WorkflowRun, error) { return func(ctx context.Context, r *Runner) (client.WorkflowRun, error) { - opts := r.Feature.StartWorkflowOptions - if opts.TaskQueue == "" { - opts.TaskQueue = r.TaskQueue - } - if opts.WorkflowExecutionTimeout == 0 { - opts.WorkflowExecutionTimeout = 1 * time.Minute + opts := client.StartWorkflowOptions{ + TaskQueue: r.TaskQueue, + WorkflowExecutionTimeout: 1 * time.Minute, } + r.Feature.StartWorkflowOptionsMutator(&opts) return r.Client.ExecuteWorkflow(ctx, opts, workflow, args...) } } diff --git a/harness/java/io/temporal/sdkfeatures/PreparedFeature.java b/harness/java/io/temporal/sdkfeatures/PreparedFeature.java index 0603d6ee..0bf2c1af 100644 --- a/harness/java/io/temporal/sdkfeatures/PreparedFeature.java +++ b/harness/java/io/temporal/sdkfeatures/PreparedFeature.java @@ -6,6 +6,7 @@ public class PreparedFeature { static PreparedFeature[] ALL = PreparedFeature.prepareFeatures( + activity.basic_no_workflow_timeout.feature.Impl.class, activity.retry_on_error.feature.Impl.class, activity.cancel_try_cancel.feature.Impl.class, child_workflow.result.feature.Impl.class,