From 8c5d35342591be0f095678b71446867cd131ca98 Mon Sep 17 00:00:00 2001 From: Anshul Pundir Date: Wed, 27 Jun 2018 14:39:14 -0700 Subject: [PATCH] [manager/orchestrator/reaper] Fix the condition used for skipping over running tasks. Signed-off-by: Anshul Pundir --- .../orchestrator/taskreaper/task_reaper.go | 38 +++-- .../taskreaper/task_reaper_test.go | 157 ++++++++++++++++-- 2 files changed, 163 insertions(+), 32 deletions(-) diff --git a/manager/orchestrator/taskreaper/task_reaper.go b/manager/orchestrator/taskreaper/task_reaper.go index 9dbb0851e6..c426a5d1b1 100644 --- a/manager/orchestrator/taskreaper/task_reaper.go +++ b/manager/orchestrator/taskreaper/task_reaper.go @@ -168,6 +168,16 @@ func (tr *TaskReaper) Run(ctx context.Context) { } } +// taskInTerminalState returns true if task is in a terminal state. +func taskInTerminalState(task *api.Task) bool { + return task.Status.State > api.TaskStateRunning +} + +// taskWillNeverRun returns true if task will never reach running state. +func taskWillNeverRun(task *api.Task) bool { + return task.Status.State < api.TaskStateAssigned && task.DesiredState > api.TaskStateRunning +} + // tick performs task history cleanup. func (tr *TaskReaper) tick() { if len(tr.dirty) == 0 && len(tr.cleanup) == 0 { @@ -267,22 +277,20 @@ func (tr *TaskReaper) tick() { runningTasks := 0 for _, t := range historicTasks { - // Skip tasks which are desired to be running but the current state - // is less than or equal to running. - // This check is important to ignore tasks which are running or need to be running, - // but to delete tasks which are either past running, - // or have not reached running but need to be shutdown (because of a service update, for example). - if t.DesiredState == api.TaskStateRunning && t.Status.State <= api.TaskStateRunning { - // Don't delete running tasks + // Historical tasks can be considered for cleanup if: + // 1. The task has reached a terminal state i.e. actual state beyond TaskStateRunning. + // 2. The task has not yet become running and desired state is a terminal state i.e. + // actual state not yet TaskStateAssigned and desired state beyond TaskStateRunning. + if taskInTerminalState(t) || taskWillNeverRun(t) { + deleteTasks[t.ID] = struct{}{} + + taskHistory++ + if int64(len(historicTasks)) <= taskHistory { + break + } + } else { + // all other tasks are counted as running. runningTasks++ - continue - } - - deleteTasks[t.ID] = struct{}{} - - taskHistory++ - if int64(len(historicTasks)) <= taskHistory { - break } } diff --git a/manager/orchestrator/taskreaper/task_reaper_test.go b/manager/orchestrator/taskreaper/task_reaper_test.go index f1fe7c69ef..a695defbe1 100644 --- a/manager/orchestrator/taskreaper/task_reaper_test.go +++ b/manager/orchestrator/taskreaper/task_reaper_test.go @@ -873,6 +873,7 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) { assert.Len(t, foundTasks, 1) } +// setupTaskReaperDirty adds slots to the task reaper dirty set for testing. func setupTaskReaperDirty(tr *TaskReaper) { tr.dirty[orchestrator.SlotTuple{ Slot: 1, @@ -897,23 +898,6 @@ func TestTick(t *testing.T) { assert.NotNil(t, s) defer s.Close() - assert.NoError(t, s.Update(func(tx store.Tx) error { - store.CreateCluster(tx, &api.Cluster{ - ID: identity.NewID(), - Spec: api.ClusterSpec{ - Annotations: api.Annotations{ - Name: store.DefaultClusterName, - }, - Orchestration: api.OrchestrationConfig{ - // set TaskHistoryRetentionLimit to a negative value, so - // that tasks are cleaned up right away. - TaskHistoryRetentionLimit: 1, - }, - }, - }) - return nil - })) - // create the task reaper. taskReaper := New(s) @@ -1076,3 +1060,142 @@ func TestTick(t *testing.T) { assert.True(t, deletedTask1.ServiceAnnotations.Name == "name1" || deletedTask1.ServiceAnnotations.Name == "name2") } + +// TestTickHistoryCleanup tests the condition the task reaper +// uses to delete historic tasks: +// 1. task in terminal state i.e. actual state > running +// 2. actual State < assigned and desired state > running. +func TestTickHistoryCleanup(t *testing.T) { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + // Create a service. + service1 := &api.Service{ + ID: "id1", + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "name1", + }, + Mode: &api.ServiceSpec_Replicated{ + Replicated: &api.ReplicatedService{ + Replicas: 1, + }, + }, + Task: api.TaskSpec{ + Restart: &api.RestartPolicy{ + // Turn off restart to get an accurate count on tasks. + Condition: api.RestartOnNone, + Delay: gogotypes.DurationProto(0), + }, + }, + }, + } + + s.Update(func(tx store.Tx) error { + assert.NoError(t, store.CreateService(tx, service1)) + return nil + }) + + watch, cancel := state.Watch(s.WatchQueue() /*api.EventDeleteTask{}*/) + defer cancel() + taskReaper := New(s) + taskReaper.taskHistory = 0 + + // Test function will create a task with the given desired and actual state, + // setup the task reaper dirty list and call tick for testing. + testfunc := func(desiredState api.TaskState, actualState api.TaskState) { + var task *api.Task + s.View(func(tx store.ReadTx) { + task = store.GetTask(tx, "id1task3") + }) + + if task == nil { + // create task3 + task3 := &api.Task{ + ID: "id1task3", + Slot: 1, + DesiredState: desiredState, + Status: api.TaskStatus{ + State: actualState, + }, + ServiceID: "id1", + ServiceAnnotations: api.Annotations{ + Name: "name1", + }, + } + s.Update(func(tx store.Tx) error { + assert.NoError(t, store.CreateTask(tx, task3)) + return nil + }) + } else { + task.DesiredState = desiredState + task.Status.State = actualState + s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, task)) + return nil + }) + } + + setupTaskReaperDirty(taskReaper) + taskReaper.tick() + } + + // Function to verify task was deleted. + waitForTaskDelete := func(desiredState api.TaskState, actualState api.TaskState) { + deletedTask1 := testutils.WatchTaskDelete(t, watch) + assert.Equal(t, actualState, deletedTask1.Status.State) + assert.Equal(t, desiredState, deletedTask1.DesiredState) + assert.Equal(t, "name1", deletedTask1.ServiceAnnotations.Name) + assert.Equal(t, "id1task3", deletedTask1.ID) + } + + for _, testcase := range []struct { + // Desired and actual states to test. + desired, actual api.TaskState + + // Flag to indicate whether the task should have been deleted by tick(). + cleanedUp bool + }{ + {desired: api.TaskStateRunning, actual: api.TaskStateNew, cleanedUp: false}, + {desired: api.TaskStateRunning, actual: api.TaskStatePending, cleanedUp: false}, + {desired: api.TaskStateRunning, actual: api.TaskStateAssigned, cleanedUp: false}, + {desired: api.TaskStateRunning, actual: api.TaskStateAccepted, cleanedUp: false}, + {desired: api.TaskStateRunning, actual: api.TaskStatePreparing, cleanedUp: false}, + {desired: api.TaskStateRunning, actual: api.TaskStateReady, cleanedUp: false}, + {desired: api.TaskStateRunning, actual: api.TaskStateStarting, cleanedUp: false}, + {desired: api.TaskStateRunning, actual: api.TaskStateRunning, cleanedUp: false}, + {desired: api.TaskStateRunning, actual: api.TaskStateCompleted, cleanedUp: true}, + {desired: api.TaskStateRunning, actual: api.TaskStateFailed, cleanedUp: true}, + {desired: api.TaskStateRunning, actual: api.TaskStateRejected, cleanedUp: true}, + {desired: api.TaskStateRunning, actual: api.TaskStateRemove, cleanedUp: true}, + {desired: api.TaskStateRunning, actual: api.TaskStateOrphaned, cleanedUp: true}, + + {desired: api.TaskStateShutdown, actual: api.TaskStateNew, cleanedUp: true}, + {desired: api.TaskStateShutdown, actual: api.TaskStatePending, cleanedUp: true}, + {desired: api.TaskStateShutdown, actual: api.TaskStateAssigned, cleanedUp: false}, + {desired: api.TaskStateShutdown, actual: api.TaskStateAccepted, cleanedUp: false}, + {desired: api.TaskStateShutdown, actual: api.TaskStatePreparing, cleanedUp: false}, + {desired: api.TaskStateShutdown, actual: api.TaskStateReady, cleanedUp: false}, + {desired: api.TaskStateShutdown, actual: api.TaskStateStarting, cleanedUp: false}, + {desired: api.TaskStateShutdown, actual: api.TaskStateRunning, cleanedUp: false}, + {desired: api.TaskStateShutdown, actual: api.TaskStateCompleted, cleanedUp: true}, + {desired: api.TaskStateShutdown, actual: api.TaskStateFailed, cleanedUp: true}, + {desired: api.TaskStateShutdown, actual: api.TaskStateRejected, cleanedUp: true}, + {desired: api.TaskStateShutdown, actual: api.TaskStateRemove, cleanedUp: true}, + {desired: api.TaskStateShutdown, actual: api.TaskStateOrphaned, cleanedUp: true}, + } { + testfunc(testcase.desired, testcase.actual) + assert.Zero(t, len(taskReaper.dirty)) + if testcase.cleanedUp { + waitForTaskDelete(testcase.desired, testcase.actual) + } + s.View(func(tx store.ReadTx) { + task := store.GetTask(tx, "id1task3") + if testcase.cleanedUp { + assert.Nil(t, task) + } else { + assert.NotNil(t, task) + } + }) + } +}