Skip to content

Commit

Permalink
Handle missing key case in report_on_key (#4755)
Browse files Browse the repository at this point in the history
* Drop `ts is None` case from `_task_to_report_msg`

`ts` cannot be `None` and be properly handled here. Generally we expect
`ts is not None` and will treat it that way. Otherwise an exception will
be raised.

* Drop `ts is None` case from `_task_to_client_msgs`

This case of `ts is None` simply cannot occur as a messages cannot be
constructed to be reported. Also the handful of cases where
`_task_to_client_msgs` is called we know `ts is not None`. So there is
no point in handling this case. As such just drop the unreachable case.

* Skip gathering client messages if unneeded

If `report_msg is None`, we can skip gathering messages as there is
nothing to send. Also if there are no interested clients, there is no
need to do the work of gathering messages.

* Report `key` cancelled if missing associated task

Previously if an associated `ts` could not be found for a `key`, we
would report that `key` as cancelled. However it seems this code was
missing after a recent rewrite. This effectively readds it by
constructing a message to cancel the `key` when the task is not found.

* Test `report_on_key` with only `key`
  • Loading branch information
jakirkham authored Apr 28, 2021
1 parent 8dfaca6 commit b577ece
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
26 changes: 12 additions & 14 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6047,7 +6047,11 @@ def report_on_key(self, key: str = None, ts: TaskState = None, client: str = Non
assert False, (key, ts)
return

report_msg: dict = _task_to_report_msg(parent, ts)
report_msg: dict
if ts is None:
report_msg = {"op": "cancelled-key", "key": key}
else:
report_msg = _task_to_report_msg(parent, ts)
if report_msg is not None:
self.report(report_msg, ts=ts, client=client)

Expand Down Expand Up @@ -7058,9 +7062,7 @@ def _task_to_msg(state: SchedulerState, ts: TaskState, duration: double = -1) ->
@cfunc
@exceptval(check=False)
def _task_to_report_msg(state: SchedulerState, ts: TaskState) -> dict:
if ts is None:
return {"op": "cancelled-key", "key": ts._key}
elif ts._state == "forgotten":
if ts._state == "forgotten":
return {"op": "cancelled-key", "key": ts._key}
elif ts._state == "memory":
return {"op": "key-in-memory", "key": ts._key}
Expand All @@ -7079,16 +7081,12 @@ def _task_to_report_msg(state: SchedulerState, ts: TaskState) -> dict:
@cfunc
@exceptval(check=False)
def _task_to_client_msgs(state: SchedulerState, ts: TaskState) -> dict:
report_msg: dict = _task_to_report_msg(state, ts)
client_msgs: dict
if ts is None:
# Notify all clients
client_msgs = {k: [report_msg] for k in state._clients}
else:
# Notify clients interested in key
cs: ClientState
client_msgs = {cs._client_key: [report_msg] for cs in ts._who_wants}
return client_msgs
if ts._who_wants:
report_msg: dict = _task_to_report_msg(state, ts)
if report_msg is not None:
cs: ClientState
return {cs._client_key: [report_msg] for cs in ts._who_wants}
return {}


@cfunc
Expand Down
5 changes: 5 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ async def test_delete_data(c, s, a, b):
async def test_delete(c, s, a):
x = c.submit(inc, 1)
await x
assert x.key in s.tasks
assert x.key in a.data

await c._cancel(x)
Expand All @@ -422,6 +423,10 @@ async def test_delete(c, s, a):
await asyncio.sleep(0.01)
assert time() < start + 5

assert x.key not in s.tasks

s.report_on_key(key=x.key)


@gen_cluster()
async def test_filtered_communication(s, a, b):
Expand Down

0 comments on commit b577ece

Please sign in to comment.