diff --git a/distributed/worker.py b/distributed/worker.py index c935be8ef1..81cda8d614 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3082,8 +3082,12 @@ async def gather_dep( pdb.set_trace() msg = error_message(e) for k in self.in_flight_workers[worker]: - ts = self.tasks[k] - recommendations[ts] = tuple(msg.values()) + try: + ts = self.tasks[k] + except KeyError: + continue + else: + recommendations[ts] = tuple(msg.values()) raise finally: self.comm_nbytes -= total_nbytes @@ -3102,7 +3106,11 @@ async def gather_dep( refresh_who_has = set() for d in self.in_flight_workers.pop(worker): - ts = self.tasks[d] + try: + ts = self.tasks[d] + except KeyError: + continue + ts.done = True if d in cancelled_keys: if ts.state == "cancelled":