From 40c2afdc5894dc163c8e196d3011293875b36f6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20C=C3=B4t=C3=A9?= Date: Fri, 23 Jun 2023 10:41:55 -0400 Subject: [PATCH] Add support to Task Manager for validating and versioning the task state objects (#159048) Part of https://github.com/elastic/kibana/issues/155764. In this PR, I'm modifying task manager to allow task types to report a versioned schema for the `state` object. When defining `stateSchemaByVersion`, the following will happen: - The `state` returned from the task runner will get validated against the latest version and throw an error if ever it is invalid (to capture mismatches at development and testing time) - When task manager reads a task, it will migrate the task state to the latest version (if necessary) and validate against the latest schema, dropping any unknown fields (in the scenario of a downgrade). By default, Task Manager will validate the state on write once a versioned schema is provided, however the following config must be enabled for errors to be thrown on read: `xpack.task_manager.allow_reading_invalid_state: true`. We plan to enable this in serverless by default but be cautious on existing deployments and wait for telemetry to show no issues. I've onboarded the `alerts_invalidate_api_keys` task type which can be used as an example to onboard others. See [this commit](https://github.com/elastic/kibana/pull/159048/commits/214bae38d89409d4f5527887f46ce9c4988146d1). ### How to configure a task type to version and validate The structure is defined as: ``` taskManager.registerTaskDefinitions({ ... stateSchemaByVersion: { 1: { // All existing tasks don't have a version so will get `up` migrated to 1 up: (state: Record) => ({ runs: state.runs || 0, total_invalidated: state.total_invalidated || 0, }), schema: schema.object({ runs: schema.number(), total_invalidated: schema.number(), }), }, }, ... }); ``` However, look at [this commit](https://github.com/elastic/kibana/pull/159048/commits/214bae38d89409d4f5527887f46ce9c4988146d1) for an example that you can leverage type safety from the schema. ### Follow up issues - Onboard non-alerting task types to have a versioned state schema (https://github.com/elastic/kibana/issues/159342) - Onboard alerting task types to have a versioned state schema for the framework fields (https://github.com/elastic/kibana/issues/159343) - Onboard alerting task types to have a versioned rule and alert state schema within the task state (https://github.com/elastic/kibana/issues/159344) - Telemetry on the validation failures (https://github.com/elastic/kibana/issues/159345) - Remove feature flag so `allow_reading_invalid_state` is always `false` (https://github.com/elastic/kibana/issues/159346) - Force validation on all tasks using state by removing the exemption code (https://github.com/elastic/kibana/issues/159347) - Release tasks when encountering a validation failure after run (https://github.com/elastic/kibana/issues/159964) ### To Verify NOTE: I have the following verification scenarios in a jest integration test as well => https://github.com/elastic/kibana/pull/159048/files#diff-5f06228df58fa74d5a0f2722c30f1f4bee2ee9df7a14e0700b9aa9bc3864a858. You will need to log the state when the task runs to observe what the task runner receives in different scenarios. ``` diff --git a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts index 1e624bcd807..4aa4c2c7805 100644 --- a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts +++ b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts @@ -140,6 +140,7 @@ function taskRunner( ) { return ({ taskInstance }: RunContext) => { const state = taskInstance.state as LatestTaskStateSchema; + console.log('*** Running task with the following state:', JSON.stringify(state)); return { async run() { let totalInvalidated = 0; ``` #### Scenario 1: Adding an unknown field to the task saved-object gets dropped 1. Startup a fresh Kibana instance 2. Make the following call to Elasticsearch (I used postman). This call adds an unknown property (`foo`) to the task state and makes the task run right away. ``` POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys { "doc": { "task": { "runAt": "2023-06-08T00:00:00.000Z", "state": "{\"runs\":1,\"total_invalidated\":0,\"foo\":true}" } } } ``` 3. Observe the task run log message, with state not containing `foo`. #### Scenario 2: Task running returning an unknown property causes the task to fail to update 1. Apply the following changes to the code (and ignore TypeScript issues) ``` diff --git a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts index 1e624bcd807..b15d4a4f478 100644 --- a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts +++ b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts @@ -183,6 +183,7 @@ function taskRunner( const updatedState: LatestTaskStateSchema = { runs: (state.runs || 0) + 1, + foo: true, total_invalidated: totalInvalidated, }; return { ``` 2. Make the task run right away by calling Elasticsearch with the following ``` POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys { "doc": { "task": { "runAt": "2023-06-08T00:00:00.000Z" } } } ``` 3. Notice the validation errors logged as debug ``` [ERROR][plugins.taskManager] Task alerts_invalidate_api_keys "Alerts-alerts_invalidate_api_keys" failed: Error: [foo]: definition for this key is missing ``` #### Scenario 3: Task state gets migrated 1. Apply the following code change ``` diff --git a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts index 1e624bcd807..338f21bed5b 100644 --- a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts +++ b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts @@ -41,6 +41,18 @@ const stateSchemaByVersion = { total_invalidated: schema.number(), }), }, + 2: { + up: (state: Record) => ({ + runs: state.runs, + total_invalidated: state.total_invalidated, + foo: true, + }), + schema: schema.object({ + runs: schema.number(), + total_invalidated: schema.number(), + foo: schema.boolean(), + }), + }, }; const latestSchema = stateSchemaByVersion[1].schema; ``` 2. Make the task run right away by calling Elasticsearch with the following ``` POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys { "doc": { "task": { "runAt": "2023-06-08T00:00:00.000Z" } } } ``` 3. Observe the state now contains `foo` property when the task runs. #### Scenario 4: Reading invalid state causes debug logs 1. Run the following request to Elasticsearch ``` POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys { "doc": { "task": { "runAt": "2023-06-08T00:00:00.000Z", "state": "{}" } } } ``` 2. Observe the Kibana debug log mentioning the validation failure while letting the task through ``` [DEBUG][plugins.taskManager] [alerts_invalidate_api_keys][Alerts-alerts_invalidate_api_keys] Failed to validate the task's state. Allowing read operation to proceed because allow_reading_invalid_state is true. Error: [runs]: expected value of type [number] but got [undefined] ``` #### Scenario 5: Reading invalid state when setting `allow_reading_invalid_state: false` causes tasks to fail to run 1. Set `xpack.task_manager.allow_reading_invalid_state: false` in your kibana.yml settings 2. Run the following request to Elasticsearch ``` POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys { "doc": { "task": { "runAt": "2023-06-08T00:00:00.000Z", "state": "{}" } } } ``` 3. Observe the Kibana error log mentioning the validation failure ``` [ERROR][plugins.taskManager] Failed to poll for work: Error: [runs]: expected value of type [number] but got [undefined] ``` NOTE: While corrupting the task directly is rare, we plan to re-queue the tasks that failed to read, leveraging work from https://github.com/elastic/kibana/issues/159302 in a future PR (hence why the yml config is enabled by default, allowing invalid reads). --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Ying Mao --- .../invalidate_pending_api_keys/task.ts | 46 +- x-pack/plugins/task_manager/README.md | 14 + .../server/buffered_task_store.mock.ts | 21 + .../server/buffered_task_store.test.ts | 46 +- .../server/buffered_task_store.ts | 29 +- .../task_manager/server/config.test.ts | 3 + x-pack/plugins/task_manager/server/config.ts | 1 + .../server/ephemeral_task_lifecycle.test.ts | 1 + .../server/integration_tests/lib/index.ts | 10 + .../integration_tests/lib/inject_task.ts | 32 ++ .../server/integration_tests/lib/retry.ts | 29 ++ .../lib/setup_test_servers.ts | 56 +++ .../managed_configuration.test.ts | 1 + .../task_state_validation.test.ts | 326 ++++++++++++++ .../server/lib/bulk_operation_buffer.test.ts | 6 +- .../server/lib/bulk_operation_buffer.ts | 2 +- .../server/lib/retryable_bulk_update.test.ts | 19 +- .../server/lib/retryable_bulk_update.ts | 4 +- .../configuration_statistics.test.ts | 1 + .../monitoring_stats_stream.test.ts | 1 + .../task_manager/server/plugin.test.ts | 1 + x-pack/plugins/task_manager/server/plugin.ts | 2 + .../server/polling_lifecycle.test.ts | 1 + x-pack/plugins/task_manager/server/task.ts | 19 +- .../server/task_running/task_runner.test.ts | 41 +- .../server/task_running/task_runner.ts | 69 +-- .../server/task_scheduling.test.ts | 6 +- .../task_manager/server/task_scheduling.ts | 18 +- .../task_manager/server/task_store.mock.ts | 4 + .../task_manager/server/task_store.test.ts | 206 ++++++++- .../plugins/task_manager/server/task_store.ts | 98 +++-- .../server/task_type_dictionary.ts | 8 + .../server/task_validator.test.ts | 397 ++++++++++++++++++ .../task_manager/server/task_validator.ts | 205 +++++++++ x-pack/plugins/task_manager/tsconfig.json | 3 +- 35 files changed, 1595 insertions(+), 131 deletions(-) create mode 100644 x-pack/plugins/task_manager/server/buffered_task_store.mock.ts create mode 100644 x-pack/plugins/task_manager/server/integration_tests/lib/index.ts create mode 100644 x-pack/plugins/task_manager/server/integration_tests/lib/inject_task.ts create mode 100644 x-pack/plugins/task_manager/server/integration_tests/lib/retry.ts create mode 100644 x-pack/plugins/task_manager/server/integration_tests/lib/setup_test_servers.ts create mode 100644 x-pack/plugins/task_manager/server/integration_tests/task_state_validation.test.ts create mode 100644 x-pack/plugins/task_manager/server/task_validator.test.ts create mode 100644 x-pack/plugins/task_manager/server/task_validator.ts diff --git a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts index f1e97c9fd93d02..1e624bcd807cf7 100644 --- a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts +++ b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts @@ -12,6 +12,7 @@ import { KibanaRequest, SavedObjectsClientContract, } from '@kbn/core/server'; +import { schema, TypeOf } from '@kbn/config-schema'; import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server'; import { InvalidateAPIKeysParams, SecurityPluginStart } from '@kbn/security-plugin/server'; import { @@ -29,6 +30,22 @@ import { InvalidatePendingApiKey } from '../types'; const TASK_TYPE = 'alerts_invalidate_api_keys'; export const TASK_ID = `Alerts-${TASK_TYPE}`; +const stateSchemaByVersion = { + 1: { + up: (state: Record) => ({ + runs: state.runs || 0, + total_invalidated: state.total_invalidated || 0, + }), + schema: schema.object({ + runs: schema.number(), + total_invalidated: schema.number(), + }), + }, +}; + +const latestSchema = stateSchemaByVersion[1].schema; +type LatestTaskStateSchema = TypeOf; + const invalidateAPIKeys = async ( params: InvalidateAPIKeysParams, securityPluginStart?: SecurityPluginStart @@ -65,17 +82,21 @@ export async function scheduleApiKeyInvalidatorTask( ) { const interval = config.invalidateApiKeysTask.interval; try { + const state: LatestTaskStateSchema = { + runs: 0, + total_invalidated: 0, + }; await taskManager.ensureScheduled({ id: TASK_ID, taskType: TASK_TYPE, schedule: { interval, }, - state: {}, + state, params: {}, }); } catch (e) { - logger.debug(`Error scheduling task, received ${e.message}`); + logger.error(`Error scheduling ${TASK_ID} task, received ${e.message}`); } } @@ -88,6 +109,7 @@ function registerApiKeyInvalidatorTaskDefinition( taskManager.registerTaskDefinitions({ [TASK_TYPE]: { title: 'Invalidate alert API Keys', + stateSchemaByVersion, createTaskRunner: taskRunner(logger, coreStartServices, config), }, }); @@ -117,7 +139,7 @@ function taskRunner( config: AlertingConfig ) { return ({ taskInstance }: RunContext) => { - const { state } = taskInstance; + const state = taskInstance.state as LatestTaskStateSchema; return { async run() { let totalInvalidated = 0; @@ -159,22 +181,24 @@ function taskRunner( hasApiKeysPendingInvalidation = apiKeysToInvalidate.total > PAGE_SIZE; } while (hasApiKeysPendingInvalidation); + const updatedState: LatestTaskStateSchema = { + runs: (state.runs || 0) + 1, + total_invalidated: totalInvalidated, + }; return { - state: { - runs: (state.runs || 0) + 1, - total_invalidated: totalInvalidated, - }, + state: updatedState, schedule: { interval: config.invalidateApiKeysTask.interval, }, }; } catch (e) { logger.warn(`Error executing alerting apiKey invalidation task: ${e.message}`); + const updatedState: LatestTaskStateSchema = { + runs: state.runs + 1, + total_invalidated: totalInvalidated, + }; return { - state: { - runs: (state.runs || 0) + 1, - total_invalidated: totalInvalidated, - }, + state: updatedState, schedule: { interval: config.invalidateApiKeysTask.interval, }, diff --git a/x-pack/plugins/task_manager/README.md b/x-pack/plugins/task_manager/README.md index dd6565ae16d524..c7ca9e5eeea082 100644 --- a/x-pack/plugins/task_manager/README.md +++ b/x-pack/plugins/task_manager/README.md @@ -92,6 +92,20 @@ export class Plugin { // can add significant load to the ES cluster, so please use this configuration only when absolutly necesery. maxConcurrency: 1, + // To ensure the validity of task state during read and write operations, utilize the stateSchemaByVersion configuration. This functionality validates the state before executing a task. Make sure to define the schema property using the @kbn/config-schema plugin, specifically as an ObjectType (schema.object) at the top level. + stateSchemaByVersion: { + 1: { + schema: schema.object({ + count: schema.number(), + }), + up: (state) => { + return { + count: state.count || 0, + }; + }, + } + } + // The createTaskRunner function / method returns an object that is responsible for // performing the work of the task. context: { taskInstance }, is documented below. createTaskRunner(context) { diff --git a/x-pack/plugins/task_manager/server/buffered_task_store.mock.ts b/x-pack/plugins/task_manager/server/buffered_task_store.mock.ts new file mode 100644 index 00000000000000..efdea5a49bc384 --- /dev/null +++ b/x-pack/plugins/task_manager/server/buffered_task_store.mock.ts @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { PublicMethodsOf } from '@kbn/utility-types'; +import { BufferedTaskStore } from './buffered_task_store'; + +const createBufferedTaskStoreMock = () => { + const mocked: jest.Mocked> = { + update: jest.fn(), + remove: jest.fn(), + }; + return mocked; +}; + +export const bufferedTaskStoreMock = { + create: createBufferedTaskStoreMock, +}; diff --git a/x-pack/plugins/task_manager/server/buffered_task_store.test.ts b/x-pack/plugins/task_manager/server/buffered_task_store.test.ts index 935192f7cc48e4..08d84c8236cf12 100644 --- a/x-pack/plugins/task_manager/server/buffered_task_store.test.ts +++ b/x-pack/plugins/task_manager/server/buffered_task_store.test.ts @@ -29,8 +29,36 @@ describe('Buffered Task Store', () => { taskStore.bulkUpdate.mockResolvedValue([asOk(task)]); - expect(await bufferedStore.update(task)).toMatchObject(task); - expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]); + expect(await bufferedStore.update(task, { validate: true })).toMatchObject(task); + expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task], { validate: false }); + + expect(taskStore.taskValidator.getValidatedTaskInstanceForUpdating).toHaveBeenCalledTimes(1); + expect(taskStore.taskValidator.getValidatedTaskInstanceFromReading).toHaveBeenCalledTimes(1); + expect(taskStore.taskValidator.getValidatedTaskInstanceForUpdating).toHaveBeenCalledWith( + task, + { validate: true } + ); + expect(taskStore.taskValidator.getValidatedTaskInstanceFromReading).toHaveBeenCalledWith( + task, + { validate: true } + ); + }); + + test(`doesn't validate when specified`, async () => { + const taskStore = taskStoreMock.create(); + const bufferedStore = new BufferedTaskStore(taskStore, {}); + + const task = taskManagerMock.createTask(); + + taskStore.bulkUpdate.mockResolvedValue([asOk(task)]); + + expect(await bufferedStore.update(task, { validate: false })).toMatchObject(task); + expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task], { validate: false }); + + expect(taskStore.taskValidator.getValidatedTaskInstanceForUpdating).toHaveBeenCalledWith( + task, + { validate: false } + ); }); test('handles partially successfull bulkUpdates resolving each call appropriately', async () => { @@ -58,9 +86,9 @@ describe('Buffered Task Store', () => { ]); const results = [ - bufferedStore.update(tasks[0]), - bufferedStore.update(tasks[1]), - bufferedStore.update(tasks[2]), + bufferedStore.update(tasks[0], { validate: true }), + bufferedStore.update(tasks[1], { validate: true }), + bufferedStore.update(tasks[2], { validate: true }), ]; expect(await results[0]).toMatchObject(tasks[0]); expect(results[1]).rejects.toMatchInlineSnapshot(` @@ -105,10 +133,10 @@ describe('Buffered Task Store', () => { ]); const results = [ - bufferedStore.update(tasks[0]), - bufferedStore.update(tasks[1]), - bufferedStore.update(tasks[2]), - bufferedStore.update(tasks[3]), + bufferedStore.update(tasks[0], { validate: true }), + bufferedStore.update(tasks[1], { validate: true }), + bufferedStore.update(tasks[2], { validate: true }), + bufferedStore.update(tasks[3], { validate: true }), ]; expect(await results[0]).toMatchObject(tasks[0]); expect(results[1]).rejects.toMatchInlineSnapshot(` diff --git a/x-pack/plugins/task_manager/server/buffered_task_store.ts b/x-pack/plugins/task_manager/server/buffered_task_store.ts index 0e24e8e49e43f2..4135164c1e442e 100644 --- a/x-pack/plugins/task_manager/server/buffered_task_store.ts +++ b/x-pack/plugins/task_manager/server/buffered_task_store.ts @@ -17,14 +17,31 @@ const DEFAULT_BUFFER_MAX_DURATION = 50; export class BufferedTaskStore implements Updatable { private bufferedUpdate: Operation; constructor(private readonly taskStore: TaskStore, options: BufferOptions) { - this.bufferedUpdate = createBuffer((docs) => taskStore.bulkUpdate(docs), { - bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION, - ...options, - }); + this.bufferedUpdate = createBuffer( + // Setting validate: false because we'll validate per update call + // + // Ideally we could accumulate the "validate" options and pass them + // to .bulkUpdate per doc, but the required changes to the bulk_operation_buffer + // to track the values are high and deffered for now. + (docs) => taskStore.bulkUpdate(docs, { validate: false }), + { + bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION, + ...options, + } + ); } - public async update(doc: ConcreteTaskInstance): Promise { - return unwrapPromise(this.bufferedUpdate(doc)); + public async update( + doc: ConcreteTaskInstance, + options: { validate: boolean } + ): Promise { + const docToUpdate = this.taskStore.taskValidator.getValidatedTaskInstanceForUpdating(doc, { + validate: options.validate, + }); + const result = await unwrapPromise(this.bufferedUpdate(docToUpdate)); + return this.taskStore.taskValidator.getValidatedTaskInstanceFromReading(result, { + validate: options.validate, + }); } public async remove(id: string): Promise { diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index d2679efc5042a5..4eca3e971de542 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -12,6 +12,7 @@ describe('config validation', () => { const config: Record = {}; expect(configSchema.validate(config)).toMatchInlineSnapshot(` Object { + "allow_reading_invalid_state": true, "ephemeral_tasks": Object { "enabled": false, "request_capacity": 10, @@ -64,6 +65,7 @@ describe('config validation', () => { const config: Record = {}; expect(configSchema.validate(config)).toMatchInlineSnapshot(` Object { + "allow_reading_invalid_state": true, "ephemeral_tasks": Object { "enabled": false, "request_capacity": 10, @@ -114,6 +116,7 @@ describe('config validation', () => { }; expect(configSchema.validate(config)).toMatchInlineSnapshot(` Object { + "allow_reading_invalid_state": true, "ephemeral_tasks": Object { "enabled": false, "request_capacity": 10, diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index dffc9677c7c776..2025b44d5b50d7 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -137,6 +137,7 @@ export const configSchema = schema.object( exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }), authenticate_background_task_utilization: schema.boolean({ defaultValue: true }), }), + allow_reading_invalid_state: schema.boolean({ defaultValue: true }), }, { validate: (config) => { diff --git a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts index 586fa9db5d4469..a12ce476ec2f73 100644 --- a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts @@ -50,6 +50,7 @@ describe('EphemeralTaskLifecycle', () => { poll_interval: 6000000, version_conflict_threshold: 80, request_capacity: 1000, + allow_reading_invalid_state: false, monitored_aggregated_stats_refresh_rate: 5000, monitored_stats_required_freshness: 5000, monitored_stats_running_average_window: 50, diff --git a/x-pack/plugins/task_manager/server/integration_tests/lib/index.ts b/x-pack/plugins/task_manager/server/integration_tests/lib/index.ts new file mode 100644 index 00000000000000..ab7f23c98e06ff --- /dev/null +++ b/x-pack/plugins/task_manager/server/integration_tests/lib/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export { injectTask } from './inject_task'; +export { setupTestServers } from './setup_test_servers'; +export { retry } from './retry'; diff --git a/x-pack/plugins/task_manager/server/integration_tests/lib/inject_task.ts b/x-pack/plugins/task_manager/server/integration_tests/lib/inject_task.ts new file mode 100644 index 00000000000000..2624c234dd526f --- /dev/null +++ b/x-pack/plugins/task_manager/server/integration_tests/lib/inject_task.ts @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { type ElasticsearchClient } from '@kbn/core/server'; +import { type ConcreteTaskInstance } from '../../task'; + +export async function injectTask( + esClient: ElasticsearchClient, + { id, ...task }: ConcreteTaskInstance +) { + const soId = `task:${id}`; + await esClient.index({ + id: soId, + index: '.kibana_task_manager', + document: { + references: [], + type: 'task', + updated_at: new Date().toISOString(), + task: { + ...task, + state: JSON.stringify(task.state), + params: JSON.stringify(task.params), + runAt: task.runAt.toISOString(), + scheduledAt: task.scheduledAt.toISOString(), + }, + }, + }); +} diff --git a/x-pack/plugins/task_manager/server/integration_tests/lib/retry.ts b/x-pack/plugins/task_manager/server/integration_tests/lib/retry.ts new file mode 100644 index 00000000000000..17aa83c6479bf2 --- /dev/null +++ b/x-pack/plugins/task_manager/server/integration_tests/lib/retry.ts @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +interface RetryOpts { + times: number; + intervalMs: number; +} + +export async function retry( + cb: () => Promise, + options: RetryOpts = { times: 60, intervalMs: 500 } +) { + let attempt = 1; + while (true) { + try { + return await cb(); + } catch (e) { + if (attempt >= options.times) { + throw e; + } + } + attempt++; + await new Promise((resolve) => setTimeout(resolve, options.intervalMs)); + } +} diff --git a/x-pack/plugins/task_manager/server/integration_tests/lib/setup_test_servers.ts b/x-pack/plugins/task_manager/server/integration_tests/lib/setup_test_servers.ts new file mode 100644 index 00000000000000..7ded9629eb31ee --- /dev/null +++ b/x-pack/plugins/task_manager/server/integration_tests/lib/setup_test_servers.ts @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import deepmerge from 'deepmerge'; +import { createTestServers, createRootWithCorePlugins } from '@kbn/core-test-helpers-kbn-server'; + +export async function setupTestServers(settings = {}) { + const { startES } = createTestServers({ + adjustTimeout: (t) => jest.setTimeout(t), + settings: { + es: { + license: 'trial', + }, + }, + }); + + const esServer = await startES(); + + const root = createRootWithCorePlugins( + deepmerge( + { + logging: { + root: { + level: 'warn', + }, + loggers: [ + { + name: 'plugins.taskManager', + level: 'all', + }, + ], + }, + }, + settings + ), + { oss: false } + ); + + await root.preboot(); + const coreSetup = await root.setup(); + const coreStart = await root.start(); + + return { + esServer, + kibanaServer: { + root, + coreSetup, + coreStart, + stop: async () => await root.shutdown(), + }, + }; +} diff --git a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts index 4b024ac1e7e1be..5ba49664895cd9 100644 --- a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts @@ -43,6 +43,7 @@ describe('managed configuration', () => { max_workers: 10, max_attempts: 9, poll_interval: 3000, + allow_reading_invalid_state: false, version_conflict_threshold: 80, monitored_aggregated_stats_refresh_rate: 60000, monitored_stats_health_verbose_log: { diff --git a/x-pack/plugins/task_manager/server/integration_tests/task_state_validation.test.ts b/x-pack/plugins/task_manager/server/integration_tests/task_state_validation.test.ts new file mode 100644 index 00000000000000..ac72a726850ea7 --- /dev/null +++ b/x-pack/plugins/task_manager/server/integration_tests/task_state_validation.test.ts @@ -0,0 +1,326 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + type TestElasticsearchUtils, + type TestKibanaUtils, +} from '@kbn/core-test-helpers-kbn-server'; +import { schema } from '@kbn/config-schema'; +import { TaskStatus } from '../task'; +import { type TaskPollingLifecycleOpts } from '../polling_lifecycle'; +import { type TaskClaimingOpts } from '../queries/task_claiming'; +import { TaskManagerPlugin, type TaskManagerStartContract } from '../plugin'; +import { injectTask, setupTestServers, retry } from './lib'; + +const { TaskPollingLifecycle: TaskPollingLifecycleMock } = jest.requireMock('../polling_lifecycle'); +jest.mock('../polling_lifecycle', () => { + const actual = jest.requireActual('../polling_lifecycle'); + return { + ...actual, + TaskPollingLifecycle: jest.fn().mockImplementation((opts) => { + return new actual.TaskPollingLifecycle(opts); + }), + }; +}); + +const mockTaskTypeRunFn = jest.fn(); +const mockCreateTaskRunner = jest.fn(); +const mockTaskType = { + title: '', + description: '', + stateSchemaByVersion: { + 1: { + up: (state: Record) => ({ foo: state.foo || '' }), + schema: schema.object({ + foo: schema.string(), + }), + }, + 2: { + up: (state: Record) => ({ ...state, bar: state.bar || '' }), + schema: schema.object({ + foo: schema.string(), + bar: schema.string(), + }), + }, + 3: { + up: (state: Record) => ({ ...state, baz: state.baz || '' }), + schema: schema.object({ + foo: schema.string(), + bar: schema.string(), + baz: schema.string(), + }), + }, + }, + createTaskRunner: mockCreateTaskRunner.mockImplementation(() => ({ + run: mockTaskTypeRunFn, + })), +}; +jest.mock('../queries/task_claiming', () => { + const actual = jest.requireActual('../queries/task_claiming'); + return { + ...actual, + TaskClaiming: jest.fn().mockImplementation((opts: TaskClaimingOpts) => { + // We need to register here because once the class is instantiated, adding + // definitions won't get claimed because of "partitionIntoClaimingBatches". + opts.definitions.registerTaskDefinitions({ + fooType: mockTaskType, + }); + return new actual.TaskClaiming(opts); + }), + }; +}); + +const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start'); + +describe('task state validation', () => { + describe('allow_reading_invalid_state: true', () => { + let esServer: TestElasticsearchUtils; + let kibanaServer: TestKibanaUtils; + let taskManagerPlugin: TaskManagerStartContract; + let pollingLifecycleOpts: TaskPollingLifecycleOpts; + + beforeAll(async () => { + const setupResult = await setupTestServers(); + esServer = setupResult.esServer; + kibanaServer = setupResult.kibanaServer; + + expect(taskManagerStartSpy).toHaveBeenCalledTimes(1); + taskManagerPlugin = taskManagerStartSpy.mock.results[0].value; + + expect(TaskPollingLifecycleMock).toHaveBeenCalledTimes(1); + pollingLifecycleOpts = TaskPollingLifecycleMock.mock.calls[0][0]; + }); + + afterAll(async () => { + if (kibanaServer) { + await kibanaServer.stop(); + } + if (esServer) { + await esServer.stop(); + } + }); + + beforeEach(() => { + jest.clearAllMocks(); + }); + + afterEach(async () => { + await taskManagerPlugin.removeIfExists('foo'); + }); + + it('should drop unknown fields from the task state', async () => { + const taskRunnerPromise = new Promise((resolve) => { + mockTaskTypeRunFn.mockImplementation(() => { + setTimeout(resolve, 0); + return { state: {} }; + }); + }); + + await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, { + id: 'foo', + taskType: 'fooType', + params: { foo: true }, + state: { foo: 'test', bar: 'test', baz: 'test', invalidProperty: 'invalid' }, + stateVersion: 4, + runAt: new Date(), + enabled: true, + scheduledAt: new Date(), + attempts: 0, + status: TaskStatus.Idle, + startedAt: null, + retryAt: null, + ownerId: null, + }); + + await taskRunnerPromise; + + expect(mockCreateTaskRunner).toHaveBeenCalledTimes(1); + const call = mockCreateTaskRunner.mock.calls[0][0]; + expect(call.taskInstance.state).toEqual({ + foo: 'test', + bar: 'test', + baz: 'test', + }); + }); + + it('should fail to update the task if the task runner returns an unknown property in the state', async () => { + const errorLogSpy = jest.spyOn(pollingLifecycleOpts.logger, 'error'); + const taskRunnerPromise = new Promise((resolve) => { + mockTaskTypeRunFn.mockImplementation(() => { + setTimeout(resolve, 0); + return { state: { invalidField: true, foo: 'test', bar: 'test', baz: 'test' } }; + }); + }); + + await taskManagerPlugin.schedule({ + id: 'foo', + taskType: 'fooType', + params: {}, + state: { foo: 'test', bar: 'test', baz: 'test' }, + schedule: { interval: '1d' }, + }); + + await taskRunnerPromise; + + expect(mockCreateTaskRunner).toHaveBeenCalledTimes(1); + const call = mockCreateTaskRunner.mock.calls[0][0]; + expect(call.taskInstance.state).toEqual({ + foo: 'test', + bar: 'test', + baz: 'test', + }); + expect(errorLogSpy).toHaveBeenCalledWith( + 'Task fooType "foo" failed: Error: [invalidField]: definition for this key is missing', + expect.anything() + ); + }); + + it('should migrate the task state', async () => { + const taskRunnerPromise = new Promise((resolve) => { + mockTaskTypeRunFn.mockImplementation(() => { + setTimeout(resolve, 0); + return { state: {} }; + }); + }); + + await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, { + id: 'foo', + taskType: 'fooType', + params: { foo: true }, + state: {}, + runAt: new Date(), + enabled: true, + scheduledAt: new Date(), + attempts: 0, + status: TaskStatus.Idle, + startedAt: null, + retryAt: null, + ownerId: null, + }); + + await taskRunnerPromise; + + expect(mockCreateTaskRunner).toHaveBeenCalledTimes(1); + const call = mockCreateTaskRunner.mock.calls[0][0]; + expect(call.taskInstance.state).toEqual({ + foo: '', + bar: '', + baz: '', + }); + }); + + it('should debug log by default when reading an invalid task state', async () => { + const debugLogSpy = jest.spyOn(pollingLifecycleOpts.logger, 'debug'); + const taskRunnerPromise = new Promise((resolve) => { + mockTaskTypeRunFn.mockImplementation(() => { + setTimeout(resolve, 0); + return { state: {} }; + }); + }); + + await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, { + id: 'foo', + taskType: 'fooType', + params: { foo: true }, + state: { foo: true, bar: 'test', baz: 'test' }, + stateVersion: 4, + runAt: new Date(), + enabled: true, + scheduledAt: new Date(), + attempts: 0, + status: TaskStatus.Idle, + startedAt: null, + retryAt: null, + ownerId: null, + }); + + await taskRunnerPromise; + + expect(mockCreateTaskRunner).toHaveBeenCalledTimes(1); + const call = mockCreateTaskRunner.mock.calls[0][0]; + expect(call.taskInstance.state).toEqual({ + foo: true, + bar: 'test', + baz: 'test', + }); + + expect(debugLogSpy).toHaveBeenCalledWith( + `[fooType][foo] Failed to validate the task's state. Allowing read operation to proceed because allow_reading_invalid_state is true. Error: [foo]: expected value of type [string] but got [boolean]` + ); + }); + }); + + describe('allow_reading_invalid_state: false', () => { + let esServer: TestElasticsearchUtils; + let kibanaServer: TestKibanaUtils; + let taskManagerPlugin: TaskManagerStartContract; + let pollingLifecycleOpts: TaskPollingLifecycleOpts; + + beforeAll(async () => { + const setupResult = await setupTestServers({ + xpack: { + task_manager: { + allow_reading_invalid_state: false, + }, + }, + }); + esServer = setupResult.esServer; + kibanaServer = setupResult.kibanaServer; + + expect(taskManagerStartSpy).toHaveBeenCalledTimes(1); + taskManagerPlugin = taskManagerStartSpy.mock.results[0].value; + + expect(TaskPollingLifecycleMock).toHaveBeenCalledTimes(1); + pollingLifecycleOpts = TaskPollingLifecycleMock.mock.calls[0][0]; + }); + + afterAll(async () => { + if (kibanaServer) { + await kibanaServer.stop(); + } + if (esServer) { + await esServer.stop(); + } + }); + + beforeEach(() => { + jest.clearAllMocks(); + }); + + afterEach(async () => { + await taskManagerPlugin.removeIfExists('foo'); + }); + + it('should fail the task run when setting allow_reading_invalid_state:false and reading an invalid state', async () => { + const errorLogSpy = jest.spyOn(pollingLifecycleOpts.logger, 'error'); + + await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, { + id: 'foo', + taskType: 'fooType', + params: { foo: true }, + state: { foo: true, bar: 'test', baz: 'test' }, + stateVersion: 4, + runAt: new Date(), + enabled: true, + scheduledAt: new Date(), + attempts: 0, + status: TaskStatus.Idle, + startedAt: null, + retryAt: null, + ownerId: null, + }); + + await retry(async () => { + expect(errorLogSpy).toHaveBeenCalledWith( + `Failed to poll for work: Error: [foo]: expected value of type [string] but got [boolean]` + ); + }); + + expect(mockCreateTaskRunner).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts index 1832e1235aaf63..99bb7c5b84274d 100644 --- a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts +++ b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts @@ -44,7 +44,7 @@ describe('Bulk Operation Buffer', () => { return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]); }); - const bufferedUpdate = createBuffer(bulkUpdate); + const bufferedUpdate = createBuffer(bulkUpdate, {}); const task1 = createTask(); const task2 = createTask(); @@ -173,7 +173,7 @@ describe('Bulk Operation Buffer', () => { } ); - const bufferedUpdate = createBuffer(bulkUpdate); + const bufferedUpdate = createBuffer(bulkUpdate, {}); const task1 = createTask(); const task2 = createTask(); @@ -195,7 +195,7 @@ describe('Bulk Operation Buffer', () => { return Promise.reject(new Error('bulkUpdate is an illusion')); }); - const bufferedUpdate = createBuffer(bulkUpdate); + const bufferedUpdate = createBuffer(bulkUpdate, {}); const task1 = createTask(); const task2 = createTask(); diff --git a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts index 10bec75c846849..874592c4e5d01e 100644 --- a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts +++ b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts @@ -39,7 +39,7 @@ const FLUSH = true; export function createBuffer( bulkOperation: BulkOperation, - { bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE, logger }: BufferOptions = {} + { bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE, logger }: BufferOptions ): Operation { const flushBuffer = new Subject(); diff --git a/x-pack/plugins/task_manager/server/lib/retryable_bulk_update.test.ts b/x-pack/plugins/task_manager/server/lib/retryable_bulk_update.test.ts index aefcae67ecdd54..1eada39cb35392 100644 --- a/x-pack/plugins/task_manager/server/lib/retryable_bulk_update.test.ts +++ b/x-pack/plugins/task_manager/server/lib/retryable_bulk_update.test.ts @@ -32,25 +32,26 @@ describe('retryableBulkUpdate()', () => { }); it('should call getTasks with taskIds', async () => { - await retryableBulkUpdate({ taskIds, getTasks, filter, map, store }); + await retryableBulkUpdate({ taskIds, getTasks, filter, map, store, validate: false }); expect(getTasks).toHaveBeenCalledWith(taskIds); }); it('should filter tasks returned from getTasks', async () => { filter.mockImplementation((task) => task.id === '2'); - await retryableBulkUpdate({ taskIds, getTasks, filter, map, store }); + await retryableBulkUpdate({ taskIds, getTasks, filter, map, store, validate: false }); expect(filter).toHaveBeenCalledTimes(3); // Map happens after filter expect(map).toHaveBeenCalledTimes(1); - expect(store.bulkUpdate).toHaveBeenCalledWith([tasks[1]]); + expect(store.bulkUpdate).toHaveBeenCalledWith([tasks[1]], { validate: false }); }); it('should map tasks returned from getTasks', async () => { map.mockImplementation((task) => ({ ...task, status: TaskStatus.Claiming })); - await retryableBulkUpdate({ taskIds, getTasks, filter, map, store }); + await retryableBulkUpdate({ taskIds, getTasks, filter, map, store, validate: false }); expect(map).toHaveBeenCalledTimes(3); expect(store.bulkUpdate).toHaveBeenCalledWith( - tasks.map((task) => ({ ...task, status: TaskStatus.Claiming })) + tasks.map((task) => ({ ...task, status: TaskStatus.Claiming })), + { validate: false } ); }); @@ -71,9 +72,9 @@ describe('retryableBulkUpdate()', () => { ]); getTasks.mockResolvedValueOnce([tasks[0]].map((task) => asOk(task))); store.bulkUpdate.mockResolvedValueOnce(tasks.map((task) => asOk(task))); - await retryableBulkUpdate({ taskIds, getTasks, filter, map, store }); + await retryableBulkUpdate({ taskIds, getTasks, filter, map, store, validate: false }); expect(store.bulkUpdate).toHaveBeenCalledTimes(2); - expect(store.bulkUpdate).toHaveBeenNthCalledWith(2, [tasks[0]]); + expect(store.bulkUpdate).toHaveBeenNthCalledWith(2, [tasks[0]], { validate: false }); }); it('should skip updating tasks that cannot be found', async () => { @@ -86,7 +87,7 @@ describe('retryableBulkUpdate()', () => { }), asOk(tasks[2]), ]); - await retryableBulkUpdate({ taskIds, getTasks, filter, map, store }); - expect(store.bulkUpdate).toHaveBeenCalledWith([tasks[0], tasks[2]]); + await retryableBulkUpdate({ taskIds, getTasks, filter, map, store, validate: false }); + expect(store.bulkUpdate).toHaveBeenCalledWith([tasks[0], tasks[2]], { validate: false }); }); }); diff --git a/x-pack/plugins/task_manager/server/lib/retryable_bulk_update.ts b/x-pack/plugins/task_manager/server/lib/retryable_bulk_update.ts index 3c832e31af4da0..d50fb88d909c3f 100644 --- a/x-pack/plugins/task_manager/server/lib/retryable_bulk_update.ts +++ b/x-pack/plugins/task_manager/server/lib/retryable_bulk_update.ts @@ -19,6 +19,7 @@ export interface RetryableBulkUpdateOpts { filter: (task: ConcreteTaskInstance) => boolean; map: (task: ConcreteTaskInstance) => ConcreteTaskInstance; store: TaskStore; + validate: boolean; } export async function retryableBulkUpdate({ @@ -27,6 +28,7 @@ export async function retryableBulkUpdate({ filter, map, store, + validate, }: RetryableBulkUpdateOpts): Promise { const resultMap: Record = {}; @@ -42,7 +44,7 @@ export async function retryableBulkUpdate({ }, []) .filter(filter) .map(map); - const bulkUpdateResult = await store.bulkUpdate(tasksToUpdate); + const bulkUpdateResult = await store.bulkUpdate(tasksToUpdate, { validate }); for (const result of bulkUpdateResult) { const taskId = getId(result); resultMap[taskId] = result; diff --git a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts index 665b212d951c07..62303b925fb3f6 100644 --- a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts @@ -16,6 +16,7 @@ describe('Configuration Statistics Aggregator', () => { max_workers: 10, max_attempts: 9, poll_interval: 6000000, + allow_reading_invalid_state: false, version_conflict_threshold: 80, monitored_stats_required_freshness: 6000000, request_capacity: 1000, diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts index 23312b79cec8ba..0dcd87ab150bfe 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts @@ -20,6 +20,7 @@ describe('createMonitoringStatsStream', () => { max_workers: 10, max_attempts: 9, poll_interval: 6000000, + allow_reading_invalid_state: false, version_conflict_threshold: 80, monitored_stats_required_freshness: 6000000, request_capacity: 1000, diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts index 637f8a2b952f41..aca39e1678a23e 100644 --- a/x-pack/plugins/task_manager/server/plugin.test.ts +++ b/x-pack/plugins/task_manager/server/plugin.test.ts @@ -43,6 +43,7 @@ const pluginInitializerContextParams = { poll_interval: 3000, version_conflict_threshold: 80, request_capacity: 1000, + allow_reading_invalid_state: false, monitored_aggregated_stats_refresh_rate: 5000, monitored_stats_health_verbose_log: { enabled: false, diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 64da6d6f84f957..e65574cef779a8 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -227,6 +227,8 @@ export class TaskManagerPlugin definitions: this.definitions, taskManagerId: `kibana:${this.taskManagerId!}`, adHocTaskCounter: this.adHocTaskCounter, + allowReadingInvalidState: this.config.allow_reading_invalid_state, + logger: this.logger, }); const managedConfiguration = createManagedConfiguration({ diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index fc72c950a4034d..a18a458ad6510a 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -48,6 +48,7 @@ describe('TaskPollingLifecycle', () => { poll_interval: 6000000, version_conflict_threshold: 80, request_capacity: 1000, + allow_reading_invalid_state: false, monitored_aggregated_stats_refresh_rate: 5000, monitored_stats_health_verbose_log: { enabled: false, diff --git a/x-pack/plugins/task_manager/server/task.ts b/x-pack/plugins/task_manager/server/task.ts index a9f49b661a7001..edde074ca4e02c 100644 --- a/x-pack/plugins/task_manager/server/task.ts +++ b/x-pack/plugins/task_manager/server/task.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { schema, TypeOf } from '@kbn/config-schema'; +import { schema, TypeOf, ObjectType } from '@kbn/config-schema'; import { Interval, isInterval, parseIntervalAsMillisecond } from './lib/intervals'; import { isErr, tryAsResult } from './lib/result_type'; @@ -138,6 +138,15 @@ export const taskDefinitionSchema = schema.object( min: 0, }) ), + stateSchemaByVersion: schema.maybe( + schema.recordOf( + schema.string(), + schema.object({ + schema: schema.any(), + up: schema.any(), + }) + ) + ), }, { validate({ timeout }) { @@ -158,6 +167,13 @@ export type TaskDefinition = TypeOf & { * and an optional cancel function which cancels the task. */ createTaskRunner: TaskRunCreatorFunction; + stateSchemaByVersion?: Record< + number, + { + schema: ObjectType; + up: (state: Record) => Record; + } + >; }; export enum TaskStatus { @@ -248,6 +264,7 @@ export interface TaskInstance { // this can be fixed by supporting generics in the future // eslint-disable-next-line @typescript-eslint/no-explicit-any state: Record; + stateVersion?: number; /** * The serialized traceparent string of the current APM transaction or span. diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts index 5812384c66c8eb..be47be3a84266c 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts @@ -23,10 +23,10 @@ import moment from 'moment'; import { TaskDefinitionRegistry, TaskTypeDictionary } from '../task_type_dictionary'; import { mockLogger } from '../test_utils'; import { throwRetryableError, throwUnrecoverableError } from './errors'; -import { taskStoreMock } from '../task_store.mock'; import apm from 'elastic-apm-node'; import { executionContextServiceMock } from '@kbn/core/server/mocks'; import { usageCountersServiceMock } from '@kbn/usage-collection-plugin/server/usage_counters/usage_counters_service.mock'; +import { bufferedTaskStoreMock } from '../buffered_task_store.mock'; import { calculateDelay, TASK_MANAGER_RUN_TRANSACTION_TYPE, @@ -432,17 +432,20 @@ describe('TaskManagerRunner', () => { `[Error: type: Bad Request]` ); - expect(store.update).toHaveBeenCalledWith({ - ...mockInstance({ - id, - attempts: initialAttempts + 1, - schedule: undefined, - }), - status: TaskStatus.Idle, - startedAt: null, - retryAt: null, - ownerId: null, - }); + expect(store.update).toHaveBeenCalledWith( + { + ...mockInstance({ + id, + attempts: initialAttempts + 1, + schedule: undefined, + }), + status: TaskStatus.Idle, + startedAt: null, + retryAt: null, + ownerId: null, + }, + { validate: false } + ); }); test(`it doesnt try to increment a task's attempts when markTaskAsRunning fails for version conflict`, async () => { @@ -834,7 +837,9 @@ describe('TaskManagerRunner', () => { await runner.run(); expect(store.update).toHaveBeenCalledTimes(1); - expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt })); + expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), { + validate: true, + }); }); test('reschedules tasks that return a schedule', async () => { @@ -862,7 +867,9 @@ describe('TaskManagerRunner', () => { await runner.run(); expect(store.update).toHaveBeenCalledTimes(1); - expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt })); + expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), { + validate: true, + }); }); test(`doesn't reschedule recurring tasks that throw an unrecoverable error`, async () => { @@ -936,7 +943,9 @@ describe('TaskManagerRunner', () => { await runner.run(); expect(store.update).toHaveBeenCalledTimes(1); - expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt })); + expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), { + validate: true, + }); }); test('removes non-recurring tasks after they complete', async () => { @@ -1654,7 +1663,7 @@ describe('TaskManagerRunner', () => { const instance = mockInstance(opts.instance); - const store = taskStoreMock.create(); + const store = bufferedTaskStoreMock.create(); const usageCounter = usageCountersServiceMock.createSetupContract().createUsageCounter('test'); store.update.mockResolvedValue(instance); diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index 858d89f46d70ae..0252c565c5d0f1 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -89,7 +89,7 @@ export interface TaskRunning { } export interface Updatable { - update(doc: ConcreteTaskInstance): Promise; + update(doc: ConcreteTaskInstance, options: { validate: boolean }): Promise; remove(id: string): Promise; } @@ -395,27 +395,30 @@ export class TaskManagerRunner implements TaskRunner { } this.instance = asReadyToRun( - (await this.bufferedTaskStore.update({ - ...taskWithoutEnabled(taskInstance), - status: TaskStatus.Running, - startedAt: now, - attempts, - retryAt: - (this.instance.task.schedule - ? maxIntervalFromDate( - now, - this.instance.task.schedule.interval, - this.definition.timeout - ) - : this.getRetryDelay({ - attempts, - // Fake an error. This allows retry logic when tasks keep timing out - // and lets us set a proper "retryAt" value each time. - error: new Error('Task timeout'), - addDuration: this.definition.timeout, - })) ?? null, - // This is a safe convertion as we're setting the startAt above - })) as ConcreteTaskInstanceWithStartedAt + (await this.bufferedTaskStore.update( + { + ...taskWithoutEnabled(taskInstance), + status: TaskStatus.Running, + startedAt: now, + attempts, + retryAt: + (this.instance.task.schedule + ? maxIntervalFromDate( + now, + this.instance.task.schedule.interval, + this.definition.timeout + ) + : this.getRetryDelay({ + attempts, + // Fake an error. This allows retry logic when tasks keep timing out + // and lets us set a proper "retryAt" value each time. + error: new Error('Task timeout'), + addDuration: this.definition.timeout, + })) ?? null, + // This is a safe convertion as we're setting the startAt above + }, + { validate: false } + )) as ConcreteTaskInstanceWithStartedAt ); const timeUntilClaimExpiresAfterUpdate = @@ -476,14 +479,17 @@ export class TaskManagerRunner implements TaskRunner { private async releaseClaimAndIncrementAttempts(): Promise> { return promiseResult( - this.bufferedTaskStore.update({ - ...taskWithoutEnabled(this.instance.task), - status: TaskStatus.Idle, - attempts: this.instance.task.attempts + 1, - startedAt: null, - retryAt: null, - ownerId: null, - }) + this.bufferedTaskStore.update( + { + ...taskWithoutEnabled(this.instance.task), + status: TaskStatus.Idle, + attempts: this.instance.task.attempts + 1, + startedAt: null, + retryAt: null, + ownerId: null, + }, + { validate: false } + ) ); } @@ -580,7 +586,8 @@ export class TaskManagerRunner implements TaskRunner { ownerId: null, }, taskWithoutEnabled(this.instance.task) - ) + ), + { validate: true } ) ); } diff --git a/x-pack/plugins/task_manager/server/task_scheduling.test.ts b/x-pack/plugins/task_manager/server/task_scheduling.test.ts index 7f59f9f3625547..1a87bd59ec036f 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.test.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.test.ts @@ -538,7 +538,8 @@ describe('TaskScheduling', () => { status: TaskStatus.Idle, runAt: expect.any(Date), scheduledAt: expect.any(Date), - }) + }), + { validate: false } ); expect(mockTaskStore.get).toHaveBeenCalledWith(id); expect(result).toEqual({ id }); @@ -560,7 +561,8 @@ describe('TaskScheduling', () => { status: TaskStatus.Idle, runAt: expect.any(Date), scheduledAt: expect.any(Date), - }) + }), + { validate: false } ); expect(mockTaskStore.get).toHaveBeenCalledWith(id); expect(result).toEqual({ id }); diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 86bf91048253f7..85c346d52da05f 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -159,6 +159,7 @@ export class TaskScheduling { getTasks: async (ids) => await this.bulkGetTasksHelper(ids), filter: (task) => !!task.enabled, map: (task) => ({ ...task, enabled: false }), + validate: false, }); } @@ -174,6 +175,7 @@ export class TaskScheduling { } return { ...task, enabled: true }; }, + validate: false, }); } @@ -208,6 +210,7 @@ export class TaskScheduling { return { ...task, schedule, runAt: new Date(newRunAtInMs) }; }, + validate: false, }); } @@ -229,12 +232,15 @@ export class TaskScheduling { public async runSoon(taskId: string): Promise { const task = await this.getNonRunningTask(taskId); try { - await this.store.update({ - ...task, - status: TaskStatus.Idle, - scheduledAt: new Date(), - runAt: new Date(), - }); + await this.store.update( + { + ...task, + status: TaskStatus.Idle, + scheduledAt: new Date(), + runAt: new Date(), + }, + { validate: false } + ); } catch (e) { if (e.statusCode === 409) { this.logger.debug( diff --git a/x-pack/plugins/task_manager/server/task_store.mock.ts b/x-pack/plugins/task_manager/server/task_store.mock.ts index 23818bc9435084..861f7d60bd221e 100644 --- a/x-pack/plugins/task_manager/server/task_store.mock.ts +++ b/x-pack/plugins/task_manager/server/task_store.mock.ts @@ -14,6 +14,10 @@ interface TaskStoreOptions { export const taskStoreMock = { create({ index = '', taskManagerId = '' }: TaskStoreOptions = {}) { const mocked = { + taskValidator: { + getValidatedTaskInstanceFromReading: jest.fn().mockImplementation((task) => task), + getValidatedTaskInstanceForUpdating: jest.fn().mockImplementation((task) => task), + }, convertToSavedObjectIds: jest.fn(), update: jest.fn(), remove: jest.fn(), diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index aa194fc2231e24..2261b8ac8f2ee5 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { schema } from '@kbn/config-schema'; import { Client } from '@elastic/elasticsearch'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import _ from 'lodash'; @@ -25,13 +26,36 @@ import { mockLogger } from './test_utils'; import { AdHocTaskCounter } from './lib/adhoc_task_counter'; import { asErr } from './lib/result_type'; +const mockGetValidatedTaskInstanceFromReading = jest.fn(); +const mockGetValidatedTaskInstanceForUpdating = jest.fn(); +jest.mock('./task_validator', () => { + return { + TaskValidator: jest.fn().mockImplementation(() => { + return { + getValidatedTaskInstanceFromReading: mockGetValidatedTaskInstanceFromReading, + getValidatedTaskInstanceForUpdating: mockGetValidatedTaskInstanceForUpdating, + }; + }), + }; +}); + const savedObjectsClient = savedObjectsRepositoryMock.create(); const serializer = savedObjectsServiceMock.createSerializer(); const adHocTaskCounter = new AdHocTaskCounter(); const randomId = () => `id-${_.random(1, 20)}`; -beforeEach(() => jest.resetAllMocks()); +beforeEach(() => { + jest.resetAllMocks(); + jest.requireMock('./task_validator').TaskValidator.mockImplementation(() => { + return { + getValidatedTaskInstanceFromReading: mockGetValidatedTaskInstanceFromReading, + getValidatedTaskInstanceForUpdating: mockGetValidatedTaskInstanceForUpdating, + }; + }); + mockGetValidatedTaskInstanceFromReading.mockImplementation((task) => task); + mockGetValidatedTaskInstanceForUpdating.mockImplementation((task) => task); +}); const mockedDate = new Date('2019-02-12T21:01:22.479Z'); // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -49,6 +73,14 @@ const taskDefinitions = new TaskTypeDictionary(mockLogger()); taskDefinitions.registerTaskDefinitions({ report: { title: 'report', + stateSchemaByVersion: { + 1: { + schema: schema.object({ + foo: schema.string(), + }), + up: (doc) => doc, + }, + }, createTaskRunner: jest.fn(), }, dernstraight: { @@ -67,6 +99,7 @@ describe('TaskStore', () => { beforeAll(() => { store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -74,6 +107,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, }); }); @@ -237,6 +271,7 @@ describe('TaskStore', () => { childEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser; esClient.child.mockReturnValue(childEsClient as unknown as Client); store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -244,6 +279,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, }); }); @@ -306,6 +342,7 @@ describe('TaskStore', () => { beforeAll(() => { esClient = elasticsearchServiceMock.createClusterClient().asInternalUser; store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -313,6 +350,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, }); }); @@ -404,6 +442,7 @@ describe('TaskStore', () => { beforeAll(() => { esClient = elasticsearchServiceMock.createClusterClient().asInternalUser; store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -411,6 +450,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, }); }); @@ -443,8 +483,16 @@ describe('TaskStore', () => { } ); - const result = await store.update(task); + const result = await store.update(task, { validate: true }); + expect(mockGetValidatedTaskInstanceForUpdating).toHaveBeenCalledTimes(1); + expect(mockGetValidatedTaskInstanceFromReading).toHaveBeenCalledTimes(1); + expect(mockGetValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(task, { + validate: true, + }); + expect(mockGetValidatedTaskInstanceFromReading).toHaveBeenCalledWith(task, { + validate: true, + }); expect(savedObjectsClient.update).toHaveBeenCalledWith( 'task', task.id, @@ -478,6 +526,42 @@ describe('TaskStore', () => { }); }); + test(`doesn't go through validation process to inject stateVersion when validate:false`, async () => { + const task = { + runAt: mockedDate, + scheduledAt: mockedDate, + startedAt: null, + retryAt: null, + id: 'task:324242', + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + attempts: 3, + status: 'idle' as TaskStatus, + version: '123', + ownerId: null, + traceparent: 'myTraceparent', + }; + + savedObjectsClient.update.mockImplementation( + async (type: string, id: string, attributes: SavedObjectAttributes) => { + return { + id, + type, + attributes, + references: [], + version: '123', + }; + } + ); + + await store.update(task, { validate: false }); + + expect(mockGetValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(task, { + validate: false, + }); + }); + test('pushes error from saved objects client to errors$', async () => { const task = { runAt: mockedDate, @@ -497,7 +581,9 @@ describe('TaskStore', () => { const firstErrorPromise = store.errors$.pipe(first()).toPromise(); savedObjectsClient.update.mockRejectedValue(new Error('Failure')); - await expect(store.update(task)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`); + await expect( + store.update(task, { validate: true }) + ).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`); expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); }); }); @@ -507,6 +593,7 @@ describe('TaskStore', () => { beforeAll(() => { store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -514,6 +601,47 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, + }); + }); + + test(`doesn't validate whenever validate:false is passed-in`, async () => { + const task = { + runAt: mockedDate, + scheduledAt: mockedDate, + startedAt: null, + retryAt: null, + id: 'task:324242', + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + attempts: 3, + status: 'idle' as TaskStatus, + version: '123', + ownerId: null, + traceparent: '', + }; + + savedObjectsClient.bulkUpdate.mockResolvedValue({ + saved_objects: [ + { + id: '324242', + type: 'task', + attributes: { + ...task, + state: '{"foo":"bar"}', + params: '{"hello":"world"}', + }, + references: [], + version: '123', + }, + ], + }); + + await store.bulkUpdate([task], { validate: false }); + + expect(mockGetValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(task, { + validate: false, }); }); @@ -536,9 +664,9 @@ describe('TaskStore', () => { const firstErrorPromise = store.errors$.pipe(first()).toPromise(); savedObjectsClient.bulkUpdate.mockRejectedValue(new Error('Failure')); - await expect(store.bulkUpdate([task])).rejects.toThrowErrorMatchingInlineSnapshot( - `"Failure"` - ); + await expect( + store.bulkUpdate([task], { validate: true }) + ).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`); expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); }); }); @@ -548,6 +676,7 @@ describe('TaskStore', () => { beforeAll(() => { store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -555,6 +684,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, }); }); @@ -582,6 +712,7 @@ describe('TaskStore', () => { beforeAll(() => { store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -589,6 +720,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, }); }); @@ -616,6 +748,7 @@ describe('TaskStore', () => { beforeAll(() => { store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -623,6 +756,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, }); }); @@ -635,6 +769,7 @@ describe('TaskStore', () => { id: randomId(), params: { hello: 'world' }, state: { foo: 'bar' }, + stateVersion: 1, taskType: 'report', attempts: 3, status: 'idle' as TaskStatus, @@ -673,6 +808,7 @@ describe('TaskStore', () => { beforeAll(() => { store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -680,6 +816,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, }); }); @@ -745,6 +882,7 @@ describe('TaskStore', () => { id: randomId(), params: { hello: 'world' }, state: { foo: 'bar' }, + stateVersion: 1, taskType: 'report', attempts: 3, status: status as TaskStatus, @@ -765,6 +903,7 @@ describe('TaskStore', () => { })); const store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -772,6 +911,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, }); expect(await store.getLifecycle(task.id)).toEqual(status); @@ -785,6 +925,7 @@ describe('TaskStore', () => { ); const store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -792,6 +933,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, }); expect(await store.getLifecycle(randomId())).toEqual(TaskLifecycleResult.NotFound); @@ -803,6 +945,7 @@ describe('TaskStore', () => { ); const store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -810,6 +953,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, }); return expect(store.getLifecycle(randomId())).rejects.toThrow('Bad Request'); @@ -821,6 +965,7 @@ describe('TaskStore', () => { beforeAll(() => { store = new TaskStore({ + logger: mockLogger(), index: 'tasky', taskManagerId: '', serializer, @@ -828,6 +973,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, adHocTaskCounter, + allowReadingInvalidState: false, }); }); @@ -849,6 +995,7 @@ describe('TaskStore', () => { scheduledAt: '2019-02-12T21:01:22.479Z', startedAt: null, state: '{"foo":"bar"}', + stateVersion: 1, status: 'idle', taskType: 'report', traceparent: 'apmTraceparent', @@ -909,6 +1056,7 @@ describe('TaskStore', () => { scope: undefined, startedAt: null, state: { foo: 'bar' }, + stateVersion: 1, status: 'idle', taskType: 'report', user: undefined, @@ -981,4 +1129,50 @@ describe('TaskStore', () => { expect(adHocTaskCounter.count).toEqual(0); }); }); + + describe('TaskValidator', () => { + test(`should pass allowReadingInvalidState:false accordingly`, () => { + const logger = mockLogger(); + + new TaskStore({ + logger, + index: 'tasky', + taskManagerId: '', + serializer, + esClient: elasticsearchServiceMock.createClusterClient().asInternalUser, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, + allowReadingInvalidState: false, + }); + + expect(jest.requireMock('./task_validator').TaskValidator).toHaveBeenCalledWith({ + logger, + definitions: taskDefinitions, + allowReadingInvalidState: false, + }); + }); + + test(`should pass allowReadingInvalidState:true accordingly`, () => { + const logger = mockLogger(); + + new TaskStore({ + logger, + index: 'tasky', + taskManagerId: '', + serializer, + esClient: elasticsearchServiceMock.createClusterClient().asInternalUser, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, + allowReadingInvalidState: true, + }); + + expect(jest.requireMock('./task_validator').TaskValidator).toHaveBeenCalledWith({ + logger, + definitions: taskDefinitions, + allowReadingInvalidState: true, + }); + }); + }); }); diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index a06ee7b918a7b4..d3c84e2c4e5619 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -13,7 +13,7 @@ import { omit, defaults, get } from 'lodash'; import { SavedObjectError } from '@kbn/core-saved-objects-common'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; -import type { SavedObjectsBulkDeleteResponse } from '@kbn/core/server'; +import type { SavedObjectsBulkDeleteResponse, Logger } from '@kbn/core/server'; import { SavedObject, @@ -36,6 +36,7 @@ import { import { TaskTypeDictionary } from './task_type_dictionary'; import { AdHocTaskCounter } from './lib/adhoc_task_counter'; +import { TaskValidator } from './task_validator'; export interface StoreOpts { esClient: ElasticsearchClient; @@ -45,6 +46,8 @@ export interface StoreOpts { savedObjectsRepository: ISavedObjectsRepository; serializer: ISavedObjectsSerializer; adHocTaskCounter: AdHocTaskCounter; + allowReadingInvalidState: boolean; + logger: Logger; } export interface SearchOpts { @@ -97,6 +100,7 @@ export class TaskStore { public readonly index: string; public readonly taskManagerId: string; public readonly errors$ = new Subject(); + public readonly taskValidator: TaskValidator; private esClient: ElasticsearchClient; private esClientWithoutRetries: ElasticsearchClient; @@ -122,6 +126,11 @@ export class TaskStore { this.serializer = opts.serializer; this.savedObjectsRepository = opts.savedObjectsRepository; this.adHocTaskCounter = opts.adHocTaskCounter; + this.taskValidator = new TaskValidator({ + logger: opts.logger, + definitions: opts.definitions, + allowReadingInvalidState: opts.allowReadingInvalidState, + }); this.esClientWithoutRetries = opts.esClient.child({ // Timeouts are retried and make requests timeout after (requestTimeout * (1 + maxRetries)) // The poller doesn't need retry logic because it will try again at the next polling cycle @@ -150,9 +159,11 @@ export class TaskStore { let savedObject; try { + const validatedTaskInstance = + this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance); savedObject = await this.savedObjectsRepository.create( 'task', - taskInstanceToAttributes(taskInstance), + taskInstanceToAttributes(validatedTaskInstance), { id: taskInstance.id, refresh: false } ); if (get(taskInstance, 'schedule.interval', null) == null) { @@ -163,7 +174,8 @@ export class TaskStore { throw e; } - return savedObjectToConcreteTaskInstance(savedObject); + const result = savedObjectToConcreteTaskInstance(savedObject); + return this.taskValidator.getValidatedTaskInstanceFromReading(result); } /** @@ -174,9 +186,11 @@ export class TaskStore { public async bulkSchedule(taskInstances: TaskInstance[]): Promise { const objects = taskInstances.map((taskInstance) => { this.definitions.ensureHas(taskInstance.taskType); + const validatedTaskInstance = + this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance); return { type: 'task', - attributes: taskInstanceToAttributes(taskInstance), + attributes: taskInstanceToAttributes(validatedTaskInstance), id: taskInstance.id, }; }); @@ -197,7 +211,10 @@ export class TaskStore { throw e; } - return savedObjects.saved_objects.map((so) => savedObjectToConcreteTaskInstance(so)); + return savedObjects.saved_objects.map((so) => { + const taskInstance = savedObjectToConcreteTaskInstance(so); + return this.taskValidator.getValidatedTaskInstanceFromReading(taskInstance); + }); } /** @@ -222,8 +239,14 @@ export class TaskStore { * @param {TaskDoc} doc * @returns {Promise} */ - public async update(doc: ConcreteTaskInstance): Promise { - const attributes = taskInstanceToAttributes(doc); + public async update( + doc: ConcreteTaskInstance, + options: { validate: boolean } + ): Promise { + const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, { + validate: options.validate, + }); + const attributes = taskInstanceToAttributes(taskInstance); let updatedSavedObject; try { @@ -241,13 +264,16 @@ export class TaskStore { throw e; } - return savedObjectToConcreteTaskInstance( + const result = savedObjectToConcreteTaskInstance( // The SavedObjects update api forces a Partial on the `attributes` on the response, // but actually returns the whole object that is passed to it, so as we know we're // passing in the whole object, this is safe to do. // This is far from ideal, but unless we change the SavedObjectsClient this is the best we can do { ...updatedSavedObject, attributes: defaults(updatedSavedObject.attributes, attributes) } ); + return this.taskValidator.getValidatedTaskInstanceFromReading(result, { + validate: options.validate, + }); } /** @@ -257,9 +283,15 @@ export class TaskStore { * @param {Array} docs * @returns {Promise>} */ - public async bulkUpdate(docs: ConcreteTaskInstance[]): Promise { + public async bulkUpdate( + docs: ConcreteTaskInstance[], + options: { validate: boolean } + ): Promise { const attributesByDocId = docs.reduce((attrsById, doc) => { - attrsById.set(doc.id, taskInstanceToAttributes(doc)); + const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, { + validate: options.validate, + }); + attrsById.set(doc.id, taskInstanceToAttributes(taskInstance)); return attrsById; }, new Map()); @@ -283,21 +315,25 @@ export class TaskStore { } return updatedSavedObjects.map((updatedSavedObject) => { - return updatedSavedObject.error !== undefined - ? asErr({ - type: 'task', - id: updatedSavedObject.id, - error: updatedSavedObject.error, - }) - : asOk( - savedObjectToConcreteTaskInstance({ - ...updatedSavedObject, - attributes: defaults( - updatedSavedObject.attributes, - attributesByDocId.get(updatedSavedObject.id)! - ), - }) - ); + if (updatedSavedObject.error !== undefined) { + return asErr({ + type: 'task', + id: updatedSavedObject.id, + error: updatedSavedObject.error, + }); + } + + const taskInstance = savedObjectToConcreteTaskInstance({ + ...updatedSavedObject, + attributes: defaults( + updatedSavedObject.attributes, + attributesByDocId.get(updatedSavedObject.id)! + ), + }); + const result = this.taskValidator.getValidatedTaskInstanceFromReading(taskInstance, { + validate: options.validate, + }); + return asOk(result); }); } @@ -346,7 +382,8 @@ export class TaskStore { this.errors$.next(e); throw e; } - return savedObjectToConcreteTaskInstance(result); + const taskInstance = savedObjectToConcreteTaskInstance(result); + return this.taskValidator.getValidatedTaskInstanceFromReading(taskInstance); } /** @@ -369,7 +406,10 @@ export class TaskStore { if (task.error) { return asErr({ id: task.id, type: task.type, error: task.error }); } - return asOk(savedObjectToConcreteTaskInstance(task)); + const taskInstance = savedObjectToConcreteTaskInstance(task); + const validatedTaskInstance = + this.taskValidator.getValidatedTaskInstanceFromReading(taskInstance); + return asOk(validatedTaskInstance); }); } @@ -413,7 +453,9 @@ export class TaskStore { // @ts-expect-error @elastic/elasticsearch _source is optional .map((doc) => this.serializer.rawToSavedObject(doc)) .map((doc) => omit(doc, 'namespace') as SavedObject) - .map(savedObjectToConcreteTaskInstance), + .map((doc) => savedObjectToConcreteTaskInstance(doc)) + .map((doc) => this.taskValidator.getValidatedTaskInstanceFromReading(doc)) + .filter((doc): doc is ConcreteTaskInstance => !!doc), }; } catch (e) { this.errors$.next(e); diff --git a/x-pack/plugins/task_manager/server/task_type_dictionary.ts b/x-pack/plugins/task_manager/server/task_type_dictionary.ts index f2b6d5b9153b1e..30a4ee169f4e6d 100644 --- a/x-pack/plugins/task_manager/server/task_type_dictionary.ts +++ b/x-pack/plugins/task_manager/server/task_type_dictionary.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { ObjectType } from '@kbn/config-schema'; import { Logger } from '@kbn/core/server'; import { TaskDefinition, taskDefinitionSchema, TaskRunCreatorFunction } from './task'; import { CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE } from './constants'; @@ -65,6 +66,13 @@ export interface TaskRegisterDefinition { * The default value, if not given, is 0. */ maxConcurrency?: number; + stateSchemaByVersion?: Record< + number, + { + schema: ObjectType; + up: (state: Record) => Record; + } + >; } /** diff --git a/x-pack/plugins/task_manager/server/task_validator.test.ts b/x-pack/plugins/task_manager/server/task_validator.test.ts new file mode 100644 index 00000000000000..52822adf6f49fe --- /dev/null +++ b/x-pack/plugins/task_manager/server/task_validator.test.ts @@ -0,0 +1,397 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { schema } from '@kbn/config-schema'; +import { taskManagerMock } from './mocks'; +import { mockLogger } from './test_utils'; +import { TaskValidator } from './task_validator'; +import { TaskTypeDictionary } from './task_type_dictionary'; + +const fooTaskDefinition = { + title: 'Foo', + description: '', + createTaskRunner() { + return { + async run() { + return { + state: {}, + }; + }, + }; + }, +}; + +describe('TaskValidator', () => { + describe('getValidatedTaskInstanceFromReading()', () => { + it(`should return the task as-is whenever the task definition isn't defined`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask(); + const result = taskValidator.getValidatedTaskInstanceFromReading(task); + expect(result).toEqual(task); + }); + + it(`should return the task as-is whenever the validate:false option is passed-in`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + ...fooTaskDefinition, + stateSchemaByVersion: { + 1: { + up: (state) => state, + schema: schema.object({ + foo: schema.string(), + }), + }, + }, + }, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask(); + const result = taskValidator.getValidatedTaskInstanceFromReading(task, { validate: false }); + expect(result).toEqual(task); + }); + + // TODO: Remove skip once all task types have defined their state schema. + // https://github.com/elastic/kibana/issues/159347 + it.skip(`should fail to validate the state schema when the task type doesn't have stateSchemaByVersion defined`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: fooTaskDefinition, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask({ state: { foo: 'bar' } }); + expect(() => + taskValidator.getValidatedTaskInstanceFromReading(task) + ).toThrowErrorMatchingInlineSnapshot( + `"[TaskValidator] stateSchemaByVersion not defined for task type: foo"` + ); + }); + + it(`should validate the state schema`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + ...fooTaskDefinition, + stateSchemaByVersion: { + 1: { + up: (state) => state, + schema: schema.object({ + foo: schema.string(), + }), + }, + }, + }, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask({ stateVersion: 1, state: { foo: 'bar' } }); + const result = taskValidator.getValidatedTaskInstanceFromReading(task); + expect(result).toEqual(task); + }); + + it(`should fail validation when the state schema doesn't match the state data`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + ...fooTaskDefinition, + stateSchemaByVersion: { + 1: { + up: (state) => state, + schema: schema.object({ + foo: schema.string(), + }), + }, + }, + }, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask({ stateVersion: 1, state: { foo: true } }); + expect(() => + taskValidator.getValidatedTaskInstanceFromReading(task) + ).toThrowErrorMatchingInlineSnapshot( + `"[foo]: expected value of type [string] but got [boolean]"` + ); + }); + + it(`should return original state when the state is invalid and allowReadingInvalidState is true`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + ...fooTaskDefinition, + stateSchemaByVersion: { + 1: { + up: (state) => state, + schema: schema.object({ + foo: schema.string(), + }), + }, + }, + }, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: true, + }); + const task = taskManagerMock.createTask({ stateVersion: 1, state: { foo: true } }); + const result = taskValidator.getValidatedTaskInstanceFromReading(task); + expect(result.state).toEqual({ foo: true }); + }); + + it(`should remove unknown fields`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + ...fooTaskDefinition, + stateSchemaByVersion: { + 1: { + up: (state) => state, + schema: schema.object({ + foo: schema.string(), + }), + }, + }, + }, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask({ + stateVersion: 1, + state: { foo: 'foo', bar: 'bar' }, + }); + const result = taskValidator.getValidatedTaskInstanceFromReading(task); + expect(result.state).toEqual({ foo: 'foo' }); + }); + + it(`should migrate state when reading from a document without stateVersion`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + ...fooTaskDefinition, + stateSchemaByVersion: { + 1: { + up: (state) => ({ ...state, baz: 'baz' }), + schema: schema.object({ + foo: schema.string(), + baz: schema.string(), + }), + }, + }, + }, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask({ stateVersion: undefined, state: { foo: 'foo' } }); + const result = taskValidator.getValidatedTaskInstanceFromReading(task); + expect(result.state).toEqual({ foo: 'foo', baz: 'baz' }); + }); + + it(`should migrate state when reading from an older version`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + ...fooTaskDefinition, + stateSchemaByVersion: { + 1: { + up: (state) => state, + schema: schema.object({ + foo: schema.string(), + }), + }, + 2: { + up: (state) => ({ ...state, baz: 'baz' }), + schema: schema.object({ + foo: schema.string(), + baz: schema.string(), + }), + }, + }, + }, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask({ stateVersion: 1, state: { foo: 'foo' } }); + const result = taskValidator.getValidatedTaskInstanceFromReading(task); + expect(result.state).toEqual({ foo: 'foo', baz: 'baz' }); + }); + + it(`should throw during the migration phase if a schema version is missing`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + ...fooTaskDefinition, + stateSchemaByVersion: { + 1: { + up: (state) => state, + schema: schema.object({ + foo: schema.string(), + }), + }, + 3: { + up: (state) => ({ ...state, baz: 'baz' }), + schema: schema.object({ + foo: schema.string(), + baz: schema.string(), + }), + }, + }, + }, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask({ stateVersion: 1, state: { foo: 'foo' } }); + expect(() => + taskValidator.getValidatedTaskInstanceFromReading(task) + ).toThrowErrorMatchingInlineSnapshot( + `"[TaskValidator] state schema for foo missing version: 2"` + ); + }); + }); + + describe('getValidatedTaskInstanceForUpdating()', () => { + it(`should return the task as-is whenever the task definition isn't defined`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask(); + const result = taskValidator.getValidatedTaskInstanceForUpdating(task); + expect(result).toEqual(task); + }); + + it(`should return the task as-is whenever the validate:false option is passed-in`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + ...fooTaskDefinition, + stateSchemaByVersion: { + 1: { + up: (state) => state, + schema: schema.object({ + foo: schema.string(), + }), + }, + }, + }, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask(); + const result = taskValidator.getValidatedTaskInstanceForUpdating(task, { validate: false }); + expect(result).toEqual(task); + }); + + // TODO: Remove skip once all task types have defined their state schema. + // https://github.com/elastic/kibana/issues/159347 + it.skip(`should fail to validate the state schema when the task type doesn't have stateSchemaByVersion defined`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: fooTaskDefinition, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask({ state: { foo: 'bar' } }); + expect(() => + taskValidator.getValidatedTaskInstanceForUpdating(task) + ).toThrowErrorMatchingInlineSnapshot( + `"[TaskValidator] stateSchemaByVersion not defined for task type: foo"` + ); + }); + + it(`should validate the state schema`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + ...fooTaskDefinition, + stateSchemaByVersion: { + 1: { + up: (state) => state, + schema: schema.object({ + foo: schema.string(), + }), + }, + }, + }, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask({ state: { foo: 'bar' } }); + const { stateVersion, ...result } = taskValidator.getValidatedTaskInstanceForUpdating(task); + expect(result).toEqual(task); + expect(stateVersion).toEqual(1); + }); + + it(`should fail to validate the state schema when unknown fields are present`, () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + ...fooTaskDefinition, + stateSchemaByVersion: { + 1: { + up: (state) => state, + schema: schema.object({ + foo: schema.string(), + }), + }, + }, + }, + }); + const taskValidator = new TaskValidator({ + logger: mockLogger(), + definitions, + allowReadingInvalidState: false, + }); + const task = taskManagerMock.createTask({ state: { foo: 'foo', bar: 'bar' } }); + expect(() => + taskValidator.getValidatedTaskInstanceForUpdating(task) + ).toThrowErrorMatchingInlineSnapshot(`"[bar]: definition for this key is missing"`); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/task_validator.ts b/x-pack/plugins/task_manager/server/task_validator.ts new file mode 100644 index 00000000000000..61d9a903dd5b4c --- /dev/null +++ b/x-pack/plugins/task_manager/server/task_validator.ts @@ -0,0 +1,205 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { max, memoize } from 'lodash'; +import type { Logger } from '@kbn/core/server'; +import type { ObjectType } from '@kbn/config-schema'; +import { TaskTypeDictionary } from './task_type_dictionary'; +import type { TaskInstance, ConcreteTaskInstance, TaskDefinition } from './task'; + +interface TaskValidatorOpts { + allowReadingInvalidState: boolean; + definitions: TaskTypeDictionary; + logger: Logger; +} + +type LatestStateSchema = + | undefined + | { + schema: ObjectType; + version: number; + up: (state: Record) => Record; + }; + +export class TaskValidator { + private readonly logger: Logger; + private readonly definitions: TaskTypeDictionary; + private readonly allowReadingInvalidState: boolean; + private readonly cachedGetLatestStateSchema: (taskTypeDef: TaskDefinition) => LatestStateSchema; + private readonly cachedExtendSchema: typeof extendSchema; + + constructor({ definitions, allowReadingInvalidState, logger }: TaskValidatorOpts) { + this.logger = logger; + this.definitions = definitions; + this.allowReadingInvalidState = allowReadingInvalidState; + this.cachedGetLatestStateSchema = memoize( + getLatestStateSchema, + (taskTypeDef) => taskTypeDef.type + ); + this.cachedExtendSchema = memoize( + extendSchema, + // We need to cache two outcomes per task type (unknowns: ignore and unknowns: forbid) + (options) => `${options.taskType}|unknowns:${options.unknowns}` + ); + } + + public getValidatedTaskInstanceFromReading( + task: T, + options: { validate: boolean } = { validate: true } + ): T { + if (!options.validate) { + return task; + } + + // In the scenario the task is unused / deprecated and Kibana needs to manipulate the task, + // we'll do a pass-through for those + if (!this.definitions.has(task.taskType)) { + return task; + } + + const taskTypeDef = this.definitions.get(task.taskType); + const latestStateSchema = this.cachedGetLatestStateSchema(taskTypeDef); + + // TODO: Remove once all task types have defined their state schema. + // https://github.com/elastic/kibana/issues/159347 + // Otherwise, failures on read / write would occur. (don't forget to unskip test) + if (!latestStateSchema) { + return task; + } + + let state = task.state; + try { + state = this.getValidatedStateSchema( + this.migrateTaskState(task.state, task.stateVersion, taskTypeDef, latestStateSchema), + task.taskType, + latestStateSchema, + 'ignore' + ); + } catch (e) { + if (!this.allowReadingInvalidState) { + throw e; + } + this.logger.debug( + `[${task.taskType}][${task.id}] Failed to validate the task's state. Allowing read operation to proceed because allow_reading_invalid_state is true. Error: ${e.message}` + ); + } + + return { + ...task, + state, + }; + } + + public getValidatedTaskInstanceForUpdating( + task: T, + options: { validate: boolean } = { validate: true } + ): T { + if (!options.validate) { + return task; + } + + // In the scenario the task is unused / deprecated and Kibana needs to manipulate the task, + // we'll do a pass-through for those + if (!this.definitions.has(task.taskType)) { + return task; + } + + const taskTypeDef = this.definitions.get(task.taskType); + const latestStateSchema = this.cachedGetLatestStateSchema(taskTypeDef); + + // TODO: Remove once all task types have defined their state schema. + // https://github.com/elastic/kibana/issues/159347 + // Otherwise, failures on read / write would occur. (don't forget to unskip test) + if (!latestStateSchema) { + return task; + } + + // We are doing a write operation which must validate against the latest state schema + return { + ...task, + state: this.getValidatedStateSchema(task.state, task.taskType, latestStateSchema, 'forbid'), + stateVersion: latestStateSchema?.version, + }; + } + + private migrateTaskState( + state: ConcreteTaskInstance['state'], + currentVersion: number | undefined, + taskTypeDef: TaskDefinition, + latestStateSchema: LatestStateSchema + ) { + if (!latestStateSchema || (currentVersion && currentVersion >= latestStateSchema.version)) { + return state; + } + + let migratedState = state; + for (let i = currentVersion || 1; i <= latestStateSchema.version; i++) { + if (!taskTypeDef.stateSchemaByVersion || !taskTypeDef.stateSchemaByVersion[`${i}`]) { + throw new Error( + `[TaskValidator] state schema for ${taskTypeDef.type} missing version: ${i}` + ); + } + migratedState = taskTypeDef.stateSchemaByVersion[i].up(migratedState); + try { + taskTypeDef.stateSchemaByVersion[i].schema.validate(migratedState); + } catch (e) { + throw new Error( + `[TaskValidator] failed to migrate to version ${i} because the data returned from the up migration doesn't match the schema: ${e.message}` + ); + } + } + + return migratedState; + } + + private getValidatedStateSchema( + state: ConcreteTaskInstance['state'], + taskType: string, + latestStateSchema: LatestStateSchema, + unknowns: 'forbid' | 'ignore' + ): ConcreteTaskInstance['state'] { + if (!latestStateSchema) { + throw new Error( + `[TaskValidator] stateSchemaByVersion not defined for task type: ${taskType}` + ); + } + + return this.cachedExtendSchema({ unknowns, taskType, latestStateSchema }).validate(state); + } +} + +function extendSchema(options: { + latestStateSchema: LatestStateSchema; + unknowns: 'forbid' | 'ignore'; + taskType: string; +}) { + if (!options.latestStateSchema) { + throw new Error( + `[TaskValidator] stateSchemaByVersion not defined for task type: ${options.taskType}` + ); + } + return options.latestStateSchema.schema.extendsDeep({ unknowns: options.unknowns }); +} + +function getLatestStateSchema(taskTypeDef: TaskDefinition): LatestStateSchema { + if (!taskTypeDef.stateSchemaByVersion) { + return; + } + + const versions = Object.keys(taskTypeDef.stateSchemaByVersion).map((v) => parseInt(v, 10)); + const latest = max(versions); + + if (latest === undefined) { + return; + } + + return { + version: latest, + schema: taskTypeDef.stateSchemaByVersion[latest].schema, + up: taskTypeDef.stateSchemaByVersion[latest].up, + }; +} diff --git a/x-pack/plugins/task_manager/tsconfig.json b/x-pack/plugins/task_manager/tsconfig.json index 35e024db1f87d1..20fdb90611518f 100644 --- a/x-pack/plugins/task_manager/tsconfig.json +++ b/x-pack/plugins/task_manager/tsconfig.json @@ -19,7 +19,8 @@ "@kbn/es-types", "@kbn/apm-utils", "@kbn/core-saved-objects-common", - "@kbn/core-saved-objects-utils-server" + "@kbn/core-saved-objects-utils-server", + "@kbn/core-test-helpers-kbn-server" ], "exclude": [ "target/**/*",