From efb19145d0b309f88b26a3036c270e573e6d973f Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Wed, 5 Aug 2020 20:37:47 +0100 Subject: [PATCH] [Task Manager] Correctly handle `running` tasks when calling RunNow and reduce flakiness in related tests (#73244) (#74387) This PR addresses two issues which caused several tests to be flaky in TM. When `runNow` was introduced to TM we added a pinned query which returned specific tasks by ID. This query does not have the filter applied to it which causes task to return when they're already marked as `running` but we didn't address these correctly which caused flakyness in the tests. This didn't cause a broken beahviour, but it did cause beahviour that was hard to reason about - we now address them correctly. It seems that sometimes, especially if the ES queue is overworked, it can take some time for the update to the underlying task to be visible (we don't user `refresh:true` on purpose), so adding a wait for the index to refresh to make sure the task is updated in time for the next stage of the test. --- x-pack/plugins/alerts/server/alerts_client.ts | 17 ++- .../task_manager/server/task_events.ts | 6 +- .../task_manager/server/task_manager.test.ts | 31 ++-- .../task_manager/server/task_manager.ts | 104 +++++++------ .../task_manager/server/task_store.test.ts | 142 +++++++++++++++--- .../plugins/task_manager/server/task_store.ts | 37 ++++- .../task_manager_fixture/server/plugin.ts | 20 ++- .../sample_task_plugin/server/init_routes.ts | 15 ++ .../task_manager/task_manager_integration.js | 34 ++++- 9 files changed, 316 insertions(+), 90 deletions(-) diff --git a/x-pack/plugins/alerts/server/alerts_client.ts b/x-pack/plugins/alerts/server/alerts_client.ts index e49745b186bb30..61e929d3250669 100644 --- a/x-pack/plugins/alerts/server/alerts_client.ts +++ b/x-pack/plugins/alerts/server/alerts_client.ts @@ -313,11 +313,18 @@ export class AlertsClient { updateResult.scheduledTaskId && !isEqual(alertSavedObject.attributes.schedule, updateResult.schedule) ) { - this.taskManager.runNow(updateResult.scheduledTaskId).catch((err: Error) => { - this.logger.error( - `Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}` - ); - }); + this.taskManager + .runNow(updateResult.scheduledTaskId) + .then(() => { + this.logger.debug( + `Alert update has rescheduled the underlying task: ${updateResult.scheduledTaskId}` + ); + }) + .catch((err: Error) => { + this.logger.error( + `Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}` + ); + }); } })(), ]); diff --git a/x-pack/plugins/task_manager/server/task_events.ts b/x-pack/plugins/task_manager/server/task_events.ts index b17a3636c17300..e1dd85f868cdd2 100644 --- a/x-pack/plugins/task_manager/server/task_events.ts +++ b/x-pack/plugins/task_manager/server/task_events.ts @@ -4,6 +4,8 @@ * you may not use this file except in compliance with the Elastic License. */ +import { Option } from 'fp-ts/lib/Option'; + import { ConcreteTaskInstance } from './task'; import { Result, Err } from './lib/result_type'; @@ -22,7 +24,7 @@ export interface TaskEvent { } export type TaskMarkRunning = TaskEvent; export type TaskRun = TaskEvent; -export type TaskClaim = TaskEvent; +export type TaskClaim = TaskEvent>; export type TaskRunRequest = TaskEvent; export function asTaskMarkRunningEvent( @@ -46,7 +48,7 @@ export function asTaskRunEvent(id: string, event: Result + event: Result> ): TaskClaim { return { id, diff --git a/x-pack/plugins/task_manager/server/task_manager.test.ts b/x-pack/plugins/task_manager/server/task_manager.test.ts index 80215ffa7abbad..7035971ad60610 100644 --- a/x-pack/plugins/task_manager/server/task_manager.test.ts +++ b/x-pack/plugins/task_manager/server/task_manager.test.ts @@ -7,6 +7,7 @@ import _ from 'lodash'; import sinon from 'sinon'; import { Subject } from 'rxjs'; +import { none } from 'fp-ts/lib/Option'; import { asTaskMarkRunningEvent, @@ -297,7 +298,9 @@ describe('TaskManager', () => { events$.next(asTaskMarkRunningEvent(id, asOk(task))); events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong')))); - return expect(result).rejects.toEqual(new Error('some thing gone wrong')); + return expect(result).rejects.toMatchInlineSnapshot( + `[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]` + ); }); test('rejects when the task mark as running fails', () => { @@ -311,7 +314,9 @@ describe('TaskManager', () => { events$.next(asTaskClaimEvent(id, asOk(task))); events$.next(asTaskMarkRunningEvent(id, asErr(new Error('some thing gone wrong')))); - return expect(result).rejects.toEqual(new Error('some thing gone wrong')); + return expect(result).rejects.toMatchInlineSnapshot( + `[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]` + ); }); test('when a task claim fails we ensure the task exists', async () => { @@ -321,7 +326,7 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); await expect(result).rejects.toEqual( new Error(`Failed to run task "${id}" as it does not exist`) @@ -337,7 +342,7 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); await expect(result).rejects.toEqual( new Error(`Failed to run task "${id}" as it is currently running`) @@ -353,7 +358,7 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); await expect(result).rejects.toEqual( new Error(`Failed to run task "${id}" as it is currently running`) @@ -386,9 +391,11 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); - await expect(result).rejects.toEqual(new Error('failed to claim')); + await expect(result).rejects.toMatchInlineSnapshot( + `[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "idle")]` + ); expect(getLifecycle).toHaveBeenCalledWith(id); }); @@ -400,9 +407,11 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); - await expect(result).rejects.toEqual(new Error('failed to claim')); + await expect(result).rejects.toMatchInlineSnapshot( + `[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "failed")]` + ); expect(getLifecycle).toHaveBeenCalledWith(id); }); @@ -424,7 +433,9 @@ describe('TaskManager', () => { events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong')))); - return expect(result).rejects.toEqual(new Error('some thing gone wrong')); + return expect(result).rejects.toMatchInlineSnapshot( + `[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]` + ); }); }); }); diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts index 23cb33cfac6c23..a20f50f483d017 100644 --- a/x-pack/plugins/task_manager/server/task_manager.ts +++ b/x-pack/plugins/task_manager/server/task_manager.ts @@ -9,13 +9,14 @@ import { filter } from 'rxjs/operators'; import { performance } from 'perf_hooks'; import { pipe } from 'fp-ts/lib/pipeable'; -import { Option, some, map as mapOptional } from 'fp-ts/lib/Option'; +import { Option, some, map as mapOptional, getOrElse } from 'fp-ts/lib/Option'; + import { SavedObjectsSerializer, ILegacyScopedClusterClient, ISavedObjectsRepository, } from '../../../../src/core/server'; -import { Result, asErr, either, map, mapErr, promiseResult } from './lib/result_type'; +import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type'; import { TaskManagerConfig } from './config'; import { Logger } from './types'; @@ -397,7 +398,9 @@ export async function claimAvailableTasks( if (docs.length !== claimedTasks) { logger.warn( - `[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched` + `[Task Ownership error]: ${claimedTasks} tasks were claimed by Kibana, but ${ + docs.length + } task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})` ); } return docs; @@ -429,48 +432,65 @@ export async function awaitTaskRunResult( // listen for all events related to the current task .pipe(filter(({ id }: TaskLifecycleEvent) => id === taskId)) .subscribe((taskEvent: TaskLifecycleEvent) => { - either( - taskEvent.event, - (taskInstance: ConcreteTaskInstance) => { - // resolve if the task has run sucessfully - if (isTaskRunEvent(taskEvent)) { - subscription.unsubscribe(); - resolve({ id: taskInstance.id }); - } - }, - async (error: Error) => { + if (isTaskClaimEvent(taskEvent)) { + mapErr(async (error: Option) => { // reject if any error event takes place for the requested task subscription.unsubscribe(); - if (isTaskRunRequestEvent(taskEvent)) { - return reject( - new Error( - `Failed to run task "${taskId}" as Task Manager is at capacity, please try again later` - ) - ); - } else if (isTaskClaimEvent(taskEvent)) { - reject( - map( - // if the error happened in the Claim phase - we try to provide better insight - // into why we failed to claim by getting the task's current lifecycle status - await promiseResult(getLifecycle(taskId)), - (taskLifecycleStatus: TaskLifecycle) => { - if (taskLifecycleStatus === TaskLifecycleResult.NotFound) { - return new Error(`Failed to run task "${taskId}" as it does not exist`); - } else if ( - taskLifecycleStatus === TaskStatus.Running || - taskLifecycleStatus === TaskStatus.Claiming - ) { - return new Error(`Failed to run task "${taskId}" as it is currently running`); - } - return error; - }, - () => error - ) - ); + return reject( + map( + await pipe( + error, + mapOptional(async (taskReturnedBySweep) => asOk(taskReturnedBySweep.status)), + getOrElse(() => + // if the error happened in the Claim phase - we try to provide better insight + // into why we failed to claim by getting the task's current lifecycle status + promiseResult(getLifecycle(taskId)) + ) + ), + (taskLifecycleStatus: TaskLifecycle) => { + if (taskLifecycleStatus === TaskLifecycleResult.NotFound) { + return new Error(`Failed to run task "${taskId}" as it does not exist`); + } else if ( + taskLifecycleStatus === TaskStatus.Running || + taskLifecycleStatus === TaskStatus.Claiming + ) { + return new Error(`Failed to run task "${taskId}" as it is currently running`); + } + return new Error( + `Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")` + ); + }, + (getLifecycleError: Error) => + new Error( + `Failed to run task "${taskId}" and failed to get current Status:${getLifecycleError}` + ) + ) + ); + }, taskEvent.event); + } else { + either>( + taskEvent.event, + (taskInstance: ConcreteTaskInstance) => { + // resolve if the task has run sucessfully + if (isTaskRunEvent(taskEvent)) { + subscription.unsubscribe(); + resolve({ id: taskInstance.id }); + } + }, + async (error: Error | Option) => { + // reject if any error event takes place for the requested task + subscription.unsubscribe(); + if (isTaskRunRequestEvent(taskEvent)) { + return reject( + new Error( + `Failed to run task "${taskId}" as Task Manager is at capacity, please try again later` + ) + ); + } + return reject(new Error(`Failed to run task "${taskId}": ${error}`)); } - return reject(error); - } - ); + ); + } }); }); } diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 771b4e2d7d9cb6..d65c39f4f454d6 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -8,6 +8,7 @@ import _ from 'lodash'; import sinon from 'sinon'; import uuid from 'uuid'; import { filter } from 'rxjs/operators'; +import { Option, some, none } from 'fp-ts/lib/Option'; import { TaskDictionary, @@ -972,7 +973,7 @@ if (doc['task.runAt'].size()!=0) { const runAt = new Date(); const tasks = [ { - _id: 'aaa', + _id: 'claimed-by-id', _source: { type: 'task', task: { @@ -980,7 +981,7 @@ if (doc['task.runAt'].size()!=0) { taskType: 'foo', schedule: undefined, attempts: 0, - status: 'idle', + status: 'claiming', params: '{ "hello": "world" }', state: '{ "baby": "Henhen" }', user: 'jimbo', @@ -996,7 +997,31 @@ if (doc['task.runAt'].size()!=0) { sort: ['a', 1], }, { - _id: 'bbb', + _id: 'claimed-by-schedule', + _source: { + type: 'task', + task: { + runAt, + taskType: 'bar', + schedule: { interval: '5m' }, + attempts: 2, + status: 'claiming', + params: '{ "shazm": 1 }', + state: '{ "henry": "The 8th" }', + user: 'dabo', + scope: ['reporting', 'ceo'], + ownerId: taskManagerId, + startedAt: null, + retryAt: null, + scheduledAt: new Date(), + }, + }, + _seq_no: 3, + _primary_term: 4, + sort: ['b', 2], + }, + { + _id: 'already-running', _source: { type: 'task', task: { @@ -1045,19 +1070,24 @@ if (doc['task.runAt'].size()!=0) { }); const sub = store.events - .pipe(filter((event: TaskEvent) => event.id === 'aaa')) + .pipe( + filter( + (event: TaskEvent>) => + event.id === 'claimed-by-id' + ) + ) .subscribe({ - next: (event: TaskEvent) => { + next: (event: TaskEvent>) => { expect(event).toMatchObject( asTaskClaimEvent( - 'aaa', + 'claimed-by-id', asOk({ - id: 'aaa', + id: 'claimed-by-id', runAt, taskType: 'foo', schedule: undefined, attempts: 0, - status: 'idle' as TaskStatus, + status: 'claiming' as TaskStatus, params: { hello: 'world' }, state: { baby: 'Henhen' }, user: 'jimbo', @@ -1075,7 +1105,7 @@ if (doc['task.runAt'].size()!=0) { }); await store.claimAvailableTasks({ - claimTasksById: ['aaa'], + claimTasksById: ['claimed-by-id'], claimOwnershipUntil: new Date(), size: 10, }); @@ -1102,19 +1132,24 @@ if (doc['task.runAt'].size()!=0) { }); const sub = store.events - .pipe(filter((event: TaskEvent) => event.id === 'bbb')) + .pipe( + filter( + (event: TaskEvent>) => + event.id === 'claimed-by-schedule' + ) + ) .subscribe({ - next: (event: TaskEvent) => { + next: (event: TaskEvent>) => { expect(event).toMatchObject( asTaskClaimEvent( - 'bbb', + 'claimed-by-schedule', asOk({ - id: 'bbb', + id: 'claimed-by-schedule', runAt, taskType: 'bar', schedule: { interval: '5m' }, attempts: 2, - status: 'running' as TaskStatus, + status: 'claiming' as TaskStatus, params: { shazm: 1 }, state: { henry: 'The 8th' }, user: 'dabo', @@ -1132,14 +1167,14 @@ if (doc['task.runAt'].size()!=0) { }); await store.claimAvailableTasks({ - claimTasksById: ['aaa'], + claimTasksById: ['claimed-by-id'], claimOwnershipUntil: new Date(), size: 10, }); }); test('emits an event when the store fails to claim a required task by id', async (done) => { - const { taskManagerId, tasks } = generateTasks(); + const { taskManagerId, runAt, tasks } = generateTasks(); const callCluster = sinon.spy(async (name: string, params?: unknown) => name === 'updateByQuery' ? { @@ -1159,11 +1194,36 @@ if (doc['task.runAt'].size()!=0) { }); const sub = store.events - .pipe(filter((event: TaskEvent) => event.id === 'ccc')) + .pipe( + filter( + (event: TaskEvent>) => + event.id === 'already-running' + ) + ) .subscribe({ - next: (event: TaskEvent) => { + next: (event: TaskEvent>) => { expect(event).toMatchObject( - asTaskClaimEvent('ccc', asErr(new Error(`failed to claim task 'ccc'`))) + asTaskClaimEvent( + 'already-running', + asErr( + some({ + id: 'already-running', + runAt, + taskType: 'bar', + schedule: { interval: '5m' }, + attempts: 2, + status: 'running' as TaskStatus, + params: { shazm: 1 }, + state: { henry: 'The 8th' }, + user: 'dabo', + scope: ['reporting', 'ceo'], + ownerId: taskManagerId, + startedAt: null, + retryAt: null, + scheduledAt: new Date(), + }) + ) + ) ); sub.unsubscribe(); done(); @@ -1171,7 +1231,49 @@ if (doc['task.runAt'].size()!=0) { }); await store.claimAvailableTasks({ - claimTasksById: ['ccc'], + claimTasksById: ['already-running'], + claimOwnershipUntil: new Date(), + size: 10, + }); + }); + + test('emits an event when the store fails to find a task which was required by id', async (done) => { + const { taskManagerId, tasks } = generateTasks(); + const callCluster = sinon.spy(async (name: string, params?: unknown) => + name === 'updateByQuery' + ? { + total: tasks.length, + updated: tasks.length, + } + : { hits: { hits: tasks } } + ); + const store = new TaskStore({ + callCluster, + maxAttempts: 2, + definitions: taskDefinitions, + serializer, + savedObjectsRepository: savedObjectsClient, + taskManagerId, + index: '', + }); + + const sub = store.events + .pipe( + filter( + (event: TaskEvent>) => + event.id === 'unknown-task' + ) + ) + .subscribe({ + next: (event: TaskEvent>) => { + expect(event).toMatchObject(asTaskClaimEvent('unknown-task', asErr(none))); + sub.unsubscribe(); + done(); + }, + }); + + await store.claimAvailableTasks({ + claimTasksById: ['unknown-task'], claimOwnershipUntil: new Date(), size: 10, }); diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 4a691e17011e8d..fbb21d154f32dd 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -9,7 +9,9 @@ */ import apm from 'elastic-apm-node'; import { Subject, Observable } from 'rxjs'; -import { omit, difference, defaults } from 'lodash'; +import { omit, difference, partition, map, defaults } from 'lodash'; + +import { some, none } from 'fp-ts/lib/Option'; import { SearchResponse, UpdateDocumentByQueryResponse } from 'elasticsearch'; import { @@ -30,6 +32,7 @@ import { TaskLifecycle, TaskLifecycleResult, SerializedConcreteTaskInstance, + TaskStatus, } from './task'; import { TaskClaim, asTaskClaimEvent } from './task_events'; @@ -220,13 +223,35 @@ export class TaskStore { // emit success/fail events for claimed tasks by id if (claimTasksById && claimTasksById.length) { - this.emitEvents(docs.map((doc) => asTaskClaimEvent(doc.id, asOk(doc)))); + const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) => + claimTasksById.includes(doc.id) + ); + + const [documentsClaimedById, documentsRequestedButNotClaimed] = partition( + documentsReturnedById, + // we filter the schduled tasks down by status is 'claiming' in the esearch, + // but we do not apply this limitation on tasks claimed by ID so that we can + // provide more detailed error messages when we fail to claim them + (doc) => doc.status === TaskStatus.Claiming + ); + + const documentsRequestedButNotReturned = difference( + claimTasksById, + map(documentsReturnedById, 'id') + ); + + this.emitEvents( + [...documentsClaimedById, ...documentsClaimedBySchedule].map((doc) => + asTaskClaimEvent(doc.id, asOk(doc)) + ) + ); + + this.emitEvents( + documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc)))) + ); this.emitEvents( - difference( - claimTasksById, - docs.map((doc) => doc.id) - ).map((id) => asTaskClaimEvent(id, asErr(new Error(`failed to claim task '${id}'`)))) + documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none))) ); } diff --git a/x-pack/test/alerting_api_integration/common/fixtures/plugins/task_manager_fixture/server/plugin.ts b/x-pack/test/alerting_api_integration/common/fixtures/plugins/task_manager_fixture/server/plugin.ts index 18fdd5f9c3ac37..0833dd0425894f 100644 --- a/x-pack/test/alerting_api_integration/common/fixtures/plugins/task_manager_fixture/server/plugin.ts +++ b/x-pack/test/alerting_api_integration/common/fixtures/plugins/task_manager_fixture/server/plugin.ts @@ -51,7 +51,8 @@ export class SampleTaskManagerFixturePlugin .toPromise(); public setup(core: CoreSetup) { - core.http.createRouter().get( + const router = core.http.createRouter(); + router.get( { path: '/api/alerting_tasks/{taskId}', validate: { @@ -77,6 +78,23 @@ export class SampleTaskManagerFixturePlugin } } ); + + router.get( + { + path: `/api/ensure_tasks_index_refreshed`, + validate: {}, + }, + async function ( + context: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ): Promise> { + await core.elasticsearch.legacy.client.callAsInternalUser('indices.refresh', { + index: '.kibana_task_manager', + }); + return res.ok({ body: {} }); + } + ); } public start(core: CoreStart, { taskManager }: SampleTaskManagerFixtureStartDeps) { diff --git a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts index f35d6baac8f5ae..266e66b5a1a452 100644 --- a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts +++ b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts @@ -223,6 +223,21 @@ export function initRoutes( } ); + router.get( + { + path: `/api/ensure_tasks_index_refreshed`, + validate: {}, + }, + async function ( + context: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ): Promise> { + await ensureIndexIsRefreshed(); + return res.ok({ body: {} }); + } + ); + router.delete( { path: `/api/sample_tasks`, diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js index 165e79fb311ead..c87a5039360b8f 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -28,8 +28,7 @@ export default function ({ getService }) { const testHistoryIndex = '.kibana_task_manager_test_result'; const supertest = supertestAsPromised(url.format(config.get('servers.kibana'))); - // FLAKY: https://github.com/elastic/kibana/issues/71390 - describe.skip('scheduling and running tasks', () => { + describe('scheduling and running tasks', () => { beforeEach( async () => await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200) ); @@ -69,6 +68,14 @@ export default function ({ getService }) { .then((response) => response.body); } + function ensureTasksIndexRefreshed() { + return supertest + .get(`/api/ensure_tasks_index_refreshed`) + .send({}) + .expect(200) + .then((response) => response.body); + } + function historyDocs(taskId) { return es .search({ @@ -404,7 +411,7 @@ export default function ({ getService }) { const originalTask = await scheduleTask({ taskType: 'sampleTask', schedule: { interval: `30m` }, - params: { failWith: 'error on run now', failOn: 3 }, + params: { failWith: 'this task was meant to fail!', failOn: 3 }, }); await retry.try(async () => { @@ -415,11 +422,14 @@ export default function ({ getService }) { const task = await currentTask(originalTask.id); expect(task.state.count).to.eql(1); + expect(task.status).to.eql('idle'); // ensure this task shouldnt run for another half hour expectReschedule(Date.parse(originalTask.runAt), task, 30 * 60000); }); + await ensureTasksIndexRefreshed(); + // second run should still be successful const successfulRunNowResult = await runTaskNow({ id: originalTask.id, @@ -429,14 +439,20 @@ export default function ({ getService }) { await retry.try(async () => { const task = await currentTask(originalTask.id); expect(task.state.count).to.eql(2); + expect(task.status).to.eql('idle'); }); + await ensureTasksIndexRefreshed(); + // third run should fail const failedRunNowResult = await runTaskNow({ id: originalTask.id, }); - expect(failedRunNowResult).to.eql({ id: originalTask.id, error: `Error: error on run now` }); + expect(failedRunNowResult).to.eql({ + id: originalTask.id, + error: `Error: Failed to run task \"${originalTask.id}\": Error: this task was meant to fail!`, + }); await retry.try(async () => { expect( @@ -479,8 +495,13 @@ export default function ({ getService }) { expect( docs.filter((taskDoc) => taskDoc._source.taskId === longRunningTask.id).length ).to.eql(1); + + const task = await currentTask(longRunningTask.id); + expect(task.status).to.eql('running'); }); + await ensureTasksIndexRefreshed(); + // first runNow should fail const failedRunNowResult = await runTaskNow({ id: longRunningTask.id, @@ -496,8 +517,13 @@ export default function ({ getService }) { await retry.try(async () => { const tasks = (await currentTasks()).docs; expect(getTaskById(tasks, longRunningTask.id).state.count).to.eql(1); + + const task = await currentTask(longRunningTask.id); + expect(task.status).to.eql('idle'); }); + await ensureTasksIndexRefreshed(); + // second runNow should be successful const successfulRunNowResult = runTaskNow({ id: longRunningTask.id,