Commit 3f840d7

Merge branch 'main' into zero_copy_numpy_shuffle
crusaderky committed Nov 7, 2023
2 parents e322258 + 010f896
Showing 7 changed files with 11 additions and 100 deletions.
7 changes: 5 additions & 2 deletions distributed/comm/tcp.py
@@ -418,8 +418,11 @@ def _add_frames_header(
         frames_nbytes = [header_nbytes, *frames_nbytes]
         frames_nbytes_total += header_nbytes

-        if frames_nbytes_total < 2**17:  # 128kiB
-            # small enough, send in one go
+        if frames_nbytes_total < 2**17 or (  # 128 kiB total
+            frames_nbytes_total < 2**25  # 32 MiB total
+            and frames_nbytes_total // len(frames) < 2**15  # 32 kiB mean
+        ):
+            # very small or very fragmented; send in one go
             frames = [b"".join(frames)]
             frames_nbytes = [frames_nbytes_total]

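The new condition coalesces frames not only when the total payload is very small, but also when a moderate payload is split into many tiny frames, where per-frame overhead on the socket write path dominates. A minimal standalone sketch of the heuristic, assuming it can be isolated as a pure function (should_coalesce is a hypothetical name; the thresholds are the ones from the diff above):

def should_coalesce(frames_nbytes_total: int, nframes: int) -> bool:
    """Return True if frames should be concatenated and sent in one write."""
    if frames_nbytes_total < 2**17:  # < 128 kiB total: always coalesce
        return True
    return (
        frames_nbytes_total < 2**25  # < 32 MiB total
        and frames_nbytes_total // nframes < 2**15  # < 32 kiB mean frame size
    )

# 1 MiB split into 1024 one-kiB frames is coalesced; the same 1 MiB
# arriving as a single frame is sent as-is.
assert should_coalesce(2**20, 1024)
assert not should_coalesce(2**20, 1)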
2 changes: 0 additions & 2 deletions distributed/shuffle/_merge.py
@@ -258,8 +258,6 @@ def __init__(
         self.left_index = left_index
         self.right_index = right_index
         self.disk = disk
-        annotations = annotations or {}
-        annotations.update({"shuffle": lambda key: key[-1]})
         super().__init__(annotations=annotations)

     def _cull_dependencies(
15 changes: 0 additions & 15 deletions distributed/shuffle/_scheduler_plugin.py
@@ -191,21 +191,6 @@ def _pin_output_workers(
         for partition in output_partitions:
             worker = pick(partition, workers)
             mapping[partition] = worker
-
-        for dt in barrier.dependents:
-            try:
-                partition = dt.annotations["shuffle"]
-            except KeyError:
-                continue
-
-            if dt.worker_restrictions:
-                worker = pick(partition, list(dt.worker_restrictions))
-                mapping[partition] = worker
-            else:
-                worker = mapping[partition]
-
-            self._set_restriction(dt, worker)
-
         return mapping

     def _set_restriction(self, ts: TaskState, worker: str) -> None:
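With the annotation-based fallback removed, _pin_output_workers pins output tasks purely through the deterministic pick(partition, workers) callable, so recomputing the mapping always yields the same workers. A toy range-sharding pick, as a sketch only (hypothetical standalone function, not necessarily the exact implementation distributed uses):

from math import ceil

def pick_range_sharding(partition: int, workers: list[str], npartitions: int) -> str:
    """Assign contiguous ranges of output partitions to workers, deterministically."""
    per_worker = ceil(npartitions / len(workers))
    return workers[partition // per_worker]

workers = ["tcp://a:1234", "tcp://b:1234"]
mapping = {p: pick_range_sharding(p, workers, npartitions=4) for p in range(4)}
# {0: 'tcp://a:1234', 1: 'tcp://a:1234', 2: 'tcp://b:1234', 3: 'tcp://b:1234'}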
2 changes: 0 additions & 2 deletions distributed/shuffle/_shuffle.py
@@ -180,8 +180,6 @@ def __init__(
         else:
             self.parts_out = set(range(self.npartitions))
         self.npartitions_input = npartitions_input
-        annotations = annotations or {}
-        annotations.update({"shuffle": lambda key: key[1]})
         super().__init__(annotations=annotations)

     def __repr__(self) -> str:
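For reference, the "shuffle" annotations deleted here and earlier in _merge.py mapped an output task's key to its partition index, which the scheduler plugin above used as its fallback. A toy illustration, assuming the usual (name, partition) tuple shape of P2P output-task keys:

# Hypothetical output-task key of the form (name, partition).
key = ("shuffle_p2p-1234abcd", 3)

assert (lambda key: key[1])(key) == 3   # annotation removed from _shuffle.py
assert (lambda key: key[-1])(key) == 3  # annotation removed from _merge.py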
12 changes: 2 additions & 10 deletions distributed/shuffle/tests/test_merge.py
Expand Up @@ -7,7 +7,6 @@

from distributed.shuffle._core import id_from_key
from distributed.shuffle._merge import hash_join
from distributed.shuffle.tests.utils import invoke_annotation_chaos
from distributed.utils_test import gen_cluster

dd = pytest.importorskip("dask.dataframe")
@@ -26,11 +25,6 @@
 pytestmark = pytest.mark.ci1


-@pytest.fixture(params=[0, 0.3, 1], ids=["none", "some", "all"])
-def lose_annotations(request):
-    return request.param
-
-
 def list_eq(aa, bb):
     if isinstance(aa, dd.DataFrame):
         a = aa.compute(scheduler="sync")
@@ -72,8 +66,7 @@ async def test_minimal_version(c, s, a, b):

 @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
 @gen_cluster(client=True)
-async def test_basic_merge(c, s, a, b, how, lose_annotations):
-    await invoke_annotation_chaos(lose_annotations, c)
+async def test_basic_merge(c, s, a, b, how):
     A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
     a = dd.repartition(A, [0, 4, 5])
@@ -166,8 +159,7 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, b):
 @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
 @pytest.mark.parametrize("disk", [True, False])
 @gen_cluster(client=True)
-async def test_merge(c, s, a, b, how, disk, lose_annotations):
-    await invoke_annotation_chaos(lose_annotations, c)
+async def test_merge(c, s, a, b, how, disk):
     A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
     a = dd.repartition(A, [0, 4, 5])
16 changes: 4 additions & 12 deletions distributed/shuffle/tests/test_shuffle.py
@@ -57,10 +57,7 @@
     split_by_worker,
 )
 from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin, _ShuffleRunManager
-from distributed.shuffle.tests.utils import (
-    AbstractShuffleTestPool,
-    invoke_annotation_chaos,
-)
+from distributed.shuffle.tests.utils import AbstractShuffleTestPool
 from distributed.utils import Deadline
 from distributed.utils_test import (
     async_poll_for,
@@ -186,8 +183,7 @@ def get_active_shuffle_runs(worker: Worker) -> dict[ShuffleId, ShuffleRun]:
 @pytest.mark.parametrize("npartitions", [None, 1, 20])
 @pytest.mark.parametrize("disk", [True, False])
 @gen_cluster(client=True)
-async def test_basic_integration(c, s, a, b, lose_annotations, npartitions, disk):
-    await invoke_annotation_chaos(lose_annotations, c)
+async def test_basic_integration(c, s, a, b, npartitions, disk):
     df = dask.datasets.timeseries(
         start="2000-01-01",
         end="2000-01-10",
@@ -232,8 +228,7 @@ async def test_basic_integration_local_cluster(processes):

 @pytest.mark.parametrize("npartitions", [None, 1, 20])
 @gen_cluster(client=True)
-async def test_shuffle_with_array_conversion(c, s, a, b, lose_annotations, npartitions):
-    await invoke_annotation_chaos(lose_annotations, c)
+async def test_shuffle_with_array_conversion(c, s, a, b, npartitions):
     df = dask.datasets.timeseries(
         start="2000-01-01",
         end="2000-01-10",
@@ -271,8 +266,7 @@ def test_shuffle_before_categorize(loop_in_thread):


 @gen_cluster(client=True)
-async def test_concurrent(c, s, a, b, lose_annotations):
-    await invoke_annotation_chaos(lose_annotations, c)
+async def test_concurrent(c, s, a, b):
     df = dask.datasets.timeseries(
         start="2000-01-01",
         end="2000-01-10",
@@ -2388,8 +2382,6 @@ async def barrier(self, *args: Any, **kwargs: Any) -> int:
 )
 @gen_cluster(client=True, nthreads=[("", 1)])
 async def test_unpack_gets_rescheduled_from_non_participating_worker(c, s, a):
-    await invoke_annotation_chaos(1.0, c)
-
     expected = pd.DataFrame({"a": list(range(10))})
     ddf = dd.from_pandas(expected, npartitions=2)
     ddf = ddf.shuffle("a")
57 changes: 0 additions & 57 deletions distributed/shuffle/tests/utils.py
@@ -1,15 +1,9 @@
 from __future__ import annotations

 import itertools
-import random
-from typing import Any

-from dask.typing import Key

-from distributed.client import Client
 from distributed.core import PooledRPCCall
-from distributed.diagnostics.plugin import SchedulerPlugin
-from distributed.scheduler import Scheduler, TaskStateState
 from distributed.shuffle._core import ShuffleId, ShuffleRun

@@ -50,54 +44,3 @@ async def shuffle_barrier(
         for addr, s in self.shuffles.items():
             out[addr] = await s.inputs_done()
         return out
-
-
-class ShuffleAnnotationChaosPlugin(SchedulerPlugin):
-    #: Rate at which the plugin randomly drops shuffle annotations
-    rate: float
-    scheduler: Scheduler | None
-    seen: set
-
-    def __init__(self, rate: float):
-        self.rate = rate
-        self.scheduler = None
-        self.seen = set()
-
-    async def start(self, scheduler: Scheduler) -> None:
-        self.scheduler = scheduler
-
-    def transition(
-        self,
-        key: Key,
-        start: TaskStateState,
-        finish: TaskStateState,
-        *args: Any,
-        **kwargs: Any,
-    ) -> None:
-        assert self.scheduler
-        if finish != "waiting":
-            return
-        if not isinstance(key, str) or not key.startswith("shuffle-barrier-"):
-            return
-        if key in self.seen:
-            return
-
-        self.seen.add(key)
-
-        barrier = self.scheduler.tasks[key]
-
-        if self._flip():
-            barrier.annotations.pop("shuffle", None)
-        for dt in barrier.dependents:
-            if self._flip():
-                dt.annotations.pop("shuffle", None)
-
-    def _flip(self) -> bool:
-        return random.random() < self.rate
-
-
-async def invoke_annotation_chaos(rate: float, client: Client) -> None:
-    if not rate:
-        return
-    plugin = ShuffleAnnotationChaosPlugin(rate)
-    await client.register_plugin(plugin)
