Skip to content

Commit

Permalink
[savedObjects/esAvailability] touchup some docs
Browse files Browse the repository at this point in the history
  • Loading branch information
spalger committed Sep 22, 2017
1 parent dfe3b37 commit 9afa80e
Showing 1 changed file with 43 additions and 15 deletions.
58 changes: 43 additions & 15 deletions src/server/saved_objects/es_availability/es_availability.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,47 @@ export function createEsAvailability(kbnServer) {
const sub = new Subscription();

/**
* Stores the esIsUsable state.
* Stores the esIsAvailable state.
* `undefined`: state is not known, fetch is probably in progress, wait for the next value
* `true`: es is usable and we think it's in a good state
* `false`: es is not usable, we were not able to get it into a good state
* `true`: es is available and we think it's in a good state
* `false`: es is not available, we were not able to get it into a good state
* @type {BehaviorSubject}
*/
const esIsUsable$ = new BehaviorSubject(undefined);
sub.add(() => esIsUsable$.complete());
const esIsAvailable$ = new BehaviorSubject(undefined);

// complete esIsAvailable$ when the subscription is torn down so
// that pending requests will be released
sub.add(() => esIsAvailable$.complete());

/**
* Emits trigger checks for es availablility (throttled)
* Emits when a check is requested
* @type {Subject}
*/
const checkReq$ = new Subject();

sub.add(
checkReq$
// throttle check requests so that we only have one every 1 second at the most
.throttleTime(1000, undefined, { leading: true, trailing: true })
// and after that, if a check is delivered before the previous has
// completed exhaustMap ignores it
.exhaustMap(async () => {
const esExports = server.plugins.elasticsearch;
if (!esExports) {
esIsUsable$.next(false);
esIsAvailable$.next(false);
return;
}

const callCluster = esExports.getCluster('admin').callWithInternalUser;
const index = config.get('kibana.index');

esIsUsable$.next(undefined);
// tell external parties that we don't know if es is available right now,
// they should wait to see what we discover
esIsAvailable$.next(undefined);

try {
// try to add the index template to elasticsearch. if it already exists
// then a new version will be written
await callCluster('indices.putTemplate', {
name: `kibana_index_template:${index}`,
body: {
Expand All @@ -52,19 +64,28 @@ export function createEsAvailability(kbnServer) {
}
});

return esIsUsable$.next(true);
// TODO: we need to check for existing index and make sure it
// has the types it needs

// es seems to be available, let the listeners know
return esIsAvailable$.next(true);
} catch (error) {
// we log every error
server.log(['error', 'savedObjects', 'putTemplate'], {
tmpl: 'Failed to put index template "<%= err.message %>"',
err: {
message: error.message,
stack: error.stack,
}
});
return esIsUsable$.next(false);

// and then notify listeners that es is not available
return esIsAvailable$.next(false);
}
})
.catch((error, resubscribe) => {
// this should only happen if there is a syntax error,
// undefined methods, or something like that.
server.log(['error', 'savedObjects'], {
tmpl: 'Failure attempting to set index template "<%= err.message %>"',
err: {
Expand All @@ -90,31 +111,38 @@ export function createEsAvailability(kbnServer) {
return;
}

// check every time the elasticsearch plugin goes between green and anything else
sub.add(
Observable
// listen to es plugin's status and on each change
.fromEvent(esStatus, 'change', (prev, prevMsg, state) => state)
// determine if the status is green or not
.map(state => state === 'green')
// when we toggle between green and not green
.distinctUntilChanged()
// request a check
.subscribe(() => checkReq$.next())
);
});

// when the server is stopping close the collective subscription
server.ext('onPostStop', (server, next) => {
server.ext('onPreStop', (server, next) => {
sub.unsubscribe();
next();
});

return new class EsAvailability {
wrapCallClusterFunction(callCluster) {
return async (method, params) => {
const esIsUsable = await esIsUsable$
.filter(usable => usable !== undefined)

// wait for the first non-undefined availablility
const esIsAvailable = await esIsAvailable$
.filter(available => available !== undefined)
.take(1)
.toPromise();

if (!esIsUsable) { // could still be undefined if esIsUsable$ is completed while we wait
// PS, availability could still be undefined if esIsAvailable$
// completes before producing anything other than undefined
if (!esIsAvailable) {
checkReq$.next();
throw errors.decorateEsUnavailableError(
Boom.serverUnavailable('Elasticsearch is unavailable')
Expand Down

0 comments on commit 9afa80e

Please sign in to comment.