From dba931fa7fc276fbecfdaff44cd9bc32d2dca98a Mon Sep 17 00:00:00 2001
From: Kev Wang
Date: Thu, 19 Sep 2024 11:11:03 -0700
Subject: [PATCH] [CHORE] Change TPC-H q4 and q22 answers to use new join
 types (#2756)

Also adds some functionality to the benchmarking code for manual runs

Blocked by #2743 for compatibility with new executor
---
 benchmarking/tpch/__main__.py       | 26 ++++++++++++++++++--------
 benchmarking/tpch/answers.py        | 11 ++++-------
 benchmarking/tpch/ray_job_runner.py |  2 +-
 3 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/benchmarking/tpch/__main__.py b/benchmarking/tpch/__main__.py
index 6a7d24b290..33cc847c9c 100644
--- a/benchmarking/tpch/__main__.py
+++ b/benchmarking/tpch/__main__.py
@@ -123,7 +123,7 @@ def _get_df(table_name: str) -> DataFrame:
 
 def run_all_benchmarks(
     parquet_folder: str,
-    skip_questions: set[int],
+    questions: list[int],
     csv_output_location: str | None,
     ray_job_dashboard_url: str | None = None,
     requirements: str | None = None,
@@ -133,11 +133,7 @@ def run_all_benchmarks(
     daft_context = get_context()
     metrics_builder = MetricsBuilder(daft_context.runner_config.name)
 
-    for i in range(1, 23):
-        if i in skip_questions:
-            logger.warning("Skipping TPC-H q%s", i)
-            continue
-
+    for i in questions:
         # Run as a Ray Job if dashboard URL is provided
         if ray_job_dashboard_url is not None:
             from benchmarking.tpch import ray_job_runner
@@ -202,7 +198,10 @@ def get_ray_runtime_env(requirements: str | None) -> dict:
     runtime_env = {
         "py_modules": [daft],
         "eager_install": True,
-        "env_vars": {"DAFT_PROGRESS_BAR": "0"},
+        "env_vars": {
+            "DAFT_PROGRESS_BAR": "0",
+            "DAFT_RUNNER": "ray",
+        },
     }
     if requirements:
         runtime_env.update({"pip": requirements})
@@ -266,6 +265,7 @@ def warm_up_function():
     parser.add_argument(
         "--num_parts", default=None, help="Number of parts to generate (defaults to 1 part per GB)", type=int
     )
+    parser.add_argument("--questions", type=str, default=None, help="Comma-separated list of questions to run")
    parser.add_argument("--skip_questions", type=str, default=None, help="Comma-separated list of questions to skip")
     parser.add_argument("--output_csv", default=None, type=str, help="Location to output CSV file")
     parser.add_argument(
@@ -310,9 +310,19 @@ def warm_up_function():
     else:
         warmup_environment(args.requirements, parquet_folder)
 
+    if args.skip_questions is not None:
+        if args.questions is not None:
+            raise ValueError("Cannot specify both --questions and --skip_questions")
+        skip_questions = {int(s) for s in args.skip_questions.split(",")}
+        questions = [q for q in range(1, MetricsBuilder.NUM_TPCH_QUESTIONS + 1) if q not in skip_questions]
+    elif args.questions is not None:
+        questions = sorted(set(int(s) for s in args.questions.split(",")))
+    else:
+        questions = list(range(1, MetricsBuilder.NUM_TPCH_QUESTIONS + 1))
+
     run_all_benchmarks(
         parquet_folder,
-        skip_questions={int(s) for s in args.skip_questions.split(",")} if args.skip_questions is not None else set(),
+        questions=questions,
         csv_output_location=args.output_csv,
         ray_job_dashboard_url=args.ray_job_dashboard_url,
         requirements=args.requirements,
diff --git a/benchmarking/tpch/answers.py b/benchmarking/tpch/answers.py
index 90e8b212c7..3c7577f665 100644
--- a/benchmarking/tpch/answers.py
+++ b/benchmarking/tpch/answers.py
@@ -106,12 +106,12 @@ def q4(get_df: GetDFFunc) -> DataFrame:
         (col("O_ORDERDATE") >= datetime.date(1993, 7, 1)) & (col("O_ORDERDATE") < datetime.date(1993, 10, 1))
     )
 
-    lineitems = lineitems.where(col("L_COMMITDATE") < col("L_RECEIPTDATE")).select(col("L_ORDERKEY")).distinct()
+    lineitems = lineitems.where(col("L_COMMITDATE") < col("L_RECEIPTDATE"))
 
     daft_df = (
-        lineitems.join(orders, left_on=col("L_ORDERKEY"), right_on=col("O_ORDERKEY"))
+        orders.join(lineitems, left_on=col("O_ORDERKEY"), right_on=col("L_ORDERKEY"), how="semi")
         .groupby(col("O_ORDERPRIORITY"))
-        .agg(col("L_ORDERKEY").count().alias("order_count"))
+        .agg(col("O_ORDERKEY").count().alias("order_count"))
         .sort(col("O_ORDERPRIORITY"))
     )
     return daft_df
@@ -660,11 +660,8 @@ def q22(get_df: GetDFFunc) -> DataFrame:
         res_1.where(col("C_ACCTBAL") > 0).agg(col("C_ACCTBAL").mean().alias("avg_acctbal")).with_column("lit", lit(1))
     )
 
-    res_3 = orders.select("O_CUSTKEY")
-
     daft_df = (
-        res_1.join(res_3, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="left")
-        .where(col("O_CUSTKEY").is_null())
+        res_1.join(orders, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="anti")
         .with_column("lit", lit(1))
         .join(res_2, on="lit")
         .where(col("C_ACCTBAL") > col("avg_acctbal"))
diff --git a/benchmarking/tpch/ray_job_runner.py b/benchmarking/tpch/ray_job_runner.py
index e6163e8fe1..42fcfc96cf 100644
--- a/benchmarking/tpch/ray_job_runner.py
+++ b/benchmarking/tpch/ray_job_runner.py
@@ -57,7 +57,7 @@ def ray_job_params(
 ) -> dict:
     return dict(
         submission_id=f"tpch-q{tpch_qnum}-{str(uuid.uuid4())[:4]}",
-        entrypoint=f"python {str(entrypoint.relative_to(working_dir))} --parquet-folder {parquet_folder_path} --question-number {tpch_qnum}",
+        entrypoint=f"python3 {str(entrypoint.relative_to(working_dir))} --parquet-folder {parquet_folder_path} --question-number {tpch_qnum}",
         runtime_env={
             "working_dir": str(working_dir),
             **runtime_env,