TPU Multi-Host Support #1913

Merged 16 commits on Feb 22, 2024
Changes from 10 commits
2 changes: 2 additions & 0 deletions ray-operator/apis/ray/v1/raycluster_types.go
@@ -71,6 +71,8 @@ type WorkerGroupSpec struct {
type ScaleStrategy struct {
// WorkersToDelete workers to be deleted
WorkersToDelete []string `json:"workersToDelete,omitempty"`
// MultihostReplicasToDelete multi-host replicas to be deleted
MultihostReplicasToDelete []string `json:"multihostReplicasToDelete,omitempty"`
}

// AutoscalerOptions specifies optional configuration for the Ray autoscaler.
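For illustration only (the values below are made up, not taken from this PR), a ScaleStrategy that asks the operator to remove one individual worker and one whole multi-host replica might look like the following sketch. It assumes the rayv1 API package is imported; the pod name and replica id are hypothetical, and the replica id is simply the value of the multi-host replica label that the controller matches on.

    // Hypothetical ScaleStrategy value; in practice the autoscaler fills these fields in.
    scaleStrategy := rayv1.ScaleStrategy{
        WorkersToDelete:           []string{"raycluster-sample-worker-small-group-abcde"}, // made-up pod name
        MultihostReplicasToDelete: []string{"1234567890"},                                 // made-up replica id (label value)
    }
    _ = scaleStrategy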
5 changes: 5 additions & 0 deletions ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Generated file; diff not rendered by default.

4 changes: 4 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayclusters.yaml

Generated file; diff not rendered by default.

4 changes: 4 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Generated file; diff not rendered by default.

4 changes: 4 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml

Generated file; diff not rendered by default.

179 changes: 150 additions & 29 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -9,6 +9,8 @@ import (
"strings"
"time"

"math/rand"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

@@ -706,23 +708,60 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
return err
}

// Construct a map of multihost group ids to pods.
workerMap := make(map[string]corev1.PodList)
for _, workerPod := range workerPods.Items {
replicaKey := workerPod.Labels[utils.MultihostReplicaKey]
if pods, ok := workerMap[replicaKey]; ok {
pods.Items = append(pods.Items, workerPod)
workerMap[replicaKey] = pods
} else {
pods = corev1.PodList{}
pods.Items = append(pods.Items, workerPod)
workerMap[replicaKey] = pods
}
}

// Delete unhealthy worker Pods. Multihost grouped pods are deleted together.
deletedWorkers := make(map[string]struct{})
deleted := struct{}{}
deletedMultihostReplicas := make(map[string]struct{})
numDeletedUnhealthyWorkerPods := 0
if worker.NumOfHosts > 1 {
for replicaKey, workerPodList := range workerMap {
// Check deletion reasons for pods in a multihost group together. If one of them needs to be deleted, all others need to be deleted.
shouldDelete, reason := shouldDeleteMultihostPods(workerPodList, rayv1.WorkerNode)
if shouldDelete {
deletedMultihostReplicas[replicaKey] = deleted
for _, workerPod := range workerPodList.Items {
r.Log.Info("reconcilePods", "worker Pod", workerPod.Name, "shouldDelete", shouldDelete, "reason", reason)
// TODO (kevin85421): We may need to allow users to configure how many `Failed` or `Succeeded` Pods should be kept for debugging purposes.
// If one pod in a multi-host worker fails, all of the pods should be deleted
numDeletedUnhealthyWorkerPods++
deletedWorkers[workerPod.Name] = deleted
if err := r.Delete(ctx, &workerPod); err != nil {
return err
}
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
"Deleted worker Pod %s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v",
workerPod.Name, workerPod.Status.Phase, workerPod.Spec.RestartPolicy, getRayContainerStateTerminated(workerPod))
}
}
}
} else {
for _, workerPod := range workerPods.Items {
shouldDelete, reason := shouldDeletePod(workerPod, rayv1.WorkerNode)
r.Log.Info("reconcilePods", "worker Pod", workerPod.Name, "shouldDelete", shouldDelete, "reason", reason)
if shouldDelete {
numDeletedUnhealthyWorkerPods++
deletedWorkers[workerPod.Name] = deleted
if err := r.Delete(ctx, &workerPod); err != nil {
return err
}
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
"Deleted worker Pod %s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v",
workerPod.Name, workerPod.Status.Phase, workerPod.Spec.RestartPolicy, getRayContainerStateTerminated(workerPod))
}
}
}

@@ -752,13 +791,40 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
}
worker.ScaleStrategy.WorkersToDelete = []string{}

// Remove multi-host replicas specified by MultihostReplicasToDelete to meet the expectations of the Autoscaler.
r.Log.Info("reconcilePods", "removing worker groups in the scaleStrategy of", worker.GroupName)
for _, replicaToDelete := range worker.ScaleStrategy.MultihostReplicasToDelete {
for _, pod := range workerPods.Items {
if pod.Labels[utils.MultihostReplicaKey] == replicaToDelete {
r.Log.Info("Deleting pod", "namespace", pod.Namespace, "name", pod.Name)
if err := r.Delete(ctx, &pod); err != nil {
if !errors.IsNotFound(err) {
r.Log.Info("reconcilePods", "Fail to delete Pod", pod.Name, "error", err)
return err
}
r.Log.Info("reconcilePods", "The worker Pod has already been deleted", pod.Name)
} else {
deletedWorkers[pod.Name] = deleted
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted", "Deleted pod %s", pod.Name)
}
}
}
}
worker.ScaleStrategy.MultihostReplicasToDelete = []string{}

runningPods := corev1.PodList{}
for _, pod := range workerPods.Items {
if _, ok := deletedWorkers[pod.Name]; !ok {
runningPods.Items = append(runningPods.Items, pod)
}
}
runningReplicas := int32(len(runningPods.Items))
if worker.NumOfHosts > 1 {
// A replica can contain multiple hosts, so we need to calculate this based on the number of hosts per replica.
runningReplicas = runningReplicas / worker.NumOfHosts
}

diff := workerReplicas - runningReplicas
r.Log.Info("reconcilePods", "workerReplicas", workerReplicas, "runningPods", len(runningPods.Items), "diff", diff)

if diff > 0 {
@@ -768,8 +834,14 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
var i int32
for i = 0; i < diff; i++ {
r.Log.Info("reconcilePods", "creating worker for group", worker.GroupName, fmt.Sprintf("index %d", i), fmt.Sprintf("in total %d", diff))
// Due to pods being scaled down, we are not guaranteed that the multihost group name will always be
// incremental. So we just need to use some random integer here.
group := rand.Uint32()
Contributor: Given the above comment, perhaps we should make this id more deterministic?

Contributor: (Update) I think it should be fine to leave it as it is; see the reply on the first comment.
var j uint32
for j = 0; j < uint32(worker.NumOfHosts); j++ {
if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy(), group, j); err != nil {
Member: What will happen if we manually delete a Pod in a multi-host PodSlice? It seems the implementation may not be able to handle it.
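A rough walk-through of that scenario, reasoning only from the arithmetic shown in this diff (not from any confirmed behavior), suggests the surviving pods of the broken replica are left in place while a whole new replica is created:

    // Hypothetical numbers for the reviewer's scenario, using the diff's integer division.
    numOfHosts := int32(4)
    workerReplicas := int32(2)
    runningPodCount := int32(7)                     // one pod of a 4-host replica was deleted manually
    runningReplicas := runningPodCount / numOfHosts // integer division: 1
    diff := workerReplicas - runningReplicas        // 1, so a full new 4-pod replica would be created
    _ = diff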

return err
}
}
}
} else if diff == 0 {
@@ -795,18 +867,40 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
// is not set, we will disable random Pod deletion by default.
if !enableInTreeAutoscaling || enableRandomPodDelete {
// diff < 0 means that we need to delete some Pods to meet the desired number of replicas.
randomlyRemovedReplicas := -diff
r.Log.Info("reconcilePods", "Number workers to delete randomly", randomlyRemovedReplicas, "Worker group", worker.GroupName)
replicasRemoved := 0
for replicaKey, workerPodList := range workerMap {
if _, ok := deletedMultihostReplicas[replicaKey]; ok {
// Skip this multihost group if it has already been deleted earlier.
continue
}
var podsToRemove int32
if worker.NumOfHosts > 1 {
podsToRemove = randomlyRemovedReplicas * int32(len(workerPodList.Items))
} else {
podsToRemove = randomlyRemovedReplicas
}
podsRemoved := 0
for _, randomPodToDelete := range workerPodList.Items {
r.Log.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", podsRemoved, podsToRemove), "with name", randomPodToDelete.Name)
if err := r.Delete(ctx, &randomPodToDelete); err != nil {
if !errors.IsNotFound(err) {
return err
}
r.Log.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name)
} else {
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted", "Deleted Pod %s", randomPodToDelete.Name)
podsRemoved = podsRemoved + 1
if int32(podsRemoved) == podsToRemove {
break
}
}
}
replicasRemoved = replicasRemoved + 1
if int32(replicasRemoved) == randomlyRemovedReplicas {
break
}
}
} else {
r.Log.Info(fmt.Sprintf("Random Pod deletion is disabled for cluster %s. The only decision-maker for Pod deletions is Autoscaler.", instance.Name))
@@ -886,6 +980,28 @@ func shouldDeletePod(pod corev1.Pod, nodeType rayv1.RayNodeType) (bool, string)
return false, reason
}

// shouldDeleteMultihostPods returns whether the Pod in a multihost group should be deleted and the reason.
// Note that if one pod in a multihost group needs to be deleted, then all other pods in the
// same group have to be deleted as well. By default most of these groups have only one pod.
//
// @param podList: The Pods to be checked.
// @param nodeType: The type of the node that the Pod belongs to (head or worker).
//
// @return: shouldDelete (bool), reason (string)
// (1) shouldDelete: Whether the Pods in this group should be deleted.
// (2) reason: The reason why the Pods should or should not be deleted.
func shouldDeleteMultihostPods(podList corev1.PodList, nodeType rayv1.RayNodeType) (bool, string) {
var reason string
for _, pod := range podList.Items {
shouldDelete, reason := shouldDeletePod(pod, nodeType)
if shouldDelete {
return shouldDelete, reason
}
}
// Return false after all pods in the group have been checked.
return false, reason
}
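A minimal sketch of how this helper behaves follows; it is illustrative only, uses made-up pod names, and assumes (as the surrounding diff suggests) that shouldDeletePod reports a Failed worker pod as deletable.

    // Two pods in the same multihost group; one has failed.
    podList := corev1.PodList{
        Items: []corev1.Pod{
            {ObjectMeta: metav1.ObjectMeta{Name: "tpu-group-host-0"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
            {ObjectMeta: metav1.ObjectMeta{Name: "tpu-group-host-1"}, Status: corev1.PodStatus{Phase: corev1.PodFailed}},
        },
    }
    // If shouldDeletePod flags the failed pod, the whole group is reported as deletable,
    // and reconcilePods then deletes both pods together.
    shouldDelete, reason := shouldDeleteMultihostPods(podList, rayv1.WorkerNode)
    _, _ = shouldDelete, reason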

// `ContainerStatuses` does not guarantee the order of the containers. Therefore, we need to find the Ray
// container's status by name. See the following links for more details:
// (1) https://discuss.kubernetes.io/t/pod-spec-containers-and-pod-status-containerstatuses-can-have-a-different-order-why/25273
@@ -996,9 +1112,9 @@ func (r *RayClusterReconciler) createHeadPod(ctx context.Context, instance rayv1
return nil
}

func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance rayv1.RayCluster, worker rayv1.WorkerGroupSpec) error {
func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance rayv1.RayCluster, worker rayv1.WorkerGroupSpec, multihostReplica uint32, hostIndex uint32) error {
// build the pod then create it
pod := r.buildWorkerPod(ctx, instance, worker)
pod := r.buildWorkerPod(ctx, instance, worker, multihostReplica, hostIndex)
podIdentifier := types.NamespacedName{
Name: pod.Name,
Namespace: pod.Namespace,
@@ -1061,7 +1177,7 @@ func getCreatorCRDType(instance rayv1.RayCluster) utils.CRDType {
}

// Build worker instance pods.
func (r *RayClusterReconciler) buildWorkerPod(ctx context.Context, instance rayv1.RayCluster, worker rayv1.WorkerGroupSpec) corev1.Pod {
func (r *RayClusterReconciler) buildWorkerPod(ctx context.Context, instance rayv1.RayCluster, worker rayv1.WorkerGroupSpec, multihostReplica uint32, hostIndex uint32) corev1.Pod {
podName := strings.ToLower(instance.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol)
podName = utils.CheckName(podName) // making sure the name is valid
fqdnRayIP := utils.GenerateFQDNServiceName(ctx, instance, instance.Namespace) // Fully Qualified Domain Name
@@ -1075,6 +1191,11 @@ func (r *RayClusterReconciler) buildWorkerPod(ctx context.Context, instance rayv
}
creatorCRDType := getCreatorCRDType(instance)
pod := common.BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, headPort, autoscalingEnabled, creatorCRDType, fqdnRayIP)

// Set multihost pod labels
podTemplateSpec.Labels[utils.MultihostReplicaKey] = strconv.FormatUint(uint64(multihostReplica), 10)
podTemplateSpec.Labels[utils.RayNodeHostIndexKey] = strconv.FormatUint(uint64(hostIndex), 10)

// Set raycluster instance as the owner and controller
if err := controllerutil.SetControllerReference(&instance, &pod, r.Scheme); err != nil {
r.Log.Error(err, "Failed to set controller reference for raycluster pod")
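As a rough illustration of the labels set above for a worker group with NumOfHosts = 2 and group id 1234567890: the label values follow directly from the FormatUint calls, while the label keys shown here are placeholders, since the actual strings behind utils.MultihostReplicaKey and utils.RayNodeHostIndexKey are defined elsewhere in the PR and not visible in this diff.

    // Hypothetical resulting labels for the two pods of one multi-host replica.
    pod0Labels := map[string]string{
        "<utils.MultihostReplicaKey>": "1234567890", // same for every host in the replica
        "<utils.RayNodeHostIndexKey>": "0",          // hostIndex 0
    }
    pod1Labels := map[string]string{
        "<utils.MultihostReplicaKey>": "1234567890",
        "<utils.RayNodeHostIndexKey>": "1", // hostIndex 1
    }
    _, _ = pod0Labels, pod1Labels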