[Datasets] Add initial aggregate benchmark #28486

Merged Sep 22, 2022 (7 commits)
Changes from 6 commits
132 changes: 132 additions & 0 deletions release/nightly_tests/dataset/aggregate_benchmark.py
@@ -0,0 +1,132 @@
from typing import Tuple

import ray
from ray.data.aggregate import _AggregateOnKeyBase, Max, Mean, Min, Sum
from ray.data.block import Block, KeyFn
from ray.data.dataset import Dataset
import pyarrow.compute as pac

from benchmark import Benchmark


def run_h2oai(benchmark: Benchmark):
"""This benchmark is originally from https://github.com/h2oai/db-benchmark

Here we run all group-by queries from the benchmark on Ray Datasets.
The input files are pre-generated and stored in AWS S3 beforehand.
"""

# Test input file schema={
# id1: string, id2: string, id3: string, id4: int64, id5: int64, id6: int64,
# v1: int64, v2: int64, v3: double
# })
    test_input = [
        ("s3://air-example-data/h2oai_benchmark/G1_1e7_1e2_0_0.csv", "h2oai-500M")
    ]

Comment thread on the S3 input path:

Contributor: Is the input really CSV instead of Parquet? It seems like a lot of time will be spent just decoding the CSV.

Contributor Author: @ericl - yes, the input is a CSV file (there is a script to generate the input file, and a Spark script to run the benchmark).

    That seems like it will spend a lot of time decoding just the CSV.

That's true; it will be significantly slower than Parquet. But the input is loaded only once and reused across benchmark runs, and the read time is not counted toward the benchmark runtime, which matches how the h2oai db-benchmark measures other systems (e.g. the Spark script above). Right now, reading 500MB takes less than 10 seconds, and 5GB takes less than 1 minute.

for path, test_name in test_input:
input_ds = ray.data.read_csv(path)
        # The number of blocks (parallelism) should be set to the number of
        # available CPUs to get the best performance.
num_blocks = int(ray.cluster_resources().get("CPU", 1))
input_ds = input_ds.repartition(num_blocks).fully_executed()
Comment on lines +27 to +31:

Contributor: I'm assuming that we do an explicit repartition step instead of setting parallelism=num_blocks at read time, since we're not guaranteed that the requested parallelism will be respected, e.g. if parallelism > num_files?

Contributor Author: Yes, there's only 1 file in this case, so we have to repartition.
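
A minimal sketch of the trade-off discussed above (illustrative; ``parallelism`` is a real argument of ``ray.data.read_csv``, but the capping behavior described here reflects the reviewer's assumption rather than anything verified in this PR):

import ray

num_blocks = int(ray.cluster_resources().get("CPU", 1))

# Requesting parallelism at read time is best-effort: with a single
# input CSV file, the dataset may come back with fewer blocks than
# requested.
ds = ray.data.read_csv(
    "s3://air-example-data/h2oai_benchmark/G1_1e7_1e2_0_0.csv",
    parallelism=num_blocks,
)

# An explicit repartition guarantees the target block count regardless
# of how many input files there are.
ds = ds.repartition(num_blocks)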


q_list = [
(h2oai_q1, "q1"),
(h2oai_q3, "q3"),
(h2oai_q4, "q4"),
(h2oai_q5, "q5"),
(h2oai_q7, "q7"),
(h2oai_q8, "q8"),
]

for q, name in q_list:
benchmark.run(f"{test_name}-{name}", q, ds=input_ds)


def h2oai_q1(ds: Dataset) -> Dataset:
return ds.groupby("id1").sum("v1")


def h2oai_q2(ds: Dataset) -> Dataset:
# TODO(chengsu): Run this after dataset supports multiple group-by keys.
# return ds.groupby(["id1", "id2"]).sum("v1")
raise NotImplementedError


def h2oai_q3(ds: Dataset) -> Dataset:
return ds.groupby("id3").aggregate(Sum("v1"), Mean("v3"))


def h2oai_q4(ds: Dataset) -> Dataset:
return ds.groupby("id4").aggregate(Mean("v1"), Mean("v2"), Mean("v3"))


def h2oai_q5(ds: Dataset) -> Dataset:
return ds.groupby("id6").aggregate(Sum("v1"), Sum("v2"), Sum("v3"))


def h2oai_q6(ds: Dataset) -> Dataset:
# TODO(chengsu): Run this after dataset supports multiple group-by keys.
# return ds.groupby(["id4", "id5"]).aggregate(Median("v3"), Std("v3"))
raise NotImplementedError


def h2oai_q7(ds: Dataset) -> Dataset:
    ds = ds.groupby("id3").aggregate(Max("v1"), Min("v2"))
    # The aggregate output columns are named "max(v1)" and "min(v2)".
    ds = ds.map_batches(lambda df: df.assign(result=df["max(v1)"] - df["min(v2)"]))
    return ds


def h2oai_q8(ds: Dataset) -> Dataset:
    def accumulate_block(agg: Tuple[float, float], block: Block) -> Tuple[float, float]:
        # Take the top-2 values of "v3" within this block, then fold them
        # into the running (largest, second-largest) accumulator.
        column = block["v3"]
        top_k_indices = pac.top_k_unstable(column, k=2)
        top_k_result = pac.take(column, top_k_indices).to_pylist()
        # Pad with -inf in case the block has fewer than 2 rows.
        top_k_result.extend([float("-inf")] * (2 - len(top_k_result)))
        top_k_result = (top_k_result[0], top_k_result[1])
        return merge(agg, top_k_result)

    def merge(
        agg1: Tuple[float, float],
        agg2: Tuple[float, float],
    ) -> Tuple[float, float]:
        # Combine two (largest, second-largest) pairs into the top-2 of
        # their union; each input pair is sorted in descending order.
        if agg1[0] >= agg2[0]:
            value1 = agg1[0]
            value2 = max(agg1[1], agg2[0])
        else:
            value1 = agg2[0]
            value2 = max(agg1[0], agg2[1])
        return (value1, value2)

class Top2(_AggregateOnKeyBase):
def __init__(self, on: KeyFn):
self._set_key_fn(on)
super().__init__(
init=lambda _: (float("-inf"), float("-inf")),
merge=merge,
accumulate_block=accumulate_block,
name=(f"top2({str(on)})"),
)
Comment thread on class Top2:

Contributor: It may be a good idea to look at porting this to a custom Polars aggregation once that integration is merged.

Contributor Author: Yeah, we also plan to enable Polars later.


return ds.groupby("id6").aggregate(Top2("v3"))
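
Following up on the Polars comment above, a rough sketch of what a Polars equivalent of this top-2 aggregation might look like (hypothetical; assumes a recent Polars API with ``group_by`` and ``Expr.top_k``, and is not part of this PR):

import polars as pl

def h2oai_q8_polars(df: pl.DataFrame) -> pl.DataFrame:
    # top_k(2) keeps the two largest v3 values per id6 group,
    # returned as a list column.
    return df.group_by("id6").agg(pl.col("v3").top_k(2).alias("top2(v3)"))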


def h2oai_q9(ds: Dataset) -> Dataset:
# TODO(chengsu): Run this after dataset supports multiple group-by keys.
# return ds.groupby(["id2", "id4"]).aggregate(pow(corr("v1", "v2"), 2))
raise NotImplementedError


def h2oai_q10(ds: Dataset) -> Dataset:
# TODO(chengsu): Run this after dataset supports multiple group-by keys.
# return ds.groupby(["id1", "id2", "id3", "id4", "id5", "id6"])
# .aggregate(Count(), Sum("v3"))
raise NotImplementedError


if __name__ == "__main__":
benchmark = Benchmark("aggregate")

run_h2oai(benchmark)

benchmark.write_result()
15 changes: 15 additions & 0 deletions release/nightly_tests/dataset/aggregate_benchmark_compute.yaml
@@ -0,0 +1,15 @@
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2

max_workers: 0

head_node_type:
name: head_node
instance_type: m5.4xlarge

worker_node_types:
- name: worker_node
instance_type: m5.4xlarge
max_workers: 0
min_workers: 0
use_spot: false
57 changes: 57 additions & 0 deletions release/nightly_tests/dataset/benchmark.py
@@ -0,0 +1,57 @@
import gc
import json
import os
import time
from typing import Callable

from ray.data.dataset import Dataset


Comment threads on class Benchmark:

Contributor: What's the scope of Benchmark? IIUC it's a benchmark for Dataset transformations; if so, maybe make that clearer. It would also be good to mention whether it's applicable to both local and distributed benchmarking.

Contributor Author: Ideally the scope of Benchmark should cover all data-related benchmarks (dataset, dataset pipeline, transform, action, etc.); it's not restricted to dataset transformations. It works for both local and distributed benchmarking. Let me add more documentation.

Contributor Author: @jianoaix - updated.

Contributor: If fn is a Dataset-to-Dataset mapping, isn't it basically a transform? Things like iter_batches(), min/max, etc. are not covered.

Contributor Author: Oh, it just makes it easy to retrieve the statistics by returning another Dataset. You can run arbitrary logic inside the benchmark:

def fn(input_ds):
    input_ds.iter_batches(...)
    input_ds.min()
    input_ds.max()
    return the_ds_you_care_for_stats

Contributor Author: Also, just to add - the parameters to fn can be anything, so we are not bound to passing a Dataset.

Contributor: I'd add a comment about what fn is expected to return.

Contributor Author: @jianoaix - sure, added. Also, run(fn: Callable[..., Dataset]) now carries the function's expected return type.


class Benchmark:

"""Benchmark class used for Ray Datasets.

Call ``run(fn)`` to benchmark a specific piece of code/function.
Call ``write_result()`` to write benchmark result in file.
Result can be rendered in dashboard later through other tool.
We should use this class for any benchmark related to Ray Datasets.
It works for both local and distribute benchmarking.

A typical workflow would be:

benchmark = Benchmark(...)

# set up (such as input read or generation)
...

benchmark.run(..., fn_1)
benchmark.run(..., fn_2)
benchmark.run(..., fn_3)

benchmark.write_result()

See example usage in ``aggregate_benchmark.py``.
"""

def __init__(self, name):
self.name = name
self.result = {}
print(f"Running benchmark: {name}")

def run(self, name: str, fn: Callable[..., Dataset], **fn_run_args):
gc.collect()

print(f"Running case: {name}")
start_time = time.perf_counter()
output_ds = fn(**fn_run_args)
        output_ds.fully_executed()
        duration = time.perf_counter() - start_time

        # TODO(chengsu): Record more metrics based on dataset stats.
        self.result[name] = {"time": duration}
        print(f"Result of case {name}: {self.result[name]}")

Comment thread on the output_ds = fn(**fn_run_args) line:

Contributor Author (c21, Sep 13, 2022): We may want to run the benchmark multiple times to reduce noise. It's easy to add a for-loop around this call later; right now the aggregate benchmark does not have enough noise to be worth rerunning. (A sketch of such a loop follows after this file.)

Contributor: Sounds good. This requires fn to be stateless/side-effect-free.

def write_result(self):
test_output_json = os.environ.get("TEST_OUTPUT_JSON", "/tmp/result.json")
with open(test_output_json, "w") as f:
f.write(json.dumps(self.result))
print(f"Finish benchmark: {self.name}")
17 changes: 17 additions & 0 deletions release/release_tests.yaml
@@ -4198,6 +4198,23 @@
type: sdk_command
file_manager: sdk

- name: aggregate_benchmark
group: core-dataset-tests
working_dir: nightly_tests/dataset

frequency: multi
team: data
cluster:
cluster_env: app_config.yaml
cluster_compute: aggregate_benchmark_compute.yaml

run:
timeout: 1800
script: python aggregate_benchmark.py

type: sdk_command
file_manager: sdk

- name: pipelined_training_50_gb
group: core-dataset-tests
working_dir: nightly_tests/dataset