[CHORE] Change TPC-H q4 and q22 answers to use new join types (#2756)
Also adds some functionality to the benchmarking code for manual runs.

Blocked by #2743 for compatibility with the new executor.
kevinzwang authored Sep 19, 2024
1 parent 7ee5fda commit dba931f
Showing 3 changed files with 23 additions and 16 deletions.
benchmarking/tpch/__main__.py (26 changes: 18 additions & 8 deletions)

@@ -123,7 +123,7 @@ def _get_df(table_name: str) -> DataFrame:
 
 def run_all_benchmarks(
     parquet_folder: str,
-    skip_questions: set[int],
+    questions: list[int],
     csv_output_location: str | None,
     ray_job_dashboard_url: str | None = None,
     requirements: str | None = None,
@@ -133,11 +133,7 @@ def run_all_benchmarks(
     daft_context = get_context()
     metrics_builder = MetricsBuilder(daft_context.runner_config.name)
 
-    for i in range(1, 23):
-        if i in skip_questions:
-            logger.warning("Skipping TPC-H q%s", i)
-            continue
-
+    for i in questions:
         # Run as a Ray Job if dashboard URL is provided
         if ray_job_dashboard_url is not None:
             from benchmarking.tpch import ray_job_runner
@@ -202,7 +198,10 @@ def get_ray_runtime_env(requirements: str | None) -> dict:
     runtime_env = {
         "py_modules": [daft],
         "eager_install": True,
-        "env_vars": {"DAFT_PROGRESS_BAR": "0"},
+        "env_vars": {
+            "DAFT_PROGRESS_BAR": "0",
+            "DAFT_RUNNER": "ray",
+        },
     }
     if requirements:
         runtime_env.update({"pip": requirements})
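The added DAFT_RUNNER entry pins the runner for code executing on the cluster, so a submitted job does not fall back to the default local runner. As a rough sketch (not part of this diff), a runtime_env of this shape is what Ray's job submission API expects; the dashboard URL and entrypoint below are placeholders, and the real code also ships the daft module via py_modules:

# Sketch only: how env_vars in a runtime_env reach a Ray job.
# The dashboard URL and entrypoint are placeholders, not values from this repo.
from ray.job_submission import JobSubmissionClient

runtime_env = {
    "eager_install": True,
    "env_vars": {
        "DAFT_PROGRESS_BAR": "0",  # keep job logs free of progress bars
        "DAFT_RUNNER": "ray",      # make remote workers use the Ray runner
    },
}

client = JobSubmissionClient("http://127.0.0.1:8265")
client.submit_job(entrypoint="python3 run_question.py", runtime_env=runtime_env)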
@@ -266,6 +265,7 @@ def warm_up_function():
     parser.add_argument(
         "--num_parts", default=None, help="Number of parts to generate (defaults to 1 part per GB)", type=int
     )
+    parser.add_argument("--questions", type=str, default=None, help="Comma-separated list of questions to run")
     parser.add_argument("--skip_questions", type=str, default=None, help="Comma-separated list of questions to skip")
     parser.add_argument("--output_csv", default=None, type=str, help="Location to output CSV file")
     parser.add_argument(
@@ -310,9 +310,19 @@ def warm_up_function():
     else:
         warmup_environment(args.requirements, parquet_folder)
 
+    if args.skip_questions is not None:
+        if args.questions is not None:
+            raise ValueError("Cannot specify both --questions and --skip_questions")
+        skip_questions = {int(s) for s in args.skip_questions.split(",")}
+        questions = [q for q in range(1, MetricsBuilder.NUM_TPCH_QUESTIONS + 1) if q not in skip_questions]
+    elif args.questions is not None:
+        questions = sorted(set(int(s) for s in args.questions.split(",")))
+    else:
+        questions = list(range(1, MetricsBuilder.NUM_TPCH_QUESTIONS + 1))
+
     run_all_benchmarks(
         parquet_folder,
-        skip_questions={int(s) for s in args.skip_questions.split(",")} if args.skip_questions is not None else set(),
+        questions=questions,
         csv_output_location=args.output_csv,
         ray_job_dashboard_url=args.ray_job_dashboard_url,
         requirements=args.requirements,
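With both flags in place, --questions selects an explicit subset to run (for example 4,22 to exercise just the rewritten answers), --skip_questions keeps the previous exclusion behaviour, and passing both is rejected. A self-contained sketch of that selection logic, with NUM_TPCH_QUESTIONS assumed to be 22 to match the old range(1, 23):

# Standalone sketch of the --questions / --skip_questions resolution above.
NUM_TPCH_QUESTIONS = 22  # assumed value of MetricsBuilder.NUM_TPCH_QUESTIONS

def select_questions(questions_arg: str | None, skip_arg: str | None) -> list[int]:
    if skip_arg is not None:
        if questions_arg is not None:
            raise ValueError("Cannot specify both --questions and --skip_questions")
        skip = {int(s) for s in skip_arg.split(",")}
        return [q for q in range(1, NUM_TPCH_QUESTIONS + 1) if q not in skip]
    if questions_arg is not None:
        return sorted({int(s) for s in questions_arg.split(",")})
    return list(range(1, NUM_TPCH_QUESTIONS + 1))

assert select_questions("22,4", None) == [4, 22]              # --questions 22,4
assert select_questions(None, "1,2,3") == list(range(4, 23))  # --skip_questions 1,2,3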
benchmarking/tpch/answers.py (11 changes: 4 additions & 7 deletions)

@@ -106,12 +106,12 @@ def q4(get_df: GetDFFunc) -> DataFrame:
         (col("O_ORDERDATE") >= datetime.date(1993, 7, 1)) & (col("O_ORDERDATE") < datetime.date(1993, 10, 1))
     )
 
-    lineitems = lineitems.where(col("L_COMMITDATE") < col("L_RECEIPTDATE")).select(col("L_ORDERKEY")).distinct()
+    lineitems = lineitems.where(col("L_COMMITDATE") < col("L_RECEIPTDATE"))
 
     daft_df = (
-        lineitems.join(orders, left_on=col("L_ORDERKEY"), right_on=col("O_ORDERKEY"))
+        orders.join(lineitems, left_on=col("O_ORDERKEY"), right_on=col("L_ORDERKEY"), how="semi")
         .groupby(col("O_ORDERPRIORITY"))
-        .agg(col("L_ORDERKEY").count().alias("order_count"))
+        .agg(col("O_ORDERKEY").count().alias("order_count"))
         .sort(col("O_ORDERPRIORITY"))
     )
     return daft_df
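The semi join keeps each order that has at least one qualifying lineitem without duplicating orders that have several, so the old .select(...).distinct() step is no longer needed; and because a semi join keeps only the left side's columns, the aggregation now counts O_ORDERKEY instead of L_ORDERKEY. A toy sketch of the pattern on made-up data, assuming Daft's join accepts how="semi" as this commit relies on:

# Toy illustration of the semi-join pattern above (made-up data, not TPC-H).
import daft
from daft import col

orders = daft.from_pydict(
    {"O_ORDERKEY": [1, 2, 3], "O_ORDERPRIORITY": ["1-URGENT", "2-HIGH", "1-URGENT"]}
)
late_lineitems = daft.from_pydict({"L_ORDERKEY": [1, 1, 3]})  # order 1 matches twice

order_counts = (
    orders.join(late_lineitems, left_on=col("O_ORDERKEY"), right_on=col("L_ORDERKEY"), how="semi")
    .groupby(col("O_ORDERPRIORITY"))
    .agg(col("O_ORDERKEY").count().alias("order_count"))
    .sort(col("O_ORDERPRIORITY"))
)
# Orders 1 and 3 each match at least once, order 2 matches nothing:
# expect a single "1-URGENT" group with order_count == 2.
print(order_counts.to_pydict())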
@@ -660,11 +660,8 @@ def q22(get_df: GetDFFunc) -> DataFrame:
         res_1.where(col("C_ACCTBAL") > 0).agg(col("C_ACCTBAL").mean().alias("avg_acctbal")).with_column("lit", lit(1))
     )
 
-    res_3 = orders.select("O_CUSTKEY")
-
     daft_df = (
-        res_1.join(res_3, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="left")
-        .where(col("O_CUSTKEY").is_null())
+        res_1.join(orders, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="anti")
         .with_column("lit", lit(1))
         .join(res_2, on="lit")
         .where(col("C_ACCTBAL") > col("avg_acctbal"))
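The anti join keeps each customer with no matching order, which replaces the previous left join followed by a null check on O_CUSTKEY and makes the intermediate res_3 projection unnecessary. A toy sketch on made-up data, assuming Daft's join accepts how="anti" as this commit relies on:

# Toy illustration of the anti-join pattern above (made-up data, not TPC-H).
import daft

customers = daft.from_pydict({"C_CUSTKEY": [10, 20, 30], "C_ACCTBAL": [5.0, 7.5, 1.0]})
orders = daft.from_pydict({"O_CUSTKEY": [10, 10, 30]})

customers_without_orders = customers.join(
    orders, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="anti"
)
# Customers 10 and 30 have orders, customer 20 does not: expect only C_CUSTKEY == 20.
print(customers_without_orders.to_pydict())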
benchmarking/tpch/ray_job_runner.py (2 changes: 1 addition & 1 deletion)

@@ -57,7 +57,7 @@ def ray_job_params(
 ) -> dict:
     return dict(
         submission_id=f"tpch-q{tpch_qnum}-{str(uuid.uuid4())[:4]}",
-        entrypoint=f"python {str(entrypoint.relative_to(working_dir))} --parquet-folder {parquet_folder_path} --question-number {tpch_qnum}",
+        entrypoint=f"python3 {str(entrypoint.relative_to(working_dir))} --parquet-folder {parquet_folder_path} --question-number {tpch_qnum}",
         runtime_env={
             "working_dir": str(working_dir),
             **runtime_env,
