diff --git a/apiserver/cmd/main.go b/apiserver/cmd/main.go index a46d44f59a9..761bc758bfe 100644 --- a/apiserver/cmd/main.go +++ b/apiserver/cmd/main.go @@ -51,7 +51,7 @@ func startRpcServer(resourceManager *manager.ResourceManager) { s := grpc.NewServer(grpc.UnaryInterceptor(interceptor.ApiServerInterceptor), grpc.MaxRecvMsgSize(math.MaxInt32)) api.RegisterClusterServiceServer(s, server.NewClusterServer(resourceManager, &server.ClusterServerOptions{CollectMetrics: *collectMetricsFlag})) - // TODO: add rest servers here + api.RegisterComputeTemplateServiceServer(s, server.NewComputeTemplateServer(resourceManager, &server.ComputeTemplateServerOptions{CollectMetrics: *collectMetricsFlag})) // Register reflection service on gRPC server. reflection.Register(s) @@ -72,7 +72,7 @@ func startHttpProxy() { // Create gRPC HTTP MUX and register services. runtimeMux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true})) registerHttpHandlerFromEndpoint(api.RegisterClusterServiceHandlerFromEndpoint, "ClusterService", ctx, runtimeMux) - // TODO: add rest servers here + registerHttpHandlerFromEndpoint(api.RegisterComputeTemplateServiceHandlerFromEndpoint, "ComputeTemplateService", ctx, runtimeMux) // Create a top level mux to include both Http gRPC servers and other endpoints like metrics topMux := http.NewServeMux() diff --git a/apiserver/pkg/manager/resource_manager.go b/apiserver/pkg/manager/resource_manager.go index aa25a4c755f..4221826089e 100644 --- a/apiserver/pkg/manager/resource_manager.go +++ b/apiserver/pkg/manager/resource_manager.go @@ -1,5 +1,35 @@ package manager +import ( + "context" + "fmt" + "github.com/ray-project/kuberay/apiserver/pkg/model" + + "github.com/ray-project/kuberay/apiserver/pkg/util" + api "github.com/ray-project/kuberay/proto/go_client" + "github.com/ray-project/kuberay/ray-operator/api/raycluster/v1alpha1" + rayiov1alpha1 "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/typed/raycluster/v1alpha1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + clientv1 "k8s.io/client-go/kubernetes/typed/core/v1" +) + +const DefaultNamespace = "ray-system" + +// ResourceManagerInterface can be used by services to operate resources +// kubernetes objects and potential db objects underneath operations should be encapsulated at this layer +type ResourceManagerInterface interface { + CreateCluster(ctx context.Context, apiCluster *api.Cluster) (*v1alpha1.RayCluster, error) + GetCluster(ctx context.Context, clusterName string) (*v1alpha1.RayCluster, error) + ListClusters(ctx context.Context) ([]*v1alpha1.RayCluster, error) + DeleteCluster(ctx context.Context, clusterName string) error + CreateComputeTemplate(ctx context.Context, runtime *api.ComputeTemplate) (*v1.ConfigMap, error) + GetComputeTemplate(ctx context.Context, name string) (*v1.ConfigMap, error) + ListComputeTemplates(ctx context.Context) ([]*v1.ConfigMap, error) + DeleteComputeTemplate(ctx context.Context, name string) error +} + type ResourceManager struct { clientManager ClientManagerInterface } @@ -10,3 +40,242 @@ func NewResourceManager(clientManager ClientManagerInterface) *ResourceManager { clientManager: clientManager, } } + +// Clients +func (r *ResourceManager) getRayClusterClient(namespace string) rayiov1alpha1.RayClusterInterface { + return r.clientManager.ClusterClient().RayClusterClient(namespace) +} + +func (r *ResourceManager) 
getKubernetesPodClient(namespace string) clientv1.PodInterface {
+  return r.clientManager.KubernetesClient().PodClient(namespace)
+}
+
+func (r *ResourceManager) getKubernetesConfigMapClient(namespace string) clientv1.ConfigMapInterface {
+  return r.clientManager.KubernetesClient().ConfigMapClient(namespace)
+}
+
+// clusters
+func (r *ResourceManager) CreateCluster(ctx context.Context, apiCluster *api.Cluster) (*v1alpha1.RayCluster, error) {
+  namespace := apiCluster.Namespace
+  if len(namespace) == 0 {
+    namespace = DefaultNamespace
+  }
+
+  // populate the compute templates referenced by the cluster spec
+  computeTemplateDict, err := r.populateComputeTemplate(ctx, apiCluster)
+  if err != nil {
+    return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiCluster.Namespace, apiCluster.Name)
+  }
+
+  // convert *api.Cluster to v1alpha1.RayCluster
+  rayCluster := util.NewRayCluster(apiCluster, computeTemplateDict)
+
+  // set our own fields.
+  clusterAt := r.clientManager.Time().Now().String()
+  rayCluster.Annotations["ray.io/creation-timestamp"] = clusterAt
+
+  newRayCluster, err := r.getRayClusterClient(namespace).Create(ctx, rayCluster.Get(), metav1.CreateOptions{})
+  if err != nil {
+    return nil, util.NewInternalServerError(err, "Failed to create a cluster for (%s/%s)", rayCluster.Namespace, rayCluster.Name)
+  }
+
+  return newRayCluster, nil
+}
+
+func (r *ResourceManager) populateComputeTemplate(ctx context.Context, cluster *api.Cluster) (map[string]*api.ComputeTemplate, error) {
+  dict := make(map[string]*api.ComputeTemplate)
+  // populate head compute template
+  name := cluster.ClusterSpec.HeadGroupSpec.ComputeTemplate
+  configMap, err := r.GetComputeTemplate(ctx, name)
+  if err != nil {
+    return nil, err
+  }
+  computeTemplate := model.FromKubeToAPIComputeTemplate(configMap)
+  dict[name] = computeTemplate
+
+  // populate worker compute templates
+  for _, spec := range cluster.ClusterSpec.WorkerGroupSepc {
+    name := spec.ComputeTemplate
+    if _, exist := dict[name]; !exist {
+      configMap, err := r.GetComputeTemplate(ctx, name)
+      if err != nil {
+        return nil, err
+      }
+      computeTemplate := model.FromKubeToAPIComputeTemplate(configMap)
+      dict[name] = computeTemplate
+    }
+  }
+
+  return dict, nil
+}
+
+func (r *ResourceManager) GetCluster(ctx context.Context, clusterName string) (*v1alpha1.RayCluster, error) {
+  if len(clusterName) == 0 {
+    return nil, util.NewInvalidInputError("clusterName is empty, failed to get the cluster.")
+  }
+
+  client := r.getRayClusterClient(DefaultNamespace)
+  return getClusterByName(ctx, client, clusterName)
+}
+
+func (r *ResourceManager) ListClusters(ctx context.Context) ([]*v1alpha1.RayCluster, error) {
+  rayClusterList, err := r.getRayClusterClient(DefaultNamespace).List(ctx, metav1.ListOptions{})
+  if err != nil {
+    return nil, util.Wrap(err, "List RayCluster failed")
+  }
+
+  var result []*v1alpha1.RayCluster
+  length := len(rayClusterList.Items)
+  for i := 0; i < length; i++ {
+    result = append(result, &rayClusterList.Items[i])
+  }
+
+  return result, nil
+}
+
+func (r *ResourceManager) DeleteCluster(ctx context.Context, clusterName string) error {
+  if len(clusterName) == 0 {
+    return util.NewInvalidInputError("clusterName is empty, failed to delete the cluster.")
+  }
+
+  client := r.getRayClusterClient(DefaultNamespace)
+  cluster, err := getClusterByName(ctx, client, clusterName)
+  if err != nil {
+    return util.Wrap(err, "Get cluster failed")
+  }
+
+  // Delete Kubernetes resources
+  if err := client.Delete(ctx, cluster.Name, metav1.DeleteOptions{}); err != nil {
+
// API won't need to delete the ray cluster CR + return util.NewInternalServerError(err, "Failed to delete cluster %v.", clusterName) + } + + return nil +} + +// Compute Runtimes +func (r *ResourceManager) CreateComputeTemplate(ctx context.Context, runtime *api.ComputeTemplate) (*v1.ConfigMap, error) { + _, err := r.GetComputeTemplate(ctx, runtime.Name) + if err == nil { + return nil, util.NewAlreadyExistError("Compute template with name %s already exists in namespace %s", runtime.Name, DefaultNamespace) + } + + computeTemplate, err := util.NewComputeTemplate(runtime, DefaultNamespace) + if err != nil { + return nil, util.NewInternalServerError(err, "Failed to convert compute runtime (%s/%s)", DefaultNamespace, runtime.Name) + } + + client := r.clientManager.KubernetesClient().ConfigMapClient(DefaultNamespace) + newRuntime, err := client.Create(ctx, computeTemplate, metav1.CreateOptions{}) + if err != nil { + return nil, util.NewInternalServerError(err, "Failed to create a compute runtime for (%s/%s)", DefaultNamespace, runtime.Name) + } + + return newRuntime, nil +} + +func (r *ResourceManager) GetComputeTemplate(ctx context.Context, name string) (*v1.ConfigMap, error) { + client := r.clientManager.KubernetesClient().ConfigMapClient(DefaultNamespace) + return getComputeTemplateByName(ctx, client, name) +} + +func (r *ResourceManager) ListComputeTemplates(ctx context.Context) ([]*v1.ConfigMap, error) { + client := r.clientManager.KubernetesClient().ConfigMapClient(DefaultNamespace) + configMapList, err := client.List(ctx, metav1.ListOptions{LabelSelector: "ray.io/config-type=compute-template"}) + if err != nil { + return nil, util.Wrap(err, "List compute runtimes failed") + } + + var result []*v1.ConfigMap + length := len(configMapList.Items) + for i := 0; i < length; i++ { + result = append(result, &configMapList.Items[i]) + } + + return result, nil +} + +func (r *ResourceManager) DeleteComputeTemplate(ctx context.Context, name string) error { + client := r.clientManager.KubernetesClient().ConfigMapClient(DefaultNamespace) + + configMap, err := getComputeTemplateByName(ctx, client, name) + if err != nil { + return util.Wrap(err, "Get compute template failure") + } + + if err := client.Delete(ctx, configMap.Name, metav1.DeleteOptions{}); err != nil { + return util.NewInternalServerError(err, "failed to delete compute template %v.", name) + } + + return nil +} + +// getClusterByName returns the Kubernetes RayCluster object by given name and client +func getClusterByName(ctx context.Context, client rayiov1alpha1.RayClusterInterface, name string) (*v1alpha1.RayCluster, error) { + cluster, err := client.Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil, util.Wrap(err, "Get Cluster failed") + } + + return cluster, nil +} + +// getComputeTemplateByName returns the Kubernetes configmap object by given name and client +func getComputeTemplateByName(ctx context.Context, client clientv1.ConfigMapInterface, name string) (*v1.ConfigMap, error) { + runtime, err := client.Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil, util.Wrap(err, "Get compute template failed") + } + + return runtime, nil +} + +// getClusterByName returns the Kubernetes RayCluster object by given name and client +func getClusterByNameFromLabel(ctx context.Context, client rayiov1alpha1.RayClusterInterface, name string) (*v1alpha1.RayCluster, error) { + labelSelector := metav1.LabelSelector{ + MatchLabels: map[string]string{ + util.RayClusterNameLabelKey: name, + }, + } + clusters, err := 
client.List(ctx, metav1.ListOptions{
+    LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
+  })
+  if err != nil {
+    return nil, util.Wrap(err, "Get Cluster failed")
+  }
+
+  if len(clusters.Items) > 1 {
+    return nil, fmt.Errorf("found %d duplicate clusters", len(clusters.Items))
+  }
+
+  if len(clusters.Items) == 0 {
+    return nil, fmt.Errorf("cannot find a cluster with name %s", name)
+  }
+
+  return &clusters.Items[0], nil
+}
+
+func getComputeTemplateByLabel(ctx context.Context, name string, client clientv1.ConfigMapInterface) (*v1.ConfigMap, error) {
+  labelSelector := metav1.LabelSelector{
+    MatchLabels: map[string]string{
+      "ray.io/config-type":      "compute-template",
+      "ray.io/compute-template": name,
+    },
+  }
+  runtimes, err := client.List(ctx, metav1.ListOptions{
+    LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
+  })
+  if err != nil {
+    return nil, util.Wrap(err, "Get compute runtime failed")
+  }
+
+  if len(runtimes.Items) > 1 {
+    return nil, fmt.Errorf("found %d duplicate compute runtimes", len(runtimes.Items))
+  }
+
+  if len(runtimes.Items) == 0 {
+    return nil, fmt.Errorf("cannot find a compute runtime with name %s", name)
+  }
+
+  return &runtimes.Items[0], nil
+}
diff --git a/apiserver/pkg/model/converter.go b/apiserver/pkg/model/converter.go
new file mode 100644
index 00000000000..680acdae7fa
--- /dev/null
+++ b/apiserver/pkg/model/converter.go
@@ -0,0 +1,89 @@
+package model
+
+import (
+  "strconv"
+
+  "github.com/golang/protobuf/ptypes/timestamp"
+  api "github.com/ray-project/kuberay/proto/go_client"
+  "github.com/ray-project/kuberay/apiserver/pkg/util"
+  "github.com/ray-project/kuberay/ray-operator/api/raycluster/v1alpha1"
+  v1 "k8s.io/api/core/v1"
+)
+
+func FromCrdToApiClusters(clusters []*v1alpha1.RayCluster) []*api.Cluster {
+  apiClusters := make([]*api.Cluster, 0)
+  for _, cluster := range clusters {
+    apiClusters = append(apiClusters, FromCrdToApiCluster(cluster))
+  }
+  return apiClusters
+}
+
+func FromCrdToApiCluster(cluster *v1alpha1.RayCluster) *api.Cluster {
+  pbCluster := &api.Cluster{
+    Name:        cluster.Name,
+    Namespace:   cluster.Namespace,
+    Version:     cluster.Labels[util.RayClusterVersionLabelKey],
+    User:        cluster.Labels[util.RayClusterUserLabelKey],
+    Environment: api.Cluster_Environment(api.Cluster_Environment_value[cluster.Labels[util.RayClusterEnvironmentLabelKey]]),
+    CreatedAt:   &timestamp.Timestamp{Seconds: cluster.CreationTimestamp.Unix()},
+    ClusterSpec: &api.ClusterSpec{},
+  }
+
+  // populate the head group and worker group specs from the CR
+  pbCluster.ClusterSpec.HeadGroupSpec = PopulateHeadNodeSpec(cluster.Spec.HeadGroupSpec)
+  pbCluster.ClusterSpec.WorkerGroupSepc = PopulateWorkerNodeSpec(cluster.Spec.WorkerGroupSpecs)
+
+  return pbCluster
+}
+
+func PopulateHeadNodeSpec(spec v1alpha1.HeadGroupSpec) *api.HeadGroupSpec {
+  headNodeSpec := &api.HeadGroupSpec{
+    RayStartParams:  spec.RayStartParams,
+    ServiceType:     string(spec.ServiceType),
+    Image:           spec.Template.Annotations[util.RayClusterImageAnnotationKey],
+    ComputeTemplate: spec.Template.Annotations[util.RayClusterComputeTemplateAnnotationKey],
+  }
+
+  return headNodeSpec
+}
+
+func PopulateWorkerNodeSpec(specs []v1alpha1.WorkerGroupSpec) []*api.WorkerGroupSpec {
+  var workerNodeSpecs []*api.WorkerGroupSpec
+
+  for _, spec := range specs {
+    workerNodeSpec := &api.WorkerGroupSpec{
+      RayStartParams: spec.RayStartParams,
+      MaxReplicas:    *spec.MaxReplicas,
+      MinReplicas:    *spec.MinReplicas,
+      Replicas:       *spec.Replicas,
+      GroupName:      spec.GroupName,
+    }
+    // Resources.
+ workerNodeSpecs = append(workerNodeSpecs, workerNodeSpec) + } + + return workerNodeSpecs +} + +func FromKubeToAPIComputeTemplate(configMap *v1.ConfigMap) *api.ComputeTemplate { + cpu, _ := strconv.ParseUint(configMap.Data["cpu"], 10, 32) + memory, _ := strconv.ParseUint(configMap.Data["memory"], 10, 32) + gpu, _ := strconv.ParseUint(configMap.Data["gpu"], 10, 32) + + runtime := &api.ComputeTemplate{} + runtime.Name = configMap.Name + runtime.Cpu = uint32(cpu) + runtime.Memory = uint32(memory) + runtime.Gpu = uint32(gpu) + runtime.GpuAccelerator = configMap.Data["gpu_accelerator"] + return runtime +} + +func FromKubeToAPIComputeTemplates(configMaps []*v1.ConfigMap) []*api.ComputeTemplate { + apiComputeTemplates := make([]*api.ComputeTemplate, 0) + for _, configMap := range configMaps { + apiComputeTemplates = append(apiComputeTemplates, FromKubeToAPIComputeTemplate(configMap)) + } + return apiComputeTemplates +} diff --git a/apiserver/pkg/server/cluster_server.go b/apiserver/pkg/server/cluster_server.go index 7d0d9593999..2468f47fda5 100644 --- a/apiserver/pkg/server/cluster_server.go +++ b/apiserver/pkg/server/cluster_server.go @@ -4,9 +4,11 @@ import ( "context" "github.com/golang/protobuf/ptypes/empty" + "github.com/ray-project/kuberay/apiserver/pkg/model" + api "github.com/ray-project/kuberay/proto/go_client" "github.com/ray-project/kuberay/apiserver/pkg/manager" "github.com/ray-project/kuberay/apiserver/pkg/util" - api "github.com/ray-project/kuberay/proto/go_client" + "google.golang.org/protobuf/types/known/emptypb" ) type ClusterServerOptions struct { @@ -23,24 +25,50 @@ type ClusterServer struct { // Creates a new Cluster. func (s *ClusterServer) CreateCluster(ctx context.Context, request *api.CreateClusterRequest) (*api.Cluster, error) { - panic("Implement me") + if err := ValidateCreateClusterRequest(request); err != nil { + return nil, util.Wrap(err, "Validate cluster request failed.") + } + + cluster, err := s.resourceManager.CreateCluster(ctx, request.Cluster) + if err != nil { + return nil, util.Wrap(err, "Create Cluster failed.") + } + + return model.FromCrdToApiCluster(cluster), nil } -// Finds a specific Cluster by ID. +// Finds a specific Cluster by cluster name. func (s *ClusterServer) GetCluster(ctx context.Context, request *api.GetClusterRequest) (*api.Cluster, error) { - panic("Implement me") + cluster, err := s.resourceManager.GetCluster(ctx, request.Name) + if err != nil { + return nil, util.Wrap(err, "Get cluster failed.") + } + return model.FromCrdToApiCluster(cluster), nil } -// Finds all Clusters. Supports pagination, and sorting on certain fields. +// Finds all Clusters. +// TODO: Supports pagination and sorting on certain fields when we have DB support. request needs to be extended. func (s *ClusterServer) ListCluster(ctx context.Context, request *api.ListClustersRequest) (*api.ListClustersResponse, error) { - panic("Implement me") + clusters, err := s.resourceManager.ListClusters(ctx) + if err != nil { + return nil, util.Wrap(err, "List clusters failed.") + } + + return &api.ListClustersResponse{ + Clusters: model.FromCrdToApiClusters(clusters), + }, nil } // Deletes an Cluster without deleting the Cluster's runs and jobs. To // avoid unexpected behaviors, delete an Cluster's runs and jobs before // deleting the Cluster. func (s *ClusterServer) DeleteCluster(ctx context.Context, request *api.DeleteClusterRequest) (*empty.Empty, error) { - panic("Implement me") + // TODO: do we want to have some logics here to check cluster exist here? 
or put it inside resourceManager + if err := s.resourceManager.DeleteCluster(ctx, request.Name); err != nil { + return nil, err + } + + return &emptypb.Empty{}, nil } func ValidateCreateClusterRequest(request *api.CreateClusterRequest) error { @@ -52,6 +80,25 @@ func ValidateCreateClusterRequest(request *api.CreateClusterRequest) error { return util.NewInvalidInputError("User who create the cluster is empty. Please specify a valid value.") } + if len(request.Cluster.ClusterSpec.HeadGroupSpec.ComputeTemplate) == 0 { + return util.NewInvalidInputError("HeadGroupSpec compute template is empty. Please specify a valid value.") + } + + for index, spec := range request.Cluster.ClusterSpec.WorkerGroupSepc { + if len(spec.GroupName) == 0 { + return util.NewInvalidInputError("WorkerNodeSpec %d group name is empty. Please specify a valid value.", index) + } + if len(spec.ComputeTemplate) == 0 { + return util.NewInvalidInputError("WorkerNodeSpec %d compute template is empty. Please specify a valid value.", index) + } + if spec.MaxReplicas == 0 { + return util.NewInvalidInputError("WorkerNodeSpec %d MaxReplicas can not be 0. Please specify a valid value.", index) + } + if spec.MinReplicas > spec.MaxReplicas { + return util.NewInvalidInputError("WorkerNodeSpec %d MinReplica > MaxReplicas. Please specify a valid value.", index) + } + } + return nil } diff --git a/apiserver/pkg/server/compute_template_server.go b/apiserver/pkg/server/compute_template_server.go new file mode 100644 index 00000000000..946e17a0cd6 --- /dev/null +++ b/apiserver/pkg/server/compute_template_server.go @@ -0,0 +1,84 @@ +package server + +import ( + "context" + + api "github.com/ray-project/kuberay/proto/go_client" + "github.com/ray-project/kuberay/apiserver/pkg/manager" + "github.com/ray-project/kuberay/apiserver/pkg/model" + "github.com/ray-project/kuberay/apiserver/pkg/util" + "google.golang.org/protobuf/types/known/emptypb" +) + +type ComputeTemplateServerOptions struct { + CollectMetrics bool +} + +// implements `type ComputeTemplateServiceServer interface` in runtime_grpc.pb.go +// ComputeTemplateServer is the server API for ClusterRuntimeService. 
+type ComputeTemplateServer struct { + resourceManager *manager.ResourceManager + options *ComputeTemplateServerOptions + api.UnimplementedComputeTemplateServiceServer +} + +func (s *ComputeTemplateServer) CreateComputeTemplate(ctx context.Context, request *api.CreateComputeTemplateRequest) (*api.ComputeTemplate, error) { + if err := ValidateCreateComputeTemplateRequest(request); err != nil { + return nil, util.Wrap(err, "Validate compute runtime request failed.") + } + + runtime, err := s.resourceManager.CreateComputeTemplate(ctx, request.ComputeTemplate) + if err != nil { + return nil, util.Wrap(err, "Create Compute Runtime failed.") + } + + return model.FromKubeToAPIComputeTemplate(runtime), nil +} + +func (s *ComputeTemplateServer) GetComputeTemplate(ctx context.Context, request *api.GetComputeTemplateRequest) (*api.ComputeTemplate, error) { + runtime, err := s.resourceManager.GetComputeTemplate(ctx, request.Name) + if err != nil { + return nil, util.Wrap(err, "Get cluster runtime failed.") + } + + return model.FromKubeToAPIComputeTemplate(runtime), nil +} + +func (s *ComputeTemplateServer) ListComputeTemplates(ctx context.Context, request *api.ListComputeTemplatesRequest) (*api.ListComputeTemplatesResponse, error) { + runtimes, err := s.resourceManager.ListComputeTemplates(ctx) + if err != nil { + return nil, util.Wrap(err, "List cluster runtime failed.") + } + + return &api.ListComputeTemplatesResponse{ + ComputeTemplates: model.FromKubeToAPIComputeTemplates(runtimes), + }, nil +} + +func (s *ComputeTemplateServer) DeleteComputeTemplate(ctx context.Context, request *api.DeleteComputeTemplateRequest) (*emptypb.Empty, error) { + if err := s.resourceManager.DeleteComputeTemplate(ctx, request.Name); err != nil { + return nil, err + } + + return &emptypb.Empty{}, nil +} + +func ValidateCreateComputeTemplateRequest(request *api.CreateComputeTemplateRequest) error { + if request.ComputeTemplate.Name == "" { + return util.NewInvalidInputError("Cluster name is empty. Please specify a valid value.") + } + + if request.ComputeTemplate.Cpu == 0 { + return util.NewInvalidInputError("Cpu amount is zero. Please specify a valid value.") + } + + if request.ComputeTemplate.Memory == 0 { + return util.NewInvalidInputError("Memory amount is zero. Please specify a valid value.") + } + + return nil +} + +func NewComputeTemplateServer(resourceManager *manager.ResourceManager, options *ComputeTemplateServerOptions) *ComputeTemplateServer { + return &ComputeTemplateServer{resourceManager: resourceManager, options: options} +} diff --git a/apiserver/pkg/util/cluster.go b/apiserver/pkg/util/cluster.go new file mode 100644 index 00000000000..450084c1b5c --- /dev/null +++ b/apiserver/pkg/util/cluster.go @@ -0,0 +1,361 @@ +package util + +import ( + "fmt" + "net" + "strconv" + + api "github.com/ray-project/kuberay/proto/go_client" + rayclusterapi "github.com/ray-project/kuberay/ray-operator/api/raycluster/v1alpha1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type RayCluster struct { + *rayclusterapi.RayCluster +} + +// NewRayCluster creates a RayCluster. 
+//func NewRayCluster(apiCluster *api.Cluster, clusterRuntime *api.ClusterRuntime, computeRuntime *api.ComputeRuntime) *RayCluster { +func NewRayCluster(apiCluster *api.Cluster, computeTemplateMap map[string]*api.ComputeTemplate) *RayCluster { + // figure out how to build this + computeTemplate := computeTemplateMap[apiCluster.ClusterSpec.HeadGroupSpec.ComputeTemplate] + headPodTemplate := buildHeadPodTemplate(apiCluster, apiCluster.ClusterSpec.HeadGroupSpec, computeTemplate) + headReplicas := int32(1) + rayCluster := &rayclusterapi.RayCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: apiCluster.Name, + Namespace: apiCluster.Namespace, + Labels: buildRayClusterLabels(apiCluster), + Annotations: buildRayClusterAnnotations(apiCluster), + }, + Spec: rayclusterapi.RayClusterSpec{ + RayVersion: apiCluster.Version, + HeadGroupSpec: rayclusterapi.HeadGroupSpec{ + ServiceType: v1.ServiceType(apiCluster.ClusterSpec.HeadGroupSpec.ServiceType), + Template: headPodTemplate, + Replicas: &headReplicas, + RayStartParams: apiCluster.ClusterSpec.HeadGroupSpec.RayStartParams, + }, + WorkerGroupSpecs: []rayclusterapi.WorkerGroupSpec{}, + }, + } + + for _, spec := range apiCluster.ClusterSpec.WorkerGroupSepc { + computeTemplate := computeTemplateMap[spec.ComputeTemplate] + workerPodTemplate := buildWorkerPodTemplate(apiCluster, spec, computeTemplate) + + minReplicas := spec.Replicas + maxReplicas := spec.Replicas + if spec.MinReplicas != 0 { + minReplicas = spec.MinReplicas + } + if spec.MaxReplicas != 0 { + maxReplicas = spec.MaxReplicas + } + + workerNodeSpec := rayclusterapi.WorkerGroupSpec{ + GroupName: spec.GroupName, + MinReplicas: intPointer(minReplicas), + MaxReplicas: intPointer(maxReplicas), + Replicas: intPointer(spec.Replicas), + RayStartParams: spec.RayStartParams, + Template: workerPodTemplate, + } + + rayCluster.Spec.WorkerGroupSpecs = append(rayCluster.Spec.WorkerGroupSpecs, workerNodeSpec) + } + + return &RayCluster{rayCluster} +} + +func buildRayClusterLabels(cluster *api.Cluster) map[string]string { + labels := map[string]string{} + labels[RayClusterNameLabelKey] = cluster.Name + labels[RayClusterUserLabelKey] = cluster.User + labels[RayClusterVersionLabelKey] = cluster.Version + labels[RayClusterEnvironmentLabelKey] = cluster.Environment.String() + return labels +} + +func buildRayClusterAnnotations(cluster *api.Cluster) map[string]string { + annotations := map[string]string{} + // TODO: Add optional annotations + return annotations +} + +func buildNodeGroupAnnotations(computeTemplate *api.ComputeTemplate, image string) map[string]string { + annotations := map[string]string{} + annotations[RayClusterComputeTemplateAnnotationKey] = computeTemplate.Name + annotations[RayClusterImageAnnotationKey] = image + return annotations +} + +func buildHeadPodTemplate(cluster *api.Cluster, spec *api.HeadGroupSpec, computeRuntime *api.ComputeTemplate) v1.PodTemplateSpec { + image := constructRayImage(RayClusterDefaultImageRepository, cluster.Version) + if len(cluster.ClusterSpec.HeadGroupSpec.Image) != 0 { + image = cluster.ClusterSpec.HeadGroupSpec.Image + } + + cpu := fmt.Sprint(computeRuntime.GetCpu()) + memory := fmt.Sprintf("%d%s", computeRuntime.GetMemory(), "Gi") + + podTemplateSpec := v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: buildNodeGroupAnnotations(computeRuntime, spec.Image), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "ray-head", + Image: image, + Env: []v1.EnvVar{ + { + Name: "MY_POD_IP", + ValueFrom: &v1.EnvVarSource{ + FieldRef: 
&v1.ObjectFieldSelector{
+                  FieldPath: "status.podIP",
+                },
+              },
+            },
+          },
+          // Customization is not allowed here. We should consider whether to make this part smart.
+          Ports: []v1.ContainerPort{
+            {
+              Name:          "redis",
+              ContainerPort: 6379,
+            },
+            {
+              Name:          "head",
+              ContainerPort: 10001,
+            },
+            {
+              Name:          "dashboard",
+              ContainerPort: 8265,
+            },
+          },
+          Resources: v1.ResourceRequirements{
+            Limits: v1.ResourceList{
+              v1.ResourceCPU:    resource.MustParse(cpu),
+              v1.ResourceMemory: resource.MustParse(memory),
+            },
+            Requests: v1.ResourceList{
+              v1.ResourceCPU:    resource.MustParse(cpu),
+              v1.ResourceMemory: resource.MustParse(memory),
+            },
+          },
+        },
+      },
+    },
+  }
+
+  if computeRuntime.GetGpu() != 0 {
+    gpu := computeRuntime.GetGpu()
+    accelerator := "nvidia.com/gpu"
+    if len(computeRuntime.GetGpuAccelerator()) != 0 {
+      accelerator = computeRuntime.GetGpuAccelerator()
+    }
+
+    // need a smarter way to select the main container, e.g. filter by name `ray-head`
+    podTemplateSpec.Spec.Containers[0].Resources.Requests[v1.ResourceName(accelerator)] = resource.MustParse(fmt.Sprint(gpu))
+    podTemplateSpec.Spec.Containers[0].Resources.Limits[v1.ResourceName(accelerator)] = resource.MustParse(fmt.Sprint(gpu))
+  }
+
+  return podTemplateSpec
+}
+
+func constructRayImage(containerImage string, version string) string {
+  return fmt.Sprintf("%s:%s", containerImage, version)
+}
+
+func buildWorkerPodTemplate(cluster *api.Cluster, spec *api.WorkerGroupSpec, computeRuntime *api.ComputeTemplate) v1.PodTemplateSpec {
+  // If the user doesn't provide the image, use the default image instead.
+  // TODO: verify the versions in the range
+  image := constructRayImage(RayClusterDefaultImageRepository, cluster.Version)
+  if len(spec.Image) != 0 {
+    image = spec.Image
+  }
+
+  cpu := fmt.Sprint(computeRuntime.GetCpu())
+  memory := fmt.Sprintf("%d%s", computeRuntime.GetMemory(), "Gi")
+
+  podTemplateSpec := v1.PodTemplateSpec{
+    ObjectMeta: metav1.ObjectMeta{
+      Annotations: buildNodeGroupAnnotations(computeRuntime, spec.Image),
+    },
+    Spec: v1.PodSpec{
+      InitContainers: []v1.Container{
+        {
+          Name:  "init-myservice",
+          Image: image,
+          Command: []string{
+            "sh",
+            "-c",
+            "until nslookup $RAY_IP.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for myservice; sleep 2; done",
+          },
+        },
+      },
+      Containers: []v1.Container{
+        {
+          Name:  "ray-worker",
+          Image: image,
+          Env: []v1.EnvVar{
+            {
+              Name:  "RAY_DISABLE_DOCKER_CPU_WARNING",
+              Value: "1",
+            },
+            {
+              Name:  "TYPE",
+              Value: "worker",
+            },
+            {
+              Name: "CPU_REQUEST",
+              ValueFrom: &v1.EnvVarSource{
+                ResourceFieldRef: &v1.ResourceFieldSelector{
+                  ContainerName: "ray-worker",
+                  Resource:      "requests.cpu",
+                },
+              },
+            },
+            {
+              Name: "CPU_LIMITS",
+              ValueFrom: &v1.EnvVarSource{
+                ResourceFieldRef: &v1.ResourceFieldSelector{
+                  ContainerName: "ray-worker",
+                  Resource:      "limits.cpu",
+                },
+              },
+            },
+            {
+              Name: "MEMORY_REQUESTS",
+              ValueFrom: &v1.EnvVarSource{
+                ResourceFieldRef: &v1.ResourceFieldSelector{
+                  ContainerName: "ray-worker",
+                  Resource:      "requests.memory",
+                },
+              },
+            },
+            {
+              Name: "MEMORY_LIMITS",
+              ValueFrom: &v1.EnvVarSource{
+                ResourceFieldRef: &v1.ResourceFieldSelector{
+                  ContainerName: "ray-worker",
+                  Resource:      "limits.memory",
+                },
+              },
+            },
+            {
+              Name: "MY_POD_NAME",
+              ValueFrom: &v1.EnvVarSource{
+                FieldRef: &v1.ObjectFieldSelector{
+                  FieldPath: "metadata.name",
+                },
+              },
+            },
+            {
+              Name: "MY_POD_IP",
+              ValueFrom: &v1.EnvVarSource{
+                FieldRef: &v1.ObjectFieldSelector{
+                  FieldPath: "status.podIP",
+                },
+              },
+            },
+          },
+
Ports: []v1.ContainerPort{
+            {
+              ContainerPort: 80,
+            },
+          },
+          Lifecycle: &v1.Lifecycle{
+            PreStop: &v1.Handler{
+              Exec: &v1.ExecAction{
+                Command: []string{
+                  "/bin/sh", "-c", "ray stop",
+                },
+              },
+            },
+          },
+          Resources: v1.ResourceRequirements{
+            Limits: v1.ResourceList{
+              v1.ResourceCPU:    resource.MustParse(cpu),
+              v1.ResourceMemory: resource.MustParse(memory),
+            },
+            Requests: v1.ResourceList{
+              v1.ResourceCPU:    resource.MustParse(cpu),
+              v1.ResourceMemory: resource.MustParse(memory),
+            },
+          },
+        },
+      },
+    },
+  }
+
+  if computeRuntime.GetGpu() != 0 {
+    gpu := computeRuntime.GetGpu()
+    accelerator := "nvidia.com/gpu"
+    if len(computeRuntime.GetGpuAccelerator()) != 0 {
+      accelerator = computeRuntime.GetGpuAccelerator()
+    }
+
+    // need a smarter way to select the main container, e.g. filter by name `ray-worker`
+    podTemplateSpec.Spec.Containers[0].Resources.Requests[v1.ResourceName(accelerator)] = resource.MustParse(fmt.Sprint(gpu))
+    podTemplateSpec.Spec.Containers[0].Resources.Limits[v1.ResourceName(accelerator)] = resource.MustParse(fmt.Sprint(gpu))
+  }
+
+  return podTemplateSpec
+}
+
+func intPointer(value int32) *int32 {
+  return &value
+}
+
+// Get converts this object to a rayclusterapi.RayCluster.
+func (c *RayCluster) Get() *rayclusterapi.RayCluster {
+  return c.RayCluster
+}
+
+// SetAnnotationsToAllTemplates sets annotations on all templates in a RayCluster
+func (c *RayCluster) SetAnnotationsToAllTemplates(key string, value string) {
+  // TODO: reserved for common parameters.
+}
+
+func NewComputeTemplate(runtime *api.ComputeTemplate, namespace string) (*v1.ConfigMap, error) {
+  config := &v1.ConfigMap{
+    ObjectMeta: metav1.ObjectMeta{
+      Name:      runtime.Name,
+      Namespace: namespace,
+      Labels: map[string]string{
+        "ray.io/config-type":      "compute-template",
+        "ray.io/compute-template": runtime.Name,
+      },
+    },
+    Data: map[string]string{
+      "name":            runtime.Name,
+      "cpu":             strconv.FormatUint(uint64(runtime.Cpu), 10),
+      "memory":          strconv.FormatUint(uint64(runtime.Memory), 10),
+      "gpu":             strconv.FormatUint(uint64(runtime.Gpu), 10),
+      "gpu_accelerator": runtime.GpuAccelerator,
+    },
+  }
+
+  return config, nil
+}
+
+// GetNodeHostIP returns the provided node's IP, based on the priority:
+// 1. NodeInternalIP
+// 2. NodeExternalIP
+func GetNodeHostIP(node *v1.Node) (net.IP, error) {
+  addresses := node.Status.Addresses
+  addressMap := make(map[v1.NodeAddressType][]v1.NodeAddress)
+  for i := range addresses {
+    addressMap[addresses[i].Type] = append(addressMap[addresses[i].Type], addresses[i])
+  }
+  if addresses, ok := addressMap[v1.NodeInternalIP]; ok {
+    return net.ParseIP(addresses[0].Address), nil
+  }
+  if addresses, ok := addressMap[v1.NodeExternalIP]; ok {
+    return net.ParseIP(addresses[0].Address), nil
+  }
+  return nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
+}
diff --git a/apiserver/pkg/util/config.go b/apiserver/pkg/util/config.go
index ef365a2cca3..0c561e238da 100644
--- a/apiserver/pkg/util/config.go
+++ b/apiserver/pkg/util/config.go
@@ -5,3 +5,19 @@ type ClientOptions struct {
 	QPS   float32
 	Burst int
 }
+
+// TODO: this needs to be revised.
+const (
+	// Label keys
+	RayClusterNameLabelKey        = "ray.io/cluster-name"
+	RayClusterUserLabelKey        = "ray.io/user"
+	RayClusterVersionLabelKey     = "ray.io/version"
+	RayClusterEnvironmentLabelKey = "ray.io/environment"
+
+	// Annotation keys
+	// Role level
+	RayClusterComputeTemplateAnnotationKey = "ray.io/compute-template"
+	RayClusterImageAnnotationKey           = "ray.io/compute-image"
+
+	RayClusterDefaultImageRepository = "ray.io/ray"
+)
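
As a quick usage sketch (not part of the diff): the snippet below shows how a client could exercise the newly registered ComputeTemplateService through the generated gRPC stubs in proto/go_client. The listen address is an assumption; substitute the gRPC endpoint your apiserver actually exposes.

package main

import (
	"context"
	"log"

	api "github.com/ray-project/kuberay/proto/go_client"
	"google.golang.org/grpc"
)

func main() {
	// Dial the apiserver's gRPC endpoint (address is an assumption; adjust for your deployment).
	conn, err := grpc.Dial("localhost:8887", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()

	client := api.NewComputeTemplateServiceClient(conn)

	// Create a compute template; the apiserver stores it as a ConfigMap in the ray-system namespace.
	created, err := client.CreateComputeTemplate(context.Background(), &api.CreateComputeTemplateRequest{
		ComputeTemplate: &api.ComputeTemplate{Name: "demo-template", Cpu: 2, Memory: 4},
	})
	if err != nil {
		log.Fatalf("create compute template failed: %v", err)
	}
	log.Printf("created compute template %q", created.Name)

	// List all compute templates known to the apiserver.
	templates, err := client.ListComputeTemplates(context.Background(), &api.ListComputeTemplatesRequest{})
	if err != nil {
		log.Fatalf("list compute templates failed: %v", err)
	}
	log.Printf("found %d compute templates", len(templates.ComputeTemplates))
}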