Skip to content

Commit

Permalink
[manager/orchestrator/reaper] Fix the condition used for skipping over running tasks.
Browse files Browse the repository at this point in the history

Signed-off-by: Anshul Pundir <[email protected]>
  • Loading branch information
anshulpundir committed Jul 26, 2018
1 parent cb78da3 commit fcb6519
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 32 deletions.
28 changes: 13 additions & 15 deletions manager/orchestrator/taskreaper/task_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,23 +267,21 @@ func (tr *TaskReaper) tick() {

runningTasks := 0
for _, t := range historicTasks {
// Skip tasks which are desired to be running but the current state
// is less than or equal to running.
// This check is important to ignore tasks which are running or need to be running,
// but to delete tasks which are either past running,
// or have not reached running but need to be shutdown (because of a service update, for example).
if t.DesiredState == api.TaskStateRunning && t.Status.State <= api.TaskStateRunning {
// Don't delete running tasks
runningTasks++
continue
}

deleteTasks[t.ID] = struct{}{}
// Historical tasks can be considered for cleanup if:
// 1. The task has reached a terminal state i.e. actual state beyond TaskStateRunning.
// 2. The task has not yet become running and desired state is a terminal state i.e.
// actual state not yet TaskStateAssigned and desired state beyond TaskStateRunning.
if t.Status.State > api.TaskStateRunning ||
(t.Status.State < api.TaskStateAssigned && t.DesiredState > api.TaskStateRunning) {
deleteTasks[t.ID] = struct{}{}
taskHistory++
if int64(len(historicTasks)) <= taskHistory {
break
}

taskHistory++
if int64(len(historicTasks)) <= taskHistory {
break
continue
}
runningTasks++
}

// The only case when we keep the slot dirty at the end of tick()
Expand Down
125 changes: 108 additions & 17 deletions manager/orchestrator/taskreaper/task_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,7 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
assert.Len(t, foundTasks, 1)
}

// setupTaskReaperDirty adds slots to the task reaper dirty set for testing.
func setupTaskReaperDirty(tr *TaskReaper) {
tr.dirty[orchestrator.SlotTuple{
Slot: 1,
Expand All @@ -897,23 +898,6 @@ func TestTick(t *testing.T) {
assert.NotNil(t, s)
defer s.Close()

assert.NoError(t, s.Update(func(tx store.Tx) error {
store.CreateCluster(tx, &api.Cluster{
ID: identity.NewID(),
Spec: api.ClusterSpec{
Annotations: api.Annotations{
Name: store.DefaultClusterName,
},
Orchestration: api.OrchestrationConfig{
// set TaskHistoryRetentionLimit to a negative value, so
// that tasks are cleaned up right away.
TaskHistoryRetentionLimit: 1,
},
},
})
return nil
}))

// create the task reaper.
taskReaper := New(s)

Expand Down Expand Up @@ -1076,3 +1060,110 @@ func TestTick(t *testing.T) {
assert.True(t, deletedTask1.ServiceAnnotations.Name == "name1" ||
deletedTask1.ServiceAnnotations.Name == "name2")
}

// TestNonRunningTasks tests the condition the task reaper
// uses to delete historic tasks which are either in terminal state,
// or are not assigned yet and the desired state is a terminal state.
// We will create multiple running tasks for the same slot,
// and verify that the slot remains dirty.
// At the same time, we'll verify that the task is cleaned up
// when it shuts down.
// TestNonRunningTasks tests the condition the task reaper
// uses to delete historic tasks which are either in terminal state,
// or are not assigned yet and the desired state is a terminal state.
// We will create multiple running tasks for the same slot,
// and verify that the slot remains dirty.
// At the same time, we'll verify that the task is cleaned up
// when it shuts down.
func TestNonRunningTasks(t *testing.T) {
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

// Create a service with a single replica so all tasks share slot 1.
service1 := &api.Service{
ID: "id1",
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: "name1",
},
Mode: &api.ServiceSpec_Replicated{
Replicated: &api.ReplicatedService{
Replicas: 1,
},
},
Task: api.TaskSpec{
Restart: &api.RestartPolicy{
// Turn off restart to get an accurate count on tasks.
Condition: api.RestartOnNone,
Delay: gogotypes.DurationProto(0),
},
},
},
}

// Create tasks.
// task1 is actually running but desired to be shut down: it matches
// neither deletion criterion (its actual state is not past running,
// and it has already reached TaskStateAssigned), so the reaper must
// keep it for now.
task1 := &api.Task{
ID: "id1task1",
Slot: 1,
DesiredState: api.TaskStateShutdown,
Status: api.TaskStatus{
State: api.TaskStateRunning,
},
ServiceID: "id1",
ServiceAnnotations: api.Annotations{
Name: "name1",
},
}

s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateService(tx, service1))
assert.NoError(t, store.CreateTask(tx, task1))
return nil
})

// Create another task desired state running since previous task was desired state shutdown.
// task2 is new and desired to be running: also not deletable, so after
// the first tick both tasks survive and the slot stays dirty.
task2 := &api.Task{
ID: "id1task2",
Slot: 1,
DesiredState: api.TaskStateRunning,
Status: api.TaskStatus{
State: api.TaskStateNew,
},
ServiceID: "id1",
ServiceAnnotations: api.Annotations{
Name: "name1",
},
}
s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, task2))
return nil
})

// create the task reaper.
// taskHistory of 1 means only a single historic task per slot is
// retained, so any task that becomes deletable is reaped immediately.
taskReaper := New(s)
taskReaper.taskHistory = 1

// Tick to make sure that the slot is still dirty.
// Neither task is deletable yet, so the dirty entry for
// slot 1 must survive the tick.
setupTaskReaperDirty(taskReaper)
taskReaper.tick()
assert.Equal(t, 1, len(taskReaper.dirty))
assert.NotNil(t, taskReaper.dirty[orchestrator.SlotTuple{
Slot: 1,
ServiceID: "id1",
NodeID: "node1",
}])

// Move task1 to a terminal state; it now qualifies for cleanup on the
// next tick.
task1.Status.State = api.TaskStateShutdown
s.Update(func(tx store.Tx) error {
assert.NoError(t, store.UpdateTask(tx, task1))
return nil
})

// Watch the whole queue (the EventDeleteTask filter is intentionally
// commented out); registered only now, so only deletions caused by the
// second tick are observed.
watch, cancel := state.Watch(s.WatchQueue() /*api.EventDeleteTask{}*/)
defer cancel()

// Tick to make sure the slot was cleaned up.
setupTaskReaperDirty(taskReaper)
taskReaper.tick()
assert.Zero(t, len(taskReaper.dirty))

// Task should have been cleaned up.
// Verify the deleted task is task1 (shut down, desired shutdown);
// task2 is still running and must not be deleted.
deletedTask1 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, api.TaskStateShutdown, deletedTask1.Status.State)
assert.Equal(t, api.TaskStateShutdown, deletedTask1.DesiredState)
assert.True(t, deletedTask1.ServiceAnnotations.Name == "name1")
}

0 comments on commit fcb6519

Please sign in to comment.