Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(concurrency): ensure responses resolve in order #753

Merged
merged 12 commits into from
Oct 8, 2021
2 changes: 1 addition & 1 deletion bundlesize.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
},
{
"path": "packages/autocomplete-js/dist/umd/index.production.js",
"maxSize": "15.75 kB"
"maxSize": "16 kB"
},
{
"path": "packages/autocomplete-preset-algolia/dist/umd/index.production.js",
Expand Down
59 changes: 49 additions & 10 deletions packages/autocomplete-core/src/__tests__/concurrency.test.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,38 @@
import userEvent from '@testing-library/user-event';

import { AutocompleteState } from '..';
import { createSource, defer } from '../../../../test/utils';
import { createAutocomplete } from '../createAutocomplete';

describe.skip('concurrency', () => {
type Item = {
label: string;
};

describe('concurrency', () => {
test('resolves the responses in order from getSources', async () => {
// These delays make the second query come back after the third one.
const delays = [100, 300, 200];
let deferCount = -1;
const sourcesDelays = [100, 150, 200];
const itemsDelays = [0, 150, 0];
let deferSourcesCount = -1;
let deferItemsCount = -1;

const getSources = ({ query }) => {
deferCount++;
deferSourcesCount++;

return defer(() => {
return [
createSource({
getItems() {
return [{ label: query }];
deferItemsCount++;

return defer(
() => [{ label: query }],
itemsDelays[deferItemsCount]
);
},
}),
];
}, delays[deferCount]);
}, sourcesDelays[deferSourcesCount]);
};
const onStateChange = jest.fn();
const autocomplete = createAutocomplete({ getSources, onStateChange });
Expand All @@ -33,11 +45,18 @@ describe.skip('concurrency', () => {
userEvent.type(input, 'b');
userEvent.type(input, 'c');

await defer(() => {}, Math.max(...delays));
const timeout = Math.max(
...sourcesDelays.map((delay, index) => delay + itemsDelays[index])
);

await defer(() => {}, timeout);

const itemsHistory: Array<{ label: string }> = (onStateChange.mock
.calls as any).flatMap((x) =>
x[0].state.collections.flatMap((x) => x.items)
let stateHistory: Array<
AutocompleteState<Item>
> = onStateChange.mock.calls.flatMap((x) => x[0].state);

const itemsHistory: Item[] = stateHistory.flatMap(({ collections }) =>
collections.flatMap((x) => x.items)
);

// The first query should have brought results.
Expand All @@ -50,6 +69,26 @@ describe.skip('concurrency', () => {
expect.objectContaining({ label: 'abc' })
);

expect(stateHistory[stateHistory.length - 1]).toEqual(
expect.objectContaining({ isOpen: true })
);

userEvent.type(input, '{backspace}'.repeat(3));

await defer(() => {}, timeout);

stateHistory = onStateChange.mock.calls.flatMap((x) => x[0].state);

// The collections are empty despite late resolving promises.
expect(stateHistory[stateHistory.length - 1].collections).toEqual([
expect.objectContaining({ items: [] }),
]);

// The panel closes despite late resolving promises.
expect(stateHistory[stateHistory.length - 1]).toEqual(
expect.objectContaining({ isOpen: false })
);

document.body.removeChild(input);
});
});
140 changes: 76 additions & 64 deletions packages/autocomplete-core/src/onInput.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
BaseItem,
InternalAutocompleteOptions,
} from './types';
import { getActiveItem } from './utils';
import { createConcurrentSafePromise, getActiveItem } from './utils';

let lastStalledId: number | null = null;

Expand All @@ -27,6 +27,8 @@ interface OnInputParams<TItem extends BaseItem>
store: AutocompleteStore<TItem>;
}

const runConcurrentSafePromise = createConcurrentSafePromise();

export function onInput<TItem extends BaseItem>({
event,
nextState = {},
Expand All @@ -52,18 +54,22 @@ export function onInput<TItem extends BaseItem>({
setActiveItemId(props.defaultActiveItemId);

if (!query && props.openOnFocus === false) {
const collections = store.getState().collections.map((collection) => ({
...collection,
items: [],
}));

setStatus('idle');
setCollections(
store.getState().collections.map((collection) => ({
...collection,
items: [],
}))
);
setCollections(collections);
setIsOpen(
nextState.isOpen ?? props.shouldPanelOpen({ state: store.getState() })
);

return Promise.resolve();
// We make sure to update the latest resolved value of the tracked
// promises to keep late resolving promises from "cancelling" the state
// updates performed in this code path.
// We chain with a void promise to respect `onInput`'s expected return type.
return runConcurrentSafePromise(collections).then(() => Promise.resolve());
}

setStatus('loading');
Expand All @@ -72,67 +78,73 @@ export function onInput<TItem extends BaseItem>({
setStatus('stalled');
}, props.stallThreshold);

return props
.getSources({
query,
refresh,
state: store.getState(),
...setters,
})
.then((sources) => {
setStatus('loading');

return Promise.all(
sources.map((source) => {
return Promise.resolve(
source.getItems({
query,
refresh,
state: store.getState(),
...setters,
})
).then((itemsOrDescription) =>
preResolve<TItem>(itemsOrDescription, source.sourceId)
);
})
)
.then(resolve)
.then((responses) => postResolve(responses, sources))
.then((collections) =>
reshape({ collections, props, state: store.getState() })
// We track the entire promise chain triggered by `onInput` before mutating
// the Autocomplete state to make sure that any state manipulation is based on
// fresh data regardless of when promises individually resolve.
// We don't track nested promises and only rely on the full chain resolution,
// meaning we should only ever manipulate the state once this concurrent-safe
// promise is resolved.
return runConcurrentSafePromise(
props
.getSources({
query,
refresh,
state: store.getState(),
...setters,
})
.then((sources) => {
return Promise.all(
sources.map((source) => {
return Promise.resolve(
source.getItems({
query,
refresh,
state: store.getState(),
...setters,
})
).then((itemsOrDescription) =>
preResolve<TItem>(itemsOrDescription, source.sourceId)
);
})
)
.then((collections) => {
setStatus('idle');
setCollections(collections as any);
const isPanelOpen = props.shouldPanelOpen({
state: store.getState(),
});
setIsOpen(
nextState.isOpen ??
((props.openOnFocus && !query && isPanelOpen) || isPanelOpen)
.then(resolve)
.then((responses) => postResolve(responses, sources))
.then((collections) =>
reshape({ collections, props, state: store.getState() })
);
})
)
.then((collections) => {
setStatus('idle');
setCollections(collections as any);
const isPanelOpen = props.shouldPanelOpen({
state: store.getState(),
});
setIsOpen(
nextState.isOpen ??
((props.openOnFocus && !query && isPanelOpen) || isPanelOpen)
);

const highlightedItem = getActiveItem(store.getState());
const highlightedItem = getActiveItem(store.getState());

if (store.getState().activeItemId !== null && highlightedItem) {
const { item, itemInputValue, itemUrl, source } = highlightedItem;
if (store.getState().activeItemId !== null && highlightedItem) {
const { item, itemInputValue, itemUrl, source } = highlightedItem;

source.onActive({
event,
item,
itemInputValue,
itemUrl,
refresh,
source,
state: store.getState(),
...setters,
});
}
})
.finally(() => {
if (lastStalledId) {
props.environment.clearTimeout(lastStalledId);
}
source.onActive({
event,
item,
itemInputValue,
itemUrl,
refresh,
source,
state: store.getState(),
...setters,
});
}
})
.finally(() => {
if (lastStalledId) {
props.environment.clearTimeout(lastStalledId);
}
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import { MaybePromise } from '@algolia/autocomplete-shared';
* This is useful to prevent older promises to resolve after a newer promise,
* otherwise resulting in stale resolved values.
*/
export function createConcurrentSafePromise<TValue>() {
export function createConcurrentSafePromise() {
let basePromiseId = -1;
let latestResolvedId = -1;
let latestResolvedValue: TValue | undefined = undefined;
let latestResolvedValue: unknown = undefined;

return function runConcurrentSafePromise(promise: MaybePromise<TValue>) {
return function runConcurrentSafePromise<TValue>(
promise: MaybePromise<TValue>
) {
basePromiseId++;
const currentPromiseId = basePromiseId;

Expand All @@ -30,7 +32,7 @@ export function createConcurrentSafePromise<TValue>() {
// | run(3) +--------> R3 |
// +----------------------------------+
if (latestResolvedValue && currentPromiseId < latestResolvedId) {
return latestResolvedValue;
return latestResolvedValue as TValue;
}

latestResolvedId = currentPromiseId;
Expand Down