From a32fb657d1caec634cb53979b2f7ef2fad224905 Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Wed, 9 Nov 2022 03:24:56 +0300
Subject: [PATCH] Combined TPCH runs & uniformed summaries for benchmarks
 (#4128)

---
 benchmarks/README.md       |  11 ++++
 benchmarks/src/bin/tpch.rs | 117 ++++++++++++++++++++++++++++---------
 2 files changed, 100 insertions(+), 28 deletions(-)

diff --git a/benchmarks/README.md b/benchmarks/README.md
index 65296da8de2e..f2f581fd2ae8 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -49,6 +49,11 @@ The benchmark can then be run (assuming the data created from `dbgen` is in `./d
 cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
 ```
 
+If you omit the `--query=<query_id>` argument, then all benchmarks will be run one by one (from query 1 to query 22).
+```bash
+cargo run --release --bin tpch -- benchmark datafusion --iterations 1 --path ./data --format tbl --batch-size 4096
+```
+
 You can enable the features `simd` (to use SIMD instructions, `cargo nightly` is required.) and/or `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as features by passing them in as `--features`:
 
 ```
@@ -69,6 +74,12 @@ cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parq
 
 Or if you want to verify and run all the queries in the benchmark, you can just run `cargo test`.
 
+### Machine readable benchmark summary
+
+Any `tpch` execution with the `-o <dir>` argument will produce a summary file right under the `<dir>`
+directory. It is a JSON-serialized form of all the runs that happened as well as the runtime metadata
+(number of cores, DataFusion version, etc.).
+
 ## Expected output
 
 The result of query 1 should produce the following output when executed against the SF=1 dataset.
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 33a0443eedbf..f0977fe3267f 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -62,9 +62,9 @@ static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 #[derive(Debug, StructOpt, Clone)]
 struct DataFusionBenchmarkOpt {
-    /// Query number
+    /// Query number. If not specified, runs all queries
     #[structopt(short, long)]
-    query: usize,
+    query: Option<usize>,
 
     /// Activate debug mode to see query results
     #[structopt(short, long)]
@@ -182,9 +182,37 @@ async fn main() -> Result<()> {
     }
 }
 
-async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
+const TPCH_QUERY_START_ID: usize = 1;
+const TPCH_QUERY_END_ID: usize = 22;
+
+async fn benchmark_datafusion(
+    opt: DataFusionBenchmarkOpt,
+) -> Result<Vec<Vec<RecordBatch>>> {
     println!("Running benchmarks with the following options: {:?}", opt);
-    let mut benchmark_run = BenchmarkRun::new(opt.query);
+    let query_range = match opt.query {
+        Some(query_id) => query_id..=query_id,
+        None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
+    };
+
+    let mut benchmark_run = BenchmarkRun::new();
+    let mut results = vec![];
+    for query_id in query_range {
+        let (query_run, result) = benchmark_query(&opt, query_id).await?;
+        results.push(result);
+        benchmark_run.add_query(query_run);
+    }
+
+    if let Some(path) = &opt.output_path {
+        write_summary_json(&mut benchmark_run, path)?;
+    }
+    Ok(results)
+}
+
+async fn benchmark_query(
+    opt: &DataFusionBenchmarkOpt,
+    query_id: usize,
+) -> Result<(QueryRun, Vec<RecordBatch>)> {
+    let mut benchmark_run = QueryRun::new(query_id);
     let config = SessionConfig::new()
         .with_target_partitions(opt.partitions)
         .with_batch_size(opt.batch_size)
@@ -192,7 +220,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
     let ctx = SessionContext::with_config(config);
 
     // register tables
-    register_tables(&opt, &ctx).await?;
+    register_tables(opt, &ctx).await?;
 
     let mut millis = vec![];
     // run benchmark
@@ -200,11 +228,11 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
     for i in 0..opt.iterations {
         let start = Instant::now();
 
-        let sql = &get_query_sql(opt.query)?;
+        let sql = &get_query_sql(query_id)?;
 
         // query 15 is special, with 3 statements. the second statement is the one from which we
         // want to capture the results
-        if opt.query == 15 {
+        if query_id == 15 {
             for (n, query) in sql.iter().enumerate() {
                 if n == 1 {
                     result = execute_query(&ctx, query, opt.debug).await?;
@@ -223,19 +251,15 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
         let row_count = result.iter().map(|b| b.num_rows()).sum();
         println!(
             "Query {} iteration {} took {:.1} ms and returned {} rows",
-            opt.query, i, elapsed, row_count
+            query_id, i, elapsed, row_count
         );
         benchmark_run.add_result(elapsed, row_count);
     }
 
     let avg = millis.iter().sum::<f64>() / millis.len() as f64;
-    println!("Query {} avg time: {:.2} ms", opt.query, avg);
+    println!("Query {} avg time: {:.2} ms", query_id, avg);
 
-    if let Some(path) = &opt.output_path {
-        write_summary_json(&mut benchmark_run, path)?;
-    }
-
-    Ok(result)
+    Ok((benchmark_run, result))
 }
 
 #[allow(clippy::await_holding_lock)]
@@ -278,10 +302,7 @@ async fn register_tables(
 fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<()> {
     let json =
         serde_json::to_string_pretty(&benchmark_run).expect("summary is serializable");
-    let filename = format!(
-        "tpch-q{}-{}.json",
-        benchmark_run.query, benchmark_run.start_time
-    );
+    let filename = format!("tpch-summary--{}.json", benchmark_run.context.start_time);
     let path = path.join(filename);
     println!(
         "Writing summary file to {}",
@@ -391,7 +412,7 @@ async fn get_table(
 }
 
 #[derive(Debug, Serialize)]
-struct BenchmarkRun {
+struct RunContext {
     /// Benchmark crate version
     benchmark_version: String,
     /// DataFusion crate version
@@ -402,14 +423,10 @@ struct BenchmarkRun {
     start_time: u64,
     /// CLI arguments
     arguments: Vec<String>,
-    /// query number
-    query: usize,
-    /// list of individual run times and row counts
-    iterations: Vec<QueryResult>,
 }
 
-impl BenchmarkRun {
-    fn new(query: usize) -> Self {
+impl RunContext {
+    fn new() -> Self {
         Self {
             benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
             datafusion_version: DATAFUSION_VERSION.to_owned(),
@@ -422,8 +439,50 @@ impl BenchmarkRun {
                 .skip(1)
                 .into_iter()
                 .collect::<Vec<String>>(),
+        }
+    }
+}
+
+#[derive(Debug, Serialize)]
+struct BenchmarkRun {
+    /// Information regarding the environment in which the benchmark was run
+    context: RunContext,
+    /// Per-query summaries
+    queries: Vec<QueryRun>,
+}
+
+impl BenchmarkRun {
+    fn new() -> Self {
+        Self {
+            context: RunContext::new(),
+            queries: vec![],
+        }
+    }
+
+    fn add_query(&mut self, query: QueryRun) {
+        self.queries.push(query)
+    }
+}
+
+#[derive(Debug, Serialize)]
+struct QueryRun {
+    /// query number
+    query: usize,
+    /// list of individual run times and row counts
+    iterations: Vec<QueryResult>,
+    /// Start time
+    start_time: u64,
+}
+
+impl QueryRun {
+    fn new(query: usize) -> Self {
+        Self {
             query,
             iterations: vec![],
+            start_time: SystemTime::now()
+                .duration_since(SystemTime::UNIX_EPOCH)
+                .expect("current time is later than UNIX_EPOCH")
+                .as_secs(),
         }
     }
 
@@ -775,7 +834,7 @@ mod ci {
         let ctx = SessionContext::default();
         let path = get_tpch_data_path()?;
         let opt = DataFusionBenchmarkOpt {
-            query,
+            query: Some(query),
             debug: false,
             iterations: 1,
             partitions: 2,
@@ -1087,7 +1146,7 @@ mod ci {
 
         // run the query to compute actual results of the query
        let opt = DataFusionBenchmarkOpt {
-            query: n,
+            query: Some(n),
             debug: false,
             iterations: 1,
             partitions: 2,
@@ -1098,8 +1157,10 @@ mod ci {
             output_path: None,
             disable_statistics: false,
         };
-        let actual = benchmark_datafusion(opt).await?;
+        let mut results = benchmark_datafusion(opt).await?;
+        assert_eq!(results.len(), 1);
 
+        let actual = results.remove(0);
        let transformed = transform_actual_result(actual, n).await?;
 
         // assert schema data types match
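
For downstream tooling, the summary file written by `write_summary_json` can be read back with serde. The sketch below mirrors the `Serialize` structs added in this patch; the `QueryResult` layout is not part of this diff (its entries are kept as raw JSON here), and the example file name and the `num_cpus: usize` field type are assumptions based on the surrounding code.

```rust
// Hypothetical reader for the summary emitted by `write_summary_json`.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct RunContext {
    benchmark_version: String,
    datafusion_version: String,
    num_cpus: usize,
    start_time: u64,
    arguments: Vec<String>,
}

#[derive(Debug, Deserialize)]
struct QueryRun {
    query: usize,
    // `QueryResult` is not shown in this patch, so keep the entries untyped.
    iterations: Vec<serde_json::Value>,
    start_time: u64,
}

#[derive(Debug, Deserialize)]
struct BenchmarkRun {
    context: RunContext,
    queries: Vec<QueryRun>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative path; the actual name is `tpch-summary--{start_time}.json`
    // under the directory passed via `-o`.
    let json = std::fs::read_to_string("tpch-summary--1668000000.json")?;
    let run: BenchmarkRun = serde_json::from_str(&json)?;
    println!(
        "DataFusion {} on {} cores: {} queries recorded",
        run.context.datafusion_version,
        run.context.num_cpus,
        run.queries.len()
    );
    for q in &run.queries {
        println!("q{}: {} iteration(s)", q.query, q.iterations.len());
    }
    Ok(())
}
```

Keeping the environment metadata in a single `RunContext` rather than per query is what lets one summary file describe an entire combined run of all 22 queries.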