From 2f9e9dfe464e2e5b5b7ebddc0997ae70d11e11d9 Mon Sep 17 00:00:00 2001 From: Andrew Arnott Date: Tue, 28 Feb 2023 21:06:24 -0700 Subject: [PATCH] Add ability to serialize a `JoinableTask` across multiple `JoinableTaskContext` objects and processes --- Directory.Packages.props | 1 + .../JoinableTask.cs | 115 ++++++++- .../JoinableTaskContext.cs | 189 ++++++++++++++- .../JoinableTaskFactory.cs | 193 ++++++---------- .../JoinableTask`1.cs | 13 +- .../net472/PublicAPI.Unshipped.txt | 3 + .../net6.0-windows/PublicAPI.Unshipped.txt | 3 + .../net6.0/PublicAPI.Unshipped.txt | 3 + .../netstandard2.0/PublicAPI.Unshipped.txt | 3 + .../JoinableTaskTokenTests.cs | 218 ++++++++++++++++++ 10 files changed, 612 insertions(+), 129 deletions(-) create mode 100644 test/Microsoft.VisualStudio.Threading.Tests/JoinableTaskTokenTests.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index 24ab59f63..25da1daaf 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -48,6 +48,7 @@ + diff --git a/src/Microsoft.VisualStudio.Threading/JoinableTask.cs b/src/Microsoft.VisualStudio.Threading/JoinableTask.cs index ae03d95d0..703c35323 100644 --- a/src/Microsoft.VisualStudio.Threading/JoinableTask.cs +++ b/src/Microsoft.VisualStudio.Threading/JoinableTask.cs @@ -43,6 +43,15 @@ public partial class JoinableTask : IJoinableTaskDependent [DebuggerBrowsable(DebuggerBrowsableState.Never)] private readonly JoinableTaskCreationOptions creationOptions; + /// + /// The serializable token associated with this particular . + /// + /// + /// This will be created when the was created with a parent token + /// or lazily created when this needs to be serialized. + /// + private SerializableToken? token; + /// /// Other instances of that should be posted /// to with any main thread bound work. @@ -125,9 +134,10 @@ public partial class JoinableTask : IJoinableTaskDependent /// /// The instance that began the async operation. /// A value indicating whether the launching thread will synchronously block for this job's completion. + /// An optional token that identifies one or more instances, typically in other processes, that serve as 'parents' to this one. /// The used to customize the task's behavior. /// The entry method's info for diagnostics. - internal JoinableTask(JoinableTaskFactory owner, bool synchronouslyBlocking, JoinableTaskCreationOptions creationOptions, Delegate initialDelegate) + internal JoinableTask(JoinableTaskFactory owner, bool synchronouslyBlocking, string? parentToken, JoinableTaskCreationOptions creationOptions, Delegate initialDelegate) { Requires.NotNull(owner, nameof(owner)); @@ -147,6 +157,7 @@ internal JoinableTask(JoinableTaskFactory owner, bool synchronouslyBlocking, Joi } this.creationOptions = creationOptions; + this.token = SerializableToken.From(parentToken, this); this.owner.Context.OnJoinableTaskStarted(this); this.initialDelegate = initialDelegate; } @@ -654,6 +665,32 @@ void IJoinableTaskDependent.OnDependencyRemoved(IJoinableTaskDependent joinChild { } + /// + /// Gets the full token that should be serialized when this owns the context to be shared. + /// + /// The token; or when this task is already completed. + internal string? GetSerializableToken() + { + if (this.token is null && !this.IsCompleteRequested) + { + using (this.JoinableTaskContext.NoMessagePumpSynchronizationContext.Apply()) + { + lock (this.JoinableTaskContext.SyncContextLock) + { + this.token ??= SerializableToken.New(this); + } + } + } + + return this.token?.ToString(); + } + + /// + /// Looks up the that serves as this instance's parent due to the token provided when this was created. + /// + /// A task; or if no parent token was provided at construction time or no match was found (possibly due to the parent having already completed). + internal JoinableTask? GetTokenizedParent() => this.JoinableTaskContext.Lookup(this.token?.ParentToken); + /// /// Gets a very likely value whether the main thread is blocked by this . /// @@ -891,6 +928,10 @@ internal void Complete(Task wrappedTask) if (!this.IsCompleteRequested) { this.IsCompleteRequested = true; + if (this.token?.TaskId is ulong taskId) + { + this.JoinableTaskContext.RemoveSerializableIdentifier(taskId); + } if (this.mainThreadQueue is object) { @@ -1243,4 +1284,76 @@ private void AddStateFlags(JoinableTaskFlags flags) } } } + + private class SerializableToken + { + private readonly JoinableTask owner; + private ulong? taskId; + private string? fullToken; + + private SerializableToken(JoinableTask owner) + { + this.owner = owner; + } + + /// + /// Gets the original token provided by the remote parent task. + /// + internal string? ParentToken { get; init; } + + /// + /// Gets the unique ID that identifies the owning in the dictionary. + /// + internal ulong? TaskId + { + get + { + if (this.taskId is null && !this.owner.IsCompleteRequested) + { + using (this.owner.JoinableTaskContext.NoMessagePumpSynchronizationContext.Apply()) + { + lock (this.owner.JoinableTaskContext.SyncContextLock) + { + if (this.taskId is null && !this.owner.IsCompleteRequested) + { + this.taskId = this.owner.JoinableTaskContext.AssignUniqueIdentifier(this.owner); + } + } + } + } + + return this.owner.IsCompleteRequested ? null : this.taskId; + } + } + + /// + /// Serializes this token as a string. + /// + /// A string that identifies the owner and retains information about its parents, if any. May be if the owning task has already completed. + public override string? ToString() + { + if (this.fullToken is null && this.TaskId is ulong taskId) + { + this.fullToken = this.owner.JoinableTaskContext.ConstructFullToken(taskId, this.ParentToken); + } + + return this.owner.IsCompleteRequested ? null : this.fullToken; + } + + /// + /// Creates a when a parent token is provided. + /// + /// The parent token, if any. + /// The owning task. + /// The token, if any is required. + [return: NotNullIfNotNull(nameof(parentToken))] + internal static SerializableToken? From(string? parentToken, JoinableTask owner) => parentToken is null ? null : new SerializableToken(owner) { ParentToken = parentToken }; + + /// + /// Creates a for a given if it has not yet completed. + /// + /// The owning task. + /// A token, if the task has not yet completed; otherwise . + internal static SerializableToken? New(JoinableTask owner) => owner.IsCompleteRequested ? null : new SerializableToken(owner); + } } diff --git a/src/Microsoft.VisualStudio.Threading/JoinableTaskContext.cs b/src/Microsoft.VisualStudio.Threading/JoinableTaskContext.cs index 318fc01ac..e2ea752fe 100644 --- a/src/Microsoft.VisualStudio.Threading/JoinableTaskContext.cs +++ b/src/Microsoft.VisualStudio.Threading/JoinableTaskContext.cs @@ -2,17 +2,16 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; -using System.Collections.Concurrent; using System.Collections.Generic; -using System.Collections.Specialized; using System.Diagnostics; +using System.Globalization; using System.Linq; using System.Reflection; -using System.Runtime.CompilerServices; +using System.Text; using System.Threading; using System.Threading.Tasks; +using static System.FormattableString; using JoinableTaskSynchronizationContext = Microsoft.VisualStudio.Threading.JoinableTask.JoinableTaskSynchronizationContext; -using SingleExecuteProtector = Microsoft.VisualStudio.Threading.JoinableTaskFactory.SingleExecuteProtector; namespace Microsoft.VisualStudio.Threading; @@ -66,6 +65,14 @@ namespace Microsoft.VisualStudio.Threading; /// public partial class JoinableTaskContext : IDisposable { + /// + /// The expected length of the serialized task ID. + /// + /// + /// This value is the length required to hex-encode a 64-bit integer. We use for task IDs, so this is appropriate. + /// + private const int TaskIdHexLength = 16; + /// /// A "global" lock that allows the graph of interconnected sync context and JoinableSet instances /// communicate in a thread-safe way without fear of deadlocks due to each taking their own private @@ -116,6 +123,24 @@ public partial class JoinableTaskContext : IDisposable /// private readonly int mainThreadManagedThreadId; + /// + /// A dictionary of incomplete objects for which serializable identifiers have been requested. + /// + /// + /// Only access this while locking . + /// + private readonly Dictionary serializedTasks = new(); + + /// + /// A unique instance ID that is used when creating IDs for JoinableTasks that come from this instance. + /// + private readonly string contextId = Guid.NewGuid().ToString("n"); + + /// + /// The next unique ID to assign to a for which a token is required. + /// + private ulong nextTaskId = 1; + /// /// The count of s blocking the main thread. /// @@ -385,6 +410,18 @@ public JoinableTaskCollection CreateCollection() return new JoinableTaskCollection(this); } + /// + /// Captures the caller's context and serializes it as a string + /// that is suitable for application via a subsequent call to . + /// + /// A string that represent the current context, or if there is none. + /// + /// To optimize calling patterns, this method returns even when inside a context + /// when this was initialized without a , which means no main thread exists + /// and thus there is no need to capture and reapply tokens. + /// + public string? Capture() => this.UnderlyingSynchronizationContext is null ? null : this.AmbientTask?.GetSerializableToken(); + /// public void Dispose() { @@ -500,6 +537,113 @@ internal void DecrementMainThreadBlockingCount() this.mainThreadBlockingJoinableTaskCount--; } + /// + /// Reserves a unique ID for the given and records the association in the table. + /// + /// The to associate with the new ID. + /// + /// An ID assignment that is unique for this . + /// It must be passed to when the task completes to avoid a memory leak. + /// + internal ulong AssignUniqueIdentifier(JoinableTask joinableTask) + { + // The caller must have entered this lock because it's required that it only do it while it has not completed + // so that we don't have a leak in our dictionary, since completion removes the id's from the dictionary. + Assumes.True(Monitor.IsEntered(this.SyncContextLock)); + ulong taskId = checked(this.nextTaskId++); + this.serializedTasks.Add(taskId, joinableTask); + return taskId; + } + + /// + /// Applies the result of a call to to the caller's context. + /// + /// The result of a prior call. + /// The task referenced by the parent token if it came from our context and is still running; otherwise . + internal JoinableTask? Lookup(string? parentToken) + { + if (parentToken is not null) + { +#if NET6_0_OR_GREATER + ReadOnlySpan taskIdChars = this.GetOurTaskId(parentToken); +#else + string taskIdChars = this.GetOurTaskId(parentToken.AsSpan()).ToString(); +#endif + if (ulong.TryParse(taskIdChars, NumberStyles.AllowHexSpecifier, CultureInfo.InvariantCulture, out ulong taskId)) + { + using (this.NoMessagePumpSynchronizationContext.Apply()) + { + lock (this.SyncContextLock) + { + if (this.serializedTasks.TryGetValue(taskId, out JoinableTask? deserialized)) + { + return deserialized; + } + } + } + } + } + + return null; + } + + /// + /// Assembles a new token based on a parent token and the unique ID for some . + /// + /// The value previously obtained from . + /// The parent token the was created with, if any. + /// A token that may be serialized to recreate the dependency chain for this and its remote parents. + internal string ConstructFullToken(ulong taskId, string? parentToken) + { + const char ContextAndTaskSeparator = ':'; + if (parentToken is null) + { + return Invariant($"{this.contextId}{ContextAndTaskSeparator}{taskId:X16}"); + } + else + { + const char ContextSeparator = ';'; + + StringBuilder builder = new(parentToken.Length + 1 + this.contextId.Length + 1 + TaskIdHexLength); + builder.Append(parentToken); + + string taskIdString = taskId.ToString("X16", CultureInfo.InvariantCulture); + + // Replace our own contextual unique ID if it is found in the parent token. + int ownTaskIdIndex = this.FindOurTaskId(parentToken.AsSpan()); + if (ownTaskIdIndex < 0) + { + // Add our own task ID because we have no presence in the parent token already. + builder.Append(ContextSeparator); + builder.Append(this.contextId); + builder.Append(ContextAndTaskSeparator); + builder.Append(taskIdString); + } + else + { + // Replace our existing task ID that appears in the parent token. + builder.Remove(ownTaskIdIndex, TaskIdHexLength); + builder.Insert(ownTaskIdIndex, taskIdString); + } + + return builder.ToString(); + } + } + + /// + /// Removes an association between a and a unique ID that was generated for it + /// from the table. + /// + /// The value previously obtained from . + /// + /// This method must be called when a is completed to avoid a memory leak. + /// + internal void RemoveSerializableIdentifier(ulong taskId) + { + Assumes.True(Monitor.IsEntered(this.SyncContextLock)); + Assumes.True(this.serializedTasks.Remove(taskId)); + } + /// /// Invoked when a hang is suspected to have occurred involving the main thread. /// @@ -590,6 +734,43 @@ protected virtual void Dispose(bool disposing) { } + /// + /// Searches a parent token for a task ID that belongs to this instance. + /// + /// A parent token. + /// The 0-based index into the string where the context of the local task ID begins, if found; otherwise -1. + private int FindOurTaskId(ReadOnlySpan parentToken) + { + // Fetch the unique id for the JoinableTask that came from *this* context, if any. + int matchingContextIndex = parentToken.IndexOf(this.contextId.AsSpan(), StringComparison.Ordinal); + if (matchingContextIndex < 0) + { + return -1; + } + + // IMPORTANT: As the parent token frequently comes in over RPC, take care to never throw exceptions based on bad input + // as we're called on a critical scheduling callstack where an exception would lead to an Environment.FailFast call. + // To that end, only report that we found the task id if the remaining string is long enough to support it. + int uniqueIdStartIndex = matchingContextIndex + this.contextId.Length + 1; + if (parentToken.Length < uniqueIdStartIndex + TaskIdHexLength) + { + return -1; + } + + return uniqueIdStartIndex; + } + + /// + /// Gets the task ID that came from this that is carried in a given a parent token. + /// + /// A parent token. + /// The characters that formulate the task ID that originally came from this instance, if found; otherwise an empty span. + private ReadOnlySpan GetOurTaskId(ReadOnlySpan parentToken) + { + int index = this.FindOurTaskId(parentToken); + return index < 0 ? default : parentToken.Slice(index, TaskIdHexLength); + } + /// /// A structure that clears CallContext and SynchronizationContext async/thread statics and /// restores those values when this structure is disposed. diff --git a/src/Microsoft.VisualStudio.Threading/JoinableTaskFactory.cs b/src/Microsoft.VisualStudio.Threading/JoinableTaskFactory.cs index 91b489790..c24f07553 100644 --- a/src/Microsoft.VisualStudio.Threading/JoinableTaskFactory.cs +++ b/src/Microsoft.VisualStudio.Threading/JoinableTaskFactory.cs @@ -203,62 +203,19 @@ protected SynchronizationContext? UnderlyingSynchronizationContext return new MainThreadAwaitable(this, this.Context.AmbientTask, cancellationToken, alwaysYield); } - /// - /// Runs the specified asynchronous method to completion while synchronously blocking the calling thread. - /// - /// The asynchronous method to execute. - /// - /// Any exception thrown by the delegate is rethrown in its original type to the caller of this method. - /// When the delegate resumes from a yielding await, the default behavior is to resume in its original context - /// as an ordinary async method execution would. For example, if the caller was on the main thread, execution - /// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread. - /// - /// - /// // On threadpool or Main thread, this method will block - /// // the calling thread until all async operations in the - /// // delegate complete. - /// joinableTaskFactory.Run(async delegate { - /// // still on the threadpool or Main thread as before. - /// await OperationAsync(); - /// // still on the threadpool or Main thread as before. - /// await Task.Run(async delegate { - /// // Now we're on a threadpool thread. - /// await Task.Yield(); - /// // still on a threadpool thread. - /// }); - /// // Now back on the Main thread (or threadpool thread if that's where we started). - /// }); - /// - /// - /// + /// public void Run(Func asyncMethod) { this.Run(asyncMethod, JoinableTaskCreationOptions.None, entrypointOverride: null); } - /// - /// Runs the specified asynchronous method to completion while synchronously blocking the calling thread. - /// - /// The asynchronous method to execute. - /// The used to customize the task's behavior. + /// public void Run(Func asyncMethod, JoinableTaskCreationOptions creationOptions) { this.Run(asyncMethod, creationOptions, entrypointOverride: null); } - /// - /// Runs the specified asynchronous method to completion while synchronously blocking the calling thread. - /// - /// The type of value returned by the asynchronous operation. - /// The asynchronous method to execute. - /// The result of the Task returned by . - /// - /// Any exception thrown by the delegate is rethrown in its original type to the caller of this method. - /// When the delegate resumes from a yielding await, the default behavior is to resume in its original context - /// as an ordinary async method execution would. For example, if the caller was on the main thread, execution - /// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread. - /// See the overload documentation for an example. - /// + /// public T Run(Func> asyncMethod) { return this.Run(asyncMethod, JoinableTaskCreationOptions.None); @@ -276,92 +233,66 @@ public T Run(Func> asyncMethod) /// When the delegate resumes from a yielding await, the default behavior is to resume in its original context /// as an ordinary async method execution would. For example, if the caller was on the main thread, execution /// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread. + /// + /// + /// /// public T Run(Func> asyncMethod, JoinableTaskCreationOptions creationOptions) { VerifyNoNonConcurrentSyncContext(); - JoinableTask? joinable = this.RunAsync(asyncMethod, synchronouslyBlocking: true, creationOptions: creationOptions); + JoinableTask? joinable = this.RunAsync(asyncMethod, synchronouslyBlocking: true, parentToken: null, creationOptions: creationOptions); return joinable.CompleteOnCurrentThread(); } - /// - /// Invokes an async delegate on the caller's thread, and yields back to the caller when the async method yields. - /// The async delegate is invoked in such a way as to mitigate deadlocks in the event that the async method - /// requires the main thread while the main thread is blocked waiting for the async method's completion. - /// - /// The method that, when executed, will begin the async operation. - /// An object that tracks the completion of the async operation, and allows for later synchronous blocking of the main thread for completion if necessary. - /// - /// Exceptions thrown by the delegate are captured by the returned . - /// When the delegate resumes from a yielding await, the default behavior is to resume in its original context - /// as an ordinary async method execution would. For example, if the caller was on the main thread, execution - /// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread. - /// + /// public JoinableTask RunAsync(Func asyncMethod) { - return this.RunAsync(asyncMethod, synchronouslyBlocking: false, creationOptions: JoinableTaskCreationOptions.None); + return this.RunAsync(asyncMethod, synchronouslyBlocking: false, parentToken: null, creationOptions: JoinableTaskCreationOptions.None); } - /// - /// Invokes an async delegate on the caller's thread, and yields back to the caller when the async method yields. - /// The async delegate is invoked in such a way as to mitigate deadlocks in the event that the async method - /// requires the main thread while the main thread is blocked waiting for the async method's completion. - /// - /// The method that, when executed, will begin the async operation. - /// An object that tracks the completion of the async operation, and allows for later synchronous blocking of the main thread for completion if necessary. - /// The used to customize the task's behavior. - /// - /// Exceptions thrown by the delegate are captured by the returned . - /// When the delegate resumes from a yielding await, the default behavior is to resume in its original context - /// as an ordinary async method execution would. For example, if the caller was on the main thread, execution - /// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread. - /// + /// public JoinableTask RunAsync(Func asyncMethod, JoinableTaskCreationOptions creationOptions) { - return this.RunAsync(asyncMethod, synchronouslyBlocking: false, creationOptions: creationOptions); + return this.RunAsync(asyncMethod, synchronouslyBlocking: false, parentToken: null, creationOptions: creationOptions); } - /// - /// Invokes an async delegate on the caller's thread, and yields back to the caller when the async method yields. - /// The async delegate is invoked in such a way as to mitigate deadlocks in the event that the async method - /// requires the main thread while the main thread is blocked waiting for the async method's completion. - /// - /// The type of value returned by the asynchronous operation. - /// The method that, when executed, will begin the async operation. - /// - /// An object that tracks the completion of the async operation, and allows for later synchronous blocking of the main thread for completion if necessary. - /// - /// - /// Exceptions thrown by the delegate are captured by the returned . - /// When the delegate resumes from a yielding await, the default behavior is to resume in its original context - /// as an ordinary async method execution would. For example, if the caller was on the main thread, execution - /// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread. - /// + /// + public JoinableTask RunAsync(Func asyncMethod, string? parentToken, JoinableTaskCreationOptions creationOptions) + { + return this.RunAsync(asyncMethod, synchronouslyBlocking: false, parentToken, creationOptions: creationOptions); + } + + /// public JoinableTask RunAsync(Func> asyncMethod) { - return this.RunAsync(asyncMethod, synchronouslyBlocking: false, creationOptions: JoinableTaskCreationOptions.None); + return this.RunAsync(asyncMethod, synchronouslyBlocking: false, parentToken: null, creationOptions: JoinableTaskCreationOptions.None); } - /// - /// Invokes an async delegate on the caller's thread, and yields back to the caller when the async method yields. - /// The async delegate is invoked in such a way as to mitigate deadlocks in the event that the async method - /// requires the main thread while the main thread is blocked waiting for the async method's completion. - /// - /// The type of value returned by the asynchronous operation. - /// The method that, when executed, will begin the async operation. - /// The used to customize the task's behavior. - /// - /// An object that tracks the completion of the async operation, and allows for later synchronous blocking of the main thread for completion if necessary. - /// - /// - /// Exceptions thrown by the delegate are captured by the returned . - /// When the delegate resumes from a yielding await, the default behavior is to resume in its original context - /// as an ordinary async method execution would. For example, if the caller was on the main thread, execution - /// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread. - /// + /// public JoinableTask RunAsync(Func> asyncMethod, JoinableTaskCreationOptions creationOptions) { - return this.RunAsync(asyncMethod, synchronouslyBlocking: false, creationOptions: creationOptions); + return this.RunAsync(asyncMethod, synchronouslyBlocking: false, parentToken: null, creationOptions: creationOptions); + } + + /// + public JoinableTask RunAsync(Func> asyncMethod, string? parentToken, JoinableTaskCreationOptions creationOptions) + { + return this.RunAsync(asyncMethod, synchronouslyBlocking: false, parentToken, creationOptions: creationOptions); } /// @@ -394,6 +325,7 @@ internal SingleExecuteProtector RequestSwitchToMainThread(Action callback) return Task.CompletedTask; }, synchronouslyBlocking: false, + parentToken: null, creationOptions: JoinableTaskCreationOptions.None, entrypointOverride: callback); @@ -438,7 +370,7 @@ internal void PostToUnderlyingSynchronizationContextOrThreadPool(SingleExecutePr internal void Run(Func asyncMethod, JoinableTaskCreationOptions creationOptions, Delegate? entrypointOverride) { VerifyNoNonConcurrentSyncContext(); - JoinableTask? joinable = this.RunAsync(asyncMethod, synchronouslyBlocking: true, creationOptions: creationOptions, entrypointOverride: entrypointOverride); + JoinableTask? joinable = this.RunAsync(asyncMethod, synchronouslyBlocking: true, parentToken: null, creationOptions, entrypointOverride); joinable.CompleteOnCurrentThread(); } @@ -674,23 +606,43 @@ private static void VerifyNoNonConcurrentSyncContext() /// synchronously completed (waited on) in the future. /// /// The asynchronous method to execute. - /// A value indicating whether the launching thread will synchronously block for this job's completion. - /// The used to customize the task's behavior. + /// + /// + /// /// The entry method's info for diagnostics. - private JoinableTask RunAsync(Func asyncMethod, bool synchronouslyBlocking, JoinableTaskCreationOptions creationOptions, Delegate? entrypointOverride = null) + private JoinableTask RunAsync(Func asyncMethod, bool synchronouslyBlocking, string? parentToken, JoinableTaskCreationOptions creationOptions, Delegate? entrypointOverride = null) { Requires.NotNull(asyncMethod, nameof(asyncMethod)); - var job = new JoinableTask(this, synchronouslyBlocking, creationOptions, entrypointOverride ?? asyncMethod); + var job = new JoinableTask(this, synchronouslyBlocking, parentToken, creationOptions, entrypointOverride ?? asyncMethod); this.ExecuteJob(asyncMethod, job); return job; } - private JoinableTask RunAsync(Func> asyncMethod, bool synchronouslyBlocking, JoinableTaskCreationOptions creationOptions) + /// + /// Invokes an async delegate on the caller's thread, and yields back to the caller when the async method yields. + /// The async delegate is invoked in such a way as to mitigate deadlocks in the event that the async method + /// requires the main thread while the main thread is blocked waiting for the async method's completion. + /// + /// The type of value returned by the asynchronous operation. + /// The method that, when executed, will begin the async operation. + /// A value indicating whether the caller is synchronously blocking. + /// An optional token that identifies one or more instances, typically in other processes, that serve as 'parents' to this one. + /// The used to customize the task's behavior. + /// + /// An object that tracks the completion of the async operation, and allows for later synchronous blocking of the main thread for completion if necessary. + /// + /// + /// Exceptions thrown by the delegate are captured by the returned . + /// When the delegate resumes from a yielding await, the default behavior is to resume in its original context + /// as an ordinary async method execution would. For example, if the caller was on the main thread, execution + /// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread. + /// + private JoinableTask RunAsync(Func> asyncMethod, bool synchronouslyBlocking, string? parentToken, JoinableTaskCreationOptions creationOptions) { Requires.NotNull(asyncMethod, nameof(asyncMethod)); - var job = new JoinableTask(this, synchronouslyBlocking, creationOptions, asyncMethod); + var job = new JoinableTask(this, synchronouslyBlocking, parentToken, creationOptions, asyncMethod); this.ExecuteJob(asyncMethod, job); return job; } @@ -1049,6 +1001,11 @@ internal RunFramework(JoinableTaskFactory factory, JoinableTask joinable) this.joinable.NestingFactories = nestingFactories; } + + if (joinable.GetTokenizedParent() is JoinableTask tokenizedParent) + { + JoinableTaskDependencyGraph.AddDependency(tokenizedParent, joinable); + } } /// diff --git a/src/Microsoft.VisualStudio.Threading/JoinableTask`1.cs b/src/Microsoft.VisualStudio.Threading/JoinableTask`1.cs index caa99dba3..1ebead4ee 100644 --- a/src/Microsoft.VisualStudio.Threading/JoinableTask`1.cs +++ b/src/Microsoft.VisualStudio.Threading/JoinableTask`1.cs @@ -27,12 +27,13 @@ public class JoinableTask : JoinableTask /// /// Initializes a new instance of the class. /// - /// The instance that began the async operation. - /// A value indicating whether the launching thread will synchronously block for this job's completion. - /// The used to customize the task's behavior. - /// The entry method's info for diagnostics. - internal JoinableTask(JoinableTaskFactory owner, bool synchronouslyBlocking, JoinableTaskCreationOptions creationOptions, Delegate initialDelegate) - : base(owner, synchronouslyBlocking, creationOptions, initialDelegate) + /// + /// + /// + /// + /// + internal JoinableTask(JoinableTaskFactory owner, bool synchronouslyBlocking, string? parentToken, JoinableTaskCreationOptions creationOptions, Delegate initialDelegate) + : base(owner, synchronouslyBlocking, parentToken, creationOptions, initialDelegate) { } diff --git a/src/Microsoft.VisualStudio.Threading/net472/PublicAPI.Unshipped.txt b/src/Microsoft.VisualStudio.Threading/net472/PublicAPI.Unshipped.txt index e69de29bb..df9e7698e 100644 --- a/src/Microsoft.VisualStudio.Threading/net472/PublicAPI.Unshipped.txt +++ b/src/Microsoft.VisualStudio.Threading/net472/PublicAPI.Unshipped.txt @@ -0,0 +1,3 @@ +Microsoft.VisualStudio.Threading.JoinableTaskContext.Capture() -> string? +Microsoft.VisualStudio.Threading.JoinableTaskFactory.RunAsync(System.Func! asyncMethod, string? parentToken, Microsoft.VisualStudio.Threading.JoinableTaskCreationOptions creationOptions) -> Microsoft.VisualStudio.Threading.JoinableTask! +Microsoft.VisualStudio.Threading.JoinableTaskFactory.RunAsync(System.Func!>! asyncMethod, string? parentToken, Microsoft.VisualStudio.Threading.JoinableTaskCreationOptions creationOptions) -> Microsoft.VisualStudio.Threading.JoinableTask! \ No newline at end of file diff --git a/src/Microsoft.VisualStudio.Threading/net6.0-windows/PublicAPI.Unshipped.txt b/src/Microsoft.VisualStudio.Threading/net6.0-windows/PublicAPI.Unshipped.txt index e69de29bb..df9e7698e 100644 --- a/src/Microsoft.VisualStudio.Threading/net6.0-windows/PublicAPI.Unshipped.txt +++ b/src/Microsoft.VisualStudio.Threading/net6.0-windows/PublicAPI.Unshipped.txt @@ -0,0 +1,3 @@ +Microsoft.VisualStudio.Threading.JoinableTaskContext.Capture() -> string? +Microsoft.VisualStudio.Threading.JoinableTaskFactory.RunAsync(System.Func! asyncMethod, string? parentToken, Microsoft.VisualStudio.Threading.JoinableTaskCreationOptions creationOptions) -> Microsoft.VisualStudio.Threading.JoinableTask! +Microsoft.VisualStudio.Threading.JoinableTaskFactory.RunAsync(System.Func!>! asyncMethod, string? parentToken, Microsoft.VisualStudio.Threading.JoinableTaskCreationOptions creationOptions) -> Microsoft.VisualStudio.Threading.JoinableTask! \ No newline at end of file diff --git a/src/Microsoft.VisualStudio.Threading/net6.0/PublicAPI.Unshipped.txt b/src/Microsoft.VisualStudio.Threading/net6.0/PublicAPI.Unshipped.txt index e69de29bb..df9e7698e 100644 --- a/src/Microsoft.VisualStudio.Threading/net6.0/PublicAPI.Unshipped.txt +++ b/src/Microsoft.VisualStudio.Threading/net6.0/PublicAPI.Unshipped.txt @@ -0,0 +1,3 @@ +Microsoft.VisualStudio.Threading.JoinableTaskContext.Capture() -> string? +Microsoft.VisualStudio.Threading.JoinableTaskFactory.RunAsync(System.Func! asyncMethod, string? parentToken, Microsoft.VisualStudio.Threading.JoinableTaskCreationOptions creationOptions) -> Microsoft.VisualStudio.Threading.JoinableTask! +Microsoft.VisualStudio.Threading.JoinableTaskFactory.RunAsync(System.Func!>! asyncMethod, string? parentToken, Microsoft.VisualStudio.Threading.JoinableTaskCreationOptions creationOptions) -> Microsoft.VisualStudio.Threading.JoinableTask! \ No newline at end of file diff --git a/src/Microsoft.VisualStudio.Threading/netstandard2.0/PublicAPI.Unshipped.txt b/src/Microsoft.VisualStudio.Threading/netstandard2.0/PublicAPI.Unshipped.txt index e69de29bb..df9e7698e 100644 --- a/src/Microsoft.VisualStudio.Threading/netstandard2.0/PublicAPI.Unshipped.txt +++ b/src/Microsoft.VisualStudio.Threading/netstandard2.0/PublicAPI.Unshipped.txt @@ -0,0 +1,3 @@ +Microsoft.VisualStudio.Threading.JoinableTaskContext.Capture() -> string? +Microsoft.VisualStudio.Threading.JoinableTaskFactory.RunAsync(System.Func! asyncMethod, string? parentToken, Microsoft.VisualStudio.Threading.JoinableTaskCreationOptions creationOptions) -> Microsoft.VisualStudio.Threading.JoinableTask! +Microsoft.VisualStudio.Threading.JoinableTaskFactory.RunAsync(System.Func!>! asyncMethod, string? parentToken, Microsoft.VisualStudio.Threading.JoinableTaskCreationOptions creationOptions) -> Microsoft.VisualStudio.Threading.JoinableTask! \ No newline at end of file diff --git a/test/Microsoft.VisualStudio.Threading.Tests/JoinableTaskTokenTests.cs b/test/Microsoft.VisualStudio.Threading.Tests/JoinableTaskTokenTests.cs new file mode 100644 index 000000000..25c078496 --- /dev/null +++ b/test/Microsoft.VisualStudio.Threading.Tests/JoinableTaskTokenTests.cs @@ -0,0 +1,218 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.VisualStudio.Threading; +using Xunit; +using Xunit.Abstractions; + +public class JoinableTaskTokenTests : JoinableTaskTestBase +{ + public JoinableTaskTokenTests(ITestOutputHelper logger) + : base(logger) + { + } + + [Fact] + public void Capture_NoContext() + { + Assert.Null(this.context.Capture()); + } + + [Fact] + public void Capture_InsideRunContext() + { + string? token = null; + this.asyncPump.Run(delegate + { + token = this.context.Capture(); + return Task.CompletedTask; + }); + Assert.NotNull(token); + this.Logger.WriteLine($"Token {token}"); + } + + [Fact] + public void Capture_InsideRunContextWithoutSyncContext() + { + SynchronizationContext.SetSynchronizationContext(null); + JoinableTaskContext asyncPump = new(); + asyncPump.Factory.Run(delegate + { + Assert.Null(asyncPump.Capture()); + return Task.CompletedTask; + }); + } + + [Fact] + public void Capture_InheritsFromParent() + { + const string UnknownParent = "97f67c3ce2c74dc6bdb1d8a58edb9176:13"; + string? token = null; + this.asyncPump.RunAsync( + delegate + { + token = this.context.Capture(); + return Task.CompletedTask; + }, + UnknownParent, + JoinableTaskCreationOptions.None).Join(); + Assert.NotNull(token); + this.Logger.WriteLine($"Token {token}"); + Assert.Contains(UnknownParent, token); + Assert.True(token.Length > UnknownParent.Length); + } + + [Fact] + public void Capture_ReplacesParent() + { + AsyncManualResetEvent unblockParent = new(); + string? parentToken = null; + JoinableTask parent = this.asyncPump.RunAsync( + async delegate + { + parentToken = this.context.Capture(); + await unblockParent; + }); + Assert.NotNull(parentToken); + this.Logger.WriteLine($"Parent: {parentToken}"); + + string? childToken = null; + this.asyncPump.RunAsync( + delegate + { + childToken = this.context.Capture(); + unblockParent.Set(); + return Task.CompletedTask; + }, + parentToken, + JoinableTaskCreationOptions.None).Join(); + Assert.NotNull(childToken); + this.Logger.WriteLine($"Child: {childToken}"); + + // Assert that the child token *replaced* the parent token since they both came from the same context. + Assert.Equal(parentToken.Length, childToken.Length); + Assert.NotEqual(parentToken, childToken); + } + + [Fact] + public void RunAsync_AfterParentCompletes() + { + string? token = null; + this.asyncPump.Run(delegate + { + token = this.context.Capture(); + return Task.CompletedTask; + }); + Assert.NotNull(token); + this.Logger.WriteLine($"Token: {token}"); + + this.asyncPump.RunAsync( + () => Task.CompletedTask, + token, + JoinableTaskCreationOptions.None).Join(); + } + + [Theory, PairwiseData] + public async Task RunAsync_AvoidsDeadlockWithParent(bool includeOtherContexts) + { + string? parentToken = includeOtherContexts ? "abc:dead;ghi:coffee" : null; + TaskCompletionSource tokenSource = new(); + AsyncManualResetEvent releaseOuterTask = new(); + + JoinableTask outerTask = this.asyncPump.RunAsync( + async delegate + { + try + { + tokenSource.SetResult(this.context.Capture()); + await releaseOuterTask; + } + catch (Exception ex) + { + tokenSource.SetException(ex); + } + }, + parentToken, + JoinableTaskCreationOptions.None); + + string? token = await tokenSource.Task; + Assert.NotNull(token); + this.Logger.WriteLine($"Token: {token}"); + if (parentToken is not null) + { + Assert.Contains(parentToken, token); + token += ";even=feed"; + this.Logger.WriteLine($"Token (modified): {token}"); + } + + JoinableTask innerTask = this.asyncPump.RunAsync( + async delegate + { + await Task.Yield(); + releaseOuterTask.Set(); + }, + token, + JoinableTaskCreationOptions.None); + + // Sync block the main thread using the outer task. + // No discernable dependency chain exists from outer to inner task, + // yet one subtly exists. Only the serialized context should allow inner + // to complete and thus unblock outer and avoid a deadlock. + outerTask.Join(this.TimeoutToken); + } + + [Theory, PairwiseData] + public async Task RunAsyncOfT_AvoidsDeadlockWithParent(bool includeOtherContexts) + { + string? parentToken = includeOtherContexts ? "abc:dead;ghi:coffee" : null; + TaskCompletionSource tokenSource = new(); + AsyncManualResetEvent releaseOuterTask = new(); + + JoinableTask outerTask = this.asyncPump.RunAsync( + async delegate + { + try + { + tokenSource.SetResult(this.context.Capture()); + await releaseOuterTask; + } + catch (Exception ex) + { + tokenSource.SetException(ex); + } + + return true; + }, + parentToken, + JoinableTaskCreationOptions.None); + + string? token = await tokenSource.Task; + Assert.NotNull(token); + this.Logger.WriteLine($"Token: {token}"); + if (parentToken is not null) + { + Assert.Contains(parentToken, token); + token += ";even=feed"; + this.Logger.WriteLine($"Token (modified): {token}"); + } + + JoinableTask innerTask = this.asyncPump.RunAsync( + async delegate + { + await Task.Yield(); + releaseOuterTask.Set(); + return true; + }, + token, + JoinableTaskCreationOptions.None); + + // Sync block the main thread using the outer task. + // No discernable dependency chain exists from outer to inner task, + // yet one subtly exists. Only the serialized context should allow inner + // to complete and thus unblock outer and avoid a deadlock. + outerTask.Join(this.TimeoutToken); + } +}