Merge pull request #2706 from dani-docker/task_leak
address unassigned task leak when service is removed
anshulpundir authored Jul 16, 2018
2 parents 231d181 + 9d977ce commit 6826639
Showing 2 changed files with 140 additions and 6 deletions.
11 changes: 11 additions & 0 deletions manager/scheduler/scheduler.go
@@ -702,9 +702,20 @@ func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup
return tasksScheduled
}

// noSuitableNode checks unassigned tasks and makes sure they have an existing service in the store before
// updating the task status and adding it back to schedulingDecisions, unassignedTasks, and allTasks.
func (s *Scheduler) noSuitableNode(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
explanation := s.pipeline.Explain()
for _, t := range taskGroup {
var service *api.Service
s.store.View(func(tx store.ReadTx) {
service = store.GetService(tx, t.ServiceID)
})
if service == nil {
log.G(ctx).WithField("task.id", t.ID).Debug("removing task from the scheduler")
continue
}

log.G(ctx).WithField("task.id", t.ID).Debug("no suitable node available for task")

newT := *t
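The hunk above only shows the added guard; the re-queue logic that follows it is collapsed in this view. As a self-contained toy sketch (not swarmkit code — the types, maps, and helper below are simplified stand-ins), this is the control flow the fix relies on: each tick drains the unassigned map, and a task only survives to the next tick if it is put back, so skipping the re-queue once its service is gone is enough to stop the leak.

package main

import "fmt"

// Toy model of the scheduler's unassigned-task handling. It deliberately
// avoids the real swarmkit types and helpers; only the control flow that the
// fix depends on is reproduced here.
type task struct {
	id        string
	serviceID string
}

// tick drains the unassigned map, then re-queues only those tasks that still
// have a service but could not be placed on any node.
func tick(unassigned map[string]task, services map[string]bool, placeable func(task) bool) {
	pending := make([]task, 0, len(unassigned))
	for id, t := range unassigned {
		pending = append(pending, t)
		delete(unassigned, id)
	}

	for _, t := range pending {
		if !services[t.serviceID] {
			// Owning service was removed: skip the re-queue so the task
			// drops out of the map instead of being retried forever.
			continue
		}
		if !placeable(t) {
			// Service still exists but no node fits: retry on the next tick.
			unassigned[t.id] = t
		}
	}
}

func main() {
	unassigned := map[string]task{"id1": {id: "id1", serviceID: "serviceID1"}}
	services := map[string]bool{"serviceID1": true}
	noNodeFits := func(task) bool { return false }

	tick(unassigned, services, noNodeFits)
	fmt.Println(len(unassigned)) // 1: kept while its service exists

	delete(services, "serviceID1")
	tick(unassigned, services, noNodeFits)
	fmt.Println(len(unassigned)) // 0: dropped once the service is gone
}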
135 changes: 129 additions & 6 deletions manager/scheduler/scheduler_test.go
@@ -1114,6 +1114,7 @@ func TestSchedulerNoReadyNodes(t *testing.T) {
ctx := context.Background()
initialTask := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name1",
@@ -1128,7 +1129,8 @@ func TestSchedulerNoReadyNodes(t *testing.T) {
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial task
// Add initial service and task
assert.NoError(t, store.CreateService(tx, &api.Service{ID: "serviceID1"}))
assert.NoError(t, store.CreateTask(tx, initialTask))
return nil
})
@@ -1536,6 +1538,7 @@ func TestSchedulerResourceConstraint(t *testing.T) {

initialTask := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -1559,12 +1562,17 @@
},
}

initialService := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial node and task
// Add initial node, service and task
assert.NoError(t, store.CreateService(tx, initialService))
assert.NoError(t, store.CreateTask(tx, initialTask))
assert.NoError(t, store.CreateNode(tx, underprovisionedNode))
assert.NoError(t, store.CreateNode(tx, nonready1))
@@ -1788,6 +1796,7 @@ func TestSchedulerResourceConstraintDeadTask(t *testing.T) {
bigTask1 := &api.Task{
DesiredState: api.TaskStateRunning,
ID: "id1",
ServiceID: "serviceID1",
Spec: api.TaskSpec{
Resources: &api.ResourceRequirements{
Reservations: &api.Resources{
@@ -1809,12 +1818,17 @@
bigTask2 := bigTask1.Copy()
bigTask2.ID = "id2"

bigService := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial node and task
// Add initial node, service and task
assert.NoError(t, store.CreateService(tx, bigService))
assert.NoError(t, store.CreateNode(tx, node))
assert.NoError(t, store.CreateTask(tx, bigTask1))
return nil
@@ -1951,6 +1965,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
// task1 - has a node it can run on
task1 := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name1",
@@ -1973,6 +1988,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
// task2 - has no node it can run on
task2 := &api.Task{
ID: "id2",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name2",
@@ -1995,6 +2011,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
// task3 - no platform constraints, should run on any node
task3 := &api.Task{
ID: "id3",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name3",
@@ -2007,6 +2024,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
// task4 - only OS constraint, is runnable on any linux node
task4 := &api.Task{
ID: "id4",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name4",
@@ -2029,6 +2047,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
// task5 - supported on multiple platforms
task5 := &api.Task{
ID: "id5",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name5",
@@ -2103,12 +2122,16 @@
Description: &api.NodeDescription{},
}

service1 := &api.Service{
ID: "serviceID1",
}
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial task and nodes to the store
// Add initial task, service and nodes to the store
assert.NoError(t, store.CreateService(tx, service1))
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateNode(tx, node1))
assert.NoError(t, store.CreateNode(tx, node2))
@@ -2168,6 +2191,86 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
assert.Regexp(t, assignment4.NodeID, "(node1|node2)")
}

// TestSchedulerUnassignedMap tests that unassigned tasks are deleted from unassignedTasks when the service is removed
func TestSchedulerUnassignedMap(t *testing.T) {
ctx := context.Background()
// create a service and a task with OS constraint that is not met
task1 := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name1",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
Spec: api.TaskSpec{
Placement: &api.Placement{
Platforms: []*api.Platform{
{
Architecture: "amd64",
OS: "windows",
},
},
},
},
}

node1 := &api.Node{
ID: "node1",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node1",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
Description: &api.NodeDescription{
Platform: &api.Platform{
Architecture: "x86_64",
OS: "linux",
},
},
}

service1 := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial task, service and nodes to the store
assert.NoError(t, store.CreateService(tx, service1))
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateNode(tx, node1))
return nil
})
assert.NoError(t, err)

scheduler := New(s)
scheduler.unassignedTasks["id1"] = task1

scheduler.tick(ctx)
// task1 is in the unassigned map
assert.Contains(t, scheduler.unassignedTasks, task1.ID)

// delete the service of an unassigned task
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.DeleteService(tx, service1.ID))
return nil
})
assert.NoError(t, err)

scheduler.tick(ctx)
// task1 is removed from the unassigned map
assert.NotContains(t, scheduler.unassignedTasks, task1.ID)
}

func TestPreassignedTasks(t *testing.T) {
ctx := context.Background()
initialNodeSet := []*api.Node{
@@ -2523,6 +2626,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task0: bind mount
t0 := &api.Task{
ID: "task0_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2548,6 +2652,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task1: vol plugin1
t1 := &api.Task{
ID: "task1_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2574,6 +2679,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task2: vol plugin1, vol plugin2
t2 := &api.Task{
ID: "task2_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2606,6 +2712,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task3: vol plugin1, network plugin1
t3 := &api.Task{
ID: "task3_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Networks: []*api.NetworkAttachment{
{
@@ -2646,6 +2753,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task4: log plugin1
t4 := &api.Task{
ID: "task4_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2663,6 +2771,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task5: log plugin1
t5 := &api.Task{
ID: "task5_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2681,6 +2790,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// no logging
t6 := &api.Task{
ID: "task6_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2699,6 +2809,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// log driver with no name
t7 := &api.Task{
ID: "task7_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2718,12 +2829,16 @@
},
}

s1 := &api.Service{
ID: "serviceID1",
}
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

// Add initial node and task
// Add initial node, service and task
err := s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateService(tx, s1))
assert.NoError(t, store.CreateTask(tx, t1))
assert.NoError(t, store.CreateNode(tx, n1))
return nil
@@ -3014,6 +3129,7 @@ func TestSchedulerHostPort(t *testing.T) {

task1 := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -3038,6 +3154,7 @@ func TestSchedulerHostPort(t *testing.T) {
}
task2 := &api.Task{
ID: "id2",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -3062,6 +3179,7 @@ func TestSchedulerHostPort(t *testing.T) {
}
task3 := &api.Task{
ID: "id3",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -3090,12 +3208,17 @@
},
}

service1 := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial node and task
// Add initial node, service and task
assert.NoError(t, store.CreateService(tx, service1))
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateTask(tx, task2))
return nil
