
parquet: Add finer metrics on operations covered by time_elapsed_opening #12585

Merged Sep 25, 2024 · 5 commits · changes shown from 4 commits
30 changes: 25 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/metrics.rs
@@ -43,14 +43,20 @@ pub struct ParquetFileMetrics {
pub pushdown_rows_pruned: Count,
/// Total rows passed predicates pushed into parquet scan
pub pushdown_rows_matched: Count,
/// Total time spent evaluating pushdown filters
pub pushdown_eval_time: Time,
/// Total time spent evaluating row-level pushdown filters
Contributor:

As an aside while I am reviewing this code, the description of what the metrics mean would be more useful if we could attach them to the Metric itself somehow 🤔

Something like:

        let row_pushdown_eval_time = MetricBuilder::new(metrics)
            .with_description("Total time spent evaluating row-level pushdown filters")
            .with_new_label("filename", filename.to_string()) 
            .subset_time("row_pushdown_eval_time", partition);

Though then we would have to figure out how to expose that information in an explain plan 🤔

pub row_pushdown_eval_time: Time,
/// Total time spent evaluating row group-level statistics filters
pub statistics_eval_time: Time,
/// Total time spent evaluating row group Bloom Filters
pub bloom_filter_eval_time: Time,
/// Total rows filtered out by parquet page index
pub page_index_rows_pruned: Count,
/// Total rows passed through the parquet page index
pub page_index_rows_matched: Count,
/// Total time spent evaluating parquet page index filters
pub page_index_eval_time: Time,
/// Total time spent reading and parsing metadata from the footer
pub metadata_load_time: Time,
}

impl ParquetFileMetrics {
@@ -91,9 +97,16 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.counter("pushdown_rows_matched", partition);

let pushdown_eval_time = MetricBuilder::new(metrics)
let row_pushdown_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("pushdown_eval_time", partition);
.subset_time("row_pushdown_eval_time", partition);
let statistics_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("statistics_eval_time", partition);
let bloom_filter_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("bloom_filter_eval_time", partition);

let page_index_rows_pruned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("page_index_rows_pruned", partition);
@@ -105,6 +118,10 @@
.with_new_label("filename", filename.to_string())
.subset_time("page_index_eval_time", partition);

let metadata_load_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("metadata_load_time", partition);

Self {
predicate_evaluation_errors,
row_groups_matched_bloom_filter,
@@ -114,10 +131,13 @@
bytes_scanned,
pushdown_rows_pruned,
pushdown_rows_matched,
pushdown_eval_time,
row_pushdown_eval_time,
page_index_rows_pruned,
page_index_rows_matched,
statistics_eval_time,
bloom_filter_eval_time,
page_index_eval_time,
metadata_load_time,
}
}
}
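
For context on the pattern this diff extends: each new timer is a subset `Time` metric built per file and driven by a scoped timer around the measured region. A minimal sketch using the names from this diff (the helper function and its body are illustrative, not part of the PR):

    use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};

    // Sketch: build a per-file subset timer, then time a region with it.
    // `metadata_load_time` mirrors the metric added in this diff.
    fn time_metadata_load(metrics: &ExecutionPlanMetricsSet, filename: &str, partition: usize) {
        let metadata_load_time: Time = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .subset_time("metadata_load_time", partition);

        let mut timer = metadata_load_time.timer(); // starts timing immediately
        // ... timed work: fetch and parse the parquet footer ...
        timer.stop(); // records the elapsed time; dropping the guard would too
    }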
12 changes: 10 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -1862,8 +1862,16 @@ mod tests {
assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 5);
assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 2);
assert!(
get_value(&metrics, "pushdown_eval_time") > 0,
"no eval time in metrics: {metrics:#?}"
get_value(&metrics, "row_pushdown_eval_time") > 0,
"no pushdown eval time in metrics: {metrics:#?}"
);
assert!(
get_value(&metrics, "statistics_eval_time") > 0,
"no statistics eval time in metrics: {metrics:#?}"
);
assert!(
get_value(&metrics, "bloom_filter_eval_time") > 0,
"no Bloom Filter eval time in metrics: {metrics:#?}"
);
}
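
The assertions above look up metrics by name through the test module's existing `get_value` helper; its shape is roughly the following (a sketch inferred from usage, not copied from the PR):

    // Sketch: sum a named metric across all partitions/files and
    // panic with the full metrics dump if it is missing.
    fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
        match metrics.sum_by_name(metric_name) {
            Some(v) => v.as_usize(),
            None => panic!("metric '{metric_name}' not found in {metrics:#?}"),
        }
    }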

@@ -118,6 +118,7 @@ impl FileOpener for ParquetOpener {
Ok(Box::pin(async move {
let options = ArrowReaderOptions::new().with_page_index(enable_page_index);

let mut metadata_timer = file_metrics.metadata_load_time.timer();
let metadata =
ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
let mut schema = metadata.schema().clone();
@@ -133,6 +134,8 @@
let metadata =
ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;

metadata_timer.stop();

let mut builder =
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);

@@ -187,15 +190,18 @@
}
// If there is a predicate that can be evaluated against the metadata
if let Some(predicate) = predicate.as_ref() {
let mut timer = file_metrics.statistics_eval_time.timer();
row_groups.prune_by_statistics(
&file_schema,
builder.parquet_schema(),
rg_metadata,
predicate,
&file_metrics,
);
timer.stop();
Contributor:
I don't think it is a problem, but technically speaking the stop() is unnecessary (it happens automatically when the timer goes out of scope and is Dropped):

https://github.com/alamb/datafusion/blob/544e49bb0acac7130a873a92b44e1c902e41ac8f/datafusion/physical-plan/src/metrics/value.rs#L335-L339

Contributor Author:

This one is needed, because it only goes out of scope after the Bloom Filter evaluation. The next timer.stop() is indeed redundant, but I kept it for consistency.

Contributor Author:
Doesn't matter anymore, as I moved both calls to the callees.
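
For readers following the linked value.rs: the Drop behavior under discussion is essentially the following (paraphrased sketch, not the verbatim upstream source):

    // A ScopedTimerGuard records its elapsed time when dropped, so an
    // explicit stop() at the end of a scope is redundant -- but stopping
    // *early*, before other timed work begins, still requires stop().
    impl Drop for ScopedTimerGuard<'_> {
        fn drop(&mut self) {
            self.stop()
        }
    }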


if enable_bloom_filter && !row_groups.is_empty() {
let mut timer = file_metrics.bloom_filter_eval_time.timer();
Contributor:
I note that the time calculation for page_index_eval_time below seems to be in the call to

                    access_plan = p.prune_plan_with_page_index(
                        access_plan,
                        &file_schema,
                        builder.parquet_schema(),
                        file_metadata.as_ref(),
                        &file_metrics,
                    );

I wonder if we should move the timer into this function for consistency (or alternatively push the timer calculation into prune_by_bloom_filters and prune_by_statistics)

Contributor Author (@progval, Sep 24, 2024):
Good point. I did the latter, because it's probably closer to what users expect.

row_groups
.prune_by_bloom_filters(
&file_schema,
Expand All @@ -204,6 +210,7 @@ impl FileOpener for ParquetOpener {
&file_metrics,
)
.await;
timer.stop();
}
}
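
As the author notes above, both timers were subsequently moved into the callees. A hypothetical sketch of that shape, where the pruning function times itself so callers cannot forget the timer (mirroring how `prune_plan_with_page_index` already accounts for `page_index_eval_time` internally; the actual follow-up commit may differ):

    // Hypothetical: the callee owns its own timer. Binding the guard to
    // `_timer` keeps it alive until the end of the function, where Drop
    // stops it automatically.
    fn prune_by_statistics_timed(metrics: &ParquetFileMetrics /* , schemas, metadata, predicate */) {
        let _timer = metrics.statistics_eval_time.timer();
        // ... evaluate row-group statistics and prune the access plan ...
    }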

@@ -531,7 +531,7 @@ pub fn build_row_filter(
) -> Result<Option<RowFilter>> {
let rows_pruned = &file_metrics.pushdown_rows_pruned;
let rows_matched = &file_metrics.pushdown_rows_matched;
let time = &file_metrics.pushdown_eval_time;
let time = &file_metrics.row_pushdown_eval_time;

// Split into conjuncts:
// `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`]
4 changes: 3 additions & 1 deletion docs/source/user-guide/explain-usage.md
@@ -235,7 +235,9 @@ When predicate pushdown is enabled, `ParquetExec` gains the following metrics:
- `pushdown_rows_pruned`: rows that were tested by any of the above filters, and did not pass one of them (this should be the sum of `page_index_rows_matched`, `row_groups_pruned_bloom_filter`, and `row_groups_pruned_statistics`)
- `predicate_evaluation_errors`: number of times evaluating the filter expression failed (expected to be zero in normal operation)
- `num_predicate_creation_errors`: number of errors creating predicates (expected to be zero in normal operation)
- `pushdown_eval_time`: time spent evaluating these filters
- `bloom_filter_eval_time`: time spent parsing and evaluating Bloom Filters
- `statistics_eval_time`: time spent parsing and evaluating row group-level statistics
- `row_pushdown_eval_time`: time spent evaluating row-level filters
- `page_index_eval_time`: time required to evaluate the page index filters
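
As an aside, these timers can also be read programmatically from the operator's metrics set after execution, not only via `EXPLAIN ANALYZE`. A sketch, assuming `plan` is the executed `ParquetExec` node:

    use datafusion::physical_plan::{metrics::MetricsSet, ExecutionPlan};

    // Sketch: aggregate the node's metrics by name and print the timers
    // touched by this PR.
    fn print_parquet_timers(plan: &dyn ExecutionPlan) {
        if let Some(metrics) = plan.metrics() {
            let metrics: MetricsSet = metrics.aggregate_by_name();
            for name in [
                "row_pushdown_eval_time",
                "statistics_eval_time",
                "bloom_filter_eval_time",
                "page_index_eval_time",
                "metadata_load_time",
            ] {
                if let Some(value) = metrics.sum_by_name(name) {
                    println!("{name}: {value}");
                }
            }
        }
    }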

## Partitions and Execution