[manager/orchestrator/reaper] Fix the condition used for skipping over running tasks.

Signed-off-by: Anshul Pundir <[email protected]>
anshulpundir committed Jun 27, 2018
1 parent cb78da3 commit 8fc2e5e
Showing 2 changed files with 110 additions and 22 deletions.
8 changes: 3 additions & 5 deletions manager/orchestrator/taskreaper/task_reaper.go
@@ -269,11 +269,9 @@ func (tr *TaskReaper) tick() {
 	for _, t := range historicTasks {
 		// Skip tasks which are desired to be running but the current state
 		// is less than or equal to running.
-		// This check is important to ignore tasks which are running or need to be running,
-		// but to delete tasks which are either past running,
-		// or have not reached running but need to be shutdown (because of a service update, for example).
-		if t.DesiredState == api.TaskStateRunning && t.Status.State <= api.TaskStateRunning {
-			// Don't delete running tasks
+		// Ignore tasks which are running (including tasks which are desired to be shutdown),
+		// or which are desired to be running (desired state running).
+		if t.Status.State == api.TaskStateRunning || t.DesiredState <= api.TaskStateRunning {
 			runningTasks++
 			continue
 		}
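For illustration only (not part of the commit): a minimal Go sketch of how the old and new skip conditions classify a task that is still running but already desired to be shut down, for example after a service update. The main-package wrapper and the oldSkip/newSkip names are assumptions made for this sketch; the two conditions are copied from the diff above.

package main

import (
	"fmt"

	"github.com/docker/swarmkit/api"
)

func main() {
	// A task that is actually running but desired to be shut down.
	t := &api.Task{
		DesiredState: api.TaskStateShutdown,
		Status:       api.TaskStatus{State: api.TaskStateRunning},
	}

	// Old check: only skipped tasks whose desired state is exactly running,
	// so this still-running task could be picked for deletion.
	oldSkip := t.DesiredState == api.TaskStateRunning && t.Status.State <= api.TaskStateRunning

	// New check: skips any task that is actually running, or whose desired
	// state has not advanced past running.
	newSkip := t.Status.State == api.TaskStateRunning || t.DesiredState <= api.TaskStateRunning

	fmt.Println(oldSkip, newSkip) // false true
}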
124 changes: 107 additions & 17 deletions manager/orchestrator/taskreaper/task_reaper_test.go
@@ -873,6 +873,7 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
 	assert.Len(t, foundTasks, 1)
 }

+// setupTaskReaperDirty adds slots to the task reaper dirty set for testing.
 func setupTaskReaperDirty(tr *TaskReaper) {
 	tr.dirty[orchestrator.SlotTuple{
 		Slot:      1,
@@ -897,23 +898,6 @@ func TestTick(t *testing.T) {
 	assert.NotNil(t, s)
 	defer s.Close()

-	assert.NoError(t, s.Update(func(tx store.Tx) error {
-		store.CreateCluster(tx, &api.Cluster{
-			ID: identity.NewID(),
-			Spec: api.ClusterSpec{
-				Annotations: api.Annotations{
-					Name: store.DefaultClusterName,
-				},
-				Orchestration: api.OrchestrationConfig{
-					// set TaskHistoryRetentionLimit to a negative value, so
-					// that tasks are cleaned up right away.
-					TaskHistoryRetentionLimit: 1,
-				},
-			},
-		})
-		return nil
-	}))
-
 	// create the task reaper.
 	taskReaper := New(s)

@@ -1076,3 +1060,109 @@
 	assert.True(t, deletedTask1.ServiceAnnotations.Name == "name1" ||
 		deletedTask1.ServiceAnnotations.Name == "name2")
 }
+
+// TestDesiredShutdownTasks tests the condition the task reaper
+// uses to not delete tasks which are considered to be running.
+// We will create multiple running tasks for the same slot,
+// and verify that the slot remains dirty.
+// At the same time, we'll verify that the task is cleaned up
+// when it shuts down.
+func TestDesiredShutdownTasks(t *testing.T) {
+	s := store.NewMemoryStore(nil)
+	assert.NotNil(t, s)
+	defer s.Close()
+
+	// Create a service.
+	service1 := &api.Service{
+		ID: "id1",
+		Spec: api.ServiceSpec{
+			Annotations: api.Annotations{
+				Name: "name1",
+			},
+			Mode: &api.ServiceSpec_Replicated{
+				Replicated: &api.ReplicatedService{
+					Replicas: 1,
+				},
+			},
+			Task: api.TaskSpec{
+				Restart: &api.RestartPolicy{
+					// Turn off restart to get an accurate count on tasks.
+					Condition: api.RestartOnNone,
+					Delay:     gogotypes.DurationProto(0),
+				},
+			},
+		},
+	}
+
+	// Create tasks.
+	task1 := &api.Task{
+		ID:           "id1task1",
+		Slot:         1,
+		DesiredState: api.TaskStateShutdown,
+		Status: api.TaskStatus{
+			State: api.TaskStateRunning,
+		},
+		ServiceID: "id1",
+		ServiceAnnotations: api.Annotations{
+			Name: "name1",
+		},
+	}
+
+	s.Update(func(tx store.Tx) error {
+		assert.NoError(t, store.CreateService(tx, service1))
+		assert.NoError(t, store.CreateTask(tx, task1))
+		return nil
+	})
+
+	// Create another task desired state running since previous task was desired state shutdown.
+	task2 := &api.Task{
+		ID:           "id1task2",
+		Slot:         1,
+		DesiredState: api.TaskStateRunning,
+		Status: api.TaskStatus{
+			State: api.TaskStateNew,
+		},
+		ServiceID: "id1",
+		ServiceAnnotations: api.Annotations{
+			Name: "name1",
+		},
+	}
+	s.Update(func(tx store.Tx) error {
+		assert.NoError(t, store.CreateTask(tx, task2))
+		return nil
+	})
+
+	// create the task reaper.
+	taskReaper := New(s)
+	taskReaper.taskHistory = 1
+
+	// Tick to make sure that the slot is still dirty.
+	setupTaskReaperDirty(taskReaper)
+	taskReaper.tick()
+	assert.Equal(t, 1, len(taskReaper.dirty))
+	assert.NotNil(t, taskReaper.dirty[orchestrator.SlotTuple{
+		Slot:      1,
+		ServiceID: "id1",
+		NodeID:    "node1",
+	}])
+
+	task1.Status.State = api.TaskStateShutdown
+	s.Update(func(tx store.Tx) error {
+		assert.NoError(t, store.UpdateTask(tx, task1))
+		return nil
+	})
+
+	watch, cancel := state.Watch(s.WatchQueue() /*api.EventDeleteTask{}*/)
+	defer cancel()
+
+	// Tick to make sure the slot was cleaned up.
+	setupTaskReaperDirty(taskReaper)
+	taskReaper.tick()
+	assert.Zero(t, len(taskReaper.dirty))
+
+	// Task should have been cleaned up.
+	deletedTask1 := testutils.WatchTaskDelete(t, watch)
+	assert.Equal(t, api.TaskStateShutdown, deletedTask1.Status.State)
+	assert.Equal(t, api.TaskStateShutdown, deletedTask1.DesiredState)
+	assert.True(t, deletedTask1.ServiceAnnotations.Name == "name1")
+}
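For reference only (not part of the commit): a sketch of how the fixed condition treats the two tasks this test creates for slot 1 across the two ticks. The helper name isRetainedAsRunning and the main-package wrapper are assumptions made for this sketch; the condition itself mirrors the fixed check in task_reaper.go.

package main

import (
	"fmt"

	"github.com/docker/swarmkit/api"
)

// isRetainedAsRunning mirrors the fixed skip condition in the task reaper.
func isRetainedAsRunning(t *api.Task) bool {
	return t.Status.State == api.TaskStateRunning || t.DesiredState <= api.TaskStateRunning
}

func main() {
	// task1: actually running, but desired to be shut down.
	task1 := &api.Task{
		DesiredState: api.TaskStateShutdown,
		Status:       api.TaskStatus{State: api.TaskStateRunning},
	}
	// task2: replacement task, not yet running but desired to be running.
	task2 := &api.Task{
		DesiredState: api.TaskStateRunning,
		Status:       api.TaskStatus{State: api.TaskStateNew},
	}

	fmt.Println(isRetainedAsRunning(task1)) // true: kept on the first tick, so the slot stays dirty
	fmt.Println(isRetainedAsRunning(task2)) // true: desired to be running

	// Once task1 actually shuts down, the next tick is free to delete it.
	task1.Status.State = api.TaskStateShutdown
	fmt.Println(isRetainedAsRunning(task1)) // false
}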
