Skip to content

Commit

Permalink
Update how aborting works
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasolson committed Feb 24, 2020
1 parent a6921d7 commit 4267bf2
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 26 deletions.
2 changes: 1 addition & 1 deletion examples/demo_search/server/async_demo_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { ASYNC_DEMO_SEARCH_STRATEGY } from '../common';

function getFibonacciSequence(n = 0) {
const beginning = [0, 1].slice(0, n);
return Array(n)
return Array(Math.max(0, n))
.fill(null)
.reduce((sequence, value, i) => {
if (i < 2) return sequence;
Expand Down
4 changes: 2 additions & 2 deletions src/plugins/data/server/search/create_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ export function createApi({
if (!strategyProvider) {
throw new Error(`No strategy found for ${strategyName}`);
}
const strategy = await strategyProvider(caller);
return strategy.cancel ?? strategy.cancel(id);
const strategy = await strategyProvider(caller, api.search);
return strategy.cancel && strategy.cancel(id);
},
};
return api;
Expand Down
5 changes: 2 additions & 3 deletions src/plugins/data/server/search/i_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

import { APICaller } from 'kibana/server';
import { ISearch, ICancel, ISearchGeneric, ICancelGeneric } from './i_search';
import { ISearch, ICancel, ISearchGeneric } from './i_search';
import { TStrategyTypes } from './strategy_types';
import { ISearchContext } from './i_search_context';

Expand All @@ -38,8 +38,7 @@ export interface ISearchStrategy<T extends TStrategyTypes> {
*/
export type TSearchStrategyProviderEnhanced<T extends TStrategyTypes> = (
caller: APICaller,
search: ISearchGeneric,
cancel?: ICancelGeneric
search: ISearchGeneric
) => Promise<ISearchStrategy<T>>;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,19 @@ describe('Async search strategy', () => {
it('only sends one request if the first response is complete', async () => {
mockSearch.mockReturnValueOnce(of({ id: 1, total: 1, loaded: 1 }));

const asyncSearch = asyncSearchStrategyProvider({ core: mockCoreSetup }, mockSearch);
const asyncSearch = asyncSearchStrategyProvider(
{
core: mockCoreSetup,
getSearchStrategy: jest.fn().mockImplementation(() => {
return () => {
return {
search: mockSearch,
};
};
}),
},
mockSearch
);

await asyncSearch.search(mockRequest, mockOptions).toPromise();

Expand All @@ -39,7 +51,19 @@ describe('Async search strategy', () => {
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }));

const asyncSearch = asyncSearchStrategyProvider({ core: mockCoreSetup }, mockSearch);
const asyncSearch = asyncSearchStrategyProvider(
{
core: mockCoreSetup,
getSearchStrategy: jest.fn().mockImplementation(() => {
return () => {
return {
search: mockSearch,
};
};
}),
},
mockSearch
);

expect(mockSearch).toBeCalledTimes(0);

Expand All @@ -53,7 +77,19 @@ describe('Async search strategy', () => {
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1 }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }));

const asyncSearch = asyncSearchStrategyProvider({ core: mockCoreSetup }, mockSearch);
const asyncSearch = asyncSearchStrategyProvider(
{
core: mockCoreSetup,
getSearchStrategy: jest.fn().mockImplementation(() => {
return () => {
return {
search: mockSearch,
};
};
}),
},
mockSearch
);

expect(mockSearch).toBeCalledTimes(0);

Expand All @@ -70,15 +106,31 @@ describe('Async search strategy', () => {
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }));

const asyncSearch = asyncSearchStrategyProvider({ core: mockCoreSetup }, mockSearch);
const asyncSearch = asyncSearchStrategyProvider(
{
core: mockCoreSetup,
getSearchStrategy: jest.fn().mockImplementation(() => {
return () => {
return {
search: mockSearch,
};
};
}),
},
mockSearch
);
const abortController = new AbortController();
const options = { ...mockOptions, signal: abortController.signal };

const promise = asyncSearch.search(mockRequest, options).toPromise();
abortController.abort();

await promise;
expect(mockSearch).toBeCalledTimes(2);
expect(mockCoreSetup.http.delete).toBeCalled();
try {
await promise;
} catch (e) {
expect(e.name).toBe('AbortError');
expect(mockSearch).toBeCalledTimes(1);
expect(mockCoreSetup.http.delete).toBeCalled();
}
});
});
34 changes: 21 additions & 13 deletions x-pack/plugins/data_enhanced/public/search/async_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { Observable, timer } from 'rxjs';
import { mergeMap, takeWhile, expand } from 'rxjs/operators';
import { EMPTY, fromEvent, NEVER, Observable, throwError, timer } from 'rxjs';
import { mergeMap, expand, takeUntil } from 'rxjs/operators';
import {
IKibanaSearchResponse,
ISearchContext,
Expand Down Expand Up @@ -37,19 +37,28 @@ export const asyncSearchStrategyProvider: TSearchStrategyProvider<typeof ASYNC_S
const { serverStrategy } = request;
let id: string | undefined = request.id;

if (options.signal) {
options.signal.addEventListener('abort', () => {
// If we haven't received the response to the initial request, including the ID, then
// we don't need to send a follow-up request to delete this search
if (id === undefined) return;
const aborted$ = options.signal
? fromEvent(options.signal, 'abort').pipe(
mergeMap(() => {
// If we haven't received the response to the initial request, including the ID, then
// we don't need to send a follow-up request to delete this search. Otherwise, we
// send the follow-up request to delete this search, then throw an abort error.
if (id !== undefined) {
context.core.http.delete(`/internal/search/${request.serverStrategy}/${id}`);
}

// Send the follow-up request to delete this search, then throw an abort error
context.core.http.delete(`/internal/search/${request.serverStrategy}/${id}`);
});
}
const error = new Error('Aborted');
error.name = 'AbortError';
return throwError(error);
})
)
: NEVER;

return search(request, options).pipe(
expand(response => {
// If the response indicates it is complete, stop polling and complete the observable
if (response.loaded >= response.total) return EMPTY;

id = response.id;
// Delay by the given poll interval
return timer(pollInterval).pipe(
Expand All @@ -59,8 +68,7 @@ export const asyncSearchStrategyProvider: TSearchStrategyProvider<typeof ASYNC_S
})
);
}),
// Continue polling until the response indicates it is complete
takeWhile(({ total = 1, loaded = 1 }) => loaded < total, true)
takeUntil(aborted$)
);
},
};
Expand Down

0 comments on commit 4267bf2

Please sign in to comment.