From 76facaabfc8fe1d1bbdc013f223c7b3ec52d595c Mon Sep 17 00:00:00 2001
From: xwjiang2010
Date: Tue, 26 Jul 2022 11:24:44 -0700
Subject: [PATCH 1/2] [air] Update xgboost test (catch test failures
 properly). Remove `path` from `from_model` for XGBoostCheckpoint and
 LightGBMCheckpoint.

Signed-off-by: xwjiang2010
---
 .../ray/train/lightgbm/lightgbm_checkpoint.py | 17 ++++++-----
 .../ray/train/xgboost/xgboost_checkpoint.py   | 17 ++++++-----
 .../workloads/xgboost_benchmark.py            | 30 +++++++++++++++++--
 3 files changed, 45 insertions(+), 19 deletions(-)

diff --git a/python/ray/train/lightgbm/lightgbm_checkpoint.py b/python/ray/train/lightgbm/lightgbm_checkpoint.py
index 1df9bc2a6e0a..d18ffb43ecc1 100644
--- a/python/ray/train/lightgbm/lightgbm_checkpoint.py
+++ b/python/ray/train/lightgbm/lightgbm_checkpoint.py
@@ -1,4 +1,5 @@
 import os
+import tempfile
 from typing import TYPE_CHECKING, Optional
 
 import lightgbm
@@ -26,7 +27,6 @@ def from_model(
         cls,
         booster: lightgbm.Booster,
         *,
-        path: os.PathLike,
         preprocessor: Optional["Preprocessor"] = None,
     ) -> "LightGBMCheckpoint":
         """Create a :py:class:`~ray.air.checkpoint.Checkpoint` that stores a LightGBM
@@ -34,7 +34,6 @@ def from_model(
 
         Args:
             booster: The LightGBM model to store in the checkpoint.
-            path: The directory where the checkpoint will be stored.
             preprocessor: A fitted preprocessor to be applied before inference.
 
         Returns:
@@ -45,7 +44,7 @@ def from_model(
             >>> import lightgbm
             >>>
             >>> booster = lightgbm.Booster()  # doctest: +SKIP
-            >>> checkpoint = LightGBMCheckpoint.from_model(booster, path=".")  # doctest: +SKIP # noqa: E501
+            >>> checkpoint = LightGBMCheckpoint.from_model(booster)  # doctest: +SKIP # noqa: E501
 
         You can use a :py:class:`LightGBMCheckpoint` to create an
         :py:class:`~ray.train.lightgbm.LightGBMPredictor` and perform inference.
@@ -54,14 +53,16 @@ def from_model(
             >>>
             >>> predictor = LightGBMPredictor.from_checkpoint(checkpoint)  # doctest: +SKIP # noqa: E501
         """
-        booster.save_model(os.path.join(path, MODEL_KEY))
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            booster.save_model(os.path.join(tmpdirname, MODEL_KEY))
 
-        if preprocessor:
-            save_preprocessor_to_dir(preprocessor, path)
+            if preprocessor:
+                save_preprocessor_to_dir(preprocessor, tmpdirname)
 
-        checkpoint = cls.from_directory(path)
+            checkpoint = cls.from_directory(tmpdirname)
+            ckpt_dict = checkpoint.to_dict()
 
-        return checkpoint
+        return cls.from_dict(ckpt_dict)
 
     def get_model(self) -> lightgbm.Booster:
         """Retrieve the LightGBM model stored in this checkpoint."""
diff --git a/python/ray/train/xgboost/xgboost_checkpoint.py b/python/ray/train/xgboost/xgboost_checkpoint.py
index 6a0783a63f6f..362f8784990c 100644
--- a/python/ray/train/xgboost/xgboost_checkpoint.py
+++ b/python/ray/train/xgboost/xgboost_checkpoint.py
@@ -1,4 +1,5 @@
 import os
+import tempfile
 from typing import TYPE_CHECKING, Optional
 
 import xgboost
@@ -26,7 +27,6 @@ def from_model(
         cls,
         booster: xgboost.Booster,
         *,
-        path: os.PathLike,
         preprocessor: Optional["Preprocessor"] = None,
     ) -> "XGBoostCheckpoint":
         """Create a :py:class:`~ray.air.checkpoint.Checkpoint` that stores an XGBoost
@@ -34,7 +34,6 @@ def from_model(
 
         Args:
             booster: The XGBoost model to store in the checkpoint.
-            path: The directory where the checkpoint will be stored.
             preprocessor: A fitted preprocessor to be applied before inference.
 
         Returns:
@@ -45,7 +44,7 @@ def from_model(
             >>> import xgboost
             >>>
             >>> booster = xgboost.Booster()
-            >>> checkpoint = XGBoostCheckpoint.from_model(booster, path=".")  # doctest: +SKIP # noqa: E501
+            >>> checkpoint = XGBoostCheckpoint.from_model(booster)  # doctest: +SKIP # noqa: E501
 
         You can use an :py:class:`XGBoostCheckpoint` to create an
         :py:class:`~ray.train.xgboost.XGBoostPredictor` and perform inference.
@@ -54,14 +53,16 @@ def from_model(
             >>>
             >>> predictor = XGBoostPredictor.from_checkpoint(checkpoint)  # doctest: +SKIP # noqa: E501
         """
-        booster.save_model(os.path.join(path, MODEL_KEY))
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            booster.save_model(os.path.join(tmpdirname, MODEL_KEY))
 
-        if preprocessor:
-            save_preprocessor_to_dir(preprocessor, path)
+            if preprocessor:
+                save_preprocessor_to_dir(preprocessor, tmpdirname)
 
-        checkpoint = cls.from_directory(path)
+            checkpoint = cls.from_directory(tmpdirname)
+            ckpt_dict = checkpoint.to_dict()
 
-        return checkpoint
+        return cls.from_dict(ckpt_dict)
 
     def get_model(self) -> xgboost.Booster:
         """Retrieve the XGBoost model stored in this checkpoint."""
diff --git a/release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py b/release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py
index 8476c7939cd3..047e98276a69 100644
--- a/release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py
+++ b/release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py
@@ -1,8 +1,10 @@
 from functools import wraps
 import json
+import multiprocessing
 from multiprocessing import Process
 import os
 import time
+import traceback
 import xgboost as xgb
 
 import ray
@@ -32,14 +34,36 @@
 
 
 def run_and_time_it(f):
-    """Runs f in a separate process and time it."""
+    """Runs f in a separate process and times it."""
 
     @wraps(f)
     def wrapper(*args, **kwargs):
-        p = Process(target=f, args=args)
+        class MyProcess(Process):
+            def __init__(self, *args, **kwargs):
+                super(MyProcess, self).__init__(*args, **kwargs)
+                self._pconn, self._cconn = multiprocessing.Pipe()
+                self._exception = None
+
+            def run(self):
+                try:
+                    super(MyProcess, self).run()
+                except Exception as e:
+                    tb = traceback.format_exc()
+                    print(tb)
+                    self._cconn.send(e)
+
+            @property
+            def exception(self):
+                if self._pconn.poll():
+                    self._exception = self._pconn.recv()
+                return self._exception
+
+        p = MyProcess(target=f, args=args, kwargs=kwargs)
         start = time.monotonic()
         p.start()
         p.join()
+        if p.exception:
+            raise p.exception
         time_taken = time.monotonic() - start
         print(f"{f.__name__} takes {time_taken} seconds.")
         return time_taken
@@ -76,7 +100,7 @@ def run_xgboost_prediction(model_path: str, data_path: str):
     model = xgb.Booster()
     model.load_model(model_path)
     ds = data.read_parquet(data_path)
-    ckpt = XGBoostCheckpoint.from_model(".", model)
+    ckpt = XGBoostCheckpoint.from_model(booster=model)
     batch_predictor = BatchPredictor.from_checkpoint(ckpt, XGBoostPredictor)
     result = batch_predictor.predict(ds.drop_columns(["labels"]))
     return result

From 30e236ba6878aaaca634abcda182a4c664382d9f Mon Sep 17 00:00:00 2001
From: xwjiang2010
Date: Tue, 26 Jul 2022 13:17:29 -0700
Subject: [PATCH 2/2] Fix tests

Signed-off-by: xwjiang2010
---
 python/ray/train/tests/test_lightgbm_predictor.py | 5 ++---
 python/ray/train/tests/test_xgboost_predictor.py  | 5 ++---
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/python/ray/train/tests/test_lightgbm_predictor.py b/python/ray/train/tests/test_lightgbm_predictor.py
index deda0ae21193..451aaf5c3340 100644
--- a/python/ray/train/tests/test_lightgbm_predictor.py
+++ b/python/ray/train/tests/test_lightgbm_predictor.py
@@ -97,9 +97,8 @@ def test_predict_feature_columns_pandas():
 
 
 def test_predict_no_preprocessor_no_training():
-    with tempfile.TemporaryDirectory() as tmpdir:
-        checkpoint = LightGBMCheckpoint.from_model(booster=model, path=tmpdir)
-        predictor = LightGBMPredictor.from_checkpoint(checkpoint)
+    checkpoint = LightGBMCheckpoint.from_model(booster=model)
+    predictor = LightGBMPredictor.from_checkpoint(checkpoint)
 
     data_batch = np.array([[1, 2], [3, 4], [5, 6]])
     predictions = predictor.predict(data_batch)
diff --git a/python/ray/train/tests/test_xgboost_predictor.py b/python/ray/train/tests/test_xgboost_predictor.py
index bf028ca5cc00..5e5946a7d538 100644
--- a/python/ray/train/tests/test_xgboost_predictor.py
+++ b/python/ray/train/tests/test_xgboost_predictor.py
@@ -99,9 +99,8 @@ def test_predict_feature_columns_pandas():
 
 
 def test_predict_no_preprocessor_no_training():
-    with tempfile.TemporaryDirectory() as tmpdir:
-        checkpoint = XGBoostCheckpoint.from_model(booster=model, path=tmpdir)
-        predictor = XGBoostPredictor.from_checkpoint(checkpoint)
+    checkpoint = XGBoostCheckpoint.from_model(booster=model)
+    predictor = XGBoostPredictor.from_checkpoint(checkpoint)
 
     data_batch = np.array([[1, 2], [3, 4], [5, 6]])
     predictions = predictor.predict(data_batch)
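
Reviewer note (illustrative, not part of the patch): after this change,
`from_model` takes only the booster and an optional preprocessor; the model
is serialized inside a temporary directory and returned as a dict-backed
checkpoint, so callers no longer manage a `path`. A minimal sketch of the
updated call pattern, assuming Ray AIR 2.x; the toy training data and the
variable names below are hypothetical, not from the patch:

    import numpy as np
    import xgboost

    from ray.train.xgboost import XGBoostCheckpoint, XGBoostPredictor

    # Hypothetical toy model; any trained Booster works here.
    X = np.random.rand(16, 2)
    y = np.random.randint(0, 2, size=16)
    booster = xgboost.train(
        {"objective": "binary:logistic"},
        xgboost.DMatrix(X, label=y),
        num_boost_round=2,
    )

    # No `path=` argument anymore.
    checkpoint = XGBoostCheckpoint.from_model(booster)
    predictor = XGBoostPredictor.from_checkpoint(checkpoint)
    predictions = predictor.predict(np.array([[1, 2], [3, 4]]))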
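The benchmark fix matters because a plain `multiprocessing.Process` swallows
exceptions raised in the child, so the old wrapper timed failing runs as if
they had succeeded. The `MyProcess` subclass ships the child's exception back
over a `Pipe`, and the parent re-raises it after `join()`. Here is the same
pattern as a standalone sketch; the class and function names are mine, not
from the patch, and the exception must be picklable to cross the pipe:

    import multiprocessing
    import traceback
    from multiprocessing import Process


    class ExceptionAwareProcess(Process):
        """Process that forwards a child exception to the parent."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Parent and child ends of a one-way error channel.
            self._pconn, self._cconn = multiprocessing.Pipe()
            self._exception = None

        def run(self):
            try:
                super().run()  # runs the `target` callable
            except Exception as e:
                print(traceback.format_exc())  # full traceback in child output
                self._cconn.send(e)

        @property
        def exception(self):
            if self._pconn.poll():
                self._exception = self._pconn.recv()
            return self._exception


    def fails():
        raise ValueError("boom")


    if __name__ == "__main__":
        p = ExceptionAwareProcess(target=fails)
        p.start()
        p.join()
        if p.exception:
            raise p.exception  # surface the child failure in the parent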