Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Job Submission] Add stop API to http & sdk, with better status code + stacktrace #20094

Merged
merged 7 commits into from
Nov 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dashboard/modules/job/data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ class JobSubmitResponse:
job_id: str


# ==== Job Stop ====


@dataclass
class JobStopResponse:
    """Response payload of the job stop endpoint."""

    # Boolean result reported by the job manager's stop_job() call.
    stopped: bool


# ==== Job Status ====


Expand Down
90 changes: 74 additions & 16 deletions dashboard/modules/job/job_head.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import aiohttp.web
import dataclasses
from functools import wraps
import logging
from typing import Callable
from typing import Any, Callable
import json
import dataclasses
import traceback
from dataclasses import dataclass

import ray
import ray.dashboard.utils as dashboard_utils
Expand All @@ -12,7 +14,7 @@
upload_package_to_gcs)
from ray.dashboard.modules.job.data_types import (
GetPackageResponse, JobStatus, JobSubmitRequest, JobSubmitResponse,
JobStatusResponse, JobLogsResponse)
JobStopResponse, JobStatusResponse, JobLogsResponse)

logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable
Expand All @@ -22,6 +24,7 @@
# HTTP routes of the job API. Every route is built on top of
# JOBS_API_PREFIX so that the server handlers and the SDK client share a
# single definition of each path.
JOBS_API_PREFIX = "/api/jobs/"
JOBS_API_ROUTE_LOGS = JOBS_API_PREFIX + "logs"
JOBS_API_ROUTE_SUBMIT = JOBS_API_PREFIX + "submit"
JOBS_API_ROUTE_STOP = JOBS_API_PREFIX + "stop"
JOBS_API_ROUTE_STATUS = JOBS_API_PREFIX + "status"
JOBS_API_ROUTE_PACKAGE = JOBS_API_PREFIX + "package"

Expand All @@ -42,12 +45,34 @@ def __init__(self, dashboard_head):

self._job_manager = None

async def _parse_and_validate_request(self, req: aiohttp.web.Request,
                                      request_type: dataclass) -> Any:
    """Build a ``request_type`` instance from the JSON body of ``req``.

    On success the constructed object is returned. If reading the body or
    constructing the object raises, a Response with status 400 whose
    reason carries the UTF-8 encoded stack trace is returned instead;
    callers tell the two outcomes apart by type.
    """
    # TODO: (jiaodong) Validate if job request is valid without using
    # pydantic.
    try:
        payload = await req.json()
        return request_type(**payload)
    except Exception:
        bad_request = aiohttp.web.HTTPBadRequest.status_code
        return aiohttp.web.Response(
            reason=traceback.format_exc().encode("utf-8"),
            status=bad_request)

@routes.get(JOBS_API_ROUTE_PACKAGE)
@_ensure_ray_initialized
async def get_package(self,
req: aiohttp.web.Request) -> aiohttp.web.Response:
package_uri = req.query["package_uri"]
resp = GetPackageResponse(package_exists=package_exists(package_uri))
try:
exists = package_exists(package_uri)
except Exception:
return aiohttp.web.Response(
reason=traceback.format_exc().encode("utf-8"),
status=aiohttp.web.HTTPInternalServerError.status_code)

resp = GetPackageResponse(package_exists=exists)
return aiohttp.web.Response(
text=json.dumps(dataclasses.asdict(resp)),
content_type="application/json")
Expand All @@ -57,22 +82,55 @@ async def get_package(self,
async def upload_package(self, req: aiohttp.web.Request):
package_uri = req.query["package_uri"]
logger.info(f"Uploading package {package_uri} to the GCS.")
upload_package_to_gcs(package_uri, await req.read())
try:
upload_package_to_gcs(package_uri, await req.read())
except Exception:
return aiohttp.web.Response(
reason=traceback.format_exc().encode("utf-8"),
status=aiohttp.web.HTTPInternalServerError.status_code)

return aiohttp.web.Response()
return aiohttp.web.Response(status=aiohttp.web.HTTPOk.status_code, )

@routes.post(JOBS_API_ROUTE_SUBMIT)
@_ensure_ray_initialized
async def submit(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
# TODO: (jiaodong) Validate if job request is valid without using
# pydantic.
submit_request = JobSubmitRequest(**(await req.json()))
job_id = self._job_manager.submit_job(
entrypoint=submit_request.entrypoint,
runtime_env=submit_request.runtime_env,
metadata=submit_request.metadata)

resp = JobSubmitResponse(job_id=job_id)
result = await self._parse_and_validate_request(req, JobSubmitRequest)
# Request parsing failed, returned with Response object.
if isinstance(result, aiohttp.web.Response):
return result
else:
submit_request = result
Comment on lines +99 to +102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little bit ugly that we have to duplicate this code everywhere. One alternative would be to do this with a context manager:

with exception_status_code(500):
    # Do stuff, if exception is raised, we return response with 500 and traceback.

No need to do it in this PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, nice, this is definitely better than what we have here. I found it a bit weird towards the end as well, but I had already gone too far to bother changing it back, lol.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave that a try, and I think the current dashboard modules don't really respect exceptions thrown from our own context manager; we can only do it by returning a response with the error status code set. So I will leave it as is for now :/


try:
job_id = self._job_manager.submit_job(
entrypoint=submit_request.entrypoint,
runtime_env=submit_request.runtime_env,
metadata=submit_request.metadata)

resp = JobSubmitResponse(job_id=job_id)
except Exception:
return aiohttp.web.Response(
reason=traceback.format_exc().encode("utf-8"),
status=aiohttp.web.HTTPInternalServerError.status_code)

return aiohttp.web.Response(
text=json.dumps(dataclasses.asdict(resp)),
content_type="application/json",
status=aiohttp.web.HTTPOk.status_code,
)

@routes.post(JOBS_API_ROUTE_STOP)
@_ensure_ray_initialized
async def stop(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
    """Stop the job identified by the ``job_id`` query parameter.

    Returns:
        200 with a JSON-serialized JobStopResponse on success.
        400 with the stack trace as reason if ``job_id`` is missing.
        500 with the stack trace as reason if the job manager raises.
    """
    try:
        job_id = req.query["job_id"]
    except KeyError:
        # A missing query parameter is the caller's mistake, so report it
        # as a client error instead of letting the KeyError escape the
        # handler unreported.
        return aiohttp.web.Response(
            reason=traceback.format_exc().encode("utf-8"),
            status=aiohttp.web.HTTPBadRequest.status_code)

    try:
        stopped = self._job_manager.stop_job(job_id)
        resp = JobStopResponse(stopped=stopped)
    except Exception:
        return aiohttp.web.Response(
            reason=traceback.format_exc().encode("utf-8"),
            status=aiohttp.web.HTTPInternalServerError.status_code)

    # Explicit 200 keeps this handler in line with submit().
    return aiohttp.web.Response(
        text=json.dumps(dataclasses.asdict(resp)),
        content_type="application/json",
        status=aiohttp.web.HTTPOk.status_code,
    )
Expand All @@ -82,7 +140,6 @@ async def submit(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
async def status(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
job_id = req.query["job_id"]
status: JobStatus = self._job_manager.get_job_status(job_id)

resp = JobStatusResponse(job_status=status)
return aiohttp.web.Response(
text=json.dumps(dataclasses.asdict(resp)),
Expand All @@ -92,6 +149,7 @@ async def status(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
@_ensure_ray_initialized
async def logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
job_id = req.query["job_id"]

stdout: bytes = self._job_manager.get_job_stdout(job_id)
stderr: bytes = self._job_manager.get_job_stderr(job_id)

Expand Down
17 changes: 12 additions & 5 deletions dashboard/modules/job/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
create_package, get_uri_for_directory, parse_uri)
from ray._private.job_manager import JobStatus
from ray.dashboard.modules.job.data_types import (
GetPackageResponse, JobSubmitRequest, JobSubmitResponse, JobStatusResponse,
JobLogsResponse)
GetPackageResponse, JobSubmitRequest, JobSubmitResponse, JobStopResponse,
JobStatusResponse, JobLogsResponse)

from ray.dashboard.modules.job.job_head import (
JOBS_API_ROUTE_LOGS, JOBS_API_ROUTE_SUBMIT, JOBS_API_ROUTE_STATUS,
JOBS_API_ROUTE_PACKAGE)
JOBS_API_ROUTE_LOGS, JOBS_API_ROUTE_SUBMIT, JOBS_API_ROUTE_STOP,
JOBS_API_ROUTE_STATUS, JOBS_API_ROUTE_PACKAGE)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
Expand Down Expand Up @@ -46,7 +46,6 @@ def _do_request(self,
f"json: {json_data}, params: {params}.")
r = requests.request(
method, url, data=data, json=json_data, params=params)

r.raise_for_status()
if response_type is None:
return None
Expand Down Expand Up @@ -130,6 +129,14 @@ def submit_job(self,
response_type=JobSubmitResponse)
return resp.job_id

def stop_job(self, job_id: str) -> bool:
    """POST a stop request for ``job_id`` and return the ``stopped`` flag
    of the server's JobStopResponse.
    """
    query = {"job_id": job_id}
    stop_response = self._do_request(
        "POST",
        JOBS_API_ROUTE_STOP,
        params=query,
        response_type=JobStopResponse)
    return stop_response.stopped

def get_job_status(self, job_id: str) -> JobStatus:
resp = self._do_request(
"GET",
Expand Down
103 changes: 102 additions & 1 deletion dashboard/modules/job/tests/test_http_job_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from pathlib import Path
import sys
import tempfile
import requests

import pytest

Expand All @@ -10,9 +11,9 @@
wait_until_server_available)
from ray._private.job_manager import JobStatus
from ray.dashboard.modules.job.sdk import JobSubmissionClient
from ray.dashboard.modules.job.job_head import JOBS_API_ROUTE_SUBMIT

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


@pytest.fixture
Expand All @@ -31,6 +32,16 @@ def _check_job_succeeded(client: JobSubmissionClient, job_id: str) -> bool:
return status == JobStatus.SUCCEEDED


def _check_job_failed(client: JobSubmissionClient, job_id: str) -> bool:
    """Return True when the job's current status is JobStatus.FAILED."""
    return client.get_job_status(job_id) == JobStatus.FAILED


def _check_job_stopped(client: JobSubmissionClient, job_id: str) -> bool:
    """Return True when the job's current status is JobStatus.STOPPED."""
    return client.get_job_status(job_id) == JobStatus.STOPPED


@pytest.fixture(
scope="function",
params=["no_working_dir", "local_working_dir", "s3_working_dir"])
Expand Down Expand Up @@ -98,6 +109,96 @@ def test_submit_job(job_sdk_client, working_dir_option):
assert stderr == working_dir_option["expected_stderr"]


def test_http_bad_request(job_sdk_client):
"""
Send bad requests to job http server and ensure right return code and
error message is returned via http.
"""
client = job_sdk_client

# 400 - HTTPBadRequest
with pytest.raises(requests.exceptions.HTTPError) as e:
_ = client._do_request(
"POST",
JOBS_API_ROUTE_SUBMIT,
json_data={"key": "baaaad request"},
)

ex_message = str(e.value)
assert "400 Client Error" in ex_message
assert "TypeError: __init__() got an unexpected keyword argument" in ex_message # noqa: E501
Comment on lines +127 to +129
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also use the `match` kwarg of `pytest.raises` to do this for you.


# 405 - HTTPMethodNotAllowed
with pytest.raises(requests.exceptions.HTTPError) as e:
_ = client._do_request(
"GET",
JOBS_API_ROUTE_SUBMIT,
json_data={"key": "baaaad request"},
)
ex_message = str(e.value)
assert "405 Client Error: Method Not Allowed" in ex_message

# 500 - HTTPInternalServerError
with pytest.raises(requests.exceptions.HTTPError) as e:
_ = client.submit_job(
entrypoint="echo hello",
runtime_env={"working_dir": "s3://does_not_exist"})
ex_message = str(e.value)
assert "500 Server Error" in ex_message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be a 400, not a 500

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(500 means something went wrong, like an internal assertion failed or unexpected error occurred)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrapped everything within job_manager.submit_job() in a 500 to cover everything that happens at the job manager / Ray level. This one in particular fails in validation.py because the S3 path does not have a .zip suffix. Maybe we can add a few more cases and see; probably not all of them are 500s.

assert "Only .zip files supported for S3 URIs" in ex_message


def test_submit_job_with_exception_in_driver(job_sdk_client):
    """
    A driver script that raises must end up FAILED, with its stdout kept
    and the exception visible in stderr.
    """
    client = job_sdk_client
    # Kept flush-left so the script text written to disk has no indentation.
    driver_script = """
print('Hello !')
raise RuntimeError('Intentionally failed.')
"""

    with tempfile.TemporaryDirectory() as tmp_dir:
        script_path = Path(tmp_dir) / "test_script.py"
        script_path.write_text(driver_script)

        job_id = client.submit_job(
            entrypoint="python test_script.py",
            runtime_env={"working_dir": tmp_dir})

        wait_for_condition(_check_job_failed, client=client, job_id=job_id)
        logs = client.get_job_logs(job_id)
        stdout, stderr = logs
        assert stdout == "Hello !"
        assert "RuntimeError: Intentionally failed." in stderr


def test_stop_long_running_job(job_sdk_client):
    """
    Start a job that sleeps for a long time, stop it right away and wait
    until its status becomes STOPPED.
    """
    client = job_sdk_client
    # Kept flush-left so the script text written to disk has no indentation.
    driver_script = """
print('Hello !')
import time
time.sleep(300) # This should never finish
raise RuntimeError('Intentionally failed.')
"""

    with tempfile.TemporaryDirectory() as tmp_dir:
        script_path = Path(tmp_dir) / "test_script.py"
        script_path.write_text(driver_script)

        job_id = client.submit_job(
            entrypoint="python test_script.py",
            runtime_env={"working_dir": tmp_dir})
        stop_result = client.stop_job(job_id)
        assert stop_result is True
        wait_for_condition(_check_job_stopped, client=client, job_id=job_id)


def test_job_metadata(job_sdk_client):
client = job_sdk_client

Expand Down