Skip to content

Commit

Permalink
Implement resource manager and gRPC services
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffwan committed Jan 20, 2022
1 parent 4095994 commit cfb0562
Show file tree
Hide file tree
Showing 7 changed files with 876 additions and 8 deletions.
4 changes: 2 additions & 2 deletions apiserver/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func startRpcServer(resourceManager *manager.ResourceManager) {

s := grpc.NewServer(grpc.UnaryInterceptor(interceptor.ApiServerInterceptor), grpc.MaxRecvMsgSize(math.MaxInt32))
api.RegisterClusterServiceServer(s, server.NewClusterServer(resourceManager, &server.ClusterServerOptions{CollectMetrics: *collectMetricsFlag}))
// TODO: add rest servers here
api.RegisterComputeTemplateServiceServer(s, server.NewComputeTemplateServer(resourceManager, &server.ComputeTemplateServerOptions{CollectMetrics: *collectMetricsFlag}))

// Register reflection service on gRPC server.
reflection.Register(s)
Expand All @@ -72,7 +72,7 @@ func startHttpProxy() {
// Create gRPC HTTP MUX and register services.
runtimeMux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true}))
registerHttpHandlerFromEndpoint(api.RegisterClusterServiceHandlerFromEndpoint, "ClusterService", ctx, runtimeMux)
// TODO: add rest servers here
registerHttpHandlerFromEndpoint(api.RegisterComputeTemplateServiceHandlerFromEndpoint, "ComputeTemplateService", ctx, runtimeMux)

// Create a top level mux to include both Http gRPC servers and other endpoints like metrics
topMux := http.NewServeMux()
Expand Down
270 changes: 270 additions & 0 deletions apiserver/pkg/manager/resource_manager.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,36 @@
package manager

import (
"context"
"fmt"

"github.com/ray-project/kuberay/apiserver/pkg/model"

"github.com/ray-project/kuberay/apiserver/pkg/util"
api "github.com/ray-project/kuberay/proto/go_client"
"github.com/ray-project/kuberay/ray-operator/api/raycluster/v1alpha1"
rayiov1alpha1 "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/typed/raycluster/v1alpha1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

const DefaultNamespace = "ray-system"

// ResourceManagerInterface can be used by services to operate resources
// kubernetes objects and potential db objects underneath operations should be encapsulated at this layer
type ResourceManagerInterface interface {
CreateCluster(ctx context.Context, apiCluster *api.Cluster) (*v1alpha1.RayCluster, error)
GetCluster(ctx context.Context, clusterName string) (*v1alpha1.RayCluster, error)
ListClusters(ctx context.Context) ([]*v1alpha1.RayCluster, error)
DeleteCluster(ctx context.Context, clusterName string) error
CreateComputeTemplate(ctx context.Context, runtime *api.ComputeTemplate) (*v1.ConfigMap, error)
GetComputeTemplate(ctx context.Context, name string) (*v1.ConfigMap, error)
ListComputeTemplates(ctx context.Context) ([]*v1.ConfigMap, error)
DeleteComputeTemplate(ctx context.Context, name string) error
}

type ResourceManager struct {
clientManager ClientManagerInterface
}
Expand All @@ -10,3 +41,242 @@ func NewResourceManager(clientManager ClientManagerInterface) *ResourceManager {
clientManager: clientManager,
}
}

// Clients
func (r *ResourceManager) getRayClusterClient(namespace string) rayiov1alpha1.RayClusterInterface {
return r.clientManager.ClusterClient().RayClusterClient(namespace)
}

func (r *ResourceManager) getKubernetesPodClient(namespace string) clientv1.PodInterface {
return r.clientManager.KubernetesClient().PodClient(namespace)
}

func (r *ResourceManager) getKubernetesConfigMapClient(namespace string) clientv1.ConfigMapInterface {
return r.clientManager.KubernetesClient().ConfigMapClient(namespace)
}

// clusters
func (r *ResourceManager) CreateCluster(ctx context.Context, apiCluster *api.Cluster) (*v1alpha1.RayCluster, error) {
namespace := apiCluster.Namespace
if len(namespace) == 0 {
namespace = DefaultNamespace
}

// populate cluster map
computeTemplateDict, err := r.populateComputeTemplate(ctx, apiCluster)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiCluster.Namespace, apiCluster.Name)
}

// convert *api.Cluster to v1alpha1.RayCluster
rayCluster := util.NewRayCluster(apiCluster, computeTemplateDict)

// set our own fields.
clusterAt := r.clientManager.Time().Now().String()
rayCluster.Annotations["ray.io/creation-timestamp"] = clusterAt

newRayCluster, err := r.getRayClusterClient(namespace).Create(ctx, rayCluster.Get(), metav1.CreateOptions{})
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to create a cluster for (%s/%s)", rayCluster.Namespace, rayCluster.Name)
}

return newRayCluster, nil
}

func (r *ResourceManager) populateComputeTemplate(ctx context.Context, cluster *api.Cluster) (map[string]*api.ComputeTemplate, error) {
dict := map[string]*api.ComputeTemplate{}
// populate head compute template
name := cluster.ClusterSpec.HeadGroupSpec.ComputeTemplate
configMap, err := r.GetComputeTemplate(ctx, name)
if err != nil {
return nil, err
}
computeTemplate := model.FromKubeToAPIComputeTemplate(configMap)
dict[name] = computeTemplate

// populate worker compute template
for _, spec := range cluster.ClusterSpec.WorkerGroupSepc {
name := spec.ComputeTemplate
if _, exist := dict[name]; !exist {
configMap, err := r.GetComputeTemplate(ctx, name)
if err != nil {
return nil, err
}
computeTemplate := model.FromKubeToAPIComputeTemplate(configMap)
dict[name] = computeTemplate
}
}

return dict, nil
}

func (r *ResourceManager) GetCluster(ctx context.Context, clusterName string) (*v1alpha1.RayCluster, error) {
if len(clusterName) == 0 {
return nil, util.NewInvalidInputError("clusterName is empty, failed to get the cluster.")
}

client := r.getRayClusterClient(DefaultNamespace)
return getClusterByName(ctx, client, clusterName)
}

func (r *ResourceManager) ListClusters(ctx context.Context) ([]*v1alpha1.RayCluster, error) {
rayClusterList, err := r.getRayClusterClient(DefaultNamespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, util.Wrap(err, "List RayCluster failed")
}

var result []*v1alpha1.RayCluster
length := len(rayClusterList.Items)
for i := 0; i < length; i++ {
result = append(result, &rayClusterList.Items[i])
}

return result, nil
}

func (r *ResourceManager) DeleteCluster(ctx context.Context, clusterName string) error {
if len(clusterName) == 0 {
return util.NewInvalidInputError("clusterName is empty, failed to delete the cluster.")
}

client := r.getRayClusterClient(DefaultNamespace)
cluster, err := getClusterByName(ctx, client, clusterName)
if err != nil {
return util.Wrap(err, "Get cluster failure")
}

// Delete Kubernetes resources
if err := client.Delete(ctx, cluster.Name, metav1.DeleteOptions{}); err != nil {
// API won't need to delete the ray cluster CR
return util.NewInternalServerError(err, "Failed to delete cluster %v.", clusterName)
}

return nil
}

// Compute Runtimes
func (r *ResourceManager) CreateComputeTemplate(ctx context.Context, runtime *api.ComputeTemplate) (*v1.ConfigMap, error) {
_, err := r.GetComputeTemplate(ctx, runtime.Name)
if err == nil {
return nil, util.NewAlreadyExistError("Compute template with name %s already exists in namespace %s", runtime.Name, DefaultNamespace)
}

computeTemplate, err := util.NewComputeTemplate(runtime, DefaultNamespace)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to convert compute runtime (%s/%s)", DefaultNamespace, runtime.Name)
}

client := r.clientManager.KubernetesClient().ConfigMapClient(DefaultNamespace)
newRuntime, err := client.Create(ctx, computeTemplate, metav1.CreateOptions{})
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to create a compute runtime for (%s/%s)", DefaultNamespace, runtime.Name)
}

return newRuntime, nil
}

func (r *ResourceManager) GetComputeTemplate(ctx context.Context, name string) (*v1.ConfigMap, error) {
client := r.clientManager.KubernetesClient().ConfigMapClient(DefaultNamespace)
return getComputeTemplateByName(ctx, client, name)
}

func (r *ResourceManager) ListComputeTemplates(ctx context.Context) ([]*v1.ConfigMap, error) {
client := r.clientManager.KubernetesClient().ConfigMapClient(DefaultNamespace)
configMapList, err := client.List(ctx, metav1.ListOptions{LabelSelector: "ray.io/config-type=compute-template"})
if err != nil {
return nil, util.Wrap(err, "List compute runtimes failed")
}

var result []*v1.ConfigMap
length := len(configMapList.Items)
for i := 0; i < length; i++ {
result = append(result, &configMapList.Items[i])
}

return result, nil
}

func (r *ResourceManager) DeleteComputeTemplate(ctx context.Context, name string) error {
client := r.clientManager.KubernetesClient().ConfigMapClient(DefaultNamespace)

configMap, err := getComputeTemplateByName(ctx, client, name)
if err != nil {
return util.Wrap(err, "Get compute template failure")
}

if err := client.Delete(ctx, configMap.Name, metav1.DeleteOptions{}); err != nil {
return util.NewInternalServerError(err, "failed to delete compute template %v.", name)
}

return nil
}

// getClusterByName returns the Kubernetes RayCluster object by given name and client
func getClusterByName(ctx context.Context, client rayiov1alpha1.RayClusterInterface, name string) (*v1alpha1.RayCluster, error) {
cluster, err := client.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, util.Wrap(err, "Get Cluster failed")
}

return cluster, nil
}

// getComputeTemplateByName returns the Kubernetes configmap object by given name and client
func getComputeTemplateByName(ctx context.Context, client clientv1.ConfigMapInterface, name string) (*v1.ConfigMap, error) {
runtime, err := client.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, util.Wrap(err, "Get compute template failed")
}

return runtime, nil
}

// getClusterByName returns the Kubernetes RayCluster object by given name and client
func getClusterByNameFromLabel(ctx context.Context, client rayiov1alpha1.RayClusterInterface, name string) (*v1alpha1.RayCluster, error) {
labelSelector := metav1.LabelSelector{
MatchLabels: map[string]string{
util.RayClusterNameLabelKey: name,
},
}
clusters, err := client.List(ctx, metav1.ListOptions{
LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
})
if err != nil {
return nil, util.Wrap(err, "Get Cluster failed")
}

if len(clusters.Items) > 1 {
return nil, fmt.Errorf("find %d duplicates clusters", len(clusters.Items))
}

if len(clusters.Items) == 0 {
return nil, fmt.Errorf("can not find clusters with name %s", name)
}

return &clusters.Items[0], nil
}

func getComputeTemplateByLabel(ctx context.Context, name string, client clientv1.ConfigMapInterface) (*v1.ConfigMap, error) {
labelSelector := metav1.LabelSelector{
MatchLabels: map[string]string{
"ray.io/config-type": "compute-runtime",
"ray.io/compute-runtime": name,
},
}
runtimes, err := client.List(ctx, metav1.ListOptions{
LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
})
if err != nil {
return nil, util.Wrap(err, "Get compute runtime failed")
}

if len(runtimes.Items) > 1 {
return nil, fmt.Errorf("find %d duplicates compute runtimes", len(runtimes.Items))
}

if len(runtimes.Items) == 0 {
return nil, fmt.Errorf("can not find compue runtime with name %s", name)
}

return &runtimes.Items[0], nil
}
90 changes: 90 additions & 0 deletions apiserver/pkg/model/converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package model

import (
"strconv"

"github.com/golang/protobuf/ptypes/timestamp"
"github.com/ray-project/kuberay/apiserver/pkg/util"
api "github.com/ray-project/kuberay/proto/go_client"
"github.com/ray-project/kuberay/ray-operator/api/raycluster/v1alpha1"
v1 "k8s.io/api/core/v1"
)

func FromCrdToApiClusters(clusters []*v1alpha1.RayCluster) []*api.Cluster {
apiClusters := make([]*api.Cluster, 0)
for _, cluster := range clusters {
apiClusters = append(apiClusters, FromCrdToApiCluster(cluster))
}
return apiClusters
}

func FromCrdToApiCluster(cluster *v1alpha1.RayCluster) *api.Cluster {
pbCluster := &api.Cluster{
Name: cluster.Name,
Namespace: cluster.Namespace,
Version: cluster.Labels[util.RayClusterVersionLabelKey],
User: cluster.Labels[util.RayClusterUserLabelKey],
Environment: api.Cluster_Environment(api.Cluster_Environment_value[cluster.Labels[util.RayClusterEnvironmentLabelKey]]),
CreatedAt: &timestamp.Timestamp{Seconds: cluster.CreationTimestamp.Unix()},
}

// loop container and find the resource
pbCluster.ClusterSpec = &api.ClusterSpec{}
pbCluster.ClusterSpec.HeadGroupSpec = PopulateHeadNodeSpec(cluster.Spec.HeadGroupSpec)
pbCluster.ClusterSpec.WorkerGroupSepc = PopulateWorkerNodeSpec(cluster.Spec.WorkerGroupSpecs)

return pbCluster
}

func PopulateHeadNodeSpec(spec v1alpha1.HeadGroupSpec) *api.HeadGroupSpec {
headNodeSpec := &api.HeadGroupSpec{
RayStartParams: spec.RayStartParams,
ServiceType: string(spec.ServiceType),
Image: spec.Template.Annotations[util.RayClusterImageAnnotationKey],
ComputeTemplate: spec.Template.Annotations[util.RayClusterComputeTemplateAnnotationKey],
}

return headNodeSpec
}

func PopulateWorkerNodeSpec(specs []v1alpha1.WorkerGroupSpec) []*api.WorkerGroupSpec {
var workerNodeSpecs []*api.WorkerGroupSpec

for _, spec := range specs {
workerNodeSpec := &api.WorkerGroupSpec{
RayStartParams: spec.RayStartParams,
MaxReplicas: *spec.MinReplicas,
MinReplicas: *spec.MaxReplicas,
Replicas: *spec.Replicas,
GroupName: spec.GroupName,
Image: spec.Template.Annotations[util.RayClusterImageAnnotationKey],
ComputeTemplate: spec.Template.Annotations[util.RayClusterComputeTemplateAnnotationKey],
}
// Resources.
workerNodeSpecs = append(workerNodeSpecs, workerNodeSpec)
}

return workerNodeSpecs
}

func FromKubeToAPIComputeTemplate(configMap *v1.ConfigMap) *api.ComputeTemplate {
cpu, _ := strconv.ParseUint(configMap.Data["cpu"], 10, 32)
memory, _ := strconv.ParseUint(configMap.Data["memory"], 10, 32)
gpu, _ := strconv.ParseUint(configMap.Data["gpu"], 10, 32)

runtime := &api.ComputeTemplate{}
runtime.Name = configMap.Name
runtime.Cpu = uint32(cpu)
runtime.Memory = uint32(memory)
runtime.Gpu = uint32(gpu)
runtime.GpuAccelerator = configMap.Data["gpu_accelerator"]
return runtime
}

func FromKubeToAPIComputeTemplates(configMaps []*v1.ConfigMap) []*api.ComputeTemplate {
apiComputeTemplates := make([]*api.ComputeTemplate, 0)
for _, configMap := range configMaps {
apiComputeTemplates = append(apiComputeTemplates, FromKubeToAPIComputeTemplate(configMap))
}
return apiComputeTemplates
}
Loading

0 comments on commit cfb0562

Please sign in to comment.