Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to Task Manager for validating and versioning the task state objects #159048

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
8182767
Initial commit
mikecote Jun 5, 2023
2c9d738
Add note to set config default value back to true
mikecote Jun 5, 2023
cac7f08
Remove reference to x-pack/plugins/task_manager/server/task_pool.test.ts
mikecote Jun 5, 2023
1412eb1
Add some task validator jest tests
mikecote Jun 5, 2023
d40c58a
Debug log validation failures
mikecote Jun 5, 2023
9bad9fb
Merge branch 'main' of github.com:elastic/kibana into task-manager/su…
mikecote Jun 6, 2023
ba040ef
Move getLatestStateSchema out of class
mikecote Jun 6, 2023
c471e16
Fix some jest tests
mikecote Jun 6, 2023
532d9bd
Fix tests
mikecote Jun 6, 2023
100a7e3
Code improvements and comments
mikecote Jun 7, 2023
5dd7c14
Revert some code
mikecote Jun 7, 2023
50c0078
Fix failed jest test
mikecote Jun 7, 2023
b9319da
Support specifying whether to validate or not when calling update and…
mikecote Jun 8, 2023
25961ff
Use cache to save processing time
mikecote Jun 8, 2023
a80dd7f
Add task store tests
mikecote Jun 8, 2023
f86a81e
Fix task store tests
mikecote Jun 8, 2023
3e3447a
Add task validator test
mikecote Jun 8, 2023
214bae3
Onboard alerts_invalidate_api_keys task type
mikecote Jun 8, 2023
b5f1295
Fix the validate calls in the buffered store
mikecote Jun 8, 2023
341a2cb
Add docs
mikecote Jun 8, 2023
a2854fd
Final changes
mikecote Jun 8, 2023
09d924d
Add GH issue link
mikecote Jun 8, 2023
31d33cd
Add jest integration tests
mikecote Jun 13, 2023
ce6c637
Add jest integration test for config changes
mikecote Jun 13, 2023
cee9083
[CI] Auto-commit changed files from 'node scripts/lint_ts_projects --…
kibanamachine Jun 13, 2023
8e12be2
Merge branch 'main' of github.com:elastic/kibana into task-manager/su…
mikecote Jun 19, 2023
ce33ff3
Merge branch 'main' of github.com:elastic/kibana into task-manager/su…
mikecote Jun 19, 2023
4a9942b
Merge branch 'task-manager/support-for-validating-and-versioning-stat…
mikecote Jun 19, 2023
36d8d89
Merge branch 'main' of github.com:elastic/kibana into task-manager/su…
mikecote Jun 22, 2023
8fda328
Update x-pack/plugins/task_manager/server/task_validator.ts
mikecote Jun 22, 2023
e590dee
Merge branch 'task-manager/support-for-validating-and-versioning-stat…
mikecote Jun 22, 2023
0d2e54d
Finalize typo fix
mikecote Jun 22, 2023
6b9125a
Pass options.validate to getValidatedTaskInstance to avoid having ter…
mikecote Jun 22, 2023
9187c46
Fix failing jest tests
mikecote Jun 22, 2023
4ee1471
Split the functions in taskvalidator
mikecote Jun 23, 2023
a31f66b
Merge branch 'main' of github.com:elastic/kibana into task-manager/su…
mikecote Jun 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
KibanaRequest,
SavedObjectsClientContract,
} from '@kbn/core/server';
import { schema, TypeOf } from '@kbn/config-schema';
import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server';
import { InvalidateAPIKeysParams, SecurityPluginStart } from '@kbn/security-plugin/server';
import {
Expand All @@ -29,6 +30,22 @@ import { InvalidatePendingApiKey } from '../types';
const TASK_TYPE = 'alerts_invalidate_api_keys';
export const TASK_ID = `Alerts-${TASK_TYPE}`;

const stateSchemaByVersion = {
1: {
up: (state: Record<string, unknown>) => ({
runs: state.runs || 0,
total_invalidated: state.total_invalidated || 0,
}),
schema: schema.object({
runs: schema.number(),
total_invalidated: schema.number(),
}),
},
};

const latestSchema = stateSchemaByVersion[1].schema;
mikecote marked this conversation as resolved.
Show resolved Hide resolved
type LatestTaskStateSchema = TypeOf<typeof latestSchema>;

const invalidateAPIKeys = async (
params: InvalidateAPIKeysParams,
securityPluginStart?: SecurityPluginStart
Expand Down Expand Up @@ -65,17 +82,21 @@ export async function scheduleApiKeyInvalidatorTask(
) {
const interval = config.invalidateApiKeysTask.interval;
try {
const state: LatestTaskStateSchema = {
runs: 0,
total_invalidated: 0,
};
await taskManager.ensureScheduled({
id: TASK_ID,
taskType: TASK_TYPE,
schedule: {
interval,
},
state: {},
state,
params: {},
});
} catch (e) {
logger.debug(`Error scheduling task, received ${e.message}`);
logger.error(`Error scheduling ${TASK_ID} task, received ${e.message}`);
}
}

Expand All @@ -88,6 +109,7 @@ function registerApiKeyInvalidatorTaskDefinition(
taskManager.registerTaskDefinitions({
[TASK_TYPE]: {
title: 'Invalidate alert API Keys',
stateSchemaByVersion,
createTaskRunner: taskRunner(logger, coreStartServices, config),
},
});
Expand Down Expand Up @@ -117,7 +139,7 @@ function taskRunner(
config: AlertingConfig
) {
return ({ taskInstance }: RunContext) => {
const { state } = taskInstance;
const state = taskInstance.state as LatestTaskStateSchema;
return {
async run() {
let totalInvalidated = 0;
Expand Down Expand Up @@ -159,22 +181,24 @@ function taskRunner(
hasApiKeysPendingInvalidation = apiKeysToInvalidate.total > PAGE_SIZE;
} while (hasApiKeysPendingInvalidation);

const updatedState: LatestTaskStateSchema = {
runs: (state.runs || 0) + 1,
total_invalidated: totalInvalidated,
};
return {
state: {
runs: (state.runs || 0) + 1,
total_invalidated: totalInvalidated,
},
state: updatedState,
schedule: {
interval: config.invalidateApiKeysTask.interval,
},
};
} catch (e) {
logger.warn(`Error executing alerting apiKey invalidation task: ${e.message}`);
const updatedState: LatestTaskStateSchema = {
runs: state.runs + 1,
total_invalidated: totalInvalidated,
};
return {
state: {
runs: (state.runs || 0) + 1,
total_invalidated: totalInvalidated,
},
state: updatedState,
schedule: {
interval: config.invalidateApiKeysTask.interval,
},
Expand Down
14 changes: 14 additions & 0 deletions x-pack/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@ export class Plugin {
// can add significant load to the ES cluster, so please use this configuration only when absolutly necesery.
maxConcurrency: 1,

// To ensure the validity of task state during read and write operations, utilize the stateSchemaByVersion configuration. This functionality validates the state before executing a task. Make sure to define the schema property using the @kbn/config-schema plugin, specifically as an ObjectType (schema.object) at the top level.
stateSchemaByVersion: {
1: {
schema: schema.object({
count: schema.number(),
}),
up: (state) => {
return {
count: state.count || 0,
};
},
}
}

// The createTaskRunner function / method returns an object that is responsible for
// performing the work of the task. context: { taskInstance }, is documented below.
createTaskRunner(context) {
Expand Down
21 changes: 21 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { PublicMethodsOf } from '@kbn/utility-types';
import { BufferedTaskStore } from './buffered_task_store';

const createBufferedTaskStoreMock = () => {
const mocked: jest.Mocked<PublicMethodsOf<BufferedTaskStore>> = {
update: jest.fn(),
remove: jest.fn(),
};
return mocked;
};

export const bufferedTaskStoreMock = {
create: createBufferedTaskStoreMock,
};
46 changes: 37 additions & 9 deletions x-pack/plugins/task_manager/server/buffered_task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,36 @@ describe('Buffered Task Store', () => {

taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);

expect(await bufferedStore.update(task)).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]);
expect(await bufferedStore.update(task, { validate: true })).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task], { validate: false });

expect(taskStore.taskValidator.getValidatedTaskInstanceForUpdating).toHaveBeenCalledTimes(1);
expect(taskStore.taskValidator.getValidatedTaskInstanceFromReading).toHaveBeenCalledTimes(1);
expect(taskStore.taskValidator.getValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(
task,
{ validate: true }
);
expect(taskStore.taskValidator.getValidatedTaskInstanceFromReading).toHaveBeenCalledWith(
task,
{ validate: true }
);
});

test(`doesn't validate when specified`, async () => {
const taskStore = taskStoreMock.create();
const bufferedStore = new BufferedTaskStore(taskStore, {});

const task = taskManagerMock.createTask();

taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);

expect(await bufferedStore.update(task, { validate: false })).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task], { validate: false });

expect(taskStore.taskValidator.getValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(
task,
{ validate: false }
);
});

test('handles partially successfull bulkUpdates resolving each call appropriately', async () => {
Expand Down Expand Up @@ -58,9 +86,9 @@ describe('Buffered Task Store', () => {
]);

const results = [
bufferedStore.update(tasks[0]),
bufferedStore.update(tasks[1]),
bufferedStore.update(tasks[2]),
bufferedStore.update(tasks[0], { validate: true }),
bufferedStore.update(tasks[1], { validate: true }),
bufferedStore.update(tasks[2], { validate: true }),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(`
Expand Down Expand Up @@ -105,10 +133,10 @@ describe('Buffered Task Store', () => {
]);

const results = [
bufferedStore.update(tasks[0]),
bufferedStore.update(tasks[1]),
bufferedStore.update(tasks[2]),
bufferedStore.update(tasks[3]),
bufferedStore.update(tasks[0], { validate: true }),
bufferedStore.update(tasks[1], { validate: true }),
bufferedStore.update(tasks[2], { validate: true }),
bufferedStore.update(tasks[3], { validate: true }),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(`
Expand Down
29 changes: 23 additions & 6 deletions x-pack/plugins/task_manager/server/buffered_task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,31 @@ const DEFAULT_BUFFER_MAX_DURATION = 50;
export class BufferedTaskStore implements Updatable {
private bufferedUpdate: Operation<ConcreteTaskInstance>;
constructor(private readonly taskStore: TaskStore, options: BufferOptions) {
this.bufferedUpdate = createBuffer<ConcreteTaskInstance>((docs) => taskStore.bulkUpdate(docs), {
bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION,
...options,
});
this.bufferedUpdate = createBuffer<ConcreteTaskInstance>(
// Setting validate: false because we'll validate per update call
//
// Ideally we could accumulate the "validate" options and pass them
// to .bulkUpdate per doc, but the required changes to the bulk_operation_buffer
// to track the values are high and deffered for now.
(docs) => taskStore.bulkUpdate(docs, { validate: false }),
{
bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION,
...options,
}
);
}

public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
return unwrapPromise(this.bufferedUpdate(doc));
public async update(
doc: ConcreteTaskInstance,
options: { validate: boolean }
): Promise<ConcreteTaskInstance> {
const docToUpdate = this.taskStore.taskValidator.getValidatedTaskInstanceForUpdating(doc, {
validate: options.validate,
});
const result = await unwrapPromise(this.bufferedUpdate(docToUpdate));
return this.taskStore.taskValidator.getValidatedTaskInstanceFromReading(result, {
validate: options.validate,
});
}

public async remove(id: string): Promise<void> {
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ describe('config validation', () => {
const config: Record<string, unknown> = {};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand Down Expand Up @@ -64,6 +65,7 @@ describe('config validation', () => {
const config: Record<string, unknown> = {};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand Down Expand Up @@ -114,6 +116,7 @@ describe('config validation', () => {
};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ export const configSchema = schema.object(
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
authenticate_background_task_utilization: schema.boolean({ defaultValue: true }),
}),
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
},
{
validate: (config) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ describe('EphemeralTaskLifecycle', () => {
poll_interval: 6000000,
version_conflict_threshold: 80,
request_capacity: 1000,
allow_reading_invalid_state: false,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_required_freshness: 5000,
monitored_stats_running_average_window: 50,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export { injectTask } from './inject_task';
export { setupTestServers } from './setup_test_servers';
export { retry } from './retry';
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { type ElasticsearchClient } from '@kbn/core/server';
import { type ConcreteTaskInstance } from '../../task';

export async function injectTask(
esClient: ElasticsearchClient,
{ id, ...task }: ConcreteTaskInstance
) {
const soId = `task:${id}`;
await esClient.index({
id: soId,
index: '.kibana_task_manager',
document: {
references: [],
type: 'task',
updated_at: new Date().toISOString(),
task: {
...task,
state: JSON.stringify(task.state),
params: JSON.stringify(task.params),
runAt: task.runAt.toISOString(),
scheduledAt: task.scheduledAt.toISOString(),
},
},
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

interface RetryOpts {
times: number;
intervalMs: number;
}

export async function retry<T>(
cb: () => Promise<T>,
options: RetryOpts = { times: 60, intervalMs: 500 }
) {
let attempt = 1;
while (true) {
try {
return await cb();
} catch (e) {
if (attempt >= options.times) {
throw e;
}
}
attempt++;
await new Promise((resolve) => setTimeout(resolve, options.intervalMs));
}
}
Loading