Combined TPCH runs & uniformed summaries for benchmarks (#4128)
isidentical authored Nov 9, 2022
1 parent b58ec81 commit a32fb65
Showing 2 changed files with 100 additions and 28 deletions.
11 changes: 11 additions & 0 deletions benchmarks/README.md
@@ -49,6 +49,11 @@ The benchmark can then be run (assuming the data created from `dbgen` is in `./d
cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
```

If you omit the `--query=<query_id>` argument, all queries will be run one by one (from query 1 to query 22).
```bash
cargo run --release --bin tpch -- benchmark datafusion --iterations 1 --path ./data --format tbl --batch-size 4096
```

You can enable the `simd` feature (to use SIMD instructions; a nightly Rust toolchain is required) and/or `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) by passing them in as `--features`:

```
@@ -69,6 +74,12 @@ cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parq

Or if you want to verify and run all the queries in the benchmark, you can just run `cargo test`.

### Machine readable benchmark summary

Any `tpch` execution with the `-o <dir>` argument will produce a summary file directly under the `<dir>`
directory. It is a JSON-serialized form of all the runs that happened, as well as the runtime metadata
(number of cores, DataFusion version, etc.).
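
A minimal sketch of what such a summary might look like is shown below. The `context`/`queries` layout and most field names follow the `BenchmarkRun`, `RunContext`, and `QueryRun` structs in `tpch.rs`; the per-iteration field names (taken from `add_result(elapsed, row_count)`), the core-count field, and all concrete values here are illustrative assumptions, not actual output.

```json
{
  "context": {
    "benchmark_version": "14.0.0",
    "datafusion_version": "14.0.0",
    "num_cpus": 8,
    "start_time": 1667952000,
    "arguments": ["benchmark", "datafusion", "--iterations", "2", "--path", "./data", "--format", "tbl", "-o", "./results"]
  },
  "queries": [
    {
      "query": 1,
      "iterations": [
        { "elapsed": 1532.6, "row_count": 4 },
        { "elapsed": 1497.1, "row_count": 4 }
      ],
      "start_time": 1667952000
    }
  ]
}
```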

## Expected output

The result of query 1 should produce the following output when executed against the SF=1 dataset.
117 changes: 89 additions & 28 deletions benchmarks/src/bin/tpch.rs
@@ -62,9 +62,9 @@ static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

#[derive(Debug, StructOpt, Clone)]
struct DataFusionBenchmarkOpt {
/// Query number
/// Query number. If not specified, runs all queries
#[structopt(short, long)]
query: usize,
query: Option<usize>,

/// Activate debug mode to see query results
#[structopt(short, long)]
@@ -182,29 +182,57 @@ async fn main() -> Result<()> {
}
}

async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
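// TPC-H defines queries 1 through 22; this range drives the run-all mode when `--query` is omitted.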
const TPCH_QUERY_START_ID: usize = 1;
const TPCH_QUERY_END_ID: usize = 22;

async fn benchmark_datafusion(
opt: DataFusionBenchmarkOpt,
) -> Result<Vec<Vec<RecordBatch>>> {
println!("Running benchmarks with the following options: {:?}", opt);
let mut benchmark_run = BenchmarkRun::new(opt.query);
let query_range = match opt.query {
Some(query_id) => query_id..=query_id,
None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
};

let mut benchmark_run = BenchmarkRun::new();
let mut results = vec![];
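// Run each selected query, collecting its results and adding its summary to the combined run.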
for query_id in query_range {
let (query_run, result) = benchmark_query(&opt, query_id).await?;
results.push(result);
benchmark_run.add_query(query_run);
}

if let Some(path) = &opt.output_path {
write_summary_json(&mut benchmark_run, path)?;
}
Ok(results)
}

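/// Benchmarks a single TPC-H query for `opt.iterations` iterations, returning the
/// per-iteration summary together with the results of the last run.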
async fn benchmark_query(
opt: &DataFusionBenchmarkOpt,
query_id: usize,
) -> Result<(QueryRun, Vec<RecordBatch>)> {
let mut benchmark_run = QueryRun::new(query_id);
let config = SessionConfig::new()
.with_target_partitions(opt.partitions)
.with_batch_size(opt.batch_size)
.with_collect_statistics(!opt.disable_statistics);
let ctx = SessionContext::with_config(config);

// register tables
register_tables(&opt, &ctx).await?;
register_tables(opt, &ctx).await?;

let mut millis = vec![];
// run benchmark
let mut result: Vec<RecordBatch> = Vec::with_capacity(1);
for i in 0..opt.iterations {
let start = Instant::now();

let sql = &get_query_sql(opt.query)?;
let sql = &get_query_sql(query_id)?;

// Query 15 is special, with 3 statements; the second statement is the one from which
// we want to capture the results
if opt.query == 15 {
if query_id == 15 {
for (n, query) in sql.iter().enumerate() {
if n == 1 {
result = execute_query(&ctx, query, opt.debug).await?;
@@ -223,19 +251,15 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
let row_count = result.iter().map(|b| b.num_rows()).sum();
println!(
"Query {} iteration {} took {:.1} ms and returned {} rows",
opt.query, i, elapsed, row_count
query_id, i, elapsed, row_count
);
benchmark_run.add_result(elapsed, row_count);
}

let avg = millis.iter().sum::<f64>() / millis.len() as f64;
println!("Query {} avg time: {:.2} ms", opt.query, avg);
println!("Query {} avg time: {:.2} ms", query_id, avg);

if let Some(path) = &opt.output_path {
write_summary_json(&mut benchmark_run, path)?;
}

Ok(result)
Ok((benchmark_run, result))
}

#[allow(clippy::await_holding_lock)]
@@ -278,10 +302,7 @@ async fn register_tables(
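/// Serializes the combined benchmark summary as pretty-printed JSON into a file under `path`.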
fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<()> {
let json =
serde_json::to_string_pretty(&benchmark_run).expect("summary is serializable");
let filename = format!(
"tpch-q{}-{}.json",
benchmark_run.query, benchmark_run.start_time
);
let filename = format!("tpch-summary--{}.json", benchmark_run.context.start_time);
let path = path.join(filename);
println!(
"Writing summary file to {}",
@@ -391,7 +412,7 @@ async fn get_table(
}

#[derive(Debug, Serialize)]
struct BenchmarkRun {
struct RunContext {
/// Benchmark crate version
benchmark_version: String,
/// DataFusion crate version
@@ -402,14 +423,10 @@ struct BenchmarkRun {
start_time: u64,
/// CLI arguments
arguments: Vec<String>,
/// query number
query: usize,
/// list of individual run times and row counts
iterations: Vec<QueryResult>,
}

impl BenchmarkRun {
fn new(query: usize) -> Self {
impl RunContext {
fn new() -> Self {
Self {
benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
datafusion_version: DATAFUSION_VERSION.to_owned(),
@@ -422,8 +439,50 @@ impl BenchmarkRun {
.skip(1)
.into_iter()
.collect::<Vec<String>>(),
}
}
}

#[derive(Debug, Serialize)]
struct BenchmarkRun {
/// Information regarding the environment in which the benchmark was run
context: RunContext,
/// Per-query summaries
queries: Vec<QueryRun>,
}

impl BenchmarkRun {
fn new() -> Self {
Self {
context: RunContext::new(),
queries: vec![],
}
}

fn add_query(&mut self, query: QueryRun) {
self.queries.push(query)
}
}

#[derive(Debug, Serialize)]
struct QueryRun {
/// query number
query: usize,
/// list of individual run times and row counts
iterations: Vec<QueryResult>,
/// Start time
start_time: u64,
}

impl QueryRun {
fn new(query: usize) -> Self {
Self {
query,
iterations: vec![],
start_time: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("current time is later than UNIX_EPOCH")
.as_secs(),
}
}

@@ -775,7 +834,7 @@ mod ci {
let ctx = SessionContext::default();
let path = get_tpch_data_path()?;
let opt = DataFusionBenchmarkOpt {
query,
query: Some(query),
debug: false,
iterations: 1,
partitions: 2,
@@ -1087,7 +1146,7 @@ mod ci {

// run the query to compute actual results of the query
let opt = DataFusionBenchmarkOpt {
query: n,
query: Some(n),
debug: false,
iterations: 1,
partitions: 2,
@@ -1098,8 +1157,10 @@ mod ci {
output_path: None,
disable_statistics: false,
};
let actual = benchmark_datafusion(opt).await?;
let mut results = benchmark_datafusion(opt).await?;
assert_eq!(results.len(), 1);

let actual = results.remove(0);
let transformed = transform_actual_result(actual, n).await?;

// assert schema data types match
