-
Notifications
You must be signed in to change notification settings - Fork 8.2k
/
telemetry_task.js
118 lines (104 loc) · 3.51 KB
/
telemetry_task.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { getMapsTelemetry } from './maps_telemetry';
const TELEMETRY_TASK_TYPE = 'maps_telemetry';
export const TASK_ID = `Maps-${TELEMETRY_TASK_TYPE}`;
export function scheduleTask(server, taskManager) {
const { kbnServer } = server.plugins.xpack_main.status.plugin;
kbnServer.afterPluginsInit(() => {
// The code block below can't await directly within "afterPluginsInit"
// callback due to circular dependency. The server isn't "ready" until
// this code block finishes. Migrations wait for server to be ready before
// executing. Saved objects repository waits for migrations to finish before
// finishing the request. To avoid this, we'll await within a separate
// function block.
(async () => {
try {
await taskManager.schedule({
id: TASK_ID,
taskType: TELEMETRY_TASK_TYPE,
state: { stats: {}, runs: 0 },
});
} catch(e) {
server.log(['warning', 'maps'], `Error scheduling telemetry task, received ${e.message}`);
}
})();
});
}
export function registerMapsTelemetryTask(server) {
const taskManager = server.plugins.task_manager;
taskManager.registerTaskDefinitions({
[TELEMETRY_TASK_TYPE]: {
title: 'Maps telemetry fetch task',
type: TELEMETRY_TASK_TYPE,
timeout: '1m',
numWorkers: 2,
createTaskRunner: telemetryTaskRunner(server),
},
});
}
export function telemetryTaskRunner(server) {
return ({ taskInstance }) => {
const { state } = taskInstance;
const prevState = state;
const callCluster = server.plugins.elasticsearch.getCluster('admin')
.callWithInternalUser;
let mapsTelemetryTask;
return {
async run({ taskCanceled = false } = {}) {
try {
mapsTelemetryTask = makeCancelable(
getMapsTelemetry(server, callCluster),
taskCanceled
);
} catch (err) {
server.log(['warning'], `Error loading maps telemetry: ${err}`);
} finally {
return mapsTelemetryTask
.promise
.then((mapsTelemetry = {}) => {
return {
state: {
runs: state.runs || 0 + 1,
stats: mapsTelemetry.attributes || prevState.stats || {},
},
runAt: getNextMidnight(),
};
})
.catch(errMsg => server.log(['warning'],
`Error executing maps telemetry task: ${errMsg}`));
}
},
async cancel() {
if (mapsTelemetryTask) {
mapsTelemetryTask.cancel();
} else {
server.log(['warning'], `Can not cancel "mapsTelemetryTask", it has not been defined`);
}
}
};
};
}
function makeCancelable(promise, isCanceled) {
const logMsg = 'Maps telemetry task has been cancelled';
const wrappedPromise = new Promise((resolve, reject) => {
promise
.then(val => isCanceled ? reject(logMsg) : resolve(val))
.catch(err => isCanceled ? reject(logMsg) : reject(err.message));
});
return {
promise: wrappedPromise,
cancel() {
isCanceled = true;
},
};
}
function getNextMidnight() {
const nextMidnight = new Date();
nextMidnight.setHours(0, 0, 0, 0);
nextMidnight.setDate(nextMidnight.getDate() + 1);
return nextMidnight;
}