Skip to content

Commit

Permalink
[ResponseOps][task manager] log event loop delay when over configured…
Browse files Browse the repository at this point in the history
… limit

resolves #124366

Adds new task manager configuration keys.

- `xpack.task_manager.event_loop_delay.monitor` - whether to monitor
  event loop delay or not; added in case this specific monitoring
  causes other issues and we'd want to disable it.  We don't know
  of any cases where we'd need this today

- `xpack.task_manager.event_loop_delay.warn_on_delay` - the number
  of milliseconds of event loop delay before logging a warning

This code uses the `perf_hooks.monitorEventLoopDelay()` API[1] to collect
the event loop delay while a task is running.

[1] https://nodejs.org/api/perf_hooks.html#perf_hooksmonitoreventloopdelayoptions

When a significant event loop delay is encountered, it's very likely
that other tasks running at the same time will be affected, and so
will also end up having a long event loop delay value, and warnings
will be logged on those.  Over time, though, tasks which have consistently
long event loop delays will outnumber those unfortunate peer tasks, and
be obvious from the volume in the logs.

To make it a bit easier to find these when viewing Kibana logs in Discover,
tags are added to the logged messages to make it easier to find them.  One
tag is `event-loop-blocked`, and the other tag added is a string consisting
of the task type and task id.
  • Loading branch information
pmuellr committed Mar 8, 2022
1 parent 7a00d24 commit a535177
Show file tree
Hide file tree
Showing 15 changed files with 194 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/settings/task-manager-settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ These non-persisted action tasks have a risk that they won't be run at all if th
`xpack.task_manager.ephemeral_tasks.request_capacity`::
Sets the size of the ephemeral queue defined above. Defaults to 10.

`xpack.task_manager.event_loop_delay.monitor`::
Enables event loop delay monitoring, which will log a warning when a task causes an event loop delay which exceeds the `warn_on_delay` setting. Defaults to true.

`xpack.task_manager.event_loop_delay.warn_on_delay`::
Sets the amount of event loop delay during a task execution which will cause a warning to be logged. Defaults to 5000 milliseconds (5 seconds).

[float]
[[task-manager-health-settings]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ kibana_vars=(
xpack.task_manager.poll_interval
xpack.task_manager.request_capacity
xpack.task_manager.version_conflict_threshold
xpack.task_manager.event_loop_delay.monitor
xpack.task_manager.event_loop_delay.warn_on_delay
xpack.uptime.index
)

Expand Down
12 changes: 12 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ describe('config validation', () => {
"enabled": false,
"request_capacity": 10,
},
"event_loop_delay": Object {
"monitor": true,
"warn_on_delay": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
Expand Down Expand Up @@ -62,6 +66,10 @@ describe('config validation', () => {
"enabled": false,
"request_capacity": 10,
},
"event_loop_delay": Object {
"monitor": true,
"warn_on_delay": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
Expand Down Expand Up @@ -106,6 +114,10 @@ describe('config validation', () => {
"enabled": false,
"request_capacity": 10,
},
"event_loop_delay": Object {
"monitor": true,
"warn_on_delay": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
Expand Down
10 changes: 10 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ export const taskExecutionFailureThresholdSchema = schema.object(
}
);

const eventLoopDelaySchema = schema.object({
monitor: schema.boolean({ defaultValue: true }),
warn_on_delay: schema.number({
defaultValue: 5000,
min: 10,
}),
});

export const configSchema = schema.object(
{
/* The maximum number of times a task will be attempted before being abandoned as failed */
Expand Down Expand Up @@ -118,6 +126,7 @@ export const configSchema = schema.object(
max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY,
}),
}),
event_loop_delay: eventLoopDelaySchema,
/* These are not designed to be used by most users. Please use caution when changing these */
unsafe: schema.object({
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
Expand All @@ -138,3 +147,4 @@ export const configSchema = schema.object(

export type TaskManagerConfig = TypeOf<typeof configSchema>;
export type TaskExecutionFailureThreshold = TypeOf<typeof taskExecutionFailureThresholdSchema>;
export type EventLoopDelayConfig = TypeOf<typeof eventLoopDelaySchema>;
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ describe('EphemeralTaskLifecycle', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
...config,
},
elasticsearchAndSOAvailability$,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ describe.skip('managed configuration', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
});
logger = context.logger.get('taskManager');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ describe('Configuration Statistics Aggregator', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
};

const managedConfig = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ describe('createMonitoringStatsStream', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
};

it('returns the initial config used to configure Task Manager', async () => {
Expand Down
12 changes: 12 additions & 0 deletions x-pack/plugins/task_manager/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ describe('TaskManagerPlugin', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
});

pluginInitializerContext.env.instanceUuid = '';
Expand Down Expand Up @@ -84,6 +88,10 @@ describe('TaskManagerPlugin', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
});

const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
Expand Down Expand Up @@ -154,6 +162,10 @@ describe('TaskManagerPlugin', () => {
unsafe: {
exclude_task_types: ['*'],
},
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
});

const logger = pluginInitializerContext.logger.get();
Expand Down
4 changes: 4 additions & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ describe('TaskPollingLifecycle', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
},
taskStore: mockTaskStore,
logger: taskManagerLogger,
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export class TaskPollingLifecycle {
private middleware: Middleware;

private usageCounter?: UsageCounter;
private config: TaskManagerConfig;

/**
* Initializes the task manager, preventing any further addition of middleware,
Expand All @@ -117,6 +118,7 @@ export class TaskPollingLifecycle {
this.store = taskStore;
this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.config = config;

const emitEvent = (event: TaskLifecycleEvent) => this.events$.next(event);

Expand Down Expand Up @@ -240,6 +242,7 @@ export class TaskPollingLifecycle {
defaultMaxAttempts: this.taskClaiming.maxAttempts,
executionContext: this.executionContext,
usageCounter: this.usageCounter,
eventLoopDelayConfig: { ...this.config.event_loop_delay },
});
};

Expand Down
82 changes: 82 additions & 0 deletions x-pack/plugins/task_manager/server/task_events.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { startTaskTimer, startTaskTimerWithEventLoopMonitoring } from './task_events';

const DelayIterations = 4;
const DelayMillis = 250;
const DelayTotal = DelayIterations * DelayMillis;

async function nonBlockingDelay(millis: number) {
await new Promise((resolve) => setTimeout(resolve, millis));
}

async function blockingDelay(millis: number) {
// get task in async queue
await nonBlockingDelay(0);

const end = Date.now() + millis;
// eslint-disable-next-line no-empty
while (Date.now() < end) {}
}

async function nonBlockingTask() {
for (let i = 0; i < DelayIterations; i++) {
await nonBlockingDelay(DelayMillis);
}
}

async function blockingTask() {
for (let i = 0; i < DelayIterations; i++) {
await blockingDelay(DelayMillis);
}
}

describe('task_events', () => {
test('startTaskTimer', async () => {
const stopTaskTimer = startTaskTimer();
await nonBlockingTask();
const result = stopTaskTimer();
expect(result.stop - result.start).not.toBeLessThan(DelayTotal);
expect(result.eventLoopBlockMs).toBe(undefined);
});

describe('startTaskTimerWithEventLoopMonitoring', () => {
test('non-blocking', async () => {
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring({
monitor: true,
warn_on_delay: 5000,
});
await nonBlockingTask();
const result = stopTaskTimer();
expect(result.stop - result.start).not.toBeLessThan(DelayTotal);
expect(result.eventLoopBlockMs).toBeLessThan(DelayMillis);
});

test('blocking', async () => {
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring({
monitor: true,
warn_on_delay: 5000,
});
await blockingTask();
const result = stopTaskTimer();
expect(result.stop - result.start).not.toBeLessThan(DelayTotal);
expect(result.eventLoopBlockMs).not.toBeLessThan(DelayMillis);
});

test('not monitoring', async () => {
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring({
monitor: false,
warn_on_delay: 5000,
});
await blockingTask();
const result = stopTaskTimer();
expect(result.stop - result.start).not.toBeLessThan(DelayTotal);
expect(result.eventLoopBlockMs).toBe(0);
});
});
});
20 changes: 20 additions & 0 deletions x-pack/plugins/task_manager/server/task_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
* 2.0.
*/

import { monitorEventLoopDelay } from 'perf_hooks';

import { Option } from 'fp-ts/lib/Option';

import { ConcreteTaskInstance } from './task';
Expand All @@ -14,6 +16,7 @@ import { ClaimAndFillPoolResult } from './lib/fill_pool';
import { PollingError } from './polling';
import { TaskRunResult } from './task_running';
import { EphemeralTaskInstanceRequest } from './ephemeral_task_lifecycle';
import type { EventLoopDelayConfig } from './config';

export enum TaskPersistence {
Recurring = 'recurring',
Expand All @@ -40,6 +43,7 @@ export enum TaskClaimErrorType {
export interface TaskTiming {
start: number;
stop: number;
eventLoopBlockMs?: number;
}
export type WithTaskTiming<T> = T & { timing: TaskTiming };

Expand All @@ -48,6 +52,22 @@ export function startTaskTimer(): () => TaskTiming {
return () => ({ start, stop: Date.now() });
}

export function startTaskTimerWithEventLoopMonitoring(
eventLoopDelayConfig: EventLoopDelayConfig
): () => TaskTiming {
const stopTaskTimer = startTaskTimer();
const eldHistogram = eventLoopDelayConfig.monitor ? monitorEventLoopDelay() : null;
eldHistogram?.enable();

return () => {
const { start, stop } = stopTaskTimer();
eldHistogram?.disable();
const eldMax = eldHistogram?.max ?? 0;
const eventLoopBlockMs = Math.round(eldMax / 1000 / 1000); // original in nanoseconds
return { start, stop, eventLoopBlockMs };
};
}

export interface TaskEvent<OkResult, ErrorResult, ID = string> {
id?: ID;
timing?: TaskTiming;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1528,7 +1528,11 @@ describe('TaskManagerRunner', () => {
function withAnyTiming(taskRun: TaskRun) {
return {
...taskRun,
timing: { start: expect.any(Number), stop: expect.any(Number) },
timing: {
start: expect.any(Number),
stop: expect.any(Number),
eventLoopBlockMs: expect.any(Number),
},
};
}

Expand Down Expand Up @@ -1590,6 +1594,10 @@ describe('TaskManagerRunner', () => {
onTaskEvent: opts.onTaskEvent,
executionContext,
usageCounter,
eventLoopDelayConfig: {
monitor: true,
warn_on_delay: 5000,
},
});

if (stage === TaskRunningStage.READY_TO_RUN) {
Expand Down
Loading

0 comments on commit a535177

Please sign in to comment.