[serve] Replace Ray Client with Ray Job Submission for serve run (#30913)


Currently, `serve run` uses Ray Client to connect to a remote cluster, but Ray Client is buggy and has serialization and network issues. This change switches `serve run` to use [Ray Job Submission](https://docs.ray.io/en/latest/cluster/running-applications/job-submission/index.html) to run Serve for both local and remote testing.

The main user-facing difference is that the user is now responsible for making sure the entrypoint config file / deployment node is accessible on the Ray Cluster: it is no longer loaded locally, relative to where `serve run` is executed, but instead on the Ray Cluster, relative to where the job runs.
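For example, a minimal sketch (the cluster address is illustrative): the entrypoint is imported on the cluster, so the directory containing it must be uploaded with the job.

```bash
# local_dev:graph is resolved on the Ray Cluster, not locally, so upload the
# directory containing local_dev.py as the job's working directory.
serve run --address=http://<head-node-ip-address>:8265 --working-dir="./project/src" local_dev:graph
```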

It may be worth noting that Ray Job Submission seems to be slower for local clusters: I had to increase the timeouts in the unit tests `test_run_deployment_node` and `test_run_runtime_env` in `test_cli.py` to give the Serve deployments enough time to serve HTTP requests, as well as time to shut down and stop serving HTTP requests after Ctrl-C.
zcin authored Dec 27, 2022
1 parent 01b19ba commit f40ac95
Showing 7 changed files with 262 additions and 126 deletions.
42 changes: 33 additions & 9 deletions dashboard/modules/job/job_manager.py
@@ -142,8 +142,9 @@ class JobSupervisor:
    Job supervisor actor should fate share with subprocess it created.
    """

    WAIT_FOR_JOB_TERMINATION_S = 3
    DEFAULT_RAY_JOB_STOP_WAIT_TIME_S = 3
    SUBPROCESS_POLL_PERIOD_S = 0.1
    VALID_STOP_SIGNALS = ["SIGINT", "SIGTERM"]

    def __init__(
        self,
@@ -225,6 +226,14 @@ def _exec_entrypoint(self, logs_path: str) -> subprocess.Popen:
                start_new_session=True,
                stdout=logs_file,
                stderr=subprocess.STDOUT,
                # Ray intentionally blocks SIGINT in all processes, so if the
                # user wants to stop the job through SIGINT, we need to
                # unblock it in the child process.
                preexec_fn=lambda: signal.pthread_sigmask(
                    signal.SIG_UNBLOCK, {signal.SIGINT}
                )
                if sys.platform != "win32"
                and os.environ.get("RAY_JOB_STOP_SIGNAL") == "SIGINT"
                else None,
            )
            parent_pid = os.getpid()
            child_pid = child_process.pid
@@ -377,7 +386,17 @@ async def run(
            win32job.TerminateJobObject(self._win32_job_object, -1)
        elif sys.platform != "win32":
            try:
                os.killpg(os.getpgid(child_process.pid), signal.SIGTERM)
                stop_signal = os.environ.get("RAY_JOB_STOP_SIGNAL", "SIGTERM")
                if stop_signal not in self.VALID_STOP_SIGNALS:
                    logger.warning(
                        f"{stop_signal} not a valid stop signal. Terminating "
                        "job with SIGTERM."
                    )
                    stop_signal = "SIGTERM"
                os.killpg(
                    os.getpgid(child_process.pid),
                    getattr(signal, stop_signal),
                )
            except ProcessLookupError:
                # Process already completed.
                logger.info(
@@ -386,21 +405,26 @@
                )
                pass
            else:
                # Wait for job to terminate gracefully, otherwise kill process
                # Wait for job to exit gracefully, otherwise kill process
                # forcefully after timeout.
                try:
                    await asyncio.wait_for(
                        polling_task, self.WAIT_FOR_JOB_TERMINATION_S
                    stop_job_wait_time = int(
                        os.environ.get(
                            "RAY_JOB_STOP_WAIT_TIME_S",
                            self.DEFAULT_RAY_JOB_STOP_WAIT_TIME_S,
                        )
                    )
                    await asyncio.wait_for(polling_task, stop_job_wait_time)
                    logger.info(
                        f"Job {self._job_id} has been terminated gracefully."
                        f"Job {self._job_id} has been terminated gracefully "
                        f"with {stop_signal}."
                    )
                except asyncio.TimeoutError:
                    logger.warning(
                        f"Attempt to gracefully terminate job {self._job_id} "
                        "through SIGTERM has timed out after "
                        f"{self.WAIT_FOR_JOB_TERMINATION_S} seconds. Job is "
                        "now being force-killed."
                        f"through {stop_signal} has timed out after "
                        f"{stop_job_wait_time} seconds. Job is now being "
                        "force-killed."
                    )
                    polling_task.cancel()
                    child_process.kill()
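The new stop behavior is driven by the two environment variables introduced above. A minimal sketch of exercising them through the Ray Jobs API (address and entrypoint script are illustrative):

```python
# Sketch: configure the stop behavior of a submitted job via its runtime_env.
# RAY_JOB_STOP_SIGNAL selects the signal sent on stop_job() (SIGINT or SIGTERM);
# RAY_JOB_STOP_WAIT_TIME_S is how long to wait for a graceful exit before SIGKILL.
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
    entrypoint="python my_server.py",  # hypothetical long-running script
    runtime_env={
        "env_vars": {
            "RAY_JOB_STOP_SIGNAL": "SIGINT",
            "RAY_JOB_STOP_WAIT_TIME_S": "10",
        }
    },
)
client.stop_job(job_id)  # sends SIGINT, waits up to 10s, then force-kills
```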
18 changes: 14 additions & 4 deletions dashboard/modules/job/tests/test_job_manager.py
@@ -816,7 +816,11 @@ def handler(*args):


@pytest.mark.asyncio
async def test_stop_job_timeout(job_manager):
@pytest.mark.parametrize(
    "use_env_var,stop_timeout",
    [(True, 10), (False, JobSupervisor.DEFAULT_RAY_JOB_STOP_WAIT_TIME_S)],
)
async def test_stop_job_timeout(job_manager, use_env_var, stop_timeout):
    """
    Stop job should send SIGTERM first, then if timeout occurs, send SIGKILL.
    """
@@ -826,14 +830,19 @@ async def test_stop_job_timeout(job_manager):
import time
def handler(*args):
    print('SIGTERM signal handled!');
    pass
signal.signal(signal.SIGTERM, handler)
while True:
    print('Waiting...')
    time.sleep(1)\"
    """
    job_id = await job_manager.submit_job(entrypoint=entrypoint)
    if use_env_var:
        job_id = await job_manager.submit_job(
            entrypoint=entrypoint,
            runtime_env={"env_vars": {"RAY_JOB_STOP_WAIT_TIME_S": str(stop_timeout)}},
        )
    else:
        job_id = await job_manager.submit_job(entrypoint=entrypoint)

    await async_wait_for_condition(
        lambda: "Waiting..." in job_manager.get_job_logs(job_id)
@@ -844,11 +853,12 @@ def handler(*args):
    await async_wait_for_condition(
        lambda: "SIGTERM signal handled!" in job_manager.get_job_logs(job_id)
    )

    await async_wait_for_condition_async_predicate(
        check_job_stopped,
        job_manager=job_manager,
        job_id=job_id,
        timeout=JobSupervisor.WAIT_FOR_JOB_TERMINATION_S + 10,
        timeout=stop_timeout + 10,
    )


12 changes: 8 additions & 4 deletions doc/source/serve/dev-workflow.md
@@ -61,6 +61,10 @@ serve run local_dev:graph

The `serve run` command blocks the terminal and can be canceled with Ctrl-C.

:::{note}
If you already have a local Ray Cluster running before executing `serve run`, make sure that the path to your Serve app is accessible from the working directory in which you started the Ray Cluster using `ray start --head`. Alternatively, you can pass `--app-dir` or `--working-dir` when executing `serve run` (see the sketch after this note). See the documentation for [serve run](serve_cli.html#serve-run) for more details.
:::
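For example, a minimal sketch (the path is illustrative):

```bash
# Make the directory containing local_dev.py importable on the cluster.
serve run --app-dir ./project/src local_dev:graph
```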

Now that Serve is running, we can send HTTP requests to the application.
For simplicity, we'll just use the `curl` command to send requests from another terminal.

@@ -81,17 +85,17 @@ Note that rerunning `serve run` will redeploy all deployments. To prevent redepl

## Testing on a remote cluster

To test on a remote cluster, you'll use `serve run` again, but this time you'll pass in an `--address` argument to specify the address of the Ray cluster to connect to. For remote clusters, this address has the form `ray://<head-node-ip-address>:10001`; see [Ray Client](ray-client-ref) for more information.
To test on a remote cluster, you'll use `serve run` again, but this time you'll pass in an `--address` argument to specify the address of the Ray cluster to connect to. For remote clusters, this address has the form `http://<head-node-ip-address>:8265` and will be passed to Ray Job Submission; see [Ray Jobs](jobs-overview) for more information.

When making the transition from your local machine to a remote cluster, you'll need to make sure your cluster has a similar environment to your local machine: files, environment variables, and Python packages, for example.

Let's see a simple example that just packages the code. Run the following command on your local machine, with your remote cluster head node IP address substituted for `<head-node-ip-address>` in the command:

```bash
serve run --address=ray://<head-node-ip-address>:10001 --working-dir="./project/src" local_dev:graph
serve run --address=http://<head-node-ip-address>:8265 --working-dir="./project/src" local_dev:graph
```

This will connect to the remote cluster via Ray Client, upload the `working_dir` directory, and run your serve application. Here, the local directory specified by `working_dir` must contain `local_dev.py` so that it can be uploaded to the cluster and imported by Ray Serve.
This will upload the `working_dir` directory to the remote cluster and run your Serve application as a Ray Job on the remote cluster. Here, the local directory specified by `working_dir` must contain `local_dev.py` so that it can be uploaded to the cluster and imported by Ray Serve.

Once this is up and running, we can send requests to the application:

@@ -103,7 +107,7 @@ curl -X PUT http://<head-node-ip-address>:8000/?name=Ray
For more complex dependencies, including files outside the working directory, environment variables, and Python packages, you can use {ref}`Runtime Environments<runtime-environments>`. Here is an example using the `--runtime-env-json` argument:

```bash
serve run --address=ray://<head-node-ip-address>:10001 --runtime-env-json='{"env_vars": {"MY_ENV_VAR": "my-value"}, "working_dir": "./project/src", "pip": ["requests", "chess"]}' local_dev:graph
serve run --address=http://<head-node-ip-address>:8265 --runtime-env-json='{"env_vars": {"MY_ENV_VAR": "my-value"}, "working_dir": "./project/src", "pip": ["requests", "chess"]}' local_dev:graph
```

You can also specify the `runtime_env` via a YAML file; see [serve run](serve_cli.html#serve-run) for details.
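For instance, a minimal sketch, assuming `--runtime-env` accepts the path to a YAML file as described in the CLI docs (file name and contents are illustrative):

```bash
# Write a runtime_env YAML file and pass it to serve run via --runtime-env.
cat > runtime_env.yaml <<'EOF'
working_dir: "./project/src"
pip:
  - requests
  - chess
env_vars:
  MY_ENV_VAR: "my-value"
EOF

serve run --address=http://<head-node-ip-address>:8265 --runtime-env=runtime_env.yaml local_dev:graph
```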
114 changes: 114 additions & 0 deletions python/ray/serve/run_script.py
@@ -0,0 +1,114 @@
import argparse
import pathlib
import sys
import time
import yaml

from ray import serve
from ray._private.utils import import_attr
from ray.autoscaler._private.cli_logger import cli_logger
from ray.serve._private import api as _private_api
from ray.serve._private.constants import (
DEFAULT_HTTP_HOST,
DEFAULT_HTTP_PORT,
)
from ray.serve.schema import ServeApplicationSchema


def main():
    """
    This is the Job that gets submitted to the Ray Cluster when `serve run` is
    executed. It loads the Serve app (either from a YAML config file or a
    direct import path), starts Serve, and runs the app. By default, the code
    blocks until a SIGINT signal is received, at which point Serve is shut
    down and the process exits.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--config-or-import-path")
    parser.add_argument("--app-dir")
    parser.add_argument("--host")
    parser.add_argument("--port", type=int)
    parser.add_argument("--blocking", action="store_true")
    parser.add_argument("--gradio", action="store_true")
    args = parser.parse_args()
    host, port = args.host, args.port

    # Make the app directory importable so the import path resolves on the cluster.
    sys.path.insert(0, args.app_dir)
    if pathlib.Path(args.config_or_import_path).is_file():
        config_path = args.config_or_import_path
        cli_logger.print(f"Deploying from config file: '{config_path}'.")

        with open(config_path, "r") as config_file:
            config_dict = yaml.safe_load(config_file)
        # If host or port is specified as a CLI argument, it takes priority
        # over the corresponding config value.
        config_dict.setdefault("host", DEFAULT_HTTP_HOST)
        if host is not None:
            config_dict["host"] = host

        config_dict.setdefault("port", DEFAULT_HTTP_PORT)
        if port is not None:
            config_dict["port"] = port

        config = ServeApplicationSchema.parse_obj(config_dict)
        is_config = True
    else:
        if host is None:
            host = DEFAULT_HTTP_HOST
        if port is None:
            port = DEFAULT_HTTP_PORT
        import_path = args.config_or_import_path
        cli_logger.print(f"Deploying from import path: '{import_path}'.")
        node = import_attr(import_path)
        is_config = False

    if is_config:
        client = _private_api.serve_start(
            detached=True,
            http_options={
                "host": config.host,
                "port": config.port,
                "location": "EveryNode",
            },
        )
    else:
        client = _private_api.serve_start(
            detached=True,
            http_options={
                "host": host,
                "port": port,
                "location": "EveryNode",
            },
        )

    try:
        if is_config:
            client.deploy_app(config, _blocking=args.gradio)
            cli_logger.success("Submitted deploy config successfully.")
            if args.gradio:
                handle = serve.get_deployment("DAGDriver").get_handle()
        else:
            handle = serve.run(node, host=host, port=port)
            cli_logger.success("Deployed Serve app successfully.")

        if args.gradio:
            from ray.serve.experimental.gradio_visualize_graph import (
                GraphVisualizer,
            )

            visualizer = GraphVisualizer()
            visualizer.visualize_with_gradio(handle)
        else:
            if args.blocking:
                while True:
                    # Block, letting Ray print logs to the terminal.
                    time.sleep(10)

    except KeyboardInterrupt:
        cli_logger.info("Got KeyboardInterrupt, shutting down...")
        serve.shutdown()
        sys.exit()


if __name__ == "__main__":
    main()
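For context, a rough sketch of how this script is reached: `serve run` submits it to the cluster through the Ray Jobs API. The exact entrypoint wiring lives in the `serve run` implementation; the command below is illustrative, not the actual CLI code.

```python
# Sketch (illustrative): submit run_script.py as a Ray Job; the script then
# loads and runs the Serve app on the cluster, blocking until interrupted.
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://<head-node-ip-address>:8265")
job_id = client.submit_job(
    entrypoint=(
        "python -m ray.serve.run_script "
        "--config-or-import-path local_dev:graph --app-dir . --blocking"
    ),
    runtime_env={"working_dir": "./project/src"},
)
```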