Skip to content

Commit

Permalink
[manager/orchestrator/reaper] Fix the condition used for skipping ove…
Browse files Browse the repository at this point in the history
…r running tasks.

Signed-off-by: Anshul Pundir <[email protected]>
  • Loading branch information
anshulpundir committed Jul 31, 2018
1 parent cb78da3 commit 8c5d353
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 32 deletions.
38 changes: 23 additions & 15 deletions manager/orchestrator/taskreaper/task_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,16 @@ func (tr *TaskReaper) Run(ctx context.Context) {
}
}

// taskInTerminalState returns true if task is in a terminal state.
func taskInTerminalState(task *api.Task) bool {
return task.Status.State > api.TaskStateRunning
}

// taskWillNeverRun returns true if task will never reach running state.
func taskWillNeverRun(task *api.Task) bool {
return task.Status.State < api.TaskStateAssigned && task.DesiredState > api.TaskStateRunning
}

// tick performs task history cleanup.
func (tr *TaskReaper) tick() {
if len(tr.dirty) == 0 && len(tr.cleanup) == 0 {
Expand Down Expand Up @@ -267,22 +277,20 @@ func (tr *TaskReaper) tick() {

runningTasks := 0
for _, t := range historicTasks {
// Skip tasks which are desired to be running but the current state
// is less than or equal to running.
// This check is important to ignore tasks which are running or need to be running,
// but to delete tasks which are either past running,
// or have not reached running but need to be shutdown (because of a service update, for example).
if t.DesiredState == api.TaskStateRunning && t.Status.State <= api.TaskStateRunning {
// Don't delete running tasks
// Historical tasks can be considered for cleanup if:
// 1. The task has reached a terminal state i.e. actual state beyond TaskStateRunning.
// 2. The task has not yet become running and desired state is a terminal state i.e.
// actual state not yet TaskStateAssigned and desired state beyond TaskStateRunning.
if taskInTerminalState(t) || taskWillNeverRun(t) {
deleteTasks[t.ID] = struct{}{}

taskHistory++
if int64(len(historicTasks)) <= taskHistory {
break
}
} else {
// all other tasks are counted as running.
runningTasks++
continue
}

deleteTasks[t.ID] = struct{}{}

taskHistory++
if int64(len(historicTasks)) <= taskHistory {
break
}
}

Expand Down
157 changes: 140 additions & 17 deletions manager/orchestrator/taskreaper/task_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,7 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
assert.Len(t, foundTasks, 1)
}

// setupTaskReaperDirty adds slots to the task reaper dirty set for testing.
func setupTaskReaperDirty(tr *TaskReaper) {
tr.dirty[orchestrator.SlotTuple{
Slot: 1,
Expand All @@ -897,23 +898,6 @@ func TestTick(t *testing.T) {
assert.NotNil(t, s)
defer s.Close()

assert.NoError(t, s.Update(func(tx store.Tx) error {
store.CreateCluster(tx, &api.Cluster{
ID: identity.NewID(),
Spec: api.ClusterSpec{
Annotations: api.Annotations{
Name: store.DefaultClusterName,
},
Orchestration: api.OrchestrationConfig{
// set TaskHistoryRetentionLimit to a negative value, so
// that tasks are cleaned up right away.
TaskHistoryRetentionLimit: 1,
},
},
})
return nil
}))

// create the task reaper.
taskReaper := New(s)

Expand Down Expand Up @@ -1076,3 +1060,142 @@ func TestTick(t *testing.T) {
assert.True(t, deletedTask1.ServiceAnnotations.Name == "name1" ||
deletedTask1.ServiceAnnotations.Name == "name2")
}

// TestTickHistoryCleanup tests the condition the task reaper
// uses to delete historic tasks:
// 1. task in terminal state i.e. actual state > running
// 2. actual State < assigned and desired state > running.
func TestTickHistoryCleanup(t *testing.T) {
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()
// Create a service.
service1 := &api.Service{
ID: "id1",
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: "name1",
},
Mode: &api.ServiceSpec_Replicated{
Replicated: &api.ReplicatedService{
Replicas: 1,
},
},
Task: api.TaskSpec{
Restart: &api.RestartPolicy{
// Turn off restart to get an accurate count on tasks.
Condition: api.RestartOnNone,
Delay: gogotypes.DurationProto(0),
},
},
},
}

s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateService(tx, service1))
return nil
})

watch, cancel := state.Watch(s.WatchQueue() /*api.EventDeleteTask{}*/)
defer cancel()
taskReaper := New(s)
taskReaper.taskHistory = 0

// Test function will create a task with the given desired and actual state,
// setup the task reaper dirty list and call tick for testing.
testfunc := func(desiredState api.TaskState, actualState api.TaskState) {
var task *api.Task
s.View(func(tx store.ReadTx) {
task = store.GetTask(tx, "id1task3")
})

if task == nil {
// create task3
task3 := &api.Task{
ID: "id1task3",
Slot: 1,
DesiredState: desiredState,
Status: api.TaskStatus{
State: actualState,
},
ServiceID: "id1",
ServiceAnnotations: api.Annotations{
Name: "name1",
},
}
s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, task3))
return nil
})
} else {
task.DesiredState = desiredState
task.Status.State = actualState
s.Update(func(tx store.Tx) error {
assert.NoError(t, store.UpdateTask(tx, task))
return nil
})
}

setupTaskReaperDirty(taskReaper)
taskReaper.tick()
}

// Function to verify task was deleted.
waitForTaskDelete := func(desiredState api.TaskState, actualState api.TaskState) {
deletedTask1 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, actualState, deletedTask1.Status.State)
assert.Equal(t, desiredState, deletedTask1.DesiredState)
assert.Equal(t, "name1", deletedTask1.ServiceAnnotations.Name)
assert.Equal(t, "id1task3", deletedTask1.ID)
}

for _, testcase := range []struct {
// Desired and actual states to test.
desired, actual api.TaskState

// Flag to indicate whether the task should have been deleted by tick().
cleanedUp bool
}{
{desired: api.TaskStateRunning, actual: api.TaskStateNew, cleanedUp: false},
{desired: api.TaskStateRunning, actual: api.TaskStatePending, cleanedUp: false},
{desired: api.TaskStateRunning, actual: api.TaskStateAssigned, cleanedUp: false},
{desired: api.TaskStateRunning, actual: api.TaskStateAccepted, cleanedUp: false},
{desired: api.TaskStateRunning, actual: api.TaskStatePreparing, cleanedUp: false},
{desired: api.TaskStateRunning, actual: api.TaskStateReady, cleanedUp: false},
{desired: api.TaskStateRunning, actual: api.TaskStateStarting, cleanedUp: false},
{desired: api.TaskStateRunning, actual: api.TaskStateRunning, cleanedUp: false},
{desired: api.TaskStateRunning, actual: api.TaskStateCompleted, cleanedUp: true},
{desired: api.TaskStateRunning, actual: api.TaskStateFailed, cleanedUp: true},
{desired: api.TaskStateRunning, actual: api.TaskStateRejected, cleanedUp: true},
{desired: api.TaskStateRunning, actual: api.TaskStateRemove, cleanedUp: true},
{desired: api.TaskStateRunning, actual: api.TaskStateOrphaned, cleanedUp: true},

{desired: api.TaskStateShutdown, actual: api.TaskStateNew, cleanedUp: true},
{desired: api.TaskStateShutdown, actual: api.TaskStatePending, cleanedUp: true},
{desired: api.TaskStateShutdown, actual: api.TaskStateAssigned, cleanedUp: false},
{desired: api.TaskStateShutdown, actual: api.TaskStateAccepted, cleanedUp: false},
{desired: api.TaskStateShutdown, actual: api.TaskStatePreparing, cleanedUp: false},
{desired: api.TaskStateShutdown, actual: api.TaskStateReady, cleanedUp: false},
{desired: api.TaskStateShutdown, actual: api.TaskStateStarting, cleanedUp: false},
{desired: api.TaskStateShutdown, actual: api.TaskStateRunning, cleanedUp: false},
{desired: api.TaskStateShutdown, actual: api.TaskStateCompleted, cleanedUp: true},
{desired: api.TaskStateShutdown, actual: api.TaskStateFailed, cleanedUp: true},
{desired: api.TaskStateShutdown, actual: api.TaskStateRejected, cleanedUp: true},
{desired: api.TaskStateShutdown, actual: api.TaskStateRemove, cleanedUp: true},
{desired: api.TaskStateShutdown, actual: api.TaskStateOrphaned, cleanedUp: true},
} {
testfunc(testcase.desired, testcase.actual)
assert.Zero(t, len(taskReaper.dirty))
if testcase.cleanedUp {
waitForTaskDelete(testcase.desired, testcase.actual)
}
s.View(func(tx store.ReadTx) {
task := store.GetTask(tx, "id1task3")
if testcase.cleanedUp {
assert.Nil(t, task)
} else {
assert.NotNil(t, task)
}
})
}
}

0 comments on commit 8c5d353

Please sign in to comment.