Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] [Monitoring/Telemetry] Force collectors to indicate when they are ready (#36153) #36706

Merged
merged 1 commit into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export function makeKQLUsageCollector(server) {
const kqlUsageCollector = server.usage.collectorSet.makeUsageCollector({
type: 'kql',
fetch,
isReady: () => true,
});

server.usage.collectorSet.register(kqlUsageCollector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export function registerUiMetricUsageCollector(server: any) {

return uiMetricsByAppName;
},
isReady: () => true,
});

server.usage.collectorSet.register(collector);
Expand Down
4 changes: 4 additions & 0 deletions src/legacy/server/config/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ export default () => Joi.object({
pollInterval: Joi.number().default(1500),
}).default(),

stats: Joi.object({
maximumWaitTimeForAllCollectorsInS: Joi.number().default(60)
}).default(),

optimize: Joi.object({
enabled: Joi.boolean().default(true),
bundleFilter: Joi.string().default('!tests'),
Expand Down
1 change: 1 addition & 0 deletions src/legacy/server/sample_data/usage/collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export function makeSampleDataUsageCollector(server: KbnServer) {
server.usage.collectorSet.makeUsageCollector({
type: 'sample-data',
fetch: fetchProvider(index),
isReady: () => true,
})
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export function getOpsStatsCollector(server, kbnServer) {
...kbnServer.metrics // latest metrics captured from the ops event listener in src/legacy/server/status/index
};
},
isReady: () => true,
ignoreForInternalUploader: true, // Ignore this one from internal uploader. A different stats collector is used there.
});
}
18 changes: 15 additions & 3 deletions src/legacy/server/status/routes/api/register_stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
*/

import Joi from 'joi';
import { boomify } from 'boom';
import boom from 'boom';
import { i18n } from '@kbn/i18n';
import { wrapAuthConfig } from '../../wrap_auth_config';
import { KIBANA_STATS_TYPE } from '../../constants';

const STATS_NOT_READY_MESSAGE = i18n.translate('server.stats.notReadyMessage', {
defaultMessage: 'Stats are not ready yet. Please try again later.',
});

/*
* API for Kibana meta info and accumulated operations stats
* Including ?extended in the query string fetches Elasticsearch cluster_uuid and server.usage.collectorSet data
Expand Down Expand Up @@ -69,6 +74,11 @@ export function registerStatsApi(kbnServer, server, config) {
if (isExtended) {
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('admin');
const callCluster = (...args) => callWithRequest(req, ...args);
const collectorsReady = await collectorSet.areAllCollectorsReady();

if (shouldGetUsage && !collectorsReady) {
return boom.serverUnavailable(STATS_NOT_READY_MESSAGE);
}

const usagePromise = shouldGetUsage ? getUsage(callCluster) : Promise.resolve();
try {
Expand All @@ -77,7 +87,6 @@ export function registerStatsApi(kbnServer, server, config) {
getClusterUuid(callCluster),
]);


let modifiedUsage = usage;
if (isLegacy) {
// In an effort to make telemetry more easily augmented, we need to ensure
Expand Down Expand Up @@ -123,14 +132,17 @@ export function registerStatsApi(kbnServer, server, config) {
});
}
} catch (e) {
throw boomify(e);
throw boom.boomify(e);
}
}

/* kibana_stats gets singled out from the collector set as it is used
* for health-checking Kibana and fetch does not rely on fetching data
* from ES */
const kibanaStatsCollector = collectorSet.getCollectorByType(KIBANA_STATS_TYPE);
if (!await kibanaStatsCollector.isReady()) {
return boom.serverUnavailable(STATS_NOT_READY_MESSAGE);
}
let kibanaStats = await kibanaStatsCollector.fetch();
kibanaStats = collectorSet.toApiFieldNames(kibanaStats);

Expand Down
9 changes: 8 additions & 1 deletion src/legacy/server/usage/classes/collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class Collector {
* @param {Function} options.formatForBulkUpload - optional
* @param {Function} options.rest - optional other properties
*/
constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) {
constructor(server, { type, init, fetch, formatForBulkUpload = null, isReady = null, ...options } = {}) {
if (type === undefined) {
throw new Error('Collector must be instantiated with a options.type string property');
}
Expand All @@ -49,6 +49,9 @@ export class Collector {

const defaultFormatterForBulkUpload = result => ({ type, payload: result });
this._formatForBulkUpload = formatForBulkUpload || defaultFormatterForBulkUpload;
if (typeof isReady === 'function') {
this.isReady = isReady;
}
}

/*
Expand All @@ -69,4 +72,8 @@ export class Collector {
formatForBulkUpload(result) {
return this._formatForBulkUpload(result);
}

isReady() {
throw `isReady() must be implemented in ${this.type} collector`;
}
}
53 changes: 50 additions & 3 deletions src/legacy/server/usage/classes/collector_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@ import { getCollectorLogger } from '../lib';
import { Collector } from './collector';
import { UsageCollector } from './usage_collector';

let _waitingForAllCollectorsTimestamp = null;

/*
* A collector object has types registered into it with the register(type)
* function. Each type that gets registered defines how to fetch its own data
* and optionally, how to combine it into a unified payload for bulk upload.
*/
export class CollectorSet {

/*
* @param {Object} server - server object
* @param {Array} collectors to initialize, usually as a result of filtering another CollectorSet instance
*/
constructor(server, collectors = []) {
constructor(server, collectors = [], config = null) {
this._log = getCollectorLogger(server);
this._collectors = collectors;

Expand All @@ -44,7 +45,9 @@ export class CollectorSet {
*/
this.makeStatsCollector = options => new Collector(server, options);
this.makeUsageCollector = options => new UsageCollector(server, options);
this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray);
this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray, config);

this._maximumWaitTimeForAllCollectorsInS = config ? config.get('stats.maximumWaitTimeForAllCollectorsInS') : 60;
}

/*
Expand Down Expand Up @@ -73,6 +76,40 @@ export class CollectorSet {
return x instanceof UsageCollector;
}

async areAllCollectorsReady(collectorSet = this) {
if (!(collectorSet instanceof CollectorSet)) {
throw new Error(`areAllCollectorsReady method given bad collectorSet parameter: ` + typeof collectorSet);
}

const collectorTypesNotReady = [];
let allReady = true;
await collectorSet.asyncEach(async collector => {
if (!await collector.isReady()) {
allReady = false;
collectorTypesNotReady.push(collector.type);
}
});

if (!allReady && this._maximumWaitTimeForAllCollectorsInS >= 0) {
const nowTimestamp = +new Date();
_waitingForAllCollectorsTimestamp = _waitingForAllCollectorsTimestamp || nowTimestamp;
const timeWaitedInMS = nowTimestamp - _waitingForAllCollectorsTimestamp;
const timeLeftInMS = (this._maximumWaitTimeForAllCollectorsInS * 1000) - timeWaitedInMS;
if (timeLeftInMS <= 0) {
this._log.debug(`All collectors are not ready (waiting for ${collectorTypesNotReady.join(',')}) `
+ `but we have waited the required `
+ `${this._maximumWaitTimeForAllCollectorsInS}s and will return data from all collectors that are ready.`);
return true;
} else {
this._log.debug(`All collectors are not ready. Waiting for ${timeLeftInMS}ms longer.`);
}
} else {
_waitingForAllCollectorsTimestamp = null;
}

return allReady;
}

/*
* Call a bunch of fetch methods and then do them in bulk
* @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors
Expand Down Expand Up @@ -155,4 +192,14 @@ export class CollectorSet {
map(mapFn) {
return this._collectors.map(mapFn);
}

some(someFn) {
return this._collectors.some(someFn);
}

async asyncEach(eachFn) {
for (const collector of this._collectors) {
await eachFn(collector);
}
}
}
4 changes: 2 additions & 2 deletions src/legacy/server/usage/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import { CollectorSet } from './classes';

export function usageMixin(kbnServer, server) {
const collectorSet = new CollectorSet(server);
export function usageMixin(kbnServer, server, config) {
const collectorSet = new CollectorSet(server, undefined, config);

/*
* expose the collector set object on the server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ export function makeApmUsageCollector(core: CoreSetupWithUsageCollector) {
} catch (err) {
return createApmTelementry();
}
}
},
isReady: () => true
});
server.usage.collectorSet.register(apmUsageCollector);
}
1 change: 1 addition & 0 deletions x-pack/plugins/canvas/server/usage/collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ export function registerCanvasUsageCollector(server) {
const index = server.config().get('kibana.index');
const collector = server.usage.collectorSet.makeUsageCollector({
type: CANVAS_USAGE_TYPE,
isReady: () => true,
fetch: async callCluster => {
const searchParams = {
size: 10000, // elasticsearch index.max_result_window default value
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/cloud/get_cloud_usage_collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export function getCloudUsageCollector(server: KibanaHapiServer) {
const { collectorSet } = server.usage;
return collectorSet.makeUsageCollector({
type: KIBANA_CLOUD_STATS_TYPE,
isReady: () => true,
fetch: createCollectorFetch(server),
});
}
1 change: 1 addition & 0 deletions x-pack/plugins/infra/server/usage/usage_collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export class UsageCollector {

return collectorSet.makeUsageCollector({
type: KIBANA_REPORTING_TYPE,
isReady: () => true,
fetch: async () => {
return this.getReport();
},
Expand Down
78 changes: 52 additions & 26 deletions x-pack/plugins/maps/server/maps_telemetry/maps_usage_collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,63 @@ export function initTelemetryCollection(server) {
registerMapsUsageCollector(server);
}

export function buildCollectorObj(server) {
return {
type: 'maps',
fetch: async () => {
let docs;
try {
({ docs } = await server.taskManager.fetch({
query: {
bool: {
filter: {
term: {
_id: TASK_ID
}
}
async function isTaskManagerReady(server) {
const result = await fetch(server);
return result !== null;
}

async function fetch(server) {
let docs;
try {
({ docs } = await server.taskManager.fetch({
query: {
bool: {
filter: {
term: {
_id: TASK_ID
}
}
}));
} catch (err) {
const errMessage = err && err.message ? err.message : err.toString();
/*
* The usage service WILL to try to fetch from this collector before the task manager has been initialized, because the task manager
* has to wait for all plugins to initialize first.
* It's fine to ignore it as next time around it will be initialized (or it will throw a different type of error)
*/
if (errMessage.indexOf('NotInitialized') >= 0) {
docs = {};
} else {
throw err;
}
}
}));
} catch (err) {
const errMessage = err && err.message ? err.message : err.toString();
/*
* The usage service WILL to try to fetch from this collector before the task manager has been initialized, because the task manager
* has to wait for all plugins to initialize first.
* It's fine to ignore it as next time around it will be initialized (or it will throw a different type of error)
*/
if (errMessage.indexOf('NotInitialized') >= 0) {
return null;
} else {
throw err;
}
}

return docs;
}

export function buildCollectorObj(server) {
let isCollectorReady = false;
async function determineIfTaskManagerIsReady() {
let isReady = false;
try {
isReady = await isTaskManagerReady(server);
} catch (err) {} // eslint-disable-line

if (isReady) {
isCollectorReady = true;
} else {
setTimeout(determineIfTaskManagerIsReady, 500);
}
}
determineIfTaskManagerIsReady();

return {
type: 'maps',
isReady: () => isCollectorReady,
fetch: async () => {
const docs = await fetch(server);
return _.get(docs, '[0].state.stats');
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ interface KibanaHapiServer extends Server {
export function makeMlUsageCollector(server: KibanaHapiServer): void {
const mlUsageCollector = server.usage.collectorSet.makeUsageCollector({
type: 'ml',
isReady: () => true,
fetch: async (): Promise<MlTelemetry> => {
try {
const savedObjectsClient = getSavedObjectsClient(server);
Expand Down
Loading