[Datasets] Add initial aggregate benchmark #28486
Conversation
print(f"Running case: {name}")
start_time = time.perf_counter()
output_ds = fn(**fn_run_args)
We may want to run the benchmark multiple times to reduce noise. It's easy to add a for-loop around this later. Right now the aggregate benchmark doesn't have enough noise to be worth rerunning.
Sounds good. This requires fn to be stateless/side-effect-free.
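To illustrate the rerun idea from this thread, here is a minimal sketch of a repeated-run harness. The names (run_case, num_runs) are illustrative, not the actual benchmark code, and repeating the run like this is only valid when fn is stateless, as noted above:

```python
import statistics
import time


def run_case(name, fn, num_runs=3, **fn_run_args):
    """Run one benchmark case several times and return the median duration.

    Illustrative sketch only: repeating the timing loop assumes `fn` is
    stateless/side-effect free, so every run measures the same work.
    """
    durations = []
    for _ in range(num_runs):
        start_time = time.perf_counter()
        fn(**fn_run_args)
        durations.append(time.perf_counter() - start_time)
    median = statistics.median(durations)
    print(f"Running case: {name}, median duration: {median:.4f}s")
    return median
```

Taking the median rather than the mean makes the result less sensitive to a single slow outlier run.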
The input files are pre-generated and stored in AWS S3 beforehand.
"""
test_input = [
    ("s3://air-example-data/h2oai_benchmark/G1_1e7_1e2_0_0.csv", "h2oai-500M")
Is the input really CSV instead of parquet? That seems like it will spend a lot of time decoding just the CSV.
@ericl - yes, the input is a CSV file - see the script to generate the input file, and the Spark script to run the benchmark.

> That seems like it will spend a lot of time decoding just the CSV.
That's true, it will be significantly slower than Parquet. But the input is only loaded once and reused across benchmark runs, and the read time is not counted in the benchmark runtime, the same way h2oai db-benchmark measures other systems (e.g. the Spark script above). Right now, reading 500MB takes less than 10 seconds, and 5GB takes less than 1 minute.
Thanks for taking a stab at benchmarking!
from ray.data.dataset import Dataset

class Benchmark:
What's the scope of the Benchmark? IIUC it's a benchmark for Dataset transformations; if so, maybe make that clearer. It would also be good to mention whether it's applicable to both local and distributed benchmarking.
Ideally the scope of Benchmark should cover all data-related benchmarks (dataset, dataset pipeline, transform, action, etc.); there's no restriction to dataset transformations only. It works for both local and distributed benchmarking. Let me add more documentation.
@jianoaix - updated.
If fn is a Dataset-to-Dataset mapping, isn't it basically a transform? Things like iter_batches(), min/max, etc. are not covered.
Oh, it just makes it easy to retrieve the statistics by returning another Dataset. You can do arbitrary logic inside the benchmark:
def fn(input_ds):
    input_ds.iter_batches(...)
    input_ds.min()
    input_ds.max()
    return the_ds_you_care_for_stats
Also, just to add - the parameters to fn can be anything, so we are not bound to passing a Dataset.
I'd add a comment about what fn is expected to return.
@jianoaix - sure, added. Also, run(fn: Callable[..., Dataset]) has the function's expected return type in its signature.
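For illustration, the harness pattern described in this thread (time a user-supplied fn and record its duration under a case name) can be sketched as follows. This class shape is a simplified stand-in, not the actual Ray implementation:

```python
import time
from typing import Any, Callable


class Benchmark:
    """Simplified sketch of a benchmark harness: `run` times an arbitrary
    callable and records the elapsed wall-clock time per case. In the real
    PR, `fn` is expected to return a Dataset so its statistics can be read
    afterwards; here the return type is left open."""

    def __init__(self, name: str):
        self.name = name
        self.results: dict = {}

    def run(self, case_name: str, fn: Callable[..., Any], **fn_run_args) -> Any:
        start_time = time.perf_counter()
        output = fn(**fn_run_args)
        self.results[case_name] = time.perf_counter() - start_time
        return output
```

Because fn takes arbitrary keyword arguments, callers are not bound to passing a Dataset, matching the point made above.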
    ("s3://air-example-data/h2oai_benchmark/G1_1e7_1e2_0_0.csv", "h2oai-500M")
]
for path, test_name in test_input:
    input_ds = ray.data.read_csv(path).repartition(10).fully_executed()
This seems like a magic number; can we document how it's chosen? If we're doing a local-node benchmark, can it just be set to the number of CPUs on the node, or does it need manual tuning?
Yeah, it should be set to the number of CPUs on the node to get the best performance. Let me add a comment.
@jianoaix - updated.
To make this benchmark runnable in different cluster setups (currently it's on one node, per the yaml config), it'd be better to read the number of CPUs from Ray, e.g. ray.cluster_resources().get("CPU", 1), rather than hard-coding it.
@jianoaix - thanks, I didn't know about this API before; updated.
release/release_tests.yaml
working_dir: nightly_tests/dataset

frequency: multi
team: core
The "data" team now owns these tests.
@jianoaix - good catch, updated.
Signed-off-by: Cheng Su <[email protected]>
Addressed all comments; the PR is ready for review again. Thanks.
LGTM!
input_ds = ray.data.read_csv(path)
# Number of blocks (parallelism) should be set as number of available CPUs
# to get best performance.
num_blocks = int(ray.cluster_resources().get("CPU", 1))
input_ds = input_ds.repartition(num_blocks).fully_executed()
I'm assuming that we do an explicit repartition step instead of setting parallelism=num_blocks at read time since we're not guaranteed that parallelism will be respected, e.g. if parallelism > num_files?
Yes, there's only 1 file in this case, so we have to do the repartition.
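The constraint being discussed can be captured in a one-line model: a file-based read yields at most one block per file, so requested parallelism above the file count is not respected. This is a simplified model for illustration, not Ray's exact splitting logic:

```python
def effective_read_parallelism(requested_parallelism: int, num_files: int) -> int:
    """Simplified model: a file-based read produces at most one block per
    file, so effective parallelism is capped by the file count. This is why
    the benchmark repartitions explicitly after reading a single file."""
    return min(requested_parallelism, num_files)
```

With one input file, requesting 16-way parallelism at read time still yields a single block, hence the explicit repartition to num_blocks afterwards.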
    merge=merge,
    accumulate_block=accumulate_block,
    name=(f"top2({str(on)})"),
)
It may be a good idea to look at porting this to a custom Polars aggregation once that integration is merged.
Yeah, we also plan to enable Polars later.
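For context, the merge/accumulate_block pair that a top-2 aggregation needs can be sketched in plain Python. This is illustrative only; the PR wires equivalent functions into Ray Data's aggregation API with the signatures that API expects:

```python
import heapq


def accumulate_block(acc, block_values):
    """Fold a block of values into the accumulator, keeping the 2 largest."""
    return heapq.nlargest(2, list(acc) + list(block_values))


def merge(acc1, acc2):
    """Merge two partial accumulators, again keeping only the 2 largest."""
    return heapq.nlargest(2, list(acc1) + list(acc2))
```

The key property is that both functions are associative and keep the accumulator bounded at two elements, so partial results from different blocks can be combined in any order.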
This PR prunes (removes) unused columns before doing the aggregate (in _GroupbyOp.map()). It keeps only the group-by column and the columns used in aggregate functions; all other columns can be pruned, which reduces the cost during sort and aggregate. It also introduces BlockAccessor.select(keys) to get a new Block with only the selected keys/columns, and refactors the existing code path in map_groups to use the same API. Later on, we can use this API to implement Dataset.select_columns. Tested with a query in the h2oai benchmark - #28486. This PR reduced query runtime by 50%.
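The pruning step described above can be sketched on a plain column dictionary, used here as a simplified stand-in for a Ray Data block; the function and parameter names are illustrative, not the PR's actual BlockAccessor API:

```python
def prune_for_aggregate(columns: dict, group_key: str, agg_columns: list) -> dict:
    """Keep only the group-by key and the columns referenced by aggregate
    functions; everything else is dropped before the sort/aggregate step,
    which is where the cost reduction described above comes from."""
    keep = [group_key] + [c for c in agg_columns if c != group_key]
    return {name: columns[name] for name in keep}
```

Dropping unused columns before the shuffle means less data is sorted and moved between tasks, which is why the pruning cuts the aggregate query runtime.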
Signed-off-by: Cheng Su <[email protected]>
Why are these changes needed?
This PR adds an initial aggregate benchmark (for the h2oai benchmark - https://github.com/h2oai/db-benchmark). To follow the convention in the h2oai benchmark, the benchmark runs on a single node (https://h2oai.github.io/db-benchmark/#environment-configuration). There is no fundamental blocker to running the benchmark on multiple nodes (it's just a matter of changing our yaml file). The benchmark has three input file settings - 0.5GB, 5GB, and 50GB. Here we start with the 0.5GB input file; a follow-up PR will add benchmarks for 5GB and 50GB (just a matter of generating the input files, no benchmark code change needed).

NOTE: The benchmark queries are not optimized yet; this is just the most straightforward version of the code. We can use it as a baseline to identify performance gaps and optimize.
A typical benchmark workflow would be:
1. Add an xxx_benchmark.py file for the specific APIs to benchmark (e.g. split_benchmark.py for split-related APIs).
2. Use the Benchmark class to run the benchmark.

Related issue number
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.

Tested on a workspace with the same cluster environment as aggregate_benchmark_compute.yaml.
Verified the benchmark succeeded - https://console.anyscale-staging.com/o/anyscale-internal/workspaces/expwrk_SkskBJ2Um8GMzaDAL4Zn8nvb/ses_LCGGwWMb1a14Zp5hqq6mTXvQ