Skip to content

Commit

Permalink
[YUNIKORN-2808] E2E test Verify_preemption_on_priority_queue test is …
Browse files Browse the repository at this point in the history
…flaky

Squashing all commits
  • Loading branch information
manirajv06 committed Aug 27, 2024
1 parent 2278b32 commit 9c6a45d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 36 deletions.
88 changes: 52 additions & 36 deletions test/e2e/preemption/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ var sleepPodMemLimit int64
var sleepPodMemLimit2 int64
var taintKey = "e2e_test_preemption"
var nodesToTaint []string
var cantBeScheduledMesg = "The sleep pod %s can't be scheduled"
var deployMesg = "Deploy the sleep pod %s to the development namespace"

var _ = ginkgo.BeforeSuite(func() {
_, filename, _, _ := runtime.Caller(0)
Expand Down Expand Up @@ -164,12 +166,12 @@ var _ = ginkgo.Describe("Preemption", func() {
})

// Define sleepPod
sleepPodConfigs := createSandbox1SleepPodCofigs(3, 600)
sleepPodConfigs := createSandbox1SleepPodCofigs(3, 600, "root.sandbox1")
sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 600, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.sandbox2"}}
sleepPodConfigs = append(sleepPodConfigs, sleepPod4Config)

for _, config := range sleepPodConfigs {
ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
ginkgo.By(fmt.Sprintf(deployMesg, config.Name))
sleepObj, podErr := k8s.InitSleepPod(config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
Expand Down Expand Up @@ -226,12 +228,12 @@ var _ = ginkgo.Describe("Preemption", func() {
})

// Define sleepPod
sandbox1SleepPodConfigs := createSandbox1SleepPodCofigs(3, 30)
sandbox1SleepPodConfigs := createSandbox1SleepPodCofigs(3, 30, "root.sandbox1")
sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 30, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.sandbox2"}}

// Deploy pods in root.sandbox1
for _, config := range sandbox1SleepPodConfigs {
ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
ginkgo.By(fmt.Sprintf(deployMesg, config.Name))
sleepObj, podErr := k8s.InitSleepPod(config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
Expand All @@ -245,14 +247,14 @@ var _ = ginkgo.Describe("Preemption", func() {
}

// Deploy sleepjob4 pod in root.sandbox2
ginkgo.By("Deploy the sleep pod " + sleepPod4Config.Name + " to the development namespace")
ginkgo.By(fmt.Sprintf(deployMesg, sleepPod4Config.Name))
sleepObj, podErr := k8s.InitSleepPod(sleepPod4Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod4, err := kClient.CreatePod(sleepObj, dev)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

// sleepjob4 pod can't be scheduled until the pods in root.sandbox1 have succeeded
ginkgo.By("The sleep pod " + sleepPod4Config.Name + " can't be scheduled")
ginkgo.By(fmt.Sprintf(cantBeScheduledMesg, sleepPod4Config.Name))
err = kClient.WaitForPodUnschedulable(sleepRespPod4, 60*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

Expand Down Expand Up @@ -292,12 +294,12 @@ var _ = ginkgo.Describe("Preemption", func() {
})

// Define sleepPod
sandbox1SleepPodConfigs := createSandbox1SleepPodCofigs(3, 30)
sandbox1SleepPodConfigs := createSandbox1SleepPodCofigs(3, 30, "root.sandbox1")
sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 30, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.sandbox2"}}

// Deploy pods in root.sandbox1
for _, config := range sandbox1SleepPodConfigs {
ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
ginkgo.By(fmt.Sprintf(deployMesg, config.Name))
sleepObj, podErr := k8s.InitSleepPod(config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
Expand All @@ -311,14 +313,14 @@ var _ = ginkgo.Describe("Preemption", func() {
}

// Deploy sleepjob4 pod in root.sandbox2
ginkgo.By("Deploy the sleep pod " + sleepPod4Config.Name + " to the development namespace")
ginkgo.By(fmt.Sprintf(deployMesg, sleepPod4Config.Name))
sleepObj, podErr := k8s.InitSleepPod(sleepPod4Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod4, err := kClient.CreatePod(sleepObj, dev)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

// sleepjob4 pod can't be scheduled until the pods in root.sandbox1 have succeeded
ginkgo.By("The sleep pod " + sleepPod4Config.Name + " can't be scheduled")
ginkgo.By(fmt.Sprintf(cantBeScheduledMesg, sleepPod4Config.Name))
err = kClient.WaitForPodUnschedulable(sleepRespPod4, 60*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

Expand All @@ -333,29 +335,34 @@ var _ = ginkgo.Describe("Preemption", func() {
ginkgo.It("Verify_preemption_on_priority_queue", func() {
ginkgo.By("A task can only preempt a task with lower or equal priority")
// update config
ginkgo.By(fmt.Sprintf("Update root.sandbox1, root.low-priority, root.high-priority with guaranteed memory %dM", sleepPodMemLimit))
ginkgo.By(fmt.Sprintf("Update root.parent.sandbox1, root.parent.low-priority, root.parent.high-priority with guaranteed memory %dM", sleepPodMemLimit))
yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", func(sc *configs.SchedulerConfig) error {
// remove placement rules so we can control queue
sc.Partitions[0].PlacementRules = nil

var err error
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "parent",
Resources: configs.Resources{Max: map[string]string{"memory": fmt.Sprintf("%dM", 3*sleepPodMemLimit)}},
}); err != nil {
return err
}
if err = common.AddQueue(sc, "default", "root.parent", configs.QueueConfig{
Name: "high-priority",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s", "priority.offset": "100"},
}); err != nil {
return err
}

if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
if err = common.AddQueue(sc, "default", "root.parent", configs.QueueConfig{
Name: "sandbox1",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s", "priority.offset": "0"},
}); err != nil {
return err
}

if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
if err = common.AddQueue(sc, "default", "root.parent", configs.QueueConfig{
Name: "low-priority",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s", "priority.offset": "-100"},
Expand All @@ -366,12 +373,12 @@ var _ = ginkgo.Describe("Preemption", func() {
})

// Define sleepPod
sandbox1SleepPodConfigs := createSandbox1SleepPodCofigs(3, 30)
sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 600, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.low-priority"}}
sleepPod5Config := k8s.SleepPodConfig{Name: "sleepjob5", NS: dev, Mem: sleepPodMemLimit, Time: 600, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.high-priority"}}
sandbox1SleepPodConfigs := createSandbox1SleepPodCofigs(3, 60, "root.parent.sandbox1")
sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 600, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.parent.low-priority"}}
sleepPod5Config := k8s.SleepPodConfig{Name: "sleepjob5", NS: dev, Mem: sleepPodMemLimit, Time: 600, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.parent.high-priority"}}

for _, config := range sandbox1SleepPodConfigs {
ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
ginkgo.By(fmt.Sprintf(deployMesg, config.Name))
sleepObj, podErr := k8s.InitSleepPod(config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
Expand All @@ -384,34 +391,43 @@ var _ = ginkgo.Describe("Preemption", func() {
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
}

// Deploy sleepjob4 pod in root.low-priority
ginkgo.By("Deploy the sleep pod " + sleepPod4Config.Name + " to the development namespace")
// Deploy sleepjob4 pod in root.parent.low-priority
ginkgo.By(fmt.Sprintf(deployMesg, sleepPod4Config.Name))
sleepObj, podErr := k8s.InitSleepPod(sleepPod4Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod4, err := kClient.CreatePod(sleepObj, dev)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

// Deploy sleepjob5 pod in root.high-priority
ginkgo.By("Deploy the sleep pod " + sleepPod5Config.Name + " to the development namespace")
// Deploy sleepjob5 pod in root.parent.high-priority
ginkgo.By(fmt.Sprintf(deployMesg, sleepPod5Config.Name))
sleepObj, podErr = k8s.InitSleepPod(sleepPod5Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod5, err := kClient.CreatePod(sleepObj, dev)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

// sleepjob4 pod can't be scheduled before pods in root.sandbox1 are succeeded
ginkgo.By("The sleep pod " + sleepPod4Config.Name + " can't be scheduled")
err = kClient.WaitForPodUnschedulable(sleepRespPod4, 30*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

// sleepjob5 pod can be scheduled before pods in root.sandbox1 are succeeded
// sleepjob4 pod can't be scheduled until the pods in root.parent.sandbox1 have succeeded
ginkgo.By(fmt.Sprintf(cantBeScheduledMesg, sleepPod4Config.Name))
podErr = kClient.WaitForPodPending(dev, sleepPod4Config.Name, time.Duration(60)*time.Second)
Ω(podErr).NotTo(HaveOccurred())

ginkgo.By("Verify the sleep pod " + sleepPod4Config.Name + " request failed scheduling")
podErr = restClient.WaitForAllocationLog("default", "root.parent.low-priority", sleepRespPod4.ObjectMeta.Labels["applicationId"], sleepPod4Config.Name, 60)
Ω(podErr).NotTo(HaveOccurred())
log, podErr := restClient.GetAllocationLog("default", "root.parent.low-priority", sleepRespPod4.ObjectMeta.Labels["applicationId"], sleepPod4Config.Name)
Ω(podErr).NotTo(HaveOccurred())
Ω(log).NotTo(gomega.BeNil(), "Log can't be empty")
logEntries := yunikorn.AllocLogToStrings(log)
Ω(logEntries).To(gomega.ContainElement(gomega.MatchRegexp(".*Not enough queue quota")), "Log entry message mismatch")

// sleepjob5 pod can be scheduled before the pods in root.parent.sandbox1 have succeeded
ginkgo.By("The sleep pod " + sleepPod5Config.Name + " can be scheduled")
err = kClient.WaitForPodScheduled(ns.Name, sleepRespPod5.Name, 30*time.Second)
err = kClient.WaitForPodScheduled(ns.Name, sleepRespPod5.Name, 90*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

// assert one of the pods in root.parent.sandbox1 is preempted
ginkgo.By("One of the pods in root.sanbox1 is preempted")
ginkgo.By("One of the pods in root.parent.sanbox1 is preempted")
sandbox1RunningPodsCnt := 0
pods, err := kClient.ListPodsByLabelSelector(dev, "queue=root.sandbox1")
pods, err := kClient.ListPodsByLabelSelector(dev, "queue=root.parent.sandbox1")
gomega.Ω(err).NotTo(gomega.HaveOccurred())
for _, pod := range pods.Items {
if pod.DeletionTimestamp != nil {
Expand All @@ -421,7 +437,7 @@ var _ = ginkgo.Describe("Preemption", func() {
sandbox1RunningPodsCnt++
}
}
Ω(sandbox1RunningPodsCnt).To(gomega.Equal(2), "One of the pods in root.sandbox1 should be preempted")
Ω(sandbox1RunningPodsCnt).To(gomega.Equal(2), "One of the pods in root.parent.sandbox1 should be preempted")
})

ginkgo.It("Verify_allow_preemption_tag", func() {
Expand Down Expand Up @@ -489,7 +505,7 @@ var _ = ginkgo.Describe("Preemption", func() {
sleepPod5Config := k8s.SleepPodConfig{Name: "sleepjob5", NS: dev, Mem: sleepPodMemLimit2, Time: 600, Optedout: k8s.NotConfig, Labels: map[string]string{"queue": "root.sandbox5"}}

for _, config := range []k8s.SleepPodConfig{sleepPod1Config, sleepPod2Config, sleepPod3Config, sleepPod4Config} {
ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
ginkgo.By(fmt.Sprintf(deployMesg, config.Name))
sleepObj, podErr := k8s.InitSleepPod(config)

// Setting PriorityClasses for Pods in a specific queue
Expand All @@ -511,7 +527,7 @@ var _ = ginkgo.Describe("Preemption", func() {
}

// Deploy sleepjob5 pod in root.sandbox5
ginkgo.By("Deploy the sleep pod " + sleepPod5Config.Name + " to the development namespace")
ginkgo.By(fmt.Sprintf(deployMesg, sleepPod5Config.Name))
sleepObj, podErr := k8s.InitSleepPod(sleepPod5Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod5, err := kClient.CreatePod(sleepObj, dev)
Expand Down Expand Up @@ -558,10 +574,10 @@ var _ = ginkgo.Describe("Preemption", func() {
})
})

// createSandbox1SleepPodCofigs returns cnt sleep pod configs named
// sleepjob1 .. sleepjob<cnt>. Every config uses namespace dev, memory
// sleepPodMemLimit, the given sleep time, Optedout k8s.Allow and a single
// "queue" label.
//
// This span is a rendered diff: the first signature and the first append line
// below are the pre-change version (queue label fixed to "root.sandbox1"); the
// second signature and second append line are the post-change version, which
// takes the queue label value from queueName.
//
// Note: the parameter named time shadows the imported time package inside
// this function body.
func createSandbox1SleepPodCofigs(cnt, time int) []k8s.SleepPodConfig {
func createSandbox1SleepPodCofigs(cnt, time int, queueName string) []k8s.SleepPodConfig {
// Pre-size the slice: exactly cnt configs are appended.
sandbox1Configs := make([]k8s.SleepPodConfig, 0, cnt)
for i := 0; i < cnt; i++ {
// Pod names are 1-based (i+1), so the first pod is sleepjob1.
sandbox1Configs = append(sandbox1Configs, k8s.SleepPodConfig{Name: fmt.Sprintf("sleepjob%d", i+1), NS: dev, Mem: sleepPodMemLimit, Time: time, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.sandbox1"}})
sandbox1Configs = append(sandbox1Configs, k8s.SleepPodConfig{Name: fmt.Sprintf("sleepjob%d", i+1), NS: dev, Mem: sleepPodMemLimit, Time: time, Optedout: k8s.Allow, Labels: map[string]string{"queue": queueName}})
}
return sandbox1Configs
}
1 change: 1 addition & 0 deletions test/e2e/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func dumpYunikornContainer(suiteName string, specName string) error {
return getErr
}

ginkgo.By("\n\nYunikorn Logs: " + string(logBytes))
_, err = fmt.Fprintf(file, "Yunikorn Logs:\n%s\n", string(logBytes))
return err
}
Expand Down

0 comments on commit 9c6a45d

Please sign in to comment.