From 2455174496c9c2907e3f53b65145d0613ad56a56 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Fri, 25 Sep 2020 10:12:13 -0500 Subject: [PATCH] Fixed stuck BatchedSend comm As reported in https://github.com/pangeo-data/pangeo/issues/788, users were seeing tasks just not completing. After some debugging, I discovered that the scheduler had assigned the "stuck" tasks to a worker, but the worker never received the message. A bit more digging showed that 1. The message was stuck in the worker BatchedSend comm's buffer 2. The BatchedSend.waker event was clear (awaiting it would wait) 3. The BatchedSend.next_deadline was set I couldn't determine *why*, but this state is consistent with us "missing" a deadline, i.e. the `BatchedSend.next_deadline` is set, but the `_background_send` is already `awaiting` the event. So I'm very shaky on the cause, but I'm hoping that this fixes the issue. Doing some more manual testing. --- distributed/batched.py | 2 +- distributed/tests/test_batched.py | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/distributed/batched.py b/distributed/batched.py index eab57c420e..e3a0d4b785 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -119,7 +119,7 @@ def send(self, msg): self.message_count += 1 self.buffer.append(msg) # Avoid spurious wakeups if possible - if self.next_deadline is None: + if self.next_deadline is None or self.next_deadline < self.loop.time(): self.waker.set() @gen.coroutine diff --git a/distributed/tests/test_batched.py b/distributed/tests/test_batched.py index a288a25bbb..a2c12ec958 100644 --- a/distributed/tests/test_batched.py +++ b/distributed/tests/test_batched.py @@ -253,3 +253,20 @@ async def test_serializers(): with pytest.raises(TimeoutError): msg = await asyncio.wait_for(comm.read(), 0.1) + + +@pytest.mark.asyncio +async def test_missed_deadline(): + async with EchoServer() as e: + comm = await connect(e.address) + b = BatchedSend(interval=10) + b.start(comm) + + await asyncio.sleep(0.020) + # We've missed the deadline. + b.next_deadline = b.loop.time() - b.interval + b.send("hello") + b.send("world") + await asyncio.sleep(0.020) + assert len(b.buffer) == 0 + await b.close()