StateMachine event dispatch mechanism #5894

Closed
Tracked by #5736 ...
fjetter opened this issue Mar 3, 2022 · 2 comments
@fjetter
Member

fjetter commented Mar 3, 2022

This task is part of a larger epic, #5736.

The WorkerState interface described in #5736 (comment) will accept and emit events that the Worker (server) needs to listen to. Handling these events may require us to send a message, schedule a coroutine, schedule a task on the threadpool, etc.
The event handlers in turn emit a set of events that needs to be fed back into the WorkerState. This callback system works similarly to our RPC implementation; pseudocode below.

import abc
import asyncio
from collections.abc import Collection

class WorkerBase(abc.ABC):
    batched_stream: BatchedSend
    state: WorkerState
    instruction_history: list[StateMachineEvent]

    def _handle_stimulus_from_future(self, fut):
        try:
            stim = fut.result()
        except Exception:
            # This must never happen and the handlers must implement exception handling.
            # If we implement error handling here, this should raise some exception that
            # can be immediately filed as a bug report
            raise
        for s in stim:
            self._handle_stimulus(s)

    def _handle_stimulus(self, stim: StateMachineEvent):
        self.instruction_history.append(stim)
        instructions = self.state.handle_stimulus(stim)
        for inst in instructions:
            task = None
            # TODO: collect all futures and await/cancel when closing?
            # TODO: This dispatch should be easily modifiable to allow for easier testing
            if isinstance(inst, GatherDep):
                task = asyncio.create_task(self._gather_data(inst))
            elif isinstance(inst, Execute):
                task = asyncio.create_task(self.execute(inst))
            elif isinstance(inst, SendMsg):
                self.batched_stream.send(inst.payload)
            else:
                raise RuntimeError("Unknown instruction")
            if task is not None:
                task.add_done_callback(self._handle_stimulus_from_future)

    @abc.abstractmethod
    async def execute(self, inst: Execute) -> Collection[StateMachineEvent]:
        raise NotImplementedError

    @abc.abstractmethod
    async def _gather_data(self, inst: GatherDep) -> Collection[StateMachineEvent]:
        raise NotImplementedError
  • A definition of input/output events exists (called StateMachineEvents and Instructions in the design document)
  • Worker._handle_stimulus implements a dispatch system that dispatches events to handlers
  • The handlers follow a strict API of method(self, input_event) -> Collection[StateMachineEvent]
  • The handler output is fed back into the state machine
  • This callback mechanism has dedicated unit tests for sync and async handlers
  • Event handlers are not allowed to raise exceptions; instead, they must return appropriate events
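To make the checklist above concrete, here is a minimal, self-contained sketch of the feedback loop. It is not the proposed implementation: `ToyState`, `ToyWorker`, `ComputeTask`, `ExecuteSuccess`, `ExecuteFailure`, the `sent` list (standing in for `BatchedSend`) and the `drain` helper are all hypothetical names invented for this example. It only illustrates that a handler returns events instead of raising, and that those events are fed back into the state machine.

```python
from __future__ import annotations

import asyncio
from collections.abc import Callable, Collection
from dataclasses import dataclass


# Hypothetical stand-ins for the event and instruction types.
@dataclass
class StateMachineEvent:
    key: str


@dataclass
class ComputeTask(StateMachineEvent):
    func: Callable[[], object]


@dataclass
class ExecuteSuccess(StateMachineEvent):
    value: object


@dataclass
class ExecuteFailure(StateMachineEvent):
    exception: Exception


@dataclass
class Execute:
    key: str
    func: Callable[[], object]


@dataclass
class SendMsg:
    payload: dict


class ToyState:
    """Hypothetical state machine: a pure mapping from events to instructions."""

    def handle_stimulus(self, stim: StateMachineEvent) -> list:
        if isinstance(stim, ComputeTask):
            return [Execute(stim.key, stim.func)]
        if isinstance(stim, ExecuteSuccess):
            return [SendMsg({"op": "task-finished", "key": stim.key})]
        if isinstance(stim, ExecuteFailure):
            return [SendMsg({"op": "task-erred", "key": stim.key})]
        return []


class ToyWorker:
    def __init__(self) -> None:
        self.state = ToyState()
        self.sent: list[dict] = []  # stands in for BatchedSend
        self.stimulus_history: list[StateMachineEvent] = []
        self._tasks: set[asyncio.Task] = set()

    def handle_stimulus(self, stim: StateMachineEvent) -> None:
        self.stimulus_history.append(stim)
        for inst in self.state.handle_stimulus(stim):
            if isinstance(inst, Execute):
                task = asyncio.create_task(self.execute(inst))
                self._tasks.add(task)  # keep a reference until the task is done
                task.add_done_callback(self._on_done)
            elif isinstance(inst, SendMsg):
                self.sent.append(inst.payload)
            else:
                raise RuntimeError(f"Unknown instruction: {inst!r}")

    def _on_done(self, task: asyncio.Task) -> None:
        self._tasks.discard(task)
        # Handlers never raise, so result() is always a collection of events.
        for stim in task.result():
            self.handle_stimulus(stim)

    async def execute(self, inst: Execute) -> Collection[StateMachineEvent]:
        # Failures in user code become events rather than exceptions.
        try:
            value = inst.func()
        except Exception as exc:
            return [ExecuteFailure(inst.key, exc)]
        return [ExecuteSuccess(inst.key, value)]

    async def drain(self) -> None:
        # Wait until no handler task is pending, including ones spawned by callbacks.
        while self._tasks:
            await asyncio.gather(*list(self._tasks))


async def demo() -> ToyWorker:
    worker = ToyWorker()
    worker.handle_stimulus(ComputeTask("x", lambda: 1 + 1))
    worker.handle_stimulus(ComputeTask("y", lambda: 1 / 0))
    await worker.drain()
    return worker
```

Running `asyncio.run(demo())` leaves one success message and one error message in `worker.sent`. Keeping `ToyState.handle_stimulus` free of I/O is what would let the state machine be unit-tested without an event loop; only the worker-side dispatch needs async tests.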

To incorporate this callback mechanism into our current code, the signatures of the transition functions can be changed to return tuple[Recommendations, Collection[Instructions]] (i.e. instructions instead of messages), where the scheduler messages are replaced by appropriate Instructions.
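As a rough illustration of that signature change, a transition function might look like the sketch below. The type aliases, the state names, and both transition functions are assumptions made up for this example and do not mirror the existing transition code; the point is only that the second tuple element carries typed instructions rather than raw scheduler messages.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Union


@dataclass
class SendMsg:
    payload: dict


@dataclass
class Execute:
    key: str


# Hypothetical aliases: task key -> recommended next state, and the instruction union.
Recommendations = dict[str, str]
Instruction = Union[SendMsg, Execute]


def transition_ready_executing(key: str) -> tuple[Recommendations, list[Instruction]]:
    # No follow-up recommendation; ask the worker to run the task.
    return {}, [Execute(key)]


def transition_executing_memory(
    key: str, nbytes: int
) -> tuple[Recommendations, list[Instruction]]:
    # What used to be a raw scheduler message is wrapped in a SendMsg instruction.
    msg = {"op": "task-finished", "key": key, "nbytes": nbytes}
    return {}, [SendMsg(msg)]
```

Because the functions only return data, a test can compare the returned tuple directly, for example checking that `transition_ready_executing("x")` yields a single `Execute` instruction.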

@crusaderky
Collaborator

@fjetter to clarify, does this issue cover the first 3 bullet points from #5736 (comment) ?

@fjetter
Member Author

fjetter commented Mar 7, 2022

Yes, this is the first bullet point
