Skip to content

Commit

Permalink
Add test that repros TS crash due to proto changes (temporalio#423)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Feb 22, 2024
1 parent 69ca251 commit 917c8bc
Show file tree
Hide file tree
Showing 15 changed files with 241 additions and 32 deletions.
9 changes: 9 additions & 0 deletions features/activity/basic_no_workflow_timeout/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Basic activity
The most basic workflow which just runs an activity and returns its result.
Importantly, without setting a workflow execution timeout.


# Detailed spec
It's important that the workflow execution timeout is not set here, because server will propagate that to all un-set
activity timeouts. We had a bug where TS would crash (after proto changes from gogo to google) because it was expecting
timeouts to be set to zero rather than null.
54 changes: 54 additions & 0 deletions features/activity/basic_no_workflow_timeout/feature.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
namespace activity.basic_no_workflow_timeout;

using Temporalio.Activities;
using Temporalio.Client;
using Temporalio.Exceptions;
using Temporalio.Features.Harness;
using Temporalio.Worker;
using Temporalio.Workflows;

class Feature : IFeature
{
public void ConfigureWorker(Runner runner, TemporalWorkerOptions options) =>
options.AddWorkflow<MyWorkflow>().AddAllActivities(new MyActivities(runner.Client));

[Workflow]
class MyWorkflow
{
private string? activityResult;

[WorkflowRun]
public async Task RunAsync()
{
await Workflow.ExecuteActivityAsync(
(MyActivities act) => act.Echo(),
new()
{
ScheduleToCloseTimeout = TimeSpan.FromMinutes(1)
});

await Workflow.ExecuteActivityAsync(
(MyActivities act) => act.Echo(),
new()
{
StartToCloseTimeout = TimeSpan.FromMinutes(1)
});
}

[WorkflowSignal]
public async Task SetActivityResultAsync(string res) => activityResult = res;
}

class MyActivities
{
private readonly ITemporalClient client;

public MyActivities(ITemporalClient client) => this.client = client;

[Activity]
public async Task<string> Echo()
{
return "hi";
}
}
}
40 changes: 40 additions & 0 deletions features/activity/basic_no_workflow_timeout/feature.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package retry_on_error

import (
"context"
"time"

"github.com/temporalio/features/harness/go/harness"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
)

var Feature = harness.Feature{
Workflows: Workflow,
Activities: Echo,
StartWorkflowOptionsMutator: func(o *client.StartWorkflowOptions) {
o.WorkflowExecutionTimeout = 0
},
}

func Workflow(ctx workflow.Context) (string, error) {
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
})

var result string
err := workflow.ExecuteActivity(ctx, Echo).Get(ctx, &result)
if err != nil {
return "", err
}

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToCloseTimeout: 1 * time.Minute,
})
err = workflow.ExecuteActivity(ctx, Echo).Get(ctx, &result)
return result, err
}

func Echo(_ context.Context) (string, error) {
return "echo", nil
}
41 changes: 41 additions & 0 deletions features/activity/basic_no_workflow_timeout/feature.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package activity.basic_no_workflow_timeout;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.client.WorkflowOptions;
import io.temporal.sdkfeatures.Feature;
import io.temporal.sdkfeatures.SimpleWorkflow;
import java.time.Duration;

@ActivityInterface
public interface feature extends Feature, SimpleWorkflow {
@ActivityMethod
String echo();

class Impl implements feature {
@Override
public void workflow() {
var activities =
activities(
feature.class, builder -> builder.setStartToCloseTimeout(Duration.ofMinutes(1)));

activities.echo();

var activitiesSched2Close =
activities(
feature.class, builder -> builder.setScheduleToCloseTimeout(Duration.ofMinutes(1)));

activitiesSched2Close.echo();
}

@Override
public String echo() {
return "hi";
}
}

@Override
default void workflowOptions(WorkflowOptions.Builder builder) {
builder.setWorkflowExecutionTimeout(Duration.ZERO);
}
}
32 changes: 32 additions & 0 deletions features/activity/basic_no_workflow_timeout/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from datetime import timedelta

from temporalio import activity, workflow

from harness.python.feature import register_feature


@workflow.defn
class Workflow:
@workflow.run
async def run(self) -> str:
await workflow.execute_activity(
echo,
schedule_to_close_timeout=timedelta(minutes=1),
)
return await workflow.execute_activity(
echo,
start_to_close_timeout=timedelta(minutes=1),
)


@activity.defn
async def echo() -> str:
return "echo"


register_feature(
workflows=[Workflow],
activities=[echo],
expect_activity_error="activity attempt 5 failed",
start_options={"execution_timeout": None},
)
26 changes: 26 additions & 0 deletions features/activity/basic_no_workflow_timeout/feature.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Feature } from '@temporalio/harness';
import * as wf from '@temporalio/workflow';

const activities = wf.proxyActivities<typeof activitiesImpl>({
startToCloseTimeout: '1 minute',
});
const activitiesSched2Close = wf.proxyActivities<typeof activitiesImpl>({
scheduleToCloseTimeout: '1 minute',
});

export async function workflow(): Promise<string> {
await activitiesSched2Close.echo('hello');
return await activities.echo('hello');
}

const activitiesImpl = {
async echo(input: string): Promise<string> {
return input;
},
};

export const feature = new Feature({
workflow,
workflowStartOptions: { workflowExecutionTimeout: undefined },
activities: activitiesImpl,
});
9 changes: 6 additions & 3 deletions features/eager_workflow/successful_start/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ const expectedResult = "Hello World"
var numEagerlyStarted atomic.Uint64

var Feature = harness.Feature{
Workflows: Workflow,
StartWorkflowOptions: client.StartWorkflowOptions{EnableEagerStart: true, WorkflowTaskTimeout: 1 * time.Hour},
CheckResult: CheckResult,
Workflows: Workflow,
StartWorkflowOptionsMutator: func(o *client.StartWorkflowOptions) {
o.EnableEagerStart = true
o.WorkflowTaskTimeout = 1 * time.Hour
},
CheckResult: CheckResult,
ClientOptions: client.Options{
ConnectionOptions: client.ConnectionOptions{
DialOptions: []grpc.DialOption{grpc.WithUnaryInterceptor(EagerDetector(&numEagerlyStarted))},
Expand Down
2 changes: 2 additions & 0 deletions features/features.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package features

import (
activity_basic_no_workflow_timeout "github.com/temporalio/features/features/activity/basic_no_workflow_timeout"
activity_cancel_try_cancel "github.com/temporalio/features/features/activity/cancel_try_cancel"
activity_retry_on_error "github.com/temporalio/features/features/activity/retry_on_error"
bugs_go_activity_start_race "github.com/temporalio/features/features/bugs/go/activity_start_race"
Expand Down Expand Up @@ -52,6 +53,7 @@ import (
func init() {
// Please keep list in alphabetical order
harness.MustRegisterFeatures(
activity_basic_no_workflow_timeout.Feature,
activity_cancel_try_cancel.Feature,
activity_retry_on_error.Feature,
bugs_go_activity_start_race.Feature,
Expand Down
8 changes: 5 additions & 3 deletions features/schedule/cron/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
)

var Feature = harness.Feature{
Workflows: Workflow,
StartWorkflowOptions: client.StartWorkflowOptions{CronSchedule: "@every 2s"},
CheckResult: CheckResult,
Workflows: Workflow,
StartWorkflowOptionsMutator: func(o *client.StartWorkflowOptions) {
o.CronSchedule = "@every 2s"
},
CheckResult: CheckResult,
// Disable history check because we can't guarantee cron execution times
CheckHistory: harness.NoHistoryCheck,
}
Expand Down
10 changes: 4 additions & 6 deletions features/update/self/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@ var Feature = harness.Feature{
if reason := updateutil.CheckServerSupportsUpdate(ctx, runner.Client); reason != "" {
return nil, runner.Skip(reason)
}
opts := runner.Feature.StartWorkflowOptions
if opts.TaskQueue == "" {
opts.TaskQueue = runner.TaskQueue
}
if opts.WorkflowExecutionTimeout == 0 {
opts.WorkflowExecutionTimeout = 1 * time.Minute
opts := client.StartWorkflowOptions{
TaskQueue: runner.TaskQueue,
WorkflowExecutionTimeout: 1 * time.Minute,
}
runner.Feature.StartWorkflowOptionsMutator(&opts)
return runner.Client.ExecuteWorkflow(ctx, opts, SelfUpdateWorkflow, ConnMaterial{
HostPort: runner.Feature.ClientOptions.HostPort,
Namespace: runner.Feature.ClientOptions.Namespace,
Expand Down
5 changes: 5 additions & 0 deletions harness/go/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"go.temporal.io/sdk/client"
"io"
"net"
"net/url"
Expand Down Expand Up @@ -204,6 +205,10 @@ func (r *Runner) Run(ctx context.Context, run *Run) error {
return nil
}

if feature.StartWorkflowOptionsMutator == nil {
feature.StartWorkflowOptionsMutator = func(opts *client.StartWorkflowOptions) {}
}

runnerConfig := harness.RunnerConfig{
ServerHostPort: r.config.Server,
Namespace: r.config.Namespace,
Expand Down
16 changes: 8 additions & 8 deletions harness/go/harness/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ type Feature struct {
// overridden internally.
ClientOptions client.Options

// Worker options for worker creation. Some values like WorkflowPanicPolicy
// are always overridden internally.
// Worker options for worker creation. Some values like WorkflowPanicPolicy are always
// overridden internally. By default, the harness sets the WorkflowPanicPolicy to FailWorkflow -
// in order to set that one option here you must *also* set the
// DisableWorkflowPanicPolicyOverride field to true.
WorkerOptions worker.Options

// Start workflow options that are used by the default executor. Some values
// such as task queue and workflow execution timeout, are set by default if
// not already set. By default the harness sets the WorkflowPanicPolicy to
// FailWorkflow - in order to set that one option here you must *also* set the
// DisableWorkflowPanicPolicyOverride field to true.
StartWorkflowOptions client.StartWorkflowOptions
// Can modify the workflow options that are used by the default executor. Some values such as
// task queue and workflow execution timeout, are set by default (but may be overridden by this
// mutator).
StartWorkflowOptionsMutator func(*client.StartWorkflowOptions)

// The harness will override the WorkflowPanicPolicy to be FailWorkflow
// unless this field is set to true, in which case the WorkflowPanicPolicy
Expand Down
10 changes: 4 additions & 6 deletions harness/go/harness/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,11 @@ func (r *Runner) Run(ctx context.Context) error {
// ExecuteDefault is the default execution that just runs the first workflow and
// assumes it takes no parameters.
func (r *Runner) ExecuteDefault(ctx context.Context) (client.WorkflowRun, error) {
opts := r.Feature.StartWorkflowOptions
if opts.TaskQueue == "" {
opts.TaskQueue = r.TaskQueue
}
if opts.WorkflowExecutionTimeout == 0 {
opts.WorkflowExecutionTimeout = 1 * time.Minute
opts := client.StartWorkflowOptions{
TaskQueue: r.TaskQueue,
WorkflowExecutionTimeout: 1 * time.Minute,
}
r.Feature.StartWorkflowOptionsMutator(&opts)
firstWorkflow, err := r.Feature.GetPrimaryWorkflow()
if err != nil {
return nil, err
Expand Down
10 changes: 4 additions & 6 deletions harness/go/harness/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,11 @@ func FindEvent(history client.HistoryEventIterator, cond func(*historypb.History
// ExecuteWithArgs runs a workflow with default arguments
func ExecuteWithArgs(workflow interface{}, args ...interface{}) func(ctx context.Context, r *Runner) (client.WorkflowRun, error) {
return func(ctx context.Context, r *Runner) (client.WorkflowRun, error) {
opts := r.Feature.StartWorkflowOptions
if opts.TaskQueue == "" {
opts.TaskQueue = r.TaskQueue
}
if opts.WorkflowExecutionTimeout == 0 {
opts.WorkflowExecutionTimeout = 1 * time.Minute
opts := client.StartWorkflowOptions{
TaskQueue: r.TaskQueue,
WorkflowExecutionTimeout: 1 * time.Minute,
}
r.Feature.StartWorkflowOptionsMutator(&opts)
return r.Client.ExecuteWorkflow(ctx, opts, workflow, args...)
}
}
Expand Down
1 change: 1 addition & 0 deletions harness/java/io/temporal/sdkfeatures/PreparedFeature.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public class PreparedFeature {

static PreparedFeature[] ALL =
PreparedFeature.prepareFeatures(
activity.basic_no_workflow_timeout.feature.Impl.class,
activity.retry_on_error.feature.Impl.class,
activity.cancel_try_cancel.feature.Impl.class,
child_workflow.result.feature.Impl.class,
Expand Down

0 comments on commit 917c8bc

Please sign in to comment.