[Train] Add support for metrics aggregation #22099

Merged: 23 commits merged from jwyyy:aggregate_metrics into ray-project:master on Mar 8, 2022

Conversation

@jwyyy (Contributor) commented Feb 3, 2022

Why are these changes needed?

This PR allows users to aggregate metrics returned from all workers.
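
For context, a minimal illustration of the goal (not the PR's actual API): Ray Train hands callbacks a list with one result dict per worker, and this PR adds a built-in way to aggregate a metric across those per-worker dicts instead of doing it by hand as below.

```python
# Illustration only: per-worker result dicts as reported via train.report().
worker_results = [
    {"loss": 0.5},   # worker 0
    {"loss": 0.25},  # worker 1
]

# Manual aggregation across workers; the PR's goal is to support this natively.
avg_loss = sum(r["loss"] for r in worker_results) / len(worker_results)
print(avg_loss)  # 0.375
```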

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Review comment on python/ray/train/trainer.py (outdated, resolved)
@jwyyy (Contributor, Author) commented Feb 4, 2022

@amogkam @matthewdeng gentle ping 😄

@matthewdeng self-assigned this on Feb 4, 2022
@matthewdeng (Contributor) commented:

cc @Yard1

@amogkam (Contributor) commented Feb 8, 2022

Can we implement this as an AverageResultsPreprocessor (https://github.com/ray-project/ray/blob/master/python/ray/train/callbacks/results_preprocessors/preprocessor.py#L8) so that it can be leveraged with callbacks?

Also, there would have to be some way for users to specify which keys to average, and how many samples each worker processed for each key. Some workers may process more batches than others, so we can't just take an even average across all the workers.
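
For concreteness, a minimal sketch of the weighted average being described here; the num_samples key is an illustrative assumption, not the PR's actual magic key.

```python
# Workers that processed more data should contribute proportionally more.
worker_results = [
    {"loss": 0.5, "num_samples": 100},   # worker 0
    {"loss": 0.25, "num_samples": 300},  # worker 1 processed 3x more samples
]

total = sum(r["num_samples"] for r in worker_results)
weighted_loss = sum(r["loss"] * r["num_samples"] for r in worker_results) / total
print(weighted_loss)  # 0.3125, vs. 0.375 for a naive even average
```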

@Yard1 (Member) commented Feb 8, 2022

@amogkam Wouldn't implementing this as a result preprocessor require users to create their own callback subclasses? Furthermore, each callback would have to rerun the same code, unless we implement some sort of caching for result preprocessors.

@amogkam (Contributor) commented Feb 8, 2022

@Yard1 Right, we would also have to provide a way for users to easily configure the preprocessors that are used for the callbacks.

@jwyyy (Contributor, Author) commented Feb 8, 2022

@amogkam @Yard1 thank you for the helpful discussion! So what is the final plan? Do we integrate the aggregation into TrainingIterator or add a new preprocessor?

@amogkam (Contributor) commented Feb 9, 2022

@jwyyy I would recommend the following implementation:

  1. Add a new AverageResultsPreprocessor(Preprocessor) that implements a generic form of averaging for numerical types. For non-numerical types, it can just aggregate the results into a list. The implementation can roughly follow the existing _process_stats(self, worker_stats) helper (a rough sketch is included after this comment).
  2. Add a new preprocessor arg to Trainer.run. Then, when iterating through the results, call self.preprocessor.process_results(results) before sending them to the Callback.
  3. We also need to add a magic key that users would pass to train.report to specify the number of samples/batches that the worker processed. In the AveragePreprocessor, we then use this magic key to compute a weighted average instead of just an even average across all workers.

Let me know if this makes sense!
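
A rough sketch of steps (1) and (3), for orientation only: this is not the merged implementation, and the NUM_SAMPLES_KEY constant, the preprocess method name, and the avg(...)/list(...) key formats are illustrative assumptions.

```python
from numbers import Number
from typing import Dict, List, Optional

# Hypothetical name for the magic key in step (3); the actual key is not
# specified in this thread.
NUM_SAMPLES_KEY = "_num_samples"


class AverageResultsPreprocessor:
    """Averages numeric keys across workers; collects non-numeric keys into a list."""

    def __init__(self, keys: Optional[List[str]] = None):
        self.keys = keys

    def preprocess(self, results: List[Dict]) -> List[Dict]:
        keys = self.keys or sorted(
            {k for r in results for k in r if k != NUM_SAMPLES_KEY}
        )
        aggregated = {}
        for key in keys:
            # Pair each reported value with that worker's sample count.
            pairs = [(r[key], r.get(NUM_SAMPLES_KEY, 1)) for r in results if key in r]
            values = [v for v, _ in pairs]
            if values and all(isinstance(v, Number) for v in values):
                total_weight = sum(w for _, w in pairs)
                aggregated[f"avg({key})"] = sum(v * w for v, w in pairs) / total_weight
            else:
                aggregated[f"list({key})"] = values
        for r in results:
            # Make the aggregates visible in every worker's result dict.
            r.update(aggregated)
        return results


# Worker 1 processed 3x more samples, so it dominates the weighted average.
results = [
    {"loss": 0.25, NUM_SAMPLES_KEY: 100},
    {"loss": 0.75, NUM_SAMPLES_KEY: 300},
]
print(AverageResultsPreprocessor().preprocess(results)[0]["avg(loss)"])  # 0.625
```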

@jwyyy (Contributor, Author) commented Feb 13, 2022

Hi @amogkam @Yard1 @matthewdeng , I revised the PR based on the previous discussion.

The averaged metrics are not appended to the results list as an extra entry. Instead, they are added as new key-value pairs to each worker's existing result Dict. This makes sure the aggregated metrics are visible in every worker's results, and (I think) it integrates better with the worker_to_log argument in MLflowLoggerCallback and TBXLoggerCallback.

Please let me know your comments and feedback. Thank you very much! (Failed tests seem unrelated to this PR.)
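
A tiny illustration of this storage choice, using a hypothetical avg(loss) key name: the aggregate is merged into every worker's result dict rather than appended to the results list as an extra entry.

```python
results = [{"loss": 0.5}, {"loss": 0.25}]  # one dict per worker
avg = sum(r["loss"] for r in results) / len(results)
for r in results:
    r["avg(loss)"] = avg  # the aggregate is visible to every worker's callbacks
print(results)  # [{'loss': 0.5, 'avg(loss)': 0.375}, {'loss': 0.25, 'avg(loss)': 0.375}]
```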

@matthewdeng (Contributor) left a comment

Implementation makes sense to me for the averaging use-case, tagging @Yard1 for some input on the usability/extensibility aspect!

Two review comments on python/ray/train/trainer.py (outdated, resolved)
@jwyyy (Contributor, Author) commented Feb 17, 2022

I updated the PR with support for a default list of aggregated metrics and customized aggregation methods. Looking forward to more discussion on where to store the aggregated metrics.
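
For illustration, a hedged sketch of what "customized aggregation methods" can look like: each key maps to a callable over the per-worker values, so users can plug in mean, max, min, and so on. The key names and the mapping shown here are assumptions, not the PR's API.

```python
aggregation_fns = {
    "loss": lambda values: sum(values) / len(values),  # average across workers
    "accuracy": max,                                    # best worker's accuracy
}

worker_results = [{"loss": 0.5, "accuracy": 0.91}, {"loss": 0.25, "accuracy": 0.88}]
aggregated = {
    key: fn([r[key] for r in worker_results]) for key, fn in aggregation_fns.items()
}
print(aggregated)  # averaged loss, max accuracy
```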

@jwyyy (Contributor, Author) commented Feb 23, 2022

Hi @matthewdeng @Yard1, thank you for your suggestions! I have revised the implementation based on them. One modification: to compute the averaging weights before calling self.aggregate_fn(), I added a prepare() method to the AggregateFn class (because self.aggregate_fn(values) has no access to the weights).
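
A minimal sketch of how this prepare() hook might work; the AggregateFn name comes from the comment above, but the method signatures and the _num_samples key are illustrative assumptions, not the merged API.

```python
from typing import Dict, List


class AggregateFn:
    def prepare(self, results: List[Dict]) -> None:
        # Inspect the full worker results first to derive per-worker weights,
        # since __call__ below only receives the raw metric values.
        self.weights = [r.get("_num_samples", 1) for r in results]

    def __call__(self, values: List[float]) -> float:
        total = sum(self.weights)
        return sum(v * w for v, w in zip(values, self.weights)) / total


agg = AggregateFn()
results = [{"loss": 0.5, "_num_samples": 1}, {"loss": 0.25, "_num_samples": 3}]
agg.prepare(results)                      # compute weights from all results
print(agg([r["loss"] for r in results]))  # 0.3125
```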

Currently, aggregated results are still added to the existing result Dicts. Have you reached agreement on how to store them? I assume we want to implement a customized dict/list class (link), but the integration with callbacks hasn't been completely sorted out. Could we add a new argument to callbacks and allow them to use aggregated metrics directly, just like worker_to_log?

Please let me know your comments when you have time to review it again. Thanks!

@matthewdeng (Contributor) left a comment

This is looking great - thanks a ton for the many iterations!

Regarding where to store these, after thinking about it some more it seems like the current options worth discussing are:

  1. Add aggregate metrics to all workers.
  2. Add aggregate metrics to 0th worker.
  3. Change the entire pattern of passing reported metrics to use a more complex dictionary as opposed to the current list.

In the long run I do think we'll move towards (3), but it's out of scope for this PR. There are some additional considerations we'll need to think through, such as the general usability of Callbacks and the integration with Tune (currently we only pass worker 0 results and don't support Train Callbacks, but we'll eventually need to support passing aggregated metrics to Tune).

For (1) vs (2), the tradeoff is verbosity (e.g. if the user wants to log all results to a JSON file) vs. configurability (e.g. if the user wants to log only worker 2 results + aggregate results to TensorBoard). My initial hunch is to go with (2) and see if users request this type of customizability. @jwyyy @Yard1 thoughts?

Also cc @amogkam since this is worth considering for the API redesign.

@jwyyy (Contributor, Author) commented Feb 23, 2022

@matthewdeng Thank you very much for your comments! I will address all issues by tomorrow.

> For (1) vs (2), the tradeoff is verbosity (e.g. if the user wants to log all results to a JSON file) vs. configurability (e.g. if the user wants to log only worker 2 results + aggregate results to TensorBoard). My initial hunch is to go with (2) and see if users request this type of customizability. @jwyyy @Yard1 thoughts?

I think in MLflowLoggerCallback and TBXLoggerCallback, the worker to log is chosen by the user (handled by IndexedResultsPreprocessor), so it may not be worker 0. Since we don't necessarily know which worker users want to log (the default is 0), we may need to add aggregated results to all workers, but this has the drawbacks you mentioned.

Also, callbacks already have a results_preprocessors argument. Should we use a different argument name for the aggregation preprocessors, rather than results_preprocessors, to avoid potential confusion?

@Yard1 (Member) commented Feb 23, 2022

I think for now we should go with option (1), due to what @jwyyy has pointed out. That being said, it would be great if we could get a followup PR quickly to move to a more complex data structure, as that would be the best. I still like my list subclass idea 😂

@jwyyy (Contributor, Author) commented Feb 23, 2022

> I think for now we should go with option (1), due to what @jwyyy has pointed out. That being said, it would be great if we could get a followup PR quickly to move to a more complex data structure, as that would be the best. I still like my list subclass idea 😂

If there is agreement, I can follow up with another PR to implement the data structure idea 😄

@matthewdeng (Contributor) left a comment

Functionally this looks good to me! Could you add tests and docs for these new changes?

@amogkam can you take a pass?

@jwyyy (Contributor, Author) commented Feb 25, 2022

> Functionally this looks good to me! Could you add tests and docs for these new changes?

Sure! I will add some tests and update the docs soon. Thanks a lot!

@amogkam (Contributor) left a comment

Thanks a lot for the work on this @jwyyy! Left some comments.

Can we also add tests for this in test_results_preprocessors?

One more point I want to clarify: what is the behavior if some workers report valid results for a key, but others do not? Should we ignore the key in its entirety, or still do the aggregation but only over the workers with valid values? Also cc @matthewdeng @Yard1 for thoughts on this.
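
For concreteness, a small sketch of the two behaviors under discussion; the function names are illustrative.

```python
from numbers import Number
from typing import Dict, List, Optional


def average_over_valid_workers(results: List[Dict], key: str) -> Optional[float]:
    """Aggregate only over the workers that reported a valid numeric value."""
    values = [r[key] for r in results if isinstance(r.get(key), Number)]
    return sum(values) / len(values) if values else None


def average_or_ignore_key(results: List[Dict], key: str) -> Optional[float]:
    """Ignore the key entirely unless every worker reported a valid value."""
    values = [r.get(key) for r in results]
    if not all(isinstance(v, Number) for v in values):
        return None
    return sum(values) / len(values)


results = [{"loss": 0.5}, {"loss": 0.25}, {}]  # the third worker reported nothing
print(average_over_valid_workers(results, "loss"))  # 0.375
print(average_or_ignore_key(results, "loss"))       # None
```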

Review comment on python/ray/train/callbacks/print.py (outdated, resolved)
@amogkam (Contributor) left a comment

Thanks again for the updates @jwyyy! I think this should be our final round, left some minor comments!

@amogkam (Contributor) left a comment

Thanks @jwyyy! Just one minor nit, but other than that this lgtm!

A reviewer commented on this snippet of the new test:

```python
    [(AverageResultsPreprocessor, 2.0), (MaxResultsPreprocessor, 3.0)],
)
def test_warning_in_aggregate_results_preprocessors(
    caplog, results_preprocessor, expected_value
```
Nice, I never knew about the caplog fixture :)

@amogkam merged commit d1009c8 into ray-project:master on Mar 8, 2022
@jwyyy deleted the aggregate_metrics branch on March 9, 2022 at 03:23