From 55a0f7bb2db941d8c6ff93f55e4b3193f404ddf0 Mon Sep 17 00:00:00 2001
From: Stephanie Wang
Date: Sat, 23 Jul 2022 14:27:22 -0400
Subject: [PATCH] [core] ray.init defaults to an existing Ray instance if there is one (#26678)

ray.init() currently starts a new Ray instance even if one already exists, which is very confusing if you are a new user trying to go from local development to a cluster. This PR changes it so that, when no address is specified, we first try to find an existing Ray cluster that was created through `ray start`. If none is found, we start a new one.

This makes two changes to the ray.init() resolution order:

1. When `ray start` is called, the started cluster's address is written to a file called `/tmp/ray/ray_current_cluster`. For ray.init() and ray.init(address="auto"), we first check this local file for an existing cluster address. The file is deleted on `ray stop`. If the file is empty or missing, we autodetect any running cluster (legacy behavior) if address="auto", or we start a new local Ray instance if address=None.
2. When ray.init(address="local") is called, we create a new local Ray instance, even if one already exists. This behavior seems to be necessary mainly for `ray.client` use cases.

This also surfaces the logs about which Ray instance we are connecting to. Previously these were hidden because we didn't set up the logger until after connecting to Ray. Now Ray logs one of the following messages during ray.init:

```
(Connecting to existing Ray cluster at address: ...)
...connection...
(Started a local Ray cluster.| Connected to Ray Cluster.)( View the dashboard at )
```

Note that this changes the dashboard URL to be printed with `ray.init()` instead of when the dashboard is first started.
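As a minimal sketch (not part of this patch), the new resolution order looks like this from a driver script, assuming a machine where `ray start` has not been run and `RAY_ADDRESS` is unset:

```
import ray

# With no address, ray.init() checks RAY_ADDRESS, then the address recorded
# in /tmp/ray/ray_current_cluster by the last `ray start`. With neither set,
# it starts (and logs) a new local Ray instance.
ray.init()
ray.shutdown()

# address="auto" only connects to an existing cluster; with none running,
# it now raises ConnectionError instead of starting a new instance.
try:
    ray.init(address="auto")
except ConnectionError:
    print("no existing cluster found")

# address="local" always starts a fresh local instance, even if a cluster
# started via `ray start` is already up on this machine.
ray.init(address="local")
ray.shutdown()
```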
Co-authored-by: Eric Liang --- dashboard/modules/dashboard_sdk.py | 6 +- dashboard/modules/job/job_manager.py | 5 +- .../modules/job/tests/test_job_manager.py | 5 +- doc/source/cluster/cloud.rst | 22 +-- doc/source/cluster/commands.rst | 2 +- doc/source/cluster/kuberay/quickstart.md | 2 +- doc/source/cluster/quickstart.rst | 11 +- .../data/examples/big_data_ingestion.ipynb | 2 +- doc/source/ray-core/starting-ray.rst | 20 ++- doc/source/ray-core/walkthrough.rst | 3 +- doc/source/ray-observability/ray-logging.rst | 2 +- doc/source/ray-observability/ray-metrics.rst | 2 +- doc/source/ray-observability/ray-tracing.rst | 2 +- doc/source/serve/deploying-serve.md | 4 +- python/ray/_private/internal_api.py | 2 +- python/ray/_private/services.py | 169 ++++++++++-------- python/ray/_private/utils.py | 46 ++++- python/ray/_private/worker.py | 99 ++++++---- python/ray/client_builder.py | 16 +- python/ray/cluster_utils.py | 6 + python/ray/experimental/state/state_cli.py | 2 +- python/ray/scripts/scripts.py | 58 +++--- python/ray/tests/conftest.py | 14 +- python/ray/tests/test_advanced_8.py | 9 +- python/ray/tests/test_basic_5.py | 2 +- python/ray/tests/test_client_builder.py | 18 +- python/ray/tests/test_exit_observability.py | 2 +- python/ray/tests/test_get_or_create_actor.py | 2 +- python/ray/tests/test_multiprocessing.py | 2 +- python/ray/tests/test_output.py | 42 +++-- python/ray/tests/test_ray_init.py | 61 +++++++ python/ray/util/client/server/server.py | 4 +- python/ray/util/multiprocessing/pool.py | 17 +- python/ray/workflow/tests/test_recovery.py | 12 +- 34 files changed, 438 insertions(+), 233 deletions(-) diff --git a/dashboard/modules/dashboard_sdk.py b/dashboard/modules/dashboard_sdk.py index 1423ee42e19e..938d294ff868 100644 --- a/dashboard/modules/dashboard_sdk.py +++ b/dashboard/modules/dashboard_sdk.py @@ -126,7 +126,11 @@ def parse_cluster_info( headers: Optional[Dict[str, Any]] = None, ) -> ClusterInfo: if address is None: - if ray.is_initialized(): + if ( + ray.is_initialized() + and ray._private.worker.global_worker.node.address_info["webui_url"] + is not None + ): address = ( "http://" f"{ray._private.worker.global_worker.node.address_info['webui_url']}" diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py index d27492881367..7b751f6369d9 100644 --- a/dashboard/modules/job/job_manager.py +++ b/dashboard/modules/job/job_manager.py @@ -174,7 +174,10 @@ def _exec_entrypoint(self, logs_path: str) -> subprocess.Popen: def _get_driver_env_vars(self) -> Dict[str, str]: """Returns environment variables that should be set in the driver.""" - ray_addr = ray._private.services.find_bootstrap_address().pop() + ray_addr = ray._private.services.canonicalize_bootstrap_address_or_die( + "auto", ray.worker._global_node._ray_params.temp_dir + ) + assert ray_addr is not None return { # Set JobConfig for the child process (runtime_env, metadata). 
RAY_JOB_CONFIG_JSON_ENV_VAR: json.dumps( diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py index 2d5d920324e1..ed17b058ac65 100644 --- a/dashboard/modules/job/tests/test_job_manager.py +++ b/dashboard/modules/job/tests/test_job_manager.py @@ -349,9 +349,10 @@ async def test_env_var_and_driver_job_config_warning(self, job_manager): check_job_succeeded, job_manager=job_manager, job_id=job_id ) logs = job_manager.get_job_logs(job_id) - assert logs.startswith( - "Both RAY_JOB_CONFIG_JSON_ENV_VAR and ray.init(runtime_env) " "are provided" + token = ( + "Both RAY_JOB_CONFIG_JSON_ENV_VAR and ray.init(runtime_env) are provided" ) + assert token in logs, logs assert "JOB_1_VAR" in logs async def test_failed_runtime_env_validation(self, job_manager): diff --git a/doc/source/cluster/cloud.rst b/doc/source/cluster/cloud.rst index 17923ba25781..fd2bf8b4969f 100644 --- a/doc/source/cluster/cloud.rst +++ b/doc/source/cluster/cloud.rst @@ -40,7 +40,7 @@ Ray with cloud providers # Get a remote screen on the head node. $ ray attach ray/python/ray/autoscaler/aws/example-full.yaml - $ # Try running a Ray program with 'ray.init(address="auto")'. + $ # Try running a Ray program. # Tear down the cluster. $ ray down ray/python/ray/autoscaler/aws/example-full.yaml @@ -68,7 +68,7 @@ Ray with cloud providers # Get a remote screen on the head node. $ ray attach ray/python/ray/autoscaler/azure/example-full.yaml # test ray setup - $ python -c 'import ray; ray.init(address="auto")' + $ python -c 'import ray; ray.init()' $ exit # Tear down the cluster. $ ray down ray/python/ray/autoscaler/azure/example-full.yaml @@ -89,7 +89,7 @@ Ray with cloud providers .. code-block:: python import ray - ray.init(address='auto') + ray.init() Note that on each node the `azure-init.sh `_ script is executed and performs the following actions: @@ -117,7 +117,7 @@ Ray with cloud providers # Get a remote screen on the head node. $ ray attach ray/python/ray/autoscaler/gcp/example-full.yaml - $ # Try running a Ray program with 'ray.init(address="auto")'. + $ # Try running a Ray program with 'ray.init()'. # Tear down the cluster. $ ray down ray/python/ray/autoscaler/gcp/example-full.yaml @@ -142,7 +142,7 @@ Ray with cloud providers # Get a remote screen on the head node. $ ray attach ray/python/ray/autoscaler/aliyun/example-full.yaml - $ # Try running a Ray program with 'ray.init(address="auto")'. + $ # Try running a Ray program with 'ray.init()'. # Tear down the cluster. $ ray down ray/python/ray/autoscaler/aliyun/example-full.yaml @@ -203,7 +203,7 @@ There are two ways of running private clusters: # Get a remote screen on the head node. $ ray attach ray/python/ray/autoscaler/local/example-full.yaml - $ # Try running a Ray program with 'ray.init(address="auto")'. + $ # Try running a Ray program with 'ray.init()'. # Tear down the cluster $ ray down ray/python/ray/autoscaler/local/example-full.yaml @@ -237,7 +237,7 @@ There are two ways of running private clusters: # Get a remote screen on the head node. $ ray attach ray/python/ray/autoscaler/local/example-full.yaml - $ # Try running a Ray program with 'ray.init(address="auto")'. + $ # Try running a Ray program with 'ray.init()'. # Tear down the cluster $ ray down ray/python/ray/autoscaler/local/example-full.yaml @@ -313,7 +313,7 @@ firewall settings preventing access). If you see ``Ray runtime started.``, then the node successfully connected to the head node at the ``--address``. 
You should now be able to connect to the -cluster with ``ray.init(address='auto')``. +cluster with ``ray.init()``. .. code-block:: bash @@ -375,11 +375,13 @@ To run a distributed Ray program, you'll need to execute your program on the sam .. tabbed:: Python - Within your program/script, you must call ``ray.init`` and add the ``address`` parameter to ``ray.init`` (like ``ray.init(address=...)``). This causes your script to connect to the existing Ray runtime on the cluster. For example: + Within your program/script, ``ray.init()`` will now automatically find and connect to the latest Ray cluster. + For example: .. code-block:: python - ray.init(address="auto") + ray.init() + # Connecting to existing Ray cluster at address: ... .. tabbed:: Java diff --git a/doc/source/cluster/commands.rst b/doc/source/cluster/commands.rst index 8f2bb09d4a41..5482d18bb0aa 100644 --- a/doc/source/cluster/commands.rst +++ b/doc/source/cluster/commands.rst @@ -84,7 +84,7 @@ See :ref:`the documentation ` for ``ray up``. Running shell commands on the cluster (``ray exec``) ---------------------------------------------------- -You can use ``ray exec`` to conveniently run commands on clusters. Note that python scripts that you want to scale should connect to Ray via ``ray.init(address="auto")``. See :ref:`the documentation ` for ``ray exec``. +You can use ``ray exec`` to conveniently run commands on clusters. See :ref:`the documentation ` for ``ray exec``. .. code-block:: shell diff --git a/doc/source/cluster/kuberay/quickstart.md b/doc/source/cluster/kuberay/quickstart.md index 91645bd94652..242861e9a17a 100644 --- a/doc/source/cluster/kuberay/quickstart.md +++ b/doc/source/cluster/kuberay/quickstart.md @@ -50,7 +50,7 @@ In the Python interpreter, run the following snippet to scale up the cluster: ```python import ray.autoscaler.sdk -ray.init("auto") +ray.init() ray.autoscaler.sdk.request_resources(num_cpus=4) ``` diff --git a/doc/source/cluster/quickstart.rst b/doc/source/cluster/quickstart.rst index 26b0f3dd5f62..45b34a220095 100644 --- a/doc/source/cluster/quickstart.rst +++ b/doc/source/cluster/quickstart.rst @@ -214,14 +214,7 @@ Run the application in the cloud -------------------------------- We are now ready to execute the application in across multiple machines on our Ray cloud cluster. -First, we need to edit the initialization command ``ray.init()`` in ``script.py``. -Change it to - -.. code-block:: python - - ray.init(address='auto') - -This tells your script to connect to the Ray runtime on the remote cluster instead of initializing a new Ray runtime. +``ray.init()`` will now automatically connect to the newly created cluster. Next, run the following command: @@ -233,6 +226,8 @@ The output should now look similar to the following: .. parsed-literal:: + Connecting to existing Ray cluster at address: ... 
+ This cluster consists of 3 nodes in total 6.0 CPU resources in total diff --git a/doc/source/data/examples/big_data_ingestion.ipynb b/doc/source/data/examples/big_data_ingestion.ipynb index 797e293e57a5..279da331dd24 100644 --- a/doc/source/data/examples/big_data_ingestion.ipynb +++ b/doc/source/data/examples/big_data_ingestion.ipynb @@ -306,7 +306,7 @@ " TOTAL_NUM_NODES = 70 + 16 + 1\n", "\n", " # use the AWS cluster we just set up.\n", - " ray.init(address=\"auto\")\n", + " ray.init()\n", "\n", " # waiting for cluster nodes to come up.\n", " while len(ray.nodes()) < TOTAL_NUM_NODES:\n", diff --git a/doc/source/ray-core/starting-ray.rst b/doc/source/ray-core/starting-ray.rst index 8f9b5bcbe824..2a2ff766c210 100644 --- a/doc/source/ray-core/starting-ray.rst +++ b/doc/source/ray-core/starting-ray.rst @@ -18,12 +18,18 @@ There are three ways of starting the Ray runtime: * Explicitly via CLI (:ref:`start-ray-cli`) * Explicitly via the cluster launcher (:ref:`start-ray-up`) +In all cases, ``ray.init()`` will try to automatically find a Ray instance to +connect to. It checks, in order: +1. The ``RAY_ADDRESS`` OS environment variable. +2. The concrete address passed to ``ray.init(address=
)``. +3. If no address is provided, the latest Ray instance that was started on the same machine using ``ray start``. + .. _start-ray-init: Starting Ray on a single machine -------------------------------- -Calling ``ray.init()`` (without any ``address`` args) starts a Ray runtime on your laptop/machine. This laptop/machine becomes the "head node". +Calling ``ray.init()`` starts a local Ray instance on your laptop/machine. This laptop/machine becomes the "head node". .. note:: @@ -161,15 +167,15 @@ Use ``ray start`` from the CLI to start a 1 node ray runtime on a machine. This ... -You can connect to this Ray runtime by starting a driver process on the same node as where you ran ``ray start``: +You can connect to this Ray instance by starting a driver process on the same node as where you ran ``ray start``. +``ray.init()`` will now automatically connect to the latest Ray instance. .. tabbed:: Python .. code-block:: python - # This must import ray - ray.init(address='auto') + ray.init() .. tabbed:: java @@ -207,7 +213,7 @@ You can connect to this Ray runtime by starting a driver process on the same nod RAY_ADDRESS=
./ -You can connect other nodes to the head node, creating a Ray cluster by also calling ``ray start`` on those nodes. See :ref:`manual-cluster` for more details. Calling ``ray.init(address="auto")`` on any of the cluster machines will connect to the ray cluster. +You can connect other nodes to the head node, creating a Ray cluster by also calling ``ray start`` on those nodes. See :ref:`manual-cluster` for more details. Calling ``ray.init()`` on any of the cluster machines will connect to the same Ray cluster. .. _start-ray-up: @@ -219,11 +225,11 @@ The ``ray up`` command uses the Ray cluster launcher to start a cluster on the c Your code **only** needs to execute on one machine in the cluster (usually the head node). Read more about :ref:`running programs on a Ray cluster `. -To connect to the existing cluster, similar to the method outlined in :ref:`start-ray-cli`, you must call ``ray.init`` and specify the address of the Ray cluster when initializing Ray in your code. This allows your script to connect to the existing Ray runtime on the cluster. +To connect to the Ray cluster, call ``ray.init`` from one of the machines in the cluster. This will connect to the latest Ray cluster: .. code-block:: python - ray.init(address="auto") + ray.init() Note that the machine calling ``ray up`` will not be considered as part of the Ray cluster, and therefore calling ``ray.init`` on that same machine will not attach to the cluster. diff --git a/doc/source/ray-core/walkthrough.rst b/doc/source/ray-core/walkthrough.rst index 76ced246adef..6159383f5fc9 100644 --- a/doc/source/ray-core/walkthrough.rst +++ b/doc/source/ray-core/walkthrough.rst @@ -77,8 +77,7 @@ You can start Ray on a single machine by adding this to your code. import ray - # Start Ray. If you're connecting to an existing cluster, you would use - # ray.init(address=) instead. + # Start Ray. ray.init() ... diff --git a/doc/source/ray-observability/ray-logging.rst b/doc/source/ray-observability/ray-logging.rst index 1f184a7952ff..dac1eec0d443 100644 --- a/doc/source/ray-observability/ray-logging.rst +++ b/doc/source/ray-observability/ray-logging.rst @@ -6,7 +6,7 @@ This document will explain Ray's logging system and its best practices. Driver logs ~~~~~~~~~~~ -An entry point of Ray applications that calls ``ray.init(address='auto')`` or ``ray.init()`` is called a driver. +An entry point of Ray applications that calls ``ray.init()`` is called a driver. All the driver logs are handled in the same way as normal Python programs. 
Worker logs diff --git a/doc/source/ray-observability/ray-metrics.rst b/doc/source/ray-observability/ray-metrics.rst index dd67ed6a359a..69f173ed4377 100644 --- a/doc/source/ray-observability/ray-metrics.rst +++ b/doc/source/ray-observability/ray-metrics.rst @@ -89,7 +89,7 @@ You can now get the url of metrics agents using `ray.nodes()` # In a head node, import ray - ray.init(address='auto') + ray.init() from pprint import pprint pprint(ray.nodes()) diff --git a/doc/source/ray-observability/ray-tracing.rst b/doc/source/ray-observability/ray-tracing.rst index ab5797c38dd8..45185b3e57f1 100644 --- a/doc/source/ray-observability/ray-tracing.rst +++ b/doc/source/ray-observability/ray-tracing.rst @@ -58,7 +58,7 @@ For open-source users who want to experiment with tracing, Ray has a default tra $ ray start --head --tracing-startup-hook=ray.util.tracing.setup_local_tmp_tracing:setup_tracing $ python - >>> ray.init(address="auto") + >>> ray.init() >>> @ray.remote def my_function(): return 1 diff --git a/doc/source/serve/deploying-serve.md b/doc/source/serve/deploying-serve.md index 6a7e02e34984..f5c42afa020d 100644 --- a/doc/source/serve/deploying-serve.md +++ b/doc/source/serve/deploying-serve.md @@ -19,8 +19,8 @@ Ray Serve instances run on top of Ray clusters and are started using {mod}`serve Once {mod}`serve.start ` has been called, further API calls can be used to create and update the deployments that will be used to serve your Python code (including ML models). The Serve instance will be torn down when the script exits. -When running on a long-lived Ray cluster (e.g., one started using `ray start` and connected -to using `ray.init(address="auto")`, you can also deploy a Ray Serve instance as a long-running +When running on a long-lived Ray cluster (e.g., one started using `ray start`), +you can also deploy a Ray Serve instance as a long-running service using `serve.start(detached=True)`. In this case, the Serve instance will continue to run on the Ray cluster even after the script that calls it exits. If you want to run another script to update the Serve instance, you can run another script that connects to the same Ray cluster and makes further API calls (e.g., to create, update, or delete a deployment). Note that there can only be one detached Serve instance on each Ray cluster. diff --git a/python/ray/_private/internal_api.py b/python/ray/_private/internal_api.py index c64733d73b53..27f10a5bf642 100644 --- a/python/ray/_private/internal_api.py +++ b/python/ray/_private/internal_api.py @@ -30,7 +30,7 @@ def memory_summary( ): from ray.dashboard.memory_utils import memory_summary - address = services.canonicalize_bootstrap_address(address) + address = services.canonicalize_bootstrap_address_or_die(address) state = GlobalState() options = GcsClientOptions.from_gcs_address(address) diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 398c1ee8a90f..56eba263c425 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -18,8 +18,7 @@ from pathlib import Path from typing import List, Optional -# Import psutil and colorama after ray so the packaged version is used. -import colorama +# Import psutil after ray so the packaged version is used. 
import psutil # Ray modules @@ -290,73 +289,64 @@ def _find_address_from_flag(flag: str): return addresses -def find_redis_address(): - return _find_address_from_flag("--redis-address") - - -def find_gcs_address(): +def find_gcs_addresses(): + """Finds any local GCS processes based on grepping ps.""" return _find_address_from_flag("--gcs-address") -def find_bootstrap_address(): - return find_gcs_address() - +def find_bootstrap_address(temp_dir: Optional[str]): + """Finds the latest Ray cluster address to connect to, if any. This is the + GCS address connected to by the last successful `ray start`.""" + return ray._private.utils.read_ray_address(temp_dir) -def _find_redis_address_or_die(): - """Finds one Redis address unambiguously, or raise an error. - - Callers outside this module should use - get_ray_address_from_environment() or canonicalize_bootstrap_address() - """ - redis_addresses = find_redis_address() - if len(redis_addresses) > 1: - raise ConnectionError( - f"Found multiple active Ray instances: {redis_addresses}. " - "Please specify the one to connect to by setting `address`." - ) - sys.exit(1) - elif not redis_addresses: - raise ConnectionError( - "Could not find any running Ray instance. " - "Please specify the one to connect to by setting `address`." - ) - return redis_addresses.pop() +def get_ray_address_from_environment(addr: str, temp_dir: Optional[str]): + """Attempts to find the address of Ray cluster to use, in this order: -def _find_gcs_address_or_die(): - """Find one GCS address unambiguously, or raise an error. + 1. Use RAY_ADDRESS if defined. + 2. If no address is provided or the provided address is "auto", use the + address in /tmp/ray/ray_current_cluster if available. This will error if + the specified address is None and there is no address found. For "auto", + we will fallback to connecting to any detected Ray cluster (legacy). + 3. Otherwise, use the provided address. - Callers outside of this module should use get_ray_address_to_use_or_die() + Returns: + A string to pass into `ray.init(address=...)`, e.g. ip:port, `auto`. """ - gcs_addresses = _find_address_from_flag("--gcs-address") - if len(gcs_addresses) > 1: - raise ConnectionError( - f"Found multiple active Ray instances: {gcs_addresses}. " - "Please specify the one to connect to by setting `--address` flag " - "or `RAY_ADDRESS` environment variable." - ) - sys.exit(1) - elif not gcs_addresses: - raise ConnectionError( - "Could not find any running Ray instance. " - "Please specify the one to connect to by setting `--address` flag " + env_addr = os.environ.get(ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE) + if env_addr is not None: + addr = env_addr + + if addr is not None and addr != "auto": + return addr + # We should try to automatically find an active local instance. + gcs_addrs = find_gcs_addresses() + bootstrap_addr = find_bootstrap_address(temp_dir) + + if len(gcs_addrs) > 1 and bootstrap_addr is not None: + logger.warning( + f"Found multiple active Ray instances: {gcs_addrs}. " + f"Connecting to latest cluster at {bootstrap_addr}. " + "You can override this by setting the `--address` flag " "or `RAY_ADDRESS` environment variable." ) - return gcs_addresses.pop() + elif len(gcs_addrs) > 0 and addr == "auto": + # Preserve legacy "auto" behavior of connecting to any cluster, even if not + # started with ray start. However if addr is None, we will raise an error. + bootstrap_addr = list(gcs_addrs).pop() + + if bootstrap_addr is None: + if addr is None: + # Caller should start a new instance. 
+ return None + else: + raise ConnectionError( + "Could not find any running Ray instance. " + "Please specify the one to connect to by setting `--address` flag " + "or `RAY_ADDRESS` environment variable." + ) - -def get_ray_address_from_environment(): - """ - Attempts to find the address of Ray cluster to use, first from - RAY_ADDRESS environment variable, then from the local Raylet. - - Returns: - A string to pass into `ray.init(address=...)`, e.g. ip:port, `auto`. - """ - addr = os.environ.get(ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE) - if addr is None or addr == "auto": - addr = _find_gcs_address_or_die() - return addr + return bootstrap_addr def wait_for_node( @@ -445,7 +435,9 @@ def remaining_processes_alive(): return ray._private.worker._global_node.remaining_processes_alive() -def canonicalize_bootstrap_address(addr: str): +def canonicalize_bootstrap_address( + addr: str, temp_dir: Optional[str] = None +) -> Optional[str]: """Canonicalizes Ray cluster bootstrap address to host:port. Reads address from the environment if needed. @@ -453,10 +445,13 @@ def canonicalize_bootstrap_address(addr: str): via ray.init() or `--address` flags, before using the address to connect. Returns: - Ray cluster address string in format. + Ray cluster address string in format or None if the caller + should start a local Ray instance. """ if addr is None or addr == "auto": - addr = get_ray_address_from_environment() + addr = get_ray_address_from_environment(addr, temp_dir) + if addr is None or addr == "local": + return None try: bootstrap_address = resolve_ip_for_localhost(addr) except Exception: @@ -465,6 +460,45 @@ def canonicalize_bootstrap_address(addr: str): return bootstrap_address +def canonicalize_bootstrap_address_or_die( + addr: str, temp_dir: Optional[str] = None +) -> str: + """Canonicalizes Ray cluster bootstrap address to host:port. + + This function should be used when the caller expects there to be an active + and local Ray instance. If no address is provided or address="auto", this + will autodetect the latest Ray instance created with `ray start`. + + For convenience, if no address can be autodetected, this function will also + look for any running local GCS processes, based on pgrep output. This is to + allow easier use of Ray CLIs when debugging a local Ray instance (whose GCS + addresses are not recorded). + + Returns: + Ray cluster address string in format. Throws a + ConnectionError if zero or multiple active Ray instances are + autodetected. + """ + bootstrap_addr = canonicalize_bootstrap_address(addr, temp_dir=temp_dir) + if bootstrap_addr is not None: + return bootstrap_addr + + running_gcs_addresses = find_gcs_addresses() + if len(running_gcs_addresses) == 0: + raise ConnectionError( + "Could not find any running Ray instance. " + "Please specify the one to connect to by setting the `--address` " + "flag or `RAY_ADDRESS` environment variable." + ) + if len(running_gcs_addresses) > 1: + raise ConnectionError( + f"Found multiple active Ray instances: {running_gcs_addresses}. " + "Please specify the one to connect to by setting the `--address` " + "flag or `RAY_ADDRESS` environment variable." 
+ ) + return running_gcs_addresses.pop() + + def extract_ip_port(bootstrap_address: str): if ":" not in bootstrap_address: raise ValueError( @@ -570,7 +604,7 @@ def create_redis_client(redis_address, password=None): cli = create_redis_client.instances.get(redis_address) if cli is None: redis_ip_address, redis_port = extract_ip_port( - canonicalize_bootstrap_address(redis_address) + canonicalize_bootstrap_address_or_die(redis_address) ) cli = redis.StrictRedis( host=redis_ip_address, port=int(redis_port), password=password @@ -1473,16 +1507,7 @@ def start_dashboard( else: raise Exception(err_msg) - if not minimal: - logger.info( - "View the Ray dashboard at %s%shttp://%s%s%s", - colorama.Style.BRIGHT, - colorama.Fore.GREEN, - dashboard_url, - colorama.Fore.RESET, - colorama.Style.NORMAL, - ) - else: + if minimal: # If it is the minimal installation, the web url (dashboard url) # shouldn't be configured because it doesn't start a server. dashboard_url = "" diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index a6134aaa1eee..762b90553bea 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -80,6 +80,48 @@ def get_ray_temp_dir(): return os.path.join(get_user_temp_dir(), "ray") +def get_ray_address_file(temp_dir: Optional[str]): + if temp_dir is None: + temp_dir = get_ray_temp_dir() + return os.path.join(temp_dir, "ray_current_cluster") + + +def write_ray_address(ray_address: str, temp_dir: Optional[str] = None): + address_file = get_ray_address_file(temp_dir) + if os.path.exists(address_file): + with open(address_file, "r") as f: + prev_address = f.read() + if prev_address == ray_address: + return + + logger.info( + f"Overwriting previous Ray address ({prev_address}). " + "Running ray.init() on this node will now connect to the new " + f"instance at {ray_address}. To override this behavior, pass " + f"address={prev_address} to ray.init()." + ) + + with open(address_file, "w+") as f: + f.write(ray_address) + + +def reset_ray_address(temp_dir: Optional[str] = None): + address_file = get_ray_address_file(temp_dir) + if os.path.exists(address_file): + try: + os.remove(address_file) + except OSError: + pass + + +def read_ray_address(temp_dir: Optional[str] = None) -> str: + address_file = get_ray_address_file(temp_dir) + if not os.path.exists(address_file): + return None + with open(address_file, "r") as f: + return f.read().strip() + + def _random_string(): id_hash = hashlib.shake_128() id_hash.update(uuid.uuid4().bytes) @@ -1257,7 +1299,7 @@ def internal_kv_list_with_retry(gcs_client, prefix, namespace, num_retries=20): logger.debug(f"Fetched {prefix}=None from KV. Retrying.") time.sleep(2) if result is None: - raise RuntimeError( + raise ConnectionError( f"Could not list '{prefix}' from GCS. Did GCS start successfully?" ) return result @@ -1291,7 +1333,7 @@ def internal_kv_get_with_retry(gcs_client, key, namespace, num_retries=20): logger.debug(f"Fetched {key}=None from KV. Retrying.") time.sleep(2) if not result: - raise RuntimeError( + raise ConnectionError( f"Could not read '{key.decode()}' from GCS. Did GCS start successfully?" ) return result diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 8ec431f3777e..21fbbd51fc9c 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1029,14 +1029,16 @@ def init( just attach this driver to it or we start all of the processes associated with a Ray cluster and attach to the newly started cluster. 
- To start Ray locally and all of the relevant processes, use this as - follows: + In most cases, it is enough to just call this method with no arguments. + This will autodetect an existing Ray cluster or start a new Ray instance if + no existing cluster is found: .. code-block:: python ray.init() - To connect to an existing local cluster, use this as follows. + To explicitly connect to an existing local cluster, use this as follows. A + ConnectionError will be thrown if no existing local cluster is found. .. code-block:: python @@ -1058,19 +1060,25 @@ def init( cluster with ray.init() or ray.init(address="auto"). Args: - address: The address of the Ray cluster to connect to. If - this address is not provided, then this command will start Redis, - a raylet, a plasma store, a plasma manager, and some workers. - It will also kill these processes when Python exits. If the driver - is running on a node in a Ray cluster, using `auto` as the value - tells the driver to detect the cluster, removing the need to - specify a specific node address. If the environment variable - `RAY_ADDRESS` is defined and the address is None or "auto", Ray - will set `address` to `RAY_ADDRESS`. - Addresses can be prefixed with a "ray://" to connect to a remote - cluster. For example, passing in the address - "ray://123.45.67.89:50005" will connect to the cluster at the - given address. + address: The address of the Ray cluster to connect to. The provided + address is resolved as follows: + 1. If a concrete address (e.g., localhost:) is provided, try to + connect to it. Concrete addresses can be prefixed with "ray://" to + connect to a remote cluster. For example, passing in the address + "ray://123.45.67.89:50005" will connect to the cluster at the given + address. + 2. If no address is provided, try to find an existing Ray instance + to connect to. This is done by first checking the environment + variable `RAY_ADDRESS`. If this is not defined, check the address + of the latest cluster started (found in + /tmp/ray/ray_current_cluster) if available. If this is also empty, + then start a new local Ray instance. + 3. If the provided address is "auto", then follow the same process + as above. However, if there is no existing cluster found, this will + throw a ConnectionError instead of starting a new local Ray + instance. + 4. If the provided address is "local", start a new local Ray + instance, even if there is already an existing local Ray instance. num_cpus: Number of CPUs the user wishes to assign to each raylet. By default, this is set based on virtual cores. num_gpus: Number of GPUs the user wishes to assign to each @@ -1156,6 +1164,8 @@ def init( Exception: An exception is raised if an inappropriate combination of arguments is passed in. 
""" + if configure_logging: + setup_logger(logging_level, logging_format) # Parse the hidden options: _enable_object_reconstruction: bool = kwargs.pop( @@ -1289,17 +1299,11 @@ def init( node_ip_address = services.resolve_ip_for_localhost(_node_ip_address) raylet_ip_address = node_ip_address - bootstrap_address, redis_address, gcs_address = None, None, None - if address: - bootstrap_address = services.canonicalize_bootstrap_address(address) - assert bootstrap_address is not None - logger.info( - f"Connecting to existing Ray cluster at address: {bootstrap_address}" - ) + redis_address, gcs_address = None, None + bootstrap_address = services.canonicalize_bootstrap_address(address, _temp_dir) + if bootstrap_address is not None: gcs_address = bootstrap_address - - if configure_logging: - setup_logger(logging_level, logging_format) + logger.info("Connecting to existing Ray cluster at address: %s...", gcs_address) if local_mode: driver_mode = LOCAL_MODE @@ -1440,13 +1444,44 @@ def init( enable_object_reconstruction=_enable_object_reconstruction, metrics_export_port=_metrics_export_port, ) - _global_node = ray._private.node.Node( - ray_params, - head=False, - shutdown_at_exit=False, - spawn_reaper=False, - connect_only=True, + try: + _global_node = ray._private.node.Node( + ray_params, + head=False, + shutdown_at_exit=False, + spawn_reaper=False, + connect_only=True, + ) + except ConnectionError: + if gcs_address == ray._private.utils.read_ray_address(_temp_dir): + logger.info( + "Failed to connect to the default Ray cluster address at " + f"{gcs_address}. This is most likely due to a previous Ray " + "instance that has since crashed. To reset the default " + "address to connect to, run `ray stop` or restart Ray with " + "`ray start`." + ) + raise + + # Log a message to find the Ray address that we connected to and the + # dashboard URL. + dashboard_url = _global_node.address_info["webui_url"] + # We logged the address before attempting the connection, so we don't need + # to log it again. + info_str = "Connected to Ray cluster." + if gcs_address is None: + info_str = "Started a local Ray instance." + if dashboard_url: + logger.info( + info_str + " View the dashboard at %s%shttp://%s%s%s.", + colorama.Style.BRIGHT, + colorama.Fore.GREEN, + dashboard_url, + colorama.Fore.RESET, + colorama.Style.NORMAL, ) + else: + logger.info(info_str) connect( _global_node, diff --git a/python/ray/client_builder.py b/python/ray/client_builder.py index 958626ab15ba..cecb21ce91fb 100644 --- a/python/ray/client_builder.py +++ b/python/ray/client_builder.py @@ -333,19 +333,11 @@ def _split_address(address: str) -> Tuple[str, str]: def _get_builder_from_address(address: Optional[str]) -> ClientBuilder: if address == "local": - return _LocalClientBuilder(None) + return _LocalClientBuilder("local") if address is None: - try: - # NOTE: This is not placed in `Node::get_temp_dir_path`, because - # this file is accessed before the `Node` object is created. - cluster_file = os.path.join( - ray._private.utils.get_user_temp_dir(), "ray_current_cluster" - ) - with open(cluster_file, "r") as f: - address = f.read().strip() - except FileNotFoundError: - # `address` won't be set and we'll create a new cluster. - pass + # NOTE: This is not placed in `Node::get_temp_dir_path`, because + # this file is accessed before the `Node` object is created. 
+ address = ray._private.services.canonicalize_bootstrap_address(address) return _LocalClientBuilder(address) module_string, inner_address = _split_address(address) try: diff --git a/python/ray/cluster_utils.py b/python/ray/cluster_utils.py index c1d96f24092e..b533842e2597 100644 --- a/python/ray/cluster_utils.py +++ b/python/ray/cluster_utils.py @@ -205,6 +205,10 @@ def add_node(self, wait: bool = True, **node_args): # Init global state accessor when creating head node. gcs_options = GcsClientOptions.from_gcs_address(node.gcs_address) self.global_state._initialize_global_state(gcs_options) + # Write the Ray cluster address for convenience in unit + # testing. ray.init() and ray.init(address="auto") will connect + # to the local cluster. + ray._private.utils.write_ray_address(self.head_node.gcs_address) else: ray_params.update_if_absent(redis_address=self.redis_address) ray_params.update_if_absent(gcs_address=self.gcs_address) @@ -361,3 +365,5 @@ def shutdown(self): self.remove_node(self.head_node) # need to reset internal kv since gcs is down ray.experimental.internal_kv._internal_kv_reset() + # Delete the cluster address. + ray._private.utils.reset_ray_address() diff --git a/python/ray/experimental/state/state_cli.py b/python/ray/experimental/state/state_cli.py index fd727b2974f1..7c67c18286f3 100644 --- a/python/ray/experimental/state/state_cli.py +++ b/python/ray/experimental/state/state_cli.py @@ -122,7 +122,7 @@ def _get_available_resources( def get_api_server_url() -> str: - address = services.canonicalize_bootstrap_address(None) + address = services.canonicalize_bootstrap_address_or_die(None) gcs_client = GcsClient(address=address, nums_reconnect_retry=0) ray.experimental.internal_kv._initialize_internal_kv(gcs_client) api_server_url = ray._private.utils.internal_kv_get_with_retry( diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index d88c3587a8d3..bea597055fb6 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -207,7 +207,7 @@ def format_table(table): ) def debug(address): """Show all active breakpoints and exceptions in the Ray debugger.""" - address = services.canonicalize_bootstrap_address(address) + address = services.canonicalize_bootstrap_address_or_die(address) logger.info(f"Connecting to Ray instance at {address}.") ray.init(address=address, log_to_driver=False) while True: @@ -739,11 +739,11 @@ def start( # Fail early when starting a new cluster when one is already running if address is None: default_address = f"{ray_params.node_ip_address}:{port}" - bootstrap_addresses = services.find_bootstrap_address() - if default_address in bootstrap_addresses: + bootstrap_address = services.find_bootstrap_address(temp_dir) + if default_address == bootstrap_address: raise ConnectionError( f"Ray is trying to start at {default_address}, " - f"but is already running at {bootstrap_addresses}. " + f"but is already running at {bootstrap_address}. " "Please specify a different port using the `--port`" " flag of `ray start` command." ) @@ -752,17 +752,7 @@ def start( ray_params, head=True, shutdown_at_exit=block, spawn_reaper=block ) - bootstrap_addresses = node.address - if temp_dir is None: - # Default temp directory. - temp_dir = ray._private.utils.get_user_temp_dir() - # Using the user-supplied temp dir unblocks on-prem - # users who can't write to the default temp. - current_cluster_path = os.path.join(temp_dir, "ray_current_cluster") - # TODO: Consider using the custom temp_dir for this file across the - # code base. 
(https://github.com/ray-project/ray/issues/16458) - with open(current_cluster_path, "w") as f: - print(bootstrap_addresses, file=f) + bootstrap_address = node.address # this is a noop if new-style is not set, so the old logger calls # are still in place @@ -778,9 +768,9 @@ def start( # of the cluster. Please be careful when updating this line. cli_logger.print( cf.bold(" ray start --address='{}'"), - bootstrap_addresses, + bootstrap_address, ) - if bootstrap_addresses.startswith("127.0.0.1:"): + if bootstrap_address.startswith("127.0.0.1:"): cli_logger.print( "This Ray runtime only accepts connections from local host." ) @@ -840,6 +830,7 @@ def start( cli_logger.newline() cli_logger.print("To terminate the Ray runtime, run") cli_logger.print(cf.bold(" ray stop")) + ray_params.gcs_address = bootstrap_address else: # Start worker node. @@ -875,7 +866,9 @@ def start( ) # Start Ray on a non-head node. - bootstrap_address = services.canonicalize_bootstrap_address(address) + bootstrap_address = services.canonicalize_bootstrap_address( + address, temp_dir=temp_dir + ) if bootstrap_address is None: cli_logger.abort( @@ -966,6 +959,9 @@ def start( os._exit(1) # not-reachable + assert ray_params.gcs_address is not None + ray._private.utils.write_ray_address(ray_params.gcs_address, temp_dir) + @cli.command() @click.option( @@ -1052,14 +1048,6 @@ def stop(force, grace_period): "Could not terminate `{}` due to {}", cf.bold(proc_string), str(ex) ) - try: - os.remove( - os.path.join(ray._private.utils.get_user_temp_dir(), "ray_current_cluster") - ) - except OSError: - # This just means the file doesn't exist. - pass - # Wait for the processes to actually stop. # Dedup processes. stopped, alive = psutil.wait_procs(stopped, timeout=0) @@ -1103,6 +1091,10 @@ def on_terminate(proc): proc.kill() # Wait a little bit to make sure processes are killed forcefully. psutil.wait_procs(alive, timeout=2) + # NOTE(swang): This will not reset the cluster address for a user-defined + # temp_dir. This is fine since it will get overwritten the next time we + # call `ray start`. + ray._private.utils.reset_ray_address() @cli.command() @@ -1772,7 +1764,7 @@ def microbenchmark(): ) def timeline(address): """Take a Chrome tracing timeline for a Ray cluster.""" - address = services.canonicalize_bootstrap_address(address) + address = services.canonicalize_bootstrap_address_or_die(address) logger.info(f"Connecting to Ray instance at {address}.") ray.init(address=address) time = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") @@ -1846,7 +1838,7 @@ def memory( num_entries, ): """Print object references held in a Ray cluster.""" - address = services.canonicalize_bootstrap_address(address) + address = services.canonicalize_bootstrap_address_or_die(address) if not ray._private.gcs_utils.check_health(address): print(f"Ray cluster is not found at {address}") sys.exit(1) @@ -1879,7 +1871,7 @@ def memory( @PublicAPI def status(address, redis_password): """Print cluster status, including autoscaling info.""" - address = services.canonicalize_bootstrap_address(address) + address = services.canonicalize_bootstrap_address_or_die(address) if not ray._private.gcs_utils.check_health(address): print(f"Ray cluster is not found at {address}") sys.exit(1) @@ -2076,7 +2068,9 @@ def logs( # If both id & ip are not provided, choose a head node as a default. 
if node_id is None and node_ip is None: - address = ray._private.services.canonicalize_bootstrap_address(None) + # TODO(swang): This command should also support + # passing --address or RAY_ADDRESS, like others. + address = ray._private.services.canonicalize_bootstrap_address_or_die(None) node_ip = address.split(":")[0] filename = None @@ -2271,8 +2265,6 @@ def cluster_dump( ) def global_gc(address): """Trigger Python garbage collection on all cluster workers.""" - address = services.canonicalize_bootstrap_address(address) - logger.info(f"Connecting to Ray instance at {address}.") ray.init(address=address) ray._private.internal_api.global_gc() print("Triggered gc.collect() on all workers.") @@ -2338,7 +2330,7 @@ def healthcheck(address, redis_password, component, skip_version_check): Health check a Ray or a specific component. Exit code 0 is healthy. """ - address = services.canonicalize_bootstrap_address(address) + address = services.canonicalize_bootstrap_address_or_die(address) if not component: try: diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 31ba5472d999..11194ce5ffed 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -117,6 +117,8 @@ def shutdown_only(maybe_external_redis): yield None # The code after the yield will run as teardown code. ray.shutdown() + # Delete the cluster address just in case. + ray._private.utils.reset_ray_address() @contextmanager @@ -124,11 +126,13 @@ def _ray_start(**kwargs): init_kwargs = get_default_fixture_ray_kwargs() init_kwargs.update(kwargs) # Start the Ray processes. - address_info = ray.init(**init_kwargs) + address_info = ray.init("local", **init_kwargs) yield address_info # The code after the yield will run as teardown code. ray.shutdown() + # Delete the cluster address just in case. + ray._private.utils.reset_ray_address() @pytest.fixture @@ -286,7 +290,7 @@ def ray_start_object_store_memory(request, maybe_external_redis): "_system_config": system_config, "object_store_memory": store_size, } - ray.init(**init_kwargs) + ray.init("local", **init_kwargs) yield store_size # The code after the yield will run as teardown code. ray.shutdown() @@ -328,6 +332,8 @@ def call_ray_start(request): ray.shutdown() # Kill the Ray cluster. subprocess.check_call(["ray", "stop"], env=env) + # Delete the cluster address just in case. + ray._private.utils.reset_ray_address() @pytest.fixture @@ -347,6 +353,8 @@ def call_ray_start_with_external_redis(request): ray.shutdown() # Kill the Ray cluster. subprocess.check_call(["ray", "stop"]) + # Delete the cluster address just in case. + ray._private.utils.reset_ray_address() @pytest.fixture @@ -361,6 +369,8 @@ def init_and_serve(): def call_ray_stop_only(): yield subprocess.check_call(["ray", "stop"]) + # Delete the cluster address just in case. + ray._private.utils.reset_ray_address() # Used to test both Ray Client and non-Ray Client codepaths. diff --git a/python/ray/tests/test_advanced_8.py b/python/ray/tests/test_advanced_8.py index eb7f6151d9a1..748220e42178 100644 --- a/python/ray/tests/test_advanced_8.py +++ b/python/ray/tests/test_advanced_8.py @@ -165,8 +165,15 @@ def test_ray_address_environment_variable(ray_start_cluster): ray.shutdown() del os.environ["RAY_ADDRESS"] - # Make sure we start a new cluster if RAY_ADDRESS is not set. + # Make sure we connect to the existing cluster with on args and RAY_ADDRESS + # is not set. 
ray.init() + assert "CPU" not in ray._private.state.cluster_resources() + ray.shutdown() + + # Make sure we start a new cluster if "local" is explicitly passed. + # is not set. + ray.init(address="local") assert "CPU" in ray._private.state.cluster_resources() ray.shutdown() diff --git a/python/ray/tests/test_basic_5.py b/python/ray/tests/test_basic_5.py index 46d24980fb0b..2a5896334ca0 100644 --- a/python/ray/tests/test_basic_5.py +++ b/python/ray/tests/test_basic_5.py @@ -124,7 +124,7 @@ def test_internal_kv(ray_start_regular): kv._internal_kv_list("@namespace_abc", namespace="n") -def test_run_on_all_workers(ray_start_regular, tmp_path): +def test_run_on_all_workers(call_ray_start, tmp_path): # This test is to ensure run_function_on_all_workers are executed # on all workers. lock_file = tmp_path / "lock" diff --git a/python/ray/tests/test_client_builder.py b/python/ray/tests/test_client_builder.py index e1094af19e11..655757f8cd41 100644 --- a/python/ray/tests/test_client_builder.py +++ b/python/ray/tests/test_client_builder.py @@ -165,7 +165,7 @@ def ping(self): p4 = run_string_as_driver_nonblocking(blocking_noaddr_script) wait_for_condition( - lambda: len(ray._private.services.find_bootstrap_address()) == 4, + lambda: len(ray._private.services.find_gcs_addresses()) == 4, retry_interval_ms=1000, ) @@ -185,14 +185,14 @@ def ping(self): """ import ray ray.client().connect() -assert len(ray._private.services.find_bootstrap_address()) == 1 +assert len(ray._private.services.find_gcs_addresses()) == 1 """ ) # ray.client("local").connect() should always create a new cluster even if # there's one running. p1 = run_string_as_driver_nonblocking(blocking_local_script) wait_for_condition( - lambda: len(ray._private.services.find_bootstrap_address()) == 2, + lambda: len(ray._private.services.find_gcs_addresses()) == 2, retry_interval_ms=1000, ) p1.kill() @@ -285,14 +285,18 @@ def test_address_resolution(call_ray_stop_only): # client(...) takes precedence of RAY_ADDRESS=local assert ray.util.client.ray.is_connected() - with pytest.raises(Exception): - # This tries to call `ray.init(address="local") which - # breaks.` - ray.client(None).connect() + # This tries to call `ray.init(address="local") which creates a new Ray + # instance. + with ray.client(None).connect(): + wait_for_condition( + lambda: len(ray._private.services.find_gcs_addresses()) == 2, + retry_interval_ms=1000, + ) finally: if os.environ.get("RAY_ADDRESS"): del os.environ["RAY_ADDRESS"] + ray.shutdown() def mock_connect(*args, **kwargs): diff --git a/python/ray/tests/test_exit_observability.py b/python/ray/tests/test_exit_observability.py index ab3b2fef8c8d..dba24249e79e 100644 --- a/python/ray/tests/test_exit_observability.py +++ b/python/ray/tests/test_exit_observability.py @@ -111,7 +111,7 @@ def test_worker_exit_intended_user_exit(ray_start_cluster): address=cluster.address ) a = run_string_as_driver(driver) - driver_pid = int(a.strip("\n")) + driver_pid = int(a.strip().split("\n")[-1].strip()) def verify_worker_exit_by_shutdown(): worker = get_worker_by_pid(driver_pid) diff --git a/python/ray/tests/test_get_or_create_actor.py b/python/ray/tests/test_get_or_create_actor.py index e901bd331095..f91055a3e248 100644 --- a/python/ray/tests/test_get_or_create_actor.py +++ b/python/ray/tests/test_get_or_create_actor.py @@ -93,7 +93,7 @@ def do_run(name): # actor creation races. 
out = [] for line in out_str.split("\n"): - if "Ray dashboard" not in line and "The object store" not in line: + if "local Ray instance" not in line and "The object store" not in line: out.append(line) valid = "".join(out) assert valid.strip() == "DONE", out_str diff --git a/python/ray/tests/test_multiprocessing.py b/python/ray/tests/test_multiprocessing.py index c0e113cf09f0..7ca5f9401ffc 100644 --- a/python/ray/tests/test_multiprocessing.py +++ b/python/ray/tests/test_multiprocessing.py @@ -142,7 +142,7 @@ def check_pool_size(pool, size): ray._raylet.Config.initialize("") # Check that starting a pool still starts ray if RAY_ADDRESS not set. - pool = Pool(processes=init_cpus) + pool = Pool(processes=init_cpus, ray_address="local") assert ray.is_initialized() assert int(ray.cluster_resources()["CPU"]) == init_cpus check_pool_size(pool, init_cpus) diff --git a/python/ray/tests/test_output.py b/python/ray/tests/test_output.py index 3d64c49a34e5..3b1f3d9166f0 100644 --- a/python/ray/tests/test_output.py +++ b/python/ray/tests/test_output.py @@ -425,20 +425,42 @@ def __repr__(self): assert re.search("Actor2 pid=.*bye", out_str), out_str -def test_output(): - # Use subprocess to execute the __main__ below. - outputs = subprocess.check_output( - [sys.executable, __file__, "_ray_instance"], stderr=subprocess.STDOUT - ).decode() - lines = outputs.split("\n") +def test_output_local_ray(): + script = """ +import ray +ray.init() + """ + output = run_string_as_driver(script) + lines = output.strip("\n").split("\n") + for line in lines: + print(line) + lines = [line for line in lines if "The object store is using /tmp" not in line] + assert len(lines) == 1 + line = lines[0] + print(line) + assert "Started a local Ray instance." in line + if os.environ.get("RAY_MINIMAL") == "1": + assert "View the dashboard" not in line + else: + assert "View the dashboard" in line + + +def test_output_ray_cluster(call_ray_start): + script = """ +import ray +ray.init() + """ + output = run_string_as_driver(script) + lines = output.strip("\n").split("\n") for line in lines: print(line) + assert len(lines) == 2 + assert "Connecting to existing Ray cluster at address:" in lines[0] + assert "Connected to Ray cluster." 
in lines[1] if os.environ.get("RAY_MINIMAL") == "1": - # Without "View the Ray dashboard" - assert len(lines) == 1, lines + assert "View the dashboard" not in lines[1] else: - # With "View the Ray dashboard" - assert len(lines) == 2, lines + assert "View the dashboard" in lines[1] @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index b5c2fa1ff7b0..59ff7909dc14 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -2,6 +2,7 @@ import os import sys import unittest.mock +import subprocess import grpc import pytest @@ -191,6 +192,7 @@ def test_ray_address(input, call_ray_start): res = ray.init(input) # Ensure this is not a client.connect() assert not isinstance(res, ClientContext) + assert res.address_info["gcs_address"] == address ray.shutdown() addr = "localhost:{}".format(address.split(":")[-1]) @@ -198,9 +200,68 @@ def test_ray_address(input, call_ray_start): res = ray.init(input) # Ensure this is not a client.connect() assert not isinstance(res, ClientContext) + assert res.address_info["gcs_address"] == address ray.shutdown() +@pytest.mark.parametrize("address", [None, "auto"]) +def test_ray_init_no_local_instance(shutdown_only, address): + # Starts a new Ray instance. + if address is None: + ray.init(address=address) + else: + # Throws an error if we explicitly want to connect to an existing + # instance and none exists. + with pytest.raises(ConnectionError): + ray.init(address=address) + + +@pytest.mark.skipif( + os.environ.get("CI") and sys.platform == "win32", + reason="Flaky when run on windows CI", +) +@pytest.mark.parametrize("address", [None, "auto"]) +def test_ray_init_existing_instance(call_ray_start, address): + ray_address = call_ray_start + # If no address is specified, we will default to an existing cluster. + res = ray.init(address=address) + assert res.address_info["gcs_address"] == ray_address + ray.shutdown() + + # Start a second local Ray instance. + try: + subprocess.check_output("ray start --head", shell=True) + # If there are multiple local instances, connect to the latest. + res = ray.init(address=address) + assert res.address_info["gcs_address"] != ray_address + ray.shutdown() + + # If there are multiple local instances and we specify an address + # explicitly, it works. + with unittest.mock.patch.dict(os.environ, {"RAY_ADDRESS": ray_address}): + res = ray.init(address=address) + assert res.address_info["gcs_address"] == ray_address + finally: + ray.shutdown() + subprocess.check_output("ray stop --force", shell=True) + + +@pytest.mark.skipif( + os.environ.get("CI") and sys.platform == "win32", + reason="Flaky when run on windows CI", +) +@pytest.mark.parametrize("address", [None, "auto"]) +def test_ray_init_existing_instance_crashed(address): + ray._private.utils.write_ray_address("localhost:6379") + try: + # If no address is specified, we will default to an existing cluster. 
+ ray._private.node.NUM_REDIS_GET_RETRIES = 1 + with pytest.raises(ConnectionError): + ray.init(address=address) + finally: + ray._private.utils.reset_ray_address() + + class Credentials(grpc.ChannelCredentials): def __init__(self, name): self.name = name diff --git a/python/ray/util/client/server/server.py b/python/ray/util/client/server/server.py index 5bcd3747559a..5c9e3f7311a3 100644 --- a/python/ray/util/client/server/server.py +++ b/python/ray/util/client/server/server.py @@ -25,7 +25,7 @@ from ray._private.gcs_utils import GcsClient from ray._private.ray_constants import env_integer from ray._private.ray_logging import setup_logger -from ray._private.services import canonicalize_bootstrap_address +from ray._private.services import canonicalize_bootstrap_address_or_die from ray._private.tls_utils import add_port_to_grpc_server from ray.job_config import JobConfig from ray.util.client.common import ( @@ -799,7 +799,7 @@ def try_create_gcs_client( Try to create a gcs client based on the the command line args or by autodetecting a running Ray cluster. """ - address = canonicalize_bootstrap_address(address) + address = canonicalize_bootstrap_address_or_die(address) return GcsClient(address=address) diff --git a/python/ray/util/multiprocessing/pool.py b/python/ray/util/multiprocessing/pool.py index c3c908b3225d..51281aa85d86 100644 --- a/python/ray/util/multiprocessing/pool.py +++ b/python/ray/util/multiprocessing/pool.py @@ -603,19 +603,18 @@ def _init_ray(self, processes=None, ray_address=None): # ray_address argument > RAY_ADDRESS > start new local cluster. if not ray.is_initialized(): # Cluster mode. - if ray_address is None and RAY_ADDRESS_ENV in os.environ: - logger.info( - "Connecting to ray cluster at address='{}'".format( - os.environ[RAY_ADDRESS_ENV] - ) - ) + if ray_address is None and ( + RAY_ADDRESS_ENV in os.environ + or ray._private.utils.read_ray_address() is not None + ): ray.init() elif ray_address is not None: - logger.info(f"Connecting to ray cluster at address='{ray_address}'") - ray.init(address=ray_address) + init_kwargs = {} + if ray_address == "local": + init_kwargs["num_cpus"] = processes + ray.init(address=ray_address, **init_kwargs) # Local mode. 
else: - logger.info("Starting local ray cluster") ray.init(num_cpus=processes) ray_cpus = int(ray._private.state.cluster_resources()["CPU"]) diff --git a/python/ray/workflow/tests/test_recovery.py b/python/ray/workflow/tests/test_recovery.py index e02fccb0d27d..8ab65d17700e 100644 --- a/python/ray/workflow/tests/test_recovery.py +++ b/python/ray/workflow/tests/test_recovery.py @@ -254,10 +254,10 @@ def test_recovery_non_exists_workflow(workflow_start_regular): def test_recovery_cluster_failure(tmp_path, shutdown_only): ray.shutdown() - subprocess.check_call(["ray", "start", "--head"]) + subprocess.check_call(["ray", "start", "--head", f"--storage={tmp_path}"]) time.sleep(1) proc = run_string_as_driver_nonblocking( - f""" + """ import time import ray from ray import workflow @@ -272,7 +272,7 @@ def foo(x): return 20 if __name__ == "__main__": - ray.init(storage="{tmp_path}") + ray.init() assert workflow.run(foo.bind(0), workflow_id="cluster_failure") == 20 """ ) @@ -290,9 +290,9 @@ def test_recovery_cluster_failure_resume_all(tmp_path, shutdown_only): ray.shutdown() tmp_path = tmp_path - subprocess.check_call(["ray", "start", "--head"]) - time.sleep(1) workflow_dir = tmp_path / "workflow" + subprocess.check_call(["ray", "start", "--head", f"--storage={workflow_dir}"]) + time.sleep(1) lock_file = tmp_path / "lock_file" lock = FileLock(lock_file) lock.acquire() @@ -310,7 +310,7 @@ def foo(x): return 20 if __name__ == "__main__": - ray.init(storage="{str(workflow_dir)}") + ray.init() assert workflow.run(foo.bind(0), workflow_id="cluster_failure") == 20 """ )