Skip to content

Commit

Permalink
feat: update RayCluster.status with head info
Browse files Browse the repository at this point in the history
like so. Missing IPs will be excluded.

```
status:
  head:
    podIP: string
    serviceIP: string
```

The motivation for this change is to let users get the head IP directly
from the RayCluster CRD without having to know the underlying
implementation details of kuberay.

In a later commit we update the API server to parse the status
and return the IPs for each cluster.
  • Loading branch information
davidxia committed Nov 1, 2022
1 parent 310911c commit a93a2f6
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 12 deletions.
8 changes: 8 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11122,6 +11122,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp for this
cluster status.
Expand Down
8 changes: 8 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11680,6 +11680,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp for
this cluster status.
Expand Down
16 changes: 16 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11736,6 +11736,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp
for this cluster status.
Expand Down Expand Up @@ -11835,6 +11843,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp
for this cluster status.
Expand Down
8 changes: 8 additions & 0 deletions ray-operator/apis/ray/v1alpha1/raycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ type RayClusterStatus struct {
LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"`
// Service Endpoints
Endpoints map[string]string `json:"endpoints,omitempty"`
// Head info
Head HeadInfo `json:"head,omitempty"`
}

// HeadInfo gives info about head
// It is stored under the "head" key of the RayCluster status. Both fields
// are tagged omitempty, so an empty value is left out of the serialized form.
type HeadInfo struct {
// PodIP is the pod IP reported in the status of the cluster's head pod.
PodIP string `json:"podIP,omitempty"`
// ServiceIP is the cluster IP taken from the spec of the cluster's head service.
ServiceIP string `json:"serviceIP,omitempty"`
}

// RayNodeType the type of a ray node: head/worker
Expand Down
4 changes: 2 additions & 2 deletions ray-operator/apis/ray/v1alpha1/rayservice_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ var expected = `{
},
"rayClusterStatus":{
"head":{}
}
},
"pendingServiceStatus":{
Expand All @@ -389,7 +389,7 @@ var expected = `{
},
"rayClusterStatus":{
"head":{}
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11122,6 +11122,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp for this
cluster status.
Expand Down
8 changes: 8 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11680,6 +11680,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp for
this cluster status.
Expand Down
16 changes: 16 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11736,6 +11736,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp
for this cluster status.
Expand Down Expand Up @@ -11835,6 +11843,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp
for this cluster status.
Expand Down
19 changes: 12 additions & 7 deletions ray-operator/controllers/ray/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@ import (
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

// HeadServiceLabels returns the default labels for a cluster's head service.
// A freshly allocated map is returned on every call, so callers are free to
// modify the result.
func HeadServiceLabels(cluster rayiov1alpha1.RayCluster) map[string]string {
	// Identifier derived from the cluster name and the head node type,
	// passed through CheckLabel before being used as a label value.
	headID := utils.CheckLabel(utils.GenerateIdentifier(cluster.Name, rayiov1alpha1.HeadNode))

	labels := make(map[string]string, 5)
	labels[RayClusterLabelKey] = cluster.Name
	labels[RayNodeTypeLabelKey] = string(rayiov1alpha1.HeadNode)
	labels[RayIDLabelKey] = headID
	labels[KubernetesApplicationNameLabelKey] = ApplicationName
	labels[KubernetesCreatedByLabelKey] = ComponentName

	return labels
}

// BuildServiceForHeadPod Builds the service for a pod. Currently, there is only one service that allows
// the worker nodes to connect to the head node.
func BuildServiceForHeadPod(cluster rayiov1alpha1.RayCluster, labels map[string]string) (*corev1.Service, error) {
if labels == nil {
labels = make(map[string]string)
}

default_labels := map[string]string{
RayClusterLabelKey: cluster.Name,
RayNodeTypeLabelKey: string(rayiov1alpha1.HeadNode),
RayIDLabelKey: utils.CheckLabel(utils.GenerateIdentifier(cluster.Name, rayiov1alpha1.HeadNode)),
KubernetesApplicationNameLabelKey: ApplicationName,
KubernetesCreatedByLabelKey: ComponentName,
}
default_labels := HeadServiceLabels(cluster)

for k, v := range default_labels {
if _, ok := labels[k]; !ok {
Expand Down
55 changes: 55 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(request ctrl.Request, instanc
r.Log.Error(err, "Update status error", "cluster name", request.Name)
}
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -819,6 +820,10 @@ func (r *RayClusterReconciler) updateStatus(instance *rayiov1alpha1.RayCluster)
return err
}

if err := r.updateHeadInfo(instance); err != nil {
return err
}

timeNow := metav1.Now()
instance.Status.LastUpdateTime = &timeNow
if err := r.Status().Update(context.Background(), instance); err != nil {
Expand All @@ -828,6 +833,40 @@ func (r *RayClusterReconciler) updateStatus(instance *rayiov1alpha1.RayCluster)
return nil
}

// getHeadPodIP lists the pods in the cluster's namespace that carry the
// cluster-name and head-node-type labels and returns the pod IP of the single
// match.
//
// An error is returned when the list call fails, when zero or more than one
// pod matches, or when the matching pod has no IP in its status yet.
func (r *RayClusterReconciler) getHeadPodIP(instance *rayiov1alpha1.RayCluster) (string, error) {
	selector := client.MatchingLabels{
		common.RayClusterLabelKey:  instance.Name,
		common.RayNodeTypeLabelKey: string(rayiov1alpha1.HeadNode),
	}

	var headPods corev1.PodList
	err := r.List(context.TODO(), &headPods, client.InNamespace(instance.Namespace), selector)
	if err != nil {
		return "", err
	}

	// Exactly one head pod is expected; anything else is reported as an error.
	switch count := len(headPods.Items); {
	case count < 1:
		return "", fmt.Errorf("unable to find head pod. cluster name %s, filter labels %v", instance.Name, selector)
	case count > 1:
		return "", fmt.Errorf("found multiple head pods. cluster name %s, filter labels %v", instance.Name, selector)
	}

	podIP := headPods.Items[0].Status.PodIP
	if podIP == "" {
		return "", fmt.Errorf("head pod IP is empty. cluster name %s, filter labels %v", instance.Name, selector)
	}

	return podIP, nil
}

// getHeadServiceIP lists the services in the cluster's namespace that match
// the labels produced by common.HeadServiceLabels and returns the cluster IP
// of the single match.
//
// An error is returned when the list call fails, when zero or more than one
// service matches, or when the matching service has an empty cluster IP.
func (r *RayClusterReconciler) getHeadServiceIP(instance *rayiov1alpha1.RayCluster) (string, error) {
	selector := client.MatchingLabels(common.HeadServiceLabels(*instance))

	var headServices corev1.ServiceList
	err := r.List(context.TODO(), &headServices, client.InNamespace(instance.Namespace), selector)
	if err != nil {
		return "", err
	}

	// Exactly one head service is expected; anything else is reported as an error.
	switch count := len(headServices.Items); {
	case count < 1:
		return "", fmt.Errorf("unable to find head service. cluster name %s, filter labels %v", instance.Name, selector)
	case count > 1:
		return "", fmt.Errorf("found multiple head services. cluster name %s, filter labels %v", instance.Name, selector)
	}

	serviceIP := headServices.Items[0].Spec.ClusterIP
	if serviceIP == "" {
		return "", fmt.Errorf("head service IP is empty. cluster name %s, filter labels %v", instance.Name, selector)
	}

	return serviceIP, nil
}

func (r *RayClusterReconciler) updateEndpoints(instance *rayiov1alpha1.RayCluster) error {
// TODO: (@scarlet25151) There may be several K8s Services for a RayCluster.
// We assume we can find the right one by filtering Services with appropriate label selectors
Expand Down Expand Up @@ -868,6 +907,22 @@ func (r *RayClusterReconciler) updateEndpoints(instance *rayiov1alpha1.RayCluste
return nil
}

// updateHeadInfo fills instance.Status.Head with the head pod IP and the head
// service IP. It only mutates the in-memory instance; nothing is persisted
// here.
//
// The pod IP is resolved and stored first. If either lookup fails, its error
// is returned immediately, so a failing service lookup leaves the already
// stored pod IP in place and ServiceIP untouched.
func (r *RayClusterReconciler) updateHeadInfo(instance *rayiov1alpha1.RayCluster) error {
	podIP, err := r.getHeadPodIP(instance)
	if err != nil {
		return err
	}
	instance.Status.Head.PodIP = podIP

	serviceIP, err := r.getHeadServiceIP(instance)
	if err != nil {
		return err
	}
	instance.Status.Head.ServiceIP = serviceIP

	return nil
}

func (r *RayClusterReconciler) reconcileAutoscalerServiceAccount(instance *rayiov1alpha1.RayCluster) error {
if instance.Spec.EnableInTreeAutoscaling == nil || !*instance.Spec.EnableInTreeAutoscaling {
return nil
Expand Down
Loading

0 comments on commit a93a2f6

Please sign in to comment.