Skip to content

Commit

Permalink
feat: update RayCluster.status with head info
Browse files Browse the repository at this point in the history
like so. Missing IPs will be excluded.

```
status:
  head:
    podIP: string
    serviceIP: string
```

The motivation for this change is to let users get the head IP directly
from the RayCluster custom resource without needing to know the underlying
implementation details of KubeRay.

In a later commit we update the API server to parse the status
and return the IPs for each cluster.
  • Loading branch information
davidxia committed Oct 19, 2022
1 parent f4b2823 commit ab1301b
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 10 deletions.
8 changes: 8 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11122,6 +11122,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp for this
cluster status.
Expand Down
8 changes: 8 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11680,6 +11680,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp for
this cluster status.
Expand Down
16 changes: 16 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11736,6 +11736,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp
for this cluster status.
Expand Down Expand Up @@ -11835,6 +11843,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp
for this cluster status.
Expand Down
8 changes: 8 additions & 0 deletions ray-operator/apis/ray/v1alpha1/raycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ type RayClusterStatus struct {
LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"`
// Service Endpoints
Endpoints map[string]string `json:"endpoints,omitempty"`
// Head info
Head HeadInfo `json:"head,omitempty"`
}

// HeadInfo holds the addresses of a RayCluster's head that are published in the cluster status.
// Both fields are tagged omitempty, so an empty value is left out of the serialized status.
type HeadInfo struct {
// PodIP is the IP of the head pod.
PodIP string `json:"podIP,omitempty"`
// ServiceIP is the cluster IP of the head service.
ServiceIP string `json:"serviceIP,omitempty"`
}

// RayNodeType the type of a ray node: head/worker
Expand Down
8 changes: 8 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11122,6 +11122,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp for this
cluster status.
Expand Down
8 changes: 8 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11680,6 +11680,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp for
this cluster status.
Expand Down
16 changes: 16 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11736,6 +11736,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp
for this cluster status.
Expand Down Expand Up @@ -11835,6 +11843,14 @@ spec:
type: string
description: Service Endpoints
type: object
head:
description: Head info
properties:
podIP:
type: string
serviceIP:
type: string
type: object
lastUpdateTime:
description: LastUpdateTime indicates last update timestamp
for this cluster status.
Expand Down
19 changes: 12 additions & 7 deletions ray-operator/controllers/ray/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@ import (
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

// HeadServiceLabels builds the default label set for the head service of the given cluster.
// A fresh map is returned on every call, so callers are free to modify it.
func HeadServiceLabels(cluster rayiov1alpha1.RayCluster) map[string]string {
	// Identifier derived from the cluster name and the head node type, passed through CheckLabel.
	headID := utils.CheckLabel(utils.GenerateIdentifier(cluster.Name, rayiov1alpha1.HeadNode))

	headLabels := make(map[string]string, 5)
	headLabels[RayClusterLabelKey] = cluster.Name
	headLabels[RayNodeTypeLabelKey] = string(rayiov1alpha1.HeadNode)
	headLabels[RayIDLabelKey] = headID
	headLabels[KubernetesApplicationNameLabelKey] = ApplicationName
	headLabels[KubernetesCreatedByLabelKey] = ComponentName

	return headLabels
}

// BuildServiceForHeadPod builds the service for a pod. Currently, there is only one service that allows
// the worker nodes to connect to the head node.
func BuildServiceForHeadPod(cluster rayiov1alpha1.RayCluster, labels map[string]string) (*corev1.Service, error) {
if labels == nil {
labels = make(map[string]string)
}

default_labels := map[string]string{
RayClusterLabelKey: cluster.Name,
RayNodeTypeLabelKey: string(rayiov1alpha1.HeadNode),
RayIDLabelKey: utils.CheckLabel(utils.GenerateIdentifier(cluster.Name, rayiov1alpha1.HeadNode)),
KubernetesApplicationNameLabelKey: ApplicationName,
KubernetesCreatedByLabelKey: ComponentName,
}
default_labels := HeadServiceLabels(cluster)

for k, v := range default_labels {
if _, ok := labels[k]; !ok {
Expand Down
67 changes: 67 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(request ctrl.Request, instanc
r.Log.Error(err, "Update status error", "cluster name", request.Name)
}
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -802,6 +803,10 @@ func (r *RayClusterReconciler) updateStatus(instance *rayiov1alpha1.RayCluster)
return err
}

if err := r.updateHeadInfo(instance); err != nil {
return err
}

timeNow := metav1.Now()
instance.Status.LastUpdateTime = &timeNow
if err := r.Status().Update(context.Background(), instance); err != nil {
Expand All @@ -811,6 +816,36 @@ func (r *RayClusterReconciler) updateStatus(instance *rayiov1alpha1.RayCluster)
return nil
}

// getHeadPodIP returns the pod IP reported in the status of the cluster's head pod.
//
// Pods are listed in the cluster's namespace and matched on the cluster-name and
// head-node-type labels. An error is returned when the list call fails or when
// the number of matching pods is not exactly one. The returned IP is taken
// verbatim from the pod status and may be empty.
func (r *RayClusterReconciler) getHeadPodIP(instance *rayiov1alpha1.RayCluster) (string, error) {
	filterLabels := client.MatchingLabels{
		common.RayClusterLabelKey:  instance.Name,
		common.RayNodeTypeLabelKey: string(rayiov1alpha1.HeadNode),
	}

	var headPods corev1.PodList
	err := r.List(context.TODO(), &headPods, client.InNamespace(instance.Namespace), filterLabels)
	if err != nil {
		return "", err
	}

	switch found := len(headPods.Items); {
	case found < 1:
		return "", fmt.Errorf("unable to find head node pod. cluster name %s, filter labels %v", instance.Name, filterLabels)
	case found > 1:
		return "", fmt.Errorf("found multiple head node pods. cluster name %s, filter labels %v", instance.Name, filterLabels)
	}

	return headPods.Items[0].Status.PodIP, nil
}

// getHeadServiceIP returns the ClusterIP from the spec of the cluster's head service.
//
// Services are listed in the cluster's namespace and matched on the labels
// produced by common.HeadServiceLabels. An error is returned when the list call
// fails or when the number of matching services is not exactly one. The
// returned IP is taken verbatim from the service spec and may be empty.
func (r *RayClusterReconciler) getHeadServiceIP(instance *rayiov1alpha1.RayCluster) (string, error) {
	filterLabels := client.MatchingLabels(common.HeadServiceLabels(*instance))

	var headServices corev1.ServiceList
	err := r.List(context.TODO(), &headServices, client.InNamespace(instance.Namespace), filterLabels)
	if err != nil {
		return "", err
	}

	switch found := len(headServices.Items); {
	case found < 1:
		return "", fmt.Errorf("unable to find head service. cluster name %s, filter labels %v", instance.Name, filterLabels)
	case found > 1:
		return "", fmt.Errorf("found multiple head services. cluster name %s, filter labels %v", instance.Name, filterLabels)
	}

	return headServices.Items[0].Spec.ClusterIP, nil
}

func (r *RayClusterReconciler) updateEndpoints(instance *rayiov1alpha1.RayCluster) error {
// TODO: (@scarlet25151) There may be several K8s Services for a RayCluster.
// We assume we can find the right one by filtering Services with appropriate label selectors
Expand Down Expand Up @@ -851,6 +886,38 @@ func (r *RayClusterReconciler) updateEndpoints(instance *rayiov1alpha1.RayCluste
return nil
}

// updateHeadInfo refreshes instance.Status.Head with the head pod IP and the
// head service IP.
//
// The update is best-effort: when a lookup fails or yields an empty IP, the
// condition is logged and the corresponding field is left untouched. The
// function therefore always returns nil; the error return exists only so the
// signature matches the other status helpers' call pattern.
func (r *RayClusterReconciler) updateHeadInfo(instance *rayiov1alpha1.RayCluster) error {
	if ip, err := r.getHeadPodIP(instance); err != nil {
		// Record the cause and the affected cluster rather than dropping the error,
		// otherwise the log gives no hint why the IP is missing.
		r.Log.Info(
			"unable to get head pod IP. Not updating RayCluster status.head.podIP",
			"cluster name", instance.Name,
			"error", err.Error(),
		)
	} else if ip == "" {
		r.Log.Info(
			"head pod IP is empty. Not updating RayCluster status.head.podIP",
			"cluster name", instance.Name,
		)
	} else {
		instance.Status.Head.PodIP = ip
	}

	if ip, err := r.getHeadServiceIP(instance); err != nil {
		// Same as above: keep going, but make the failure reason visible.
		r.Log.Info(
			"unable to get head service IP. Not updating RayCluster status.head.serviceIP",
			"cluster name", instance.Name,
			"error", err.Error(),
		)
	} else if ip == "" {
		r.Log.Info(
			"head service IP is empty. Not updating RayCluster status.head.serviceIP",
			"cluster name", instance.Name,
		)
	} else {
		instance.Status.Head.ServiceIP = ip
	}

	return nil
}

func (r *RayClusterReconciler) reconcileAutoscalerServiceAccount(instance *rayiov1alpha1.RayCluster) error {
if instance.Spec.EnableInTreeAutoscaling == nil || !*instance.Spec.EnableInTreeAutoscaling {
return nil
Expand Down
71 changes: 68 additions & 3 deletions ray-operator/controllers/ray/raycluster_controller_fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
rayiov1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
Expand Down Expand Up @@ -57,6 +57,7 @@ var (
testPods []runtime.Object
testRayCluster *rayiov1alpha1.RayCluster
headSelector labels.Selector
headNodeIP string
testServices []runtime.Object
workerSelector labels.Selector
workersToDelete []string
Expand All @@ -75,6 +76,7 @@ func setupTest(t *testing.T) {
groupNameStr = "small-group"
expectReplicaNum = 3
workersToDelete = []string{"pod1", "pod2"}
headNodeIP = "1.2.3.4"
testPods = []runtime.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -98,6 +100,7 @@ func setupTest(t *testing.T) {
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
PodIP: headNodeIP,
},
},
&corev1.Pod{
Expand Down Expand Up @@ -706,7 +709,7 @@ func TestReconcile_AutoscalerServiceAccount(t *testing.T) {
sa := corev1.ServiceAccount{}
err := fakeClient.Get(context.Background(), saNamespacedName, &sa)

assert.True(t, errors.IsNotFound(err), "Head group service account should not exist yet")
assert.True(t, k8serrors.IsNotFound(err), "Head group service account should not exist yet")

testRayClusterReconciler := &RayClusterReconciler{
Client: fakeClient,
Expand Down Expand Up @@ -736,7 +739,7 @@ func TestReconcile_AutoscalerRoleBinding(t *testing.T) {
rb := rbacv1.RoleBinding{}
err := fakeClient.Get(context.Background(), rbNamespacedName, &rb)

assert.True(t, errors.IsNotFound(err), "autoscaler RoleBinding should not exist yet")
assert.True(t, k8serrors.IsNotFound(err), "autoscaler RoleBinding should not exist yet")

testRayClusterReconciler := &RayClusterReconciler{
Client: fakeClient,
Expand Down Expand Up @@ -779,3 +782,65 @@ func TestUpdateEndpoints(t *testing.T) {
}
assert.Equal(t, expected, testRayCluster.Status.Endpoints, "RayCluster status endpoints not updated")
}

// TestGetHeadPodIP checks getHeadPodIP against three pod sets: exactly one head
// pod (IP is returned), no pods at all (error), and two head pods (error).
func TestGetHeadPodIP(t *testing.T) {
	setupTest(t)
	defer tearDown(t)

	// A second pod carrying the head-node labels, used to provoke the
	// "multiple head pods" error path.
	duplicateHead := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "unexpectedExtraHeadNode",
			Namespace: namespaceStr,
			Labels: map[string]string{
				common.RayClusterLabelKey:   instanceName,
				common.RayNodeTypeLabelKey:  string(rayiov1alpha1.HeadNode),
				common.RayNodeGroupLabelKey: headGroupNameStr,
			},
		},
	}

	type headPodIPCase struct {
		pods         []runtime.Object
		expectedIP   string
		returnsError bool
	}

	cases := map[string]headPodIPCase{
		"get expected Pod IP if there's one head node": {
			pods:         testPods,
			expectedIP:   headNodeIP,
			returnsError: false,
		},
		"get error if there's no head node": {
			pods:         []runtime.Object{},
			expectedIP:   "",
			returnsError: true,
		},
		"get error if there's more than one head node": {
			pods:         append(testPods, duplicateHead),
			expectedIP:   "",
			returnsError: true,
		},
	}

	for caseName, tc := range cases {
		t.Run(caseName, func(t *testing.T) {
			// Each case gets its own fake client seeded with that case's pods.
			reconciler := &RayClusterReconciler{
				Client:   clientFake.NewClientBuilder().WithRuntimeObjects(tc.pods...).Build(),
				Recorder: &record.FakeRecorder{},
				Scheme:   scheme.Scheme,
				Log:      ctrl.Log.WithName("controllers").WithName("RayCluster"),
			}

			gotIP, gotErr := reconciler.getHeadPodIP(testRayCluster)

			if !tc.returnsError {
				assert.Nil(t, gotErr, "getHeadPodIP should not return error")
			} else {
				assert.NotNil(t, gotErr, "getHeadPodIP should return error")
			}

			assert.Equal(t, tc.expectedIP, gotIP, "getHeadPodIP returned unexpected IP")
		})
	}
}

0 comments on commit ab1301b

Please sign in to comment.