Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Fake TPU e2e Autoscaling Test Cases #2279

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ray-operator/test/e2eautoscaler/create_detached_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
parser.add_argument('name')
parser.add_argument('--num-cpus', type=float, default=1)
parser.add_argument('--num-gpus', type=float, default=0)
parser.add_argument('--custom-resource-name', type=str, default="CustomResource")
parser.add_argument('--num-custom-resources', type=float, default=0)
args = parser.parse_args()

@ray.remote(num_cpus=args.num_cpus, num_gpus=args.num_gpus, resources={"CustomResource": args.num_custom_resources})
@ray.remote(num_cpus=args.num_cpus, num_gpus=args.num_gpus, resources={args.custom_resource_name: args.num_custom_resources})
class Actor:
pass

Expand Down
136 changes: 136 additions & 0 deletions ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,142 @@ func TestRayClusterAutoscalerWithFakeGPU(t *testing.T) {
})
}

func TestRayClusterAutoscalerWithFakeSingleHostTPU(t *testing.T) {
test := With(t)

// Create a namespace
namespace := test.NewTestNamespace()
test.StreamKubeRayOperatorLogs()

// Scripts for creating and terminating detached actors to trigger autoscaling
scriptsAC := newConfigMap(namespace.Name, "scripts-tpu", files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)

test.T().Run("Create a RayCluster with autoscaling enabled", func(_ *testing.T) {
rayClusterSpecAC := rayv1ac.RayClusterSpec().
WithEnableInTreeAutoscaling(true).
WithRayVersion(GetRayVersion()).
WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
WithRayStartParams(map[string]string{"num-cpus": "0"}).
WithTemplate(headPodTemplateApplyConfiguration())).
WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
WithReplicas(0).
WithMinReplicas(0).
WithMaxReplicas(3).
WithNumOfHosts(1).
WithGroupName("tpu-group").
WithRayStartParams(map[string]string{"num-cpus": "1", "resources": `"{\"TPU\": 4}"`}).
WithTemplate(workerPodTemplateApplyConfiguration()))
rayClusterAC := rayv1ac.RayCluster("ray-cluster", namespace.Name).
WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))

rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)

// Wait for RayCluster to become ready and verify the number of available worker replicas.
test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
rayCluster = GetRayCluster(test, rayCluster.Namespace, rayCluster.Name)
test.Expect(rayCluster.Status.DesiredWorkerReplicas).To(Equal(int32(0)))

headPod := GetHeadPod(test, rayCluster)
test.T().Logf("Found head pod %s/%s", headPod.Namespace, headPod.Name)

// Create a detached tpu actor, and 1 worker in the multi-host "tpu-group" should be created.
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_detached_actor.py", "tpu_actor", "--custom-resource-name=\"TPU\"", "--num-custom-resources=4"})
test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(1))))

// Each TPU multi-host replica should have 1 worker, so we check for 1 pod in 'tpu-group'.
test.Expect(GetGroupPods(test, rayCluster, "tpu-group")).To(HaveLen(1))

// Terminate the TPU detached actor and the worker group replica should be deleted.
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/terminate_detached_actor.py", "tpu_actor"})
test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0))))
})
}

func TestRayClusterAutoscalerWithFakeMultiHostTPU(t *testing.T) {
test := With(t)

// Create a namespace
namespace := test.NewTestNamespace()
test.StreamKubeRayOperatorLogs()

// Scripts for creating and terminating detached actors to trigger autoscaling
scriptsAC := newConfigMap(namespace.Name, "scripts-tpu", files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)

test.T().Run("Create a RayCluster with autoscaling enabled", func(_ *testing.T) {
rayClusterSpecAC := rayv1ac.RayClusterSpec().
WithEnableInTreeAutoscaling(true).
WithRayVersion(GetRayVersion()).
WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
WithRayStartParams(map[string]string{"num-cpus": "0"}).
WithTemplate(headPodTemplateApplyConfiguration())).
WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
WithReplicas(0).
WithMinReplicas(0).
WithMaxReplicas(3).
WithNumOfHosts(4).
WithGroupName("tpu-group").
WithRayStartParams(map[string]string{"num-cpus": "1", "resources": `"{\"TPU\": 4}"`}).
WithTemplate(workerPodTemplateApplyConfiguration()))
rayClusterAC := rayv1ac.RayCluster("ray-cluster", namespace.Name).
WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))

rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)

// Wait for RayCluster to become ready and verify the number of available worker replicas.
test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
rayCluster = GetRayCluster(test, rayCluster.Namespace, rayCluster.Name)
test.Expect(rayCluster.Status.DesiredWorkerReplicas).To(Equal(int32(0)))

headPod := GetHeadPod(test, rayCluster)
test.T().Logf("Found head pod %s/%s", headPod.Namespace, headPod.Name)

// Create a detached tpu actor, and 4 workers in the multi-host "tpu-group" should be created.
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_detached_actor.py", "tpu_actor_1", "--custom-resource-name=\"TPU\"", "--num-custom-resources=4"})
test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(1))))

// Each TPU multi-host replica should have 4 workers, so we check for 4 pods in 'tpu-group'.
test.Expect(GetGroupPods(test, rayCluster, "tpu-group")).To(HaveLen(4))

// Each TPU multi-host worker should have a task or actor scheduled on it, therefore we create 3 more detached actors
// to run on each node in the multi-host TPU worker group.
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_detached_actor.py", "tpu_actor_2", "--custom-resource-name=\"TPU\"", "--num-custom-resources=4"})
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_detached_actor.py", "tpu_actor_3", "--custom-resource-name=\"TPU\"", "--num-custom-resources=4"})
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_detached_actor.py", "tpu_actor_4", "--custom-resource-name=\"TPU\"", "--num-custom-resources=4"})

// Each new TPU detached actor should get scheduled to an existing scaled-up worker, so we check that there are still 4 pods in 'tpu-group'.
test.Expect(GetGroupPods(test, rayCluster, "tpu-group")).To(HaveLen(4))
ryanaoleary marked this conversation as resolved.
Show resolved Hide resolved

// Terminating one TPU detached actor will result in the Ray node becoming idle, causing Ray to scale down the entire multi-host
// worker group. A new multi-host worker group will then be scaled back up since the remaining detached actors are running.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behavior seems a bit unexpected to me. What's the reason we expect a scale down and a scale up again in this scenario?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might just be mis-understanding this comment. Should there be an assertion for this part?

A new multi-host worker group will then be scaled back up since the remaining detached actors are running.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Detached actors keep running when the Ray node they're scheduled on is scaled down, so the autoscaler sees the request for TPUs and scales back up a multi-host worker group to meet the unmet demand. In a regular scenario (i..e non-detached actors), the actors would be terminated along with their respective nodes when the replica scales down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add an assertion that checks that the pod list length becomes 0 before becoming 4 again

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, I missed the behavior specific to detached actors.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add an assertion that checks that the pod list length becomes 0 before becoming 4 again

sgtm!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up removing this section in 0c6bb58, because getting the node to become idle requires setting the timeout to 5+ minutes which I'd imagine would slow down the presubmit too much. The behavior to scale down a multi-host replica is still tested by deleting the detached actors.

ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/terminate_detached_actor.py", "tpu_actor_1"})
test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0))))

// Terminate the remaining 3 TPU detached actors, and the worker group should be deleted.
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/terminate_detached_actor.py", "tpu_actor_2"})
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/terminate_detached_actor.py", "tpu_actor_3"})
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/terminate_detached_actor.py", "tpu_actor_4"})
test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0))))
test.Expect(GetGroupPods(test, rayCluster, "tpu-group")).To(HaveLen(0))
})
}

func TestRayClusterAutoscalerWithCustomResource(t *testing.T) {
test := With(t)

Expand Down
Loading