From d34711278685ae46cd2c83a1473c0a47411d8120 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Mon, 23 Sep 2024 14:43:27 +0200 Subject: [PATCH] parquet: Add support for user-provided metadata loaders This allows users to, for example, cache the Page Index so it does not need to be parsed every time we open the file. --- .../datasource/physical_plan/parquet/mod.rs | 4 +- .../physical_plan/parquet/opener.rs | 10 ++-- .../physical_plan/parquet/reader.rs | 51 +++++++++++++++++-- .../physical_plan/parquet/row_group_filter.rs | 4 +- 4 files changed, 55 insertions(+), 14 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index ce679bfa76c5..ac561307260f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -60,7 +60,9 @@ use crate::datasource::schema_adapter::{ pub use access_plan::{ParquetAccessPlan, RowGroupAccess}; pub use metrics::ParquetFileMetrics; use opener::ParquetOpener; -pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory}; +pub use reader::{ + DefaultParquetFileReaderFactory, ParquetFileReader, ParquetFileReaderFactory, +}; pub use row_filter::can_expr_be_pushed_down_with_schemas; pub use writer::plan_to_parquet; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index 9880c30ddb6b..811ef98bbdcf 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -21,7 +21,7 @@ use crate::datasource::file_format::coerce_file_schema_to_view_type; use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter; use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter; use crate::datasource::physical_plan::parquet::{ - row_filter, 
should_enable_page_index, ParquetAccessPlan, + row_filter, should_enable_page_index, ParquetAccessPlan, ParquetFileReader, }; use crate::datasource::physical_plan::{ FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory, @@ -35,7 +35,6 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{StreamExt, TryStreamExt}; use log::debug; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; -use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use std::sync::Arc; @@ -87,7 +86,7 @@ impl FileOpener for ParquetOpener { let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); - let mut reader: Box = + let mut reader: Box = self.parquet_file_reader_factory.create_reader( self.partition_index, file_meta, @@ -118,8 +117,7 @@ impl FileOpener for ParquetOpener { Ok(Box::pin(async move { let options = ArrowReaderOptions::new().with_page_index(enable_page_index); - let metadata = - ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?; + let metadata = reader.load_metadata(options.clone()).await?; let mut schema = metadata.schema().clone(); // read with view types if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema) @@ -134,7 +132,7 @@ impl FileOpener for ParquetOpener { ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?; let mut builder = - ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata); + ParquetRecordBatchStreamBuilder::new_with_metadata(reader.upcast(), metadata); let file_schema = builder.schema().clone(); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/reader.rs b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs index 8a4ba136fc96..ac0d5e057bc3 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/reader.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs 
@@ -23,6 +23,7 @@ use bytes::Bytes; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::future::BoxFuture; use object_store::ObjectStore; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; use parquet::file::metadata::ParquetMetaData; use std::fmt::Debug; @@ -57,9 +58,49 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static { file_meta: FileMeta, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, - ) -> datafusion_common::Result>; + ) -> datafusion_common::Result>; } +/// [`AsyncFileReader`] augmented with a method to customize how file metadata is loaded. +pub trait ParquetFileReader: AsyncFileReader + Send + 'static { + /// Returns an [`AsyncFileReader`] trait object + /// + /// This can usually be implemented as `Box::new(*self)` + fn upcast(self: Box) -> Box; + + /// Parses the file's metadata + /// + /// The usual implementation is: + /// + /// ```ignore + /// Box::pin(ArrowReaderMetadata::load_async(self, options)) + /// ``` + fn load_metadata( + &mut self, + options: ArrowReaderOptions, + ) -> BoxFuture<'_, parquet::errors::Result>; +} + +macro_rules! impl_ParquetFileReader { + ($type:ty) => { + impl ParquetFileReader for $type { + fn upcast(self: Box) -> Box { + Box::new(*self) + } + + fn load_metadata( + &mut self, + options: ArrowReaderOptions, + ) -> BoxFuture<'_, parquet::errors::Result> { + Box::pin(ArrowReaderMetadata::load_async(self, options)) + } + } + } +} + +impl_ParquetFileReader!(ParquetObjectReader); +impl_ParquetFileReader!(DefaultParquetFileReader); + /// Default implementation of [`ParquetFileReaderFactory`] /// /// This implementation: @@ -86,12 +127,12 @@ impl DefaultParquetFileReaderFactory { /// This implementation does not coalesce I/O operations or cache bytes.
Such /// optimizations can be done either at the object store level or by providing a /// custom implementation of [`ParquetFileReaderFactory`]. -pub(crate) struct ParquetFileReader { +pub(crate) struct DefaultParquetFileReader { pub file_metrics: ParquetFileMetrics, pub inner: ParquetObjectReader, } -impl AsyncFileReader for ParquetFileReader { +impl AsyncFileReader for DefaultParquetFileReader { fn get_bytes( &mut self, range: Range, @@ -126,7 +167,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { file_meta: FileMeta, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, - ) -> datafusion_common::Result> { + ) -> datafusion_common::Result> { let file_metrics = ParquetFileMetrics::new( partition_index, file_meta.location().as_ref(), @@ -139,7 +180,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { inner = inner.with_footer_size_hint(hint) }; - Ok(Box::new(ParquetFileReader { + Ok(Box::new(DefaultParquetFileReader { inner, file_metrics, })) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs index 4cdcb005018e..eb5cb08e35a6 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs @@ -416,7 +416,7 @@ mod tests { use std::sync::Arc; use super::*; - use crate::datasource::physical_plan::parquet::reader::ParquetFileReader; + use crate::datasource::physical_plan::parquet::reader::DefaultParquetFileReader; use crate::physical_plan::metrics::ExecutionPlanMetricsSet; use arrow::datatypes::DataType::Decimal128; @@ -1516,7 +1516,7 @@ mod tests { let metrics = ExecutionPlanMetricsSet::new(); let file_metrics = ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics); - let reader = ParquetFileReader { + let reader = DefaultParquetFileReader { inner: ParquetObjectReader::new(Arc::new(in_memory), 
object_meta), file_metrics: file_metrics.clone(), };