Cluster hangs with a few tasks in "processing" state but no cpu load on any workers #4724

Closed
GFleishman opened this issue Apr 9, 2021 · 11 comments
Labels: bug (Something is broken), needs info (Needs further information from the user)

Comments

@GFleishman

This problem is stochastic. It seems to occur more frequently when there is more sharing of data between workers. map_overlap calls seem particularly problematic.

The cluster is set up using dask_jobqueue.LSFCluster and dask.distributed.Client:

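from dask.distributed import Client
from dask_jobqueue import LSFCluster

# cores, ncpus, memory, mem, walltime, env_extra, kwargs and njobs are defined elsewhere
cluster = LSFCluster(
    cores=cores, ncpus=ncpus, memory=memory, mem=mem,
    walltime=walltime,
    env_extra=env_extra,
    **kwargs,
)
client = Client(cluster)
cluster.scale(jobs=njobs)  # number of LSF jobs to submit (a job may start several worker processes)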

Workers are all allocated properly, bash scripts invoking LSF all seem fine. The task graph starts to execute, but then gets hung up and sits indefinitely in this type of state:

[Screenshot: Screen Shot 2021-04-09 at 12 26 36 PM]

[Screenshot: Screen Shot 2021-04-09 at 12 27 24 PM]

No workers show any meaningful CPU activity (2-4% for all workers). env_extra above makes sure all MKL, BLAS, and OpenMP environment variables are set to 2 threads per core (which should be fine with hyperthreading?).
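For readers unfamiliar with env_extra: in dask-jobqueue it is a list of shell lines added to the job script before the worker starts. The actual contents used here are not shown in this issue, so the following is only a minimal sketch of what such a list might look like. The helper name and the exact set of variables are assumptions.

```python
def thread_limit_exports(threads=2):
    """Return shell lines that cap the thread pools of common numerical libraries."""
    # ASSUMPTION: these variable names are illustrative; the original
    # env_extra contents are not shown in the issue.
    variables = ("MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS", "OMP_NUM_THREADS")
    return [f"export {name}={threads}" for name in variables]


# A list like this could be passed as env_extra=... when creating the cluster.
env_extra = thread_limit_exports(2)
for line in env_extra:
    print(line)
```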

When I click on the red task on the left of the graph I see:
hung_cluster_last_task_left.pdf

When I click on the red task on the right of the graph (second to last column) I see:
hung_cluster_last_task.pdf

For the red task on the right, the two "workers with data" show:

[Screenshot: Screen Shot 2021-04-09 at 12 28 30 PM]

[Screenshot: Screen Shot 2021-04-09 at 12 28 32 PM]

I've let these hang for upwards of 30 minutes with no meaningful CPU activity on any worker before killing the cluster manually. I can't let it run any longer because I'm paying for cluster time, so I don't know whether it's just (intractably) slow or totally hung. By comparison, the entire rest of the task graph executed in less than 180 seconds.

Any pointers as to what could be causing this or how to permanently avoid it would be really appreciated.

  • Dask version: 2020.12.0
  • Python version: 3.8.5
  • Operating System: CentOS
  • Install method (conda, pip, source): pip
@jsignell
Member

Sorry for the late response. My first suggestion is that you upgrade dask and distributed. This is kind of a shot in the dark, but 2020.12.0 had some pretty big changes with the scheduler so it does seem possible that there would be issues.

I will also ping @jrbourbeau on this in case he has encountered anything similar.

@jrbourbeau
Member

I agree with @jsignell that it would be good to confirm whether or not the issue is still present when using the latest dask and distributed releases (version 2021.04.0). If you still observe the same hanging behavior, the next place I would look would be the worker and scheduler logs to see if there is any relevant information there.
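As a rough sketch of how those logs could be pulled from a live cluster: the distributed Client exposes get_scheduler_logs and get_worker_logs. The helper below is hypothetical, and it assumes that the scheduler logs come back as (level, message) tuples and the worker logs as a mapping from worker address to such tuples.

```python
def collect_problem_logs(client, levels=("WARNING", "ERROR", "CRITICAL"), n=1000):
    """Gather recent scheduler and worker log entries at the given levels.

    `client` is expected to be a dask.distributed.Client (or anything with
    the same two methods).
    """
    found = []
    # ASSUMPTION: a sequence of (level, message) tuples.
    for level, message in client.get_scheduler_logs(n=n):
        if level in levels:
            found.append(("scheduler", level, message))
    # ASSUMPTION: {worker_address: sequence of (level, message) tuples}.
    for address, entries in client.get_worker_logs(n=n).items():
        for level, message in entries:
            if level in levels:
                found.append((address, level, message))
    return found
```

Calling collect_problem_logs(client) while the cluster appears hung would return only the warnings and errors, which is easier to read than the full logs of every worker.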

@GFleishman
Author

Thanks for the replies @jsignell and @jrbourbeau, and of course thanks for dask and dask-distributed to begin with!

I have confirmed that I still get hanging behavior with 2021.04.0.

I have also checked the logs and found something. It was difficult at first since the problem was mostly occurring for jobs with hundreds of workers and there were lots of logs to comb through without really knowing what to look for.

But I ran a simpler job with only 1 worker with 4 cpu cores and 2 processes and it was really helpful:

distributed.nanny - INFO -         Start Nanny at: 'tcp://10.36.111.40:26395'
distributed.nanny - INFO -         Start Nanny at: 'tcp://10.36.111.40:28603'
distributed.worker - INFO -       Start worker at:   tcp://10.36.111.40:15535
distributed.worker - INFO -          Listening to:   tcp://10.36.111.40:15535
distributed.worker - INFO -       Start worker at:   tcp://10.36.111.40:26721
distributed.worker - INFO -          dashboard at:         10.36.111.40:39249
distributed.worker - INFO -          Listening to:   tcp://10.36.111.40:26721
distributed.worker - INFO - Waiting to connect to:   tcp://10.36.110.12:37123
distributed.worker - INFO -          dashboard at:         10.36.111.40:35299
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Waiting to connect to:   tcp://10.36.110.12:37123
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                  27.94 GiB
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -       Local Directory: /scratch/fleishmang/dask-worker-space/worker-20inj_a7
distributed.worker - INFO -                Memory:                  27.94 GiB
distributed.worker - INFO -       Local Directory: /scratch/fleishmang/dask-worker-space/worker-ju9nhbqw
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://10.36.110.12:37123
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:   tcp://10.36.110.12:37123
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
IBM Spectrum LSF 10.1.0.0 build 532214, Oct 16 2019
Copyright International Business Machines Corp. 1992, 2016.
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.

  binary type: linux3.10-glibc2.17-x86_64
IBM Spectrum LSF 10.1.0.0 build 532214, Oct 16 2019
Copyright International Business Machines Corp. 1992, 2016.
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.

  binary type: linux3.10-glibc2.17-x86_64
distributed.worker - INFO - Closing worker gracefully: tcp://10.36.111.40:26721
distributed.worker - INFO - Closing worker gracefully: tcp://10.36.111.40:15535
distributed.worker - INFO - Stopping worker at tcp://10.36.111.40:26721
distributed.worker - ERROR - failed during get data with tcp://10.36.111.40:15535 -> tcp://10.36.111.40:26721
Traceback (most recent call last):
  File "/groups/scicompsoft/home/fleishmang/bin/miniconda3/envs/cs_test/lib/python3.8/site-packages/tornado/iostream.py", line 971, in _handle_write
    num_bytes = self.write_to_fd(self._write_buffer.peek(size))
  File "/groups/scicompsoft/home/fleishmang/bin/miniconda3/envs/cs_test/lib/python3.8/site-packages/tornado/iostream.py", line 1148, in write_to_fd
    return self.socket.send(data)  # type: ignore
BrokenPipeError: [Errno 32] Broken pipe

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/groups/scicompsoft/home/fleishmang/bin/miniconda3/envs/cs_test/lib/python3.8/site-packages/distributed/worker.py", line 1366, in get_data
    response = await comm.read(deserializers=serializers)
  File "/groups/scicompsoft/home/fleishmang/bin/miniconda3/envs/cs_test/lib/python3.8/site-packages/distributed/comm/tcp.py", line 206, in read
    convert_stream_closed_error(self, e)
  File "/groups/scicompsoft/home/fleishmang/bin/miniconda3/envs/cs_test/lib/python3.8/site-packages/distributed/comm/tcp.py", line 124, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe
distributed.utils_comm - INFO - Retrying get_data_from_worker after exception in attempt 0/10: in <closed TCP>: Stream is closed
distributed.nanny - INFO - Worker closed
distributed.nanny - WARNING - Restarting worker
distributed.worker - INFO -       Start worker at:   tcp://10.36.111.40:25607
distributed.worker - INFO -          Listening to:   tcp://10.36.111.40:25607
distributed.worker - INFO -          dashboard at:         10.36.111.40:35905
distributed.worker - INFO - Waiting to connect to:   tcp://10.36.110.12:37123
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                  27.94 GiB
distributed.worker - INFO -       Local Directory: /scratch/fleishmang/dask-worker-space/worker-2o34ymo8
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://10.36.110.12:37123
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

About halfway down are two lines that say Closing worker gracefully (once for each of the processes). I don't know why the processes would be restarted. The maximum memory use for each process is very comfortably below 10GB and each process has been nominally allocated with 30GB.

It seems that both workers were closed, losing intermediate results required for subsequent tasks. An attempt at a data transfer to facilitate one of those tasks then fails. Then the scheduler tries to restart the workers, which is successful - but now the cluster is hung. The dashboard still says the (lost) intermediates are either in the "released" or "memory" state and the dependent tasks are just waiting. The restarted workers show 2-4% cpu load indefinitely.
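When there are hundreds of worker logs, the sequence described above (graceful close, failed transfer, restart) can be picked out mechanically. This is a minimal sketch using only the message formats that appear in the log pasted in this comment; the event names and the helper itself are hypothetical.

```python
import re

# Patterns are based on the message formats seen in the log above.
EVENTS = {
    "closing": re.compile(r"Closing worker gracefully: (\S+)"),
    "comm_error": re.compile(r"ERROR - failed during get data with (\S+) -> (\S+)"),
    "restart": re.compile(r"WARNING - Restarting worker"),
    "started": re.compile(r"Start worker at:\s+(\S+)"),
}


def summarize_events(log_text):
    """Return (line_number, event_name, captured_addresses) for matching log lines."""
    timeline = []
    for number, line in enumerate(log_text.splitlines(), start=1):
        for name, pattern in EVENTS.items():
            match = pattern.search(line)
            if match:
                timeline.append((number, name, match.groups()))
                break  # at most one event per line
    return timeline
```

Running summarize_events over each worker's log file and keeping only the files that contain a "closing" or "comm_error" event narrows down which workers to inspect by hand.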

So I'm not sure why the scheduler closes the workers? And I assumed gracefully meant intermediate results would either be stored elsewhere before closing (though in this case all workers were restarted) - or scheduled to be recomputed after workers restart.

(I'm not being critical, I just want to understand how things work and I'm willing to help out with fixes if given some guidance - I've never looked at any of the distributed source; assuming it's not something in my own code of course - which is still possible).

@jsignell
Member

That is really helpful extra info @GFleishman! I am going to transfer this issue to https://github.com/dask/distributed, where it'll get more eyes from the hard-core distributed people. I think "Closing worker gracefully" just means closing without throwing a loud KilledWorker error, but I might be wrong about that.

@jsignell transferred this issue from dask/dask on Apr 20, 2021
@rubenvdg

We’ve seen very similar problems with map_overlap (running on Kubernetes). In the end we prepended and appended data to each partition with our own logic to circumvent it...
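To illustrate the general idea of that workaround (not the actual code, which is not shown in this thread), here is a minimal pure-Python sketch that pads each partition with boundary elements from its neighbours. The function name, the use of plain lists, and the depth parameter are all assumptions made for illustration.

```python
def add_halo(partitions, depth):
    """Return copies of `partitions` padded with `depth` items from each neighbour.

    `partitions` is a list of lists. The first and last partitions are only
    padded on the side where a neighbour exists.
    """
    if depth <= 0:
        return [list(part) for part in partitions]
    padded = []
    last = len(partitions) - 1
    for i, part in enumerate(partitions):
        before = partitions[i - 1][-depth:] if i > 0 else []
        after = partitions[i + 1][:depth] if i < last else []
        padded.append(list(before) + list(part) + list(after))
    return padded


print(add_halo([[1, 2, 3], [4, 5, 6], [7, 8, 9]], depth=1))
```

Each padded partition can then be processed independently, and the extra elements trimmed from the result afterwards.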

@GFleishman
Author

Hi @rubenvdg - thanks for the comment. This does happen most often with map_overlap, but it also occurs for me with map_blocks on very large arrays: (40000, 500, 2048, 1024).... that's (time, z, y, x) for 4D imaging data.
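For a sense of scale, the size of an array with that shape can be worked out directly. The dtype is not stated in this thread, so the 2 bytes per voxel below is purely an assumption (16-bit data is common in imaging).

```python
import math

shape = (40000, 500, 2048, 1024)  # (time, z, y, x), as given above
bytes_per_voxel = 2  # ASSUMPTION: 16-bit voxels; the real dtype is not stated

n_voxels = math.prod(shape)
total_tib = n_voxels * bytes_per_voxel / 2**40

print(f"{n_voxels:,} voxels, about {total_tib:.2f} TiB at {bytes_per_voxel} bytes each")
```

Under that assumption the full array is roughly 76 TiB, so any step that moves chunks between workers has a great deal of data to transfer.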

@rubenvdg

Same here. In our case it also happened on map_partitions, although with map_overlap the problem seemed to happen more often. Was difficult to reproduce, so was never able to make a good issue for it.

@GFleishman
Author

I agree that it's difficult to reproduce. On several occasions I've convinced myself that the problem was solved only to find out on the next big run that it wasn't. I think it must have something to do with worker to worker communication of dependencies and/or task states, which can be disrupted for a number of reasons, and then potentially is not reset properly by the scheduler. That's all speculation, but the error logs and behavior so far point that way. On some lucky runs I think it's possible that the disrupting events just don't occur (e.g. might be dependent on network traffic).

@abergou

abergou commented Apr 21, 2021

@fjetter I think this may be another instance of the issues you're working on.

@fjetter
Member

fjetter commented Jun 18, 2021

We've recently merged an important PR addressing a few error handling edge cases which caused unrecoverable deadlocks.
These deadlocks were associated with failing workers, connection failures, or host co-located workers. All of these issues could be connected to fetching dependencies, so dense, highly connected task graphs were more likely to be affected. Ultimately, the deadlocks were caused by subtle race conditions, which made them hard to reproduce. Some of them cannot be correlated with any user-facing logs, which is why I cannot say for certain whether your issue is fixed.
I would encourage you to try out the latest changes on main and/or wait for the upcoming release later today. Feedback on whether your issue could be resolved is highly appreciated!

Deadlock fix #4784
Upcoming release dask/community#165

@fjetter added the bug (Something is broken) and needs info (Needs further information from the user) labels on Jun 18, 2021
@jrbourbeau
Member

As @fjetter mentioned, there have been several stability updates since this issue was originally opened. Closing for now, but @GFleishman, let us know if you're still experiencing the same issue on the latest distributed release.
