Refactor all event handlers #6410

Merged
merged 12 commits into from
Jun 7, 2022

@crusaderky crusaderky commented May 22, 2022

Part of #5736

@crusaderky crusaderky self-assigned this May 22, 2022

github-actions bot commented May 22, 2022

Unit Test Results

15 files (+15), 15 suites (+15), duration 6h 35m 10s (+6h 35m 10s)
2 852 tests (+2 852): 2 736 passed (+2 736), 82 skipped (+82), 31 failed (+31), 3 errors (+3)
21 129 runs (+21 129): 20 146 passed (+20 146), 946 skipped (+946), 34 failed (+34), 3 errors (+3)

For more details on these failures and errors, see this check.

Results for commit 42d399f. ± Comparison against base commit 7d280fd.

♻️ This comment has been updated with latest results.

    @functools.singledispatchmethod
    def handle_event(self, ev: StateMachineEvent) -> RecsInstrs:
        raise TypeError(ev)  # pragma: nocover

Collaborator Author

Moved so that I didn't have to move the various refactored functions, thus minimising the diff.
This is temporary - this and all the registered methods will be moved to worker_state_machine.py.

@crusaderky crusaderky marked this pull request as ready for review May 22, 2022 22:16
crusaderky added a commit to crusaderky/distributed that referenced this pull request May 25, 2022
@pentschev
Member

rerun tests

Note: rerunning gpuCI tests since those errors should be fixed by #6434

@crusaderky crusaderky linked an issue May 30, 2022 that may be closed by this pull request
@hayesgb hayesgb requested a review from gjoseph92 June 6, 2022 14:55
Collaborator

@gjoseph92 gjoseph92 left a comment

LGTM overall. All comments are nits and could be ignored, besides the list->set question and using descriptive function names.

    ts = self.tasks[key]
    who_has[key] = {ws.address for ws in ts.who_has}

    who_has = {key: [ws.address for ws in self.tasks[key].who_has] for key in keys}
Collaborator

Suggested change
    - who_has = {key: [ws.address for ws in self.tasks[key].who_has] for key in keys}
    + who_has = {key: {ws.address for ws in self.tasks[key].who_has} for key in keys}

This was changed from a list to a set, was that intentional?

Collaborator Author

From a set to a list. Yes: the set doesn't offer any useful feature here, and a list is fractionally faster. This also makes it consistent with Scheduler.get_who_has.
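
As a rough illustration of the change discussed in this thread, the sketch below builds the `who_has` mapping as a dict of lists in a single comprehension. `WorkerState`, `TaskState`, `build_who_has`, and the worker address are simplified stand-ins invented for this example; they are not the actual scheduler classes.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class WorkerState:
    """Hypothetical stand-in: only the worker address matters here."""

    address: str


@dataclass
class TaskState:
    """Hypothetical stand-in: a task and the workers holding its data."""

    key: str
    who_has: set = field(default_factory=set)


def build_who_has(tasks, keys):
    # One list of worker addresses per requested key. Each worker appears
    # at most once in ts.who_has, so a list loses nothing compared to a set.
    return {key: [ws.address for ws in tasks[key].who_has] for key in keys}


tasks = {
    "x": TaskState("x", {WorkerState("tcp://10.0.0.1:1234")}),
    "y": TaskState("y", set()),
}
who_has = build_who_has(tasks, ["x", "y"])
```

A task with no replicas simply maps to an empty list.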

"who_has": who_has,
"stimulus_id": stimulus_id,
},
)

    - def request_remove_replicas(self, addr: str, keys: list, *, stimulus_id: str):
    + def request_remove_replicas(
    +     self, addr: str, keys: list[str], *, stimulus_id: str
Collaborator

Suggested change
    - self, addr: str, keys: list[str], *, stimulus_id: str
    + self, addr: str, keys: Iterable[str], *, stimulus_id: str

Collaborator Author

No: the variable is sent as-is through the msgpack RPC, so only a list, set, or tuple is accepted.
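
To illustrate why `Iterable[str]` would be too broad an annotation here, the sketch below uses a hypothetical `check_rpc_keys` guard that accepts only the container types named in the reply. It is an illustration of the constraint, not distributed's actual serialization code.

```python
from collections.abc import Iterable


def check_rpc_keys(keys):
    """Hypothetical guard: accept only a list, set, or tuple.

    Assumption for illustration only; the real RPC layer is not shown.
    """
    if not isinstance(keys, (list, set, tuple)):
        raise TypeError(f"cannot send {type(keys).__name__} over the RPC")
    return keys


# A generator satisfies Iterable[str], yet it is not one of the
# accepted containers.
gen = (k for k in ["x", "y"])
is_iterable = isinstance(gen, Iterable)

try:
    check_rpc_keys(gen)
    rejected = False
except TypeError:
    rejected = True

accepted = check_rpc_keys(["x", "y"])
```

Annotating the parameter as `list[str]` keeps the type checker aligned with what the transport can carry.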

        return {"nbytes": {k: sizeof(v) for k, v in data.items()}, "status": "OK"}

    @handle_event.register
    def _(self, ev: UpdateDataEvent) -> RecsInstrs:
Collaborator

Suggested change
    - def _(self, ev: UpdateDataEvent) -> RecsInstrs:
    + def handle_update_data(self, ev: UpdateDataEvent) -> RecsInstrs:

Could we please name these functions recognizably, even though they're singledispatch handlers? It makes it much easier to navigate the code. I usually fuzzy-find search for terms like update data to find the handler.

Collaborator Author

Done
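
A minimal sketch of the naming pattern requested in this thread, using `functools.singledispatchmethod` from the standard library. The event classes and the `StateMachine` class are trimmed-down stand-ins written for this example, and the string return values are placeholders rather than real `RecsInstrs`.

```python
import functools
from dataclasses import dataclass


@dataclass
class StateMachineEvent:
    stimulus_id: str


@dataclass
class UpdateDataEvent(StateMachineEvent):
    data: dict


@dataclass
class PauseEvent(StateMachineEvent):
    pass


class StateMachine:
    """Hypothetical, simplified host class for the handlers."""

    def __init__(self):
        self.data = {}
        self.paused = False

    @functools.singledispatchmethod
    def handle_event(self, ev):
        # Fallback for event types that have no registered handler
        raise TypeError(ev)

    # The dispatch type is read from the annotation on `ev`. Giving each
    # handler a descriptive name keeps it searchable and callable directly.
    @handle_event.register
    def handle_update_data(self, ev: UpdateDataEvent):
        self.data.update(ev.data)
        return "update-data"

    @handle_event.register
    def handle_pause(self, ev: PauseEvent):
        self.paused = True
        return "pause"


sm = StateMachine()
result = sm.handle_event(UpdateDataEvent(stimulus_id="s1", data={"x": 1}))
```

Because `register` returns the decorated function, `handle_update_data` also remains available as an ordinary method, which is convenient in unit tests.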

Comment on lines +503 to +506
    if isinstance(self.run_spec, dict):
        self.run_spec = SerializedTask(**self.run_spec)
    elif not isinstance(self.run_spec, SerializedTask):
        self.run_spec = SerializedTask(task=self.run_spec)
Collaborator

A bit nitpicky, feel free to ignore. But it might be nice to use init-only variables for function, args, kwargs, and task, make run_spec a non-init field, and go back to this when constructing the message:

    if isinstance(ts.run_spec, dict):
        msg.update(ts.run_spec)
    else:
        msg["task"] = ts.run_spec

Just a little more explicitness and type checking around what has to be passed.

Collaborator Author

The reason why I packed it this way was to avoid replicating all the (fairly convoluted) annotations in the SerializedTask class. The SerializedTask class is messy to begin with - given more time, I'd redesign it thoroughly.
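
For readers unfamiliar with the pattern under discussion, here is a sketch of normalising `run_spec` inside a dataclass `__post_init__`. The `SerializedTask` and `ComputeTaskEvent` classes below are simplified stand-ins with assumed field types; the real classes carry more fields and more involved annotations.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SerializedTask:
    """Simplified stand-in for the real class (field types are assumptions)."""

    function: Optional[bytes] = None
    args: Optional[bytes] = None
    kwargs: Optional[bytes] = None
    task: Any = None


@dataclass
class ComputeTaskEvent:
    """Hypothetical, trimmed-down event carrying only key and run_spec."""

    key: str
    run_spec: Any

    def __post_init__(self):
        # Accept a dict of SerializedTask fields, a ready-made
        # SerializedTask, or a bare task object, and normalise to one type.
        if isinstance(self.run_spec, dict):
            self.run_spec = SerializedTask(**self.run_spec)
        elif not isinstance(self.run_spec, SerializedTask):
            self.run_spec = SerializedTask(task=self.run_spec)


from_dict = ComputeTaskEvent("x", {"function": b"blob", "args": b"blob"})
from_task = ComputeTaskEvent("y", ("inc", 1))
```

The trade-off raised in the review is that this accepts a loosely typed `run_spec` at construction time, whereas init-only variables would make the accepted inputs explicit at the cost of repeating the annotations.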

    ev2 = ev.to_loggable(handled=11.22)
    assert ev2.handled == 11.22
    assert ev2.run_spec == SerializedTask(task=None)
    assert ev.run_spec == SerializedTask(function=b"blob", args=b"blob")
Collaborator

Suggested change
    - assert ev.run_spec == SerializedTask(function=b"blob", args=b"blob")
    + assert ev.run_spec == SerializedTask(function=b"blob", args=b"blob")
    + assert not ev.handled

Collaborator Author

This isn't a feature we want to test, just an implementation detail. As a matter of fact, the base method StateMachineEvent.to_loggable overwrites the original object for the sake of performance.

    )
    ev2 = ev.to_loggable(handled=11.22)
    assert ev2.handled == 11.22
    assert ev2.data == {"x": None, "y": None}
Collaborator

Suggested change
    - assert ev2.data == {"x": None, "y": None}
    + assert ev2.data == {"x": None, "y": None}
    + assert not ev.handled
    + assert ev.data == {"x": "foo", "y": "bar"}

Collaborator Author

Same as above.
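
The behaviour described in these replies can be sketched as follows. This assumes the base `to_loggable` simply stamps `handled` on the event and returns the same object, as stated in the earlier reply; the `UpdateDataEvent` override shown here (shallow copy, values dropped) is an assumption made for illustration, and both classes are simplified stand-ins.

```python
from copy import copy
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StateMachineEvent:
    stimulus_id: str
    handled: Optional[float] = field(default=None, init=False)

    def to_loggable(self, *, handled):
        # Cheap default: stamp the event in place and return the same object.
        self.handled = handled
        return self


@dataclass
class UpdateDataEvent(StateMachineEvent):
    data: dict = field(default_factory=dict)

    def to_loggable(self, *, handled):
        # Shallow-copy, then keep the keys but drop the possibly large values.
        out = copy(self)
        out.handled = handled
        out.data = dict.fromkeys(self.data)
        return out


base = StateMachineEvent(stimulus_id="s1")
same_object = base.to_loggable(handled=11.22) is base

ev = UpdateDataEvent(stimulus_id="s2", data={"x": "foo", "y": "bar"})
ev2 = ev.to_loggable(handled=11.22)
```

Whether the original event is left untouched depends on which class handles the call, which is why the tests in the PR assert only on the returned object.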

@crusaderky crusaderky merged commit bd98e66 into dask:main Jun 7, 2022
@crusaderky crusaderky deleted the WSMR/refactor_handlers branch June 7, 2022 00:15
jsignell pushed a commit that referenced this pull request Jun 24, 2022
Hotfix for #6624 by reverting the compute-task message format almost to the original state before #6410
Successfully merging this pull request may close these issues.

Yank state machine out of Worker class
3 participants