Skip to content

Commit

Permalink
[browser][MT] fix Promise cancelation race condition (#99259)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelsavara authored Mar 6, 2024
1 parent 22e4825 commit 7183d70
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace System.Runtime.InteropServices.JavaScript
{
public static partial class CancelablePromise
{
public static void CancelPromise(Task promise)
public static unsafe void CancelPromise(Task promise)
{
// this check makes sure that promiseGCHandle is still valid handle
if (promise.IsCompleted)
Expand All @@ -24,7 +24,6 @@ public static void CancelPromise(Task promise)
{
return;
}
holder.IsCanceling = true;
Interop.Runtime.CancelPromise(holder.GCHandle);
#else

Expand All @@ -34,17 +33,18 @@ public static void CancelPromise(Task promise)
{
return;
}
holder.IsCanceling = true;

if (Interlocked.CompareExchange(ref (*holder.State).IsResolving, 1, 0) != 0)
{
return;
}

if (holder.ProxyContext.IsCurrentThread())
{
Interop.Runtime.CancelPromise(holder.GCHandle);
}
else
{
// FIXME: race condition
// we know that holder.GCHandle is still valid because we hold the ProxyContext lock
// but the message may arrive to the target thread after it was resolved, making GCHandle invalid
Interop.Runtime.CancelPromisePost(holder.ProxyContext.JSNativeTID, holder.GCHandle);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,7 @@ public static void CompleteTask(JSMarshalerArgument* arguments_buffer)
lock (ctx)
{
callback = holder.Callback!;
// if Interop.Runtime.CancelPromisePost is in flight, we can't free the GCHandle, because it's needed in JS
var isOutOfOrderCancellation = holder.IsCanceling && arg_res.slot.Type != MarshalerType.Discard;
// FIXME: when it happens we are leaking GCHandle + holder
if (!isOutOfOrderCancellation)
{
ctx.ReleasePromiseHolder(arg_1.slot.GCHandle);
}
ctx.ReleasePromiseHolder(arg_1.slot.GCHandle);
}
#else
callback = holder.Callback!;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ internal static unsafe void InvokeJSImportImpl(JSFunctionBinding signature, Span
var holder = targetContext.CreatePromiseHolder();
res.slot.Type = MarshalerType.TaskPreCreated;
res.slot.GCHandle = holder.GCHandle;
#if FEATURE_WASM_MANAGED_THREADS
res.slot.IntPtrValue = (IntPtr)holder.State;
#endif
}
#if FEATURE_WASM_MANAGED_THREADS
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,46 @@ internal static partial class JSHostImplementation
{
internal unsafe delegate void ToManagedCallback(JSMarshalerArgument* arguments_buffer);

public sealed class PromiseHolder
public sealed unsafe class PromiseHolder
{
public bool IsDisposed;
public readonly nint GCHandle; // could be also virtual GCVHandle
public ToManagedCallback? Callback;
public JSProxyContext ProxyContext;
public bool IsDisposed;
public bool IsCanceling;
#if FEATURE_WASM_MANAGED_THREADS
public ManualResetEventSlim? CallbackReady;
public PromiseHolderState* State;
#endif

public PromiseHolder(JSProxyContext targetContext)
{
GCHandle = (IntPtr)InteropServices.GCHandle.Alloc(this, GCHandleType.Normal);
ProxyContext = targetContext;
#if FEATURE_WASM_MANAGED_THREADS
State = (PromiseHolderState*)Marshal.AllocHGlobal(sizeof(PromiseHolderState));
Interlocked.Exchange(ref (*State).IsResolving, 0);
#endif
}

public PromiseHolder(JSProxyContext targetContext, nint gcvHandle)
{
GCHandle = gcvHandle;
ProxyContext = targetContext;
#if FEATURE_WASM_MANAGED_THREADS
State = (PromiseHolderState*)Marshal.AllocHGlobal(sizeof(PromiseHolderState));
Interlocked.Exchange(ref (*State).IsResolving, 0);
#endif
}
}

// NOTE: layout has to match PromiseHolderState in marshal-to-cs.ts
[StructLayout(LayoutKind.Explicit)]
public struct PromiseHolderState
{
[FieldOffset(0)]
public volatile int IsResolving;
}

[StructLayout(LayoutKind.Explicit)]
public struct IntPtrAndHandle
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ public unsafe void ReleasePromiseHolder(nint holderGCHandle)
holder.IsDisposed = true;
handle.Free();
}
#if FEATURE_WASM_MANAGED_THREADS
Marshal.FreeHGlobal((IntPtr)holder.State);
holder.State = null;
#endif
}
}

Expand Down Expand Up @@ -421,6 +425,10 @@ public unsafe void ReleaseJSOwnedObjectByGCHandle(nint gcHandle)
{
holderCallback = holder.Callback;
holder.IsDisposed = true;
#if FEATURE_WASM_MANAGED_THREADS
Marshal.FreeHGlobal((IntPtr)holder.State);
holder.State = null;
#endif
}
}
holderCallback?.Invoke(null);
Expand Down
151 changes: 136 additions & 15 deletions src/mono/browser/runtime/cancelable-promise.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

import { _lookup_js_owned_object } from "./gc-handles";
import WasmEnableThreads from "consts:wasmEnableThreads";

import { _lookup_js_owned_object, teardown_managed_proxy, upgrade_managed_proxy_to_strong_ref } from "./gc-handles";
import { createPromiseController, loaderHelpers, mono_assert } from "./globals";
import { mono_log_warn } from "./logging";
import { PromiseHolder } from "./marshal-to-cs";
import { ControllablePromise, GCHandle } from "./types/internal";
import { ControllablePromise, GCHandle, MarshalerToCs } from "./types/internal";
import { ManagedObject } from "./marshal";
import { compareExchangeI32, forceThreadMemoryViewRefresh } from "./memory";
import { mono_log_debug } from "./logging";
import { settleUnsettledPromise } from "./pthreads";
import { complete_task } from "./managed-exports";
import { marshal_cs_object_to_cs } from "./marshal-to-cs";

export const _are_promises_supported = ((typeof Promise === "object") || (typeof Promise === "function")) && (typeof Promise.resolve === "function");

Expand All @@ -32,17 +38,132 @@ export function wrap_as_cancelable<T>(inner: Promise<T>): ControllablePromise<T>
export function mono_wasm_cancel_promise(task_holder_gc_handle: GCHandle): void {
const holder = _lookup_js_owned_object(task_holder_gc_handle) as PromiseHolder;
mono_assert(!!holder, () => `Expected Promise for GCHandle ${task_holder_gc_handle}`);
holder.cancel();
}

const promise = holder.promise;
loaderHelpers.assertIsControllablePromise(promise);
const promise_control = loaderHelpers.getPromiseController(promise);
if (holder.isResolved) {
// FIXME: something needs to free the GCHandle
mono_log_warn("Canceling a promise that has already resolved.");
return;
}
mono_assert(!holder.isCanceled, "This promise already canceled.");
holder.isCanceled = true;
promise_control.reject(new Error("OperationCanceledException"));
// NOTE: layout has to match PromiseHolderState in JSHostImplementation.Types.cs
const enum PromiseHolderState {
IsResolving = 0,
}

const promise_holder_symbol = Symbol.for("wasm promise_holder");

export class PromiseHolder extends ManagedObject {
public isResolved = false;
public isPosted = false;
public isPostponed = false;
public data: any = null;
public reason: any = null;
public constructor(public promise: Promise<any>,
private gc_handle: GCHandle,
private promiseHolderPtr: number, // could be null for GCV_handle
private res_converter?: MarshalerToCs) {
super();
}

// returns false if the promise is being canceled by another thread in managed code
setIsResolving(): boolean {
if (!WasmEnableThreads || this.promiseHolderPtr === 0) {
return true;
}
forceThreadMemoryViewRefresh();
if (compareExchangeI32(this.promiseHolderPtr + PromiseHolderState.IsResolving, 1, 0) === 0) {
return true;
}
return false;
}

resolve(data: any) {
mono_assert(!this.isResolved, "resolve could be called only once");
if (WasmEnableThreads && !this.setIsResolving()) {
// we know that cancelation is in flight
// because we need to keep the GCHandle alive until until the cancelation arrives
// we skip the this resolve and let the cancelation to reject the Task
// we store the original data and use it later
this.data = data;
this.isPostponed = true;

// but after the promise is resolved, nothing holds the weak reference to the PromiseHolder anymore
// we know that cancelation is in flight, so we upgrade the weak reference to strong for the meantime
upgrade_managed_proxy_to_strong_ref(this, this.gc_handle);
return;
}
this.isResolved = true;
this.complete_task(data, null);
}

reject(reason: any) {
mono_assert(!this.isResolved, "reject could be called only once");
const isCancelation = reason && reason[promise_holder_symbol] === this;
if (WasmEnableThreads && !isCancelation && !this.setIsResolving()) {
// we know that cancelation is in flight
// because we need to keep the GCHandle alive until until the cancelation arrives
// we skip the this reject and let the cancelation to reject the Task
// we store the original reason and use it later
this.reason = reason;
this.isPostponed = true;

// but after the promise is resolved, nothing holds the weak reference to the PromiseHolder anymore
// we know that cancelation is in flight, so we upgrade the weak reference to strong for the meantime
upgrade_managed_proxy_to_strong_ref(this, this.gc_handle);
return;
}
this.isResolved = true;
this.complete_task(null, reason);
}

cancel() {
mono_assert(!this.isResolved, "cancel could be called only once");

if (this.isPostponed) {
// there was racing resolve/reject which was postponed, to retain valid GCHandle
// in this case we just finish the original resolve/reject
// and we need to use the postponed data/reason
this.isResolved = true;
if (this.reason !== undefined) {
this.complete_task(null, this.reason);
} else {
this.complete_task(this.data, null);
}
} else {
// there is no racing resolve/reject, we can reject/cancel the promise
const promise = this.promise;
loaderHelpers.assertIsControllablePromise(promise);
const promise_control = loaderHelpers.getPromiseController(promise);

const reason = new Error("OperationCanceledException") as any;
reason[promise_holder_symbol] = this;
promise_control.reject(reason);
}
}

// we can do this just once, because it will be dispose the GCHandle
complete_task(data: any, reason: any) {
if (!loaderHelpers.is_runtime_running()) {
mono_log_debug("This promise can't be propagated to managed code, mono runtime already exited.");
return;
}
try {
mono_assert(!this.isPosted, "Promise is already posted to managed.");
this.isPosted = true;
if (WasmEnableThreads) {
forceThreadMemoryViewRefresh();
settleUnsettledPromise();
}

// we can unregister the GC handle just on JS side
teardown_managed_proxy(this, this.gc_handle, /*skipManaged: */ true);
// order of operations with teardown_managed_proxy matters
// so that managed user code running in the continuation could allocate the same GCHandle number and the local registry would be already ok with that
complete_task(this.gc_handle, reason, data, this.res_converter || marshal_cs_object_to_cs);
}
catch (ex) {
try {
loaderHelpers.mono_exit(1, ex);
}
catch (ex2) {
// there is no point to propagate the exception into the unhandled promise rejection
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

import { VoidPtr } from "../../types/emscripten";
import * as Memory from "../../memory";
import { getI32, notifyI32, setI32, storeI32 } from "../../memory";


/// One-reader, one-writer, size 1 queue for messages from an EventPipe streaming thread to
Expand Down Expand Up @@ -68,22 +68,22 @@ export class StreamQueue {
}

private onWorkAvailable(this: StreamQueue /*,event: Event */): void {
const buf = Memory.getI32(this.buf_addr) as unknown as VoidPtr;
const buf = getI32(this.buf_addr) as unknown as VoidPtr;
const intptr_buf = buf as unknown as number;
if (intptr_buf === STREAM_CLOSE_SENTINEL) {
// special value signaling that the streaming thread closed the queue.
this.syncSendClose();
} else {
const count = Memory.getI32(this.count_addr);
Memory.setI32(this.buf_addr, 0);
const count = getI32(this.count_addr);
setI32(this.buf_addr, 0);
if (count > 0) {
this.syncSendBuffer(buf, count);
}
}
/* buffer is now not full */
Memory.Atomics.storeI32(this.buf_full_addr, 0);
storeI32(this.buf_full_addr, 0);
/* wake up the writer thread */
Memory.Atomics.notifyI32(this.buf_full_addr, 1);
notifyI32(this.buf_full_addr, 1);
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/mono/browser/runtime/gc-handles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { assert_js_interop, js_import_wrapper_by_fn_handle } from "./invoke-js";
import { mono_log_info, mono_log_warn } from "./logging";
import { bound_cs_function_symbol, imported_js_function_symbol, proxy_debug_symbol } from "./marshal";
import { GCHandle, GCHandleNull, JSHandle, WeakRefInternal } from "./types/internal";
import { _use_weak_ref, create_weak_ref } from "./weak-ref";
import { _use_weak_ref, create_strong_ref, create_weak_ref } from "./weak-ref";
import { exportsByAssembly } from "./invoke-cs";
import { release_js_owned_object_by_gc_handle } from "./managed-exports";

Expand Down Expand Up @@ -137,6 +137,11 @@ export function setup_managed_proxy(owner: any, gc_handle: GCHandle): void {
_js_owned_object_table.set(gc_handle, wr);
}

export function upgrade_managed_proxy_to_strong_ref(owner: any, gc_handle: GCHandle): void {
const sr = create_strong_ref(owner);
_js_owned_object_table.set(gc_handle, sr);
}

export function teardown_managed_proxy(owner: any, gc_handle: GCHandle, skipManaged?: boolean): void {
assert_js_interop();
// The JS object associated with this gc_handle has been collected by the JS GC.
Expand Down
6 changes: 1 addition & 5 deletions src/mono/browser/runtime/managed-exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,22 +134,18 @@ export function release_js_owned_object_by_gc_handle(gc_handle: GCHandle) {
}

// the marshaled signature is: void CompleteTask<T>(GCHandle holder, Exception? exceptionResult, T? result)
export function complete_task(holder_gc_handle: GCHandle, isCanceling: boolean, error?: any, data?: any, res_converter?: MarshalerToCs) {
export function complete_task(holder_gc_handle: GCHandle, error?: any, data?: any, res_converter?: MarshalerToCs) {
loaderHelpers.assert_runtime_running();
const sp = Module.stackSave();
try {
const size = 5;
const args = alloc_stack_frame(size);
const res = get_arg(args, 1);
const arg1 = get_arg(args, 2);
set_arg_type(arg1, MarshalerType.Object);
set_gc_handle(arg1, holder_gc_handle);
const arg2 = get_arg(args, 3);
if (error) {
marshal_exception_to_cs(arg2, error);
if (isCanceling) {
set_arg_type(res, MarshalerType.Discard);
}
} else {
set_arg_type(arg2, MarshalerType.None);
const arg3 = get_arg(args, 4);
Expand Down
Loading

0 comments on commit 7183d70

Please sign in to comment.