Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combined TPCH runs & uniformed summaries for benchmarks #4128

Merged
merged 1 commit into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ The benchmark can then be run (assuming the data created from `dbgen` is in `./d
cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
```

If you omit the `--query=<query_id>` argument, all queries will be run one by one (from query 1 to query 22).
```bash
cargo run --release --bin tpch -- benchmark datafusion --iterations 1 --path ./data --format tbl --batch-size 4096
```

You can enable the `simd` feature (to use SIMD instructions; a nightly Rust toolchain is required) and/or `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) by passing them in as `--features`:

```
Expand All @@ -69,6 +74,12 @@ cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parq

Or if you want to verify and run all the queries in the benchmark, you can just run `cargo test`.

### Machine readable benchmark summary

Any `tpch` execution with the `-o <dir>` argument will produce a summary file directly under the `<dir>`
directory. It is a JSON-serialized form of all the runs that happened, as well as the runtime metadata
(number of cores, DataFusion version, etc.).

## Expected output

The result of query 1 should produce the following output when executed against the SF=1 dataset.
Expand Down
117 changes: 89 additions & 28 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

#[derive(Debug, StructOpt, Clone)]
struct DataFusionBenchmarkOpt {
/// Query number
/// Query number. If not specified, runs all queries
#[structopt(short, long)]
query: usize,
query: Option<usize>,

/// Activate debug mode to see query results
#[structopt(short, long)]
Expand Down Expand Up @@ -182,29 +182,57 @@ async fn main() -> Result<()> {
}
}

async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
const TPCH_QUERY_START_ID: usize = 1;
const TPCH_QUERY_END_ID: usize = 22;

async fn benchmark_datafusion(
opt: DataFusionBenchmarkOpt,
) -> Result<Vec<Vec<RecordBatch>>> {
println!("Running benchmarks with the following options: {:?}", opt);
let mut benchmark_run = BenchmarkRun::new(opt.query);
let query_range = match opt.query {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Some(query_id) => query_id..=query_id,
None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
};

let mut benchmark_run = BenchmarkRun::new();
let mut results = vec![];
for query_id in query_range {
let (query_run, result) = benchmark_query(&opt, query_id).await?;
results.push(result);
benchmark_run.add_query(query_run);
}

if let Some(path) = &opt.output_path {
write_summary_json(&mut benchmark_run, path)?;
}
Ok(results)
}

async fn benchmark_query(
opt: &DataFusionBenchmarkOpt,
query_id: usize,
) -> Result<(QueryRun, Vec<RecordBatch>)> {
let mut benchmark_run = QueryRun::new(query_id);
let config = SessionConfig::new()
.with_target_partitions(opt.partitions)
.with_batch_size(opt.batch_size)
.with_collect_statistics(!opt.disable_statistics);
let ctx = SessionContext::with_config(config);

// register tables
register_tables(&opt, &ctx).await?;
register_tables(opt, &ctx).await?;

let mut millis = vec![];
// run benchmark
let mut result: Vec<RecordBatch> = Vec::with_capacity(1);
for i in 0..opt.iterations {
let start = Instant::now();

let sql = &get_query_sql(opt.query)?;
let sql = &get_query_sql(query_id)?;

// query 15 is special, with 3 statements. the second statement is the one from which we
// want to capture the results
if opt.query == 15 {
if query_id == 15 {
for (n, query) in sql.iter().enumerate() {
if n == 1 {
result = execute_query(&ctx, query, opt.debug).await?;
Expand All @@ -223,19 +251,15 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
let row_count = result.iter().map(|b| b.num_rows()).sum();
println!(
"Query {} iteration {} took {:.1} ms and returned {} rows",
opt.query, i, elapsed, row_count
query_id, i, elapsed, row_count
);
benchmark_run.add_result(elapsed, row_count);
}

let avg = millis.iter().sum::<f64>() / millis.len() as f64;
println!("Query {} avg time: {:.2} ms", opt.query, avg);
println!("Query {} avg time: {:.2} ms", query_id, avg);

if let Some(path) = &opt.output_path {
write_summary_json(&mut benchmark_run, path)?;
}

Ok(result)
Ok((benchmark_run, result))
}

#[allow(clippy::await_holding_lock)]
Expand Down Expand Up @@ -278,10 +302,7 @@ async fn register_tables(
fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<()> {
let json =
serde_json::to_string_pretty(&benchmark_run).expect("summary is serializable");
let filename = format!(
"tpch-q{}-{}.json",
benchmark_run.query, benchmark_run.start_time
);
let filename = format!("tpch-summary--{}.json", benchmark_run.context.start_time);
let path = path.join(filename);
println!(
"Writing summary file to {}",
Expand Down Expand Up @@ -391,7 +412,7 @@ async fn get_table(
}

#[derive(Debug, Serialize)]
struct BenchmarkRun {
struct RunContext {
/// Benchmark crate version
benchmark_version: String,
/// DataFusion crate version
Expand All @@ -402,14 +423,10 @@ struct BenchmarkRun {
start_time: u64,
/// CLI arguments
arguments: Vec<String>,
/// query number
query: usize,
/// list of individual run times and row counts
iterations: Vec<QueryResult>,
}

impl BenchmarkRun {
fn new(query: usize) -> Self {
impl RunContext {
fn new() -> Self {
Self {
benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
datafusion_version: DATAFUSION_VERSION.to_owned(),
Expand All @@ -422,8 +439,50 @@ impl BenchmarkRun {
.skip(1)
.into_iter()
.collect::<Vec<String>>(),
}
}
}

/// Serializable summary of an entire benchmark execution: one shared
/// [`RunContext`] describing the environment plus one [`QueryRun`] per
/// query executed. Written to disk as pretty-printed JSON by
/// `write_summary_json`.
#[derive(Debug, Serialize)]
struct BenchmarkRun {
    /// Information regarding the environment in which the benchmark was run
    context: RunContext,
    /// Per-query summaries
    queries: Vec<QueryRun>,
}

impl BenchmarkRun {
fn new() -> Self {
Self {
context: RunContext::new(),
queries: vec![],
}
}

fn add_query(&mut self, query: QueryRun) {
self.queries.push(query)
}
}

/// Serializable summary of a single query's benchmark runs (one entry per
/// iteration), collected into [`BenchmarkRun::queries`].
#[derive(Debug, Serialize)]
struct QueryRun {
    /// query number
    query: usize,
    /// list of individual run times and row counts
    iterations: Vec<QueryResult>,
    /// Start time, in seconds since the UNIX epoch
    start_time: u64,
}

impl QueryRun {
fn new(query: usize) -> Self {
Self {
query,
iterations: vec![],
start_time: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("current time is later than UNIX_EPOCH")
.as_secs(),
}
}

Expand Down Expand Up @@ -775,7 +834,7 @@ mod ci {
let ctx = SessionContext::default();
let path = get_tpch_data_path()?;
let opt = DataFusionBenchmarkOpt {
query,
query: Some(query),
debug: false,
iterations: 1,
partitions: 2,
Expand Down Expand Up @@ -1087,7 +1146,7 @@ mod ci {

// run the query to compute actual results of the query
let opt = DataFusionBenchmarkOpt {
query: n,
query: Some(n),
debug: false,
iterations: 1,
partitions: 2,
Expand All @@ -1098,8 +1157,10 @@ mod ci {
output_path: None,
disable_statistics: false,
};
let actual = benchmark_datafusion(opt).await?;
let mut results = benchmark_datafusion(opt).await?;
assert_eq!(results.len(), 1);

let actual = results.remove(0);
let transformed = transform_actual_result(actual, n).await?;

// assert schema data types match
Expand Down