Skip to content

Commit

Permalink
Add support to Task Manager for validating and versioning the task st…
Browse files Browse the repository at this point in the history
…ate objects (elastic#159048)

Part of elastic#155764.

In this PR, I'm modifying task manager to allow task types to report a
versioned schema for the `state` object. When defining
`stateSchemaByVersion`, the following will happen:
- The `state` returned from the task runner will get validated against
the latest version and throw an error if ever it is invalid (to capture
mismatches at development and testing time)
- When task manager reads a task, it will migrate the task state to the
latest version (if necessary) and validate against the latest schema,
dropping any unknown fields (in the scenario of a downgrade).

By default, Task Manager will validate the state on write once a
versioned schema is provided, however the following config must be
enabled for errors to be thrown on read:
`xpack.task_manager.allow_reading_invalid_state: true`. We plan to
enable this in serverless by default but be cautious on existing
deployments and wait for telemetry to show no issues.

I've onboarded the `alerts_invalidate_api_keys` task type which can be
used as an example to onboard others. See [this
commit](elastic@214bae3).

### How to configure a task type to version and validate
The structure is defined as:
```
taskManager.registerTaskDefinitions({
  ...
  stateSchemaByVersion: {
    1: {
      // All existing tasks don't have a version so will get `up` migrated to 1
      up: (state: Record<string, unknown>) => ({
        runs: state.runs || 0,
        total_invalidated: state.total_invalidated || 0,
      }),
      schema: schema.object({
        runs: schema.number(),
        total_invalidated: schema.number(),
      }),
    },
  },
  ...
});
```

However, look at [this
commit](elastic@214bae3)
for an example that you can leverage type safety from the schema.

### Follow up issues
- Onboard non-alerting task types to have a versioned state schema
(elastic#159342)
- Onboard alerting task types to have a versioned state schema for the
framework fields (elastic#159343)
- Onboard alerting task types to have a versioned rule and alert state
schema within the task state
(elastic#159344)
- Telemetry on the validation failures
(elastic#159345)
- Remove feature flag so `allow_reading_invalid_state` is always `false`
(elastic#159346)
- Force validation on all tasks using state by removing the exemption
code (elastic#159347)
- Release tasks when encountering a validation failure after run
(elastic#159964)

### To Verify

NOTE: I have the following verification scenarios in a jest integration
test as well =>
https://github.com/elastic/kibana/pull/159048/files#diff-5f06228df58fa74d5a0f2722c30f1f4bee2ee9df7a14e0700b9aa9bc3864a858.

You will need to log the state when the task runs to observe what the
task runner receives in different scenarios.

```
diff --git a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
index 1e624bcd807..4aa4c2c7805 100644
--- a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
+++ b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
@@ -140,6 +140,7 @@ function taskRunner(
 ) {
   return ({ taskInstance }: RunContext) => {
     const state = taskInstance.state as LatestTaskStateSchema;
+    console.log('*** Running task with the following state:', JSON.stringify(state));
     return {
       async run() {
         let totalInvalidated = 0;
```

#### Scenario 1: Adding an unknown field to the task saved-object gets
dropped
1. Startup a fresh Kibana instance
2. Make the following call to Elasticsearch (I used postman). This call
adds an unknown property (`foo`) to the task state and makes the task
run right away.
```
POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys
{
  "doc": {
    "task": {
      "runAt": "2023-06-08T00:00:00.000Z",
      "state": "{\"runs\":1,\"total_invalidated\":0,\"foo\":true}"
    }
  }
}
```
3. Observe the task run log message, with state not containing `foo`.

#### Scenario 2: Task running returning an unknown property causes the
task to fail to update
1. Apply the following changes to the code (and ignore TypeScript
issues)
```
diff --git a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
index 1e624bcd807..b15d4a4f478 100644
--- a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
+++ b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
@@ -183,6 +183,7 @@ function taskRunner(
 
           const updatedState: LatestTaskStateSchema = {
             runs: (state.runs || 0) + 1,
+            foo: true,
             total_invalidated: totalInvalidated,
           };
           return {
```
2. Make the task run right away by calling Elasticsearch with the
following
```
POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys
{
  "doc": {
    "task": {
      "runAt": "2023-06-08T00:00:00.000Z"
    }
  }
}
```
3. Notice the validation errors logged as debug
```
[ERROR][plugins.taskManager] Task alerts_invalidate_api_keys "Alerts-alerts_invalidate_api_keys" failed: Error: [foo]: definition for this key is missing
```

#### Scenario 3: Task state gets migrated
1. Apply the following code change
```
diff --git a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
index 1e624bcd807..338f21bed5b 100644
--- a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
+++ b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
@@ -41,6 +41,18 @@ const stateSchemaByVersion = {
       total_invalidated: schema.number(),
     }),
   },
+  2: {
+    up: (state: Record<string, unknown>) => ({
+      runs: state.runs,
+      total_invalidated: state.total_invalidated,
+      foo: true,
+    }),
+    schema: schema.object({
+      runs: schema.number(),
+      total_invalidated: schema.number(),
+      foo: schema.boolean(),
+    }),
+  },
 };
 
 const latestSchema = stateSchemaByVersion[1].schema;
```

2. Make the task run right away by calling Elasticsearch with the
following
```
POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys
{
  "doc": {
    "task": {
      "runAt": "2023-06-08T00:00:00.000Z"
    }
  }
}
```
3. Observe the state now contains `foo` property when the task runs.

#### Scenario 4: Reading invalid state causes debug logs
1. Run the following request to Elasticsearch
```
POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys
{
  "doc": {
    "task": {
      "runAt": "2023-06-08T00:00:00.000Z",
      "state": "{}"
    }
  }
}
```
2. Observe the Kibana debug log mentioning the validation failure while
letting the task through
```
[DEBUG][plugins.taskManager] [alerts_invalidate_api_keys][Alerts-alerts_invalidate_api_keys] Failed to validate the task's state. Allowing read operation to proceed because allow_reading_invalid_state is true. Error: [runs]: expected value of type [number] but got [undefined]
```

#### Scenario 5: Reading invalid state when setting
`allow_reading_invalid_state: false` causes tasks to fail to run
1. Set `xpack.task_manager.allow_reading_invalid_state: false` in your
kibana.yml settings
2. Run the following request to Elasticsearch
```
POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys
{
  "doc": {
    "task": {
      "runAt": "2023-06-08T00:00:00.000Z",
      "state": "{}"
    }
  }
}
```
3. Observe the Kibana error log mentioning the validation failure
```
[ERROR][plugins.taskManager] Failed to poll for work: Error: [runs]: expected value of type [number] but got [undefined]
```

NOTE: While corrupting the task directly is rare, we plan to re-queue
the tasks that failed to read, leveraging work from
elastic#159302 in a future PR (hence
why the yml config is enabled by default, allowing invalid reads).

---------

Co-authored-by: kibanamachine <[email protected]>
Co-authored-by: Ying Mao <[email protected]>
  • Loading branch information
3 people authored Jun 23, 2023
1 parent 498e8a6 commit 40c2afd
Show file tree
Hide file tree
Showing 35 changed files with 1,595 additions and 131 deletions.
46 changes: 35 additions & 11 deletions x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
KibanaRequest,
SavedObjectsClientContract,
} from '@kbn/core/server';
import { schema, TypeOf } from '@kbn/config-schema';
import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server';
import { InvalidateAPIKeysParams, SecurityPluginStart } from '@kbn/security-plugin/server';
import {
Expand All @@ -29,6 +30,22 @@ import { InvalidatePendingApiKey } from '../types';
const TASK_TYPE = 'alerts_invalidate_api_keys';
export const TASK_ID = `Alerts-${TASK_TYPE}`;

const stateSchemaByVersion = {
1: {
up: (state: Record<string, unknown>) => ({
runs: state.runs || 0,
total_invalidated: state.total_invalidated || 0,
}),
schema: schema.object({
runs: schema.number(),
total_invalidated: schema.number(),
}),
},
};

const latestSchema = stateSchemaByVersion[1].schema;
type LatestTaskStateSchema = TypeOf<typeof latestSchema>;

const invalidateAPIKeys = async (
params: InvalidateAPIKeysParams,
securityPluginStart?: SecurityPluginStart
Expand Down Expand Up @@ -65,17 +82,21 @@ export async function scheduleApiKeyInvalidatorTask(
) {
const interval = config.invalidateApiKeysTask.interval;
try {
const state: LatestTaskStateSchema = {
runs: 0,
total_invalidated: 0,
};
await taskManager.ensureScheduled({
id: TASK_ID,
taskType: TASK_TYPE,
schedule: {
interval,
},
state: {},
state,
params: {},
});
} catch (e) {
logger.debug(`Error scheduling task, received ${e.message}`);
logger.error(`Error scheduling ${TASK_ID} task, received ${e.message}`);
}
}

Expand All @@ -88,6 +109,7 @@ function registerApiKeyInvalidatorTaskDefinition(
taskManager.registerTaskDefinitions({
[TASK_TYPE]: {
title: 'Invalidate alert API Keys',
stateSchemaByVersion,
createTaskRunner: taskRunner(logger, coreStartServices, config),
},
});
Expand Down Expand Up @@ -117,7 +139,7 @@ function taskRunner(
config: AlertingConfig
) {
return ({ taskInstance }: RunContext) => {
const { state } = taskInstance;
const state = taskInstance.state as LatestTaskStateSchema;
return {
async run() {
let totalInvalidated = 0;
Expand Down Expand Up @@ -159,22 +181,24 @@ function taskRunner(
hasApiKeysPendingInvalidation = apiKeysToInvalidate.total > PAGE_SIZE;
} while (hasApiKeysPendingInvalidation);

const updatedState: LatestTaskStateSchema = {
runs: (state.runs || 0) + 1,
total_invalidated: totalInvalidated,
};
return {
state: {
runs: (state.runs || 0) + 1,
total_invalidated: totalInvalidated,
},
state: updatedState,
schedule: {
interval: config.invalidateApiKeysTask.interval,
},
};
} catch (e) {
logger.warn(`Error executing alerting apiKey invalidation task: ${e.message}`);
const updatedState: LatestTaskStateSchema = {
runs: state.runs + 1,
total_invalidated: totalInvalidated,
};
return {
state: {
runs: (state.runs || 0) + 1,
total_invalidated: totalInvalidated,
},
state: updatedState,
schedule: {
interval: config.invalidateApiKeysTask.interval,
},
Expand Down
14 changes: 14 additions & 0 deletions x-pack/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@ export class Plugin {
// can add significant load to the ES cluster, so please use this configuration only when absolutly necesery.
maxConcurrency: 1,

// To ensure the validity of task state during read and write operations, utilize the stateSchemaByVersion configuration. This functionality validates the state before executing a task. Make sure to define the schema property using the @kbn/config-schema plugin, specifically as an ObjectType (schema.object) at the top level.
stateSchemaByVersion: {
1: {
schema: schema.object({
count: schema.number(),
}),
up: (state) => {
return {
count: state.count || 0,
};
},
}
}

// The createTaskRunner function / method returns an object that is responsible for
// performing the work of the task. context: { taskInstance }, is documented below.
createTaskRunner(context) {
Expand Down
21 changes: 21 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { PublicMethodsOf } from '@kbn/utility-types';
import { BufferedTaskStore } from './buffered_task_store';

const createBufferedTaskStoreMock = () => {
const mocked: jest.Mocked<PublicMethodsOf<BufferedTaskStore>> = {
update: jest.fn(),
remove: jest.fn(),
};
return mocked;
};

export const bufferedTaskStoreMock = {
create: createBufferedTaskStoreMock,
};
46 changes: 37 additions & 9 deletions x-pack/plugins/task_manager/server/buffered_task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,36 @@ describe('Buffered Task Store', () => {

taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);

expect(await bufferedStore.update(task)).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]);
expect(await bufferedStore.update(task, { validate: true })).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task], { validate: false });

expect(taskStore.taskValidator.getValidatedTaskInstanceForUpdating).toHaveBeenCalledTimes(1);
expect(taskStore.taskValidator.getValidatedTaskInstanceFromReading).toHaveBeenCalledTimes(1);
expect(taskStore.taskValidator.getValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(
task,
{ validate: true }
);
expect(taskStore.taskValidator.getValidatedTaskInstanceFromReading).toHaveBeenCalledWith(
task,
{ validate: true }
);
});

test(`doesn't validate when specified`, async () => {
const taskStore = taskStoreMock.create();
const bufferedStore = new BufferedTaskStore(taskStore, {});

const task = taskManagerMock.createTask();

taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);

expect(await bufferedStore.update(task, { validate: false })).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task], { validate: false });

expect(taskStore.taskValidator.getValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(
task,
{ validate: false }
);
});

test('handles partially successfull bulkUpdates resolving each call appropriately', async () => {
Expand Down Expand Up @@ -58,9 +86,9 @@ describe('Buffered Task Store', () => {
]);

const results = [
bufferedStore.update(tasks[0]),
bufferedStore.update(tasks[1]),
bufferedStore.update(tasks[2]),
bufferedStore.update(tasks[0], { validate: true }),
bufferedStore.update(tasks[1], { validate: true }),
bufferedStore.update(tasks[2], { validate: true }),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(`
Expand Down Expand Up @@ -105,10 +133,10 @@ describe('Buffered Task Store', () => {
]);

const results = [
bufferedStore.update(tasks[0]),
bufferedStore.update(tasks[1]),
bufferedStore.update(tasks[2]),
bufferedStore.update(tasks[3]),
bufferedStore.update(tasks[0], { validate: true }),
bufferedStore.update(tasks[1], { validate: true }),
bufferedStore.update(tasks[2], { validate: true }),
bufferedStore.update(tasks[3], { validate: true }),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(`
Expand Down
29 changes: 23 additions & 6 deletions x-pack/plugins/task_manager/server/buffered_task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,31 @@ const DEFAULT_BUFFER_MAX_DURATION = 50;
export class BufferedTaskStore implements Updatable {
private bufferedUpdate: Operation<ConcreteTaskInstance>;
constructor(private readonly taskStore: TaskStore, options: BufferOptions) {
this.bufferedUpdate = createBuffer<ConcreteTaskInstance>((docs) => taskStore.bulkUpdate(docs), {
bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION,
...options,
});
this.bufferedUpdate = createBuffer<ConcreteTaskInstance>(
// Setting validate: false because we'll validate per update call
//
// Ideally we could accumulate the "validate" options and pass them
// to .bulkUpdate per doc, but the required changes to the bulk_operation_buffer
// to track the values are high and deffered for now.
(docs) => taskStore.bulkUpdate(docs, { validate: false }),
{
bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION,
...options,
}
);
}

public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
return unwrapPromise(this.bufferedUpdate(doc));
public async update(
doc: ConcreteTaskInstance,
options: { validate: boolean }
): Promise<ConcreteTaskInstance> {
const docToUpdate = this.taskStore.taskValidator.getValidatedTaskInstanceForUpdating(doc, {
validate: options.validate,
});
const result = await unwrapPromise(this.bufferedUpdate(docToUpdate));
return this.taskStore.taskValidator.getValidatedTaskInstanceFromReading(result, {
validate: options.validate,
});
}

public async remove(id: string): Promise<void> {
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ describe('config validation', () => {
const config: Record<string, unknown> = {};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand Down Expand Up @@ -64,6 +65,7 @@ describe('config validation', () => {
const config: Record<string, unknown> = {};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand Down Expand Up @@ -114,6 +116,7 @@ describe('config validation', () => {
};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ export const configSchema = schema.object(
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
authenticate_background_task_utilization: schema.boolean({ defaultValue: true }),
}),
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
},
{
validate: (config) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ describe('EphemeralTaskLifecycle', () => {
poll_interval: 6000000,
version_conflict_threshold: 80,
request_capacity: 1000,
allow_reading_invalid_state: false,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_required_freshness: 5000,
monitored_stats_running_average_window: 50,
Expand Down
10 changes: 10 additions & 0 deletions x-pack/plugins/task_manager/server/integration_tests/lib/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export { injectTask } from './inject_task';
export { setupTestServers } from './setup_test_servers';
export { retry } from './retry';
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { type ElasticsearchClient } from '@kbn/core/server';
import { type ConcreteTaskInstance } from '../../task';

export async function injectTask(
esClient: ElasticsearchClient,
{ id, ...task }: ConcreteTaskInstance
) {
const soId = `task:${id}`;
await esClient.index({
id: soId,
index: '.kibana_task_manager',
document: {
references: [],
type: 'task',
updated_at: new Date().toISOString(),
task: {
...task,
state: JSON.stringify(task.state),
params: JSON.stringify(task.params),
runAt: task.runAt.toISOString(),
scheduledAt: task.scheduledAt.toISOString(),
},
},
});
}
29 changes: 29 additions & 0 deletions x-pack/plugins/task_manager/server/integration_tests/lib/retry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

interface RetryOpts {
times: number;
intervalMs: number;
}

export async function retry<T>(
cb: () => Promise<T>,
options: RetryOpts = { times: 60, intervalMs: 500 }
) {
let attempt = 1;
while (true) {
try {
return await cb();
} catch (e) {
if (attempt >= options.times) {
throw e;
}
}
attempt++;
await new Promise((resolve) => setTimeout(resolve, options.intervalMs));
}
}
Loading

0 comments on commit 40c2afd

Please sign in to comment.