[RayJob] [Doc] Add real-world Ray Job use case tutorial for KubeRay #1361

Merged
188 changes: 188 additions & 0 deletions docs/guidance/batch-inference-example.md
@@ -0,0 +1,188 @@
# RayJob Batch Inference Example

This page demonstrates how to use the RayJob custom resource to run a batch inference job on a Ray cluster.

We will use an image classification workload. The example is based on <https://docs.ray.io/en/latest/data/examples/huggingface_vit_batch_prediction.html>. Please see that page for a full explanation of the code.

## Prerequisites

You must have a running Kubernetes cluster with GPUs available and `kubectl` configured to use it. We provide a brief tutorial for setting up the necessary GPUs on Google Kubernetes Engine (GKE), but you can use any Kubernetes cluster with GPUs.

## Deploy KubeRay

Make sure your KubeRay operator version is at least v0.6.0.

The latest released KubeRay version is recommended.

For installation instructions, please follow [the documentation](../deploy/installation.md).

## Step 0: Create a Kubernetes cluster on GKE (Optional)

If you already have a Kubernetes cluster with GPUs, you can skip this step.

Run this command and all following commands on your local machine or on the [Google Cloud Shell](https://cloud.google.com/shell). If running from your local machine, you will need to install the [Google Cloud SDK](https://cloud.google.com/sdk/docs/install).

```bash
gcloud container clusters create batch-gpu-cluster \
    --num-nodes=1 --min-nodes 0 --max-nodes 1 --enable-autoscaling \
    --zone=us-west1-b --machine-type e2-standard-8
```

This command creates a Kubernetes cluster named `batch-gpu-cluster` with 1 node in the `us-west1-b` zone. In this example, we use the `e2-standard-8` machine type, which has 8 vCPUs and 32 GB RAM.

You can also create a cluster from the [Google Cloud Console](https://console.cloud.google.com/kubernetes/list).
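
As an optional sanity check, you can confirm that the cluster was created before moving on; the exact output depends on your project:

```sh
# List the GKE clusters in the current project; batch-gpu-cluster should appear with status RUNNING.
gcloud container clusters list
```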

Run the following command to create a GPU node pool for the Ray cluster.
(You can also create it from the Google Cloud Console; see the [GKE documentation](https://cloud.google.com/kubernetes-engine/docs/how-to/node-taints#create_a_node_pool_with_node_taints) for more details.)

```bash
gcloud container node-pools create gpu-node-pool \
  --accelerator type=nvidia-tesla-t4,count=4,gpu-driver-version=default \
  --zone us-west1-b \
  --cluster batch-gpu-cluster \
  --num-nodes 1 \
  --min-nodes 0 \
  --max-nodes 1 \
  --enable-autoscaling \
  --machine-type n1-standard-64
```

The `--accelerator` flag specifies the type and number of GPUs for each node in the node pool. In this example, we use the [NVIDIA T4](https://cloud.google.com/compute/docs/gpus#t4-gpus) GPU, matching the `nvidia-tesla-t4` accelerator type above. The machine type is `n1-standard-64`, which has [64 vCPUs and 240 GB RAM](https://cloud.google.com/compute/docs/general-purpose-machines#n1_machine_types). The `--min-nodes 0` and `--max-nodes 1` flags enable autoscaling for the node pool, and `--num-nodes 1` sets its initial size.

GKE automatically prevents CPU-only pods, such as the KubeRay operator, from being scheduled on this GPU node pool. Because GPUs are expensive, we want to reserve this node pool for Ray GPU workers only. To set this behavior up manually, you can use taints and tolerations; see the [Kubernetes documentation](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/).
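
For reference, GKE applies a `nvidia.com/gpu` taint to GPU nodes and adds the matching toleration for GPU-requesting pods on your behalf. A minimal sketch of that toleration, only relevant if you manage taints yourself on a non-GKE cluster:

```yaml
# Illustrative sketch: toleration matching a nvidia.com/gpu=present:NoSchedule taint.
# On GKE this is handled automatically for pods that request nvidia.com/gpu.
tolerations:
  - key: "nvidia.com/gpu"
    operator: "Exists"
    effect: "NoSchedule"
```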

Finally, run the following command to download credentials and configure the Kubernetes CLI to use them.

```sh
gcloud container clusters get-credentials batch-gpu-cluster --zone us-west1-b
```

For more details, see the [GKE documentation](https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl).
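
Optionally, verify that both node pools registered with the cluster; node names will differ in your environment:

```sh
# The -L flag adds the accelerator label as a column. Nodes from the GPU pool
# should show nvidia-tesla-t4; the CPU node from the default pool shows no value.
kubectl get nodes -L cloud.google.com/gke-accelerator
```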

## Step 1: Install the KubeRay Operator

Once `kubectl` is configured to connect to your cluster, you can install the KubeRay operator.

```sh
# Install both CRDs and the KubeRay operator v0.6.0.
helm repo add kuberay https://ray-project.github.io/kuberay-helm/
helm repo update
helm install kuberay-operator kuberay/kuberay-operator --version 0.6.0

# The operator pod should be scheduled on a CPU node. If it is not, something is wrong.
```
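
Before moving on, it is worth confirming that the operator deployment is ready (pod name suffixes will differ):

```sh
# The operator deployment is named after the Helm release, kuberay-operator.
kubectl get deployment kuberay-operator
# Check that the operator pod landed on a CPU node, not the GPU node pool.
kubectl get pods -o wide
```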

## Step 2: Submit the RayJob

Now we can submit the RayJob. Our RayJob spec is defined in [ray_v1alpha1_rayjob.batch-inference.yaml](https://github.com/ray-project/kuberay/blob/master/ray-operator/config/samples/ray_v1alpha1_rayjob.batch-inference.yaml).

Note that the `RayJob` spec contains a spec for the `RayCluster` that will be created for the job. For this tutorial, we use a single-node cluster with 4 GPUs. For production use cases, we recommend a multi-node cluster in which the head node has no GPUs, so that GPU workloads are scheduled on worker nodes and do not interfere with critical Ray processes on the head node (see the sketch below).
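
For example, such a multi-node layout could keep the head node CPU-only and put the GPUs in a worker group. The following is only a sketch of the relevant `workerGroupSpecs` fields with illustrative resource values; it is not part of the sample YAML used in this tutorial:

```yaml
# Illustrative sketch: a GPU worker group to pair with a CPU-only head node.
workerGroupSpecs:
  - groupName: gpu-group
    replicas: 1
    minReplicas: 0
    maxReplicas: 4
    rayStartParams: {}
    template:
      spec:
        containers:
          - name: ray-worker
            image: rayproject/ray-ml:2.6.3-gpu
            resources:
              limits:
                nvidia.com/gpu: "1"
              requests:
                cpu: "8"
                memory: "32Gi"
                nvidia.com/gpu: "1"
        nodeSelector:
          cloud.google.com/gke-accelerator: nvidia-tesla-t4
```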

Note the following fields in the `RayJob` spec, which specify the Ray image and the GPU resources for the Ray head node:
```yaml
spec:
  containers:
    - name: ray-head
      image: rayproject/ray-ml:2.6.3-gpu
      resources:
        limits:
          nvidia.com/gpu: "4"
        requests:
          cpu: "54"
          memory: "54Gi"
      volumeMounts:
        - mountPath: /home/ray/samples
          name: code-sample
  nodeSelector:
    cloud.google.com/gke-accelerator: nvidia-tesla-t4 # This is the GPU type we used in the GPU node pool.
```

To submit the job, run the following command:

```bash
kubectl apply -f ray_v1alpha1_rayjob.batch-inference.yaml
```

The RayCluster is still running after the job finishes because we did not set `shutdownAfterJobFinishes` in the `RayJob` spec. If you set `shutdownAfterJobFinishes` to `true`, the cluster is shut down after the job finishes.
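
For example, to have KubeRay tear the cluster down automatically, you could add something like the following to the `RayJob` spec (the TTL value is illustrative):

```yaml
spec:
  shutdownAfterJobFinishes: true
  # Optional: wait this many seconds after the job finishes before deleting the RayCluster.
  ttlSecondsAfterFinished: 60
```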

We can check the status with `kubectl describe rayjob rayjob-sample`.

Sample output:

```
[...]
Status:
Dashboard URL: rayjob-sample-raycluster-j6t8n-head-svc.default.svc.cluster.local:8265
End Time: 2023-08-22T22:48:35Z
Job Deployment Status: Running
Job Id: rayjob-sample-ft8lh
Job Status: SUCCEEDED
Message: Job finished successfully.
Observed Generation: 2
Ray Cluster Name: rayjob-sample-raycluster-j6t8n
Ray Cluster Status:
Endpoints:
Client: 10001
Dashboard: 8265
Gcs - Server: 6379
Metrics: 8080
Head:
Pod IP: 10.112.1.3
Service IP: 10.116.1.93
Last Update Time: 2023-08-22T22:47:44Z
Observed Generation: 1
State: ready
Start Time: 2023-08-22T22:48:02Z
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Created 36m rayjob-controller Created cluster rayjob-sample-raycluster-j6t8n
Normal Created 32m rayjob-controller Created k8s job rayjob-sample
```
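
The `Dashboard URL` in the status is a cluster-internal address. One way to reach the Ray dashboard from your workstation is to port-forward the head service; the service name comes from the status above and will differ per run:

```sh
# Forward the dashboard port of the RayCluster head service to localhost:8265.
kubectl port-forward svc/rayjob-sample-raycluster-j6t8n-head-svc 8265:8265
```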

To view the logs, first find the name of the pod running the job with `kubectl get pods`.

Sample output:

```text
NAME READY STATUS RESTARTS AGE
kuberay-operator-8b86754c-r4rc2 1/1 Running 0 25h
rayjob-sample-raycluster-j6t8n-head-kx2gz 1/1 Running 0 35m
rayjob-sample-w98c7 0/1 Completed 0 30m
```

Next, run

```sh
kubectl logs rayjob-sample-w98c7
```

to get the standard output of the `entrypoint` command for the `RayJob`. Sample output:

```text
[...]
Running: 62.0/64.0 CPU, 4.0/4.0 GPU, 955.57 MiB/12.83 GiB object_store_memory: 0%| | 0/200 [00:05<?, ?it/s]
Running: 61.0/64.0 CPU, 4.0/4.0 GPU, 999.41 MiB/12.83 GiB object_store_memory: 0%| | 0/200 [00:05<?, ?it/s]
Running: 61.0/64.0 CPU, 4.0/4.0 GPU, 999.41 MiB/12.83 GiB object_store_memory: 0%| | 1/200 [00:05<17:04, 5.15s/it]
Running: 61.0/64.0 CPU, 4.0/4.0 GPU, 1008.68 MiB/12.83 GiB object_store_memory: 0%| | 1/200 [00:05<17:04, 5.15s/it]
Running: 61.0/64.0 CPU, 4.0/4.0 GPU, 1008.68 MiB/12.83 GiB object_store_memory: 100%|██████████| 1/1 [00:05<00:00, 5.15s/it]
```

> **Review comment (Contributor Author):** This output is a little suspicious.

```text
2023-08-22 15:48:33,905 WARNING actor_pool_map_operator.py:267 -- To ensure full parallelization across an actor pool of size 4, the specified batch size should be at most 5. Your configured batch size for this operator was 16.
<PIL.Image.Image image mode=RGB size=500x375 at 0x7B37546CF7F0>
Label: tench, Tinca tinca
<PIL.Image.Image image mode=RGB size=500x375 at 0x7B37546AE430>
Label: tench, Tinca tinca
<PIL.Image.Image image mode=RGB size=500x375 at 0x7B37546CF430>
Label: tench, Tinca tinca
<PIL.Image.Image image mode=RGB size=500x375 at 0x7B37546AE430>
Label: tench, Tinca tinca
<PIL.Image.Image image mode=RGB size=500x375 at 0x7B37546CF7F0>
```

> **Review comment (Contributor Author):** This is also suspicious: why are there only two distinct memory locations across the five lines?

```text
Label: tench, Tinca tinca
2023-08-22 15:48:36,522 SUCC cli.py:33 -- -----------------------------------
2023-08-22 15:48:36,522 SUCC cli.py:34 -- Job 'rayjob-sample-ft8lh' succeeded
2023-08-22 15:48:36,522 SUCC cli.py:35 -- -----------------------------------
```
119 changes: 37 additions & 82 deletions ray-operator/config/samples/ray_v1alpha1_rayjob.batch-inference.yaml
@@ -8,16 +8,6 @@ spec:
# shutdownAfterJobFinishes: false
# ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
# ttlSecondsAfterFinished: 10
# Runtime env decoded to {
# {
# "pip": [
# "torch",
# "torchvision",
# "Pillow",
# "transformers"
# ]
# }
runtimeEnv: ewogICJwaXAiOiBbCiAgICAidG9yY2giLAogICAgInRvcmNodmlzaW9uIiwKICAgICJQaWxsb3ciLAogICAgInRyYW5zZm9ybWVycyIKICBdCn0=
# Suspend specifies whether the RayJob controller should create a RayCluster instance.
# If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
# If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluster will be created.
@@ -47,14 +37,15 @@ spec:
name: client
resources:
> **Review comment (Member):** The resource config is pretty weird.
>
> **Reply (Contributor Author):** I will update it to make limits=requests.

limits:
cpu: 2
memory: 8Gi
nvidia.com/gpu: "4"
requests:
cpu: 2
memory: 8Gi
cpu: "54"
memory: "54Gi"
volumeMounts:
- mountPath: /home/ray/samples
name: code-sample
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-tesla-t4
volumes:
# You set volumes at the Pod level, then mount them into containers inside that Pod
- name: code-sample
@@ -65,39 +56,6 @@
items:
- key: sample_code.py
path: sample_code.py
workerGroupSpecs:
# the pod replicas in this group typed worker
- replicas: 1
minReplicas: 1
maxReplicas: 5
# logical group name, for this called small-group, also can be functional
groupName: small-group
# The `rayStartParams` are used to configure the `ray start` command.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
# See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
rayStartParams:
resources: '"{\"accelerator_type_cpu\": 48, \"accelerator_type_a10\": 2, \"accelerator_type_a100\": 2}"'
#pod template
template:
spec:
containers:
- name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc'
image: rayproject/ray-ml:2.6.3-gpu
lifecycle:
preStop:
exec:
command: [ "/bin/sh","-c","ray stop" ]
resources:
limits:
cpu: "48"
memory: "192G"
nvidia.com/gpu: 4
requests:
cpu: "36"
memory: "128G"
nvidia.com/gpu: 4
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-tesla-t4
# SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster.
# If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container.
# submitterPodTemplate:
@@ -129,51 +87,48 @@ data:
s3_uri, mode="RGB"
)
ds
# TODO(archit) need to install Pillow, pytorch or tf or flax (pip install torch torchvision torchaudio)
from typing import Dict
import numpy as np

from transformers import pipeline
from PIL import Image

# Pick the largest batch size that can fit on our GPUs
BATCH_SIZE = 1024
BATCH_SIZE = 16

# TODO(archit) basic step
class ImageClassifier:
def __init__(self):
# If doing CPU inference, set `device="cpu"` instead.
self.classifier = pipeline("image-classification", model="google/vit-base-patch16-224", device=0) # TODO:archit

# single_batch = ds.take_batch(10)
def __call__(self, batch: Dict[str, np.ndarray]):
# Convert the numpy array of images into a list of PIL images which is the format the HF pipeline expects.
outputs = self.classifier(
[Image.fromarray(image_array) for image_array in batch["image"]],
top_k=1,
batch_size=BATCH_SIZE)

# `outputs` is a list of length-one lists. For example:
# [[{'score': '...', 'label': '...'}], ..., [{'score': '...', 'label': '...'}]]
batch["score"] = [output[0]["score"] for output in outputs]
batch["label"] = [output[0]["label"] for output in outputs]
return batch

# from PIL import Image

# img = Image.fromarray(single_batch["image"][0])
# # display image
# img.show()
# from transformers import pipeline
# from PIL import Image

# # If doing CPU inference, set device="cpu" instead.
# classifier = pipeline("image-classification", model="google/vit-base-patch16-224", device="cuda:0")
# outputs = classifier([Image.fromarray(image_array) for image_array in single_batch["image"]], top_k=1, batch_size=10)
# del classifier # Delete the classifier to free up GPU memory.
# print(outputs)

@ray.remote(num_gpus=1)
def do_single_batch():
single_batch = ds.take_batch(10)
predictions = ds.map_batches(
ImageClassifier,
compute=ray.data.ActorPoolStrategy(size=4), # Change this number based on the number of GPUs in your cluster.
num_gpus=1, # Specify 1 GPU per model replica.
batch_size=BATCH_SIZE # Use the largest batch size that can fit on our GPUs
)

from PIL import Image
prediction_batch = predictions.take_batch(5)

img = Image.fromarray(single_batch["image"][0])
# display image
from PIL import Image
print("A few sample predictions: ")
for image, prediction in zip(prediction_batch["image"], prediction_batch["label"]):
img = Image.fromarray(image)
# Display the image
img.show()
from transformers import pipeline
from PIL import Image

# If doing CPU inference, set device="cpu" instead.
classifier = pipeline("image-classification", model="google/vit-base-patch16-224", device="cuda:0")
outputs = classifier([Image.fromarray(image_array) for image_array in single_batch["image"]], top_k=1, batch_size=10)
del classifier # Delete the classifier to free up GPU memory.
print(outputs)
return outputs
print("Label: ", prediction)

print(ray.get(do_single_batch.remote()))
# Write to local disk, or external storage, e.g. S3
# ds.write_parquet("s3://my_bucket/my_folder")
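
For readers who do not want to parse the diff above, here is a condensed sketch of what the final `sample_code.py` roughly does. The dataset URI is a placeholder, and the packages (torch, transformers, Pillow) are assumed to be provided by the `runtimeEnv` and the `rayproject/ray-ml:2.6.3-gpu` image:

```python
from typing import Dict

import numpy as np
import ray
from PIL import Image
from transformers import pipeline

# Placeholder URI: the actual dataset location is defined earlier in sample_code.py.
S3_URI = "s3://<your-bucket>/<imagenette-path>/"

# Read the images into a Ray Dataset as RGB numpy arrays.
ds = ray.data.read_images(S3_URI, mode="RGB")

BATCH_SIZE = 16  # matches the batch size used in the sample

class ImageClassifier:
    def __init__(self):
        # device=0 selects the first GPU; use device="cpu" for CPU-only inference.
        self.classifier = pipeline(
            "image-classification", model="google/vit-base-patch16-224", device=0
        )

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        # The Hugging Face pipeline expects PIL images, not numpy arrays.
        outputs = self.classifier(
            [Image.fromarray(arr) for arr in batch["image"]],
            top_k=1,
            batch_size=BATCH_SIZE,
        )
        batch["score"] = [o[0]["score"] for o in outputs]
        batch["label"] = [o[0]["label"] for o in outputs]
        return batch

predictions = ds.map_batches(
    ImageClassifier,
    compute=ray.data.ActorPoolStrategy(size=4),  # one model replica per GPU
    num_gpus=1,                                  # each replica reserves one GPU
    batch_size=BATCH_SIZE,
)

# Pull a few predictions back to the driver and print the labels.
prediction_batch = predictions.take_batch(5)
for label in prediction_batch["label"]:
    print("Label:", label)
```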