Computation deadlocks after inter worker communication error #4133

Closed
fjetter opened this issue Sep 28, 2020 · 15 comments · Fixed by #4784

Comments

@fjetter
Member

fjetter commented Sep 28, 2020

Before I start, sincere apologies for not having a reproducible example, but the issue is a bit weird and hard to reproduce. I'm mostly asking here to see if anybody has an idea what might be going on.

What happened:

We have seen multiple times now that some individual computations are stuck in the processing state and are not being processed. A closer investigation revealed in all occurrences that the worker the processing task is on, let's call it WorkerA, is in the process of fetching the task's dependencies from WorkerB. While fetching these, an exception seems to be triggered on WorkerB and for some reason this exception is lost. In particular, WorkerA never gets any signal that something went wrong, so it waits indefinitely for the dependency to arrive while WorkerB has already forgotten about the error. This scenario ultimately leads to a deadlock where a single dependency fetch blocks the cluster, and the cluster never actually self-heals. The last time I observed this issue, the computation was blocked for about 18 hours. After manually closing all connections, the workers retried and the graph finished successfully.

I've seen this deadlock happen multiple times already, but only during the last instance could I glimpse an error log connecting it to issue #4130, where the tornado stream seems to cause this error. Regardless of what caused the error, I believe there is an issue on the distributed side, since the error was not properly propagated, the connections were not closed, etc., suggesting to WorkerA that everything was fine.

Traceback
Traceback (most recent call last):
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/distributed/core.py", line 513, in handle_comm
    result = await result
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/distributed/worker.py", line 1284, in get_data
    compressed = await comm.write(msg, serializers=serializers)
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/distributed/comm/tcp.py", line 243, in write
    future = stream.write(frame)
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/tornado/iostream.py", line 553, in write
    self._write_buffer.append(data)
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/tornado/iostream.py", line 177, in append
    b += data  # type: ignore
BufferError: Existing exports of data: object cannot be re-sized

The above traceback shows that the error appeared while WorkerB was handling the request, executing the get_data handler, and ultimately raising the exception:

try:
    result = handler(comm, **msg)
    if inspect.isawaitable(result):
        result = asyncio.ensure_future(result)
        self._ongoing_coroutines.add(result)
        result = await result
except (CommClosedError, CancelledError) as e:
    if self.status == Status.running:
        logger.info("Lost connection to %r: %s", address, e)
    break
except Exception as e:
    logger.exception(e)
    result = error_message(e, status="uncaught-error")

In this code section we can see that the exception is caught and logged. Following the code, an error result is generated which should be sent back to WorkerA, causing the dependency fetch to fail and trigger a retry:

if reply and not is_dont_reply:
    try:
        await comm.write(result, serializers=serializers)
    except (EnvironmentError, TypeError) as e:
        logger.debug(
            "Lost connection to %r while sending result for op %r: %s",
            address,
            op,
            e,
        )
        break

Even if this fails (debug logs not enabled), the connection should eventually be closed/aborted and removed from the server's tracking:

finally:
    del self._comms[comm]
    if not shutting_down() and not comm.closed():
        try:
            comm.abort()
        except Exception as e:
            logger.error(
                "Failed while closing connection to %r: %s", address, e
            )

which never happens. I checked on the still-running workers and the self._comms attribute was still populated with the seemingly broken connection.

Why was this finally block never executed? Why was the reply never received by WorkerA?

Just looking at the code, I would assume this to be implemented robustly already, but it seems I'm missing something. Does anybody have a clue about what might be going on?

Environment

  • Distributed version: 2.20.0
  • Python version: Py 3.6
  • Operating System: Debian
  • Tornado 6.0.4
  • Install method (conda, pip, source): pip
@TomAugspurger
Member

TomAugspurger commented Sep 28, 2020

Good timing, @quasiben and I are looking into this right now! xref #4128 and linked pangeo issues.

One thing I'm trying to disambiguate: connection errors like those seen in #4130 versus the BufferError: Existing exports of data: object cannot be re-sized that pangeo is running into (which is also mentioned in #4128). It sounds like you're maybe hitting the pangeo issue. For the pangeo issue, the error occurs in

@gen.coroutine
def _background_send(self):
    while not self.please_stop:
        try:
            yield self.waker.wait(self.next_deadline)
            self.waker.clear()
        except gen.TimeoutError:
            pass
        if not self.buffer:
            # Nothing to send
            self.next_deadline = None
            continue
        if self.next_deadline is not None and self.loop.time() < self.next_deadline:
            # Send interval not expired yet
            continue
        payload, self.buffer = self.buffer, []
        self.batch_count += 1
        self.next_deadline = self.loop.time() + self.interval
        try:
            nbytes = yield self.comm.write(
                payload, serializers=self.serializers, on_error="raise"
            )
            if nbytes < 1e6:
                self.recent_message_log.append(payload)
            else:
                self.recent_message_log.append("large-message")
            self.byte_count += nbytes
        except CommClosedError as e:
            logger.info("Batched Comm Closed: %s", e)
            break
        except Exception:
            logger.exception("Error in batched write")
            break
        finally:
            payload = None  # lose ref

    self.stopped.set()

The BatchedSend._background_send is not robust to failures (and I think it swallows exceptions). Right now I'm going through tornado issues / IOStream to construct a reproducer, and then I'll work on making BatchedSend more robust.
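
To illustrate the kind of robustness I have in mind, here is a toy sketch (not distributed's actual BatchedSend; the class and attribute names below are made up): a background sender that records the failure and stops reusing the transport, so callers of send() fail fast instead of the error being silently lost.

import asyncio
import logging

logger = logging.getLogger(__name__)


class ToyBatchedSend:
    """Toy sketch only, not distributed's BatchedSend: background failures are
    recorded and surfaced to callers instead of being swallowed."""

    def __init__(self, write):
        self._write = write          # async callable that sends one payload
        self._queue = None
        self.exception = None        # last background failure, if any
        self._task = None

    def start(self):
        # Must be called from within a running event loop.
        self._queue = asyncio.Queue()
        self._task = asyncio.ensure_future(self._background_send())

    def send(self, msg):
        if self.exception is not None:
            # Fail fast instead of silently queueing onto a broken connection.
            raise self.exception
        self._queue.put_nowait(msg)

    async def _background_send(self):
        while True:
            payload = await self._queue.get()
            try:
                await self._write(payload)
            except Exception as exc:
                logger.exception("Error in background write")
                self.exception = exc  # keep the error visible to send()
                break                 # stop; the transport may be corrupted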

@quasiben
Member

Does this occur with a large number of workers?

One thing which would be super helpful (though fairly challenging) is getting a reproducer for this (these) issues.

@fjetter
Member Author

fjetter commented Sep 28, 2020

I suspected the BatchedSend myself, but I was working under the impression that everything in handle_comm was not batched but a plain comm (ICP/TCP/TLS/etc., depending on the setup, of course).

Does this occur with a large number of workers?

No, the last time I observed this we had 10 workers. However, the scheduler and one of the workers had an uptime of about a month (we always have one worker ready to compute and scale up as necessary; the scheduler and worker0 are always-on). Not sure if this is connected, but it seemed worth mentioning.

One thing which would be super helpful (though fairly challenging) is getting a reproducer for this (these) issues.

Yes, I know, but so far I don't have anything to show. As I said, we've been facing this a few times already, and I didn't even know where to start looking because most of the time we didn't even have an error log. I also don't believe #4130 is the only way to trigger this scenario; it's just the latest instance I could observe.

@TomAugspurger
Member

handle_comm was not batched but a plain comm (ICP/TCP/TLS/etc.; depending on the setup, of course)

That sounds right to me, but I'll confirm. Right now my guess is that we have two distinct issues (the handle_comm thing and BatchedSend not gracefully handling exceptions like BufferError).

@fjetter
Member Author

fjetter commented Nov 9, 2020

I've observed a link between this issue and #4080 / #4176, where the handshake triggered this, but I cannot say whether this is the only trigger, and I'm no closer to the root cause itself.

@jochen-ott-by
Contributor

Why was this finally block never executed? Why was the reply never received by WorkerA?

Just looking at the code, I would assume this to be implemented robustly already, but it seems I'm missing something. Does anybody have a clue about what might be going on?

Note that I'm just reading code and trying to follow your analysis, but I think I may have found (part of) the answer to this question, in particular why the "finally" block you quoted is never invoked: I think l. 546 in core.py (as you cited it) runs just fine without an exception, so no exception handling mechanism is triggered and this "finally" is not invoked. (This is possible, e.g., if the serialized exception exceeds 2048 bytes, which would not re-trigger this same BufferError; it is also possible that this BufferError is not hit again here for other reasons, e.g. because a write from another thread finished in the meantime.)

So I'm pretty confident this is the problem: the BufferError leaves the connection in a broken internal state, but it is still re-used. I think the fix is relatively straightforward: be much more aggressive about closing / cleaning up connections on failures. In case of an internal error, the worker should NOT attempt to reply with this error in-band via the (likely already broken) connection. Instead, it should treat the connection as broken beyond repair, log the error and close the connection, relying on retries from a level above this code.
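
A minimal sketch of that idea (not the actual patch; handle_request and its arguments are made-up stand-ins for distributed's handle_comm internals):

import logging

logger = logging.getLogger(__name__)


async def handle_request(comm, handler, msg, serializers=None, op=None):
    # Sketch only: on a low-level failure, abort the connection instead of
    # trying to reply in-band, so the peer sees a closed comm and can retry
    # at a higher level rather than waiting forever on a corrupted stream.
    try:
        result = await handler(comm, **msg)
        await comm.write(result, serializers=serializers)
    except (BufferError, OSError) as exc:
        logger.error("Low-level comm failure for op %r: %s", op, exc)
        comm.abort()  # partial frames may already be on the wire; do not reuse
        raise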

jochen-ott-by added a commit to jochen-ott-by/distributed that referenced this issue Nov 11, 2020
This improves the robustness in case of internal
errors in handlers that leave `comm` in an invalid state,
see also dask#4133.
@fjetter
Member Author

fjetter commented Nov 11, 2020

l. 546 in core.py (as you cited it) runs just fine without an exception

If it was running just fine, it would trigger an exception on the listener side, wouldn't it?

if isinstance(response, dict) and response.get("status") == "uncaught-error":
    if comm.deserialize:
        typ, exc, tb = clean_exception(**response)
        raise exc.with_traceback(tb)
    else:
        raise Exception(response["text"])

If the write were to somehow block / not terminate, that would explain the issue.

Do you have a suggestion as to why we do not see any exception logs (see line 534 of core.py)?

if the serialized exception exceeds 2048 bytes which would not re-trigger this same BufferError

Not really important here, but why would the size of the exception impact this? I was still under the assumption that this BufferError was not yet well understood.

@jochen-ott-by
Contributor

jochen-ott-by commented Nov 11, 2020

l. 546 in core.py (as you cited it) runs just fine without an exception

If it was running just fine, it would trigger an exception on the listener side, wouldn't it?

I would not expect this: the BufferError means this is a short write. So I guess the other end just waits for the rest (which never comes, or comes only as garbage intermingled with the exception sent here).

If the write were to somehow block / not terminate, that would explain the issue.

My guess is this read blocks on the other end somewhere around here:

for length in lengths:
    frame = bytearray(length)
    if length:
        n = await stream.read_into(frame)
        assert n == length, (n, length)
    frames.append(frame)

Do you have a suggestion as to why we do not see any exception logs (see line 534 of core.py)?

I thought this was the traceback you posted?

if the serialized exception exceeds 2048 bytes which would not re-trigger this same BufferError

Not really important here, but why would the size of the exception impact this? I was still under the assumption that this BufferError was not yet well understood.

Tornado behaves differently depending on the size passed to write. The code path that potentially triggers the BufferError is only hit for writes smaller than 2048 bytes (while exceptions sent by distributed can, by default, be up to 10 kB):

https://github.com/tornadoweb/tornado/blob/b4e39e52cd27d6b3b324a399dff046f71545c4a5/tornado/iostream.py#L164-L177
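
For reference, I believe the error itself can be reproduced in isolation: resizing a bytearray while a memoryview still exports its buffer is illegal, which is exactly what the b += data small-write path can run into (standalone illustration, not distributed/tornado code):

buf = bytearray(b"partially written frame")
view = memoryview(buf)    # e.g. a zero-copy view still referencing the buffer
try:
    buf += b"next frame"  # in-place resize of the exported buffer
except BufferError as exc:
    print(exc)            # "Existing exports of data: object cannot be re-sized"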

@fjetter
Member Author

fjetter commented Nov 11, 2020

I thought this was the traceback you posted?

Yes, of course. I've seen this issue a few times now and there was not always an exception log. For this instance that would fit, though.

@jochen-ott-by
Contributor

I think we have at least a partial understanding of what is going on here, enough for a reasonable fix:

  • WorkerA is fetching data (dependencies) from WorkerB
  • After sending some of the requested data (!), there is a low-level exception on WorkerB, e.g. a BufferError, when sending the data to WorkerA (the origin is not fully understood yet, but it does not matter that much for what happens next).
  • This exception is caught by handle_comm and treated by writing an exception message to the connection to WorkerA.
  • The connection stream is now complete garbage: it contains parts of the requested data, followed by the exception message. Note, though, that the connection is not closed.
  • Depending on some details (expected message size, size of the error message), WorkerA may wait forever for more data on this connection, which WorkerB never sends.

The fix is to simply close the connection on such low-level connection errors, rather than attempting to send an error message over this connection. This is what #4239 does.

Note that there is still the possibility of similar error conditions not addressed by #4239: if the handler starts to send some data and then raises an exception (from the handler code, not from comm.write), then the same situation would arise, in that the connection stream contains unreadable garbage, which might mean the other end of the connection waits indefinitely. Fixing that, however, likely requires much more refactoring, so I have not attempted to fix it so far.

One relatively straight-forward option is to always close the connection after sending an "uncaught-error" message. This would prevent the other end from waiting indefinitely. Depending on whether some data has already been written to the stream or not, the error message might be garbage or not, but at least the client would not wait indefinitely.

@mrocklin
Member

One relatively straight-forward option is to always close the connection after sending an "uncaught-error" message. This would prevent the other end from waiting indefinitely.

I'm curious what happens in this situation. Do operations like sending data between workers retry today on a CommClosedError? If not, should they?

@jochen-ott-by
Contributor

jochen-ott-by commented Nov 13, 2020

One relatively straight-forward option is to always close the connection after sending an "uncaught-error" message. This would prevent the other end from waiting indefinitely.

I'm curious what happens in this situation. Do operations like sending data between workers retry today on a CommClosedError? If not, should they?

It depends: some operations are re-tried via retry_operation, if retries are enabled in the configuration. See e.g.

return await retry_operation(_get_data, operation="get_data_from_worker")
to see that retries are done for worker->worker data transfers.

These calls to retry_operation are actually from a change I contributed. Back then, there was also the question of whether / which operations should retry. I decided to only add retry_operation calls to operations that "seemed safe" to blindly repeat, i.e. "reading" operations or idempotent operations, but not more complex, mutating operations.
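
For illustration, the basic pattern looks roughly like this (a generic sketch, not distributed's retry_operation, which also takes its retry count and delays from the configuration):

import asyncio
import random


async def retry_idempotent(coro_factory, count=3, base_delay=0.5):
    # Only safe for operations that can be blindly repeated, e.g. read-only
    # worker->worker data fetches; mutating operations should not go through this.
    for attempt in range(count + 1):
        try:
            return await coro_factory()
        except OSError:
            if attempt == count:
                raise
            # Jittered exponential backoff before the next attempt.
            await asyncio.sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.5))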

@gforsyth
Contributor

Pinging @KrishanBhasin as this sounds very similar to some issues he has been seeing.

@KrishanBhasin
Contributor

Thanks @gforsyth.

I was indeed seeing something that sounds similar to this in #3878, but have not encountered it for some time.
At the time I was having the issues, I didn't delve deep enough into the logs to understand exactly what was happening, nor did I make a note of when the problem went away, so I'm afraid I can't contribute much to this discussion.

@fjetter
Member Author

fjetter commented Nov 18, 2020

Following the above reference also leads to #3761, which mentions deadlocks. Might be the same thing, but I can't confirm.

mrocklin pushed a commit that referenced this issue Dec 4, 2020
Close comm.stream on low-level errors, such as BufferErrors. In such cases, we do not really know what was written to / read from the underlying socket, so the only safe way forward is to immediately close the connection.

See also #4133
fjetter linked a pull request Jun 11, 2021 that will close this issue