Scheduler deadlocks after asynchronous client.who_has call #5144

Open
pfackeldey opened this issue Jul 30, 2021 · 3 comments

@pfackeldey

pfackeldey commented Jul 30, 2021

What happened:

Dear dask-distributed developers,

First of all, thank you for this wonderful project!
We are using dask-distributed for our local HTCondor computing cluster.
In our use-case we periodically kill and spawn dask-workers in HTCondorJobs, such that other HTCondorJobs from other users can slide in between our computing runs.
We also need to work with heavy input, which we have to distribute to the dask-workers beforehand using client.scatter. Of course, we want to replicate this input as soon as new dask-workers are spawned. We therefore added an asynchronous periodic callback to the client's IOLoop, which takes care of this replication.
Unfortunately, we noticed that the client.who_has(..., asynchronous=True) call deadlocks our scheduler, without producing a stack trace. Any subsequent connection to the scheduler then times out.

What you expected to happen:

We expected to be able to add an asynchronous callback, which uses client.who_has(..., asynchronous=True), to the client's IOLoop without deadlocking the scheduler.

Minimal Complete Verifiable Example:

This is a minimal reproducible example that shows the above-mentioned problem. Since it also happens on a LocalCluster, the problem seems to be batch-system-agnostic.

# coding: utf-8

from tornado.ioloop import PeriodicCallback
from dask.distributed import LocalCluster, Client
import numpy as np


class Heavy:
    def __init__(self):
        self.state = np.arange(1000000)

    def __call__(self, arg):
        return np.sum(self.state * arg)


heavy = Heavy()


async def replicate() -> None:
    print("In callback")
    workers = client.ncores(asynchronous=True)
    print("Workers", await workers)
    avail = client.who_has([func], asynchronous=True)  # <--- This seems to deadlock the scheduler
    print("Avail", await avail)
    missing = set((await workers).keys()) - set((await avail)[func.key])
    print("Missing", missing)
    if missing:
        try:
            await client.replicate(func, asynchronous=True)
            print("replicate: success!")
        except:
            print("replicate: failed!")


if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)

    func = client.scatter(heavy, broadcast=True, hash=False, direct=None)

    pc = PeriodicCallback(replicate, callback_time=100)
    client.loop.add_callback(pc.start)

The output (only once!):

In callback
Workers {'tls://127.0.0.1:17693': 8, 'tls://127.0.0.1:17747': 8, 'tls://127.0.0.1:18651': 8, 'tls://127.0.0.1:20033': 8, 'tls://127.0.0.1:25061': 8, 'tls://127.0.0.1:4257': 8, 'tls://127.0.0.1:6413': 8, 'tls://127.0.0.1:9189': 8}

Afterwards the scheduler is stuck.

Anything else we need to know?:

-

Environment:

  • Dask version: '2021.06.2'
  • Tornado version: '6.2.dev1'
  • Python version: '3.8.10'
  • Operating System: 'Ubuntu 20.04.2 LTS'
  • Install method (conda, pip, source): With pip into a conda environment

client.get_versions(check=True) does not throw an error and outputs:

 'client': {'host': {'python': '3.8.10.final.0',
   'python-bits': 64,
   'OS': 'Linux',
   'OS-release': '5.4.0-80-generic',
   'machine': 'x86_64',
   'processor': 'x86_64',
   'byteorder': 'little',
   'LC_ALL': 'en_US.UTF-8',
   'LANG': 'en_US.UTF-8'},
  'packages': {'python': '3.8.10.final.0',
   'dask': '2021.06.2',
   'distributed': '2021.06.2',
   'msgpack': '1.0.2',
   'cloudpickle': '1.6.0',
   'tornado': '6.2.dev1',
   'toolz': '0.11.1',
   'numpy': '1.19.5',
   'lz4': '3.1.3',
   'blosc': '1.10.4'}}}

Thank you very much in advance for your input and help!

Best, Peter

@jrbourbeau
Member

Thanks for raising an issue @pfackeldey. Looking at your example, client.who_has doesn't support an asynchronous keyword argument. Whether client operations are blocking or asynchronous is dictated by the asynchronous= keyword argument when constructing your Client object https://distributed.dask.org/en/latest/client.html#async-await-operation (the default value is asynchronous=False). If you need your client operations to be asynchronous, you should pass asynchronous=True when you create the Client instead of to individual methods.
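
For illustration, here is a minimal sketch of the pattern described above, with asynchronous=True passed once when the Client is created rather than to each method. The helper name replica_report, the two in-process workers, the disabled dashboard, and the small bytes payload are assumptions made for this sketch and are not taken from the original report. It uses client.nthreads() to list the workers.

```python
import asyncio

from dask.distributed import Client, LocalCluster


async def replica_report(data):
    """Scatter `data` to every worker and report which workers lack a copy.

    Hypothetical helper for illustration only.
    """
    # Two in-process workers keep the sketch lightweight (illustrative choice).
    async with LocalCluster(
        n_workers=2,
        processes=False,
        asynchronous=True,
        dashboard_address=None,
    ) as cluster:
        # asynchronous=True is set on the Client itself, so every client
        # method below returns an awaitable.
        async with Client(cluster, asynchronous=True) as client:
            future = await client.scatter(data, broadcast=True, hash=False)

            # Await each call exactly once and keep the results.
            workers = await client.nthreads()
            holders = await client.who_has([future])

            missing = set(workers) - set(holders[future.key])
            return workers, holders, missing


if __name__ == "__main__":
    workers, holders, missing = asyncio.run(replica_report(b"x" * 1000))
    print("Workers", workers)
    print("Avail", holders)
    print("Missing", missing)
```

In this sketch each awaitable is awaited once and its result is stored in a variable, so the later set arithmetic works on plain dictionaries.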

@jrbourbeau
Member

I should also add that active memory management in Dask is actively being worked on (xref #4982), so this type of replica tracking should be much more transparent in the future.

@pfackeldey
Author

Thank you for your fast reply @jrbourbeau !

The documentation link https://distributed.dask.org/en/latest/client.html#async-await-operation states in the second part:

If you want to reuse the same client in asynchronous and synchronous environments you can apply the asynchronous=True keyword at each method call.

Thus I expected the above-mentioned code to work.
Otherwise, I am willing to help update or remove this part of the documentation as you see fit, so that others won't run into the same misunderstanding as I did.
What is your opinion here?

Also thank you very much for pointing to the ongoing work on the active memory management, I'll keep an eye on this!

Best, Peter
