[tune] Chunk file transfers in cross-node checkpoint syncing #23804
Conversation
```python
def _pack_dir(
    source_dir: str, files_stats: Optional[Dict[str, Tuple[float, int]]] = None
) -> io.BytesIO:
    """Pack whole directory contents into a uncompressed tarfile.
```
Any particular reason for not using compression?
nit:
"""Pack whole directory contents into a uncompressed tarfile. | |
"""Pack whole directory contents into an uncompressed tarfile. |
Reopening this question! Would using compression help as an optimization for transferring large directories?
Sorry, I didn't reply to this earlier. In my benchmarks, gzip compression actually added both wallclock time and memory overhead. This will likely depend on the kind of data we're transferring, but my assumption is that we have either large binary data (hard to compress) or small text data (easy to compress), whereas we would get the most benefit from compressing large text data.
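For context, a minimal sketch of the in-memory packing being discussed (the function name and the `compress` flag are illustrative, not the PR's exact code); the compression trade-off is a one-argument change to `tarfile.open`:

```python
import io
import tarfile

def _pack_dir_sketch(source_dir: str, compress: bool = False) -> io.BytesIO:
    # mode="w" writes an uncompressed tar stream; "w:gz" would enable gzip,
    # which in the benchmarks above added wallclock time and memory overhead.
    stream = io.BytesIO()
    with tarfile.open(fileobj=stream, mode="w:gz" if compress else "w") as tar:
        tar.add(source_dir, arcname="")
    stream.seek(0)
    return stream
```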
```python
# Only export once
_remote_get_recursive_files_and_stats = ray.remote(_get_recursive_files_and_stats)
```
I saw this was from the existing code, but for my learning: do you know what the purpose of doing this is?
The reason to do this is to only export the remote function once. If you do this in the public API, you'll export the same method multiple times. Think of this as "caching" the remote wrapper.
(Just FYI, if the same method gets exported too many times, Ray will raise a warning.)
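A sketch of the pattern under discussion (function names are placeholders): wrap once at module level instead of inside the public entry point.

```python
import ray

def _get_stats(path: str) -> dict:
    # Plain function: still directly callable for local (non-remote) use.
    return {"path": path}

# Module level: ray.remote() runs once at import time, so the function is
# exported to the cluster a single time ("caching" the remote wrapper).
_remote_get_stats = ray.remote(_get_stats)

def public_api(path: str) -> dict:
    # Calling ray.remote(_get_stats) here instead would re-export the
    # function on every call, eventually triggering Ray's warning.
    return ray.get(_remote_get_stats.remote(path))
```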
Ah I see! Why not use the decorator pattern?
In a previous iteration we also directly called the non-remote version, but this was removed. I'll update the PR.
Actually, I'd like to keep both methods, as I might be accessing the non-remote method in a follow-up PR. Also, since this is private scope, it should be OK to have both here.
Logic looks good to me!
```python
@ray.remote
class _PackActor:
```
Thoughts on exposing a configurable `max_size` which can be used to warn or raise an exception if surpassed? (Even though this isn't actually exposed to the user today.)
Anecdotally I've appreciated runtime envs immediately erroring out when I accidentally include a 50GB dataset 😄 .
I've added a max size bytes argument defaulting to 1 GB, but it is disabled in the sync client. I agree that if users call this method they should be protected from these transfers (even if this is not a public API right now). In the sync client we mostly sync checkpoints and actually have to support large file sizes, so a default limit doesn't really make sense there.
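Roughly the kind of guard described, as a sketch (the name `_assert_max_size` and the exact error type are assumptions, not the PR's API):

```python
import os

def _assert_max_size(source_dir: str, max_size_bytes: int = 1024 ** 3) -> None:
    # Sum file sizes up front and fail fast, so an accidentally included
    # 50GB dataset is rejected before anything is packed or transferred.
    total = 0
    for root, _, files in os.walk(source_dir):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
            if total > max_size_bytes:
                raise RuntimeError(
                    f"{source_dir} exceeds the maximum transfer size "
                    f"of {max_size_bytes} bytes."
                )
```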
```python
    chunk_size: int = _DEFAULT_CHUNK_SIZE_BYTES,
    _return_all_remotes: bool = False,
) -> Union[ray.ObjectRef, Tuple[ray.ObjectRef, ray.ActorID, ray.ObjectRef]]:
    """Synchronize directory on source node to directory on target node.
```
This is probably from the last PR... Is it assumed that `source_path` and `target_path` must both be directories or both files? They can't be mixed, right?
The function name is `sync_dir_between_nodes`, so it is assumed that both paths are directories.
```python
    target_path: str,
    force_all: bool = False,
    chunk_size: int = _DEFAULT_CHUNK_SIZE_BYTES,
    _return_all_remotes: bool = False,
```
Is it ever used with False?
I've updated the API, and yes, if we end up promoting this to a public API, most use cases will call it with False. I have a follow-up script for this ready.
```python
    chunk_size: int = _DEFAULT_CHUNK_SIZE_BYTES,
    _return_all_remotes: bool = False,
) -> Union[ray.ObjectRef, Tuple[ray.ObjectRef, ray.ActorID, ray.ObjectRef]]:
    """Synchronize directory on source node to directory on target node.
```
Can we make it clearer in the docstring that this is only about kicking off/scheduling the sync, rather than the directory really being synced by the time the function returns?
I've just updated the public functions: they default to blocking calls and only return futures when `return_futures=True`.
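A usage sketch of the updated calling convention (the import path and the positional argument order are assumptions based on this thread, not the final API):

```python
# Assumed location of the PR's utility.
from ray.tune.utils.file_transfer import sync_dir_between_nodes

source_ip, target_ip = "10.0.0.1", "10.0.0.2"  # placeholder node IPs

# Default: blocks until the target directory is fully synced.
sync_dir_between_nodes(source_ip, "/tmp/src", target_ip, "/tmp/dst")

# Opt-in: schedule the sync and receive futures to wait on later.
futures = sync_dir_between_nodes(
    source_ip, "/tmp/src", target_ip, "/tmp/dst", return_futures=True
)
```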
```python
_DEFAULT_CHUNK_SIZE_BYTES = 500 * 1024 * 1024


def sync_dir_between_nodes(
```
Not to be part of this PR, but it would be good to have some benchmark as a reference, like: for this file size, it takes this amount of time to sync between nodes, assuming there is not much variance (so that the benchmark is meaningful).
Also, this opens up the possibility of syncing arbitrarily large files by chunking. It would be nice to have some safety measure to warn if syncing is slow, since everything is pretty much running on one thread.
The calling method has a warning if this is slow. IMO this should also always stay part of the enclosing function.
```python
        except Exception as e:
            logger.error(
                f"Could not delete path {target} on remote node {node_ip}: {e}"
            )

    def wait(self):
```
Since we can sync arbitrarily large files now, we may run into this method more often than before and end up in a blocking situation. We need to think about how to provide visibility if/when this happens.
FWIW, we synced arbitrarily large files before as well, just with rsync and not with remote tasks. And we do warn already, with `warn_if_slow("callbacks.on_trial_save")`, though we may want to think about making this message a bit more insightful.
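For illustration, a generic sketch of the warn-if-slow pattern referenced here (not Ray's actual `warn_if_slow` implementation; the threshold is arbitrary):

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def warn_if_slow_sketch(name: str, threshold_s: float = 0.5):
    # Time the wrapped block and emit a warning when it runs long,
    # e.g. when a cross-node checkpoint sync blocks inside wait().
    start = time.monotonic()
    yield
    duration = time.monotonic() - start
    if duration > threshold_s:
        logger.warning(f"{name} took {duration:.2f} seconds to finish.")
```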
@krfricke CI fails on
Looks pretty good to me. Please do give the other folks a chance to look at the updated version.
```diff
@@ -523,16 +453,18 @@ class RemoteTaskClient(SyncClient):
     will not kill the previous sync command, so it may still be executed.
     """

-    def __init__(self, store_pack_future: bool = False):
+    def __init__(self, _store_remotes: bool = False):
```
Any reason to make this a private argument? `store_remotes` should be good?
It is only needed for testing (so we can access and inspect the futures), so it's nothing a user would usually do or use.
Oh, I see. Can you please make it super clear that nobody should flip this parameter except for testing? Like a docstring, please?
I can add this in the next update (coming soon :-) ). I think it's not urgent, as users never instantiate SyncClients themselves. It's an internal concept and it's instantiated by Ray Tune automatically. So nobody ever calls `SomeSyncClient(..)`.
Co-authored-by: matthewdeng <[email protected]>
Why are these changes needed?
What: This introduces a general utility to synchronize directories between two nodes, derived from the RemoteTaskClient. This implementation uses chunked transfers for more efficient communication.
Why: Transferring files over 2GB in size leads to superlinear time complexity in some setups (e.g. local MacBooks). This could be due to memory limits, swapping, or gRPC limits, and is being explored in a separate thread. To overcome this limitation, we use chunked data transfers, which show quasi-linear scalability for larger files.
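A minimal sketch of the chunking idea (the helper and its interface are illustrative; the `_PackActor` quoted above suggests that in the real implementation chunks are served by an actor rather than a bare generator):

```python
import io
from typing import Iterator

_DEFAULT_CHUNK_SIZE_BYTES = 500 * 1024 * 1024  # 500 MiB, as in the PR

def iter_chunks(
    stream: io.BytesIO, chunk_size: int = _DEFAULT_CHUNK_SIZE_BYTES
) -> Iterator[bytes]:
    # Yield fixed-size chunks so no single transfer approaches the >2GB
    # sizes that showed superlinear scaling in the benchmarks above.
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        yield chunk
```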
Related issue number
Checks
I've run `scripts/format.sh` to lint the changes in this PR.