Skip to content

Commit

Permalink
migrate version check to new client (#71215)
Browse files Browse the repository at this point in the history
  • Loading branch information
pgayvallet authored Jul 15, 2020
1 parent 7f31e3e commit 7a325c9
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 45 deletions.
31 changes: 19 additions & 12 deletions src/core/server/elasticsearch/elasticsearch_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
* under the License.
*/

import { first } from 'rxjs/operators';

import { MockLegacyClusterClient, MockClusterClient } from './elasticsearch_service.test.mocks';

import { BehaviorSubject } from 'rxjs';
import { first } from 'rxjs/operators';
import { Env } from '../config';
import { getEnvOptions } from '../config/__mocks__/env';
import { CoreContext } from '../core_context';
Expand Down Expand Up @@ -227,28 +225,34 @@ describe('#setup', () => {
});

it('esNodeVersionCompatibility$ only starts polling when subscribed to', async (done) => {
mockLegacyClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());
const mockedClient = mockClusterClientInstance.asInternalUser;
mockedClient.nodes.info.mockImplementation(() =>
elasticsearchClientMock.createClientError(new Error())
);

const setupContract = await elasticsearchService.setup(setupDeps);
await delay(10);

expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0);
expect(mockedClient.nodes.info).toHaveBeenCalledTimes(0);
setupContract.esNodesCompatibility$.subscribe(() => {
expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
expect(mockedClient.nodes.info).toHaveBeenCalledTimes(1);
done();
});
});

it('esNodeVersionCompatibility$ stops polling when unsubscribed from', async (done) => {
mockLegacyClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());
const mockedClient = mockClusterClientInstance.asInternalUser;
mockedClient.nodes.info.mockImplementation(() =>
elasticsearchClientMock.createClientError(new Error())
);

const setupContract = await elasticsearchService.setup(setupDeps);

expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0);
expect(mockedClient.nodes.info).toHaveBeenCalledTimes(0);
const sub = setupContract.esNodesCompatibility$.subscribe(async () => {
sub.unsubscribe();
await delay(100);
expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
expect(mockedClient.nodes.info).toHaveBeenCalledTimes(1);
done();
});
});
Expand Down Expand Up @@ -353,16 +357,19 @@ describe('#stop', () => {
it('stops pollEsNodeVersions even if there are active subscriptions', async (done) => {
expect.assertions(2);

mockLegacyClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());
const mockedClient = mockClusterClientInstance.asInternalUser;
mockedClient.nodes.info.mockImplementation(() =>
elasticsearchClientMock.createClientError(new Error())
);

const setupContract = await elasticsearchService.setup(setupDeps);

setupContract.esNodesCompatibility$.subscribe(async () => {
expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
expect(mockedClient.nodes.info).toHaveBeenCalledTimes(1);

await elasticsearchService.stop();
await delay(100);
expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
expect(mockedClient.nodes.info).toHaveBeenCalledTimes(1);
done();
});
});
Expand Down
8 changes: 4 additions & 4 deletions src/core/server/elasticsearch/elasticsearch_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ export class ElasticsearchService

this.getAuthHeaders = deps.http.getAuthHeaders;
this.legacyClient = this.createLegacyClusterClient('data', config);
this.client = this.createClusterClient('data', config);

const esNodesCompatibility$ = pollEsNodesVersion({
callWithInternalUser: this.legacyClient.callAsInternalUser,
internalClient: this.client.asInternalUser,
log: this.log,
ignoreVersionMismatch: config.ignoreVersionMismatch,
esVersionCheckInterval: config.healthCheckDelay.asMilliseconds(),
Expand Down Expand Up @@ -109,7 +110,6 @@ export class ElasticsearchService
}

const config = await this.config$.pipe(first()).toPromise();
this.client = this.createClusterClient('data', config);

const createClient = (
type: string,
Expand All @@ -120,7 +120,7 @@ export class ElasticsearchService
};

return {
client: this.client,
client: this.client!,
createClient,
legacy: {
client: this.legacyClient,
Expand All @@ -133,7 +133,7 @@ export class ElasticsearchService
this.log.debug('Stopping elasticsearch service');
this.stop$.next();
if (this.client) {
this.client.close();
await this.client.close();
}
if (this.legacyClient) {
this.legacyClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
import { mapNodesVersionCompatibility, pollEsNodesVersion, NodesInfo } from './ensure_es_version';
import { loggingSystemMock } from '../../logging/logging_system.mock';
import { elasticsearchClientMock } from '../client/mocks';
import { take, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { of } from 'rxjs';
Expand All @@ -27,6 +28,9 @@ const mockLogger = mockLoggerFactory.get('mock logger');

const KIBANA_VERSION = '5.1.0';

const createEsSuccess = elasticsearchClientMock.createClientResponse;
const createEsError = elasticsearchClientMock.createClientError;

function createNodes(...versions: string[]): NodesInfo {
const nodes = {} as any;
versions
Expand Down Expand Up @@ -111,25 +115,34 @@ describe('mapNodesVersionCompatibility', () => {
});

describe('pollEsNodesVersion', () => {
const callWithInternalUser = jest.fn();
let internalClient: ReturnType<typeof elasticsearchClientMock.createInternalClient>;
const getTestScheduler = () =>
new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});

beforeEach(() => {
callWithInternalUser.mockReset();
internalClient = elasticsearchClientMock.createInternalClient();
});

const nodeInfosSuccessOnce = (infos: NodesInfo) => {
internalClient.nodes.info.mockImplementationOnce(() => createEsSuccess(infos));
};
const nodeInfosErrorOnce = (error: any) => {
internalClient.nodes.info.mockImplementationOnce(() => createEsError(error));
};

it('returns iscCompatible=false and keeps polling when a poll request throws', (done) => {
expect.assertions(3);
const expectedCompatibilityResults = [false, false, true];
jest.clearAllMocks();
callWithInternalUser.mockResolvedValueOnce(createNodes('5.1.0', '5.2.0', '5.0.0'));
callWithInternalUser.mockRejectedValueOnce(new Error('mock request error'));
callWithInternalUser.mockResolvedValueOnce(createNodes('5.1.0', '5.2.0', '5.1.1-Beta1'));

nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.0.0'));
nodeInfosErrorOnce('mock request error');
nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.1.1-Beta1'));

pollEsNodesVersion({
callWithInternalUser,
internalClient,
esVersionCheckInterval: 1,
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
Expand All @@ -148,9 +161,11 @@ describe('pollEsNodesVersion', () => {
it('returns compatibility results', (done) => {
expect.assertions(1);
const nodes = createNodes('5.1.0', '5.2.0', '5.0.0');
callWithInternalUser.mockResolvedValueOnce(nodes);

nodeInfosSuccessOnce(nodes);

pollEsNodesVersion({
callWithInternalUser,
internalClient,
esVersionCheckInterval: 1,
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
Expand All @@ -168,15 +183,15 @@ describe('pollEsNodesVersion', () => {

it('only emits if the node versions changed since the previous poll', (done) => {
expect.assertions(4);
callWithInternalUser.mockResolvedValueOnce(createNodes('5.1.0', '5.2.0', '5.0.0')); // emit
callWithInternalUser.mockResolvedValueOnce(createNodes('5.0.0', '5.1.0', '5.2.0')); // ignore, same versions, different ordering
callWithInternalUser.mockResolvedValueOnce(createNodes('5.1.1', '5.2.0', '5.0.0')); // emit
callWithInternalUser.mockResolvedValueOnce(createNodes('5.1.1', '5.1.2', '5.1.3')); // emit
callWithInternalUser.mockResolvedValueOnce(createNodes('5.1.1', '5.1.2', '5.1.3')); // ignore
callWithInternalUser.mockResolvedValueOnce(createNodes('5.0.0', '5.1.0', '5.2.0')); // emit, different from previous version
nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.0.0')); // emit
nodeInfosSuccessOnce(createNodes('5.0.0', '5.1.0', '5.2.0')); // ignore, same versions, different ordering
nodeInfosSuccessOnce(createNodes('5.1.1', '5.2.0', '5.0.0')); // emit
nodeInfosSuccessOnce(createNodes('5.1.1', '5.1.2', '5.1.3')); // emit
nodeInfosSuccessOnce(createNodes('5.1.1', '5.1.2', '5.1.3')); // ignore
nodeInfosSuccessOnce(createNodes('5.0.0', '5.1.0', '5.2.0')); // emit, different from previous version

pollEsNodesVersion({
callWithInternalUser,
internalClient,
esVersionCheckInterval: 1,
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
Expand All @@ -192,14 +207,21 @@ describe('pollEsNodesVersion', () => {

it('starts polling immediately and then every esVersionCheckInterval', () => {
expect.assertions(1);
callWithInternalUser.mockReturnValueOnce([createNodes('5.1.0', '5.2.0', '5.0.0')]);
callWithInternalUser.mockReturnValueOnce([createNodes('5.1.1', '5.2.0', '5.0.0')]);

// @ts-expect-error we need to return an incompatible type to use the testScheduler here
internalClient.nodes.info.mockReturnValueOnce([
{ body: createNodes('5.1.0', '5.2.0', '5.0.0') },
]);
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
internalClient.nodes.info.mockReturnValueOnce([
{ body: createNodes('5.1.1', '5.2.0', '5.0.0') },
]);

getTestScheduler().run(({ expectObservable }) => {
const expected = 'a 99ms (b|)';

const esNodesCompatibility$ = pollEsNodesVersion({
callWithInternalUser,
internalClient,
esVersionCheckInterval: 100,
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
Expand Down Expand Up @@ -227,15 +249,17 @@ describe('pollEsNodesVersion', () => {
getTestScheduler().run(({ expectObservable }) => {
const expected = '100ms a 99ms (b|)';

callWithInternalUser.mockReturnValueOnce(
of(createNodes('5.1.0', '5.2.0', '5.0.0')).pipe(delay(100))
internalClient.nodes.info.mockReturnValueOnce(
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
of({ body: createNodes('5.1.0', '5.2.0', '5.0.0') }).pipe(delay(100))
);
callWithInternalUser.mockReturnValueOnce(
of(createNodes('5.1.1', '5.2.0', '5.0.0')).pipe(delay(100))
internalClient.nodes.info.mockReturnValueOnce(
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
of({ body: createNodes('5.1.1', '5.2.0', '5.0.0') }).pipe(delay(100))
);

const esNodesCompatibility$ = pollEsNodesVersion({
callWithInternalUser,
internalClient,
esVersionCheckInterval: 10,
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
Expand All @@ -256,6 +280,6 @@ describe('pollEsNodesVersion', () => {
});
});

expect(callWithInternalUser).toHaveBeenCalledTimes(2);
expect(internalClient.nodes.info).toHaveBeenCalledTimes(2);
});
});
11 changes: 6 additions & 5 deletions src/core/server/elasticsearch/version_check/ensure_es_version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import {
esVersionEqualsKibana,
} from './es_kibana_version_compatability';
import { Logger } from '../../logging';
import { LegacyAPICaller } from '../legacy';
import type { ElasticsearchClient } from '../client';

export interface PollEsNodesVersionOptions {
callWithInternalUser: LegacyAPICaller;
internalClient: ElasticsearchClient;
log: Logger;
kibanaVersion: string;
ignoreVersionMismatch: boolean;
Expand Down Expand Up @@ -137,7 +137,7 @@ function compareNodes(prev: NodesVersionCompatibility, curr: NodesVersionCompati
}

export const pollEsNodesVersion = ({
callWithInternalUser,
internalClient,
log,
kibanaVersion,
ignoreVersionMismatch,
Expand All @@ -147,10 +147,11 @@ export const pollEsNodesVersion = ({
return timer(0, healthCheckInterval).pipe(
exhaustMap(() => {
return from(
callWithInternalUser('nodes.info', {
filterPath: ['nodes.*.version', 'nodes.*.http.publish_address', 'nodes.*.ip'],
internalClient.nodes.info<NodesInfo>({
filter_path: ['nodes.*.version', 'nodes.*.http.publish_address', 'nodes.*.ip'],
})
).pipe(
map(({ body }) => body),
catchError((_err) => {
return of({ nodes: {} });
})
Expand Down

0 comments on commit 7a325c9

Please sign in to comment.