diff --git a/dashboard/modules/job/data_types.py b/dashboard/modules/job/data_types.py index 7663ae3fa381..3b9f79054f23 100644 --- a/dashboard/modules/job/data_types.py +++ b/dashboard/modules/job/data_types.py @@ -40,6 +40,14 @@ class JobSubmitResponse: job_id: str +# ==== Job Stop ==== + + +@dataclass +class JobStopResponse: + stopped: bool + + # ==== Job Status ==== diff --git a/dashboard/modules/job/job_head.py b/dashboard/modules/job/job_head.py index a59878ebd625..eacf9d0358a5 100644 --- a/dashboard/modules/job/job_head.py +++ b/dashboard/modules/job/job_head.py @@ -1,9 +1,11 @@ import aiohttp.web +import dataclasses from functools import wraps import logging -from typing import Callable +from typing import Any, Callable import json -import dataclasses +import traceback +from dataclasses import dataclass import ray import ray.dashboard.utils as dashboard_utils @@ -12,7 +14,7 @@ upload_package_to_gcs) from ray.dashboard.modules.job.data_types import ( GetPackageResponse, JobStatus, JobSubmitRequest, JobSubmitResponse, - JobStatusResponse, JobLogsResponse) + JobStopResponse, JobStatusResponse, JobLogsResponse) logger = logging.getLogger(__name__) routes = dashboard_utils.ClassMethodRouteTable @@ -22,6 +24,7 @@ JOBS_API_PREFIX = "/api/jobs/" JOBS_API_ROUTE_LOGS = JOBS_API_PREFIX + "logs" JOBS_API_ROUTE_SUBMIT = JOBS_API_PREFIX + "submit" +JOBS_API_ROUTE_STOP = JOBS_API_PREFIX + "stop" JOBS_API_ROUTE_STATUS = JOBS_API_PREFIX + "status" JOBS_API_ROUTE_PACKAGE = JOBS_API_PREFIX + "package" @@ -42,12 +45,34 @@ def __init__(self, dashboard_head): self._job_manager = None + async def _parse_and_validate_request(self, req: aiohttp.web.Request, + request_type: dataclass) -> Any: + """Parse request and cast to request type. If parsing failed, return a + Response object with status 400 and stacktrace instead. + """ + try: + # TODO: (jiaodong) Validate if job request is valid without using + # pydantic. + result = request_type(**(await req.json())) + except Exception: + return aiohttp.web.Response( + reason=traceback.format_exc().encode("utf-8"), + status=aiohttp.web.HTTPBadRequest.status_code) + return result + @routes.get(JOBS_API_ROUTE_PACKAGE) @_ensure_ray_initialized async def get_package(self, req: aiohttp.web.Request) -> aiohttp.web.Response: package_uri = req.query["package_uri"] - resp = GetPackageResponse(package_exists=package_exists(package_uri)) + try: + exists = package_exists(package_uri) + except Exception: + return aiohttp.web.Response( + reason=traceback.format_exc().encode("utf-8"), + status=aiohttp.web.HTTPInternalServerError.status_code) + + resp = GetPackageResponse(package_exists=exists) return aiohttp.web.Response( text=json.dumps(dataclasses.asdict(resp)), content_type="application/json") @@ -57,22 +82,55 @@ async def get_package(self, async def upload_package(self, req: aiohttp.web.Request): package_uri = req.query["package_uri"] logger.info(f"Uploading package {package_uri} to the GCS.") - upload_package_to_gcs(package_uri, await req.read()) + try: + upload_package_to_gcs(package_uri, await req.read()) + except Exception: + return aiohttp.web.Response( + reason=traceback.format_exc().encode("utf-8"), + status=aiohttp.web.HTTPInternalServerError.status_code) - return aiohttp.web.Response() + return aiohttp.web.Response(status=aiohttp.web.HTTPOk.status_code, ) @routes.post(JOBS_API_ROUTE_SUBMIT) @_ensure_ray_initialized async def submit(self, req: aiohttp.web.Request) -> aiohttp.web.Response: - # TODO: (jiaodong) Validate if job request is valid without using - # pydantic. - submit_request = JobSubmitRequest(**(await req.json())) - job_id = self._job_manager.submit_job( - entrypoint=submit_request.entrypoint, - runtime_env=submit_request.runtime_env, - metadata=submit_request.metadata) - - resp = JobSubmitResponse(job_id=job_id) + result = await self._parse_and_validate_request(req, JobSubmitRequest) + # Request parsing failed, returned with Response object. + if isinstance(result, aiohttp.web.Response): + return result + else: + submit_request = result + + try: + job_id = self._job_manager.submit_job( + entrypoint=submit_request.entrypoint, + runtime_env=submit_request.runtime_env, + metadata=submit_request.metadata) + + resp = JobSubmitResponse(job_id=job_id) + except Exception: + return aiohttp.web.Response( + reason=traceback.format_exc().encode("utf-8"), + status=aiohttp.web.HTTPInternalServerError.status_code) + + return aiohttp.web.Response( + text=json.dumps(dataclasses.asdict(resp)), + content_type="application/json", + status=aiohttp.web.HTTPOk.status_code, + ) + + @routes.post(JOBS_API_ROUTE_STOP) + @_ensure_ray_initialized + async def stop(self, req: aiohttp.web.Request) -> aiohttp.web.Response: + job_id = req.query["job_id"] + try: + stopped = self._job_manager.stop_job(job_id) + resp = JobStopResponse(stopped=stopped) + except Exception: + return aiohttp.web.Response( + reason=traceback.format_exc().encode("utf-8"), + status=aiohttp.web.HTTPInternalServerError.status_code) + return aiohttp.web.Response( text=json.dumps(dataclasses.asdict(resp)), content_type="application/json") @@ -82,7 +140,6 @@ async def submit(self, req: aiohttp.web.Request) -> aiohttp.web.Response: async def status(self, req: aiohttp.web.Request) -> aiohttp.web.Response: job_id = req.query["job_id"] status: JobStatus = self._job_manager.get_job_status(job_id) - resp = JobStatusResponse(job_status=status) return aiohttp.web.Response( text=json.dumps(dataclasses.asdict(resp)), @@ -92,6 +149,7 @@ async def status(self, req: aiohttp.web.Request) -> aiohttp.web.Response: @_ensure_ray_initialized async def logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response: job_id = req.query["job_id"] + stdout: bytes = self._job_manager.get_job_stdout(job_id) stderr: bytes = self._job_manager.get_job_stderr(job_id) diff --git a/dashboard/modules/job/sdk.py b/dashboard/modules/job/sdk.py index 6af1890bc34f..edcf10be1931 100644 --- a/dashboard/modules/job/sdk.py +++ b/dashboard/modules/job/sdk.py @@ -10,12 +10,12 @@ create_package, get_uri_for_directory, parse_uri) from ray._private.job_manager import JobStatus from ray.dashboard.modules.job.data_types import ( - GetPackageResponse, JobSubmitRequest, JobSubmitResponse, JobStatusResponse, - JobLogsResponse) + GetPackageResponse, JobSubmitRequest, JobSubmitResponse, JobStopResponse, + JobStatusResponse, JobLogsResponse) from ray.dashboard.modules.job.job_head import ( - JOBS_API_ROUTE_LOGS, JOBS_API_ROUTE_SUBMIT, JOBS_API_ROUTE_STATUS, - JOBS_API_ROUTE_PACKAGE) + JOBS_API_ROUTE_LOGS, JOBS_API_ROUTE_SUBMIT, JOBS_API_ROUTE_STOP, + JOBS_API_ROUTE_STATUS, JOBS_API_ROUTE_PACKAGE) logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -46,7 +46,6 @@ def _do_request(self, f"json: {json_data}, params: {params}.") r = requests.request( method, url, data=data, json=json_data, params=params) - r.raise_for_status() if response_type is None: return None @@ -130,6 +129,14 @@ def submit_job(self, response_type=JobSubmitResponse) return resp.job_id + def stop_job(self, job_id: str) -> bool: + resp = self._do_request( + "POST", + JOBS_API_ROUTE_STOP, + params={"job_id": job_id}, + response_type=JobStopResponse) + return resp.stopped + def get_job_status(self, job_id: str) -> JobStatus: resp = self._do_request( "GET", diff --git a/dashboard/modules/job/tests/test_http_job_server.py b/dashboard/modules/job/tests/test_http_job_server.py index ce735f958d75..ca7cbe6e0799 100644 --- a/dashboard/modules/job/tests/test_http_job_server.py +++ b/dashboard/modules/job/tests/test_http_job_server.py @@ -2,6 +2,7 @@ from pathlib import Path import sys import tempfile +import requests import pytest @@ -10,9 +11,9 @@ wait_until_server_available) from ray._private.job_manager import JobStatus from ray.dashboard.modules.job.sdk import JobSubmissionClient +from ray.dashboard.modules.job.job_head import JOBS_API_ROUTE_SUBMIT logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) @pytest.fixture @@ -31,6 +32,16 @@ def _check_job_succeeded(client: JobSubmissionClient, job_id: str) -> bool: return status == JobStatus.SUCCEEDED +def _check_job_failed(client: JobSubmissionClient, job_id: str) -> bool: + status = client.get_job_status(job_id) + return status == JobStatus.FAILED + + +def _check_job_stopped(client: JobSubmissionClient, job_id: str) -> bool: + status = client.get_job_status(job_id) + return status == JobStatus.STOPPED + + @pytest.fixture( scope="function", params=["no_working_dir", "local_working_dir", "s3_working_dir"]) @@ -98,6 +109,96 @@ def test_submit_job(job_sdk_client, working_dir_option): assert stderr == working_dir_option["expected_stderr"] +def test_http_bad_request(job_sdk_client): + """ + Send bad requests to job http server and ensure right return code and + error message is returned via http. + """ + client = job_sdk_client + + # 400 - HTTPBadRequest + with pytest.raises(requests.exceptions.HTTPError) as e: + _ = client._do_request( + "POST", + JOBS_API_ROUTE_SUBMIT, + json_data={"key": "baaaad request"}, + ) + + ex_message = str(e.value) + assert "400 Client Error" in ex_message + assert "TypeError: __init__() got an unexpected keyword argument" in ex_message # noqa: E501 + + # 405 - HTTPMethodNotAllowed + with pytest.raises(requests.exceptions.HTTPError) as e: + _ = client._do_request( + "GET", + JOBS_API_ROUTE_SUBMIT, + json_data={"key": "baaaad request"}, + ) + ex_message = str(e.value) + assert "405 Client Error: Method Not Allowed" in ex_message + + # 500 - HTTPInternalServerError + with pytest.raises(requests.exceptions.HTTPError) as e: + _ = client.submit_job( + entrypoint="echo hello", + runtime_env={"working_dir": "s3://does_not_exist"}) + ex_message = str(e.value) + assert "500 Server Error" in ex_message + assert "Only .zip files supported for S3 URIs" in ex_message + + +def test_submit_job_with_exception_in_driver(job_sdk_client): + """ + Submit a job that's expected to throw exception while executing. + """ + client = job_sdk_client + + with tempfile.TemporaryDirectory() as tmp_dir: + path = Path(tmp_dir) + driver_script = """ +print('Hello !') +raise RuntimeError('Intentionally failed.') + """ + test_script_file = path / "test_script.py" + with open(test_script_file, "w+") as file: + file.write(driver_script) + + job_id = client.submit_job( + entrypoint="python test_script.py", + runtime_env={"working_dir": tmp_dir}) + + wait_for_condition(_check_job_failed, client=client, job_id=job_id) + stdout, stderr = client.get_job_logs(job_id) + assert stdout == "Hello !" + assert "RuntimeError: Intentionally failed." in stderr + + +def test_stop_long_running_job(job_sdk_client): + """ + Submit a job that runs for a while and stop it in the middle. + """ + client = job_sdk_client + + with tempfile.TemporaryDirectory() as tmp_dir: + path = Path(tmp_dir) + driver_script = """ +print('Hello !') +import time +time.sleep(300) # This should never finish +raise RuntimeError('Intentionally failed.') + """ + test_script_file = path / "test_script.py" + with open(test_script_file, "w+") as file: + file.write(driver_script) + + job_id = client.submit_job( + entrypoint="python test_script.py", + runtime_env={"working_dir": tmp_dir}) + assert client.stop_job(job_id) is True + wait_for_condition(_check_job_stopped, client=client, job_id=job_id) + + def test_job_metadata(job_sdk_client): client = job_sdk_client