From 3aff0be263887a1dfe72a82bce31ab1116ccb8a2 Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 27 Jun 2024 22:27:54 +0800 Subject: [PATCH] [Test][Autoscaler] Run Autoscaler e2e tests on buildkite and share the support functions Signed-off-by: Rueian --- .../test/e2e/rayjob_cluster_selector_test.go | 4 +- .../test/e2e/rayjob_lightweight_test.go | 10 +- ray-operator/test/e2e/rayjob_suspend_test.go | 6 +- ray-operator/test/e2e/rayjob_test.go | 14 +- ray-operator/test/e2e/rayservice_ha_test.go | 6 +- ray-operator/test/e2e/support.go | 53 ++++---- .../raycluster_autoscaler_test.go | 26 ++-- ray-operator/test/e2eautoscaler/support.go | 124 ------------------ 8 files changed, 59 insertions(+), 184 deletions(-) diff --git a/ray-operator/test/e2e/rayjob_cluster_selector_test.go b/ray-operator/test/e2e/rayjob_cluster_selector_test.go index 48c84016e47..d03fedbd486 100644 --- a/ray-operator/test/e2e/rayjob_cluster_selector_test.go +++ b/ray-operator/test/e2e/rayjob_cluster_selector_test.go @@ -19,14 +19,14 @@ func TestRayJobWithClusterSelector(t *testing.T) { test.StreamKubeRayOperatorLogs() // Job scripts - jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py")) + jobsAC := NewConfigMap(namespace.Name, "jobs", Files(test, _files, "counter.py", "fail.py")) jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name) // RayCluster rayClusterAC := rayv1ac.RayCluster("raycluster", namespace.Name). - WithSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))) + WithSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))) rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) diff --git a/ray-operator/test/e2e/rayjob_lightweight_test.go b/ray-operator/test/e2e/rayjob_lightweight_test.go index 5f8ee86a8f7..d218106c71b 100644 --- a/ray-operator/test/e2e/rayjob_lightweight_test.go +++ b/ray-operator/test/e2e/rayjob_lightweight_test.go @@ -22,7 +22,7 @@ func TestRayJobLightWeightMode(t *testing.T) { test.StreamKubeRayOperatorLogs() // Job scripts - jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py", "stop.py")) + jobsAC := NewConfigMap(namespace.Name, "jobs", Files(test, _files, "counter.py", "fail.py", "stop.py")) jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name) @@ -49,8 +49,8 @@ env_vars: "num-cpus": "4", "resources": `'{"R1": 4}'`, }). - WithTemplate(podTemplateSpecApplyConfiguration(headPodTemplateApplyConfiguration(), - mountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](jobs, "/home/ray/jobs")))))) + WithTemplate(podTemplateSpecApplyConfiguration(HeadPodTemplateApplyConfiguration(), + MountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](jobs, "/home/ray/jobs")))))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) @@ -83,7 +83,7 @@ env_vars: WithSubmissionMode(rayv1.HTTPMode). WithEntrypoint("python /home/ray/jobs/fail.py"). WithShutdownAfterJobFinishes(false). - WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) @@ -114,7 +114,7 @@ env_vars: WithSpec(rayv1ac.RayJobSpec(). WithSubmissionMode(rayv1.HTTPMode). WithEntrypoint("python /home/ray/jobs/stop.py"). - WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) diff --git a/ray-operator/test/e2e/rayjob_suspend_test.go b/ray-operator/test/e2e/rayjob_suspend_test.go index 8e6a8f8d2aa..33f1ce9e4a4 100644 --- a/ray-operator/test/e2e/rayjob_suspend_test.go +++ b/ray-operator/test/e2e/rayjob_suspend_test.go @@ -22,7 +22,7 @@ func TestRayJobSuspend(t *testing.T) { test.StreamKubeRayOperatorLogs() // Job scripts - jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "long_running.py", "counter.py")) + jobsAC := NewConfigMap(namespace.Name, "jobs", Files(test, _files, "long_running.py", "counter.py")) jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name) @@ -31,7 +31,7 @@ func TestRayJobSuspend(t *testing.T) { // RayJob rayJobAC := rayv1ac.RayJob("long-running", namespace.Name). WithSpec(rayv1ac.RayJobSpec(). - WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). WithEntrypoint("python /home/ray/jobs/long_running.py"). WithShutdownAfterJobFinishes(true). WithTTLSecondsAfterFinished(600). @@ -91,7 +91,7 @@ env_vars: `). WithShutdownAfterJobFinishes(true). WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()). - WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) diff --git a/ray-operator/test/e2e/rayjob_test.go b/ray-operator/test/e2e/rayjob_test.go index 97f7d774cfc..4680ddf6f10 100644 --- a/ray-operator/test/e2e/rayjob_test.go +++ b/ray-operator/test/e2e/rayjob_test.go @@ -21,7 +21,7 @@ func TestRayJob(t *testing.T) { test.StreamKubeRayOperatorLogs() // Job scripts - jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py", "stop.py", "long_running.py")) + jobsAC := NewConfigMap(namespace.Name, "jobs", Files(test, _files, "counter.py", "fail.py", "stop.py", "long_running.py")) jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name) @@ -30,7 +30,7 @@ func TestRayJob(t *testing.T) { // RayJob rayJobAC := rayv1ac.RayJob("counter", namespace.Name). WithSpec(rayv1ac.RayJobSpec(). - WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). WithEntrypoint("python /home/ray/jobs/counter.py"). WithRuntimeEnvYAML(` env_vars: @@ -87,7 +87,7 @@ env_vars: // RayJob rayJobAC := rayv1ac.RayJob("fail", namespace.Name). WithSpec(rayv1ac.RayJobSpec(). - WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). WithEntrypoint("python /home/ray/jobs/fail.py"). WithShutdownAfterJobFinishes(false). WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration())) @@ -132,7 +132,7 @@ env_vars: // RayJob rayJobAC := rayv1ac.RayJob("fail-k8s-job", namespace.Name). WithSpec(rayv1ac.RayJobSpec(). - WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). WithEntrypoint("The command will be overridden by the submitter Job"). WithShutdownAfterJobFinishes(true). WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration())) @@ -176,7 +176,7 @@ env_vars: WithSpec(rayv1ac.RayJobSpec(). WithEntrypoint("python /home/ray/jobs/stop.py"). WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()). - WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) @@ -205,7 +205,7 @@ env_vars: WithSpec(rayv1ac.RayJobSpec(). WithEntrypoint("python /home/ray/jobs/counter.py"). WithRuntimeEnvYAML(`invalid_yaml_string`). - WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) @@ -219,7 +219,7 @@ env_vars: test.T().Run("RayJob has passed ActiveDeadlineSeconds", func(_ *testing.T) { rayJobAC := rayv1ac.RayJob("long-running", namespace.Name). WithSpec(rayv1ac.RayJobSpec(). - WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). WithEntrypoint("python /home/ray/jobs/long_running.py"). WithShutdownAfterJobFinishes(true). WithTTLSecondsAfterFinished(600). diff --git a/ray-operator/test/e2e/rayservice_ha_test.go b/ray-operator/test/e2e/rayservice_ha_test.go index 03c184f2f56..73ce41eacb2 100644 --- a/ray-operator/test/e2e/rayservice_ha_test.go +++ b/ray-operator/test/e2e/rayservice_ha_test.go @@ -21,7 +21,7 @@ func TestRayService(t *testing.T) { test.StreamKubeRayOperatorLogs() // Scripts for creating and terminating detached actors to trigger autoscaling - scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "locustfile.py", "locust_runner.py")) + scriptsAC := NewConfigMap(namespace.Name, "scripts", Files(test, _files, "locustfile.py", "locust_runner.py")) scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name) @@ -46,7 +46,7 @@ applications: ray_actor_options: num_cpus: 1 `). - WithRayClusterSpec(newRayClusterSpec())) + WithRayClusterSpec(NewRayClusterSpec())) rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) @@ -61,7 +61,7 @@ applications: WithRayVersion(GetRayVersion()). WithHeadGroupSpec(rayv1ac.HeadGroupSpec(). WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}). - WithTemplate(apply(headPodTemplateApplyConfiguration(), mountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](scripts, "/home/ray/test_scripts"))))) + WithTemplate(Apply(HeadPodTemplateApplyConfiguration(), MountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](scripts, "/home/ray/test_scripts"))))) locustCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), locustClusterAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created Locust RayCluster %s/%s successfully", locustCluster.Namespace, locustCluster.Name) diff --git a/ray-operator/test/e2e/support.go b/ray-operator/test/e2e/support.go index 40249c03483..517567c0e32 100644 --- a/ray-operator/test/e2e/support.go +++ b/ray-operator/test/e2e/support.go @@ -4,7 +4,6 @@ import ( "embed" "github.com/onsi/gomega" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" corev1ac "k8s.io/client-go/applyconfigurations/core/v1" @@ -16,23 +15,23 @@ import ( //go:embed *.py var _files embed.FS -func ReadFile(t Test, fileName string) []byte { +func ReadFile(t Test, fs embed.FS, fileName string) []byte { t.T().Helper() - file, err := _files.ReadFile(fileName) + file, err := fs.ReadFile(fileName) t.Expect(err).NotTo(gomega.HaveOccurred()) return file } -type option[T any] func(t *T) *T +type ApplyOption[T any] func(t *T) *T -func apply[T any](t *T, options ...option[T]) *T { +func Apply[T any](t *T, options ...ApplyOption[T]) *T { for _, opt := range options { t = opt(t) } return t } -func options[T any](options ...option[T]) option[T] { +func Options[T any](options ...ApplyOption[T]) ApplyOption[T] { return func(t *T) *T { for _, opt := range options { t = opt(t) @@ -41,42 +40,42 @@ func options[T any](options ...option[T]) option[T] { } } -func newConfigMap(namespace, name string, options ...option[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration { +func NewConfigMap(namespace, name string, options ...ApplyOption[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration { cmAC := corev1ac.ConfigMap(name, namespace). WithBinaryData(map[string][]byte{}). WithImmutable(true) - return configMapWith(cmAC, options...) + return ConfigMapWith(cmAC, options...) } -func configMapWith(configMapAC *corev1ac.ConfigMapApplyConfiguration, options ...option[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration { - return apply(configMapAC, options...) +func ConfigMapWith(configMapAC *corev1ac.ConfigMapApplyConfiguration, options ...ApplyOption[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration { + return Apply(configMapAC, options...) } -func file(t Test, fileName string) option[corev1ac.ConfigMapApplyConfiguration] { +func File(t Test, fs embed.FS, fileName string) ApplyOption[corev1ac.ConfigMapApplyConfiguration] { return func(cmAC *corev1ac.ConfigMapApplyConfiguration) *corev1ac.ConfigMapApplyConfiguration { - cmAC.WithBinaryData(map[string][]byte{fileName: ReadFile(t, fileName)}) + cmAC.WithBinaryData(map[string][]byte{fileName: ReadFile(t, fs, fileName)}) return cmAC } } -func files(t Test, fileNames ...string) option[corev1ac.ConfigMapApplyConfiguration] { - var files []option[corev1ac.ConfigMapApplyConfiguration] +func Files(t Test, fs embed.FS, fileNames ...string) ApplyOption[corev1ac.ConfigMapApplyConfiguration] { + var files []ApplyOption[corev1ac.ConfigMapApplyConfiguration] for _, fileName := range fileNames { - files = append(files, file(t, fileName)) + files = append(files, File(t, fs, fileName)) } - return options(files...) + return Options(files...) } -func newRayClusterSpec(options ...option[rayv1ac.RayClusterSpecApplyConfiguration]) *rayv1ac.RayClusterSpecApplyConfiguration { - return rayClusterSpecWith(rayClusterSpec(), options...) +func NewRayClusterSpec(options ...ApplyOption[rayv1ac.RayClusterSpecApplyConfiguration]) *rayv1ac.RayClusterSpecApplyConfiguration { + return RayClusterSpecWith(rayClusterSpec(), options...) } -func rayClusterSpecWith(spec *rayv1ac.RayClusterSpecApplyConfiguration, options ...option[rayv1ac.RayClusterSpecApplyConfiguration]) *rayv1ac.RayClusterSpecApplyConfiguration { - return apply(spec, options...) +func RayClusterSpecWith(spec *rayv1ac.RayClusterSpecApplyConfiguration, options ...ApplyOption[rayv1ac.RayClusterSpecApplyConfiguration]) *rayv1ac.RayClusterSpecApplyConfiguration { + return Apply(spec, options...) } -func mountConfigMap[T rayv1ac.RayClusterSpecApplyConfiguration | corev1ac.PodTemplateSpecApplyConfiguration](configMap *corev1.ConfigMap, mountPath string) option[T] { +func MountConfigMap[T rayv1ac.RayClusterSpecApplyConfiguration | corev1ac.PodTemplateSpecApplyConfiguration](configMap *corev1.ConfigMap, mountPath string) ApplyOption[T] { return func(t *T) *T { switch obj := (interface{})(t).(type) { case *rayv1ac.RayClusterSpecApplyConfiguration: @@ -104,21 +103,21 @@ func rayClusterSpec() *rayv1ac.RayClusterSpecApplyConfiguration { WithRayVersion(GetRayVersion()). WithHeadGroupSpec(rayv1ac.HeadGroupSpec(). WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}). - WithTemplate(headPodTemplateApplyConfiguration())). + WithTemplate(HeadPodTemplateApplyConfiguration())). WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec(). WithReplicas(1). WithMinReplicas(1). WithMaxReplicas(1). WithGroupName("small-group"). WithRayStartParams(map[string]string{"num-cpus": "1"}). - WithTemplate(workerPodTemplateApplyConfiguration())) + WithTemplate(WorkerPodTemplateApplyConfiguration())) } -func podTemplateSpecApplyConfiguration(template *corev1ac.PodTemplateSpecApplyConfiguration, options ...option[corev1ac.PodTemplateSpecApplyConfiguration]) *corev1ac.PodTemplateSpecApplyConfiguration { - return apply(template, options...) +func podTemplateSpecApplyConfiguration(template *corev1ac.PodTemplateSpecApplyConfiguration, options ...ApplyOption[corev1ac.PodTemplateSpecApplyConfiguration]) *corev1ac.PodTemplateSpecApplyConfiguration { + return Apply(template, options...) } -func headPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration { +func HeadPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration { return corev1ac.PodTemplateSpec(). WithSpec(corev1ac.PodSpec(). WithContainers(corev1ac.Container(). @@ -141,7 +140,7 @@ func headPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfigura })))) } -func workerPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration { +func WorkerPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration { return corev1ac.PodTemplateSpec(). WithSpec(corev1ac.PodSpec(). WithContainers(corev1ac.Container(). diff --git a/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go b/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go index 92a250d1e9a..fd7a474d965 100644 --- a/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go +++ b/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go @@ -4,10 +4,10 @@ import ( "testing" . "github.com/onsi/gomega" - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" + "github.com/ray-project/kuberay/ray-operator/test/e2e" . "github.com/ray-project/kuberay/ray-operator/test/support" ) @@ -19,7 +19,7 @@ func TestRayClusterAutoscaler(t *testing.T) { test.StreamKubeRayOperatorLogs() // Scripts for creating and terminating detached actors to trigger autoscaling - scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py")) + scriptsAC := e2e.NewConfigMap(namespace.Name, "scripts", e2e.Files(test, _files, "create_detached_actor.py", "terminate_detached_actor.py")) scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name) @@ -30,16 +30,16 @@ func TestRayClusterAutoscaler(t *testing.T) { WithRayVersion(GetRayVersion()). WithHeadGroupSpec(rayv1ac.HeadGroupSpec(). WithRayStartParams(map[string]string{"num-cpus": "0"}). - WithTemplate(headPodTemplateApplyConfiguration())). + WithTemplate(e2e.HeadPodTemplateApplyConfiguration())). WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec(). WithReplicas(0). WithMinReplicas(0). WithMaxReplicas(3). WithGroupName("small-group"). WithRayStartParams(map[string]string{"num-cpus": "1"}). - WithTemplate(workerPodTemplateApplyConfiguration())) + WithTemplate(e2e.WorkerPodTemplateApplyConfiguration())) rayClusterAC := rayv1ac.RayCluster("ray-cluster", namespace.Name). - WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts"))) + WithSpec(e2e.Apply(rayClusterSpecAC, e2e.MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts"))) rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) @@ -84,7 +84,7 @@ func TestRayClusterAutoscalerWithFakeGPU(t *testing.T) { test.StreamKubeRayOperatorLogs() // Scripts for creating and terminating detached actors to trigger autoscaling - scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py")) + scriptsAC := e2e.NewConfigMap(namespace.Name, "scripts", e2e.Files(test, _files, "create_detached_actor.py", "terminate_detached_actor.py")) scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name) @@ -95,16 +95,16 @@ func TestRayClusterAutoscalerWithFakeGPU(t *testing.T) { WithRayVersion(GetRayVersion()). WithHeadGroupSpec(rayv1ac.HeadGroupSpec(). WithRayStartParams(map[string]string{"num-cpus": "0"}). - WithTemplate(headPodTemplateApplyConfiguration())). + WithTemplate(e2e.HeadPodTemplateApplyConfiguration())). WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec(). WithReplicas(0). WithMinReplicas(0). WithMaxReplicas(3). WithGroupName("gpu-group"). WithRayStartParams(map[string]string{"num-cpus": "1", "num-gpus": "1"}). - WithTemplate(workerPodTemplateApplyConfiguration())) + WithTemplate(e2e.WorkerPodTemplateApplyConfiguration())) rayClusterAC := rayv1ac.RayCluster("ray-cluster", namespace.Name). - WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts"))) + WithSpec(e2e.Apply(rayClusterSpecAC, e2e.MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts"))) rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) @@ -142,7 +142,7 @@ func TestRayClusterAutoscalerWithCustomResource(t *testing.T) { test.StreamKubeRayOperatorLogs() // Scripts for creating and terminating detached actors to trigger autoscaling - scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py")) + scriptsAC := e2e.NewConfigMap(namespace.Name, "scripts", e2e.Files(test, _files, "create_detached_actor.py", "terminate_detached_actor.py")) scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name) @@ -155,16 +155,16 @@ func TestRayClusterAutoscalerWithCustomResource(t *testing.T) { WithRayVersion(GetRayVersion()). WithHeadGroupSpec(rayv1ac.HeadGroupSpec(). WithRayStartParams(map[string]string{"num-cpus": "0"}). - WithTemplate(headPodTemplateApplyConfiguration())). + WithTemplate(e2e.HeadPodTemplateApplyConfiguration())). WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec(). WithReplicas(0). WithMinReplicas(0). WithMaxReplicas(3). WithGroupName(groupName). WithRayStartParams(map[string]string{"num-cpus": "1", "resources": `"{\"CustomResource\": 1}"`}). - WithTemplate(workerPodTemplateApplyConfiguration())) + WithTemplate(e2e.WorkerPodTemplateApplyConfiguration())) rayClusterAC := rayv1ac.RayCluster("ray-cluster", namespace.Name). - WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts"))) + WithSpec(e2e.Apply(rayClusterSpecAC, e2e.MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts"))) rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) test.Expect(err).NotTo(HaveOccurred()) diff --git a/ray-operator/test/e2eautoscaler/support.go b/ray-operator/test/e2eautoscaler/support.go index 48fae41b54e..0c7d2aef120 100644 --- a/ray-operator/test/e2eautoscaler/support.go +++ b/ray-operator/test/e2eautoscaler/support.go @@ -2,131 +2,7 @@ package e2eautoscaler import ( "embed" - - "github.com/onsi/gomega" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - corev1ac "k8s.io/client-go/applyconfigurations/core/v1" - - rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" - . "github.com/ray-project/kuberay/ray-operator/test/support" ) //go:embed *.py var _files embed.FS - -func ReadFile(t Test, fileName string) []byte { - t.T().Helper() - file, err := _files.ReadFile(fileName) - t.Expect(err).NotTo(gomega.HaveOccurred()) - return file -} - -type option[T any] func(t *T) *T - -func apply[T any](t *T, options ...option[T]) *T { - for _, opt := range options { - t = opt(t) - } - return t -} - -func options[T any](options ...option[T]) option[T] { - return func(t *T) *T { - for _, opt := range options { - t = opt(t) - } - return t - } -} - -func newConfigMap(namespace, name string, options ...option[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration { - cmAC := corev1ac.ConfigMap(name, namespace). - WithBinaryData(map[string][]byte{}). - WithImmutable(true) - - return configMapWith(cmAC, options...) -} - -func configMapWith(configMapAC *corev1ac.ConfigMapApplyConfiguration, options ...option[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration { - return apply(configMapAC, options...) -} - -func file(t Test, fileName string) option[corev1ac.ConfigMapApplyConfiguration] { - return func(cmAC *corev1ac.ConfigMapApplyConfiguration) *corev1ac.ConfigMapApplyConfiguration { - cmAC.WithBinaryData(map[string][]byte{fileName: ReadFile(t, fileName)}) - return cmAC - } -} - -func files(t Test, fileNames ...string) option[corev1ac.ConfigMapApplyConfiguration] { - var files []option[corev1ac.ConfigMapApplyConfiguration] - for _, fileName := range fileNames { - files = append(files, file(t, fileName)) - } - return options(files...) -} - -func mountConfigMap[T rayv1ac.RayClusterSpecApplyConfiguration | corev1ac.PodTemplateSpecApplyConfiguration](configMap *corev1.ConfigMap, mountPath string) option[T] { - return func(t *T) *T { - switch obj := (interface{})(t).(type) { - case *rayv1ac.RayClusterSpecApplyConfiguration: - obj.HeadGroupSpec.Template.Spec.Containers[0].WithVolumeMounts(corev1ac.VolumeMount(). - WithName(configMap.Name). - WithMountPath(mountPath)) - obj.HeadGroupSpec.Template.Spec.WithVolumes(corev1ac.Volume(). - WithName(configMap.Name). - WithConfigMap(corev1ac.ConfigMapVolumeSource().WithName(configMap.Name))) - - case *corev1ac.PodTemplateSpecApplyConfiguration: - obj.Spec.Containers[0].WithVolumeMounts(corev1ac.VolumeMount(). - WithName(configMap.Name). - WithMountPath(mountPath)) - obj.Spec.WithVolumes(corev1ac.Volume(). - WithName(configMap.Name). - WithConfigMap(corev1ac.ConfigMapVolumeSource().WithName(configMap.Name))) - } - return t - } -} - -func headPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration { - return corev1ac.PodTemplateSpec(). - WithSpec(corev1ac.PodSpec(). - WithContainers(corev1ac.Container(). - WithName("ray-head"). - WithImage(GetRayImage()). - WithPorts( - corev1ac.ContainerPort().WithName("gcs").WithContainerPort(6379), - corev1ac.ContainerPort().WithName("serve").WithContainerPort(8000), - corev1ac.ContainerPort().WithName("dashboard").WithContainerPort(8265), - corev1ac.ContainerPort().WithName("client").WithContainerPort(10001), - ). - WithResources(corev1ac.ResourceRequirements(). - WithRequests(corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("300m"), - corev1.ResourceMemory: resource.MustParse("1G"), - }). - WithLimits(corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("500m"), - corev1.ResourceMemory: resource.MustParse("2G"), - })))) -} - -func workerPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration { - return corev1ac.PodTemplateSpec(). - WithSpec(corev1ac.PodSpec(). - WithContainers(corev1ac.Container(). - WithName("ray-worker"). - WithImage(GetRayImage()). - WithResources(corev1ac.ResourceRequirements(). - WithRequests(corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("300m"), - corev1.ResourceMemory: resource.MustParse("1G"), - }). - WithLimits(corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("500m"), - corev1.ResourceMemory: resource.MustParse("1G"), - })))) -}