diff --git a/release/nightly_tests/dataset/__init__.py b/release/nightly_tests/dataset/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/release/nightly_tests/dataset/aggregate_benchmark.py b/release/nightly_tests/dataset/aggregate_benchmark.py
new file mode 100644
index 000000000000..a8d6110143a7
--- /dev/null
+++ b/release/nightly_tests/dataset/aggregate_benchmark.py
@@ -0,0 +1,132 @@
+from typing import Tuple
+
+import ray
+from ray.data.aggregate import _AggregateOnKeyBase, Max, Mean, Min, Sum
+from ray.data.block import Block, KeyFn
+from ray.data.dataset import Dataset
+import pyarrow.compute as pac
+
+from benchmark import Benchmark
+
+
+def run_h2oai(benchmark: Benchmark):
+    """This benchmark is originally from https://github.com/h2oai/db-benchmark
+
+    Here we run the group-by queries from the benchmark on Ray Datasets.
+    The input files are pre-generated and stored in AWS S3.
+    """
+
+    # Test input file schema = {
+    #   id1: string, id2: string, id3: string, id4: int64, id5: int64, id6: int64,
+    #   v1: int64, v2: int64, v3: double
+    # }
+    test_input = [
+        ("s3://air-example-data/h2oai_benchmark/G1_1e7_1e2_0_0.csv", "h2oai-500M")
+    ]
+    for path, test_name in test_input:
+        input_ds = ray.data.read_csv(path)
+        # The number of blocks (parallelism) should be set to the number of
+        # available CPUs to get the best performance.
+        num_blocks = int(ray.cluster_resources().get("CPU", 1))
+        input_ds = input_ds.repartition(num_blocks).fully_executed()
+
+        q_list = [
+            (h2oai_q1, "q1"),
+            (h2oai_q3, "q3"),
+            (h2oai_q4, "q4"),
+            (h2oai_q5, "q5"),
+            (h2oai_q7, "q7"),
+            (h2oai_q8, "q8"),
+        ]
+
+        for q, name in q_list:
+            benchmark.run(f"{test_name}-{name}", q, ds=input_ds)
+
+
+def h2oai_q1(ds: Dataset) -> Dataset:
+    return ds.groupby("id1").sum("v1")
+
+
+def h2oai_q2(ds: Dataset) -> Dataset:
+    # TODO(chengsu): Run this after dataset supports multiple group-by keys.
+    # return ds.groupby(["id1", "id2"]).sum("v1")
+    raise NotImplementedError
+
+
+def h2oai_q3(ds: Dataset) -> Dataset:
+    return ds.groupby("id3").aggregate(Sum("v1"), Mean("v3"))
+
+
+def h2oai_q4(ds: Dataset) -> Dataset:
+    return ds.groupby("id4").aggregate(Mean("v1"), Mean("v2"), Mean("v3"))
+
+
+def h2oai_q5(ds: Dataset) -> Dataset:
+    return ds.groupby("id6").aggregate(Sum("v1"), Sum("v2"), Sum("v3"))
+
+
+def h2oai_q6(ds: Dataset) -> Dataset:
+    # TODO(chengsu): Run this after dataset supports multiple group-by keys.
+    # return ds.groupby(["id4", "id5"]).aggregate(Median("v3"), Std("v3"))
+    raise NotImplementedError
+
+
+def h2oai_q7(ds: Dataset) -> Dataset:
+    ds = ds.groupby("id3").aggregate(Max("v1"), Min("v2"))
+    ds = ds.map_batches(lambda df: df.assign(result=df["max(v1)"] - df["min(v2)"]))
+    return ds
+
+
+def h2oai_q8(ds: Dataset) -> Dataset:
+    def accumulate_block(agg: Tuple[float, float], block: Block) -> Tuple[float, float]:
+        column = block["v3"]
+        top_k_indices = pac.top_k_unstable(column, k=2)
+        top_k_result = pac.take(column, top_k_indices).to_pylist()
+        top_k_result.extend([float("-inf")] * (2 - len(top_k_result)))
+        top_k_result = (top_k_result[0], top_k_result[1])
+        return merge(agg, top_k_result)
+
+    def merge(
+        agg1: Tuple[float, float],
+        agg2: Tuple[float, float],
+    ) -> Tuple[float, float]:
+        if agg1[0] >= agg2[0]:
+            value1 = agg1[0]
+            value2 = max(agg1[1], agg2[0])
+        else:
+            value1 = agg2[0]
+            value2 = max(agg1[0], agg2[1])
+        return (value1, value2)
+
+    class Top2(_AggregateOnKeyBase):
+        def __init__(self, on: KeyFn):
+            self._set_key_fn(on)
+            super().__init__(
+                init=lambda _: (float("-inf"), float("-inf")),
+                merge=merge,
+                accumulate_block=accumulate_block,
+                name=(f"top2({str(on)})"),
+            )
+
+    return ds.groupby("id6").aggregate(Top2("v3"))
+
+
+def h2oai_q9(ds: Dataset) -> Dataset:
+    # TODO(chengsu): Run this after dataset supports multiple group-by keys.
+    # return ds.groupby(["id2", "id4"]).aggregate(pow(corr("v1", "v2"), 2))
+    raise NotImplementedError
+
+
+def h2oai_q10(ds: Dataset) -> Dataset:
+    # TODO(chengsu): Run this after dataset supports multiple group-by keys.
+    # return ds.groupby(["id1", "id2", "id3", "id4", "id5", "id6"])
+    #   .aggregate(Count(), Sum("v3"))
+    raise NotImplementedError
+
+
+if __name__ == "__main__":
+    benchmark = Benchmark("aggregate")
+
+    run_h2oai(benchmark)
+
+    benchmark.write_result()
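The `Top2` aggregation in `h2oai_q8` keeps the two largest `v3` values per `id6` group: `accumulate_block` extracts each block's top two values with `pyarrow.compute.top_k_unstable` (padding with `-inf` when a block has fewer than two rows) and folds them into the running pair, and `merge` combines two `(largest, second largest)` pairs. A minimal, standalone sketch of that merge rule (plain Python, not part of the change; `merge_top2` here is just a renamed copy of the inner `merge` above):

```python
from typing import Tuple


def merge_top2(
    agg1: Tuple[float, float], agg2: Tuple[float, float]
) -> Tuple[float, float]:
    # Each pair is (largest, second largest). The combined largest is the
    # bigger of the two firsts; the combined second largest is the best of
    # the remaining candidates.
    if agg1[0] >= agg2[0]:
        return (agg1[0], max(agg1[1], agg2[0]))
    return (agg2[0], max(agg1[0], agg2[1]))


assert merge_top2((9.0, 4.0), (7.0, 6.0)) == (9.0, 7.0)
assert merge_top2((float("-inf"), float("-inf")), (3.0, 1.0)) == (3.0, 1.0)
```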
+ # return ds.groupby(["id4", "id5"]).aggregate(Median("v3"), Std("v3")) + raise NotImplementedError + + +def h2oai_q7(ds: Dataset) -> Dataset: + ds = ds.groupby("id3").aggregate(Max("v1"), Min("v2")) + ds = ds.map_batches(lambda df: df.assign(result=df["max(v1)"] - df["min(v2)"])) + return ds + + +def h2oai_q8(ds: Dataset) -> Dataset: + def accumulate_block(agg: Tuple[float, float], block: Block) -> Tuple[float, float]: + column = block["v3"] + top_k_indices = pac.top_k_unstable(column, k=2) + top_k_result = pac.take(column, top_k_indices).to_pylist() + top_k_result.extend([float("-inf")] * (2 - len(top_k_result))) + top_k_result = (top_k_result[0], top_k_result[1]) + return merge(agg, top_k_result) + + def merge( + agg1: Tuple[float, float], + agg2: Tuple[float, float], + ) -> Tuple[float, float]: + if agg1[0] >= agg2[0]: + value1 = agg1[0] + value2 = max(agg1[1], agg2[0]) + else: + value1 = agg2[0] + value2 = max(agg1[0], agg2[1]) + return (value1, value2) + + class Top2(_AggregateOnKeyBase): + def __init__(self, on: KeyFn): + self._set_key_fn(on) + super().__init__( + init=lambda _: (float("-inf"), float("-inf")), + merge=merge, + accumulate_block=accumulate_block, + name=(f"top2({str(on)})"), + ) + + return ds.groupby("id6").aggregate(Top2("v3")) + + +def h2oai_q9(ds: Dataset) -> Dataset: + # TODO(chengsu): Run this after dataset supports multiple group-by keys. + # return ds.groupby(["id2", "id4"]).aggregate(pow(corr("v1", "v2"), 2)) + raise NotImplementedError + + +def h2oai_q10(ds: Dataset) -> Dataset: + # TODO(chengsu): Run this after dataset supports multiple group-by keys. + # return ds.groupby(["id1", "id2", "id3", "id4", "id5", "id6"]) + # .aggregate(Count(), Sum("v3")) + raise NotImplementedError + + +if __name__ == "__main__": + benchmark = Benchmark("aggregate") + + run_h2oai(benchmark) + + benchmark.write_result() diff --git a/release/nightly_tests/dataset/aggregate_benchmark_compute.yaml b/release/nightly_tests/dataset/aggregate_benchmark_compute.yaml new file mode 100644 index 000000000000..d01218e7e374 --- /dev/null +++ b/release/nightly_tests/dataset/aggregate_benchmark_compute.yaml @@ -0,0 +1,15 @@ +cloud_id: {{env["ANYSCALE_CLOUD_ID"]}} +region: us-west-2 + +max_workers: 0 + +head_node_type: + name: head_node + instance_type: m5.4xlarge + +worker_node_types: + - name: worker_node + instance_type: m5.4xlarge + max_workers: 0 + min_workers: 0 + use_spot: false diff --git a/release/nightly_tests/dataset/benchmark.py b/release/nightly_tests/dataset/benchmark.py new file mode 100644 index 000000000000..391c07147cba --- /dev/null +++ b/release/nightly_tests/dataset/benchmark.py @@ -0,0 +1,61 @@ +import gc +import json +import os +import time +from typing import Callable + +from ray.data.dataset import Dataset + + +class Benchmark: + """Benchmark class used for Ray Datasets. + + Call ``run(fn)`` to benchmark a specific piece of code/function. + ``fn`` is expected to return the final Dataset. Benchmark ensures + final Dataset is fully executed. Plan to add Dataset statistics + logging in the future. + + Call ``write_result()`` to write benchmark result in file. + Result can be rendered in dashboard later through other tool. + We should use this class for any benchmark related to Ray Datasets. + It works for both local and distribute benchmarking. + + A typical workflow would be: + + benchmark = Benchmark(...) + + # set up (such as input read or generation) + ... 
diff --git a/release/release_tests.yaml b/release/release_tests.yaml
index db2fa04e1668..601e0061dc32 100644
--- a/release/release_tests.yaml
+++ b/release/release_tests.yaml
@@ -4198,6 +4198,23 @@
   type: sdk_command
   file_manager: sdk
 
+- name: aggregate_benchmark
+  group: core-dataset-tests
+  working_dir: nightly_tests/dataset
+
+  frequency: multi
+  team: data
+  cluster:
+    cluster_env: app_config.yaml
+    cluster_compute: aggregate_benchmark_compute.yaml
+
+  run:
+    timeout: 1800
+    script: python aggregate_benchmark.py
+
+  type: sdk_command
+  file_manager: sdk
+
 - name: pipelined_training_50_gb
   group: core-dataset-tests
   working_dir: nightly_tests/dataset
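The new `aggregate_benchmark` release test runs `python aggregate_benchmark.py` on the cluster defined by `aggregate_benchmark_compute.yaml`, and `Benchmark.write_result()` writes the per-case timings to the path given by the `TEST_OUTPUT_JSON` environment variable (defaulting to `/tmp/result.json`). After a local run, the output can be inspected with something like the sketch below; the keys follow the `f"{test_name}-{name}"` pattern from `run_h2oai()`, and the timings in the comment are purely illustrative:

```python
import json

with open("/tmp/result.json") as f:
    result = json.load(f)

# e.g. {"h2oai-500M-q1": {"time": 1.23}, "h2oai-500M-q3": {"time": 4.56}, ...}
for case, metrics in sorted(result.items()):
    print(f"{case}: {metrics['time']:.2f}s")
```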