Last few tasks stall while merging two large dataframes #6493

Open
bkahloon opened this issue Jun 2, 2022 · 8 comments
Labels
deadlock: The cluster appears to not make any progress
needs info: Needs further information from the user

Comments

@bkahloon

bkahloon commented Jun 2, 2022

What happened:

  1. Read in two large dataframes (each is around 40 GB of Parquet data on disk; we select a subset of the data, so the in-memory footprint is smaller):
COLUMNS = []
df1_path = 's3://some_path/df1'
df2_path = 's3://some_path/df2'
df1 = dd.read_parquet(df1_path, columns=COLUMNS)
df2 = dd.read_parquet(df2_path, columns=COLUMNS)
  2. Set the index; the global_index column is already sorted across all the Parquet files beforehand:
df1 = df1.set_index("global_index", sorted=True)
df2 = df2.set_index("global_index", sorted=True)
  3. Merge and persist df3 in memory:
df3 = df2.merge(df1, suffixes=["_df1", "_df2"], left_index=True, right_index=True)
df3 = df3.persist()
  4. The Dask dashboard shows tasks stalling after only a few of them complete.

No progress for over 30 minutes (after which I cancel the job):
[screenshot attached]

Workers appear idle with no CPU usage, although memory is being used (the workers have a 10 GB memory limit):

[screenshot attached]

The graph page shows some tasks stuck "in memory" at the set_index stage:

[screenshot attached]

In the worker logs I only see messages about communication being lost while the workers talk to each other (I assume they are shuffling data for the join):

Worker logs:

distributed.worker - INFO - -------------------------------------------------

distributed.worker - ERROR - Worker stream died during communication: tls://123.12.123.123:37995
Traceback (most recent call last):
  File "internal_pip_dependency_tornado/pypi__tornado/tornado/iostream.py", line 971, in _handle_write
    num_bytes = self.write_to_fd(self._write_buffer.peek(size))
  File "internal_pip_dependency_tornado/pypi__tornado/tornado/iostream.py", line 1568, in write_to_fd
    return self.socket.send(data)  # type: ignore
  File "python3.6/usr/lib/python3.6/ssl.py", line 944, in send
    return self._sslobj.write(data)
  File "python3.6/usr/lib/python3.6/ssl.py", line 642, in write
    return self._sslobj.write(data)
BrokenPipeError: [Errno 32] Broken pipe

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "distributed/worker.py", line 2064, in gather_dep
    self.rpc, deps, worker, who=self.address
  File "distributed/worker.py", line 3333, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "distributed/utils_comm.py", line 389, in retry_operation
    operation=operation,
  File "distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "distributed/worker.py", line 3320, in _get_data
    max_connections=max_connections,
  File "distributed/core.py", line 644, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "distributed/comm/tcp.py", line 205, in read
    convert_stream_closed_error(self, e)
  File "distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    ) from exc
distributed.comm.core.CommClosedError: in : BrokenPipeError: [Errno 32] Broken pipe

distributed.worker - INFO - Can't find dependencies for key ('fix-overlap-0ca23fb934d93f7d6d24d7aba82d6b8e', 128)

distributed.worker - INFO - Dependent not found: ('set_index-893549a2ed1d56f483fd0c8b82d2e14c', 128) 0 . Asking scheduler

distributed.worker - ERROR - Handle missing dep failed, retrying
Traceback (most recent call last):
  File "distributed/comm/core.py", line 288, in connect
    timeout=min(intermediate_cap, time_left()),
  File "python3.6/usr/lib/python3.6/asyncio/tasks.py", line 362, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "distributed/worker.py", line 2212, in handle_missing_dep
    self.scheduler.who_has, keys=list(dep.key for dep in deps)
  File "distributed/utils_comm.py", line 389, in retry_operation
    operation=operation,
  File "distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "distributed/core.py", line 858, in send_recv_from_rpc
    comm = await self.pool.connect(self.addr)
  File "distributed/core.py", line 1013, in connect
    **self.connection_args,
  File "distributed/comm/core.py", line 310, in connect
    ) from active_exception
OSError: Timed out trying to connect to tls://dask-8f0f8f8ffd3942fd8143478f1ffa95e2.daskgateway:8786 after 10 s

distributed.worker - INFO - Dependent not found: ('set_index-893549a2ed1d56f483fd0c8b82d2e14c', 128) 0 . Asking scheduler

distributed.worker - ERROR - Worker stream died during communication: tls://123.12.123.123:37995
Traceback (most recent call last):
  File "distributed/comm/core.py", line 319, in connect
    handshake = await asyncio.wait_for(comm.read(), time_left())
  File "python3.6/usr/lib/python3.6/asyncio/tasks.py", line 362, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "distributed/worker.py", line 2064, in gather_dep
    self.rpc, deps, worker, who=self.address
  File "distributed/worker.py", line 3333, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "distributed/utils_comm.py", line 389, in retry_operation
    operation=operation,
  File "distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "distributed/worker.py", line 3310, in _get_data
    comm = await rpc.connect(worker)
  File "distributed/core.py", line 1013, in connect
    **self.connection_args,
  File "distributed/comm/core.py", line 326, in connect
    ) from exc
OSError: Timed out during handshake while connecting to tls://123.12.123.123:37995 after 10 s

distributed.worker - INFO - Can't find dependencies for key ('fix-overlap-0ca23fb934d93f7d6d24d7aba82d6b8e', 164)

distributed.worker - INFO - Dependent not found: ('set_index-893549a2ed1d56f483fd0c8b82d2e14c', 163) 0 . Asking scheduler

distributed.worker - ERROR - Worker stream died during communication: tls://123.12.123.123:42005
Traceback (most recent call last):
  File "distributed/comm/core.py", line 288, in connect
    timeout=min(intermediate_cap, time_left()),
  File "python3.6/usr/lib/python3.6/asyncio/tasks.py", line 362, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "distributed/worker.py", line 2064, in gather_dep
    self.rpc, deps, worker, who=self.address
  File "distributed/worker.py", line 3333, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "distributed/utils_comm.py", line 389, in retry_operation
    operation=operation,
  File "distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "distributed/worker.py", line 3310, in _get_data
    comm = await rpc.connect(worker)
  File "distributed/core.py", line 1013, in connect
    **self.connection_args,
  File "distributed/comm/core.py", line 310, in connect
    ) from active_exception
OSError: Timed out trying to connect to tls://123.12.123.123:42005 after 10 s

distributed.worker - INFO - Can't find dependencies for key ('join-indexed-rename-1b8b6d5e81d4af20b409b7152fd1f5e4', 112)

distributed.worker - ERROR - Worker stream died during communication: tls://123.12.123.123:39863
Traceback (most recent call last):
  File "distributed/comm/core.py", line 288, in connect
    timeout=min(intermediate_cap, time_left()),
  File "python3.6/usr/lib/python3.6/asyncio/tasks.py", line 362, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "distributed/worker.py", line 2064, in gather_dep
    self.rpc, deps, worker, who=self.address
  File "distributed/worker.py", line 3333, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "distributed/utils_comm.py", line 389, in retry_operation
    operation=operation,
  File "distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "distributed/worker.py", line 3310, in _get_data
    comm = await rpc.connect(worker)
  File "distributed/core.py", line 1013, in connect
    **self.connection_args,
  File "distributed/comm/core.py", line 310, in connect
    ) from active_exception
OSError: Timed out trying to connect to tls://123.12.123.123:39863 after 10 s

distributed.worker - INFO - Can't find dependencies for key ('join-indexed-rename-1b8b6d5e81d4af20b409b7152fd1f5e4', 262)

distributed.worker - INFO - Dependent not found: ('repartition-merge-771c2501dd21c164d0c6e29ee1493490', 112) 0 . Asking scheduler

distributed.worker - INFO - Dependent not found: ('repartition-merge-10c4905ff08d78501e2eaf92c56e1bf9', 262) 0 . Asking scheduler

distributed.worker - ERROR - Worker stream died during communication: tls://123.12.123.123:41695
Traceback (most recent call last):
  File "internal_pip_dependency_tornado/pypi__tornado/tornado/iostream.py", line 971, in _handle_write
    num_bytes = self.write_to_fd(self._write_buffer.peek(size))
  File "internal_pip_dependency_tornado/pypi__tornado/tornado/iostream.py", line 1568, in write_to_fd
    return self.socket.send(data)  # type: ignore
  File "python3.6/usr/lib/python3.6/ssl.py", line 944, in send
    return self._sslobj.write(data)
  File "python3.6/usr/lib/python3.6/ssl.py", line 642, in write
    return self._sslobj.write(data)
BrokenPipeError: [Errno 32] Broken pipe

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "distributed/worker.py", line 2064, in gather_dep
    self.rpc, deps, worker, who=self.address
  File "distributed/worker.py", line 3333, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "distributed/utils_comm.py", line 389, in retry_operation
    operation=operation,
  File "distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "distributed/worker.py", line 3320, in _get_data
    max_connections=max_connections,
  File "distributed/core.py", line 644, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "distributed/comm/tcp.py", line 205, in read
    convert_stream_closed_error(self, e)
  File "distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    ) from exc
distributed.comm.core.CommClosedError: in : BrokenPipeError: [Errno 32] Broken pipe

distributed.worker - INFO - Can't find dependencies for key ('join-indexed-rename-1b8b6d5e81d4af20b409b7152fd1f5e4', 358)

distributed.worker - INFO - Can't find dependencies for key ('join-indexed-rename-1b8b6d5e81d4af20b409b7152fd1f5e4', 361)

distributed.worker - INFO - Dependent not found: ('repartition-merge-10c4905ff08d78501e2eaf92c56e1bf9', 358) 0 . Asking scheduler

distributed.worker - INFO - Dependent not found: ('repartition-merge-10c4905ff08d78501e2eaf92c56e1bf9', 361) 0 . Asking scheduler

What you expected to happen:

This seems like a fairly simple join, so I am not sure why the tasks just stall. I also tried skipping the persist and simply calling len(df3), and that had the same result.

Minimal Complete Verifiable Example:

Embedded in the description above; a consolidated sketch follows below.
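For convenience, here are the steps above collected into one script. The S3 paths are placeholders and COLUMNS stands in for the real column subset, exactly as in the description:

    import dask.dataframe as dd

    COLUMNS = []  # placeholder for the real column subset
    df1 = dd.read_parquet('s3://some_path/df1', columns=COLUMNS)
    df2 = dd.read_parquet('s3://some_path/df2', columns=COLUMNS)

    # global_index is already sorted across the parquet files
    df1 = df1.set_index("global_index", sorted=True)
    df2 = df2.set_index("global_index", sorted=True)

    # index-aligned merge, then persist on the cluster
    df3 = df2.merge(df1, suffixes=["_df1", "_df2"], left_index=True, right_index=True)
    df3 = df3.persist()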

Anything else we need to know?:

I was using Dask Gateway to manage the cluster. I tried the approach in this video to launch subprocesses from the Dask worker, but I don't think that option is available in my Dask version (I did try a newer version as well and ran into other issues, which I'll leave out here).

Environment:

  • Dask version: 2021.03.0
  • Python version: 3.6
  • Install method (conda, pip, source): pip

Cluster dump state does not appear to be supported in this version.

Cluster Dump State:
@bkahloon
Author

bkahloon commented Jun 3, 2022

I tried with the latest Dask version as well (2022.05.2), and it again seemed to stall. I'd greatly appreciate any help with this. Looking at the worker logs, the errors were the same as with the version above.

@mrocklin
Member

mrocklin commented Jun 3, 2022

@gjoseph92 does anything here sound familiar to you?

@fjetter
Member

fjetter commented Jun 3, 2022

Hi @bkahloon

Your worker logs show a specific log message that was removed a long time ago.

Specifically, Handle missing dep failed, retrying was removed in #5046 / 2021.10.0.

While the logs may look similar to you, I'm sure there are subtle differences, and it would be helpful to have up-to-date logs. Can you please post the logs from a run on 2022.05.2?

It would also be ideal if you could provide a cluster dump:

  1. Get your cluster into the stuck state
  2. Run https://distributed.dask.org/en/stable/api.html?highlight=dump_cluster_state#distributed.Client.dump_cluster_state (a minimal sketch follows this list)
  3. Upload the JSON/msgpack file here so that we can investigate it
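For reference, a minimal sketch of step 2, assuming a Client that is already connected to the stuck cluster (the scheduler address below is a placeholder):

    from dask.distributed import Client

    client = Client("tls://<scheduler-address>:8786")  # placeholder address

    # Writes a compressed state dump (dask-cluster-dump.msgpack.gz with the
    # default msgpack format) covering scheduler and worker state.
    client.dump_cluster_state("dask-cluster-dump")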

@fjetter added the needs info and deadlock labels on Jun 3, 2022
@bkahloon
Author

bkahloon commented Jun 3, 2022

Hi @fjetter, sorry for not including the cluster dump from the run with the new version. I've attached it below:
dask.msgpack.gz

@Rudradutt09

Hey, I am having a similar issue where I am trying to save the output as a NetCDF file using xarray. Dask works perfectly fine until the last moment and then suddenly freezes. There is still ample memory on all the workers, but they freeze and do not process any further tasks.

I am using
Xarray version: 2022.3.0
Dask version: 2022.05.1
Distributed version: 2022.5.1

It doesn't show any error; it just freezes.

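For context, a minimal sketch of the pattern described above; the paths, chunking, and reduction are hypothetical placeholders, not the actual workload:

    import xarray as xr
    from dask.distributed import Client, LocalCluster

    client = Client(LocalCluster(n_workers=4, threads_per_worker=1))

    # open lazily with dask-backed chunks (paths and chunk sizes are placeholders)
    ds = xr.open_mfdataset("data/*.nc", parallel=True, chunks={"time": 100})
    result = ds.mean(dim="time")  # some lazy computation

    # the freeze reportedly happens at the final write step
    result.to_netcdf("output.nc")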

@fjetter removed the needs info label on Jun 16, 2022
@fjetter
Member

fjetter commented Jun 28, 2022

Thank you @bkahloon for the cluster dump you provided in #6493 (comment).

We can confirm that this is a deadlock, but we are not absolutely certain whether it is already fixed in the most recent version. We did find out a couple of things, though.
(The following is likely a bit cryptic to most users and is intended mostly for documentation purposes.)

There appears to be some sort of network failure between the worker and the scheduler while the worker is trying to heartbeat. This pops up as an exception Timed out while trying to connect during heartbeat, which the worker escalates by closing itself.
The interesting thing is that at this time the worker and scheduler already have an open connection that is unaffected (the batched stream).
During this close attempt, the worker gets stuck for a reason we have not yet been able to identify.

  • Something might be blocking the event loop; the log added in Log if closing an executor is not possible in thread #6644 should help us identify it.
  • We strongly suspect that something in our threadpool is off, since we can see a (distributed) task being scheduled on a threadpool that never actually finishes. This is a known problem with the shutdown of the executor that can cause a worker to lock up. However, we believe this should not be able to block Worker.close, which is what is definitely happening here.
  • Something in our comms could block closing our ConnectionPool. This might already be fixed by Comm objects do not handle cancellation correctly #6548. We can also see a couple of Timed out during handshake errors that pop up while this worker is trying to connect to a peer to fetch results, which might support this theory.
  • It's not entirely clear why the scheduler does not remove the faulty worker. The config option distributed.scheduler.worker-ttl should remove this broken worker from the rotation and reschedule all stuck tasks, making the cluster self-healing. For some reason this is not triggered, and we need to investigate why (see the config sketch after this list).
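For reference, a sketch of how that TTL can be set from Python; the value shown is only illustrative and is not a proposed fix:

    import dask

    # If a worker misses heartbeats for longer than this window, the scheduler
    # is expected to remove it and reschedule its tasks.
    dask.config.set({"distributed.scheduler.worker-ttl": "5 minutes"})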

Overall, a lot has happened in the code base lately and we might have fixed the problem already. For instance, #6603 helps a lot with proper, graceful shutdown overall, and we might have fixed this issue by chance already.

#6644 introduces an important log message for the threadpool case. Once it is merged, I would greatly appreciate a new run with a cluster dump, if possible (releases starting with 2022.07.0).

@zklaus

zklaus commented Jun 29, 2022

For what it's worth, I encountered something that feels similar when working with masked arrays in dask array (dask/dask#9181). The issue was that dask lost the ability to deserialize masked arrays because something went wrong with the lazy registration of the deserializer. I think it's possible that a similar problem occurs with other objects that rely on lazy registration of deserializers.
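For illustration, a minimal sketch of the kind of round trip that exercises that (de)serialization path; it is not the exact reproducer from dask/dask#9181:

    import numpy as np
    from dask.distributed import Client

    client = Client()  # local cluster with worker processes

    # Scattering and gathering a masked array pushes it through distributed's
    # serialization machinery, including the lazily registered handlers
    # mentioned above.
    masked = np.ma.masked_invalid(np.array([1.0, np.nan, 3.0]))
    future = client.scatter(masked)
    roundtripped = client.gather(future)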

@fjetter added the needs info label on Aug 4, 2022
@ebo

ebo commented Mar 1, 2024

I think I am being bitten by the same issue or a similar one. Oddly enough, the code works fine if I run just a few test files (say 10), but it gives me an error if I throw the whole enchilada at it (156,526 files). All in all, I get the following warnings and dump on the full set of files:

extracting CryoSat-2 granule start and end times.
100%|████████████████████████████████████████████████████████████████████| 156526/156526 [00:34<00:00, 4553.44it/s]
100%|█████████████████████████████████████████████████████████████████| 156526/156526 [00:00<00:00, 1045645.01it/s]
/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/site-packages/distributed/client.py:3162: UserWarning: Sending large graph of size 26.28 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
2024-02-29 16:27:01,269 - distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
             ^^^^^^^^^^
  File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/site-packages/distributed/comm/tcp.py", line 265, in write
    frames = await to_frames(
             ^^^^^^^^^^^^^^^^
  File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/site-packages/distributed/comm/utils.py", line 48, in to_frames
    return await offload(_to_frames)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/site-packages/distributed/utils.py", line 1540, in run_in_executor_with_context
    return await loop.run_in_executor(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/asyncio/base_events.py", line 829, in run_in_executor
    executor.submit(func, *args), loop=self)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
2024-02-29 16:27:05,504 - distributed.nanny - WARNING - Worker process still alive after 3.1999989318847657 seconds, killing
2024-02-29 16:27:05,504 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing
2024-02-29 16:27:05,505 - distributed.nanny - WARNING - Worker process still alive after 3.1999992370605472 seconds, killing
2024-02-29 16:27:05,505 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing

This seems to happen in a number of my scripts, so I may be doing something systematically wrong, but in this case I am automating a check for updated files and then downloading them before processing:

    import tqdm
    from dask.distributed import Client, LocalCluster, progress
    from dask.utils import SerializableLock

    # shared lock so workers can append to the same log file safely
    lock = SerializableLock()

    # set up the local cluster and client
    cluster = LocalCluster(n_workers=workers, threads_per_worker=1)
    client = Client(cluster)

    # build one argument tuple per file for Dask to process
    pair_group = []
    for CS_fname in tqdm.tqdm(CS_df['Filename'].values, disable=not verbose):
        pair_group.append((lock, CS_fname, indir, error_file, verbose))

    # one download task per argument tuple
    futures = client.map(download_CryoSat_file, pair_group)

    if verbose:
        progress(futures)

    client.gather(futures)
I have tried assigning the results to a variable; all the futures are processed (files downloaded and validity-checked) and the details of their status are written to a log file (the reason for passing the lock), but it would be nice to figure out why this is happening. As a note, I have also tried processing this asynchronously and with fire_and_forget, but have not gotten either to work consistently.
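For what it's worth, one direction the "Sending large graph" warning points at is submitting the work in smaller batches so the scheduler never receives one huge graph at once. A rough sketch under that assumption, reusing the names from the snippet above (workers, lock, CS_df, indir, error_file, verbose, download_CryoSat_file), which are not defined here:

    from dask.distributed import Client, LocalCluster, as_completed

    cluster = LocalCluster(n_workers=workers, threads_per_worker=1)
    client = Client(cluster)

    filenames = list(CS_df['Filename'].values)
    batch_size = 5000  # illustrative value, not a recommendation

    for start in range(0, len(filenames), batch_size):
        batch = filenames[start:start + batch_size]
        args = [(lock, fname, indir, error_file, verbose) for fname in batch]
        futures = client.map(download_CryoSat_file, args)
        for _ in as_completed(futures):  # wait for this batch before submitting the next
            pass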
