Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Make port name variables consistent and meaningful #1389

Merged
merged 11 commits into from
Sep 16, 2023
12 changes: 6 additions & 6 deletions ray-operator/controllers/ray/common/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ const (
DefaultDashboardAgentListenPort = 52365
DefaultServingPort = 8000

DefaultClientPortName = "client"
DefaultRedisPortName = "redis"
DefaultDashboardName = "dashboard"
DefaultMetricsName = "metrics"
DefaultDashboardAgentListenPortName = "dashboard-agent"
DefaultServingPortName = "serve"
ClientPortName = "client"
RedisPortName = "redis"
DashboardPortName = "dashboard"
MetricsPortName = "metrics"
DashboardAgentListenPortName = "dashboard-agent"
ServingPortName = "serve"

// The default AppProtocol for Kubernetes service
DefaultServiceAppProtocol = "tcp"
Expand Down
8 changes: 4 additions & 4 deletions ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ func DefaultHeadPodTemplate(instance rayv1alpha1.RayCluster, headSpec rayv1alpha
}

// If the metrics port does not exist in the Ray container, add a default one for Promethues.
isMetricsPortExists := utils.FindContainerPort(&podTemplate.Spec.Containers[RayContainerIndex], DefaultMetricsName, -1) != -1
isMetricsPortExists := utils.FindContainerPort(&podTemplate.Spec.Containers[RayContainerIndex], MetricsPortName, -1) != -1
if !isMetricsPortExists {
metricsPort := v1.ContainerPort{
Name: DefaultMetricsName,
Name: MetricsPortName,
ContainerPort: int32(DefaultMetricsPort),
}
podTemplate.Spec.Containers[RayContainerIndex].Ports = append(podTemplate.Spec.Containers[RayContainerIndex].Ports, metricsPort)
Expand Down Expand Up @@ -202,10 +202,10 @@ func DefaultWorkerPodTemplate(instance rayv1alpha1.RayCluster, workerSpec rayv1a
initTemplateAnnotations(instance, &podTemplate)

// If the metrics port does not exist in the Ray container, add a default one for Promethues.
isMetricsPortExists := utils.FindContainerPort(&podTemplate.Spec.Containers[RayContainerIndex], DefaultMetricsName, -1) != -1
isMetricsPortExists := utils.FindContainerPort(&podTemplate.Spec.Containers[RayContainerIndex], MetricsPortName, -1) != -1
if !isMetricsPortExists {
metricsPort := v1.ContainerPort{
Name: DefaultMetricsName,
Name: MetricsPortName,
ContainerPort: int32(DefaultMetricsPort),
}
podTemplate.Spec.Containers[RayContainerIndex].Ports = append(podTemplate.Spec.Containers[RayContainerIndex].Ports, metricsPort)
Expand Down
12 changes: 6 additions & 6 deletions ray-operator/controllers/ray/common/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,18 +832,18 @@ func TestDefaultHeadPodTemplateWithConfigurablePorts(t *testing.T) {
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
// DefaultHeadPodTemplate will add the default metrics port if user doesn't specify it.
// Verify the default metrics port exists.
if err := containerPortExists(podTemplateSpec.Spec.Containers[0].Ports, DefaultMetricsName, int32(DefaultMetricsPort)); err != nil {
if err := containerPortExists(podTemplateSpec.Spec.Containers[0].Ports, MetricsPortName, int32(DefaultMetricsPort)); err != nil {
t.Fatal(err)
}
customMetricsPort := int32(DefaultMetricsPort) + 1
metricsPort := v1.ContainerPort{
Name: DefaultMetricsName,
Name: MetricsPortName,
ContainerPort: customMetricsPort,
}
cluster.Spec.HeadGroupSpec.Template.Spec.Containers[0].Ports = []v1.ContainerPort{metricsPort}
podTemplateSpec = DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
// Verify the custom metrics port exists.
if err := containerPortExists(podTemplateSpec.Spec.Containers[0].Ports, DefaultMetricsName, customMetricsPort); err != nil {
if err := containerPortExists(podTemplateSpec.Spec.Containers[0].Ports, MetricsPortName, customMetricsPort); err != nil {
t.Fatal(err)
}
}
Expand All @@ -857,18 +857,18 @@ func TestDefaultWorkerPodTemplateWithConfigurablePorts(t *testing.T) {
podTemplateSpec := DefaultWorkerPodTemplate(*cluster, worker, podName, fqdnRayIP, "6379")
// DefaultWorkerPodTemplate will add the default metrics port if user doesn't specify it.
// Verify the default metrics port exists.
if err := containerPortExists(podTemplateSpec.Spec.Containers[0].Ports, DefaultMetricsName, int32(DefaultMetricsPort)); err != nil {
if err := containerPortExists(podTemplateSpec.Spec.Containers[0].Ports, MetricsPortName, int32(DefaultMetricsPort)); err != nil {
t.Fatal(err)
}
customMetricsPort := int32(DefaultMetricsPort) + 1
metricsPort := v1.ContainerPort{
Name: DefaultMetricsName,
Name: MetricsPortName,
ContainerPort: customMetricsPort,
}
cluster.Spec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Ports = []v1.ContainerPort{metricsPort}
podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, fqdnRayIP, "6379")
// Verify the custom metrics port exists.
if err := containerPortExists(podTemplateSpec.Spec.Containers[0].Ports, DefaultMetricsName, customMetricsPort); err != nil {
if err := containerPortExists(podTemplateSpec.Spec.Containers[0].Ports, MetricsPortName, customMetricsPort); err != nil {
t.Fatal(err)
}
}
Expand Down
22 changes: 11 additions & 11 deletions ray-operator/controllers/ray/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func BuildServeServiceForRayService(rayService rayv1alpha1.RayService, rayCluste
ports_int := getServicePorts(rayCluster)
ports := []corev1.ServicePort{}
for name, port := range ports_int {
if name == DefaultServingPortName {
if name == ServingPortName {
svcPort := corev1.ServicePort{Name: name, Port: port}
ports = append(ports, svcPort)
break
Expand Down Expand Up @@ -202,7 +202,7 @@ func BuildServeServiceForRayService(rayService rayv1alpha1.RayService, rayCluste
} else {
ports := []corev1.ServicePort{}
for _, port := range serveService.Spec.Ports {
if port.Name == DefaultServingPortName {
if port.Name == ServingPortName {
svcPort := corev1.ServicePort{Name: port.Name, Port: port.Port}
ports = append(ports, svcPort)
break
Expand Down Expand Up @@ -295,17 +295,17 @@ func getServicePorts(cluster rayv1alpha1.RayCluster) map[string]int32 {
}

// Check if agent port is defined. If not, check if enable agent service.
if _, agentDefined := ports[DefaultDashboardAgentListenPortName]; !agentDefined {
if _, agentDefined := ports[DashboardAgentListenPortName]; !agentDefined {
enableAgentServiceValue, exist := cluster.Annotations[EnableAgentServiceKey]
if exist && enableAgentServiceValue == EnableAgentServiceTrue {
// If agent port is not in the config, add default value for it.
ports[DefaultDashboardAgentListenPortName] = DefaultDashboardAgentListenPort
ports[DashboardAgentListenPortName] = DefaultDashboardAgentListenPort
}
}

// check if metrics port is defined. If not, add default value for it.
if _, metricsDefined := ports[DefaultMetricsName]; !metricsDefined {
ports[DefaultMetricsName] = DefaultMetricsPort
if _, metricsDefined := ports[MetricsPortName]; !metricsDefined {
ports[MetricsPortName] = DefaultMetricsPort
}

return ports
Expand All @@ -330,10 +330,10 @@ func getPortsFromCluster(cluster rayv1alpha1.RayCluster) (map[string]int32, erro

func getDefaultPorts() map[string]int32 {
return map[string]int32{
DefaultClientPortName: DefaultClientPort,
DefaultRedisPortName: DefaultRedisPort,
DefaultDashboardName: DefaultDashboardPort,
DefaultMetricsName: DefaultMetricsPort,
DefaultServingPortName: DefaultServingPort,
ClientPortName: DefaultClientPort,
RedisPortName: DefaultRedisPort,
DashboardPortName: DefaultDashboardPort,
MetricsPortName: DefaultMetricsPort,
ServingPortName: DefaultServingPort,
}
}
24 changes: 12 additions & 12 deletions ray-operator/controllers/ray/common/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ func TestGetServicePortsWithMetricsPort(t *testing.T) {
cluster.Spec.HeadGroupSpec.Template.Spec.Containers[0].Ports = []corev1.ContainerPort{}
ports := getServicePorts(*cluster)
// Verify that getServicePorts sets the default metrics port when the user doesn't specify any ports.
if ports[DefaultMetricsName] != int32(DefaultMetricsPort) {
t.Fatalf("Expected `%v` but got `%v`", int32(DefaultMetricsPort), ports[DefaultMetricsName])
if ports[MetricsPortName] != int32(DefaultMetricsPort) {
t.Fatalf("Expected `%v` but got `%v`", int32(DefaultMetricsPort), ports[MetricsPortName])
}

// Test case 2: Only a random port is specified by the user.
Expand All @@ -210,21 +210,21 @@ func TestGetServicePortsWithMetricsPort(t *testing.T) {
}
ports = getServicePorts(*cluster)
// Verify that getServicePorts sets the default metrics port when the user doesn't specify the metrics port but specifies other ports.
if ports[DefaultMetricsName] != int32(DefaultMetricsPort) {
t.Fatalf("Expected `%v` but got `%v`", int32(DefaultMetricsPort), ports[DefaultMetricsName])
if ports[MetricsPortName] != int32(DefaultMetricsPort) {
t.Fatalf("Expected `%v` but got `%v`", int32(DefaultMetricsPort), ports[MetricsPortName])
}

// Test case 3: A custom metrics port is specified by the user.
customMetricsPort := int32(DefaultMetricsPort) + 1
metricsPort := corev1.ContainerPort{
Name: DefaultMetricsName,
Name: MetricsPortName,
ContainerPort: customMetricsPort,
}
cluster.Spec.HeadGroupSpec.Template.Spec.Containers[0].Ports = append(cluster.Spec.HeadGroupSpec.Template.Spec.Containers[0].Ports, metricsPort)
ports = getServicePorts(*cluster)
// Verify that getServicePorts uses the user's custom metrics port when the user specifies the metrics port.
if ports[DefaultMetricsName] != customMetricsPort {
t.Fatalf("Expected `%v` but got `%v`", customMetricsPort, ports[DefaultMetricsName])
if ports[MetricsPortName] != customMetricsPort {
t.Fatalf("Expected `%v` but got `%v`", customMetricsPort, ports[MetricsPortName])
}
}

Expand All @@ -238,7 +238,7 @@ func TestUserSpecifiedHeadService(t *testing.T) {
userLabels := map[string]string{"userLabelKey": "userLabelValue", RayClusterLabelKey: "userClusterName"} // Override default cluster name
userAnnotations := map[string]string{"userAnnotationKey": "userAnnotationValue", headServiceAnnotationKey1: "user_override"}
userPort := corev1.ServicePort{Name: "userPort", Port: 12345}
userPortOverride := corev1.ServicePort{Name: DefaultClientPortName, Port: 98765} // Override default client port (10001)
userPortOverride := corev1.ServicePort{Name: ClientPortName, Port: 98765} // Override default client port (10001)
userPorts := []corev1.ServicePort{userPort, userPortOverride}
userSelector := map[string]string{"userSelectorKey": "userSelectorValue", RayClusterLabelKey: "userSelectorClusterName"}
// Specify a "LoadBalancer" type, which differs from the default "ClusterIP" type.
Expand Down Expand Up @@ -340,7 +340,7 @@ func TestUserSpecifiedHeadService(t *testing.T) {
t.Errorf("User annotation not found or incorrect value: key=%s, expected value=%s, actual value=%s", headServiceAnnotationKey2, headServiceAnnotationValue2, headService.ObjectMeta.Annotations[headServiceAnnotationKey2])
}

// Test merged ports. In the case of overlap (DefaultClientPortName) the user port should be ignored.
// Test merged ports. In the case of overlap (ClientPortName) the user port should be ignored.
// DEBUG: Print out the entire head service to help with debugging.
headServiceJSON, err := json.MarshalIndent(headService, "", " ")
if err != nil {
Expand Down Expand Up @@ -463,7 +463,7 @@ func TestUserSpecifiedServeService(t *testing.T) {
userLabels := map[string]string{"userLabelKey": "userLabelValue", RayClusterLabelKey: "userClusterName"} // Override default cluster name
userAnnotations := map[string]string{"userAnnotationKey": "userAnnotationValue", "userAnnotationKey2": "userAnnotationValue2"}
userPort := corev1.ServicePort{Name: "serve", Port: 12345}
userPortOverride := corev1.ServicePort{Name: DefaultClientPortName, Port: 98765} // Override default client port (10001)
userPortOverride := corev1.ServicePort{Name: ClientPortName, Port: 98765} // Override default client port (10001)
userPorts := []corev1.ServicePort{userPort, userPortOverride}
userSelector := map[string]string{"userSelectorKey": "userSelectorValue", RayClusterLabelKey: "userSelectorClusterName"}
// Specify a "LoadBalancer" type, which differs from the default "ClusterIP" type.
Expand Down Expand Up @@ -508,10 +508,10 @@ func TestUserSpecifiedServeService(t *testing.T) {

// ports should only have DefaultServePort
ports := svc.Spec.Ports
expectedPortName := DefaultServingPortName
expectedPortName := ServingPortName
expectedPortNumber := int32(8000)
for _, port := range ports {
if port.Name != DefaultServingPortName {
if port.Name != ServingPortName {
t.Fatalf("Expected `%v` but got `%v`", expectedPortName, port.Name)
}
if port.Port != expectedPortNumber {
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
clientURL := rayJobInstance.Status.DashboardURL
if clientURL == "" {
// TODO: dashboard service may be changed. Check it instead of using the same URL always
if clientURL, err = utils.FetchHeadServiceURL(ctx, &r.Log, r.Client, rayClusterInstance, common.DefaultDashboardName); err != nil || clientURL == "" {
if clientURL, err = utils.FetchHeadServiceURL(ctx, &r.Log, r.Client, rayClusterInstance, common.DashboardPortName); err != nil || clientURL == "" {
if clientURL == "" {
err = fmt.Errorf("empty dashboardURL")
}
Expand Down
6 changes: 3 additions & 3 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ func (r *RayServiceReconciler) updateStatusForActiveCluster(ctx context.Context,
var clientURL string
rayServiceStatus := &rayServiceInstance.Status.ActiveServiceStatus

if clientURL, err = utils.FetchHeadServiceURL(ctx, &r.Log, r.Client, rayClusterInstance, common.DefaultDashboardAgentListenPortName); err != nil || clientURL == "" {
if clientURL, err = utils.FetchHeadServiceURL(ctx, &r.Log, r.Client, rayClusterInstance, common.DashboardAgentListenPortName); err != nil || clientURL == "" {
r.updateAndCheckDashboardStatus(rayServiceStatus, false, rayServiceInstance.Spec.DeploymentUnhealthySecondThreshold)
return err
}
Expand Down Expand Up @@ -1087,7 +1087,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, false, err
}

if clientURL, err = utils.FetchHeadServiceURL(ctx, &r.Log, r.Client, rayClusterInstance, common.DefaultDashboardAgentListenPortName); err != nil || clientURL == "" {
if clientURL, err = utils.FetchHeadServiceURL(ctx, &r.Log, r.Client, rayClusterInstance, common.DashboardAgentListenPortName); err != nil || clientURL == "" {
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, false, err
}
rayDashboardClient := utils.GetRayDashboardClientFunc()
Expand Down Expand Up @@ -1173,7 +1173,7 @@ func (r *RayServiceReconciler) labelHealthyServePods(ctx context.Context, rayClu
httpProxyClient.InitClient()
for _, pod := range allPods.Items {
rayContainer := pod.Spec.Containers[common.RayContainerIndex]
servingPort := utils.FindContainerPort(&rayContainer, common.DefaultServingPortName, common.DefaultServingPort)
servingPort := utils.FindContainerPort(&rayContainer, common.ServingPortName, common.DefaultServingPort)
httpProxyClient.SetHostIp(pod.Status.PodIP, servingPort)
if pod.Labels == nil {
pod.Labels = make(map[string]string)
Expand Down
4 changes: 2 additions & 2 deletions ray-operator/controllers/ray/rayservice_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,13 +311,13 @@ applications:
// Each worker Pod should have a container port with the name "dashboard-agent"
exist := false
for _, port := range pod.Spec.Containers[0].Ports {
if port.Name == common.DefaultDashboardAgentListenPortName {
if port.Name == common.DashboardAgentListenPortName {
exist = true
break
}
}
if !exist {
Fail(fmt.Sprintf("Worker Pod %v should have a container port with the name %v", pod.Name, common.DefaultDashboardAgentListenPortName))
Fail(fmt.Sprintf("Worker Pod %v should have a container port with the name %v", pod.Name, common.DashboardAgentListenPortName))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func TestFetchHeadServiceURL(t *testing.T) {
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: common.DefaultDashboardName,
Name: common.DashboardPortName,
Port: dashboardPort,
},
},
Expand All @@ -423,7 +423,7 @@ func TestFetchHeadServiceURL(t *testing.T) {
Log: ctrl.Log.WithName("controllers").WithName("RayService"),
}

url, err := utils.FetchHeadServiceURL(ctx, &r.Log, r.Client, &cluster, common.DefaultDashboardName)
url, err := utils.FetchHeadServiceURL(ctx, &r.Log, r.Client, &cluster, common.DashboardPortName)
assert.Nil(t, err, "Fail to fetch head service url")
assert.Equal(t, fmt.Sprintf("test-cluster-head-svc.%s.svc.cluster.local:%d", namespace, dashboardPort), url, "Head service url is not correct")
}
Expand Down
Loading