Skip to content

Commit

Permalink
Dont call multiunzip when no stats (apache#9220)
Browse files Browse the repository at this point in the history
* Dont call multiunzip when no stats

* Update docstring
  • Loading branch information
matthewmturner authored Feb 15, 2024
1 parent 92d9274 commit 8aaea5d
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let sql = std::fs::read_to_string(format!("../../benchmarks/queries/{}.sql", q))
.unwrap();
c.bench_function(&format!("physical_plan_tpch_{}", q), |b| {
b.iter(|| logical_plan(&ctx, &sql))
b.iter(|| physical_plan(&ctx, &sql))
});
}

Expand Down
9 changes: 7 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,8 +880,13 @@ impl ListingTable {
.boxed()
.buffered(ctx.config_options().execution.meta_fetch_concurrency);

let (files, statistics) =
get_statistics_with_limit(files, self.schema(), limit).await?;
let (files, statistics) = get_statistics_with_limit(
files,
self.schema(),
limit,
self.options.collect_stat,
)
.await?;

Ok((
split_files(files, self.options.target_partitions),
Expand Down
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ use itertools::izip;
use itertools::multiunzip;

/// Get all files as well as the file level summary statistics (no statistic for partition columns).
/// If the optional `limit` is provided, includes only sufficient files.
/// Needed to read up to `limit` number of rows.
/// If the optional `limit` is provided, includes only sufficient files. Needed to read up to
/// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on
/// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive
/// call to `multiunzip` for constructing file level summary statistics.
pub async fn get_statistics_with_limit(
all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
file_schema: SchemaRef,
limit: Option<usize>,
collect_stats: bool,
) -> Result<(Vec<PartitionedFile>, Statistics)> {
let mut result_files = vec![];
// These statistics can be calculated as long as at least one file provides
Expand Down Expand Up @@ -78,6 +81,9 @@ pub async fn get_statistics_with_limit(
while let Some(current) = all_files.next().await {
let (file, file_stats) = current?;
result_files.push(file);
if !collect_stats {
continue;
}

// We accumulate the number of rows, total byte size and null
// counts across all the files in question. If any file does not
Expand Down

0 comments on commit 8aaea5d

Please sign in to comment.