Skip to content

Commit

Permalink
feat: set appProtocol based on target service
Browse files Browse the repository at this point in the history
  • Loading branch information
Yap committed Apr 21, 2024
1 parent ff008a1 commit 4b9052f
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 42 deletions.
6 changes: 3 additions & 3 deletions ray-operator/controllers/ray/common/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ func BuildIngressForHeadService(ctx context.Context, cluster rayv1.RayCluster) (

var paths []networkingv1.HTTPIngressPath
pathType := networkingv1.PathTypeExact
servicePorts := getServicePorts(cluster)
serviceMapPortInfo := getServicePorts(cluster)
dashboardPort := int32(utils.DefaultDashboardPort)
if port, ok := servicePorts["dashboard"]; ok {
dashboardPort = port
if portInfo, ok := serviceMapPortInfo["dashboard"]; ok {
dashboardPort = portInfo.PortNumber
}

headSvcName, err := utils.GenerateHeadServiceName(utils.RayClusterCRD, cluster.Spec, cluster.Name)
Expand Down
6 changes: 3 additions & 3 deletions ray-operator/controllers/ray/common/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func BuildRouteForHeadService(cluster rayv1.RayCluster) (*routev1.Route, error)
annotation[key] = value
}

servicePorts := getServicePorts(cluster)
serviceMapPortInfo := getServicePorts(cluster)
dashboardPort := utils.DefaultDashboardPort
if port, ok := servicePorts["dashboard"]; ok {
dashboardPort = int(port)
if portInfo, ok := serviceMapPortInfo[utils.DashboardPortName]; ok {
dashboardPort = int(portInfo.PortNumber)
}

weight := int32(100)
Expand Down
73 changes: 51 additions & 22 deletions ray-operator/controllers/ray/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,16 @@ func BuildServiceForHeadPod(ctx context.Context, cluster rayv1.RayCluster, label
default_namespace := cluster.Namespace
default_type := cluster.Spec.HeadGroupSpec.ServiceType

defaultAppProtocol := utils.DefaultServiceAppProtocol
// `ports_int` is a map of port names to port numbers, while `ports` is a list of ServicePort objects
ports_int := getServicePorts(cluster)
// `serviceMapPortInfo` is a map of port names to port infos, while `ports` is a list of ServicePort objects
serviceMapPortInfo := getServicePorts(cluster)
ports := []corev1.ServicePort{}
for name, port := range ports_int {
svcPort := corev1.ServicePort{Name: name, Port: port, AppProtocol: &defaultAppProtocol}
for svc, portInfo := range serviceMapPortInfo {
appProtocol := utils.DefaultServiceAppProtocol
if portInfo.Protocol != "" {
appProtocol = portInfo.Protocol
}

svcPort := corev1.ServicePort{Name: svc, Port: portInfo.PortNumber, AppProtocol: &appProtocol}
ports = append(ports, svcPort)
}
if cluster.Spec.HeadGroupSpec.HeadService != nil {
Expand Down Expand Up @@ -198,12 +202,17 @@ func BuildServeService(ctx context.Context, rayService rayv1.RayService, rayClus
default_type = rayService.Spec.RayClusterSpec.HeadGroupSpec.ServiceType
}

// `ports_int` is a map of port names to port numbers, while `ports` is a list of ServicePort objects
ports_int := getServicePorts(rayCluster)
// `serviceMapPortInfo` is a map of port names to port infos, while `ports` is a list of ServicePort objects
serviceMapPortInfo := getServicePorts(rayCluster)
ports := []corev1.ServicePort{}
for name, port := range ports_int {
if name == utils.ServingPortName {
svcPort := corev1.ServicePort{Name: name, Port: port}
for svc, portInfo := range serviceMapPortInfo {
if svc == utils.ServingPortName {
appProtocol := utils.DefaultServiceAppProtocol
if portInfo.Protocol != "" {
appProtocol = portInfo.Protocol
}

svcPort := corev1.ServicePort{Name: name, Port: portInfo.PortNumber, AppProtocol: &appProtocol}
ports = append(ports, svcPort)
break
}
Expand Down Expand Up @@ -360,7 +369,7 @@ func setLabelsforUserProvidedService(service *corev1.Service, labels map[string]
}

// getServicePorts will either user passing ports or default ports to create service.
func getServicePorts(cluster rayv1.RayCluster) map[string]int32 {
func getServicePorts(cluster rayv1.RayCluster) map[string]portInfo {
ports, err := getPortsFromCluster(cluster)
// Assign default ports
if err != nil || len(ports) == 0 {
Expand All @@ -369,35 +378,55 @@ func getServicePorts(cluster rayv1.RayCluster) map[string]int32 {

// check if metrics port is defined. If not, add default value for it.
if _, metricsDefined := ports[utils.MetricsPortName]; !metricsDefined {
ports[utils.MetricsPortName] = utils.DefaultMetricsPort
ports[utils.MetricsPortName] = portInfo{PortNumber: utils.DefaultMetricsPort, Protocol: utils.HTTPProtocol}
}

return ports
}

type portInfo struct {
PortNumber int32
Protocol string
}

// getPortsFromCluster get the ports from head container and directly map them in service
// It's user's responsibility to maintain rayStartParam ports and container ports mapping
// TODO: Consider to infer ports from rayStartParams (source of truth) in the future.
func getPortsFromCluster(cluster rayv1.RayCluster) (map[string]int32, error) {
svcPorts := map[string]int32{}
func getPortsFromCluster(cluster rayv1.RayCluster) (map[string]portInfo, error) {
svcPorts := make(map[string]portInfo)

cPorts := cluster.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Ports
for _, port := range cPorts {
if port.Name == "" {
port.Name = fmt.Sprint(port.ContainerPort) + "-port"
}
svcPorts[port.Name] = port.ContainerPort
svcPorts[port.Name] = portInfo{PortNumber: port.ContainerPort}
}

return svcPorts, nil
}

func getDefaultPorts() map[string]int32 {
return map[string]int32{
utils.ClientPortName: utils.DefaultClientPort,
utils.RedisPortName: utils.DefaultRedisPort,
utils.DashboardPortName: utils.DefaultDashboardPort,
utils.MetricsPortName: utils.DefaultMetricsPort,
utils.ServingPortName: utils.DefaultServingPort,
func getDefaultPorts() map[string]portInfo {
return map[string]portInfo{
utils.ClientPortName: {
PortNumber: utils.DefaultClientPort,
Protocol: utils.GRPCProtocol,
},
utils.RedisPortName: {
PortNumber: utils.DefaultRedisPort,
Protocol: utils.GRPCProtocol,
},
utils.DashboardPortName: {
PortNumber: utils.DefaultDashboardPort,
Protocol: utils.HTTPProtocol,
},
utils.MetricsPortName: {
PortNumber: utils.DefaultMetricsPort,
Protocol: utils.HTTPProtocol,
},
utils.ServingPortName: {
PortNumber: utils.DefaultServingPort,
Protocol: utils.HTTPProtocol,
},
}
}
28 changes: 14 additions & 14 deletions ray-operator/controllers/ray/common/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,19 +191,19 @@ func TestBuildServiceForHeadPodWithAnnotations(t *testing.T) {
}

func TestGetPortsFromCluster(t *testing.T) {
svcPorts, err := getPortsFromCluster(*instanceWithWrongSvc)
serviceMapPortInfo, err := getPortsFromCluster(*instanceWithWrongSvc)
assert.Nil(t, err)

// getPortsFromCluster creates service ports based on the container ports.
// It will assign a generated service port name if the container port name
// is not defined. To compare created service ports with container ports,
// all generated service port names need to be reverted to empty strings.
svcNames := map[int32]string{}
for name, port := range svcPorts {
if name == (fmt.Sprint(port) + "-port") {
name = ""
for svc, portInfo := range serviceMapPortInfo {
if svc == (fmt.Sprint(portInfo.PortNumber) + "-port") {
svc = ""
}
svcNames[port] = name
svcNames[portInfo.PortNumber] = svc
}

cPorts := instanceWithWrongSvc.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Ports
Expand All @@ -222,10 +222,10 @@ func TestGetServicePortsWithMetricsPort(t *testing.T) {

// Test case 1: No ports are specified by the user.
cluster.Spec.HeadGroupSpec.Template.Spec.Containers[0].Ports = []corev1.ContainerPort{}
ports := getServicePorts(*cluster)
serviceMapPortInfo := getServicePorts(*cluster)
// Verify that getServicePorts sets the default metrics port when the user doesn't specify any ports.
if ports[utils.MetricsPortName] != int32(utils.DefaultMetricsPort) {
t.Fatalf("Expected `%v` but got `%v`", int32(utils.DefaultMetricsPort), ports[utils.MetricsPortName])
if serviceMapPortInfo[utils.MetricsPortName].PortNumber != int32(utils.DefaultMetricsPort) {
t.Fatalf("Expected `%v` but got `%v`", int32(utils.DefaultMetricsPort), serviceMapPortInfo[utils.MetricsPortName].PortNumber)
}

// Test case 2: Only a random port is specified by the user.
Expand All @@ -235,10 +235,10 @@ func TestGetServicePortsWithMetricsPort(t *testing.T) {
ContainerPort: 1234,
},
}
ports = getServicePorts(*cluster)
serviceMapPortInfo = getServicePorts(*cluster)
// Verify that getServicePorts sets the default metrics port when the user doesn't specify the metrics port but specifies other ports.
if ports[utils.MetricsPortName] != int32(utils.DefaultMetricsPort) {
t.Fatalf("Expected `%v` but got `%v`", int32(utils.DefaultMetricsPort), ports[utils.MetricsPortName])
if serviceMapPortInfo[utils.MetricsPortName].PortNumber != int32(utils.DefaultMetricsPort) {
t.Fatalf("Expected `%v` but got `%v`", int32(utils.DefaultMetricsPort), serviceMapPortInfo[utils.MetricsPortName].PortNumber)
}

// Test case 3: A custom metrics port is specified by the user.
Expand All @@ -248,10 +248,10 @@ func TestGetServicePortsWithMetricsPort(t *testing.T) {
ContainerPort: customMetricsPort,
}
cluster.Spec.HeadGroupSpec.Template.Spec.Containers[0].Ports = append(cluster.Spec.HeadGroupSpec.Template.Spec.Containers[0].Ports, metricsPort)
ports = getServicePorts(*cluster)
serviceMapPortInfo = getServicePorts(*cluster)
// Verify that getServicePorts uses the user's custom metrics port when the user specifies the metrics port.
if ports[utils.MetricsPortName] != customMetricsPort {
t.Fatalf("Expected `%v` but got `%v`", customMetricsPort, ports[utils.MetricsPortName])
if serviceMapPortInfo[utils.MetricsPortName].PortNumber != customMetricsPort {
t.Fatalf("Expected `%v` but got `%v`", customMetricsPort, serviceMapPortInfo[utils.MetricsPortName].PortNumber)
}
}

Expand Down
3 changes: 3 additions & 0 deletions ray-operator/controllers/ray/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ const (
// The default AppProtocol for Kubernetes service
DefaultServiceAppProtocol = "tcp"

HTTPProtocol = "http"
GRPCProtocol = "grpc"

// The default application name
ApplicationName = "kuberay"

Expand Down

0 comments on commit 4b9052f

Please sign in to comment.