
Fixed stuck BatchedSend comm #4128

Closed
wants to merge 1 commit

Conversation

@TomAugspurger (Member) commented Sep 25, 2020

As reported in pangeo-data/pangeo#788, users
were seeing tasks just not completing. After some debugging, I
discovered that the scheduler had assigned the "stuck" tasks to a worker,
but the worker never received the message. A bit more digging showed
that

  1. The message was stuck in the worker BatchedSend comm's buffer
  2. The BatchedSend.waker event was clear (awaiting it would block)
  3. The BatchedSend.next_deadline was set

I couldn't determine *why*, but this state is consistent with us
"missing" a deadline, i.e. the `BatchedSend.next_deadline` is set, but
the `_background_send` is already awaiting the event (maybe with no timeout?).
So I'm very shaky on the cause, but I'm hoping that this fixes the issue. Doing
some more manual testing.
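
For what it's worth, the failure mode described above amounts to the background loop waiting on the waker event without honouring an already-set deadline. Below is a minimal sketch of the intended behaviour using asyncio; the attribute names mirror those mentioned above (`please_stop`, `waker`, `next_deadline`, `buffer`, `comm`), `interval` is an assumed flush interval, and this is not the actual `distributed.batched` code:

    import asyncio
    from timeit import default_timer


    async def _background_send_sketch(batched):
        # Sketch only: wait on the waker event *with* a timeout derived from
        # next_deadline, so a deadline that is already set cannot be missed.
        while not batched.please_stop:
            if batched.next_deadline is not None:
                timeout = max(0, batched.next_deadline - default_timer())
            else:
                timeout = None
            try:
                await asyncio.wait_for(batched.waker.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                pass  # deadline passed; fall through and flush the buffer
            batched.waker.clear()
            if not batched.buffer:
                batched.next_deadline = None
                continue
            payload, batched.buffer = batched.buffer, []
            batched.next_deadline = default_timer() + batched.interval
            await batched.comm.write(payload)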

@mrocklin I'm extra confused here, since this code hasn't been touched in a while. I'd have thought we would have seen more reports of this, but I don't recall any.

@TomAugspurger (Member Author) commented Sep 25, 2020

Oh, I should have looked at the logs sooner. From the scheduler pod:

dask_gateway.dask_cli - INFO - Requesting scale to 30 workers from 1
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/batched.py", line 93, in _background_send
    payload, serializers=self.serializers, on_error="raise"
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 241, in write
    stream.write(b)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/iostream.py", line 553, in write
    self._write_buffer.append(data)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/iostream.py", line 177, in append
    b += data  # type: ignore
BufferError: Existing exports of data: object cannot be re-sized

which does sound familiar (#2519, #1704, #2506). This is with tornado 6.0.4.
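
For context, that `BufferError` is what CPython raises when a `bytearray` is resized while a `memoryview` still exports its buffer, which is roughly the situation in the tornado traceback above (appending to the write buffer while a zero-copy view of it is outstanding). A standalone illustration:

    # Resizing a bytearray with a live memoryview export raises BufferError.
    ba = bytearray(b"payload")
    view = memoryview(ba)   # live export of ba's underlying buffer
    try:
        ba += b"more data"  # attempts to resize ba in place
    except BufferError as e:
        print(e)            # "Existing exports of data: object cannot be re-sized"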

So this fix is unlikely to address the original issue (I think). But it may still be worthwhile? I'm not sure.

We instead need a way to recover from an exception. I'm able to `client.run_on_scheduler` a function that restarts it, so the BatchedSend comm itself seems to be OK. Maybe we retry a certain number of times? And when we retry and fail multiple times, we try opening a new comm to the worker and copy the current buffer over to it?
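
A rough sketch of the retry idea; `write_with_retries` is a hypothetical helper, not part of distributed's API, and `comm.write` is called as in the traceback above:

    import asyncio
    import logging

    logger = logging.getLogger(__name__)


    async def write_with_retries(comm, payload, serializers=None, retries=3, delay=0.1):
        # Sketch only: retry a failing batched write a few times with a short
        # backoff before giving up and letting the caller decide what to do
        # (e.g. open a new comm and move the current buffer over to it).
        last_exc = None
        for attempt in range(retries):
            try:
                return await comm.write(payload, serializers=serializers, on_error="raise")
            except Exception as exc:
                last_exc = exc
                logger.exception("Error in batched write (attempt %d/%d)", attempt + 1, retries)
                await asyncio.sleep(delay * (attempt + 1))
        raise last_exc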

@TomAugspurger (Member Author)

John's been looking into this a bit at #4080 (comment).

@mrocklin (Member)

OK, so things failed in the batched_send, so the next send didn't trigger? We should be robust to this, which I take it this PR helps with, but I honestly wouldn't expect things to finish if the error in #4080 persists. We're not robust to dropped messages.

One thing to check here is if an older version of distributed has this problem, ideally something just before all of the serialization / bytes / memoryviews changes that went in a few months ago. Git bisect might help highlight an issue here.

@mrocklin (Member)

(if it's easy to reproduce that is)

@TomAugspurger (Member Author)

OK, so things failed in the batched_send, so the next send didn't trigger?

I think (but I'm pretty hazy here) the rough sequence was:

        while not self.please_stop:
            try:
                nbytes = yield self.comm.write(
                    payload, serializers=self.serializers, on_error="raise"  # this raised a BufferError
                )
                ...
            except CommClosedError as e:
                logger.info("Batched Comm Closed: %s", e)
                break
            except Exception:
                logger.exception("Error in batched write")
                break   # <------------ so we break out of the while loop
        # and then we set self.stopped
        self.stopped.set()

We should be robust to this, which I take it this PR helps with,

I'm less sure that it actually helps with things. I'll need to dig more. But what would help is retrying.

but I honestly wouldn't expect things to finish if the error in #4080 persists

The error is transient I think, since when I manually wake up the Comm things finish fine. I think that tornado's internal buffer is drained and we're OK to go again?

I'd consider this a WIP for now. I haven't been able to make a small reproducer yet but will give it some more time on Monday.

@mnarodovitch

I gave it a try with the reproducer reported in #4080:

import dask.bag as bag

b = bag.from_sequence([1]*100_000, partition_size=1)
bb = b.map_partitions(lambda *_, **__: [b'1'*2**20]*5).persist()
bc = bb.repartition(1000).persist()

Unfortunately, this does not fix the issue. Workers still lose their connections, as reported in #4080.

@mrocklin (Member)

Thanks @michaelnarodovitch. Would you be willing to use git bisect to see if there was a time when this used to work, and maybe try to isolate which commit caused the issue?

@mnarodovitch

For me, there was no time when this worked. Our production code still works around the issue with aggressive retries:

distributed.comm.timeout=80s
distributed.scheduler.allowed-failures=999

Ran the reproducer with dask from `pip install git+https://github.com/TomAugspurger/distributed.git@pangeo-788`. Tried with some other versions back then, as reported in #4080.
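
For reference, workaround settings like the two above can be applied programmatically with `dask.config.set`; key names follow dask's config schema, and the comm timeout shorthand above presumably maps onto one of the `distributed.comm.timeouts.*` keys, so treat the exact keys here as an assumption:

    import dask

    # Values mirror the workaround above; adjust key names to your dask version.
    dask.config.set({
        "distributed.comm.timeouts.connect": "80s",
        "distributed.scheduler.allowed-failures": 999,
    })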

@mrocklin (Member) commented Sep 26, 2020 via email

@mnarodovitch commented Sep 27, 2020

Folks were digging around in the serialization code a bit this summer. If you're willing, I would encourage you to go back about 6-12 months and see if things worked then.

Same problem with dask/distributed 2.7.0 and dask/distributed 2.0.0.

@mrocklin (Member) commented Sep 28, 2020 via email

@TomAugspurger (Member Author) commented Sep 28, 2020

@mrocklin, at a high level, what do you expect to happen when a `Scheduler.stream_comms[worker].send(msg)` fails? We handle the `CommClosedError` (aka "this worker is probably gone forever") well by removing the worker.

For other exceptions like this BufferError, I'd expect us to retry some number of times. But we should likely eventually give up and handle this like the CommClosedError by closing the worker?

Edit: Hmm, this is maybe complicated by the fact that we don't actually await anything from BatchedSend._background_send; it just happens in the background. So even if we were to raise from BatchedSend._background_send, no one would be there to catch it.
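
To illustrate the "no one is there to catch it" point with plain asyncio (a generic sketch, not how distributed actually schedules `_background_send`): an exception raised inside a fire-and-forget task never propagates to the code that started it unless someone inspects the task.

    import asyncio


    async def background():
        raise RuntimeError("nobody awaits me")


    async def main():
        task = asyncio.ensure_future(background())  # fire-and-forget, like a background send loop
        await asyncio.sleep(0.1)
        # The exception did not propagate to us; we only see it if we ask the task.
        if task.done() and task.exception() is not None:
            print("background task failed:", task.exception())


    asyncio.run(main())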

@mrocklin (Member)

For other exceptions like this BufferError, I'd expect us to retry some number of times. But we should likely eventually give up and handle this like the CommClosedError by closing the worker?

Yes, that would make sense to me.

Edit: Hmm, this is maybe complicated by the fact that we don't actually await anything from BatchedSend._background_send; it just happens in the background. So even if we were to raise from BatchedSend._background_send, no one would be there to catch it.

Ah, indeed. Well, maybe we can still retry there, and if that fails we close the underlying Comm, so that the next time the BatchedSend tries to write it fails and restarts the worker?
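
A sketch of that suggestion, assuming the comm exposes an `abort()` (as the distributed `Comm` interface does) and reusing the hypothetical attribute names from the earlier sketches:

    import logging

    logger = logging.getLogger(__name__)


    async def flush_or_abort(batched, payload, retries=3):
        # Sketch only: retry the failing write, then abort the underlying comm so
        # the next BatchedSend write fails loudly (and the worker gets restarted)
        # instead of the buffer silently stalling.
        for attempt in range(retries):
            try:
                await batched.comm.write(payload, serializers=batched.serializers, on_error="raise")
                return True
            except Exception:
                logger.exception("Error in batched write (attempt %d/%d)", attempt + 1, retries)
        batched.comm.abort()
        batched.please_stop = True
        return False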

@TomAugspurger (Member Author)

I'm going to close this for now in favor of #4135.

This PR really only fixed things for the case where we missed a deadline (failed to await `self.waker.wait(self.next_deadline)` by the time `next_deadline` passed). We don't have any examples of this actually occurring when there isn't a deeper issue like in #4135, so this is just a theoretical concern. Let's revisit it if we get an actual example where we miss a deadline.
