refactor: build parquet file stream from ParquetExec (#1852)
* refactor: build parquet file stream from ParquetExec

Signed-off-by: Ruihang Xia <[email protected]>

* rename sqlness case

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
waynexia authored Jun 29, 2023
1 parent 2b3ca13 commit d45e7b7
Showing 3 changed files with 21 additions and 35 deletions.
src/file-table-engine/src/table/format.rs (56 changes: 21 additions & 35 deletions)
@@ -20,6 +20,7 @@ use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, P
 use common_datasource::file_format::Format;
 use common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef};
 use common_query::prelude::Expr;
+use common_query::DfPhysicalPlan;
 use common_recordbatch::adapter::RecordBatchStreamAdapter;
 use common_recordbatch::SendableRecordBatchStream;
 use datafusion::common::ToDFSchema;
@@ -28,17 +29,16 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::optimizer::utils::conjunction;
 use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_expr::execution_props::ExecutionProps;
-use datafusion::physical_plan::file_format::{
-    FileOpener, FileScanConfig, FileStream, ParquetExec, ParquetOpener,
-};
+use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream, ParquetExec};
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion::prelude::SessionContext;
 use datatypes::arrow::datatypes::Schema as ArrowSchema;
 use datatypes::schema::{Schema, SchemaRef};
 use object_store::ObjectStore;
 use snafu::ResultExt;
 use table::table::scan::StreamScanAdapter;

-use crate::error::{self, BuildStreamSnafu, Result};
+use crate::error::{self, Result};

 const DEFAULT_BATCH_SIZE: usize = 8192;
@@ -264,7 +264,7 @@ fn new_parquet_scan_plan(
     )))
 }

-fn new_parquet_stream(
+fn new_parquet_stream_with_exec_plan(
     _ctx: &CreateScanPlanContext,
     config: &ScanPlanConfig,
     _format: &ParquetFormat,
@@ -279,6 +279,7 @@
         ..
     } = config;

+    // construct config for ParquetExec
     let scan_config = FileScanConfig {
         object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
         file_schema: file_schema.clone(),
@@ -294,11 +295,11 @@
         infinite_source: false,
     };

+    // build predicate filter
     let filters = filters
         .iter()
         .map(|f| f.df_expr().clone())
         .collect::<Vec<_>>();
-
     let filters = if let Some(expr) = conjunction(filters) {
         let df_schema = file_schema
             .clone()
@@ -312,34 +313,19 @@
         None
     };

-    let parquet_opener = ParquetOpener {
-        partition_index: 0, // partition: hard-code. This is only for statistics purpose
-        projection: Arc::from(projection.cloned().unwrap_or_default()),
-        batch_size: DEFAULT_BATCH_SIZE,
-        limit: *limit,
-        predicate: filters,
-        pruning_predicate: None,
-        page_pruning_predicate: None,
-        table_schema: file_schema.clone(),
-        metadata_size_hint: None,
-        metrics: ExecutionPlanMetricsSet::new(),
-        parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(store.clone())),
-        pushdown_filters: true,
-        reorder_filters: true,
-        enable_page_index: true,
-    };
-
-    let stream = FileStream::new(
-        &scan_config,
-        0,
-        parquet_opener,
-        &ExecutionPlanMetricsSet::new(),
-    )
-    .context(BuildStreamSnafu)?;
-
-    let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream))
-        .context(error::BuildStreamAdapterSnafu)?;
-    Ok(Box::pin(adapter))
+    // TODO(ruihang): get this from upper layer
+    let task_ctx = SessionContext::default().task_ctx();
+    let parquet_exec = ParquetExec::new(scan_config, filters, None)
+        .with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(
+            store.clone(),
+        )));
+    let stream = parquet_exec
+        .execute(0, task_ctx)
+        .context(error::ParquetScanPlanSnafu)?;
+
+    Ok(Box::pin(
+        RecordBatchStreamAdapter::try_new(stream).context(error::BuildStreamAdapterSnafu)?,
+    ))
 }

 #[derive(Debug, Clone)]
@@ -373,7 +359,7 @@ pub fn create_stream(
     match format {
         Format::Csv(format) => new_csv_stream(ctx, config, format),
         Format::Json(format) => new_json_stream(ctx, config, format),
-        Format::Parquet(format) => new_parquet_stream(ctx, config, format),
+        Format::Parquet(format) => new_parquet_stream_with_exec_plan(ctx, config, format),
         _ => error::UnsupportedFormatSnafu { format: *format }.fail(),
     }
 }
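
Two steps in the new new_parquet_stream_with_exec_plan are worth spelling out. First, predicate pushdown: the logical filter expressions are AND-ed together with conjunction and lowered to one physical expression that ParquetExec can evaluate inside the parquet reader. A minimal sketch of that step against plain DataFusion types, assuming the DataFusion revision this commit builds against; the helper name and its inputs are hypothetical:

use std::sync::Arc;

use datafusion::arrow::datatypes::Schema as ArrowSchema;
use datafusion::common::ToDFSchema;
use datafusion::error::Result;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion::prelude::Expr;

/// Fold all logical filters into one physical predicate, or None if there
/// is nothing to push down.
fn build_pushdown_predicate(
    filters: Vec<Expr>,
    file_schema: Arc<ArrowSchema>,
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
    if let Some(expr) = conjunction(filters) {
        // create_physical_expr resolves column names through the DataFusion
        // schema while binding indices against the Arrow schema.
        let df_schema = file_schema.as_ref().clone().to_dfschema_ref()?;
        let predicate = create_physical_expr(
            &expr,
            df_schema.as_ref(),
            file_schema.as_ref(),
            &ExecutionProps::new(),
        )?;
        Ok(Some(predicate))
    } else {
        Ok(None)
    }
}

ParquetExec uses such a predicate for row-group and page pruning and for in-reader filtering, which is exactly what the removed hand-rolled ParquetOpener fields (pruning_predicate, page_pruning_predicate, pushdown_filters, enable_page_index) configured by hand.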

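Second, execution: instead of assembling a FileStream around a hand-built ParquetOpener and threading ExecutionPlanMetricsSet through manually, the scan is now expressed as a ParquetExec plan node whose partition 0 is executed directly. A sketch of that pattern under the same assumptions; the helper is hypothetical, and a real caller would also install a custom reader factory via with_parquet_file_reader_factory, as the commit does:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use datafusion::prelude::SessionContext;

/// Turn a parquet scan config into a record batch stream by executing a
/// ParquetExec plan node.
fn stream_from_parquet_exec(
    scan_config: FileScanConfig,
    predicate: Option<Arc<dyn PhysicalExpr>>,
) -> Result<SendableRecordBatchStream> {
    // Pruning, page-index handling, and per-partition metrics are set up
    // inside ParquetExec; the third argument is an optional footer
    // metadata size hint.
    let exec = ParquetExec::new(scan_config, predicate, None);

    // A throwaway task context, matching the commit's TODO to obtain the
    // real one from the upper layer.
    let task_ctx = SessionContext::default().task_ctx();

    // The scan config carries a single file group, so partition 0 covers
    // the whole file list.
    exec.execute(0, task_ctx)
}

This delegates pruning and metrics bookkeeping to DataFusion rather than re-implementing them in the file-table-engine.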