Host IP resolution problem w/ dask on kubernetes or dask-gateway #5765

Closed
gforsyth opened this issue Jun 8, 2020 · 5 comments · Fixed by #6343

Comments

@gforsyth
Contributor

gforsyth commented Jun 8, 2020

Hi!

There are two different failure modes here, but I think the solution to both is the same, so I'm going to keep them bundled. Both come up when using the bundled dask helpers (xgboost.dask), and in both cases (k8s and dask-gateway) I believe the culprit is here:

https://github.com/dmlc/xgboost/blob/master/python-package/xgboost/dask.py#L360-L367

For a dask cluster deployed with a helm chart, the scheduler pod is exposed via a kubernetes service. The service acts as a reverse proxy, providing a static route to the scheduler pod (that would survive the scheduler pod restarting).

Currently, xgboost.dask isn't compatible with this deployment pattern:

import dask.array as da
from distributed.client import Client
from xgboost.dask import DaskXGBClassifier

client = Client()
print(client.scheduler.address)

xgbc = DaskXGBClassifier()

X = da.random.random((10, 10))
y = da.random.random((10, 1))
xgbc.fit(X, y)

tcp://dask-scheduler:8786
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-12-4607fe3bd004> in <module>
     11 X = da.random.random((10, 10))
     12 y = da.random.random((10, 1))
---> 13 xgbc.fit(X, y)

/opt/conda/lib/python3.7/site-packages/xgboost/dask.py in fit(self, X, y, sample_weights, eval_set, sample_weight_eval_set)
    651         results = train(self.client, params, dtrain,
    652                         num_boost_round=self.get_num_boosting_rounds(),
--> 653                         evals=evals)
    654         self._Booster = results['booster']
    655         # pylint: disable=attribute-defined-outside-init

/opt/conda/lib/python3.7/site-packages/xgboost/dask.py in train(client, params, dtrain, evals, *args, **kwargs)
    382     workers = list(_get_client_workers(client).keys())
    383 
--> 384     rabit_args = _get_rabit_args(workers, client)
    385 
    386     def dispatched_train(worker_addr):

/opt/conda/lib/python3.7/site-packages/xgboost/dask.py in _get_rabit_args(worker_map, client)
    336 
    337     env = client.run_on_scheduler(_start_tracker, host.strip('/:'),
--> 338                                   len(worker_map))
    339     rabit_args = [('%s=%s' % item).encode() for item in env.items()]
    340     return rabit_args

/opt/conda/lib/python3.7/site-packages/distributed/client.py in run_on_scheduler(self, function, *args, **kwargs)
   2386         Client.start_ipython_scheduler: Start an IPython session on scheduler
   2387         """
-> 2388         return self.sync(self._run_on_scheduler, function, *args, **kwargs)
   2389 
   2390     async def _run(

/opt/conda/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    822         else:
    823             return sync(
--> 824                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    825             )
    826 

/opt/conda/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

/opt/conda/lib/python3.7/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

/opt/conda/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/opt/conda/lib/python3.7/site-packages/distributed/client.py in _run_on_scheduler(self, function, wait, *args, **kwargs)
   2352         if response["status"] == "error":
   2353             typ, exc, tb = clean_exception(**response)
-> 2354             raise exc.with_traceback(tb)
   2355         else:
   2356             return response["result"]

/opt/conda/lib/python3.7/site-packages/xgboost/dask.py in _start_tracker()
     46     """Start Rabit tracker """
     47     env = {'DMLC_NUM_WORKER': n_workers}
---> 48     rabit_context = RabitTracker(hostIP=host, nslave=n_workers)
     49     env.update(rabit_context.slave_envs())
     50 

/opt/conda/lib/python3.7/site-packages/xgboost/tracker.py in __init__()
    146         for _port in range(port, port_end):
    147             try:
--> 148                 sock.bind((hostIP, _port))
    149                 self.port = _port
    150                 break

OSError: [Errno 99] Cannot assign requested address

The failure occurs because client.scheduler.address points to the service IP, but that IP is not assigned to any interface inside the scheduler pod, so the tracker can't bind a socket to it and it barfs with Errno 99.
One workaround is to run hostname -i via subprocess on the scheduler using client.run_on_scheduler, then reconnect to the scheduler via the pod IP.
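
For reference, a minimal sketch of that workaround. The helper names (`first_address`, `scheduler_pod_ip`, `reconnect_via_pod_ip`) are made up for illustration, and the default port 8786 is only assumed from the address printed above. It also assumes the `hostname` binary is available in the scheduler image and that workers can reach the pod IP directly.

```python
import subprocess


def first_address(hostname_output: str) -> str:
    """Pick the first address from `hostname -i` output (it may list several)."""
    fields = hostname_output.split()
    if not fields:
        raise ValueError("hostname -i returned no address")
    return fields[0]


def scheduler_pod_ip() -> str:
    """Meant to be executed on the scheduler; returns the pod's own IP."""
    out = subprocess.check_output(["hostname", "-i"])
    return first_address(out.decode())


def reconnect_via_pod_ip(client, port: int = 8786):
    """Hypothetical helper: open a new Client that talks to the pod IP directly."""
    # Imported lazily so the parsing helper above can be used without dask installed.
    from distributed import Client

    pod_ip = client.run_on_scheduler(scheduler_pod_ip)
    return Client(f"tcp://{pod_ip}:{port}")
```

With the new client, client.scheduler.address holds the pod IP instead of the service name, so the tracker is asked to bind to an address the pod actually owns. The downside is that the pod IP changes if the scheduler pod restarts.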

I think this can be generalized using something like the fix proposed here (for the same issue): dask/dask-xgboost#40

In short, perform hostname lookup on the scheduler (or have an exposed way to choose how to resolve the host IP).
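
A rough sketch of what I mean, not the actual xgboost.dask code: `resolve_tracker_host`, its `host_ip` parameter and the `lookup` hook are all hypothetical names. The idea is that the function runs on the scheduler (e.g. through client.run_on_scheduler), so the lookup happens on the machine that will bind the tracker socket, and an explicitly supplied IP always wins.

```python
import socket
from typing import Callable, Optional


def local_ip_by_hostname() -> str:
    """Resolve this machine's own hostname to an IPv4 address."""
    return socket.gethostbyname(socket.gethostname())


def resolve_tracker_host(
    host_ip: Optional[str] = None,
    lookup: Callable[[], str] = local_ip_by_hostname,
) -> str:
    """Hypothetical helper: choose the address the tracker should bind to.

    An explicit ``host_ip`` takes priority; otherwise fall back to ``lookup``,
    which is intended to be evaluated on the scheduler itself.
    """
    if host_ip:
        return host_ip
    return lookup()
```

Something like client.run_on_scheduler(resolve_tracker_host) should then return the pod IP in the k8s case, assuming the pod's hostname resolves to the pod IP inside the pod.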

I think that performing hostname lookup this way will also allow xgboost.dask to work with clusters spun up via dask-gateway. These currently time out because the hostname parsing of client.scheduler.address doesn't know how to deal with an SNI-routed scheduler address (an example of this address type is below):

from dask_gateway import Gateway
gateway = Gateway()
cluster = gateway.new_cluster()
client = cluster.get_client()
print(client.scheduler.address)
gateway://somehost.com:8786/5d6b32e087ff475f8a1fd67404a0c550
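
To illustrate why parsing this address doesn't help, here is a small standard-library sketch (not what xgboost.dask does internally). The host part is the gateway's proxy, and the path appears to identify the cluster, so nothing in the string is an address the scheduler itself could bind to.

```python
from urllib.parse import urlparse

address = "gateway://somehost.com:8786/5d6b32e087ff475f8a1fd67404a0c550"
parsed = urlparse(address)

print(parsed.scheme)    # gateway
print(parsed.hostname)  # somehost.com
print(parsed.port)      # 8786
print(parsed.path)      # /5d6b32e087ff475f8a1fd67404a0c550
```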

@trivialfis
Member

I'm trying to expose the rabit context through the dask interface. Hopefully I can get it done in this release.

@mmccarty

Thanks @trivialfis!

@jakirkham
Contributor

Am I correct in reading that PR (#6142) may address this?

@trivialfis
Member

trivialfis commented Oct 19, 2020

We spent a fair amount of time on this. We got it working in local testing with k8s, but it failed on GKE. I will try resolving the dask-gateway issue first.

@trivialfis
Member

Quick note: after resolving the IP issue, dask failed to gather data partitions on GKE.
