Fix dask predict shape infer. (#5989)
trivialfis committed Aug 8, 2020
1 parent 9c6e791 commit 801e6b6
Showing 2 changed files with 44 additions and 19 deletions.
python-package/xgboost/dask.py (13 changes: 7 additions & 6 deletions)
@@ -738,7 +738,8 @@ def dispatched_predict(worker_id):
             predt = booster.predict(data=local_x,
                                     validate_features=local_x.num_row() != 0,
                                     *args)
-            ret = (delayed(predt), order)
+            columns = 1 if len(predt.shape) == 1 else predt.shape[1]
+            ret = ((delayed(predt), columns), order)
             predictions.append(ret)
         return predictions

@@ -775,8 +776,10 @@ async def map_function(func):
     # See https://docs.dask.org/en/latest/array-creation.html
     arrays = []
     for i, shape in enumerate(shapes):
-        arrays.append(da.from_delayed(results[i], shape=(shape[0], ),
-                                      dtype=numpy.float32))
+        arrays.append(da.from_delayed(
+            results[i][0], shape=(shape[0],)
+            if results[i][1] == 1 else (shape[0], results[i][1]),
+            dtype=numpy.float32))
     predictions = await da.concatenate(arrays, axis=0)
     return predictions

@@ -978,6 +981,7 @@ def client(self):
     def client(self, clt):
         self._client = clt

+
 @xgboost_model_doc("""Implementation of the Scikit-Learn API for XGBoost.""",
                    ['estimators', 'model'])
 class DaskXGBRegressor(DaskScikitLearnBase, XGBRegressorBase):
@@ -1032,9 +1036,6 @@ def predict(self, data):
     ['estimators', 'model']
 )
 class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase):
-    # pylint: disable=missing-docstring
-    _client = None
-
     async def _fit_async(self, X, y,
                          sample_weights=None,
                          eval_set=None,
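The substance of the dask.py change above: each worker now pairs its delayed prediction chunk with a column count, so the driver can hand da.from_delayed a 1-D shape for single-output models and a 2-D shape for multi-class output. Previously the shape passed to da.from_delayed was always (rows,), so the metadata of a multi-class prediction array was wrong until it was computed. A minimal standalone sketch of that assembly step, using plain dask with made-up chunk data rather than the library's internals:

import numpy
import dask.array as da
from dask import delayed


def assemble(chunks):
    # chunks: list of (delayed ndarray, rows, columns), one entry per worker part.
    arrays = []
    for part, rows, columns in chunks:
        # The shape must be declared up front; a delayed object carries no metadata.
        shape = (rows,) if columns == 1 else (rows, columns)
        arrays.append(da.from_delayed(part, shape=shape, dtype=numpy.float32))
    return da.concatenate(arrays, axis=0)


# Two hypothetical workers, each predicting 3 classes for 4 rows.
parts = [(delayed(numpy.zeros)((4, 3), dtype=numpy.float32), 4, 3) for _ in range(2)]
print(assemble(parts).shape)  # (8, 3), known without calling compute()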
tests/python/test_with_dask.py (50 changes: 37 additions & 13 deletions)
Expand Up @@ -5,6 +5,7 @@
import numpy as np
import json
import asyncio
from sklearn.datasets import make_classification

if sys.platform.startswith("win"):
pytest.skip("Skipping dask tests on Windows", allow_module_level=True)
@@ -36,7 +37,7 @@ def generate_array():


 def test_from_dask_dataframe():
-    with LocalCluster(n_workers=5) as cluster:
+    with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             X, y = generate_array()

@@ -74,7 +75,7 @@ def test_from_dask_dataframe():


 def test_from_dask_array():
-    with LocalCluster(n_workers=5, threads_per_worker=5) as cluster:
+    with LocalCluster(n_workers=kWorkers, threads_per_worker=5) as cluster:
         with Client(cluster) as client:
             X, y = generate_array()
             dtrain = DaskDMatrix(client, X, y)
@@ -104,8 +105,28 @@ def test_from_dask_array():
             assert np.all(single_node_predt == from_arr.compute())


+def test_dask_predict_shape_infer():
+    with LocalCluster(n_workers=kWorkers) as cluster:
+        with Client(cluster) as client:
+            X, y = make_classification(n_samples=1000, n_informative=5,
+                                       n_classes=3)
+            X_ = dd.from_array(X, chunksize=100)
+            y_ = dd.from_array(y, chunksize=100)
+            dtrain = xgb.dask.DaskDMatrix(client, data=X_, label=y_)
+
+            model = xgb.dask.train(
+                client,
+                {"objective": "multi:softprob", "num_class": 3},
+                dtrain=dtrain
+            )
+
+            preds = xgb.dask.predict(client, model, dtrain)
+            assert preds.shape[0] == preds.compute().shape[0]
+            assert preds.shape[1] == preds.compute().shape[1]
+
+
 def test_dask_missing_value_reg():
-    with LocalCluster(n_workers=5) as cluster:
+    with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             X_0 = np.ones((20 // 2, kCols))
             X_1 = np.zeros((20 // 2, kCols))
@@ -156,7 +177,7 @@ def test_dask_missing_value_cls():


 def test_dask_regressor():
-    with LocalCluster(n_workers=5) as cluster:
+    with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             X, y = generate_array()
             regressor = xgb.dask.DaskXGBRegressor(verbosity=1, n_estimators=2)
@@ -178,7 +199,7 @@ def test_dask_regressor():


 def test_dask_classifier():
-    with LocalCluster(n_workers=5) as cluster:
+    with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             X, y = generate_array()
             y = (y * 10).astype(np.int32)
@@ -188,7 +209,7 @@ def test_dask_classifier():
             classifier.fit(X, y, eval_set=[(X, y)])
             prediction = classifier.predict(X)

-            assert prediction.ndim == 1
+            assert prediction.ndim == 2
             assert prediction.shape[0] == kRows

             history = classifier.evals_result()
@@ -211,14 +232,14 @@ def test_dask_classifier():
             assert classifier.n_classes_ == 10
             prediction = classifier.predict(X_d)

-            assert prediction.ndim == 1
+            assert prediction.ndim == 2
             assert prediction.shape[0] == kRows


 @pytest.mark.skipif(**tm.no_sklearn())
 def test_sklearn_grid_search():
     from sklearn.model_selection import GridSearchCV
-    with LocalCluster(n_workers=4) as cluster:
+    with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             X, y = generate_array()
             reg = xgb.dask.DaskXGBRegressor(learning_rate=0.1,
@@ -292,7 +313,9 @@ def _check_outputs(out, predictions):
                          evals=[(dtrain, 'validation')],
                          num_boost_round=2)
     predictions = xgb.dask.predict(client=client, model=out,
-                                   data=dtrain).compute()
+                                   data=dtrain)
+    assert predictions.shape[1] == n_classes
+    predictions = predictions.compute()
     _check_outputs(out, predictions)

     # train has more rows than evals
@@ -315,15 +338,15 @@
 # environment and Exact doesn't support it.

 def test_empty_dmatrix_hist():
-    with LocalCluster(n_workers=5) as cluster:
+    with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             parameters = {'tree_method': 'hist'}
             run_empty_dmatrix_reg(client, parameters)
             run_empty_dmatrix_cls(client, parameters)


 def test_empty_dmatrix_approx():
-    with LocalCluster(n_workers=5) as cluster:
+    with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             parameters = {'tree_method': 'approx'}
             run_empty_dmatrix_reg(client, parameters)
@@ -384,7 +407,7 @@ async def run_dask_classifier_asyncio(scheduler_address):
         await classifier.fit(X, y, eval_set=[(X, y)])
         prediction = await classifier.predict(X)

-        assert prediction.ndim == 1
+        assert prediction.ndim == 2
         assert prediction.shape[0] == kRows

         history = classifier.evals_result()
@@ -407,8 +430,9 @@ async def run_dask_classifier_asyncio(scheduler_address):
         assert classifier.n_classes_ == 10
         prediction = await classifier.predict(X_d)

-        assert prediction.ndim == 1
+        assert prediction.ndim == 2
         assert prediction.shape[0] == kRows
+        assert prediction.shape[1] == 10


 def test_with_asyncio():
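From the user's side, the test diff above shows the visible effect of the fix: a multi-class softprob model trained through xgb.dask now yields a prediction array whose class dimension is already known before compute(). A hypothetical end-to-end snippet in the same style as the new test (the cluster size and data are illustrative, and the argmax step is only the usual way to reduce probabilities to class ids, not part of this commit):

import xgboost as xgb
import dask.dataframe as dd
from distributed import Client, LocalCluster
from sklearn.datasets import make_classification

with LocalCluster(n_workers=2) as cluster:
    with Client(cluster) as client:
        X, y = make_classification(n_samples=1000, n_informative=5, n_classes=3)
        X_ = dd.from_array(X, chunksize=100)
        y_ = dd.from_array(y, chunksize=100)
        dtrain = xgb.dask.DaskDMatrix(client, data=X_, label=y_)

        output = xgb.dask.train(client,
                                {"objective": "multi:softprob", "num_class": 3},
                                dtrain=dtrain)
        probs = xgb.dask.predict(client, output, dtrain)
        print(probs.shape)             # (1000, 3), reported lazily after this fix
        labels = probs.argmax(axis=1)  # collapse per-class probabilities to class ids
        print(labels.compute()[:5])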
