-
Notifications
You must be signed in to change notification settings - Fork 402
/
service.go
196 lines (169 loc) · 6.64 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package common
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
rayiov1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)
// BuildServiceForHeadPod Builds the service for a pod. Currently, there is only one service that allows
// the worker nodes to connect to the head node.
func BuildServiceForHeadPod(cluster rayiov1alpha1.RayCluster, labels map[string]string) (*corev1.Service, error) {
if labels == nil {
labels = make(map[string]string)
}
default_labels := map[string]string{
RayClusterLabelKey: cluster.Name,
RayNodeTypeLabelKey: string(rayiov1alpha1.HeadNode),
RayIDLabelKey: utils.CheckLabel(utils.GenerateIdentifier(cluster.Name, rayiov1alpha1.HeadNode)),
KubernetesApplicationNameLabelKey: ApplicationName,
KubernetesCreatedByLabelKey: ComponentName,
}
for k, v := range default_labels {
if _, ok := labels[k]; !ok {
labels[k] = v
}
}
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: utils.GenerateServiceName(cluster.Name),
Namespace: cluster.Namespace,
Labels: labels,
},
Spec: corev1.ServiceSpec{
Selector: labels,
Ports: []corev1.ServicePort{},
Type: cluster.Spec.HeadGroupSpec.ServiceType,
},
}
ports := getServicePorts(cluster)
for name, port := range ports {
svcPort := corev1.ServicePort{Name: name, Port: port}
service.Spec.Ports = append(service.Spec.Ports, svcPort)
}
return service, nil
}
// BuildHeadServiceForRayService Builds the service for a pod. Currently, there is only one service that allows
// the worker nodes to connect to the head node.
// RayService controller updates the service whenever a new RayCluster serves the traffic.
func BuildHeadServiceForRayService(rayService rayiov1alpha1.RayService, rayCluster rayiov1alpha1.RayCluster) (*corev1.Service, error) {
service, err := BuildServiceForHeadPod(rayCluster, nil)
if err != nil {
return nil, err
}
service.ObjectMeta.Name = utils.GenerateServiceName(rayService.Name)
service.ObjectMeta.Namespace = rayService.Namespace
service.ObjectMeta.Labels = map[string]string{
RayServiceLabelKey: rayService.Name,
RayNodeTypeLabelKey: string(rayiov1alpha1.HeadNode),
RayIDLabelKey: utils.CheckLabel(utils.GenerateIdentifier(rayService.Name, rayiov1alpha1.HeadNode)),
}
return service, nil
}
// BuildServeServiceForRayService builds the service for head node and worker nodes who have healthy http proxy to serve traffics.
func BuildServeServiceForRayService(rayService rayiov1alpha1.RayService, rayCluster rayiov1alpha1.RayCluster) (*corev1.Service, error) {
labels := map[string]string{
RayServiceLabelKey: rayService.Name,
RayClusterServingServiceLabelKey: utils.GenerateServeServiceLabel(rayService.Name),
}
selectorLabels := map[string]string{
RayClusterLabelKey: rayCluster.Name,
RayClusterServingServiceLabelKey: EnableRayClusterServingServiceTrue,
}
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: utils.GenerateServeServiceName(rayService.Name),
Namespace: rayService.Namespace,
Labels: labels,
},
Spec: corev1.ServiceSpec{
Selector: selectorLabels,
Ports: []corev1.ServicePort{},
Type: rayService.Spec.RayClusterSpec.HeadGroupSpec.ServiceType,
},
}
ports := getServicePorts(rayCluster)
for name, port := range ports {
if name == DefaultServingPortName {
svcPort := corev1.ServicePort{Name: name, Port: port}
service.Spec.Ports = append(service.Spec.Ports, svcPort)
break
}
}
return service, nil
}
// BuildDashboardService Builds the service for dashboard agent and head node.
func BuildDashboardService(cluster rayiov1alpha1.RayCluster) (*corev1.Service, error) {
labels := map[string]string{
RayClusterDashboardServiceLabelKey: utils.GenerateDashboardAgentLabel(cluster.Name),
}
selectorLabels := map[string]string{
RayClusterDashboardServiceLabelKey: utils.GenerateDashboardAgentLabel(cluster.Name),
}
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: utils.GenerateDashboardServiceName(cluster.Name),
Namespace: cluster.Namespace,
Labels: labels,
},
Spec: corev1.ServiceSpec{
Selector: selectorLabels,
Ports: []corev1.ServicePort{},
Type: cluster.Spec.HeadGroupSpec.ServiceType,
},
}
ports := getServicePorts(cluster)
for name, port := range ports {
if name == DefaultDashboardAgentListenPortName {
svcPort := corev1.ServicePort{Name: name, Port: port}
service.Spec.Ports = append(service.Spec.Ports, svcPort)
break
}
}
return service, nil
}
// getServicePorts will either user passing ports or default ports to create service.
func getServicePorts(cluster rayiov1alpha1.RayCluster) map[string]int32 {
ports, err := getPortsFromCluster(cluster)
// Assign default ports
if err != nil || len(ports) == 0 {
ports = getDefaultPorts()
}
// Check if agent port is defined. If not, check if enable agent service.
if _, agentDefined := ports[DefaultDashboardAgentListenPortName]; !agentDefined {
enableAgentServiceValue, exist := cluster.Annotations[EnableAgentServiceKey]
if exist && enableAgentServiceValue == EnableAgentServiceTrue {
// If agent port is not in the config, add default value for it.
ports[DefaultDashboardAgentListenPortName] = DefaultDashboardAgentListenPort
}
}
return ports
}
// getPortsFromCluster get the ports from head container and directly map them in service
// It's user's responsibility to maintain rayStartParam ports and container ports mapping
// TODO: Consider to infer ports from rayStartParams (source of truth) in the future.
func getPortsFromCluster(cluster rayiov1alpha1.RayCluster) (map[string]int32, error) {
svcPorts := map[string]int32{}
index := utils.FindRayContainerIndex(cluster.Spec.HeadGroupSpec.Template.Spec)
cPorts := cluster.Spec.HeadGroupSpec.Template.Spec.Containers[index].Ports
for _, port := range cPorts {
svcPorts[port.Name] = port.ContainerPort
}
return svcPorts, nil
}
func getDefaultPorts() map[string]int32 {
return map[string]int32{
DefaultClientPortName: DefaultClientPort,
DefaultRedisPortName: DefaultRedisPort,
DefaultDashboardName: DefaultDashboardPort,
DefaultMetricsName: DefaultMetricsPort,
DefaultServingPortName: DefaultServingPort,
}
}
// IsAgentServiceEnabled check if the agent service is enabled for RayCluster.
func IsAgentServiceEnabled(instance *rayiov1alpha1.RayCluster) bool {
enableAgentServiceValue, exist := instance.Annotations[EnableAgentServiceKey]
if exist && enableAgentServiceValue == EnableAgentServiceTrue {
return true
}
return false
}