comm: close comm on low-level errors #4239
Conversation
```diff
@@ -297,11 +297,5 @@ def raise_buffererror(*args, **kwargs):
     b.send("hello")
     b.send("world")
     await asyncio.sleep(0.020)
     result = await comm.read()
```
@TomAugspurger You wrote this test I touched here, so you may want to look at it: on BufferError, the first comm.read() already fails.
The test now catches another exception.
The original test was testing for
distributed/batched.py, lines 140 to 146 (at eda9bcc):

```python
    def send(self, msg):
        """Schedule a message for sending to the other side

        This completes quickly and synchronously
        """
        if self.comm is not None and self.comm.closed():
            raise CommClosedError
```
but with your implementation we're now hitting
distributed/comm/tcp.py, lines 181 to 184 (at eda9bcc):

```python
    async def read(self, deserializers=None):
        stream = self.stream
        if stream is None:
            raise CommClosedError
```
After looking into it a bit, I think this is a clash of philosophy and this test actually tests the opposite of what you are proposing here.

You suggest that e.g. a BufferError must not be retried, but the implementation introduced in #4135 explicitly implements a retry mechanism for these failures.

What happens is: the first instance of the mock raises a BufferError, which was supposed to be retried. Your implementation now, however, immediately closes the comm before anything can be retried.

If we allow for retries in other exception cases, the "solution" would be to change the mock to use a different exception class than BufferError. However, I think the intention was to explicitly deal with a buffer error :/
> After looking into it a bit, I think this is a clash of philosophy and this test actually tests the opposite of what you are proposing here.

I think the retry code here is deeply flawed, for one simple reason: one cannot safely retry a write if one does not know how much was written in the previous attempt. So I'd rather go ahead and delete all the retry code for the batched send. It is just plain wrong to retry here.

> You suggest that e.g. a BufferError must not be retried but the implementation introduced in #4135 explicitly implements a retry mechanism for these failures.

Yes, retrying is just wrong.
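The partial-write hazard can be illustrated with a small self-contained sketch (hypothetical classes for illustration only, not distributed's actual comm code): if an error surfaces after some bytes of a frame already reached the socket, naively resending the whole frame leaves duplicated bytes on the wire.

```python
import io


class FlakySocket:
    """Toy socket that raises after accepting only `limit` bytes of the
    first oversized frame (hypothetical, for illustration only)."""

    def __init__(self, limit):
        self.buf = io.BytesIO()  # stands in for "bytes on the wire"
        self.limit = limit
        self.failed = False

    def send(self, data):
        if not self.failed and len(data) > self.limit:
            # A partial write happens before the error surfaces.
            self.buf.write(data[: self.limit])
            self.failed = True
            raise BufferError("low-level write failed mid-frame")
        self.buf.write(data)


def naive_retry_send(sock, frame):
    # Retrying the whole frame after a failure, without knowing how much
    # was already written, is exactly the unsafe pattern discussed above.
    try:
        sock.send(frame)
    except BufferError:
        sock.send(frame)


sock = FlakySocket(limit=3)
naive_retry_send(sock, b"hello")
print(sock.buf.getvalue())  # b'helhello': garbage on the comm
```

The peer now has to parse `b'helhello'` as a frame and desynchronizes, which is why the only safe reaction to such an error is to close the connection.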
I added a second commit which removes the retries in distributed.batched.

I rebased on the latest master, so tests have a chance to run. The only build failure is a known issue (see #4173) that only affected one of the six builds, so I think we can consider all tests passed for the purpose of this PR.
force-pushed from 08e5553 to e70a0b1
force-pushed from e70a0b1 to 06a3f2a
I believe the reasoning in here is fine. We should, however, double check that this doesn't cause further trouble. The retry implementation apparently fixed things for the pangeo people, see pangeo-data/pangeo#788. For me there are two questions connected to this change:

1. Is the failure / closure of the BatchedSend being dealt with gracefully? (E.g. worker<->scheduler should just trigger a reconnect.)
2. What happens to the buffered payload in the failure case? If it is lost, what would this mean? I have the impression that most of what we're transmitting over batched send relies on guaranteed delivery (although this is not true, not even now).

I would appreciate another opinion, cc @TomAugspurger @mrocklin @jrbourbeau

I don't really have anything to add here, other than that BatchedSend being fire-and-forget seemed a bit strange. It wasn't clear to me how the system handled messages that were lost in the buffer.
Dask would not be robust to dropped messages along the BatchedSend channel. I am generally curious why buffers are being dropped. If they are, resending seems like a good idea (maybe?) or otherwise we could disconnect and reconnect the worker and hope that the general worker-loss resilience mechanisms handle things.
@TomAugspurger who had the connection issues in Pangeo? Would they be open to testing out this alternative fix?

The failing workload is at pangeo-data/pangeo#788 (though it failed maybe 1/5 or 1/10 times). If anyone has time & interest they can run the example on pangeo's binder, at https://binder.pangeo.io, pointing to this branch, similar to https://github.com/TomAugspurger/pangeo-binder-test/blob/master/binder/Dockerfile
Is there anything left to do here?

FWIW, I think we can merge this.

@fjetter did you have more thoughts?
I think that it would be useful for someone to try the pangeo workload that Tom mentions here: #4239 (comment) Maybe @rabernat has someone he can point to this?

The example in pangeo-data/pangeo#788 originated with @paigem. Maybe she could try to give it a spin?

@rabernat I'm happy to test out this PR with the example from pangeo-data/pangeo#788. I just tried to recreate the error from that example with no luck, but was using a different dataset, so I'll have another try tomorrow. @TomAugspurger you mention testing the example on Pangeo binder - is that preferable to running it on Pangeo Cloud (where the original error occurred)?
With the binder at binder.pangeo.io you can test this branch by making a dockerfile like https://github.com/TomAugspurger/pangeo-binder-test/blob/master/binder/Dockerfile. We can't easily test unreleased packages on pangeo cloud. But in the end they're the same, from this PR's point of view. Just a Dask cluster created with Dask Gateway.

Thanks for the tip @TomAugspurger. I started up a Pangeo Binder instance pointing to this branch, but I don't seem to be able to start up a Dask Gateway cluster. I get a "GatewayClusterError: Cluster failed to start". Not sure why it's not working - the notebook I'm running and the Dockerfile can be found here: paigem/test_dask_comms. Sorry I'm not of more help at the moment!
I can now start up a Dask Gateway cluster (thanks @TomAugspurger) with dask-gateway version 0.9.0 as discussed in the Pangeo binder docs. But I get a CancelledError when trying to run even a basic example calculation:

```python
import dask.array as dsa

a = dsa.ones((1000, 1000, 100))
print(a.mean().compute())
```

However, when I start up a binder instance with the default Dask-distributed (i.e. I don't point it to this PR branch), then I am able to run the above (as well as more complicated calculations) just fine. So it seems like the CancelledError I am getting is either due to how I'm setting up my binder instance to point to this branch or an issue with this PR. It's likely the former, since I'm a newbie and still struggling to understand how to debug here, but happy to provide more details if someone is able to help me figure out why I'm getting a CancelledError. The Dockerfile I'm using to set up Binder can be found here.

Ah, most likely dask needs to be on master as well.
@TomAugspurger you're right, it looks like dask is not on master in my binder instance. How do I ensure that my binder uses the dask master branch, with this unreleased version of distributed? Do I need to add a line to my Dockerfile?
@TomAugspurger Sorry I haven't been of any help here. I'm happy to keep trying at this, as I'm learning a lot, but I also don't want to stall this PR any further. I'm still having trouble getting the correct version of both dask and distributed into my Binder instance, so if you or anyone else has any specific pointers for how to make this work, I'm happy to keep at it. Dockerfile contents:
Result of client.get_versions():

Probably not worth holding the PR up for that test, if it's otherwise ready to go.
Ok, I finally figured this out and am now able to test this PR on Pangeo Binder. Apologies for being so slow to get this up and running @TomAugspurger. I am still getting similar behavior as in pangeo-data/pangeo#788, where one worker seems to have all the tasks and the computation stalls, though it doesn't look like the worker is out of memory in this case (see screenshot below). However, I only got this behavior twice in about 20 tries. Here is the code I am running:

```python
from dask_gateway import Gateway

gateway = Gateway()
options = gateway.cluster_options()
options.worker_memory = 6
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=1, maximum=30)
```

```python
import gcsfs
import dask.array as dsa
import os

scratch_path = 'gs://pangeo-scratch/paigem/'
gcs = gcsfs.GCSFileSystem(requester_pays=True)
varname = 'SHF_2'
url = f'{scratch_path}/CESM_POP_hires_control/{varname}_target.zarr'
target_mapper = gcs.get_mapper(url)
target_array = dsa.from_zarr(target_mapper)
target_array.mean(axis=0).compute()
```

Dockerfile and dask/distributed versions:

Dockerfile contents:
Result of client.get_versions():
Ok, so how do we proceed here? I think it's pretty obvious that the retry code in batched_send is flawed in the general case, because it can lead to garbage on the comm if retrying on any exception after a partial write. For me, this is enough reason to remove it. This is true even if a problem reappears where this retry helped: instead, we should fix the problem properly. But I sense this is not the consensus here? So what would be required to get this merged?
@paigem to be clear, are you saying that before things worked well, and with this PR things work less well?

My interpretation of @paigem's comment is that she was finally able to test this PR via a properly-configured binder. She found that this PR did not resolve the intermittent failure issue (with tasks all stuck on one worker) we experienced with earlier versions of distributed. edit: whether it works better or worse is hard to say, because it is so intermittent.
@jochen-ott-by it sounds like you are confident that this is an improvement. Correct?

I suspect that the problem here is that no one feels comfortable making a final decision. Historically I was the one who handled tricky comms issues and I think people tended to wait until I weighed in. That probably makes less sense now that I've been sidelined a bit recently organizing other things. My apologies, and thank you @jochen-ott-by for continuing to follow up here. If this PR doesn't negatively impact the Pangeo workload (see question above to @paigem), then I think that we're good to go and should merge. If this PR does negatively impact the Pangeo workload then it might be useful to figure out what that failure looks like (@paigem do you happen to have a traceback?) and see what we can do there.

Got it. Just to be clear, the current released version of distributed does resolve that issue, correct? Merging this in would degrade performance for Pangeo folks?
I reported the original issue in pangeo-data/pangeo#788. There is no traceback to report, because the computation never finishes or errors.

I thought it was resolved by #4135. I don't have enough knowledge to understand how this PR interacts with that one. This problem is not particularly severe, since it only happens on 1 out of 20 trials. If merging this PR helps move development forward, go for it! But it would be good to keep track of this lingering issue somewhere.

I was reasonably confident that #4135 fixed the issue with computations completely stalling, though it's hard to say for sure. So if that's correct then a stalled computation on this branch would be a regression. That said, I really don't know how confident I should be that this was actually fixed by #4135. If we're able to reproduce the stalled computation with distributed 2.30.1, then we can conclude that #4135 didn't fix the original issue, and can proceed with this. Or we determine that this PR is a net improvement and move forward with it anyway, and try to re-debug the stalled computation issue afterwards.
Maybe, when @paigem comes online (she's down under 🙃), she could share the binder repo / url she is using to debug this. It may be useful to have a shared reality where the problem is reproducible.

Is the update from #4135 currently running on Pangeo Cloud? If so, then my thinking was also that this comms issue was resolved in that update, as I wasn't able to reproduce the error. However, I probably only reran the calculation ~10 times, which isn't conclusive. If it would be useful, I could test the #4135 version again either on Pangeo Cloud or in a Binder instance, but I don't want to delay this PR more than I already have... 🙂 As @rabernat suggested, here is the url I used to start my Pangeo binder

Also, quick question to verify that I had the correct Dask/distributed versions on my binder: when running
Yes, I am confident it is an improvement: it removes an execution path that potentially blocks forever. One execution path it removes is the "unusual exception from socket.send", which would look like this:
So overall, there is the situation that

Note that this fix is only about handling low-level comm errors by more aggressively closing the connection; it does not fix the actual source of the first error assumed above. I hoped these exceptions would become more visible, though. Even if what I described is not happening right now "in the wild", I still think this change would be a net improvement, because I think the distributed code should be robust against such cases.
This is a difficult decision. We're choosing between two incomplete solutions to a problem that affects two different groups differently. We're also making that decision without diving into and fully understanding the underlying issue. Diagnosing the underlying issue requires significant setup work to reproduce. Given that, what do we do?

This seems like the least bad option to me. I'm going to go ahead with merging this. It looks like this will degrade performance a bit on a Pangeo workload, but it sets us up to better understand and resolve the problem in the future I think. @jochen-ott-by if Pangeo folks come back with a traceback I might try to engage you in thinking about a solution in the future if you're ok with that. @paigem thank you for the small example. That's very helpful. If you're able to verify that the problem also exists without private data that would make it easier for other maintainers to reproduce this and see what is going on.
Also, while you may not get a traceback in Jupyter, it might still be helpful to get logs from the workers (which may include tracebacks). I think that Dask-Gateway will get these for you if you run something like cluster.get_logs()

I was going to ask elsewhere, but it seems to me a good idea to be able to set scheduler/worker logging to DEBUG from the client. This doesn't yet exist, right (without a hand-crafted run())?

I suspect that a failed network connection will show up at the ERROR level. My read from @rabernat's comment above is that they're not seeing a traceback in Jupyter, but aren't yet checking the worker logs.
Not that I know of, but yes, I would prefer to continue that conversation elsewhere if that's ok.
The data are accessible from Pangeo Binder, so they are not fully private.

Does this require GCS? Does it require Zarr? Can this be done with random data, or is it something about using those libraries at the same time that causes the issue?

Thanks for merging this @mrocklin! If the pangeo issue bubbles up again, feel free to ping me as well, in case @jochen-ott-by is not available.
Close comm.stream on low-level errors, such as BufferError. In such cases, we do not really know what was written to / read from the underlying socket, so the only safe way forward is to immediately close the connection. See also #4133.
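The policy in this PR can be sketched as follows (a minimal illustration with made-up class names, not distributed's actual tcp.py implementation): any unexpected low-level error during a write abandons the stream, so subsequent operations fail fast with CommClosedError instead of touching a connection in an unknown state.

```python
class CommClosedError(IOError):
    """Raised when an operation is attempted on a closed comm."""


class BrokenStream:
    """Stub stream whose send always fails mid-frame (for illustration)."""

    def send(self, data):
        raise BufferError("low-level write failed")

    def close(self):
        pass


class Comm:
    """Sketch of the close-on-error policy: hypothetical, simplified."""

    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        if self.stream is None:
            raise CommClosedError("comm is closed")
        try:
            self.stream.send(data)
        except Exception as e:
            # We cannot know how much reached the socket, so the only
            # safe way forward is to close the connection immediately.
            self.abort()
            raise CommClosedError(f"comm closed after low-level error: {e}")

    def abort(self):
        stream, self.stream = self.stream, None
        if stream is not None:
            stream.close()


comm = Comm(BrokenStream())
for attempt in range(2):
    try:
        comm.write(b"hello")
    except CommClosedError as e:
        print(f"attempt {attempt}: {e}")
```

The first write surfaces the low-level failure as a CommClosedError and drops the stream; the second fails immediately because the comm is already closed, which is the fail-fast behavior the PR argues for.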