diff --git a/src/Temporalio/Bridge/Cargo.lock b/src/Temporalio/Bridge/Cargo.lock index 7ad7962a..81c79535 100644 --- a/src/Temporalio/Bridge/Cargo.lock +++ b/src/Temporalio/Bridge/Cargo.lock @@ -311,12 +311,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "cfg_aliases" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" - [[package]] name = "chrono" version = "0.4.38" @@ -627,15 +621,6 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b" -[[package]] -name = "encoding_rs" -version = "0.8.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" -dependencies = [ - "cfg-if", -] - [[package]] name = "enum-iterator" version = "2.1.0" @@ -1108,20 +1093,24 @@ dependencies = [ "pin-project-lite", "smallvec", "tokio", + "want", ] [[package]] name = "hyper-rustls" -version = "0.24.2" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", - "http 0.2.12", - "hyper 0.14.29", - "rustls 0.21.12", + "http 1.1.0", + "hyper 1.3.1", + "hyper-util", + "rustls", + "rustls-pki-types", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", + "tower-service", ] [[package]] @@ -1143,12 +1132,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" dependencies = [ "bytes", + "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.0", "hyper 1.3.1", "pin-project-lite", + "socket2", "tokio", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -1398,18 +1392,6 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" -[[package]] -name = "nix" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" -dependencies = [ - "bitflags 2.5.0", - "cfg-if", - "cfg_aliases", - "libc", -] - [[package]] name = "no-std-compat" version = "0.4.1" @@ -2022,20 +2004,20 @@ checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" [[package]] name = "reqwest" -version = "0.11.27" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "bytes", - "encoding_rs", "futures-core", "futures-util", - "h2 0.3.26", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.29", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", "hyper-rustls", + "hyper-util", "ipnet", "js-sys", "log", @@ -2043,15 +2025,15 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.12", - "rustls-pemfile 1.0.4", + "rustls", + "rustls-pemfile", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", - "system-configuration", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", "tokio-util", "tower-service", "url", @@ -2138,18 +2120,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rustls" -version = "0.21.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" -dependencies = [ - "log", - "ring", - "rustls-webpki 0.101.7", - "sct", -] - [[package]] name = "rustls" version = "0.22.4" @@ -2159,7 +2129,7 @@ dependencies = [ "log", "ring", "rustls-pki-types", - "rustls-webpki 0.102.4", + "rustls-webpki", "subtle", "zeroize", ] @@ -2171,21 +2141,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" dependencies = [ "openssl-probe", - "rustls-pemfile 2.1.2", + "rustls-pemfile", "rustls-pki-types", "schannel", "security-framework", ] -[[package]] -name = "rustls-pemfile" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" -dependencies = [ - "base64 0.21.7", -] - [[package]] name = "rustls-pemfile" version = "2.1.2" @@ -2202,16 +2163,6 @@ version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" -[[package]] -name = "rustls-webpki" -version = "0.101.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "rustls-webpki" version = "0.102.4" @@ -2250,16 +2201,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "security-framework" version = "2.11.0" @@ -2483,27 +2424,6 @@ dependencies = [ "windows", ] -[[package]] -name = "system-configuration" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" -dependencies = [ - "bitflags 1.3.2", - "core-foundation", - "system-configuration-sys", -] - -[[package]] -name = "system-configuration-sys" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "tar" version = "0.4.41" @@ -2606,7 +2526,6 @@ dependencies = [ "itertools 0.13.0", "lru", "mockall", - "nix", "once_cell", "opentelemetry", "opentelemetry-otlp", @@ -2806,23 +2725,13 @@ dependencies = [ "syn 2.0.66", ] -[[package]] -name = "tokio-rustls" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" -dependencies = [ - "rustls 0.21.12", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ - "rustls 0.22.4", + "rustls", "rustls-pki-types", "tokio", ] @@ -2880,10 +2789,10 @@ dependencies = [ "pin-project", "prost", "rustls-native-certs", - "rustls-pemfile 2.1.2", + "rustls-pemfile", "rustls-pki-types", "tokio", - "tokio-rustls 0.25.0", + "tokio-rustls", "tokio-stream", "tower", "tower-layer", @@ -3206,9 +3115,12 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.4" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +checksum = "3c452ad30530b54a4d8e71952716a212b08efd0f3562baa66c29a618b07da7c3" +dependencies = [ + "rustls-pki-types", +] [[package]] name = "winapi" @@ -3401,9 +3313,9 @@ checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "winreg" -version = "0.50.0" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" dependencies = [ "cfg-if", "windows-sys 0.48.0", diff --git a/src/Temporalio/Bridge/sdk-core b/src/Temporalio/Bridge/sdk-core index 90db5b48..0fabad09 160000 --- a/src/Temporalio/Bridge/sdk-core +++ b/src/Temporalio/Bridge/sdk-core @@ -1 +1 @@ -Subproject commit 90db5b485f8805902834f48c5ed87f6de5cdaac9 +Subproject commit 0fabad09421a444b8246f751a0b2643e96db01d5 diff --git a/src/Temporalio/Worker/TemporalWorker.cs b/src/Temporalio/Worker/TemporalWorker.cs index 9505b6e7..7cad9e39 100644 --- a/src/Temporalio/Worker/TemporalWorker.cs +++ b/src/Temporalio/Worker/TemporalWorker.cs @@ -89,7 +89,8 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options) OnTaskStarting: options.OnTaskStarting, OnTaskCompleted: options.OnTaskCompleted, RuntimeMetricMeter: MetricMeter, - WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes)); + WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes, + DisableCompletionCommandReordering: options.DisableWorkflowCompletionCommandReordering)); } } diff --git a/src/Temporalio/Worker/TemporalWorkerOptions.cs b/src/Temporalio/Worker/TemporalWorkerOptions.cs index a05740ee..8567a3b8 100644 --- a/src/Temporalio/Worker/TemporalWorkerOptions.cs +++ b/src/Temporalio/Worker/TemporalWorkerOptions.cs @@ -286,6 +286,15 @@ public TemporalWorkerOptions() internal Func WorkflowInstanceFactory { get; set; } = DefaultWorkflowInstanceFactory; + /// + /// Gets or sets a value indicating whether the workflow completion command reordering will + /// apply. + /// + /// + /// This is visible for testing only. + /// + internal bool DisableWorkflowCompletionCommandReordering { get; set; } + /// /// Add the given delegate with as an activity. This is /// usually a method reference. diff --git a/src/Temporalio/Worker/WorkflowInstance.cs b/src/Temporalio/Worker/WorkflowInstance.cs index 9a36874a..de494901 100644 --- a/src/Temporalio/Worker/WorkflowInstance.cs +++ b/src/Temporalio/Worker/WorkflowInstance.cs @@ -67,6 +67,7 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon private readonly Action onTaskStarting; private readonly Action onTaskCompleted; private readonly IReadOnlyCollection? workerLevelFailureExceptionTypes; + private readonly bool disableCompletionCommandReordering; private WorkflowActivationCompletion? completion; // Will be set to null after last use (i.e. when workflow actually started) private Lazy? startArgs; @@ -187,6 +188,7 @@ public WorkflowInstance(WorkflowInstanceDetails details) Random = new(details.Start.RandomnessSeed); TracingEventsEnabled = !details.DisableTracingEvents; workerLevelFailureExceptionTypes = details.WorkerLevelFailureExceptionTypes; + disableCompletionCommandReordering = details.DisableCompletionCommandReordering; } /// @@ -573,28 +575,9 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act) } } - // Remove any non-query commands after terminal commands - if (completion.Successful != null) - { - var seenCompletion = false; - var i = 0; - while (i < completion.Successful.Commands.Count) - { - var cmd = completion.Successful.Commands[i]; - if (!seenCompletion) - { - seenCompletion = cmd.CompleteWorkflowExecution != null || - cmd.ContinueAsNewWorkflowExecution != null || - cmd.FailWorkflowExecution != null; - } - else if (cmd.RespondToQuery != null) - { - completion.Successful.Commands.RemoveAt(i); - continue; - } - i++; - } - } + // Maybe apply workflow completion command reordering logic + ApplyCompletionCommandReordering(act, completion); + // Unset the completion var toReturn = completion; completion = null; @@ -1402,6 +1385,57 @@ private string GetStackTrace() }).Where(s => !string.IsNullOrEmpty(s)).Select(s => $"Task waiting at:\n{s}")); } + private void ApplyCompletionCommandReordering( + WorkflowActivation act, WorkflowActivationCompletion completion) + { + // In earlier versions of the SDK we allowed commands to be sent after workflow + // completion. These ended up being removed effectively making the result of the + // workflow function mean any other later coroutine commands be ignored. To match + // Go/Java, we are now going to move workflow completion to the end so that + // same-task-post-completion commands are still accounted for. + // + // Note this only applies for successful activations that don't have completion + // reordering disabled and that are either not replaying or have the flag set. + if (completion.Successful == null || disableCompletionCommandReordering) + { + return; + } + if (IsReplaying && !act.AvailableInternalFlags.Contains((uint)WorkflowLogicFlag.ReorderWorkflowCompletion)) + { + return; + } + + // We know we're on a newer SDK and can move completion to the end if we need to. First, + // find the completion command. + var completionCommandIndex = -1; + for (var i = completion.Successful.Commands.Count - 1; i >= 0; i--) + { + var cmd = completion.Successful.Commands[i]; + // Set completion index if the command is a completion + if (cmd.CancelWorkflowExecution != null || + cmd.CompleteWorkflowExecution != null || + cmd.ContinueAsNewWorkflowExecution != null || + cmd.FailWorkflowExecution != null) + { + completionCommandIndex = i; + break; + } + } + + // If there is no completion command or it's already at the end, nothing to do + if (completionCommandIndex == -1 || + completionCommandIndex == completion.Successful.Commands.Count - 1) + { + return; + } + + // Now we know the completion is in the wrong spot, so set the SDK flag and move it + completion.Successful.UsedInternalFlags.Add((uint)WorkflowLogicFlag.ReorderWorkflowCompletion); + var compCmd = completion.Successful.Commands[completionCommandIndex]; + completion.Successful.Commands.RemoveAt(completionCommandIndex); + completion.Successful.Commands.Insert(completion.Successful.Commands.Count, compCmd); + } + /// /// Workflow inbound implementation. /// diff --git a/src/Temporalio/Worker/WorkflowInstanceDetails.cs b/src/Temporalio/Worker/WorkflowInstanceDetails.cs index d2c569a8..e9cc3fcc 100644 --- a/src/Temporalio/Worker/WorkflowInstanceDetails.cs +++ b/src/Temporalio/Worker/WorkflowInstanceDetails.cs @@ -26,6 +26,9 @@ namespace Temporalio.Worker /// Callback for every instance task complete. /// Lazy runtime-level metric meter. /// Failure exception types at worker level. + /// + /// Whether to disable completion command reordering. + /// internal record WorkflowInstanceDetails( string Namespace, string TaskQueue, @@ -41,5 +44,6 @@ internal record WorkflowInstanceDetails( Action OnTaskStarting, Action OnTaskCompleted, Lazy RuntimeMetricMeter, - IReadOnlyCollection? WorkerLevelFailureExceptionTypes); + IReadOnlyCollection? WorkerLevelFailureExceptionTypes, + bool DisableCompletionCommandReordering); } \ No newline at end of file diff --git a/src/Temporalio/Worker/WorkflowLogicFlag.cs b/src/Temporalio/Worker/WorkflowLogicFlag.cs new file mode 100644 index 00000000..51ba7d7f --- /dev/null +++ b/src/Temporalio/Worker/WorkflowLogicFlag.cs @@ -0,0 +1,15 @@ +namespace Temporalio.Worker +{ + /// + /// Flags that may be set on task/activation completion to differentiate new from old workflow + /// behavior. + /// + internal enum WorkflowLogicFlag : uint + { + /// + /// When set, this makes sure that workflow completion is moved to the end of the command + /// set. + /// + ReorderWorkflowCompletion = 1, + } +} \ No newline at end of file diff --git a/src/Temporalio/Worker/WorkflowReplayer.cs b/src/Temporalio/Worker/WorkflowReplayer.cs index 6ff11ac2..0ead74c5 100644 --- a/src/Temporalio/Worker/WorkflowReplayer.cs +++ b/src/Temporalio/Worker/WorkflowReplayer.cs @@ -173,7 +173,8 @@ public WorkflowHistoryRunner(WorkflowReplayerOptions options, bool throwOnReplay OnTaskStarting: options.OnTaskStarting, OnTaskCompleted: options.OnTaskCompleted, RuntimeMetricMeter: new(() => runtime.MetricMeter), - WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes), + WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes, + DisableCompletionCommandReordering: options.DisableWorkflowCompletionCommandReordering), (runId, removeFromCache) => SetResult(removeFromCache)); } catch diff --git a/src/Temporalio/Worker/WorkflowReplayerOptions.cs b/src/Temporalio/Worker/WorkflowReplayerOptions.cs index e43c0519..766e0be1 100644 --- a/src/Temporalio/Worker/WorkflowReplayerOptions.cs +++ b/src/Temporalio/Worker/WorkflowReplayerOptions.cs @@ -128,6 +128,15 @@ public class WorkflowReplayerOptions : ICloneable internal Func WorkflowInstanceFactory { get; set; } = TemporalWorkerOptions.DefaultWorkflowInstanceFactory; + /// + /// Gets or sets a value indicating whether the workflow completion command reordering will + /// apply. + /// + /// + /// This is visible for testing only. + /// + internal bool DisableWorkflowCompletionCommandReordering { get; set; } + /// /// Add the given type as a workflow. /// diff --git a/src/Temporalio/Worker/WorkflowWorker.cs b/src/Temporalio/Worker/WorkflowWorker.cs index 3d37dd9a..0daa23a3 100644 --- a/src/Temporalio/Worker/WorkflowWorker.cs +++ b/src/Temporalio/Worker/WorkflowWorker.cs @@ -283,7 +283,8 @@ private IWorkflowInstance CreateInstance(WorkflowActivation act) OnTaskStarting: options.OnTaskStarting, OnTaskCompleted: options.OnTaskCompleted, RuntimeMetricMeter: options.RuntimeMetricMeter, - WorkerLevelFailureExceptionTypes: options.WorkerLevelFailureExceptionTypes)); + WorkerLevelFailureExceptionTypes: options.WorkerLevelFailureExceptionTypes, + DisableCompletionCommandReordering: options.DisableCompletionCommandReordering)); } } } \ No newline at end of file diff --git a/src/Temporalio/Worker/WorkflowWorkerOptions.cs b/src/Temporalio/Worker/WorkflowWorkerOptions.cs index a56865b7..26fcc449 100644 --- a/src/Temporalio/Worker/WorkflowWorkerOptions.cs +++ b/src/Temporalio/Worker/WorkflowWorkerOptions.cs @@ -21,5 +21,6 @@ internal record WorkflowWorkerOptions( Action OnTaskStarting, Action OnTaskCompleted, Lazy RuntimeMetricMeter, - IReadOnlyCollection? WorkerLevelFailureExceptionTypes); + IReadOnlyCollection? WorkerLevelFailureExceptionTypes, + bool DisableCompletionCommandReordering); } \ No newline at end of file diff --git a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs index 01830fdc..89e2f671 100644 --- a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs @@ -4101,7 +4101,7 @@ public async Task ExecuteWorkflowAsync_CurrentBuildId_SetProperly() { var tq = $"tq-{Guid.NewGuid()}"; var handle = - await ExecuteWorkerAsyncReturning>( async worker => { @@ -4399,60 +4399,57 @@ await ExecuteWorkerAsync(async worker => } [Workflow] - public class TickingWorkflow + public class WaitOnSignalWorkflow { + private bool complete; + [WorkflowRun] - public async Task RunAsync() - { - // Just tick every 100ms for 10s - for (var i = 0; i < 100; i++) - { - await Workflow.DelayAsync(100); - } - } + public Task RunAsync() => Workflow.WaitConditionAsync(() => complete); + + [WorkflowSignal] + public async Task CompleteAsync() => complete = true; } [Fact] public async Task ExecuteWorkflowAsync_WorkerClientReplacement_UsesNewClient() { - // We are going to start a second ephemeral server and then replace the client. So we will - // start a no-cache ticking workflow with the current client and confirm it has accomplished - // at least one task. Then we will start another on the other client, and confirm it gets - // started too. Then we will terminate both. We have to use a ticking workflow with only one - // poller to force a quick re-poll to recognize our client change quickly (as opposed to - // just waiting the minute for poll timeout). + // We are going to create a second ephemeral server and start a workflow on each server. + // The worker will start with a client on the first, then we'll swap the clients, signal + // both workflows, and confirm the second workflow completes as expected. await using var otherEnv = await Temporalio.Testing.WorkflowEnvironment.StartLocalAsync(); // Start both workflows on different servers var taskQueue = $"tq-{Guid.NewGuid()}"; var handle1 = await Client.StartWorkflowAsync( - (TickingWorkflow wf) => wf.RunAsync(), + (WaitOnSignalWorkflow wf) => wf.RunAsync(), new(id: $"workflow-{Guid.NewGuid()}", taskQueue)); var handle2 = await otherEnv.Client.StartWorkflowAsync( - (TickingWorkflow wf) => wf.RunAsync(), + (WaitOnSignalWorkflow wf) => wf.RunAsync(), new(id: $"workflow-{Guid.NewGuid()}", taskQueue)); // Run the worker on the first env - await ExecuteWorkerAsync( + await ExecuteWorkerAsync( async worker => { - // Confirm the first ticking workflow has completed a task but not the second workflow + // Confirm the first workflow has completed a task but not the second workflow await AssertMore.HasEventEventuallyAsync(handle1, e => e.WorkflowTaskCompletedEventAttributes != null); await foreach (var evt in handle2.FetchHistoryEventsAsync()) { Assert.Null(evt.WorkflowTaskCompletedEventAttributes); } - // Now replace the client, which should be used fairly quickly because we should have - // timer-done poll completions every 100ms + // Now replace the client worker.Client = otherEnv.Client; - // Now confirm the other workflow has started - await AssertMore.HasEventEventuallyAsync(handle2, e => e.WorkflowTaskCompletedEventAttributes != null); + // Signal both which should allow the current poll to wake up and it'll be a task + // failure when trying to submit that to the new client which is ignored. But also the + // new client will poll for the new workflow, which we will wait for it to complete. + await handle1.SignalAsync(wf => wf.CompleteAsync()); + await handle2.SignalAsync(wf => wf.CompleteAsync()); - // Terminate both + // Now confirm the other workflow completes + await handle2.GetResultAsync(); await handle1.TerminateAsync(); - await handle2.TerminateAsync(); }, new(taskQueue) { @@ -4678,6 +4675,101 @@ await ExecuteWorkerAsync( }); } + [Workflow] + public class CoroutinesAfterCompleteWorkflow + { + private string? completeWorkflowWith; + private string? completeUpdateWith; + + [WorkflowRun] + public async Task RunAsync() + { + await Workflow.WaitConditionAsync(() => completeWorkflowWith != null); + completeUpdateWith = "complete-update"; + return completeWorkflowWith!; + } + + [WorkflowUpdate] + public async Task DoUpdateAsync() + { + completeWorkflowWith = "complete-workflow"; + await Workflow.WaitConditionAsync(() => completeUpdateWith != null); + return completeUpdateWith!; + } + } + + [Fact] + public async Task ExecuteWorkflowAsync_CoroutinesAfterComplete_GetProcessed() + { + // Run one workflow without the flag set and one with and get histories + WorkflowHistory? historyWithoutFlag = null; + await ExecuteWorkerAsync( + async worker => + { + // Start + var handle = await Client.StartWorkflowAsync( + (CoroutinesAfterCompleteWorkflow wf) => wf.RunAsync(), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + // Start update, wait on workflow result (update result would hang) + await handle.StartUpdateAsync( + wf => wf.DoUpdateAsync(), new(WorkflowUpdateStage.Accepted)); + Assert.Equal("complete-workflow", await handle.GetResultAsync()); + historyWithoutFlag = await handle.FetchHistoryAsync(); + }, + new() { DisableWorkflowCompletionCommandReordering = true }); + + // Confirm without flag that the workflow ends with task complete (sans flag), update + // accepted, and workflow complete, but notably missing update complete + var lastEvents = historyWithoutFlag!.Events.TakeLast(3).ToArray(); + Assert.NotNull(lastEvents[0].WorkflowTaskCompletedEventAttributes); + Assert.Equal(0, lastEvents[0].WorkflowTaskCompletedEventAttributes.SdkMetadata?.LangUsedFlags.Count ?? 0); + Assert.NotNull(lastEvents[1].WorkflowExecutionUpdateAcceptedEventAttributes); + Assert.NotNull(lastEvents[2].WorkflowExecutionCompletedEventAttributes); + + WorkflowHistory? historyWithFlag = null; + await ExecuteWorkerAsync( + async worker => + { + // Start + var handle = await Client.StartWorkflowAsync( + (CoroutinesAfterCompleteWorkflow wf) => wf.RunAsync(), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + // Start update, wait on workflow and update + var updateHandle = await handle.StartUpdateAsync( + wf => wf.DoUpdateAsync(), new(WorkflowUpdateStage.Accepted)); + Assert.Equal("complete-workflow", await handle.GetResultAsync()); + Assert.Equal("complete-update", await updateHandle.GetResultAsync()); + historyWithFlag = await handle.FetchHistoryAsync(); + }); + + // Now confirm with the flag that the workflow ends with task complete (with flag), update + // accepted, update complete, and workflow complete + lastEvents = historyWithFlag!.Events.TakeLast(4).ToArray(); + Assert.NotNull(lastEvents[0].WorkflowTaskCompletedEventAttributes); + Assert.Contains( + (uint)WorkflowLogicFlag.ReorderWorkflowCompletion, + lastEvents[0].WorkflowTaskCompletedEventAttributes.SdkMetadata.LangUsedFlags); + Assert.NotNull(lastEvents[1].WorkflowExecutionUpdateAcceptedEventAttributes); + Assert.NotNull(lastEvents[2].WorkflowExecutionUpdateCompletedEventAttributes); + Assert.NotNull(lastEvents[3].WorkflowExecutionCompletedEventAttributes); + + // Now for extra sanity checking, we'll check the replayer. First, confirm with a + // flag-disabled replayer that without-flag succeeds but with-flag has non-determinism + var noFlagReplayer = new WorkflowReplayer(new WorkflowReplayerOptions() + { + DisableWorkflowCompletionCommandReordering = true, + }.AddWorkflow()); + await noFlagReplayer.ReplayWorkflowAsync(historyWithoutFlag); + await Assert.ThrowsAsync(() => + noFlagReplayer.ReplayWorkflowAsync(historyWithFlag)); + + // Confirm with flag-based replayer everything is ok for both histories + var flagReplayer = new WorkflowReplayer( + new WorkflowReplayerOptions().AddWorkflow()); + await flagReplayer.ReplayWorkflowAsync(historyWithoutFlag); + await flagReplayer.ReplayWorkflowAsync(historyWithFlag); + } + internal static Task AssertTaskFailureContainsEventuallyAsync( WorkflowHandle handle, string messageContains) { @@ -4686,12 +4778,11 @@ internal static Task AssertTaskFailureContainsEventuallyAsync( attrs => Assert.Contains(messageContains, attrs.Failure?.Message)); } - private async Task ExecuteWorkerAsync( + private Task ExecuteWorkerAsync( Func action, TemporalWorkerOptions? options = null, - IWorkerClient? client = null) - { - await ExecuteWorkerAsyncReturning( + IWorkerClient? client = null) => + ExecuteWorkerAsync( async (w) => { await action(w); @@ -4699,17 +4790,16 @@ await ExecuteWorkerAsyncReturning( }, options, client); - } - private async Task ExecuteWorkerAsyncReturning( - Func> action, + private async Task ExecuteWorkerAsync( + Func> action, TemporalWorkerOptions? options = null, IWorkerClient? client = null) { options ??= new(); options = (TemporalWorkerOptions)options.Clone(); options.TaskQueue ??= $"tq-{Guid.NewGuid()}"; - options.AddWorkflow(); + options.AddWorkflow(); options.Interceptors ??= new[] { new XunitExceptionInterceptor() }; using var worker = new TemporalWorker(client ?? Client, options); return await worker.ExecuteAsync(() => action(worker));