Support gang scheduling with Apache YuniKorn #2396

Merged: 15 commits, Sep 27, 2024
@@ -2,6 +2,7 @@ package yunikorn

import (
"context"
"fmt"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
@@ -15,11 +16,14 @@ import (
)

const (
SchedulerName string = "yunikorn"
YuniKornPodApplicationIDLabelName string = "applicationId"
YuniKornPodQueueLabelName string = "queue"
RayClusterApplicationIDLabelName string = "yunikorn.apache.org/application-id"
RayClusterQueueLabelName string = "yunikorn.apache.org/queue-name"
RayClusterGangSchedulingLabelName string = "yunikorn.apache.org/gang-scheduling"
YuniKornTaskGroupNameAnnotationName string = "yunikorn.apache.org/task-group-name"
YuniKornTaskGroupsAnnotationName string = "yunikorn.apache.org/task-groups"
)

type YuniKornScheduler struct {
@@ -42,19 +46,64 @@ func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *
return nil
}

// populatePodLabels is a helper function that copies RayCluster's label to the given pod based on the label key
func (y *YuniKornScheduler) populatePodLabels(app *rayv1.RayCluster, pod *corev1.Pod, sourceKey string, targetKey string) {
// check labels
if value, exist := app.Labels[sourceKey]; exist {
y.log.Info("Updating pod label based on RayCluster annotations",
y.log.Info("Updating pod label based on RayCluster labels",
"sourceKey", sourceKey, "targetKey", targetKey, "value", value)
pod.Labels[targetKey] = value
}
}

// AddMetadataToPod adds essential labels and annotations to the Ray pods;
// the YuniKorn scheduler needs these labels and annotations to schedule the pods properly
func (y *YuniKornScheduler) AddMetadataToPod(app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
// the applicationID and queue name must be provided in the labels
y.populatePodLabels(app, pod, RayClusterApplicationIDLabelName, YuniKornPodApplicationIDLabelName)
y.populatePodLabels(app, pod, RayClusterQueueLabelName, YuniKornPodQueueLabelName)
Comment on lines 67 to 68

Member:
I don't know why the label names need to be translated from RayClusterApplicationIDLabelName to YuniKornPodApplicationIDLabelName and from RayClusterQueueLabelName to YuniKornPodQueueLabelName.

Does YuniKorn read the labels on resources other than Pods? If not, why not set YuniKornPodApplicationIDLabelName to yunikorn.apache.org/app-id and YuniKornPodQueueLabelName to yunikorn.apache.org/queue, set them directly on the RayCluster, and then apply the same labels to all the Pods controlled by the RayCluster?

Contributor (author):
The caveat is that YuniKorn depends on labels to identify apps, and it currently uses a very simple format:

  • YuniKornPodApplicationIDLabelName: applicationId
  • YuniKornPodQueueLabelName: queue

In Ray, though, I think it makes sense to add the domain name to the label, to indicate that these labels are set for the scheduler:

  • yunikorn.apache.org/application-id
  • yunikorn.apache.org/queue-name

The code logic is very simple: just translate these labels and set them on the pods, without introducing any inconsistency in the RayCluster spec. Hope that makes sense.

Member (@MortalHappiness, Sep 26, 2024):
But I viewed the doc here:

https://yunikorn.apache.org/docs/next/user_guide/labels_and_annotations_in_yunikorn

There are two labels, yunikorn.apache.org/app-id and yunikorn.apache.org/queue, and the applicationId and queue labels are now legacy.

Contributor (author):
Oh, good catch. It seems the community has recently been moving to a domain-scoped label approach. Let me take a look and run some tests; I will update the PR once I confirm these labels work. Thanks!

Contributor (author):
Hi @MortalHappiness, I just found that these labels are only supported in v1.6, which was released just a few weeks ago. For better compatibility with older YuniKorn deployments, I think it's better to continue supporting the legacy label names. However, I have modified RayClusterApplicationIDLabelName and RayClusterQueueLabelName in the package to match YuniKorn upstream. We can remove this translation layer and populate the labels directly onto pods after a few releases. Does this sound good?

Member:
SGTM. Thanks!

Member:
How about adding a TODO comment here so we know to remove this in a future release?

Contributor (author):
Sure, updated.

Member:
@yangwwei Would you mind briefly adding comments for the context you mentioned in https://github.com/ray-project/kuberay/pull/2396/files#r1777710831?

Contributor (author):
Sure
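To make the discussion above concrete, here is a minimal, self-contained sketch (not part of this diff) of the label translation in question; the label values are hypothetical:

package main

import "fmt"

// Sketch only: domain-scoped labels on the RayCluster are copied to the
// legacy label keys that pre-v1.6 YuniKorn releases understand.
func main() {
	rayClusterLabels := map[string]string{
		"yunikorn.apache.org/application-id": "job-1-01234", // hypothetical values
		"yunikorn.apache.org/queue-name":     "root.default",
	}

	// translation table: RayCluster label key -> legacy YuniKorn pod label key
	translation := map[string]string{
		"yunikorn.apache.org/application-id": "applicationId",
		"yunikorn.apache.org/queue-name":     "queue",
	}

	podLabels := map[string]string{}
	for sourceKey, targetKey := range translation {
		if value, ok := rayClusterLabels[sourceKey]; ok {
			podLabels[targetKey] = value
		}
	}
	fmt.Println(podLabels) // map[applicationId:job-1-01234 queue:root.default]
}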

pod.Spec.SchedulerName = y.Name()

// when gang scheduling is enabled, extra annotations need to be added to all pods
if y.isGangSchedulingEnabled(app) {
// populate the taskGroups info to each pod
err := y.populateTaskGroupsAnnotationToPod(app, pod)
if err != nil {
y.log.Error(err, "failed to add gang scheduling related annotations to pod, "+
"gang scheduling will not be enabled for this workload",
"rayCluster", app.Name, "name", pod.Name, "namespace", pod.Namespace)
return
}
// set the task group name based on the head or worker group name
// the group name for the head and each worker group should be different
pod.Annotations[YuniKornTaskGroupNameAnnotationName] = groupName
}
}

// isGangSchedulingEnabled checks whether gang scheduling is requested for the
// RayCluster. Only the presence of the gang-scheduling label matters; its
// value is ignored, so even an empty value enables gang scheduling.
func (y *YuniKornScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
_, exist := app.Labels[RayClusterGangSchedulingLabelName]
return exist
}
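For reference, a hedged sketch of how a cluster opts into gang scheduling; the object name and label values are illustrative, and only the presence of the gang-scheduling label is actually checked:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// Illustrative only: a RayCluster opted into YuniKorn gang scheduling.
// Any label value (even "") enables it, because isGangSchedulingEnabled
// checks only that the label exists.
func main() {
	cluster := &rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "my-cluster", // hypothetical
			Namespace: "default",
			Labels: map[string]string{
				"yunikorn.apache.org/application-id":  "my-app-0001", // hypothetical
				"yunikorn.apache.org/queue-name":      "root.default",
				"yunikorn.apache.org/gang-scheduling": "true",
			},
		},
	}
	fmt.Println(cluster.Labels)
}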

func (y *YuniKornScheduler) populateTaskGroupsAnnotationToPod(app *rayv1.RayCluster, pod *corev1.Pod) error {
y.log.Info("Gang Scheduling enabled for RayCluster",
"RayCluster", app.Name, "Namespace", app.Namespace)

taskGroups := newTaskGroupsFromApp(app)
taskGroupsAnnotationValue, err := taskGroups.marshal()
if err != nil {
return fmt.Errorf("failed to marshal task groups info, error: %w", err)
}

y.log.Info("add task groups info to pod's annotation",
"key", YuniKornTaskGroupsAnnotationName,
"value", taskGroupsAnnotationValue,
"numOfTaskGroups", taskGroups.size())
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[YuniKornTaskGroupsAnnotationName] = taskGroupsAnnotationValue
return nil
}
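To make the annotation's wire format concrete, here is a minimal sketch (not part of this diff) that produces a value of the same shape the tests below assert on. The JSON field tags and the string-typed resource map are assumptions for illustration; the package's real TaskGroup type may differ (for example, by using resource.Quantity):

package main

import (
	"encoding/json"
	"fmt"
)

// taskGroupSketch mirrors the fields the tests assert on (Name, MinMember,
// MinResource); the JSON tags follow YuniKorn's task-group annotation format
// but are assumed here.
type taskGroupSketch struct {
	Name        string            `json:"name"`
	MinMember   int32             `json:"minMember"`
	MinResource map[string]string `json:"minResource"`
}

func main() {
	groups := []taskGroupSketch{
		{Name: "headgroup", MinMember: 1, // head-group name assumed
			MinResource: map[string]string{"cpu": "5", "memory": "5Gi"}},
		{Name: "worker-group-1", MinMember: 1,
			MinResource: map[string]string{"cpu": "2", "memory": "10Gi", "nvidia.com/gpu": "1"}},
	}
	value, err := json.Marshal(groups)
	if err != nil {
		panic(err)
	}
	// Roughly the value stored under the yunikorn.apache.org/task-groups
	// pod annotation for a cluster like the one built in the tests below.
	fmt.Println(string(value))
}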

func (yf *YuniKornSchedulerFactory) New(_ *rest.Config) (schedulerinterface.BatchScheduler, error) {
@@ -1,10 +1,15 @@
package yunikorn

import (
"encoding/json"
"fmt"
"testing"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
@@ -41,7 +46,7 @@ func TestPopulatePodLabels(t *testing.T) {

rayCluster2 := createRayClusterWithLabels(
"ray-cluster-without-labels",
"test",
"test1",
nil, // empty labels
)
rayPod3 := createPod("my-pod-2", "test")
@@ -51,6 +56,115 @@
assert.Equal(t, podLabelsContains(rayPod3, YuniKornPodQueueLabelName, queue2), false)
}

func TestIsGangSchedulingEnabled(t *testing.T) {
yk := &YuniKornScheduler{}

job1 := "job-1-01234"
queue1 := "root.default"
rayCluster1 := createRayClusterWithLabels(
"ray-cluster-with-gang-scheduling",
"test1",
map[string]string{
RayClusterApplicationIDLabelName: job1,
RayClusterQueueLabelName: queue1,
RayClusterGangSchedulingLabelName: "true",
},
)

assert.Equal(t, yk.isGangSchedulingEnabled(rayCluster1), true)

rayCluster2 := createRayClusterWithLabels(
"ray-cluster-with-gang-scheduling",
"test2",
map[string]string{
RayClusterApplicationIDLabelName: job1,
RayClusterQueueLabelName: queue1,
RayClusterGangSchedulingLabelName: "",
},
)

assert.Equal(t, yk.isGangSchedulingEnabled(rayCluster2), true)

rayCluster3 := createRayClusterWithLabels(
"ray-cluster-with-gang-scheduling",
"test3",
map[string]string{
RayClusterApplicationIDLabelName: job1,
RayClusterQueueLabelName: queue1,
},
)

assert.Equal(t, yk.isGangSchedulingEnabled(rayCluster3), false)
}

func TestPopulateGangSchedulingAnnotations(t *testing.T) {
yk := &YuniKornScheduler{}

job1 := "job-1-01234"
queue1 := "root.default"

// test the case when gang-scheduling is enabled
rayClusterWithGangScheduling := createRayClusterWithLabels(
"ray-cluster-with-gang-scheduling",
"test3",
map[string]string{
RayClusterApplicationIDLabelName: job1,
RayClusterQueueLabelName: queue1,
RayClusterGangSchedulingLabelName: "true",
},
)

// head pod:
// cpu: 5
// memory: 5Gi
addHeadPodSpec(rayClusterWithGangScheduling, v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5"),
v1.ResourceMemory: resource.MustParse("5Gi"),
})

// worker pod:
// cpu: 2
// memory: 10Gi
// nvidia.com/gpu: 1
addWorkerPodSpec(rayClusterWithGangScheduling,
"worker-group-1", 1, 1, 2, v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
v1.ResourceMemory: resource.MustParse("10Gi"),
"nvidia.com/gpu": resource.MustParse("1"),
})

// gang-scheduling enabled case: the plugin should populate the taskGroups annotation onto the pod
rayPod := createPod("ray-pod", "default")
err := yk.populateTaskGroupsAnnotationToPod(rayClusterWithGangScheduling, rayPod)
assert.NoError(t, err, "failed to populate task groups annotation to pod")

groups, err := GetTaskGroupsFromAnnotation(rayPod)
assert.NoError(t, err)
assert.Equal(t, len(groups), 2)
// verify the annotation value
taskGroupsSpec := rayPod.Annotations[YuniKornTaskGroupsAnnotationName]
assert.Equal(t, true, len(taskGroupsSpec) > 0)
taskGroups := newTaskGroups()
err = taskGroups.unmarshalFrom(taskGroupsSpec)
assert.NoError(t, err)
assert.Equal(t, len(taskGroups.Groups), 2)

// verify the correctness of head group
headGroup := taskGroups.getTaskGroup(utils.RayNodeHeadGroupLabelValue)
assert.NotNil(t, headGroup)
assert.Equal(t, int32(1), headGroup.MinMember)
assert.Equal(t, resource.MustParse("5"), headGroup.MinResource[v1.ResourceCPU.String()])
assert.Equal(t, resource.MustParse("5Gi"), headGroup.MinResource[v1.ResourceMemory.String()])

// verify the correctness of worker group
workerGroup := taskGroups.getTaskGroup("worker-group-1")
assert.NotNil(t, workerGroup)
assert.Equal(t, int32(1), workerGroup.MinMember)
assert.Equal(t, resource.MustParse("2"), workerGroup.MinResource[v1.ResourceCPU.String()])
assert.Equal(t, resource.MustParse("10Gi"), workerGroup.MinResource[v1.ResourceMemory.String()])
assert.Equal(t, resource.MustParse("1"), workerGroup.MinResource["nvidia.com/gpu"])
}

func createRayClusterWithLabels(name string, namespace string, labels map[string]string) *rayv1.RayCluster {
rayCluster := &rayv1.RayCluster{
ObjectMeta: metav1.ObjectMeta{
@@ -63,6 +177,49 @@
return rayCluster
}

func addHeadPodSpec(app *rayv1.RayCluster, resource v1.ResourceList) {
// app.Spec.HeadGroupSpec.Template.Spec.Containers
headContainers := []v1.Container{
{
Name: "head-pod",
Image: "ray.io/ray-head:latest",
Resources: v1.ResourceRequirements{
Limits: nil,
Requests: resource,
},
},
}

app.Spec.HeadGroupSpec.Template.Spec.Containers = headContainers
}

func addWorkerPodSpec(app *rayv1.RayCluster, workerGroupName string,
replicas int32, minReplicas int32, maxReplicas int32, resources v1.ResourceList,
) {
workerContainers := []v1.Container{
{
Name: "worker-pod",
Image: "ray.io/ray-head:latest",
Resources: v1.ResourceRequirements{
Limits: nil,
Requests: resources,
},
},
}

app.Spec.WorkerGroupSpecs = append(app.Spec.WorkerGroupSpecs, rayv1.WorkerGroupSpec{
GroupName: workerGroupName,
Replicas: &replicas,
MinReplicas: &minReplicas,
MaxReplicas: &maxReplicas,
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: workerContainers,
},
},
})
}

func createPod(name string, namespace string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@@ -90,3 +247,36 @@ func podLabelsContains(pod *v1.Pod, key string, value string) bool {

return false
}

func GetTaskGroupsFromAnnotation(pod *v1.Pod) ([]TaskGroup, error) {
taskGroupInfo, exist := pod.Annotations[YuniKornTaskGroupsAnnotationName]
if !exist {
return nil, fmt.Errorf("not found")
}

taskGroups := []TaskGroup{}
err := json.Unmarshal([]byte(taskGroupInfo), &taskGroups)
if err != nil {
return nil, err
}
// json.Unmarshal does not return an error when a field such as Name or MinMember is missing (missing fields are left at their zero values); it only fails on malformed JSON. Validate the required fields explicitly.
for _, taskGroup := range taskGroups {
if taskGroup.Name == "" {
return nil, fmt.Errorf("can't get taskGroup Name from pod annotation, %s",
taskGroupInfo)
}
if taskGroup.MinResource == nil {
return nil, fmt.Errorf("can't get taskGroup MinResource from pod annotation, %s",
taskGroupInfo)
}
if taskGroup.MinMember == int32(0) {
return nil, fmt.Errorf("can't get taskGroup MinMember from pod annotation, %s",
taskGroupInfo)
}
if taskGroup.MinMember < int32(0) {
return nil, fmt.Errorf("minMember cannot be negative, %s",
taskGroupInfo)
}
}
return taskGroups, nil
}