Skip to content

Commit

Permalink
comm: close comm on low-level errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jochen-ott-by committed Nov 12, 2020
1 parent aa8e2dd commit 08e5553
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 17 deletions.
22 changes: 17 additions & 5 deletions distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ async def read(self, deserializers=None):
self.stream = None
if not shutting_down():
convert_stream_closed_error(self, e)
except Exception:
# Some OSError or another "low-level" exception. We do not really know what
# was already read from the underlying socket, so it is not even safe to retry
# here using the same stream. The only safe thing to do is to abort.
# (See also GitHub #4133).
self.abort()
raise
else:
try:
msg = await from_frames(
Expand Down Expand Up @@ -253,13 +260,18 @@ async def write(self, msg, serializers=None, on_error="message"):
await future
bytes_since_last_yield = 0
except StreamClosedError as e:
stream = None
convert_stream_closed_error(self, e)
except TypeError as e:
self.stream = None
if not shutting_down():
convert_stream_closed_error(self, e)
except Exception:
# Some OSError or another "low-level" exception. We do not really know what
# was already written to the underlying socket, so it is not even safe to retry
# here using the same stream. The only safe thing to do is to abort.
# (See also GitHub #4133).
if stream._write_buffer is None:
logger.info("tried to write message %s on closed stream", msg)
else:
raise
self.abort()
raise

return sum(lengths)

Expand Down
19 changes: 19 additions & 0 deletions distributed/comm/tests/test_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,25 @@ async def handle_comm(comm):
await comm.close()


@pytest.mark.asyncio
async def test_comm_closed_on_buffer_error():
    """A low-level error raised while writing must close the comm.

    Internal errors from ``comm.stream.write``, such as ``BufferError``,
    should propagate to the caller and leave the stream closed instead of
    available for re-use. See GitHub #4133.
    """
    reader, writer = await get_tcp_comm_pair()

    def _write(data):
        # Stand-in for the stream's write that always fails at a low level.
        raise BufferError

    try:
        writer.stream.write = _write
        with pytest.raises(BufferError):
            await writer.write("x")
        # After the failed write the comm is expected to have dropped its
        # stream so that it cannot be re-used.
        assert writer.stream is None
    finally:
        # Close both ends even when a check above fails, so a failing run
        # does not leave open comms behind for later tests.
        await reader.close()
        await writer.close()


#
# Various stress tests
#
Expand Down
16 changes: 11 additions & 5 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,12 @@ async def handle_comm(self, comm, shutting_down=shutting_down):
)
break
except Exception as e:
logger.exception(e)
await comm.write(error_message(e, status="uncaught-error"))
continue
logger.exception("Exception while reading from %s", address)
if comm.closed():
raise
else:
await comm.write(error_message(e, status="uncaught-error"))
continue
if not isinstance(msg, dict):
raise TypeError(
"Bad message type. Expected dict, got\n " + str(msg)
Expand Down Expand Up @@ -531,8 +534,11 @@ async def handle_comm(self, comm, shutting_down=shutting_down):
logger.info("Lost connection to %r: %s", address, e)
break
except Exception as e:
logger.exception(e)
result = error_message(e, status="uncaught-error")
logger.exception("Exception while handling op %s", op)
if comm.closed():
raise
else:
result = error_message(e, status="uncaught-error")

# result is not type stable:
# when LHS is not Status then RHS must not be Status or it raises.
Expand Down
8 changes: 1 addition & 7 deletions distributed/tests/test_batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,5 @@ def raise_buffererror(*args, **kwargs):
b.send("hello")
b.send("world")
await asyncio.sleep(0.020)
result = await comm.read()
assert result == ("hello", "hello", "world")

b.send("raises when flushed")
await asyncio.sleep(0.020) # CommClosedError hit in callback

with pytest.raises(CommClosedError):
b.send("raises when sent")
await comm.read()

0 comments on commit 08e5553

Please sign in to comment.