diff --git a/src/legacy/core_plugins/kibana/server/lib/kql_usage_collector/make_kql_usage_collector.js b/src/legacy/core_plugins/kibana/server/lib/kql_usage_collector/make_kql_usage_collector.js index 534cc266d83a3c..19fb64b7ecc745 100644 --- a/src/legacy/core_plugins/kibana/server/lib/kql_usage_collector/make_kql_usage_collector.js +++ b/src/legacy/core_plugins/kibana/server/lib/kql_usage_collector/make_kql_usage_collector.js @@ -25,6 +25,7 @@ export function makeKQLUsageCollector(server) { const kqlUsageCollector = server.usage.collectorSet.makeUsageCollector({ type: 'kql', fetch, + isReady: () => true, }); server.usage.collectorSet.register(kqlUsageCollector); diff --git a/src/legacy/core_plugins/ui_metric/server/usage/collector.ts b/src/legacy/core_plugins/ui_metric/server/usage/collector.ts index 2053c6cee1dd51..bbb7b1af8e7c75 100644 --- a/src/legacy/core_plugins/ui_metric/server/usage/collector.ts +++ b/src/legacy/core_plugins/ui_metric/server/usage/collector.ts @@ -52,6 +52,7 @@ export function registerUiMetricUsageCollector(server: any) { return uiMetricsByAppName; }, + isReady: () => true, }); server.usage.collectorSet.register(collector); diff --git a/src/legacy/server/config/schema.js b/src/legacy/server/config/schema.js index 23d1e2067f00e6..5c88c6f445e190 100644 --- a/src/legacy/server/config/schema.js +++ b/src/legacy/server/config/schema.js @@ -175,6 +175,10 @@ export default () => Joi.object({ pollInterval: Joi.number().default(1500), }).default(), + stats: Joi.object({ + maximumWaitTimeForAllCollectorsInS: Joi.number().default(60) + }).default(), + optimize: Joi.object({ enabled: Joi.boolean().default(true), bundleFilter: Joi.string().default('!tests'), diff --git a/src/legacy/server/sample_data/usage/collector.ts b/src/legacy/server/sample_data/usage/collector.ts index 4ed7487807eecc..8561a6c3f10078 100644 --- a/src/legacy/server/sample_data/usage/collector.ts +++ b/src/legacy/server/sample_data/usage/collector.ts @@ -36,6 +36,7 @@ export function makeSampleDataUsageCollector(server: KbnServer) { server.usage.collectorSet.makeUsageCollector({ type: 'sample-data', fetch: fetchProvider(index), + isReady: () => true, }) ); } diff --git a/src/legacy/server/status/collectors/get_ops_stats_collector.js b/src/legacy/server/status/collectors/get_ops_stats_collector.js index 0ebb6b3539c0e0..aded85384fd85c 100644 --- a/src/legacy/server/status/collectors/get_ops_stats_collector.js +++ b/src/legacy/server/status/collectors/get_ops_stats_collector.js @@ -45,6 +45,7 @@ export function getOpsStatsCollector(server, kbnServer) { ...kbnServer.metrics // latest metrics captured from the ops event listener in src/legacy/server/status/index }; }, + isReady: () => true, ignoreForInternalUploader: true, // Ignore this one from internal uploader. A different stats collector is used there. }); } diff --git a/src/legacy/server/status/routes/api/register_stats.js b/src/legacy/server/status/routes/api/register_stats.js index 60545dc3b66bd5..30fa2ca86940e1 100644 --- a/src/legacy/server/status/routes/api/register_stats.js +++ b/src/legacy/server/status/routes/api/register_stats.js @@ -18,10 +18,15 @@ */ import Joi from 'joi'; -import { boomify } from 'boom'; +import boom from 'boom'; +import { i18n } from '@kbn/i18n'; import { wrapAuthConfig } from '../../wrap_auth_config'; import { KIBANA_STATS_TYPE } from '../../constants'; +const STATS_NOT_READY_MESSAGE = i18n.translate('server.stats.notReadyMessage', { + defaultMessage: 'Stats are not ready yet. Please try again later.', +}); + /* * API for Kibana meta info and accumulated operations stats * Including ?extended in the query string fetches Elasticsearch cluster_uuid and server.usage.collectorSet data @@ -69,6 +74,11 @@ export function registerStatsApi(kbnServer, server, config) { if (isExtended) { const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('admin'); const callCluster = (...args) => callWithRequest(req, ...args); + const collectorsReady = await collectorSet.areAllCollectorsReady(); + + if (shouldGetUsage && !collectorsReady) { + return boom.serverUnavailable(STATS_NOT_READY_MESSAGE); + } const usagePromise = shouldGetUsage ? getUsage(callCluster) : Promise.resolve(); try { @@ -77,7 +87,6 @@ export function registerStatsApi(kbnServer, server, config) { getClusterUuid(callCluster), ]); - let modifiedUsage = usage; if (isLegacy) { // In an effort to make telemetry more easily augmented, we need to ensure @@ -123,7 +132,7 @@ export function registerStatsApi(kbnServer, server, config) { }); } } catch (e) { - throw boomify(e); + throw boom.boomify(e); } } @@ -131,6 +140,9 @@ export function registerStatsApi(kbnServer, server, config) { * for health-checking Kibana and fetch does not rely on fetching data * from ES */ const kibanaStatsCollector = collectorSet.getCollectorByType(KIBANA_STATS_TYPE); + if (!await kibanaStatsCollector.isReady()) { + return boom.serverUnavailable(STATS_NOT_READY_MESSAGE); + } let kibanaStats = await kibanaStatsCollector.fetch(); kibanaStats = collectorSet.toApiFieldNames(kibanaStats); diff --git a/src/legacy/server/usage/classes/collector.js b/src/legacy/server/usage/classes/collector.js index f103126740e130..40b004f51e49ac 100644 --- a/src/legacy/server/usage/classes/collector.js +++ b/src/legacy/server/usage/classes/collector.js @@ -28,7 +28,7 @@ export class Collector { * @param {Function} options.formatForBulkUpload - optional * @param {Function} options.rest - optional other properties */ - constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) { + constructor(server, { type, init, fetch, formatForBulkUpload = null, isReady = null, ...options } = {}) { if (type === undefined) { throw new Error('Collector must be instantiated with a options.type string property'); } @@ -49,6 +49,9 @@ export class Collector { const defaultFormatterForBulkUpload = result => ({ type, payload: result }); this._formatForBulkUpload = formatForBulkUpload || defaultFormatterForBulkUpload; + if (typeof isReady === 'function') { + this.isReady = isReady; + } } /* @@ -69,4 +72,8 @@ export class Collector { formatForBulkUpload(result) { return this._formatForBulkUpload(result); } + + isReady() { + throw `isReady() must be implemented in ${this.type} collector`; + } } diff --git a/src/legacy/server/usage/classes/collector_set.js b/src/legacy/server/usage/classes/collector_set.js index c7140b7abb486e..a6b7e69b312615 100644 --- a/src/legacy/server/usage/classes/collector_set.js +++ b/src/legacy/server/usage/classes/collector_set.js @@ -23,18 +23,19 @@ import { getCollectorLogger } from '../lib'; import { Collector } from './collector'; import { UsageCollector } from './usage_collector'; +let _waitingForAllCollectorsTimestamp = null; + /* * A collector object has types registered into it with the register(type) * function. Each type that gets registered defines how to fetch its own data * and optionally, how to combine it into a unified payload for bulk upload. */ export class CollectorSet { - /* * @param {Object} server - server object * @param {Array} collectors to initialize, usually as a result of filtering another CollectorSet instance */ - constructor(server, collectors = []) { + constructor(server, collectors = [], config = null) { this._log = getCollectorLogger(server); this._collectors = collectors; @@ -44,7 +45,9 @@ export class CollectorSet { */ this.makeStatsCollector = options => new Collector(server, options); this.makeUsageCollector = options => new UsageCollector(server, options); - this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray); + this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray, config); + + this._maximumWaitTimeForAllCollectorsInS = config ? config.get('stats.maximumWaitTimeForAllCollectorsInS') : 60; } /* @@ -73,6 +76,40 @@ export class CollectorSet { return x instanceof UsageCollector; } + async areAllCollectorsReady(collectorSet = this) { + if (!(collectorSet instanceof CollectorSet)) { + throw new Error(`areAllCollectorsReady method given bad collectorSet parameter: ` + typeof collectorSet); + } + + const collectorTypesNotReady = []; + let allReady = true; + await collectorSet.asyncEach(async collector => { + if (!await collector.isReady()) { + allReady = false; + collectorTypesNotReady.push(collector.type); + } + }); + + if (!allReady && this._maximumWaitTimeForAllCollectorsInS >= 0) { + const nowTimestamp = +new Date(); + _waitingForAllCollectorsTimestamp = _waitingForAllCollectorsTimestamp || nowTimestamp; + const timeWaitedInMS = nowTimestamp - _waitingForAllCollectorsTimestamp; + const timeLeftInMS = (this._maximumWaitTimeForAllCollectorsInS * 1000) - timeWaitedInMS; + if (timeLeftInMS <= 0) { + this._log.debug(`All collectors are not ready (waiting for ${collectorTypesNotReady.join(',')}) ` + + `but we have waited the required ` + + `${this._maximumWaitTimeForAllCollectorsInS}s and will return data from all collectors that are ready.`); + return true; + } else { + this._log.debug(`All collectors are not ready. Waiting for ${timeLeftInMS}ms longer.`); + } + } else { + _waitingForAllCollectorsTimestamp = null; + } + + return allReady; + } + /* * Call a bunch of fetch methods and then do them in bulk * @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors @@ -155,4 +192,14 @@ export class CollectorSet { map(mapFn) { return this._collectors.map(mapFn); } + + some(someFn) { + return this._collectors.some(someFn); + } + + async asyncEach(eachFn) { + for (const collector of this._collectors) { + await eachFn(collector); + } + } } diff --git a/src/legacy/server/usage/index.js b/src/legacy/server/usage/index.js index 7cf9ae5b55f52b..2a02070a55f953 100644 --- a/src/legacy/server/usage/index.js +++ b/src/legacy/server/usage/index.js @@ -19,8 +19,8 @@ import { CollectorSet } from './classes'; -export function usageMixin(kbnServer, server) { - const collectorSet = new CollectorSet(server); +export function usageMixin(kbnServer, server, config) { + const collectorSet = new CollectorSet(server, undefined, config); /* * expose the collector set object on the server diff --git a/x-pack/plugins/apm/server/lib/apm_telemetry/make_apm_usage_collector.ts b/x-pack/plugins/apm/server/lib/apm_telemetry/make_apm_usage_collector.ts index 6ea2bdb1eb944f..960b833ebb6775 100644 --- a/x-pack/plugins/apm/server/lib/apm_telemetry/make_apm_usage_collector.ts +++ b/x-pack/plugins/apm/server/lib/apm_telemetry/make_apm_usage_collector.ts @@ -37,7 +37,8 @@ export function makeApmUsageCollector(core: CoreSetupWithUsageCollector) { } catch (err) { return createApmTelementry(); } - } + }, + isReady: () => true }); server.usage.collectorSet.register(apmUsageCollector); } diff --git a/x-pack/plugins/canvas/server/usage/collector.js b/x-pack/plugins/canvas/server/usage/collector.js index 83bfbfa80b4c25..0926f33543d8be 100644 --- a/x-pack/plugins/canvas/server/usage/collector.js +++ b/x-pack/plugins/canvas/server/usage/collector.js @@ -136,6 +136,7 @@ export function registerCanvasUsageCollector(server) { const index = server.config().get('kibana.index'); const collector = server.usage.collectorSet.makeUsageCollector({ type: CANVAS_USAGE_TYPE, + isReady: () => true, fetch: async callCluster => { const searchParams = { size: 10000, // elasticsearch index.max_result_window default value diff --git a/x-pack/plugins/cloud/get_cloud_usage_collector.ts b/x-pack/plugins/cloud/get_cloud_usage_collector.ts index 2784a85176ba67..5ce7be59a1c9cc 100644 --- a/x-pack/plugins/cloud/get_cloud_usage_collector.ts +++ b/x-pack/plugins/cloud/get_cloud_usage_collector.ts @@ -37,6 +37,7 @@ export function getCloudUsageCollector(server: KibanaHapiServer) { const { collectorSet } = server.usage; return collectorSet.makeUsageCollector({ type: KIBANA_CLOUD_STATS_TYPE, + isReady: () => true, fetch: createCollectorFetch(server), }); } diff --git a/x-pack/plugins/infra/server/usage/usage_collector.ts b/x-pack/plugins/infra/server/usage/usage_collector.ts index b50d627d2ce094..018c903009bbec 100644 --- a/x-pack/plugins/infra/server/usage/usage_collector.ts +++ b/x-pack/plugins/infra/server/usage/usage_collector.ts @@ -22,6 +22,7 @@ export class UsageCollector { return collectorSet.makeUsageCollector({ type: KIBANA_REPORTING_TYPE, + isReady: () => true, fetch: async () => { return this.getReport(); }, diff --git a/x-pack/plugins/maps/server/maps_telemetry/maps_usage_collector.js b/x-pack/plugins/maps/server/maps_telemetry/maps_usage_collector.js index 88713a44711238..9c76be739fbe2e 100644 --- a/x-pack/plugins/maps/server/maps_telemetry/maps_usage_collector.js +++ b/x-pack/plugins/maps/server/maps_telemetry/maps_usage_collector.js @@ -13,37 +13,63 @@ export function initTelemetryCollection(server) { registerMapsUsageCollector(server); } -export function buildCollectorObj(server) { - return { - type: 'maps', - fetch: async () => { - let docs; - try { - ({ docs } = await server.taskManager.fetch({ - query: { - bool: { - filter: { - term: { - _id: TASK_ID - } - } +async function isTaskManagerReady(server) { + const result = await fetch(server); + return result !== null; +} + +async function fetch(server) { + let docs; + try { + ({ docs } = await server.taskManager.fetch({ + query: { + bool: { + filter: { + term: { + _id: TASK_ID } } - })); - } catch (err) { - const errMessage = err && err.message ? err.message : err.toString(); - /* - * The usage service WILL to try to fetch from this collector before the task manager has been initialized, because the task manager - * has to wait for all plugins to initialize first. - * It's fine to ignore it as next time around it will be initialized (or it will throw a different type of error) - */ - if (errMessage.indexOf('NotInitialized') >= 0) { - docs = {}; - } else { - throw err; } } + })); + } catch (err) { + const errMessage = err && err.message ? err.message : err.toString(); + /* + * The usage service WILL to try to fetch from this collector before the task manager has been initialized, because the task manager + * has to wait for all plugins to initialize first. + * It's fine to ignore it as next time around it will be initialized (or it will throw a different type of error) + */ + if (errMessage.indexOf('NotInitialized') >= 0) { + return null; + } else { + throw err; + } + } + + return docs; +} + +export function buildCollectorObj(server) { + let isCollectorReady = false; + async function determineIfTaskManagerIsReady() { + let isReady = false; + try { + isReady = await isTaskManagerReady(server); + } catch (err) {} // eslint-disable-line + + if (isReady) { + isCollectorReady = true; + } else { + setTimeout(determineIfTaskManagerIsReady, 500); + } + } + determineIfTaskManagerIsReady(); + return { + type: 'maps', + isReady: () => isCollectorReady, + fetch: async () => { + const docs = await fetch(server); return _.get(docs, '[0].state.stats'); }, }; diff --git a/x-pack/plugins/ml/server/lib/ml_telemetry/make_ml_usage_collector.ts b/x-pack/plugins/ml/server/lib/ml_telemetry/make_ml_usage_collector.ts index fe0e11826246e3..e012b7a06e91db 100644 --- a/x-pack/plugins/ml/server/lib/ml_telemetry/make_ml_usage_collector.ts +++ b/x-pack/plugins/ml/server/lib/ml_telemetry/make_ml_usage_collector.ts @@ -27,6 +27,7 @@ interface KibanaHapiServer extends Server { export function makeMlUsageCollector(server: KibanaHapiServer): void { const mlUsageCollector = server.usage.collectorSet.makeUsageCollector({ type: 'ml', + isReady: () => true, fetch: async (): Promise => { try { const savedObjectsClient = getSavedObjectsClient(server); diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js b/x-pack/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js index fe5a3f51dff463..5e2adbb2d65abe 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js @@ -20,6 +20,9 @@ class MockCollectorSet { isUsageCollector(x) { return !!x.isUsageCollector; } + areAllCollectorsReady() { + return this.mockCollectors.every(collector => collector.isReady()); + } getCollectorByType(type) { return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0]; } @@ -29,6 +32,9 @@ class MockCollectorSet { async bulkFetch() { return this.mockCollectors.map(({ fetch }) => fetch()); } + some(someFn) { + return this.mockCollectors.some(someFn); + } } describe('BulkUploader', () => { @@ -61,6 +67,7 @@ describe('BulkUploader', () => { { type: 'type_collector_test', fetch: noop, // empty payloads, + isReady: () => true, formatForBulkUpload: result => result, } ]); @@ -94,10 +101,56 @@ describe('BulkUploader', () => { }, CHECK_DELAY); }); + it('should not upload if some collectors are not ready', done => { + const collectors = new MockCollectorSet(server, [ + { + type: 'type_collector_test', + fetch: noop, // empty payloads, + isReady: () => false, + formatForBulkUpload: result => result, + }, + { + type: 'type_collector_test2', + fetch: noop, // empty payloads, + isReady: () => true, + formatForBulkUpload: result => result, + } + ]); + + const uploader = new BulkUploader(server, { + interval: FETCH_INTERVAL + }); + + uploader.start(collectors); + + // allow interval to tick a few times + setTimeout(() => { + uploader.stop(); + + const loggingCalls = server.log.getCalls(); + expect(loggingCalls.length).to.be.greaterThan(2); // should be 3-5: start, fetch, skip, fetch, skip + expect(loggingCalls[0].args).to.eql([ + ['info', 'monitoring', 'kibana-monitoring'], + 'Starting monitoring stats collection', + ]); + expect(loggingCalls[1].args).to.eql([ + ['debug', 'monitoring', 'kibana-monitoring'], + 'Skipping bulk uploading because not all collectors are ready', + ]); + expect(loggingCalls[loggingCalls.length - 1].args).to.eql([ + ['info', 'monitoring', 'kibana-monitoring'], + 'Monitoring stats collection is stopped', + ]); + + done(); + }, CHECK_DELAY); + }); + it('should run the bulk upload handler', done => { const collectors = new MockCollectorSet(server, [ { fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }), + isReady: () => true, formatForBulkUpload: result => result } ]); @@ -135,11 +188,13 @@ describe('BulkUploader', () => { const collectors = new MockCollectorSet(server, [ { fetch: usageCollectorFetch, + isReady: () => true, formatForBulkUpload: result => result, isUsageCollector: true, }, { fetch: collectorFetch, + isReady: () => true, formatForBulkUpload: result => result, isUsageCollector: false, } @@ -166,11 +221,13 @@ describe('BulkUploader', () => { const collectors = new MockCollectorSet(server, [ { fetch: usageCollectorFetch, + isReady: () => true, formatForBulkUpload: result => result, isUsageCollector: true, }, { fetch: collectorFetch, + isReady: () => true, formatForBulkUpload: result => result, isUsageCollector: false, } diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js index b16ca429f34079..ea4f8e4c461444 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js @@ -69,7 +69,6 @@ export class BulkUploader { this._log.info('Starting monitoring stats collection'); const filterCollectorSet = _collectorSet => { const filterUsage = this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now(); - this._lastFetchWithUsage = !filterUsage; if (!filterUsage) { this._lastFetchUsageTime = Date.now(); } @@ -123,6 +122,16 @@ export class BulkUploader { * @return {Promise} - resolves to undefined */ async _fetchAndUpload(collectorSet) { + const collectorsReady = await collectorSet.areAllCollectorsReady(); + if (!collectorsReady) { + this._log.debug('Skipping bulk uploading because not all collectors are ready'); + if (collectorSet.some(collectorSet.isUsageCollector)) { + this._lastFetchUsageTime = null; + this._log.debug('Resetting lastFetchWithUsage because not all collectors are ready'); + } + return; + } + const data = await collectorSet.bulkFetch(this._callClusterWithInternalUser); const payload = this.toBulkUploadFormat(compact(data), collectorSet); diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_kibana_usage_collector.js b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_kibana_usage_collector.js index a81db5aca586a1..96d0c98cf3f050 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_kibana_usage_collector.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_kibana_usage_collector.js @@ -23,7 +23,7 @@ export function getKibanaUsageCollector(server) { const { collectorSet } = server.usage; return collectorSet.makeUsageCollector({ type: KIBANA_USAGE_TYPE, - + isReady: () => true, async fetch(callCluster) { const index = server.config().get('kibana.index'); const savedObjectCountSearchParams = { diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_ops_stats_collector.js b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_ops_stats_collector.js index 0c202b1c9e3657..509689df84d8c1 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_ops_stats_collector.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_ops_stats_collector.js @@ -80,6 +80,7 @@ export function getOpsStatsCollector(server, kbnServer) { return collectorSet.makeStatsCollector({ type: KIBANA_STATS_TYPE_MONITORING, init: opsMonitor.start, + isReady: () => buffer.hasEvents(), fetch: async () => { return await buffer.flush(); } diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_settings_collector.js b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_settings_collector.js index 4b9fb5a51efb39..11129fe94b5798 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_settings_collector.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_settings_collector.js @@ -86,6 +86,7 @@ export function getSettingsCollector(server) { return collectorSet.makeStatsCollector({ type: KIBANA_SETTINGS_TYPE, + isReady: () => true, async fetch(callCluster) { let kibanaSettingsData; const defaultAdminEmail = await checkForEmailValue(config, callCluster, this.log); diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/ops_buffer/event_roller.js b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/ops_buffer/event_roller.js index 4f88bd0b75093a..4b25fd9b706cf2 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/ops_buffer/event_roller.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/ops_buffer/event_roller.js @@ -22,6 +22,10 @@ export class EventRoller { return get(this.rollup, path); } + hasEvents() { + return this.rollup !== null; + } + rollupEvent(event) { const heapStats = v8.getHeapStatistics(); const requests = mapRequests(event.requests); diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/ops_buffer/ops_buffer.js b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/ops_buffer/ops_buffer.js index b8c670edf1297e..457213c784ad6b 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/ops_buffer/ops_buffer.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/ops_buffer/ops_buffer.js @@ -29,6 +29,10 @@ export function opsBuffer(server) { server.log(['debug', LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG], 'Received Kibana Ops event data'); }, + hasEvents() { + return eventRoller.hasEvents(); + }, + async flush() { let cloud; // a property that will be left out of the result if the details are undefined const cloudDetails = cloudDetector.getCloudDetails(); diff --git a/x-pack/plugins/oss_telemetry/server/lib/collectors/visualizations/get_usage_collector.ts b/x-pack/plugins/oss_telemetry/server/lib/collectors/visualizations/get_usage_collector.ts index 159ca7eec8063d..8976cffc8ea400 100644 --- a/x-pack/plugins/oss_telemetry/server/lib/collectors/visualizations/get_usage_collector.ts +++ b/x-pack/plugins/oss_telemetry/server/lib/collectors/visualizations/get_usage_collector.ts @@ -8,30 +8,55 @@ import { get } from 'lodash'; import { HapiServer } from '../../../../'; import { PLUGIN_ID, VIS_TELEMETRY_TASK, VIS_USAGE_TYPE } from '../../../../constants'; -export function getUsageCollector(server: HapiServer) { +async function isTaskManagerReady(server: HapiServer) { + const result = await fetch(server); + return result !== null; +} + +async function fetch(server: HapiServer) { const { taskManager } = server; + + let docs; + try { + ({ docs } = await taskManager.fetch({ + query: { bool: { filter: { term: { _id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}` } } } }, + })); + } catch (err) { + const errMessage = err && err.message ? err.message : err.toString(); + /* + The usage service WILL to try to fetch from this collector before the task manager has been initialized, because the task manager has to wait for all plugins to initialize first. It's fine to ignore it as next time around it will be initialized (or it will throw a different type of error) + */ + if (errMessage.includes('NotInitialized')) { + docs = null; + } else { + throw err; + } + } + + return docs; +} + +export function getUsageCollector(server: HapiServer) { + let isCollectorReady = false; + async function determineIfTaskManagerIsReady() { + let isReady = false; + try { + isReady = await isTaskManagerReady(server); + } catch (err) {} // eslint-disable-line + + if (isReady) { + isCollectorReady = true; + } else { + setTimeout(determineIfTaskManagerIsReady, 500); + } + } + determineIfTaskManagerIsReady(); + return { type: VIS_USAGE_TYPE, + isReady: () => isCollectorReady, fetch: async () => { - let docs; - try { - ({ docs } = await taskManager.fetch({ - query: { bool: { filter: { term: { _id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}` } } } }, - })); - } catch (err) { - const errMessage = err && err.message ? err.message : err.toString(); - /* - * The usage service WILL to try to fetch from this collector before the task manager has been initialized, because the task manager - * has to wait for all plugins to initialize first. - * It's fine to ignore it as next time around it will be initialized (or it will throw a different type of error) - */ - if (errMessage.includes('NotInitialized')) { - docs = {}; - } else { - throw err; - } - } - + const docs = await fetch(server); // get the accumulated state from the recurring task return get(docs, '[0].state.stats'); }, diff --git a/x-pack/plugins/reporting/index.js b/x-pack/plugins/reporting/index.js index eb40d5d51b5f74..839ba965a0e640 100644 --- a/x-pack/plugins/reporting/index.js +++ b/x-pack/plugins/reporting/index.js @@ -151,6 +151,11 @@ export const reporting = (kibana) => { }, init: async function (server) { + let isCollectorReady = false; + const isReady = () => isCollectorReady; + // Register a function with server to manage the collection of usage stats + server.usage.collectorSet.register(getReportingUsageCollector(server, isReady)); + const exportTypesRegistry = await exportTypesRegistryFactory(server); const browserFactory = await createBrowserDriverFactory(server); server.expose('exportTypesRegistry', exportTypesRegistry); @@ -172,8 +177,8 @@ export const reporting = (kibana) => { xpackMainPlugin.info.feature(this.id).registerLicenseCheckResultsGenerator(checkLicense); }); - // Register a function with server to manage the collection of usage stats - server.usage.collectorSet.register(getReportingUsageCollector(server)); + // Post initialization of the above code, the collector is now ready to fetch its data + isCollectorReady = true; server.expose('browserDriverFactory', browserFactory); server.expose('queue', createQueueFactory(server)); diff --git a/x-pack/plugins/reporting/server/usage/get_reporting_usage_collector.js b/x-pack/plugins/reporting/server/usage/get_reporting_usage_collector.js index d5f056f32ad03f..36cd4f6ac4ffbc 100644 --- a/x-pack/plugins/reporting/server/usage/get_reporting_usage_collector.js +++ b/x-pack/plugins/reporting/server/usage/get_reporting_usage_collector.js @@ -114,11 +114,11 @@ async function getReportingUsageWithinRange(callCluster, server, reportingAvaila * @param {Object} server * @return {Object} kibana usage stats type collection object */ -export function getReportingUsageCollector(server) { +export function getReportingUsageCollector(server, isReady) { const { collectorSet } = server.usage; return collectorSet.makeUsageCollector({ type: KIBANA_REPORTING_TYPE, - + isReady, fetch: async callCluster => { const xpackInfo = server.plugins.xpack_main.info; const config = server.config(); diff --git a/x-pack/plugins/rollup/server/usage/collector.js b/x-pack/plugins/rollup/server/usage/collector.js index f7d70f04d47500..977253dfa53fb9 100644 --- a/x-pack/plugins/rollup/server/usage/collector.js +++ b/x-pack/plugins/rollup/server/usage/collector.js @@ -168,6 +168,7 @@ export function registerRollupUsageCollector(server) { const collector = server.usage.collectorSet.makeUsageCollector({ type: ROLLUP_USAGE_TYPE, + isReady: () => true, fetch: async callCluster => { const rollupIndexPatterns = await fetchRollupIndexPatterns(kibanaIndex, callCluster); const rollupIndexPatternToFlagMap = createIdToFlagMap(rollupIndexPatterns); diff --git a/x-pack/plugins/spaces/server/lib/get_spaces_usage_collector.ts b/x-pack/plugins/spaces/server/lib/get_spaces_usage_collector.ts index 693fcea699449a..d7c1fe2bdf111b 100644 --- a/x-pack/plugins/spaces/server/lib/get_spaces_usage_collector.ts +++ b/x-pack/plugins/spaces/server/lib/get_spaces_usage_collector.ts @@ -98,6 +98,7 @@ export function getSpacesUsageCollector(server: any) { const { collectorSet } = server.usage; return collectorSet.makeUsageCollector({ type: KIBANA_SPACES_STATS_TYPE, + isReady: () => true, fetch: async (callCluster: any) => { const xpackInfo = server.plugins.xpack_main.info; const config = server.config(); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/telemetry/usage_collector.ts b/x-pack/plugins/upgrade_assistant/server/lib/telemetry/usage_collector.ts index 333d1cb2190bc3..a52ee1c01c5265 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/telemetry/usage_collector.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/telemetry/usage_collector.ts @@ -100,6 +100,7 @@ export function makeUpgradeAssistantUsageCollector(server: UpgradeAssistantTelem const kbnServer = server as UpgradeAssistantTelemetryServer; const upgradeAssistantUsageCollector = kbnServer.usage.collectorSet.makeUsageCollector({ type: UPGRADE_ASSISTANT_TYPE, + isReady: () => true, fetch: async (callCluster: any) => fetchUpgradeAssistantMetrics(callCluster, server), }); diff --git a/x-pack/plugins/xpack_main/server/lib/get_localization_usage_collector.ts b/x-pack/plugins/xpack_main/server/lib/get_localization_usage_collector.ts index 61c7b8e77458cb..f1b280919b5aa7 100644 --- a/x-pack/plugins/xpack_main/server/lib/get_localization_usage_collector.ts +++ b/x-pack/plugins/xpack_main/server/lib/get_localization_usage_collector.ts @@ -48,6 +48,7 @@ export function getLocalizationUsageCollector(server: any) { const { collectorSet } = server.usage; return collectorSet.makeUsageCollector({ type: KIBANA_LOCALIZATION_STATS_TYPE, + isReady: () => true, fetch: createCollectorFetch(server), }); } diff --git a/x-pack/plugins/xpack_main/server/lib/telemetry/usage/telemetry_usage_collector.ts b/x-pack/plugins/xpack_main/server/lib/telemetry/usage/telemetry_usage_collector.ts index f14870b4c5099a..99f76112bb91be 100644 --- a/x-pack/plugins/xpack_main/server/lib/telemetry/usage/telemetry_usage_collector.ts +++ b/x-pack/plugins/xpack_main/server/lib/telemetry/usage/telemetry_usage_collector.ts @@ -83,6 +83,7 @@ export async function readTelemetryFile(path: string): Promise true, fetch: async () => { const configPath: string = server.config().get('xpack.xpack_main.telemetry.config'); const telemetryPath = join(dirname(configPath), 'telemetry.yml'); diff --git a/x-pack/test/functional/config.js b/x-pack/test/functional/config.js index b698a26545de20..6b8dc8a2fc1060 100644 --- a/x-pack/test/functional/config.js +++ b/x-pack/test/functional/config.js @@ -192,6 +192,7 @@ export default async function ({ readConfigFile }) { '--server.uuid=5b2de169-2785-441b-ae8c-186a1936b17d', '--xpack.xpack_main.telemetry.enabled=false', '--xpack.maps.showMapsInspectorAdapter=true', + '--stats.maximumWaitTimeForAllCollectorsInS=0', '--xpack.security.encryptionKey="wuGNaIhoMpk5sO4UBxgr3NyW1sFcLgIf"', // server restarts should not invalidate active sessions '--xpack.code.security.enableGitCertCheck=false', // Disable git certificate check '--timelion.ui.enabled=true',