
[AIR] Support large checkpoints and other arguments #28826

Merged — 6 commits merged into ray-project:master from air-support-large-checkpoints on Sep 29, 2022

Conversation

amogkam
Contributor

@amogkam amogkam commented Sep 27, 2022

Signed-off-by: Amog Kamsetty [email protected]

Resolves https://discuss.ray.io/t/resuming-training-from-big-models-in-ray-train-leads-to-grcp-error/7652.

Previously, the arguments passed to the Trainer would be captured in the Trainable context. For very large arguments, this prevented the Trainable from being registered due to gRPC resource limits:

Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/tune/tuner.py", line 234, in fit
    return self._local_tuner.fit()
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/tune/impl/tuner_internal.py", line 283, in fit
    analysis = self._fit_internal(trainable, param_space)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/tune/impl/tuner_internal.py", line 380, in _fit_internal
    analysis = run(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/tune/tune.py", line 520, in run
    experiments[i] = Experiment(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/tune/experiment/experiment.py", line 166, in __init__
    raise TuneError(
ray.tune.error.TuneError: The Trainable/training function is too large for grpc resource limit. Check that its definition is not implicitly capturing a large array or other object in scope. Tip: use tune.with_parameters() to put large objects in the Ray object store. 
Original exception: Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/tune/experiment/experiment.py", line 163, in __init__
    self._run_identifier = Experiment.register_if_needed(run)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/tune/experiment/experiment.py", line 356, in register_if_needed
    register_trainable(name, run_object)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/tune/registry.py", line 101, in register_trainable
    _global_registry.register(TRAINABLE_CLASS, name, trainable)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/tune/registry.py", line 189, in register
    self.flush_values()
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/tune/registry.py", line 211, in flush_values
    _internal_kv_put(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/experimental/internal_kv.py", line 94, in _internal_kv_put
    return global_gcs_client.internal_kv_put(key, value, overwrite, namespace) == 0
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/gcs_utils.py", line 177, in wrapper
    return f(self, *args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/gcs_utils.py", line 296, in internal_kv_put
    reply = self._kv_stub.InternalKVPut(req, timeout=timeout)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.RESOURCE_EXHAUSTED
        details = "Received message larger than max (500029218 vs. 262144000)"
        debug_error_string = "UNKNOWN:Error received from peer ipv4:172.31.14.26:9031 {created_time:"2022-09-27T13:14:53.915140197-07:00", grpc_status:8, grpc_message:"Received message larger than max (500029218 vs. 262144000)"}"
>

Instead, we now always use tune.with_parameters to save the Trainer arguments in the object store rather than capturing them in the Trainable context.

The test fails before these changes, but passes afterwards.

Failing test before the changes:

self = <ray.tune.experiment.experiment.Experiment object at 0x11ae54bb0>, name = None, run = <class 'ray.train.base_trainer.BaseTrainer.as_trainable.<locals>.TrainTrainable'>
stop = None, time_budget_s = None, config = {}, resources_per_trial = None, num_samples = 1, local_dir = '/Users/amog/ray_results'
_experiment_checkpoint_dir = '/Users/amog/ray_results/DummyTrainer_2022-09-27_16-18-54'
sync_config = SyncConfig(upload_dir=None, syncer='auto', sync_on_checkpoint=True, sync_period=300, sync_timeout=1800), trial_name_creator = None, trial_dirname_creator = None
log_to_file = False, checkpoint_freq = 0, checkpoint_at_end = True, keep_checkpoints_num = None, checkpoint_score_attr = None, export_formats = None, max_failures = 0
restore = None

    def __init__(
        self,
        name,
        run,
        stop=None,
        time_budget_s=None,
        config=None,
        resources_per_trial=None,
        num_samples=1,
        local_dir=None,
        _experiment_checkpoint_dir: Optional[str] = None,
        sync_config=None,
        trial_name_creator=None,
        trial_dirname_creator=None,
        log_to_file=False,
        checkpoint_freq=0,
        checkpoint_at_end=False,
        keep_checkpoints_num=None,
        checkpoint_score_attr=None,
        export_formats=None,
        max_failures=0,
        restore=None,
    ):

        local_dir = _get_local_dir_with_expand_user(local_dir)
        # `_experiment_checkpoint_dir` is for internal use only for better
        # support of Tuner API.
        # If set, it should be a subpath under `local_dir`. Also deduce `dir_name`.
        self._experiment_checkpoint_dir = _experiment_checkpoint_dir
        if _experiment_checkpoint_dir:
            experiment_checkpoint_dir_path = Path(_experiment_checkpoint_dir)
            local_dir_path = Path(local_dir)
            assert local_dir_path in experiment_checkpoint_dir_path.parents
            # `dir_name` is set by `_experiment_checkpoint_dir` indirectly.
            self.dir_name = os.path.relpath(_experiment_checkpoint_dir, local_dir)

        config = config or {}
        sync_config = sync_config or SyncConfig()
        if (
            callable(run)
            and not inspect.isclass(run)
            and _detect_checkpoint_function(run)
        ):
            if checkpoint_at_end:
                raise ValueError(
                    "'checkpoint_at_end' cannot be used with a "
                    "checkpointable function. You can specify "
                    "and register checkpoints within "
                    "your trainable function."
                )
            if checkpoint_freq:
                raise ValueError(
                    "'checkpoint_freq' cannot be used with a "
                    "checkpointable function. You can specify checkpoints "
                    "within your trainable function."
                )
        try:
            self._run_identifier = Experiment.register_if_needed(run)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.RESOURCE_EXHAUSTED:
>               raise TuneError(
                    f"The Trainable/training function is too large for grpc resource "
                    f"limit. Check that its definition is not implicitly capturing a "
                    f"large array or other object in scope. "
                    f"Tip: use tune.with_parameters() to put large objects "
                    f"in the Ray object store. \n"
                    f"Original exception: {traceback.format_exc()}"
                )
E               ray.tune.error.TuneError: The Trainable/training function is too large for grpc resource limit. Check that its definition is not implicitly capturing a large array or other object in scope. Tip: use tune.with_parameters() to put large objects in the Ray object store.
E               Original exception: Traceback (most recent call last):
E                 File "/Users/amog/dev/ray/python/ray/tune/experiment/experiment.py", line 163, in __init__
E                   self._run_identifier = Experiment.register_if_needed(run)
E                 File "/Users/amog/dev/ray/python/ray/tune/experiment/experiment.py", line 356, in register_if_needed
E                   register_trainable(name, run_object)
E                 File "/Users/amog/dev/ray/python/ray/tune/registry.py", line 101, in register_trainable
E                   _global_registry.register(TRAINABLE_CLASS, name, trainable)
E                 File "/Users/amog/dev/ray/python/ray/tune/registry.py", line 189, in register
E                   self.flush_values()
E                 File "/Users/amog/dev/ray/python/ray/tune/registry.py", line 211, in flush_values
E                   _internal_kv_put(
E                 File "/Users/amog/dev/ray/python/ray/_private/client_mode_hook.py", line 105, in wrapper
E                   return func(*args, **kwargs)
E                 File "/Users/amog/dev/ray/python/ray/experimental/internal_kv.py", line 94, in _internal_kv_put
E                   return global_gcs_client.internal_kv_put(key, value, overwrite, namespace) == 0
E                 File "/Users/amog/dev/ray/python/ray/_private/gcs_utils.py", line 177, in wrapper
E                   return f(self, *args, **kwargs)
E                 File "/Users/amog/dev/ray/python/ray/_private/gcs_utils.py", line 296, in internal_kv_put
E                   reply = self._kv_stub.InternalKVPut(req, timeout=timeout)
E                 File "/Users/amog/dev/ray/lib/python3.8/site-packages/grpc/_channel.py", line 923, in __call__
E                   return _end_unary_response_blocking(state, call, False, None)
E                 File "/Users/amog/dev/ray/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _end_unary_response_blocking
E                   raise _InactiveRpcError(state)
E               grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
E               	status = StatusCode.RESOURCE_EXHAUSTED
E               	details = "Sent message larger than max (800007477 vs. 536870912)"
E               	debug_error_string = "{"created":"@1664313548.580192000","description":"Sent message larger than max (800007477 vs. 536870912)","file":"src/core/ext/filters/message_size/message_size_filter.cc","file_line":270,"grpc_status":8}"
E               >

../../tune/experiment/experiment.py:166: TuneError

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Amog Kamsetty <[email protected]>
@amogkam amogkam requested review from richardliaw and removed request for richardliaw September 27, 2022 21:24
@Yard1
Member

Yard1 commented Sep 27, 2022

This is great! I am a bit worried this will make subclassing difficult. Could we perhaps apply the with_parameters in a method? Perhaps something like:

def _with_parameters(self, trainable_cls, **config):
    return tune.with_parameters(trainable_cls, **config)

def as_trainable(self):
    ...
    return self._with_parameters(Trainable, **config)

That way, if as_trainable is overridden and the original trainable subclassed, with_parameters can be called only on the final class by making self._with_parameters a no-op (and applying the logic in the subclass manually). Should make for a nicer dev experience!
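The hook pattern suggested above can be sketched in plain Python. The class and method names below mirror the suggestion, but `with_parameters` is stubbed out and the classes are illustrative, not Ray's actual API: the base class routes wrapping through an overridable `_with_parameters`, so a subclass can make the inherited hook a no-op and apply the wrapping exactly once, to the final subclassed trainable.

```python
def with_parameters(trainable_cls, **config):
    # Stub for tune.with_parameters: just record what it wrapped.
    return ("wrapped", trainable_cls, config)

class BaseTrainer:
    def _with_parameters(self, trainable_cls, **config):
        # Default behavior: wrap the trainable here.
        return with_parameters(trainable_cls, **config)

    def as_trainable(self):
        trainable_cls = type("TrainTrainable", (), {})
        return self._with_parameters(trainable_cls, lr=0.1)

class CustomTrainer(BaseTrainer):
    # Make the inherited hook a no-op so the base class does not wrap...
    def _with_parameters(self, trainable_cls, **config):
        return (trainable_cls, config)

    # ...then wrap exactly once, on the final subclassed trainable.
    def as_trainable(self):
        inner_cls, config = super().as_trainable()
        final_cls = type("SubTrainable", (inner_cls,), {})
        return with_parameters(final_cls, **config)
```

Calling `BaseTrainer().as_trainable()` wraps the inner class directly, while `CustomTrainer().as_trainable()` defers the wrap so it lands on the subclassed trainable instead.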

Contributor

@krfricke krfricke left a comment

Big fan of this change, but it seems it breaks some tests.

@amogkam
Contributor Author

amogkam commented Sep 28, 2022

Hmm @Yard1, I don't think as_trainable is supposed to be overridden by developers. Trainables hopefully should not be exposed to developers. Seems like we are only doing this in HuggingfaceTrainer, and only to work around checkpoint syncing, which we should eventually remove. We are calling super().as_trainable() anyway, so it can take advantage of tune.with_parameters by default.

@amogkam
Contributor Author

amogkam commented Sep 28, 2022

I think we should make as_trainable private to make this clearer.

@amogkam
Contributor Author

amogkam commented Sep 28, 2022

Is your suggestion specifically needed for HuggingfaceTrainer @Yard1?

Signed-off-by: Amog Kamsetty <[email protected]>
Signed-off-by: Amog Kamsetty <[email protected]>
Member

@Yard1 Yard1 left a comment


Looks great!

python/ray/train/base_trainer.py
Signed-off-by: Amog Kamsetty <[email protected]>
@@ -339,6 +339,11 @@ def setup(self, config):
setup_kwargs[k] = parameter_registry.get(prefix + k)
super(_Inner, self).setup(config, **setup_kwargs)

# Workaround for actor name not being logged correctly
# if __repr__ is not directly defined in a class.
def __repr__(self):
Contributor


Ah, is this always an issue then?

Member


It still hasn't been fixed in core, AFAIK.

Contributor

@xwjiang2010 xwjiang2010 left a comment


Thanks a ton for the change!

Contributor

@krfricke krfricke left a comment


LGTM

@krfricke krfricke removed their assignment Sep 29, 2022
Signed-off-by: Amog Kamsetty <[email protected]>
@amogkam amogkam merged commit 2b62bba into ray-project:master Sep 29, 2022
@amogkam amogkam deleted the air-support-large-checkpoints branch September 29, 2022 19:26
WeichenXu123 pushed a commit to WeichenXu123/ray that referenced this pull request Dec 19, 2022
Signed-off-by: Amog Kamsetty [email protected]
Signed-off-by: Weichen Xu <[email protected]>