Skip to content

Commit

Permalink
[core] ray.init defaults to an existing Ray instance if there is one (r…
Browse files Browse the repository at this point in the history
…ay-project#26678)

ray.init() will currently start a new Ray instance even if one is already existing, which is very confusing if you are a new user trying to go from local development to a cluster. This PR changes it so that, when no address is specified, we first try to find an existing Ray cluster that was created through `ray start`. If none is found, we will start a new one.

This makes two changes to the ray.init() resolution order:
1. When `ray start` is called, the started cluster address was already written to a file called `/tmp/ray/ray_current_cluster`. For ray.init() and ray.init(address="auto"), we will first check this local file for an existing cluster address. The file is deleted on `ray stop`. If the file is empty, autodetect any running cluster (legacy behavior) if address="auto", or we will start a new local Ray instance if address=None.
2. When ray.init(address="local") is called, we will create a new local Ray instance, even if one is already existing. This behavior seems to be necessary mainly for `ray.client` use cases.

This also surfaces the logs about which Ray instance we are connecting to. Previously these were hidden because we didn't set up the log until after connecting to Ray. So now Ray will log one of the following messages during ray.init:
```
(Connecting to existing Ray cluster at address: <IP>...)
...connection...
(Started a local Ray cluster.| Connected to Ray Cluster.)( View the dashboard at <URL>)
```

Note that this changes the dashboard URL to be printed with `ray.init()` instead of when the dashboard is first started.

Co-authored-by: Eric Liang <[email protected]>
  • Loading branch information
stephanie-wang and ericl authored Jul 23, 2022
1 parent 1fa8ddb commit 55a0f7b
Show file tree
Hide file tree
Showing 34 changed files with 438 additions and 233 deletions.
6 changes: 5 additions & 1 deletion dashboard/modules/dashboard_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,11 @@ def parse_cluster_info(
headers: Optional[Dict[str, Any]] = None,
) -> ClusterInfo:
if address is None:
if ray.is_initialized():
if (
ray.is_initialized()
and ray._private.worker.global_worker.node.address_info["webui_url"]
is not None
):
address = (
"http://"
f"{ray._private.worker.global_worker.node.address_info['webui_url']}"
Expand Down
5 changes: 4 additions & 1 deletion dashboard/modules/job/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ def _exec_entrypoint(self, logs_path: str) -> subprocess.Popen:

def _get_driver_env_vars(self) -> Dict[str, str]:
"""Returns environment variables that should be set in the driver."""
ray_addr = ray._private.services.find_bootstrap_address().pop()
ray_addr = ray._private.services.canonicalize_bootstrap_address_or_die(
"auto", ray.worker._global_node._ray_params.temp_dir
)
assert ray_addr is not None
return {
# Set JobConfig for the child process (runtime_env, metadata).
RAY_JOB_CONFIG_JSON_ENV_VAR: json.dumps(
Expand Down
5 changes: 3 additions & 2 deletions dashboard/modules/job/tests/test_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,10 @@ async def test_env_var_and_driver_job_config_warning(self, job_manager):
check_job_succeeded, job_manager=job_manager, job_id=job_id
)
logs = job_manager.get_job_logs(job_id)
assert logs.startswith(
"Both RAY_JOB_CONFIG_JSON_ENV_VAR and ray.init(runtime_env) " "are provided"
token = (
"Both RAY_JOB_CONFIG_JSON_ENV_VAR and ray.init(runtime_env) are provided"
)
assert token in logs, logs
assert "JOB_1_VAR" in logs

async def test_failed_runtime_env_validation(self, job_manager):
Expand Down
22 changes: 12 additions & 10 deletions doc/source/cluster/cloud.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Ray with cloud providers
# Get a remote screen on the head node.
$ ray attach ray/python/ray/autoscaler/aws/example-full.yaml
$ # Try running a Ray program with 'ray.init(address="auto")'.
$ # Try running a Ray program.
# Tear down the cluster.
$ ray down ray/python/ray/autoscaler/aws/example-full.yaml
Expand Down Expand Up @@ -68,7 +68,7 @@ Ray with cloud providers
# Get a remote screen on the head node.
$ ray attach ray/python/ray/autoscaler/azure/example-full.yaml
# test ray setup
$ python -c 'import ray; ray.init(address="auto")'
$ python -c 'import ray; ray.init()'
$ exit
# Tear down the cluster.
$ ray down ray/python/ray/autoscaler/azure/example-full.yaml
Expand All @@ -89,7 +89,7 @@ Ray with cloud providers
.. code-block:: python
import ray
ray.init(address='auto')
ray.init()
Note that on each node the `azure-init.sh <https://github.com/ray-project/ray/blob/master/doc/azure/azure-init.sh>`_ script is executed and performs the following actions:

Expand Down Expand Up @@ -117,7 +117,7 @@ Ray with cloud providers
# Get a remote screen on the head node.
$ ray attach ray/python/ray/autoscaler/gcp/example-full.yaml
$ # Try running a Ray program with 'ray.init(address="auto")'.
$ # Try running a Ray program with 'ray.init()'.
# Tear down the cluster.
$ ray down ray/python/ray/autoscaler/gcp/example-full.yaml
Expand All @@ -142,7 +142,7 @@ Ray with cloud providers
# Get a remote screen on the head node.
$ ray attach ray/python/ray/autoscaler/aliyun/example-full.yaml
$ # Try running a Ray program with 'ray.init(address="auto")'.
$ # Try running a Ray program with 'ray.init()'.
# Tear down the cluster.
$ ray down ray/python/ray/autoscaler/aliyun/example-full.yaml
Expand Down Expand Up @@ -203,7 +203,7 @@ There are two ways of running private clusters:
# Get a remote screen on the head node.
$ ray attach ray/python/ray/autoscaler/local/example-full.yaml
$ # Try running a Ray program with 'ray.init(address="auto")'.
$ # Try running a Ray program with 'ray.init()'.
# Tear down the cluster
$ ray down ray/python/ray/autoscaler/local/example-full.yaml
Expand Down Expand Up @@ -237,7 +237,7 @@ There are two ways of running private clusters:
# Get a remote screen on the head node.
$ ray attach ray/python/ray/autoscaler/local/example-full.yaml
$ # Try running a Ray program with 'ray.init(address="auto")'.
$ # Try running a Ray program with 'ray.init()'.
# Tear down the cluster
$ ray down ray/python/ray/autoscaler/local/example-full.yaml
Expand Down Expand Up @@ -313,7 +313,7 @@ firewall settings preventing access).

If you see ``Ray runtime started.``, then the node successfully connected to
the head node at the ``--address``. You should now be able to connect to the
cluster with ``ray.init(address='auto')``.
cluster with ``ray.init()``.

.. code-block:: bash
Expand Down Expand Up @@ -375,11 +375,13 @@ To run a distributed Ray program, you'll need to execute your program on the sam

.. tabbed:: Python

Within your program/script, you must call ``ray.init`` and add the ``address`` parameter to ``ray.init`` (like ``ray.init(address=...)``). This causes your script to connect to the existing Ray runtime on the cluster. For example:
Within your program/script, ``ray.init()`` will now automatically find and connect to the latest Ray cluster.
For example:

.. code-block:: python
ray.init(address="auto")
ray.init()
# Connecting to existing Ray cluster at address: <IP address>...
.. tabbed:: Java

Expand Down
2 changes: 1 addition & 1 deletion doc/source/cluster/commands.rst
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ See :ref:`the documentation <ray-up-doc>` for ``ray up``.
Running shell commands on the cluster (``ray exec``)
----------------------------------------------------

You can use ``ray exec`` to conveniently run commands on clusters. Note that python scripts that you want to scale should connect to Ray via ``ray.init(address="auto")``. See :ref:`the documentation <ray-exec-doc>` for ``ray exec``.
You can use ``ray exec`` to conveniently run commands on clusters. See :ref:`the documentation <ray-exec-doc>` for ``ray exec``.


.. code-block:: shell
Expand Down
2 changes: 1 addition & 1 deletion doc/source/cluster/kuberay/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ In the Python interpreter, run the following snippet to scale up the cluster:

```python
import ray.autoscaler.sdk
ray.init("auto")
ray.init()
ray.autoscaler.sdk.request_resources(num_cpus=4)
```

Expand Down
11 changes: 3 additions & 8 deletions doc/source/cluster/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,7 @@ Run the application in the cloud
--------------------------------

We are now ready to execute the application in across multiple machines on our Ray cloud cluster.
First, we need to edit the initialization command ``ray.init()`` in ``script.py``.
Change it to

.. code-block:: python
ray.init(address='auto')
This tells your script to connect to the Ray runtime on the remote cluster instead of initializing a new Ray runtime.
``ray.init()`` will now automatically connect to the newly created cluster.

Next, run the following command:

Expand All @@ -233,6 +226,8 @@ The output should now look similar to the following:

.. parsed-literal::
Connecting to existing Ray cluster at address: <IP address>...
This cluster consists of
3 nodes in total
6.0 CPU resources in total
Expand Down
2 changes: 1 addition & 1 deletion doc/source/data/examples/big_data_ingestion.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@
" TOTAL_NUM_NODES = 70 + 16 + 1\n",
"\n",
" # use the AWS cluster we just set up.\n",
" ray.init(address=\"auto\")\n",
" ray.init()\n",
"\n",
" # waiting for cluster nodes to come up.\n",
" while len(ray.nodes()) < TOTAL_NUM_NODES:\n",
Expand Down
20 changes: 13 additions & 7 deletions doc/source/ray-core/starting-ray.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@ There are three ways of starting the Ray runtime:
* Explicitly via CLI (:ref:`start-ray-cli`)
* Explicitly via the cluster launcher (:ref:`start-ray-up`)

In all cases, ``ray.init()`` will try to automatically find a Ray instance to
connect to. It checks, in order:
1. The ``RAY_ADDRESS`` OS environment variable.
2. The concrete address passed to ``ray.init(address=<address>)``.
3. If no address is provided, the latest Ray instance that was started on the same machine using ``ray start``.

.. _start-ray-init:

Starting Ray on a single machine
--------------------------------

Calling ``ray.init()`` (without any ``address`` args) starts a Ray runtime on your laptop/machine. This laptop/machine becomes the "head node".
Calling ``ray.init()`` starts a local Ray instance on your laptop/machine. This laptop/machine becomes the "head node".

.. note::

Expand Down Expand Up @@ -161,15 +167,15 @@ Use ``ray start`` from the CLI to start a 1 node ray runtime on a machine. This
...
You can connect to this Ray runtime by starting a driver process on the same node as where you ran ``ray start``:
You can connect to this Ray instance by starting a driver process on the same node as where you ran ``ray start``.
``ray.init()`` will now automatically connect to the latest Ray instance.

.. tabbed:: Python

.. code-block:: python
# This must
import ray
ray.init(address='auto')
ray.init()
.. tabbed:: java

Expand Down Expand Up @@ -207,7 +213,7 @@ You can connect to this Ray runtime by starting a driver process on the same nod
RAY_ADDRESS=<address> ./<binary> <args>
You can connect other nodes to the head node, creating a Ray cluster by also calling ``ray start`` on those nodes. See :ref:`manual-cluster` for more details. Calling ``ray.init(address="auto")`` on any of the cluster machines will connect to the ray cluster.
You can connect other nodes to the head node, creating a Ray cluster by also calling ``ray start`` on those nodes. See :ref:`manual-cluster` for more details. Calling ``ray.init()`` on any of the cluster machines will connect to the same Ray cluster.

.. _start-ray-up:

Expand All @@ -219,11 +225,11 @@ The ``ray up`` command uses the Ray cluster launcher to start a cluster on the c

Your code **only** needs to execute on one machine in the cluster (usually the head node). Read more about :ref:`running programs on a Ray cluster <using-ray-on-a-cluster>`.

To connect to the existing cluster, similar to the method outlined in :ref:`start-ray-cli`, you must call ``ray.init`` and specify the address of the Ray cluster when initializing Ray in your code. This allows your script to connect to the existing Ray runtime on the cluster.
To connect to the Ray cluster, call ``ray.init`` from one of the machines in the cluster. This will connect to the latest Ray cluster:

.. code-block:: python
ray.init(address="auto")
ray.init()
Note that the machine calling ``ray up`` will not be considered as part of the Ray cluster, and therefore calling ``ray.init`` on that same machine will not attach to the cluster.

Expand Down
3 changes: 1 addition & 2 deletions doc/source/ray-core/walkthrough.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ You can start Ray on a single machine by adding this to your code.
import ray
# Start Ray. If you're connecting to an existing cluster, you would use
# ray.init(address=<cluster-address>) instead.
# Start Ray.
ray.init()
...
Expand Down
2 changes: 1 addition & 1 deletion doc/source/ray-observability/ray-logging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This document will explain Ray's logging system and its best practices.

Driver logs
~~~~~~~~~~~
An entry point of Ray applications that calls ``ray.init(address='auto')`` or ``ray.init()`` is called a driver.
An entry point of Ray applications that calls ``ray.init()`` is called a driver.
All the driver logs are handled in the same way as normal Python programs.

Worker logs
Expand Down
2 changes: 1 addition & 1 deletion doc/source/ray-observability/ray-metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ You can now get the url of metrics agents using `ray.nodes()`
# In a head node,
import ray
ray.init(address='auto')
ray.init()
from pprint import pprint
pprint(ray.nodes())
Expand Down
2 changes: 1 addition & 1 deletion doc/source/ray-observability/ray-tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ For open-source users who want to experiment with tracing, Ray has a default tra
$ ray start --head --tracing-startup-hook=ray.util.tracing.setup_local_tmp_tracing:setup_tracing
$ python
>>> ray.init(address="auto")
>>> ray.init()
>>> @ray.remote
def my_function():
return 1
Expand Down
4 changes: 2 additions & 2 deletions doc/source/serve/deploying-serve.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ Ray Serve instances run on top of Ray clusters and are started using {mod}`serve
Once {mod}`serve.start <ray.serve.start>` has been called, further API calls can be used to create and update the deployments that will be used to serve your Python code (including ML models).
The Serve instance will be torn down when the script exits.

When running on a long-lived Ray cluster (e.g., one started using `ray start` and connected
to using `ray.init(address="auto")`, you can also deploy a Ray Serve instance as a long-running
When running on a long-lived Ray cluster (e.g., one started using `ray start`),
you can also deploy a Ray Serve instance as a long-running
service using `serve.start(detached=True)`. In this case, the Serve instance will continue to
run on the Ray cluster even after the script that calls it exits. If you want to run another script
to update the Serve instance, you can run another script that connects to the same Ray cluster and makes further API calls (e.g., to create, update, or delete a deployment). Note that there can only be one detached Serve instance on each Ray cluster.
Expand Down
2 changes: 1 addition & 1 deletion python/ray/_private/internal_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def memory_summary(
):
from ray.dashboard.memory_utils import memory_summary

address = services.canonicalize_bootstrap_address(address)
address = services.canonicalize_bootstrap_address_or_die(address)

state = GlobalState()
options = GcsClientOptions.from_gcs_address(address)
Expand Down
Loading

0 comments on commit 55a0f7b

Please sign in to comment.