Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workers erased from distributed scheduler before they're removed from Marathon #6

Open
colobas opened this issue Oct 24, 2017 · 0 comments

Comments

@colobas
Copy link

colobas commented Oct 24, 2017

The way the Adaptive class' _retire_workers method works is:

  • it calls the scheduler's retire_workers method, which gives it a list of the addresses of the workers to terminate
  • it then calls the cluster's scale_down method with that list of workers

The problem with that is that inside the scheduler's retire_workers call, the workers are closed and erased from the scheduler's state. However, in the particular case of dask-marathon, the cluster's scale_down method needs to access the scheduler's worker_info dict, which contains the MESOS_URI that Marathon needs to refer to the running container and kill it. This yields error logs such as:

distributed.scheduler - INFO -   Scheduler at:    tcp://10.32.0.13:42805
distributed.scheduler - INFO -       bokeh at:           10.32.0.13:8787
INFO:marathon:Got response from http://172.16.0.5:8080
INFO:daskathon.core:Started marathon workers /daskathon-3e06-workers
INFO:root:Scheduler address: tcp://10.32.0.13:42805
distributed.core - DEBUG - Connection from 'tcp://10.32.0.13:47562' to Scheduler
distributed.core - DEBUG - Message from 'tcp://10.32.0.13:47562': {'op': 'register', 'ncores': 2, 'address': 'tcp://10.32.0.13:4583', 'keys': [], 'name': 'daskathon-3e06-workers.8e07ed66-b8a8-11e7-be28-1251e2c9cfa6', 'nbytes': {}, 'now': 1508841978.2218635, 'host_info': {'time': 1508841978.2218704, 'cpu': 52.6, 'memory': 7296647168, 'memory_percent': 15.4, 'network-send': 0, 'network-recv': 0, 'disk-read': 0, 'disk-write': 0}, 'services': {'nanny': 4584, 'http': 4585, 'bokeh': 4586}, 'memory_limit': 409600000.0, 'local_directory': 'worker-sfasiiho', 'resources': {}, 'pid': 13, 'reply': True}
distributed.core - DEBUG - Calling into handler add_worker
distributed.scheduler - INFO - Register tcp://10.32.0.13:4583
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.32.0.13:4583
distributed.scheduler - INFO - Closing worker tcp://10.32.0.13:4583
distributed.scheduler - INFO - Remove worker tcp://10.32.0.13:4583
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - DEBUG - Removed worker tcp://10.32.0.13:4583
distributed.core - DEBUG - Message from 'tcp://10.32.0.13:47562': {'op': 'unregister', 'address': 'tcp://10.32.0.13:4583', 'reply': True}
distributed.core - DEBUG - Calling into handler remove_worker
distributed.core - DEBUG - Message from 'tcp://10.32.0.13:47562': {'op': 'close', 'reply': False}
distributed.deploy.adaptive - INFO - Retiring workers ['tcp://10.32.0.13:4583']
distributed.utils - ERROR - 'tcp://10.32.0.13:4583'
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/utils.py", line 454, in log_errors
    yield
  File "/opt/conda/lib/python3.6/site-packages/distributed/deploy/adaptive.py", line 162, in _retire_workers
    f = self.cluster.scale_down(workers)
  File "/opt/conda/lib/python3.6/site-packages/daskathon/core.py", line 117, in scale_down
    self.scheduler.worker_info[worker]['name'],
KeyError: 'tcp://10.32.0.13:4583'
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f48ab9cb488>, <tornado.concurrent.Future object at 0x7f48aba16da0>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 605, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.6/site-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 626, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/deploy/adaptive.py", line 199, in _adapt
    yield self._retire_workers()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/opt/conda/lib/python3.6/site-packages/distributed/deploy/adaptive.py", line 162, in _retire_workers
    f = self.cluster.scale_down(workers)
  File "/opt/conda/lib/python3.6/site-packages/daskathon/core.py", line 117, in scale_down
    self.scheduler.worker_info[worker]['name'],
KeyError: 'tcp://10.32.0.13:4583'

As you can see, that dict key is inaccessible by the time the cluster's scale_down method is called.

I'm thinking of a workaround where I store the needed information elsewhere, but is that the best way? I could also subclass stuff and have the workers list contain tuples with their names instead of only the address. Does this make sense? Or should the Adaptive class' behaviour be changed in itself?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant