Skip to content

Commit

Permalink
Move unrelated change to dask#6410
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed May 25, 2022
1 parent 186ce8a commit 933c9cf
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
11 changes: 7 additions & 4 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7060,20 +7060,23 @@ def adaptive_target(self, target_duration=None):
to_close = self.workers_to_close()
return len(self.workers) - len(to_close)

def request_acquire_replicas(
self, addr: str, keys: Iterable[str], *, stimulus_id: str
) -> None:
def request_acquire_replicas(self, addr: str, keys: list, *, stimulus_id: str):
"""Asynchronously ask a worker to acquire a replica of the listed keys from
other workers. This is a fire-and-forget operation which offers no feedback for
success or failure, and is intended for housekeeping and not for computation.
"""
who_has = {key: {ws.address for ws in self.tasks[key].who_has} for key in keys}
who_has = {}
for key in keys:
ts = self.tasks[key]
who_has[key] = {ws.address for ws in ts.who_has}

if self.validate:
assert all(who_has.values())

self.stream_comms[addr].send(
{
"op": "acquire-replicas",
"keys": keys,
"who_has": who_has,
"stimulus_id": stimulus_id,
},
Expand Down
8 changes: 5 additions & 3 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1864,15 +1864,17 @@ def handle_cancel_compute(self, key: str, stimulus_id: str) -> None:

def handle_acquire_replicas(
self,
who_has: dict[str, Collection[str]],
*,
keys: Collection[str],
who_has: dict[str, Collection[str]],
stimulus_id: str,
) -> None:
if self.validate:
assert set(keys) == who_has.keys()
assert all(who_has.values())

recommendations: Recs = {}
for key in who_has:
for key in keys:
ts = self.ensure_task_exists(
key=key,
# Transfer this data after all dependency tasks of computations with
Expand All @@ -1893,7 +1895,7 @@ def handle_acquire_replicas(
self._handle_instructions(instructions)

if self.validate:
for key in who_has:
for key in keys:
assert self.tasks[key].state != "released", self.story(key)

def ensure_task_exists(
Expand Down

0 comments on commit 933c9cf

Please sign in to comment.