
Grid searching with dask interface. #5567

Closed
harshit-2115 opened this issue Apr 20, 2020 · 14 comments

@harshit-2115

Hey guys,
We want to run dask xgboost on a cluster of EC2 machines. I have two EC2 machines, each with 2 cores and 4 GB of RAM. I am trying to run the following script on the cluster.

#!/usr/bin/env python
# coding: utf-8

from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn import model_selection
from sklearn.preprocessing import QuantileTransformer
from sklearn.datasets import make_classification

import xgboost

import dask.array as da
from dask.distributed import Client

# Connect to the scheduler running on the remote cluster
client = Client('172.31.18.116:8786')

X, y = make_classification(n_features=10, n_samples=10000, n_classes=2)
chunk_size = 200

DX = da.from_array(X, chunks=chunk_size)
Dy = da.from_array(y, chunks=chunk_size)

post_pipe = Pipeline([('qt', QuantileTransformer(n_quantiles=10))])

# Dask-aware estimator; bind it to the cluster client
pi = xgboost.dask.DaskXGBClassifier(tree_method='hist')
pi.client = client

param_grid = {
    'learning_rate': [0.1, 0.2],
    'n_estimators': [100],
    'reg_lambda': [0.7],
}

kfold = 5
skf = model_selection.StratifiedKFold(n_splits=kfold,
                                      shuffle=True,
                                      random_state=30)

clf = GridSearchCV(estimator=pi,
                   param_grid=param_grid,
                   verbose=5,
                   cv=skf,
                   iid=True,
                   return_train_score=True,
                   scoring='neg_mean_squared_error',
                   refit=False)

pp = post_pipe.fit_transform(DX, Dy)

clf.fit(da.from_array(pp, chunks=chunk_size), Dy)
print(clf.cv_results_)

When I run clf.fit(), I get a lot of the following messages in the worker logs:

distributed.worker - INFO - Can't find dependencies for key tuple-d79e442a-8b9c-45d4-82b9-5590ba5972bd
distributed.worker - INFO - Dependent not found: ('array-e0a14630bae8fa4527682de6d1d77b17', 10) 1 .  Asking scheduler

What am I missing here?

@harshit-2115
Author

This issue is fixed now. On EC2 machines, we need to open ports for communication (via the security groups) and run the dask workers on those ports so that they can communicate with the scheduler.
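
For anyone else hitting this, a rough sketch of the setup I mean (the port numbers are just examples, and <scheduler-ip> is a placeholder for the scheduler machine's address; the security groups must allow inbound TCP on the scheduler port and on every worker/nanny port):

# on the scheduler machine
dask-scheduler --port 8786

# on each worker machine, pin the ports so they can be opened in the security group
dask-worker tcp://<scheduler-ip>:8786 --worker-port 46101 --nanny-port 37805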

@harshit-2115
Author

My current setup is:
EC2 Machine 1 (2 cores, 4 GB RAM): dask-scheduler and 1 dask-worker
EC2 Machine 2 (2 cores, 4 GB RAM): 1 dask-worker
I am setting up the cluster manually with terminal commands.

The script posted above runs, but both workers sit at only about 2% CPU usage. Why isn't the full CPU power of the workers being used?

@trivialfis
Member

Grid searching for dask is not yet supported. We need a specialized implementation.
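
As a stop-gap, one can loop over the grid manually and call xgboost.dask.train once per combination, so that each fit itself stays distributed. A rough sketch (reusing client, DX and Dy from the script above; note it scores on the training data only, which is exactly why a proper CV-aware implementation is needed):

from sklearn.model_selection import ParameterGrid
import xgboost

# client, DX, Dy as in the script above
dtrain = xgboost.dask.DaskDMatrix(client, DX, Dy)

param_grid = {
    'learning_rate': [0.1, 0.2],
    'reg_lambda': [0.7],
}

results = {}
for params in ParameterGrid(param_grid):
    output = xgboost.dask.train(
        client,
        {'tree_method': 'hist', 'objective': 'binary:logistic', **params},
        dtrain,
        num_boost_round=100,
        evals=[(dtrain, 'train')])
    # output['history'] holds the per-round evaluation log
    results[tuple(sorted(params.items()))] = \
        output['history']['train']['logloss'][-1]

best_params = dict(min(results, key=results.get))
print(best_params)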

@harshit-2115
Author

Grid searching for dask is not yet supported.

It was working here #5451 (comment)
Is it not working because we have switched to a cluster of machines now?

@trivialfis
Member

No, it's not working correctly. We need a specialized implementation that can split data locally on each worker. I will look into implementing it in dask-ml once 1.1 is sorted out. If you are interested in working on it, feel free to let me know and I will explain more details. :-)
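
For a taste of what a dask-aware split looks like, dask_ml has a block-wise train_test_split that splits each chunk on the worker holding it rather than gathering everything onto one machine. A sketch only (it does not stratify, so it is not a drop-in replacement for the StratifiedKFold in the script above):

import dask.array as da
from dask_ml.model_selection import train_test_split
from sklearn.datasets import make_classification

X, y = make_classification(n_features=10, n_samples=10000, n_classes=2)
DX = da.from_array(X, chunks=1000)
Dy = da.from_array(y, chunks=1000)

# Each block is split locally instead of being moved to the client
X_train, X_test, y_train, y_test = train_test_split(
    DX, Dy, test_size=0.2, random_state=30)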

@harshit-2115
Author

@trivialfis I tried without grid search. It is still not working.

from sklearn.datasets import make_classification

import xgboost

import dask.array as da
from dask.distributed import Client

# Connect to the scheduler running on the remote cluster
client = Client('172.31.18.116:8786')

X, y = make_classification(n_features=10, n_samples=100000, n_classes=2)
chunk_size = 200

DX = da.from_array(X, chunks=chunk_size)
Dy = da.from_array(y, chunks=chunk_size)


# DX and Dy are dask arrays
dtrain = xgboost.dask.DaskDMatrix(client, DX, Dy)


output = xgboost.dask.train(client,
                            {'verbosity': 2,
                             'tree_method': 'hist',
                             'nthread': -1},
                            dtrain,
                            num_boost_round=4, evals=[(dtrain, 'train')])

Commands to set up the cluster:
On Machine 1

dask-scheduler
dask-worker  tcp://172.31.18.116:8786 --worker-port 46101 --nanny-port 37805

On Machine 2

dask-worker  tcp://172.31.18.116:8786 --worker-port 46101 --nanny-port 37805
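
As a quick sanity check that both workers actually registered with the scheduler (a sketch; run it from any machine that can reach the scheduler):

from dask.distributed import Client

client = Client('172.31.18.116:8786')
workers = client.scheduler_info()['workers']
print(len(workers), 'workers registered:', list(workers))  # expect 2 entries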

@trivialfis
Member

What's the error message?

@harshit-2115
Author

There is no error message as such; the problem is that only one worker is being used, and even that one at just 2% CPU usage.

[Screenshot from 2020-04-22 15-23-54]

On the first worker's terminal (the worker on the same machine as the scheduler):

task NULL connected to the tracker
task NULL connected to the tracker
task NULL got new rank 0
task NULL got new rank 1

The remote worker's terminal shows a connection timeout after 5-10 minutes, and then the worker restarts.

@trivialfis
Member

Could you check what dask is doing on the web interface? Also, we have some bug fixes for dask on the master branch.
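
One more thing worth ruling out (just a guess, not a confirmed diagnosis): chunks=200 turns 100,000 rows into 500 tiny partitions, so the workers may spend more time on task overhead than on actual compute. Something like this gives each worker a few large blocks instead:

import dask.array as da

DX = da.from_array(X, chunks=25000)  # X, y and client from your script
Dy = da.from_array(y, chunks=25000)

# Materialize the blocks on the workers before training
DX, Dy = client.persist([DX, Dy])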

@harshit-2115
Author

[Screenshot from 2020-04-22 15-49-01]

This looks fine to me. I just don't understand why it is using only 2% of the CPU.

@trivialfis trivialfis reopened this Apr 22, 2020
@trivialfis trivialfis changed the title Can't find dependencies/dependent not found error dask-xgboost Dask hangs during training. Apr 22, 2020
@MajorCarrot

I am facing a similar issue. I am trying to add dask support to a project.

    # Assumed module-level imports (not shown in the original snippet):
    # import dask.dataframe as dd; import numpy as np; import pandas as pd;
    # import xgboost as xgb. train_test_split, mean_squared_error, LOGGER and
    # log_metric come from the surrounding project.
    def regression(self, df_in, target_series, model_params):
        """ Fit a model to predict target_series with df_in features/columns
            and retain the feature importances in the dependency matrix.

            :param df_in: input dask dataframe representing the context,
            i.e. the predictors.
            :param target_series: dask series of the target variable. Shares
            the same index as the df_in dataframe.
            :param model_params: dict of parameters passed to xgb.dask.train.
        """
        # Split df_in and target into train and test datasets

        assert isinstance(df_in, dd.DataFrame)

        df_in_train, df_in_test, target_train, target_test = train_test_split(
            df_in,
            target_series,
            test_size=0.2,
            random_state=self.xcorr_params['random_state'],
            shuffle=True)

        # log_param('Train test split ratio', '80/20')
        # log_params(model_params)

        dtrain = xgb.dask.DaskDMatrix(self.client, df_in_train, target_train)

        # Create and train an XGBoost booster
        bst = xgb.dask.train(client=self.client,
                             params=model_params,
                             dtrain=dtrain,
                             evals=[(dtrain, 'train')])

        # Make predictions
        dtest = xgb.dask.DaskDMatrix(self.client, df_in_test)
        target_series_predict = xgb.dask.predict(self.client, bst, dtest)

        target_test = target_test.astype('float32').to_dask_array(
            lengths=True).rechunk('auto')
        target_series_predict = target_series_predict.rechunk('auto')

        rmse = np.sqrt(mean_squared_error(target_test, target_series_predict))

        log_metric(target_series.name, rmse)
        LOGGER.info('Making predictions for : %s', target_series.name)
        LOGGER.info('Root Mean Square Error : %s', str(rmse))

        # get_score only lists features that were actually used in a split,
        # hence the KeyError fallback to NaN below
        importances = bst['booster'].get_score(importance_type='gain')

        new_row = {}
        total = 0

        for column in df_in.columns:
            try:
                new_row[column] = [importances[column]]
                total = total + importances[column]
            except KeyError:
                new_row[column] = [float('NaN')]

        if total:
            for column in df_in.columns:
                new_row[column] = [i / total for i in new_row[column]]

        new_row[target_series.name] = [0.0]

        # Sort new_row to avoid concatenation warnings
        new_row = dict(sorted(new_row.items()))

        # Concatenate the new feature-importance row onto the map
        if self._importances_map is not None:
            self._importances_map = pd.concat([
                self._importances_map,
                pd.DataFrame(index=[target_series.name], data=new_row)
            ])

        return bst['booster']

The function I am using ^
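
One aside (an assumption on my part, since the imports are not shown): if mean_squared_error above is sklearn's, it will convert both dask arrays to numpy and pull them onto the client. dask_ml has a metric that keeps the computation on the cluster; a sketch:

import numpy as np
from dask_ml.metrics import mean_squared_error as dask_mse

rmse = np.sqrt(dask_mse(target_test, target_series_predict))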

task [xgboost.dask]:tcp://127.0.0.1:44009 connected to the tracker
task [xgboost.dask]:tcp://127.0.0.1:36299 connected to the tracker
task [xgboost.dask]:tcp://127.0.0.1:35119 connected to the tracker
task [xgboost.dask]:tcp://127.0.0.1:43259 connected to the tracker
task [xgboost.dask]:tcp://127.0.0.1:44009 got new rank 0
task [xgboost.dask]:tcp://127.0.0.1:36299 got new rank 1
task [xgboost.dask]:tcp://127.0.0.1:35119 got new rank 2
task [xgboost.dask]:tcp://127.0.0.1:43259 got new rank 3

Messages in the output (with the master branch)

Have I done something wrong? (I am posting this here because I get the same log output and CPU under-utilization, around 40%, as the OP.)

@harshit-2115
Author

If you are interested in doing so, feel free to let me know I will explain more details. :-)

We need to start distributed training as soon as possible. How can I contribute?
@trivialfis

@trivialfis
Member

Sorry, completely missed this... To enable grid searching with dask, we need a grid search implementation that is aware of MPI-based algorithms. I'm hoping to jump into dask-ml, but so many things have piled up.

@trivialfis trivialfis added the dask label Sep 9, 2020
@trivialfis trivialfis changed the title Dask hangs during training. Grid searching with dask interface. Oct 29, 2020
@trivialfis
Member

Prefer #6525 instead.
