[air] update xgboost test (catch test failures properly). #27023

Merged · 2 commits · Jul 27, 2022
17 changes: 9 additions & 8 deletions python/ray/train/lightgbm/lightgbm_checkpoint.py
@@ -1,4 +1,5 @@
 import os
+import tempfile
 from typing import TYPE_CHECKING, Optional

 import lightgbm
@@ -26,15 +27,13 @@ def from_model(
         cls,
         booster: lightgbm.Booster,
         *,
-        path: os.PathLike,
         preprocessor: Optional["Preprocessor"] = None,
     ) -> "LightGBMCheckpoint":
         """Create a :py:class:`~ray.air.checkpoint.Checkpoint` that stores a LightGBM
         model.

         Args:
             booster: The LightGBM model to store in the checkpoint.
-            path: The directory where the checkpoint will be stored.
             preprocessor: A fitted preprocessor to be applied before inference.

         Returns:
@@ -45,7 +44,7 @@ def from_model(
             >>> import lightgbm
             >>>
             >>> booster = lightgbm.Booster()  # doctest: +SKIP
-            >>> checkpoint = LightGBMCheckpoint.from_model(booster, path=".")  # doctest: +SKIP # noqa: E501
+            >>> checkpoint = LightGBMCheckpoint.from_model(booster)  # doctest: +SKIP # noqa: E501

         You can use a :py:class:`LightGBMCheckpoint` to create an
         :py:class:`~ray.train.lightgbm.LightGBMPredictor` and perform inference.
@@ -54,14 +53,16 @@ def from_model(
             >>> from ray.train.lightgbm import LightGBMPredictor
             >>>
             >>> predictor = LightGBMPredictor.from_checkpoint(checkpoint)  # doctest: +SKIP # noqa: E501
         """
-        booster.save_model(os.path.join(path, MODEL_KEY))

Member:
Wouldn't this lead to saving the full checkpoint to a new temp directory rather than a user-given path? The current behavior seems better: the user has control over where they want to save the preprocessor, while this PR changes it to an ephemeral path we create.

Contributor:
In this proposal, the user doesn't supply a path anymore.

This is a simple API that avoids a lot of headaches (e.g. the user managing temporary directories). If users need more efficiency by specifying their own non-ephemeral path, we can add that if it is requested. Most XGBoost models are small; the biggest ones I've seen in production are about 50 MB, so serialization/deserialization should be relatively fast. (See the sketch below.)
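
A minimal sketch of the API change being discussed, using only calls that appear in this diff (the model file name is an illustrative placeholder):

```python
import lightgbm

from ray.train.lightgbm import LightGBMCheckpoint, LightGBMPredictor

booster = lightgbm.Booster(model_file="model.txt")  # placeholder trained model

# Before this PR: the caller owned the checkpoint directory.
# checkpoint = LightGBMCheckpoint.from_model(booster, path="./my_ckpt")

# After this PR: no path argument; storage is handled internally.
checkpoint = LightGBMCheckpoint.from_model(booster)
predictor = LightGBMPredictor.from_checkpoint(checkpoint)
```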

+        with tempfile.TemporaryDirectory() as tmpdirname:
+            booster.save_model(os.path.join(tmpdirname, MODEL_KEY))

-        if preprocessor:
-            save_preprocessor_to_dir(preprocessor, path)
+            if preprocessor:
+                save_preprocessor_to_dir(preprocessor, tmpdirname)

-        checkpoint = cls.from_directory(path)
+            checkpoint = cls.from_directory(tmpdirname)
+            ckpt_dict = checkpoint.to_dict()

-        return checkpoint
+        return cls.from_dict(ckpt_dict)

     def get_model(self) -> lightgbm.Booster:
         """Retrieve the LightGBM model stored in this checkpoint."""
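
A note on the `from_directory` / `to_dict` / `from_dict` roundtrip above: a checkpoint created with `cls.from_directory(tmpdirname)` still references the temporary directory, which is deleted when the `with` block exits, so the contents are materialized into an in-memory dict first. A minimal sketch of that idea against the base `Checkpoint` class (the file name and payload are illustrative assumptions):

```python
import os
import tempfile

from ray.air.checkpoint import Checkpoint

with tempfile.TemporaryDirectory() as tmpdirname:
    # Write some checkpoint payload into the ephemeral directory.
    with open(os.path.join(tmpdirname, "model"), "w") as f:
        f.write("payload")

    # A directory-backed checkpoint would dangle once tmpdirname is removed,
    # so convert it to a dict while the directory still exists...
    ckpt_dict = Checkpoint.from_directory(tmpdirname).to_dict()

# ...and return an in-memory (dict-backed) checkpoint instead.
checkpoint = Checkpoint.from_dict(ckpt_dict)
```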
5 changes: 2 additions & 3 deletions python/ray/train/tests/test_lightgbm_predictor.py
@@ -97,9 +97,8 @@ def test_predict_feature_columns_pandas():


 def test_predict_no_preprocessor_no_training():
-    with tempfile.TemporaryDirectory() as tmpdir:
-        checkpoint = LightGBMCheckpoint.from_model(booster=model, path=tmpdir)
-        predictor = LightGBMPredictor.from_checkpoint(checkpoint)
+    checkpoint = LightGBMCheckpoint.from_model(booster=model)

Member:
I think if we need to test checkpointing, creating the tmpdir in the unit test is better than surfacing it at the checkpoint class implementation level (see the sketch after this diff).
+    predictor = LightGBMPredictor.from_checkpoint(checkpoint)

     data_batch = np.array([[1, 2], [3, 4], [5, 6]])
     predictions = predictor.predict(data_batch)
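For reference, a sketch of the reviewer's alternative: keep the pre-PR `from_model(..., path=...)` signature and let the test own the tmpdir. This mirrors the removed lines above; `model` and the checkpoint/predictor imports come from the test module's scope, and the final assertion is illustrative:

```python
import tempfile

import numpy as np


def test_predict_no_preprocessor_no_training():
    # The test, not the checkpoint class, manages the directory lifetime.
    with tempfile.TemporaryDirectory() as tmpdir:
        checkpoint = LightGBMCheckpoint.from_model(booster=model, path=tmpdir)
        predictor = LightGBMPredictor.from_checkpoint(checkpoint)

        data_batch = np.array([[1, 2], [3, 4], [5, 6]])
        predictions = predictor.predict(data_batch)
        assert len(predictions) == 3  # illustrative assertion
```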
5 changes: 2 additions & 3 deletions python/ray/train/tests/test_xgboost_predictor.py
@@ -99,9 +99,8 @@ def test_predict_feature_columns_pandas():


 def test_predict_no_preprocessor_no_training():
-    with tempfile.TemporaryDirectory() as tmpdir:
-        checkpoint = XGBoostCheckpoint.from_model(booster=model, path=tmpdir)
-        predictor = XGBoostPredictor.from_checkpoint(checkpoint)
+    checkpoint = XGBoostCheckpoint.from_model(booster=model)
+    predictor = XGBoostPredictor.from_checkpoint(checkpoint)

     data_batch = np.array([[1, 2], [3, 4], [5, 6]])
     predictions = predictor.predict(data_batch)
17 changes: 9 additions & 8 deletions python/ray/train/xgboost/xgboost_checkpoint.py
@@ -1,4 +1,5 @@
 import os
+import tempfile
 from typing import TYPE_CHECKING, Optional

 import xgboost
@@ -26,15 +27,13 @@ def from_model(
         cls,
         booster: xgboost.Booster,
         *,
-        path: os.PathLike,
         preprocessor: Optional["Preprocessor"] = None,
     ) -> "XGBoostCheckpoint":
         """Create a :py:class:`~ray.air.checkpoint.Checkpoint` that stores an XGBoost
         model.

         Args:
             booster: The XGBoost model to store in the checkpoint.
-            path: The directory where the checkpoint will be stored.
             preprocessor: A fitted preprocessor to be applied before inference.

         Returns:
@@ -45,7 +44,7 @@ def from_model(
             >>> import xgboost
             >>>
             >>> booster = xgboost.Booster()
-            >>> checkpoint = XGBoostCheckpoint.from_model(booster, path=".")  # doctest: +SKIP # noqa: E501
+            >>> checkpoint = XGBoostCheckpoint.from_model(booster)  # doctest: +SKIP # noqa: E501

         You can use a :py:class:`XGBoostCheckpoint` to create an
         :py:class:`~ray.train.xgboost.XGBoostPredictor` and perform inference.
@@ -54,14 +53,16 @@ def from_model(
             >>> from ray.train.xgboost import XGBoostPredictor
             >>>
             >>> predictor = XGBoostPredictor.from_checkpoint(checkpoint)  # doctest: +SKIP # noqa: E501
         """
-        booster.save_model(os.path.join(path, MODEL_KEY))
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            booster.save_model(os.path.join(tmpdirname, MODEL_KEY))

-        if preprocessor:
-            save_preprocessor_to_dir(preprocessor, path)
+            if preprocessor:
+                save_preprocessor_to_dir(preprocessor, tmpdirname)

-        checkpoint = cls.from_directory(path)
+            checkpoint = cls.from_directory(tmpdirname)
+            ckpt_dict = checkpoint.to_dict()

-        return checkpoint
+        return cls.from_dict(ckpt_dict)

     def get_model(self) -> xgboost.Booster:
         """Retrieve the XGBoost model stored in this checkpoint."""
30 changes: 27 additions & 3 deletions release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py
@@ -1,8 +1,10 @@
 from functools import wraps
 import json
+import multiprocessing
 from multiprocessing import Process
 import os
 import time
+import traceback
 import xgboost as xgb

 import ray
@@ -32,14 +34,36 @@


 def run_and_time_it(f):
-    """Runs f in a separate process and time it."""
+    """Runs f in a separate process and times it."""

     @wraps(f)
     def wrapper(*args, **kwargs):
-        p = Process(target=f, args=args)
+        class MyProcess(Process):
+            def __init__(self, *args, **kwargs):
+                super(MyProcess, self).__init__(*args, **kwargs)
+                self._pconn, self._cconn = multiprocessing.Pipe()
+                self._exception = None
+
+            def run(self):
+                try:
+                    super(MyProcess, self).run()
+                except Exception as e:
+                    tb = traceback.format_exc()
+                    print(tb)
+                    self._cconn.send(e)
+
+            @property
+            def exception(self):
+                if self._pconn.poll():
+                    self._exception = self._pconn.recv()
+                return self._exception
+
+        p = MyProcess(target=f, *args, **kwargs)
         start = time.monotonic()
         p.start()
         p.join()
+        if p.exception:
+            raise p.exception
         time_taken = time.monotonic() - start
         print(f"{f.__name__} takes {time_taken} seconds.")
         return time_taken
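
This wrapper is the heart of the fix: a plain `Process` swallows exceptions raised in the child, so a failing benchmark run previously looked like a pass. `MyProcess` pipes the exception back to the parent, which re-raises it. A self-contained sketch of the same pattern (`boom` is an illustrative failing workload; note that only the exception object crosses the pipe, the child's traceback is printed on the child side):

```python
import multiprocessing
from multiprocessing import Process
import traceback


class ChildProcess(Process):
    """Process subclass that forwards the child's exception to the parent."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._pconn, self._cconn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            super().run()
        except Exception as e:
            print(traceback.format_exc())  # traceback stays on the child side
            self._cconn.send(e)  # the exception object itself is sent back

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception


def boom():
    raise RuntimeError("failure inside the child process")


if __name__ == "__main__":
    p = ChildProcess(target=boom)
    p.start()
    p.join()
    if p.exception:
        raise p.exception  # surfaces the child failure in the parent
```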
@@ -76,7 +100,7 @@ def run_xgboost_prediction(model_path: str, data_path: str):
     model = xgb.Booster()
     model.load_model(model_path)
     ds = data.read_parquet(data_path)
-    ckpt = XGBoostCheckpoint.from_model(".", model)
+    ckpt = XGBoostCheckpoint.from_model(booster=model)
     batch_predictor = BatchPredictor.from_checkpoint(ckpt, XGBoostPredictor)
     result = batch_predictor.predict(ds.drop_columns(["labels"]))
     return result