Skip to content

Commit

Permalink
[Test][RayJob] Transition to Complete if the JobStatus is STOPPED (r…
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin85421 authored Jan 26, 2024
1 parent 4fb4578 commit b47ebbd
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 1 deletion.
42 changes: 41 additions & 1 deletion ray-operator/test/e2e/rayjob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1ac "k8s.io/client-go/applyconfigurations/core/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
. "github.com/ray-project/kuberay/ray-operator/test/support"
)

Expand All @@ -20,7 +22,7 @@ func TestRayJob(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Job scripts
jobs := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py"))
jobs := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py", "stop.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Create(test.Ctx(), jobs, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)
Expand Down Expand Up @@ -190,4 +192,42 @@ env_vars:
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
})

test.T().Run("Should transition to 'Complete' if the Ray job has stopped.", func(t *testing.T) {
// `stop.py` will sleep for 20 seconds so that the RayJob has enough time to transition to `RUNNING`
// and then stop the Ray job. If the Ray job is stopped, the RayJob should transition to `Complete`.
rayJobAC := rayv1ac.RayJob("stop", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithEntrypoint("python /home/ray/jobs/stop.py").
WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()).
WithRayClusterSpec(rayv1ac.RayClusterSpec().
WithRayVersion(GetRayVersion()).
WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
WithRayStartParams(map[string]string{
"dashboard-host": "0.0.0.0",
}).
WithTemplate(podTemplateSpecApplyConfiguration(headPodTemplateApplyConfiguration(),
mountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](jobs, "/home/ray/jobs"))))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)

test.T().Logf("Waiting for RayJob %s/%s to be 'Running'", rayJob.Namespace, rayJob.Name)
test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusRunning)))

test.T().Logf("Waiting for RayJob %s/%s to be 'Complete'", rayJob.Namespace, rayJob.Name)
test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete)))

// Refresh the RayJob status
rayJob = GetRayJob(test, rayJob.Namespace, rayJob.Name)
test.Expect(rayJob.Status.JobStatus).To(Equal(rayv1.JobStatusStopped))

// Delete the RayJob
err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
})
}
12 changes: 12 additions & 0 deletions ray-operator/test/e2e/stop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import time
from ray.job_submission import JobSubmissionClient

print("Sleep 20 seconds to enable the RayJob to transition to RUNNING state")
time.sleep(20)

client = JobSubmissionClient("http://127.0.0.1:8265")
job_details = client.list_jobs()
print("Number of Jobs: " + str(len(job_details)))
for job_detail in job_details:
print("Stop Job: " + job_detail.submission_id)
client.stop_job(job_detail.submission_id)

0 comments on commit b47ebbd

Please sign in to comment.