From bb87ad6cf0915fcf439932f1adbc568dea141f37 Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Tue, 7 Jun 2022 14:16:51 -0700 Subject: [PATCH 1/9] annotate everything --- ci/lint/check_api_annotations.py | 2 +- python/ray/air/train/data_parallel_trainer.py | 4 +- python/ray/train/__init__.py | 2 +- python/ray/train/_internal/__init__.py | 0 .../ray/train/{ => _internal}/accelerator.py | 0 .../ray/train/_internal/backend_executor.py | 557 ++++++++++++++++ .../checkpoint.py} | 4 +- .../results_preprocessors/__init__.py | 8 +- .../aggregate/__init__.py | 2 +- .../aggregate/aggregate_fn.py | 2 +- .../aggregate/aggregate_preprocessor.py | 11 +- .../aggregate/aggregate_utils.py | 0 .../results_preprocessors/index.py | 5 +- .../results_preprocessors/keys.py | 5 +- .../results_preprocessors/preprocessor.py | 4 - python/ray/train/{ => _internal}/session.py | 259 +------- python/ray/train/{ => _internal}/utils.py | 0 .../ray/train/{ => _internal}/worker_group.py | 0 python/ray/train/backend.py | 555 +--------------- python/ray/train/callbacks/callback.py | 5 +- python/ray/train/callbacks/logging.py | 19 +- python/ray/train/callbacks/print.py | 3 +- python/ray/train/callbacks/profile.py | 9 +- python/ray/train/horovod/__init__.py | 12 + .../train/{horovod.py => horovod/config.py} | 93 ++- python/ray/train/tensorflow/__init__.py | 12 + .../{tensorflow.py => tensorflow/config.py} | 40 +- .../ray/train/tensorflow/train_loop_utils.py | 24 + python/ray/train/tests/test_backend.py | 8 +- python/ray/train/tests/test_callbacks.py | 6 +- python/ray/train/tests/test_minimal.py | 2 +- .../train/tests/test_results_preprocessors.py | 2 +- python/ray/train/tests/test_session.py | 2 +- python/ray/train/tests/test_trainer.py | 5 +- python/ray/train/tests/test_tune.py | 2 +- python/ray/train/tests/test_utils.py | 2 +- python/ray/train/tests/test_worker_group.py | 2 +- python/ray/train/torch/__init__.py | 28 + python/ray/train/torch/config.py | 180 ++++++ .../{torch.py => torch/train_loop_utils.py} | 598 +++++++----------- python/ray/train/train_loop_utils.py | 263 ++++++++ python/ray/train/trainer.py | 24 +- 42 files changed, 1412 insertions(+), 1349 deletions(-) create mode 100644 python/ray/train/_internal/__init__.py rename python/ray/train/{ => _internal}/accelerator.py (100%) create mode 100644 python/ray/train/_internal/backend_executor.py rename python/ray/train/{_checkpoint.py => _internal/checkpoint.py} (98%) rename python/ray/train/{callbacks => _internal}/results_preprocessors/__init__.py (65%) rename python/ray/train/{callbacks => _internal}/results_preprocessors/aggregate/__init__.py (79%) rename python/ray/train/{callbacks => _internal}/results_preprocessors/aggregate/aggregate_fn.py (97%) rename python/ray/train/{callbacks => _internal}/results_preprocessors/aggregate/aggregate_preprocessor.py (95%) rename python/ray/train/{callbacks => _internal}/results_preprocessors/aggregate/aggregate_utils.py (100%) rename python/ray/train/{callbacks => _internal}/results_preprocessors/index.py (90%) rename python/ray/train/{callbacks => _internal}/results_preprocessors/keys.py (86%) rename python/ray/train/{callbacks => _internal}/results_preprocessors/preprocessor.py (95%) rename python/ray/train/{ => _internal}/session.py (56%) rename python/ray/train/{ => _internal}/utils.py (100%) rename python/ray/train/{ => _internal}/worker_group.py (100%) create mode 100644 python/ray/train/horovod/__init__.py rename python/ray/train/{horovod.py => horovod/config.py} (84%) create mode 100644 
python/ray/train/tensorflow/__init__.py rename python/ray/train/{tensorflow.py => tensorflow/config.py} (51%) create mode 100644 python/ray/train/tensorflow/train_loop_utils.py create mode 100644 python/ray/train/torch/__init__.py create mode 100644 python/ray/train/torch/config.py rename python/ray/train/{torch.py => torch/train_loop_utils.py} (75%) create mode 100644 python/ray/train/train_loop_utils.py diff --git a/ci/lint/check_api_annotations.py b/ci/lint/check_api_annotations.py index a827d0ba74c8..58bf3b8a12ed 100755 --- a/ci/lint/check_api_annotations.py +++ b/ci/lint/check_api_annotations.py @@ -88,8 +88,8 @@ def verify(symbol, scanned, ok, output, prefix=None): verify(ray.rllib, set(), ok, output) verify(ray.air, set(), ok, output) + verify(ray.train, set(), ok, output) # TODO(ekl) enable it for all modules. - # verify(ray.train, set(), ok, output) # verify(ray.serve, set(), ok, output) # verify(ray.tune, set(), ok, output) # verify(ray, set(), ok, output) diff --git a/python/ray/air/train/data_parallel_trainer.py b/python/ray/air/train/data_parallel_trainer.py index 89fe7ed014d3..c844bdae2ce9 100644 --- a/python/ray/air/train/data_parallel_trainer.py +++ b/python/ray/air/train/data_parallel_trainer.py @@ -26,8 +26,8 @@ from ray.air.checkpoint import Checkpoint from ray.air.train.data_parallel_ingest import _DataParallelIngestSpec from ray.train import BackendConfig, TrainingIterator -from ray.train.backend import BackendExecutor -from ray.train._checkpoint import TuneCheckpointManager +from ray.train._internal.backend_executor import BackendExecutor +from ray.train._internal.checkpoint import TuneCheckpointManager from ray.train.utils import construct_train_func from ray.util.annotations import DeveloperAPI from ray.util.ml_utils.checkpoint_manager import CheckpointStrategy, _TrackedCheckpoint diff --git a/python/ray/train/__init__.py b/python/ray/train/__init__.py index e84d4561173f..ab8c40b70db7 100644 --- a/python/ray/train/__init__.py +++ b/python/ray/train/__init__.py @@ -1,6 +1,6 @@ from ray.train.backend import BackendConfig from ray.train.callbacks import TrainingCallback -from ray.train.session import ( +from ray.train.train_loop_utils import ( get_dataset_shard, local_rank, load_checkpoint, diff --git a/python/ray/train/_internal/__init__.py b/python/ray/train/_internal/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/python/ray/train/accelerator.py b/python/ray/train/_internal/accelerator.py similarity index 100% rename from python/ray/train/accelerator.py rename to python/ray/train/_internal/accelerator.py diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py new file mode 100644 index 000000000000..55bc688378be --- /dev/null +++ b/python/ray/train/_internal/backend_executor.py @@ -0,0 +1,557 @@ +import logging +import os +from collections import defaultdict +from typing import Callable, List, Optional, Dict, Type, Tuple, TypeVar + +import ray +from ray.exceptions import RayActorError +from ray.ray_constants import env_integer +from ray.train.constants import ( + ENABLE_DETAILED_AUTOFILLED_METRICS_ENV, + ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, + TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV, + TRAIN_ENABLE_WORKER_SPREAD_ENV, +) +from ray.train.backend import BackendConfig +from ray.train.impl.dataset_spec import _RayDatasetSpec +from ray.train._internal.session import TrainingResult +from ray.train._internal.session import init_session, get_session, shutdown_session +from 
ray.train._internal.utils import check_for_failure +from ray.train._internal.worker_group import WorkerGroup +from ray.util.placement_group import get_current_placement_group, remove_placement_group + +T = TypeVar("T") + +logger = logging.getLogger(__name__) + + +class TrainBackendError(Exception): + """Errors with BackendExecutor that should not be exposed to user.""" + + +class TrainingWorkerError(Exception): + """Raised if a worker fails during training.""" + + +class BackendExecutor: + """Main execution class for training backends. + + This class holds a worker group and is responsible for executing the + training function on the workers, and collecting intermediate results + from ``train.report()`` and ``train.checkpoint()``. + + Args: + backend_config: The configurations for this + specific backend. + num_workers: Number of workers to use for training. + num_cpus_per_worker: Number of CPUs to use per worker. + num_gpus_per_worker: Number of GPUs to use per worker. + additional_resources_per_worker (Optional[Dict[str, float]]): + Dictionary specifying the extra resources that will be + requested for each worker in addition to ``num_cpus_per_worker`` + and ``num_gpus_per_worker``. + max_retries: Number of retries when Ray actors fail. + Defaults to 3. Set to -1 for unlimited retries. + """ + + def __init__( + self, + backend_config: BackendConfig, + num_workers: int = 1, + num_cpus_per_worker: float = 1, + num_gpus_per_worker: float = 0, + additional_resources_per_worker: Optional[Dict[str, float]] = None, + max_retries: int = 3, + ): + self._backend_config = backend_config + self._backend = backend_config.backend_cls() + self._num_workers = num_workers + self._num_cpus_per_worker = num_cpus_per_worker + self._num_gpus_per_worker = num_gpus_per_worker + self._additional_resources_per_worker = additional_resources_per_worker + self._max_failures = max_retries + if self._max_failures < 0: + self._max_failures = float("inf") + self._num_failures = 0 + self._initialization_hook = None + self._placement_group = None + + self.worker_group = InactiveWorkerGroup() + self.dataset_shards = None + + def start( + self, + initialization_hook: Optional[Callable[[], None]] = None, + train_cls: Optional[Type] = None, + train_cls_args: Optional[Tuple] = None, + train_cls_kwargs: Optional[Dict] = None, + ): + """Starts the worker group.""" + self._create_placement_group() + placement_group = self._placement_group or "default" + self.worker_group = WorkerGroup( + num_workers=self._num_workers, + num_cpus_per_worker=self._num_cpus_per_worker, + num_gpus_per_worker=self._num_gpus_per_worker, + additional_resources_per_worker=self._additional_resources_per_worker, + actor_cls=train_cls, + actor_cls_args=train_cls_args, + actor_cls_kwargs=train_cls_kwargs, + placement_group=placement_group, + ) + try: + if initialization_hook: + self._initialization_hook = initialization_hook + self.worker_group.execute(initialization_hook) + + share_cuda_visible_devices_enabled = bool( + env_integer( + ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, + self._backend.share_cuda_visible_devices, + ) + ) + + if self._num_gpus_per_worker > 0 and share_cuda_visible_devices_enabled: + self._share_cuda_visible_devices() + self._backend.on_start(self.worker_group, self._backend_config) + except RayActorError as exc: + logger.exception(str(exc)) + logger.warning( + "Failure occurred during startup. Restarting all workers and " + "attempting to startup again." 
+ ) + self._increment_failures() + self._restart() + + def _create_placement_group(self): + """Creates a placement group if it does not exist. + + If a placement group is already detected (Tune) this will be a no-op. + + By default the placement group will be created with PACK strategy. + This is optimized for colocating GPUs on a minimal number of nodes. + This behavior can be overridden to use the SPREAD strategy by defining + ``TRAIN_ENABLE_WORKER_SPREAD_ENV`` + + If a placement group is created it will be stored as + self._placement_group. + """ + current_placement_group = get_current_placement_group() + should_capture_child_tasks_in_placement_group = ( + ray.worker.global_worker.should_capture_child_tasks_in_placement_group + ) + should_create_placement_group = ( + current_placement_group is None + or not should_capture_child_tasks_in_placement_group + ) + + if should_create_placement_group: + additional_resources_per_worker = ( + self._additional_resources_per_worker or {} + ) + bundle = { + "CPU": self._num_cpus_per_worker, + "GPU": self._num_gpus_per_worker, + **additional_resources_per_worker, + } + bundles = [bundle.copy() for _ in range(self._num_workers)] + + use_spread = bool(env_integer(TRAIN_ENABLE_WORKER_SPREAD_ENV, 0)) + strategy = "SPREAD" if use_spread else "PACK" + + placement_group = ray.util.placement_group(bundles, strategy=strategy) + logger.debug("Waiting for placement group to start.") + timeout = env_integer(TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV, 100) + ready, _ = ray.wait([placement_group.ready()], timeout=timeout) + if ready: + logger.debug("Placement group has started.") + else: + raise TimeoutError( + "Placement group creation timed out. Make sure your " + "cluster either has enough resources or use an " + "autoscaling cluster. If you are running on a cluster, " + "make sure you specify an address in `ray.init()`, for example, " + '`ray.init("auto")`. You can also increase the timeout by setting ' + "the TRAIN_PLACEMENT_GROUP_TIMEOUT_S environment variable. " + "Current resources available: {}, resources requested by the " + "placement group: {}".format( + ray.available_resources(), placement_group.bundle_specs + ) + ) + self._placement_group = placement_group + + def _share_cuda_visible_devices(self): + """Sets CUDA_VISIBLE_DEVICES on all workers. + + For each worker, CUDA_VISIBLE_DEVICES will be set to the GPU IDs + visible to all workers on that worker's node. + + This allows GPU workers on the same node to communicate with one + another. + + Example: + + Setup: + - Node1: + - Worker1: {0, 1} + - Worker2: {2, 3} + - Node2: + - Worker3: {0, 1} + + CUDA_VISIBLE_DEVICES: + - Worker1: "0,1,2,3" + - Worker2: "0,1,2,3" + - Worker2: "0,1" + + """ + + node_ids_and_gpu_ids = [ + (w.metadata.node_id, w.metadata.gpu_ids) for w in self.worker_group.workers + ] + + node_id_to_worker_id = defaultdict(set) + node_id_to_gpu_ids = defaultdict(set) + + for worker_id, (node_id, gpu_ids) in enumerate(node_ids_and_gpu_ids): + node_id_to_worker_id[node_id].add(worker_id) + node_id_to_gpu_ids[node_id].update(gpu_ids) + + futures = [] + for node_id, gpu_ids in node_id_to_gpu_ids.items(): + all_gpu_ids = ",".join([str(gpu_id) for gpu_id in gpu_ids]) + + def set_gpu_ids(): + os.environ["CUDA_VISIBLE_DEVICES"] = all_gpu_ids + + for worker_id in node_id_to_worker_id[node_id]: + futures.append( + self.worker_group.execute_single_async(worker_id, set_gpu_ids) + ) + ray.get(futures) + + def _create_local_rank_map(self) -> Dict: + """Create mapping from worker world_rank to local_rank. 
+ + Example: + Worker 0: 0.0.0.0 + Worker 1: 0.0.0.0 + Worker 2: 0.0.0.1 + Worker 3: 0.0.0.0 + Worker 4: 0.0.0.1 + + Workers 0, 1, 3 are on 0.0.0.0. + Workers 2, 4 are on 0.0.0.1. + + Expected Output: + { + 0 -> 0, + 1 -> 1, + 2 -> 0, + 3 -> 2, + 4 -> 1 + } + """ + rank_mapping = {} + ip_dict = defaultdict(int) + for world_rank in range(len(self.worker_group)): + worker = self.worker_group.workers[world_rank] + node_ip = worker.metadata.node_ip + rank_mapping[world_rank] = ip_dict[node_ip] + ip_dict[node_ip] += 1 + return rank_mapping + + def start_training( + self, + train_func: Callable[[], T], + dataset_spec: _RayDatasetSpec, + checkpoint: Optional[Dict] = None, + ) -> None: + """Executes a training function on all workers in a separate thread. + + ``finish_training`` should be called after this. + + Args: + train_func: The training function to run on each worker. + dataset_spec: A specification for the Ray Dataset to be + passed to the training workers, and the logic on how to shard the Ray + Dataset. + checkpoint: The checkpoint data that + should be loaded onto each worker and accessed by the + training function via ``train.load_checkpoint()``. If this + is ``None`` then no checkpoint will be loaded. + """ + use_detailed_autofilled_metrics = env_integer( + ENABLE_DETAILED_AUTOFILLED_METRICS_ENV, 0 + ) + + # First initialize the session. + def initialize_session( + train_func, + world_rank, + local_rank, + world_size, + checkpoint, + dataset_shard, + encode_data_fn, + ): + try: + init_session( + training_func=train_func, + world_rank=world_rank, + local_rank=local_rank, + world_size=world_size, + dataset_shard=dataset_shard, + checkpoint=checkpoint, + encode_data_fn=encode_data_fn, + detailed_autofilled_metrics=use_detailed_autofilled_metrics, + ) + except ValueError: + raise TrainBackendError( + "Attempting to start training but a " + "previous training run is still ongoing. " + "You must call `finish_training` before " + "calling `start_training` again." + ) + + if self.dataset_shards is None: + actors = [worker.actor for worker in self.worker_group.workers] + self.dataset_shards = dataset_spec.get_dataset_shards(actors) + + local_rank_map = self._create_local_rank_map() + + futures = [] + for index in range(len(self.worker_group)): + futures.append( + self.worker_group.execute_single_async( + index, + initialize_session, + world_rank=index, + local_rank=local_rank_map[index], + world_size=len(self.worker_group), + train_func=train_func, + dataset_shard=self.dataset_shards[index], + checkpoint=checkpoint, + encode_data_fn=self._backend.encode_data, + ) + ) + + self.get_with_failure_handling(futures) + + # Run the training function asynchronously in its own thread. + def train_async(): + session = get_session() + session.start() + + self.worker_group.execute_async(train_async) + + def get_next_results(self) -> Optional[List[TrainingResult]]: + """Fetches the next ``TrainingResult`` from each worker. + + Each ``TrainingResult`` is expected to correspond to the same step from + each worker (e.g. the same call to ``train.report()`` or + ``train.checkpoint()``). + + Returns: + A list of ``TrainingResult``s with the same + ``TrainingResultType``, or ``None`` if there are no more results. + """ + + def get_next(): + session = _get_session("get_next_results") + try: + result = session.get_next() + except RuntimeError: + # Training thread has not been started yet. + raise TrainBackendError( + "`get_next_results` has been called " + "before `start_training`. 
Please call " + "`start_training` before " + "`get_next_results`." + ) + + return result + + # Get next result from each worker. + futures = self.worker_group.execute_async(get_next) + results = self.get_with_failure_handling(futures) + + # Check if any worker returned None. + if any(r is None for r in results): + # Either all workers have results or none of them do. + if not all(r is None for r in results): + raise RuntimeError( + "Some workers returned results while " + "others didn't. Make sure that " + "`train.report()` and `train.save_checkpoint()` " + "are called the same number of times on all " + "workers." + ) + else: + # Return None if all results are None. + return None + first_result = results[0] + result_type = first_result.type + if any(r.type != result_type for r in results): + raise RuntimeError( + "Some workers returned results with " + "different types. Make sure `train.report()` " + "and `train.save_checkpoint()` are called the " + "same number of times and in the same order on " + "each worker." + ) + return results + + def pause_reporting(self): + """Disable workers from enqueuing results from `train.report()`. + + Note: Already reported results may still be enqueued at this point, + and should be handled appropriately. + """ + + def pause_session_reporting(): + session = _get_session("pause_reporting") + return session.pause_reporting() + + futures = self.worker_group.execute_async(pause_session_reporting) + self.get_with_failure_handling(futures) + + def finish_training(self): + """Finish training and return final results. Propagate any exceptions. + + Blocks until training is finished on all workers. + + Assumes `start_training` has already been called. + + Returns: + A list of return values from calling ``train_func`` on each worker. + Each item corresponds to the return value from a single worker. + """ + + def end_training(): + session = _get_session("finish_training") + try: + # session.finish raises any Exceptions from training. + output = session.finish() + finally: + # Shutdown session even if session.finish() raises an + # Exception. + shutdown_session() + + return output + + futures = self.worker_group.execute_async(end_training) + results = self.get_with_failure_handling(futures) + return results + + def get_with_failure_handling(self, remote_values): + """Gets the remote values while handling for worker failures. + + This method should be called instead of ``ray.get()`` directly in + order to handle worker failures. + + If a worker failure is identified, backend specific failure handling + is executed and a ``TrainingWorkerError`` is raised. + + Args: + remote_values: List of object refs representing functions + that may fail in the middle of execution. For example, running + a Train training loop in multiple parallel actor calls. + Returns: + The resolved objects represented by the passed in ObjectRefs. + """ + success = check_for_failure(remote_values) + if success: + return ray.get(remote_values) + else: + self._increment_failures() + logger.warning( + "Failure identified during training. Restarting all workers and " + "continuing training from latest checkpoint." + ) + self._restart() + raise TrainingWorkerError + + def shutdown(self): + """Shuts down the workers in the worker group.""" + try: + self._backend.on_shutdown(self.worker_group, self._backend_config) + except RayActorError: + logger.warning( + "Graceful shutdown of backend failed. This is " + "expected if one of the workers has crashed." 
+ ) + self.worker_group.shutdown() + self.worker_group = InactiveWorkerGroup() + + if self._placement_group: + remove_placement_group(self._placement_group) + self._placement_group = None + + self.dataset_shards = None + + def is_started(self): + return not isinstance(self.worker_group, InactiveWorkerGroup) + + def _restart(self): + self.worker_group.shutdown() + if self._initialization_hook is not None: + initialization_hook = self._initialization_hook + else: + initialization_hook = None + if self._placement_group: + remove_placement_group(self._placement_group) + self._placement_group = None + self.start(initialization_hook=initialization_hook) + + def _increment_failures(self): + self._num_failures += 1 + if self._num_failures >= self._max_failures: + raise RuntimeError( + "Training has failed even after " + f"{self._num_failures} " + "attempts. You can change the number of max " + "failure attempts by setting the " + "`max_retries` arg in your `Trainer`." + ) from None + + def get_worker_group(self): + return self.worker_group + + def _get_num_failures(self): + return self._num_failures + + +class InactiveWorkerGroupError(Exception): + """Raised when underlying worker group is inactive.""" + + +class InactiveWorkerGroup: + # TODO: fix inheritence. perhaps create WorkerGroupInterface. + + # Need to define getstate and setstate so that getattr does not screwup + # pickling. See https://stackoverflow.com/a/50888571/11249691 + def __getstate__(self): + return vars(self) + + def __setstate__(self, state): + vars(self).update(state) + + def __getattr__(self, name): + raise InactiveWorkerGroupError() + + def __len__(self): + raise InactiveWorkerGroupError() + + +def _get_session(method_name: str): + # Get the session for this worker. + session = get_session() + if not session: + # Session is not initialized yet. + raise TrainBackendError( + f"`{method_name}` has been called " + "before `start_training`. Please call " + "`start_training` before " + f"`{method_name}`." 
+ ) + return session diff --git a/python/ray/train/_checkpoint.py b/python/ray/train/_internal/checkpoint.py similarity index 98% rename from python/ray/train/_checkpoint.py rename to python/ray/train/_internal/checkpoint.py index baa8003108fa..c130f28ffd28 100644 --- a/python/ray/train/_checkpoint.py +++ b/python/ray/train/_internal/checkpoint.py @@ -9,8 +9,8 @@ TUNE_CHECKPOINT_ID, TUNE_INSTALLED, ) -from ray.train.session import TrainingResult -from ray.train.utils import construct_path +from ray.train._internal.session import TrainingResult +from ray.train._internal.utils import construct_path from ray.util.ml_utils.checkpoint_manager import ( _CheckpointManager as CommonCheckpointManager, _TrackedCheckpoint, diff --git a/python/ray/train/callbacks/results_preprocessors/__init__.py b/python/ray/train/_internal/results_preprocessors/__init__.py similarity index 65% rename from python/ray/train/callbacks/results_preprocessors/__init__.py rename to python/ray/train/_internal/results_preprocessors/__init__.py index 81bbfb3fb936..4dbdacc64d3c 100644 --- a/python/ray/train/callbacks/results_preprocessors/__init__.py +++ b/python/ray/train/_internal/results_preprocessors/__init__.py @@ -1,13 +1,13 @@ -from ray.train.callbacks.results_preprocessors.index import IndexedResultsPreprocessor -from ray.train.callbacks.results_preprocessors.keys import ( +from ray.train._internal.results_preprocessors.index import IndexedResultsPreprocessor +from ray.train._internal.results_preprocessors.keys import ( ExcludedKeysResultsPreprocessor, ) -from ray.train.callbacks.results_preprocessors.aggregate import ( +from ray.train._internal.results_preprocessors.aggregate import ( AverageResultsPreprocessor, MaxResultsPreprocessor, WeightedAverageResultsPreprocessor, ) -from ray.train.callbacks.results_preprocessors.preprocessor import ( +from ray.train._internal.results_preprocessors.preprocessor import ( SequentialResultsPreprocessor, ResultsPreprocessor, ) diff --git a/python/ray/train/callbacks/results_preprocessors/aggregate/__init__.py b/python/ray/train/_internal/results_preprocessors/aggregate/__init__.py similarity index 79% rename from python/ray/train/callbacks/results_preprocessors/aggregate/__init__.py rename to python/ray/train/_internal/results_preprocessors/aggregate/__init__.py index e3e125b2fd75..7de694ab65c1 100644 --- a/python/ray/train/callbacks/results_preprocessors/aggregate/__init__.py +++ b/python/ray/train/_internal/results_preprocessors/aggregate/__init__.py @@ -1,4 +1,4 @@ -from ray.train.callbacks.results_preprocessors.aggregate.aggregate_preprocessor import ( +from ray.train._internal.results_preprocessors.aggregate.aggregate_preprocessor import ( AverageResultsPreprocessor, MaxResultsPreprocessor, WeightedAverageResultsPreprocessor, diff --git a/python/ray/train/callbacks/results_preprocessors/aggregate/aggregate_fn.py b/python/ray/train/_internal/results_preprocessors/aggregate/aggregate_fn.py similarity index 97% rename from python/ray/train/callbacks/results_preprocessors/aggregate/aggregate_fn.py rename to python/ray/train/_internal/results_preprocessors/aggregate/aggregate_fn.py index f22de03c161e..64cba83d3014 100644 --- a/python/ray/train/callbacks/results_preprocessors/aggregate/aggregate_fn.py +++ b/python/ray/train/_internal/results_preprocessors/aggregate/aggregate_fn.py @@ -3,7 +3,7 @@ import numpy as np -from ray.train.callbacks.results_preprocessors.aggregate.aggregate_utils import ( +from ray.train._internal.results_preprocessors.aggregate.aggregate_utils import ( 
VALID_AGGREGATE_TYPES, _get_weights_from_results, ) diff --git a/python/ray/train/callbacks/results_preprocessors/aggregate/aggregate_preprocessor.py b/python/ray/train/_internal/results_preprocessors/aggregate/aggregate_preprocessor.py similarity index 95% rename from python/ray/train/callbacks/results_preprocessors/aggregate/aggregate_preprocessor.py rename to python/ray/train/_internal/results_preprocessors/aggregate/aggregate_preprocessor.py index bd95a2100a1b..f33dc5c451e9 100644 --- a/python/ray/train/callbacks/results_preprocessors/aggregate/aggregate_preprocessor.py +++ b/python/ray/train/_internal/results_preprocessors/aggregate/aggregate_preprocessor.py @@ -1,22 +1,20 @@ import logging from typing import Dict, List, Optional -from ray.util.annotations import DeveloperAPI -from ray.train.callbacks.results_preprocessors.preprocessor import ResultsPreprocessor -from ray.train.callbacks.results_preprocessors.aggregate.aggregate_fn import ( +from ray.train._internal.results_preprocessors.preprocessor import ResultsPreprocessor +from ray.train._internal.results_preprocessors.aggregate.aggregate_fn import ( AggregateFn, Average, Max, WeightedAverage, ) -from ray.train.callbacks.results_preprocessors.aggregate.aggregate_utils import ( +from ray.train._internal.results_preprocessors.aggregate.aggregate_utils import ( _get_metrics_from_results, ) logger = logging.getLogger(__name__) -@DeveloperAPI class AggregateResultsPreprocessor(ResultsPreprocessor): """A preprocessor that aggregates training metrics from all workers. @@ -78,7 +76,6 @@ def preprocess(self, results: Optional[List[Dict]] = None) -> Optional[List[Dict return results - class AverageResultsPreprocessor(AggregateResultsPreprocessor): """A preprocessor that averages results with equal weight. @@ -101,7 +98,6 @@ class AverageResultsPreprocessor(AggregateResultsPreprocessor): def __init__(self, keys: Optional[List[str]] = None): super().__init__(Average(), keys) - class MaxResultsPreprocessor(AggregateResultsPreprocessor): """A preprocessor that computes maximum values of specified keys. @@ -124,7 +120,6 @@ class MaxResultsPreprocessor(AggregateResultsPreprocessor): def __init__(self, keys: Optional[List[str]] = None): super().__init__(Max(), keys) - class WeightedAverageResultsPreprocessor(AggregateResultsPreprocessor): """A preprocessor that performs weighted average over metrics. 
diff --git a/python/ray/train/callbacks/results_preprocessors/aggregate/aggregate_utils.py b/python/ray/train/_internal/results_preprocessors/aggregate/aggregate_utils.py similarity index 100% rename from python/ray/train/callbacks/results_preprocessors/aggregate/aggregate_utils.py rename to python/ray/train/_internal/results_preprocessors/aggregate/aggregate_utils.py diff --git a/python/ray/train/callbacks/results_preprocessors/index.py b/python/ray/train/_internal/results_preprocessors/index.py similarity index 90% rename from python/ray/train/callbacks/results_preprocessors/index.py rename to python/ray/train/_internal/results_preprocessors/index.py index 8ade1c375450..f5dc52be404c 100644 --- a/python/ray/train/callbacks/results_preprocessors/index.py +++ b/python/ray/train/_internal/results_preprocessors/index.py @@ -1,10 +1,9 @@ from typing import List, Dict, Iterable, Union, Optional -from ray.train.callbacks.results_preprocessors.preprocessor import ResultsPreprocessor -from ray.util.annotations import DeveloperAPI +from ray.train._internal.results_preprocessors.preprocessor import \ + ResultsPreprocessor -@DeveloperAPI class IndexedResultsPreprocessor(ResultsPreprocessor): """Preprocesses results by filtering by index. diff --git a/python/ray/train/callbacks/results_preprocessors/keys.py b/python/ray/train/_internal/results_preprocessors/keys.py similarity index 86% rename from python/ray/train/callbacks/results_preprocessors/keys.py rename to python/ray/train/_internal/results_preprocessors/keys.py index 39cf74c9171f..54d9640aeae0 100644 --- a/python/ray/train/callbacks/results_preprocessors/keys.py +++ b/python/ray/train/_internal/results_preprocessors/keys.py @@ -1,10 +1,9 @@ from typing import List, Dict, Optional, Iterable -from ray.train.callbacks.results_preprocessors.preprocessor import ResultsPreprocessor -from ray.util.annotations import DeveloperAPI +from ray.train._internal.results_preprocessors.preprocessor import \ + ResultsPreprocessor -@DeveloperAPI class ExcludedKeysResultsPreprocessor(ResultsPreprocessor): """Preprocesses each result dictionary by excluding specified keys. diff --git a/python/ray/train/callbacks/results_preprocessors/preprocessor.py b/python/ray/train/_internal/results_preprocessors/preprocessor.py similarity index 95% rename from python/ray/train/callbacks/results_preprocessors/preprocessor.py rename to python/ray/train/_internal/results_preprocessors/preprocessor.py index c98cd7c423e1..c9a598104d2c 100644 --- a/python/ray/train/callbacks/results_preprocessors/preprocessor.py +++ b/python/ray/train/_internal/results_preprocessors/preprocessor.py @@ -1,10 +1,7 @@ import abc from typing import List, Dict -from ray.util.annotations import DeveloperAPI - -@DeveloperAPI class ResultsPreprocessor(abc.ABC): """Abstract class for preprocessing Train results.""" @@ -27,7 +24,6 @@ def preprocess(self, results: List[Dict]) -> List[Dict]: return results -@DeveloperAPI class SequentialResultsPreprocessor(ResultsPreprocessor): """A processor that sequentially runs a series of preprocessing steps. 
diff --git a/python/ray/train/session.py b/python/ray/train/_internal/session.py similarity index 56% rename from python/ray/train/session.py rename to python/ray/train/_internal/session.py index 49d76f946878..a335a9176b9c 100644 --- a/python/ray/train/session.py +++ b/python/ray/train/_internal/session.py @@ -8,11 +8,10 @@ from enum import Enum, auto from typing import Callable from typing import Optional, Dict, Type, Union -import warnings import ray from ray.data import Dataset, DatasetPipeline -from ray.train.accelerator import Accelerator +from ray.train._internal.accelerator import Accelerator from ray.train.constants import ( DETAILED_AUTOFILLED_KEYS, TIME_THIS_ITER_S, @@ -24,10 +23,8 @@ HOSTNAME, DATE, RESULT_FETCH_TIMEOUT, - SESSION_MISUSE_LOG_ONCE_KEY, ) -from ray.train.utils import PropagatingThread -from ray.util import PublicAPI, log_once +from ray.train._internal.utils import PropagatingThread class TrainingResultType(Enum): @@ -240,22 +237,6 @@ def checkpoint(self, **kwargs): _session = None -def _warn_session_misuse(fn_name: str): - """Logs warning message on provided fn being used outside of session. - - Args: - fn_name: The name of the function to warn about. - """ - - if log_once(f"{SESSION_MISUSE_LOG_ONCE_KEY}-{fn_name}"): - warnings.warn( - f"`train.{fn_name}()` is meant to only be " - f"called " - "inside a training function that is executed by " - "`Trainer.run`. Returning None." - ) - - def init_session(*args, **kwargs) -> None: global _session if _session: @@ -277,242 +258,6 @@ def shutdown_session(): _session = None -@PublicAPI(stability="beta") -def get_dataset_shard( - dataset_name: Optional[str] = None, -) -> Optional[Union[Dataset, DatasetPipeline]]: - """Returns the Ray Dataset or DatasetPipeline shard for this worker. - - You should call ``to_torch()`` or ``to_tf()`` on this shard to convert - it to the appropriate framework-specific Dataset. - - .. code-block:: python - - import ray - from ray import train - - def train_func(): - model = Net() - for iter in range(100): - data_shard = train.get_dataset_shard().to_torch() - model.train(data_shard) - return model - - dataset = ray.data.read_csv("train.csv") - dataset.filter(...).repeat().random_shuffle() - - trainer = Trainer(backend="torch") - trainer.start() - - # Trainer will automatically handle sharding. - train_model = trainer.run(train_func, dataset=dataset) - trainer.shutdown() - - Args: - dataset_name: If a Dictionary of Datasets was passed to ``Trainer``, then - specifies which dataset shard to return. - - Returns: - The ``Dataset`` or ``DatasetPipeline`` shard to use for this worker. - If no dataset is passed into Trainer, then return None. - """ - session = get_session() - if session is None: - _warn_session_misuse(get_dataset_shard.__name__) - return - shard = session.dataset_shard - if shard is None: - warnings.warn( - "No dataset passed in. Returning None. Make sure to " - "pass in a Ray Dataset to Trainer.run to use this " - "function." - ) - elif isinstance(shard, dict): - if not dataset_name: - raise RuntimeError( - "Multiple datasets were passed into ``Trainer``, " - "but no ``dataset_name`` is passed into " - "``get_dataset_shard``. Please specify which " - "dataset shard to retrieve." - ) - return shard.get(dataset_name) - return shard - - -@PublicAPI(stability="beta") -def report(**kwargs) -> None: - """Reports all keyword arguments to Train as intermediate results. - - .. 
code-block:: python - - import time - from ray import train - - def train_func(): - for iter in range(100): - time.sleep(1) - train.report(hello="world") - - trainer = Trainer(backend="torch") - trainer.start() - trainer.run(train_func) - trainer.shutdown() - - Args: - **kwargs: Any key value pair to be reported by Train. - If callbacks are provided, they are executed on these - intermediate results. - """ - session = get_session() - if session is None: - _warn_session_misuse(report.__name__) - return - session.report(**kwargs) - - -@PublicAPI(stability="beta") -def world_rank() -> int: - """Get the world rank of this worker. - - .. code-block:: python - - import time - from ray import train - - def train_func(): - for iter in range(100): - time.sleep(1) - if train.world_rank() == 0: - print("Worker 0") - - trainer = Trainer(backend="torch") - trainer.start() - trainer.run(train_func) - trainer.shutdown() - - """ - session = get_session() - if session is None: - return 0 - return session.world_rank - - -@PublicAPI(stability="beta") -def local_rank() -> int: - """Get the local rank of this worker (rank of the worker on its node). - - .. code-block:: python - - import time - from ray import train - - def train_func(): - if torch.cuda.is_available(): - torch.cuda.set_device(train.local_rank()) - ... - - trainer = Trainer(backend="torch", use_gpu=True) - trainer.start() - trainer.run(train_func) - trainer.shutdown() - - """ - session = get_session() - if session is None: - return 0 - return session.local_rank - - -@PublicAPI(stability="beta") -def load_checkpoint() -> Optional[Dict]: - """Loads checkpoint data onto the worker. - - .. code-block:: python - - from ray import train - - def train_func(): - checkpoint = train.load_checkpoint() - for iter in range(checkpoint["epoch"], 5): - print(iter) - - trainer = Trainer(backend="torch") - trainer.start() - trainer.run(train_func, checkpoint={"epoch": 3}) - # 3 - # 4 - trainer.shutdown() - - Args: - **kwargs: Any key value pair to be checkpointed by Train. - Returns: - The most recently saved checkpoint if ``train.save_checkpoint()`` - has been called. Otherwise, the checkpoint that the session was - originally initialized with. ``None`` if neither exist. - """ - session = get_session() - if session is None: - _warn_session_misuse(load_checkpoint.__name__) - return - return session.loaded_checkpoint - - -@PublicAPI(stability="beta") -def save_checkpoint(**kwargs) -> None: - """Checkpoints all keyword arguments to Train as restorable state. - - .. code-block:: python - - import time - from ray import train - - def train_func(): - for iter in range(100): - time.sleep(1) - train.save_checkpoint(epoch=iter) - - trainer = Trainer(backend="torch") - trainer.start() - trainer.run(train_func) - trainer.shutdown() - - Args: - **kwargs: Any key value pair to be checkpointed by Train. - """ - session = get_session() - if session is None: - _warn_session_misuse(save_checkpoint.__name__) - return - session.checkpoint(**kwargs) - - -@PublicAPI(stability="beta") -def world_size() -> int: - """Get the current world size (i.e. total number of workers) for this run. - - .. 
code-block:: python - - import time - from ray import train - - def train_func(): - assert train.world_size() == 4 - - trainer = Trainer(backend="torch", num_workers=4) - trainer.start() - trainer.run(train_func) - trainer.shutdown() - """ - session = get_session() - if session is None: - return 1 - return session.world_size - - -class SessionMisuseError(Exception): - """Method or function was used outside of a session.""" - - def _raise_accelerator_session_misuse(): """Raises a SessionMisuseError because a utility function was used improperly.""" raise SessionMisuseError( diff --git a/python/ray/train/utils.py b/python/ray/train/_internal/utils.py similarity index 100% rename from python/ray/train/utils.py rename to python/ray/train/_internal/utils.py diff --git a/python/ray/train/worker_group.py b/python/ray/train/_internal/worker_group.py similarity index 100% rename from python/ray/train/worker_group.py rename to python/ray/train/_internal/worker_group.py diff --git a/python/ray/train/backend.py b/python/ray/train/backend.py index a4d8a09447b6..f450b27544be 100644 --- a/python/ray/train/backend.py +++ b/python/ray/train/backend.py @@ -1,26 +1,9 @@ import logging -import os -from collections import defaultdict -from typing import Callable, TypeVar, List, Optional, Dict, Type, Tuple +from typing import TypeVar, Dict -import ray -from ray.exceptions import RayActorError -from ray.ray_constants import env_integer -from ray.train.constants import ( - ENABLE_DETAILED_AUTOFILLED_METRICS_ENV, - ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, - TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV, - TRAIN_ENABLE_WORKER_SPREAD_ENV, -) -from ray.train.impl.dataset_spec import _RayDatasetSpec -from ray.train.session import TrainingResult -from ray.train.session import init_session, get_session, shutdown_session -from ray.train.utils import check_for_failure, Singleton -from ray.train.worker_group import WorkerGroup +from ray.train._internal.utils import Singleton +from ray.train._internal.worker_group import WorkerGroup from ray.util.annotations import DeveloperAPI -from ray.util.placement_group import get_current_placement_group, remove_placement_group - -T = TypeVar("T") EncodedData = TypeVar("EncodedData") @@ -78,535 +61,3 @@ def decode_data(encoded_data: EncodedData) -> Dict: """ return encoded_data - - -class TrainBackendError(Exception): - """Errors with BackendExecutor that should not be exposed to user.""" - - -class TrainingWorkerError(Exception): - """Raised if a worker fails during training.""" - - -class BackendExecutor: - """Main execution class for training backends. - - This class holds a worker group and is responsible for executing the - training function on the workers, and collecting intermediate results - from ``train.report()`` and ``train.checkpoint()``. - - Args: - backend_config: The configurations for this - specific backend. - num_workers: Number of workers to use for training. - num_cpus_per_worker: Number of CPUs to use per worker. - num_gpus_per_worker: Number of GPUs to use per worker. - additional_resources_per_worker (Optional[Dict[str, float]]): - Dictionary specifying the extra resources that will be - requested for each worker in addition to ``num_cpus_per_worker`` - and ``num_gpus_per_worker``. - max_retries: Number of retries when Ray actors fail. - Defaults to 3. Set to -1 for unlimited retries. 
- """ - - def __init__( - self, - backend_config: BackendConfig, - num_workers: int = 1, - num_cpus_per_worker: float = 1, - num_gpus_per_worker: float = 0, - additional_resources_per_worker: Optional[Dict[str, float]] = None, - max_retries: int = 3, - ): - self._backend_config = backend_config - self._backend = backend_config.backend_cls() - self._num_workers = num_workers - self._num_cpus_per_worker = num_cpus_per_worker - self._num_gpus_per_worker = num_gpus_per_worker - self._additional_resources_per_worker = additional_resources_per_worker - self._max_failures = max_retries - if self._max_failures < 0: - self._max_failures = float("inf") - self._num_failures = 0 - self._initialization_hook = None - self._placement_group = None - - self.worker_group = InactiveWorkerGroup() - self.dataset_shards = None - - def start( - self, - initialization_hook: Optional[Callable[[], None]] = None, - train_cls: Optional[Type] = None, - train_cls_args: Optional[Tuple] = None, - train_cls_kwargs: Optional[Dict] = None, - ): - """Starts the worker group.""" - self._create_placement_group() - placement_group = self._placement_group or "default" - self.worker_group = WorkerGroup( - num_workers=self._num_workers, - num_cpus_per_worker=self._num_cpus_per_worker, - num_gpus_per_worker=self._num_gpus_per_worker, - additional_resources_per_worker=self._additional_resources_per_worker, - actor_cls=train_cls, - actor_cls_args=train_cls_args, - actor_cls_kwargs=train_cls_kwargs, - placement_group=placement_group, - ) - try: - if initialization_hook: - self._initialization_hook = initialization_hook - self.worker_group.execute(initialization_hook) - - share_cuda_visible_devices_enabled = bool( - env_integer( - ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, - self._backend.share_cuda_visible_devices, - ) - ) - - if self._num_gpus_per_worker > 0 and share_cuda_visible_devices_enabled: - self._share_cuda_visible_devices() - self._backend.on_start(self.worker_group, self._backend_config) - except RayActorError as exc: - logger.exception(str(exc)) - logger.warning( - "Failure occurred during startup. Restarting all workers and " - "attempting to startup again." - ) - self._increment_failures() - self._restart() - - def _create_placement_group(self): - """Creates a placement group if it does not exist. - - If a placement group is already detected (Tune) this will be a no-op. - - By default the placement group will be created with PACK strategy. - This is optimized for colocating GPUs on a minimal number of nodes. - This behavior can be overridden to use the SPREAD strategy by defining - ``TRAIN_ENABLE_WORKER_SPREAD_ENV`` - - If a placement group is created it will be stored as - self._placement_group. 
- """ - current_placement_group = get_current_placement_group() - should_capture_child_tasks_in_placement_group = ( - ray.worker.global_worker.should_capture_child_tasks_in_placement_group - ) - should_create_placement_group = ( - current_placement_group is None - or not should_capture_child_tasks_in_placement_group - ) - - if should_create_placement_group: - additional_resources_per_worker = ( - self._additional_resources_per_worker or {} - ) - bundle = { - "CPU": self._num_cpus_per_worker, - "GPU": self._num_gpus_per_worker, - **additional_resources_per_worker, - } - bundles = [bundle.copy() for _ in range(self._num_workers)] - - use_spread = bool(env_integer(TRAIN_ENABLE_WORKER_SPREAD_ENV, 0)) - strategy = "SPREAD" if use_spread else "PACK" - - placement_group = ray.util.placement_group(bundles, strategy=strategy) - logger.debug("Waiting for placement group to start.") - timeout = env_integer(TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV, 100) - ready, _ = ray.wait([placement_group.ready()], timeout=timeout) - if ready: - logger.debug("Placement group has started.") - else: - raise TimeoutError( - "Placement group creation timed out. Make sure your " - "cluster either has enough resources or use an " - "autoscaling cluster. If you are running on a cluster, " - "make sure you specify an address in `ray.init()`, for example, " - '`ray.init("auto")`. You can also increase the timeout by setting ' - "the TRAIN_PLACEMENT_GROUP_TIMEOUT_S environment variable. " - "Current resources available: {}, resources requested by the " - "placement group: {}".format( - ray.available_resources(), placement_group.bundle_specs - ) - ) - self._placement_group = placement_group - - def _share_cuda_visible_devices(self): - """Sets CUDA_VISIBLE_DEVICES on all workers. - - For each worker, CUDA_VISIBLE_DEVICES will be set to the GPU IDs - visible to all workers on that worker's node. - - This allows GPU workers on the same node to communicate with one - another. - - Example: - - Setup: - - Node1: - - Worker1: {0, 1} - - Worker2: {2, 3} - - Node2: - - Worker3: {0, 1} - - CUDA_VISIBLE_DEVICES: - - Worker1: "0,1,2,3" - - Worker2: "0,1,2,3" - - Worker2: "0,1" - - """ - - node_ids_and_gpu_ids = [ - (w.metadata.node_id, w.metadata.gpu_ids) for w in self.worker_group.workers - ] - - node_id_to_worker_id = defaultdict(set) - node_id_to_gpu_ids = defaultdict(set) - - for worker_id, (node_id, gpu_ids) in enumerate(node_ids_and_gpu_ids): - node_id_to_worker_id[node_id].add(worker_id) - node_id_to_gpu_ids[node_id].update(gpu_ids) - - futures = [] - for node_id, gpu_ids in node_id_to_gpu_ids.items(): - all_gpu_ids = ",".join([str(gpu_id) for gpu_id in gpu_ids]) - - def set_gpu_ids(): - os.environ["CUDA_VISIBLE_DEVICES"] = all_gpu_ids - - for worker_id in node_id_to_worker_id[node_id]: - futures.append( - self.worker_group.execute_single_async(worker_id, set_gpu_ids) - ) - ray.get(futures) - - def _create_local_rank_map(self) -> Dict: - """Create mapping from worker world_rank to local_rank. - - Example: - Worker 0: 0.0.0.0 - Worker 1: 0.0.0.0 - Worker 2: 0.0.0.1 - Worker 3: 0.0.0.0 - Worker 4: 0.0.0.1 - - Workers 0, 1, 3 are on 0.0.0.0. - Workers 2, 4 are on 0.0.0.1. 
- - Expected Output: - { - 0 -> 0, - 1 -> 1, - 2 -> 0, - 3 -> 2, - 4 -> 1 - } - """ - rank_mapping = {} - ip_dict = defaultdict(int) - for world_rank in range(len(self.worker_group)): - worker = self.worker_group.workers[world_rank] - node_ip = worker.metadata.node_ip - rank_mapping[world_rank] = ip_dict[node_ip] - ip_dict[node_ip] += 1 - return rank_mapping - - def start_training( - self, - train_func: Callable[[], T], - dataset_spec: _RayDatasetSpec, - checkpoint: Optional[Dict] = None, - ) -> None: - """Executes a training function on all workers in a separate thread. - - ``finish_training`` should be called after this. - - Args: - train_func: The training function to run on each worker. - dataset_spec: A specification for the Ray Dataset to be - passed to the training workers, and the logic on how to shard the Ray - Dataset. - checkpoint: The checkpoint data that - should be loaded onto each worker and accessed by the - training function via ``train.load_checkpoint()``. If this - is ``None`` then no checkpoint will be loaded. - """ - use_detailed_autofilled_metrics = env_integer( - ENABLE_DETAILED_AUTOFILLED_METRICS_ENV, 0 - ) - - # First initialize the session. - def initialize_session( - train_func, - world_rank, - local_rank, - world_size, - checkpoint, - dataset_shard, - encode_data_fn, - ): - try: - init_session( - training_func=train_func, - world_rank=world_rank, - local_rank=local_rank, - world_size=world_size, - dataset_shard=dataset_shard, - checkpoint=checkpoint, - encode_data_fn=encode_data_fn, - detailed_autofilled_metrics=use_detailed_autofilled_metrics, - ) - except ValueError: - raise TrainBackendError( - "Attempting to start training but a " - "previous training run is still ongoing. " - "You must call `finish_training` before " - "calling `start_training` again." - ) - - if self.dataset_shards is None: - actors = [worker.actor for worker in self.worker_group.workers] - self.dataset_shards = dataset_spec.get_dataset_shards(actors) - - local_rank_map = self._create_local_rank_map() - - futures = [] - for index in range(len(self.worker_group)): - futures.append( - self.worker_group.execute_single_async( - index, - initialize_session, - world_rank=index, - local_rank=local_rank_map[index], - world_size=len(self.worker_group), - train_func=train_func, - dataset_shard=self.dataset_shards[index], - checkpoint=checkpoint, - encode_data_fn=self._backend.encode_data, - ) - ) - - self.get_with_failure_handling(futures) - - # Run the training function asynchronously in its own thread. - def train_async(): - session = get_session() - session.start() - - self.worker_group.execute_async(train_async) - - def get_next_results(self) -> Optional[List[TrainingResult]]: - """Fetches the next ``TrainingResult`` from each worker. - - Each ``TrainingResult`` is expected to correspond to the same step from - each worker (e.g. the same call to ``train.report()`` or - ``train.checkpoint()``). - - Returns: - A list of ``TrainingResult``s with the same - ``TrainingResultType``, or ``None`` if there are no more results. - """ - - def get_next(): - session = _get_session("get_next_results") - try: - result = session.get_next() - except RuntimeError: - # Training thread has not been started yet. - raise TrainBackendError( - "`get_next_results` has been called " - "before `start_training`. Please call " - "`start_training` before " - "`get_next_results`." - ) - - return result - - # Get next result from each worker. 
- futures = self.worker_group.execute_async(get_next) - results = self.get_with_failure_handling(futures) - - # Check if any worker returned None. - if any(r is None for r in results): - # Either all workers have results or none of them do. - if not all(r is None for r in results): - raise RuntimeError( - "Some workers returned results while " - "others didn't. Make sure that " - "`train.report()` and `train.save_checkpoint()` " - "are called the same number of times on all " - "workers." - ) - else: - # Return None if all results are None. - return None - first_result = results[0] - result_type = first_result.type - if any(r.type != result_type for r in results): - raise RuntimeError( - "Some workers returned results with " - "different types. Make sure `train.report()` " - "and `train.save_checkpoint()` are called the " - "same number of times and in the same order on " - "each worker." - ) - return results - - def pause_reporting(self): - """Disable workers from enqueuing results from `train.report()`. - - Note: Already reported results may still be enqueued at this point, - and should be handled appropriately. - """ - - def pause_session_reporting(): - session = _get_session("pause_reporting") - return session.pause_reporting() - - futures = self.worker_group.execute_async(pause_session_reporting) - self.get_with_failure_handling(futures) - - def finish_training(self): - """Finish training and return final results. Propagate any exceptions. - - Blocks until training is finished on all workers. - - Assumes `start_training` has already been called. - - Returns: - A list of return values from calling ``train_func`` on each worker. - Each item corresponds to the return value from a single worker. - """ - - def end_training(): - session = _get_session("finish_training") - try: - # session.finish raises any Exceptions from training. - output = session.finish() - finally: - # Shutdown session even if session.finish() raises an - # Exception. - shutdown_session() - - return output - - futures = self.worker_group.execute_async(end_training) - results = self.get_with_failure_handling(futures) - return results - - def get_with_failure_handling(self, remote_values): - """Gets the remote values while handling for worker failures. - - This method should be called instead of ``ray.get()`` directly in - order to handle worker failures. - - If a worker failure is identified, backend specific failure handling - is executed and a ``TrainingWorkerError`` is raised. - - Args: - remote_values: List of object refs representing functions - that may fail in the middle of execution. For example, running - a Train training loop in multiple parallel actor calls. - Returns: - The resolved objects represented by the passed in ObjectRefs. - """ - success = check_for_failure(remote_values) - if success: - return ray.get(remote_values) - else: - self._increment_failures() - logger.warning( - "Failure identified during training. Restarting all workers and " - "continuing training from latest checkpoint." - ) - self._restart() - raise TrainingWorkerError - - def shutdown(self): - """Shuts down the workers in the worker group.""" - try: - self._backend.on_shutdown(self.worker_group, self._backend_config) - except RayActorError: - logger.warning( - "Graceful shutdown of backend failed. This is " - "expected if one of the workers has crashed." 
- ) - self.worker_group.shutdown() - self.worker_group = InactiveWorkerGroup() - - if self._placement_group: - remove_placement_group(self._placement_group) - self._placement_group = None - - self.dataset_shards = None - - def is_started(self): - return not isinstance(self.worker_group, InactiveWorkerGroup) - - def _restart(self): - self.worker_group.shutdown() - if self._initialization_hook is not None: - initialization_hook = self._initialization_hook - else: - initialization_hook = None - if self._placement_group: - remove_placement_group(self._placement_group) - self._placement_group = None - self.start(initialization_hook=initialization_hook) - - def _increment_failures(self): - self._num_failures += 1 - if self._num_failures >= self._max_failures: - raise RuntimeError( - "Training has failed even after " - f"{self._num_failures} " - "attempts. You can change the number of max " - "failure attempts by setting the " - "`max_retries` arg in your `Trainer`." - ) from None - - def get_worker_group(self): - return self.worker_group - - def _get_num_failures(self): - return self._num_failures - - -class InactiveWorkerGroupError(Exception): - """Raised when underlying worker group is inactive.""" - - -class InactiveWorkerGroup: - # TODO: fix inheritence. perhaps create WorkerGroupInterface. - - # Need to define getstate and setstate so that getattr does not screwup - # pickling. See https://stackoverflow.com/a/50888571/11249691 - def __getstate__(self): - return vars(self) - - def __setstate__(self, state): - vars(self).update(state) - - def __getattr__(self, name): - raise InactiveWorkerGroupError() - - def __len__(self): - raise InactiveWorkerGroupError() - - -def _get_session(method_name: str): - # Get the session for this worker. - session = get_session() - if not session: - # Session is not initialized yet. - raise TrainBackendError( - f"`{method_name}` has been called " - "before `start_training`. Please call " - "`start_training` before " - f"`{method_name}`." 
- ) - return session diff --git a/python/ray/train/callbacks/callback.py b/python/ray/train/callbacks/callback.py index 595080633ae0..51f6f92a1a9b 100644 --- a/python/ray/train/callbacks/callback.py +++ b/python/ray/train/callbacks/callback.py @@ -1,14 +1,15 @@ import abc from typing import List, Dict -from ray.train.callbacks.results_preprocessors import ( +from ray.train._internal.results_preprocessors import ( ResultsPreprocessor, ExcludedKeysResultsPreprocessor, SequentialResultsPreprocessor, ) from ray.train.constants import ALL_RESERVED_KEYS +from ray.util.annotations import DeveloperAPI - +@DeveloperAPI class TrainingCallback(abc.ABC): """Abstract Train callback class.""" diff --git a/python/ray/train/callbacks/logging.py b/python/ray/train/callbacks/logging.py index 0f6ae06f11ca..31cdf6bd6d94 100644 --- a/python/ray/train/callbacks/logging.py +++ b/python/ray/train/callbacks/logging.py @@ -8,11 +8,11 @@ import numpy as np from ray.train.callbacks import TrainingCallback -from ray.train.callbacks.results_preprocessors import ( +from ray.train._internal.results_preprocessors import ( IndexedResultsPreprocessor, ExcludedKeysResultsPreprocessor, ) -from ray.train.callbacks.results_preprocessors.preprocessor import ( +from ray.train._internal.results_preprocessors.preprocessor import ( SequentialResultsPreprocessor, ) from ray.train.constants import ( @@ -27,11 +27,12 @@ from ray.util.ml_utils.dict import flatten_dict from ray.util.ml_utils.json import SafeFallbackEncoder from ray.util.ml_utils.mlflow import MLflowLoggerUtil +from ray.util.annotations import PublicAPI logger = logging.getLogger(__name__) -class TrainCallbackLogdirManager: +class _TrainCallbackLogdirManager: """Sets up a logging directory for a callback. The path of the ``logdir`` can be set during initialization. Otherwise, the @@ -90,7 +91,7 @@ def logdir_path(self) -> Path: return self._default_logdir raise RuntimeError("Logdir must be set in init or setup_logdir.") - +@PublicAPI(stability="beta") class JsonLoggerCallback(TrainingCallback): """Logs Train results in json format. @@ -112,7 +113,7 @@ def __init__( workers_to_log: Optional[Union[int, List[int]]] = 0, ): self._filename = filename - self._logdir_manager = TrainCallbackLogdirManager(logdir=logdir) + self._logdir_manager = _TrainCallbackLogdirManager(logdir=logdir) self.results_preprocessor = IndexedResultsPreprocessor(indices=workers_to_log) def start_training(self, logdir: str, **info): @@ -138,7 +139,7 @@ def log_path(self) -> Path: filename = self._filename or JsonLoggerCallback._default_filename return self.logdir.joinpath(filename) - +@PublicAPI(stability="beta") class MLflowLoggerCallback(TrainingCallback): """MLflow Logger to automatically log Train results and config to MLflow. @@ -189,7 +190,7 @@ def __init__( logdir: Optional[str] = None, worker_to_log: int = 0, ): - self._logdir_manager = TrainCallbackLogdirManager(logdir=logdir) + self._logdir_manager = _TrainCallbackLogdirManager(logdir=logdir) self.results_preprocessor = IndexedResultsPreprocessor(indices=worker_to_log) self.tracking_uri = tracking_uri @@ -235,7 +236,7 @@ def finish_training(self, error: bool = False, **info): def logdir(self) -> Path: return self._logdir_manager.logdir_path - +@PublicAPI(stability="beta") class TBXLoggerCallback(TrainingCallback): """Logs Train results in TensorboardX format. 
@@ -257,7 +258,7 @@ class TBXLoggerCallback(TrainingCallback): IGNORE_KEYS: Set[str] = {PID, TIMESTAMP, TIME_TOTAL_S} def __init__(self, logdir: Optional[str] = None, worker_to_log: int = 0) -> None: - self._logdir_manager = TrainCallbackLogdirManager(logdir=logdir) + self._logdir_manager = _TrainCallbackLogdirManager(logdir=logdir) results_preprocessors = [ IndexedResultsPreprocessor(indices=worker_to_log), diff --git a/python/ray/train/callbacks/print.py b/python/ray/train/callbacks/print.py index b90835acf8b3..b86977ab122d 100644 --- a/python/ray/train/callbacks/print.py +++ b/python/ray/train/callbacks/print.py @@ -2,8 +2,9 @@ from typing import Dict, List from ray.train.callbacks import TrainingCallback +from ray.util.annotations import PublicAPI - +@PublicAPI(stability="beta") class PrintCallback(TrainingCallback): """A callback that prints training results to STDOUT. diff --git a/python/ray/train/callbacks/profile.py b/python/ray/train/callbacks/profile.py index 7f74c8ffb9cb..a94454c95db6 100644 --- a/python/ray/train/callbacks/profile.py +++ b/python/ray/train/callbacks/profile.py @@ -3,15 +3,16 @@ from typing import List, Dict, Optional, Union from ray.train.callbacks import TrainingCallback -from ray.train.callbacks.logging import TrainCallbackLogdirManager -from ray.train.callbacks.results_preprocessors import IndexedResultsPreprocessor +from ray.train.callbacks.logging import _TrainCallbackLogdirManager +from ray.train._internal.results_preprocessors import IndexedResultsPreprocessor from ray.train.constants import PYTORCH_PROFILER_KEY +from ray.util.annotations import PublicAPI logger = logging.getLogger(__name__) DRIVER_TRACE_DIR_NAME = "pytorch_profiler" - +@PublicAPI(stability="beta") class TorchTensorboardProfilerCallback(TrainingCallback): """Synchronizes PyTorch Profiler traces onto disk. @@ -36,7 +37,7 @@ def __init__( ) -> None: super().__init__() self._logdir = logdir - self._logdir_manager = TrainCallbackLogdirManager(logdir=logdir) + self._logdir_manager = _TrainCallbackLogdirManager(logdir=logdir) self.results_preprocessor = IndexedResultsPreprocessor(indices=workers_to_log) def start_training(self, logdir: str, **info): diff --git a/python/ray/train/horovod/__init__.py b/python/ray/train/horovod/__init__.py new file mode 100644 index 000000000000..227b13741ac5 --- /dev/null +++ b/python/ray/train/horovod/__init__.py @@ -0,0 +1,12 @@ +try: + import horovod +except ModuleNotFoundError: + raise ModuleNotFoundError( + "Horovod isn't installed. To install Horovod with PyTorch support, run 'pip " + "install 'horovod[pytorch]''. To install Horovod with TensorFlow support, " + "run 'pip install 'horovod[tensorflow]''." 
+ ) + +from ray.train.horovod.config import HorovodConfig + +__all__ = ["HorovodConfig"] diff --git a/python/ray/train/horovod.py b/python/ray/train/horovod/config.py similarity index 84% rename from python/ray/train/horovod.py rename to python/ray/train/horovod/config.py index cd3650c5a2c0..13b00a68fb18 100644 --- a/python/ray/train/horovod.py +++ b/python/ray/train/horovod/config.py @@ -1,27 +1,18 @@ -import logging +from typing import Optional, Set + import os from dataclasses import dataclass -from typing import Optional, Set import ray from ray.train.backend import BackendConfig, Backend -from ray.train.utils import update_env_vars -from ray.train.worker_group import WorkerGroup, Worker - -try: - from horovod.ray.runner import Coordinator - from horovod.ray.utils import detect_nics, nics_to_env_var - from horovod.runner.common.util import secret, timeout -except ModuleNotFoundError: - raise ModuleNotFoundError( - "Horovod isn't installed. To install Horovod with PyTorch support, run 'pip " - "install 'horovod[pytorch]''. To install Horovod with TensorFlow support, " - "run 'pip install 'horovod[tensorflow]''." - ) +from ray.train._internal.utils import update_env_vars +from ray.train._internal.worker_group import WorkerGroup, Worker -from ray.util import PublicAPI +from horovod.ray.runner import Coordinator +from horovod.ray.utils import detect_nics, nics_to_env_var +from horovod.runner.common.util import secret, timeout -logger = logging.getLogger(__name__) +from ray.util import PublicAPI @PublicAPI(stability="beta") @@ -76,40 +67,10 @@ def __post_init__(self): @property def backend_cls(self): - return HorovodBackend - - -def init_env_vars(world_rank: int, world_size: int, node_id: str): - """Initialize Horovod environment variables.""" - os.environ["HOROVOD_HOSTNAME"] = node_id - os.environ["HOROVOD_RANK"] = str(world_rank) - os.environ["HOROVOD_SIZE"] = str(world_size) + return _HorovodBackend -# TODO(tgaddair): temporary workaround for Horovod's worker discovery logic, -# which requires passing in an extra parameter as part of the RayExecutor -# API. This will be removed in the future as we migrate more of the -# RayExecutor utils into Ray Train. -# See: https://github.com/horovod/horovod/blob/v0.23.0/horovod/ray/driver_service.py#L9 # noqa: E501 -@dataclass -class HorovodWorkerWrapper: - w: Worker - - @property - def execute(self): - w = self.w - - class ExecuteHandle: - def remote(self, func, *args, **kwargs): - _ = None - return w.actor._BaseWorkerMixin__execute.remote( - func, _, *args, **kwargs - ) - - return ExecuteHandle() - - -class HorovodBackend(Backend): +class _HorovodBackend(Backend): share_cuda_visible_devices: bool = True def on_start(self, worker_group: WorkerGroup, backend_config: HorovodConfig): @@ -122,7 +83,7 @@ def on_start(self, worker_group: WorkerGroup, backend_config: HorovodConfig): worker_node_id = worker_group.workers[rank].metadata.node_id setup_futures.append( worker_group.execute_single_async( - rank, init_env_vars, rank, len(worker_group), worker_node_id + rank, _init_env_vars, rank, len(worker_group), worker_node_id ) ) ray.get(setup_futures) @@ -154,7 +115,7 @@ def on_start(self, worker_group: WorkerGroup, backend_config: HorovodConfig): # Get one worker from each host/node. 
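With the new ``ray.train.horovod`` package re-exporting ``HorovodConfig``, backend selection looks roughly as follows; this is an illustrative sketch assuming ``horovod[pytorch]`` is installed, and the training function is a placeholder:

.. code-block:: python

    from ray.train import Trainer
    from ray.train.horovod import HorovodConfig

    def train_func():
        import horovod.torch as hvd
        hvd.init()
        print(f"Horovod rank {hvd.rank()} of {hvd.size()}")

    # HorovodConfig() with defaults; timeouts and NICs can be tuned via its fields.
    trainer = Trainer(backend=HorovodConfig(), num_workers=2)
    trainer.start()
    trainer.run(train_func)
    trainer.shutdown()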
node_worker_indexes = [node_ids.index(node_id) for node_id in set(node_ids)] node_workers = [ - HorovodWorkerWrapper(worker_group.workers[worker_index]) + _HorovodWorkerWrapper(worker_group.workers[worker_index]) for worker_index in node_worker_indexes ] assert len(node_workers) == len(self.coordinator.hostnames) @@ -167,3 +128,33 @@ def on_start(self, worker_group: WorkerGroup, backend_config: HorovodConfig): coordinator_envs.update(nics_to_env_var(nics)) worker_group.execute(update_env_vars, coordinator_envs) + + +def _init_env_vars(world_rank: int, world_size: int, node_id: str): + """Initialize Horovod environment variables.""" + os.environ["HOROVOD_HOSTNAME"] = node_id + os.environ["HOROVOD_RANK"] = str(world_rank) + os.environ["HOROVOD_SIZE"] = str(world_size) + + +# TODO(tgaddair): temporary workaround for Horovod's worker discovery logic, +# which requires passing in an extra parameter as part of the RayExecutor +# API. This will be removed in the future as we migrate more of the +# RayExecutor utils into Ray Train. +# See: https://github.com/horovod/horovod/blob/v0.23.0/horovod/ray/driver_service.py#L9 # noqa: E501 +@dataclass +class _HorovodWorkerWrapper: + w: Worker + + @property + def execute(self): + w = self.w + + class ExecuteHandle: + def remote(self, func, *args, **kwargs): + _ = None + return w.actor._BaseWorkerMixin__execute.remote( + func, _, *args, **kwargs + ) + + return ExecuteHandle() diff --git a/python/ray/train/tensorflow/__init__.py b/python/ray/train/tensorflow/__init__.py new file mode 100644 index 000000000000..4bff83c4fbac --- /dev/null +++ b/python/ray/train/tensorflow/__init__.py @@ -0,0 +1,12 @@ +try: + import tensorflow as tf +except ModuleNotFoundError: + raise ModuleNotFoundError( + "TensorFlow isn't installed. To install TensorFlow, run 'pip install " + "tensorflow'." + ) + +from ray.train.tensorflow.config import TensorflowConfig +from ray.train.tensorflow.train_loop_utils import prepare_dataset_shard + +__all__ = ["TensorflowConfig", "prepare_dataset_shard"] diff --git a/python/ray/train/tensorflow.py b/python/ray/train/tensorflow/config.py similarity index 51% rename from python/ray/train/tensorflow.py rename to python/ray/train/tensorflow/config.py index c0f12b154880..88ccafbb839b 100644 --- a/python/ray/train/tensorflow.py +++ b/python/ray/train/tensorflow/config.py @@ -6,17 +6,10 @@ import ray from ray.train.backend import BackendConfig, Backend -from ray.train.utils import get_address_and_port -from ray.train.worker_group import WorkerGroup +from ray.train._internal.utils import get_address_and_port +from ray.train._internal.worker_group import WorkerGroup from ray.util import PublicAPI -try: - import tensorflow as tf -except ModuleNotFoundError: - raise ModuleNotFoundError( - "TensorFlow isn't installed. To install TensorFlow, run 'pip install " - "tensorflow'." - ) logger = logging.getLogger(__name__) @@ -26,10 +19,10 @@ class TensorflowConfig(BackendConfig): @property def backend_cls(self): - return TensorflowBackend + return _TensorflowBackend -def setup_tensorflow_environment(worker_addresses: List[str], index: int): +def _setup_tensorflow_environment(worker_addresses: List[str], index: int): """Set up distributed Tensorflow training information. This function should be called on each worker. 
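For context, ``_setup_tensorflow_environment`` writes the standard ``TF_CONFIG`` used by TensorFlow's multi-worker strategies; the snippet below shows roughly the shape of that value for worker index 0 (the addresses are made-up placeholders):

.. code-block:: python

    import json
    import os

    # Approximate TF_CONFIG for worker 0 of a two-worker job.
    tf_config = {
        "cluster": {"worker": ["10.0.0.1:12345", "10.0.0.2:23456"]},
        "task": {"type": "worker", "index": 0},
    }
    os.environ["TF_CONFIG"] = json.dumps(tf_config)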
@@ -45,7 +38,7 @@ def setup_tensorflow_environment(worker_addresses: List[str], index: int): os.environ["TF_CONFIG"] = json.dumps(tf_config) -class TensorflowBackend(Backend): +class _TensorflowBackend(Backend): def on_start(self, worker_group: WorkerGroup, backend_config: TensorflowConfig): # Compute URL for initializing distributed setup. def get_url(): @@ -59,28 +52,7 @@ def get_url(): for i in range(len(worker_group)): setup_futures.append( worker_group.execute_single_async( - i, setup_tensorflow_environment, worker_addresses=urls, index=i + i, _setup_tensorflow_environment, worker_addresses=urls, index=i ) ) ray.get(setup_futures) - - -@PublicAPI(stability="beta") -def prepare_dataset_shard(tf_dataset_shard: tf.data.Dataset): - """A utility function that disables Tensorflow autosharding. - - This should be used on a TensorFlow ``Dataset`` created by calling ``to_tf()`` - on a ``ray.data.Dataset`` returned by ``ray.train.get_dataset_shard()`` since - the dataset has already been sharded across the workers. - - Args: - tf_dataset_shard (tf.data.Dataset): A TensorFlow Dataset. - - Returns: - A TensorFlow Dataset with autosharding turned off. - """ - options = tf.data.Options() - options.experimental_distribute.auto_shard_policy = ( - tf.data.experimental.AutoShardPolicy.OFF - ) - return tf_dataset_shard.with_options(options) diff --git a/python/ray/train/tensorflow/train_loop_utils.py b/python/ray/train/tensorflow/train_loop_utils.py new file mode 100644 index 000000000000..4f1f32b3d892 --- /dev/null +++ b/python/ray/train/tensorflow/train_loop_utils.py @@ -0,0 +1,24 @@ +from ray.util.annotations import PublicAPI + +import tensorflow as tf + + +@PublicAPI(stability="beta") +def prepare_dataset_shard(tf_dataset_shard: tf.data.Dataset): + """A utility function that disables Tensorflow autosharding. + + This should be used on a TensorFlow ``Dataset`` created by calling ``to_tf()`` + on a ``ray.data.Dataset`` returned by ``ray.train.get_dataset_shard()`` since + the dataset has already been sharded across the workers. + + Args: + tf_dataset_shard (tf.data.Dataset): A TensorFlow Dataset. + + Returns: + A TensorFlow Dataset with autosharding turned off. 
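A hypothetical end-to-end use of ``prepare_dataset_shard`` inside a training function; the label column, tensor specs, and batch size are assumptions for illustration only:

.. code-block:: python

    import tensorflow as tf

    import ray.train as train
    from ray.train.tensorflow import prepare_dataset_shard

    def train_func():
        # Shard provided by Trainer.run(..., dataset=some_ray_dataset).
        dataset_shard = train.get_dataset_shard()
        tf_dataset = prepare_dataset_shard(
            dataset_shard.to_tf(
                label_column="y",
                output_signature=(
                    tf.TensorSpec(shape=(None, 1), dtype=tf.float32),
                    tf.TensorSpec(shape=(None,), dtype=tf.float32),
                ),
                batch_size=32,
            )
        )
        # tf_dataset now has autosharding disabled and can be passed to model.fit.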
+ """ + options = tf.data.Options() + options.experimental_distribute.auto_shard_policy = ( + tf.data.experimental.AutoShardPolicy.OFF + ) + return tf_dataset_shard.with_options(options) diff --git a/python/ray/train/tests/test_backend.py b/python/ray/train/tests/test_backend.py index 59db65d5f117..038ec1ba24cf 100644 --- a/python/ray/train/tests/test_backend.py +++ b/python/ray/train/tests/test_backend.py @@ -7,13 +7,13 @@ import ray import ray.train as train from ray.cluster_utils import Cluster -from ray.train.backend import ( - Backend, +from ray.train.backend import Backend, BackendConfig +from ray.train._internal.backend_executor import ( + BackendExecutor, InactiveWorkerGroupError, TrainBackendError, TrainingWorkerError, ) -from ray.train.backend import BackendConfig, BackendExecutor from ray.train.impl.dataset_spec import _RayDatasetSpec from ray.train.tensorflow import TensorflowConfig from ray.train.torch import TorchConfig @@ -21,7 +21,7 @@ ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, TRAIN_ENABLE_WORKER_SPREAD_ENV, ) -from ray.train.worker_group import WorkerGroup +from ray.train._internal.worker_group import WorkerGroup from ray.util.placement_group import get_current_placement_group # Trigger pytest hook to automatically zip test cluster logs to archive dir on failure diff --git a/python/ray/train/tests/test_callbacks.py b/python/ray/train/tests/test_callbacks.py index 1bc007f6b19a..5bb3ff9e2d69 100644 --- a/python/ray/train/tests/test_callbacks.py +++ b/python/ray/train/tests/test_callbacks.py @@ -18,14 +18,14 @@ TBXLoggerCallback, TorchTensorboardProfilerCallback, ) -from ray.train.callbacks.logging import MLflowLoggerCallback, TrainCallbackLogdirManager +from ray.train.callbacks.logging import MLflowLoggerCallback, _TrainCallbackLogdirManager from ray.train.constants import ( TRAINING_ITERATION, DETAILED_AUTOFILLED_KEYS, BASIC_AUTOFILLED_KEYS, ENABLE_DETAILED_AUTOFILLED_METRICS_ENV, ) -from ray.train.worker_group import WorkerGroup +from ray.train._internal.worker_group import WorkerGroup try: from tensorflow.python.summary.summary_iterator import summary_iterator @@ -90,7 +90,7 @@ def test_train_callback_logdir_manager(tmp_path, input): else: input_logdir = None - logdir_manager = TrainCallbackLogdirManager(input_logdir) + logdir_manager = _TrainCallbackLogdirManager(input_logdir) if input_logdir: path = logdir_manager.logdir_path diff --git a/python/ray/train/tests/test_minimal.py b/python/ray/train/tests/test_minimal.py index 730535010f39..9a82c1b35651 100644 --- a/python/ray/train/tests/test_minimal.py +++ b/python/ray/train/tests/test_minimal.py @@ -7,7 +7,7 @@ from ray.train import Trainer from ray.train.backend import BackendConfig, Backend from ray.train.callbacks import TrainingCallback -from ray.train.worker_group import WorkerGroup +from ray.train._internal.worker_group import WorkerGroup @pytest.fixture diff --git a/python/ray/train/tests/test_results_preprocessors.py b/python/ray/train/tests/test_results_preprocessors.py index 77736ec9682c..26a6063b682a 100644 --- a/python/ray/train/tests/test_results_preprocessors.py +++ b/python/ray/train/tests/test_results_preprocessors.py @@ -1,6 +1,6 @@ import pytest -from ray.train.callbacks.results_preprocessors import ( +from ray.train._internal.results_preprocessors import ( ExcludedKeysResultsPreprocessor, IndexedResultsPreprocessor, SequentialResultsPreprocessor, diff --git a/python/ray/train/tests/test_session.py b/python/ray/train/tests/test_session.py index 2e974d9fb2c7..4198a99dddcb 100644 ---
a/python/ray/train/tests/test_session.py +++ b/python/ray/train/tests/test_session.py @@ -3,7 +3,7 @@ import pytest import ray -from ray.train.accelerator import Accelerator +from ray.train._internal.accelerator import Accelerator from ray.train.constants import SESSION_MISUSE_LOG_ONCE_KEY from ray.train.session import ( init_session, diff --git a/python/ray/train/tests/test_trainer.py b/python/ray/train/tests/test_trainer.py index 2de099848f4d..207d45f79d28 100644 --- a/python/ray/train/tests/test_trainer.py +++ b/python/ray/train/tests/test_trainer.py @@ -11,14 +11,15 @@ import ray.train as train from ray._private.test_utils import wait_for_condition from ray.train import Trainer, CheckpointStrategy -from ray.train.backend import BackendConfig, Backend, BackendExecutor +from ray.train.backend import BackendConfig, Backend from ray.train.constants import TRAIN_ENABLE_WORKER_SPREAD_ENV from ray.train.torch import TorchConfig from ray.train.tensorflow import TensorflowConfig from ray.train.horovod import HorovodConfig from ray.train.callbacks.callback import TrainingCallback -from ray.train.worker_group import WorkerGroup +from ray.train._internal.worker_group import WorkerGroup +from ray.train._internal.backend_executor import BackendExecutor @pytest.fixture diff --git a/python/ray/train/tests/test_tune.py b/python/ray/train/tests/test_tune.py index 2ddd73bbdbfa..3ee503437b42 100644 --- a/python/ray/train/tests/test_tune.py +++ b/python/ray/train/tests/test_tune.py @@ -14,7 +14,7 @@ from ray.train.examples.train_fashion_mnist_example import ( train_func as fashion_mnist_train_func, ) -from ray.train.worker_group import WorkerGroup +from ray.train._internal.worker_group import WorkerGroup @pytest.fixture diff --git a/python/ray/train/tests/test_utils.py b/python/ray/train/tests/test_utils.py index 4dbb014aa2c2..9c06a998094a 100644 --- a/python/ray/train/tests/test_utils.py +++ b/python/ray/train/tests/test_utils.py @@ -1,6 +1,6 @@ from pathlib import Path -from ray.train.utils import construct_path +from ray.train._internal.utils import construct_path def test_construct_path(): diff --git a/python/ray/train/tests/test_worker_group.py b/python/ray/train/tests/test_worker_group.py index 7c5fe387d24b..bc307547e9ba 100644 --- a/python/ray/train/tests/test_worker_group.py +++ b/python/ray/train/tests/test_worker_group.py @@ -2,7 +2,7 @@ import time import ray -from ray.train.worker_group import WorkerGroup +from ray.train._internal.worker_group import WorkerGroup @pytest.fixture diff --git a/python/ray/train/torch/__init__.py b/python/ray/train/torch/__init__.py new file mode 100644 index 000000000000..cb084ced5caf --- /dev/null +++ b/python/ray/train/torch/__init__.py @@ -0,0 +1,28 @@ +try: + import torch +except ModuleNotFoundError: + raise ModuleNotFoundError( + "PyTorch isn't installed. 
To install PyTorch, run 'pip install torch'" + ) + +from ray.train.torch.config import TorchConfig +from ray.train.torch.train_loop_utils import ( + get_device, + prepare_model, + prepare_data_loader, + prepare_optimizer, + backward, + enable_reproducibility, + TorchWorkerProfiler, +) + +__all__ = [ + "TorchConfig", + "get_device", + "prepare_model", + "prepare_optimizer", + "prepare_data_loader", + "backward", + "enable_reproducibility", + "TorchWorkerProfiler", +] diff --git a/python/ray/train/torch/config.py b/python/ray/train/torch/config.py new file mode 100644 index 000000000000..5d6c1bd9d698 --- /dev/null +++ b/python/ray/train/torch/config.py @@ -0,0 +1,180 @@ +from dataclasses import dataclass +import io +import logging +import os +from datetime import timedelta +from typing import Dict, Optional + +import ray +from ray.train.backend import BackendConfig, Backend, EncodedData +from ray.train._internal.worker_group import WorkerGroup +from ray.train._internal.utils import get_address_and_port +from ray.util import PublicAPI + +import torch +import torch.distributed as dist +from torch.nn.parallel import DistributedDataParallel + +try: + from torch.profiler import profile +except ImportError: + profile = None + +logger = logging.getLogger(__name__) + + +@PublicAPI(stability="beta") +@dataclass +class TorchConfig(BackendConfig): + """Configuration for torch process group setup. + + See https://pytorch.org/docs/stable/distributed.html for more info. + + Args: + backend: The backend to use for training. + See ``torch.distributed.init_process_group`` for more info and + valid values. + If set to None, nccl will be used if GPUs are requested, else gloo + will be used. + init_method: The initialization method to use. Either "env" + for environment variable initialization or "tcp" for TCP + initialization. Defaults to "env". + timeout_s: Seconds for process group operations to timeout. + """ + + backend: Optional[str] = None + init_method: str = "env" + timeout_s: int = 1800 + + @property + def backend_cls(self): + return _TorchBackend + + +def _setup_torch_process_group( + backend: str, + world_rank: int, + world_size: int, + init_method: str, + timeout_s: int = 1800, +): + """Connects the distributed PyTorch backend. + + Args: + backend: The backend (nccl, gloo, etc.) to use for training. + world_rank: Rank of the current worker. + world_size: Number of workers participating in the job. + init_method: URL specifying how to initialize the process group. + timeout_s: Seconds for process group operations to timeout. + """ + logger.info( + f"Setting up process group for: {init_method} [rank={world_rank}, " + f"world_size={world_size}]" + ) + logger.debug(f"using {backend}") + + if backend == "nccl" and "NCCL_BLOCKING_WAIT" not in os.environ: + logger.debug( + "Setting NCCL_BLOCKING_WAIT for detecting node failure. " + "To override this behavior, you can set NCCL_BLOCKING_WAIT=0." + ) + os.environ["NCCL_BLOCKING_WAIT"] = "1" + + dist.init_process_group( + backend=backend, + init_method=init_method, + rank=world_rank, + world_size=world_size, + timeout=timedelta(seconds=timeout_s), + ) + + +def _shutdown_torch(destroy_process_group=False): + if destroy_process_group: + dist.destroy_process_group() + if torch.cuda.is_available(): + torch.cuda.empty_cache() + + +class _TorchBackend(Backend): + share_cuda_visible_devices: bool = True + + def on_start(self, worker_group: WorkerGroup, backend_config: TorchConfig): + if dist.is_available(): + # Set the appropriate training backend. 
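An explicit ``TorchConfig`` can override the nccl/gloo auto-selection shown below; the concrete values here are arbitrary examples:

.. code-block:: python

    from ray.train import Trainer
    from ray.train.torch import TorchConfig

    # Force gloo over TCP with a shorter process group timeout.
    trainer = Trainer(
        backend=TorchConfig(backend="gloo", init_method="tcp", timeout_s=300),
        num_workers=2,
    )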
+ if backend_config.backend is None: + if worker_group.num_gpus_per_worker > 0: + backend = "nccl" + else: + backend = "gloo" + else: + backend = backend_config.backend + + master_addr, master_port = worker_group.execute_single( + 0, get_address_and_port + ) + if backend_config.init_method == "env": + + def set_env_vars(addr, port): + os.environ["MASTER_ADDR"] = addr + os.environ["MASTER_PORT"] = str(port) + + worker_group.execute(set_env_vars, addr=master_addr, port=master_port) + url = "env://" + elif backend_config.init_method == "tcp": + url = f"tcp://{master_addr}:{master_port}" + else: + raise ValueError( + f"The provided init_method (" + f"{backend_config.init_method}) is not supported. Must " + f"be either 'env' or 'tcp'." + ) + + setup_futures = [] + for i in range(len(worker_group)): + setup_futures.append( + worker_group.execute_single_async( + i, + _setup_torch_process_group, + backend=backend, + world_rank=i, + world_size=len(worker_group), + init_method=url, + timeout_s=backend_config.timeout_s, + ) + ) + ray.get(setup_futures) + else: + raise RuntimeError("Distributed torch is not available.") + + def on_shutdown(self, worker_group: WorkerGroup, backend_config: TorchConfig): + + worker_group.execute( + _shutdown_torch, destroy_process_group=len(worker_group) > 1 + ) + + @staticmethod + def encode_data(data_dict: Dict) -> EncodedData: + """Special handling for moving model from worker to driver.""" + + # If model is being checkpointed and is wrapped in DDP, then extract + # out the underlying module. If not, then deserialization will fail + # since the torch process group is not initialized on the driver. + + for k, v in data_dict.items(): + if isinstance(v, DistributedDataParallel) and hasattr(v, "module"): + data_dict[k] = v.module + + # Convert the checkpoint dict to bytes, so that any GPU tensors that + # are in the checkpoint dict can be properly deserialized on the + # driver side, even if the driver does not have access to a GPU device. + _buffer = io.BytesIO() + torch.save(data_dict, _buffer) + return _buffer.getvalue() + + @staticmethod + def decode_data(encoded_data: EncodedData) -> Dict: + # When decoding the bytes on the driver side, always map to CPU. 
+ _buffer = io.BytesIO(encoded_data) + checkpoint_dict = torch.load(_buffer, map_location="cpu") + return checkpoint_dict diff --git a/python/ray/train/torch.py b/python/ray/train/torch/train_loop_utils.py similarity index 75% rename from python/ray/train/torch.py rename to python/ray/train/torch/train_loop_utils.py index 24926bcc8f49..6d817de4fcd5 100644 --- a/python/ray/train/torch.py +++ b/python/ray/train/torch/train_loop_utils.py @@ -1,44 +1,32 @@ import tempfile -from dataclasses import dataclass -import io import logging import os import random import types -from datetime import timedelta from pathlib import Path from typing import Any, Dict, Optional import ray from ray import train -from ray.train.accelerator import Accelerator -from ray.train.backend import BackendConfig, Backend, EncodedData +from ray.train._internal.accelerator import Accelerator from ray.train.constants import PYTORCH_PROFILER_KEY from torch.optim import Optimizer -from ray.train.session import get_accelerator, set_accelerator -from ray.train.worker_group import WorkerGroup -from ray.train.utils import get_address_and_port +from ray.train._internal.session import get_accelerator, set_accelerator from ray.util import PublicAPI import numpy as np -try: - import torch - from torch.cuda.amp import autocast, GradScaler - import torch.distributed as dist - from torch.nn.parallel import DistributedDataParallel - from torch.utils.data import ( - DistributedSampler, - DataLoader, - IterableDataset, - SequentialSampler, - RandomSampler, - ) -except ModuleNotFoundError: - raise ModuleNotFoundError( - "PyTorch isn't installed. To install PyTorch, run 'pip install torch'" - ) +import torch +from torch.cuda.amp import autocast, GradScaler +from torch.nn.parallel import DistributedDataParallel +from torch.utils.data import ( + DistributedSampler, + DataLoader, + IterableDataset, + SequentialSampler, + RandomSampler, +) try: from torch.profiler import profile @@ -48,7 +36,209 @@ logger = logging.getLogger(__name__) -class TorchAccelerator(Accelerator): +@PublicAPI(stability="beta") +def get_device() -> torch.device: + """Gets the correct torch device to use for training.""" + return get_accelerator(_TorchAccelerator).get_device() + + +@PublicAPI(stability="beta") +def prepare_model( + model: torch.nn.Module, + move_to_device: bool = True, + wrap_ddp: bool = True, + ddp_kwargs: Optional[Dict[str, Any]] = None, +) -> torch.nn.Module: + """Prepares the model for distributed execution. + + This allows you to use the same exact code regardless of number of + workers or the device type being used (CPU, GPU). + + Args: + model (torch.nn.Module): A torch model to prepare. + move_to_device: Whether to move the model to the correct + device. If set to False, the model needs to manually be moved + to the correct device. + wrap_ddp: Whether to wrap models in + ``DistributedDataParallel``. + ddp_kwargs (Dict[str, Any]): Args to pass into + ``DistributedDataParallel`` initialization if ``wrap_ddp`` is + set to True. + """ + return get_accelerator(_TorchAccelerator).prepare_model( + model, + move_to_device=move_to_device, + wrap_ddp=wrap_ddp, + ddp_kwargs=ddp_kwargs, + ) + + +@PublicAPI(stability="beta") +def prepare_data_loader( + data_loader: torch.utils.data.DataLoader, + add_dist_sampler: bool = True, + move_to_device: bool = True, + auto_transfer: bool = True, +) -> torch.utils.data.DataLoader: + """Prepares DataLoader for distributed execution. 
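A minimal sketch of how ``prepare_model`` and ``prepare_data_loader`` are meant to be combined inside a training function; the model, data, and hyperparameters are placeholders:

.. code-block:: python

    import torch
    from ray.train.torch import prepare_model, prepare_data_loader

    def train_func():
        # DDP wrapping and device placement are handled by prepare_model.
        model = prepare_model(torch.nn.Linear(4, 1))
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

        dataset = torch.utils.data.TensorDataset(
            torch.randn(64, 4), torch.randn(64, 1)
        )
        # Adds a DistributedSampler and moves batches to the right device.
        data_loader = prepare_data_loader(
            torch.utils.data.DataLoader(dataset, batch_size=8)
        )

        for X, y in data_loader:
            optimizer.zero_grad()
            loss = torch.nn.functional.mse_loss(model(X), y)
            loss.backward()
            optimizer.step()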
+ + This allows you to use the same exact code regardless of number of + workers or the device type being used (CPU, GPU). + + Args: + data_loader (torch.utils.data.DataLoader): The DataLoader to + prepare. + add_dist_sampler: Whether to add a DistributedSampler to + the provided DataLoader. + move_to_device: If set, automatically move the data + returned by the data loader to the correct device. + auto_transfer: If set and device is GPU, another CUDA stream + is created to automatically copy data from host (CPU) memory + to device (GPU) memory (the default CUDA stream still runs the + training procedure). If device is CPU, it will be disabled + regardless of the setting. This configuration will be ignored + if ``move_to_device`` is False. + """ + return get_accelerator(_TorchAccelerator).prepare_data_loader( + data_loader, + add_dist_sampler=add_dist_sampler, + move_to_device=move_to_device, + auto_transfer=auto_transfer, + ) + + +@PublicAPI(stability="beta") +def accelerate(amp: bool = False) -> None: + """Enables training optimizations. + + Arguments: + amp: If true, perform training with automatic mixed precision. + Otherwise, use full precision. + + .. warning:: ``train.torch.accelerate`` cannot be called more than once, and it + must be called before any other ``train.torch`` utility function. + """ + try: + set_accelerator(_TorchAccelerator(amp=amp)) + except RuntimeError: + raise RuntimeError( + "An accelerator has already been set. Make sure " + "`train.torch.accelerate()` is not called multiple times, and is called " + "before any of the prepare methods." + ) + + +@PublicAPI(stability="beta") +def prepare_optimizer(optimizer: torch.optim.Optimizer) -> torch.optim.Optimizer: + """Wraps optimizer to support automatic mixed precision. + + Args: + optimizer (torch.optim.Optimizer): The optimizer to prepare. + + Returns: + A wrapped optimizer. + """ + return get_accelerator(_TorchAccelerator).prepare_optimizer(optimizer) + + +@PublicAPI(stability="beta") +def backward(tensor: torch.Tensor) -> None: + """Computes the gradient of the specified tensor w.r.t. graph leaves. + + Args: + tensor (torch.Tensor): Tensor of which the derivative will be computed. + """ + get_accelerator(_TorchAccelerator).backward(tensor) + + +@PublicAPI(stability="beta") +def enable_reproducibility(seed: int = 0) -> None: + """Limits sources of nondeterministic behavior. + + This function: + + * Seeds PyTorch, Python, and NumPy. + * Disables CUDA convolution benchmarking. + * Configures PyTorch to use deterministic algorithms. + * Seeds workers spawned for multi-process data loading. + + Args: + seed: The number to seed libraries and data workers with. + + .. warning:: ``train.torch.enable_reproducibility()`` can't guarantee + completely reproducible results across executions. To learn more, read + the `PyTorch notes on randomness + <https://pytorch.org/docs/stable/notes/randomness.html>`_. + """ + get_accelerator(_TorchAccelerator).enable_reproducibility(seed) + + +@PublicAPI(stability="beta") +class TorchWorkerProfiler: + """Utility class for running PyTorch Profiler on a Train worker. + + Args: + trace_dir (Optional[str]): The directory to store traces on the + worker node. If ``None``, this will use a default temporary dir. + """ + + WORKER_TRACE_DIR_NAME = "pytorch_profiler_worker_traces" + + def __init__(self, trace_dir: Optional[str] = None): + if profile is None: + raise ImportError( + "Torch Profiler requires torch>=1.8.1. " + "Run `pip install 'torch>=1.8.1'` to use TorchWorkerProfiler."
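The AMP helpers above (``accelerate``, ``prepare_optimizer``, ``backward``) compose roughly as follows; this is a sketch with placeholder model and data, and it assumes ``accelerate`` is called before any ``prepare_*`` call:

.. code-block:: python

    import torch
    import ray.train.torch as train_torch

    def train_func():
        train_torch.accelerate(amp=True)  # must be called first

        model = train_torch.prepare_model(torch.nn.Linear(4, 1))
        optimizer = train_torch.prepare_optimizer(
            torch.optim.SGD(model.parameters(), lr=1e-3)
        )

        device = train_torch.get_device()
        X = torch.randn(8, 4, device=device)
        y = torch.randn(8, 1, device=device)

        optimizer.zero_grad()
        loss = torch.nn.functional.mse_loss(model(X), y)
        train_torch.backward(loss)  # scales the loss when AMP is enabled
        optimizer.step()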
+ ) + + trace_dir = trace_dir or Path(tempfile.gettempdir()).joinpath( + self.WORKER_TRACE_DIR_NAME + ) + self.trace_dir = Path(trace_dir) + self.trace_dir.mkdir(parents=True, exist_ok=True) + # Accumulated traces. + self.profiler_trace_filenames = [] + + def trace_handler(self, p: profile): + """A stateful PyTorch Profiler trace handler. + + This will export the chrome trace to a file on disk. + + These exported traces can then be fetched by calling + ``get_and_clear_profile_traces``. + + Args: + p: A PyTorch Profiler profile. + """ + trace_filename = f"worker_{train.world_rank()}_epoch_{p.step_num}.pt.trace.json" + trace_path = self.trace_dir.joinpath(trace_filename) + + logger.debug(f"Writing worker trace to {trace_path}.") + p.export_chrome_trace(str(trace_path)) + self.profiler_trace_filenames.append(trace_filename) + + def get_and_clear_profile_traces(self): + """Reads unread Profiler traces from this worker. + + Returns: + The traces in a format consumable by + ``TorchTensorboardProfilerCallback``. + """ + + def get_trace(filename): + trace_path = self.trace_dir.joinpath(filename) + return trace_path.read_text() + + traces = [ + (trace_filename, get_trace(trace_filename)) + for trace_filename in self.profiler_trace_filenames + ] + + self.profiler_trace_filenames = [] + return {PYTORCH_PROFILER_KEY: traces} + + +class _TorchAccelerator(Accelerator): """A utility that implements methods to accelerate PyTorch training. Arguments: @@ -302,163 +492,6 @@ def enable_reproducibility(self, seed: int = 0) -> None: os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8" -@PublicAPI(stability="beta") -@dataclass -class TorchConfig(BackendConfig): - """Configuration for torch process group setup. - - See https://pytorch.org/docs/stable/distributed.html for more info. - - Args: - backend: The backend to use for training. - See ``torch.distributed.init_process_group`` for more info and - valid values. - If set to None, nccl will be used if GPUs are requested, else gloo - will be used. - init_method: The initialization method to use. Either "env" - for environment variable initialization or "tcp" for TCP - initialization. Defaults to "env". - timeout_s: Seconds for process group operations to timeout. - """ - - backend: Optional[str] = None - init_method: str = "env" - timeout_s: int = 1800 - - @property - def backend_cls(self): - return TorchBackend - - -def setup_torch_process_group( - backend: str, - world_rank: int, - world_size: int, - init_method: str, - timeout_s: int = 1800, -): - """Connects the distributed PyTorch backend. - - Args: - backend: The backend (nccl, gloo, etc.) to use for training. - world_rank: Rank of the current worker. - world_size: Number of workers participating in the job. - init_method: URL specifying how to initialize the process group. - timeout_s: Seconds for process group operations to timeout. - """ - logger.info( - f"Setting up process group for: {init_method} [rank={world_rank}, " - f"world_size={world_size}]" - ) - logger.debug(f"using {backend}") - - if backend == "nccl" and "NCCL_BLOCKING_WAIT" not in os.environ: - logger.debug( - "Setting NCCL_BLOCKING_WAIT for detecting node failure. " - "To override this behavior, you can set NCCL_BLOCKING_WAIT=0."
- ) - os.environ["NCCL_BLOCKING_WAIT"] = "1" - - dist.init_process_group( - backend=backend, - init_method=init_method, - rank=world_rank, - world_size=world_size, - timeout=timedelta(seconds=timeout_s), - ) - - -def shutdown_torch(destroy_process_group=False): - if destroy_process_group: - dist.destroy_process_group() - if torch.cuda.is_available(): - torch.cuda.empty_cache() - - -class TorchBackend(Backend): - share_cuda_visible_devices: bool = True - - def on_start(self, worker_group: WorkerGroup, backend_config: TorchConfig): - if dist.is_available(): - # Set the appropriate training backend. - if backend_config.backend is None: - if worker_group.num_gpus_per_worker > 0: - backend = "nccl" - else: - backend = "gloo" - else: - backend = backend_config.backend - - master_addr, master_port = worker_group.execute_single( - 0, get_address_and_port - ) - if backend_config.init_method == "env": - - def set_env_vars(addr, port): - os.environ["MASTER_ADDR"] = addr - os.environ["MASTER_PORT"] = str(port) - - worker_group.execute(set_env_vars, addr=master_addr, port=master_port) - url = "env://" - elif backend_config.init_method == "tcp": - url = f"tcp://{master_addr}:{master_port}" - else: - raise ValueError( - f"The provided init_method (" - f"{backend_config.init_method}) is not supported. Must " - f"be either 'env' or 'tcp'." - ) - - setup_futures = [] - for i in range(len(worker_group)): - setup_futures.append( - worker_group.execute_single_async( - i, - setup_torch_process_group, - backend=backend, - world_rank=i, - world_size=len(worker_group), - init_method=url, - timeout_s=backend_config.timeout_s, - ) - ) - ray.get(setup_futures) - else: - raise RuntimeError("Distributed torch is not available.") - - def on_shutdown(self, worker_group: WorkerGroup, backend_config: TorchConfig): - - worker_group.execute( - shutdown_torch, destroy_process_group=len(worker_group) > 1 - ) - - @staticmethod - def encode_data(data_dict: Dict) -> EncodedData: - """Special handling for moving model from worker to driver.""" - - # If model is being checkpointed and is wrapped in DDP, then extract - # out the underlying module. If not, then deserialization will fail - # since the torch process group is not initialized on the driver. - - for k, v in data_dict.items(): - if isinstance(v, DistributedDataParallel) and hasattr(v, "module"): - data_dict[k] = v.module - - # Convert the checkpoint dict to bytes, so that any GPU tensors that - # are in the checkpoint dict can be properly deserialized on the - # driver side, even if the driver does not have access to a GPU device. - _buffer = io.BytesIO() - torch.save(data_dict, _buffer) - return _buffer.getvalue() - - @staticmethod - def decode_data(encoded_data: EncodedData) -> Dict: - # When decoding the bytes on the driver side, always map to CPU. 
- _buffer = io.BytesIO(encoded_data) - checkpoint_dict = torch.load(_buffer, map_location="cpu") - return checkpoint_dict - - class _WrappedDataLoader(DataLoader): def __init__( self, base_dataloader: DataLoader, device: torch.device, auto_transfer: bool @@ -583,204 +616,3 @@ def step(self, closure=None): self.scaler.update() else: self.optimizer.step(closure) - - -@PublicAPI(stability="beta") -def get_device() -> torch.device: - """Gets the correct torch device to use for training.""" - return get_accelerator(TorchAccelerator).get_device() - - -@PublicAPI(stability="beta") -def prepare_model( - model: torch.nn.Module, - move_to_device: bool = True, - wrap_ddp: bool = True, - ddp_kwargs: Optional[Dict[str, Any]] = None, -) -> torch.nn.Module: - """Prepares the model for distributed execution. - - This allows you to use the same exact code regardless of number of - workers or the device type being used (CPU, GPU). - - Args: - model (torch.nn.Module): A torch model to prepare. - move_to_device: Whether to move the model to the correct - device. If set to False, the model needs to manually be moved - to the correct device. - wrap_ddp: Whether to wrap models in - ``DistributedDataParallel``. - ddp_kwargs (Dict[str, Any]): Args to pass into - ``DistributedDataParallel`` initialization if ``wrap_ddp`` is - set to True. - """ - return get_accelerator(TorchAccelerator).prepare_model( - model, - move_to_device=move_to_device, - wrap_ddp=wrap_ddp, - ddp_kwargs=ddp_kwargs, - ) - - -@PublicAPI(stability="beta") -def prepare_data_loader( - data_loader: torch.utils.data.DataLoader, - add_dist_sampler: bool = True, - move_to_device: bool = True, - auto_transfer: bool = True, -) -> torch.utils.data.DataLoader: - """Prepares DataLoader for distributed execution. - - This allows you to use the same exact code regardless of number of - workers or the device type being used (CPU, GPU). - - Args: - data_loader (torch.utils.data.DataLoader): The DataLoader to - prepare. - add_dist_sampler: Whether to add a DistributedSampler to - the provided DataLoader. - move_to_device: If set, automatically move the data - returned by the data loader to the correct device. - auto_transfer: If set and device is GPU, another CUDA stream - is created to automatically copy data from host (CPU) memory - to device (GPU) memory (the default CUDA stream still runs the - training procedure). If device is CPU, it will be disabled - regardless of the setting. This configuration will be ignored - if ``move_to_device`` is False. - """ - return get_accelerator(TorchAccelerator).prepare_data_loader( - data_loader, - add_dist_sampler=add_dist_sampler, - move_to_device=move_to_device, - auto_transfer=auto_transfer, - ) - - -@PublicAPI(stability="beta") -def accelerate(amp: bool = False) -> None: - """Enables training optimizations. - - Arguments: - amp: If true, perform training with automatic mixed precision. - Otherwise, use full precision. - - .. warning:: ``train.torch.accelerate`` cannot be called more than once, and it - must be called before any other ``train.torch`` utility function. - """ - try: - set_accelerator(TorchAccelerator(amp=amp)) - except RuntimeError: - raise RuntimeError( - "An accelerator has already been set. Make sure " - "`train.torch.accelerate()` is not called multiple times, and is called " - "before any of the prepare methods." - ) - - -@PublicAPI(stability="beta") -def prepare_optimizer(optimizer: torch.optim.Optimizer) -> torch.optim.Optimizer: - """Wraps optimizer to support automatic mixed precision. 
- - Args: - optimizer (torch.optim.Optimizer): The DataLoader to prepare. - - Returns: - A wrapped optimizer. - """ - return get_accelerator(TorchAccelerator).prepare_optimizer(optimizer) - - -@PublicAPI(stability="beta") -def backward(tensor: torch.Tensor) -> None: - """Computes the gradient of the specified tensor w.r.t. graph leaves. - - Args: - tensor (torch.Tensor): Tensor of which the derivative will be computed. - """ - get_accelerator(TorchAccelerator).backward(tensor) - - -def enable_reproducibility(seed: int = 0) -> None: - """Limits sources of nondeterministic behavior. - - This function: - - * Seeds PyTorch, Python, and NumPy. - * Disables CUDA convolution benchmarking. - * Configures PyTorch to use determinstic algorithms. - * Seeds workers spawned for multi-process data loading. - - Args: - seed: The number to seed libraries and data workers with. - - .. warning:: ``train.torch.enable_reproducibility()`` can't guarantee - completely reproducible results across executions. To learn more, read - the `PyTorch notes on randomness - `_. - """ - get_accelerator(TorchAccelerator).enable_reproducibility(seed) - - -WORKER_TRACE_DIR_NAME = "pytorch_profiler_worker_traces" - - -class TorchWorkerProfiler: - """Utility class for running PyTorch Profiler on a Train worker. - - Args: - trace_dir (Optional[str]): The directory to store traces on the - worker node. If ``None``, this will use a default temporary dir. - """ - - def __init__(self, trace_dir: Optional[str] = None): - if profile is None: - raise ImportError( - "Torch Profiler requires torch>=1.8.1. " - "Run `pip install 'torch>=1.8.1'` to use TorchWorkerProfiler." - ) - - trace_dir = trace_dir or Path(tempfile.gettempdir()).joinpath( - WORKER_TRACE_DIR_NAME - ) - self.trace_dir = Path(trace_dir) - self.trace_dir.mkdir(parents=True, exist_ok=True) - # Accumulated traces. - self.profiler_trace_filenames = [] - - def trace_handler(self, p: profile): - """A stateful PyTorch Profiler trace handler. - - This will the export chrome trace to a file on disk. - - These exported traces can then be fetched by calling - ``get_and_clear_profile_traces``. - - Args: - p: A PyTorch Profiler profile. - """ - trace_filename = f"worker_{train.world_rank()}_epoch_{p.step_num}.pt.trace.json" - trace_path = self.trace_dir.joinpath(trace_filename) - - logger.debug(f"Writing worker trace to {trace_path}.") - p.export_chrome_trace(str(trace_path)) - self.profiler_trace_filenames.append(trace_filename) - - def get_and_clear_profile_traces(self): - """Reads unread Profiler traces from this worker. - - Returns: - The traces in a format consumable by - ``TorchTensorboardProfilerCallback``. 
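For reference, a hypothetical way to wire ``TorchWorkerProfiler`` (now re-exported from ``ray.train.torch``) into a training loop together with the driver-side ``TorchTensorboardProfilerCallback``; the profiler schedule and epoch count are illustrative:

.. code-block:: python

    from torch.profiler import profile, schedule

    import ray.train as train
    from ray.train import Trainer
    from ray.train.callbacks import TorchTensorboardProfilerCallback
    from ray.train.torch import TorchWorkerProfiler

    def train_func():
        profiler = TorchWorkerProfiler()
        with profile(
            schedule=schedule(wait=0, warmup=0, active=1),
            on_trace_ready=profiler.trace_handler,
        ) as p:
            for epoch in range(3):
                # ... one epoch of training ...
                p.step()
                traces = profiler.get_and_clear_profile_traces()
                train.report(epoch=epoch, **traces)

    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    trainer.run(train_func, callbacks=[TorchTensorboardProfilerCallback()])
    trainer.shutdown()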
- """ - - def get_trace(filename): - trace_path = self.trace_dir.joinpath(filename) - return trace_path.read_text() - - traces = [ - (trace_filename, get_trace(trace_filename)) - for trace_filename in self.profiler_trace_filenames - ] - - self.profiler_trace_files = [] - return {PYTORCH_PROFILER_KEY: traces} diff --git a/python/ray/train/train_loop_utils.py b/python/ray/train/train_loop_utils.py new file mode 100644 index 000000000000..a6013f5ed8ef --- /dev/null +++ b/python/ray/train/train_loop_utils.py @@ -0,0 +1,263 @@ +from typing import TYPE_CHECKING +from typing import Optional, Dict, Union +import warnings + +from ray.train.constants import SESSION_MISUSE_LOG_ONCE_KEY +from ray.train._internal.session import ( + get_session, +) +from ray.util import PublicAPI, log_once + +if TYPE_CHECKING: + from ray.data import Dataset, DatasetPipeline + +def _warn_session_misuse(fn_name: str): + """Logs warning message on provided fn being used outside of session. + + Args: + fn_name: The name of the function to warn about. + """ + + if log_once(f"{SESSION_MISUSE_LOG_ONCE_KEY}-{fn_name}"): + warnings.warn( + f"`train.{fn_name}()` is meant to only be " + f"called " + "inside a training function that is executed by " + "`Trainer.run`. Returning None." + ) + +@PublicAPI(stability="beta") +def get_dataset_shard( + dataset_name: Optional[str] = None, +) -> Optional[Union["Dataset", "DatasetPipeline"]]: + """Returns the Ray Dataset or DatasetPipeline shard for this worker. + + You should call ``to_torch()`` or ``to_tf()`` on this shard to convert + it to the appropriate framework-specific Dataset. + + .. code-block:: python + + import ray + from ray import train + + def train_func(): + model = Net() + for iter in range(100): + data_shard = train.get_dataset_shard().to_torch() + model.train(data_shard) + return model + + dataset = ray.data.read_csv("train.csv") + dataset.filter(...).repeat().random_shuffle() + + trainer = Trainer(backend="torch") + trainer.start() + + # Trainer will automatically handle sharding. + train_model = trainer.run(train_func, dataset=dataset) + trainer.shutdown() + + Args: + dataset_name: If a Dictionary of Datasets was passed to ``Trainer``, then + specifies which dataset shard to return. + + Returns: + The ``Dataset`` or ``DatasetPipeline`` shard to use for this worker. + If no dataset is passed into Trainer, then return None. + """ + session = get_session() + if session is None: + _warn_session_misuse(get_dataset_shard.__name__) + return + shard = session.dataset_shard + if shard is None: + warnings.warn( + "No dataset passed in. Returning None. Make sure to " + "pass in a Ray Dataset to Trainer.run to use this " + "function." + ) + elif isinstance(shard, dict): + if not dataset_name: + raise RuntimeError( + "Multiple datasets were passed into ``Trainer``, " + "but no ``dataset_name`` is passed into " + "``get_dataset_shard``. Please specify which " + "dataset shard to retrieve." + ) + return shard.get(dataset_name) + return shard + + +@PublicAPI(stability="beta") +def report(**kwargs) -> None: + """Reports all keyword arguments to Train as intermediate results. + + .. code-block:: python + + import time + from ray import train + + def train_func(): + for iter in range(100): + time.sleep(1) + train.report(hello="world") + + trainer = Trainer(backend="torch") + trainer.start() + trainer.run(train_func) + trainer.shutdown() + + Args: + **kwargs: Any key value pair to be reported by Train. + If callbacks are provided, they are executed on these + intermediate results. 
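Regarding the multi-dataset branch of ``get_dataset_shard`` above, a hypothetical sketch where two named datasets are passed to ``Trainer.run`` (the names and data are made up):

.. code-block:: python

    import ray
    import ray.train as train
    from ray.train import Trainer

    def train_func():
        train_shard = train.get_dataset_shard("train")
        val_shard = train.get_dataset_shard("val")
        # ... consume the shards, e.g. via .to_torch() or .to_tf() ...

    datasets = {
        "train": ray.data.range(1000),
        "val": ray.data.range(100),
    }

    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    trainer.run(train_func, dataset=datasets)
    trainer.shutdown()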
+ """ + session = get_session() + if session is None: + _warn_session_misuse(report.__name__) + return + session.report(**kwargs) + + +@PublicAPI(stability="beta") +def world_rank() -> int: + """Get the world rank of this worker. + + .. code-block:: python + + import time + from ray import train + + def train_func(): + for iter in range(100): + time.sleep(1) + if train.world_rank() == 0: + print("Worker 0") + + trainer = Trainer(backend="torch") + trainer.start() + trainer.run(train_func) + trainer.shutdown() + + """ + session = get_session() + if session is None: + return 0 + return session.world_rank + + +@PublicAPI(stability="beta") +def local_rank() -> int: + """Get the local rank of this worker (rank of the worker on its node). + + .. code-block:: python + + import time + from ray import train + + def train_func(): + if torch.cuda.is_available(): + torch.cuda.set_device(train.local_rank()) + ... + + trainer = Trainer(backend="torch", use_gpu=True) + trainer.start() + trainer.run(train_func) + trainer.shutdown() + + """ + session = get_session() + if session is None: + return 0 + return session.local_rank + + +@PublicAPI(stability="beta") +def load_checkpoint() -> Optional[Dict]: + """Loads checkpoint data onto the worker. + + .. code-block:: python + + from ray import train + + def train_func(): + checkpoint = train.load_checkpoint() + for iter in range(checkpoint["epoch"], 5): + print(iter) + + trainer = Trainer(backend="torch") + trainer.start() + trainer.run(train_func, checkpoint={"epoch": 3}) + # 3 + # 4 + trainer.shutdown() + + Args: + **kwargs: Any key value pair to be checkpointed by Train. + Returns: + The most recently saved checkpoint if ``train.save_checkpoint()`` + has been called. Otherwise, the checkpoint that the session was + originally initialized with. ``None`` if neither exist. + """ + session = get_session() + if session is None: + _warn_session_misuse(load_checkpoint.__name__) + return + return session.loaded_checkpoint + + +@PublicAPI(stability="beta") +def save_checkpoint(**kwargs) -> None: + """Checkpoints all keyword arguments to Train as restorable state. + + .. code-block:: python + + import time + from ray import train + + def train_func(): + for iter in range(100): + time.sleep(1) + train.save_checkpoint(epoch=iter) + + trainer = Trainer(backend="torch") + trainer.start() + trainer.run(train_func) + trainer.shutdown() + + Args: + **kwargs: Any key value pair to be checkpointed by Train. + """ + session = get_session() + if session is None: + _warn_session_misuse(save_checkpoint.__name__) + return + session.checkpoint(**kwargs) + + +@PublicAPI(stability="beta") +def world_size() -> int: + """Get the current world size (i.e. total number of workers) for this run. + + .. 
code-block:: python + + import time + from ray import train + + def train_func(): + assert train.world_size() == 4 + + trainer = Trainer(backend="torch", num_workers=4) + trainer.start() + trainer.run(train_func) + trainer.shutdown() + """ + session = get_session() + if session is None: + return 1 + return session.world_size + + +@PublicAPI(stability="beta") +class SessionMisuseError(Exception): + """Method or function was used outside of a session.""" \ No newline at end of file diff --git a/python/ray/train/trainer.py b/python/ray/train/trainer.py index 7ab29bd65959..df01ae1716ff 100644 --- a/python/ray/train/trainer.py +++ b/python/ray/train/trainer.py @@ -10,19 +10,21 @@ from ray.actor import ActorHandle from ray.train.backend import ( BackendConfig, - BackendExecutor, - InactiveWorkerGroupError, - TrainBackendError, - TrainingWorkerError, ) from ray.train.callbacks.callback import TrainingCallback from ray.train.impl.dataset_spec import RayDataset, _RayDatasetSpec -from ray.train.session import TrainingResultType -from ray.train.utils import ( +from ray.train._internal.session import TrainingResultType +from ray.train._internal.utils import ( construct_train_func, ActorWrapper, ) -from ray.train._checkpoint import ( +from ray.train._internal.backend_executor import ( + BackendExecutor, + InactiveWorkerGroupError, + TrainBackendError, + TrainingWorkerError, +) +from ray.train._internal.checkpoint import ( TuneCheckpointManager, CheckpointManager, load_checkpoint_from_path, @@ -37,8 +39,8 @@ ) # Ray Train should be usable even if Tune is not installed. -from ray.train.utils import construct_path -from ray.train.worker_group import WorkerGroup +from ray.train._internal.utils import construct_path +from ray.train._internal.worker_group import WorkerGroup from ray.util import PublicAPI from ray.util.annotations import DeveloperAPI from ray.util.ml_utils.checkpoint_manager import CheckpointStrategy @@ -79,7 +81,7 @@ def noop(): # Import backend configurations dynamically since not all subdependencies # may be installed. -def get_backend_config_cls(backend_name) -> type: +def _get_backend_config_cls(backend_name) -> type: if backend_name not in BACKEND_NAME_TO_CONFIG_CLS_NAME: raise ValueError( f"Invalid backend: {backend_name}. 
" @@ -269,7 +271,7 @@ def _get_backend_config(self, backend: Union[str, BackendConfig]) -> BackendConf if isinstance(backend, BackendConfig): return backend elif isinstance(backend, str): - return get_backend_config_cls(backend)() + return _get_backend_config_cls(backend)() else: raise TypeError(f"Invalid type for backend: {type(backend)}.") From d5b895bc96f75d5a37ccec410a4e2d7d805c0086 Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Tue, 7 Jun 2022 14:46:50 -0700 Subject: [PATCH 2/9] update --- python/ray/train/_internal/backend_executor.py | 4 ++-- python/ray/train/{impl => _internal}/dataset_spec.py | 2 +- .../aggregate/aggregate_preprocessor.py | 3 +++ .../train/_internal/results_preprocessors/index.py | 3 +-- .../train/_internal/results_preprocessors/keys.py | 3 +-- python/ray/train/_internal/session.py | 1 + python/ray/train/callbacks/callback.py | 1 + python/ray/train/callbacks/logging.py | 3 +++ python/ray/train/callbacks/print.py | 1 + python/ray/train/callbacks/profile.py | 1 + python/ray/train/horovod/__init__.py | 2 +- python/ray/train/impl/__init__.py | 0 python/ray/train/tensorflow/__init__.py | 2 +- python/ray/train/tests/test_backend.py | 4 ++-- python/ray/train/tests/test_callbacks.py | 5 ++++- python/ray/train/tests/test_session.py | 12 +++++++----- python/ray/train/torch/__init__.py | 2 +- python/ray/train/train_loop_utils.py | 7 ++----- python/ray/train/trainer.py | 8 ++++---- 19 files changed, 37 insertions(+), 27 deletions(-) rename python/ray/train/{impl => _internal}/dataset_spec.py (99%) delete mode 100644 python/ray/train/impl/__init__.py diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py index 55bc688378be..fbab0c18e750 100644 --- a/python/ray/train/_internal/backend_executor.py +++ b/python/ray/train/_internal/backend_executor.py @@ -13,7 +13,7 @@ TRAIN_ENABLE_WORKER_SPREAD_ENV, ) from ray.train.backend import BackendConfig -from ray.train.impl.dataset_spec import _RayDatasetSpec +from ray.train._internal.dataset_spec import RayDatasetSpec from ray.train._internal.session import TrainingResult from ray.train._internal.session import init_session, get_session, shutdown_session from ray.train._internal.utils import check_for_failure @@ -263,7 +263,7 @@ def _create_local_rank_map(self) -> Dict: def start_training( self, train_func: Callable[[], T], - dataset_spec: _RayDatasetSpec, + dataset_spec: RayDatasetSpec, checkpoint: Optional[Dict] = None, ) -> None: """Executes a training function on all workers in a separate thread. diff --git a/python/ray/train/impl/dataset_spec.py b/python/ray/train/_internal/dataset_spec.py similarity index 99% rename from python/ray/train/impl/dataset_spec.py rename to python/ray/train/_internal/dataset_spec.py index 4199748d17e2..e2b4fbbbb3c8 100644 --- a/python/ray/train/impl/dataset_spec.py +++ b/python/ray/train/_internal/dataset_spec.py @@ -10,7 +10,7 @@ @dataclass -class _RayDatasetSpec: +class RayDatasetSpec: """Configuration for Ray Datasets to pass to the training workers. 
dataset_or_dict: An optional Ray Dataset (or DatasetPipeline) or a dictionary of diff --git a/python/ray/train/_internal/results_preprocessors/aggregate/aggregate_preprocessor.py b/python/ray/train/_internal/results_preprocessors/aggregate/aggregate_preprocessor.py index f33dc5c451e9..04278f6e3766 100644 --- a/python/ray/train/_internal/results_preprocessors/aggregate/aggregate_preprocessor.py +++ b/python/ray/train/_internal/results_preprocessors/aggregate/aggregate_preprocessor.py @@ -76,6 +76,7 @@ def preprocess(self, results: Optional[List[Dict]] = None) -> Optional[List[Dict return results + class AverageResultsPreprocessor(AggregateResultsPreprocessor): """A preprocessor that averages results with equal weight. @@ -98,6 +99,7 @@ class AverageResultsPreprocessor(AggregateResultsPreprocessor): def __init__(self, keys: Optional[List[str]] = None): super().__init__(Average(), keys) + class MaxResultsPreprocessor(AggregateResultsPreprocessor): """A preprocessor that computes maximum values of specified keys. @@ -120,6 +122,7 @@ class MaxResultsPreprocessor(AggregateResultsPreprocessor): def __init__(self, keys: Optional[List[str]] = None): super().__init__(Max(), keys) + class WeightedAverageResultsPreprocessor(AggregateResultsPreprocessor): """A preprocessor that performs weighted average over metrics. diff --git a/python/ray/train/_internal/results_preprocessors/index.py b/python/ray/train/_internal/results_preprocessors/index.py index f5dc52be404c..ce482959ef86 100644 --- a/python/ray/train/_internal/results_preprocessors/index.py +++ b/python/ray/train/_internal/results_preprocessors/index.py @@ -1,7 +1,6 @@ from typing import List, Dict, Iterable, Union, Optional -from ray.train._internal.results_preprocessors.preprocessor import \ - ResultsPreprocessor +from ray.train._internal.results_preprocessors.preprocessor import ResultsPreprocessor class IndexedResultsPreprocessor(ResultsPreprocessor): diff --git a/python/ray/train/_internal/results_preprocessors/keys.py b/python/ray/train/_internal/results_preprocessors/keys.py index 54d9640aeae0..bba3afa131db 100644 --- a/python/ray/train/_internal/results_preprocessors/keys.py +++ b/python/ray/train/_internal/results_preprocessors/keys.py @@ -1,7 +1,6 @@ from typing import List, Dict, Optional, Iterable -from ray.train._internal.results_preprocessors.preprocessor import \ - ResultsPreprocessor +from ray.train._internal.results_preprocessors.preprocessor import ResultsPreprocessor class ExcludedKeysResultsPreprocessor(ResultsPreprocessor): diff --git a/python/ray/train/_internal/session.py b/python/ray/train/_internal/session.py index a335a9176b9c..b46e810115db 100644 --- a/python/ray/train/_internal/session.py +++ b/python/ray/train/_internal/session.py @@ -25,6 +25,7 @@ RESULT_FETCH_TIMEOUT, ) from ray.train._internal.utils import PropagatingThread +from ray.train.error import SessionMisuseError class TrainingResultType(Enum): diff --git a/python/ray/train/callbacks/callback.py b/python/ray/train/callbacks/callback.py index 51f6f92a1a9b..9ae965537706 100644 --- a/python/ray/train/callbacks/callback.py +++ b/python/ray/train/callbacks/callback.py @@ -9,6 +9,7 @@ from ray.train.constants import ALL_RESERVED_KEYS from ray.util.annotations import DeveloperAPI + @DeveloperAPI class TrainingCallback(abc.ABC): """Abstract Train callback class.""" diff --git a/python/ray/train/callbacks/logging.py b/python/ray/train/callbacks/logging.py index 31cdf6bd6d94..4e6e64751ba0 100644 --- a/python/ray/train/callbacks/logging.py +++ 
diff --git a/python/ray/train/callbacks/logging.py b/python/ray/train/callbacks/logging.py
index 31cdf6bd6d94..4e6e64751ba0 100644
--- a/python/ray/train/callbacks/logging.py
+++ b/python/ray/train/callbacks/logging.py
@@ -91,6 +91,7 @@ def logdir_path(self) -> Path:
             return self._default_logdir
         raise RuntimeError("Logdir must be set in init or setup_logdir.")

+
 @PublicAPI(stability="beta")
 class JsonLoggerCallback(TrainingCallback):
     """Logs Train results in json format.
@@ -139,6 +140,7 @@ def log_path(self) -> Path:
         filename = self._filename or JsonLoggerCallback._default_filename
         return self.logdir.joinpath(filename)

+
 @PublicAPI(stability="beta")
 class MLflowLoggerCallback(TrainingCallback):
     """MLflow Logger to automatically log Train results and config to MLflow.
@@ -236,6 +238,7 @@ def finish_training(self, error: bool = False, **info):
     def logdir(self) -> Path:
         return self._logdir_manager.logdir_path

+
 @PublicAPI(stability="beta")
 class TBXLoggerCallback(TrainingCallback):
     """Logs Train results in TensorboardX format.
diff --git a/python/ray/train/callbacks/print.py b/python/ray/train/callbacks/print.py
index b86977ab122d..301f197493e6 100644
--- a/python/ray/train/callbacks/print.py
+++ b/python/ray/train/callbacks/print.py
@@ -4,6 +4,7 @@
 from ray.train.callbacks import TrainingCallback
 from ray.util.annotations import PublicAPI

+
 @PublicAPI(stability="beta")
 class PrintCallback(TrainingCallback):
     """A callback that prints training results to STDOUT.
diff --git a/python/ray/train/callbacks/profile.py b/python/ray/train/callbacks/profile.py
index a94454c95db6..f1d75b6c2a3a 100644
--- a/python/ray/train/callbacks/profile.py
+++ b/python/ray/train/callbacks/profile.py
@@ -12,6 +12,7 @@
 DRIVER_TRACE_DIR_NAME = "pytorch_profiler"

+
 @PublicAPI(stability="beta")
 class TorchTensorboardProfilerCallback(TrainingCallback):
     """Synchronizes PyTorch Profiler traces onto disk.
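The logging, print, and profile hunks above only add the blank lines that lint requires ahead of the @PublicAPI(stability="beta") decorators introduced in patch 1. For orientation, a sketch of how these now-public callbacks plug into the legacy Trainer API; the two-worker torch backend and the body of train_func are assumptions made purely for illustration:

    import ray.train as train
    from ray.train import Trainer
    from ray.train.callbacks import JsonLoggerCallback, PrintCallback


    def train_func():
        for epoch in range(3):
            # Each reported dict is what the callbacks receive and log.
            train.report(epoch=epoch, loss=1.0 / (epoch + 1))


    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    trainer.run(train_func, callbacks=[JsonLoggerCallback(), PrintCallback()])
    trainer.shutdown()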
diff --git a/python/ray/train/horovod/__init__.py b/python/ray/train/horovod/__init__.py
index 227b13741ac5..945daf23c311 100644
--- a/python/ray/train/horovod/__init__.py
+++ b/python/ray/train/horovod/__init__.py
@@ -1,5 +1,5 @@
 try:
-    import horovod
+    import horovod  # noqa: F401
 except ModuleNotFoundError:
     raise ModuleNotFoundError(
         "Horovod isn't installed. To install Horovod with PyTorch support, run 'pip "
diff --git a/python/ray/train/impl/__init__.py b/python/ray/train/impl/__init__.py
deleted file mode 100644
index e69de29bb2d1..000000000000
diff --git a/python/ray/train/tensorflow/__init__.py b/python/ray/train/tensorflow/__init__.py
index 4bff83c4fbac..a934ea3b439b 100644
--- a/python/ray/train/tensorflow/__init__.py
+++ b/python/ray/train/tensorflow/__init__.py
@@ -1,5 +1,5 @@
 try:
-    import tensorflow as tf
+    import tensorflow as tf  # noqa: F401
 except ModuleNotFoundError:
     raise ModuleNotFoundError(
         "TensorFlow isn't installed. To install TensorFlow, run 'pip install "
diff --git a/python/ray/train/tests/test_backend.py b/python/ray/train/tests/test_backend.py
index 038ec1ba24cf..56dadbd94cd4 100644
--- a/python/ray/train/tests/test_backend.py
+++ b/python/ray/train/tests/test_backend.py
@@ -14,7 +14,7 @@
     TrainBackendError,
     TrainingWorkerError,
 )
-from ray.train.impl.dataset_spec import _RayDatasetSpec
+from ray.train._internal.dataset_spec import RayDatasetSpec
 from ray.train.tensorflow import TensorflowConfig
 from ray.train.torch import TorchConfig
 from ray.train.constants import (
@@ -103,7 +103,7 @@ def on_shutdown(self, worker_group: WorkerGroup, backend_config: TestConfig):
         pass


-EMPTY_RAY_DATASET_SPEC = _RayDatasetSpec(dataset_or_dict=None)
+EMPTY_RAY_DATASET_SPEC = RayDatasetSpec(dataset_or_dict=None)


 def test_start(ray_start_2_cpus):
diff --git a/python/ray/train/tests/test_callbacks.py b/python/ray/train/tests/test_callbacks.py
index 5bb3ff9e2d69..7ff2e73e2ae8 100644
--- a/python/ray/train/tests/test_callbacks.py
+++ b/python/ray/train/tests/test_callbacks.py
@@ -18,7 +18,10 @@
     TBXLoggerCallback,
     TorchTensorboardProfilerCallback,
 )
-from ray.train.callbacks.logging import MLflowLoggerCallback, _TrainCallbackLogdirManager
+from ray.train.callbacks.logging import (
+    MLflowLoggerCallback,
+    _TrainCallbackLogdirManager,
+)
 from ray.train.constants import (
     TRAINING_ITERATION,
     DETAILED_AUTOFILLED_KEYS,
diff --git a/python/ray/train/tests/test_session.py b/python/ray/train/tests/test_session.py
index 4198a99dddcb..d2954d59e7fc 100644
--- a/python/ray/train/tests/test_session.py
+++ b/python/ray/train/tests/test_session.py
@@ -5,22 +5,24 @@
 import ray
 from ray.train._internal.accelerator import Accelerator
 from ray.train.constants import SESSION_MISUSE_LOG_ONCE_KEY
-from ray.train.session import (
+from ray.train._internal.session import (
     init_session,
     shutdown_session,
     get_session,
+    TrainingResultType,
+    get_accelerator,
+    set_accelerator,
+)
+from ray.train.train_loop_utils import (
     world_rank,
     local_rank,
     report,
     save_checkpoint,
-    TrainingResultType,
     load_checkpoint,
     get_dataset_shard,
     world_size,
-    get_accelerator,
-    set_accelerator,
-    SessionMisuseError,
 )
+from ray.train.error import SessionMisuseError


 @pytest.fixture(scope="function")
diff --git a/python/ray/train/torch/__init__.py b/python/ray/train/torch/__init__.py
index cb084ced5caf..3024da3bdedd 100644
--- a/python/ray/train/torch/__init__.py
+++ b/python/ray/train/torch/__init__.py
@@ -1,5 +1,5 @@
 try:
-    import torch
+    import torch  # noqa: F401
 except ModuleNotFoundError:
     raise ModuleNotFoundError(
         "PyTorch isn't installed. To install PyTorch, run 'pip install torch'"
diff --git a/python/ray/train/train_loop_utils.py b/python/ray/train/train_loop_utils.py
index a6013f5ed8ef..b4dde5f4ca7b 100644
--- a/python/ray/train/train_loop_utils.py
+++ b/python/ray/train/train_loop_utils.py
@@ -11,6 +11,7 @@
 if TYPE_CHECKING:
     from ray.data import Dataset, DatasetPipeline

+
 def _warn_session_misuse(fn_name: str):
     """Logs warning message on provided fn being used outside of session.
@@ -26,6 +27,7 @@ def _warn_session_misuse(fn_name: str):
         "`Trainer.run`. Returning None."
     )

+
 @PublicAPI(stability="beta")
 def get_dataset_shard(
     dataset_name: Optional[str] = None,
@@ -256,8 +258,3 @@ def train_func():
     if session is None:
         return 1
     return session.world_size
-
-
-@PublicAPI(stability="beta")
-class SessionMisuseError(Exception):
-    """Method or function was used outside of a session."""
\ No newline at end of file
diff --git a/python/ray/train/trainer.py b/python/ray/train/trainer.py
index df01ae1716ff..082e2bb194c3 100644
--- a/python/ray/train/trainer.py
+++ b/python/ray/train/trainer.py
@@ -12,7 +12,7 @@
     BackendConfig,
 )
 from ray.train.callbacks.callback import TrainingCallback
-from ray.train.impl.dataset_spec import RayDataset, _RayDatasetSpec
+from ray.train._internal.dataset_spec import RayDataset, RayDatasetSpec
 from ray.train._internal.session import TrainingResultType
 from ray.train._internal.utils import (
     construct_train_func,
@@ -346,7 +346,7 @@ def run(

         train_func = construct_train_func(train_func, config)

-        dataset_spec = _RayDatasetSpec(dataset_or_dict=dataset)
+        dataset_spec = RayDatasetSpec(dataset_or_dict=dataset)

         try:
             iterator = TrainingIterator(
@@ -425,7 +425,7 @@ def train_func(config):

         train_func = construct_train_func(train_func, config)

-        dataset_spec = _RayDatasetSpec(dataset_or_dict=dataset)
+        dataset_spec = RayDatasetSpec(dataset_or_dict=dataset)

         return TrainingIterator(
             backend_executor=self._backend_executor,
@@ -664,7 +664,7 @@ def __init__(
         backend_executor: Union[BackendExecutor, ActorWrapper],
         backend_config: BackendConfig,
         train_func: Union[Callable[[], T], Callable[[Dict[str, Any]], T]],
-        dataset_spec: _RayDatasetSpec,
+        dataset_spec: RayDatasetSpec,
         checkpoint_manager: CheckpointManager,
         checkpoint: Optional[Union[Dict, str, Path]],
         checkpoint_strategy: Optional[CheckpointStrategy],

From 20c8aade5862598455d59c17845d160ea5a7b387 Mon Sep 17 00:00:00 2001
From: Amog Kamsetty
Date: Tue, 7 Jun 2022 15:52:15 -0700
Subject: [PATCH 3/9] add missing file

---
 python/ray/train/error.py | 5 +++++
 1 file changed, 5 insertions(+)
 create mode 100644 python/ray/train/error.py

diff --git a/python/ray/train/error.py b/python/ray/train/error.py
new file mode 100644
index 000000000000..0717423b98d3
--- /dev/null
+++ b/python/ray/train/error.py
@@ -0,0 +1,5 @@
+from ray.util.annotations import PublicAPI
+
+@PublicAPI(stability="beta")
+class SessionMisuseError(Exception):
+    """Indicates a method or function was used outside of a session."""
\ No newline at end of file

From aced422939da9173410bf93eab113c58307f7c2e Mon Sep 17 00:00:00 2001
From: Amog Kamsetty
Date: Tue, 7 Jun 2022 16:55:47 -0700
Subject: [PATCH 4/9] lint

---
 python/ray/train/error.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/ray/train/error.py b/python/ray/train/error.py
index 0717423b98d3..1aa8c82471bb 100644
--- a/python/ray/train/error.py
+++ b/python/ray/train/error.py
@@ -1,5 +1,6 @@
 from ray.util.annotations import PublicAPI

+
 @PublicAPI(stability="beta")
 class SessionMisuseError(Exception):
-    """Indicates a method or function was used outside of a session."""
\ No newline at end of file
+    """Indicates a method or function was used outside of a session."""
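Patches 2 through 4 complete the split between the public train loop utilities in ray/train/train_loop_utils.py and the private session internals, and give SessionMisuseError a home in ray.train.error. A short sketch of what that boundary looks like from user code; the fallback behaviour shown follows directly from the "if session is None" branches visible in the diff above, and require_session is a hypothetical helper, not part of the patch:

    import ray.train as train
    from ray.train.error import SessionMisuseError  # new location as of patch 3

    # Outside of Trainer.run there is no session, so the utilities log a
    # one-time warning and fall back to defaults instead of raising.
    assert train.world_size() == 1
    assert train.load_checkpoint() is None


    def require_session(session):
        # Hypothetical guard showing how the relocated exception can be used.
        if session is None:
            raise SessionMisuseError("must be called inside a training session")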
From a2e4099de54b20137ae83d202609b273d5a3224b Mon Sep 17 00:00:00 2001
From: Amog Kamsetty
Date: Tue, 7 Jun 2022 17:11:59 -0700
Subject: [PATCH 5/9] update

---
 python/ray/train/tests/test_callbacks.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/python/ray/train/tests/test_callbacks.py b/python/ray/train/tests/test_callbacks.py
index 7ff2e73e2ae8..9aeb54088801 100644
--- a/python/ray/train/tests/test_callbacks.py
+++ b/python/ray/train/tests/test_callbacks.py
@@ -1,3 +1,4 @@
+from typing import Dict, List
 import glob
 import io
 import json
@@ -29,6 +30,9 @@
     ENABLE_DETAILED_AUTOFILLED_METRICS_ENV,
 )
 from ray.train._internal.worker_group import WorkerGroup
+from ray.train._internal.results_preprocessors.preprocessor import (
+    SequentialResultsPreprocessor,
+)

 try:
     from tensorflow.python.summary.summary_iterator import summary_iterator
@@ -309,11 +313,6 @@ def train_func():
 # fix issue: repeat assignments for preprocessor results nested recursive calling
 # see https://github.com/ray-project/ray/issues/25005
 def test_hotfix_callback_nested_recusive_calling():
-    from ray.train.callbacks.results_preprocessors.preprocessor import (
-        SequentialResultsPreprocessor,
-    )
-    from typing import Dict, List
-
     # test callback used to simulate the nested recursive calling for preprocess()
     class TestCallback(TrainingCallback):
         def __init__(self):

From 26bb27632ca3e1b1eb7848a387f10ec8957dc503 Mon Sep 17 00:00:00 2001
From: Amog Kamsetty
Date: Tue, 7 Jun 2022 18:08:30 -0700
Subject: [PATCH 6/9] fix

---
 python/ray/air/train/data_parallel_trainer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/ray/air/train/data_parallel_trainer.py b/python/ray/air/train/data_parallel_trainer.py
index c844bdae2ce9..f19cfe3be2cd 100644
--- a/python/ray/air/train/data_parallel_trainer.py
+++ b/python/ray/air/train/data_parallel_trainer.py
@@ -28,7 +28,7 @@
 from ray.train import BackendConfig, TrainingIterator
 from ray.train._internal.backend_executor import BackendExecutor
 from ray.train._internal.checkpoint import TuneCheckpointManager
-from ray.train.utils import construct_train_func
+from ray.train._internal.utils import construct_train_func
 from ray.util.annotations import DeveloperAPI
 from ray.util.ml_utils.checkpoint_manager import CheckpointStrategy, _TrackedCheckpoint
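Patch 5 hoists the SequentialResultsPreprocessor import to module scope in the callback test, and patch 6 points the AIR data-parallel trainer at the relocated construct_train_func. The net effect on import paths, with the pre-series path kept only as a comment for comparison, is:

    # Before this series:
    # from ray.train.callbacks.results_preprocessors.preprocessor import (
    #     SequentialResultsPreprocessor,
    # )

    # After patches 1-5 the preprocessors are internal implementation details:
    from ray.train._internal.results_preprocessors.preprocessor import (
        SequentialResultsPreprocessor,
    )
    from ray.train._internal.utils import construct_train_func  # used by AIR in patch 6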
From e7537ce3e04a00371843e07eaedc017448ad0c9d Mon Sep 17 00:00:00 2001
From: Amog Kamsetty
Date: Tue, 7 Jun 2022 20:00:54 -0700
Subject: [PATCH 7/9] update

---
 doc/source/train/api.rst | 26 --------------------------
 python/ray/train/torch/__init__.py | 2 ++
 2 files changed, 2 insertions(+), 26 deletions(-)

diff --git a/doc/source/train/api.rst b/doc/source/train/api.rst
index f7e2d6ba79a3..006cc4117e8c 100644
--- a/doc/source/train/api.rst
+++ b/doc/source/train/api.rst
@@ -109,32 +109,6 @@ TorchTensorboardProfilerCallback

 .. autoclass:: ray.train.callbacks.TorchTensorboardProfilerCallback

-ResultsPreprocessors
-~~~~~~~~~~~~~~~~~~~~
-
-.. _train-api-results-preprocessor:
-
-ResultsPreprocessor
-+++++++++++++++++++
-
-.. autoclass:: ray.train.callbacks.results_preprocessors.ResultsPreprocessor
-    :members:
-
-SequentialResultsPreprocessor
-+++++++++++++++++++++++++++++++
-
-.. autoclass:: ray.train.callbacks.results_preprocessors.SequentialResultsPreprocessor
-
-IndexedResultsPreprocessor
-+++++++++++++++++++++++++++++++
-
-.. autoclass:: ray.train.callbacks.results_preprocessors.IndexedResultsPreprocessor
-
-ExcludedKeysResultsPreprocessor
-+++++++++++++++++++++++++++++++
-
-.. autoclass:: ray.train.callbacks.results_preprocessors.ExcludedKeysResultsPreprocessor
-
 Checkpointing
 -------------
diff --git a/python/ray/train/torch/__init__.py b/python/ray/train/torch/__init__.py
index 3024da3bdedd..035e04266a73 100644
--- a/python/ray/train/torch/__init__.py
+++ b/python/ray/train/torch/__init__.py
@@ -7,6 +7,7 @@
 from ray.train.torch.config import TorchConfig
 from ray.train.torch.train_loop_utils import (
+    accelerate,
     get_device,
     prepare_model,
     prepare_data_loader,
@@ -18,6 +19,7 @@
 __all__ = [
     "TorchConfig",
+    "accelerate",
     "get_device",
     "prepare_model",
     "prepare_optimizer",
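Patch 7 removes the now-internal ResultsPreprocessor entries from the API reference and re-exports accelerate from ray.train.torch alongside get_device and prepare_model. A sketch of a training function using the re-exported symbol; calling accelerate() with no arguments before prepare_model is an assumption made for illustration, so consult the ray.train.torch documentation for the exact signature:

    import torch.nn as nn

    from ray.train import Trainer
    from ray.train.torch import accelerate, prepare_model  # re-exported by patch 7


    def train_func():
        # Configure torch-specific acceleration first, then wrap the model
        # for distributed training.
        accelerate()
        model = prepare_model(nn.Linear(4, 1))
        return sum(p.numel() for p in model.parameters())


    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    print(trainer.run(train_func))  # one parameter count per worker
    trainer.shutdown()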
conditions: ["RAY_CI_TUNE_AFFECTED"] From 2bcdc9f2d69c56e42d5a1e8f16c99fe1b18f2f1c Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Tue, 7 Jun 2022 23:56:24 -0700 Subject: [PATCH 9/9] fix --- python/ray/train/tests/test_gpu.py | 2 +- python/ray/train/tests/test_results_preprocessors.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/train/tests/test_gpu.py b/python/ray/train/tests/test_gpu.py index 7204c9a062d0..875ad766ebda 100644 --- a/python/ray/train/tests/test_gpu.py +++ b/python/ray/train/tests/test_gpu.py @@ -403,7 +403,7 @@ def host_to_device(device): torch.matmul(x, x) def host_to_device_auto_pipeline(device): - wrapped_dataloader = ray.train.torch._WrappedDataLoader( + wrapped_dataloader = ray.train.torch.train_loop_utils._WrappedDataLoader( small_dataloader, device, auto_transfer ) for (x,) in wrapped_dataloader: diff --git a/python/ray/train/tests/test_results_preprocessors.py b/python/ray/train/tests/test_results_preprocessors.py index 26a6063b682a..8969f96f8b08 100644 --- a/python/ray/train/tests/test_results_preprocessors.py +++ b/python/ray/train/tests/test_results_preprocessors.py @@ -1,6 +1,6 @@ import pytest -from ray.train.callbacks._internal.results_preprocessors import ( +from ray.train._internal.results_preprocessors import ( ExcludedKeysResultsPreprocessor, IndexedResultsPreprocessor, SequentialResultsPreprocessor,