Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buildkite autoscaler e2e with shared support functions #2207

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .buildkite/test-sample-yamls.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,21 @@
- ./.buildkite/setup-env.sh
# Use KubeRay operator image from the latest release
- source .venv/bin/activate && BUILDKITE_ENV=true OPERATOR_IMAGE=quay.io/kuberay/operator:v1.1.0 python3 tests/test_sample_rayservice_yamls.py

- label: 'Test Autoscaler E2E (nightly operator)'
instance_size: large
image: golang:1.20
commands:
- source .buildkite/setup-env.sh
- kind create cluster --wait 900s --config ./tests/framework/config/kind-config-buildkite.yml
- kubectl config set clusters.kind-kind.server https://docker:6443
# Build nightly KubeRay operator image
- pushd ray-operator
- IMG=kuberay/operator:nightly make docker-image
- kind load docker-image kuberay/operator:nightly
- IMG=kuberay/operator:nightly make deploy
- kubectl wait --timeout=90s --for=condition=Available=true deployment -n ray-system kuberay-operator
# Run e2e tests
- KUBERAY_TEST_TIMEOUT_SHORT=1m KUBERAY_TEST_TIMEOUT_MEDIUM=5m KUBERAY_TEST_TIMEOUT_LONG=10m go test -timeout 30m -v ./test/e2eautoscaler
# Printing KubeRay operator logs
- kubectl logs -n ray-system --tail -1 -l app.kubernetes.io/name=kuberay
4 changes: 2 additions & 2 deletions ray-operator/test/e2e/rayjob_cluster_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ func TestRayJobWithClusterSelector(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Job scripts
jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py"))
jobsAC := NewConfigMap(namespace.Name, "jobs", Files(test, _files, "counter.py", "fail.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)

// RayCluster
rayClusterAC := rayv1ac.RayCluster("raycluster", namespace.Name).
WithSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))
WithSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))

rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand Down
10 changes: 5 additions & 5 deletions ray-operator/test/e2e/rayjob_lightweight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestRayJobLightWeightMode(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Job scripts
jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py", "stop.py"))
jobsAC := NewConfigMap(namespace.Name, "jobs", Files(test, _files, "counter.py", "fail.py", "stop.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)
Expand All @@ -49,8 +49,8 @@ env_vars:
"num-cpus": "4",
"resources": `'{"R1": 4}'`,
}).
WithTemplate(podTemplateSpecApplyConfiguration(headPodTemplateApplyConfiguration(),
mountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](jobs, "/home/ray/jobs"))))))
WithTemplate(podTemplateSpecApplyConfiguration(HeadPodTemplateApplyConfiguration(),
MountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](jobs, "/home/ray/jobs"))))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -83,7 +83,7 @@ env_vars:
WithSubmissionMode(rayv1.HTTPMode).
WithEntrypoint("python /home/ray/jobs/fail.py").
WithShutdownAfterJobFinishes(false).
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -114,7 +114,7 @@ env_vars:
WithSpec(rayv1ac.RayJobSpec().
WithSubmissionMode(rayv1.HTTPMode).
WithEntrypoint("python /home/ray/jobs/stop.py").
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand Down
6 changes: 3 additions & 3 deletions ray-operator/test/e2e/rayjob_suspend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestRayJobSuspend(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Job scripts
jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "long_running.py", "counter.py"))
jobsAC := NewConfigMap(namespace.Name, "jobs", Files(test, _files, "long_running.py", "counter.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)
Expand All @@ -31,7 +31,7 @@ func TestRayJobSuspend(t *testing.T) {
// RayJob
rayJobAC := rayv1ac.RayJob("long-running", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("python /home/ray/jobs/long_running.py").
WithShutdownAfterJobFinishes(true).
WithTTLSecondsAfterFinished(600).
Expand Down Expand Up @@ -91,7 +91,7 @@ env_vars:
`).
WithShutdownAfterJobFinishes(true).
WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()).
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand Down
14 changes: 7 additions & 7 deletions ray-operator/test/e2e/rayjob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestRayJob(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Job scripts
jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py", "stop.py", "long_running.py"))
jobsAC := NewConfigMap(namespace.Name, "jobs", Files(test, _files, "counter.py", "fail.py", "stop.py", "long_running.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)
Expand All @@ -30,7 +30,7 @@ func TestRayJob(t *testing.T) {
// RayJob
rayJobAC := rayv1ac.RayJob("counter", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("python /home/ray/jobs/counter.py").
WithRuntimeEnvYAML(`
env_vars:
Expand Down Expand Up @@ -87,7 +87,7 @@ env_vars:
// RayJob
rayJobAC := rayv1ac.RayJob("fail", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("python /home/ray/jobs/fail.py").
WithShutdownAfterJobFinishes(false).
WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()))
Expand Down Expand Up @@ -132,7 +132,7 @@ env_vars:
// RayJob
rayJobAC := rayv1ac.RayJob("fail-k8s-job", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("The command will be overridden by the submitter Job").
WithShutdownAfterJobFinishes(true).
WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()))
Expand Down Expand Up @@ -176,7 +176,7 @@ env_vars:
WithSpec(rayv1ac.RayJobSpec().
WithEntrypoint("python /home/ray/jobs/stop.py").
WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()).
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -205,7 +205,7 @@ env_vars:
WithSpec(rayv1ac.RayJobSpec().
WithEntrypoint("python /home/ray/jobs/counter.py").
WithRuntimeEnvYAML(`invalid_yaml_string`).
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand All @@ -219,7 +219,7 @@ env_vars:
test.T().Run("RayJob has passed ActiveDeadlineSeconds", func(_ *testing.T) {
rayJobAC := rayv1ac.RayJob("long-running", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("python /home/ray/jobs/long_running.py").
WithShutdownAfterJobFinishes(true).
WithTTLSecondsAfterFinished(600).
Expand Down
6 changes: 3 additions & 3 deletions ray-operator/test/e2e/rayservice_ha_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestRayService(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Locust scripts (locustfile.py, locust_runner.py) mounted into the Locust RayCluster created below
scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "locustfile.py", "locust_runner.py"))
scriptsAC := NewConfigMap(namespace.Name, "scripts", Files(test, _files, "locustfile.py", "locust_runner.py"))
scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)
Expand All @@ -46,7 +46,7 @@ applications:
ray_actor_options:
num_cpus: 1
`).
WithRayClusterSpec(newRayClusterSpec()))
WithRayClusterSpec(NewRayClusterSpec()))

rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand All @@ -61,7 +61,7 @@ applications:
WithRayVersion(GetRayVersion()).
WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}).
WithTemplate(apply(headPodTemplateApplyConfiguration(), mountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))))
WithTemplate(Apply(HeadPodTemplateApplyConfiguration(), MountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))))
locustCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), locustClusterAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created Locust RayCluster %s/%s successfully", locustCluster.Namespace, locustCluster.Name)
Expand Down
53 changes: 26 additions & 27 deletions ray-operator/test/e2e/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"embed"

"github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
Expand All @@ -16,23 +15,23 @@ import (
//go:embed *.py
var _files embed.FS

func ReadFile(t Test, fileName string) []byte {
func ReadFile(t Test, fs embed.FS, fileName string) []byte {
t.T().Helper()
file, err := _files.ReadFile(fileName)
file, err := fs.ReadFile(fileName)
t.Expect(err).NotTo(gomega.HaveOccurred())
return file
}

type option[T any] func(t *T) *T
type ApplyOption[T any] func(t *T) *T

func apply[T any](t *T, options ...option[T]) *T {
func Apply[T any](t *T, options ...ApplyOption[T]) *T {
for _, opt := range options {
t = opt(t)
}
return t
}

func options[T any](options ...option[T]) option[T] {
func Options[T any](options ...ApplyOption[T]) ApplyOption[T] {
return func(t *T) *T {
for _, opt := range options {
t = opt(t)
Expand All @@ -41,42 +40,42 @@ func options[T any](options ...option[T]) option[T] {
}
}

func newConfigMap(namespace, name string, options ...option[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration {
func NewConfigMap(namespace, name string, options ...ApplyOption[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration {
cmAC := corev1ac.ConfigMap(name, namespace).
WithBinaryData(map[string][]byte{}).
WithImmutable(true)

return configMapWith(cmAC, options...)
return ConfigMapWith(cmAC, options...)
}

func configMapWith(configMapAC *corev1ac.ConfigMapApplyConfiguration, options ...option[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration {
return apply(configMapAC, options...)
func ConfigMapWith(configMapAC *corev1ac.ConfigMapApplyConfiguration, options ...ApplyOption[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration {
return Apply(configMapAC, options...)
}

func file(t Test, fileName string) option[corev1ac.ConfigMapApplyConfiguration] {
func File(t Test, fs embed.FS, fileName string) ApplyOption[corev1ac.ConfigMapApplyConfiguration] {
return func(cmAC *corev1ac.ConfigMapApplyConfiguration) *corev1ac.ConfigMapApplyConfiguration {
cmAC.WithBinaryData(map[string][]byte{fileName: ReadFile(t, fileName)})
cmAC.WithBinaryData(map[string][]byte{fileName: ReadFile(t, fs, fileName)})
return cmAC
}
}

func files(t Test, fileNames ...string) option[corev1ac.ConfigMapApplyConfiguration] {
var files []option[corev1ac.ConfigMapApplyConfiguration]
func Files(t Test, fs embed.FS, fileNames ...string) ApplyOption[corev1ac.ConfigMapApplyConfiguration] {
var files []ApplyOption[corev1ac.ConfigMapApplyConfiguration]
for _, fileName := range fileNames {
files = append(files, file(t, fileName))
files = append(files, File(t, fs, fileName))
}
return options(files...)
return Options(files...)
}

func newRayClusterSpec(options ...option[rayv1ac.RayClusterSpecApplyConfiguration]) *rayv1ac.RayClusterSpecApplyConfiguration {
return rayClusterSpecWith(rayClusterSpec(), options...)
func NewRayClusterSpec(options ...ApplyOption[rayv1ac.RayClusterSpecApplyConfiguration]) *rayv1ac.RayClusterSpecApplyConfiguration {
return RayClusterSpecWith(rayClusterSpec(), options...)
}

func rayClusterSpecWith(spec *rayv1ac.RayClusterSpecApplyConfiguration, options ...option[rayv1ac.RayClusterSpecApplyConfiguration]) *rayv1ac.RayClusterSpecApplyConfiguration {
return apply(spec, options...)
func RayClusterSpecWith(spec *rayv1ac.RayClusterSpecApplyConfiguration, options ...ApplyOption[rayv1ac.RayClusterSpecApplyConfiguration]) *rayv1ac.RayClusterSpecApplyConfiguration {
return Apply(spec, options...)
}

func mountConfigMap[T rayv1ac.RayClusterSpecApplyConfiguration | corev1ac.PodTemplateSpecApplyConfiguration](configMap *corev1.ConfigMap, mountPath string) option[T] {
func MountConfigMap[T rayv1ac.RayClusterSpecApplyConfiguration | corev1ac.PodTemplateSpecApplyConfiguration](configMap *corev1.ConfigMap, mountPath string) ApplyOption[T] {
return func(t *T) *T {
switch obj := (interface{})(t).(type) {
case *rayv1ac.RayClusterSpecApplyConfiguration:
Expand Down Expand Up @@ -104,21 +103,21 @@ func rayClusterSpec() *rayv1ac.RayClusterSpecApplyConfiguration {
WithRayVersion(GetRayVersion()).
WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}).
WithTemplate(headPodTemplateApplyConfiguration())).
WithTemplate(HeadPodTemplateApplyConfiguration())).
WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
WithReplicas(1).
WithMinReplicas(1).
WithMaxReplicas(1).
WithGroupName("small-group").
WithRayStartParams(map[string]string{"num-cpus": "1"}).
WithTemplate(workerPodTemplateApplyConfiguration()))
WithTemplate(WorkerPodTemplateApplyConfiguration()))
}

func podTemplateSpecApplyConfiguration(template *corev1ac.PodTemplateSpecApplyConfiguration, options ...option[corev1ac.PodTemplateSpecApplyConfiguration]) *corev1ac.PodTemplateSpecApplyConfiguration {
return apply(template, options...)
func podTemplateSpecApplyConfiguration(template *corev1ac.PodTemplateSpecApplyConfiguration, options ...ApplyOption[corev1ac.PodTemplateSpecApplyConfiguration]) *corev1ac.PodTemplateSpecApplyConfiguration {
return Apply(template, options...)
}

func headPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration {
func HeadPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration {
return corev1ac.PodTemplateSpec().
WithSpec(corev1ac.PodSpec().
WithContainers(corev1ac.Container().
Expand All @@ -141,7 +140,7 @@ func headPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfigura
}))))
}

func workerPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration {
func WorkerPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration {
return corev1ac.PodTemplateSpec().
WithSpec(corev1ac.PodSpec().
WithContainers(corev1ac.Container().
Expand Down
Loading
Loading