created HA example for API server #1461

Merged: 6 commits, Oct 11, 2023 (changes shown are from 2 commits)
215 changes: 215 additions & 0 deletions apiserver/HACluster.md
@@ -0,0 +1,215 @@
# Creating an HA cluster with the API Server

One of the issues for long-running Ray applications, for example Ray Serve, is that the Ray head node is a single
point of failure: if the head node dies, the complete cluster has to be restarted. Fortunately,
KubeRay provides an option to create a
[fault-tolerant Ray cluster](https://docs.ray.io/en/master/cluster/kubernetes/user-guides/kuberay-gcs-ft.html).
A similar type of highly available Ray cluster can also be created using the API server. The foundation of this
approach is ensuring high availability of the Global Control Service (GCS) data. The GCS manages cluster-level
metadata. By default, the GCS lacks fault tolerance because it stores all data in memory, and a failure can cause the
entire Ray cluster to fail. To make the GCS fault tolerant, you need a highly available Redis. This way,
in the event of a GCS restart, the GCS retrieves all the data from the Redis instance and resumes its regular
functioning.

## Creating an external Redis cluster

Comprehensive documentation on creating a Redis cluster on Kubernetes can be found
[here](https://www.dragonflydb.io/guides/redis-kubernetes). For this example we will use a rather simple
[YAML file](test/cluster/redis/redis.yaml). To create Redis run:

```sh
kubectl create ns redis
kubectl apply -f <your location>/kuberay/apiserver/test/cluster/redis/redis.yaml -n redis
```

Note that we are deploying Redis to the `redis` namespace, which we create in the first command.

Alternatively, if you run in the cloud, you can use a managed version of HA Redis, which will not require
you to stand up, run, manage, and monitor your own instance of Redis.
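
If you want to confirm that Redis is up before moving on, you can check the `redis` namespace. The connectivity
probe below is only a sketch; it assumes the sample YAML exposes a service named `redis` (matching the
`redis.redis.svc.cluster.local` address used later) and that the Redis password is available in `$REDIS_PASSWORD`:

```sh
# Verify that the Redis pod and service exist
kubectl get pods,svc -n redis

# Optional connectivity check from inside the cluster; expects PONG on success
kubectl run redis-ping -n redis --rm -it --restart=Never --image=redis:7 -- \
  redis-cli -h redis.redis.svc.cluster.local -a "$REDIS_PASSWORD" ping
```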

## Creating Redis password secret

Before creating your cluster, you also need to create a [secret](test/cluster/redis/redis_passwrd.yaml) in the
namespace where you want to create your Ray cluster (remember that a secret is visible only within a given
namespace). To create a secret for using external Redis, run:

```sh
kubectl apply -f <your location>/kuberay/apiserver/test/cluster/redis/redis_passwrd.yaml
```
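
The secret's name and key have to match what the cluster request below references (`redis-password-secret` and
`password`). If you prefer to create it imperatively, a roughly equivalent (hypothetical) command would be:

```sh
# Sketch only; the shipped redis_passwrd.yaml is authoritative. Replace the placeholder with your Redis password.
kubectl create secret generic redis-password-secret --from-literal=password=<your Redis password>
```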

## Ray Code for testing

For both Ray jobs and Ray Serve we recommend packaging the code in the image. For simple testing here
we will create a [config map](test/cluster/code_configmap.yaml) containing the sample code that we will use for
testing. To deploy it, run the following:

```sh
kubectl apply -f <your location>/kuberay/apiserver/test/cluster/code_configmap.yaml
```
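
To double-check that the code is in place, you can inspect the config map; its name, `ray-example`, is the one
referenced as the volume `source` in the cluster request below:

```sh
# Show the keys (detached_actor.py, increment_counter.py) stored in the config map
kubectl describe configmap ray-example
```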

## API server request

To create a cluster we can use the following `curl` command:

```sh
curl -X POST 'localhost:8888/apis/v1alpha2/namespaces/default/clusters' \
--header 'Content-Type: application/json' \
--data '{
  "name": "ha-cluster",
  "namespace": "default",
  "user": "boris",
  "version": "2.7.0",
  "environment": "DEV",
  "annotations": {
    "ray.io/ft-enabled": "true"
  },
  "clusterSpec": {
    "headGroupSpec": {
      "computeTemplate": "default-template",
      "image": "rayproject/ray:2.7.0-py310",
      "serviceType": "NodePort",
      "rayStartParams": {
        "dashboard-host": "0.0.0.0",
        "metrics-export-port": "8080",
        "num-cpus": "0",
        "redis-password": "$REDIS_PASSWORD"
      },
      "environment": {
        "values": {
          "RAY_REDIS_ADDRESS": "redis.redis.svc.cluster.local:6379"
        },
        "valuesFrom": {
          "REDIS_PASSWORD": {
            "source": 1,
            "name": "redis-password-secret",
            "key": "password"
          }
        }
      },
      "volumes": [
        {
          "name": "code-sample",
          "mountPath": "/home/ray/samples",
          "volumeType": "CONFIGMAP",
          "source": "ray-example",
          "items": {
            "detached_actor.py": "detached_actor.py",
            "increment_counter.py": "increment_counter.py"
          }
        }
      ]
    },
    "workerGroupSpec": [
      {
        "groupName": "small-wg",
        "computeTemplate": "default-template",
        "image": "rayproject/ray:2.7.0-py310",
        "replicas": 1,
        "minReplicas": 0,
        "maxReplicas": 5,
        "rayStartParams": {
          "node-ip-address": "$MY_POD_IP",
          "metrics-export-port": "8080"
        },
        "environment": {
          "values": {
            "RAY_gcs_rpc_server_reconnect_timeout_s": "300"
          }
        },
        "volumes": [
          {
            "name": "code-sample",
            "mountPath": "/home/ray/samples",
            "volumeType": "CONFIGMAP",
            "source": "ray-example",
            "items": {
              "detached_actor.py": "detached_actor.py",
              "increment_counter.py": "increment_counter.py"
            }
          }
        ]
      }
    ]
  }
}'
```
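
If the request succeeds, the API server returns the created cluster definition. As a quick sanity check you can also
list the clusters in the namespace; the GET variant of the same endpoint should work for this (sketch):

```sh
# List clusters created through the API server in the default namespace
curl 'localhost:8888/apis/v1alpha2/namespaces/default/clusters'
```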

Let's discuss the important pieces here:

You need to specify an annotation that tells Ray that this is a cluster with GCS fault tolerance:

```sh
ray.io/ft-enabled: "true"
```
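
Once the cluster has been created, you can confirm that the annotation was propagated to the generated `RayCluster`
resource (assuming the cluster name `ha-cluster` in the `default` namespace, as in the request above):

```sh
# The annotation should appear on the RayCluster object created by the API server
kubectl get raycluster ha-cluster -n default -o yaml | grep "ray.io/ft-enabled"
```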

For the `headGroupSpec` you need the following. In the `rayStartParams` you need to add the Redis password
information:

```sh
"redis-password:: "$REDIS_PASSWORD"
"num-cpu": "0"
```

The value of `REDIS_PASSWORD` comes from an environment variable (see below). Setting `num-cpus` to `0` also
ensures that no application code runs on the head node.

The following environment variables have to be added here:

```sh
"environment": {
"values": {
"RAY_REDIS_ADDRESS": "redis.redis.svc.cluster.local:6379"
},
"valuesFrom": {
"REDIS_PASSWORD": {
"source": 1,
"name": "redis-password-secret",
"key": "password"
}
}
},
```
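
Here `source: 1` tells the API server to take the value from a secret, so `REDIS_PASSWORD` is rendered as a
`secretKeyRef` pointing at `redis-password-secret`/`password` on the head container. Once the cluster is up, you can
inspect the rendered environment on the head pod (the pod name will differ in your deployment):

```sh
# Show how REDIS_PASSWORD was rendered (expect a valueFrom/secretKeyRef block)
kubectl get pod <head node pod name> -o yaml | grep -B 1 -A 4 "REDIS_PASSWORD"
```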

For the `workerGroupSpecs` you might want to increase `gcs_rpc_server_reconnect_timeout_s` by specifying the
following environment variable:

```sh
"environment": {
"values": {
"RAY_gcs_rpc_server_reconnect_timeout_s": "300"
}
},
```

This environment variable allows you to increase the GCS heartbeat timeout, which is 60 seconds by default. The
reason for increasing it is that a restart of the head node can take some time, and we want to make sure that the
worker nodes will not be killed during this period.

## Testing the resulting cluster

Once the cluster is created, we can validate that it is working correctly. To do this, note the name of the head
node pod and create a detached actor using the following command:

```sh
kubectl exec -it <head node pod name> -- python3 /home/ray/samples/detached_actor.py
```

Once this is done, open the Ray dashboard (using port-forward, as sketched below). In the cluster tab you should see
2 nodes, and in the actors pane you should see the created actor.
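
A minimal way to reach the dashboard, assuming the default dashboard port `8265`:

```sh
# Forward the dashboard port from the head pod, then open http://localhost:8265 in a browser
kubectl port-forward <head node pod name> 8265:8265
```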

Now you can delete the head node pod:

```sh
kubectl delete pods <head node pod name>
```

The operator will recreate it. Make sure that only the head node is recreated (note that it now has a different
name), while the worker node stays as is. Now you can go to the dashboard and verify that in the cluster tab you
still see 2 nodes and in the actors pane you still see the created actor.
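
One simple way to observe the recreation is to watch the pods while the head pod is being replaced:

```sh
# Watch pods being terminated and recreated; stop with Ctrl-C
kubectl get pods -w
```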

As an additional test, run the following command:

```sh
kubectl exec -it <head node pod name> -- python3 /home/ray/samples/increment_counter.py
```

and make sure that it executes correctly. Note that the name of the head node pod is now different.
78 changes: 63 additions & 15 deletions apiserver/pkg/model/converter.go
@@ -53,7 +53,6 @@ func getHeadNodeEnv() []string {
 		"RAY_PORT",
 		"RAY_ADDRESS",
 		"RAY_USAGE_STATS_KUBERAY_IN_USE",
-		"REDIS_PASSWORD",
 	}
 }

@@ -173,13 +172,7 @@ func PopulateHeadNodeSpec(spec v1alpha1.HeadGroupSpec) *api.HeadGroupSpec {
 
 	// Here we update environment only for a container named 'ray-head'
 	if container, _, ok := util.GetContainerByName(spec.Template.Spec.Containers, "ray-head"); ok && len(container.Env) > 0 {
-		env := make(map[string]string)
-		for _, kv := range container.Env {
-			if !contains(getHeadNodeEnv(), kv.Name) {
-				env[kv.Name] = kv.Value
-			}
-		}
-		headNodeSpec.Environment = env
+		headNodeSpec.Environment = convert_env_variables(container.Env, true)
 	}
 
 	if len(spec.Template.Spec.ServiceAccountName) > 1 {
@@ -224,13 +217,7 @@ func PopulateWorkerNodeSpec(specs []v1alpha1.WorkerGroupSpec) []*api.WorkerGroup
 
 	// Here we update environment only for a container named 'ray-worker'
 	if container, _, ok := util.GetContainerByName(spec.Template.Spec.Containers, "ray-worker"); ok && len(container.Env) > 0 {
-		env := make(map[string]string)
-		for _, kv := range container.Env {
-			if !contains(getWorkNodeEnv(), kv.Name) {
-				env[kv.Name] = kv.Value
-			}
-		}
-		workerNodeSpec.Environment = env
+		workerNodeSpec.Environment = convert_env_variables(container.Env, false)
 	}
 
 	if len(spec.Template.Spec.ServiceAccountName) > 1 {
@@ -247,6 +234,67 @@ func PopulateWorkerNodeSpec(specs []v1alpha1.WorkerGroupSpec) []*api.WorkerGroup
 	return workerNodeSpecs
 }
 
+func convert_env_variables(cenv []v1.EnvVar, header bool) *api.EnvironmentVariables {
+	env := api.EnvironmentVariables{
+		Values:     make(map[string]string),
+		ValuesFrom: make(map[string]*api.EnvValueFrom),
+	}
+	for _, kv := range cenv {
+		if header {
+			if contains(getHeadNodeEnv(), kv.Name) {
+				continue
+			}
+		} else {
+			if contains(getWorkNodeEnv(), kv.Name) {
+				// Skip reserved names
+				continue
+			}
+		}
+		if kv.ValueFrom != nil {
+			// This is a valueFrom reference
+			if kv.ValueFrom.ConfigMapKeyRef != nil {
+				// This is a config map reference
+				env.ValuesFrom[kv.Name] = &api.EnvValueFrom{
+					Source: api.EnvValueFrom_CONFIGMAP,
+					Name:   kv.ValueFrom.ConfigMapKeyRef.Name,
+					Key:    kv.ValueFrom.ConfigMapKeyRef.Key,
+				}
+				continue
+			}
+			if kv.ValueFrom.SecretKeyRef != nil {
+				// This is a secret reference
+				env.ValuesFrom[kv.Name] = &api.EnvValueFrom{
+					Source: api.EnvValueFrom_SECRET,
+					Name:   kv.ValueFrom.SecretKeyRef.Name,
+					Key:    kv.ValueFrom.SecretKeyRef.Key,
+				}
+				continue
+			}
+			if kv.ValueFrom.ResourceFieldRef != nil {
+				// This is a resource field reference
+				env.ValuesFrom[kv.Name] = &api.EnvValueFrom{
+					Source: api.EnvValueFrom_RESOURCEFIELD,
+					Name:   kv.ValueFrom.ResourceFieldRef.ContainerName,
+					Key:    kv.ValueFrom.ResourceFieldRef.Resource,
+				}
+				continue
+			}
+			if kv.ValueFrom.FieldRef != nil {
+				// This is a field reference
+				env.ValuesFrom[kv.Name] = &api.EnvValueFrom{
+					Source: api.EnvValueFrom_FIELD,
+					Key:    kv.ValueFrom.FieldRef.FieldPath,
+				}
+				continue
+			}
+		} else {
+			// This is a plain value
+			env.Values[kv.Name] = kv.Value
+		}
+	}
+	return &env
+}
+
 func FromKubeToAPIComputeTemplate(configMap *v1.ConfigMap) *api.ComputeTemplate {
 	cpu, _ := strconv.ParseUint(configMap.Data["cpu"], 10, 32)
 	memory, _ := strconv.ParseUint(configMap.Data["memory"], 10, 32)