[Datasets] Add initial aggregate benchmark (#28486)
This PR adds an initial aggregate benchmark (based on the h2oai db-benchmark - https://github.com/h2oai/db-benchmark). To follow the convention of the h2oai benchmark, the benchmark is run on a single node (https://h2oai.github.io/db-benchmark/#environment-configuration). There is no fundamental blocker to running the benchmark on multiple nodes (it is just a matter of changing our `yaml` file). The benchmark has three input file sizes - 0.5GB, 5GB and 50GB. Here we start with the 0.5GB input file. A follow-up PR will add benchmarks for 5GB and 50GB (just a matter of generating the input files; no benchmark code changes needed).

NOTE: The benchmark queries are not optimized yet; this is just the most straightforward version of the code. We can use it as a baseline to identify the performance gap and optimize.

A typical benchmark workflow would be:
1. Create an `xxx_benchmark.py` file for the specific APIs to benchmark (e.g. `split_benchmark.py` for split-related APIs).
2. Use the `Benchmark` class to run the benchmark (see the sketch after this list).
3. Check in the benchmark code after testing it locally and in a workspace.
4. Monitor the nightly test results.
5. Create a Preset/Databricks dashboard and alerts on benchmark results.
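As a rough sketch of steps 1-2, a new benchmark file could look like the following. This is a hypothetical example: the file name `shuffle_benchmark.py`, the query, and the input size are made up for illustration; only the `Benchmark` class is from this PR.

# shuffle_benchmark.py (hypothetical example following the workflow above)
import ray

from benchmark import Benchmark


def run_random_shuffle(benchmark: Benchmark):
    # Hypothetical setup; real benchmarks read pre-generated input files from S3.
    input_ds = ray.data.range(100_000_000)

    def random_shuffle(ds):
        return ds.random_shuffle()

    benchmark.run("random-shuffle-100m", random_shuffle, ds=input_ds)


if __name__ == "__main__":
    benchmark = Benchmark("shuffle")
    run_random_shuffle(benchmark)
    benchmark.write_result()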
c21 authored Sep 22, 2022
1 parent 9c2abf9 commit db2ce69
Showing 5 changed files with 225 additions and 0 deletions.
132 changes: 132 additions & 0 deletions release/nightly_tests/dataset/aggregate_benchmark.py
@@ -0,0 +1,132 @@
from typing import Tuple

import ray
from ray.data.aggregate import _AggregateOnKeyBase, Max, Mean, Min, Sum
from ray.data.block import Block, KeyFn
from ray.data.dataset import Dataset
import pyarrow.compute as pac

from benchmark import Benchmark


def run_h2oai(benchmark: Benchmark):
    """This benchmark is originally from https://github.com/h2oai/db-benchmark

    Here we run all group-by queries from the benchmark on Ray Datasets.
    The input files are pre-generated and stored in AWS S3 beforehand.
    """

    # Test input file schema={
    #   id1: string, id2: string, id3: string, id4: int64, id5: int64, id6: int64,
    #   v1: int64, v2: int64, v3: double
    # }
    test_input = [
        ("s3://air-example-data/h2oai_benchmark/G1_1e7_1e2_0_0.csv", "h2oai-500M")
    ]
    for path, test_name in test_input:
        input_ds = ray.data.read_csv(path)
        # The number of blocks (parallelism) should be set to the number of
        # available CPUs to get the best performance.
        num_blocks = int(ray.cluster_resources().get("CPU", 1))
        input_ds = input_ds.repartition(num_blocks).fully_executed()

        q_list = [
            (h2oai_q1, "q1"),
            (h2oai_q3, "q3"),
            (h2oai_q4, "q4"),
            (h2oai_q5, "q5"),
            (h2oai_q7, "q7"),
            (h2oai_q8, "q8"),
        ]

        for q, name in q_list:
            benchmark.run(f"{test_name}-{name}", q, ds=input_ds)


def h2oai_q1(ds: Dataset) -> Dataset:
    return ds.groupby("id1").sum("v1")


def h2oai_q2(ds: Dataset) -> Dataset:
    # TODO(chengsu): Run this after dataset supports multiple group-by keys.
    # return ds.groupby(["id1", "id2"]).sum("v1")
    raise NotImplementedError


def h2oai_q3(ds: Dataset) -> Dataset:
    return ds.groupby("id3").aggregate(Sum("v1"), Mean("v3"))


def h2oai_q4(ds: Dataset) -> Dataset:
    return ds.groupby("id4").aggregate(Mean("v1"), Mean("v2"), Mean("v3"))


def h2oai_q5(ds: Dataset) -> Dataset:
    return ds.groupby("id6").aggregate(Sum("v1"), Sum("v2"), Sum("v3"))


def h2oai_q6(ds: Dataset) -> Dataset:
    # TODO(chengsu): Run this after dataset supports multiple group-by keys.
    # return ds.groupby(["id4", "id5"]).aggregate(Median("v3"), Std("v3"))
    raise NotImplementedError


def h2oai_q7(ds: Dataset) -> Dataset:
    ds = ds.groupby("id3").aggregate(Max("v1"), Min("v2"))
    # Aggregated columns are named after the aggregation, e.g. "max(v1)".
    ds = ds.map_batches(lambda df: df.assign(result=df["max(v1)"] - df["min(v2)"]))
    return ds


def h2oai_q8(ds: Dataset) -> Dataset:
    def accumulate_block(
        agg: Tuple[float, float], block: Block
    ) -> Tuple[float, float]:
        # Take the top-2 values of "v3" in this block and merge them into the
        # running aggregate.
        column = block["v3"]
        top_k_indices = pac.top_k_unstable(column, k=2)
        top_k_result = pac.take(column, top_k_indices).to_pylist()
        top_k_result.extend([float("-inf")] * (2 - len(top_k_result)))
        top_k_result = (top_k_result[0], top_k_result[1])
        return merge(agg, top_k_result)

    def merge(
        agg1: Tuple[float, float],
        agg2: Tuple[float, float],
    ) -> Tuple[float, float]:
        # Each aggregate holds the (largest, second largest) values seen so far;
        # merging keeps the top-2 values across both aggregates.
        if agg1[0] >= agg2[0]:
            value1 = agg1[0]
            value2 = max(agg1[1], agg2[0])
        else:
            value1 = agg2[0]
            value2 = max(agg1[0], agg2[1])
        return (value1, value2)

    # Custom aggregation that computes the two largest values of the given column.
    class Top2(_AggregateOnKeyBase):
        def __init__(self, on: KeyFn):
            self._set_key_fn(on)
            super().__init__(
                init=lambda _: (float("-inf"), float("-inf")),
                merge=merge,
                accumulate_block=accumulate_block,
                name=(f"top2({str(on)})"),
            )

    return ds.groupby("id6").aggregate(Top2("v3"))


def h2oai_q9(ds: Dataset) -> Dataset:
    # TODO(chengsu): Run this after dataset supports multiple group-by keys.
    # return ds.groupby(["id2", "id4"]).aggregate(pow(corr("v1", "v2"), 2))
    raise NotImplementedError


def h2oai_q10(ds: Dataset) -> Dataset:
    # TODO(chengsu): Run this after dataset supports multiple group-by keys.
    # return ds.groupby(["id1", "id2", "id3", "id4", "id5", "id6"])
    #          .aggregate(Count(), Sum("v3"))
    raise NotImplementedError


if __name__ == "__main__":
    benchmark = Benchmark("aggregate")

    run_h2oai(benchmark)

    benchmark.write_result()
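For a quick local sanity check of the queries above, one could run a couple of them against a small in-memory table. This sketch is not part of this PR; it assumes a local Ray and pandas install, and the column names simply mirror a subset of the benchmark input schema.

# Hypothetical local smoke test (not part of this PR).
import pandas as pd

import ray

from aggregate_benchmark import h2oai_q1, h2oai_q3

# Small in-memory table mirroring a subset of the benchmark schema.
df = pd.DataFrame(
    {
        "id1": [f"id{i % 10}" for i in range(1000)],
        "id3": [f"id{i % 100}" for i in range(1000)],
        "v1": [i % 5 for i in range(1000)],
        "v3": [float(i % 7) for i in range(1000)],
    }
)
ds = ray.data.from_pandas(df)
print(h2oai_q1(ds).to_pandas())
print(h2oai_q3(ds).to_pandas())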
15 changes: 15 additions & 0 deletions release/nightly_tests/dataset/aggregate_benchmark_compute.yaml
@@ -0,0 +1,15 @@
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2

max_workers: 0

head_node_type:
  name: head_node
  instance_type: m5.4xlarge

worker_node_types:
- name: worker_node
  instance_type: m5.4xlarge
  max_workers: 0
  min_workers: 0
  use_spot: false
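As noted in the PR description, running on multiple nodes is just a matter of changing this compute config. One plausible multi-node variant is sketched below; it assumes the same Anyscale compute-config schema, and the worker count of 4 is an arbitrary illustration, not part of this PR.

# Hypothetical multi-node variant of aggregate_benchmark_compute.yaml.
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2

max_workers: 4

head_node_type:
  name: head_node
  instance_type: m5.4xlarge

worker_node_types:
- name: worker_node
  instance_type: m5.4xlarge
  max_workers: 4
  min_workers: 4
  use_spot: false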
61 changes: 61 additions & 0 deletions release/nightly_tests/dataset/benchmark.py
@@ -0,0 +1,61 @@
import gc
import json
import os
import time
from typing import Callable

from ray.data.dataset import Dataset


class Benchmark:
    """Benchmark class used for Ray Datasets.

    Call ``run(fn)`` to benchmark a specific piece of code/function.
    ``fn`` is expected to return the final Dataset. Benchmark ensures the
    final Dataset is fully executed. We plan to add Dataset statistics
    logging in the future.

    Call ``write_result()`` to write the benchmark result to a file.
    The result can be rendered in a dashboard later through other tools.

    We should use this class for any benchmark related to Ray Datasets.
    It works for both local and distributed benchmarking.

    A typical workflow would be:

    benchmark = Benchmark(...)

    # set up (such as input read or generation)
    ...

    benchmark.run(..., fn_1)
    benchmark.run(..., fn_2)
    benchmark.run(..., fn_3)

    benchmark.write_result()

    See example usage in ``aggregate_benchmark.py``.
    """

    def __init__(self, name):
        self.name = name
        self.result = {}
        print(f"Running benchmark: {name}")

    def run(self, name: str, fn: Callable[..., Dataset], **fn_run_args):
        gc.collect()

        print(f"Running case: {name}")
        start_time = time.perf_counter()
        output_ds = fn(**fn_run_args)
        output_ds.fully_executed()
        duration = time.perf_counter() - start_time

        # TODO(chengsu): Record more metrics based on dataset stats.
        self.result[name] = {"time": duration}
        print(f"Result of case {name}: {self.result[name]}")

    def write_result(self):
        test_output_json = os.environ.get("TEST_OUTPUT_JSON", "/tmp/result.json")
        with open(test_output_json, "w") as f:
            f.write(json.dumps(self.result))
        print(f"Finish benchmark: {self.name}")
17 changes: 17 additions & 0 deletions release/release_tests.yaml
@@ -4200,6 +4200,23 @@
  type: sdk_command
  file_manager: sdk

- name: aggregate_benchmark
  group: core-dataset-tests
  working_dir: nightly_tests/dataset

  frequency: multi
  team: data
  cluster:
    cluster_env: app_config.yaml
    cluster_compute: aggregate_benchmark_compute.yaml

  run:
    timeout: 1800
    script: python aggregate_benchmark.py

  type: sdk_command
  file_manager: sdk

- name: pipelined_training_50_gb
  group: core-dataset-tests
  working_dir: nightly_tests/dataset
