Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Bulk executor initial implementation #30903

Merged
merged 152 commits into from
Jan 25, 2023
Merged

Conversation

ericl
Copy link
Contributor

@ericl ericl commented Dec 5, 2022

Why are these changes needed?

Initial implementation of ray-project/enhancements#18

Original prototype: https://github.com/ray-project/ray/pull/30222/files

TODO:

  • Implements block splitting support
  • Implement legacy plan compatibility
  • Implement stats
  • Implement all:all stages
  • Implement block bundling (@jianoaix )
  • Implement actor pool map op (@clarkzinzow )
  • Implement passing of callable constructor args (test_map_batches_extra_args)
  • Pass test_object_gc.py (@ericl )
  • Rename get_tasks() -> get_work_refs()
  • Fix stage_uuid TODO
  • Pass test_stats.py
  • Handle Ctrl-C
  • Fix randomize_block_order / owned_by_consumer tests in test_dataset_pipeline.py
  • Support lazy union (future work)?

Merge plan:

  • interfaces.py
  • memory debug utils
  • operator implementations with unit tests
  • bulk executor implementation with unit tests
  • legacy integration and flag for enabling
  • debug nightly test / regressions
  • enable by default

@ericl ericl assigned ericl and unassigned ericl Dec 7, 2022
Signed-off-by: Eric Liang <[email protected]>
Signed-off-by: Eric Liang <[email protected]>
@ericl
Copy link
Contributor Author

ericl commented Dec 14, 2022

Some progress: now passing a good portion of dataset tests:

FAILED test_dataset.py::test_bulk_lazy_eval_split_mode[False] - AssertionError: (ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001000000), BlockMetadata(num_rows=None, size_bytes=10, schema...
FAILED test_dataset.py::test_bulk_lazy_eval_split_mode[True] - AssertionError: (ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000009000000), BlockMetadata(num_rows=None, size_bytes=10, schema=...
FAILED test_dataset.py::test_basic_actors[False] - NotImplementedError
FAILED test_dataset.py::test_basic_actors[True] - NotImplementedError
FAILED test_dataset.py::test_callable_classes - NotImplementedError
FAILED test_dataset.py::test_convert_to_pyarrow - ModuleNotFoundError: No module named 'dask.dataframe'
FAILED test_dataset.py::test_iter_batches_local_shuffle[arrow-True] - ray.exceptions.RayTaskError: ray::_map_task() (pid=177449, ip=10.103.244.198)
FAILED test_dataset.py::test_iter_batches_local_shuffle[pandas-True] - ray.exceptions.ObjectFreedError: Failed to retrieve object 7f90397ff370237bffffffffffffffffffffffff0100000002000000. To see infor...
FAILED test_dataset.py::test_iter_batches_local_shuffle[simple-True] - ray.exceptions.RayTaskError: ray::_map_task() (pid=179727, ip=10.103.244.198)
FAILED test_dataset.py::test_map_batches_extra_args - ray.exceptions.RayTaskError(AssertionError): ray::_map_task() (pid=180287, ip=10.103.244.198)
FAILED test_dataset.py::test_map_batches_actors_preserves_order - NotImplementedError
FAILED test_dataset.py::test_map_batches_block_bundling_auto[1-2] - assert 10 == 5
FAILED test_dataset.py::test_map_batches_block_bundling_auto[1-3] - assert 10 == 4
FAILED test_dataset.py::test_map_batches_block_bundling_auto[1-4] - assert 10 == 3
FAILED test_dataset.py::test_map_batches_block_bundling_auto[2-4] - assert 10 == 5
FAILED test_dataset.py::test_map_batches_block_bundling_auto[1-5] - assert 10 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_auto[2-5] - assert 10 == 5
FAILED test_dataset.py::test_map_batches_block_bundling_auto[1-6] - assert 12 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_auto[2-6] - assert 10 == 4
FAILED test_dataset.py::test_map_batches_block_bundling_auto[3-6] - assert 10 == 5
FAILED test_dataset.py::test_map_batches_block_bundling_auto[1-7] - assert 14 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_auto[2-7] - assert 10 == 4
FAILED test_dataset.py::test_map_batches_block_bundling_auto[3-7] - assert 10 == 5
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_manual[block_sizes0-3-1] - assert 2 == 1
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_manual[block_sizes1-3-2] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_manual[block_sizes2-4-3] - assert 4 == 3
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_manual[block_sizes3-4-2] - assert 4 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_manual[block_sizes5-4-1] - assert 4 == 1
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_manual[block_sizes6-4-2] - assert 4 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes6-2] - assert 2 == 1
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes28-3] - assert 2 == 1
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes29-3] - assert 2 == 1
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes34-3] - assert 2 == 1
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes64-3] - assert 3 == 1
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes65-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes66-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes67-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes68-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes69-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes70-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes71-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes72-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes73-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes74-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes75-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes100-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes101-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes102-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes103-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes104-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes105-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes106-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes136-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes137-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes142-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes172-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes173-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes178-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes208-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes209-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes214-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes244-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes245-3] - assert 3 == 2
FAILED test_dataset.py::test_map_batches_block_bundling_skewed_auto[block_sizes250-3] - assert 3 == 2
FAILED test_dataset.py::test_from_dask - ModuleNotFoundError: No module named 'dask.dataframe'
FAILED test_dataset.py::test_to_dask[pandas] - ModuleNotFoundError: No module named 'dask.core'
FAILED test_dataset.py::test_to_dask[arrow] - ModuleNotFoundError: No module named 'dask.core'
FAILED test_dataset.py::test_to_dask_tensor_column_cast_pandas - ModuleNotFoundError: No module named 'dask.dataframe'
FAILED test_dataset.py::test_to_dask_tensor_column_cast_arrow - ModuleNotFoundError: No module named 'dask.dataframe'
FAILED test_dataset.py::test_from_modin - ImportError: cannot import name 'FilePathOrBuffer' from 'pandas._typing' (/home/eric/.local/lib/python3.8/site-packages/pandas/_typing.py)
FAILED test_dataset.py::test_to_modin - ImportError: cannot import name 'FilePathOrBuffer' from 'pandas._typing' (/home/eric/.local/lib/python3.8/site-packages/pandas/_typing.py)
FAILED test_dataset.py::test_map_batches_combine_empty_blocks - assert 30 == 3
FAILED test_dataset.py::test_random_shuffle[True-True] - ray.exceptions.RayTaskError: ray::_map_task() (pid=184295, ip=10.103.244.198)
FAILED test_dataset.py::test_random_shuffle[False-True] - ray.exceptions.RayTaskError: ray::_map_task() (pid=185254, ip=10.103.244.198)
FAILED test_dataset.py::test_dataset_retry_exceptions - AssertionError: (ObjectRef(00ffffffffffffffffffffffffffffffffffffff01000000ad6a0000), BlockMetadata(num_rows=None, size_bytes=20, schema=None, i...
FAILED test_dataset.py::test_split_is_not_disruptive - ray.exceptions.ObjectFreedError: Failed to retrieve object b6aef734ef5f822bffffffffffffffffffffffff0100000002000000. To see information about whe...
FAILED test_dataset.py::test_actor_pool_strategy_apply_interrupt - AssertionError: Legacy backend off
FAILED test_dataset.py::test_actor_pool_strategy_default_num_actors - NotImplementedError
================================================================== 78 failed, 539 passed, 3 skipped, 5010 warnings in 431.51s (0:07:11) ===================================================================

@@ -28,8 +28,6 @@ def __init__(
ray_remote_args: Remote arguments for the Ray actors to be created.
pool_size: The size of the actor pool.
"""
if "num_cpus" not in ray_remote_args:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ericl I reverted this, since it looks not compatible with the requirement that "num_gpus and num_cpus cannot be both specified" (user may want to run actors on gpus and then they won't be able tp specify cpus).

@jianoaix jianoaix marked this pull request as ready for review January 24, 2023 19:43
@jianoaix
Copy link
Contributor

All CI tests are passing now. However, we seem to have a release test failure that may be relevant: https://buildkite.com/ray-project/release-tests-pr/builds/26265#0185e158-7979-4c94-be61-22da62d208ca

@jianoaix
Copy link
Contributor

jianoaix commented Jan 24, 2023

All CI tests are passing now. However, we seem to have a release test failure that may be relevant: https://buildkite.com/ray-project/release-tests-pr/builds/26265#0185e158-7979-4c94-be61-22da62d208ca

It seems likely due to the lack of autoscaling for actor pool, as it had just one actor through the entire run, @clarkzinzow

And since it's about the same issue as TODO in unit test, shall we note this as a debt to fix soon and merge this PR? @ericl

2023-01-23 20:47:03,018	INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> MapOperator[map_batches]
--
  |  
  | 0%\|          \| 0/363 [00:00<?, ?it/s]
  | map_batches:   0%\|          \| 0/363 [00:00<?, ?it/s]
  | map_batches:   0%\|          \| 1/363 [00:18<1:52:33, 18.66s/it]
  | map_batches, 1 actors:   0%\|          \| 1/363 [00:18<1:52:33, 18.66s/it](raylet, ip=172.31.203.20) Spilled 34054 MiB, 110 objects, write throughput 637 MiB/s.
  |  
  | map_batches, 1 actors:   1%\|          \| 2/363 [00:37<1:54:07, 18.97s/it]
  | map_batches, 1 actors:   1%\|          \| 3/363 [01:28<3:21:07, 33.52s/it]
  | map_batches, 1 actors:   1%\|          \| 4/363 [01:42<2:35:06, 25.92s/it]
  | map_batches, 1 actors:   1%\|▏         \| 5/363 [01:44<1:42:34, 17.19s/it]
  | map_batches, 1 actors:   2%\|▏         \| 6/363 [01:45<1:10:09, 11.79s/it]
  | map_batches, 1 actors:   2%\|▏         \| 7/363 [01:48<51:34,  8.69s/it]
  | map_batches, 1 actors:   2%\|▏         \| 8/363 [01:49<37:42,  6.37s/it]
  | map_batches, 1 actors:   2%\|▏         \| 9/363 [01:51<28:59,  4.91s/it]
  | map_batches, 1 actors:   3%\|▎         \| 10/363 [01:52<22:21,  3.80s/it]
  | map_batches, 1 actors:   3%\|▎         \| 11/363 [01:54<19:05,  3.25s/it]
  | map_batches, 1 actors:   3%\|▎         \| 12/363 [01:56<15:44,  2.69s/it]
  | map_batches, 1 actors:   4%\|▎         \| 13/363 [01:58<14:29,  2.49s/it]
  | map_batches, 1 actors:   4%\|▍         \| 14/363 [01:59<12:23,  2.13s/it]
  | map_batches, 1 actors:   4%\|▍         \| 15/363 [02:01<11:58,  2.06s/it]
  | map_batches, 1 actors:   4%\|▍         \| 16/363 [02:02<10:36,  1.84s/it]
  | map_batches, 1 actors:   5%\|▍         \| 17/363 [02:04<10:42,  1.86s/it]
  | map_batches, 1 actors:   5%\|▍         \| 18/363 [02:05<09:43,  1.69s/it]
  | map_batches, 1 actors:   5%\|▌         \| 19/363 [02:07<10:04,  1.76s/it]
  | map_batches, 1 actors:   6%\|▌         \| 20/363 [02:09<09:26,  1.65s/it]
  | map_batches, 1 actors:   6%\|▌         \| 21/363 [02:11<09:51,  1.73s/it]
  | map_batches, 1 actors:   6%\|▌         \| 22/363 [02:12<09:17,  1.64s/it]
  | map_batches, 1 actors:   6%\|▋         \| 23/363 [02:14<09:54,  1.75s/it]
  | map_batches, 1 actors:   7%\|▋         \| 24/363 [02:15<09:07,  1.62s/it]
  | map_batches, 1 actors:   7%\|▋         \| 25/363 [02:17<08:55,  1.58s/it]
  | map_batches, 1 actors:   7%\|▋         \| 26/363 [02:18<08:56,  1.59s/it]
  | map_batches, 1 actors:   7%\|▋         \| 27/363 [02:20<08:05,  1.45s/it]
  | map_batches, 1 actors:   8%\|▊         \| 28/363 [02:21<08:40,  1.55s/it]
  | map_batches, 1 actors:   8%\|▊         \| 29/363 [02:23<08:14,  1.48s/it]
  | map_batches, 1 actors:   8%\|▊         \| 30/363 [02:25<08:55,  1.61s/it]
  | map_batches, 1 actors:   9%\|▊         \| 31/363 [02:26<08:24,  1.52s/it]
  | map_batches, 1 actors:   9%\|▉         \| 32/363 [02:28<09:01,  1.64s/it]
  | map_batches, 1 actors:   9%\|▉         \| 33/363 [02:29<08:17,  1.51s/it]
  | map_batches, 1 actors:   9%\|▉         \| 34/363 [02:30<08:05,  1.48s/it]
  ......
  | map_batches, 1 actors:  90%\|████████▉ \| 325/363 [09:53<00:43,  1.15s/it]
  | map_batches, 1 actors:  90%\|████████▉ \| 326/363 [09:54<00:38,  1.05s/it]
  | map_batches, 1 actors:  90%\|█████████ \| 327/363 [09:55<00:36,  1.01s/it]
  | map_batches, 1 actors:  90%\|█████████ \| 328/363 [09:56<00:37,  1.06s/it]
  | map_batches, 1 actors:  91%\|█████████ \| 329/363 [09:57<00:37,  1.11s/it]
  | map_batches, 1 actors:  91%\|█████████ \| 330/363 [09:58<00:33,  1.02s/it]
  | map_batches, 1 actors:  91%\|█████████ \| 331/363 [09:59<00:30,  1.05it/s]
  | map_batches, 1 actors:  91%\|█████████▏\| 332/363 [10:00<00:35,  1.15s/it]
  | map_batches, 1 actors:  92%\|█████████▏\| 333/363 [10:01<00:31,  1.04s/it]
  | map_batches, 1 actors:  92%\|█████████▏\| 334/363 [10:02<00:29,  1.00s/it]
  | map_batches, 1 actors:  92%\|█████████▏\| 335/363 [10:04<00:33,  1.18s/it]
  | map_batches, 1 actors:  93%\|█████████▎\| 336/363 [10:05<00:28,  1.07s/it]
  | map_batches, 1 actors:  93%\|█████████▎\| 337/363 [10:05<00:25,  1.01it/s]
  | map_batches, 1 actors:  93%\|█████████▎\| 338/363 [10:06<00:24,  1.04it/s]
  | map_batches, 1 actors:  93%\|█████████▎\| 339/363 [10:08<00:27,  1.13s/it]
  | map_batches, 1 actors:  94%\|█████████▎\| 340/363 [10:09<00:23,  1.03s/it]
  | map_batches, 1 actors:  94%\|█████████▍\| 341/363 [10:09<00:21,  1.01it/s]
  | map_batches, 1 actors:  94%\|█████████▍\| 342/363 [10:11<00:25,  1.20s/it]
  | map_batches, 1 actors:  94%\|█████████▍\| 343/363 [10:12<00:21,  1.08s/it]
  | map_batches, 1 actors:  95%\|█████████▍\| 344/363 [10:13<00:19,  1.03s/it]
  | map_batches, 1 actors:  95%\|█████████▌\| 345/363 [10:15<00:21,  1.20s/it]
  | map_batches, 1 actors:  95%\|█████████▌\| 346/363 [10:15<00:18,  1.08s/it]
  | map_batches, 1 actors:  96%\|█████████▌\| 347/363 [10:16<00:15,  1.00it/s]
  | map_batches, 1 actors:  96%\|█████████▌\| 348/363 [10:17<00:14,  1.03it/s]
  | map_batches, 1 actors:  96%\|█████████▌\| 349/363 [10:19<00:16,  1.16s/it]
  | map_batches, 1 actors:  96%\|█████████▋\| 350/363 [10:19<00:13,  1.05s/it]
  | map_batches, 1 actors:  97%\|█████████▋\| 351/363 [10:20<00:11,  1.02it/s]
  | map_batches, 1 actors:  97%\|█████████▋\| 352/363 [10:22<00:13,  1.20s/it]
  | map_batches, 1 actors:  97%\|█████████▋\| 353/363 [10:23<00:10,  1.08s/it]
  | map_batches, 1 actors:  98%\|█████████▊\| 354/363 [10:24<00:08,  1.01it/s]
  | map_batches, 1 actors:  98%\|█████████▊\| 355/363 [10:25<00:08,  1.03s/it]
  | map_batches, 1 actors:  98%\|█████████▊\| 356/363 [10:26<00:07,  1.11s/it]
  | map_batches, 1 actors:  98%\|█████████▊\| 357/363 [10:27<00:06,  1.02s/it]
  | map_batches, 1 actors:  99%\|█████████▊\| 358/363 [10:28<00:04,  1.02it/s]
  | map_batches, 1 actors:  99%\|█████████▉\| 359/363 [10:29<00:04,  1.17s/it]
  | map_batches, 1 actors:  99%\|█████████▉\| 360/363 [10:30<00:03,  1.06s/it]
  | map_batches, 1 actors:  99%\|█████████▉\| 361/363 [10:31<00:02,  1.01s/it]
  | map_batches, 1 actors: 100%\|█████████▉\| 362/363 [10:32<00:01,  1.16s/it]
  | map_batches, 1 actors: 100%\|██████████\| 363/363 [10:33<00:00,  1.04s/it]
  | map_batches, 0 actors: 100%\|██████████\| 363/363 [10:33<00:00,  1.04s/it]
  | map_batches, 0 actors: 100%\|██████████\| 363/363 [10:33<00:00,  1.75s/it]
  | run_xgboost_prediction takes 671.7315916329999 seconds.
  | Results: {'training_time': 781.2928511609999, 'prediction_time': 671.7315916329999}
  | Traceback (most recent call last):
  | File "workloads/xgboost_benchmark.py", line 175, in <module>
  | main(args)
  | File "workloads/xgboost_benchmark.py", line 155, in main
  | f"Batch prediction on XGBoost is taking {prediction_time} seconds, "
  | RuntimeError: Batch prediction on XGBoost is taking 671.7315916329999 seconds, which is longer than expected (450 seconds).

@ericl
Copy link
Contributor Author

ericl commented Jan 25, 2023

Alright. Let's leave the TODO to fix this test (or we can fix this test by increasing the min pool size).

@ericl ericl merged commit 877770e into ray-project:master Jan 25, 2023
tamohannes pushed a commit to ju2ez/ray that referenced this pull request Jan 25, 2023
…-project#31283)

Add a utility class for tracing object allocation / freeing. This makes it a lot easier to debug memory allocation / freeing issues.

This is split out from ray-project#30903

Signed-off-by: tmynn <[email protected]>
tamohannes pushed a commit to ju2ez/ray that referenced this pull request Jan 25, 2023
…oject#31305)

Add the initial operator implementations.

This is split out from ray-project#30903

Signed-off-by: tmynn <[email protected]>
tamohannes pushed a commit to ju2ez/ray that referenced this pull request Jan 25, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants