
[air] pyarrow.fs persistence (3/n): Introduce new Checkpoint API #37925

Merged

Conversation

justinvyu (Contributor)

Why are these changes needed?

This PR introduces the new Checkpoint API (based on the prototype PR #36969). It also adds a set of simplified unit tests for the checkpoint class functionality that test multiple types of checkpoint path/filesystem inputs.
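For context, here is a rough sketch of the kinds of path/filesystem inputs the new API is meant to accept (illustrative only; the exact constructor signature, paths, and bucket name are assumptions, not taken from this PR's diff):

```python
import pyarrow.fs

from ray.train import Checkpoint  # the new Checkpoint class introduced here

# Local directory path: the local filesystem is inferred.
ckpt_local = Checkpoint(path="/tmp/my_experiment/checkpoint_000001")

# URI: path and filesystem are resolved via pyarrow.fs.FileSystem.from_uri.
ckpt_uri = Checkpoint(path="s3://my-bucket/my_experiment/checkpoint_000001")

# Explicit (path, filesystem) pair, e.g. a custom or mocked filesystem in tests.
fs = pyarrow.fs.LocalFileSystem()
ckpt_custom = Checkpoint(path="/tmp/my_experiment/checkpoint_000001", filesystem=fs)

# Materialize the checkpoint contents to a local directory when needed.
local_dir = ckpt_local.to_directory()
```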

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Justin Yu <[email protected]>

Fix lint

Signed-off-by: Justin Yu <[email protected]>

Missing import

Signed-off-by: Justin Yu <[email protected]>
Comment on lines 196 to +198
_create_directory(fs=fs, fs_path=fs_path)
_pyarrow_fs_copy_files(local_path, fs_path, destination_filesystem=fs)
return
justinvyu (Contributor Author)

Fix: if `exclude` is not passed, we were previously falling through, even though we should just perform this if block and return.
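A minimal sketch of the intended control flow, assuming a hypothetical `_upload_to_fs_path` helper that wraps the two calls shown in the diff above (`_create_directory` and `_pyarrow_fs_copy_files` come from that diff; the wrapper itself is an assumption):

```python
def _upload_to_fs_path(local_path, fs, fs_path, exclude=None):
    """Hypothetical helper illustrating the early-return fix described above."""
    if not exclude:
        # No exclusion patterns: create the destination directory, copy
        # everything over, and return instead of falling through.
        _create_directory(fs=fs, fs_path=fs_path)
        _pyarrow_fs_copy_files(local_path, fs_path, destination_filesystem=fs)
        return

    # Exclusion patterns were given: fall through to an upload that filters
    # out the excluded files (omitted here).
    ...
```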

if path and not filesystem:
    self.filesystem, self.path = pyarrow.fs.FileSystem.from_uri(path)

# The UUID is generated by hashing the combination of the file system type
ericl (Contributor)

I wonder if this could potentially be dangerous if the data was updated somehow. What if we made it purely randomly generated (presumably it gets carried along whenever the Checkpoint is passed to different workers within this class)?

justinvyu (Contributor Author) commented Jul 31, 2023

What if we just generate this uuid whenever `to_directory` gets called and don't keep it as an attribute? That way we always use the latest path/filesystem rather than what it was at initialization.

If it's a random uuid, then we no longer de-duplicate downloads to the same directory.
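For concreteness, here is a sketch of the two uuid strategies being weighed here (illustrative only; the helper names and the exact hashing scheme are assumptions, not the actual implementation):

```python
import hashlib
import uuid

import pyarrow.fs


def deterministic_uuid(filesystem: pyarrow.fs.FileSystem, fs_path: str) -> str:
    # Hash the filesystem type together with the path, so every Checkpoint that
    # points at the same location shares one uuid and downloads de-duplicate
    # into a single local directory. The risk: a stale local cache can be
    # reused if the remote data changes.
    return hashlib.md5(f"{filesystem.type_name}:{fs_path}".encode()).hexdigest()


def random_uuid() -> str:
    # Purely random: never reuses stale data, but each Checkpoint object
    # downloads into its own directory, even for the same remote location.
    return uuid.uuid4().hex
```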

justinvyu (Contributor Author)

@ericl I think the random uuid idea also works in most use cases.

ds.map_batches(Predictor, fn_args=(result.checkpoint,))  # <-- each map batches worker uses the same checkpoint w/ the same uuid

Trainer(resume_from_checkpoint=result.checkpoint)  # <-- each train worker downloads a ckpt with the same uuid

The only case it doesn't cover is multiple processes creating separate checkpoints pointing to the same location.

I'm ok with either way; what do you think?

ericl (Contributor)

Ok, let's go with the random uuid then, because I think this is what is currently implemented in the air.Checkpoint code.

justinvyu (Contributor Author)

Oh, the air.Checkpoint currently uses a canonical uuid for URI-checkpoints (so it's the same as the implementation I have now).

ericl (Contributor)

Oh I see. Hmm, I feel it's a bit risky to use that, so I'd still prefer to generate random ones, at least to start.

justinvyu (Contributor Author)

Ok, changed, ptal!

@ericl ericl left a comment (Contributor)

One question on whether we can use a random UUID instead.

@ericl ericl added the @author-action-required label (the PR author is responsible for the next step; remove the tag to send the PR back to the reviewer) Jul 31, 2023
@justinvyu justinvyu removed the @author-action-required label Jul 31, 2023
@justinvyu justinvyu requested a review from ericl July 31, 2023 22:54
@justinvyu justinvyu changed the title [air] pyarrow.fs persistence: Introduce new Checkpoint API [air] pyarrow.fs persistence (3/n): Introduce new Checkpoint API Jul 31, 2023
@justinvyu justinvyu added the tests-ok label (the tagger certifies test failures are unrelated and assumes personal liability) Aug 1, 2023
@ericl ericl merged commit 9d9f482 into ray-project:master Aug 1, 2023
2 checks passed
@justinvyu justinvyu deleted the air/persistence/new_checkpoint_api branch August 1, 2023 06:09
@pcmoritz (Contributor) commented Aug 1, 2023

Why do we suddenly have a public ray.train.checkpoint namespace? Please put it into _internal and only expose it as ray.train.Checkpoint, see ray-project/enhancements#36

@matthewdeng (Contributor)

@pcmoritz why should this be moved to _internal? Wouldn't the typical pattern be to:

  1. Define Checkpoint in train/checkpoint.py
  2. Import ray.train.checkpoint.Checkpoint in train/__init__.py and include it in __all__?
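For reference, a minimal sketch of that pattern (illustrative; the actual ray.train module layout may differ):

```python
# train/checkpoint.py defines the class:
#
#     class Checkpoint:
#         ...
#
# train/__init__.py then re-exports it so that ray.train.Checkpoint is the
# single public name:
from ray.train.checkpoint import Checkpoint

__all__ = ["Checkpoint"]
```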

@ericl (Contributor) commented Aug 1, 2023

The main thing is to avoid having redundant public aliases for the same class, right? So we don't want both ray.train.checkpoint.Checkpoint and ray.train.Checkpoint to exist at the same time.

@ericl (Contributor) commented Aug 1, 2023

Though, I don't think we are consistently following this throughout the codebase. For example, in Ray Data, we have ray.data.Dataset as well as ray.data.dataset.Dataset.

ericl pushed a commit that referenced this pull request Aug 3, 2023
…ection (#37888)

This PR:
1. Uses the storage context to upload the new `ray.train.Checkpoint` (from #37925)
directly from the Train worker.
2. Gets checkpoint reporting to work in the save direction, simplifying the checkpoint handling logic to avoid the Train `CheckpointManager` and use a single, simplified checkpoint manager (from #37962). A usage sketch of worker-side reporting follows below.
3. Updates the e2e test to check for worker-uploaded checkpoints.

### Follow-ups needed

1. `Trial` path resolution is still messed up (using the legacy path), causing some issues with the custom fs test case. That test case skips some assertions at the moment. This fix is up next.
2. Trial restoration is explicitly disabled at the moment. This is up next as well.
3. Artifacts are currently being synced by the driver due to the train worker living on the same node, which is why it passes in the test case. This upload should be done from the worker, and the test case should be updated to check that.
4. The `on_checkpoint` hook for `tune.Callback` takes in a `_TrackedCheckpoint`. Currently, I skip invoking the callbacks -- TBD what to expose to the user callbacks here.
5. Checkpoints cannot be ordered based on auto-filled metrics at the moment, only user specified metrics. Ex: `CheckpointConfig(checkpoint_score_attribute="training_iteration", mode="min")`
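For illustration, a rough sketch of what worker-side checkpoint reporting looks like with the new API (an assumption-laden example rather than code from this PR; `train_func`, the metric values, and the temporary-directory handling are placeholders):

```python
import tempfile

from ray import train
from ray.train import Checkpoint


def train_func(config):
    # ... one training iteration producing some model state ...
    with tempfile.TemporaryDirectory() as tmpdir:
        # Write model state into tmpdir, then hand it to Ray Train. With the
        # new persistence path, the checkpoint is uploaded directly from the
        # Train worker via the storage context.
        checkpoint = Checkpoint.from_directory(tmpdir)
        train.report({"loss": 0.1}, checkpoint=checkpoint)
```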
NripeshN pushed a commit to NripeshN/ray that referenced this pull request Aug 15, 2023
arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
vymao pushed a commit to vymao/ray that referenced this pull request Oct 11, 2023
Labels
tests-ok: The tagger certifies test failures are unrelated and assumes personal liability.