
xgboost.dask.DaskXGBClassifier not working with >1 dask distributed worker in case of large datasets #5451

Closed
harshit-2115 opened this issue Mar 29, 2020 · 24 comments

@harshit-2115

Hi XGBoost devs,
I am running this code on an EC2 machine with 32 threads and 128 GB of RAM. The CSV being loaded is 800 MB.

class ColumnSelector(BaseEstimator, TransformerMixin):
    def __init__(self, columns=[]):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        cols_missing = list(set(self.columns) - set(X.columns))
        print(cols_missing)
        for cols in cols_missing:
            X[cols] = np.nan
        return X[self.columns]


cluster = LocalCluster(n_workers=1)
client = Client(cluster)
client

lp3 = dd.read_csv('./collection_model/collection_train.csv')
lp3

pre_pipe = Pipeline([
('colsel', ColumnSelector(columns=['column1', 'column2', ...])),
('missna', CustomMissingImputer()),
])
post_pipe= Pipeline([
('pre_pipe', pre_pipe),
('impute', IterativeImputer(n_nearest_features=5, max_iter=100, random_state=0)),
('qt', QuantileTransformer(n_quantiles=10))])

pi= xgboost.dask.DaskXGBClassifier(tree_method='hist')
pi.client= client

param_grid = {
    'learning_rate': [0.1, 0.2],
    'n_estimators': [100],
    'reg_lambda': [0.7],
    }


kfold = 5
skf = model_selection.StratifiedKFold(
    n_splits = kfold, shuffle = True, random_state = 30
)


scoring_1=make_scorer(ks_scorer, greater_is_better = True, needs_proba = True)
scoring={'auc': 'roc_auc', 'ks_scorer': scoring_1}


clf=GridSearchCV(
        estimator = pi,
        param_grid = param_grid,
        verbose = 5,
        cv = skf,
        iid = True,
        return_train_score = True,
        scoring = 'neg_mean_squared_error',
        refit = False
    )


pp = post_pipe.fit_transform(lp3,lp3['target'])        

label = da.from_array(np.array(lp3['target'].astype(int).compute()), chunks=200000)
clf.fit(da.from_array(pp, chunks=200000),label)
clf.cv_results_

It works if the model is trained using a subset of the features with a single worker (n_workers=1).

Some cases where it fails:

  1. Same subset of features with n_workers > 1: it keeps running in the notebook with no result. In the terminal,
    WARNING: /home/conda/feedstock_root/build_artifacts/xgboost_1584539733809/work/src/objective/regression_obj.cu:58: Label set is empty.

  2. Using all features with n_workers=1, it gives memory warnings in the terminal:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 118.65 GB -- Worker memory limit: 126.75 GB

and after some time an error in the notebook:

KilledWorker: ("('from-delayed-pandas_read_text-read-block-assign-d8c832b8fa114d4e528a9953dd6402de', 0)", <Worker 'tcp://127.0.0.1:40365', name: 0, memory: 0, processing: 11>)

How can an 800 MB CSV file consume 118 GB of memory?

Also, there is no 'predict_proba' attribute in DaskXGBClassifier, so metrics like roc_auc give an error.

Currently, we are using xgboost with sklearn gridsearch (to distribute the fits). With large datasets, hyper-parameter tuning jobs with 4k-5k fits take days to complete on EC2 and SageMaker.

We are trying dask xgboost to reduce training time.

@trivialfis
Member

Hi, thanks for raising an issue. Here are some questions:

  • How sparse is your dataset?
  • Also, where is the gridsearch coming from? Skl or dask-ml?
  • Did you encode your label as discrete values?
  • Lastly, are you pulling XGBoost from the master branch? If so, which commit is it?

@harshit-2115
Author

harshit-2115 commented Mar 29, 2020

Thanks for the prompt reply.

  • The shape of the dataset is 850k x 244; sparsity (zeros) is 65%.
  • I am using skl gridsearch.
  • Yeah, the label only contains two classes, 0 and 1.
  • xgboost version '1.0.2'

@trivialfis

@harshit-2115
Author

import os
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline, make_pipeline, FeatureUnion
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import FeatureUnion, Parallel, delayed, _fit_transform_one, _transform_one
from sklearn.model_selection import GridSearchCV
from sklearn import model_selection

import xgboost
import json

from sklearn.preprocessing import QuantileTransformer
from sklearn.metrics import accuracy_score,roc_curve,roc_auc_score
from sklearn.metrics import make_scorer
from sklearn.experimental import enable_iterative_imputer
from sklearn.datasets import make_classification
from sklearn.impute import IterativeImputer, MissingIndicator
import random


import dask.dataframe as dd
import dask.array as da
from dask.distributed import LocalCluster, Client

cluster = LocalCluster(n_workers=10)
client = Client(cluster)
client



X, y = make_classification(n_features=244, n_samples=815414, n_classes=2)


DX = da.from_array(X, chunks = 200000)
Dy = da.from_array(y, chunks = 200000)


post_pipe = Pipeline([
('impute', IterativeImputer(n_nearest_features = 5,max_iter = 100,random_state=0)),
('qt',QuantileTransformer(n_quantiles=10))])

pi = xgboost.dask.DaskXGBClassifier(tree_method='hist')
pi.client = client

param_grid ={
    'learning_rate':[0.1, 0.2],
    'n_estimators':[100],
    'reg_lambda': [0.7],
    }


kfold = 5
skf = model_selection.StratifiedKFold(
    n_splits=kfold, shuffle=True,random_state=30
)

clf = GridSearchCV(
        estimator=pi,
        param_grid=param_grid,
        verbose=5,
        cv=skf,
        iid=True,
        return_train_score=True,
        scoring='neg_mean_squared_error',
        refit=False    
    )


pp = post_pipe.fit_transform(DX,Dy)        

clf.fit(da.from_array(pp, chunks=200000), Dy)

I am trying the same on a sample dataset. It runs with n_workers=1, but with n_workers > 1 it still gives the same 'Label set is empty' warning.

@trivialfis
Member

trivialfis commented Mar 30, 2020

  • You may want to pull in the latest XGBoost, as we fixed a prediction bug recently.
  • I don't think the memory consumption comes from XGBoost; rather, it's somewhere in your pipeline. I checked XGBoost's own memory consumption by loading some datasets directly, and it works fine. XGBoost just forces the lazy computation to materialize in order to pull the data into the C++ core, so it can look like XGBoost's fault. Also, Python libraries are sometimes not very memory efficient. Quoting from https://wesmckinney.com/blog/apache-arrow-pandas-internals/

pandas rule of thumb: have 5 to 10 times as much RAM as the size of your dataset

  • The empty label warning is a sign that your data is not well balanced among the workers: some workers end up with an empty dataset (one way to rebalance is sketched below).
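
For illustration, one way to even out the chunks might look like this sketch (it reuses the DX, Dy and client objects from the sample script above; the chunk arithmetic is only a rough heuristic, not an official recommendation):

n_workers = 10
# Aim for several chunks per worker so no worker ends up with an empty label set.
rows_per_chunk = max(1, DX.shape[0] // (n_workers * 4))

DX = DX.rechunk((rows_per_chunk, DX.shape[1]))   # keep all columns in a single chunk
Dy = Dy.rechunk(rows_per_chunk)

# Optionally pin the layout on the cluster before XGBoost pulls the data in.
DX, Dy = client.persist([DX, Dy])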

@harshit-2115
Author

How do we balance the data between workers? I get the same warning on the sample dataset too. Could you please debug the sample dataset script and post the changes required to balance the data between workers?

@sandys

sandys commented Mar 31, 2020

Hi guys - we have the same issue.
@trivialfis, if you can post a recommended script, that would be great and we will follow it.

@trivialfis
Member

trivialfis commented Mar 31, 2020

Em... sometimes it's the chunk size, sometimes it's other problems. XGBoost does not move data; it accepts whatever dask provides to each worker. Let me take a look.

@trivialfis
Member

trivialfis commented Mar 31, 2020

In the sample script above posted by @harshit-2115, reducing the chunk size should balance the data enough to prevent starved workers:

from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn import model_selection

import xgboost

from sklearn.preprocessing import QuantileTransformer
from sklearn.datasets import make_classification


import dask.array as da
from dask.distributed import LocalCluster, Client

if __name__ == '__main__':
    cluster = LocalCluster(n_workers=10, memory_limit='8GB')
    print(cluster.dashboard_link)
    client = Client(cluster)

    X, y = make_classification(n_features=244, n_samples=815414, n_classes=2)

    chunk_size = 1500

    DX = da.from_array(X, chunks=chunk_size)
    Dy = da.from_array(y, chunks=chunk_size)

    post_pipe = Pipeline([('qt', QuantileTransformer(n_quantiles=10))])

    pi = xgboost.dask.DaskXGBClassifier(tree_method='hist')
    pi.client = client

    param_grid = {
        'learning_rate': [0.1, 0.2],
        'n_estimators': [100],
        'reg_lambda': [0.7],
    }

    kfold = 5
    skf = model_selection.StratifiedKFold(n_splits=kfold,
                                          shuffle=True,
                                          random_state=30)

    clf = GridSearchCV(estimator=pi,
                       param_grid=param_grid,
                       verbose=5,
                       cv=skf,
                       iid=True,
                       return_train_score=True,
                       scoring='neg_mean_squared_error',
                       refit=False)

    pp = post_pipe.fit_transform(DX, Dy)

    clf.fit(da.from_array(pp, chunks=chunk_size), Dy)

@trivialfis
Member

trivialfis commented Mar 31, 2020

Some simple testing of the above script: using 1500 as the chunk size, the data is distributed quite nicely for the first round:

rows  cols
81249 244
63380 244
63623 244
58846 244
62584 244
60052 244
60037 244
65877 244
53922 244
82761 244

But somehow the data is moved around in the following rounds; I will look further.

@harshit-2115
Author

Thank you for debugging the script @trivialfis

  • I ran your script with both dask xgboost and xgboost. Dask xgboost is taking longer than xgboost to train, maybe because of the overhead due to the large number of chunks.
  • skl gridsearch can parallelize xgboost fits on a multi-core machine, but in the case of dask xgboost, skl gridsearch gives an error when n_jobs > 1.
  • Also, increasing the chunk_size lowers the training time, but if the chunk_size is increased above a certain threshold, we get the same 'Label set is empty' warning again.
  • Should we consider the number of workers and chunk_size as hyperparameters to be tuned for the best performance?

On a multi-core EC2 machine, which will give us more speed and performance: xgboost (parallelized single-thread fits) or dask xgboost (a single fit distributed across workers)?

@trivialfis
Member

trivialfis commented Apr 1, 2020

Dask xgboost is taking longer than xgboost to train

Not surprising. Even within XGBoost, simply syncing gradients between workers has its overhead. On the dask end there are even more operations. There's always a trade-off.

but in the case of dask xgboost, skl gridsearch gives an error when n_jobs > 1

skl functions are designed to work on local data. In our case, they operate on dask.Array/dask.DataFrame, which "looks like" local data thanks to Python's duck typing. But you can imagine that if you slice the data for each thread, you are essentially bypassing dask's scheduler (for threads, workers, and data distribution). That's probably why the grid search in the above sample script doesn't work well.
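
In practice that means keeping the scikit-learn search loop serial on the driver and letting dask parallelize the work inside each fit. A sketch, reusing the pi, param_grid, skf, DX and Dy objects from the sample script above:

clf = GridSearchCV(
    estimator=pi,              # DaskXGBClassifier bound to the dask client
    param_grid=param_grid,
    cv=skf,
    n_jobs=1,                  # serial search loop; no driver-side threads fighting the dask scheduler
    scoring='neg_mean_squared_error',
    refit=False,
)
clf.fit(DX, Dy)                # each candidate fit is distributed across the dask workers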

Also, increasing the chunk_size lowers the training time

Again, a trade-off. That's something for dask to handle in the future, but https://docs.dask.org/en/latest/understanding-performance.html might help.

Should we consider the number of workers and chunk_size as hyperparameters to be tuned for the best performance?

Accuracy performance? No. Computational performance? Probably; see the link above. But I believe there is some low-hanging fruit for computational perf, like switching the backend to something other than pandas. I work for NVIDIA and use cudf most of the time, but I believe there are other backends mentioned in dask's documentation.

On a multi-core EC2 machine, which will give us more speed and performance: xgboost (parallelized single-thread fits) or dask xgboost (a single fit distributed across workers)?

Hard to say. It really depends on your data. For small data, just use normal single-node multi-threaded training. Your 800 MB CSV dataset is considered small. I believe you can train it with normal:

xgboost.train({'tree_method': 'hist'}, ...)

in no time. Using gpu_hist, training on HIGGS (7.9 GB dense) on a single GPU for 1000 rounds can finish within 100 seconds.
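
For reference, a minimal sketch of plain single-node training (here X and y stand for ordinary in-memory numpy arrays, and a binary objective is assumed):

import xgboost

dtrain = xgboost.DMatrix(X, label=y)            # in-memory data, no dask involved
booster = xgboost.train(
    {'tree_method': 'hist', 'objective': 'binary:logistic'},
    dtrain,
    num_boost_round=100,
)
preds = booster.predict(dtrain)                 # probabilities for binary:logistic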

@sandys

sandys commented Apr 1, 2020

@trivialfis that's a very interesting comment.

Do you always do your training on one BIG machine with a GPU, or do you do distributed training (with many machines) - the way SageMaker recommends? So the question becomes - is xgboost distributed training better on 100 machines, even though there are overheads?

Another question - in general, do you ever use gridsearchcv? How would you do cross-validation with k-folds?

@trivialfis
Member

@sandys Your questions are asking for my personal opinion, so the following answers are from a personal perspective.

Do you always do your training on one BIG machine with a GPU, or do you do distributed training (with many machines)

Both. I'm a developer.

So the question becomes - is xgboost distributed training better on 100 machines, even though there are overheads?

Depends. Is your data worth 100 machines? For example, one normally doesn't put iris into a cluster. As commented, it's a trade-off. See the performance section in https://medium.com/rapids-ai/a-new-official-dask-api-for-xgboost-e8b10f3d1eb7 . You can see the scaling behaviour of training on the HIGGS dataset with gpu_hist and dask. There's a limit in terms of performance scaling, which is affected both by your data (sparsity, shape, etc.) and your network (nvlink? tcp? ucx?). It's an area we are trying to improve.

in general, do you ever use gridsearchcv

Yes, but right now only with a single machine. Rapidsai has a notebook for using dask-ml with single-node XGBoost if you are interested. Dask is still new here (and I'm no expert). As we talked about in #5347, there are issues we need to address. You can also try the Spark version of XGBoost, which has been around for a very long time (longer than I have been contributing to XGBoost).

@trivialfis
Member

The questions here are really more about dask than about XGBoost. I would recommend:

https://docs.dask.org/en/latest/best-practices.html
https://docs.dask.org/en/latest/dataframe-best-practices.html

@harshit-2115
Author

Your 800 MB CSV dataset is considered small. I believe you can train it in no time.

True, a single xgboost fit takes no time. But if I am using gridsearchcv for hyperparameter tuning with 5-fold CV, the total number of fits grows quickly (the number of grid candidates times the number of folds). That is why we use multi-core machines to distribute the fits.

@harshit-2115
Author

I trained my dataset using dask xgboost with a 2000 chunk size, a single gridsearch candidate, and 5-fold CV. It took 28 mins for the first fit to complete, but the remaining 4 fits took just 1-2 mins. I don't get this. What do you think?

@trivialfis
Member

multi-core machines to distribute the fits.

XGBoost is multi-threaded even in a single-node configuration.

It took 28 mins for the first fit to complete, but the remaining 4 fits took just 1-2 mins. I don't get this

Could you take a look at the web interface and see what dask is doing?

[Screenshot from 2020-04-02 17-21-04]

@harshit-2115
Author

Could you take a look at the web interface and see what dask is doing?

So for the first fit, dask took extra time to read the CSV into memory. For the subsequent fits, it just did the computation.
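
If the goal is to pay the CSV parsing cost only once, one option might be to persist the parsed dataframe on the cluster before handing it to the grid search. A sketch, using the file path from the first script and assuming a distributed client is already running:

import dask.dataframe as dd

lp3 = dd.read_csv('./collection_model/collection_train.csv')
lp3 = lp3.persist()    # keep the parsed partitions in cluster memory so later fits reuse them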

XGBoost is multi-threaded even in a single-node configuration.

Yeah, so do we need dask XGBoost only when we have a cluster of machines? We can just use the n_jobs param of XGBoost on a single machine to use all cores, right?

@harshit-2115
Author

harshit-2115 commented Apr 6, 2020

there is no 'predict_proba' attribute in DaskXGBClassifier, so metrics like roc_auc give an error.

What can we do about this? @trivialfis

@thvasilo
Contributor

Yeah, so do we need dask XGBoost only when we have a cluster of machines? We can just use the n_jobs param of XGBoost on a single machine to use all cores, right?

If you need distributed training, then you can use Dask, or YARN/Spark, which are much more mature.

We can just use the n_jobs param of XGBoost on a single machine to use all cores, right?

Correct.
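
For the single-machine case, a minimal sketch of that pattern (the hyperparameter values are only placeholders):

from sklearn.model_selection import GridSearchCV
from xgboost import XGBClassifier

# Let each XGBoost fit use all cores and keep the search loop serial, to avoid
# oversubscribing the machine (or swap the two n_jobs values to parallelize fits instead).
model = XGBClassifier(tree_method='hist', n_jobs=-1)
search = GridSearchCV(model, {'learning_rate': [0.1, 0.2]}, cv=5, n_jobs=1)
search.fit(X, y)    # X, y are in-memory arrays or dataframes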

@harshit-2115
Author

Thank you for clarifying.
Can we add a 'predict_proba' attribute to dask xgboost? @trivialfis @thvasilo

@trivialfis
Member

Yes, I will come back to this after the 1.1 release.

@hcho3
Collaborator

hcho3 commented Sep 27, 2020

@harshit-2115 Can this issue be closed, now that predict_proba() has been implemented in xgboost.dask?
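
For anyone reading later, usage on a build that ships predict_proba looks roughly like this sketch (it reuses the DX, Dy and client objects from the sample scripts above):

from xgboost.dask import DaskXGBClassifier

clf = DaskXGBClassifier(tree_method='hist')
clf.client = client
clf.fit(DX, Dy)
proba = clf.predict_proba(DX)    # lazy dask array of shape (n_samples, n_classes)
proba.compute()                  # materialize locally if it fits in memory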

@trivialfis trivialfis added the dask label Oct 5, 2020
@trivialfis trivialfis self-assigned this Oct 5, 2020
@trivialfis
Member

Can this issue be closed, now that predict_proba() has been implemented in xgboost.dask?

I believe so. Feel free to reopen if there are objections.
