Timed out trying to connect ... : connect() didn't finish in time #4080
Comments
I suspect that this is related to the handshake, which fails when the event loop is blocked |
Interesting. This was added recently. We could test this hypothesis relatively easily by walking back to Dask version 2.22 and seeing if the problem persists. Is this easy for you to do? |
Had a look into our history: we tried 2.20, 2.21 and 2.23, all 'failing' with the timeouts. The runs are somewhat costly and take some hours. We haven't set up cheap reproducers so far; this rather happens at scale. |
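Regarding the blocked-event-loop hypothesis above: here is a toy sketch of how such blocking can be observed. It mimics the kind of tick monitor that produces distributed's "event loop was unresponsive" warnings, but it is not the actual implementation; all names and values here are illustrative.

```python
import asyncio
import time


async def monitor_loop(interval: float = 0.5, threshold: float = 1.0) -> None:
    """Warn whenever the event loop wakes up much later than requested."""
    last = time.monotonic()
    while True:
        await asyncio.sleep(interval)
        now = time.monotonic()
        lag = now - last - interval  # extra delay beyond the requested sleep
        if lag > threshold:
            print(f"event loop was unresponsive for ~{lag:.1f}s")
        last = now


async def main() -> None:
    asyncio.ensure_future(monitor_loop())
    await asyncio.sleep(1)  # let the monitor start ticking
    time.sleep(3)           # deliberately block the loop (like heavy GIL-holding work)
    await asyncio.sleep(1)  # the monitor wakes up late and reports the lag


if __name__ == "__main__":
    asyncio.run(main())
```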
Digging out more logs
along with
=> ~ 1,364 events |
OK, if this was happening prior to 2.23 then it probably isn't the handshake. I'm surprised to see the event loop being inactive for 5s. The next thing I would ask is what is slowing down the workers. One way to get this information is to create a performance_report, or to watch the dashboard and look at the … Separately, if I come up with some time I'll create a small snippet to test how long it takes for every worker to talk to every other worker. My guess is that this will return quickly, and that there is some issue with the combination of comms and whatever work is going on. I'm curious, is there anything suspect in the functions that you're running, or would this happen just as often with normal Pandas functions? |
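As a hedged illustration of the performance_report suggestion above (the scheduler address and the submitted workload are placeholders, not anything from this thread):

```python
from dask.distributed import Client, performance_report

client = Client("tcp://scheduler:8786")  # placeholder address

# Everything computed inside the context manager is profiled and written
# out as a self-contained HTML file for later inspection.
with performance_report(filename="dask-report.html"):
    total = client.submit(sum, range(1_000_000)).result()
```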
I would be curious what this returns on your system. On my laptop it's 15ms.

```python
import asyncio
import time
from typing import List

from dask.distributed import Client, get_worker
from dask.utils import format_time


async def ping(addresses: List[str]):
    await asyncio.gather(*[
        get_worker().rpc(addr).identity()
        for addr in addresses
        if addr != get_worker().address
    ])


def all_ping(client: Client):
    workers = list(client.scheduler_info()["workers"])
    start = time.time()
    client.run(ping, workers)
    stop = time.time()
    print(format_time(stop - start))


if __name__ == "__main__":
    with Client() as client:
        all_ping(client)
```

Also, due to connection pooling, running this before your computation might make things smoother in the short term. |
Thank you so much for looking into this! I'm happy to run the snippet. In respect to the functions I'm running: the problems arise when it starts re-partitioning the 100,000 output partitions to 1,000. I suspect that the same would happen if a … |
Thanks again for looking into this. I got hooked on that topic and am happy to share a sort-of-reproducer running on the full-scale cluster.

```python
from dask import bag

b = bag.from_sequence([1] * 100_000, partition_size=1)
bb = b.map_partitions(lambda *_, **__: [b'1' * 2**20] * 5).persist()
bc = bb.repartition(1000).persist()
```

The reproducer is producing connection errors and timeouts for the comm between workers and scheduler. This is not exactly the failure mode we have faced; in our case, the workers are losing connection between themselves. I'm sharing the results anyway. I produced a performance report and uploaded it to https://bl.ocks.org/michaelnarodovitch/19923648bc21002b0a4c39556aea00f9 . I also produced verbose logs and share some highlights here. If somebody is interested, I can also share the logs themselves.

On the scheduler

On the workers

Will share more information on our failure mode when we run … @mrocklin reporting back on the … |
Well, looking at the performance report, nothing is coming up unfortunately (we're, entertainingly, spending most of our time in the workers logging). We're not entirely sensitive here, though: if we're spending a lot of time in GIL-holding functions we wouldn't see that. This example is great, though, in that it's really well isolated and probably stresses only our networking and serialization stack. I'll take a longer look later today on a cluster. |
That's good to hear. I was curious about how things would scale there. Thanks! |
OK, so I just ran this myself on a larger cluster. Here are the results.

```python
import coiled
from dask import bag
from dask.distributed import Client

cluster = coiled.Cluster(n_workers=40)
client = Client(cluster)

# This works fine
b = bag.from_sequence([1] * 10_000, partition_size=1)
bb = b.map_partitions(lambda *_, **__: [b'1' * 2**20] * 5)
bc = bb.repartition(1000)
bc = bc.persist()

# A larger size fails
b = bag.from_sequence([1] * 40_000, partition_size=1)
bb = b.map_partitions(lambda *_, **__: [b'1' * 2**20] * 5)
bc = bb.repartition(1000).persist()

# Get logs
logs = cluster.get_logs()
print(list(logs.values())[-1])
```
cc @jakirkham: as the new bytes-handling expert, does the error message above mean anything to you? |
That's something new to me :) Thanks for trying it out. I dug a little bit into the comm code and didn't find anything suspicious there. I can't exclude that the connection timeouts on our end are related to our networking setup. I will update here if I find anything interesting to share. I suppose that the reproducer captures the problem (on our end) quite well. Maybe some other folks are also facing timeouts and can share their experiences here. |
@quasiben is anyone on your team able to help deal with the comms issue here? It seems like it might be related to recent changes. |
This part of the traceback above sticks out to me.

```
File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 241, in write
    stream.write(b)
```

Looking at the code, it appears to be sending (see distributed/comm/tcp.py, lines 239 to 241 at a803253).

The error itself relates to how taking views of underlying buffers may result in them being immutable. Here's an example with a `BytesIO`:

```python
import io

b = io.BytesIO()
b.write(b"abc")
m = b.getbuffer()   # buffer cannot be mutated while it is viewed
b.write(b"def")     # error trying to mutate a buffer that is viewed
```

The way one typically fixes this is to release the view before trying to mutate the buffer:

```python
import io

b = io.BytesIO()
b.write(b"abc")
m = b.getbuffer()
m.release()         # release the view after doing stuff
b.write(b"def")     # now able to mutate the buffer without issues
```

What can we draw from these two things? Well, the error is invariant to serialization. The error happens in Tornado when trying to pack … Doing a quick search I found this SO question. Ben says there that this may be thread related (like one thread is viewing this buffer while another is trying to modify it). That SO question links to this old Tornado PR ( tornadoweb/tornado#2008 ) and then, curiously, to an old Distributed issue ( #1704 ). So my best guess is that maybe we are doing something unsafe with thread usage around Tornado. Barring that, maybe there is still a Tornado bug lurking. HTH |
This just came up in a Pangeo issue (see #4128 linked above). @jakirkham @quasiben would you all be able to take ownership of this problem? I think that you all are probably the best positioned to fix it, if you have the time. |
I think I've already provided as much insight as I can above. |
Understood. Someone needs to dig in here, figure out the right way forward, and fix the problem. I'm curious if either of you are willing to take this on. |
Gave it another try with dask/distributed 2.7.0 and dask/distributed 2.0.0 - same problem. Wondering what happens with BatchedSend if the scheduler event loop is blocked, as indicated by the logs.
Note: We still cannot exclude that this is due to networking issues on our end. |
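As a toy illustration of that concern (this is not distributed.batched.BatchedSend, just a sketch of the same idea): messages are buffered synchronously and a background coroutine flushes them on an interval, so if the event loop is blocked, nothing gets flushed and the peer sees silence until the loop frees up.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


class TinyBatchedSend:
    """Toy interval-batched sender: buffer now, flush on the event loop."""

    def __init__(self, send: Callable[[List[Any]], Awaitable[None]],
                 interval: float = 0.01):
        self._send = send
        self._interval = interval
        self._buffer: List[Any] = []
        self._flusher = asyncio.ensure_future(self._flush_forever())

    def send(self, msg: Any) -> None:
        self._buffer.append(msg)  # cheap and synchronous; no I/O here

    async def _flush_forever(self) -> None:
        while True:
            # If the event loop is blocked, this coroutine never wakes up,
            # so buffered messages sit here while timeouts fire elsewhere.
            await asyncio.sleep(self._interval)
            if self._buffer:
                batch, self._buffer = self._buffer, []
                await self._send(batch)
```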
I can take a look at this early next week |
@michaelnarodovitch This sounds very similar to the issues I have been having in #3878 - these logs are extremely similar to what I have been seeing:
Unfortunately I do not have anything of value to add other than potentially linking the issues, apologies. |
Thanks for sharing @KrishanBhasin. Also related: |
We've found a similar issue from running Prefect with …
On …
and the below on the scheduler: |
What has helped quite a bit so far:

Long application timeouts effectively make the workers saturate the scheduler socket. A short timeout plus backoff protects the scheduler listener by effectively spreading the connections in time. Failed tasks (they still occur) get rescheduled instead of failing the whole job. (See distributed/comm/core.py, lines 327 to 332 at b2f594e.) |
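The settings themselves were not captured above, but a hedged sketch of this kind of configuration (a short connect timeout plus retries with backoff; the keys exist in dask's config schema, the values here are made up) might look like:

```python
import dask

# Illustrative values only: fail connection attempts fast, then retry with
# backoff instead of letting one failed connect kill the whole job.
dask.config.set({
    "distributed.comm.timeouts.connect": "10s",
    "distributed.comm.retry.count": 3,
})
```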
@michaelnarodovitch thanks for sharing that. I have just tried running your suggested config, and am finding that the issue still occurs (but much less often). My code does a lot of merging towards the end of the job, where I am still getting … This loss of work is very time-expensive for me (~1h each time), so I will try to find time to investigate further. Additionally, my scheduler logs are getting hammered with the below error message ~35 times per second, which may or may not be related (these appear to relate to the diagnostics dashboard):
Within the scheduler logs there are also a few of these below, which appear to be related to removing the workers that throw up the …
It may be worth noting that I am exclusively running a LocalCluster with 50-60 workers and a total RAM of ~1 TB. |
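For context, a minimal sketch of the kind of setup being described; the sizing here is an assumption derived from the comment (50 workers sharing roughly 1 TB of RAM), not the commenter's actual code.

```python
from dask.distributed import Client, LocalCluster

# Roughly 50 workers x 20 GB each ~= 1 TB total, matching the description.
cluster = LocalCluster(n_workers=50, threads_per_worker=2, memory_limit="20GB")
client = Client(cluster)
print(client)
```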
This might be fixed by #4167; can you try that branch and see if it works for you? |
Has anyone here found that …? My workers continue to time out attempting to communicate with each other on a 50-node LocalCluster. |
@KrishanBhasin can you post a traceback to see where the timeout occurs? |
Hey @fjetter … Second: … |
Don't worry, I didn't even notice anything. I was just a bit upset that the issue was not resolved :)
Glad to hear! |
Hi @fjetter, regarding the timeout errors that @KrishanBhasin mentioned above, here is the traceback (obtained while using …
|
This traceback looks "ordinary" and this might just be an ordinary timeout; especially if the cluster is under heavy load this might just be expected behaviour. There is an ongoing discussion about increasing the default timeout value to something larger, see #4228. You might want to try increasing the value and see if this resolves the problem (try 30s or 60s if your cluster is under extreme stress). |
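For reference, a sketch of raising that timeout (the 60s value is just the one suggested above); the environment-variable form is handy when you cannot edit config files on the workers:

```python
import dask

# Raise the connect timeout programmatically before creating the client...
dask.config.set({"distributed.comm.timeouts.connect": "60s"})

# ...or export it in the environment of every scheduler and worker process:
#   DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="60s"
```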
Is there a way to identify a deadlock situation that is related to network timeouts? Is this a typical log for such a situation? Our computation suddenly halts and does not seem to move forward or backwards. Is there a way to recover from such situations, e.g. by killing a specific worker, or better, by timing something out? When I kill a random worker, the dependent data gets rebuilt and I progress, e.g., from 125102/189776 to 125514/189776 split-shuffle tasks done. Is this the kind of situation described above, or something different? Does this look normal (I'm seeing similar timeouts):

and many more of this... None of the workers seems to have actually failed on the cluster (here 160 CPUs on 40 workers) |
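One hedged way to look into a stuck-seeming cluster, using only public Client APIs (the scheduler address is a placeholder, and interpreting the output is still manual work):

```python
from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # placeholder address

stacks = client.call_stack()        # what each worker is executing right now
processing = client.processing()    # tasks the scheduler assigned per worker
info = client.scheduler_info()      # basic liveness view of all workers

print(stacks)
print(processing)
print(sorted(info["workers"]))
```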
That sounds like something different. @riedel could you please raise a new issue with a minimal reproducer? |
In case people are still bothered by deadlocks: We've recently merged an important PR addressing a few error handling edge cases which caused unrecoverable deadlocks. Deadlock fix #4784 |
While the following is not a bug we can pinpoint, it really bothers us. After weeks of experimentation and investigation we didn't find a real solution.
We run something like
on 100 workers with 1,500 CPUs on Kubernetes pods.
When it comes to the last re-partition step, dask starts blocking the event loop and the workers start spamming the following logs:
=> 5000 events
=> 26,094 events
We are working around that by setting
so that the job finally finishes, despite the issues in the communication.
Anyway, sometimes workers even start hanging and have to be restarted manually. Also, progress slows down significantly toward the end of the job.
I'm sharing this with the community in the hope that somebody may give pointers on what to try, or even ideas for a resolution.
Environment: