[RFC][Train] Allow for reporting results from multiple workers #31409

Closed · Yard1 opened this issue Jan 3, 2023 · 14 comments
Labels: enhancement (Request for new feature and/or capability), ray-team-created (Ray Team created), RFC (RFC issues), train (Ray Train Related Issue)

@Yard1 (Member) commented Jan 3, 2023

Description

Current state

Currently, Ray Train only reports metrics from the first worker. This is fine in most cases, but for some applications, it may be desirable to report metrics from all workers and/or report aggregations, such as mean and std. We also require that functionality for some tests.

Note: Saving checkpoints from multiple workers is beyond the scope of this proposal.

Before Ray AIR, Ray Train supported reporting result aggregation through result preprocessors (#22099).

With the current structure of the DataParallelTrainer, the reporting code is fully contained within the _report method:

    def _report(self, training_iterator: TrainingIterator) -> None:
        for results in training_iterator:
            # `results` is a list of per-worker result dicts; only the
            # first (rank 0) worker's results are forwarded to Tune.
            first_worker_results = results[0]
            tune.report(**first_worker_results)

As can be seen, it would be trivial to extend this functionality to an arbitrary number of workers or to custom aggregation logic. Below are three proposals for how to allow users to do that in a lightweight manner.

Proposal 1: Promote _report to DeveloperAPI and encourage users to subclass

In this proposal, we encourage users to simply subclass DataParallelTrainer/TorchTrainer (and so on) and override the report method with their own custom logic, e.g.:

class TorchTrainerMean(TorchTrainer):
    def report(self, training_iterator: TrainingIterator) -> None:
        for results in training_iterator:
            # Average each metric across all workers, skipping internal
            # keys that start with an underscore.
            mean_results = {
                f"mean_{k}": np.mean([result[k] for result in results])
                for k in results[0]
                if not k.startswith("_")
            }
            tune.report(**mean_results)
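
For illustration, the subclass above could then be used as a drop-in replacement for TorchTrainer. This is only a sketch, assuming the Ray AIR-era API (ray.air.session.report, ScalingConfig); the training loop below is a hypothetical placeholder:

from ray.air import session
from ray.air.config import ScalingConfig


def train_loop_per_worker(config):
    # Placeholder training loop: each worker reports its own loss value.
    for epoch in range(config["epochs"]):
        session.report({"loss": 1.0 / (epoch + 1)})


trainer = TorchTrainerMean(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={"epochs": 3},
    scaling_config=ScalingConfig(num_workers=4),
)
result = trainer.fit()  # the reported metric would surface as mean_loss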

Proposal 2: Add results_processing_fn argument to DataParallelTrainer

The class would be modified to include:

    def __init__(
        self,
        ...,
        *,
        # The default preserves the current behavior of reporting only
        # the first (rank 0) worker's results.
        results_processing_fn: Callable[
            [List[Dict[str, Any]]], Dict[str, Any]
        ] = lambda results: results[0],
    ):
        ...

    def _report(self, training_iterator: TrainingIterator) -> None:
        for results in training_iterator:
            processed_results = self.results_processing_fn(results)
            tune.report(**processed_results)
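
For illustration, a results_processing_fn that averages each metric across workers could look like the sketch below (the parameter itself is only a proposal and does not exist in Ray today):

from typing import Any, Dict, List

import numpy as np


def mean_across_workers(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    # Average every non-internal metric key over all workers' results.
    return {
        f"mean_{k}": np.mean([r[k] for r in results])
        for k in results[0]
        if not k.startswith("_")
    }


# Hypothetical usage if the proposal were adopted:
# trainer = TorchTrainer(..., results_processing_fn=mean_across_workers)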

Proposal 3: Direct users to use third party libraries like torchmetrics

For Torch, users can use torchmetrics, which has built-in support for DDP. Similar solutions may exist for TensorFlow. It's unclear how well this covers non-metric use cases, such as time measurement or profiling information like memory usage. On the other hand, this approach would only require us to update the documentation.
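
For reference, a minimal sketch of this approach inside a Ray Train training loop, assuming the Ray AIR-era session.report API; torchmetrics synchronizes over the torch.distributed process group that TorchTrainer sets up:

import torch
import torchmetrics
from ray.air import session


def train_loop_per_worker(config):
    # MeanMetric keeps a running mean; compute() syncs the value across
    # all workers via torch.distributed when a process group is active.
    loss_metric = torchmetrics.MeanMetric()

    for epoch in range(config["epochs"]):
        worker_loss = torch.rand(1)  # placeholder for a real training step
        loss_metric.update(worker_loss)

        # Every worker reports the same, already-aggregated value, so it
        # does not matter that only rank 0's report reaches Tune.
        session.report({"mean_loss": loss_metric.compute().item()})
        loss_metric.reset()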

Conclusion

Proposals 1 and 2 would each be a lightweight way to let users modify the data reported to Tune. I do not have a strong personal preference between them, though I feel that Proposal 2 fits better with the rest of the API.

Proposal 3 requires only documentation changes and can be implemented independently (tracked in #31434).

Use case

No response

Yard1 added the enhancement, RFC, train, and ray-team-created labels on Jan 3, 2023
@bveeramani (Member)

Thanks for putting this together!

A few questions:

  1. Is this only a problem with DLTrainer and its subclasses? Would it make sense to have a results_processing_fn for other trainers like XGBoostTrainer?
  2. Would results_processing_fn be exposed in TorchTrainer and TensorflowTrainer?
  3. Would it make sense to place results_processing_fn in a config like RunConfig instead of exposing it as a top-level parameter?

@Yard1 (Member, Author) commented Jan 3, 2023

  1. Other trainers have different internals. For example, for XGBoost, the reporting is done by a callback (which can also be overridden to report metrics from multiple workers). I'd like to focus on DL for now, as this is where we have had the most requests and it is the simplest to tackle.
  2. Yes!
  3. I think it shouldn't, unless it's supported by all Trainers. We could make it an argument in TorchConfig (and so on), but I am not sure whether it makes sense to put it there, as those configs deal with setting up the workers and not with what happens on the Tune side.

@richardliaw (Contributor)

> Currently, Ray Train only reports metrics from the first worker. This is fine in most cases, but for some applications, it may be desirable to report metrics from all workers and/or report aggregations, such as mean and std. We also require that functionality for some tests.

For aggregations, can they use torchmetrics instead? That's becoming the standard in the pytorch ecosystem AFAICT

@Yard1 (Member, Author) commented Jan 3, 2023

Yeah, it's possible to use that right now. That being said, torchmetrics doesn't cover TensorFlow, or anything other than actual metrics that you may want to log from multiple workers.

@bveeramani (Member)

@Yard1 how would I use torchmetrics for aggregation? Wouldn't you run into the same problem of not having access to all of the results?

@richardliaw (Contributor)

@Yard1 Also, what are metrics that you want to aggregate from all workers individually?

@bveeramani torchmetrics is distributed training compatible... it will automatically aggregate across workers using allreduce.

@Yard1 (Member, Author) commented Jan 3, 2023

@richardliaw I was thinking profiling information could be useful? I don't have a specific need myself; this is something we have been talking about on and off for a while. Some users were also interested in this feature, e.g. https://discuss.ray.io/t/how-can-i-synchronization-metrics-in-ray-train-valid-loop/8500 and https://discuss.ray.io/t/pytorch-distributedtrainable-tune-report-on-rank-0-only/5127/1

@richardliaw (Contributor)

For both cases, it seems like we just need to provide best practices: telling users to do a sum/average/median across all workers with torchmetrics, and also to report the same things on all workers if necessary?

@Yard1 (Member, Author) commented Jan 4, 2023

I'll add that as a proposal!

@richardliaw (Contributor) commented Jan 4, 2023 via email

@Yard1 (Member, Author) commented Jan 4, 2023

That's fair. In any case, if we do not want to provide an API for this and instead rely on third party tools like torchmetrics, we should update documentation & provide an example, so that's still an action item.

@richardliaw (Contributor) commented Jan 4, 2023 via email

@Yard1 (Member, Author) commented Jan 4, 2023

I'll make a separate issue for that, and we can defer this one until we have a concrete use case.

#31434

@richardliaw (Contributor)

Closing this one since we have a separate issue for now. When we have a concrete use case we can bring it up again!
