diff --git a/dashboard/modules/dashboard_sdk.py b/dashboard/modules/dashboard_sdk.py new file mode 100644 index 000000000000..cda7b7cf57eb --- /dev/null +++ b/dashboard/modules/dashboard_sdk.py @@ -0,0 +1,315 @@ +import dataclasses +import importlib +import logging +import json +import yaml +from pathlib import Path +import tempfile +from typing import Any, Dict, List, Optional +from pkg_resources import packaging + +try: + import aiohttp + import requests +except ImportError: + aiohttp = None + requests = None + +from ray._private.runtime_env.packaging import ( + create_package, + get_uri_for_directory, + parse_uri, +) +from ray.dashboard.modules.job.common import uri_to_http_components + +from ray.ray_constants import DEFAULT_DASHBOARD_PORT +from ray.util.annotations import PublicAPI +from ray.client_builder import _split_address +from ray.autoscaler._private.cli_logger import cli_logger + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +def parse_runtime_env_args( + runtime_env: Optional[str] = None, + runtime_env_json: Optional[str] = None, + working_dir: Optional[str] = None, +): + """ + Generates a runtime_env dictionary using `runtime_env`, `runtime_env_json`, + and `working_dir` CLI options. Only one of `runtime_env` or + `runtime_env_json` may be defined. `working_dir` overwrites the + `working_dir` from any other option. + """ + + final_runtime_env = {} + if runtime_env is not None: + if runtime_env_json is not None: + raise ValueError( + "Only one of --runtime_env and --runtime-env-json can be provided." + ) + with open(runtime_env, "r") as f: + final_runtime_env = yaml.safe_load(f) + + elif runtime_env_json is not None: + final_runtime_env = json.loads(runtime_env_json) + + if working_dir is not None: + if "working_dir" in final_runtime_env: + cli_logger.warning( + "Overriding runtime_env working_dir with --working-dir option" + ) + + final_runtime_env["working_dir"] = working_dir + + return final_runtime_env + + +@dataclasses.dataclass +class ClusterInfo: + address: str + cookies: Optional[Dict[str, Any]] = None + metadata: Optional[Dict[str, Any]] = None + headers: Optional[Dict[str, Any]] = None + + +def get_submission_client_cluster_info( + address: str, + # For backwards compatibility + *, + # only used in importlib case in parse_cluster_info, but needed + # in function signature. + create_cluster_if_needed: Optional[bool] = False, + cookies: Optional[Dict[str, Any]] = None, + metadata: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, Any]] = None, + _use_tls: Optional[bool] = False, +) -> ClusterInfo: + """Get address, cookies, and metadata used for SubmissionClient. + + If no port is specified in `address`, the Ray dashboard default will be + inserted. + + Args: + address (str): Address without the module prefix that is passed + to SubmissionClient. + create_cluster_if_needed (bool): Indicates whether the cluster + of the address returned needs to be running. Ray doesn't + start a cluster before interacting with jobs, but other + implementations may do so. + + Returns: + ClusterInfo object consisting of address, cookies, and metadata + for SubmissionClient to use. + """ + + scheme = "https" if _use_tls else "http" + + split = address.split(":") + host = split[0] + if len(split) == 1: + port = DEFAULT_DASHBOARD_PORT + elif len(split) == 2: + port = int(split[1]) + else: + raise ValueError(f"Invalid address: {address}.") + + return ClusterInfo( + address=f"{scheme}://{host}:{port}", + cookies=cookies, + metadata=metadata, + headers=headers, + ) + + +def parse_cluster_info( + address: str, + create_cluster_if_needed: bool = False, + cookies: Optional[Dict[str, Any]] = None, + metadata: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, Any]] = None, +) -> ClusterInfo: + module_string, inner_address = _split_address(address) + + # If user passes http(s):// or ray://, go through normal parsing. + if module_string in {"http", "https", "ray"}: + return get_submission_client_cluster_info( + inner_address, + create_cluster_if_needed=create_cluster_if_needed, + cookies=cookies, + metadata=metadata, + headers=headers, + _use_tls=module_string == "https", + ) + # Try to dynamically import the function to get cluster info. + else: + try: + module = importlib.import_module(module_string) + except Exception: + raise RuntimeError( + f"Module: {module_string} does not exist.\n" + f"This module was parsed from Address: {address}" + ) from None + assert "get_submission_client_cluster_info" in dir(module), ( + f"Module: {module_string} does " + "not have `get_submission_client_cluster_info`." + ) + + return module.get_submission_client_cluster_info( + inner_address, + create_cluster_if_needed=create_cluster_if_needed, + cookies=cookies, + metadata=metadata, + headers=headers, + ) + + +class SubmissionClient: + def __init__( + self, + address: str, + create_cluster_if_needed=False, + cookies: Optional[Dict[str, Any]] = None, + metadata: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, Any]] = None, + ): + + cluster_info = parse_cluster_info( + address, create_cluster_if_needed, cookies, metadata, headers + ) + self._address = cluster_info.address + self._cookies = cluster_info.cookies + self._default_metadata = cluster_info.metadata or {} + # Headers used for all requests sent to job server, optional and only + # needed for cases like authentication to remote cluster. + self._headers = cluster_info.headers + + def _check_connection_and_version( + self, min_version: str = "1.9", version_error_message: str = None + ): + if version_error_message is None: + version_error_message = ( + f"Please ensure the cluster is running Ray {min_version} or higher." + ) + + try: + r = self._do_request("GET", "/api/version") + if r.status_code == 404: + raise RuntimeError(version_error_message) + r.raise_for_status() + + running_ray_version = r.json()["ray_version"] + if packaging.version.parse(running_ray_version) < packaging.version.parse( + min_version + ): + raise RuntimeError(version_error_message) + # TODO(edoakes): check the version if/when we break compatibility. + except requests.exceptions.ConnectionError: + raise ConnectionError( + f"Failed to connect to Ray at address: {self._address}." + ) + + def _raise_error(self, r: "requests.Response"): + raise RuntimeError( + f"Request failed with status code {r.status_code}: {r.text}." + ) + + def _do_request( + self, + method: str, + endpoint: str, + *, + data: Optional[bytes] = None, + json_data: Optional[dict] = None, + ) -> "requests.Response": + url = self._address + endpoint + logger.debug(f"Sending request to {url} with json data: {json_data or {}}.") + return requests.request( + method, + url, + cookies=self._cookies, + data=data, + json=json_data, + headers=self._headers, + ) + + def _package_exists( + self, + package_uri: str, + ) -> bool: + protocol, package_name = uri_to_http_components(package_uri) + r = self._do_request("GET", f"/api/packages/{protocol}/{package_name}") + + if r.status_code == 200: + logger.debug(f"Package {package_uri} already exists.") + return True + elif r.status_code == 404: + logger.debug(f"Package {package_uri} does not exist.") + return False + else: + self._raise_error(r) + + def _upload_package( + self, + package_uri: str, + package_path: str, + include_parent_dir: Optional[bool] = False, + excludes: Optional[List[str]] = None, + ) -> bool: + logger.info(f"Uploading package {package_uri}.") + with tempfile.TemporaryDirectory() as tmp_dir: + protocol, package_name = uri_to_http_components(package_uri) + package_file = Path(tmp_dir) / package_name + create_package( + package_path, + package_file, + include_parent_dir=include_parent_dir, + excludes=excludes, + ) + try: + r = self._do_request( + "PUT", + f"/api/packages/{protocol}/{package_name}", + data=package_file.read_bytes(), + ) + if r.status_code != 200: + self._raise_error(r) + finally: + package_file.unlink() + + def _upload_package_if_needed( + self, package_path: str, excludes: Optional[List[str]] = None + ) -> str: + package_uri = get_uri_for_directory(package_path, excludes=excludes) + if not self._package_exists(package_uri): + self._upload_package(package_uri, package_path, excludes=excludes) + else: + logger.info(f"Package {package_uri} already exists, skipping upload.") + + return package_uri + + def _upload_working_dir_if_needed(self, runtime_env: Dict[str, Any]): + if "working_dir" in runtime_env: + working_dir = runtime_env["working_dir"] + try: + parse_uri(working_dir) + is_uri = True + logger.debug("working_dir is already a valid URI.") + except ValueError: + is_uri = False + + if not is_uri: + logger.debug("working_dir is not a URI, attempting to upload.") + package_uri = self._upload_package_if_needed( + working_dir, excludes=runtime_env.get("excludes", None) + ) + runtime_env["working_dir"] = package_uri + + @PublicAPI(stability="beta") + def get_version(self) -> str: + r = self._do_request("GET", "/api/version") + if r.status_code == 200: + return r.json().get("version") + else: + self._raise_error(r) diff --git a/dashboard/modules/job/cli.py b/dashboard/modules/job/cli.py index e43f0588bf7c..9b10d248be55 100644 --- a/dashboard/modules/job/cli.py +++ b/dashboard/modules/job/cli.py @@ -1,17 +1,16 @@ import asyncio -import json import os import pprint from subprocess import list2cmdline import time from typing import Optional, Tuple -import yaml import click from ray.autoscaler._private.cli_logger import add_click_logging_options, cli_logger, cf from ray.job_submission import JobStatus, JobSubmissionClient from ray.util.annotations import PublicAPI +from ray.dashboard.modules.dashboard_sdk import parse_runtime_env_args def _get_sdk_client( @@ -144,25 +143,11 @@ def submit( """ client = _get_sdk_client(address, create_cluster_if_needed=True) - final_runtime_env = {} - if runtime_env is not None: - if runtime_env_json is not None: - raise ValueError( - "Only one of --runtime_env and " "--runtime-env-json can be provided." - ) - with open(runtime_env, "r") as f: - final_runtime_env = yaml.safe_load(f) - - elif runtime_env_json is not None: - final_runtime_env = json.loads(runtime_env_json) - - if working_dir is not None: - if "working_dir" in final_runtime_env: - cli_logger.warning( - "Overriding runtime_env working_dir with --working-dir option" - ) - - final_runtime_env["working_dir"] = working_dir + final_runtime_env = parse_runtime_env_args( + runtime_env=runtime_env, + runtime_env_json=runtime_env_json, + working_dir=working_dir, + ) job_id = client.submit_job( entrypoint=list2cmdline(entrypoint), diff --git a/dashboard/modules/job/sdk.py b/dashboard/modules/job/sdk.py index c9ac5ff8895b..679a253c0f63 100644 --- a/dashboard/modules/job/sdk.py +++ b/dashboard/modules/job/sdk.py @@ -1,9 +1,6 @@ import dataclasses -import importlib import logging -from pathlib import Path -import tempfile -from typing import Any, Dict, Iterator, List, Optional +from typing import Any, Dict, Iterator, Optional try: import aiohttp @@ -12,11 +9,6 @@ aiohttp = None requests = None -from ray._private.runtime_env.packaging import ( - create_package, - get_uri_for_directory, - parse_uri, -) from ray.dashboard.modules.job.common import ( JobStatus, JobSubmitRequest, @@ -24,117 +16,16 @@ JobStopResponse, JobInfo, JobLogsResponse, - uri_to_http_components, ) +from ray.dashboard.modules.dashboard_sdk import SubmissionClient -from ray.ray_constants import DEFAULT_DASHBOARD_PORT from ray.util.annotations import PublicAPI -from ray.client_builder import _split_address logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -@dataclasses.dataclass -class ClusterInfo: - address: str - cookies: Optional[Dict[str, Any]] = None - metadata: Optional[Dict[str, Any]] = None - headers: Optional[Dict[str, Any]] = None - - -def get_job_submission_client_cluster_info( - address: str, - # For backwards compatibility - *, - # only used in importlib case in parse_cluster_info, but needed - # in function signature. - create_cluster_if_needed: Optional[bool] = False, - cookies: Optional[Dict[str, Any]] = None, - metadata: Optional[Dict[str, Any]] = None, - headers: Optional[Dict[str, Any]] = None, - _use_tls: Optional[bool] = False, -) -> ClusterInfo: - """Get address, cookies, and metadata used for JobSubmissionClient. - - If no port is specified in `address`, the Ray dashboard default will be - inserted. - - Args: - address (str): Address without the module prefix that is passed - to JobSubmissionClient. - create_cluster_if_needed (bool): Indicates whether the cluster - of the address returned needs to be running. Ray doesn't - start a cluster before interacting with jobs, but other - implementations may do so. - - Returns: - ClusterInfo object consisting of address, cookies, and metadata - for JobSubmissionClient to use. - """ - - scheme = "https" if _use_tls else "http" - - split = address.split(":") - host = split[0] - if len(split) == 1: - port = DEFAULT_DASHBOARD_PORT - elif len(split) == 2: - port = int(split[1]) - else: - raise ValueError(f"Invalid address: {address}.") - - return ClusterInfo( - address=f"{scheme}://{host}:{port}", - cookies=cookies, - metadata=metadata, - headers=headers, - ) - - -def parse_cluster_info( - address: str, - create_cluster_if_needed: bool = False, - cookies: Optional[Dict[str, Any]] = None, - metadata: Optional[Dict[str, Any]] = None, - headers: Optional[Dict[str, Any]] = None, -) -> ClusterInfo: - module_string, inner_address = _split_address(address) - - # If user passes http(s):// or ray://, go through normal parsing. - if module_string in {"http", "https", "ray"}: - return get_job_submission_client_cluster_info( - inner_address, - create_cluster_if_needed=create_cluster_if_needed, - cookies=cookies, - metadata=metadata, - headers=headers, - _use_tls=module_string == "https", - ) - # Try to dynamically import the function to get cluster info. - else: - try: - module = importlib.import_module(module_string) - except Exception: - raise RuntimeError( - f"Module: {module_string} does not exist.\n" - f"This module was parsed from Address: {address}" - ) from None - assert "get_job_submission_client_cluster_info" in dir(module), ( - f"Module: {module_string} does " - "not have `get_job_submission_client_cluster_info`." - ) - - return module.get_job_submission_client_cluster_info( - inner_address, - create_cluster_if_needed=create_cluster_if_needed, - cookies=cookies, - metadata=metadata, - headers=headers, - ) - - -class JobSubmissionClient: +class JobSubmissionClient(SubmissionClient): def __init__( self, address: str, @@ -148,140 +39,20 @@ def __init__( "The Ray jobs CLI & SDK require the ray[default] " "installation: `pip install 'ray[default']``" ) - - cluster_info = parse_cluster_info( - address, create_cluster_if_needed, cookies, metadata, headers - ) - self._address = cluster_info.address - self._cookies = cluster_info.cookies - self._default_metadata = cluster_info.metadata or {} - # Headers used for all requests sent to job server, optional and only - # needed for cases like authentication to remote cluster. - self._headers = cluster_info.headers - - self._check_connection_and_version() - - def _check_connection_and_version(self): - try: - r = self._do_request("GET", "/api/version") - if r.status_code == 404: - raise RuntimeError( - "Jobs API not supported on the Ray cluster. " - "Please ensure the cluster is running " - "Ray 1.9 or higher." - ) - - r.raise_for_status() - # TODO(edoakes): check the version if/when we break compatibility. - except requests.exceptions.ConnectionError: - raise ConnectionError( - f"Failed to connect to Ray at address: {self._address}." - ) - - def _raise_error(self, r: "requests.Response"): - raise RuntimeError( - f"Request failed with status code {r.status_code}: {r.text}." + super().__init__( + address=address, + create_cluster_if_needed=create_cluster_if_needed, + cookies=cookies, + metadata=metadata, + headers=headers, ) - - def _do_request( - self, - method: str, - endpoint: str, - *, - data: Optional[bytes] = None, - json_data: Optional[dict] = None, - ) -> "requests.Response": - url = self._address + endpoint - logger.debug(f"Sending request to {url} with json data: {json_data or {}}.") - return requests.request( - method, - url, - cookies=self._cookies, - data=data, - json=json_data, - headers=self._headers, + self._check_connection_and_version( + min_version="1.9", + version_error_message="Jobs API is not supported on the Ray " + "cluster. Please ensure the cluster is " + "running Ray 1.9 or higher.", ) - def _package_exists( - self, - package_uri: str, - ) -> bool: - protocol, package_name = uri_to_http_components(package_uri) - r = self._do_request("GET", f"/api/packages/{protocol}/{package_name}") - - if r.status_code == 200: - logger.debug(f"Package {package_uri} already exists.") - return True - elif r.status_code == 404: - logger.debug(f"Package {package_uri} does not exist.") - return False - else: - self._raise_error(r) - - def _upload_package( - self, - package_uri: str, - package_path: str, - include_parent_dir: Optional[bool] = False, - excludes: Optional[List[str]] = None, - ) -> bool: - logger.info(f"Uploading package {package_uri}.") - with tempfile.TemporaryDirectory() as tmp_dir: - protocol, package_name = uri_to_http_components(package_uri) - package_file = Path(tmp_dir) / package_name - create_package( - package_path, - package_file, - include_parent_dir=include_parent_dir, - excludes=excludes, - ) - try: - r = self._do_request( - "PUT", - f"/api/packages/{protocol}/{package_name}", - data=package_file.read_bytes(), - ) - if r.status_code != 200: - self._raise_error(r) - finally: - package_file.unlink() - - def _upload_package_if_needed( - self, package_path: str, excludes: Optional[List[str]] = None - ) -> str: - package_uri = get_uri_for_directory(package_path, excludes=excludes) - if not self._package_exists(package_uri): - self._upload_package(package_uri, package_path, excludes=excludes) - else: - logger.info(f"Package {package_uri} already exists, skipping upload.") - - return package_uri - - def _upload_working_dir_if_needed(self, runtime_env: Dict[str, Any]): - if "working_dir" in runtime_env: - working_dir = runtime_env["working_dir"] - try: - parse_uri(working_dir) - is_uri = True - logger.debug("working_dir is already a valid URI.") - except ValueError: - is_uri = False - - if not is_uri: - logger.debug("working_dir is not a URI, attempting to upload.") - package_uri = self._upload_package_if_needed( - working_dir, excludes=runtime_env.get("excludes", None) - ) - runtime_env["working_dir"] = package_uri - - @PublicAPI(stability="beta") - def get_version(self) -> str: - r = self._do_request("GET", "/api/version") - if r.status_code == 200: - return r.json().get("version") - else: - self._raise_error(r) - @PublicAPI(stability="beta") def submit_job( self, diff --git a/dashboard/modules/job/tests/test_http_job_server.py b/dashboard/modules/job/tests/test_http_job_server.py index 928d4d958511..89c03eb4420b 100644 --- a/dashboard/modules/job/tests/test_http_job_server.py +++ b/dashboard/modules/job/tests/test_http_job_server.py @@ -11,7 +11,7 @@ import ray from ray.job_submission import JobSubmissionClient, JobStatus from ray.dashboard.modules.job.common import CURRENT_VERSION, JobInfo -from ray.dashboard.modules.job.sdk import ( +from ray.dashboard.modules.dashboard_sdk import ( ClusterInfo, parse_cluster_info, ) diff --git a/dashboard/modules/job/tests/test_sdk.py b/dashboard/modules/job/tests/test_sdk.py index 2c8d73567142..b9607dddb978 100644 --- a/dashboard/modules/job/tests/test_sdk.py +++ b/dashboard/modules/job/tests/test_sdk.py @@ -2,7 +2,7 @@ from typing import Dict, Optional, Tuple from unittest.mock import Mock, patch -from ray.dashboard.modules.job.sdk import parse_cluster_info +from ray.dashboard.modules.dashboard_sdk import parse_cluster_info @pytest.mark.parametrize( @@ -25,12 +25,12 @@ def test_parse_cluster_info( headers: Optional[Dict[str, str]], ): """ - Test ray.dashboard.modules.job.sdk.parse_cluster_info for different + Test ray.dashboard.modules.dashboard_sdk.parse_cluster_info for different format of addresses. """ - mock_get_job_submission_client_cluster = Mock(return_value="Ray ClusterInfo") + mock_get_submission_client_cluster = Mock(return_value="Ray ClusterInfo") mock_module = Mock() - mock_module.get_job_submission_client_cluster_info = Mock( + mock_module.get_submission_client_cluster_info = Mock( return_value="Other module ClusterInfo" ) mock_import_module = Mock(return_value=mock_module) @@ -38,8 +38,8 @@ def test_parse_cluster_info( address, module_string, inner_address = address_param with patch.multiple( - "ray.dashboard.modules.job.sdk", - get_job_submission_client_cluster_info=mock_get_job_submission_client_cluster, + "ray.dashboard.modules.dashboard_sdk", + get_submission_client_cluster_info=mock_get_submission_client_cluster, ), patch.multiple("importlib", import_module=mock_import_module): if module_string == "ray": assert ( @@ -52,7 +52,7 @@ def test_parse_cluster_info( ) == "Ray ClusterInfo" ) - mock_get_job_submission_client_cluster.assert_called_once_with( + mock_get_submission_client_cluster.assert_called_once_with( inner_address, create_cluster_if_needed=create_cluster_if_needed, cookies=cookies, @@ -71,7 +71,7 @@ def test_parse_cluster_info( == "Other module ClusterInfo" ) mock_import_module.assert_called_once_with(module_string) - mock_module.get_job_submission_client_cluster_info.assert_called_once_with( + mock_module.get_submission_client_cluster_info.assert_called_once_with( inner_address, create_cluster_if_needed=create_cluster_if_needed, cookies=cookies, diff --git a/dashboard/modules/serve/sdk.py b/dashboard/modules/serve/sdk.py new file mode 100644 index 000000000000..178fe0cedf0e --- /dev/null +++ b/dashboard/modules/serve/sdk.py @@ -0,0 +1,70 @@ +from typing import Any, Dict, Optional, Union + +try: + import aiohttp + import requests +except ImportError: + aiohttp = None + requests = None + +from ray.dashboard.modules.dashboard_sdk import SubmissionClient + + +DEPLOY_PATH = "/api/serve/deployments/" +INFO_PATH = "/api/serve/deployments/" +STATUS_PATH = "/api/serve/deployments/status" +DELETE_PATH = "/api/serve/deployments/" + + +class ServeSubmissionClient(SubmissionClient): + def __init__( + self, + address: str, + create_cluster_if_needed=False, + cookies: Optional[Dict[str, Any]] = None, + metadata: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, Any]] = None, + ): + if requests is None: + raise RuntimeError( + "The Serve CLI requires the ray[default] " + "installation: `pip install 'ray[default']``" + ) + super().__init__( + address=address, + create_cluster_if_needed=create_cluster_if_needed, + cookies=cookies, + metadata=metadata, + headers=headers, + ) + self._check_connection_and_version( + min_version="1.12", + version_error_message="Serve CLI is not supported on the Ray " + "cluster. Please ensure the cluster is " + "running Ray 1.12 or higher.", + ) + + def deploy_application(self, app_config: Dict) -> None: + response = self._do_request("PUT", DEPLOY_PATH, json_data=app_config) + + if response.status_code != 200: + self._raise_error(response) + + def get_info(self) -> Union[Dict, None]: + response = self._do_request("GET", INFO_PATH) + if response.status_code == 200: + return response.json() + else: + self._raise_error(response) + + def get_status(self) -> Union[Dict, None]: + response = self._do_request("GET", STATUS_PATH) + if response.status_code == 200: + return response.json() + else: + self._raise_error(response) + + def delete_application(self) -> None: + response = self._do_request("DELETE", DELETE_PATH) + if response.status_code != 200: + self._raise_error(response) diff --git a/dashboard/modules/tests/test_config_files/basic_runtime_env.yaml b/dashboard/modules/tests/test_config_files/basic_runtime_env.yaml new file mode 100644 index 000000000000..a311b5182918 --- /dev/null +++ b/dashboard/modules/tests/test_config_files/basic_runtime_env.yaml @@ -0,0 +1,5 @@ +py_modules: + - "pm1" + - "pm2" + +working_dir: "wd" diff --git a/dashboard/modules/tests/test_dashboard_sdk.py b/dashboard/modules/tests/test_dashboard_sdk.py new file mode 100644 index 000000000000..d4ed85dff441 --- /dev/null +++ b/dashboard/modules/tests/test_dashboard_sdk.py @@ -0,0 +1,64 @@ +import pytest +import sys +import os +from ray.dashboard.modules.dashboard_sdk import parse_runtime_env_args + + +class TestParseRuntimeEnvArgs: + @pytest.mark.skipif( + sys.platform == "win32", reason="File path incorrect on Windows." + ) + def test_runtime_env_valid(self): + config_file_name = os.path.join( + os.path.dirname(__file__), "test_config_files", "basic_runtime_env.yaml" + ) + assert parse_runtime_env_args(runtime_env=config_file_name) == { + "py_modules": ["pm1", "pm2"], + "working_dir": "wd", + } + + def test_runtime_env_json_valid(self): + runtime_env = '{"py_modules": ["pm1", "pm2"], "working_dir": "wd"}' + assert parse_runtime_env_args(runtime_env_json=runtime_env) == { + "py_modules": ["pm1", "pm2"], + "working_dir": "wd", + } + + @pytest.mark.skipif( + sys.platform == "win32", reason="File path incorrect on Windows." + ) + def test_runtime_env_and_json(self): + config_file_name = os.path.join( + os.path.dirname(__file__), "test_config_files", "basic_runtime_env.yaml" + ) + runtime_env_json = '{"py_modules": ["pm1", "pm2"], "working_dir": "wd"}' + with pytest.raises(ValueError): + parse_runtime_env_args( + runtime_env=config_file_name, runtime_env_json=runtime_env_json + ) + + def test_working_dir_valid(self): + assert parse_runtime_env_args(working_dir="wd") == {"working_dir": "wd"} + + @pytest.mark.skipif( + sys.platform == "win32", reason="File path incorrect on Windows." + ) + def test_working_dir_override(self): + config_file_name = os.path.join( + os.path.dirname(__file__), "test_config_files", "basic_runtime_env.yaml" + ) + assert parse_runtime_env_args( + runtime_env=config_file_name, working_dir="wd2" + ) == {"py_modules": ["pm1", "pm2"], "working_dir": "wd2"} + + runtime_env = '{"py_modules": ["pm1", "pm2"], "working_dir": "wd2"}' + assert parse_runtime_env_args( + runtime_env_json=runtime_env, working_dir="wd2" + ) == {"py_modules": ["pm1", "pm2"], "working_dir": "wd2"} + + def test_all_none(self): + assert parse_runtime_env_args() == {} + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/serve/BUILD b/python/ray/serve/BUILD index 465bbec1cd6a..d4347bd30654 100644 --- a/python/ray/serve/BUILD +++ b/python/ray/serve/BUILD @@ -269,7 +269,7 @@ py_test( py_test( name = "test_cli", - size = "medium", + size = "large", srcs = serve_tests_srcs, tags = ["exclusive", "team:serve"], deps = [":serve_lib"], diff --git a/python/ray/serve/scripts.py b/python/ray/serve/scripts.py index 2a231e42b54f..44732a84ac90 100644 --- a/python/ray/serve/scripts.py +++ b/python/ray/serve/scripts.py @@ -4,7 +4,6 @@ import os import sys import pathlib -import requests import click import time from typing import Tuple, List, Dict @@ -25,20 +24,11 @@ schema_to_serve_application, serve_application_status_to_schema, ) +from ray.dashboard.modules.dashboard_sdk import parse_runtime_env_args +from ray.dashboard.modules.serve.sdk import ServeSubmissionClient from ray.autoscaler._private.cli_logger import cli_logger -def log_failed_request(response: requests.models.Response, address: str): - error_message = ( - f"\nRequest to address {address} failed. Got response status code " - f"{response.status_code} with the following message:" - f"\n\n{response.text}" - ) - cli_logger.newline() - cli_logger.error(error_message) - cli_logger.newline() - - def process_args_and_kwargs( args_and_kwargs: Tuple[str], ) -> Tuple[List[str], Dict[str, str]]: @@ -99,14 +89,33 @@ def error(self, message): return args, args_and_kwargs +def configure_runtime_env(deployment: Deployment, updates: Dict): + """ + Overwrites deployment's runtime_env with fields in updates. Any fields in + deployment's runtime_env that aren't in updates stay the same. + """ + + if deployment.ray_actor_options is None: + deployment._ray_actor_options = {"runtime_env": updates} + elif "runtime_env" in deployment.ray_actor_options: + deployment.ray_actor_options["runtime_env"].update(updates) + else: + deployment.ray_actor_options["runtime_env"] = updates + + @click.group(help="[EXPERIMENTAL] CLI for managing Serve instances on a Ray cluster.") +def cli(): + pass + + +@cli.command(help="Start a detached Serve instance on the Ray cluster.") @click.option( "--address", "-a", default=os.environ.get("RAY_ADDRESS", "auto"), required=False, type=str, - help="Address of the running Ray cluster to connect to. " 'Defaults to "auto".', + help='Address of the running Ray cluster to connect to. Defaults to "auto".', ) @click.option( "--namespace", @@ -123,15 +132,6 @@ def error(self, message): type=str, help=("Runtime environment dictionary to pass into ray.init. Defaults to empty."), ) -def cli(address, namespace, runtime_env_json): - ray.init( - address=address, - namespace=namespace, - runtime_env=json.loads(runtime_env_json), - ) - - -@cli.command(help="Start a detached Serve instance on the Ray cluster.") @click.option( "--http-host", default=DEFAULT_HTTP_HOST, @@ -160,7 +160,20 @@ def cli(address, namespace, runtime_env_json): type=str, hidden=True, ) -def start(http_host, http_port, http_location, checkpoint_path): +def start( + address, + namespace, + runtime_env_json, + http_host, + http_port, + http_location, + checkpoint_path, +): + ray.init( + address=address, + namespace=namespace, + runtime_env=json.loads(runtime_env_json), + ) serve.start( detached=True, http_options=dict( @@ -173,7 +186,27 @@ def start(http_host, http_port, http_location, checkpoint_path): @cli.command(help="Shutdown the running Serve instance on the Ray cluster.") -def shutdown(): +@click.option( + "--address", + "-a", + default=os.environ.get("RAY_ADDRESS", "auto"), + required=False, + type=str, + help='Address of the running Ray cluster to connect to. Defaults to "auto".', +) +@click.option( + "--namespace", + "-n", + default="serve", + required=False, + type=str, + help='Ray namespace to connect to. Defaults to "serve".', +) +def shutdown(address: str, namespace: str): + ray.init( + address=address, + namespace=namespace, + ) serve.api._connect() serve.shutdown() @@ -188,6 +221,29 @@ class may or may not be decorated with ``@serve.deployment``. hidden=True, ) @click.argument("deployment") +@click.option( + "--address", + "-a", + default=os.environ.get("RAY_ADDRESS", "auto"), + required=False, + type=str, + help='Address of the running Ray cluster to connect to. Defaults to "auto".', +) +@click.option( + "--namespace", + "-n", + default="serve", + required=False, + type=str, + help='Ray namespace to connect to. Defaults to "serve".', +) +@click.option( + "--runtime-env-json", + default=r"{}", + required=False, + type=str, + help=("Runtime environment dictionary to pass into ray.init. Defaults to empty."), +) @click.option( "--options-json", default=r"{}", @@ -195,7 +251,18 @@ class may or may not be decorated with ``@serve.deployment``. type=str, help="JSON string for the deployments options", ) -def create_deployment(deployment: str, options_json: str): +def create_deployment( + address: str, + namespace: str, + runtime_env_json: str, + deployment: str, + options_json: str, +): + ray.init( + address=address, + namespace=namespace, + runtime_env=json.loads(runtime_env_json), + ) deployment_cls = import_attr(deployment) if not isinstance(deployment_cls, Deployment): deployment_cls = serve.deployment(deployment_cls) @@ -204,11 +271,17 @@ def create_deployment(deployment: str, options_json: str): @cli.command( - help=""" - [Experimental] Deploy a YAML configuration file via REST API to - your Serve cluster. - """, - hidden=True, + short_help="[Experimental] Deploy deployments from a YAML config file.", + help=( + "Deploys deployment(s) from CONFIG_OR_IMPORT_PATH, which must be either a " + "Serve YAML configuration file path or an import path to " + "a class or function to deploy.\n\n" + "Import paths must be of the form " + '"module.submodule_1...submodule_n.MyClassOrFunction".\n\n' + "Sends a nonblocking request. A successful response only indicates that the " + "request was received successfully. It does not mean the deployments are " + "live. Use `serve info` and `serve status` to check on them. " + ), ) @click.argument("config_file_name") @click.option( @@ -220,51 +293,111 @@ def create_deployment(deployment: str, options_json: str): help='Address of the Ray dashboard to query. For example, "http://localhost:8265".', ) def deploy(config_file_name: str, address: str): - full_address_path = f"{address}/api/serve/deployments/" with open(config_file_name, "r") as config_file: config = yaml.safe_load(config_file) - # Generate a schema using the config to ensure its format is valid + # Schematize config to validate format ServeApplicationSchema.parse_obj(config) - response = requests.put(full_address_path, json=config) + ServeSubmissionClient(address).deploy_application(config) - if response.status_code == 200: - cli_logger.newline() - cli_logger.success( - "\nSent deploy request successfully!\n " - "* Use `serve status` to check your deployments' statuses.\n " - "* Use `serve info` to see your running Serve " - "application's configuration.\n" - ) - cli_logger.newline() - else: - log_failed_request(response, address) + cli_logger.newline() + cli_logger.success( + "\nSent deploy request successfully!\n " + "* Use `serve status` to check your deployments' statuses.\n " + "* Use `serve info` to see your running Serve " + "application's configuration.\n" + ) + cli_logger.newline() @cli.command( - help="[Experimental] Run deployments via Serve's Python API.", - hidden=True, + short_help="[Experimental] Run deployments via Serve's Python API.", + help=( + "Deploys deployment(s) from CONFIG_OR_IMPORT_PATH, which must be either a " + "Serve YAML configuration file path or an import path to " + "a class or function to deploy.\n\n" + "The full command must be of the form:\n" + '"serve run [import path] [optional parameters] -- [arg-1] ... [arg-n] ' + '[kwarg-1]=[kwval-1] ... [kwarg-n]=[kwval-n]"\n\n' + "Deployments via import path may also take in init_args and " + "init_kwargs from any ARGS_AND_KWARGS passed in. Import paths must be " + "of the form:\n" + '"module.submodule_1...submodule_n.MyClassOrFunction".\n\n' + "Blocks after deploying, and logs status periodically. After being killed, " + "this command tears down all deployments it deployed. If there are no " + "deployments left, it also tears down the Serve application." + ), ) @click.argument("config_or_import_path") @click.argument("args_and_kwargs", required=False, nargs=-1) +@click.option( + "--runtime-env", + type=str, + default=None, + required=False, + help="Path to a local YAML file containing a runtime_env definition. " + "Overrides all runtime_envs specified in a config file.", +) +@click.option( + "--runtime-env-json", + type=str, + default=None, + required=False, + help="JSON-serialized runtime_env dictionary. Overrides all runtime_envs " + "specified in a config file.", +) +@click.option( + "--working-dir", + type=str, + default=None, + required=False, + help=( + "Directory containing files that your job will run in. Can be a " + "local directory or a remote URI to a .zip file (S3, GS, HTTP). " + "This overrides the working_dir in --runtime-env if both are " + "specified. Overrides all working_dirs specified in a config file." + ), +) +@click.option( + "--cluster-address", + "-c", + default="auto", + required=False, + type=str, + help=('Address of the Ray cluster to query. Defaults to "auto".'), +) +@click.option( + "--dashboard-address", + "-d", + default="http://localhost:8265", + required=False, + type=str, + help=( + 'Address of the Ray dashboard to query. Defaults to "http://localhost:8265".' + ), +) def run( config_or_import_path: str, args_and_kwargs: Tuple[str], + runtime_env: str, + runtime_env_json: str, + working_dir: str, + cluster_address: str, + dashboard_address: str, ): - """ - Deploys deployment(s) from CONFIG_OR_IMPORT_PATH, which must be either a - Serve YAML configuration file path or an import path to - a class or function to deploy. Import paths must be of the form - "module.submodule_1...submodule_n.MyClassOrFunction". - """ try: # Check if path provided is for config or import deployments = [] is_config = pathlib.Path(config_or_import_path).is_file() args, kwargs = process_args_and_kwargs(args_and_kwargs) + runtime_env_updates = parse_runtime_env_args( + runtime_env=runtime_env, + runtime_env_json=runtime_env_json, + working_dir=working_dir, + ) if is_config: config_path = config_or_import_path @@ -285,7 +418,14 @@ def run( schematized_config = ServeApplicationSchema.parse_obj(config) deployments = schema_to_serve_application(schematized_config) + ray.init(address=cluster_address, namespace="serve") serve.start(detached=True) + ServeSubmissionClient(dashboard_address)._upload_working_dir_if_needed( + runtime_env_updates + ) + + for deployment in deployments: + configure_runtime_env(deployment, runtime_env_updates) deploy_group(deployments) cli_logger.newline() @@ -310,7 +450,13 @@ def run( deployment = serve.deployment(name=deployment_name)(import_path) deployments = [deployment] + ray.init(address=cluster_address, namespace="serve") serve.start(detached=True) + ServeSubmissionClient(dashboard_address)._upload_working_dir_if_needed( + runtime_env_updates + ) + + configure_runtime_env(deployment, runtime_env_updates) deployment.options( init_args=args, init_kwargs=kwargs, @@ -340,8 +486,11 @@ def run( @cli.command( - help="[Experimental] Get info about your Serve application's config.", - hidden=True, + short_help="[Experimental] Get info about your Serve application's config.", + help=( + "Prints the configurations of all running deployments in the Serve " + "application." + ), ) @click.option( "--address", @@ -351,18 +500,32 @@ def run( type=str, help='Address of the Ray dashboard to query. For example, "http://localhost:8265".', ) -def info(address: str): - full_address_path = f"{address}/api/serve/deployments/" - response = requests.get(full_address_path) - if response.status_code == 200: - print(json.dumps(response.json(), indent=4)) - else: - log_failed_request(response, address) +@click.option( + "--json_format", + "-j", + is_flag=True, + help="Print info as json. If omitted, info is printed as YAML.", +) +def info(address: str, json_format=bool): + + app_info = ServeSubmissionClient(address).get_info() + if app_info is not None: + if json_format: + print(json.dumps(app_info, indent=4)) + else: + print(yaml.dump(app_info)) @cli.command( - help="[Experimental] Get your Serve application's status.", - hidden=True, + short_help="[Experimental] Get your Serve application's status.", + help=( + "Prints status information about all deployments in the Serve application.\n\n" + "Deployments may be:\n\n" + "- HEALTHY: all replicas are acting normally and passing their health checks.\n" + "- UNHEALTHY: at least one replica is not acting normally and may not be " + "passing its health check.\n" + "- UPDATING: the deployment is updating." + ), ) @click.option( "--address", @@ -373,17 +536,17 @@ def info(address: str): help='Address of the Ray dashboard to query. For example, "http://localhost:8265".', ) def status(address: str): - full_address_path = f"{address}/api/serve/deployments/status" - response = requests.get(full_address_path) - if response.status_code == 200: - print(json.dumps(response.json(), indent=4)) - else: - log_failed_request(response, address) + + app_status = ServeSubmissionClient(address).get_status() + if app_status is not None: + print(json.dumps(app_status, indent=4)) @cli.command( - help="[Experimental] Get info about your Serve application's config.", - hidden=True, + short_help=( + "[EXPERIMENTAL] Deletes all running deployments in the Serve application." + ), + help="Deletes all running deployments in the Serve application.", ) @click.option( "--address", @@ -395,6 +558,7 @@ def status(address: str): ) @click.option("--yes", "-y", is_flag=True, help="Bypass confirmation prompt.") def delete(address: str, yes: bool): + if not yes: click.confirm( f"\nThis will shutdown the Serve application at address " @@ -403,11 +567,8 @@ def delete(address: str, yes: bool): abort=True, ) - full_address_path = f"{address}/api/serve/deployments/" - response = requests.delete(full_address_path) - if response.status_code == 200: - cli_logger.newline() - cli_logger.success("\nSent delete request successfully!\n") - cli_logger.newline() - else: - log_failed_request(response, address) + ServeSubmissionClient(address).delete_application() + + cli_logger.newline() + cli_logger.success("\nSent delete request successfully!\n") + cli_logger.newline() diff --git a/python/ray/serve/tests/test_cli.py b/python/ray/serve/tests/test_cli.py index 5094fa244234..7a5f44f30eb3 100644 --- a/python/ray/serve/tests/test_cli.py +++ b/python/ray/serve/tests/test_cli.py @@ -12,7 +12,7 @@ from ray.tests.conftest import tmp_working_dir # noqa: F401, E501 from ray._private.test_utils import wait_for_condition from ray.dashboard.optional_utils import RAY_INTERNAL_DASHBOARD_NAMESPACE -from ray.serve.scripts import process_args_and_kwargs +from ray.serve.scripts import process_args_and_kwargs, configure_runtime_env def ping_endpoint(endpoint: str, params: str = ""): @@ -149,6 +149,72 @@ def test_empty_args_and_kwargs(self): assert kwargs == {} +class TestConfigureRuntimeEnv: + @serve.deployment + def f(): + pass + + @pytest.mark.parametrize("ray_actor_options", [None, {}]) + def test_empty_ray_actor_options(self, ray_actor_options): + runtime_env = { + "working_dir": "http://test.com", + "pip": ["requests", "pendulum==2.1.2"], + } + deployment = TestConfigureRuntimeEnv.f.options( + ray_actor_options=ray_actor_options + ) + configure_runtime_env(deployment, runtime_env) + assert deployment.ray_actor_options["runtime_env"] == runtime_env + + def test_overwrite_all_options(self): + old_runtime_env = { + "working_dir": "http://test.com", + "pip": ["requests", "pendulum==2.1.2"], + } + new_runtime_env = { + "working_dir": "http://new.com", + "pip": [], + "env_vars": {"test_var": "test"}, + } + deployment = TestConfigureRuntimeEnv.f.options( + ray_actor_options={"runtime_env": old_runtime_env} + ) + configure_runtime_env(deployment, new_runtime_env) + assert deployment.ray_actor_options["runtime_env"] == new_runtime_env + + def test_overwrite_some_options(self): + old_runtime_env = { + "working_dir": "http://new.com", + "pip": [], + "env_vars": {"test_var": "test"}, + } + new_runtime_env = { + "working_dir": "http://test.com", + "pip": ["requests", "pendulum==2.1.2"], + } + merged_env = { + "working_dir": "http://test.com", + "pip": ["requests", "pendulum==2.1.2"], + "env_vars": {"test_var": "test"}, + } + deployment = TestConfigureRuntimeEnv.f.options( + ray_actor_options={"runtime_env": old_runtime_env} + ) + configure_runtime_env(deployment, new_runtime_env) + assert deployment.ray_actor_options["runtime_env"] == merged_env + + def test_overwrite_no_options(self): + runtime_env = { + "working_dir": "http://test.com", + "pip": ["requests", "pendulum==2.1.2"], + } + deployment = TestConfigureRuntimeEnv.f.options( + ray_actor_options={"runtime_env": runtime_env} + ) + configure_runtime_env(deployment, {}) + assert deployment.ray_actor_options["runtime_env"] == runtime_env + + def test_start_shutdown(ray_start_stop): with pytest.raises(subprocess.CalledProcessError): subprocess.check_output(["serve", "shutdown"]) @@ -159,10 +225,10 @@ def test_start_shutdown(ray_start_stop): def test_start_shutdown_in_namespace(ray_start_stop): with pytest.raises(subprocess.CalledProcessError): - subprocess.check_output(["serve", "-n", "test", "shutdown"]) + subprocess.check_output(["serve", "shutdown", "-n", "test"]) - subprocess.check_output(["serve", "-n", "test", "start"]) - subprocess.check_output(["serve", "-n", "test", "shutdown"]) + subprocess.check_output(["serve", "start", "-n", "test"]) + subprocess.check_output(["serve", "shutdown", "-n", "test"]) class A: @@ -195,14 +261,14 @@ def test_create_deployment(ray_start_stop, tmp_working_dir, class_name): # noqa subprocess.check_output( [ "serve", + "create-deployment", + f"ray.serve.tests.test_cli.{class_name}", "--runtime-env-json", json.dumps( { "working_dir": tmp_working_dir, } ), - "create-deployment", - f"ray.serve.tests.test_cli.{class_name}", "--options-json", json.dumps( { @@ -334,7 +400,7 @@ def test_info(ray_start_stop): deploy_response = subprocess.check_output(["serve", "deploy", config_file_name]) assert success_message_fragment in deploy_response - info_response = subprocess.check_output(["serve", "info"]).decode("utf-8") + info_response = subprocess.check_output(["serve", "info", "-j"]).decode("utf-8") info = json.loads(info_response) assert "deployments" in info @@ -404,7 +470,7 @@ def test_delete(ray_start_stop): # Deploys a config file and deletes it def get_num_deployments(): - info_response = subprocess.check_output(["serve", "info"]) + info_response = subprocess.check_output(["serve", "info", "-j"]) info = json.loads(info_response) return len(info["deployments"]) @@ -561,5 +627,81 @@ def test_run_simultaneous(ray_start_stop): assert ping_endpoint("Macaw") == "connection error" +@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.") +def test_run_runtime_env(ray_start_stop): + # Tests serve run with runtime_envs specified + + # Use local working_dir with import path + p = subprocess.Popen( + [ + "serve", + "run", + "test_cli.Macaw", + "--working-dir", + os.path.dirname(__file__), + "--", + "green", + "--name=Molly", + ] + ) + wait_for_condition(lambda: ping_endpoint("Macaw") == "Molly is green!", timeout=10) + p.send_signal(signal.SIGINT) + p.wait() + + # Use local working_dir with config file + p = subprocess.Popen( + [ + "serve", + "run", + os.path.join( + os.path.dirname(__file__), "test_config_files", "scarlet.yaml" + ), + "--working-dir", + os.path.dirname(__file__), + ] + ) + wait_for_condition( + lambda: ping_endpoint("Scarlet") == "Scarlet is red!", timeout=10 + ) + p.send_signal(signal.SIGINT) + p.wait() + + # Use remote working_dir + p = subprocess.Popen( + [ + "serve", + "run", + "test_module.test.one", + "--working-dir", + "https://github.com/shrekris-anyscale/test_module/archive/HEAD.zip", + ] + ) + wait_for_condition(lambda: ping_endpoint("one") == "2", timeout=10) + p.send_signal(signal.SIGINT) + p.wait() + + # Use runtime env + p = subprocess.Popen( + [ + "serve", + "run", + os.path.join( + os.path.dirname(__file__), "test_config_files", "fake_runtime_env.yaml" + ), + "--runtime-env-json", + ( + '{"py_modules": ["https://github.com/shrekris-anyscale/' + 'test_deploy_group/archive/HEAD.zip"],' + '"working_dir": "http://nonexistentlink-q490123950ni34t"}' + ), + "--working-dir", + "https://github.com/shrekris-anyscale/test_module/archive/HEAD.zip", + ] + ) + wait_for_condition(lambda: ping_endpoint("one") == "2", timeout=10) + p.send_signal(signal.SIGINT) + p.wait() + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/test_config_files/fake_runtime_env.yaml b/python/ray/serve/tests/test_config_files/fake_runtime_env.yaml new file mode 100644 index 000000000000..21fa96189f34 --- /dev/null +++ b/python/ray/serve/tests/test_config_files/fake_runtime_env.yaml @@ -0,0 +1,21 @@ +deployments: + + - name: one + init_args: null + init_kwargs: null + import_path: "test_module.test.one" + num_replicas: 2 + route_prefix: "/one" + max_concurrent_queries: null + user_config: null + autoscaling_config: null + graceful_shutdown_wait_loop_s: null + graceful_shutdown_timeout_s: null + health_check_period_s: null + health_check_timeout_s: null + ray_actor_options: + runtime_env: + py_modules: + - "https://fakemodule1.com" + - "https://fakemodule2.com" + working_dir: "https://fakewd1.com" diff --git a/python/ray/serve/tests/test_config_files/scarlet.yaml b/python/ray/serve/tests/test_config_files/scarlet.yaml new file mode 100644 index 000000000000..b2b13e013776 --- /dev/null +++ b/python/ray/serve/tests/test_config_files/scarlet.yaml @@ -0,0 +1,18 @@ +deployments: + + - name: Macaw + init_args: + - "red" + init_kwargs: + name: "Scarlet" + import_path: "test_cli.Macaw" + num_replicas: 1 + route_prefix: "/Scarlet" + max_concurrent_queries: null + user_config: null + autoscaling_config: null + graceful_shutdown_wait_loop_s: null + graceful_shutdown_timeout_s: null + health_check_period_s: null + health_check_timeout_s: null + ray_actor_options: null