(Worker) State Machine determinism and replayability #5736

Closed · 9 tasks done

fjetter opened this issue Jan 31, 2022 · 12 comments

@fjetter
fjetter commented Jan 31, 2022

The dask scheduling logic on both the scheduler and the worker side uses the model
of a finite state machine to calculate decisions. A few definitions about a finite
state machine first:

  • A deterministic finite state machine is a mathematical model of
    computation describing an abstract machine that has a finite number of
    distinct states. Given a stimulus S_i and a state W_i, there is a function
    F such that the new state W_{i+1} can be calculated as F(W_i, S_i) -> W_{i+1}
  • The only way to change the state W_i is to apply the transformation F with a
    stimulus S_i
  • Given an initial state W_0 and the sequence of all stimuli S_i, it
    is possible to calculate any state W_i by applying the transformation F
    sequentially for all i

How does this model apply to us and where do we violate it?

Most of these arguments can be made for the scheduler as well but we'll restrict
ourselves to the Worker to keep the scope contained.

The worker state is defined primarily by the Worker.tasks dictionary containing
TaskState objects with various task-specific attributes. On top of this, there
are a few Worker attributes which hold global or remote-worker-specific
state. A few examples are Worker.data_needed, Worker.has_what,
Worker.pending_data_per_worker and Worker.ready, but the list goes on. We
currently do not properly separate the state machine attributes from the
server / networking / other code.

The function F is a bit more difficult to define. Naively one would expect
this to be Worker.transitions, but this is not the case since it does not
accept stimuli. Worker.transitions, in short T, accepts a set of already
made decisions we call recommendations. The recommendations are generated by
stimulus handlers H, like Worker.handle_compute_task or
Worker.handle_free_keys. Therefore, to define the state transition function we
need a combination of H and T, M ~ T * H, such that W_{i+1} = M(W_i, S_i) = T(W_i, H(W_i, S_i)).
Our implementation of handlers introduces a certain ambiguity
since it is not entirely clear whether a piece of logic should reside on the side
of the handler or of the transition function.
However, every decision should be the result of the stimulus and the current state
such that, given all stimuli in order and the initial state, we can reconstruct every iteration.
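
To make the decomposition concrete, here is a minimal, self-contained sketch of the pattern described above. The names handler, apply_transitions, step and replay are purely illustrative and do not exist in the code base:

def handler(state: dict, stimulus: dict) -> dict:
    # H: translate a stimulus into recommendations, using only W_i and S_i
    if stimulus["op"] == "compute-task":
        return {stimulus["key"]: "ready"}
    if stimulus["op"] == "free-keys":
        return {key: "released" for key in stimulus["keys"]}
    return {}


def apply_transitions(state: dict, recommendations: dict) -> dict:
    # T: apply the recommendations, producing W_{i+1}
    new_state = dict(state)
    new_state.update(recommendations)
    return new_state


def step(state: dict, stimulus: dict) -> dict:
    # M ~ T * H, i.e. W_{i+1} = T(W_i, H(W_i, S_i))
    return apply_transitions(state, handler(state, stimulus))


def replay(initial_state: dict, stimuli: list[dict]) -> dict:
    # Given W_0 and all S_i in order, every W_i is fully determined
    state = initial_state
    for stimulus in stimuli:
        state = step(state, stimulus)
    return state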

There are three (actually four) places where this pattern is violated
and where the stimulus generation is not only tightly coupled to the handling and
transition itself but also coupled to asynchronous actors.

Specifically, dependency gathering (Worker.gather_dep), and to a lesser extent
task execution (Worker.execute), break the above pattern since they
simultaneously generate stimuli and handle them while interacting
with an async actor (i.e. a remote worker or the threadpool). There is no way to
inspect, assert on or pause the state machine naturally. This prohibits writing
effective tests, increases instability and renders deterministic replayability
impossible.

Even worse are the ensure_communicating and ensure_computing methods, which
are triggered in various places of our code to work off the queue of
ready-to-compute / ready-to-be-fetched tasks. This pattern effectively delays
state transitions and performs a set of these transitions in bulk, which is
beneficial for dependency fetching. However, they are called recursively
(actually something more like pseudo-recursively, in the sense of
ensure_communicating -> async gather_dep -> ensure_communicating).

Pseudo code below

def stimulus_XY(...):
    # First decision: *what* to fetch

    transition(ts, "released->fetch")
    data_needed.push(ts)


def ensure_communicating():
    # Second decision: is there *capacity* to fetch?
    while is_gather_channel_available():
        ts = data_needed.pop()
        transition(ts, "fetch->flight")
        loop.add_callback(gather_dep, ts)


async def gather_dep(ts, ...):
    try:
        if not assumptions_still_valid(ts):
            # Another decision might have cancelled this fetch already
            return
        response = await fetch_data()
    finally:
        if response.get("busy"):
            await sleep(a_while)
            retry_fetch(ts)
        elif response.get("error"):
            flag_remote_dead(worker)
            reschedule_fetch(ts)
        elif response.get("data"):
            transition(ts, "memory")
        else:  # no data returned
            stale_information(ts, worker)  # remove who_has / find-missing

        # Recursively loop into ensure_communicating
        ensure_communicating()

Problems

  • The knowledge about the capacity to fetch is not encoded in the state machine.
    This requires us to have the never-ending ensure_* callbacks to check
    periodically whether something changed that would allow us to fetch a new
    dependency. Instead, this should be a stimulus to the state machine as well,
    since such a change is always connected to a stimulus somehow
  • There is no way to stop the state machine and freeze it in a specific
    iteration i since it is always moving. We frequently need to deal with an
    intermediate state (one of the motivations for the states resumed and
    cancelled)
  • Dealing with intermediate state exposes us to many edge cases to consider,
    ultimately resulting in an unstable implementation
  • There is currently no way to know exactly what the stimuli were. While we're
    logging their outcome as part of the transition log, we can sometimes only
    guess what the trigger was. For some stimulus handlers we do log the
    incoming stimulus as part of the transition log, e.g. the only stimulus body
    of handle_free_keys is a list of keys, which is what we append to our log.
    For more complicated handlers like handle_compute_task we do not do this. If
    we start doing this, we should ensure not to log too much information and
    restrict the logged info to what is relevant to the state machine, e.g.
    TaskState.runspec is not relevant to the state machine and we should
    therefore not remember it, to reduce the memory footprint.
  • By not being able to define the state at a given time i, it is impossible to
    write proper deterministic tests for the state machine. Instead, we rely on
    sophisticated mocking to construct a very timing-sensitive intermediate state.

Given that the state machine is extended sufficiently with the information to make
the capacity decision, the ensure_* loops can be removed such that the entire
state machine can be calculated deterministically and synchronously. Every interaction
with an asynchronous actor will be mapped to a set of out-/inbound stimuli.
This will allow us to

  • Log every stimulus [1] and reconstruct the state at any given time i
  • Write specific fakes which would interact with the state machine using
    signals, e.g.
async def gather_dep(worker: str, keys: Collection[str]) -> list[dict]:
    try:
        response = await fetch_data(worker, keys)
    except Exception:
        return [{"stimulus": "gather_dep_failed", "keys": keys}]

    if response.get("status") == "busy":
        return [{"stimulus": "gather_dep_remote_busy", "keys": keys}]
    else:
        return [
            {
                "stimulus": "gather_dep_response",
                "key": key,
                "data": data,
                "nbytes": sizeof(data),
            }
            for key, data in response.get("data", {}).items()
        ]

async def gather_dep_fake(worker: str, keys: Collection[str]) -> list[dict]:
    """This fake emulates a failure with an exception when connecting to worker A
    and returns an empty result when fetching `missing` from B.
    Otherwise it returns data as expected."""

    # No need to do *anything* asynchronous here. This is only a
    # coroutine function to keep interfaces and signatures clean
    # but ideally there is no IO
    if worker == "A":
        return [{"stimulus": "gather_dep_failed", "keys": keys}]
    elif worker == "B":
        res = []
        for key in keys:
            if key == "missing":
                res.append({
                    "stimulus": "gather_dep_missing",
                    "key": key
                })
            else:
                res.append({
                    "stimulus": "gather_dep_response",
                    "key": key,
                    "data": DummyData(),
                    "nbytes": 42
                })
        return res
    else:
        return [{
            "stimulus": "gather_dep_response",
            "key": key,
            "data": DummyData(),
            "nbytes": 42
        } for key in keys]

[1] Logging all stimuli is actually cheaper than logging all transitions since one stimulus usually triggers many, many recommendations and transitions

@fjetter

fjetter commented Feb 15, 2022

Here is a next iteration on this, with a bit of pseudo code.

The expectation is that the worker state becomes its own synchronous class. Pulling all of the state out into its own class allows us to write targeted unit tests without invoking any concurrent or asynchronous code. It also allows us to write more specific fakes or mocks as outlined above.

Interface to WorkerState

This will require us to specify the API through which the server and the state communicate, since the state will make decisions (execute/compute a result or fetch data from a remote worker) that the server needs to act on. Similarly, either remote servers or the result of an execution/fetch need to be fed into the state to trigger transitions and calculate a new decision/instruction. In the following I will represent the decisions calculated by the WorkerState with classes called Instruction. These instructions tell the server to send a msg, fetch data, compute a result, etc. and will typically trigger a coroutine. Similarly, every stimulus created, i.e. every input to the state machine, is defined as a StateMachineEvent.

The following uses dataclasses to denote these events to improve the readability of the signatures, but this is not a requirement for the implementation.

Selection of Instructions/Events
from dataclasses import dataclass
from typing import Collection


@dataclass
class Instruction:
    instruction_id: str


@dataclass
class GatherDep(Instruction):
    worker: str
    to_gather: Collection[TaskState]


@dataclass
class FindMissing(Instruction):
    ...


@dataclass
class Execute(Instruction):
    ...


@dataclass
class SendMsg(Instruction):
    payload: dict
    # One of the following remote actions
    # Messages emitted by the executor
    # - task-erred; This is technically not emitted by the state machine but by the executor
    # - reschedule; This one is also emitted by the executor
    # - task-finished; Executor

    # - release-worker-data; State machine / remove ACK/confirm
    # - long-running; Basically a user trigger during execution
    # - add-keys; Manual update_data, fix scheduler state if remove-replica fails (StateMachine),


@dataclass
class StateMachineEvent:
    stimulus_id: str


@dataclass
class HandleComputeTask(StateMachineEvent):
    key: str
    who_has: dict
    priority: tuple


@dataclass
class RemoveReplicas(StateMachineEvent):
    keys: Collection[str]


@dataclass
class GatherDepSuccess(StateMachineEvent):
    data: object


@dataclass
class GatherDepError(StateMachineEvent):
    worker: str
    exc: Exception


@dataclass
class GatherDepMissing(StateMachineEvent):
    key: str


@dataclass
class Pause(StateMachineEvent):
    ...


@dataclass
class UnPause(StateMachineEvent):
    ...


@dataclass
class GatherBusy(StateMachineEvent):
    attempt: int


@dataclass
class RemoteDead(StateMachineEvent):
    worker: str
    exception: Exception


@dataclass
class ExecuteSuccess(StateMachineEvent):
    ts: TaskState
    data: object


@dataclass
class ExecuteFailure(StateMachineEvent):
    ts: TaskState
    exception: Exception


@dataclass
class Reschedule(StateMachineEvent):
    ...

Embedding of WorkerState into Worker server / Asyncio dispatch

Given the set of StateMachineEvent and Instruction, the API for the WorkerState can then be cleanly defined by a single method call WorkerState.handle_stimulus(self, event: StateMachineEvent) -> Collection[Instruction] which handles the event dispatch transparently and logs every input event in order.

WorkerState pseudo code
_TRANSITION_RETURN = tuple[dict, Collection[Instruction]]


class WorkerState:

    data: SpillBuffer
    state_event_log: list[StateMachineEvent]
    tasks: dict[str, TaskState]

    def handle_stimulus(self, event: StateMachineEvent) -> Collection[Instruction]:
        self.state_event_log.append(event)
        if isinstance(event, RemoveReplicas):
            return self._handle_remove_replicas(event)
        else:
            raise RuntimeError(f"Unknown stimulus {event}")

    def _handle_remove_replicas(self, event: RemoveReplicas) -> Collection[Instruction]:
        # This is where the current transition logic would reside and the
        # transition chain would be triggered. No new logic other than
        # unwrapping the event dataclasses if we were to use them
        ...

    def handle_compute_task(self, event: HandleComputeTask) -> Collection[Instruction]:
        ...

    # This is effectively the logic of ``ensure_communicating`` but the
    # transition logic is separated from coroutine scheduling. It is also only
    # called during transition OR when unpausing
    def _ensure_communicating(self, stimulus_id: str) -> _TRANSITION_RETURN:
        recommendations = {}
        instructions = []

        # This is pseudo/simplified code of what is currently in
        # ensure_communicating
        while self.data_needed and (
            len(self.in_flight_workers) < self.total_out_connections
            or self.comm_nbytes < self.comm_threshold_bytes
        ):
            next_ = self.data_needed.pop()
            worker = self.worker_to_fetch_from(next_)
            to_gather = self.select_keys_for_gather(worker, next_)
            recommendations.update({k: ("flight", worker) for k in to_gather})
            instructions.append(GatherDep(stimulus_id, worker, to_gather))

        return recommendations, instructions

    def transition_released_fetch(
        self, ts: TaskState, *, stimulus_id: str
    ) -> _TRANSITION_RETURN:
        for w in ts.who_has:
            self.pending_data_per_worker[w].push(ts)
        ts.state = "fetch"
        ts.done = False
        self.data_needed.push(ts)
        # The current released->fetch transition never returns any content
        return self._ensure_communicating(stimulus_id)

    async def memory_monitor(self):
        def check_pause(memory):
            if unpaused:  # type: ignore
                self.handle_stimulus(UnPause())

        check_pause(42)

As can be seen from the pseudo code above, the logic currently implemented would only be shifted around slightly, not fundamentally changed. Primarily, ensure_communicating (and similarly ensure_computing) would become private and may be called in selected transitions and/or when unpausing.

Since every event is logged and this is the only way to modify the state, this allows us to deterministically reconstruct any state given the input events.

Worker server code

Embedding this API into a working server class will require us to mix sync and async code, and to deal with calling the state API and dispatching the received Instructions.

The ambition should be to isolate any interaction with an asynchronous actor as much as possible and hide it behind an internal API to allow for easier mocking/faking, e.g. by defining async def _gather_data(self, inst: GatherDep) -> Collection[StateMachineEvent] which exclusively handles the fetch of data but passes off control as soon as the data returns, without modifying any state.

Pseudo code for Worker server code (base class)
import abc
import asyncio


class WorkerBase(abc.ABC):
    batched_stream: BatchedSend
    state: WorkerState
    instruction_history: list[StateMachineEvent]

    def _handle_stimulus_from_future(self, fut):
        try:
            stim = fut.result()
        except Exception:
            # This must never happen and the handlers must implement exception handling.
            # If we implement error handling here, this should raise some exception that
            # can be immediately filed as a bug report
            raise
        for s in stim:
            self._handle_stimulus(s)

    def _handle_stimulus(self, stim: StateMachineEvent):
        self.instruction_history.append(stim)
        instructions = self.state.handle_stimulus(stim)
        for inst in instructions:
            task = None
            # TODO: collect all futures and await/cancel when closing?
            if isinstance(inst, GatherDep):
                task = asyncio.create_task(self._gather_data(inst))
            elif isinstance(inst, Execute):
                task = asyncio.create_task(self.execute(inst))
            elif isinstance(inst, SendMsg):
                self.batched_stream.send(inst.payload)
            else:
                raise RuntimeError("Unknown instruction")
            if task:
                task.add_done_callback(self._handle_stimulus_from_future)

    @abc.abstractmethod
    async def execute(self, inst: Execute) -> Collection[StateMachineEvent]:
        raise NotImplementedError

    @abc.abstractmethod
    async def _gather_data(self, inst: GatherDep) -> Collection[StateMachineEvent]:
        raise NotImplementedError

Given the above base class, we can then implement the remote interaction or the fake that emulates the test condition we're looking at

class Worker(WorkerBase):
    async def _gather_data(self, inst: GatherDep) -> Collection[StateMachineEvent]:
        try:
            response = await get_data_from_worker(inst)
            # We might want to update *server* related state here, like bandwidth measurements, etc.
            # *No* task state specific updates are allowed here, i.e. no interaction with `WorkerBase.state`
            if response["status"] == "busy":
                return [GatherBusy("new-stim", inst.worker)]
        except Exception as exc:
            return [GatherDepError("new-stim", inst.worker, exc)]
        return [GatherDepSuccess("new-stim", inst.worker, response)]


class FakeWorker(WorkerBase):
    """This class can be used for testing to simulate a busy worker A"""

    async def _gather_data(self, inst: GatherDep) -> Collection[StateMachineEvent]:
        if inst.worker == "A":
            return [GatherBusy("foo", 2)]
        else:
            return [GatherDepSuccess("foo", "bar") for _ in inst.to_gather]

Reconstruct state based on history

Given all of the above, it should also be easily possible to reconstruct a
WorkerState at any given time, given the history of StateMachineEvents, and
possibly even to initialize a fully working Worker with this information

async def get_worker_from_history(history: Collection[StateMachineEvent]):
    state = WorkerState()
    for event in history:
        _ = state.handle_stimulus(event)  # ignore instructions

    # This _should_ now be a functional worker at the given time
    return await Worker(..., state=state)  # type: ignore

@mrocklin

A few thoughts:

  1. In general pulling out the state machine from the Worker seems fine to me
  2. I am nervous about mocks. I would rather that we define a solid interface/protocol around the state machine, and then test the inputs and outputs of that machine. I would be nervous about trying to simulate the full system with mocking. We used to do a lot of testing of internal interfaces early on, and I found that those tests quickly became a liability rather than an asset as the system evolved. Feel free to overrule me on this, but there was a lot of pain here historically. I would ask that any tests that we write have longevity in mind.
  3. If we are building a more formal state machine then I'd also be curious what that looks like. Most of this issue so far is around motivation. It would be interesting to iterate on design a little before jumping in if that is possible.

@mrocklin

If we are going to do something in the scheduler, it would be interesting to think about designing both at the same time, or at least making sure that what happens with one can also be applied to the other. It would be nice to have fewer systems if that's possible.

@fjetter

fjetter commented Feb 22, 2022

I am nervous about mocks. I would rather that we define a solid interface/protocol around the state machine, and then test the inputs and outputs of that machine. I would be nervous about trying to simulate the full system with mocking. We used to do a lot of testing of internal interfaces early on, and I found that those tests quickly became a liability rather than an asset as the system evolved. Feel free to overrule me on this, but there was a lot of pain here historically. I would ask that any tests that we write have longevity in mind.

My intention is to establish such a protocol / interface. Right now we are forced to work with mocks. For instance, below is a quite horrible but necessary example since we do not have a clean interface: locks, events, mocks, fragile waits. All of this structure is attempting to create a situation where the internals of gather_dep behave in a very specific, timing-sensitive way.
In a sense, this proposal is about how to break up Worker.gather_dep (there is a similar problem for Worker.execute, though it is less pronounced since the phase space is smaller) and put sane, internal APIs in place.

import distributed

with mock.patch.object(distributed.worker.Worker, "gather_dep") as mocked_gather:
    fut1 = c.submit(inc, 1, workers=[a.address], key="f1")
    fut2 = c.submit(inc, fut1, workers=[a.address], key="f2")
    await fut2
    fut4 = c.submit(sum, fut1, fut2, workers=[b.address], key="f4")
    fut3 = c.submit(inc, fut1, workers=[b.address], key="f3")
    fut2_key = fut2.key
    await _wait_for_state(fut2_key, b, "flight")
    while not mocked_gather.call_args:
        await asyncio.sleep(0)
    fut4.release()
    while fut4.key in b.tasks:
        await asyncio.sleep(0)

assert b.tasks[fut2.key].state == "cancelled"
args, kwargs = mocked_gather.call_args
assert fut2.key in kwargs["to_gather"]

# The below synchronization and mock structure allows us to intercept the
# state after gather_dep has been scheduled and is waiting for the
# get_data_from_worker to finish. If state transitions happen during this
# time, the response parser needs to handle this properly
lock = asyncio.Lock()
event = asyncio.Event()
async with lock:

    async def wait_get_data(*args, **kwargs):
        event.set()
        async with lock:
            return await distributed.worker.get_data_from_worker(*args, **kwargs)

    with mock.patch.object(
        distributed.worker,
        "get_data_from_worker",
        side_effect=wait_get_data,
    ):
        gather_dep_fut = asyncio.ensure_future(
            Worker.gather_dep(b, *args, **kwargs)
        )
        await event.wait()

        fut4 = c.submit(sum, [fut1, fut2], workers=[b.address], key="f4")
        while b.tasks[fut2.key].state != "flight":
            await asyncio.sleep(0.1)

await gather_dep_fut
f2_story = b.story(fut2.key)
assert f2_story
await fut3
await fut4

With this proposal we can instead provide a list of input events and assert on output events.
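
As a purely illustrative sketch of such a test against the proposed handle_stimulus API (the exact event fields and the resulting instructions are assumptions, not existing behaviour):

def test_compute_task_with_missing_dependency_requests_gather():
    ws = WorkerState()

    # Input: a single event, no comms, no event loop
    instructions = ws.handle_stimulus(
        HandleComputeTask(
            stimulus_id="s1", key="b", who_has={"a": ["tcp://w1"]}, priority=(0,)
        )
    )
    # Output: the state machine should ask the server to gather the dependency
    assert any(isinstance(inst, GatherDep) for inst in instructions)

    # Feed the (faked) gather result back in as an event instead of mocking gather_dep
    instructions = ws.handle_stimulus(GatherDepSuccess(stimulus_id="s2", data={"a": 1}))
    assert any(isinstance(inst, Execute) for inst in instructions)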

I do not want to advocate for widespread mocking. Instead, I want to establish an internal, narrow API which can be used to implement instrumentation and, if necessary, mocking. Mocking is harmful if done for unstable or ill-defined APIs, but as long as the behaviour of this API is well understood, mocking is relatively safe.
For instance, the behaviour of _gather_data(self, inst: GatherDep) -> Collection[StateMachineEvent] can be described very well. It accepts a struct including information about fetching data, it does not alter any state, and it returns a set of well-defined events (busy, exception, missing, result).
FWIW, the gather dep implementation was connected to most deadlocks I debugged over the past months.

If we are building a more formal state machine then I'd also be curious what that looks like. Most of this issue so far is around motivation. It would be interesting to iterate on design a little before jumping in if that is possible.

Most of this issue so far is around motivation.

I agree that I haven't properly put down requirements, constraints, etc. but I would argue that the pseudo code in the drop-downs of #5736 (comment) is close to working, assuming the logic were filled in. The primary reason this is not a PR yet is the many tests we have that currently mock internals; updating them will require a bit of time.

It would be interesting to iterate on design a little before jumping in if that is possible.

What I'm proposing is that the worker state machine has the public interface

class WorkerState:
    all_possible_events: list[StateMachineEvent] = [
        GatherDep,
        ComputeTask,
        RemoveReplicas,
        ExecuteSuccess,
        Pause,
        ...
    ]
    all_instructions: list[Instruction] = [
        Execute,
        GatherDep,
        SendMsg,
        FindMissing,
        ...
    ]

    data: SpillBuffer
    state_event_log: list[StateMachineEvent]
    tasks: dict[str, TaskState]  # Maybe private?

    def handle_stimulus(self, event: StateMachineEvent) -> Collection[Instruction]:
        ...

All current transitions, stimulus handlers, etc. will be encapsulated.

We might want to expose a bit more for instrumentation, but in terms of functionality everything else should be a black box. In fact, I'm not sure the rest of the worker even needs to know about the tasks dict.


An important thing I might not have stressed enough: this design would effectively allow us to replace big parts of the cluster_dump functionality in favour of a dump of the state_event_log, which is much more compact and easier to read.
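
For illustration, such a dump could be little more than serializing the event log in order (a sketch; dump_state_events is a hypothetical helper and only works because the events are dataclasses):

import dataclasses
import json


def dump_state_events(state: WorkerState) -> str:
    return json.dumps(
        [
            {"event": type(ev).__name__, **dataclasses.asdict(ev)}
            for ev in state.state_event_log
        ],
        default=str,  # fall back to strings for non-JSON-serializable payloads
    )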


If we are going to do something in the scheduler, it would be interesting to think about designing both at the same time, or at least making sure that what happens with one can also be applied to the other. It would be nice to have fewer systems if that's possible.

The control flow of the scheduler is already much better organized and is implemented using the stimulus/event handler pattern. That's basically the prerequisite for this proposed design. I haven't thought much more about the internals, but I assume we will stick to the transitions/recommendations approach and the worker/scheduler may share this piece of logic if we want to. However, everything else will very likely need to be different since the state is described in fundamentally different ways.

@fjetter

fjetter commented Feb 22, 2022

I'm happy to iterate further. Do you have suggestions on how, i.e. format? Would you prefer a PR?

@mrocklin

Ah, I missed the summary blocks before.

Another question: I was surprised to see data in the WorkerState. Do we still have async data access to get things from disk/device/host memory? Does this mess with the sync preference in WorkerState? Or are we just checking whether a key is in data, so everything is ok as long as those checks are synchronous?

@fjetter

fjetter commented Feb 22, 2022

Another question: I was surprised to see data in the WorkerState. Do we still have async data access to get things from disk/device/host memory? Does this mess with the sync preference in WorkerState? Or are we just checking whether a key is in data, so everything is ok as long as those checks are synchronous?

data is the one component I'm not entirely sure about where it should live. The worker state doesn't actually care about it and shouldn't care about it, other than knowing which keys are in there.
I'm not too concerned about the async disk, but that likely depends on how it is implemented. asyncio does not support async disk IO, so whatever we do will be done in a thread to unblock the event loop; I guess we can even keep a synchronous setitem/getitem and move data around in a background thread. I'm comfortable kicking this down the road, but if you are concerned we can spend a bit more time on the async disk now to avoid any major obstacles.

Edit: I think keeping a synchronous interface is possible for setitem since a setter should not care whether the data is on disk or in memory. Having the same for getitem is not possible for obvious reasons: we'd still need to unblock the event loop 🤦

xref #4424

@mrocklin

sync setitem might also not work if you want backpressure

If all you're doing is contains checks though then that could make sense.

No need to block on this. It just seemed off. I agree though that this isn't likely to block 99% of the design and so we should move forward.

@fjetter

fjetter commented Feb 28, 2022

If we want to implement something like this incrementally, my suggestion on how to split this effort up is as follows:

  • Introduce an Instruction event type that replaces all scheduler messages in the transition engine with appropriate SendMsg instructions. This will already require the implementation of WorkerState.handle_stimulus and Worker._handle_stimulus as outlined above. For the first step, the methods might need to be named differently since we're still in the same class
  • Once this is set up, we can replace the loop.add_callback during the "->executing" transition here and here. This will require us to implement a "dispatch futures and handle their result" mechanism (see _handle_stimulus_from_future above)
  • The same should now be doable for gather_dep in here. This is tougher since it doesn't happen during a transition. Therefore this will require a somewhat bigger refactoring to move the ensure_communicating logic into the transition system, as outlined above using _ensure_communicating
  • Lastly, the state-related logic and the server-related logic should be easily separable into dedicated subclasses

@crusaderky

crusaderky commented Mar 2, 2022

A bunch of observations in advance of our meeting later today:

Rebuild from log

I don't think it's possible to rebuild a WorkerState from a list of StateMachineEvent, unless such a list never expires (which would be a memory leak).
In order to implement an expiration logic, you'd need in turn logic to determine which events are still relevant for the purpose of the final state and which aren't. Events that are still relevant (and thus are needed to rebuild the WorkerState) must be kept indefinitely; irrelevant ones can go in a fixed-size deque like Worker.log for forensic purposes only.

For Worker-wide events, this is not a trivial operation.
For events that impact more than one key, this would require building a tracking system that tells you when the last of the keys in the event has been forgotten.

Even for single-key events, this is far from trivial. Example:

  • Initial state: Task b depends on task a. b.status=running, a.status=memory
  • WorkerState receives event RemoveReplicas(keys=[a])
  • removal refused because a is a dependency of b
  • b terminates. a was not a transitory copy and stays on the worker
  • b is forgotten for whatever reason; this triggers all events of b to be moved to the deque of irrelevant events.
  • further events cause all events of b to be popfirst'ed out of the deque.
  • event dump
  • rebuild WorkerState from list of events
  • there are no events for b. As a consequence, when RemoveReplicas(keys=[a]) arrives, a is not a dependency of anything and the removal is successful.
  • Your rebuilt WorkerState ends up without a, while the original had it.

The only way I can think to track the above is, in the handler of RemoveReplicas, to note down that the existence of b changed the decision for a, therefore from now on all the events of b are relevant to a for the purpose of rebuilding the WorkerState. This can quickly translate into large branches of the whole dask graph to be kept on the worker for as long as any of the keys of that graph are on the worker.

The alternative to all this is to educate our users that, before their computation hangs, they have to start the cluster with a config flag distributed.workerstate.log_length: inf, which they can't have in production because it's a memory leak; otherwise not only will it not be safe to rebuild the WorkerState on our side, but we'll have no way to know whether the rebuild is accurate or not.

memory_monitor and SpillBuffer

I think that memory_monitor and the construction of the SpillBuffer should be moved to a WorkerExtension and that could be a first (simple) PR as the whole thing is fairly self-contained. This would highlight all the contact points between the spill mechanism and the worker state (namely, switching between Status.paused and Status.running).

After we break out the WorkerState, data should just be an opaque MutableMapping created by the Worker and passed to WorkerState.__init__.

Regarding the possibility of asynchronous spilling: as @fjetter was pointing out, __setitem__ and __delitem__ can be synchronous and simply spawn tasks that are handled internally by the SpillBuffer. __getitem__ can remain synchronous as far as WorkerState is concerned; Worker (and Worker alone) however will need to be aware that instead of the actual value it may receive a Future to it. Finally, Worker.close() will need to invoke

cast(SpillBuffer, self.state.data).clear_all_dangling_tasks()
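
A minimal sketch of that contract, assuming an asyncio-based implementation (AsyncSpillBuffer and all of its methods except clear_all_dangling_tasks are hypothetical names used only for illustration):

import asyncio
from collections.abc import MutableMapping


class AsyncSpillBuffer(MutableMapping):
    """Synchronous mapping facade; the actual disk I/O happens in background tasks.
    Must be used from within the worker's event loop."""

    def __init__(self):
        self.fast: dict = {}  # in-memory values
        self._dangling: set[asyncio.Task] = set()

    def __setitem__(self, key, value):
        self.fast[key] = value
        # The spill decision is made synchronously; the disk write is deferred to a task
        task = asyncio.create_task(self._maybe_spill(key))
        self._dangling.add(task)
        task.add_done_callback(self._dangling.discard)

    def __getitem__(self, key):
        # WorkerState only needs synchronous membership/size checks; the Worker
        # must be prepared to receive a Future instead of the value for spilled keys
        if key in self.fast:
            return self.fast[key]
        return asyncio.ensure_future(self._unspill(key))

    def __delitem__(self, key):
        self.fast.pop(key, None)

    def __iter__(self):
        return iter(self.fast)

    def __len__(self):
        return len(self.fast)

    async def _maybe_spill(self, key):
        ...  # write to disk in a thread if over the memory target

    async def _unspill(self, key):
        ...  # read the value back from disk in a thread

    def clear_all_dangling_tasks(self):
        # Called from Worker.close(): cancel any in-flight spill/unspill work
        for task in self._dangling:
            task.cancel()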

Info from the Worker

In the mock-up above, WorkerState._ensure_communicating is accessing a wealth of attributes that have no right to be in WorkerState:

            len(self.in_flight_workers) < self.total_out_connections
            or self.comm_nbytes < self.comm_threshold_bytes

Is this info copied over from the Worker to the WorkerState through a class UpdateWorkerResources(StateMachineEvent)?
Also note that WorkerState._ensure_communicating itself must decrease these measures to know where to stop, so... are they going to be refreshed from the Worker? Do we need to figure out on the Worker which events will require a follow-up with an UpdateWorkerResources event later on? How do we robustly guarantee that Worker and WorkerState don't fall out of sync? Are we going to compulsively send an UpdateWorkerResources ahead of every chain of events?

@fjetter

fjetter commented Mar 3, 2022

Rebuild from log

I understand the limitation of this reconstruction attempt.

Right now, we're logging 100k transitions on the worker side and distributed.scheduler.transition-log-length (100k) transitions on the scheduler side. I anticipate that we can eventually abandon the transition log, and the new event logging should not only require less memory per event (such that we can increase the limit) but also require fewer log entries to build the history. Therefore we should effectively be able to persist more information than we currently do. While a full reconstruction is still not possible, having a more thorough history should be helpful.

I also don't mind telling users that in these exceptional situations we'll require an event-log: inf. That's a "small price to pay" to get a reproducer for some of the issues we're facing.

Also, where I see this reconstruction actually being used is in unit tests.

memory_monitor and SpillBuffer

To address this, we chose to go for a small refactoring that moves the memory monitor and spill buffer into an extension to encapsulate the logic there, see #5891. This extension will eventually also emit events to the state machine, like Pause or UnPause.

Info from the Worker

These are actually great examples of how our current attribute names are semantically slightly misleading. Taking the pair in_flight_workers and total_out_connections as an example, they are used as follows:

  • total_out_connections: Maximum number of concurrent gather_dep requests
  • in_flight_workers : Currently scheduled numbers of gather_dep requests

They technically do not limit any connections, nor do they track "in-flight workers"; they control how many transitions we are performing. The name total_out_connections suggests this was a comm pool limit, but it isn't.

Renaming them should already be sufficient to sharpen their meaning and definition and would allow us to define them as internal attributes of the WorkerState. For example (naming is hard!):

  • total_out_connections -> max_gather_data_requests
  • in_flight_workers -> num_scheduled_gather_data_requests

The same exercise can be done for comm_nbytes and comm_threshold_bytes since semantically they are rather:

  • comm_nbytes: Sum of expected managed memory currently scheduled to be gathered from remote
  • comm_threshold_bytes: Maximum threshold over all scheduled gather_remote_data calls

Again, these are not actually connected to the comm, or even to the real sizes we're transferring, but are rather a measure of what we estimate and base a decision on.

This is a great example of how this refactoring can help with readability, since in the end our concurrency control is not coupled to a ThreadPool or a CommPool limit but is based exclusively on which task transitions we are allowing.
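
For illustration, with such a renaming the capacity check from the earlier _ensure_communicating sketch would read roughly as follows (the names and the helper _has_gather_capacity are only suggestions, not existing attributes):

class WorkerState:
    # Purely state-machine-internal counters: they throttle transitions,
    # not network connections
    max_gather_data_requests: int            # was: total_out_connections
    num_scheduled_gather_data_requests: int  # was: len(in_flight_workers)
    nbytes_scheduled_to_gather: int          # was: comm_nbytes
    nbytes_scheduled_threshold: int          # was: comm_threshold_bytes

    def _has_gather_capacity(self) -> bool:
        return (
            self.num_scheduled_gather_data_requests < self.max_gather_data_requests
            or self.nbytes_scheduled_to_gather < self.nbytes_scheduled_threshold
        )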

@fjetter

fjetter commented Sep 1, 2022

This is now in. Thanks everyone for participating in the design and development. Special thanks to @crusaderky who did most of the heavy lifting!

The primary effort was to address deadlocks. Over the course of this implementation we expanded test coverage significantly. We don't expect any more deadlocks to pop up, but we are now in a much better position to deal with them if they do. If anybody is still struggling with this, please open a ticket and we will do our best to help you with it.

We didn't implement any utilities for the actual "replayability", but for those who are interested in what this looks like, see it in action in coiled/benchmarks#295 (comment) ❤️

@fjetter fjetter closed this as completed Sep 1, 2022