From 2bcb03e9552dae1729fa797884b54b00b1b34a0c Mon Sep 17 00:00:00 2001 From: pdmurray Date: Fri, 6 Jan 2023 23:14:25 +0100 Subject: [PATCH] Add informative progress bar names to map_batches Signed-off-by: pdmurray --- .bazeliskrc | 1 + .gitignore | 2 + python/ray/air/tests/test_dataset_config.py | 6 +- python/ray/data/_internal/plan.py | 26 +++++++- python/ray/data/dataset.py | 10 ++- python/ray/data/tests/test_dataset.py | 10 +-- python/ray/data/tests/test_optimize.py | 64 +++++++++++-------- python/ray/data/tests/test_stats.py | 45 +++++++------ .../ray/train/tests/test_batch_predictor.py | 16 ++--- 9 files changed, 115 insertions(+), 65 deletions(-) create mode 100644 .bazeliskrc diff --git a/.bazeliskrc b/.bazeliskrc new file mode 100644 index 000000000000..9adf2699b0d0 --- /dev/null +++ b/.bazeliskrc @@ -0,0 +1 @@ +USE_BAZEL_VERSION=5.x diff --git a/.gitignore b/.gitignore index 297a7b9d0608..9268fa948c4e 100644 --- a/.gitignore +++ b/.gitignore @@ -224,3 +224,5 @@ workflow_data/ # Auto-generated tag mapping tag-mapping.json + +.bazeliskrc diff --git a/python/ray/air/tests/test_dataset_config.py b/python/ray/air/tests/test_dataset_config.py index 6616395f93bd..902835e3f4b9 100644 --- a/python/ray/air/tests/test_dataset_config.py +++ b/python/ray/air/tests/test_dataset_config.py @@ -251,7 +251,7 @@ def checker(shard, results): # applying the preprocessor on each epoch. assert results[0] == results[1], results stats = shard.stats() - assert "Stage 1 read->map_batches: 1/1 blocks executed " in stats, stats + assert "Stage 1 read->BatchMapper: 1/1 blocks executed " in stats, stats def rand(x): x["value"] = [random.random() for _ in range(len(x))] @@ -284,8 +284,8 @@ def checker(shard, results): assert results[0] != results[1], results stats = shard.stats() assert ( - "Stage 1 read->randomize_block_order->map_batches: 1/1 blocks executed " - in stats + "Stage 1 read->randomize_block_order->" + "BatchMapper: 1/1 blocks executed " in stats ), stats test = TestStream( diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 3bc492e944e9..47097bf2821b 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -45,6 +45,30 @@ logger = DatasetLogger(__name__) +def capfirst(s: str): + """Capitalize the first letter of a string + + Args: + s: String to capitalize + + Returns: + Capitalized string + """ + return s[0].upper() + s[1:] + + +def capitalize(s: str): + """Capitalize a string, removing '_' and keeping camelcase. + + Args: + s: String to capitalize + + Returns: + Capitalized string with no underscores. + """ + return "".join(capfirst(x) for x in s.split("_")) + + class Stage: """Represents a Dataset transform stage (e.g., map or shuffle).""" @@ -157,7 +181,7 @@ def get_plan_as_string(self) -> str: # Get string representation of each stage in reverse order. for stage in self._stages_after_snapshot[::-1]: # Get name of each stage in camel case. 
-            stage_name = stage.name.title().replace("_", "")
+            stage_name = capitalize(stage.name)
             if num_stages == 0:
                 plan_str += f"{stage_name}\n"
             else:
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 7ab51669992b..f2f9db25753d 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -671,8 +671,15 @@ def process_next_batch(batch: DataBatch) -> Iterator[Block]:
             if output_buffer.has_next():
                 yield output_buffer.next()

+        if hasattr(fn, "__self__") and isinstance(
+            fn.__self__, ray.data.preprocessor.Preprocessor
+        ):
+            stage_name = fn.__self__.__class__.__name__
+        else:
+            stage_name = f'MapBatches({getattr(fn, "__name__", type(fn))})'
+
         stage = OneToOneStage(
-            "map_batches",
+            stage_name,
             transform,
             compute,
             ray_remote_args,
diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py
index e593c5ca60ce..4cd1d39aa5a8 100644
--- a/python/ray/data/tests/test_dataset.py
+++ b/python/ray/data/tests/test_dataset.py
@@ -1517,26 +1517,28 @@ def test_dataset_repr(ray_start_regular_shared):
     assert repr(ds) == "Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)"
     ds = ds.map_batches(lambda x: x)
     assert repr(ds) == (
-        "MapBatches\n" "+- Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)"
+        "MapBatches(<lambda>)\n"
+        "+- Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)"
     )
     ds = ds.filter(lambda x: x > 0)
     assert repr(ds) == (
         "Filter\n"
-        "+- MapBatches\n"
+        "+- MapBatches(<lambda>)\n"
         "   +- Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)"
     )
     ds = ds.random_shuffle()
     assert repr(ds) == (
         "RandomShuffle\n"
         "+- Filter\n"
-        "   +- MapBatches\n"
+        "   +- MapBatches(<lambda>)\n"
         "      +- Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)"
     )
     ds.fully_executed()
     assert repr(ds) == "Dataset(num_blocks=10, num_rows=9, schema=<class 'int'>)"
     ds = ds.map_batches(lambda x: x)
     assert repr(ds) == (
-        "MapBatches\n" "+- Dataset(num_blocks=10, num_rows=9, schema=<class 'int'>)"
+        "MapBatches(<lambda>)\n"
+        "+- Dataset(num_blocks=10, num_rows=9, schema=<class 'int'>)"
     )
     ds1, ds2 = ds.split(2)
     assert (
diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py
index e8d0893aa3a4..97b62885e1b2 100644
--- a/python/ray/data/tests/test_optimize.py
+++ b/python/ray/data/tests/test_optimize.py
@@ -59,6 +59,11 @@ def expect_stages(pipe, num_stages_expected, stage_names):
     ), pipe._optimized_stages


+def dummy_map(x):
+    """Dummy function used in calls to map_batches in these tests."""
+    return x
+
+
 def test_memory_sanity(shutdown_only):
     info = ray.init(num_cpus=1, object_store_memory=500e6)
     ds = ray.data.range(10)
@@ -312,23 +317,23 @@ def test_optimize_reorder(ray_start_regular_shared):
     context.optimize_fuse_read_stages = True
     context.optimize_reorder_stages = True

-    ds = ray.data.range(10).randomize_block_order().map_batches(lambda x: x)
+    ds = ray.data.range(10).randomize_block_order().map_batches(dummy_map)
     expect_stages(
         ds,
         2,
-        ["read->map_batches", "randomize_block_order"],
+        ["read->MapBatches(dummy_map)", "randomize_block_order"],
     )

     ds2 = (
         ray.data.range(10)
         .randomize_block_order()
         .repartition(10)
-        .map_batches(lambda x: x)
+        .map_batches(dummy_map)
     )
     expect_stages(
         ds2,
         3,
-        ["read->randomize_block_order", "repartition", "map_batches"],
+        ["read->randomize_block_order", "repartition", "MapBatches(dummy_map)"],
     )


@@ -338,10 +343,10 @@ def test_window_randomize_fusion(ray_start_regular_shared):
     context.optimize_fuse_read_stages = True
     context.optimize_reorder_stages = True

-    pipe = ray.data.range(100).randomize_block_order().window().map_batches(lambda x: x)
+    pipe = 
ray.data.range(100).randomize_block_order().window().map_batches(dummy_map) pipe.take() stats = pipe.stats() - assert "read->randomize_block_order->map_batches" in stats, stats + assert "read->randomize_block_order->MapBatches(dummy_map)" in stats, stats def test_optimize_fuse(ray_start_regular_shared): @@ -349,8 +354,8 @@ def test_optimize_fuse(ray_start_regular_shared): def build_pipe(): pipe = ray.data.range(3).window(blocks_per_window=1).repeat(2) - pipe = pipe.map_batches(lambda x: x) - pipe = pipe.map_batches(lambda x: x) + pipe = pipe.map_batches(dummy_map) + pipe = pipe.map_batches(dummy_map) pipe = pipe.random_shuffle_each_window() results = [sorted(p.take()) for p in pipe.iter_epochs()] assert results == [[0, 1, 2], [0, 1, 2]], results @@ -362,7 +367,10 @@ def build_pipe(): expect_stages( build_pipe(), 1, - ["read->map_batches->map_batches->random_shuffle_map", "random_shuffle_reduce"], + [ + "read->MapBatches(dummy_map)->MapBatches(dummy_map)->random_shuffle_map", + "random_shuffle_reduce", + ], ) context.optimize_fuse_stages = True @@ -373,7 +381,7 @@ def build_pipe(): 1, [ "read", - "map_batches->map_batches->random_shuffle_map", + "MapBatches(dummy_map)->MapBatches(dummy_map)->random_shuffle_map", "random_shuffle_reduce", ], ) @@ -386,7 +394,7 @@ def build_pipe(): 2, [ "read", - "map_batches->map_batches", + "MapBatches(dummy_map)->MapBatches(dummy_map)", "random_shuffle_map", "random_shuffle_reduce", ], @@ -400,8 +408,8 @@ def build_pipe(): 3, [ "read", - "map_batches", - "map_batches", + "MapBatches(dummy_map)", + "MapBatches(dummy_map)", "random_shuffle_map", "random_shuffle_reduce", ], @@ -428,14 +436,14 @@ def test_optimize_equivalent_remote_args(ray_start_regular_shared): for kwb in equivalent_kwargs: print("CHECKING", kwa, kwb) pipe = ray.data.range(3).repeat(2) - pipe = pipe.map_batches(lambda x: x, compute="tasks", **kwa) - pipe = pipe.map_batches(lambda x: x, compute="tasks", **kwb) + pipe = pipe.map_batches(dummy_map, compute="tasks", **kwa) + pipe = pipe.map_batches(dummy_map, compute="tasks", **kwb) pipe.take() expect_stages( pipe, 1, [ - "read->map_batches->map_batches", + "read->MapBatches(dummy_map)->MapBatches(dummy_map)", ], ) @@ -443,14 +451,14 @@ def test_optimize_equivalent_remote_args(ray_start_regular_shared): for kwb in equivalent_kwargs: print("CHECKING", kwa, kwb) pipe = ray.data.range(3).repeat(2) - pipe = pipe.map_batches(lambda x: x, compute="tasks", **kwa) + pipe = pipe.map_batches(dummy_map, compute="tasks", **kwa) pipe = pipe.random_shuffle_each_window(**kwb) pipe.take() expect_stages( pipe, 1, [ - "read->map_batches->random_shuffle_map", + "read->MapBatches(dummy_map)->random_shuffle_map", "random_shuffle_reduce", ], ) @@ -464,32 +472,32 @@ def test_optimize_incompatible_stages(ray_start_regular_shared): pipe = ray.data.range(3).repeat(2) # Should get fused as long as their resource types are compatible. - pipe = pipe.map_batches(lambda x: x, compute="actors") + pipe = pipe.map_batches(dummy_map, compute="actors") # Cannot fuse actors->tasks. 
- pipe = pipe.map_batches(lambda x: x, compute="tasks") + pipe = pipe.map_batches(dummy_map, compute="tasks") pipe = pipe.random_shuffle_each_window() pipe.take() expect_stages( pipe, 2, [ - "read->map_batches", - "map_batches->random_shuffle_map", + "read->MapBatches(dummy_map)", + "MapBatches(dummy_map)->random_shuffle_map", "random_shuffle_reduce", ], ) pipe = ray.data.range(3).repeat(2) - pipe = pipe.map_batches(lambda x: x, compute="tasks") - pipe = pipe.map_batches(lambda x: x, num_cpus=0.75) + pipe = pipe.map_batches(dummy_map, compute="tasks") + pipe = pipe.map_batches(dummy_map, num_cpus=0.75) pipe = pipe.random_shuffle_each_window() pipe.take() expect_stages( pipe, 3, [ - "read->map_batches", - "map_batches", + "read->MapBatches(dummy_map)", + "MapBatches(dummy_map)", "random_shuffle_map", "random_shuffle_reduce", ], @@ -556,7 +564,7 @@ def __call__(self, x): pipe, 1, [ - "read->map_batches->map_batches", + "read->MapBatches(CallableFn)->MapBatches(CallableFn)", ], ) @@ -592,7 +600,7 @@ def __call__(self, x): pipe, 1, [ - "read->map_batches->map_batches", + "read->MapBatches()->MapBatches(CallableFn)", ], ) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index c5c85c27ddda..454750723cfd 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -25,6 +25,11 @@ def canonicalize(stats: str) -> str: return s4 +def dummy_map_batches(x): + """Dummy function used in calls to map_batches below.""" + return x + + def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): context = DatasetContext.get_current() context.optimize_fuse_stages = True @@ -39,7 +44,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): ) with patch.object(logger, "info") as mock_logger: ds = ray.data.range(1000, parallelism=10) - ds = ds.map_batches(lambda x: x).fully_executed() + ds = ds.map_batches(dummy_map_batches).fully_executed() if enable_auto_log_stats: logger_args, logger_kwargs = mock_logger.call_args @@ -47,7 +52,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): if context.new_execution_backend: assert ( canonicalize(logger_args[0]) - == """Stage N read->map_batches: N/N blocks executed in T + == """Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -61,7 +66,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): else: assert ( canonicalize(logger_args[0]) - == """Stage N read->map_batches: N/N blocks executed in T + == """Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -71,7 +76,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): """ ) - ds = ds.map(lambda x: x).fully_executed() + ds = ds.map(dummy_map_batches).fully_executed() if enable_auto_log_stats: logger_args, logger_kwargs = mock_logger.call_args @@ -108,7 +113,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): if context.new_execution_backend: assert ( stats - == """Stage N read->map_batches: N/N blocks executed in T + == """Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote 
cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -140,7 +145,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): else: assert ( stats - == """Stage N read->map_batches: N/N blocks executed in T + == """Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -369,7 +374,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ with patch.object(logger, "info") as mock_logger: ds = ray.data.range(1000, parallelism=10) - ds = ds.map_batches(lambda x: x).fully_executed() + ds = ds.map_batches(dummy_map_batches).fully_executed() if enable_auto_log_stats: logger_args, logger_kwargs = mock_logger.call_args @@ -377,7 +382,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ if context.new_execution_backend: assert ( canonicalize(logger_args[0]) - == """Stage N read->map_batches: N/N blocks executed in T + == """Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -391,7 +396,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ else: assert ( canonicalize(logger_args[0]) - == """Stage N read->map_batches: N/N blocks executed in T + == """Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -402,14 +407,14 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ ) pipe = ds.repeat(5) - pipe = pipe.map(lambda x: x) + pipe = pipe.map(dummy_map_batches) if enable_auto_log_stats: # Stats only include first stage, and not for pipelined map logger_args, logger_kwargs = mock_logger.call_args if context.new_execution_backend: assert ( canonicalize(logger_args[0]) - == """Stage N read->map_batches: N/N blocks executed in T + == """Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -423,7 +428,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ else: assert ( canonicalize(logger_args[0]) - == """Stage N read->map_batches: N/N blocks executed in T + == """Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -473,7 +478,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ assert ( stats == """== Pipeline Window N == -Stage N read->map_batches: N/N blocks executed in T +Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -494,7 +499,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ 'obj_store_mem_peak': N} == Pipeline Window N == -Stage N read->map_batches: [execution cached] +Stage N 
read->MapBatches(dummy_map_batches): [execution cached] * Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \ 'obj_store_mem_peak': N} @@ -509,7 +514,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ 'obj_store_mem_peak': N} == Pipeline Window N == -Stage N read->map_batches: [execution cached] +Stage N read->MapBatches(dummy_map_batches): [execution cached] * Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \ 'obj_store_mem_peak': N} @@ -540,7 +545,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ assert ( stats == """== Pipeline Window N == -Stage N read->map_batches: N/N blocks executed in T +Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -557,7 +562,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ * Tasks per node: N min, N max, N mean; N nodes used == Pipeline Window N == -Stage N read->map_batches: [execution cached] +Stage N read->MapBatches(dummy_map_batches): [execution cached] Stage N map: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total @@ -568,7 +573,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ * Tasks per node: N min, N max, N mean; N nodes used == Pipeline Window N == -Stage N read->map_batches: [execution cached] +Stage N read->MapBatches(dummy_map_batches): [execution cached] Stage N map: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total @@ -607,11 +612,11 @@ def test_dataset_pipeline_cache_cases(ray_start_regular_shared): assert "[execution cached]" in stats # CACHED (eager map stage). - ds = ray.data.range(10).map_batches(lambda x: x).repeat(2) + ds = ray.data.range(10).map_batches(dummy_map_batches).repeat(2) ds.take(999) stats = ds.stats() assert "[execution cached]" in stats - assert "read->map_batches" in stats + assert "read->MapBatches(dummy_map_batches)" in stats def test_dataset_pipeline_split_stats_basic(ray_start_regular_shared): diff --git a/python/ray/train/tests/test_batch_predictor.py b/python/ray/train/tests/test_batch_predictor.py index 5b04009318ed..5c51a502408e 100644 --- a/python/ray/train/tests/test_batch_predictor.py +++ b/python/ray/train/tests/test_batch_predictor.py @@ -116,8 +116,8 @@ def test_separate_gpu_stage(shutdown_only): allow_gpu=True, ) stats = ds.stats() - assert "Stage 1 read->map_batches:" in stats, stats - assert "Stage 2 map_batches:" in stats, stats + assert "Stage 1 read->DummyPreprocessor:" in stats, stats + assert "Stage 2 MapBatches(ScoringWrapper):" in stats, stats assert ds.max("value") == 36.0, ds ds = batch_predictor.predict( @@ -128,7 +128,7 @@ def test_separate_gpu_stage(shutdown_only): ) stats = ds.stats() assert "Stage 1 read:" in stats, stats - assert "Stage 2 map_batches:" in stats, stats + assert "Stage 2 MapBatches(ScoringWrapper):" in stats, stats assert ds.max("value") == 36.0, ds @@ -160,7 +160,7 @@ def test_batch_prediction(): test_dataset = ray.data.range_table(4) ds = batch_predictor.predict(test_dataset) # Check fusion occurred. 
- assert "read->map_batches" in ds.stats(), ds.stats() + assert "read->DummyPreprocessor" in ds.stats(), ds.stats() assert ds.to_pandas().to_numpy().squeeze().tolist() == [ 0.0, 4.0, @@ -278,7 +278,7 @@ def test_batch_prediction_various_combination(): ds = batch_predictor.predict(input_dataset) print(ds.stats()) # Check no fusion needed since we're not doing a dataset read. - assert "Stage 1 map_batches" in ds.stats(), ds.stats() + assert f"Stage 1 {preprocessor.__class__.__name__}" in ds.stats(), ds.stats() assert ds.to_pandas().to_numpy().squeeze().tolist() == [ 4.0, 8.0, @@ -544,8 +544,8 @@ def test_separate_gpu_stage_pipelined(shutdown_only): ) out = [x["value"] for x in ds.iter_rows()] stats = ds.stats() - assert "Stage 1 read->map_batches:" in stats, stats - assert "Stage 2 map_batches:" in stats, stats + assert "Stage 1 read->DummyPreprocessor:" in stats, stats + assert "Stage 2 MapBatches(ScoringWrapper):" in stats, stats assert max(out) == 16.0, out ds = batch_predictor.predict_pipelined( @@ -558,7 +558,7 @@ def test_separate_gpu_stage_pipelined(shutdown_only): out = [x["value"] for x in ds.iter_rows()] stats = ds.stats() assert "Stage 1 read:" in stats, stats - assert "Stage 2 map_batches:" in stats, stats + assert "Stage 2 MapBatches(ScoringWrapper):" in stats, stats assert max(out) == 16.0, out
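
Example of the renamed stages in practice (a minimal sketch, not part of the patch above; it assumes the Ray 2.x Datasets API that the modified tests exercise, and `normalize` is an arbitrary user-defined function chosen for illustration):

    import ray

    def normalize(batch):
        # Identity transform; only the stage name matters for this illustration.
        return batch

    ds = ray.data.range(8, parallelism=4).map_batches(normalize)
    # The plan repr now names the stage after the wrapped callable, e.g.:
    #   MapBatches(normalize)
    #   +- Dataset(num_blocks=4, num_rows=8, schema=<class 'int'>)
    print(ds)

    ds = ds.fully_executed()
    # Stats and progress bars carry the same label, e.g. a line like
    # "Stage 1 read->MapBatches(normalize): 4/4 blocks executed ..."
    print(ds.stats())

When the function passed to map_batches is a method bound to a Preprocessor (as in the BatchPredictor and BatchMapper tests above), the stage is labeled with the preprocessor class name instead (e.g. "read->DummyPreprocessor"), and a lambda falls back to its __name__, giving "MapBatches(<lambda>)".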