Skip to content

Commit

Permalink
refactor config, add some jest tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pmuellr committed Mar 7, 2022
1 parent b8e7c47 commit eeed244
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 24 deletions.
15 changes: 12 additions & 3 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ describe('config validation', () => {
"enabled": false,
"request_capacity": 10,
},
"event_loop_blocked_warning_limit": 2000,
"event_loop_delay": Object {
"monitor": true,
"warn_on_delay": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
Expand Down Expand Up @@ -63,7 +66,10 @@ describe('config validation', () => {
"enabled": false,
"request_capacity": 10,
},
"event_loop_blocked_warning_limit": 2000,
"event_loop_delay": Object {
"monitor": true,
"warn_on_delay": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
Expand Down Expand Up @@ -108,7 +114,10 @@ describe('config validation', () => {
"enabled": false,
"request_capacity": 10,
},
"event_loop_blocked_warning_limit": 2000,
"event_loop_delay": Object {
"monitor": true,
"warn_on_delay": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
Expand Down
12 changes: 10 additions & 2 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ export const taskExecutionFailureThresholdSchema = schema.object(
}
);

const eventLoopDelaySchema = schema.object({
monitor: schema.boolean({ defaultValue: true }),
warn_on_delay: schema.number({
defaultValue: 5000,
min: 10,
}),
});

export const configSchema = schema.object(
{
/* The maximum number of times a task will be attempted before being abandoned as failed */
Expand Down Expand Up @@ -118,11 +126,10 @@ export const configSchema = schema.object(
max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY,
}),
}),
event_loop_blocked_warning_limit: schema.number({ defaultValue: 2000 }),
/* These are not designed to be used by most users. Please use caution when changing these */
unsafe: schema.object({
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
}),
event_loop_delay: eventLoopDelaySchema,
},
{
validate: (config) => {
Expand All @@ -139,3 +146,4 @@ export const configSchema = schema.object(

export type TaskManagerConfig = TypeOf<typeof configSchema>;
export type TaskExecutionFailureThreshold = TypeOf<typeof taskExecutionFailureThresholdSchema>;
export type EventLoopDelayConfig = TypeOf<typeof eventLoopDelaySchema>;
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ describe('EphemeralTaskLifecycle', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_blocked_warning_limit: 2000,
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
...config,
},
elasticsearchAndSOAvailability$,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ describe.skip('managed configuration', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_blocked_warning_limit: 2000,
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
});
logger = context.logger.get('taskManager');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ describe('Configuration Statistics Aggregator', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_blocked_warning_limit: 2000,
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
};

const managedConfig = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ describe('createMonitoringStatsStream', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_blocked_warning_limit: 2000,
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
};

it('returns the initial config used to configure Task Manager', async () => {
Expand Down
15 changes: 12 additions & 3 deletions x-pack/plugins/task_manager/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ describe('TaskManagerPlugin', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_blocked_warning_limit: 2000,
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
});

pluginInitializerContext.env.instanceUuid = '';
Expand Down Expand Up @@ -85,7 +88,10 @@ describe('TaskManagerPlugin', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_blocked_warning_limit: 2000,
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
});

const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
Expand Down Expand Up @@ -156,7 +162,10 @@ describe('TaskManagerPlugin', () => {
unsafe: {
exclude_task_types: ['*'],
},
event_loop_blocked_warning_limit: 2000,
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
});

const logger = pluginInitializerContext.logger.get();
Expand Down
5 changes: 4 additions & 1 deletion x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ describe('TaskPollingLifecycle', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_blocked_warning_limit: 2000,
event_loop_delay: {
monitor: true,
warn_on_delay: 5000,
},
},
taskStore: mockTaskStore,
logger: taskManagerLogger,
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ export class TaskPollingLifecycle {
defaultMaxAttempts: this.taskClaiming.maxAttempts,
executionContext: this.executionContext,
usageCounter: this.usageCounter,
eventLoopBlockedWarningLimit: this.config.event_loop_blocked_warning_limit,
eventLoopDelayConfig: { ...this.config.event_loop_delay },
});
};

Expand Down
83 changes: 83 additions & 0 deletions x-pack/plugins/task_manager/server/task_events.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { range } from 'lodash';
import { startTaskTimer, startTaskTimerWithEventLoopMonitoring } from './task_events';

const DelayIterations = 4;
const DelayMillis = 250;
const DelayTotal = DelayIterations * DelayMillis;

async function nonBlockingDelay(millis: number) {
await new Promise((resolve) => setTimeout(resolve, millis));
}

async function blockingDelay(millis: number) {
// get task in async queue
await nonBlockingDelay(0);

const end = Date.now() + millis;
// eslint-disable-next-line no-empty
while (Date.now() < end) {}
}

async function nonBlockingTask() {
for (let i = 0; i < DelayIterations; i++) {
await nonBlockingDelay(DelayMillis);
}
}

async function blockingTask() {
for (let i = 0; i < DelayIterations; i++) {
await blockingDelay(DelayMillis);
}
}

describe('task_events', () => {
test('startTaskTimer', async () => {
const stopTaskTimer = startTaskTimer();
await nonBlockingTask();
const result = stopTaskTimer();
expect(result.stop - result.start).not.toBeLessThan(DelayTotal);
expect(result.eventLoopBlockMs).toBe(undefined);
});

describe('startTaskTimerWithEventLoopMonitoring', () => {
test('non-blocking', async () => {
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring({
monitor: true,
warn_on_delay: 5000,
});
await nonBlockingTask();
const result = stopTaskTimer();
expect(result.stop - result.start).not.toBeLessThan(DelayTotal);
expect(result.eventLoopBlockMs).toBeLessThan(DelayMillis);
});

test('blocking', async () => {
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring({
monitor: true,
warn_on_delay: 5000,
});
await blockingTask();
const result = stopTaskTimer();
expect(result.stop - result.start).not.toBeLessThan(DelayTotal);
expect(result.eventLoopBlockMs).not.toBeLessThan(DelayMillis);
});

test('not monitoring', async () => {
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring({
monitor: false,
warn_on_delay: 5000,
});
await blockingTask();
const result = stopTaskTimer();
expect(result.stop - result.start).not.toBeLessThan(DelayTotal);
expect(result.eventLoopBlockMs).toBe(0);
});
});
});
7 changes: 5 additions & 2 deletions x-pack/plugins/task_manager/server/task_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { ClaimAndFillPoolResult } from './lib/fill_pool';
import { PollingError } from './polling';
import { TaskRunResult } from './task_running';
import { EphemeralTaskInstanceRequest } from './ephemeral_task_lifecycle';
import type { EventLoopDelayConfig } from './config';

export enum TaskPersistence {
Recurring = 'recurring',
Expand Down Expand Up @@ -51,9 +52,11 @@ export function startTaskTimer(): () => TaskTiming {
return () => ({ start, stop: Date.now() });
}

export function startTaskTimerWithEventBlocking(elbLimit: number): () => TaskTiming {
export function startTaskTimerWithEventLoopMonitoring(
eventLoopDelayConfig: EventLoopDelayConfig
): () => TaskTiming {
const stopTaskTimer = startTaskTimer();
const eldHistogram = elbLimit > 0 ? monitorEventLoopDelay() : null;
const eldHistogram = eventLoopDelayConfig.monitor ? monitorEventLoopDelay() : null;
eldHistogram?.enable();

return () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1594,7 +1594,10 @@ describe('TaskManagerRunner', () => {
onTaskEvent: opts.onTaskEvent,
executionContext,
usageCounter,
eventLoopBlockedWarningLimit: 2000,
eventLoopDelayConfig: {
monitor: true,
warn_on_delay: 5000,
},
});

if (stage === TaskRunningStage.READY_TO_RUN) {
Expand Down
15 changes: 8 additions & 7 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import {
TaskMarkRunning,
asTaskRunEvent,
asTaskMarkRunningEvent,
startTaskTimerWithEventBlocking,
startTaskTimerWithEventLoopMonitoring,
TaskTiming,
TaskPersistence,
} from '../task_events';
Expand All @@ -56,6 +56,7 @@ import {
} from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { isUnrecoverableError } from './errors';
import type { EventLoopDelayConfig } from '../config';

const defaultBackoffPerFailure = 5 * 60 * 1000;
export const EMPTY_RUN_RESULT: SuccessfulRunResult = { state: {} };
Expand Down Expand Up @@ -105,7 +106,7 @@ type Opts = {
defaultMaxAttempts: number;
executionContext: ExecutionContextStart;
usageCounter?: UsageCounter;
eventLoopBlockedWarningLimit: number;
eventLoopDelayConfig: EventLoopDelayConfig;
} & Pick<Middleware, 'beforeRun' | 'beforeMarkRunning'>;

export enum TaskRunResult {
Expand Down Expand Up @@ -153,7 +154,7 @@ export class TaskManagerRunner implements TaskRunner {
private uuid: string;
private readonly executionContext: ExecutionContextStart;
private usageCounter?: UsageCounter;
private eventLoopBlockedWarningLimit: number;
private eventLoopDelayConfig: EventLoopDelayConfig;

/**
* Creates an instance of TaskManagerRunner.
Expand All @@ -176,7 +177,7 @@ export class TaskManagerRunner implements TaskRunner {
onTaskEvent = identity,
executionContext,
usageCounter,
eventLoopBlockedWarningLimit,
eventLoopDelayConfig,
}: Opts) {
this.instance = asPending(sanitizeInstance(instance));
this.definitions = definitions;
Expand All @@ -189,7 +190,7 @@ export class TaskManagerRunner implements TaskRunner {
this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.uuid = uuid.v4();
this.eventLoopBlockedWarningLimit = eventLoopBlockedWarningLimit;
this.eventLoopDelayConfig = eventLoopDelayConfig;
}

/**
Expand Down Expand Up @@ -296,7 +297,7 @@ export class TaskManagerRunner implements TaskRunner {
taskInstance: this.instance.task,
});

const stopTaskTimer = startTaskTimerWithEventBlocking(this.eventLoopBlockedWarningLimit);
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.eventLoopDelayConfig);

try {
this.task = this.definition.createTaskRunner(modifiedContext);
Expand Down Expand Up @@ -624,7 +625,7 @@ export class TaskManagerRunner implements TaskRunner {

const { eventLoopBlockMs = 0 } = taskTiming;
const taskLabel = `${this.taskType} ${this.instance.task.id}`;
if (eventLoopBlockMs > this.eventLoopBlockedWarningLimit) {
if (eventLoopBlockMs > this.eventLoopDelayConfig.warn_on_delay) {
this.logger.warn(
`event loop blocked for at least ${eventLoopBlockMs} ms while running task ${taskLabel}`,
{
Expand Down

0 comments on commit eeed244

Please sign in to comment.