From df81f02953b1062aeeacbc7e0fb949ec6458c4d4 Mon Sep 17 00:00:00 2001 From: Kyle Painter Date: Thu, 7 Mar 2024 09:48:08 +1100 Subject: [PATCH 1/2] Use task scheduler to process optimistic responses and execute error rollback. --- .../relay-runtime/store/OperationExecutor.js | 46 ++-- ...yModernEnvironment-ExecuteMutation-test.js | 203 ++++++++++++++++++ 2 files changed, 229 insertions(+), 20 deletions(-) diff --git a/packages/relay-runtime/store/OperationExecutor.js b/packages/relay-runtime/store/OperationExecutor.js index 88424d3993130..b415520ea78bf 100644 --- a/packages/relay-runtime/store/OperationExecutor.js +++ b/packages/relay-runtime/store/OperationExecutor.js @@ -216,13 +216,15 @@ class Executor { RelayFeatureFlags.PROCESS_OPTIMISTIC_UPDATE_BEFORE_SUBSCRIPTION && optimisticConfig != null ) { - this._processOptimisticResponse( - optimisticConfig.response != null - ? {data: optimisticConfig.response} - : null, - optimisticConfig.updater, - false, - ); + this._schedule(() => { + this._processOptimisticResponse( + optimisticConfig.response != null + ? {data: optimisticConfig.response} + : null, + optimisticConfig.updater, + false, + ); + }); } source.subscribe({ complete: () => this._complete(id), @@ -250,13 +252,15 @@ class Executor { !RelayFeatureFlags.PROCESS_OPTIMISTIC_UPDATE_BEFORE_SUBSCRIPTION && optimisticConfig != null ) { - this._processOptimisticResponse( - optimisticConfig.response != null - ? {data: optimisticConfig.response} - : null, - optimisticConfig.updater, - false, - ); + this._schedule(() => { + this._processOptimisticResponse( + optimisticConfig.response != null + ? {data: optimisticConfig.response} + : null, + optimisticConfig.updater, + false, + ); + }); } } @@ -358,12 +362,14 @@ class Executor { } _error(error: Error): void { - this.cancel(); - this._sink.error(error); - this._log({ - name: 'execute.error', - executeId: this._executeId, - error, + this._schedule(() => { + this.cancel(); + this._sink.error(error); + this._log({ + name: 'execute.error', + executeId: this._executeId, + error, + }); }); } diff --git a/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteMutation-test.js b/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteMutation-test.js index ea8e3be40d71c..74fc27b67d3f3 100644 --- a/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteMutation-test.js +++ b/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteMutation-test.js @@ -611,6 +611,209 @@ describe.each(['RelayModernEnvironment', 'MultiActorEnvironment'])( }, }); }); + + describe('when using a scheduler', () => { + let taskID; + let tasks; + let scheduler; + let runTask; + + beforeEach(() => { + taskID = 0; + tasks = new Map void>(); + scheduler = { + cancel: (id: string) => { + tasks.delete(id); + }, + schedule: (task: () => void) => { + const id = String(taskID++); + tasks.set(id, task); + return id; + }, + }; + runTask = () => { + for (const [id, task] of tasks) { + tasks.delete(id); + task(); + break; + } + }; + + const multiActorEnvironment = new MultiActorEnvironment({ + // $FlowFixMe[invalid-tuple-arity] Error found while enabling LTI on this file + createNetworkForActor: _actorID => RelayNetwork.create(fetch), + createStoreForActor: _actorID => store, + scheduler, + }); + environment = + environmentType === 'MultiActorEnvironment' + ? multiActorEnvironment.forActor(getActorIdentifier('actor:1234')) + : new RelayModernEnvironment({ + // $FlowFixMe[invalid-tuple-arity] Error found while enabling LTI on this file + network: RelayNetwork.create(fetch), + store, + scheduler, + }); + }); + + it('applies the optimistic update', () => { + const selector = createReaderSelector( + CommentFragment, + commentID, + {}, + queryOperation.request, + ); + const snapshot = environment.lookup(selector); + const callback = jest.fn<[Snapshot], void>(); + environment.subscribe(snapshot, callback); + + environment + .executeMutation({ + operation, + optimisticUpdater: _store => { + const comment = _store.create(commentID, 'Comment'); + comment.setValue(commentID, 'id'); + const body = _store.create(commentID + '.text', 'Text'); + comment.setLinkedRecord(body, 'body'); + body.setValue('Give Relay', 'text'); + }, + }) + .subscribe(callbacks); + + // Verify task was scheduled and run it + expect(tasks.size).toBe(1); + runTask(); + + // Update is applied after scheduler runs scheduled task + expect(complete).not.toBeCalled(); + expect(error).not.toBeCalled(); + expect(callback.mock.calls.length).toBe(1); + expect(callback.mock.calls[0][0].data).toEqual({ + id: commentID, + body: { + text: 'Give Relay', + }, + }); + }); + + it('reverts the optimistic update and commits the server payload', () => { + const selector = createReaderSelector( + CommentFragment, + commentID, + {}, + queryOperation.request, + ); + const snapshot = environment.lookup(selector); + const callback = jest.fn<[Snapshot], void>(); + environment.subscribe(snapshot, callback); + + environment + .executeMutation({ + operation, + optimisticUpdater: _store => { + const comment = _store.create(commentID, 'Comment'); + comment.setValue(commentID, 'id'); + const body = _store.create(commentID + '.text', 'Text'); + comment.setLinkedRecord(body, 'body'); + body.setValue('Give Relay', 'text'); + }, + }) + .subscribe(callbacks); + + // Verify optimistic update task was scheduled and run it + expect(tasks.size).toBe(1); + runTask(); + + // Update is applied after scheduler runs scheduled task + expect(callback.mock.calls.length).toBe(1); + expect(callback.mock.calls[0][0].data).toEqual({ + id: commentID, + body: { + text: 'Give Relay', + }, + }); + + callback.mockClear(); + subject.next({ + data: { + commentCreate: { + comment: { + id: commentID, + body: { + text: 'Gave Relay', + }, + }, + }, + }, + }); + subject.complete(); + + // Verify update task was scheduled and run it + expect(tasks.size).toBe(1); + runTask(); + + // Update is applied after scheduler runs scheduled task + expect(complete).toBeCalled(); + expect(error).not.toBeCalled(); + expect(callback.mock.calls.length).toBe(1); + expect(callback.mock.calls[0][0].data).toEqual({ + id: commentID, + body: { + text: 'Gave Relay', + }, + }); + }); + + it('reverts the optimistic update if the fetch is rejected', () => { + const selector = createReaderSelector( + CommentFragment, + commentID, + {}, + queryOperation.request, + ); + const snapshot = environment.lookup(selector); + const callback = jest.fn<[Snapshot], void>(); + environment.subscribe(snapshot, callback); + + environment + .executeMutation({ + operation, + optimisticUpdater: _store => { + const comment = _store.create(commentID, 'Comment'); + comment.setValue(commentID, 'id'); + const body = _store.create(commentID + '.text', 'Text'); + comment.setLinkedRecord(body, 'body'); + body.setValue('Give Relay', 'text'); + }, + }) + .subscribe(callbacks); + + // Verify optimistic update task was scheduled and run it + expect(tasks.size).toBe(1); + runTask(); + + // Update is applied after scheduler runs scheduled task + expect(callback.mock.calls.length).toBe(1); + expect(callback.mock.calls[0][0].data).toEqual({ + id: commentID, + body: { + text: 'Give Relay', + }, + }); + + callback.mockClear(); + subject.error(new Error('wtf')); + + // Verify rollback task was scheduled and run it + expect(tasks.size).toBe(1); + runTask(); + + expect(complete).not.toBeCalled(); + expect(error).toBeCalled(); + expect(callback.mock.calls.length).toBe(1); + expect(callback.mock.calls[0][0].data).toEqual(undefined); + }); + }); }); }, ); From 74de55f86740042c0d0c9bde923801de9d21a4e6 Mon Sep 17 00:00:00 2001 From: Kyle Painter Date: Tue, 9 Apr 2024 16:17:55 +1000 Subject: [PATCH 2/2] Move optimistic response and rollback scheduling behind a feature flag. --- .../relay-runtime/store/OperationExecutor.js | 21 +- ...yModernEnvironment-ExecuteMutation-test.js | 400 ++++++++++-------- .../relay-runtime/util/RelayFeatureFlags.js | 2 + 3 files changed, 229 insertions(+), 194 deletions(-) diff --git a/packages/relay-runtime/store/OperationExecutor.js b/packages/relay-runtime/store/OperationExecutor.js index b415520ea78bf..d5e50c11bdac7 100644 --- a/packages/relay-runtime/store/OperationExecutor.js +++ b/packages/relay-runtime/store/OperationExecutor.js @@ -216,7 +216,7 @@ class Executor { RelayFeatureFlags.PROCESS_OPTIMISTIC_UPDATE_BEFORE_SUBSCRIPTION && optimisticConfig != null ) { - this._schedule(() => { + const task = () => { this._processOptimisticResponse( optimisticConfig.response != null ? {data: optimisticConfig.response} @@ -224,7 +224,10 @@ class Executor { optimisticConfig.updater, false, ); - }); + }; + RelayFeatureFlags.SCHEDULE_OPTIMISTIC_RESPONSE_AND_ROLLBACK + ? this._schedule(task) + : task(); } source.subscribe({ complete: () => this._complete(id), @@ -252,7 +255,7 @@ class Executor { !RelayFeatureFlags.PROCESS_OPTIMISTIC_UPDATE_BEFORE_SUBSCRIPTION && optimisticConfig != null ) { - this._schedule(() => { + const task = () => { this._processOptimisticResponse( optimisticConfig.response != null ? {data: optimisticConfig.response} @@ -260,7 +263,10 @@ class Executor { optimisticConfig.updater, false, ); - }); + }; + RelayFeatureFlags.SCHEDULE_OPTIMISTIC_RESPONSE_AND_ROLLBACK + ? this._schedule(task) + : task(); } } @@ -362,7 +368,7 @@ class Executor { } _error(error: Error): void { - this._schedule(() => { + const task = () => { this.cancel(); this._sink.error(error); this._log({ @@ -370,7 +376,10 @@ class Executor { executeId: this._executeId, error, }); - }); + }; + RelayFeatureFlags.SCHEDULE_OPTIMISTIC_RESPONSE_AND_ROLLBACK + ? this._schedule(task) + : task(); } _start(id: number, subscription: Subscription): void { diff --git a/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteMutation-test.js b/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteMutation-test.js index 74fc27b67d3f3..2d7c8d16a9ce1 100644 --- a/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteMutation-test.js +++ b/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteMutation-test.js @@ -612,208 +612,232 @@ describe.each(['RelayModernEnvironment', 'MultiActorEnvironment'])( }); }); - describe('when using a scheduler', () => { - let taskID; - let tasks; - let scheduler; - let runTask; - - beforeEach(() => { - taskID = 0; - tasks = new Map void>(); - scheduler = { - cancel: (id: string) => { - tasks.delete(id); - }, - schedule: (task: () => void) => { - const id = String(taskID++); - tasks.set(id, task); - return id; - }, - }; - runTask = () => { - for (const [id, task] of tasks) { - tasks.delete(id); - task(); - break; - } - }; + describe.each([false, true])( + 'with PROCESS_OPTIMISTIC_UPDATE_BEFORE_SUBSCRIPTION=%s', + featureFlagValue => { + beforeEach(() => { + RelayFeatureFlags.PROCESS_OPTIMISTIC_UPDATE_BEFORE_SUBSCRIPTION = + featureFlagValue; + }); - const multiActorEnvironment = new MultiActorEnvironment({ - // $FlowFixMe[invalid-tuple-arity] Error found while enabling LTI on this file - createNetworkForActor: _actorID => RelayNetwork.create(fetch), - createStoreForActor: _actorID => store, - scheduler, + afterEach(() => { + RelayFeatureFlags.PROCESS_OPTIMISTIC_UPDATE_BEFORE_SUBSCRIPTION = + false; }); - environment = - environmentType === 'MultiActorEnvironment' - ? multiActorEnvironment.forActor(getActorIdentifier('actor:1234')) - : new RelayModernEnvironment({ - // $FlowFixMe[invalid-tuple-arity] Error found while enabling LTI on this file - network: RelayNetwork.create(fetch), - store, - scheduler, - }); - }); - it('applies the optimistic update', () => { - const selector = createReaderSelector( - CommentFragment, - commentID, - {}, - queryOperation.request, - ); - const snapshot = environment.lookup(selector); - const callback = jest.fn<[Snapshot], void>(); - environment.subscribe(snapshot, callback); + describe('when using a scheduler', () => { + let taskID; + let tasks; + let scheduler; + let runTask; + + beforeEach(() => { + RelayFeatureFlags.SCHEDULE_OPTIMISTIC_RESPONSE_AND_ROLLBACK = + true; + taskID = 0; + tasks = new Map void>(); + scheduler = { + cancel: (id: string) => { + tasks.delete(id); + }, + schedule: (task: () => void) => { + const id = String(taskID++); + tasks.set(id, task); + return id; + }, + }; + runTask = () => { + for (const [id, task] of tasks) { + tasks.delete(id); + task(); + break; + } + }; - environment - .executeMutation({ - operation, - optimisticUpdater: _store => { - const comment = _store.create(commentID, 'Comment'); - comment.setValue(commentID, 'id'); - const body = _store.create(commentID + '.text', 'Text'); - comment.setLinkedRecord(body, 'body'); - body.setValue('Give Relay', 'text'); - }, - }) - .subscribe(callbacks); - - // Verify task was scheduled and run it - expect(tasks.size).toBe(1); - runTask(); - - // Update is applied after scheduler runs scheduled task - expect(complete).not.toBeCalled(); - expect(error).not.toBeCalled(); - expect(callback.mock.calls.length).toBe(1); - expect(callback.mock.calls[0][0].data).toEqual({ - id: commentID, - body: { - text: 'Give Relay', - }, - }); - }); + const multiActorEnvironment = new MultiActorEnvironment({ + // $FlowFixMe[invalid-tuple-arity] Error found while enabling LTI on this file + createNetworkForActor: _actorID => RelayNetwork.create(fetch), + createStoreForActor: _actorID => store, + scheduler, + }); + environment = + environmentType === 'MultiActorEnvironment' + ? multiActorEnvironment.forActor( + getActorIdentifier('actor:1234'), + ) + : new RelayModernEnvironment({ + // $FlowFixMe[invalid-tuple-arity] Error found while enabling LTI on this file + network: RelayNetwork.create(fetch), + store, + scheduler, + }); + }); + + afterEach(() => { + RelayFeatureFlags.SCHEDULE_OPTIMISTIC_RESPONSE_AND_ROLLBACK = + false; + }); + + it('applies the optimistic update', () => { + const selector = createReaderSelector( + CommentFragment, + commentID, + {}, + queryOperation.request, + ); + const snapshot = environment.lookup(selector); + const callback = jest.fn<[Snapshot], void>(); + environment.subscribe(snapshot, callback); + + environment + .executeMutation({ + operation, + optimisticUpdater: _store => { + const comment = _store.create(commentID, 'Comment'); + comment.setValue(commentID, 'id'); + const body = _store.create(commentID + '.text', 'Text'); + comment.setLinkedRecord(body, 'body'); + body.setValue('Give Relay', 'text'); + }, + }) + .subscribe(callbacks); + + // Verify task was scheduled and run it + expect(tasks.size).toBe(1); + runTask(); + + // Update is applied after scheduler runs scheduled task + expect(complete).not.toBeCalled(); + expect(error).not.toBeCalled(); + expect(callback.mock.calls.length).toBe(1); + expect(callback.mock.calls[0][0].data).toEqual({ + id: commentID, + body: { + text: 'Give Relay', + }, + }); + }); + + it('reverts the optimistic update and commits the server payload', () => { + const selector = createReaderSelector( + CommentFragment, + commentID, + {}, + queryOperation.request, + ); + const snapshot = environment.lookup(selector); + const callback = jest.fn<[Snapshot], void>(); + environment.subscribe(snapshot, callback); + + environment + .executeMutation({ + operation, + optimisticUpdater: _store => { + const comment = _store.create(commentID, 'Comment'); + comment.setValue(commentID, 'id'); + const body = _store.create(commentID + '.text', 'Text'); + comment.setLinkedRecord(body, 'body'); + body.setValue('Give Relay', 'text'); + }, + }) + .subscribe(callbacks); - it('reverts the optimistic update and commits the server payload', () => { - const selector = createReaderSelector( - CommentFragment, - commentID, - {}, - queryOperation.request, - ); - const snapshot = environment.lookup(selector); - const callback = jest.fn<[Snapshot], void>(); - environment.subscribe(snapshot, callback); + // Verify optimistic update task was scheduled and run it + expect(tasks.size).toBe(1); + runTask(); - environment - .executeMutation({ - operation, - optimisticUpdater: _store => { - const comment = _store.create(commentID, 'Comment'); - comment.setValue(commentID, 'id'); - const body = _store.create(commentID + '.text', 'Text'); - comment.setLinkedRecord(body, 'body'); - body.setValue('Give Relay', 'text'); - }, - }) - .subscribe(callbacks); - - // Verify optimistic update task was scheduled and run it - expect(tasks.size).toBe(1); - runTask(); - - // Update is applied after scheduler runs scheduled task - expect(callback.mock.calls.length).toBe(1); - expect(callback.mock.calls[0][0].data).toEqual({ - id: commentID, - body: { - text: 'Give Relay', - }, - }); + // Update is applied after scheduler runs scheduled task + expect(callback.mock.calls.length).toBe(1); + expect(callback.mock.calls[0][0].data).toEqual({ + id: commentID, + body: { + text: 'Give Relay', + }, + }); - callback.mockClear(); - subject.next({ - data: { - commentCreate: { - comment: { - id: commentID, - body: { - text: 'Gave Relay', + callback.mockClear(); + subject.next({ + data: { + commentCreate: { + comment: { + id: commentID, + body: { + text: 'Gave Relay', + }, + }, }, }, - }, - }, - }); - subject.complete(); - - // Verify update task was scheduled and run it - expect(tasks.size).toBe(1); - runTask(); - - // Update is applied after scheduler runs scheduled task - expect(complete).toBeCalled(); - expect(error).not.toBeCalled(); - expect(callback.mock.calls.length).toBe(1); - expect(callback.mock.calls[0][0].data).toEqual({ - id: commentID, - body: { - text: 'Gave Relay', - }, - }); - }); + }); + subject.complete(); - it('reverts the optimistic update if the fetch is rejected', () => { - const selector = createReaderSelector( - CommentFragment, - commentID, - {}, - queryOperation.request, - ); - const snapshot = environment.lookup(selector); - const callback = jest.fn<[Snapshot], void>(); - environment.subscribe(snapshot, callback); + // Verify update task was scheduled and run it + expect(tasks.size).toBe(1); + runTask(); - environment - .executeMutation({ - operation, - optimisticUpdater: _store => { - const comment = _store.create(commentID, 'Comment'); - comment.setValue(commentID, 'id'); - const body = _store.create(commentID + '.text', 'Text'); - comment.setLinkedRecord(body, 'body'); - body.setValue('Give Relay', 'text'); - }, - }) - .subscribe(callbacks); - - // Verify optimistic update task was scheduled and run it - expect(tasks.size).toBe(1); - runTask(); - - // Update is applied after scheduler runs scheduled task - expect(callback.mock.calls.length).toBe(1); - expect(callback.mock.calls[0][0].data).toEqual({ - id: commentID, - body: { - text: 'Give Relay', - }, - }); + // Update is applied after scheduler runs scheduled task + expect(complete).toBeCalled(); + expect(error).not.toBeCalled(); + expect(callback.mock.calls.length).toBe(1); + expect(callback.mock.calls[0][0].data).toEqual({ + id: commentID, + body: { + text: 'Gave Relay', + }, + }); + }); + + it('reverts the optimistic update if the fetch is rejected', () => { + const selector = createReaderSelector( + CommentFragment, + commentID, + {}, + queryOperation.request, + ); + const snapshot = environment.lookup(selector); + const callback = jest.fn<[Snapshot], void>(); + environment.subscribe(snapshot, callback); + + environment + .executeMutation({ + operation, + optimisticUpdater: _store => { + const comment = _store.create(commentID, 'Comment'); + comment.setValue(commentID, 'id'); + const body = _store.create(commentID + '.text', 'Text'); + comment.setLinkedRecord(body, 'body'); + body.setValue('Give Relay', 'text'); + }, + }) + .subscribe(callbacks); - callback.mockClear(); - subject.error(new Error('wtf')); + // Verify optimistic update task was scheduled and run it + expect(tasks.size).toBe(1); + runTask(); - // Verify rollback task was scheduled and run it - expect(tasks.size).toBe(1); - runTask(); + // Update is applied after scheduler runs scheduled task + expect(callback.mock.calls.length).toBe(1); + expect(callback.mock.calls[0][0].data).toEqual({ + id: commentID, + body: { + text: 'Give Relay', + }, + }); - expect(complete).not.toBeCalled(); - expect(error).toBeCalled(); - expect(callback.mock.calls.length).toBe(1); - expect(callback.mock.calls[0][0].data).toEqual(undefined); - }); - }); + callback.mockClear(); + subject.error(new Error('wtf')); + + // Verify rollback task was scheduled and run it + expect(tasks.size).toBe(1); + runTask(); + + expect(complete).not.toBeCalled(); + expect(error).toBeCalled(); + expect(callback.mock.calls.length).toBe(1); + expect(callback.mock.calls[0][0].data).toEqual(undefined); + }); + }); + }, + ); }); }, ); diff --git a/packages/relay-runtime/util/RelayFeatureFlags.js b/packages/relay-runtime/util/RelayFeatureFlags.js index f73ec0a441db9..1471cae42e4fb 100644 --- a/packages/relay-runtime/util/RelayFeatureFlags.js +++ b/packages/relay-runtime/util/RelayFeatureFlags.js @@ -48,6 +48,7 @@ export type FeatureFlags = { ENABLE_FIELD_ERROR_HANDLING_CATCH_DIRECTIVE: boolean, PROCESS_OPTIMISTIC_UPDATE_BEFORE_SUBSCRIPTION: boolean, + SCHEDULE_OPTIMISTIC_RESPONSE_AND_ROLLBACK: boolean, }; const RelayFeatureFlags: FeatureFlags = { @@ -71,6 +72,7 @@ const RelayFeatureFlags: FeatureFlags = { ENABLE_FIELD_ERROR_HANDLING_THROW_BY_DEFAULT: false, ENABLE_FIELD_ERROR_HANDLING_CATCH_DIRECTIVE: false, PROCESS_OPTIMISTIC_UPDATE_BEFORE_SUBSCRIPTION: false, + SCHEDULE_OPTIMISTIC_RESPONSE_AND_ROLLBACK: false, }; module.exports = RelayFeatureFlags;