diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py index a5cdd0df1fe0..c8a79b87c0df 100644 --- a/dashboard/modules/job/job_manager.py +++ b/dashboard/modules/job/job_manager.py @@ -142,8 +142,9 @@ class JobSupervisor: Job supervisor actor should fate share with subprocess it created. """ - WAIT_FOR_JOB_TERMINATION_S = 3 + DEFAULT_RAY_JOB_STOP_WAIT_TIME_S = 3 SUBPROCESS_POLL_PERIOD_S = 0.1 + VALID_STOP_SIGNALS = ["SIGINT", "SIGTERM"] def __init__( self, @@ -225,6 +226,14 @@ def _exec_entrypoint(self, logs_path: str) -> subprocess.Popen: start_new_session=True, stdout=logs_file, stderr=subprocess.STDOUT, + # Ray blocks SIGINT in all processes; unblock it in the child only when the + # job is stopped via SIGINT. Parenthesized so non-SIGINT/Windows gets None. + preexec_fn=( + lambda: signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT}) + ) + if sys.platform != "win32" + and os.environ.get("RAY_JOB_STOP_SIGNAL") == "SIGINT" + else None, ) parent_pid = os.getpid() child_pid = child_process.pid @@ -377,7 +386,17 @@ async def run( win32job.TerminateJobObject(self._win32_job_object, -1) elif sys.platform != "win32": try: - os.killpg(os.getpgid(child_process.pid), signal.SIGTERM) + stop_signal = os.environ.get("RAY_JOB_STOP_SIGNAL", "SIGTERM") + if stop_signal not in self.VALID_STOP_SIGNALS: + logger.warning( + f"{stop_signal} not a valid stop signal. Terminating " + "job with SIGTERM." + ) + stop_signal = "SIGTERM" + os.killpg( + os.getpgid(child_process.pid), + getattr(signal, stop_signal), + ) except ProcessLookupError: # Process already completed. logger.info( @@ -386,21 +405,26 @@ async def run( ) pass else: - # Wait for job to terminate gracefully, otherwise kill process + # Wait for job to exit gracefully, otherwise kill process # forcefully after timeout. 
try: - await asyncio.wait_for( - polling_task, self.WAIT_FOR_JOB_TERMINATION_S + stop_job_wait_time = int( + os.environ.get( + "RAY_JOB_STOP_WAIT_TIME_S", + self.DEFAULT_RAY_JOB_STOP_WAIT_TIME_S, + ) ) + await asyncio.wait_for(polling_task, stop_job_wait_time) logger.info( - f"Job {self._job_id} has been terminated gracefully." + f"Job {self._job_id} has been terminated gracefully " + f"with {stop_signal}." ) except asyncio.TimeoutError: logger.warning( f"Attempt to gracefully terminate job {self._job_id} " - "through SIGTERM has timed out after " - f"{self.WAIT_FOR_JOB_TERMINATION_S} seconds. Job is " - "now being force-killed." + f"through {stop_signal} has timed out after " + f"{stop_job_wait_time} seconds. Job is now being " + "force-killed." ) polling_task.cancel() child_process.kill() diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py index 9587e1639d5d..e85a078035e9 100644 --- a/dashboard/modules/job/tests/test_job_manager.py +++ b/dashboard/modules/job/tests/test_job_manager.py @@ -816,7 +816,11 @@ def handler(*args): @pytest.mark.asyncio -async def test_stop_job_timeout(job_manager): +@pytest.mark.parametrize( + "use_env_var,stop_timeout", + [(True, 10), (False, JobSupervisor.DEFAULT_RAY_JOB_STOP_WAIT_TIME_S)], +) +async def test_stop_job_timeout(job_manager, use_env_var, stop_timeout): """ Stop job should send SIGTERM first, then if timeout occurs, send SIGKILL. 
""" @@ -826,14 +830,19 @@ async def test_stop_job_timeout(job_manager): import time def handler(*args): print('SIGTERM signal handled!'); - pass signal.signal(signal.SIGTERM, handler) while True: print('Waiting...') time.sleep(1)\" """ - job_id = await job_manager.submit_job(entrypoint=entrypoint) + if use_env_var: + job_id = await job_manager.submit_job( + entrypoint=entrypoint, + runtime_env={"env_vars": {"RAY_JOB_STOP_WAIT_TIME_S": str(stop_timeout)}}, + ) + else: + job_id = await job_manager.submit_job(entrypoint=entrypoint) await async_wait_for_condition( lambda: "Waiting..." in job_manager.get_job_logs(job_id) @@ -844,11 +853,12 @@ def handler(*args): await async_wait_for_condition( lambda: "SIGTERM signal handled!" in job_manager.get_job_logs(job_id) ) + await async_wait_for_condition_async_predicate( check_job_stopped, job_manager=job_manager, job_id=job_id, - timeout=JobSupervisor.WAIT_FOR_JOB_TERMINATION_S + 10, + timeout=stop_timeout + 10, ) diff --git a/doc/source/serve/dev-workflow.md b/doc/source/serve/dev-workflow.md index f40a1efde15b..ed6c17956a1d 100644 --- a/doc/source/serve/dev-workflow.md +++ b/doc/source/serve/dev-workflow.md @@ -61,6 +61,10 @@ serve run local_dev:graph The `serve run` command blocks the terminal and can be canceled with Ctrl-C. +:::{note} +If you already have a local Ray Cluster running before executing `serve run`, make sure that the path to your Serve app is accessible from the working directory in which you started the Ray Cluster using `ray start --head`. Otherwise, you can pass in `app-dir` or `working-dir` when executing `serve run`. See the documentation for [serve run](serve_cli.html#serve-run) for more details. +::: + Now that Serve is running, we can send HTTP requests to the application. For simplicity, we'll just use the `curl` command to send requests from another terminal. @@ -81,17 +85,17 @@ Note that rerunning `serve run` will redeploy all deployments. 
To prevent redepl ## Testing on a remote cluster -To test on a remote cluster, you'll use `serve run` again, but this time you'll pass in an `--address` argument to specify the address of the Ray cluster to connect to. For remote clusters, this address has the form `ray://<head-node-ip-address>:10001`; see [Ray Client](ray-client-ref) for more information. +To test on a remote cluster, you'll use `serve run` again, but this time you'll pass in an `--address` argument to specify the address of the Ray cluster to connect to. For remote clusters, this address has the form `http://<head-node-ip-address>:8265` and will be passed to Ray Job Submission; see [Ray Jobs](jobs-overview) for more information. When making the transition from your local machine to a remote cluster, you'll need to make sure your cluster has a similar environment to your local machine--files, environment variables, and Python packages, for example. Let's see a simple example that just packages the code. Run the following command on your local machine, with your remote cluster head node IP address substituted for `<head-node-ip-address>` in the command: ```bash -serve run --address=ray://<head-node-ip-address>:10001 --working_dir="./project/src" local_dev:graph +serve run --address=http://<head-node-ip-address>:8265 --working_dir="./project/src" local_dev:graph ``` -This will connect to the remote cluster via Ray Client, upload the `working_dir` directory, and run your serve application. Here, the local directory specified by `working_dir` must contain `local_dev.py` so that it can be uploaded to the cluster and imported by Ray Serve. +This will upload the `working_dir` directory to the remote cluster and run your Serve application as a Ray Job on the remote cluster. Here, the local directory specified by `working_dir` must contain `local_dev.py` so that it can be uploaded to the cluster and imported by Ray Serve. 
Once this is up and running, we can send requests to the application: @@ -103,7 +107,7 @@ curl -X PUT http://:8000/?name=Ray For more complex dependencies, including files outside the working directory, environment variables, and Python packages, you can use {ref}`Runtime Environments`. Here is an example using the --runtime-env-json argument: ```bash -serve run --address=ray://:10001 --runtime-env-json='{"env_vars": {"MY_ENV_VAR": "my-value"}, "working_dir": "./project/src", "pip": ["requests", "chess"]}' local_dev:graph +serve run --address=http://:8265 --runtime-env-json='{"env_vars": {"MY_ENV_VAR": "my-value"}, "working_dir": "./project/src", "pip": ["requests", "chess"]}' local_dev:graph ``` You can also specify the `runtime_env` via a YAML file; see [serve run](serve_cli.html#serve-run) for details. diff --git a/python/ray/serve/run_script.py b/python/ray/serve/run_script.py new file mode 100644 index 000000000000..7bf579694acf --- /dev/null +++ b/python/ray/serve/run_script.py @@ -0,0 +1,114 @@ +import argparse +import pathlib +import sys +import time +import yaml + +from ray import serve +from ray._private.utils import import_attr +from ray.autoscaler._private.cli_logger import cli_logger +from ray.serve._private import api as _private_api +from ray.serve._private.constants import ( + DEFAULT_HTTP_HOST, + DEFAULT_HTTP_PORT, +) +from ray.serve.schema import ServeApplicationSchema + + +def main(): + """ + This is the Job that gets submitted to the Ray Cluster when `serve run` is executed. + + Loads the Serve app (either from a YAML config file or a direct import path), starts + Serve and runs the app. By default, the code blocks until a SIGINT signal is + received, at which point Serve is shutdown and the process exits. 
+ """ + parser = argparse.ArgumentParser() + parser.add_argument("--config-or-import-path") + parser.add_argument("--app-dir") + parser.add_argument("--host") + parser.add_argument("--port", type=int) + parser.add_argument("--blocking", action="store_true") + parser.add_argument("--gradio", action="store_true") + args = parser.parse_args() + host, port = args.host, args.port + + sys.path.insert(0, args.app_dir) + if pathlib.Path(args.config_or_import_path).is_file(): + config_path = args.config_or_import_path + cli_logger.print(f"Deploying from config file: '{config_path}'.") + + with open(config_path, "r") as config_file: + config_dict = yaml.safe_load(config_file) + # If host or port is specified as a CLI argument, they should take priority + # over config values. + config_dict.setdefault("host", DEFAULT_HTTP_HOST) + if host is not None: + config_dict["host"] = host + + config_dict.setdefault("port", DEFAULT_HTTP_PORT) + if port is not None: + config_dict["port"] = port + + config = ServeApplicationSchema.parse_obj(config_dict) + is_config = True + else: + if host is None: + host = DEFAULT_HTTP_HOST + if port is None: + port = DEFAULT_HTTP_PORT + import_path = args.config_or_import_path + cli_logger.print(f"Deploying from import path: '{import_path}'.") + node = import_attr(import_path) + is_config = False + + if is_config: + client = _private_api.serve_start( + detached=True, + http_options={ + "host": config.host, + "port": config.port, + "location": "EveryNode", + }, + ) + else: + client = _private_api.serve_start( + detached=True, + http_options={ + "host": host, + "port": port, + "location": "EveryNode", + }, + ) + + try: + if is_config: + client.deploy_app(config, _blocking=args.gradio) + cli_logger.success("Submitted deploy config successfully.") + if args.gradio: + handle = serve.get_deployment("DAGDriver").get_handle() + else: + handle = serve.run(node, host=host, port=port) + cli_logger.success("Deployed Serve app successfully.") + + if args.gradio: + 
from ray.serve.experimental.gradio_visualize_graph import ( + GraphVisualizer, + ) + + visualizer = GraphVisualizer() + visualizer.visualize_with_gradio(handle) + else: + if args.blocking: + while True: + # Block, letting Ray print logs to the terminal. + time.sleep(10) + + except KeyboardInterrupt: + cli_logger.info("Got KeyboardInterrupt, shutting down...") + serve.shutdown() + sys.exit() + + +if __name__ == "__main__": + main() diff --git a/python/ray/serve/scripts.py b/python/ray/serve/scripts.py index 1325709939e8..7a54664e8626 100644 --- a/python/ray/serve/scripts.py +++ b/python/ray/serve/scripts.py @@ -1,8 +1,8 @@ #!/usr/bin/env python +import asyncio import os -import pathlib +import signal import sys -import time from typing import Optional, Union import click @@ -15,6 +15,8 @@ from ray.autoscaler._private.cli_logger import cli_logger from ray.dashboard.modules.dashboard_sdk import parse_runtime_env_args from ray.dashboard.modules.serve.sdk import ServeSubmissionClient +from ray.job_submission import JobSubmissionClient +from ray.serve import run_script from ray.serve.api import build as build_app from ray.serve.config import DeploymentMode from ray.serve._private.constants import ( @@ -25,17 +27,11 @@ from ray.serve.deployment import deployment_to_schema from ray.serve.deployment_graph import ClassNode, FunctionNode from ray.serve.schema import ServeApplicationSchema -from ray.serve._private import api as _private_api -APP_DIR_HELP_STR = ( - "Local directory to look for the IMPORT_PATH (will be inserted into " - "PYTHONPATH). Defaults to '.', meaning that an object in ./main.py " - "can be imported as 'main.object'. Not relevant if you're importing " - "from an installed module." -) RAY_INIT_ADDRESS_HELP_STR = ( - "Address to use for ray.init(). Can also be specified " - "using the RAY_ADDRESS environment variable." + "Address of the Ray Cluster to run the Serve app on. If no address is specified, " + "a local Ray Cluster will be started. 
Can also be specified using the RAY_ADDRESS " + "environment variable." ) RAY_DASHBOARD_ADDRESS_HELP_STR = ( "Address to use to query the Ray dashboard agent (defaults to " @@ -189,12 +185,15 @@ def deploy(config_file_name: str, address: str): @cli.command( short_help="Run a Serve app.", help=( - "Runs the Serve app from the specified import path (e.g. " - "my_script:my_bound_deployment) or YAML config.\n\n" + "Runs a Serve app (specified in config_or_import_path) on a cluster as a Ray " + "Job. config_or_import_path is either a filepath to a YAML config file on the " + "Ray Cluster, or an import path on the Ray Cluster for a deployment node of " + "the pattern containing_module:deployment_node.\n\n" "If using a YAML config, existing deployments with no code changes " "will not be redeployed.\n\n" - "Any import path must lead to a FunctionNode or ClassNode object. " - "By default, this will block and periodically log status. If you " + "Any import path, whether directly specified as the command argument or " + "inside a config file, must lead to a FunctionNode or ClassNode object.\n\n" + "By default, this command will block and periodically log status. If you " "Ctrl-C the command, it will tear down the app." ), ) @@ -205,7 +204,7 @@ def deploy(config_file_name: str, address: str): default=None, required=False, help="Path to a local YAML file containing a runtime_env definition. " - "This will be passed to ray.init() as the default for deployments.", + "This will be passed to Ray Jobs as the default for deployments.", ) @click.option( "--runtime-env-json", @@ -213,7 +212,7 @@ def deploy(config_file_name: str, address: str): default=None, required=False, help="JSON-serialized runtime_env dictionary. 
This will be passed to " - "ray.init() as the default for deployments.", + "Ray Jobs as the default for deployments.", ) @click.option( "--working-dir", @@ -224,7 +223,7 @@ def deploy(config_file_name: str, address: str): "Directory containing files that your job will run in. Can be a " "local directory or a remote URI to a .zip file (S3, GS, HTTP). " "This overrides the working_dir in --runtime-env if both are " - "specified. This will be passed to ray.init() as the default for " + "specified. This will be passed to Ray Jobs as the default for " "deployments." ), ) @@ -233,7 +232,12 @@ def deploy(config_file_name: str, address: str): "-d", default=".", type=str, - help=APP_DIR_HELP_STR, + help=( + "Directory on the Ray Cluster in which to look for the IMPORT_PATH (will be " + "inserted into PYTHONPATH). Defaults to '.', i.e. a deployment node `app_node` " + "in working_directory/main.py on the Ray Cluster can be run using " + "`main:app_node`. Not relevant if you're importing from an installed module." + ), ) @click.option( "--address", @@ -282,85 +286,55 @@ def run( blocking: bool, gradio: bool, ): - sys.path.insert(0, app_dir) + # If no address is given and no local ray instance is running, we want to start one. + if address is None: + ray.init(namespace=SERVE_NAMESPACE) final_runtime_env = parse_runtime_env_args( runtime_env=runtime_env, runtime_env_json=runtime_env_json, working_dir=working_dir, ) + if "env_vars" not in final_runtime_env: + final_runtime_env["env_vars"] = {} + # Send interrupt signal to run_script, which triggers shutdown of Serve. + final_runtime_env["env_vars"]["RAY_JOB_STOP_SIGNAL"] = "SIGINT" + # Make sure Serve is shutdown correctly before the job is forcefully killed. + final_runtime_env["env_vars"]["RAY_JOB_STOP_WAIT_TIME_S"] = "30" + + # The job to run on the cluster, which imports and runs the serve app. + with open(run_script.__file__, "r") as f: + script = f.read() + + # Use Ray Job Submission to run serve. 
+ client = JobSubmissionClient(address) + submission_id = client.submit_job( + entrypoint=( + f"python -c '{script}' " + f"--config-or-import-path={config_or_import_path} " + f"--app-dir={app_dir} " + + (f"--host={host} " if host is not None else "") + + (f"--port={port} " if port is not None else "") + + ("--blocking " if blocking else "") + + ("--gradio " if gradio else "") + ), + # Setting the runtime_env will set defaults for the deployments. + runtime_env=final_runtime_env, + ) - if pathlib.Path(config_or_import_path).is_file(): - config_path = config_or_import_path - cli_logger.print(f'Deploying from config file: "{config_path}".') - - with open(config_path, "r") as config_file: - config_dict = yaml.safe_load(config_file) - # If host or port is specified as a CLI argument, they should take priority - # over config values. - config_dict.setdefault("host", DEFAULT_HTTP_HOST) - if host is not None: - config_dict["host"] = host - - config_dict.setdefault("port", DEFAULT_HTTP_PORT) - if port is not None: - config_dict["port"] = port + async def print_logs(): + async for lines in client.tail_job_logs(submission_id): + print(lines, end="") - config = ServeApplicationSchema.parse_obj(config_dict) - is_config = True - else: - if host is None: - host = DEFAULT_HTTP_HOST - if port is None: - port = DEFAULT_HTTP_PORT - import_path = config_or_import_path - cli_logger.print(f'Deploying from import path: "{import_path}".') - node = import_attr(import_path) - is_config = False - - # Setting the runtime_env here will set defaults for the deployments. 
- ray.init(address=address, namespace=SERVE_NAMESPACE, runtime_env=final_runtime_env) - - if is_config: - client = _private_api.serve_start( - detached=True, - http_options={ - "host": config.host, - "port": config.port, - "location": "EveryNode", - }, - ) - else: - client = _private_api.serve_start( - detached=True, - http_options={"host": host, "port": port, "location": "EveryNode"}, - ) + def interrupt_handler(): + # Upon keyboard interrupt, stop job (which sends an interrupt signal to the job + # and shuts down serve). Then continue to stream logs until the job finishes. + client.stop_job(submission_id) - try: - if is_config: - client.deploy_app(config, _blocking=gradio) - cli_logger.success("Submitted deploy config successfully.") - if gradio: - handle = serve.get_deployment("DAGDriver").get_handle() - else: - handle = serve.run(node, host=host, port=port) - cli_logger.success("Deployed Serve app successfully.") - - if gradio: - from ray.serve.experimental.gradio_visualize_graph import GraphVisualizer - - visualizer = GraphVisualizer() - visualizer.visualize_with_gradio(handle) - else: - if blocking: - while True: - # Block, letting Ray print logs to the terminal. - time.sleep(10) - - except KeyboardInterrupt: - cli_logger.info("Got KeyboardInterrupt, shutting down...") - serve.shutdown() - sys.exit() + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, interrupt_handler) + loop.run_until_complete(print_logs()) + loop.close() @cli.command(help="Get the current config of the running Serve app.") @@ -456,7 +430,12 @@ def shutdown(address: str, yes: bool): "-d", default=".", type=str, - help=APP_DIR_HELP_STR, + help=( + "Local directory to look for the IMPORT_PATH (will be inserted into " + "PYTHONPATH). Defaults to '.', meaning that an object in ./main.py " + "can be imported as 'main.object'. Not relevant if you're importing " + "from an installed module." 
+ ), ) @click.option( "--kubernetes_format", diff --git a/python/ray/serve/tests/test_cli.py b/python/ray/serve/tests/test_cli.py index 13e609e7028a..251b8e76b800 100644 --- a/python/ray/serve/tests/test_cli.py +++ b/python/ray/serve/tests/test_cli.py @@ -333,7 +333,8 @@ def parrot(request): @pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.") -def test_run_application(ray_start_stop): +@pytest.mark.parametrize("address", ["auto", "http://127.0.0.1:8265"]) +def test_run_application(ray_start_stop, address): """Deploys valid config file and import path via `serve run`.""" # Deploy via config file @@ -342,7 +343,7 @@ def test_run_application(ray_start_stop): ) print('Running config file "arithmetic.yaml".') - p = subprocess.Popen(["serve", "run", "--address=auto", config_file_name]) + p = subprocess.Popen(["serve", "run", f"--address={address}", config_file_name]) wait_for_condition( lambda: requests.post("http://localhost:8000/", json=["ADD", 0]).json() == 1, timeout=15, @@ -362,7 +363,7 @@ def test_run_application(ray_start_stop): print('Running node at import path "ray.serve.tests.test_cli.parrot_node".') # Deploy via import path p = subprocess.Popen( - ["serve", "run", "--address=auto", "ray.serve.tests.test_cli.parrot_node"] + ["serve", "run", f"--address={address}", "ray.serve.tests.test_cli.parrot_node"] ) wait_for_condition( lambda: ping_endpoint("parrot", params="?sound=squawk") == "squawk" @@ -393,7 +394,8 @@ def __call__(self): @pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.") -def test_run_deployment_node(ray_start_stop): +@pytest.mark.parametrize("address", ["auto", "http://127.0.0.1:8265"]) +def test_run_deployment_node(ray_start_stop, address): """Test `serve run` with bound args and kwargs.""" # Deploy via import path @@ -401,11 +403,11 @@ def test_run_deployment_node(ray_start_stop): [ "serve", "run", - "--address=auto", + f"--address={address}", 
"ray.serve.tests.test_cli.molly_macaw", ] ) - wait_for_condition(lambda: ping_endpoint("Macaw") == "Molly is green!", timeout=10) + wait_for_condition(lambda: ping_endpoint("Macaw") == "Molly is green!", timeout=15) p.send_signal(signal.SIGINT) p.wait() assert ping_endpoint("Macaw") == CONNECTION_ERROR_MSG @@ -421,7 +423,8 @@ def __call__(self, *args): @pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.") -def test_run_runtime_env(ray_start_stop): +@pytest.mark.parametrize("address", ["auto", "http://127.0.0.1:8265"]) +def test_run_runtime_env(ray_start_stop, address): """Test `serve run` with runtime_env passed in.""" # With import path @@ -429,14 +432,14 @@ def test_run_runtime_env(ray_start_stop): [ "serve", "run", - "--address=auto", + f"--address={address}", "ray.serve.tests.test_cli.metal_detector_node", "--runtime-env-json", ('{"env_vars": {"buried_item": "lucky coin"} }'), ] ) wait_for_condition( - lambda: ping_endpoint("MetalDetector") == "lucky coin", timeout=10 + lambda: ping_endpoint("MetalDetector") == "lucky coin", timeout=15 ) p.send_signal(signal.SIGINT) p.wait() @@ -446,7 +449,7 @@ def test_run_runtime_env(ray_start_stop): [ "serve", "run", - "--address=auto", + f"--address={address}", os.path.join( os.path.dirname(__file__), "test_config_files", diff --git a/python/ray/tests/test_usage_stats.py b/python/ray/tests/test_usage_stats.py index 41dc832d5d64..2b13000089c9 100644 --- a/python/ray/tests/test_usage_stats.py +++ b/python/ray/tests/test_usage_stats.py @@ -695,14 +695,15 @@ def get_actor_metadata(self): ) run_string_as_driver(driver) - job_submission_client = ray.job_submission.JobSubmissionClient( - "http://127.0.0.1:8265" - ) - job_id = job_submission_client.submit_job(entrypoint="ls") - wait_for_condition( - lambda: job_submission_client.get_job_status(job_id) - == ray.job_submission.JobStatus.SUCCEEDED - ) + if sys.platform != "win32": + job_submission_client = ray.job_submission.JobSubmissionClient( + 
"http://127.0.0.1:8265" + ) + job_id = job_submission_client.submit_job(entrypoint="ls") + wait_for_condition( + lambda: job_submission_client.get_job_status(job_id) + == ray.job_submission.JobStatus.SUCCEEDED + ) library_usages = ray_usage_lib.get_library_usages_to_report( ray.experimental.internal_kv.internal_kv_get_gcs_client() @@ -722,8 +723,9 @@ def get_actor_metadata(self): "util.multiprocessing.Pool", "util.Queue", "util.joblib", - "job_submission", } + if sys.platform != "win32": + expected.add("job_submission") if ray_client: expected.add("client") assert set(library_usages) == expected