diff --git a/dashboard/client/src/pages/metrics/Metrics.tsx b/dashboard/client/src/pages/metrics/Metrics.tsx index 86adefde10d1..fb5a450707c4 100644 --- a/dashboard/client/src/pages/metrics/Metrics.tsx +++ b/dashboard/client/src/pages/metrics/Metrics.tsx @@ -119,6 +119,10 @@ const METRICS_CONFIG: MetricsSectionConfig[] = [ title: "Active Actors by Name", pathParams: "orgId=1&theme=light&panelId=36", }, + { + title: "Out of Memory Failures by Name", + pathParams: "orgId=1&theme=light&panelId=44", + }, ], }, { diff --git a/dashboard/modules/metrics/dashboards/default_dashboard_panels.py b/dashboard/modules/metrics/dashboards/default_dashboard_panels.py index fe223b603f70..5b1264a1d970 100644 --- a/dashboard/modules/metrics/dashboards/default_dashboard_panels.py +++ b/dashboard/modules/metrics/dashboards/default_dashboard_panels.py @@ -239,6 +239,18 @@ def max_plus_pending(max_resource, pending_resource): ), ], ), + Panel( + id=44, + title="Node Out of Memory Failures by Name", + description="The number of tasks and actors killed by the Ray Out of Memory killer due to high memory pressure. Metrics are broken down by IP and the name. https://docs.ray.io/en/master/ray-core/scheduling/ray-oom-prevention.html.", + unit="failures", + targets=[ + Target( + expr='ray_memory_manager_worker_eviction_total{{instance=~"$Instance",{global_filters}}}', + legend="OOM Killed: {{Name}}, {{instance}}", + ), + ], + ), Panel( id=34, title="Node Memory by Component", diff --git a/doc/source/data/api/dataset.rst b/doc/source/data/api/dataset.rst index bdd66a67fc45..8aac99bd3539 100644 --- a/doc/source/data/api/dataset.rst +++ b/doc/source/data/api/dataset.rst @@ -103,6 +103,7 @@ I/O and Conversion Dataset.write_csv Dataset.write_numpy Dataset.write_tfrecords + Dataset.write_webdataset Dataset.write_mongo Dataset.write_datasource Dataset.to_torch @@ -138,8 +139,7 @@ Execution .. autosummary:: :toctree: doc/ - Dataset.cache - Dataset.is_cached + Dataset.materialize Serialization ------------- @@ -150,3 +150,15 @@ Serialization Dataset.has_serializable_lineage Dataset.serialize_lineage Dataset.deserialize_lineage + +Internals +--------- + +.. autosummary:: + :toctree: doc/ + + Dataset.__init__ + Dataset.dataset_format + Dataset.fully_executed + Dataset.is_fully_executed + Dataset.lazy diff --git a/doc/source/data/api/execution_options.rst b/doc/source/data/api/execution_options.rst index 862e4392d406..4ad6e73d80b4 100644 --- a/doc/source/data/api/execution_options.rst +++ b/doc/source/data/api/execution_options.rst @@ -10,6 +10,7 @@ Constructor .. autosummary:: :toctree: doc/ + :template: autosummary/class_without_autosummary.rst ExecutionOptions @@ -18,5 +19,6 @@ Resource Options .. autosummary:: :toctree: doc/ + :template: autosummary/class_without_autosummary.rst ExecutionResources diff --git a/doc/source/data/creating-datasets.rst b/doc/source/data/creating-datasets.rst index 1598df16d78a..08da6fc271fa 100644 --- a/doc/source/data/creating-datasets.rst +++ b/doc/source/data/creating-datasets.rst @@ -884,4 +884,4 @@ inspection functions like :meth:`ds.schema() ` and :meth:`ds.show() ` will trigger execution of only one or some tasks, instead of all tasks. This allows metadata to be inspected right away. Execution of all read tasks can be triggered manually using the -:meth:`ds.cache() ` API. +:meth:`ds.materialize() ` API. 
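For orientation, a minimal sketch of the renamed API in use, assuming the ``example://iris.parquet`` sample file referenced elsewhere in these docs (the sketch itself is illustrative, not part of the patch)::

    import ray

    # Reading is lazy: only enough tasks run to infer metadata.
    ds = ray.data.read_parquet("example://iris.parquet")
    print(ds.schema())  # Cheap inspection; does not execute all read tasks.

    # Force execution of every read task and pin the blocks in object store memory.
    ds = ds.materialize()
    print(ds.size_bytes())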
diff --git a/doc/source/data/dataset-internals.rst b/doc/source/data/dataset-internals.rst index 446ecdd2aca3..e40984e36d57 100644 --- a/doc/source/data/dataset-internals.rst +++ b/doc/source/data/dataset-internals.rst @@ -64,7 +64,20 @@ This should be considered for advanced use cases to improve performance predicta Execution ========= -This section covers the Datasets execution model and performance considerations. +By default, Datasets execution is: + +- **Lazy**: This means that transformations on a Dataset are not executed until a + consumption operation (e.g. :meth:`ds.iter_batches() `) + or :meth:`Dataset.materialize() ` is called. This creates + opportunities for optimizing the execution plan (e.g. :ref:`stage fusion `). +- **Pipelined**: This means that Dataset transformations will be executed in a + streaming way, incrementally on the base data, instead of on all of the data + at once, while overlapping the execution of operations. This can be used for streaming + data loading into ML training to overlap the data preprocessing and model training, + or to execute batch transformations on large datasets without needing to load the + entire dataset into cluster memory. + +.. _datasets_lazy_execution: Lazy Execution ~~~~~~~~~~~~~~ @@ -75,7 +88,7 @@ to stage fusion optimizations and aggressive garbage collection of intermediate Dataset creation and transformation APIs are lazy, with execution only triggered via "sink" APIs, such as consuming (:meth:`ds.iter_batches() `), writing (:meth:`ds.write_parquet() `), or manually triggering via -:meth:`ds.cache() `. There are a few +:meth:`ds.materialize() `. There are a few exceptions to this rule, where transformations such as :meth:`ds.union() ` and :meth:`ds.limit() ` trigger execution; we plan to make these @@ -85,6 +98,122 @@ Check the API docs for Datasets methods to see if they trigger execution. Those that do trigger execution will have a ``Note`` indicating as much. +.. _datasets_streaming_execution: + +Streaming Execution +~~~~~~~~~~~~~~~~~~~ + +The following code is a hello world example that triggers execution via +:meth:`ds.iter_batches() ` consumption. We will also enable verbose progress reporting, which shows per-operator progress in addition to overall progress. + +.. code-block:: + + import ray + import time + + # Enable verbose reporting. This can also be toggled on by setting + # the environment variable RAY_DATA_VERBOSE_PROGRESS=1. + ctx = ray.data.DatasetContext.get_current() + ctx.execution_options.verbose_progress = True + + def sleep(x): + time.sleep(0.1) + return x + + for _ in ( + ray.data.range_tensor(5000, shape=(80, 80, 3), parallelism=200) + .map_batches(sleep, num_cpus=2) + .map_batches(sleep, compute=ray.data.ActorPoolStrategy(2, 4)) + .map_batches(sleep, num_cpus=1) + .iter_batches() + ): + pass + +This launches a simple 4-stage pipeline. We use different compute args for each stage, which forces them to be run as separate operators instead of getting fused together. You should see a log message indicating streaming execution is being used: + +.. code-block:: + + 2023-03-30 16:40:10,076 INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> TaskPoolMapOperator[MapBatches(sleep)] -> ActorPoolMapOperator[MapBatches(sleep)] -> TaskPoolMapOperator[MapBatches(sleep)] + +The next few lines will show execution progress. Here is how to interpret the output: + +.. 
code-block:: + + Running: 7.0/16.0 CPU, 0.0/0.0 GPU, 76.91 MiB/2.25 GiB object_store_memory 65%|██▊ | 130/200 [00:08<00:02, 22.52it/s] + +This line tells you how many resources are currently being used by the streaming executor out of the limits, as well as the number of completed output blocks. The streaming executor will attempt to keep resource usage under the printed limits by throttling task executions. + +.. code-block:: + + ReadRange: 2 active, 37 queued, 7.32 MiB objects 1: 80%|████████▊ | 161/200 [00:08<00:02, 17.81it/s] + MapBatches(sleep): 5 active, 5 queued, 18.31 MiB objects 2: 76%|██▎| 151/200 [00:08<00:02, 19.93it/s] + MapBatches(sleep): 7 active, 2 queued, 25.64 MiB objects, 2 actors [all objects local] 3: 71%|▋| 142/ + MapBatches(sleep): 2 active, 0 queued, 7.32 MiB objects 4: 70%|██▊ | 139/200 [00:08<00:02, 23.16it/s] + +These lines are only shown when verbose progress reporting is enabled. The `active` count indicates the number of running tasks for the operator. The `queued` count is the number of input blocks for the operator that are computed but are not yet submitted for execution. For operators that use actor-pool execution, the number of running actors is shown as `actors`. + +.. tip:: + + Avoid returning large outputs from the final operation of a pipeline you are iterating over, since the consumer process will be a serial bottleneck. + +Configuring Resources and Locality +---------------------------------- + +By default, the CPU and GPU limits are set to the cluster size, and the object store memory limit is conservatively set to 1/4 of the total object store size to avoid the possibility of disk spilling. + +You may want to customize these limits in the following scenarios: + +- If running multiple concurrent jobs on the cluster, setting lower limits can avoid resource contention between the jobs. +- If you want to fine-tune the memory limit to maximize performance. +- For data loading into training jobs, you may want to set the object store memory to a low value (e.g., 2GB) to limit resource usage. + +Execution options can be configured via the global DatasetContext. The options will be applied for future jobs launched in the process: + +.. code-block:: + + ctx = ray.data.DatasetContext.get_current() + ctx.execution_options.resource_limits.cpu = 10 + ctx.execution_options.resource_limits.gpu = 5 + ctx.execution_options.resource_limits.object_store_memory = 10e9 + +Deterministic Execution +----------------------- + +.. code-block:: + + # By default, this is set to False. + ctx.execution_options.preserve_order = True + +To enable deterministic execution, set the above to True. This may decrease performance, but will ensure block ordering is preserved through execution. This flag defaults to False. + +Actor Locality Optimization (ML inference use case) +--------------------------------------------------- + +.. code-block:: + + # By default, this is set to True already. + ctx.execution_options.actor_locality_enabled = True + +The actor locality optimization (if you're using actor pools) tries to route tasks whose input objects are already local to an actor's node to that same actor. This reduces network traffic across nodes. When actor locality is enabled, you'll see a report in the progress output of the hit rate: + +.. code-block:: + + MapBatches(Model): 0 active, 0 queued, 0 actors [992 locality hits, 8 misses]: 100%|██████████| 1000/1000 [00:59<00:00, 16.84it/s] + +Locality with Output (ML ingest use case) +----------------------------------------- + +.. 
code-block:: + + ctx.execution_options.locality_with_output = True + +Setting this to True tells Datasets to prefer placing operator tasks onto the consumer node in the cluster, rather than spreading them evenly across the cluster. This can be useful if you know you'll be consuming the output data directly on the consumer node (e.g., for ML training ingest). However, this may incur a performance penalty for other use cases. + +Scalability +----------- +We expect the data streaming backend to scale to tens of thousands of files / blocks and up to hundreds of terabytes of data. Please report if you experience performance degradation at these scales; we would be very interested to investigate! + +.. _datasets_stage_fusion: + Stage Fusion Optimization ~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/source/data/doc_code/creating_datasets.py b/doc/source/data/doc_code/creating_datasets.py index cdadce9f9b56..351d5d9c0688 100644 --- a/doc/source/data/doc_code/creating_datasets.py +++ b/doc/source/data/doc_code/creating_datasets.py @@ -298,7 +298,7 @@ "example://iris.parquet", columns=["sepal.length", "variety"], filter=pa.dataset.field("sepal.length") > 5.0, -).cache() # Force a full read of the file. +).materialize() # Force a full read of the file. # -> Dataset(num_blocks=1, num_rows=118, schema={sepal.length: double, variety: string}) ds.show(2) diff --git a/doc/source/data/doc_code/tensor.py b/doc/source/data/doc_code/tensor.py index 374be7f9865e..c224d8a0d890 100644 --- a/doc/source/data/doc_code/tensor.py +++ b/doc/source/data/doc_code/tensor.py @@ -52,7 +52,7 @@ def single_col_udf(batch: pd.DataFrame) -> pd.DataFrame: ds.map_batches(single_col_udf) -ds.cache() +ds.materialize() # -> Dataset(num_blocks=17, num_rows=1000, # schema={__value__: TensorDtype(shape=(128, 128, 3), dtype=int64)}) # __create_pandas_end__ @@ -74,7 +74,7 @@ def multi_col_udf(batch: pd.DataFrame) -> pd.DataFrame: ds.map_batches(multi_col_udf) -ds.cache() +ds.materialize() # -> Dataset(num_blocks=17, num_rows=1000, # schema={image: TensorDtype(shape=(128, 128, 3), dtype=int64), # embed: TensorDtype(shape=(256,), dtype=uint8)}) @@ -156,7 +156,7 @@ def multi_col_udf(batch: pd.DataFrame) -> pd.DataFrame: # two: extension> # __create_parquet_2_end__ -ds.cache() +ds.materialize() shutil.rmtree(path) # __create_parquet_3_begin__ @@ -193,7 +193,7 @@ def cast_udf(block: pa.Table) -> pa.Table: # -> one: int64 # two: extension> # __create_parquet_3_end__ -ds.cache() +ds.materialize() # __create_images_begin__ ds = ray.data.read_images("example://image-datasets/simple") @@ -449,7 +449,7 @@ def add_one(batch: Dict[str, Any]) -> Dict[str, Any]: # __consume_numpy_2_end__ -ds.cache() +ds.materialize() shutil.rmtree("/tmp/some_path") # __write_1_begin__ @@ -468,7 +468,7 @@ def add_one(batch: Dict[str, Any]) -> Dict[str, Any]: # label: string # __write_1_end__ -read_ds.cache() +read_ds.materialize() shutil.rmtree("/tmp/some_path") # __write_2_begin__ diff --git a/doc/source/data/examples/advanced-pipelines.rst b/doc/source/data/examples/advanced-pipelines.rst index 5c9e1e618f54..2b41a66d18e6 100644 --- a/doc/source/data/examples/advanced-pipelines.rst +++ b/doc/source/data/examples/advanced-pipelines.rst @@ -11,7 +11,7 @@ This page covers more advanced examples for dataset pipelines. Pre-repeat vs post-repeat transforms ==================================== -Transformations prior to the call to ``.repeat()`` will be cached. However, note that the initial read will not be cached unless there is a subsequent transformation or ``.cache()`` call. 
Transformations made to the DatasetPipeline after the repeat will always be executed once for each repetition of the Dataset. +Transformations prior to the call to ``.repeat()`` will be cached. However, note that the initial read will not be cached unless there is a subsequent transformation or ``.materialize()`` call. Transformations made to the DatasetPipeline after the repeat will always be executed once for each repetition of the Dataset. For example, in the following pipeline, the ``map(func)`` transformation only occurs once. However, the random shuffle is applied to each repetition in the pipeline. However, if we omitted the map transformation, then the pipeline would re-read from the base data on each repetition. @@ -50,7 +50,7 @@ For example, in the following pipeline, the ``map(func)`` transformation only oc .. important:: - Result caching only applies if there are *transformation* stages prior to the pipelining operation. If you ``repeat()`` or ``window()`` a Dataset right after the read call (e.g., ``ray.data.read_parquet(...).repeat()``), then the read will still be re-executed on each repetition. This optimization saves memory, at the cost of repeated reads from the datasource. To force result caching in all cases, use ``.cache().repeat()``. + Result caching only applies if there are *transformation* stages prior to the pipelining operation. If you ``repeat()`` or ``window()`` a Dataset right after the read call (e.g., ``ray.data.read_parquet(...).repeat()``), then the read will still be re-executed on each repetition. This optimization saves memory, at the cost of repeated reads from the datasource. To force result caching in all cases, use ``.materialize().repeat()``. Changing Pipeline Structure =========================== diff --git a/doc/source/data/examples/nyc_taxi_basic_processing.ipynb b/doc/source/data/examples/nyc_taxi_basic_processing.ipynb index db20dd6ff7aa..15b2983aaeae 100644 --- a/doc/source/data/examples/nyc_taxi_basic_processing.ipynb +++ b/doc/source/data/examples/nyc_taxi_basic_processing.ipynb @@ -317,7 +317,7 @@ } ], "source": [ - "ds.cache().size_bytes()" + "ds.materialize().size_bytes()" ] }, { @@ -654,7 +654,7 @@ ")\n", "\n", "# Force full execution of both of the file reads.\n", - "pushdown_ds = pushdown_ds.cache()\n", + "pushdown_ds = pushdown_ds.materialize()\n", "pushdown_ds" ] }, diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index af8074e3d5d6..d74536146ef6 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -92,7 +92,10 @@ Execution mode ============== Most transformations are lazy. They don't execute until you consume a dataset or call -:meth:`Dataset.cache() `. +:meth:`Dataset.materialize() `. + +Transformations are executed in a streaming way, incrementally on the data and +with operators processed in parallel; see :ref:`Streaming Execution `. For an in-depth guide on Datasets execution, read :ref:`Execution `. diff --git a/doc/source/data/pipelining-compute.rst b/doc/source/data/pipelining-compute.rst index d8b05c1523ac..acab716c1611 100644 --- a/doc/source/data/pipelining-compute.rst +++ b/doc/source/data/pipelining-compute.rst @@ -1,5 +1,12 @@ .. _pipelining_datasets: +.. note:: + + The DatasetPipeline is expected to be deprecated in Ray 2.5. If your use case doesn't + need per-window shuffle, we recommend using just plain Datasets, which support + streaming execution by default in Ray 2.4. For more details, see + :ref:`Streaming Execution `. 
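As a minimal sketch of the plain-Dataset pattern this note recommends (the range size, batch size, and identity UDF are illustrative; streaming execution is assumed to be on, as it is by default in Ray 2.4)::

    import ray

    ds = ray.data.range(10_000).map_batches(lambda batch: batch)

    # With streaming execution, batches are produced incrementally as upstream
    # operators finish, so no .window() / DatasetPipeline is needed for simple
    # pipelined ingest.
    for batch in ds.iter_batches(batch_size=256):
        pass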
+ ================== Pipelining Compute ================== diff --git a/doc/source/data/transforming-datasets.rst b/doc/source/data/transforming-datasets.rst index 8c049994ddd1..1b57f310ac85 100644 --- a/doc/source/data/transforming-datasets.rst +++ b/doc/source/data/transforming-datasets.rst @@ -473,7 +473,7 @@ aggregation has been computed. for x in range(10)]) # Group by the A column and calculate the per-group mean for B and C columns. - agg_ds: ray.data.Dataset = ds.groupby("A").mean(["B", "C"]).cache() + agg_ds: ray.data.Dataset = ds.groupby("A").mean(["B", "C"]).materialize() # -> Sort Sample: 100%|███████████████████████████████████████| 10/10 [00:01<00:00, 9.04it/s] # -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 23.66it/s] # -> GroupBy Reduce: 100%|████████████████████████████████████| 10/10 [00:00<00:00, 937.21it/s] @@ -542,7 +542,7 @@ with calculated column means. return df ds = ds.map_batches(batch_standard_scaler, batch_format="pandas") - ds.cache() + ds.materialize() # -> Map Progress: 100%|██████████████████████████████████████| 10/10 [00:00<00:00, 144.79it/s] # -> Dataset(num_blocks=10, num_rows=10, schema={A: int64, B: double, C: double}) diff --git a/doc/source/ray-air/check-ingest.rst b/doc/source/ray-air/check-ingest.rst index 485276fde342..fba2a79db222 100644 --- a/doc/source/ray-air/check-ingest.rst +++ b/doc/source/ray-air/check-ingest.rst @@ -417,7 +417,7 @@ Performance Tips Dataset Sharing ~~~~~~~~~~~~~~~ -When you pass Datasets to a Tuner, Datasets are executed independently per-trial. This could potentially duplicate data reads in the cluster. To share Dataset blocks between trials, call ``ds = ds.cache()`` prior to passing the Dataset to the Tuner. This ensures that the initial read operation will not be repeated per trial. +When you pass Datasets to a Tuner, Datasets are executed independently per-trial. This could potentially duplicate data reads in the cluster. To share Dataset blocks between trials, call ``ds = ds.materialize()`` prior to passing the Dataset to the Tuner. This ensures that the initial read operation will not be repeated per trial. 
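A minimal sketch of the sharing pattern described above; the read path is illustrative and the Tuner wiring is elided::

    import ray

    ds = ray.data.read_parquet("s3://bucket/train")  # illustrative path

    # Materialize once so all trials reuse the cached blocks instead of
    # re-running the read per trial.
    ds = ds.materialize()

    # ... then pass `ds` to the Trainer/Tuner as usual, e.g. via
    # datasets={"train": ds} on an AIR Trainer.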
FAQ diff --git a/doc/source/ray-air/examples/torch_image_example.ipynb b/doc/source/ray-air/examples/torch_image_example.ipynb index d07b05eff391..5c1bd7a3f471 100644 --- a/doc/source/ray-air/examples/torch_image_example.ipynb +++ b/doc/source/ray-air/examples/torch_image_example.ipynb @@ -195,8 +195,8 @@ " return {\"image\": images, \"label\": labels}\n", "\n", "\n", - "train_dataset = train_dataset.map_batches(convert_batch_to_numpy).cache()\n", - "test_dataset = test_dataset.map_batches(convert_batch_to_numpy).cache()" + "train_dataset = train_dataset.map_batches(convert_batch_to_numpy).materialize()\n", + "test_dataset = test_dataset.map_batches(convert_batch_to_numpy).materialize()" ] }, { diff --git a/doc/source/ray-air/examples/torch_incremental_learning.ipynb b/doc/source/ray-air/examples/torch_incremental_learning.ipynb index 6d95a2428aad..18d6565228c3 100644 --- a/doc/source/ray-air/examples/torch_incremental_learning.ipynb +++ b/doc/source/ray-air/examples/torch_incremental_learning.ipynb @@ -294,7 +294,7 @@ "\n", " return {\"image\": images, \"label\": labels}\n", "\n", - " mnist_dataset = mnist_dataset.map_batches(convert_batch_to_numpy).cache()\n", + " mnist_dataset = mnist_dataset.map_batches(convert_batch_to_numpy).materialize()\n", " return mnist_dataset" ] }, diff --git a/doc/source/ray-observability/ray-metrics.rst b/doc/source/ray-observability/ray-metrics.rst index fb3f7520fe76..7b22fd9ff01a 100644 --- a/doc/source/ray-observability/ray-metrics.rst +++ b/doc/source/ray-observability/ray-metrics.rst @@ -136,6 +136,9 @@ Ray exports a number of system metrics, which provide introspection into the sta * - `ray_placement_groups` - `State` - Current number of placement groups by state. The State label (e.g., PENDING, CREATED, REMOVED) describes the state of the placement group. See `rpc::PlacementGroupTable `_ for more information. + * - `ray_memory_manager_worker_eviction_total` + - `Type`, `Name` + - The number of tasks and actors killed by the Ray Out of Memory killer (https://docs.ray.io/en/master/ray-core/scheduling/ray-oom-prevention.html) broken down by types (whether it is tasks or actors) and names (name of tasks and actors). * - `ray_node_cpu_utilization` - `InstanceId` - The CPU utilization per node as a percentage quantity (0..100). This should be scaled by the number of cores per node to convert the units into cores. diff --git a/doc/source/templates/01_batch_inference/batch_inference.ipynb b/doc/source/templates/01_batch_inference/batch_inference.ipynb index 2c8c54f0cdec..1fe897619ab6 100644 --- a/doc/source/templates/01_batch_inference/batch_inference.ipynb +++ b/doc/source/templates/01_batch_inference/batch_inference.ipynb @@ -244,7 +244,7 @@ "metadata": {}, "outputs": [], "source": [ - "preds = predictions.cache()\n", + "preds = predictions.materialize()\n", "preds.schema()\n" ] }, diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 037a925d7321..d3e21046a693 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -404,10 +404,11 @@ def gcs_actor_scheduling_enabled(): # Whether to enable Ray clusters (in addition to local Ray). # Ray clusters are not explicitly supported for Windows and OSX. 
+IS_WINDOWS_OR_OSX = sys.platform == "darwin" or sys.platform == "win32" ENABLE_RAY_CLUSTERS_ENV_VAR = "RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER" ENABLE_RAY_CLUSTER = env_bool( ENABLE_RAY_CLUSTERS_ENV_VAR, - not (sys.platform == "darwin" or sys.platform == "win32"), + not IS_WINDOWS_OR_OSX, ) SESSION_LATEST = "session_latest" @@ -422,3 +423,5 @@ def gcs_actor_scheduling_enabled(): "dashboard_agent_listen_port", "gcs_server_port", # the `port` option for gcs port. } + +RAY_ENABLE_RECORD_TASK_LOGGING = env_bool("RAY_ENABLE_RECORD_TASK_LOGGING", False) diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 3465def8192e..b1bc8b222d6a 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -608,9 +608,6 @@ def resolve_ip_for_localhost(address: str): raise ValueError(f"Malformed address: {address}") address_parts = address.split(":") if address_parts[0] == "127.0.0.1" or address_parts[0] == "localhost": - # Clusters are disabled by default for OSX and Windows. - if not ray_constants.ENABLE_RAY_CLUSTER: - return address # Make sure localhost isn't resolved to the loopback ip ip_address = get_node_ip_address() return ":".join([ip_address] + address_parts[1:]) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 0b899b1afc28..161dafa34fba 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -456,6 +456,7 @@ def __init__(self): self.ray_debugger_external = False self._load_code_from_local = False # Opened file descriptor to stdout/stderr for this python worker. + self._enable_record_task_log = ray_constants.RAY_ENABLE_RECORD_TASK_LOGGING self._out_file = None self._err_file = None # Create the lock here because the serializer will use it before @@ -539,6 +540,9 @@ def set_out_file(self, out_file=Optional[IO[AnyStr]]) -> None: def record_task_log_start(self): """Record the task log info when task starts executing""" + if not self._enable_record_task_log: + return + self.core_worker.record_task_log_start( self.get_out_file_path(), self.get_err_file_path(), @@ -548,6 +552,9 @@ def record_task_log_start(self): def record_task_log_end(self): """Record the task log info when task finishes executing""" + if not self._enable_record_task_log: + return + self.core_worker.record_task_log_end( self.get_current_out_offset(), self.get_current_err_offset() ) @@ -1424,10 +1431,6 @@ def init( gcs_address = bootstrap_address logger.info("Connecting to existing Ray cluster at address: %s...", gcs_address) - # NOTE(swang): We must set the node IP address *after* we determine whether - # this is an existing cluster or not. For Windows and OSX, the resolved IP - # is localhost for new clusters and the usual public IP for existing - # clusters. if _node_ip_address is not None: node_ip_address = services.resolve_ip_for_localhost(_node_ip_address) raylet_ip_address = node_ip_address diff --git a/python/ray/air/util/check_ingest.py b/python/ray/air/util/check_ingest.py index feae9960e43e..24a49995e84e 100755 --- a/python/ray/air/util/check_ingest.py +++ b/python/ray/air/util/check_ingest.py @@ -67,11 +67,11 @@ def preprocess_datasets(self): print("Starting dataset preprocessing") super().preprocess_datasets() if self.time_preprocessing_separately: - for dataset_name, ds in self.datasets.items(): + for dataset_name, ds in list(self.datasets.items()): start = time.perf_counter() # Force execution to time preprocessing since Datasets are lazy by # default. 
- ds.cache() + self.datasets[dataset_name] = ds.materialize() print( f"Preprocessed {dataset_name} in", time.perf_counter() - start, diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 55fc105d183c..d192c33bb4c8 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +import os from typing import Dict, List, Optional, Iterable, Iterator, Tuple, Callable, Union import ray @@ -188,6 +189,22 @@ class ExecutionOptions: """Common options for execution. Some options may not be supported on all executors (e.g., resource limits). + + Attributes: + resource_limits: Set a soft limit on the resource usage during execution. + This is not supported in bulk execution mode. Autodetected by default. + locality_with_output: Set this to prefer running tasks on the same node as the + output node (node driving the execution). It can also be set to a list of + node ids to spread the outputs across those nodes. Off by default. + preserve_order: Set this to preserve the ordering between blocks processed by + operators under the streaming executor. The bulk executor always preserves + order. Off by default. + actor_locality_enabled: Whether to enable locality-aware task dispatch to + actors (on by default). This applies to both ActorPoolStrategy map and + streaming_split operations. + verbose_progress: Whether to report progress individually per operator. By + default, only AllToAll operators and global progress is reported. This + option is useful for performance debugging. Off by default. """ # Set a soft limit on the resource usage during execution. This is not supported @@ -207,6 +224,8 @@ class ExecutionOptions: # applies to both ActorPoolStrategy map and streaming_split operations. actor_locality_enabled: bool = True + verbose_progress: bool = bool(int(os.environ.get("RAY_DATA_VERBOSE_PROGRESS", "0"))) + @dataclass class TaskContext: diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 4205cbb7bf1f..fc2e60589161 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -51,7 +51,6 @@ def __init__(self, options: ExecutionOptions): self._initial_stats: Optional[DatasetStats] = None self._final_stats: Optional[DatasetStats] = None self._global_info: Optional[ProgressBar] = None - self._output_info: Optional[ProgressBar] = None self._execution_id = uuid.uuid4().hex self._autoscaling_state = AutoscalingState() @@ -79,18 +78,18 @@ def execute( """ self._initial_stats = initial_stats self._start_time = time.perf_counter() + if not isinstance(dag, InputDataBuffer): logger.get_logger().info("Executing DAG %s", dag) - self._global_info = ProgressBar("Resource usage vs limits", 1, 0) + logger.get_logger().info("Execution config: %s", self._options) # Setup the streaming DAG topology and start the runner thread. _validate_dag(dag, self._get_or_refresh_resource_limits()) - self._topology, progress_bar_position = build_streaming_topology( - dag, self._options - ) - self._output_info = ProgressBar( - "Output", dag.num_outputs_total() or 1, progress_bar_position - ) + self._topology, _ = build_streaming_topology(dag, self._options) + + if not isinstance(dag, InputDataBuffer): + # Note: DAG must be initialized in order to query num_outputs_total. 
+ self._global_info = ProgressBar("Running", dag.num_outputs_total() or 1) self._output_node: OpState = self._topology[dag] self.start() @@ -112,7 +111,8 @@ def get_next(self, output_split_idx: Optional[int] = None) -> RefBundle: raise item else: # Otherwise return a concrete RefBundle. - self._outer._output_info.update(1) + if self._outer._global_info: + self._outer._global_info.update(1) return item except Exception: self._outer.shutdown() @@ -143,8 +143,6 @@ def shutdown(self): for op, state in self._topology.items(): op.shutdown() state.close_progress_bars() - if self._output_info: - self._output_info.close() # Make request for zero resources to autoscaler for this execution. actor = get_or_create_autoscaling_requester_actor() actor.request_resources.remote({}, self._execution_id) @@ -266,18 +264,15 @@ def _get_or_refresh_resource_limits(self) -> ExecutionResources: def _report_current_usage( self, cur_usage: TopologyResourceUsage, limits: ExecutionResources ) -> None: + resources_status = ( + "Running: " + f"{cur_usage.overall.cpu}/{limits.cpu} CPU, " + f"{cur_usage.overall.gpu}/{limits.gpu} GPU, " + f"{cur_usage.overall.object_store_memory_str()}/" + f"{limits.object_store_memory_str()} object_store_memory" + ) if self._global_info: - self._global_info.set_description( - "Resource usage vs limits: " - f"{cur_usage.overall.cpu}/{limits.cpu} CPU, " - f"{cur_usage.overall.gpu}/{limits.gpu} GPU, " - f"{cur_usage.overall.object_store_memory_str()}/" - f"{limits.object_store_memory_str()} object_store_memory" - ) - if self._output_info: - self._output_info.set_description( - f"output: {len(self._output_node.outqueue)} queued" - ) + self._global_info.set_description(resources_status) def _validate_dag(dag: PhysicalOperator, limits: ExecutionResources) -> None: diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index d6869882fb26..2c2b6ad806a7 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -122,18 +122,27 @@ def __init__(self, op: PhysicalOperator, inqueues: List[Deque[MaybeRefBundle]]): self.num_completed_tasks = 0 self.inputs_done_called = False - def initialize_progress_bars(self, index: int) -> int: + def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int: """Create progress bars at the given index (line offset in console). For AllToAllOperator, zero or more sub progress bar would be created. Return the number of progress bars created for this operator. """ + is_all_to_all = isinstance(self.op, AllToAllOperator) + # Only show 1:1 ops when in verbose progress mode. 
+ enabled = verbose_progress or is_all_to_all self.progress_bar = ProgressBar( - self.op.name, self.op.num_outputs_total() or 1, index + "- " + self.op.name, + self.op.num_outputs_total() or 1, + index, + enabled=enabled, ) - num_bars = 1 - if isinstance(self.op, AllToAllOperator): - num_bars += self.op.initialize_sub_progress_bars(index + 1) + if enabled: + num_bars = 1 + if is_all_to_all: + num_bars += self.op.initialize_sub_progress_bars(index + 1) + else: + num_bars = 0 return num_bars def close_progress_bars(self): @@ -166,7 +175,7 @@ def refresh_progress_bar(self) -> None: def summary_str(self) -> str: queued = self.num_queued() + self.op.internal_queue_size() active = self.op.num_active_work_refs() - desc = f"{self.op.name}: {active} active, {queued} queued" + desc = f"- {self.op.name}: {active} active, {queued} queued" mem = memory_string(self.op.current_resource_usage().object_store_memory or 0) desc += f", {mem} objects" suffix = self.op.progress_str() @@ -275,7 +284,7 @@ def setup_state(op: PhysicalOperator) -> OpState: i = 1 for op_state in list(topology.values()): if not isinstance(op_state.op, InputDataBuffer): - i += op_state.initialize_progress_bars(i) + i += op_state.initialize_progress_bars(i, options.verbose_progress) return (topology, i) diff --git a/python/ray/data/_internal/pipeline_executor.py b/python/ray/data/_internal/pipeline_executor.py index a918738b4bce..679427e38de4 100644 --- a/python/ray/data/_internal/pipeline_executor.py +++ b/python/ray/data/_internal/pipeline_executor.py @@ -19,7 +19,7 @@ def pipeline_stage(fn: Callable[[], Dataset[T]]) -> Dataset[T]: # Force eager evaluation of all blocks in the pipeline stage. This # prevents resource deadlocks due to overlapping stage execution (e.g., # task -> actor stage). - return fn().cache() + return fn().materialize() class PipelineExecutor: diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index df0c88cc01c6..160caddd0da5 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -585,9 +585,9 @@ def execute( preserve_order=preserve_order, ) # TODO(ekl) we shouldn't need to set this in the future once we move - # to a fully lazy execution model, unless .cache() is used. The reason - # we need it right now is since the user may iterate over a Dataset - # multiple times after fully executing it once. + # to a fully lazy execution model, unless .materialize() is used. The
if not self._run_by_consumer: blocks._owned_by_consumer = False stats = executor.get_stats() diff --git a/python/ray/data/_internal/progress_bar.py b/python/ray/data/_internal/progress_bar.py index edcc132aa44e..3b1af02bdd4f 100644 --- a/python/ray/data/_internal/progress_bar.py +++ b/python/ray/data/_internal/progress_bar.py @@ -44,9 +44,11 @@ def set_progress_bars(enabled: bool) -> bool: class ProgressBar: """Thin wrapper around tqdm to handle soft imports.""" - def __init__(self, name: str, total: int, position: int = 0): + def __init__( + self, name: str, total: int, position: int = 0, enabled: bool = _enabled + ): self._desc = name - if not _enabled: + if not enabled: self._bar = None elif tqdm: ctx = ray.data.context.DatasetContext.get_current() diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 9ed696c0259b..6d09f0220fb5 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2906,7 +2906,7 @@ def write_fn_wrapper(blocks: Iterator[Block], ctx, fn) -> Iterator[Block]: try: self._write_ds = Dataset( plan, self._epoch, self._lazy, logical_plan - ).cache() + ).materialize() blocks = ray.get(self._write_ds._plan.execute().get_blocks()) assert all( isinstance(block, list) and len(block) == 1 for block in blocks @@ -4061,37 +4061,31 @@ def __iter__(self): ) return pipe - @Deprecated(message="Use `Dataset.cache()` instead.") + @Deprecated(message="Use `Dataset.materialize()` instead.") def fully_executed(self) -> "Dataset[T]": logger.warning( - "The 'fully_executed' call has been renamed to 'cache'.", + "Deprecation warning: use Dataset.materialize() instead of " + "fully_executed()." ) - return self.cache() + self._plan.execute(force_read=True) + return self - @Deprecated(message="Use `Dataset.is_cached()` instead.") + # Note: will be deprecated in 2.5. def is_fully_executed(self) -> bool: - logger.warning( - "The 'is_fully_executed' call has been renamed to 'is_cached'.", - ) - return self.is_cached() - - def is_cached(self) -> bool: - """Returns whether this Dataset has been cached in memory. - - This will return False if the output of its final stage hasn't been computed - yet. - """ return self._plan.has_computed_output() @ConsumptionAPI(pattern="store memory.", insert_after=True) - def cache(self) -> "Dataset[T]": - """Evaluate and cache the blocks of this Dataset in object store memory. + def materialize(self) -> "Dataset[T]": + """Execute and materialize this dataset into object store memory. - This can be used to read all blocks into memory. By default, Datasets + This can be used to read all blocks into memory. By default, Dataset doesn't read blocks from the datasource until the first transform. + Note that this does not mutate the original Dataset. Only the blocks of the + returned Dataset class are pinned in memory. + Returns: - A Dataset with all blocks fully materialized in memory. + A Dataset holding the materialized data blocks. """ self._plan.execute(force_read=True) return self @@ -4138,7 +4132,7 @@ def lazy(self) -> "Dataset[T]": The returned dataset is a lazy dataset, where all subsequent operations on the dataset won't be executed until the dataset is consumed (e.g. ``.take()``, ``.iter_batches()``, ``.to_torch()``, ``.to_tf()``, etc.) or execution is - manually triggered via ``.cache()``. + manually triggered via ``.materialize()``. 
""" ds = Dataset( self._plan, self._epoch, lazy=True, logical_plan=self._logical_plan diff --git a/python/ray/data/tests/preprocessors/test_concatenator.py b/python/ray/data/tests/preprocessors/test_concatenator.py index b71d7239aec5..7c7e402d73c7 100644 --- a/python/ray/data/tests/preprocessors/test_concatenator.py +++ b/python/ray/data/tests/preprocessors/test_concatenator.py @@ -28,7 +28,7 @@ def test_raise_if_missing(self): ) with pytest.raises(ValueError, match="'b'"): - prep.transform(ds).cache() + prep.transform(ds).materialize() @pytest.mark.parametrize("exclude", ("b", ["b"])) def test_exclude(self, exclude): diff --git a/python/ray/data/tests/preprocessors/test_encoder.py b/python/ray/data/tests/preprocessors/test_encoder.py index 7a5f51bb749e..f7e975412a97 100644 --- a/python/ray/data/tests/preprocessors/test_encoder.py +++ b/python/ray/data/tests/preprocessors/test_encoder.py @@ -97,7 +97,7 @@ def test_ordinal_encoder(): # Verify transform fails for null values. with pytest.raises(ValueError): - null_encoder.transform(null_ds).cache() + null_encoder.transform(null_ds).materialize() null_encoder.transform(nonnull_ds) # Verify transform_batch fails for null values. @@ -299,7 +299,7 @@ def test_one_hot_encoder(): # Verify transform fails for null values. with pytest.raises(ValueError): - null_encoder.transform(null_ds).cache() + null_encoder.transform(null_ds).materialize() null_encoder.transform(nonnull_ds) # Verify transform_batch fails for null values. @@ -408,7 +408,7 @@ def test_multi_hot_encoder(): # Verify transform fails for null values. with pytest.raises(ValueError): - null_encoder.transform(null_ds).cache() + null_encoder.transform(null_ds).materialize() null_encoder.transform(nonnull_ds) # Verify transform_batch fails for null values. @@ -511,7 +511,7 @@ def test_label_encoder(): # Verify transform fails for null values. with pytest.raises(ValueError): - null_encoder.transform(null_ds).cache() + null_encoder.transform(null_ds).materialize() null_encoder.transform(nonnull_ds) # Verify transform_batch fails for null values. 
diff --git a/python/ray/data/tests/preprocessors/test_torch.py b/python/ray/data/tests/preprocessors/test_torch.py index be00d19b154b..d5efeb3d678a 100644 --- a/python/ray/data/tests/preprocessors/test_torch.py +++ b/python/ray/data/tests/preprocessors/test_torch.py @@ -118,7 +118,7 @@ def test_invalid_transform_raises_value_error(self): preprocessor = TorchVisionPreprocessor(columns=["image"], transform=transform) with pytest.raises(ValueError): - preprocessor.transform(dataset).cache() + preprocessor.transform(dataset).materialize() if __name__ == "__main__": diff --git a/python/ray/data/tests/test_bulk_executor.py b/python/ray/data/tests/test_bulk_executor.py index c1135f82a07c..b0175ad69983 100644 --- a/python/ray/data/tests/test_bulk_executor.py +++ b/python/ray/data/tests/test_bulk_executor.py @@ -75,7 +75,7 @@ def reverse_sort(inputs: List[RefBundle], ctx): def test_basic_stats(ray_start_10_cpus_shared): executor = BulkExecutor(ExecutionOptions()) - prev_stats = ray.data.range(10).cache()._plan.stats() + prev_stats = ray.data.range(10).materialize()._plan.stats() inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) o2 = MapOperator.create( diff --git a/python/ray/data/tests/test_dataset_all_to_all.py b/python/ray/data/tests/test_dataset_all_to_all.py index e1fc817445e2..73a70017b207 100644 --- a/python/ray/data/tests/test_dataset_all_to_all.py +++ b/python/ray/data/tests/test_dataset_all_to_all.py @@ -22,7 +22,7 @@ def test_zip(ray_start_regular_shared): assert ds.schema() == tuple assert ds.take() == [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)] with pytest.raises(ValueError): - ds.zip(ray.data.range(3)).cache() + ds.zip(ray.data.range(3)).materialize() @pytest.mark.parametrize( @@ -66,7 +66,7 @@ def test_zip_different_num_blocks_split_smallest( [{str(i): i for i in range(num_cols1, num_cols1 + num_cols2)}] * n, parallelism=num_blocks2, ) - ds = ds1.zip(ds2).cache() + ds = ds1.zip(ds2).materialize() num_blocks = ds._plan._snapshot_blocks.executed_num_blocks() assert ds.take() == [{str(i): i for i in range(num_cols1 + num_cols2)}] * n if should_invert: @@ -765,38 +765,38 @@ def test_groupby_agg_bad_on(ray_start_regular_shared): df = pd.DataFrame({"A": [x % 3 for x in xs], "B": xs, "C": [2 * x for x in xs]}) # Wrong type. with pytest.raises(TypeError): - ray.data.from_pandas(df).groupby("A").mean(5).cache() + ray.data.from_pandas(df).groupby("A").mean(5).materialize() with pytest.raises(TypeError): - ray.data.from_pandas(df).groupby("A").mean([5]).cache() + ray.data.from_pandas(df).groupby("A").mean([5]).materialize() # Empty list. with pytest.raises(ValueError): - ray.data.from_pandas(df).groupby("A").mean([]).cache() + ray.data.from_pandas(df).groupby("A").mean([]).materialize() # Nonexistent column. with pytest.raises(ValueError): - ray.data.from_pandas(df).groupby("A").mean("D").cache() + ray.data.from_pandas(df).groupby("A").mean("D").materialize() with pytest.raises(ValueError): - ray.data.from_pandas(df).groupby("A").mean(["B", "D"]).cache() + ray.data.from_pandas(df).groupby("A").mean(["B", "D"]).materialize() # Columns for simple Dataset. with pytest.raises(ValueError): - ray.data.from_items(xs).groupby(lambda x: x % 3 == 0).mean("A").cache() + ray.data.from_items(xs).groupby(lambda x: x % 3 == 0).mean("A").materialize() # Test bad on for global aggregation # Wrong type. 
with pytest.raises(TypeError): - ray.data.from_pandas(df).mean(5).cache() + ray.data.from_pandas(df).mean(5).materialize() with pytest.raises(TypeError): - ray.data.from_pandas(df).mean([5]).cache() + ray.data.from_pandas(df).mean([5]).materialize() # Empty list. with pytest.raises(ValueError): - ray.data.from_pandas(df).mean([]).cache() + ray.data.from_pandas(df).mean([]).materialize() # Nonexistent column. with pytest.raises(ValueError): - ray.data.from_pandas(df).mean("D").cache() + ray.data.from_pandas(df).mean("D").materialize() with pytest.raises(ValueError): - ray.data.from_pandas(df).mean(["B", "D"]).cache() + ray.data.from_pandas(df).mean(["B", "D"]).materialize() # Columns for simple Dataset. with pytest.raises(ValueError): - ray.data.from_items(xs).mean("A").cache() + ray.data.from_items(xs).mean("A").materialize() @pytest.mark.parametrize("num_parts", [1, 30]) @@ -1016,7 +1016,7 @@ def test_groupby_map_groups_merging_invalid_result(ray_start_regular_shared): # The UDF returns None, which is invalid. with pytest.raises(TypeError): - grouped.map_groups(lambda x: None if x == [1] else x).cache() + grouped.map_groups(lambda x: None if x == [1] else x).materialize() @pytest.mark.parametrize("num_parts", [1, 2, 30]) @@ -1676,7 +1676,7 @@ def test_random_shuffle_with_custom_resource(ray_start_cluster): parallelism=2, ray_remote_args={"resources": {"bar": 1}}, ) - ds = ds.random_shuffle(resources={"bar": 1}).cache() + ds = ds.random_shuffle(resources={"bar": 1}).materialize() assert "1 nodes used" in ds.stats() assert "2 nodes used" not in ds.stats() diff --git a/python/ray/data/tests/test_dataset_consumption.py b/python/ray/data/tests/test_dataset_consumption.py index 10523e622cdb..e83b2b4ed021 100644 --- a/python/ray/data/tests/test_dataset_consumption.py +++ b/python/ray/data/tests/test_dataset_consumption.py @@ -164,19 +164,19 @@ def test_empty_dataset(ray_start_regular_shared): ds = ray.data.range(1) ds = ds.filter(lambda x: x > 1) - ds.cache() + ds = ds.materialize() assert str(ds) == "Dataset(num_blocks=1, num_rows=0, schema=Unknown schema)" # Test map on empty dataset. ds = ray.data.from_items([]) ds = ds.map(lambda x: x) - ds.cache() + ds = ds.materialize() assert ds.count() == 0 # Test filter on empty dataset. 
ds = ray.data.from_items([]) ds = ds.filter(lambda: True) - ds.cache() + ds = ds.materialize() assert ds.count() == 0 @@ -199,7 +199,7 @@ def inc(x): ds = ray.data.range(1) ds = ds.map(inc) - ds = ds.cache() + ds = ds.materialize() for _ in range(10): ds.take_all() @@ -211,9 +211,9 @@ def test_schema(ray_start_regular_shared): ds = ray.data.range(10, parallelism=10) ds2 = ray.data.range_table(10, parallelism=10) ds3 = ds2.repartition(5) - ds3.cache() + ds3 = ds3.materialize() ds4 = ds3.map(lambda x: {"a": "hi", "b": 1.0}).limit(5).repartition(1) - ds4.cache() + ds4 = ds4.materialize() assert str(ds) == "Dataset(num_blocks=10, num_rows=10, schema=)" assert str(ds2) == "Dataset(num_blocks=10, num_rows=10, schema={value: int64})" assert str(ds3) == "Dataset(num_blocks=5, num_rows=10, schema={value: int64})" @@ -288,7 +288,7 @@ def test_dataset_repr(ray_start_regular_shared): " +- MapBatches()\n" " +- Dataset(num_blocks=10, num_rows=10, schema=)" ) - ds.cache() + ds = ds.materialize() assert repr(ds) == "Dataset(num_blocks=10, num_rows=9, schema=)" ds = ds.map_batches(lambda x: x) assert repr(ds) == ( @@ -326,7 +326,7 @@ def my_dummy_fn(x): def test_limit(ray_start_regular_shared, lazy): ds = ray.data.range(100, parallelism=20) if not lazy: - ds = ds.cache() + ds = ds.materialize() for i in range(100): assert ds.limit(i).take(200) == list(range(i)) @@ -1397,15 +1397,15 @@ def test_read_write_local_node_ray_client(ray_start_cluster_enabled): # Read/write from Ray Client will result in error. ray.init(address) with pytest.raises(ValueError): - ds = ray.data.read_parquet("local://" + path).cache() + ds = ray.data.read_parquet("local://" + path).materialize() ds = ray.data.from_pandas(df) with pytest.raises(ValueError): - ds.write_parquet("local://" + data_path).cache() + ds.write_parquet("local://" + data_path).materialize() def test_read_warning_large_parallelism(ray_start_regular, propagate_logs, caplog): with caplog.at_level(logging.WARNING, logger="ray.data.read_api"): - ray.data.range(5000, parallelism=5000).cache() + ray.data.range(5000, parallelism=5000).materialize() assert ( "The requested parallelism of 5000 is " "more than 4x the number of available CPU slots in the cluster" in caplog.text @@ -1451,17 +1451,17 @@ def check_dataset_is_local(ds): local_path = "local://" + data_path # Plain read. - ds = ray.data.read_parquet(local_path).cache() + ds = ray.data.read_parquet(local_path).materialize() check_dataset_is_local(ds) # SPREAD scheduling got overridden when read local scheme. ds = ray.data.read_parquet( local_path, ray_remote_args={"scheduling_strategy": "SPREAD"} - ).cache() + ).materialize() check_dataset_is_local(ds) # With fusion. - ds = ray.data.read_parquet(local_path).map(lambda x: x).cache() + ds = ray.data.read_parquet(local_path).map(lambda x: x).materialize() check_dataset_is_local(ds) # Write back to local scheme. 
@@ -1474,15 +1474,15 @@ def check_dataset_is_local(ds): with pytest.raises(ValueError): ds = ray.data.read_parquet( [local_path + "/test1.parquet", data_path + "/test2.parquet"] - ).cache() + ).materialize() with pytest.raises(ValueError): ds = ray.data.read_parquet( [local_path + "/test1.parquet", "example://iris.parquet"] - ).cache() + ).materialize() with pytest.raises(ValueError): ds = ray.data.read_parquet( ["example://iris.parquet", local_path + "/test1.parquet"] - ).cache() + ).materialize() @ray.remote @@ -1588,7 +1588,7 @@ def f(should_import_polars): ray.data.from_pandas(dfs) .map_batches(lambda t: t, batch_format="pyarrow", batch_size=None) .sort(key="a") - .cache() + .materialize() ) assert any(ray.get([f.remote(True) for _ in range(parallelism)])) diff --git a/python/ray/data/tests/test_dataset_formats.py b/python/ray/data/tests/test_dataset_formats.py index d79930cf578a..5658e5a425f5 100644 --- a/python/ray/data/tests/test_dataset_formats.py +++ b/python/ray/data/tests/test_dataset_formats.py @@ -340,7 +340,7 @@ def test_get_read_tasks(ray_start_cluster): head_node_id = ray.get_runtime_context().get_node_id() # Issue read so `_get_read_tasks` being executed. - ray.data.range(10).cache() + ray.data.range(10).materialize() # Verify `_get_read_tasks` being executed on same node (head node). def verify_get_read_tasks(): diff --git a/python/ray/data/tests/test_dataset_image.py b/python/ray/data/tests/test_dataset_image.py index 6bd9fe716111..88d952e12664 100644 --- a/python/ray/data/tests/test_dataset_image.py +++ b/python/ray/data/tests/test_dataset_image.py @@ -193,7 +193,7 @@ def test_data_size_estimate( data_size = ds.size_bytes() assert data_size >= 0, "estimated data size is out of expected bound" - data_size = ds.cache().size_bytes() + data_size = ds.materialize().size_bytes() assert data_size >= 0, "actual data size is out of expected bound" reader = _ImageDatasourceReader( @@ -225,12 +225,12 @@ def test_dynamic_block_split(ray_start_regular_shared): root = "example://image-datasets/simple" ds = ray.data.read_images(root, parallelism=1) assert ds.num_blocks() == 1 - ds.cache() + ds = ds.materialize() # Verify dynamic block splitting taking effect to generate more blocks. 
assert ds.num_blocks() == 3 # Test union of same datasets - union_ds = ds.union(ds, ds, ds).cache() + union_ds = ds.union(ds, ds, ds).materialize() assert union_ds.num_blocks() == 12 finally: ctx.target_max_block_size = target_max_block_size diff --git a/python/ray/data/tests/test_dataset_map.py b/python/ray/data/tests/test_dataset_map.py index 2c9534d53d08..71c3ff8b57fc 100644 --- a/python/ray/data/tests/test_dataset_map.py +++ b/python/ray/data/tests/test_dataset_map.py @@ -146,7 +146,7 @@ def mapper(x): return x with pytest.raises(ray.exceptions.RayTaskError): - ds.map(mapper).cache() + ds.map(mapper).materialize() def test_flat_map_generator(ray_start_regular_shared): @@ -189,7 +189,7 @@ def test_drop_columns(ray_start_regular_shared, tmp_path): ] # Test dropping non-existent column with pytest.raises(KeyError): - ds.drop_columns(["dummy_col", "col1", "col2"]).cache() + ds.drop_columns(["dummy_col", "col1", "col2"]).materialize() def test_select_columns(ray_start_regular_shared): @@ -218,12 +218,12 @@ def test_select_columns(ray_start_regular_shared): ] # Test selecting a column that is not in the dataset schema with pytest.raises(KeyError): - each_ds.select_columns(cols=["col1", "col2", "dummy_col"]).cache() + each_ds.select_columns(cols=["col1", "col2", "dummy_col"]).materialize() # Test simple ds3 = ray.data.range(10) with pytest.raises(ValueError): - ds3.select_columns(cols=[]).cache() + ds3.select_columns(cols=[]).materialize() def test_map_batches_basic(ray_start_regular_shared, tmp_path, restore_dataset_context): @@ -621,13 +621,13 @@ def mutate(df): ds = ray.data.range_table(num_rows, parallelism=num_blocks).repartition(num_blocks) # Convert to Pandas blocks. ds = ds.map_batches(lambda df: df, batch_format="pandas", batch_size=None) - ds.cache() + ds = ds.materialize() # Apply UDF that mutates the batches, which should fail since the batch is # read-only. with pytest.raises(ValueError, match="tried to mutate a zero-copy read-only batch"): ds = ds.map_batches(mutate, batch_size=batch_size, zero_copy_batch=True) - ds.cache() + ds.materialize() BLOCK_BUNDLING_TEST_CASES = [ @@ -648,11 +648,11 @@ def test_map_batches_block_bundling_auto( assert ds.num_blocks() == num_blocks # Blocks should be bundled up to the batch size. - ds1 = ds.map_batches(lambda x: x, batch_size=batch_size).cache() + ds1 = ds.map_batches(lambda x: x, batch_size=batch_size).materialize() assert ds1.num_blocks() == math.ceil(num_blocks / max(batch_size // block_size, 1)) # Blocks should not be bundled up when batch_size is not specified. - ds2 = ds.map_batches(lambda x: x).cache() + ds2 = ds.map_batches(lambda x: x).materialize() assert ds2.num_blocks() == num_blocks @@ -678,7 +678,7 @@ def test_map_batches_block_bundling_skewed_manual( ) # Confirm that we have the expected number of initial blocks. assert ds.num_blocks() == num_blocks - ds = ds.map_batches(lambda x: x, batch_size=batch_size).cache() + ds = ds.map_batches(lambda x: x, batch_size=batch_size).materialize() # Blocks should be bundled up to the batch size. assert ds.num_blocks() == expected_num_blocks @@ -704,7 +704,7 @@ def test_map_batches_block_bundling_skewed_auto( ) # Confirm that we have the expected number of initial blocks. 
assert ds.num_blocks() == num_blocks - ds = ds.map_batches(lambda x: x, batch_size=batch_size).cache() + ds = ds.map_batches(lambda x: x, batch_size=batch_size).materialize() curr = 0 num_out_blocks = 0 for block_size in block_sizes: @@ -735,7 +735,7 @@ def good_fn(row): ds = ray.data.range(10, parallelism=1) error_message = "Current row has different columns compared to previous rows." with pytest.raises(ValueError) as e: - ds.map(bad_fn).cache() + ds.map(bad_fn).materialize() assert error_message in str(e.value) ds_map = ds.map(good_fn) assert ds_map.take() == [{"a": "hello1", "b": "hello2"} for _ in range(10)] @@ -857,7 +857,7 @@ def f(x): compute_strategy = ray.data.ActorPoolStrategy() ray.data.range(10, parallelism=10).map_batches( f, batch_size=1, compute=compute_strategy - ).cache() + ).materialize() # The new execution backend is not using the ActorPoolStrategy under # the hood, so the expectation here applies only to the old backend. @@ -884,7 +884,7 @@ def f(x): ds = ( ray.data.range(10, parallelism=10) .map_batches(f, batch_size=None, compute=compute_strategy) - .cache() + .materialize() ) # TODO(https://github.com/ray-project/ray/issues/31723): implement the feature @@ -896,7 +896,7 @@ def f(x): ds = ( ray.data.range(10, parallelism=10) .map_batches(f, batch_size=10, compute=compute_strategy) - .cache() + .materialize() ) assert "1/1 blocks" in ds.stats() diff --git a/python/ray/data/tests/test_dataset_parquet.py b/python/ray/data/tests/test_dataset_parquet.py index 0d98d792ca67..760d2e21e4cd 100644 --- a/python/ray/data/tests/test_dataset_parquet.py +++ b/python/ray/data/tests/test_dataset_parquet.py @@ -657,7 +657,7 @@ def test_parquet_reader_estimate_data_size(shutdown_only, tmp_path): assert ( data_size >= 6_000_000 and data_size <= 10_000_000 ), "estimated data size is out of expected bound" - data_size = ds.cache().size_bytes() + data_size = ds.materialize().size_bytes() assert ( data_size >= 7_000_000 and data_size <= 10_000_000 ), "actual data size is out of expected bound" @@ -685,7 +685,7 @@ def test_parquet_reader_estimate_data_size(shutdown_only, tmp_path): assert ( data_size >= 1_000_000 and data_size <= 2_000_000 ), "estimated data size is out of expected bound" - data_size = ds.cache().size_bytes() + data_size = ds.materialize().size_bytes() assert ( data_size >= 1_000_000 and data_size <= 2_000_000 ), "actual data size is out of expected bound" diff --git a/python/ray/data/tests/test_dataset_tensor.py b/python/ray/data/tests/test_dataset_tensor.py index 2c4ba1016cd6..4d42e9bd6b9e 100644 --- a/python/ray/data/tests/test_dataset_tensor.py +++ b/python/ray/data/tests/test_dataset_tensor.py @@ -287,7 +287,7 @@ def test_tensors_sort(ray_start_regular_shared): def test_tensors_inferred_from_map(ray_start_regular_shared): # Test map. 
ds = ray.data.range(10, parallelism=10).map(lambda _: np.ones((4, 4))) - ds.cache() + ds = ds.materialize() assert str(ds) == ( "Dataset(\n" " num_blocks=10,\n" @@ -300,7 +300,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ds = ray.data.range(16, parallelism=4).map_batches( lambda _: np.ones((3, 4, 4)), batch_size=2 ) - ds.cache() + ds = ds.materialize() assert str(ds) == ( "Dataset(\n" " num_blocks=4,\n" @@ -313,7 +313,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ds = ray.data.range(10, parallelism=10).flat_map( lambda _: [np.ones((4, 4)), np.ones((4, 4))] ) - ds.cache() + ds = ds.materialize() assert str(ds) == ( "Dataset(\n" " num_blocks=10,\n" @@ -326,7 +326,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ds = ray.data.range(16, parallelism=4).map_batches( lambda _: pd.DataFrame({"a": [np.ones((4, 4))] * 3}), batch_size=2 ) - ds.cache() + ds = ds.materialize() assert str(ds) == ( "Dataset(\n" " num_blocks=4,\n" @@ -339,7 +339,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): lambda _: pd.DataFrame({"a": [np.ones((2, 2)), np.ones((3, 3))]}), batch_size=2, ) - ds.cache() + ds = ds.materialize() assert str(ds) == ( "Dataset(\n" " num_blocks=4,\n" diff --git a/python/ray/data/tests/test_dataset_tfrecords.py b/python/ray/data/tests/test_dataset_tfrecords.py index 35868f840ba2..dfc68100cf7c 100644 --- a/python/ray/data/tests/test_dataset_tfrecords.py +++ b/python/ray/data/tests/test_dataset_tfrecords.py @@ -612,11 +612,11 @@ def test_read_with_invalid_schema( # which should raise a `ValueError`. ds.write_tfrecords(tmp_path) with pytest.raises(ValueError) as e: - ray.data.read_tfrecords(tmp_path, tf_schema=tf_schema_wrong_name).cache() + ray.data.read_tfrecords(tmp_path, tf_schema=tf_schema_wrong_name).materialize() assert "Found extra unexpected feature" in str(e.value.args[0]) with pytest.raises(ValueError) as e: - ray.data.read_tfrecords(tmp_path, tf_schema=tf_schema_wrong_type).cache() + ray.data.read_tfrecords(tmp_path, tf_schema=tf_schema_wrong_type).materialize() assert str(e.value.args[0]) == ( "Schema field type mismatch during read: " "specified type is int, but underlying type is bytes" diff --git a/python/ray/data/tests/test_dynamic_block_split.py b/python/ray/data/tests/test_dynamic_block_split.py index 9650c533ced2..edc74feaa1f3 100644 --- a/python/ray/data/tests/test_dynamic_block_split.py +++ b/python/ray/data/tests/test_dynamic_block_split.py @@ -155,15 +155,15 @@ def test_dataset( assert ds.size_bytes() >= 0.7 * block_size * num_blocks_per_task * num_tasks map_ds = ds.map_batches(lambda x: x, compute=compute) - map_ds.cache() + map_ds = map_ds.materialize() assert map_ds.num_blocks() == num_tasks map_ds = ds.map_batches( lambda x: x, batch_size=num_blocks_per_task * num_tasks, compute=compute ) - map_ds.cache() + map_ds = map_ds.materialize() assert map_ds.num_blocks() == 1 map_ds = ds.map(lambda x: x, compute=compute) - map_ds.cache() + map_ds = map_ds.materialize() assert map_ds.num_blocks() == num_blocks_per_task * num_tasks ds_list = ds.split(5) @@ -177,7 +177,7 @@ def test_dataset( new_ds = ds.union(ds, ds) assert new_ds.num_blocks() == num_tasks * 3 - new_ds.cache() + new_ds = new_ds.materialize() assert new_ds.num_blocks() == num_blocks_per_task * num_tasks * 3 new_ds = ds.random_shuffle() @@ -187,7 +187,7 @@ def test_dataset( assert ds.groupby("one").count().count() == num_blocks_per_task * num_tasks new_ds = ds.zip(ds) - new_ds.cache() + new_ds = new_ds.materialize() assert 
new_ds.num_blocks() == num_blocks_per_task * num_tasks assert len(ds.take(5)) == 5 @@ -243,12 +243,12 @@ def test_filter( ) ds = ds.filter(lambda _: True) - ds.cache() + ds = ds.materialize() assert ds.count() == num_blocks_per_task assert ds.num_blocks() == num_blocks_per_task ds = ds.filter(lambda _: False) - ds.cache() + ds = ds.materialize() assert ds.count() == 0 assert ds.num_blocks() == num_blocks_per_task @@ -336,7 +336,7 @@ def test_lazy_block_list( assert metadata.num_rows == 1 # Check internal states of LazyBlockList after execution - ds.cache() + ds = ds.materialize() metadata = block_list.get_metadata() assert block_list._num_computed() == num_tasks diff --git a/python/ray/data/tests/test_mongo_dataset.py b/python/ray/data/tests/test_mongo_dataset.py index 979491ef1cca..bfe90bc43b16 100644 --- a/python/ray/data/tests/test_mongo_dataset.py +++ b/python/ray/data/tests/test_mongo_dataset.py @@ -208,7 +208,7 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): database=foo_db, collection=foo_collection, schema=schema, - ).cache() + ).materialize() assert ds._block_num_rows() == [3, 2] assert str(ds) == ( "Dataset(\n" @@ -227,7 +227,7 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): uri=mongo_url, database=foo_db, collection=foo_collection, - ).cache() + ).materialize() assert ds._block_num_rows() == [3, 2] assert str(ds) == ( "Dataset(\n" @@ -245,7 +245,7 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): uri=mongo_url, database=foo_db, collection=foo_collection, - ).cache() + ).materialize() assert str(ds) == ( "Dataset(\n" " num_blocks=5,\n" diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py index 6bc5c7117ca8..d6c54e817eb0 100644 --- a/python/ray/data/tests/test_optimize.py +++ b/python/ray/data/tests/test_optimize.py @@ -68,7 +68,7 @@ def test_memory_sanity(shutdown_only): info = ray.init(num_cpus=1, object_store_memory=500e6) ds = ray.data.range(10) ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8)) - ds.cache() + ds.materialize() meminfo = memory_summary(info.address_info["address"], stats_only=True) # Sanity check spilling is happening as expected. @@ -170,7 +170,7 @@ def test_memory_release_lazy(shutdown_only): ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8)) ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8)) ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8)) - ds.cache() + ds.materialize() meminfo = memory_summary(info.address_info["address"], stats_only=True) assert "Spilled" not in meminfo, meminfo @@ -188,7 +188,7 @@ def test_memory_release_lazy_shuffle(shutdown_only): # Should get fused into single stage. ds = ds.lazy() ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8)) - ds.random_shuffle().cache() + ds.random_shuffle().materialize() meminfo = memory_summary(info.address_info["address"], stats_only=True) assert "Spilled" not in meminfo, meminfo return @@ -224,12 +224,12 @@ def inc(row): ds2 = ds1.map(inc) ds3 = ds1.map(inc) # Test content. - assert ds2.cache().take() == [ + assert ds2.materialize().take() == [ {"one": 3, "two": "a"}, {"one": 4, "two": "b"}, {"one": 5, "two": "c"}, ] - assert ds3.cache().take() == [ + assert ds3.materialize().take() == [ {"one": 3, "two": "a"}, {"one": 4, "two": "b"}, {"one": 5, "two": "c"}, @@ -254,8 +254,8 @@ def inc(x): ds2 = ds1.map(inc) ds3 = ds1.map(inc) # Test content. 
- assert ds2.cache().take() == list(range(2, 12)) - assert ds3.cache().take() == list(range(2, 12)) + assert ds2.materialize().take() == list(range(2, 12)) + assert ds3.materialize().take() == list(range(2, 12)) # Test that first map is executed twice. assert ray.get(map_counter.get.remote()) == 2 * 10 + 10 + 10 @@ -268,10 +268,10 @@ def inc(x): ds1 = ds.map(inc) ds2 = ds.map(inc) # Test content. - assert ds1.cache().take() == list(range(2, 12)) - assert ds2.cache().take() == list(range(2, 12)) - # Test that first map is executed twice, because ds1.cache() - # clears up the previous snapshot blocks, and ds2.cache() + assert ds1.materialize().take() == list(range(2, 12)) + assert ds2.materialize().take() == list(range(2, 12)) + # Test that first map is executed twice, because ds1.materialize() + # clears up the previous snapshot blocks, and ds2.materialize() # has to re-execute ds.map(inc) again. assert ray.get(map_counter.get.remote()) == 2 * 10 + 10 + 10 @@ -305,7 +305,7 @@ def test_stage_linking(ray_start_regular_shared): assert len(ds._plan._stages_before_snapshot) == 0 _assert_has_stages(ds._plan._stages_after_snapshot, ["Map"]) assert ds._plan._last_optimized_stages is None - ds = ds.cache() + ds = ds.materialize() _assert_has_stages(ds._plan._stages_before_snapshot, ["Map"]) assert len(ds._plan._stages_after_snapshot) == 0 _assert_has_stages(ds._plan._last_optimized_stages, ["ReadRange->Map"]) @@ -317,7 +317,7 @@ def test_optimize_reorder(ray_start_regular_shared): context.optimize_fuse_read_stages = True context.optimize_reorder_stages = True - ds = ray.data.range(10).randomize_block_order().map_batches(dummy_map).cache() + ds = ray.data.range(10).randomize_block_order().map_batches(dummy_map).materialize() expect_stages( ds, 2, @@ -329,7 +329,7 @@ def test_optimize_reorder(ray_start_regular_shared): .randomize_block_order() .repartition(10) .map_batches(dummy_map) - .cache() + .materialize() ) expect_stages( ds2, diff --git a/python/ray/data/tests/test_size_estimation.py b/python/ray/data/tests/test_size_estimation.py index 3c80d5c2c156..a77e16ccfee5 100644 --- a/python/ray/data/tests/test_size_estimation.py +++ b/python/ray/data/tests/test_size_estimation.py @@ -161,7 +161,7 @@ def gen(name): ds = ( ray.data.range(200000, parallelism=1) .map(lambda _: uuid.uuid4().hex) - .cache() + .materialize() ) # Fully execute the operations prior to write, because with # parallelism=1, there is only one task; so the write operator diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index ad81c987d9df..cf720c8f82f5 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -307,7 +307,7 @@ def test_push_based_shuffle_stats(ray_start_cluster): parallelism = 100 ds = ray.data.range(1000, parallelism=parallelism).random_shuffle() - ds = ds.cache() + ds = ds.materialize() assert "RandomShuffleMerge" in ds.stats() # Check all nodes used. 
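The test_optimize.py hunks above capture the key semantic behind the rename: unlike the old in-place cache(), Dataset.materialize() returns a new, fully executed dataset, so callers reassign the result when they want later operations to reuse the computed blocks; consuming the original lazy dataset again re-executes its stages. A minimal sketch of that usage pattern (illustrative only, assuming a Ray version that includes this change):

    import ray

    ds = ray.data.range(10).map(lambda x: x + 1)

    # materialize() executes the plan and returns a new dataset;
    # `ds` itself stays lazy and would re-execute if consumed again.
    materialized = ds.materialize()
    assert materialized.take() == list(range(1, 11))
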
assert "2 nodes used" in ds.stats() diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 36f813d3be3c..e7b7955cc0bf 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -54,7 +54,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): ) with patch.object(logger, "info") as mock_logger: ds = ray.data.range(1000, parallelism=10) - ds = ds.map_batches(dummy_map_batches).cache() + ds = ds.map_batches(dummy_map_batches).materialize() if enable_auto_log_stats: logger_args, logger_kwargs = mock_logger.call_args @@ -86,7 +86,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): """ ) - ds = ds.map(dummy_map_batches).cache() + ds = ds.map(dummy_map_batches).materialize() if enable_auto_log_stats: logger_args, logger_kwargs = mock_logger.call_args @@ -118,7 +118,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): ) for batch in ds.iter_batches(): pass - stats = canonicalize(ds.cache().stats()) + stats = canonicalize(ds.materialize().stats()) if context.new_execution_backend: if context.use_streaming_executor: @@ -264,7 +264,7 @@ def test_dataset_stats_shuffle(ray_start_regular_shared): context.optimize_fuse_stages = True ds = ray.data.range(1000, parallelism=10) ds = ds.random_shuffle().repartition(1, shuffle=True) - stats = canonicalize(ds.cache().stats()) + stats = canonicalize(ds.materialize().stats()) assert ( stats == """Stage N ReadRange->RandomShuffle: executed in T @@ -309,35 +309,35 @@ def test_dataset_stats_shuffle(ray_start_regular_shared): def test_dataset_stats_repartition(ray_start_regular_shared): ds = ray.data.range(1000, parallelism=10) ds = ds.repartition(1, shuffle=False) - stats = ds.cache().stats() + stats = ds.materialize().stats() assert "Repartition" in stats, stats def test_dataset_stats_union(ray_start_regular_shared): ds = ray.data.range(1000, parallelism=10) ds = ds.union(ds) - stats = ds.cache().stats() + stats = ds.materialize().stats() assert "Union" in stats, stats def test_dataset_stats_zip(ray_start_regular_shared): ds = ray.data.range(1000, parallelism=10) ds = ds.zip(ds) - stats = ds.cache().stats() + stats = ds.materialize().stats() assert "Zip" in stats, stats def test_dataset_stats_sort(ray_start_regular_shared): ds = ray.data.range(1000, parallelism=10) ds = ds.sort() - stats = ds.cache().stats() + stats = ds.materialize().stats() assert "SortMap" in stats, stats assert "SortReduce" in stats, stats def test_dataset_stats_from_items(ray_start_regular_shared): ds = ray.data.from_items(range(10)) - stats = ds.cache().stats() + stats = ds.materialize().stats() assert "FromItems" in stats, stats @@ -347,7 +347,7 @@ def test_dataset_stats_read_parquet(ray_start_regular_shared, tmp_path): ds = ray.data.range(1000, parallelism=10) ds.write_parquet(str(tmp_path)) ds = ray.data.read_parquet(str(tmp_path)).map(lambda x: x) - stats = canonicalize(ds.cache().stats()) + stats = canonicalize(ds.materialize().stats()) if context.new_execution_backend: assert ( stats @@ -382,7 +382,7 @@ def test_dataset_split_stats(ray_start_regular_shared, tmp_path): dses = ds.split_at_indices([49]) dses = [ds.map(lambda x: x + 1) for ds in dses] for ds_ in dses: - stats = canonicalize(ds_.cache().stats()) + stats = canonicalize(ds_.materialize().stats()) if context.new_execution_backend: assert ( @@ -470,7 +470,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ with patch.object(logger, 
"info") as mock_logger: ds = ray.data.range(1000, parallelism=10) - ds = ds.map_batches(dummy_map_batches).cache() + ds = ds.map_batches(dummy_map_batches).materialize() if enable_auto_log_stats: logger_args, logger_kwargs = mock_logger.call_args @@ -702,7 +702,7 @@ def test_dataset_pipeline_cache_cases(ray_start_regular_shared): assert "[execution cached]" not in stats # CACHED (called cache()). - ds = ray.data.range(10).cache().repeat(2).map_batches(lambda x: x) + ds = ray.data.range(10).materialize().repeat(2).map_batches(lambda x: x) ds.take(999) stats = ds.stats() assert "[execution cached]" in stats diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index f70ef5aada46..632967613d18 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -47,14 +47,23 @@ def map_fn(block_iter): return map_fn -def test_build_streaming_topology(): +@pytest.mark.parametrize( + "verbose_progress", + [True, False], +) +def test_build_streaming_topology(verbose_progress): inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) o2 = MapOperator.create(make_transform(lambda block: [b * -1 for b in block]), o1) o3 = MapOperator.create(make_transform(lambda block: [b * 2 for b in block]), o2) - topo, num_progress_bars = build_streaming_topology(o3, ExecutionOptions()) + topo, num_progress_bars = build_streaming_topology( + o3, ExecutionOptions(verbose_progress=verbose_progress) + ) assert len(topo) == 3, topo - assert num_progress_bars == 3, num_progress_bars + if verbose_progress: + assert num_progress_bars == 3, num_progress_bars + else: + assert num_progress_bars == 1, num_progress_bars assert o1 in topo, topo assert not topo[o1].inqueues, topo assert topo[o1].outqueue == topo[o2].inqueues[0], topo @@ -70,14 +79,14 @@ def test_disallow_non_unique_operators(): o3 = MapOperator.create(make_transform(lambda block: [b * -1 for b in block]), o1) o4 = PhysicalOperator("test_combine", [o2, o3]) with pytest.raises(ValueError): - build_streaming_topology(o4, ExecutionOptions()) + build_streaming_topology(o4, ExecutionOptions(verbose_progress=True)) def test_process_completed_tasks(): inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) o2 = MapOperator.create(make_transform(lambda block: [b * -1 for b in block]), o1) - topo, _ = build_streaming_topology(o2, ExecutionOptions()) + topo, _ = build_streaming_topology(o2, ExecutionOptions(verbose_progress=True)) # Test processing output bundles. assert len(topo[o1].outqueue) == 0, topo diff --git a/python/ray/experimental/tqdm_ray.py b/python/ray/experimental/tqdm_ray.py index 14ac12f40cb2..39ef1ea612af 100644 --- a/python/ray/experimental/tqdm_ray.py +++ b/python/ray/experimental/tqdm_ray.py @@ -12,7 +12,7 @@ from ray.util.debug import log_once try: - import tqdm as real_tqdm + import tqdm.auto as real_tqdm except ImportError: real_tqdm = None @@ -238,6 +238,9 @@ def __init__(self): self.in_hidden_state = False self.num_hides = 0 self.lock = threading.RLock() + # Avoid colorizing Jupyter output, since the tqdm bar is rendered in + # ipywidgets instead of in the console. + self.should_colorize = not ray.widgets.util.in_notebook() def process_state_update(self, state: ProgressBarState) -> None: """Apply the remote progress bar state update. 
@@ -258,20 +261,26 @@ def _process_state_update_locked(self, state: ProgressBarState) -> None: if state["pid"] == self.pid: prefix = "" else: - prefix = "{}{}(pid={}){} ".format( - colorama.Style.DIM, - colorama.Fore.CYAN, - state.get("pid"), - colorama.Style.RESET_ALL, - ) + prefix = "(pid={}) ".format(state.get("pid")) + if self.should_colorize: + prefix = "{}{}{}{}".format( + colorama.Style.DIM, + colorama.Fore.CYAN, + prefix, + colorama.Style.RESET_ALL, + ) else: - prefix = "{}{}(pid={}, ip={}){} ".format( - colorama.Style.DIM, - colorama.Fore.CYAN, + prefix = "(pid={}, ip={}) ".format( state.get("pid"), state.get("ip"), - colorama.Style.RESET_ALL, ) + if self.should_colorize: + prefix = "{}{}{}{}".format( + colorama.Style.DIM, + colorama.Fore.CYAN, + prefix, + colorama.Style.RESET_ALL, + ) state["desc"] = prefix + state["desc"] process = self._get_or_allocate_bar_group(state) if process.has_bar(state["uuid"]): diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 57bc50b87578..eb6d5ff8b106 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -578,7 +578,10 @@ def start( cf.bold("--port"), ) + # Whether the original arguments include node_ip_address. + include_node_ip_address = False if node_ip_address is not None: + include_node_ip_address = True node_ip_address = services.resolve_ip_for_localhost(node_ip_address) resources = parse_resources_json(resources, cli_logger, cf) @@ -747,104 +750,77 @@ def start( cli_logger.newline() with cli_logger.group("Next steps"): dashboard_url = node.address_info["webui_url"] - if bootstrap_address.startswith("127.0.0.1:"): - if ray_constants.ENABLE_RAY_CLUSTER: - cli_logger.print( - "This Ray runtime only accepts connections from local host." - ) - cli_logger.print( - "To accept connections from remote hosts, " - "specify a public ip when starting" - ) - cli_logger.print( - "the head node: ray start --head --node-ip-address=." - ) - else: - cli_logger.print( - "Multi-node Ray clusters are not supported on OSX and Windows." - ) - cli_logger.print( - "If you would like to proceed anyway, restart Ray with:" - ) - cli_logger.print( - cf.bold(" ray stop"), - ) - cli_logger.print( - cf.bold(" {}=true ray start"), - ray_constants.ENABLE_RAY_CLUSTERS_ENV_VAR, - ) - cli_logger.newline() - else: + if ray_constants.ENABLE_RAY_CLUSTER: cli_logger.print("To add another node to this Ray cluster, run") # NOTE(kfstorm): Java driver rely on this line to get the address # of the cluster. Please be careful when updating this line. 
cli_logger.print( - cf.bold(" ray start --address='{}'"), + cf.bold(" {} ray start --address='{}'"), + f" {ray_constants.ENABLE_RAY_CLUSTERS_ENV_VAR}=1" + if ray_constants.IS_WINDOWS_OR_OSX + else "", bootstrap_address, ) - cli_logger.newline() - if ray_constants.ENABLE_RAY_CLUSTER: + + cli_logger.newline() + cli_logger.print("To connect to this Ray cluster:") + with cli_logger.indented(): + cli_logger.print("{} ray", cf.magenta("import")) cli_logger.print( - "To connect to this Ray cluster, run `ray.init()` as usual:" - ) - with cli_logger.indented(): - cli_logger.print("{} ray", cf.magenta("import")) - cli_logger.print( - "ray{}init()", - cf.magenta("."), + "ray{}init({})", + cf.magenta("."), + "_node_ip_address{}{}".format( + cf.magenta("="), cf.yellow("'" + node_ip_address + "'") ) - cli_logger.newline() - cli_logger.print( - "To connect to this Ray instance from outside of " - "the cluster, for example " - ) - cli_logger.print( - "when connecting to a remote cluster from your laptop, " - "make sure the" + if include_node_ip_address + else "", ) + + if dashboard_url: + cli_logger.newline() + cli_logger.print("To submit a Ray job using the Ray Jobs CLI:") cli_logger.print( - "dashboard {}is accessible and use the Ray Jobs API. For example:", - f"({dashboard_url}) " if dashboard_url else "", + cf.bold( + " RAY_ADDRESS='http://{}' ray job submit " + "--working-dir . " + "-- python my_script.py" + ), + dashboard_url, ) - if dashboard_url: - cli_logger.print( - cf.bold( - " RAY_ADDRESS='http://:{}' ray job submit " - "--working-dir . " - "-- python my_script.py" - ), - ray_params.dashboard_port, - ) cli_logger.newline() cli_logger.print( "See https://docs.ray.io/en/latest/cluster/running-applications" - "/job-submission/index.html" + "/job-submission/index.html " ) cli_logger.print( - "for more information on connecting to the Ray cluster from " - "a remote client." + "for more information on submitting Ray jobs to the Ray cluster." ) + + cli_logger.newline() + cli_logger.print("To terminate the Ray runtime, run") + cli_logger.print(cf.bold(" ray stop")) + + cli_logger.newline() + cli_logger.print("To view the status of the cluster, use") + cli_logger.print(" {}".format(cf.bold("ray status"))) + + if dashboard_url: cli_logger.newline() - cli_logger.print("To see the status of the cluster, use") - cli_logger.print(" {}".format(cf.bold("ray status"))) - if dashboard_url: - cli_logger.print("To monitor and debug Ray, view the dashboard at ") - cli_logger.print( - " {}".format( - cf.bold(dashboard_url), - ) + cli_logger.print("To monitor and debug Ray, view the dashboard at ") + cli_logger.print( + " {}".format( + cf.bold(dashboard_url), ) + ) + cli_logger.newline() cli_logger.print( cf.underlined( - "If connection fails, check your " + "If connection to the dashboard fails, check your " "firewall settings and " "network configuration." ) ) - cli_logger.newline() - cli_logger.print("To terminate the Ray runtime, run") - cli_logger.print(cf.bold(" ray stop")) ray_params.gcs_address = bootstrap_address else: # Start worker node. 
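The restructured "Next steps" output above now always prints the ray start --address=... join command (prefixed with the enable-clusters environment variable on Windows and OSX), a plain ray.init() connection snippet, and a Ray Jobs CLI example against the dashboard URL. When the head node was started with an explicit --node-ip-address, the printed snippet includes it. As a quick illustration of the printed connection snippet (the IP is a placeholder):

    import ray

    # Connect to the running cluster started via `ray start --head`.
    ray.init()

    # If the head node was started with --node-ip-address=<ip>, the CLI
    # instead prints the form below (placeholder address):
    # ray.init(_node_ip_address="<ip>")
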
diff --git a/python/ray/tests/test_advanced_9.py b/python/ray/tests/test_advanced_9.py index 5e5829aaad9e..d2d3e6c1dd7d 100644 --- a/python/ray/tests/test_advanced_9.py +++ b/python/ray/tests/test_advanced_9.py @@ -360,10 +360,9 @@ def test_redis_not_available(monkeypatch, call_ray_stop_only): shell=True, capture_output=True, ) - assert ( - "Could not establish connection to Redis localhost:12345" in p.stderr.decode() - ) - assert "Please check /tmp/ray/session" in p.stderr.decode() + assert "Could not establish connection to Redis" in p.stderr.decode() + assert "Please check" in p.stderr.decode() + assert "gcs_server.out for details" in p.stderr.decode() assert "RuntimeError: Failed to start GCS" in p.stderr.decode() diff --git a/python/ray/tests/test_cli.py b/python/ray/tests/test_cli.py index 818fb361e41f..f2c6d90ec5dc 100644 --- a/python/ray/tests/test_cli.py +++ b/python/ray/tests/test_cli.py @@ -277,10 +277,6 @@ def test_disable_usage_stats(monkeypatch, tmp_path): assert '{"usage_stats": false}' == tmp_usage_stats_config_path.read_text() -@pytest.mark.skipif( - sys.platform == "darwin" and "travis" in os.environ.get("USER", ""), - reason=("Mac builds don't provide proper locale support"), -) def test_ray_start(configure_lang, monkeypatch, tmp_path, cleanup_ray): monkeypatch.setenv("RAY_USAGE_STATS_CONFIG_PATH", str(tmp_path / "config.json")) runner = CliRunner() @@ -306,8 +302,8 @@ def test_ray_start(configure_lang, monkeypatch, tmp_path, cleanup_ray): _die_on_error(runner.invoke(scripts.stop)) - if ray.util.get_node_ip_address() == "127.0.0.1": - _check_output_via_pattern("test_ray_start_localhost.txt", result) + if ray_constants.IS_WINDOWS_OR_OSX: + _check_output_via_pattern("test_ray_start_windows_osx.txt", result) else: _check_output_via_pattern("test_ray_start.txt", result) diff --git a/python/ray/tests/test_cli_patterns/test_ray_start.txt b/python/ray/tests/test_cli_patterns/test_ray_start.txt index bddb03bb7619..92746b8478f5 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_start.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_start.txt @@ -10,25 +10,23 @@ Next steps To add another node to this Ray cluster, run ray start --address='.+' - To connect to this Ray cluster, run `ray.init\(\)` as usual: + To connect to this Ray cluster: import ray ray\.init\(\) - To connect to this Ray instance from outside of the cluster, for example - when connecting to a remote cluster from your laptop, make sure the - dashboard (.*) is accessible and use the Ray Jobs API\. For example: - RAY_ADDRESS='http://:8265' ray job submit --working-dir \. -- python my_script\.py + To submit a Ray job using the Ray Jobs CLI: + RAY_ADDRESS='http://.+:8265' ray job submit --working-dir \. -- python my_script\.py See https://docs\.ray\.io/en/latest/cluster/running-applications/job-submission/index\.html - for more information on connecting to the Ray cluster from a remote client\. + for more information on submitting Ray jobs to the Ray cluster. - To see the status of the cluster, use + To terminate the Ray runtime, run + ray stop + + To view the status of the cluster, use ray status + To monitor and debug Ray, view the dashboard at 127.0.0.1:8265 - If connection fails, check your firewall settings and network configuration. - - To terminate the Ray runtime, run - ray stop - + If connection to the dashboard fails, check your firewall settings and network configuration. 
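The updated test_ray_start.txt above (and the new OSX/Windows variant added below) are line-oriented regular-expression patterns that the CLI test matches against the captured ray start output, which is why literal dots and parentheses are escaped. For example, one of the new lines matches the printed job-submission command like so (a small standalone illustration, not part of the test itself):

    import re

    pattern = r"RAY_ADDRESS='http://.+:8265' ray job submit --working-dir \. -- python my_script\.py"
    line = "RAY_ADDRESS='http://127.0.0.1:8265' ray job submit --working-dir . -- python my_script.py"

    # "\." matches a literal dot, while ".+" matches the node address.
    assert re.search(pattern, line)
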
diff --git a/python/ray/tests/test_cli_patterns/test_ray_start_localhost.txt b/python/ray/tests/test_cli_patterns/test_ray_start_localhost.txt deleted file mode 100644 index 949eb5c5694a..000000000000 --- a/python/ray/tests/test_cli_patterns/test_ray_start_localhost.txt +++ /dev/null @@ -1,28 +0,0 @@ -Usage stats collection is enabled by default without user confirmation because this terminal is detected to be non-interactive\..+ - -Local node IP: .+ - --------------------- -Ray runtime started. --------------------- - -Next steps - To connect to this Ray runtime from another node, run - ray start --address='.+' - This Ray runtime only accepts connections from local host. - To accept connections from remote hosts, specify a public ip when starting - the head node: ray start --head --node-ip-address=. - - Alternatively, use the following Python code: - import ray - ray\.init\(address='auto'\) - - To see the status of the cluster, use - ray status - To monitor and debug Ray, view the dashboard at - 127.0.0.1:8265 - - If connection fails, check your firewall settings and network configuration. - - To terminate the Ray runtime, run - ray stop diff --git a/python/ray/tests/test_cli_patterns/test_ray_start_windows_osx.txt b/python/ray/tests/test_cli_patterns/test_ray_start_windows_osx.txt new file mode 100644 index 000000000000..b6ea1348f10f --- /dev/null +++ b/python/ray/tests/test_cli_patterns/test_ray_start_windows_osx.txt @@ -0,0 +1,32 @@ +Usage stats collection is enabled by default without user confirmation because this terminal is detected to be non-interactive\..+ + +Local node IP: .+ + +-------------------- +Ray runtime started. +-------------------- + +Next steps + To add another node to this Ray cluster, run + RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER=1 ray start --address='.+' + + To connect to this Ray cluster: + import ray + ray\.init\(\) + + To submit a Ray job using the Ray Jobs CLI: + RAY_ADDRESS='http://.+:8265' ray job submit --working-dir \. -- python my_script\.py + + See https://docs\.ray\.io/en/latest/cluster/running-applications/job-submission/index\.html + for more information on submitting Ray jobs to the Ray cluster. + + To terminate the Ray runtime, run + ray stop + + To view the status of the cluster, use + ray status + + To monitor and debug Ray, view the dashboard at + 127.0.0.1:8265 + + If connection to the dashboard fails, check your firewall settings and network configuration. diff --git a/python/ray/tests/test_failure_3.py b/python/ray/tests/test_failure_3.py index 0ec4f745fb5e..926b7c76ec01 100644 --- a/python/ray/tests/test_failure_3.py +++ b/python/ray/tests/test_failure_3.py @@ -2,15 +2,20 @@ import sys import signal import threading +import json +from pathlib import Path import ray import numpy as np import pytest +import psutil import time from ray._private.test_utils import ( SignalActor, wait_for_pid_to_exit, + wait_for_condition, + run_string_as_driver_nonblocking, ) SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM @@ -347,6 +352,104 @@ def pid(self): ray.get(t) +@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") +def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path): + """ + Verify that processes created by Ray tasks and actors are + cleaned up after a Ctrl+C is sent to the driver. This is done by + creating an actor and task that each spawn a number of child + processes, sending a SIGINT to the driver process, and + verifying that all child processes are killed. 
+ + The driver script uses a temporary JSON file to communicate + the list of PIDs that are children of the Ray worker + processes. + """ + + output_file_path = tmp_path / "leaked_pids.json" + driver_script = f""" +import ray +import json +import multiprocessing +import shutil +import time +import os + +@ray.remote +class Actor: + def create_leaked_child_process(self, num_to_leak): + print("Creating leaked process", os.getpid()) + + pids = [] + for _ in range(num_to_leak): + proc = multiprocessing.Process( + target=time.sleep, + args=(1000,), + daemon=True, + ) + proc.start() + pids.append(proc.pid) + + return pids + +@ray.remote +def task(): + print("Creating leaked process", os.getpid()) + proc = multiprocessing.Process( + target=time.sleep, + args=(1000,), + daemon=True, + ) + proc.start() + + return proc.pid + +num_to_leak_per_type = 10 + +actor = Actor.remote() +actor_leaked_pids = ray.get(actor.create_leaked_child_process.remote( + num_to_leak=num_to_leak_per_type, +)) + +task_leaked_pids = ray.get([task.remote() for _ in range(num_to_leak_per_type)]) +leaked_pids = actor_leaked_pids + task_leaked_pids + +final_file = "{output_file_path}" +tmp_file = final_file + ".tmp" +with open(tmp_file, "w") as f: + json.dump(leaked_pids, f) +shutil.move(tmp_file, final_file) + +while True: + print(os.getpid()) + time.sleep(1) + """ + + driver_proc = run_string_as_driver_nonblocking(driver_script) + + # Wait for the json file containing the child PIDS + # to be present. + wait_for_condition( + condition_predictor=lambda: Path(output_file_path).exists(), + timeout=30, + ) + + # Load the PIDs of the child processes. + with open(output_file_path, "r") as f: + pids = json.load(f) + + # Validate all children of the worker processes are in a sleeping state. + processes = [psutil.Process(pid) for pid in pids] + assert all([proc.status() == psutil.STATUS_SLEEPING for proc in processes]) + + # Valdiate children of worker process die after SIGINT. 
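The leak test above hands the child PIDs back through a JSON file and then, as shown just below, sends SIGINT to the driver and polls with psutil until every child is gone. A standalone sketch of that psutil pattern (illustrative, not part of the test):

    import multiprocessing
    import time

    import psutil

    # Spawn a child that would keep running if nothing reaps it.
    proc = multiprocessing.Process(target=time.sleep, args=(1000,), daemon=True)
    proc.start()

    child = psutil.Process(proc.pid)
    assert child.is_running()

    # Terminating and waiting mirrors the post-SIGINT check in the test.
    child.terminate()
    child.wait(timeout=10)
    assert not child.is_running()
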
+ driver_proc.send_signal(signal.SIGINT) + wait_for_condition( + condition_predictor=lambda: all([not proc.is_running() for proc in processes]), + timeout=30, + ) + + if __name__ == "__main__": import pytest diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 7f0f7df07bc6..19c35898fd5c 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -9,6 +9,7 @@ from ray._private.utils import get_or_create_event_loop from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy import ray._private.gcs_utils as gcs_utils +from ray._private import ray_constants from ray._private.test_utils import ( convert_actor_state, generate_system_config_map, @@ -765,6 +766,55 @@ def f(): wait_for_pid_to_exit(gcs_server_pid, 1000) +@pytest.mark.parametrize( + "ray_start_regular_with_external_redis", + [ + generate_system_config_map( + gcs_failover_worker_reconnect_timeout=20, + gcs_rpc_server_reconnect_timeout_s=60, + gcs_server_request_timeout_seconds=10, + raylet_liveness_self_check_interval_ms=3000, + ) + ], + indirect=True, +) +def test_redis_data_loss_no_leak(ray_start_regular_with_external_redis): + @ray.remote + def create_actor(): + @ray.remote + class A: + def pid(self): + return os.getpid() + + a = A.options(lifetime="detached", name="A").remote() + ray.get(a.pid.remote()) + + ray.get(create_actor.remote()) + + ray._private.worker._global_node.kill_gcs_server() + # Delete redis + redis_addr = os.environ.get("RAY_REDIS_ADDRESS") + import redis + + ip, port = redis_addr.split(":") + cli = redis.Redis(ip, port) + cli.flushall() + raylet_proc = ray._private.worker._global_node.all_processes[ + ray_constants.PROCESS_TYPE_RAYLET + ][0].process + + def check_raylet_healthy(): + return raylet_proc.poll() is None + + wait_for_condition(lambda: check_raylet_healthy()) + + # Start GCS + ray._private.worker._global_node.start_gcs_server() + + # Waiting for raylet to become unhealthy + wait_for_condition(lambda: not check_raylet_healthy()) + + if __name__ == "__main__": import pytest diff --git a/python/ray/tests/test_task_events_2.py b/python/ray/tests/test_task_events_2.py index d0efcfac2361..f5aecac9c4bd 100644 --- a/python/ray/tests/test_task_events_2.py +++ b/python/ray/tests/test_task_events_2.py @@ -537,7 +537,8 @@ def _read_file(filepath, start, end): @pytest.mark.skipif( - sys.platform == "win32", reason="Failing on Windows. 
we should fix it asap" + not ray_constants.RAY_ENABLE_RECORD_TASK_LOGGING, + reason="Skipping if not recording task logs offsets.", ) def test_task_logs_info_basic(shutdown_only): """Test tasks (normal tasks/actor tasks) execution logging @@ -594,6 +595,10 @@ def verify(): wait_for_condition(verify) +@pytest.mark.skipif( + not ray_constants.RAY_ENABLE_RECORD_TASK_LOGGING, + reason="Skipping if not recording task logs offsets.", +) def test_task_logs_info_disabled(shutdown_only, monkeypatch): """Test when redirect disabled, no task log info is available due to missing log file @@ -619,6 +624,10 @@ def verify(): wait_for_condition(verify) +@pytest.mark.skipif( + not ray_constants.RAY_ENABLE_RECORD_TASK_LOGGING, + reason="Skipping if not recording task logs offsets.", +) def test_task_logs_info_running_task(shutdown_only): ray.init(num_cpus=1) diff --git a/python/ray/train/tests/test_batch_predictor.py b/python/ray/train/tests/test_batch_predictor.py index 2ec9b9f55c62..7b8175040108 100644 --- a/python/ray/train/tests/test_batch_predictor.py +++ b/python/ray/train/tests/test_batch_predictor.py @@ -114,7 +114,7 @@ def test_separate_gpu_stage(shutdown_only): num_gpus_per_worker=1, separate_gpu_stage=True, allow_gpu=True, - ).cache() + ).materialize() stats = ds.stats() assert "Stage 1 ReadRange->DummyPreprocessor:" in stats, stats assert "Stage 2 MapBatches(ScoringWrapper):" in stats, stats @@ -125,7 +125,7 @@ def test_separate_gpu_stage(shutdown_only): num_gpus_per_worker=1, separate_gpu_stage=False, allow_gpu=True, - ).cache() + ).materialize() stats = ds.stats() assert "Stage 1 ReadRange:" in stats, stats assert "Stage 2 MapBatches(ScoringWrapper):" in stats, stats @@ -148,7 +148,7 @@ def test_automatic_enable_gpu_from_num_gpus_per_worker(shutdown_only): with pytest.raises( ValueError, match="DummyPredictor does not support GPU prediction" ): - batch_predictor.predict(test_dataset, num_gpus_per_worker=1).cache() + batch_predictor.predict(test_dataset, num_gpus_per_worker=1).materialize() def test_batch_prediction(): @@ -158,7 +158,7 @@ def test_batch_prediction(): ) test_dataset = ray.data.range_table(4) - ds = batch_predictor.predict(test_dataset).cache() + ds = batch_predictor.predict(test_dataset).materialize() # Check fusion occurred. assert "ReadRange->DummyPreprocessor" in ds.stats(), ds.stats() assert ds.to_pandas().to_numpy().squeeze().tolist() == [ @@ -282,7 +282,7 @@ def test_batch_prediction_various_combination(): predictor_cls, ) - ds = batch_predictor.predict(input_dataset).cache() + ds = batch_predictor.predict(input_dataset).materialize() # Check no fusion needed since we're not doing a dataset read. 
assert f"Stage 1 {preprocessor.__class__.__name__}" in ds.stats(), ds.stats() assert ds.to_pandas().to_numpy().squeeze().tolist() == [ diff --git a/python/ray/tune/experimental/output.py b/python/ray/tune/experimental/output.py index 84b3dcda65ba..369f9c6cfa96 100644 --- a/python/ray/tune/experimental/output.py +++ b/python/ray/tune/experimental/output.py @@ -22,6 +22,7 @@ except ImportError: rich = None +import ray from ray._private.dict import unflattened_lookup from ray.air._internal.checkpoint_manager import _TrackedCheckpoint from ray.tune.callback import Callback @@ -81,11 +82,7 @@ class AirVerbosity(IntEnum): VERBOSE = 2 -try: - class_name = get_ipython().__class__.__name__ - IS_NOTEBOOK = True if "Terminal" not in class_name else False -except NameError: - IS_NOTEBOOK = False +IS_NOTEBOOK = ray.widgets.util.in_notebook() def get_air_verbosity() -> Optional[AirVerbosity]: diff --git a/python/ray/tune/progress_reporter.py b/python/ray/tune/progress_reporter.py index 74aaee9f321c..9bc46281fe9f 100644 --- a/python/ray/tune/progress_reporter.py +++ b/python/ray/tune/progress_reporter.py @@ -58,12 +58,7 @@ "'pip install ray[rllib]'." ) -try: - class_name = get_ipython().__class__.__name__ - IS_NOTEBOOK = True if "Terminal" not in class_name else False -except NameError: - IS_NOTEBOOK = False - +IS_NOTEBOOK = ray.widgets.util.in_notebook() SKIP_RESULTS_IN_REPORT = {"config", TRIAL_ID, EXPERIMENT_TAG, DONE} diff --git a/python/ray/widgets/util.py b/python/ray/widgets/util.py index 0c6e406a189e..056d5ca0d385 100644 --- a/python/ray/widgets/util.py +++ b/python/ray/widgets/util.py @@ -186,3 +186,14 @@ def wrapped(self, *args, **kwargs): return None return wrapped + + +@DeveloperAPI +def in_notebook() -> bool: + """Return whether we are in a Jupyter notebook.""" + try: + class_name = get_ipython().__class__.__name__ + is_notebook = True if "Terminal" not in class_name else False + except NameError: + is_notebook = False + return is_notebook diff --git a/python/requirements_test.txt b/python/requirements_test.txt index 2742b6e9b7bc..be87fb41babb 100644 --- a/python/requirements_test.txt +++ b/python/requirements_test.txt @@ -91,7 +91,6 @@ markdown-it-py==1.1.0 attrs==21.4.0 importlib-metadata==4.13.0 - # For test_basic.py::test_omp_threads_set threadpoolctl==3.1.0 -numexpr==2.8.4 \ No newline at end of file +numexpr==2.8.4 diff --git a/release/air_tests/air_benchmarks/mlperf-train/tf_utils.py b/release/air_tests/air_benchmarks/mlperf-train/tf_utils.py index 4b169c0469b9..235a4e447e91 100644 --- a/release/air_tests/air_benchmarks/mlperf-train/tf_utils.py +++ b/release/air_tests/air_benchmarks/mlperf-train/tf_utils.py @@ -328,7 +328,7 @@ def build_tf_dataset( if dataset_cache: # Improve training / eval performance when data is in remote storage and # can fit into worker memory. - dataset = dataset.cache() + dataset = dataset.materialize() return process_record_dataset( dataset=dataset, diff --git a/release/nightly_tests/dataset/aggregate_benchmark.py b/release/nightly_tests/dataset/aggregate_benchmark.py index d16c0395355a..b4c81629a8c3 100644 --- a/release/nightly_tests/dataset/aggregate_benchmark.py +++ b/release/nightly_tests/dataset/aggregate_benchmark.py @@ -28,7 +28,7 @@ def run_h2oai(benchmark: Benchmark): # Number of blocks (parallelism) should be set as number of available CPUs # to get best performance. 
num_blocks = int(ray.cluster_resources().get("CPU", 1)) - input_ds = input_ds.repartition(num_blocks).cache() + input_ds = input_ds.repartition(num_blocks).materialize() q_list = [ (h2oai_q1, "q1"), diff --git a/release/nightly_tests/dataset/benchmark.py b/release/nightly_tests/dataset/benchmark.py index 9ce6672e38cc..0b650e72c5ec 100644 --- a/release/nightly_tests/dataset/benchmark.py +++ b/release/nightly_tests/dataset/benchmark.py @@ -47,7 +47,7 @@ def run(self, name: str, fn: Callable[..., Dataset], **fn_run_args): print(f"Running case: {name}") start_time = time.perf_counter() output_ds = fn(**fn_run_args) - output_ds.cache() + output_ds.materialize() duration = time.perf_counter() - start_time # TODO(chengsu): Record more metrics based on dataset stats. diff --git a/release/nightly_tests/dataset/inference.py b/release/nightly_tests/dataset/inference.py index b4e64b03a847..ba1e4b1d9b11 100644 --- a/release/nightly_tests/dataset/inference.py +++ b/release/nightly_tests/dataset/inference.py @@ -86,17 +86,17 @@ def infer(batch): ray_remote_args={"num_cpus": 0.5}, ) # Do a blocking map so that we can measure the download time. -ds = ds.map(lambda x: x).cache() +ds = ds.map(lambda x: x).materialize() end_download_time = time.time() print("Preprocessing...") -ds = ds.map(preprocess).cache() +ds = ds.map(preprocess).materialize() end_preprocess_time = time.time() print("Inferring...") # NOTE: set a small batch size to avoid OOM on GRAM when doing inference. ds = ds.map_batches( infer, num_gpus=0.25, batch_size=128, batch_format="pandas", compute="actors" -).cache() +).materialize() end_time = time.time() diff --git a/release/nightly_tests/dataset/iter_batches_benchmark.py b/release/nightly_tests/dataset/iter_batches_benchmark.py index a2176a08754a..2720a733e2f8 100644 --- a/release/nightly_tests/dataset/iter_batches_benchmark.py +++ b/release/nightly_tests/dataset/iter_batches_benchmark.py @@ -54,7 +54,7 @@ def run_iter_batches_benchmark(benchmark: Benchmark): "s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2018/01" ) .repartition(12) - .cache() + .materialize() ) batch_formats = ["pandas", "numpy"] @@ -73,7 +73,7 @@ def run_iter_batches_benchmark(benchmark: Benchmark): for current_format in ["pyarrow", "pandas"]: new_ds = ds.map_batches( lambda ds: ds, batch_format=current_format, batch_size=None - ).cache() + ).materialize() for new_format in ["pyarrow", "pandas", "numpy"]: for batch_size in batch_sizes: test_name = f"iter-batches-conversion-{current_format}-to-{new_format}-{batch_size}" # noqa: E501 @@ -106,7 +106,7 @@ def run_iter_batches_benchmark(benchmark: Benchmark): new_ds = ds.repartition(512) new_ds = new_ds.map_batches( lambda ds: ds, batch_format="pandas", batch_size=None - ).cache() + ).materialize() for batch_size in [32 * 1024, 64 * 1024, 256 * 1024]: test_name = f"iter-batches-block-concat-to-batch-{batch_size}" benchmark.run( diff --git a/release/nightly_tests/dataset/iter_tensor_batches_benchmark.py b/release/nightly_tests/dataset/iter_tensor_batches_benchmark.py index dc71092f45f5..805c28f16dae 100644 --- a/release/nightly_tests/dataset/iter_tensor_batches_benchmark.py +++ b/release/nightly_tests/dataset/iter_tensor_batches_benchmark.py @@ -70,7 +70,7 @@ def add_label(batch): batch["__value__"] = label return batch - ds = ds.map_batches(add_label).cache() + ds = ds.map_batches(add_label).materialize() # Test iter_torch_batches() with default args. 
benchmark.run( diff --git a/release/nightly_tests/dataset/map_batches_benchmark.py b/release/nightly_tests/dataset/map_batches_benchmark.py index aa9862ec7cdd..631b3b02573f 100644 --- a/release/nightly_tests/dataset/map_batches_benchmark.py +++ b/release/nightly_tests/dataset/map_batches_benchmark.py @@ -24,7 +24,7 @@ def map_batches( ds = input_ds if is_eager_executed: - ds.cache() + ds.materialize() for _ in range(num_calls): ds = ds.map_batches( @@ -34,7 +34,7 @@ def map_batches( compute=compute, ) if is_eager_executed: - ds.cache() + ds.materialize() return ds @@ -43,7 +43,7 @@ def run_map_batches_benchmark(benchmark: Benchmark): "s3://air-example-data/ursa-labs-taxi-data/by_year/2018/01" ) lazy_input_ds = input_ds.lazy() - input_ds.cache() + input_ds.materialize() batch_formats = ["pandas", "numpy"] batch_sizes = [1024, 2048, 4096, None] @@ -124,7 +124,7 @@ def run_map_batches_benchmark(benchmark: Benchmark): for current_format in ["pyarrow", "pandas"]: new_input_ds = input_ds.map_batches( lambda ds: ds, batch_format=current_format, batch_size=None - ).cache() + ).materialize() for new_format in ["pyarrow", "pandas", "numpy"]: for batch_size in batch_sizes: test_name = f"map-batches-{current_format}-to-{new_format}-{batch_size}" @@ -140,7 +140,7 @@ def run_map_batches_benchmark(benchmark: Benchmark): # Test reading multiple files. input_ds = ray.data.read_parquet( "s3://air-example-data/ursa-labs-taxi-data/by_year/2018" - ).cache() + ).materialize() for batch_format in batch_formats: for compute in ["tasks", "actors"]: diff --git a/release/nightly_tests/dataset/read_parquet_benchmark.py b/release/nightly_tests/dataset/read_parquet_benchmark.py index 4849d424c573..6e6fff795627 100644 --- a/release/nightly_tests/dataset/read_parquet_benchmark.py +++ b/release/nightly_tests/dataset/read_parquet_benchmark.py @@ -21,7 +21,7 @@ def read_parquet( use_threads=use_threads, filter=filter, columns=columns, - ).cache() + ).materialize() def run_read_parquet_benchmark(benchmark: Benchmark): diff --git a/release/nightly_tests/dataset/sort.py b/release/nightly_tests/dataset/sort.py index e45ad7e55df5..1e874fe90869 100644 --- a/release/nightly_tests/dataset/sort.py +++ b/release/nightly_tests/dataset/sort.py @@ -119,7 +119,7 @@ def make_block(count: int, num_columns: int) -> Block: ds = ds.random_shuffle() else: ds = ds.sort(key="c_0") - ds.cache() + ds.materialize() ds_stats = ds.stats() except Exception as e: exc = e diff --git a/src/mock/ray/gcs/gcs_client/accessor.h b/src/mock/ray/gcs/gcs_client/accessor.h index 4535f5455a38..04922fd036fb 100644 --- a/src/mock/ray/gcs/gcs_client/accessor.h +++ b/src/mock/ray/gcs/gcs_client/accessor.h @@ -132,6 +132,10 @@ class MockNodeInfoAccessor : public NodeInfoAccessor { AsyncDrainNode, (const NodeID &node_id, const StatusCallback &callback), (override)); + MOCK_METHOD(Status, + AsyncCheckSelfAlive, + (const std::function &callback, int64_t timeout_ms), + (override)); MOCK_METHOD(Status, AsyncGetAll, (const MultiItemCallback &callback), diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 5e9b78babf9b..3c3f95c8e09a 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -789,3 +789,17 @@ RAY_CONFIG(bool, kill_idle_workers_of_terminated_job, true) // If left empty, no such attempt will be made. 
// Example: RAY_preload_python_modules=tensorflow,pytorch RAY_CONFIG(std::vector, preload_python_modules, {}) + +// By default, raylet send a self liveness check to GCS every 60s +RAY_CONFIG(int64_t, raylet_liveness_self_check_interval_ms, 60000) + +// Instruct the CoreWorker to kill its child processes while +// it exits. This prevents certain classes of resource leaks +// that are caused by the worker processes leaking processes. +// If a user relies on Ray's old behavior of leaking processes, +// then they can disable this behavior with +// RAY_kill_child_processes_on_worker_exit=false. We anticipate +// keeping this flag around at least until Ray 2.5. +// See https://github.com/ray-project/ray/pull/33976 for more +// info. +RAY_CONFIG(bool, kill_child_processes_on_worker_exit, true) diff --git a/src/ray/common/ray_syncer/ray_syncer-inl.h b/src/ray/common/ray_syncer/ray_syncer-inl.h index a6e64f7b1b45..89e6758f18b0 100644 --- a/src/ray/common/ray_syncer/ray_syncer-inl.h +++ b/src/ray/common/ray_syncer/ray_syncer-inl.h @@ -229,7 +229,7 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T { node_versions[message->message_type()] = message->version(); message_processor_(message); } else { - RAY_LOG_EVERY_N(WARNING, 100) + RAY_LOG_EVERY_MS(WARNING, 1000) << "Drop message received from " << NodeID::FromBinary(message->node_id()) << " because the message version " << message->version() << " is older than the local version " << node_versions[message->message_type()] @@ -286,8 +286,8 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T { if (ok) { SendNext(); } else { - RAY_LOG_EVERY_N(ERROR, 100) << "Failed to send the message to: " - << NodeID::FromBinary(GetRemoteNodeID()); + RAY_LOG_EVERY_MS(ERROR, 1000) << "Failed to send the message to: " + << NodeID::FromBinary(GetRemoteNodeID()); Disconnect(); } }, @@ -302,8 +302,8 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T { ReceiveUpdate(std::move(msg)); StartPull(); } else { - RAY_LOG_EVERY_N(ERROR, 100) << "Failed to read the message from: " - << NodeID::FromBinary(GetRemoteNodeID()); + RAY_LOG_EVERY_MS(ERROR, 1000) << "Failed to read the message from: " + << NodeID::FromBinary(GetRemoteNodeID()); Disconnect(); } }, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index af0a78988c00..2b001719a72b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -704,6 +704,56 @@ void CoreWorker::Disconnect( } } +void CoreWorker::KillChildProcs() { + // There are cases where worker processes can "leak" child processes. + // Basically this means that the worker process (either itself, or via + // code in a task or actor) spawned a process and did not kill it on termination. + // The process will continue living beyond the lifetime of the worker process. + // If that leaked process has expensive resources, such as a CUDA context and associated + // GPU memory, then those resources will never be cleaned until something else kills the + // process. + // + // This function lists all processes that are direct children of the current worker + // process, then kills them. This currently only works for the "happy-path"; worker + // process crashes will still leak processes. + // TODO(cade) Use more robust method to catch leaked processes even in worker crash + // scenarios (subreaper). 
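The KillChildProcs() comment above, together with the RAY_kill_child_processes_on_worker_exit config added to ray_config_def.h, describes the user-visible behavior: processes spawned by tasks or actors and never cleaned up are now terminated when their worker exits (the implementation follows just below). A hedged Python sketch of the kind of leak this targets, mirroring the new test in test_failure_3.py:

    import multiprocessing
    import time

    import ray

    @ray.remote
    def spawn_and_forget() -> int:
        # Without the new cleanup, this child would outlive the worker
        # that created it and keep holding its resources.
        proc = multiprocessing.Process(target=time.sleep, args=(1000,), daemon=True)
        proc.start()
        return proc.pid

    leaked_pid = ray.get(spawn_and_forget.remote())
    # With kill_child_processes_on_worker_exit left at its default (true),
    # this pid is reaped when the owning worker process shuts down.
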
+ + if (!RayConfig::instance().kill_child_processes_on_worker_exit()) { + RAY_LOG(DEBUG) + << "kill_child_processes_on_worker_exit is not true, skipping KillChildProcs"; + return; + } + + RAY_LOG(DEBUG) << "kill_child_processes_on_worker_exit true, KillChildProcs"; + auto maybe_child_procs = GetAllProcsWithPpid(GetPID()); + + // Enumerating child procs is not supported on this platform. + if (!maybe_child_procs) { + RAY_LOG(DEBUG) << "Killing leaked procs not supported on this platform."; + return; + } + + const auto &child_procs = *maybe_child_procs; + const auto child_procs_str = absl::StrJoin(child_procs, ","); + RAY_LOG(INFO) << "Try killing all child processes of this worker as it exits. " + << "Child process pids: " << child_procs_str; + + for (const auto &child_pid : child_procs) { + auto maybe_error_code = KillProc(child_pid); + RAY_CHECK(maybe_error_code) + << "Expected this path to only be called when KillProc is supported."; + auto error_code = *maybe_error_code; + + RAY_LOG(INFO) << "Kill result for child pid " << child_pid << ": " + << error_code.message() << ", bool " << (bool)error_code; + if (error_code) { + RAY_LOG(WARNING) << "Unable to kill potentially leaked process " << child_pid + << ": " << error_code.message(); + } + } +} + void CoreWorker::Exit( const rpc::WorkerExitType exit_type, const std::string &detail, @@ -734,6 +784,7 @@ void CoreWorker::Exit( creation_task_exception_pb_bytes]() { rpc::DrainAndResetServerCallExecutor(); Disconnect(exit_type, detail, creation_task_exception_pb_bytes); + KillChildProcs(); Shutdown(); }, "CoreWorker.Shutdown"); @@ -778,6 +829,9 @@ void CoreWorker::ForceExit(const rpc::WorkerExitType exit_type, RAY_LOG(WARNING) << "Force exit the process. " << " Details: " << detail; Disconnect(exit_type, detail); + + KillChildProcs(); + // NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup. // `exit()` will destruct static objects in an incorrect order, which will lead to // core dumps. diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 41274c81f25e..088fd620644e 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1212,6 +1212,13 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param exit_detail The detailed reason for a given exit. void ForceExit(const rpc::WorkerExitType exit_type, const std::string &detail); + /// Forcefully kill child processes. User code running in actors or tasks + /// can spawn processes that don't get terminated. If those processes + /// own resources (such as GPU memory), then those resources will become + /// unavailable until the process is killed. + /// This is called during shutdown of the process. + void KillChildProcs(); + /// Register this worker or driver to GCS. 
void RegisterToGcs(int64_t worker_launch_time_ms, int64_t worker_launched_time_ms); diff --git a/src/ray/gcs/gcs_client/accessor.cc b/src/ray/gcs/gcs_client/accessor.cc index a7a72ff13bdd..ccb225a62931 100644 --- a/src/ray/gcs/gcs_client/accessor.cc +++ b/src/ray/gcs/gcs_client/accessor.cc @@ -470,6 +470,27 @@ Status NodeInfoAccessor::AsyncRegister(const rpc::GcsNodeInfo &node_info, return Status::OK(); } +Status NodeInfoAccessor::AsyncCheckSelfAlive( + const std::function &callback, int64_t timeout_ms = -1) { + rpc::CheckAliveRequest request; + auto node_addr = local_node_info_.node_manager_address() + ":" + + std::to_string(local_node_info_.node_manager_port()); + RAY_CHECK(callback != nullptr); + request.add_raylet_address(node_addr); + client_impl_->GetGcsRpcClient().CheckAlive( + request, + [callback](auto status, const auto &reply) { + if (status.ok()) { + RAY_CHECK(reply.raylet_alive().size() == 1); + callback(status, reply.raylet_alive()[0]); + } else { + callback(status, true); + } + }, + timeout_ms); + return Status::OK(); +} + Status NodeInfoAccessor::AsyncDrainNode(const NodeID &node_id, const StatusCallback &callback) { RAY_LOG(DEBUG) << "Draining node, node id = " << node_id; diff --git a/src/ray/gcs/gcs_client/accessor.h b/src/ray/gcs/gcs_client/accessor.h index a08b6bcd1b54..93697d11cf0c 100644 --- a/src/ray/gcs/gcs_client/accessor.h +++ b/src/ray/gcs/gcs_client/accessor.h @@ -305,6 +305,14 @@ class NodeInfoAccessor { virtual Status AsyncRegister(const rpc::GcsNodeInfo &node_info, const StatusCallback &callback); + /// Send a check alive request to GCS for the liveness of some node. + /// + /// \param callback The callback function once the request is finished. + /// \param timeout_ms The timeout for this request. + /// \return Status + virtual Status AsyncCheckSelfAlive(const std::function &callback, + int64_t timeout_ms); + /// Drain (remove the information of the node from the cluster) the local node from GCS /// asynchronously. 
/// diff --git a/src/ray/gcs/redis_client.cc b/src/ray/gcs/redis_client.cc index d4cf0c0702db..60a905bc9d26 100644 --- a/src/ray/gcs/redis_client.cc +++ b/src/ray/gcs/redis_client.cc @@ -14,7 +14,6 @@ #include "ray/gcs/redis_client.h" -#include "absl/strings/str_split.h" #include "ray/common/ray_config.h" #include "ray/gcs/redis_context.h" @@ -134,56 +133,7 @@ static void GetRedisShards(redisContext *context, freeReplyObject(reply); } -RedisClient::RedisClient(const RedisClientOptions &options) : options_(options) { - instrumented_io_context io_context; - auto tmp_context = std::make_shared(io_context); - RAY_CHECK_OK(tmp_context->Connect(options_.server_ip_, - options_.server_port_, - /*sharding=*/options_.enable_sharding_conn_, - /*password=*/options_.password_, - /*enable_ssl=*/options_.enable_ssl_)); - // Firstly, we need to find the master node - // Replica information contains data as the following format: - // # Replication - // role:master - // connected_slaves:0 - // master_failover_state:no-failover - // master_replid:07a4734361f1c81d019d0a0449310e7bd76d459c - // master_replid2:0000000000000000000000000000000000000000 - // master_repl_offset:0 - // second_repl_offset:-1 - // repl_backlog_active:0 - // repl_backlog_size:1048576 - // repl_backlog_first_byte_offset:0 - // repl_backlog_histlen:0 - // Each line end with \r\n - auto reply = tmp_context->RunArgvSync(std::vector{"INFO", "REPLICATION"}); - RAY_CHECK(reply && !reply->IsNil()) << "Failed to get Redis replication info"; - auto replication_info = reply->ReadAsString(); - auto parts = absl::StrSplit(replication_info, "\r\n"); - - for (auto &part : parts) { - if (part.empty() || part[0] == '#') { - continue; - } - std::vector kv = absl::StrSplit(part, ":"); - RAY_CHECK(kv.size() == 2); - if (kv[0] == "role" && kv[1] == "master") { - leader_ip_ = options_.server_ip_; - leader_port_ = options_.server_port_; - break; - } - - if (kv[0] == "master_host") { - leader_ip_ = kv[1]; - } - if (kv[0] == "master_port") { - leader_port_ = std::stoi(kv[1]); - } - } - RAY_LOG(INFO) << "Find redis leader: " << leader_ip_ << ":" << leader_port_; - RAY_CHECK(!leader_ip_.empty() && leader_port_ != 0) << "Failed to get leader info"; -} +RedisClient::RedisClient(const RedisClientOptions &options) : options_(options) {} Status RedisClient::Connect(instrumented_io_context &io_service) { std::vector io_services; @@ -195,15 +145,15 @@ Status RedisClient::Connect(std::vector io_services) RAY_CHECK(!is_connected_); RAY_CHECK(!io_services.empty()); - if (leader_ip_.empty()) { + if (options_.server_ip_.empty()) { RAY_LOG(ERROR) << "Failed to connect, redis server address is empty."; return Status::Invalid("Redis server address is invalid!"); } primary_context_ = std::make_shared(*io_services[0]); - RAY_CHECK_OK(primary_context_->Connect(leader_ip_, - leader_port_, + RAY_CHECK_OK(primary_context_->Connect(options_.server_ip_, + options_.server_port_, /*sharding=*/options_.enable_sharding_conn_, /*password=*/options_.password_, /*enable_ssl=*/options_.enable_ssl_)); @@ -216,8 +166,8 @@ Status RedisClient::Connect(std::vector io_services) GetRedisShards(primary_context_->sync_context(), &addresses, &ports); if (addresses.empty()) { RAY_CHECK(ports.empty()); - addresses.push_back(leader_ip_); - ports.push_back(leader_port_); + addresses.push_back(options_.server_ip_); + ports.push_back(options_.server_port_); } for (size_t i = 0; i < addresses.size(); ++i) { @@ -235,8 +185,8 @@ Status RedisClient::Connect(std::vector io_services) } else { 
shard_contexts_.push_back(std::make_shared(*io_services[0])); // Only async context is used in sharding context, so wen disable the other two. - RAY_CHECK_OK(shard_contexts_[0]->Connect(leader_ip_, - leader_port_, + RAY_CHECK_OK(shard_contexts_[0]->Connect(options_.server_ip_, + options_.server_port_, /*sharding=*/true, /*password=*/options_.password_, /*enable_ssl=*/options_.enable_ssl_)); diff --git a/src/ray/gcs/redis_client.h b/src/ray/gcs/redis_client.h index ccf7a43b55fb..b2b540886616 100644 --- a/src/ray/gcs/redis_client.h +++ b/src/ray/gcs/redis_client.h @@ -108,8 +108,6 @@ class RedisClient { std::unique_ptr asio_async_auxiliary_client_; // The following context writes everything to the primary shard std::shared_ptr primary_context_; - std::string leader_ip_; - int32_t leader_port_ = 0; }; } // namespace gcs diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index 63be3bde450f..361d40021744 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -26,6 +26,7 @@ extern "C" { } // TODO(pcm): Integrate into the C++ tree. +#include "absl/strings/str_split.h" #include "ray/common/ray_config.h" namespace { @@ -269,7 +270,9 @@ RedisContext::RedisContext(instrumented_io_context &io_service) << redisSSLContextGetError(ssl_error); } -RedisContext::~RedisContext() { +RedisContext::~RedisContext() { Disconnect(); } + +void RedisContext::Disconnect() { if (context_) { redisFree(context_); context_ = nullptr; @@ -278,6 +281,7 @@ RedisContext::~RedisContext() { redisFreeSSLContext(ssl_context_); ssl_context_ = nullptr; } + redis_async_context_.reset(); } Status AuthenticateRedis(redisContext *context, const std::string &password) { @@ -384,15 +388,92 @@ Status RedisContext::PingPort(const std::string &address, int port) { address, port, redisConnect, static_cast(nullptr)); } +void ValidateRedisDB(RedisContext &context) { + auto reply = context.RunArgvSync(std::vector{"INFO", "CLUSTER"}); + // cluster_state:ok + // cluster_slots_assigned:16384 + // cluster_slots_ok:16384 + // cluster_slots_pfail:0 + // cluster_size:1 + RAY_CHECK(reply && !reply->IsNil()) << "Failed to get Redis cluster info"; + auto cluster_info = reply->ReadAsString(); + + std::vector parts = absl::StrSplit(cluster_info, "\r\n"); + bool cluster_mode = false; + int cluster_size = 0; + + // Check the cluster status first + for (const auto &part : parts) { + if (part.empty() || part[0] == '#') { + // it's a comment + continue; + } + std::vector kv = absl::StrSplit(part, ":"); + RAY_CHECK(kv.size() == 2); + if (kv[0] == "cluster_state") { + if (kv[1] == "ok") { + cluster_mode = true; + } else if (kv[1] == "fail") { + RAY_LOG(FATAL) + << "The Redis cluster is not healthy. cluster_state shows failed status: " + << cluster_info << "." + << " Please check Redis cluster used."; + } + } + if (kv[0] == "cluster_size") { + cluster_size = std::stoi(kv[1]); + } + } + + if (cluster_mode) { + RAY_CHECK(cluster_size == 1) + << "Ray currently doesn't support Redis Cluster with more than one shard. 
"; + } +} + +std::vector ResolveDNS(const std::string &address, int port) { + using namespace boost::asio; + io_context ctx; + ip::tcp::resolver resolver(ctx); + ip::tcp::resolver::iterator iter = resolver.resolve(address, std::to_string(port)); + ip::tcp::resolver::iterator end; + std::vector ip_addresses; + while (iter != end) { + ip::tcp::endpoint endpoint = *iter++; + ip_addresses.push_back(endpoint.address().to_string()); + } + return ip_addresses; +} + Status RedisContext::Connect(const std::string &address, int port, bool sharding, const std::string &password, bool enable_ssl) { + // Connect to the leader of the Redis cluster: + // 1. Resolve the ip address from domain name. + // It might return multiple ip addresses + // 2. Connect to the first ip address. + // 3. Validate the Redis cluster to make sure it's configured in the way + // Ray accept: + // - If it's cluster mode redis, only 1 shard in the cluster. + // - Make sure the cluster is healthy. + // 4. Send a dummy delete and check the return. + // - If return OK, connection is finished. + // - Otherwise, make sure it's MOVED error. And we'll get the leader + // address from the error message. Re-run this function with the + // right leader address. + RAY_CHECK(!context_); RAY_CHECK(!redis_async_context_); + // Fetch the ip address from the address. It might return multiple + // addresses and only the first one will be used. + auto ip_addresses = ResolveDNS(address, port); + RAY_CHECK(!ip_addresses.empty()) + << "Failed to resolve DNS for " << address << ":" << port; + + RAY_CHECK_OK(ConnectWithRetries(ip_addresses[0], port, redisConnect, &context_)); - RAY_CHECK_OK(ConnectWithRetries(address, port, redisConnect, &context_)); if (enable_ssl) { RAY_CHECK(ssl_context_ != nullptr); RAY_CHECK(redisInitiateSSLWithContext(context_, ssl_context_) == REDIS_OK) @@ -412,6 +493,39 @@ Status RedisContext::Connect(const std::string &address, redis_async_context_.reset(new RedisAsyncContext(async_context)); SetDisconnectCallback(redis_async_context_.get()); + // Ray has some restrictions for RedisDB. Validate it here. + ValidateRedisDB(*this); + + // Find the true leader + std::vector argv; + std::vector argc; + std::vector cmds = {"DEL", "DUMMY"}; + for (const auto &arg : cmds) { + argv.push_back(arg.data()); + argc.push_back(arg.size()); + } + + auto redis_reply = reinterpret_cast( + ::redisCommandArgv(context_, cmds.size(), argv.data(), argc.data())); + + if (redis_reply->type == REDIS_REPLY_ERROR) { + // This should be a MOVED error + // MOVED 14946 10.xx.xx.xx:7001 + std::string error_msg(redis_reply->str, redis_reply->len); + freeReplyObject(redis_reply); + std::vector parts = absl::StrSplit(error_msg, " "); + RAY_CHECK(parts[0] == "MOVED" && parts.size() == 3) + << "Setup Redis cluster failed in the dummy deletion: " << error_msg; + std::vector ip_port = absl::StrSplit(parts[2], ":"); + RAY_CHECK(ip_port.size() == 2); + + Disconnect(); + // Connect to the true leader. 
+    return Connect(ip_port[0], std::stoi(ip_port[1]), sharding, password, enable_ssl);
+  } else {
+    freeReplyObject(redis_reply);
+  }
+
   return Status::OK();
 }
 
@@ -460,8 +574,6 @@ Status RedisContext::RunArgvAsync(const std::vector<std::string> &args,
 
 void RedisContext::FreeRedisReply(void *reply) { return freeReplyObject(reply); }
 
-int RedisContext::GetRedisError(redisContext *context) { return context->err; }
-
 }  // namespace gcs
 }  // namespace ray
diff --git a/src/ray/gcs/redis_context.h b/src/ray/gcs/redis_context.h
index f3fc9328a1f1..0287d1ef7b79 100644
--- a/src/ray/gcs/redis_context.h
+++ b/src/ray/gcs/redis_context.h
@@ -89,6 +89,9 @@ class CallbackReply {
   /// Reply data if reply_type_ is REDIS_REPLY_STRING.
   std::string string_reply_;
 
+  /// Reply data if reply_type_ is REDIS_REPLY_ERROR.
+  std::string error_reply_;
+
   /// Reply data if reply_type_ is REDIS_REPLY_ARRAY.
   /// Represent the reply of StringArray or ScanArray.
   std::vector<std::optional<std::string>> string_array_reply_;
@@ -178,6 +181,9 @@ class RedisContext {
                  const std::string &password,
                  bool enable_ssl = false);
 
+  /// Disconnect from the server.
+  void Disconnect();
+
   /// Run an arbitrary Redis command synchronously.
   ///
   /// \param args The vector of command args to pass to Redis.
@@ -204,6 +210,8 @@ class RedisContext {
 
   instrumented_io_context &io_service() { return io_service_; }
 
+  std::pair<std::string, int> GetLeaderAddress();
+
  private:
   // These functions avoid problems with dependence on hiredis headers with clang-cl.
   static int GetRedisError(redisContext *context);
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index a8b8a1a7ddd9..9ce5dcc963cb 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -541,6 +541,36 @@ ray::Status NodeManager::RegisterGcs() {
         RayConfig::instance().raylet_check_gc_period_milliseconds(),
         "NodeManager.CheckGC");
   }
+  // The raylet periodically checks whether GCS still considers it alive.
+  // In failure cases GCS might think this raylet is dead while the raylet
+  // still thinks it is alive. This can happen when the cluster setup is wrong,
+  // for example, when there is data loss in the DB.
+  periodical_runner_.RunFnPeriodically(
+      [this] {
+        // Flag indicating whether a check request is already in flight.
+        static bool checking = false;
+        if (checking) {
+          return;
+        }
+        checking = true;
+        RAY_CHECK_OK(gcs_client_->Nodes().AsyncCheckSelfAlive(
+            // Capture a pointer to `checking` here because vs17 fails to compile
+            // otherwise.
+            [checking_ptr = &checking](auto status, auto alive) mutable {
+              if (status.ok()) {
+                if (!alive) {
+                  // GCS thinks this raylet is dead. Fail the node.
+                  RAY_LOG(FATAL)
+                      << "GCS considers this node to be dead. This may happen when "
+                      << "GCS is not backed by a DB and was restarted, or when there "
+                      << "is data loss in the DB.";
+                }
+                *checking_ptr = false;
+              }
+            },
+            /* timeout_ms = */ 30000));
+      },
+      RayConfig::instance().raylet_liveness_self_check_interval_ms(),
+      "NodeManager.GcsCheckAlive");
   return ray::Status::OK();
 }
 
@@ -1905,10 +1935,6 @@ void NodeManager::HandleCommitBundleResources(
   RAY_LOG(DEBUG) << "Request to commit resources for bundles: "
                  << GetDebugStringForBundles(bundle_specs);
   placement_group_resource_manager_->CommitBundles(bundle_specs);
-  if (RayConfig::instance().use_ray_syncer()) {
-    // To reduce the lag, we trigger a broadcasting immediately.
-    RAY_CHECK(ray_syncer_.OnDemandBroadcasting(syncer::MessageType::RESOURCE_VIEW));
-  }
   send_reply_callback(Status::OK(), nullptr, nullptr);
 
   cluster_task_manager_->ScheduleAndDispatchTasks();
@@ -1949,10 +1975,6 @@ void NodeManager::HandleCancelResourceReserve(
   // Return bundle resources.
   placement_group_resource_manager_->ReturnBundle(bundle_spec);
-  if (RayConfig::instance().use_ray_syncer()) {
-    // To reduce the lag, we trigger a broadcasting immediately.
-    RAY_CHECK(ray_syncer_.OnDemandBroadcasting(syncer::MessageType::RESOURCE_VIEW));
-  }
   cluster_task_manager_->ScheduleAndDispatchTasks();
   send_reply_callback(Status::OK(), nullptr, nullptr);
 }
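The periodic self-alive check added to NodeManager::RegisterGcs() above guards against overlapping requests: a tick is skipped while a previous CheckAlive request is still in flight, and the flag is cleared from the callback. The following standalone sketch illustrates only that guard pattern; it is not Ray code, the async call is simulated with a detached thread, and the names `SimulatedAsyncCheckAlive` and `PeriodicTick` are hypothetical.

.. code-block:: cpp

    // Minimal sketch of the "skip the tick while a check is in flight" guard.
    // Not Ray code: the async check is simulated with a detached thread, and a
    // std::atomic flag is used so the sketch stays safe without an io_context.
    #include <atomic>
    #include <chrono>
    #include <cstdlib>
    #include <functional>
    #include <iostream>
    #include <thread>

    std::atomic<bool> checking{false};

    // Simulated asynchronous liveness check; reports back after a delay.
    void SimulatedAsyncCheckAlive(std::function<void(bool)> callback) {
      std::thread([cb = std::move(callback)] {
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
        cb(/*alive=*/true);
      }).detach();
    }

    void PeriodicTick() {
      // Only one check may be outstanding at a time; extra ticks are dropped.
      if (checking.exchange(true)) {
        return;
      }
      SimulatedAsyncCheckAlive([](bool alive) {
        if (!alive) {
          std::cerr << "GCS no longer considers this node alive; failing." << std::endl;
          std::abort();
        }
        checking.store(false);
      });
    }

    int main() {
      // Tick faster than a check completes; the guard drops the overlapping ones.
      for (int i = 0; i < 10; ++i) {
        PeriodicTick();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
      }
      // Give the last in-flight check time to finish before exiting.
      std::this_thread::sleep_for(std::chrono::milliseconds(500));
      return 0;
    }

The change itself uses a plain static bool captured by pointer, which is sufficient if the callback is posted back onto the same event loop as the periodic runner; the sketch uses std::atomic only so it stays correct without that assumption.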
diff --git a/src/ray/util/process.cc b/src/ray/util/process.cc
index b3a89acde266..a3e8d6bb6055 100644
--- a/src/ray/util/process.cc
+++ b/src/ray/util/process.cc
@@ -636,6 +636,87 @@ bool IsProcessAlive(pid_t pid) {
 #endif
 }
 
+#if defined(__linux__)
+static inline std::error_code KillProcLinux(pid_t pid) {
+  std::error_code error;
+  if (kill(pid, SIGKILL) != 0) {
+    error = std::error_code(errno, std::system_category());
+  }
+  return error;
+}
+#endif
+
+std::optional<std::error_code> KillProc(pid_t pid) {
+#if defined(__linux__)
+  return {KillProcLinux(pid)};
+#else
+  return std::nullopt;
+#endif
+}
+
+#if defined(__linux__)
+static inline std::vector<pid_t> GetAllProcsWithPpidLinux(pid_t parent_pid) {
+  std::vector<pid_t> child_pids;
+
+  // Iterate over all files in the /proc directory, looking for directories.
+  // See `man proc` for information on the directory structure.
+  // Directories with only digits in their name correspond to processes in the process
+  // table. We read in the status of each such process and parse the parent PID. If the
+  // process parent PID is equal to parent_pid, then we add it to the vector to be
+  // returned. Ideally, we would use a library for this, but at the time of writing one
+  // is not available in Ray C++.
+
+  std::filesystem::directory_iterator dir(kProcDirectory);
+  for (const auto &file : dir) {
+    if (!file.is_directory()) {
+      continue;
+    }
+
+    // Determine if the directory name consists of only digits (meaning it's a PID).
+    const auto filename = file.path().filename().string();
+    bool file_name_is_only_digit =
+        std::all_of(filename.begin(), filename.end(), ::isdigit);
+    if (!file_name_is_only_digit) {
+      continue;
+    }
+
+    // If so, open the status file for reading.
+    pid_t pid = std::stoi(filename);
+    std::ifstream status_file(file.path() / "status");
+    if (!status_file.is_open()) {
+      continue;
+    }
+
+    // Scan for the line that starts with the PPid key.
+    std::string line;
+    const std::string key = "PPid:";
+    while (std::getline(status_file, line)) {
+      const auto substr = line.substr(0, key.size());
+      if (substr != key) {
+        continue;
+      }
+
+      // We found it; read and parse the PPID.
+      pid_t ppid = std::stoi(line.substr(substr.size()));
+      if (ppid == parent_pid) {
+        child_pids.push_back(pid);
+      }
+      break;
+    }
+  }
+
+  return child_pids;
+}
+#endif
+
+std::optional<std::vector<pid_t>> GetAllProcsWithPpid(pid_t parent_pid) {
+#if defined(__linux__)
+  return {GetAllProcsWithPpidLinux(parent_pid)};
+#else
+  return std::nullopt;
+#endif
+}
+
 }  // namespace ray
 
 namespace std {
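Taken together, the two helpers added to process.cc above allow a caller to enumerate and reap the immediate children of a process on Linux. The harness below is illustrative only and not part of this change; it assumes the declarations from ray/util/process.h shown in the next hunk and simply sends SIGKILL to every direct child of the calling process.

.. code-block:: cpp

    // Illustrative only: enumerate all immediate children of this process and
    // SIGKILL them using the helpers added above. On non-Linux platforms both
    // helpers return std::nullopt, so the harness is a no-op there.
    #include <unistd.h>

    #include <iostream>

    #include "ray/util/process.h"

    int main() {
      auto children = ray::GetAllProcsWithPpid(getpid());
      if (!children) {
        std::cerr << "Child enumeration is only supported on Linux." << std::endl;
        return 0;
      }
      for (pid_t child_pid : *children) {
        auto error = ray::KillProc(child_pid);
        // A set error_code means the kill(2) call failed (e.g. the child already exited).
        if (error && *error) {
          std::cerr << "Failed to kill " << child_pid << ": " << error->message()
                    << std::endl;
        }
      }
      return 0;
    }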
diff --git a/src/ray/util/process.h b/src/ray/util/process.h
index 2e79df06d2bf..d5e08d009d2a 100644
--- a/src/ray/util/process.h
+++ b/src/ray/util/process.h
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include <optional>
 #include
 #include
 #include
@@ -120,6 +121,20 @@ bool IsParentProcessAlive();
 
 bool IsProcessAlive(pid_t pid);
 
+static constexpr char kProcDirectory[] = "/proc";
+
+// Platform-specific kill for the specified process identifier.
+// Currently only supported on Linux. Returns nullopt for other platforms.
+std::optional<std::error_code> KillProc(pid_t pid);
+
+// Platform-specific utility to find the process IDs of all processes
+// that have the specified parent_pid as their parent.
+// In other words, find all immediate children of the specified process
+// id.
+//
+// Currently only supported on Linux. Returns nullopt on other platforms.
+std::optional<std::vector<pid_t>> GetAllProcsWithPpid(pid_t parent_pid);
+
 }  // namespace ray
 
 // We only define operators required by the standard library (==, hash):
diff --git a/src/ray/util/util_test.cc b/src/ray/util/util_test.cc
index c309e40c550a..9be4bf110c95 100644
--- a/src/ray/util/util_test.cc
+++ b/src/ray/util/util_test.cc
@@ -21,6 +21,7 @@
 #include
 #include
+#include "gmock/gmock.h"
 #include "gtest/gtest.h"
 #include "ray/util/logging.h"
 #include "ray/util/process.h"
@@ -205,6 +206,41 @@ TEST(UtilTest, IsProcessAlive) {
   RAY_CHECK(!IsProcessAlive(pid));
 }
 
+TEST(UtilTest, GetAllProcsWithPpid) {
+#if defined(__linux__)
+  // Verify correctness by spawning several child processes,
+  // then asserting that each PID is present in the output.
+
+  namespace bp = boost::process;
+
+  std::vector<bp::child> actual_child_procs;
+
+  for (int i = 0; i < 10; ++i) {
+    actual_child_procs.push_back(bp::child("bash"));
+  }
+
+  std::optional<std::vector<pid_t>> maybe_child_procs = GetAllProcsWithPpid(GetPID());
+
+  // Assert that the optional has a value.
+  ASSERT_EQ(static_cast<bool>(maybe_child_procs), true);
+
+  // Assert that each actual process ID is contained in the returned vector.
+  auto child_procs = *maybe_child_procs;
+  for (auto &child_proc : actual_child_procs) {
+    pid_t pid = child_proc.id();
+    EXPECT_THAT(child_procs, ::testing::Contains(pid));
+  }
+
+  // Clean up each child proc.
+  for (auto &child_proc : actual_child_procs) {
+    child_proc.join();
+  }
+#else
+  auto result = GetAllProcsWithPpid(1);
+  ASSERT_EQ(result, std::nullopt);
+#endif
+}
+
 }  // namespace ray
 
 int main(int argc, char **argv) {
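A natural companion to the GetAllProcsWithPpid test above would exercise KillProc end to end. The sketch below is hypothetical and not part of this diff; it combines the new helper with the existing IsProcessAlive utility by forking a child, killing it, reaping it, and asserting it is gone.

.. code-block:: cpp

    // Hypothetical additional test, not part of this change: forks a child that
    // blocks in pause(), kills it with KillProc, reaps it, and checks that
    // IsProcessAlive now reports it as dead.
    #include <sys/types.h>
    #include <sys/wait.h>
    #include <unistd.h>

    #include "gtest/gtest.h"
    #include "ray/util/process.h"

    namespace ray {

    TEST(UtilTest, KillProcKillsChild) {
    #if defined(__linux__)
      pid_t pid = fork();
      ASSERT_NE(pid, -1);
      if (pid == 0) {
        // Child: block until a signal arrives (SIGKILL will terminate us).
        pause();
        _exit(0);
      }

      ASSERT_TRUE(IsProcessAlive(pid));

      auto error = KillProc(pid);
      ASSERT_TRUE(error.has_value());
      ASSERT_FALSE(*error);  // An empty error_code means SIGKILL was delivered.

      // Reap the child so it does not linger as a zombie, then re-check.
      waitpid(pid, nullptr, 0);
      ASSERT_FALSE(IsProcessAlive(pid));
    #else
      ASSERT_EQ(KillProc(1), std::nullopt);
    #endif
    }

    }  // namespace ray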