Skip to content

Commit

Permalink
fix controller: use Service's TargetPort
Browse files Browse the repository at this point in the history
instead of NodePort which is usually "0"
for updating RayCluster's `status.endpoints`.

follow up to ray-project#341
  • Loading branch information
davidxia committed Jul 16, 2022
1 parent 781eed1 commit 0e99561
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 11 deletions.
40 changes: 29 additions & 11 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,34 +672,52 @@ func (r *RayClusterReconciler) updateStatus(instance *rayiov1alpha1.RayCluster)
}
}

if err := r.updateEndpoints(instance); err != nil {
return err
}

timeNow := metav1.Now()
instance.Status.LastUpdateTime = &timeNow
if err := r.Status().Update(context.Background(), instance); err != nil {
return err
}

return nil
}

func (r *RayClusterReconciler) updateEndpoints(instance *rayiov1alpha1.RayCluster) error {
// TODO: (@scarlet25151) For now there are be several K8s Services for
// one RayCluster. We filter for head Services with label selectors and pick the first one.
// We may need use the Get method to select by name in the future.
rayHeadSvc := corev1.ServiceList{}
filterLabels = client.MatchingLabels{
filterLabels := client.MatchingLabels{
common.RayClusterLabelKey: instance.Name,
common.RayNodeTypeLabelKey: "head",
}
// TODO: (@scarlet25151) for now there would be several kubernetes serivces related to
// one raycluster, we have used the label to select the headservice and pick the first one.
// we may need use Get method to select by name.
if err := r.List(context.TODO(), &rayHeadSvc, client.InNamespace(instance.Namespace), filterLabels); err != nil {
return err
}

if len(rayHeadSvc.Items) != 0 {
svc := rayHeadSvc.Items[0]
if instance.Status.Endpoints == nil {
instance.Status.Endpoints = map[string]string{}
}
for _, port := range svc.Spec.Ports {
if len(port.Name) == 0 {
r.Log.Info("updateStatus", "service port name is empty", port)
r.Log.Info("updateStatus", "service port's name is empty. Not adding it to RayCluster status.endpoints", port)
continue
}
instance.Status.Endpoints[port.Name] = fmt.Sprintf("%d", port.NodePort)
if port.TargetPort.IntVal != 0 {
instance.Status.Endpoints[port.Name] = fmt.Sprintf("%d", port.TargetPort.IntVal)
} else if port.TargetPort.StrVal != "" {
instance.Status.Endpoints[port.Name] = port.TargetPort.StrVal
} else {
r.Log.Info("updateStatus", "service port's targetPort is empty. Not adding it to RayCluster status.endpoints", port)
}
}
}
timeNow := metav1.Now()
instance.Status.LastUpdateTime = &timeNow
if err := r.Status().Update(context.Background(), instance); err != nil {
return err
} else {
r.Log.Info("updateEndpoints", "unable to find a Service for this RayCluster. Not adding RayCluster status.endpoints", instance.Name, "Service selectors", filterLabels)
}

return nil
Expand Down
47 changes: 47 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand All @@ -56,6 +57,7 @@ var (
testPods []runtime.Object
testRayCluster *rayiov1alpha1.RayCluster
headSelector labels.Selector
testServices []runtime.Object
workerSelector labels.Selector
workersToDelete []string
)
Expand Down Expand Up @@ -298,6 +300,26 @@ func setupTest(t *testing.T) {
},
}

headService, err := common.BuildServiceForHeadPod(*testRayCluster)
if err != nil {
t.Errorf("failed to build head service: %v", err)
}
// K8s automatically sets TargetPort to the same value as Port. So we mimic that behavior here.
for i, port := range headService.Spec.Ports {
headService.Spec.Ports[i].TargetPort = intstr.IntOrString{IntVal: port.Port}
}
dashboardService, err := common.BuildDashboardService(*testRayCluster)
if err != nil {
t.Errorf("failed to build dashboard service: %v", err)
}
for i, port := range dashboardService.Spec.Ports {
headService.Spec.Ports[i].TargetPort = intstr.IntOrString{IntVal: port.Port}
}
testServices = []runtime.Object{
headService,
dashboardService,
}

instanceReqValue := []string{instanceName}
instanceReq, err := labels.NewRequirement(
common.RayClusterLabelKey,
Expand Down Expand Up @@ -730,3 +752,28 @@ func TestReconcile_AutoscalerRoleBinding(t *testing.T) {

assert.Nil(t, err, "Fail to get autoscaler RoleBinding after reconciliation")
}

func TestUpdateEndpoints(t *testing.T) {
setupTest(t)
defer tearDown(t)

fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects(testServices...).Build()

testRayClusterReconciler := &RayClusterReconciler{
Client: fakeClient,
Recorder: &record.FakeRecorder{},
Scheme: scheme.Scheme,
Log: ctrl.Log.WithName("controllers").WithName("RayCluster"),
}

testRayClusterReconciler.updateEndpoints(testRayCluster)

expected := map[string]string{
"client": "10001",
"dashboard": "8265",
"metrics": "8080",
"redis": "6379",
"serve": "8000",
}
assert.Equal(t, expected, testRayCluster.Status.Endpoints, "RayCluster status endpoints not updated")
}

0 comments on commit 0e99561

Please sign in to comment.