From 14c9acc08a1823531f8e6d77e25d00135734b5d5 Mon Sep 17 00:00:00 2001 From: Stojan Dimitrovski Date: Wed, 12 Jul 2023 11:24:51 +0200 Subject: [PATCH] feat: add experimental lock option with no-op default --- src/GoTrueClient.ts | 61 ++++++++- src/lib/helpers.ts | 75 ++++++++++- src/lib/locks.ts | 257 +++++++++++++++++++++++++++++++++++++ src/lib/types.ts | 15 +++ test/broadcastLock.test.ts | 137 ++++++++++++++++++++ 5 files changed, 538 insertions(+), 7 deletions(-) create mode 100644 src/lib/locks.ts create mode 100644 test/broadcastLock.test.ts diff --git a/src/GoTrueClient.ts b/src/GoTrueClient.ts index 1fc0351ab..9ea7f4bdd 100644 --- a/src/GoTrueClient.ts +++ b/src/GoTrueClient.ts @@ -77,11 +77,12 @@ import type { MFAChallengeAndVerifyParams, ResendParams, AuthFlowType, + LockFunc, } from './lib/types' polyfillGlobalThis() // Make "globalThis" available -const DEFAULT_OPTIONS: Omit, 'fetch' | 'storage'> = { +const DEFAULT_OPTIONS: Omit, 'fetch' | 'storage' | 'lock'> = { url: GOTRUE_URL, storageKey: STORAGE_KEY, autoRefreshToken: true, @@ -99,6 +100,10 @@ const AUTO_REFRESH_TICK_DURATION = 30 * 1000 * A token refresh will be attempted this many ticks before the current session expires. */ const AUTO_REFRESH_TICK_THRESHOLD = 3 +async function lockNoOp(name: string, acquireTimeout: number, fn: () => Promise): Promise { + return await fn() +} + export default class GoTrueClient { private static nextInstanceID = 0 @@ -146,6 +151,7 @@ export default class GoTrueClient { [key: string]: string } protected fetch: Fetch + protected lock: LockFunc /** * Used to broadcast state change events to other tabs listening. @@ -174,6 +180,7 @@ export default class GoTrueClient { this.autoRefreshToken = settings.autoRefreshToken this.persistSession = settings.persistSession this.storage = settings.storage || localStorageAdapter + this.lock = settings.lock || lockNoOp this.admin = new GoTrueAdminApi({ url: settings.url, headers: settings.headers, @@ -775,6 +782,28 @@ export default class GoTrueClient { }) } + private async _acquireLock( + name: string, + acquireTimeout: number, + fn: () => Promise + ): Promise { + try { + this._debug('#_acquireLock', name, 'start') + + return this.lock(`lock:${this.storageKey}:${name}`, acquireTimeout, async () => { + try { + this._debug('#_acquireLock', name, 'acquired') + + return await fn() + } finally { + this._debug('#_acquireLock', name, 'released') + } + }) + } finally { + this._debug('#_acquireLock', name, 'end') + } + } + /** * Use instead of {@link #getSession} inside the library. It is * semantically usually what you want, as getting a session involves some @@ -802,14 +831,27 @@ export default class GoTrueClient { } error: null } - ) => Promise + ) => Promise, + acquireTimeout: number = -1 ): Promise { - return await stackGuard('_useSession', async () => { + if (isInStackGuard('_useSession')) { + this._debug('#_useSession', 'recursive use detected') + + // the lock should not be recursively held to avoid dead-locks // the use of __loadSession here is the only correct use of the function! const result = await this.__loadSession() return await fn(result) - }) + } else { + return await this._acquireLock('_useSession', acquireTimeout, async () => { + return await stackGuard('_useSession', async () => { + // the use of __loadSession here is the only correct use of the function! + const result = await this.__loadSession() + + return await fn(result) + }) + }) + } } /** @@ -1720,9 +1762,16 @@ export default class GoTrueClient { if (expiresInTicks <= AUTO_REFRESH_TICK_THRESHOLD) { await this._callRefreshToken(session.refresh_token) } - }) + }, 0 /* try-lock */) } catch (e: any) { - console.error('Auto refresh tick failed with error. This is likely a transient error.', e) + if (e.isAcquireTimeout) { + this._debug( + '#_autoRefreshTokenTick()', + 'lock is already acquired, skipping for next tick' + ) + } else { + console.error('Auto refresh tick failed with error. This is likely a transient error.', e) + } } } finally { this._debug('#_autoRefreshTokenTick()', 'end') diff --git a/src/lib/helpers.ts b/src/lib/helpers.ts index 856afb07b..630ae45a7 100644 --- a/src/lib/helpers.ts +++ b/src/lib/helpers.ts @@ -290,7 +290,7 @@ const STACK_GUARD_SUFFIX = `__` // they all include the function name. So instead of trying to parse the entry, // we're only looking for the special string `__stack_guard__${guardName}__`. // Guard names can only be letters with dashes or underscores. -// +// // Example Firefox stack trace: // ``` // __stack_guard__EXAMPLE__@debugger eval code:1:55 @@ -393,3 +393,76 @@ STACK_GUARD_CHECK_FN = async () => { }) } } + +const LOCAL_CHANNELS: { [name: string]: Set> } = {} + +/** + * Wraps a `BroadcastChannel` for use in environments where it is not + * available, like Node.js. + */ +export class WrappedBroadcastChannel { + private _bc: BroadcastChannel | null = null + + private _onmessage: null | ((event: { data: D }) => any) = null + + set onmessage(cb: null | ((event: { data: D }) => any)) { + this._onmessage = cb + + if (this._bc) { + if (cb) { + this._bc.onmessage = (event) => { + cb(event) + } + } else { + this._bc.onmessage = null + } + } + } + + get onmessage() { + return this._onmessage + } + + constructor(readonly name: string) { + if (globalThis.BroadcastChannel) { + this._bc = new globalThis.BroadcastChannel(name) + } else { + if (!LOCAL_CHANNELS[name]) { + LOCAL_CHANNELS[name] = new Set() + } + + LOCAL_CHANNELS[name].add(this) + } + } + + postMessage(data: D) { + if (this._bc) { + this._bc.postMessage(data) + return + } + + setTimeout(() => { + LOCAL_CHANNELS[this.name].forEach((ch) => { + if (ch === this) { + return + } + + if (ch._onmessage) { + ch._onmessage({ data }) + } + }) + }, 0) + } + + close() { + this.onmessage = null + + if (this._bc) { + this._bc.close() + } else { + LOCAL_CHANNELS[this.name].delete(this) + } + + this._bc = null + } +} diff --git a/src/lib/locks.ts b/src/lib/locks.ts new file mode 100644 index 000000000..2f67d05aa --- /dev/null +++ b/src/lib/locks.ts @@ -0,0 +1,257 @@ +import { uuid, WrappedBroadcastChannel } from './helpers' + +/** + * Exposed to allow for slight tweaks of the locking algorithm for testing. + * + * @experimental + */ +export const INTERNALS = { + /** + * Whether debug messages for each lock are emitted. + * + * @experimental + */ + DEBUG: + globalThis.localStorage && + globalThis.localStorage.getItem && + globalThis.localStorage.getItem('supabase.gotrue-js.broadcastLock.debug') === 'true', + + /** + * Minimum time a lock will wait for messages on the channel before + * considering the lock as acquired. + * + * @experimental + */ + LOCK_WAIT: + globalThis.localStorage && globalThis.localStorage.getItem + ? parseInt( + globalThis.localStorage.getItem('supabase.gotrue-js.broadcastLock.lock-wait') || '25', + 10 + ) + : 25, + + PROCESSED_MESSAGES: 0, +} + +/** + * Implements cancellable sleep for the specified duration in ms. + * + * @returns An object with a `sleeper` promise which you should await. If you + * call the `cancel` function, the sleep is cancelled and the promise resolves + * immediately. + */ +function sleep(duration: number) { + if (globalThis && globalThis.document && globalThis.document.visibilityState !== 'visible') { + console.warn( + 'gotrue-js broadcastLock: sleeping when the document is not visible may use throttled timers: https://developer.chrome.com/blog/timer-throttling-in-chrome-88/' + ) + } + + let timeout: any | null = null + let resolve: (() => void) | null = null + + const promise = new Promise((accept) => { + resolve = accept + timeout = setTimeout(() => { + timeout = null + resolve = null + + accept() + }, duration) + }) + + return { + sleeper: promise, + cancel: () => { + if (timeout) { + clearTimeout(timeout) + } + + if (resolve) { + resolve() + } + + timeout = null + resolve = null + }, + } +} + +/** + * Messages sent on the `BroadcastChannel` for {@link #broadcastLock}. + */ +type LockEvent = { + msg: 'I will acquire the lock!' | 'I have the lock, please wait!' | 'Go' + id: string + go?: string +} + +/** + * Implements a distributed global lock based on `BroadcastChannel` with the + * provided name. + * + * The lock will attempt to be acquired until `acquireTimeout` is up. If + * negative, there will be no timeout. If 0, if the lock can't be acquired + * immediately a timeout will be thrown. + * + * You must not call this recursively, as it will lead to a deadlock. + * + * Internals: The lock has 3 states -- acquiring, backoff and acquired. + * + * When in the Acquiring state, a message is broadcast `I will acquire the + * lock!` and a timeout is started. If any message is received in this state, + * the lock immediately moves to the Backoff state. If no message is received, + * the lock moves to the Acquired state. It is assumed that if two messages are + * posted simultaneously at the channel, that both locks will receive the + * other's message. + * + * When in the Backoff state, the lock sleeps for random amount of time before + * moving back in the Acquiring state. Each time it enters this state, it waits + * exponentially longer than the last time. + * + * When in the Acquired state, the lock broadcasts `I have the lock, please + * wait!`. If any message is received on the channel, `I have the lock!` is + * broadcast immediately. The sender of the first `I will acquire the lock!` + * message received in this state will be sent the `Go` message after the + * operation is done which gives it priority over all other competing locks. + * This helps reduce the time needed for the locks to identify who should go + * next. Once the operation is done, the lock stops replying with `I have the + * lock, please wait!` messages on the channel. + * + * Lock wait times have a default of 25ms but can be configured with the + * `supabase.gotrue-js.broadcastLock.lock-wait` local storage key. + * + * You can check for timeout with the `isAcquireTimeout` property on the error. + * + * @experimental + */ +export async function broadcastLock( + name: string, + acquireTimeout: number, + fn: () => Promise +): Promise { + const bc = new WrappedBroadcastChannel(name) + + try { + if (acquireTimeout >= 0 && acquireTimeout < INTERNALS.LOCK_WAIT) { + acquireTimeout = INTERNALS.LOCK_WAIT + } + + const start = Date.now() + + const id = uuid() + + let state: 'acquiring' | 'backoff' | 'backoff-extend' | 'acquired' = 'acquiring' + let sleepOperation: ReturnType = { + sleeper: Promise.resolve(), + cancel: () => { + /* no-op */ + }, + } + + let nextId: string | null = null + + bc.onmessage = (event: { data: { msg: string; id: string; go?: string } }) => { + if (INTERNALS.DEBUG) { + console.log(`broadcastLock(${name})@${id}: received message`, event.data) + } + + if (state !== 'acquired' && event.data.msg === 'Go' && event.data.go === id) { + // current lock owner says we can acquire the lock right away + state = 'acquired' + sleepOperation.cancel() + } else { + if (state === 'acquiring') { + // any message was received, move to backoff state + state = 'backoff' + sleepOperation.cancel() + } else if (state === 'acquired') { + // any message was received, reply that the lock is taken + bc.postMessage({ msg: 'I have the lock, please wait!', id }) + + if (event.data.msg === 'I have the lock, please wait!') { + console.error( + `broadcastLock(${name})@${id}: multiple tabs have the lock!`, + event.data.id + ) + } else if (event.data.msg !== 'Go') { + if (!nextId) { + // record the first one who wants the lock since we acquired it so we + // give them the chance to go after us + nextId = event.data.id + } + } + } else { + // backoff state + if (event.data.msg === 'I have the lock, please wait!') { + // someone asked about the lock and it's still taken, so immediately pick another longer backoff to minimize the number of messages sent over the channel + state = 'backoff-extend' + sleepOperation.cancel() + } + } + } + } + + let backoffMultiplier = 0 + + while (acquireTimeout < 0 || Date.now() - start < acquireTimeout) { + if (state === 'acquiring') { + bc.postMessage({ msg: 'I will acquire the lock!', id }) + + sleepOperation = sleep(INTERNALS.LOCK_WAIT) + await sleepOperation.sleeper + + if (state === 'acquiring') { + // state did not change while sleeping + state = 'acquired' as typeof state + // ^^^ cast is to force typescript to consider the onmessage handler + // above, otherwise it thinks that state can't ever be backoff + } + } else if (state === 'backoff') { + backoffMultiplier += 1 + // sleep randomly but exponentially longer each time + sleepOperation = sleep( + INTERNALS.LOCK_WAIT + Math.random() * ((INTERNALS.LOCK_WAIT * backoffMultiplier) / 2) + ) + await sleepOperation.sleeper + + if (state === 'backoff') { + // state did not change while in backoff + state = 'acquiring' as typeof state + } else if (state === 'backoff-extend') { + // re-enter the backoff state to extend it + backoffMultiplier -= 1 + state = 'backoff' as typeof state + } + } else { + bc.postMessage({ msg: 'I have the lock, please wait!', id }) + // ^^^^ essentially moves any other locks from the acquiring state into + // the backoff state + + try { + // lock is acquired, do the operation + return await fn() + } finally { + if (nextId) { + // someone wanted the lock while we had it, so let's give them a + // prompt chance to go right after us + bc.postMessage({ msg: 'Go', id, go: nextId }) + // this message also moves all locks that are not mentioned in `go` + // into the backoff state immediately, meaning that the `nextId` + // lock has `LOCK_WAIT` time to notice it's now its turn + } + + nextId = null + } + } + } + + const timeout: any = new Error(`Acquiring the lock "${name}" timed out!`) + timeout.isAcquireTimeout = true + + throw timeout + } finally { + bc.onmessage = null + bc.close() + } +} diff --git a/src/lib/types.ts b/src/lib/types.ts index 5a392e682..d72e8ea5c 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -33,6 +33,8 @@ export type AuthChangeEvent = | 'USER_UPDATED' | AuthChangeEventMFA +export type LockFunc = (name: string, acquireTimeout: number, fn: () => Promise) => Promise + export type GoTrueClientOptions = { /* The URL of the GoTrue server. */ url?: string @@ -50,6 +52,19 @@ export type GoTrueClientOptions = { storage?: SupportedStorage /* A custom fetch implementation. */ fetch?: Fetch + /** + * Provide your own global lock implementation instead of the default + * implementation. The function should acquire a lock for the duration of the + * `fn` async function, such that no other client instances will be able to + * hold it at the same time. + * + * @experimental + * + * @param name Name of the lock to be acquired. + * @param acquireTimeout If negative, no timeout should occur. If positive it should throw an Error with an `isAcquireTimeout` property set to true if the operation fails to be acquired after this much time (ms). + * @param fn The operation to execute when the lock is acquired. + */ + lock?: LockFunc /* If set to 'pkce' PKCE flow. Defaults to the 'implicit' flow otherwise */ flowType?: AuthFlowType /* If debug messages are emitted. Can be used to inspect the behavior of the library. */ diff --git a/test/broadcastLock.test.ts b/test/broadcastLock.test.ts new file mode 100644 index 000000000..ff7f74d37 --- /dev/null +++ b/test/broadcastLock.test.ts @@ -0,0 +1,137 @@ +import { broadcastLock, INTERNALS } from '../src/lib/locks' +import { sleep } from '../src/lib/helpers' + +jest.useRealTimers() + +function now() { + const hrTime = process.hrtime() + + return hrTime[0] * 1000000 + hrTime[1] / 1000 +} + +describe('broadcastLock', () => { + let MAX_CONCURRENT_LOCKS_UNDER_TEST = 25 + + if (process.env.CI === 'true') { + // Github Actions is very slow + MAX_CONCURRENT_LOCKS_UNDER_TEST = 10 + } + + it(`should run ${MAX_CONCURRENT_LOCKS_UNDER_TEST} concurrent broadcastLocks in order`, async () => { + const durations = Array.from({ length: MAX_CONCURRENT_LOCKS_UNDER_TEST }, () => [0, 0]) + + const start = now() + + const locks = await Promise.all( + durations.map(async (duration, i) => { + await broadcastLock('test', -1, async () => { + duration[0] = now() + + await sleep(50 + 25 * Math.random()) + + duration[1] = now() - duration[0] + }) + }) + ) + + // sort by start times + durations.sort((a, b) => a[0] - b[0]) + + let maxSyncTime = 0 + let minSyncTime = Number.MAX_SAFE_INTEGER + + for (let i = 1; i < durations.length; i += 1) { + const previous = durations[i - 1] + const current = durations[i] + + if (previous[0] > current[0]) { + throw new Error('Not sorted based on start times') + } + + // current start time - previous end time + const syncTime = current[0] - (previous[0] + previous[1]) + + minSyncTime = Math.min(minSyncTime, syncTime) + maxSyncTime = Math.max(maxSyncTime, syncTime) + } + + const firstSyncTime = durations[0][0] - start + + minSyncTime = Math.min(minSyncTime, firstSyncTime) + maxSyncTime = Math.max(maxSyncTime, firstSyncTime) + + const usefulTime = durations.reduce((a, i) => a + i[1], 0) + const totalTime = + durations[durations.length - 1][0] + durations[durations.length - 1][1] - durations[0][0] + + console.log( + 'Concurrency =', + durations.length, + 'Useful time =', + (usefulTime / 1000).toFixed(4), + 'Total time =', + (totalTime / 1000).toFixed(4), + 'Ratio =', + (totalTime / usefulTime).toFixed(4), + 'Max sync time =', + (maxSyncTime / 1000).toFixed(4), + 'Min sync time =', + (minSyncTime / 1000).toFixed(4), + 'First sync time =', + (firstSyncTime / 1000).toFixed(4) + ) + + if (totalTime - usefulTime <= 0) { + throw new Error( + 'Assumptions seem broken (useful time always must be < total time). Test is broken?' + ) + } + + if (firstSyncTime < INTERNALS.LOCK_WAIT * 1000) { + throw new Error( + `First sync time must not be quicker than LOCK_WAIT (${INTERNALS.LOCK_WAIT}). Check algorithm!` + ) + } + + if (minSyncTime <= 10 /* microseconds */) { + throw new Error( + `Interleaved ordering, locks were serialized very close one after another ${minSyncTime}! Check algorithm!` + ) + } + + if (totalTime / usefulTime >= 2) { + throw new Error( + `Algorithm is inefficient at ordering a high concurrency of locks, overhead = ${( + (totalTime / usefulTime - 1) * + 100 + ).toFixed(4)}%` + ) + } + }) + + it('should fail with a isAcquireTimeout error with acquireTimeout of 0', async () => { + let acquired = false + let error: any | null = null + + // first acquire the lock without any acquireTimeout and run it in the background for 100ms + broadcastLock('test', -1, async () => { + acquired = true + await sleep(100) + }) + + await sleep(50) // to make sure the lock got fully acquired + + expect(acquired).toBe(true) + + try { + await broadcastLock('test', 0, async () => { + await sleep(50) + }) + } catch (e: any) { + error = e + } + + expect(error).not.toBeNull() + expect(error.isAcquireTimeout).toBe(true) + }) +})