From a9ce537c5bd9503f9ba2c57bad290a42e7b7b68c Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Fri, 8 Apr 2022 12:07:01 -0700 Subject: [PATCH 01/12] [tune] Chunk file transfers in cross-node checkpoint syncing --- python/ray/tune/sync_client.py | 144 ++++----------- python/ray/tune/tests/test_sync.py | 10 +- python/ray/tune/utils/file_transfer.py | 236 +++++++++++++++++++++++++ 3 files changed, 279 insertions(+), 111 deletions(-) create mode 100644 python/ray/tune/utils/file_transfer.py diff --git a/python/ray/tune/sync_client.py b/python/ray/tune/sync_client.py index 3c891183610c..e7d14378a23b 100644 --- a/python/ray/tune/sync_client.py +++ b/python/ray/tune/sync_client.py @@ -2,24 +2,21 @@ import distutils import distutils.spawn import inspect -import io import logging -import os import pathlib -import shutil import subprocess -import tarfile import tempfile import time import types import warnings -from typing import Optional, List, Callable, Union, Tuple, Dict +from typing import Optional, List, Callable, Union, Tuple from shlex import quote import ray from ray.tune.error import TuneError +from ray.tune.utils.file_transfer import sync_dir_between_nodes, delete_on_node from ray.util.annotations import PublicAPI from ray.util.debug import log_once from ray.util.ml_utils.cloud import ( @@ -435,73 +432,6 @@ def _validate_exclude_template(exclude_template): ) -def _get_recursive_files_and_stats(path: str) -> Dict[str, Tuple[float, int]]: - """Return dict of files mapping to stats in ``path``. - - This function scans a directory ``path`` recursively and returns a dict - mapping each contained file to a tuple of (mtime, filesize). - - mtime and filesize are returned from ``os.lstat`` and are usually a - floating point number (timestamp) and an int (filesize in bytes). - """ - files_stats = {} - for root, dirs, files in os.walk(path, topdown=False): - rel_root = os.path.relpath(root, path) - for file in files: - key = os.path.join(rel_root, file) - stat = os.lstat(os.path.join(path, key)) - files_stats[key] = stat.st_mtime, stat.st_size - - return files_stats - - -# Only export once -_remote_get_recursive_files_and_stats = ray.remote(_get_recursive_files_and_stats) - - -@ray.remote -def _pack_dir( - source_dir: str, files_stats: Optional[Dict[str, Tuple[float, int]]] -) -> bytes: - stream = io.BytesIO() - with tarfile.open(fileobj=stream, mode="w:gz", format=tarfile.PAX_FORMAT) as tar: - if not files_stats: - # If no `files_stats` is passed, pack whole directory - tar.add(source_dir, arcname="", recursive=True) - else: - # Otherwise, only pack differing files - tar.add(source_dir, arcname="", recursive=False) - for root, dirs, files in os.walk(source_dir, topdown=False): - rel_root = os.path.relpath(root, source_dir) - # Always add all directories - for dir in dirs: - key = os.path.join(rel_root, dir) - tar.add(os.path.join(source_dir, key), arcname=key, recursive=False) - # Add files where our information differs - for file in files: - key = os.path.join(rel_root, file) - stat = os.lstat(os.path.join(source_dir, key)) - file_stat = stat.st_mtime, stat.st_size - if key not in files_stats or file_stat != files_stats[key]: - tar.add(os.path.join(source_dir, key), arcname=key) - - return stream.getvalue() - - -@ray.remote -def _unpack_dir(stream: bytes, target_dir: str): - with tarfile.open(fileobj=io.BytesIO(stream)) as tar: - tar.extractall(target_dir) - - -@ray.remote -def _delete_dir(target_dir: str) -> bool: - if os.path.exists(target_dir): - shutil.rmtree(target_dir) - return True - return False - - class RemoteTaskClient(SyncClient): """Sync client that uses remote tasks to synchronize two directories. @@ -523,16 +453,16 @@ class RemoteTaskClient(SyncClient): will not kill the previous sync command, so it may still be executed. """ - def __init__(self, store_pack_future: bool = False): + def __init__(self, _store_remotes: bool = False): # Used for testing - self._store_pack_future = store_pack_future + self._store_remotes = _store_remotes + self._stored_pack_actor_ref = None + self._stored_files_stats_future = None - self._pack_future = None self._sync_future = None self._last_source_tuple = None self._last_target_tuple = None - self._last_files_stats = None def _sync_still_running(self) -> bool: if not self._sync_future: @@ -560,12 +490,7 @@ def sync_down( self._last_source_tuple = source_ip, source_path self._last_target_tuple = target_ip, target - # Get existing files on local node before packing on remote node - self._last_files_stats = _get_recursive_files_and_stats(target) - - return self._execute_sync( - self._last_source_tuple, self._last_target_tuple, self._last_files_stats - ) + return self._execute_sync(self._last_source_tuple, self._last_target_tuple) def sync_up( self, source: str, target: Tuple[str, str], exclude: Optional[List] = None @@ -583,39 +508,45 @@ def sync_up( self._last_source_tuple = source_ip, source self._last_target_tuple = target_ip, target_path - # Get existing files on remote node before packing on local node - self._last_files_stats = _remote_get_recursive_files_and_stats.options( - num_cpus=0, resources={f"node:{target_ip}": 0.01} - ).remote(target_path) - - return self._execute_sync( - self._last_source_tuple, self._last_target_tuple, self._last_files_stats - ) + return self._execute_sync(self._last_source_tuple, self._last_target_tuple) def _execute_sync( self, source_tuple: Tuple[str, str], target_tuple: Tuple[str, str], - files_stats: Optional[Dict[str, Tuple[float, int]]] = None, ) -> bool: source_ip, source_path = source_tuple target_ip, target_path = target_tuple - pack_on_source_node = _pack_dir.options( - num_cpus=0, resources={f"node:{source_ip}": 0.01} - ) - unpack_on_target_node = _unpack_dir.options( - num_cpus=0, resources={f"node:{target_ip}": 0.01} + self._sync_future, pack_actor, files_stats = sync_dir_between_nodes( + source_ip=source_ip, + source_path=source_path, + target_ip=target_ip, + target_path=target_path, + _return_all_remotes=True, ) - pack_future = pack_on_source_node.remote(source_path, files_stats) - if self._store_pack_future: - self._pack_future = pack_future - self._sync_future = unpack_on_target_node.remote(pack_future, target_path) + if self._store_remotes: + self._stored_pack_actor_ref = pack_actor + self._stored_files_stats = files_stats + return True def delete(self, target: str): - pass + if not self._last_target_tuple: + logger.error( + f"Could not delete path {target} as we target node is not known." + ) + return + + node_ip = self._last_target_tuple[0] + + try: + ray.get(delete_on_node(node_ip=node_ip, path=target)) + except Exception as e: + logger.error( + f"Could not delete path {target} on remote node {node_ip}: {e}" + ) def wait(self): if self._sync_future: @@ -628,7 +559,8 @@ def wait(self): f"{self._last_target_tuple}: {e}" ) from e self._sync_future = None - self._pack_future = None + self._stored_pack_actor_ref = None + self._stored_files_stats_future = None def wait_or_retry(self, max_retries: int = 3, backoff_s: int = 5): assert max_retries > 0 @@ -646,22 +578,22 @@ def wait_or_retry(self, max_retries: int = 3, backoff_s: int = 5): self._execute_sync( self._last_source_tuple, self._last_target_tuple, - self._last_files_stats, ) continue return self._sync_future = None - self._pack_future = None + self._stored_pack_actor_ref = None + self._stored_files_stats_future = None raise TuneError(f"Failed sync even after {max_retries} retries.") def reset(self): if self._sync_future: logger.warning("Sync process still running but resetting anyways.") self._sync_future = None - self._pack_future = None self._last_source_tuple = None self._last_target_tuple = None - self._last_files_stats = None + self._stored_pack_actor_ref = None + self._stored_files_stats_future = None def close(self): self._sync_future = None # Avoid warning diff --git a/python/ray/tune/tests/test_sync.py b/python/ray/tune/tests/test_sync.py index 5bd1406c2110..7ccd4c41c5a6 100644 --- a/python/ray/tune/tests/test_sync.py +++ b/python/ray/tune/tests/test_sync.py @@ -489,7 +489,7 @@ def testSyncRemoteTaskOnlyDifferences(self): this_node_ip = ray.util.get_node_ip_address() # Sync everything up - client = RemoteTaskClient(store_pack_future=True) + client = RemoteTaskClient(_store_remotes=True) client.sync_up(source=temp_source, target=(this_node_ip, temp_up_target)) client.wait() @@ -526,8 +526,8 @@ def testSyncRemoteTaskOnlyDifferences(self): client.sync_up(source=temp_source, target=(this_node_ip, temp_up_target)) # Hi-jack futures - files_stats = ray.get(client._last_files_stats) - tarball = ray.get(client._pack_future) + files_stats = ray.get(client._stored_files_stats) + tarball = ray.get(client._stored_pack_actor_ref.get_full_data.remote()) client.wait() # Existing file should have new content @@ -559,8 +559,8 @@ def testSyncRemoteTaskOnlyDifferences(self): client.sync_down(source=(this_node_ip, temp_source), target=temp_down_target) # Hi-jack futures - files_stats = client._last_files_stats - tarball = ray.get(client._pack_future) + files_stats = ray.get(client._stored_files_stats) + tarball = ray.get(client._stored_pack_actor_ref.get_full_data.remote()) client.wait() # Existing file should have new content diff --git a/python/ray/tune/utils/file_transfer.py b/python/ray/tune/utils/file_transfer.py new file mode 100644 index 000000000000..64f64d4e96cb --- /dev/null +++ b/python/ray/tune/utils/file_transfer.py @@ -0,0 +1,236 @@ +import io +import os +import shutil +import tarfile + +from typing import Optional, Tuple, Dict, Generator, Union + +import ray +from ray.exceptions import RayTaskError + + +DEFAULT_CHUNK_SIZE = 500 * 1024 * 1024 + + +def _get_recursive_files_and_stats(path: str) -> Dict[str, Tuple[float, int]]: + """Return dict of files mapping to stats in ``path``. + + This function scans a directory ``path`` recursively and returns a dict + mapping each contained file to a tuple of (mtime, filesize). + + mtime and filesize are returned from ``os.lstat`` and are usually a + floating point number (timestamp) and an int (filesize in bytes). + """ + files_stats = {} + for root, dirs, files in os.walk(path, topdown=False): + rel_root = os.path.relpath(root, path) + for file in files: + key = os.path.join(rel_root, file) + stat = os.lstat(os.path.join(path, key)) + files_stats[key] = stat.st_mtime, stat.st_size + + return files_stats + + +# Only export once +_remote_get_recursive_files_and_stats = ray.remote(_get_recursive_files_and_stats) + + +def _pack_dir( + source_dir: str, files_stats: Optional[Dict[str, Tuple[float, int]]] = None +) -> io.BytesIO: + """Pack whole directory contents into a uncompressed tarfile. + + This function accepts a ``files_stats`` argument. If given, only files + whose stats differ from these stats will be packed. + + The main use case for this is that we can collect information about files + already existing in the target directory, and only pack files that have + been updated. This is similar to how cloud syncing utilities decide + which files to transfer. + + Args: + source_dir: Path to local directory to pack into tarfile. + files_stats: Dict of relative filenames mapping to a tuple of + (mtime, filesize). Only files that differ from these stats + will be packed. + + Returns: + Tarfile as a stream object. + """ + stream = io.BytesIO() + with tarfile.open(fileobj=stream, mode="w", format=tarfile.PAX_FORMAT) as tar: + if not files_stats: + # If no `files_stats` is passed, pack whole directory + tar.add(source_dir, arcname="", recursive=True) + else: + # Otherwise, only pack differing files + tar.add(source_dir, arcname="", recursive=False) + for root, dirs, files in os.walk(source_dir, topdown=False): + rel_root = os.path.relpath(root, source_dir) + # Always add all directories + for dir in dirs: + key = os.path.join(rel_root, dir) + tar.add(os.path.join(source_dir, key), arcname=key, recursive=False) + # Add files where our information differs + for file in files: + key = os.path.join(rel_root, file) + stat = os.lstat(os.path.join(source_dir, key)) + file_stat = stat.st_mtime, stat.st_size + if key not in files_stats or file_stat != files_stats[key]: + tar.add(os.path.join(source_dir, key), arcname=key) + + return stream + + +@ray.remote +class _PackActor: + """Actor wrapping around a packing job. + + This actor is used for chunking the packed data into smaller chunks that + can be transferred via the object store more efficiently. + + The actor will start packing the directory when initialized, and separate + chunks can be received by calling the remote ``next()`` task. + + Args: + source_dir: Path to local directory to pack into tarfile. + files_stats: Dict of relative filenames mapping to a tuple of + (mtime, filesize). Only files that differ from these stats + will be packed. + chunk_size: Cut bytes stream into chunks of this size in bytes. + """ + + def __init__( + self, + source_dir: str, + files_stats: Optional[Dict[str, Tuple[float, int]]] = None, + chunk_size: int = DEFAULT_CHUNK_SIZE, + ): + self.stream = _pack_dir(source_dir=source_dir, files_stats=files_stats) + self.chunk_size = chunk_size + self.iter = None + + def get_full_data(self) -> bytes: + return self.stream.getvalue() + + def _chunk_generator(self) -> Generator[None, bytes, None]: + self.stream.seek(0) + data = self.stream.read(self.chunk_size) + while data: + yield data + data = self.stream.read(self.chunk_size) + + def next(self) -> Optional[bytes]: + if not self.iter: + self.iter = iter(self._chunk_generator()) + try: + return next(self.iter) + except StopIteration: + return None + + +def _iter_remote(actor: ray.ActorID) -> Generator[None, bytes, None]: + """Iterate over actor task and return as generator.""" + while True: + try: + buffer = ray.get(actor.next.remote()) + if buffer is None: + return + yield buffer + except RayTaskError: + return None + + +def _unpack_dir(stream: io.BytesIO, target_dir: str): + """Unpack tarfile stream into target directory.""" + stream.seek(0) + with tarfile.open(fileobj=stream) as tar: + tar.extractall(target_dir) + + +@ray.remote +def _unpack_from_actor(pack_actor: ray.ActorID, target_dir: str): + """Iterate over chunks received from pack actor and unpack.""" + stream = io.BytesIO() + for buffer in _iter_remote(pack_actor): + stream.write(buffer) + _unpack_dir(stream, target_dir=target_dir) + + +@ray.remote +def _delete_path(target_path: str) -> bool: + """Delete path (files and directories)""" + if os.path.exists(target_path): + if os.path.isdir(target_path): + shutil.rmtree(target_path) + else: + os.remove(target_path) + return True + return False + + +def delete_on_node(node_ip: str, path: str) -> ray.ObjectRef: + """Delete path on node. + + Args: + node_ip: IP of node to delete path on. + path: Path to delete on remote node. + + Returns: + Ray future for scheduled delete task. + """ + delete_task = _delete_path.options(num_cpus=0, resources={f"node:{node_ip}": 0.01}) + return delete_task.remote(path) + + +def sync_dir_between_nodes( + source_ip: str, + source_path: str, + target_ip: str, + target_path: str, + force_all: bool = False, + chunk_size: int = DEFAULT_CHUNK_SIZE, + _return_all_remotes: bool = False, +) -> Union[ray.ObjectRef, Tuple[ray.ObjectRef, ray.ActorID, ray.ObjectRef]]: + """Synchronize directory on source node to directory on target node. + + Per default, this function will collect information about already existing + files in the target directory. Only files that differ in either mtime or + filesize will be transferred, unless ``force_all=True``. + + Args: + source_ip: IP of source node. + source_path: Path to file or directory on source node. + target_ip: IP of target node. + target_path: Path to file or directory on target node. + force_all: If True, all files will be transferred (not just differing files). + chunk_size: Chunk size for data transfer. + + Returns: + Ray future for scheduled unpacking task. + + """ + pack_actor_on_source_node = _PackActor.options( + num_cpus=0, resources={f"node:{source_ip}": 0.01} + ) + unpack_on_target_node = _unpack_from_actor.options( + num_cpus=0, resources={f"node:{target_ip}": 0.01} + ) + + if force_all: + files_stats = None + else: + files_stats = _remote_get_recursive_files_and_stats.options( + num_cpus=0, resources={f"node:{target_ip}": 0.01} + ).remote(target_path) + + pack_actor = pack_actor_on_source_node.remote( + source_path, files_stats, chunk_size=chunk_size + ) + unpack_future = unpack_on_target_node.remote(pack_actor, target_path) + + if _return_all_remotes: + return unpack_future, pack_actor, files_stats + + return unpack_future From c79fd103f4052398eebb9361fc691ad05f24d981 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Fri, 8 Apr 2022 12:14:32 -0700 Subject: [PATCH 02/12] Re-order public functions to top --- python/ray/tune/utils/file_transfer.py | 132 ++++++++++++------------- 1 file changed, 66 insertions(+), 66 deletions(-) diff --git a/python/ray/tune/utils/file_transfer.py b/python/ray/tune/utils/file_transfer.py index 64f64d4e96cb..900cd1de0ad0 100644 --- a/python/ray/tune/utils/file_transfer.py +++ b/python/ray/tune/utils/file_transfer.py @@ -12,6 +12,72 @@ DEFAULT_CHUNK_SIZE = 500 * 1024 * 1024 +def sync_dir_between_nodes( + source_ip: str, + source_path: str, + target_ip: str, + target_path: str, + force_all: bool = False, + chunk_size: int = DEFAULT_CHUNK_SIZE, + _return_all_remotes: bool = False, +) -> Union[ray.ObjectRef, Tuple[ray.ObjectRef, ray.ActorID, ray.ObjectRef]]: + """Synchronize directory on source node to directory on target node. + + Per default, this function will collect information about already existing + files in the target directory. Only files that differ in either mtime or + filesize will be transferred, unless ``force_all=True``. + + Args: + source_ip: IP of source node. + source_path: Path to file or directory on source node. + target_ip: IP of target node. + target_path: Path to file or directory on target node. + force_all: If True, all files will be transferred (not just differing files). + chunk_size: Chunk size for data transfer. + + Returns: + Ray future for scheduled unpacking task. + + """ + pack_actor_on_source_node = _PackActor.options( + num_cpus=0, resources={f"node:{source_ip}": 0.01} + ) + unpack_on_target_node = _unpack_from_actor.options( + num_cpus=0, resources={f"node:{target_ip}": 0.01} + ) + + if force_all: + files_stats = None + else: + files_stats = _remote_get_recursive_files_and_stats.options( + num_cpus=0, resources={f"node:{target_ip}": 0.01} + ).remote(target_path) + + pack_actor = pack_actor_on_source_node.remote( + source_path, files_stats, chunk_size=chunk_size + ) + unpack_future = unpack_on_target_node.remote(pack_actor, target_path) + + if _return_all_remotes: + return unpack_future, pack_actor, files_stats + + return unpack_future + + +def delete_on_node(node_ip: str, path: str) -> ray.ObjectRef: + """Delete path on node. + + Args: + node_ip: IP of node to delete path on. + path: Path to delete on remote node. + + Returns: + Ray future for scheduled delete task. + """ + delete_task = _delete_path.options(num_cpus=0, resources={f"node:{node_ip}": 0.01}) + return delete_task.remote(path) + + def _get_recursive_files_and_stats(path: str) -> Dict[str, Tuple[float, int]]: """Return dict of files mapping to stats in ``path``. @@ -168,69 +234,3 @@ def _delete_path(target_path: str) -> bool: os.remove(target_path) return True return False - - -def delete_on_node(node_ip: str, path: str) -> ray.ObjectRef: - """Delete path on node. - - Args: - node_ip: IP of node to delete path on. - path: Path to delete on remote node. - - Returns: - Ray future for scheduled delete task. - """ - delete_task = _delete_path.options(num_cpus=0, resources={f"node:{node_ip}": 0.01}) - return delete_task.remote(path) - - -def sync_dir_between_nodes( - source_ip: str, - source_path: str, - target_ip: str, - target_path: str, - force_all: bool = False, - chunk_size: int = DEFAULT_CHUNK_SIZE, - _return_all_remotes: bool = False, -) -> Union[ray.ObjectRef, Tuple[ray.ObjectRef, ray.ActorID, ray.ObjectRef]]: - """Synchronize directory on source node to directory on target node. - - Per default, this function will collect information about already existing - files in the target directory. Only files that differ in either mtime or - filesize will be transferred, unless ``force_all=True``. - - Args: - source_ip: IP of source node. - source_path: Path to file or directory on source node. - target_ip: IP of target node. - target_path: Path to file or directory on target node. - force_all: If True, all files will be transferred (not just differing files). - chunk_size: Chunk size for data transfer. - - Returns: - Ray future for scheduled unpacking task. - - """ - pack_actor_on_source_node = _PackActor.options( - num_cpus=0, resources={f"node:{source_ip}": 0.01} - ) - unpack_on_target_node = _unpack_from_actor.options( - num_cpus=0, resources={f"node:{target_ip}": 0.01} - ) - - if force_all: - files_stats = None - else: - files_stats = _remote_get_recursive_files_and_stats.options( - num_cpus=0, resources={f"node:{target_ip}": 0.01} - ).remote(target_path) - - pack_actor = pack_actor_on_source_node.remote( - source_path, files_stats, chunk_size=chunk_size - ) - unpack_future = unpack_on_target_node.remote(pack_actor, target_path) - - if _return_all_remotes: - return unpack_future, pack_actor, files_stats - - return unpack_future From 6ce95f4144f221a557382bb924fd9ed41d85ae0e Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Fri, 8 Apr 2022 18:09:33 -0700 Subject: [PATCH 03/12] Apply suggestions from code review --- python/ray/tune/utils/file_transfer.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/python/ray/tune/utils/file_transfer.py b/python/ray/tune/utils/file_transfer.py index 900cd1de0ad0..bfbeef21fc6c 100644 --- a/python/ray/tune/utils/file_transfer.py +++ b/python/ray/tune/utils/file_transfer.py @@ -6,10 +6,9 @@ from typing import Optional, Tuple, Dict, Generator, Union import ray -from ray.exceptions import RayTaskError -DEFAULT_CHUNK_SIZE = 500 * 1024 * 1024 +_DEFAULT_CHUNK_SIZE_BYTES = 500 * 1024 * 1024 def sync_dir_between_nodes( @@ -18,7 +17,7 @@ def sync_dir_between_nodes( target_ip: str, target_path: str, force_all: bool = False, - chunk_size: int = DEFAULT_CHUNK_SIZE, + chunk_size: int = _DEFAULT_CHUNK_SIZE_BYTES, _return_all_remotes: bool = False, ) -> Union[ray.ObjectRef, Tuple[ray.ObjectRef, ray.ActorID, ray.ObjectRef]]: """Synchronize directory on source node to directory on target node. @@ -34,6 +33,8 @@ def sync_dir_between_nodes( target_path: Path to file or directory on target node. force_all: If True, all files will be transferred (not just differing files). chunk_size: Chunk size for data transfer. + _return_all_remotes: If True, returns a tuple of the unpack future, + the pack actor, and the files_stats future. Returns: Ray future for scheduled unpacking task. @@ -171,7 +172,7 @@ def __init__( self, source_dir: str, files_stats: Optional[Dict[str, Tuple[float, int]]] = None, - chunk_size: int = DEFAULT_CHUNK_SIZE, + chunk_size: int = _DEFAULT_CHUNK_SIZE_BYTES, ): self.stream = _pack_dir(source_dir=source_dir, files_stats=files_stats) self.chunk_size = chunk_size @@ -199,13 +200,10 @@ def next(self) -> Optional[bytes]: def _iter_remote(actor: ray.ActorID) -> Generator[None, bytes, None]: """Iterate over actor task and return as generator.""" while True: - try: - buffer = ray.get(actor.next.remote()) - if buffer is None: - return - yield buffer - except RayTaskError: - return None + buffer = ray.get(actor.next.remote()) + if buffer is None: + return + yield buffer def _unpack_dir(stream: io.BytesIO, target_dir: str): From 90c8ff1f6d9d5083737f8bf7514448d9408de6d1 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Fri, 8 Apr 2022 18:10:01 -0700 Subject: [PATCH 04/12] nit typo --- python/ray/tune/utils/file_transfer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tune/utils/file_transfer.py b/python/ray/tune/utils/file_transfer.py index bfbeef21fc6c..b1b28c6ad223 100644 --- a/python/ray/tune/utils/file_transfer.py +++ b/python/ray/tune/utils/file_transfer.py @@ -106,7 +106,7 @@ def _get_recursive_files_and_stats(path: str) -> Dict[str, Tuple[float, int]]: def _pack_dir( source_dir: str, files_stats: Optional[Dict[str, Tuple[float, int]]] = None ) -> io.BytesIO: - """Pack whole directory contents into a uncompressed tarfile. + """Pack whole directory contents into an uncompressed tarfile. This function accepts a ``files_stats`` argument. If given, only files whose stats differ from these stats will be packed. From 6043c66734087a38ac16ca0689dbe1a65c3500c3 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Mon, 11 Apr 2022 09:20:43 +0100 Subject: [PATCH 05/12] Add tests, default to blocking operations --- python/ray/tune/sync_client.py | 6 ++-- python/ray/tune/tests/test_sync.py | 48 ++++++++++++++++++++++++++ python/ray/tune/utils/file_transfer.py | 45 ++++++++++++++++-------- 3 files changed, 82 insertions(+), 17 deletions(-) diff --git a/python/ray/tune/sync_client.py b/python/ray/tune/sync_client.py index e7d14378a23b..25b514858431 100644 --- a/python/ray/tune/sync_client.py +++ b/python/ray/tune/sync_client.py @@ -523,7 +523,7 @@ def _execute_sync( source_path=source_path, target_ip=target_ip, target_path=target_path, - _return_all_remotes=True, + return_futures=True, ) if self._store_remotes: @@ -535,14 +535,14 @@ def _execute_sync( def delete(self, target: str): if not self._last_target_tuple: logger.error( - f"Could not delete path {target} as we target node is not known." + f"Could not delete path {target} as the target node is not known." ) return node_ip = self._last_target_tuple[0] try: - ray.get(delete_on_node(node_ip=node_ip, path=target)) + delete_on_node(node_ip=node_ip, path=target) except Exception as e: logger.error( f"Could not delete path {target} on remote node {node_ip}: {e}" diff --git a/python/ray/tune/tests/test_sync.py b/python/ray/tune/tests/test_sync.py index 7ccd4c41c5a6..cbb76cd943ed 100644 --- a/python/ray/tune/tests/test_sync.py +++ b/python/ray/tune/tests/test_sync.py @@ -28,6 +28,7 @@ SyncerCallback, ) from ray.tune.utils.callback import create_default_callbacks +from ray.tune.utils.file_transfer import sync_dir_between_nodes, delete_on_node class TestSyncFunctionality(unittest.TestCase): @@ -455,6 +456,53 @@ def _start_process(self, cmd): self.assertEquals(client._sync_downs, 2) + def testSyncBetweenNodesAndDelete(self): + temp_source = tempfile.mkdtemp() + temp_up_target = tempfile.mkdtemp() + temp_down_target = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, temp_source) + self.addCleanup(shutil.rmtree, temp_up_target, ignore_errors=True) + self.addCleanup(shutil.rmtree, temp_down_target) + + os.makedirs(os.path.join(temp_source, "dir_level0", "dir_level1")) + with open(os.path.join(temp_source, "dir_level0", "file_level1.txt"), "w") as f: + f.write("Data\n") + + def check_dir_contents(path: str): + assert os.path.exists(os.path.join(path, "dir_level0")) + assert os.path.exists(os.path.join(path, "dir_level0", "dir_level1")) + assert os.path.exists(os.path.join(path, "dir_level0", "file_level1.txt")) + with open(os.path.join(path, "dir_level0", "file_level1.txt"), "r") as f: + assert f.read() == "Data\n" + + # Sanity check + check_dir_contents(temp_source) + + sync_dir_between_nodes( + source_ip=ray.util.get_node_ip_address(), + source_path=temp_source, + target_ip=ray.util.get_node_ip_address(), + target_path=temp_up_target, + ) + + # Check sync up + check_dir_contents(temp_up_target) + + sync_dir_between_nodes( + source_ip=ray.util.get_node_ip_address(), + source_path=temp_up_target, + target_ip=ray.util.get_node_ip_address(), + target_path=temp_down_target, + ) + + # Check sync up + check_dir_contents(temp_down_target) + + # Delete in some dir + delete_on_node(node_ip=ray.util.get_node_ip_address(), path=temp_up_target) + + assert not os.path.exists(temp_up_target) + def testSyncRemoteTaskOnlyDifferences(self): """Tests the RemoteTaskClient sync client. diff --git a/python/ray/tune/utils/file_transfer.py b/python/ray/tune/utils/file_transfer.py index b1b28c6ad223..0c40fbf24ce9 100644 --- a/python/ray/tune/utils/file_transfer.py +++ b/python/ray/tune/utils/file_transfer.py @@ -18,8 +18,8 @@ def sync_dir_between_nodes( target_path: str, force_all: bool = False, chunk_size: int = _DEFAULT_CHUNK_SIZE_BYTES, - _return_all_remotes: bool = False, -) -> Union[ray.ObjectRef, Tuple[ray.ObjectRef, ray.ActorID, ray.ObjectRef]]: + return_futures: bool = False, +) -> Union[None, Tuple[ray.ObjectRef, ray.ActorID, ray.ObjectRef]]: """Synchronize directory on source node to directory on target node. Per default, this function will collect information about already existing @@ -33,11 +33,12 @@ def sync_dir_between_nodes( target_path: Path to file or directory on target node. force_all: If True, all files will be transferred (not just differing files). chunk_size: Chunk size for data transfer. - _return_all_remotes: If True, returns a tuple of the unpack future, - the pack actor, and the files_stats future. + return_futures: If True, returns a tuple of the unpack future, + the pack actor, and the files_stats future. If False (default) will + block until synchronization finished and return None. Returns: - Ray future for scheduled unpacking task. + None, or Tuple of unpack future, pack actor, and files_stats future. """ pack_actor_on_source_node = _PackActor.options( @@ -59,24 +60,37 @@ def sync_dir_between_nodes( ) unpack_future = unpack_on_target_node.remote(pack_actor, target_path) - if _return_all_remotes: + if return_futures: return unpack_future, pack_actor, files_stats - return unpack_future + return ray.get(unpack_future) -def delete_on_node(node_ip: str, path: str) -> ray.ObjectRef: +def delete_on_node( + node_ip: str, path: str, return_future: bool = False +) -> Union[bool, ray.ObjectRef]: """Delete path on node. Args: node_ip: IP of node to delete path on. path: Path to delete on remote node. + return_future: If True, returns the delete future. Otherwise, blocks until + the task finished and returns True if the path was deleted or False if not + (e.g. if the path does not exist on the remote node). Returns: - Ray future for scheduled delete task. + Boolean indicating if deletion succeeded, or Ray future + for scheduled delete task. """ - delete_task = _delete_path.options(num_cpus=0, resources={f"node:{node_ip}": 0.01}) - return delete_task.remote(path) + delete_task = _remote_delete_path.options( + num_cpus=0, resources={f"node:{node_ip}": 0.01} + ) + future = delete_task.remote(path) + + if return_future: + return future + + return ray.get(future) def _get_recursive_files_and_stats(path: str) -> Dict[str, Tuple[float, int]]: @@ -206,7 +220,7 @@ def _iter_remote(actor: ray.ActorID) -> Generator[None, bytes, None]: yield buffer -def _unpack_dir(stream: io.BytesIO, target_dir: str): +def _unpack_dir(stream: io.BytesIO, target_dir: str) -> None: """Unpack tarfile stream into target directory.""" stream.seek(0) with tarfile.open(fileobj=stream) as tar: @@ -214,7 +228,7 @@ def _unpack_dir(stream: io.BytesIO, target_dir: str): @ray.remote -def _unpack_from_actor(pack_actor: ray.ActorID, target_dir: str): +def _unpack_from_actor(pack_actor: ray.ActorID, target_dir: str) -> None: """Iterate over chunks received from pack actor and unpack.""" stream = io.BytesIO() for buffer in _iter_remote(pack_actor): @@ -222,7 +236,6 @@ def _unpack_from_actor(pack_actor: ray.ActorID, target_dir: str): _unpack_dir(stream, target_dir=target_dir) -@ray.remote def _delete_path(target_path: str) -> bool: """Delete path (files and directories)""" if os.path.exists(target_path): @@ -232,3 +245,7 @@ def _delete_path(target_path: str) -> bool: os.remove(target_path) return True return False + + +# Only export once +_remote_delete_path = ray.remote(_delete_path) From 4cf906d2d0241465514f9f171b6beffea88bb23e Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Mon, 11 Apr 2022 09:26:56 +0100 Subject: [PATCH 06/12] Fix generator type hint --- python/ray/tune/utils/file_transfer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tune/utils/file_transfer.py b/python/ray/tune/utils/file_transfer.py index 0c40fbf24ce9..45de5d8d1219 100644 --- a/python/ray/tune/utils/file_transfer.py +++ b/python/ray/tune/utils/file_transfer.py @@ -195,7 +195,7 @@ def __init__( def get_full_data(self) -> bytes: return self.stream.getvalue() - def _chunk_generator(self) -> Generator[None, bytes, None]: + def _chunk_generator(self) -> Generator[bytes, None, None]: self.stream.seek(0) data = self.stream.read(self.chunk_size) while data: @@ -211,7 +211,7 @@ def next(self) -> Optional[bytes]: return None -def _iter_remote(actor: ray.ActorID) -> Generator[None, bytes, None]: +def _iter_remote(actor: ray.ActorID) -> Generator[bytes, None, None]: """Iterate over actor task and return as generator.""" while True: buffer = ray.get(actor.next.remote()) From bbf2f12de36e1b6202f35145636578312b99a827 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Mon, 11 Apr 2022 10:24:17 +0100 Subject: [PATCH 07/12] Add max_size argument --- python/ray/tune/sync_client.py | 3 ++ python/ray/tune/tests/test_sync.py | 14 ++++++++- python/ray/tune/utils/file_transfer.py | 42 +++++++++++++++++++++----- 3 files changed, 51 insertions(+), 8 deletions(-) diff --git a/python/ray/tune/sync_client.py b/python/ray/tune/sync_client.py index 25b514858431..adc2018b13a1 100644 --- a/python/ray/tune/sync_client.py +++ b/python/ray/tune/sync_client.py @@ -464,6 +464,8 @@ def __init__(self, _store_remotes: bool = False): self._last_source_tuple = None self._last_target_tuple = None + self._max_size_bytes = None # No file size limit + def _sync_still_running(self) -> bool: if not self._sync_future: return False @@ -524,6 +526,7 @@ def _execute_sync( target_ip=target_ip, target_path=target_path, return_futures=True, + max_size_bytes=self.max_size_bytes, ) if self._store_remotes: diff --git a/python/ray/tune/tests/test_sync.py b/python/ray/tune/tests/test_sync.py index cbb76cd943ed..2a11f1922d0b 100644 --- a/python/ray/tune/tests/test_sync.py +++ b/python/ray/tune/tests/test_sync.py @@ -14,6 +14,7 @@ from collections import deque import ray +from ray.exceptions import RayTaskError from ray.rllib import _register_all from ray import tune @@ -488,6 +489,17 @@ def check_dir_contents(path: str): # Check sync up check_dir_contents(temp_up_target) + # Max size exceeded + with self.assertRaises(RayTaskError): + sync_dir_between_nodes( + source_ip=ray.util.get_node_ip_address(), + source_path=temp_up_target, + target_ip=ray.util.get_node_ip_address(), + target_path=temp_down_target, + max_size_bytes=2, + ) + assert not os.listdir(temp_down_target) + sync_dir_between_nodes( source_ip=ray.util.get_node_ip_address(), source_path=temp_up_target, @@ -495,7 +507,7 @@ def check_dir_contents(path: str): target_path=temp_down_target, ) - # Check sync up + # Check sync down check_dir_contents(temp_down_target) # Delete in some dir diff --git a/python/ray/tune/utils/file_transfer.py b/python/ray/tune/utils/file_transfer.py index 45de5d8d1219..c0ab2ea0c09d 100644 --- a/python/ray/tune/utils/file_transfer.py +++ b/python/ray/tune/utils/file_transfer.py @@ -8,7 +8,8 @@ import ray -_DEFAULT_CHUNK_SIZE_BYTES = 500 * 1024 * 1024 +_DEFAULT_CHUNK_SIZE_BYTES = 500 * 1024 * 1024 # 500 MiB +_DEFAULT_MAX_SIZE_BYTES = 1 * 1024 * 1024 * 1024 # 1 GiB def sync_dir_between_nodes( @@ -17,7 +18,8 @@ def sync_dir_between_nodes( target_ip: str, target_path: str, force_all: bool = False, - chunk_size: int = _DEFAULT_CHUNK_SIZE_BYTES, + chunk_size_bytes: int = _DEFAULT_CHUNK_SIZE_BYTES, + max_size_bytes: Optional[int] = _DEFAULT_MAX_SIZE_BYTES, return_futures: bool = False, ) -> Union[None, Tuple[ray.ObjectRef, ray.ActorID, ray.ObjectRef]]: """Synchronize directory on source node to directory on target node. @@ -32,7 +34,9 @@ def sync_dir_between_nodes( target_ip: IP of target node. target_path: Path to file or directory on target node. force_all: If True, all files will be transferred (not just differing files). - chunk_size: Chunk size for data transfer. + chunk_size_bytes: Chunk size for data transfer. + max_size_bytes: If packed data exceeds this value, raise an error before + transfer. If ``None``, no limit is enforced. return_futures: If True, returns a tuple of the unpack future, the pack actor, and the files_stats future. If False (default) will block until synchronization finished and return None. @@ -56,7 +60,10 @@ def sync_dir_between_nodes( ).remote(target_path) pack_actor = pack_actor_on_source_node.remote( - source_path, files_stats, chunk_size=chunk_size + source_dir=source_path, + files_stats=files_stats, + chunk_size_bytes=chunk_size_bytes, + max_size_bytes=max_size_bytes, ) unpack_future = unpack_on_target_node.remote(pack_actor, target_path) @@ -164,6 +171,10 @@ def _pack_dir( return stream +def _gib_string(num_bytes: float) -> str: + return f"{float(num_bytes / 1024 ** 3):.2f}GiB" + + @ray.remote class _PackActor: """Actor wrapping around a packing job. @@ -179,17 +190,34 @@ class _PackActor: files_stats: Dict of relative filenames mapping to a tuple of (mtime, filesize). Only files that differ from these stats will be packed. - chunk_size: Cut bytes stream into chunks of this size in bytes. + chunk_size_bytes: Cut bytes stream into chunks of this size in bytes. + max_size_bytes: If packed data exceeds this value, raise an error before + transfer. If ``None``, no limit is enforced. """ def __init__( self, source_dir: str, files_stats: Optional[Dict[str, Tuple[float, int]]] = None, - chunk_size: int = _DEFAULT_CHUNK_SIZE_BYTES, + chunk_size_bytes: int = _DEFAULT_CHUNK_SIZE_BYTES, + max_size_bytes: Optional[int] = _DEFAULT_MAX_SIZE_BYTES, ): self.stream = _pack_dir(source_dir=source_dir, files_stats=files_stats) - self.chunk_size = chunk_size + + # Get buffer size + self.stream.seek(0, 2) + file_size = self.stream.tell() + + if max_size_bytes and file_size > max_size_bytes: + raise RuntimeError( + f"Packed directory {source_dir} content has a size of " + f"{_gib_string(file_size)}, which exceeds the limit " + f"of {_gib_string(max_size_bytes)}. Please check the directory " + f"contents. If you want to transfer everything, you can increase " + f"or disable the limit by passing the `max_size` argument." + ) + self.chunk_size = chunk_size_bytes + self.max_size = max_size_bytes self.iter = None def get_full_data(self) -> bytes: From 9f6900f8b27c2afaeb30d838289eb40c65133a69 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Mon, 11 Apr 2022 16:47:51 +0100 Subject: [PATCH 08/12] Update docstring --- python/ray/tune/utils/file_transfer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tune/utils/file_transfer.py b/python/ray/tune/utils/file_transfer.py index c0ab2ea0c09d..c988cf1fea0a 100644 --- a/python/ray/tune/utils/file_transfer.py +++ b/python/ray/tune/utils/file_transfer.py @@ -30,9 +30,9 @@ def sync_dir_between_nodes( Args: source_ip: IP of source node. - source_path: Path to file or directory on source node. + source_path: Path to directory on source node. target_ip: IP of target node. - target_path: Path to file or directory on target node. + target_path: Path to directory on target node. force_all: If True, all files will be transferred (not just differing files). chunk_size_bytes: Chunk size for data transfer. max_size_bytes: If packed data exceeds this value, raise an error before From 06ceadff1a4d181933ec7cf462fb1b96bed6a5f6 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Mon, 11 Apr 2022 18:40:18 +0100 Subject: [PATCH 09/12] Fix private attribute access --- python/ray/tune/sync_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tune/sync_client.py b/python/ray/tune/sync_client.py index adc2018b13a1..8f3b3eef5668 100644 --- a/python/ray/tune/sync_client.py +++ b/python/ray/tune/sync_client.py @@ -526,7 +526,7 @@ def _execute_sync( target_ip=target_ip, target_path=target_path, return_futures=True, - max_size_bytes=self.max_size_bytes, + max_size_bytes=self._max_size_bytes, ) if self._store_remotes: From c1202ecbcba5fb2be21744b77b7a1d99293e924d Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Tue, 12 Apr 2022 08:56:04 +0100 Subject: [PATCH 10/12] Update python/ray/tune/tests/test_sync.py Co-authored-by: matthewdeng --- python/ray/tune/tests/test_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tune/tests/test_sync.py b/python/ray/tune/tests/test_sync.py index 2a11f1922d0b..14def3f34881 100644 --- a/python/ray/tune/tests/test_sync.py +++ b/python/ray/tune/tests/test_sync.py @@ -498,7 +498,7 @@ def check_dir_contents(path: str): target_path=temp_down_target, max_size_bytes=2, ) - assert not os.listdir(temp_down_target) + assert not os.listdir(temp_down_target) sync_dir_between_nodes( source_ip=ray.util.get_node_ip_address(), From 3e7beb1dbc4abe8fc7d7e3a77b568916d803cb5c Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Tue, 12 Apr 2022 08:58:24 +0100 Subject: [PATCH 11/12] Nit --- python/ray/tune/tests/test_sync.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/tune/tests/test_sync.py b/python/ray/tune/tests/test_sync.py index 14def3f34881..49f605cf574b 100644 --- a/python/ray/tune/tests/test_sync.py +++ b/python/ray/tune/tests/test_sync.py @@ -498,6 +498,7 @@ def check_dir_contents(path: str): target_path=temp_down_target, max_size_bytes=2, ) + assert not os.listdir(temp_down_target) sync_dir_between_nodes( From 3af776e7951d5b0a03b34d5dc5c915fa1de0b442 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Tue, 12 Apr 2022 09:05:30 +0100 Subject: [PATCH 12/12] logger error -> warning --- python/ray/tune/sync_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tune/sync_client.py b/python/ray/tune/sync_client.py index 8f3b3eef5668..15d54b3bf39c 100644 --- a/python/ray/tune/sync_client.py +++ b/python/ray/tune/sync_client.py @@ -537,7 +537,7 @@ def _execute_sync( def delete(self, target: str): if not self._last_target_tuple: - logger.error( + logger.warning( f"Could not delete path {target} as the target node is not known." ) return @@ -547,7 +547,7 @@ def delete(self, target: str): try: delete_on_node(node_ip=node_ip, path=target) except Exception as e: - logger.error( + logger.warning( f"Could not delete path {target} on remote node {node_ip}: {e}" )