diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 24026877ae9e..340d445e6885 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -867,8 +867,8 @@ def split( equal: Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits. - locality_hints: A list of Ray actor handles of size ``n``. The - system will try to co-locate the blocks of the i-th dataset + locality_hints: [Experimental] A list of Ray actor handles of size ``n``. + The system will try to co-locate the blocks of the i-th dataset with the i-th actor to maximize data locality. Returns: @@ -877,6 +877,19 @@ def split( if n <= 0: raise ValueError(f"The number of splits {n} is not positive.") + # fallback to split_at_indices for equal split without locality hints. + # simple benchmarks shows spilit_at_indices yields more stable performance. + # https://github.com/ray-project/ray/pull/26641 for more context. + if equal and locality_hints is None: + count = self.count() + split_index = count // n + # we are creating n split_indices which will generate + # n + 1 splits; the last split will at most contains (n - 1) + # rows, which could be safely dropped. + split_indices = [split_index * i for i in range(1, n + 1)] + shards = self.split_at_indices(split_indices) + return shards[:n] + if locality_hints and len(locality_hints) != n: raise ValueError( f"The length of locality_hints {len(locality_hints)} " diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index 83d797e47baa..fbfe8ff10a8f 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -262,8 +262,8 @@ def split( equal: Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits. - locality_hints: A list of Ray actor handles of size ``n``. The - system will try to co-locate the blocks of the ith pipeline + locality_hints: [Experimental] A list of Ray actor handles of size ``n``. + The system will try to co-locate the blocks of the ith pipeline shard with the ith actor to maximize data locality. Returns: