From a3da6c855f10fb90470e4deb9389c9f2db3c9467 Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Tue, 13 Dec 2022 16:00:56 -0800 Subject: [PATCH] [Train] Force GBDTTrainer to use distributed loading for Ray Datasets (#31079) Signed-off-by: amogkam Closes #31068 xgboost_ray has 2 modes for data loading: A centralized mode where the driver first loads in all the data and then partitions it for the remote training actors to load. A distributed mode where the remote training actors load in the data partitions directly. When using Ray Datasets with xgboost_ray, we should always do distributed data loading (option 2). However, this is no longer the case after #30575 is merged. #30575 adds an __iter__ method to Ray Datasets causing isinstance(dataset, Iterable) to return True. This causes Ray Dataset inputs to enter this if statement: https://github.com/ray-project/xgboost_ray/blob/v0.1.12/xgboost_ray/matrix.py#L943-L949, causing xgboost-ray to think that Ray Datasets are not distributed and therefore going with option 1 for loading. This centralized loading leads to excessive object spilling and ultimately crashes large scale xgboost training. In this PR, we force distributed data loading when using the AIR GBDTTrainers. In a follow up, we should clean up the distributed detection logic directly in xgboost-ray, removing input formats that are no longer supported, and then do a new release. Signed-off-by: Weichen Xu --- python/ray/train/gbdt_trainer.py | 10 ++++++++- .../ray/train/tests/test_xgboost_trainer.py | 21 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/python/ray/train/gbdt_trainer.py b/python/ray/train/gbdt_trainer.py index debdb526ae9a..48a19866e011 100644 --- a/python/ray/train/gbdt_trainer.py +++ b/python/ray/train/gbdt_trainer.py @@ -158,8 +158,10 @@ def __init__( ): self.label_column = label_column self.params = params - self.dmatrix_params = dmatrix_params or {} + self.train_kwargs = train_kwargs + self.dmatrix_params = dmatrix_params or {} + super().__init__( scaling_config=scaling_config, run_config=run_config, @@ -168,6 +170,12 @@ def __init__( resume_from_checkpoint=resume_from_checkpoint, ) + # Ray Datasets should always use distributed loading. + for dataset_name in self.datasets.keys(): + dataset_params = self.dmatrix_params.get(dataset_name, {}) + dataset_params["distributed"] = True + self.dmatrix_params[dataset_name] = dataset_params + def _validate_attributes(self): super()._validate_attributes() self._validate_config_and_datasets() diff --git a/python/ray/train/tests/test_xgboost_trainer.py b/python/ray/train/tests/test_xgboost_trainer.py index 38f8b1fc130e..931515e207ad 100644 --- a/python/ray/train/tests/test_xgboost_trainer.py +++ b/python/ray/train/tests/test_xgboost_trainer.py @@ -237,6 +237,27 @@ def test_validation(ray_start_4_cpus): ) +def test_distributed_data_loading(ray_start_4_cpus): + """Checks that XGBoostTrainer does distributed data loading for Ray Datasets.""" + + class DummyXGBoostTrainer(XGBoostTrainer): + def _train(self, params, dtrain, **kwargs): + assert dtrain.distributed + return super()._train(params=params, dtrain=dtrain, **kwargs) + + train_dataset = ray.data.from_pandas(train_df) + + trainer = DummyXGBoostTrainer( + scaling_config=ScalingConfig(num_workers=2), + label_column="target", + params=params, + datasets={TRAIN_DATASET_KEY: train_dataset}, + ) + + assert trainer.dmatrix_params[TRAIN_DATASET_KEY]["distributed"] + trainer.fit() + + if __name__ == "__main__": import pytest import sys