From b561e0a68f7e2d4e3bd0ae763571f8c89dbcf641 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Sat, 21 Sep 2024 12:49:28 -0700 Subject: [PATCH 01/14] Support gang scheduling with Apache YuniKorn --- .../yunikorn/yunikorn_scheduler.go | 74 +++++- .../yunikorn/yunikorn_scheduler_test.go | 227 ++++++++++++++++++ .../yunikorn/yunikorn_task_groups.go | 97 ++++++++ ray-operator/controllers/ray/utils/util.go | 22 +- 4 files changed, 404 insertions(+), 16 deletions(-) create mode 100644 ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index 337ed11702..0746ee2a50 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -15,11 +15,14 @@ import ( ) const ( - SchedulerName string = "yunikorn" - YuniKornPodApplicationIDLabelName string = "applicationId" - YuniKornPodQueueLabelName string = "queue" - RayClusterApplicationIDLabelName string = "yunikorn.apache.org/application-id" - RayClusterQueueLabelName string = "yunikorn.apache.org/queue-name" + SchedulerName string = "yunikorn" + YuniKornPodApplicationIDLabelName string = "applicationId" + YuniKornPodQueueLabelName string = "queue" + RayClusterApplicationIDLabelName string = "yunikorn.apache.org/application-id" + RayClusterQueueLabelName string = "yunikorn.apache.org/queue-name" + RayClusterGangSchedulingLabelName string = "yunikorn.apache.org/gang-scheduling" + YuniKornTaskGroupNameAnnotationName string = "yunikorn.apache.org/task-group-name" + YuniKornTaskGroupsAnnotationName string = "yunikorn.apache.org/task-groups" ) type YuniKornScheduler struct { @@ -36,25 +39,76 @@ func (y *YuniKornScheduler) Name() string { return GetPluginName() } -func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error { - // yunikorn doesn't require any resources to be created upfront - // this is a no-opt for this implementation +func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error { return nil } +// populatePodLabels is a helper function that copies RayCluster's label to the given pod based on the label key func (y *YuniKornScheduler) populatePodLabels(app *rayv1.RayCluster, pod *corev1.Pod, sourceKey string, targetKey string) { // check labels if value, exist := app.Labels[sourceKey]; exist { - y.log.Info("Updating pod label based on RayCluster annotations", + y.log.Info("Updating pod label based on RayCluster labels", "sourceKey", sourceKey, "targetKey", targetKey, "value", value) pod.Labels[targetKey] = value } } -func (y *YuniKornScheduler) AddMetadataToPod(app *rayv1.RayCluster, _ string, pod *corev1.Pod) { +// populatePodAnnotations is a helper function that copies RayCluster's annotation to the given pod based on the annotation key +func (y *YuniKornScheduler) populatePodAnnotations(app *rayv1.RayCluster, pod *corev1.Pod, sourceKey string, targetKey string) { + // check labels + if value, exist := app.Annotations[sourceKey]; exist { + y.log.Info("Updating pod annotations based on RayCluster annotations", + "sourceKey", sourceKey, "targetKey", targetKey, "value", value) + pod.Annotations[targetKey] = value + } +} + +// AddMetadataToPod adds essential labels and annotations to the Ray pods +// the yunikorn scheduler needs these labels and annotations in order to do the scheduling properly +func (y *YuniKornScheduler) AddMetadataToPod(app *rayv1.RayCluster, groupName string, pod *corev1.Pod) { + // the applicationID and queue name must be provided in the labels y.populatePodLabels(app, pod, RayClusterApplicationIDLabelName, YuniKornPodApplicationIDLabelName) y.populatePodLabels(app, pod, RayClusterQueueLabelName, YuniKornPodQueueLabelName) pod.Spec.SchedulerName = y.Name() + + // when gang scheduling is enabled, extra annotations need to be added to all pods + if y.isGangSchedulingEnabled(app) { + // populate the taskGroups info to each pod + y.populateTaskGroupsAnnotationToPod(app, pod) + // set the task group name based on the head or worker group name + // the group name for the head and each of the worker group should be different + pod.Annotations[YuniKornTaskGroupNameAnnotationName] = groupName + } +} + +func (y *YuniKornScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool { + if value, exist := app.Labels[RayClusterGangSchedulingLabelName]; exist { + if value == "true" { + return true + } + } + return false +} + +func (y *YuniKornScheduler) populateTaskGroupsAnnotationToPod(app *rayv1.RayCluster, pod *corev1.Pod) { + y.log.Info("Gang Scheduling enabled for RayCluster", + "RayCluster", app.Name, "Namespace", app.Namespace) + + taskGroups := newTaskGroupsFromApp(app) + taskGroupsAnnotationValue, err := taskGroups.marshal() + if err != nil { + y.log.Error(err, "failed to marshal task groups info") + return + } + + y.log.Info("add task groups info to pod's annotation", + "key", YuniKornTaskGroupsAnnotationName, + "value", taskGroupsAnnotationValue, + "numOfTaskGroups", taskGroups.size()) + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + pod.Annotations[YuniKornTaskGroupsAnnotationName] = taskGroupsAnnotationValue } func (yf *YuniKornSchedulerFactory) New(_ *rest.Config) (schedulerinterface.BatchScheduler, error) { diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go index 54df1f395b..cc1f003a72 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go @@ -1,8 +1,11 @@ package yunikorn import ( + "encoding/json" + "fmt" "testing" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -51,6 +54,121 @@ func TestPopulatePodLabels(t *testing.T) { assert.Equal(t, podLabelsContains(rayPod3, YuniKornPodQueueLabelName, queue2), false) } +func TestPopulatePodAnnotations(t *testing.T) { + yk := &YuniKornScheduler{} + + // --- case 1 + // Ray Cluster CR has task groups annotations defined + rayCluster1 := createRayClusterWithAnnotations( + "ray-cluster-with-annotations", + "test", + map[string]string{ + YuniKornTaskGroupsAnnotationName: "some-task-group", + }, + ) + + rayPod := createPod("my-pod-1", "test") + yk.populatePodAnnotations(rayCluster1, rayPod, YuniKornTaskGroupsAnnotationName, YuniKornTaskGroupsAnnotationName) + assert.Equal(t, podAnnotationContains(rayPod, YuniKornTaskGroupsAnnotationName, "some-task-group"), true) + + // --- case 2 + // Ray Cluster CR doesn't have the task groups annotation defined + // In this case, the pod will not be populated with the required annotations + rayCluster2 := createRayClusterWithAnnotations( + "ray-cluster-without-labels", + "test", + nil, // empty annotations + ) + rayPod = createPod("my-pod-2", "test") + yk.populatePodAnnotations(rayCluster2, rayPod, YuniKornTaskGroupsAnnotationName, YuniKornTaskGroupsAnnotationName) + assert.Equal(t, len(rayPod.Annotations), 0) +} + +func TestIsGangSchedulingEnabled(t *testing.T) { + yk := &YuniKornScheduler{} + + job1 := "job-1-01234" + queue1 := "root.default" + rayCluster := createRayClusterWithLabels( + "ray-cluster-with-gang-scheduling", + "test", + map[string]string{ + RayClusterApplicationIDLabelName: job1, + RayClusterQueueLabelName: queue1, + RayClusterGangSchedulingLabelName: "true", + }, + ) + + assert.Equal(t, yk.isGangSchedulingEnabled(rayCluster), true) +} + +func TestPopulateGangSchedulingAnnotations(t *testing.T) { + yk := &YuniKornScheduler{} + + job1 := "job-1-01234" + queue1 := "root.default" + + // test the case when gang-scheduling is enabled + rayClusterWithGangScheduling := createRayClusterWithLabels( + "ray-cluster-with-gang-scheduling", + "test", + map[string]string{ + RayClusterApplicationIDLabelName: job1, + RayClusterQueueLabelName: queue1, + RayClusterGangSchedulingLabelName: "true", + }, + ) + + // head pod: + // cpu: 5 + // memory: 5Gi + addHeadPodSpec(rayClusterWithGangScheduling, v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("5"), + v1.ResourceMemory: resource.MustParse("5Gi"), + }) + + // worker pod: + // cpu: 2 + // memory: 10Gi + // nvidia.com/gpu: 1 + addWorkerPodSpec(rayClusterWithGangScheduling, + "worker-group-1", 1, 1, 2, v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("10Gi"), + "nvidia.com/gpu": resource.MustParse("1"), + }) + + // gang-scheduling enabled case, the plugin should populate the taskGroup annotation to the app + rayPod := createPod("ray-pod", "default") + yk.populateTaskGroupsAnnotationToPod(rayClusterWithGangScheduling, rayPod) + + kk, err := GetTaskGroupsFromAnnotation(rayPod) + assert.NoError(t, err) + assert.Equal(t, len(kk), 2) + // verify the annotation value + taskGroupsSpec := rayPod.Annotations[YuniKornTaskGroupsAnnotationName] + assert.Equal(t, true, len(taskGroupsSpec) > 0) + taskGroups := newTaskGroups() + err = taskGroups.unmarshalFrom(taskGroupsSpec) + assert.NoError(t, err) + assert.Equal(t, len(taskGroups.Groups), 2) + + // verify the correctness of head group + headGroup := taskGroups.getTaskGroup(utils.RayNodeHeadGroupLabelValue) + assert.NotNil(t, headGroup) + assert.Equal(t, int32(1), headGroup.MinMember) + assert.Equal(t, resource.MustParse("5"), headGroup.MinResource[v1.ResourceCPU.String()]) + assert.Equal(t, resource.MustParse("5Gi"), headGroup.MinResource[v1.ResourceMemory.String()]) + + // verify the correctness of worker group + workerGroup := taskGroups.getTaskGroup("worker-group-1") + assert.NotNil(t, workerGroup) + assert.Equal(t, int32(1), workerGroup.MinMember) + assert.Equal(t, resource.MustParse("2"), workerGroup.MinResource[v1.ResourceCPU.String()]) + assert.Equal(t, resource.MustParse("10Gi"), workerGroup.MinResource[v1.ResourceMemory.String()]) + assert.Equal(t, resource.MustParse("1"), workerGroup.MinResource["nvidia.com/gpu"]) +} + func createRayClusterWithLabels(name string, namespace string, labels map[string]string) *rayv1.RayCluster { rayCluster := &rayv1.RayCluster{ ObjectMeta: metav1.ObjectMeta{ @@ -63,6 +181,65 @@ func createRayClusterWithLabels(name string, namespace string, labels map[string return rayCluster } +func createRayClusterWithAnnotations(name string, namespace string, annotations map[string]string) *rayv1.RayCluster { + rayCluster := &rayv1.RayCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Annotations: annotations, + }, + } + + return rayCluster +} + +func addHeadPodSpec(app *rayv1.RayCluster, resource v1.ResourceList) { + // app.Spec.HeadGroupSpec.Template.Spec.Containers + headContainers := []v1.Container{ + { + Name: "head-pod", + Image: "ray.io/ray-head:latest", + Resources: v1.ResourceRequirements{ + Limits: nil, + Requests: resource, + }, + }, + } + + app.Spec.HeadGroupSpec.Template.Spec.Containers = headContainers +} + +func addWorkerPodSpec(app *rayv1.RayCluster, workerGroupName string, + replicas int32, minReplicas int32, maxReplicas int32, resources v1.ResourceList, +) { + workerContainers := []v1.Container{ + { + Name: "worker-pod", + Image: "ray.io/ray-head:latest", + Resources: v1.ResourceRequirements{ + Limits: nil, + Requests: resources, + }, + }, + } + + if len(app.Spec.WorkerGroupSpecs) == 0 { + app.Spec.WorkerGroupSpecs = make([]rayv1.WorkerGroupSpec, 0) + } + + app.Spec.WorkerGroupSpecs = append(app.Spec.WorkerGroupSpecs, rayv1.WorkerGroupSpec{ + GroupName: workerGroupName, + Replicas: &replicas, + MinReplicas: &minReplicas, + MaxReplicas: &maxReplicas, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: workerContainers, + }, + }, + }) +} + func createPod(name string, namespace string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -90,3 +267,53 @@ func podLabelsContains(pod *v1.Pod, key string, value string) bool { return false } + +func podAnnotationContains(pod *v1.Pod, key string, value string) bool { + if pod == nil { + return false + } + + if len(pod.Annotations) > 0 { + annotationValue, exist := pod.Annotations[key] + if exist { + if annotationValue == value { + return true + } + } + } + + return false +} + +func GetTaskGroupsFromAnnotation(pod *v1.Pod) ([]TaskGroup, error) { + taskGroupInfo, exist := pod.Annotations[YuniKornTaskGroupsAnnotationName] + if !exist { + return nil, fmt.Errorf("not found") + } + + taskGroups := []TaskGroup{} + err := json.Unmarshal([]byte(taskGroupInfo), &taskGroups) + if err != nil { + return nil, err + } + // json.Unmarshal won't return error if name or MinMember is empty, but will return error if MinResource is empty or error format. + for _, taskGroup := range taskGroups { + if taskGroup.Name == "" { + return nil, fmt.Errorf("can't get taskGroup Name from pod annotation, %s", + taskGroupInfo) + } + if taskGroup.MinResource == nil { + return nil, fmt.Errorf("can't get taskGroup MinResource from pod annotation, %s", + taskGroupInfo) + } + if taskGroup.MinMember == int32(0) { + return nil, fmt.Errorf("can't get taskGroup MinMember from pod annotation, %s", + taskGroupInfo) + } + if taskGroup.MinMember < int32(0) { + return nil, fmt.Errorf("minMember cannot be negative, %s", + taskGroupInfo) + } + } + return taskGroups, nil +} diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go new file mode 100644 index 0000000000..8948774e8f --- /dev/null +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go @@ -0,0 +1,97 @@ +package yunikorn + +import ( + "encoding/json" + + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + + v1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" +) + +// TaskGroups is a list of task Groups recognized as gang Groups +type TaskGroups struct { + Groups []TaskGroup `json:"groups"` +} + +// TaskGroup is the struct for yunikorn to consider a pod belongs to a gang group +type TaskGroup struct { + Name string `json:"name"` + MinMember int32 `json:"minMember"` + MinResource map[string]resource.Quantity `json:"minResource"` + NodeSelector map[string]string `json:"nodeSelector,omitempty"` + Tolerations []corev1.Toleration `json:"tolerations,omitempty"` + Affinity *corev1.Affinity `json:"affinity,omitempty"` + TopologySpreadConstraints []corev1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"` +} + +func newTaskGroups() *TaskGroups { + return &TaskGroups{ + Groups: make([]TaskGroup, 0), + } +} + +func newTaskGroupsFromApp(app *v1.RayCluster) *TaskGroups { + taskGroups := newTaskGroups() + + // head group + headGroupSpec := app.Spec.HeadGroupSpec + headPodMinResource := utils.CalculatePodResource(headGroupSpec.Template.Spec) + taskGroups.addTaskGroup( + TaskGroup{ + Name: utils.RayNodeHeadGroupLabelValue, + MinMember: 1, + MinResource: utils.ConvertResourceListToMapString(headPodMinResource), + NodeSelector: headGroupSpec.Template.Spec.NodeSelector, + Tolerations: headGroupSpec.Template.Spec.Tolerations, + Affinity: headGroupSpec.Template.Spec.Affinity, + }) + + // worker Groups + for _, workerGroupSpec := range app.Spec.WorkerGroupSpecs { + workerMinResource := utils.CalculatePodResource(workerGroupSpec.Template.Spec) + minWorkers := workerGroupSpec.MinReplicas + taskGroups.addTaskGroup( + TaskGroup{ + Name: workerGroupSpec.GroupName, + MinMember: *minWorkers, + MinResource: utils.ConvertResourceListToMapString(workerMinResource), + NodeSelector: workerGroupSpec.Template.Spec.NodeSelector, + Tolerations: workerGroupSpec.Template.Spec.Tolerations, + Affinity: workerGroupSpec.Template.Spec.Affinity, + }) + } + + return taskGroups +} + +func (t *TaskGroups) size() int { + return len(t.Groups) +} + +func (t *TaskGroups) addTaskGroup(taskGroup TaskGroup) { + t.Groups = append(t.Groups, taskGroup) +} + +func (t *TaskGroups) marshal() (string, error) { + result, err := json.Marshal(t.Groups) + if err != nil { + return "", err + } + return string(result), nil +} + +func (t *TaskGroups) unmarshalFrom(spec string) error { + return json.Unmarshal([]byte(spec), &t.Groups) +} + +func (t *TaskGroups) getTaskGroup(name string) TaskGroup { + for _, group := range t.Groups { + if group.Name == name { + return group + } + } + return TaskGroup{} +} diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 4375bea338..792c1c1eec 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -12,6 +12,8 @@ import ( "strings" "unicode" + "k8s.io/apimachinery/pkg/api/resource" + "sigs.k8s.io/controller-runtime/pkg/manager" batchv1 "k8s.io/api/batch/v1" @@ -373,10 +375,10 @@ func CalculateAvailableReplicas(pods corev1.PodList) int32 { func CalculateDesiredResources(cluster *rayv1.RayCluster) corev1.ResourceList { desiredResourcesList := []corev1.ResourceList{{}} - headPodResource := calculatePodResource(cluster.Spec.HeadGroupSpec.Template.Spec) + headPodResource := CalculatePodResource(cluster.Spec.HeadGroupSpec.Template.Spec) desiredResourcesList = append(desiredResourcesList, headPodResource) for _, nodeGroup := range cluster.Spec.WorkerGroupSpecs { - podResource := calculatePodResource(nodeGroup.Template.Spec) + podResource := CalculatePodResource(nodeGroup.Template.Spec) for i := int32(0); i < *nodeGroup.Replicas; i++ { desiredResourcesList = append(desiredResourcesList, podResource) } @@ -386,10 +388,10 @@ func CalculateDesiredResources(cluster *rayv1.RayCluster) corev1.ResourceList { func CalculateMinResources(cluster *rayv1.RayCluster) corev1.ResourceList { minResourcesList := []corev1.ResourceList{{}} - headPodResource := calculatePodResource(cluster.Spec.HeadGroupSpec.Template.Spec) + headPodResource := CalculatePodResource(cluster.Spec.HeadGroupSpec.Template.Spec) minResourcesList = append(minResourcesList, headPodResource) for _, nodeGroup := range cluster.Spec.WorkerGroupSpecs { - podResource := calculatePodResource(nodeGroup.Template.Spec) + podResource := CalculatePodResource(nodeGroup.Template.Spec) for i := int32(0); i < *nodeGroup.MinReplicas; i++ { minResourcesList = append(minResourcesList, podResource) } @@ -397,9 +399,9 @@ func CalculateMinResources(cluster *rayv1.RayCluster) corev1.ResourceList { return sumResourceList(minResourcesList) } -// calculatePodResource returns the total resources of a pod. +// CalculatePodResource returns the total resources of a pod. // Request values take precedence over limit values. -func calculatePodResource(podSpec corev1.PodSpec) corev1.ResourceList { +func CalculatePodResource(podSpec corev1.PodSpec) corev1.ResourceList { podResource := corev1.ResourceList{} for _, container := range podSpec.Containers { containerResource := container.Resources.Requests @@ -423,6 +425,14 @@ func calculatePodResource(podSpec corev1.PodSpec) corev1.ResourceList { return podResource } +func ConvertResourceListToMapString(resourceList corev1.ResourceList) map[string]resource.Quantity { + result := make(map[string]resource.Quantity) + for key, value := range resourceList { + result[string(key)] = value + } + return result +} + func sumResourceList(list []corev1.ResourceList) corev1.ResourceList { totalResource := corev1.ResourceList{} for _, l := range list { From d72d73a91f7fea0ff0fd586ec8209f5b3837dcd6 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Sat, 21 Sep 2024 13:02:58 -0700 Subject: [PATCH 02/14] Support gang scheduling with Apache YuniKorn --- .../ray/batchscheduler/yunikorn/yunikorn_scheduler.go | 2 ++ .../ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index 0746ee2a50..72c203aebc 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -40,6 +40,8 @@ func (y *YuniKornScheduler) Name() string { } func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error { + // yunikorn doesn't require any resources to be created upfront + // this is a no-opt for this implementation return nil } diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go index cc1f003a72..febc1b640c 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go @@ -3,11 +3,12 @@ package yunikorn import ( "encoding/json" "fmt" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "testing" - "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" From 708b86fe3e639c91693f2462d400927dbe91b39c Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Sat, 21 Sep 2024 13:10:59 -0700 Subject: [PATCH 03/14] Support gang scheduling with Apache YuniKorn --- .../ray/batchscheduler/yunikorn/yunikorn_scheduler.go | 2 +- .../batchscheduler/yunikorn/yunikorn_scheduler_test.go | 9 +++++---- .../ray/batchscheduler/yunikorn/yunikorn_task_groups.go | 7 +++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index 72c203aebc..f4968b883d 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -39,7 +39,7 @@ func (y *YuniKornScheduler) Name() string { return GetPluginName() } -func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error { +func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error { // yunikorn doesn't require any resources to be created upfront // this is a no-opt for this implementation return nil diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go index febc1b640c..585de400cc 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go @@ -3,9 +3,10 @@ package yunikorn import ( "encoding/json" "fmt" - "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "testing" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -45,7 +46,7 @@ func TestPopulatePodLabels(t *testing.T) { rayCluster2 := createRayClusterWithLabels( "ray-cluster-without-labels", - "test", + "test1", nil, // empty labels ) rayPod3 := createPod("my-pod-2", "test") @@ -92,7 +93,7 @@ func TestIsGangSchedulingEnabled(t *testing.T) { queue1 := "root.default" rayCluster := createRayClusterWithLabels( "ray-cluster-with-gang-scheduling", - "test", + "test2", map[string]string{ RayClusterApplicationIDLabelName: job1, RayClusterQueueLabelName: queue1, @@ -112,7 +113,7 @@ func TestPopulateGangSchedulingAnnotations(t *testing.T) { // test the case when gang-scheduling is enabled rayClusterWithGangScheduling := createRayClusterWithLabels( "ray-cluster-with-gang-scheduling", - "test", + "test3", map[string]string{ RayClusterApplicationIDLabelName: job1, RayClusterQueueLabelName: queue1, diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go index 8948774e8f..b56e7d988d 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go @@ -3,7 +3,6 @@ package yunikorn import ( "encoding/json" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -18,13 +17,13 @@ type TaskGroups struct { // TaskGroup is the struct for yunikorn to consider a pod belongs to a gang group type TaskGroup struct { - Name string `json:"name"` - MinMember int32 `json:"minMember"` MinResource map[string]resource.Quantity `json:"minResource"` NodeSelector map[string]string `json:"nodeSelector,omitempty"` Tolerations []corev1.Toleration `json:"tolerations,omitempty"` - Affinity *corev1.Affinity `json:"affinity,omitempty"` TopologySpreadConstraints []corev1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"` + Affinity *corev1.Affinity `json:"affinity,omitempty"` + Name string `json:"name"` + MinMember int32 `json:"minMember"` } func newTaskGroups() *TaskGroups { From 1ceada851712672c1296e298f4d46f5e6b94791a Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Sat, 21 Sep 2024 13:18:50 -0700 Subject: [PATCH 04/14] Support gang scheduling with Apache YuniKorn --- .../ray/batchscheduler/yunikorn/yunikorn_task_groups.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go index b56e7d988d..f7a8c85b09 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go @@ -17,12 +17,12 @@ type TaskGroups struct { // TaskGroup is the struct for yunikorn to consider a pod belongs to a gang group type TaskGroup struct { + Name string `json:"name"` MinResource map[string]resource.Quantity `json:"minResource"` NodeSelector map[string]string `json:"nodeSelector,omitempty"` Tolerations []corev1.Toleration `json:"tolerations,omitempty"` TopologySpreadConstraints []corev1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"` Affinity *corev1.Affinity `json:"affinity,omitempty"` - Name string `json:"name"` MinMember int32 `json:"minMember"` } From 2e962f9dcc33555439941e17faab5f872cb91ee6 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Sat, 21 Sep 2024 13:45:31 -0700 Subject: [PATCH 05/14] Support gang scheduling with Apache YuniKorn --- .../ray/batchscheduler/yunikorn/yunikorn_task_groups.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go index f7a8c85b09..12f60b7fec 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go @@ -17,12 +17,12 @@ type TaskGroups struct { // TaskGroup is the struct for yunikorn to consider a pod belongs to a gang group type TaskGroup struct { - Name string `json:"name"` MinResource map[string]resource.Quantity `json:"minResource"` NodeSelector map[string]string `json:"nodeSelector,omitempty"` - Tolerations []corev1.Toleration `json:"tolerations,omitempty"` - TopologySpreadConstraints []corev1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"` Affinity *corev1.Affinity `json:"affinity,omitempty"` + Name string `json:"name"` + TopologySpreadConstraints []corev1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"` + Tolerations []corev1.Toleration `json:"tolerations,omitempty"` MinMember int32 `json:"minMember"` } From 4ee240595ad046730112f2ab0716ec7bb74f0ee1 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Mon, 23 Sep 2024 13:25:07 -0700 Subject: [PATCH 06/14] address review comments --- .../yunikorn/yunikorn_scheduler.go | 25 +++++++------- .../yunikorn/yunikorn_scheduler_test.go | 33 ++----------------- 2 files changed, 13 insertions(+), 45 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index f4968b883d..bbbbf8e26d 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -2,6 +2,7 @@ package yunikorn import ( "context" + "fmt" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -55,16 +56,6 @@ func (y *YuniKornScheduler) populatePodLabels(app *rayv1.RayCluster, pod *corev1 } } -// populatePodAnnotations is a helper function that copies RayCluster's annotation to the given pod based on the annotation key -func (y *YuniKornScheduler) populatePodAnnotations(app *rayv1.RayCluster, pod *corev1.Pod, sourceKey string, targetKey string) { - // check labels - if value, exist := app.Annotations[sourceKey]; exist { - y.log.Info("Updating pod annotations based on RayCluster annotations", - "sourceKey", sourceKey, "targetKey", targetKey, "value", value) - pod.Annotations[targetKey] = value - } -} - // AddMetadataToPod adds essential labels and annotations to the Ray pods // the yunikorn scheduler needs these labels and annotations in order to do the scheduling properly func (y *YuniKornScheduler) AddMetadataToPod(app *rayv1.RayCluster, groupName string, pod *corev1.Pod) { @@ -76,7 +67,13 @@ func (y *YuniKornScheduler) AddMetadataToPod(app *rayv1.RayCluster, groupName st // when gang scheduling is enabled, extra annotations need to be added to all pods if y.isGangSchedulingEnabled(app) { // populate the taskGroups info to each pod - y.populateTaskGroupsAnnotationToPod(app, pod) + err := y.populateTaskGroupsAnnotationToPod(app, pod) + if err != nil { + y.log.Error(err, "failed to add gang scheduling related annotations to pod, "+ + "gang scheduling will not be enabled for this workload", + "rayCluster", app.Name, "name", pod.Name, "namespace", pod.Namespace) + return + } // set the task group name based on the head or worker group name // the group name for the head and each of the worker group should be different pod.Annotations[YuniKornTaskGroupNameAnnotationName] = groupName @@ -92,15 +89,14 @@ func (y *YuniKornScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool return false } -func (y *YuniKornScheduler) populateTaskGroupsAnnotationToPod(app *rayv1.RayCluster, pod *corev1.Pod) { +func (y *YuniKornScheduler) populateTaskGroupsAnnotationToPod(app *rayv1.RayCluster, pod *corev1.Pod) error { y.log.Info("Gang Scheduling enabled for RayCluster", "RayCluster", app.Name, "Namespace", app.Namespace) taskGroups := newTaskGroupsFromApp(app) taskGroupsAnnotationValue, err := taskGroups.marshal() if err != nil { - y.log.Error(err, "failed to marshal task groups info") - return + return fmt.Errorf("failed to marshal task groups info, error: %v", err) } y.log.Info("add task groups info to pod's annotation", @@ -111,6 +107,7 @@ func (y *YuniKornScheduler) populateTaskGroupsAnnotationToPod(app *rayv1.RayClus pod.Annotations = make(map[string]string) } pod.Annotations[YuniKornTaskGroupsAnnotationName] = taskGroupsAnnotationValue + return nil } func (yf *YuniKornSchedulerFactory) New(_ *rest.Config) (schedulerinterface.BatchScheduler, error) { diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go index 585de400cc..b42a0bc0f8 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go @@ -56,36 +56,6 @@ func TestPopulatePodLabels(t *testing.T) { assert.Equal(t, podLabelsContains(rayPod3, YuniKornPodQueueLabelName, queue2), false) } -func TestPopulatePodAnnotations(t *testing.T) { - yk := &YuniKornScheduler{} - - // --- case 1 - // Ray Cluster CR has task groups annotations defined - rayCluster1 := createRayClusterWithAnnotations( - "ray-cluster-with-annotations", - "test", - map[string]string{ - YuniKornTaskGroupsAnnotationName: "some-task-group", - }, - ) - - rayPod := createPod("my-pod-1", "test") - yk.populatePodAnnotations(rayCluster1, rayPod, YuniKornTaskGroupsAnnotationName, YuniKornTaskGroupsAnnotationName) - assert.Equal(t, podAnnotationContains(rayPod, YuniKornTaskGroupsAnnotationName, "some-task-group"), true) - - // --- case 2 - // Ray Cluster CR doesn't have the task groups annotation defined - // In this case, the pod will not be populated with the required annotations - rayCluster2 := createRayClusterWithAnnotations( - "ray-cluster-without-labels", - "test", - nil, // empty annotations - ) - rayPod = createPod("my-pod-2", "test") - yk.populatePodAnnotations(rayCluster2, rayPod, YuniKornTaskGroupsAnnotationName, YuniKornTaskGroupsAnnotationName) - assert.Equal(t, len(rayPod.Annotations), 0) -} - func TestIsGangSchedulingEnabled(t *testing.T) { yk := &YuniKornScheduler{} @@ -142,7 +112,8 @@ func TestPopulateGangSchedulingAnnotations(t *testing.T) { // gang-scheduling enabled case, the plugin should populate the taskGroup annotation to the app rayPod := createPod("ray-pod", "default") - yk.populateTaskGroupsAnnotationToPod(rayClusterWithGangScheduling, rayPod) + err := yk.populateTaskGroupsAnnotationToPod(rayClusterWithGangScheduling, rayPod) + assert.NoError(t, err, "failed to populate task groups annotation to pod") kk, err := GetTaskGroupsFromAnnotation(rayPod) assert.NoError(t, err) From f9b68db425faf5c2621ef0434393d012ce045d01 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Mon, 23 Sep 2024 13:33:10 -0700 Subject: [PATCH 07/14] address review comments --- .../yunikorn/yunikorn_scheduler.go | 2 +- .../yunikorn/yunikorn_scheduler_test.go | 29 ------------------- 2 files changed, 1 insertion(+), 30 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index bbbbf8e26d..f5f61a73ac 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -96,7 +96,7 @@ func (y *YuniKornScheduler) populateTaskGroupsAnnotationToPod(app *rayv1.RayClus taskGroups := newTaskGroupsFromApp(app) taskGroupsAnnotationValue, err := taskGroups.marshal() if err != nil { - return fmt.Errorf("failed to marshal task groups info, error: %v", err) + return fmt.Errorf("failed to marshal task groups info, error: %w", err) } y.log.Info("add task groups info to pod's annotation", diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go index b42a0bc0f8..f8e30f594b 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go @@ -154,18 +154,6 @@ func createRayClusterWithLabels(name string, namespace string, labels map[string return rayCluster } -func createRayClusterWithAnnotations(name string, namespace string, annotations map[string]string) *rayv1.RayCluster { - rayCluster := &rayv1.RayCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - Annotations: annotations, - }, - } - - return rayCluster -} - func addHeadPodSpec(app *rayv1.RayCluster, resource v1.ResourceList) { // app.Spec.HeadGroupSpec.Template.Spec.Containers headContainers := []v1.Container{ @@ -241,23 +229,6 @@ func podLabelsContains(pod *v1.Pod, key string, value string) bool { return false } -func podAnnotationContains(pod *v1.Pod, key string, value string) bool { - if pod == nil { - return false - } - - if len(pod.Annotations) > 0 { - annotationValue, exist := pod.Annotations[key] - if exist { - if annotationValue == value { - return true - } - } - } - - return false -} - func GetTaskGroupsFromAnnotation(pod *v1.Pod) ([]TaskGroup, error) { taskGroupInfo, exist := pod.Annotations[YuniKornTaskGroupsAnnotationName] if !exist { From 08cc7b39a1435d1d33d30608670cedc5e6fd56ba Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Mon, 23 Sep 2024 21:32:31 -0700 Subject: [PATCH 08/14] address review comments --- .../yunikorn/yunikorn_scheduler.go | 8 ++--- .../yunikorn/yunikorn_scheduler_test.go | 29 +++++++++++++++++-- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index f5f61a73ac..3bc233d813 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -81,12 +81,8 @@ func (y *YuniKornScheduler) AddMetadataToPod(app *rayv1.RayCluster, groupName st } func (y *YuniKornScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool { - if value, exist := app.Labels[RayClusterGangSchedulingLabelName]; exist { - if value == "true" { - return true - } - } - return false + _, exist := app.Labels[RayClusterGangSchedulingLabelName] + return exist } func (y *YuniKornScheduler) populateTaskGroupsAnnotationToPod(app *rayv1.RayCluster, pod *corev1.Pod) error { diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go index f8e30f594b..f6344a86f2 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go @@ -61,9 +61,9 @@ func TestIsGangSchedulingEnabled(t *testing.T) { job1 := "job-1-01234" queue1 := "root.default" - rayCluster := createRayClusterWithLabels( + rayCluster1 := createRayClusterWithLabels( "ray-cluster-with-gang-scheduling", - "test2", + "test1", map[string]string{ RayClusterApplicationIDLabelName: job1, RayClusterQueueLabelName: queue1, @@ -71,7 +71,30 @@ func TestIsGangSchedulingEnabled(t *testing.T) { }, ) - assert.Equal(t, yk.isGangSchedulingEnabled(rayCluster), true) + assert.Equal(t, yk.isGangSchedulingEnabled(rayCluster1), true) + + rayCluster2 := createRayClusterWithLabels( + "ray-cluster-with-gang-scheduling", + "test2", + map[string]string{ + RayClusterApplicationIDLabelName: job1, + RayClusterQueueLabelName: queue1, + RayClusterGangSchedulingLabelName: "", + }, + ) + + assert.Equal(t, yk.isGangSchedulingEnabled(rayCluster2), true) + + rayCluster3 := createRayClusterWithLabels( + "ray-cluster-with-gang-scheduling", + "test3", + map[string]string{ + RayClusterApplicationIDLabelName: job1, + RayClusterQueueLabelName: queue1, + }, + ) + + assert.Equal(t, yk.isGangSchedulingEnabled(rayCluster3), false) } func TestPopulateGangSchedulingAnnotations(t *testing.T) { From 6a064174434fa0903019922bf367cf3ab1307a2f Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Tue, 24 Sep 2024 09:37:53 -0700 Subject: [PATCH 09/14] address review comments --- .../ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go index f6344a86f2..5cca81f4b8 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go @@ -207,10 +207,6 @@ func addWorkerPodSpec(app *rayv1.RayCluster, workerGroupName string, }, } - if len(app.Spec.WorkerGroupSpecs) == 0 { - app.Spec.WorkerGroupSpecs = make([]rayv1.WorkerGroupSpec, 0) - } - app.Spec.WorkerGroupSpecs = append(app.Spec.WorkerGroupSpecs, rayv1.WorkerGroupSpec{ GroupName: workerGroupName, Replicas: &replicas, From 6a3d04436e11f733adb53dd8da01bf65cf25c8c1 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 26 Sep 2024 13:55:01 -0700 Subject: [PATCH 10/14] address review comments --- .../yunikorn/yunikorn_scheduler.go | 28 ++++++++----------- .../yunikorn/yunikorn_scheduler_test.go | 21 +++++++------- .../controllers/ray/utils/constant.go | 5 ++-- 3 files changed, 25 insertions(+), 29 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index 3bc233d813..eab6a578b8 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -2,7 +2,6 @@ package yunikorn import ( "context" - "fmt" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -13,6 +12,7 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" ) const ( @@ -21,7 +21,6 @@ const ( YuniKornPodQueueLabelName string = "queue" RayClusterApplicationIDLabelName string = "yunikorn.apache.org/application-id" RayClusterQueueLabelName string = "yunikorn.apache.org/queue-name" - RayClusterGangSchedulingLabelName string = "yunikorn.apache.org/gang-scheduling" YuniKornTaskGroupNameAnnotationName string = "yunikorn.apache.org/task-group-name" YuniKornTaskGroupsAnnotationName string = "yunikorn.apache.org/task-groups" ) @@ -67,13 +66,8 @@ func (y *YuniKornScheduler) AddMetadataToPod(app *rayv1.RayCluster, groupName st // when gang scheduling is enabled, extra annotations need to be added to all pods if y.isGangSchedulingEnabled(app) { // populate the taskGroups info to each pod - err := y.populateTaskGroupsAnnotationToPod(app, pod) - if err != nil { - y.log.Error(err, "failed to add gang scheduling related annotations to pod, "+ - "gang scheduling will not be enabled for this workload", - "rayCluster", app.Name, "name", pod.Name, "namespace", pod.Namespace) - return - } + y.populateTaskGroupsAnnotationToPod(app, pod) + // set the task group name based on the head or worker group name // the group name for the head and each of the worker group should be different pod.Annotations[YuniKornTaskGroupNameAnnotationName] = groupName @@ -81,18 +75,18 @@ func (y *YuniKornScheduler) AddMetadataToPod(app *rayv1.RayCluster, groupName st } func (y *YuniKornScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool { - _, exist := app.Labels[RayClusterGangSchedulingLabelName] + _, exist := app.Labels[utils.RayClusterGangSchedulingEnabled] return exist } -func (y *YuniKornScheduler) populateTaskGroupsAnnotationToPod(app *rayv1.RayCluster, pod *corev1.Pod) error { - y.log.Info("Gang Scheduling enabled for RayCluster", - "RayCluster", app.Name, "Namespace", app.Namespace) - +func (y *YuniKornScheduler) populateTaskGroupsAnnotationToPod(app *rayv1.RayCluster, pod *corev1.Pod) { taskGroups := newTaskGroupsFromApp(app) taskGroupsAnnotationValue, err := taskGroups.marshal() if err != nil { - return fmt.Errorf("failed to marshal task groups info, error: %w", err) + y.log.Error(err, "failed to add gang scheduling related annotations to pod, "+ + "gang scheduling will not be enabled for this workload", + "rayCluster", app.Name, "name", pod.Name, "namespace", pod.Namespace) + return } y.log.Info("add task groups info to pod's annotation", @@ -103,7 +97,9 @@ func (y *YuniKornScheduler) populateTaskGroupsAnnotationToPod(app *rayv1.RayClus pod.Annotations = make(map[string]string) } pod.Annotations[YuniKornTaskGroupsAnnotationName] = taskGroupsAnnotationValue - return nil + + y.log.Info("Gang Scheduling enabled for RayCluster", + "RayCluster", app.Name, "Namespace", app.Namespace) } func (yf *YuniKornSchedulerFactory) New(_ *rest.Config) (schedulerinterface.BatchScheduler, error) { diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go index 5cca81f4b8..edd9992d0d 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go @@ -65,9 +65,9 @@ func TestIsGangSchedulingEnabled(t *testing.T) { "ray-cluster-with-gang-scheduling", "test1", map[string]string{ - RayClusterApplicationIDLabelName: job1, - RayClusterQueueLabelName: queue1, - RayClusterGangSchedulingLabelName: "true", + RayClusterApplicationIDLabelName: job1, + RayClusterQueueLabelName: queue1, + utils.RayClusterGangSchedulingEnabled: "true", }, ) @@ -77,9 +77,9 @@ func TestIsGangSchedulingEnabled(t *testing.T) { "ray-cluster-with-gang-scheduling", "test2", map[string]string{ - RayClusterApplicationIDLabelName: job1, - RayClusterQueueLabelName: queue1, - RayClusterGangSchedulingLabelName: "", + RayClusterApplicationIDLabelName: job1, + RayClusterQueueLabelName: queue1, + utils.RayClusterGangSchedulingEnabled: "", }, ) @@ -108,9 +108,9 @@ func TestPopulateGangSchedulingAnnotations(t *testing.T) { "ray-cluster-with-gang-scheduling", "test3", map[string]string{ - RayClusterApplicationIDLabelName: job1, - RayClusterQueueLabelName: queue1, - RayClusterGangSchedulingLabelName: "true", + RayClusterApplicationIDLabelName: job1, + RayClusterQueueLabelName: queue1, + utils.RayClusterGangSchedulingEnabled: "true", }, ) @@ -135,8 +135,7 @@ func TestPopulateGangSchedulingAnnotations(t *testing.T) { // gang-scheduling enabled case, the plugin should populate the taskGroup annotation to the app rayPod := createPod("ray-pod", "default") - err := yk.populateTaskGroupsAnnotationToPod(rayClusterWithGangScheduling, rayPod) - assert.NoError(t, err, "failed to populate task groups annotation to pod") + yk.populateTaskGroupsAnnotationToPod(rayClusterWithGangScheduling, rayPod) kk, err := GetTaskGroupsFromAnnotation(rayPod) assert.NoError(t, err) diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index 54cd44fc8e..17591fde94 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go @@ -33,8 +33,9 @@ const ( // Batch scheduling labels // TODO(tgaddair): consider making these part of the CRD - RaySchedulerName = "ray.io/scheduler-name" - RayPriorityClassName = "ray.io/priority-class-name" + RaySchedulerName = "ray.io/scheduler-name" + RayPriorityClassName = "ray.io/priority-class-name" + RayClusterGangSchedulingEnabled = "ray.io/gang-scheduling-enabled" // Ray GCS FT related annotations RayFTEnabledAnnotationKey = "ray.io/ft-enabled" From 10119edca17483fc35cab62eed47df8d87504ed9 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 26 Sep 2024 17:22:08 -0700 Subject: [PATCH 11/14] address review comments --- .../ray/batchscheduler/yunikorn/yunikorn_scheduler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index eab6a578b8..2b5b6cb0ea 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -19,8 +19,8 @@ const ( SchedulerName string = "yunikorn" YuniKornPodApplicationIDLabelName string = "applicationId" YuniKornPodQueueLabelName string = "queue" - RayClusterApplicationIDLabelName string = "yunikorn.apache.org/application-id" - RayClusterQueueLabelName string = "yunikorn.apache.org/queue-name" + RayClusterApplicationIDLabelName string = "yunikorn.apache.org/app-id" + RayClusterQueueLabelName string = "yunikorn.apache.org/queue" YuniKornTaskGroupNameAnnotationName string = "yunikorn.apache.org/task-group-name" YuniKornTaskGroupsAnnotationName string = "yunikorn.apache.org/task-groups" ) From 3ea3b6489c7aef0d7a8db21f289770c5c046681c Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 26 Sep 2024 17:40:58 -0700 Subject: [PATCH 12/14] address review comments --- .../ray/batchscheduler/yunikorn/yunikorn_scheduler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index 2b5b6cb0ea..5963d3916f 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -46,6 +46,8 @@ func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ * } // populatePodLabels is a helper function that copies RayCluster's label to the given pod based on the label key +// TODO: remove the legacy labels, i.e "applicationId" and "queue", directly populate +// RayClusterApplicationIDLabelName and RayClusterQueueLabelName to pod labels func (y *YuniKornScheduler) populatePodLabels(app *rayv1.RayCluster, pod *corev1.Pod, sourceKey string, targetKey string) { // check labels if value, exist := app.Labels[sourceKey]; exist { From 31ade1c002d11ff5d73bf27e1e0a68636ec2a5b0 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 26 Sep 2024 21:28:03 -0700 Subject: [PATCH 13/14] address review comments --- .../ray/batchscheduler/yunikorn/yunikorn_scheduler.go | 7 +++++-- .../ray/batchscheduler/yunikorn/yunikorn_task_groups.go | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index 5963d3916f..4cd0ac9879 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -46,8 +46,11 @@ func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ * } // populatePodLabels is a helper function that copies RayCluster's label to the given pod based on the label key -// TODO: remove the legacy labels, i.e "applicationId" and "queue", directly populate -// RayClusterApplicationIDLabelName and RayClusterQueueLabelName to pod labels +// TODO: remove the legacy labels, i.e "applicationId" and "queue", directly populate labels +// RayClusterApplicationIDLabelName to RayClusterQueueLabelName to pod labels. +// Currently we use this function to translate labels "yunikorn.apache.org/app-id" and "yunikorn.apache.org/queue" +// to legacy labels "applicationId" and "queue", this is for the better compatibilities to support older yunikorn +// versions. func (y *YuniKornScheduler) populatePodLabels(app *rayv1.RayCluster, pod *corev1.Pod, sourceKey string, targetKey string) { // check labels if value, exist := app.Labels[sourceKey]; exist { diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go index 12f60b7fec..57a5914c53 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go @@ -16,6 +16,7 @@ type TaskGroups struct { } // TaskGroup is the struct for yunikorn to consider a pod belongs to a gang group +// the original schema is defined here: https://github.com/apache/yunikorn-k8shim/blob/master/pkg/cache/amprotocol.go type TaskGroup struct { MinResource map[string]resource.Quantity `json:"minResource"` NodeSelector map[string]string `json:"nodeSelector,omitempty"` @@ -48,7 +49,7 @@ func newTaskGroupsFromApp(app *v1.RayCluster) *TaskGroups { Affinity: headGroupSpec.Template.Spec.Affinity, }) - // worker Groups + // worker groups for _, workerGroupSpec := range app.Spec.WorkerGroupSpecs { workerMinResource := utils.CalculatePodResource(workerGroupSpec.Template.Spec) minWorkers := workerGroupSpec.MinReplicas From d93270376f4337c6cd9e79b7bcfab5ca9b60ca40 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 26 Sep 2024 21:30:57 -0700 Subject: [PATCH 14/14] address review comments --- .../ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go index edd9992d0d..31ba7f95ff 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go @@ -137,7 +137,7 @@ func TestPopulateGangSchedulingAnnotations(t *testing.T) { rayPod := createPod("ray-pod", "default") yk.populateTaskGroupsAnnotationToPod(rayClusterWithGangScheduling, rayPod) - kk, err := GetTaskGroupsFromAnnotation(rayPod) + kk, err := getTaskGroupsFromAnnotation(rayPod) assert.NoError(t, err) assert.Equal(t, len(kk), 2) // verify the annotation value @@ -247,7 +247,7 @@ func podLabelsContains(pod *v1.Pod, key string, value string) bool { return false } -func GetTaskGroupsFromAnnotation(pod *v1.Pod) ([]TaskGroup, error) { +func getTaskGroupsFromAnnotation(pod *v1.Pod) ([]TaskGroup, error) { taskGroupInfo, exist := pod.Annotations[YuniKornTaskGroupsAnnotationName] if !exist { return nil, fmt.Errorf("not found")