diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 31b8b6d3bcbc..24faddd9f34d 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -159,7 +159,7 @@ impl TryInto for &protobuf::LogicalPlanNode { LogicalPlanBuilder::scan_parquet_with_name( &scan.path, projection, - 24, + create_datafusion_context_concurrency(24), &scan.table_name, )? //TODO concurrency .build() @@ -1100,6 +1100,8 @@ impl TryInto for &protobuf::Field { } } +use crate::utils::create_datafusion_context_concurrency; +use datafusion::physical_plan::datetime_expressions::to_timestamp; use datafusion::physical_plan::{aggregates, windows}; use datafusion::prelude::{ array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256, diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 678bcde8fa73..1441f87bc0aa 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -29,11 +29,13 @@ use crate::serde::protobuf::repartition_exec_node::PartitionMethod; use crate::serde::protobuf::ShuffleReaderPartition; use crate::serde::scheduler::PartitionLocation; use crate::serde::{from_proto_binary_op, proto_error, protobuf}; +use crate::utils::create_datafusion_context_concurrency; use crate::{convert_box_required, convert_required, into_required}; use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion::catalog::catalog::{ CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider, }; +use datafusion::datasource::object_store::ObjectStoreRegistry; use datafusion::execution::context::{ ExecutionConfig, ExecutionContextState, ExecutionProps, }; @@ -129,14 +131,13 @@ impl TryInto> for &protobuf::PhysicalPlanNode { } PhysicalPlanType::ParquetScan(scan) => { let projection = scan.projection.iter().map(|i| *i as usize).collect(); - let filenames: Vec<&str> = - scan.filename.iter().map(|s| s.as_str()).collect(); - Ok(Arc::new(ParquetExec::try_from_files( - &filenames, + let path: &str = scan.filename[0].as_str(); + Ok(Arc::new(ParquetExec::try_from_path( + path, Some(projection), None, scan.batch_size as usize, - scan.num_partitions as usize, + create_datafusion_context_concurrency(scan.num_partitions as usize), None, )?)) } @@ -614,6 +615,9 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc; + + let object_store_registry = Arc::new(ObjectStoreRegistry::new()); + let ctx_state = ExecutionContextState { catalog_list, scalar_functions: Default::default(), @@ -621,6 +625,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { aggregate_functions: Default::default(), config: ExecutionConfig::new(), execution_props: ExecutionProps::new(), + object_store_registry, }; let fun_expr = functions::create_physical_fun( diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 48b21345525b..7b310cd076fa 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -259,7 +259,7 @@ impl TryInto for Arc { let filenames = exec .partitions() .iter() - .flat_map(|part| part.filenames().to_owned()) + .flat_map(|part| part.filenames()) .collect(); Ok(protobuf::PhysicalPlanNode { physical_plan_type: 
Some(PhysicalPlanType::ParquetScan( diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index 4187faa6645a..e960b77575a9 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -252,6 +252,11 @@ pub fn create_datafusion_context( ExecutionContext::with_config(config) } +/// Create a DataFusion context that is compatible with Ballista in concurrency +pub fn create_datafusion_context_concurrency(concurrency: usize) -> ExecutionContext { + ExecutionContext::with_concurrency(concurrency) +} + pub struct BallistaQueryPlanner { scheduler_url: String, config: BallistaConfig, diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 676975fcaec9..2037a3530aba 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -85,7 +85,8 @@ use self::state::{ConfigBackendClient, SchedulerState}; use ballista_core::config::BallistaConfig; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; -use datafusion::physical_plan::parquet::ParquetExec; +use ballista_core::utils::create_datafusion_context_concurrency; +use datafusion::datasource::parquet::ParquetRootDesc; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; use std::time::{Instant, SystemTime, UNIX_EPOCH}; @@ -285,24 +286,19 @@ impl SchedulerGrpc for SchedulerServer { match file_type { FileType::Parquet => { - let parquet_exec = - ParquetExec::try_from_path(&path, None, None, 1024, 1, None) - .map_err(|e| { - let msg = format!("Error opening parquet files: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })?; + let ctx = create_datafusion_context_concurrency(1); + let parquet_desc = ParquetRootDesc::new(&path, ctx).map_err(|e| { + let msg = format!("Error opening parquet files: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; //TODO include statistics and any other info needed to reconstruct ParquetExec Ok(Response::new(GetFileMetadataResult { - schema: Some(parquet_exec.schema().as_ref().into()), - partitions: parquet_exec - .partitions() - .iter() - .map(|part| FilePartitionMetadata { - filename: part.filenames().to_vec(), - }) - .collect(), + schema: Some(parquet_desc.schema().as_ref().into()), + partitions: vec![FilePartitionMetadata { + filename: vec![path], + }], })) } //TODO implement for CSV diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 10b5c2db795f..c45341bad2de 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -475,7 +475,10 @@ fn get_table( } "parquet" => { let path = format!("{}/{}", path, table); - Ok(Arc::new(ParquetTable::try_new(&path, max_concurrency)?)) + Ok(Arc::new(ParquetTable::try_new( + &path, + ExecutionContext::with_concurrency(max_concurrency), + )?)) } other => { unimplemented!("Invalid file format '{}'", other); diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index 138434ea2482..aab647b86676 100644 --- a/datafusion-examples/examples/flight_server.rs +++ b/datafusion-examples/examples/flight_server.rs @@ -65,7 +65,11 @@ impl FlightService for FlightServiceImpl { ) -> Result, Status> { let request = request.into_inner(); - let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap(); + let table = ParquetTable::try_new( + &request.path[0], + ExecutionContext::with_concurrency(num_cpus::get()), + ) + .unwrap(); let options = 
datafusion::arrow::ipc::writer::IpcWriteOptions::default(); let schema_result = SchemaAsIpc::new(table.schema().as_ref(), &options).into(); diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs index 987c4fdb079d..d4ca073af2dd 100644 --- a/datafusion/src/datasource/csv.rs +++ b/datafusion/src/datasource/csv.rs @@ -40,12 +40,14 @@ use std::string::String; use std::sync::{Arc, Mutex}; use crate::datasource::datasource::Statistics; +use crate::datasource::local::LocalFileSystem; +use crate::datasource::object_store::ObjectStore; use crate::datasource::{Source, TableProvider}; use crate::error::{DataFusionError, Result}; use crate::logical_plan::Expr; use crate::physical_plan::csv::CsvExec; pub use crate::physical_plan::csv::CsvReadOptions; -use crate::physical_plan::{common, ExecutionPlan}; +use crate::physical_plan::ExecutionPlan; /// Represents a CSV file with a provided schema pub struct CsvFile { @@ -64,7 +66,8 @@ impl CsvFile { let schema = Arc::new(match options.schema { Some(s) => s.clone(), None => { - let filenames = common::build_file_list(&path, options.file_extension)?; + let filenames = LocalFileSystem + .list_all_files(path.as_str(), options.file_extension)?; if filenames.is_empty() { return Err(DataFusionError::Plan(format!( "No files found at {path} with file extension {file_extension}", diff --git a/datafusion/src/datasource/json.rs b/datafusion/src/datasource/json.rs index 90fedfd6f528..5bd8a5f7121f 100644 --- a/datafusion/src/datasource/json.rs +++ b/datafusion/src/datasource/json.rs @@ -30,7 +30,6 @@ use crate::{ datasource::{Source, TableProvider}, error::{DataFusionError, Result}, physical_plan::{ - common, json::{NdJsonExec, NdJsonReadOptions}, ExecutionPlan, }, @@ -38,6 +37,8 @@ use crate::{ use arrow::{datatypes::SchemaRef, json::reader::infer_json_schema_from_seekable}; use super::datasource::Statistics; +use crate::datasource::local::LocalFileSystem; +use crate::datasource::object_store::ObjectStore; trait SeekRead: Read + Seek {} @@ -57,7 +58,8 @@ impl NdJsonFile { let schema = if let Some(schema) = options.schema { schema } else { - let filenames = common::build_file_list(path, options.file_extension)?; + let filenames = + LocalFileSystem.list_all_files(path, options.file_extension)?; if filenames.is_empty() { return Err(DataFusionError::Plan(format!( "No files found at {path} with file extension {file_extension}", diff --git a/datafusion/src/datasource/local.rs b/datafusion/src/datasource/local.rs new file mode 100644 index 000000000000..4890e9d229f9 --- /dev/null +++ b/datafusion/src/datasource/local.rs @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Object store that represents the Local File System. 
+ +use crate::datasource::object_store::{ObjectReader, ObjectStore}; +use crate::error::DataFusionError; +use crate::error::Result; +use crate::parquet::file::reader::{ChunkReader, Length}; +use std::any::Any; +use std::fs; +use std::fs::{metadata, File}; +use std::io::Read; +use std::sync::Arc; + +#[derive(Debug)] +/// Local File System as Object Store. +pub struct LocalFileSystem; + +impl ObjectStore for LocalFileSystem { + fn as_any(&self) -> &dyn Any { + self + } + + fn list_all_files(&self, path: &str, ext: &str) -> Result> { + list_all(path, ext) + } + + fn get_reader(&self, file_path: &str) -> Result> { + let file = File::open(file_path)?; + let reader = LocalFSObjectReader::new(file)?; + Ok(Arc::new(reader)) + } +} + +struct LocalFSObjectReader { + file: File, +} + +impl LocalFSObjectReader { + fn new(file: File) -> Result { + Ok(Self { file }) + } +} + +impl ObjectReader for LocalFSObjectReader { + fn get_reader(&self, start: u64, length: usize) -> Box { + Box::new(FileSegmentReader::new( + self.file.try_clone().unwrap(), + start, + length, + )) + } + + fn length(&self) -> u64 { + self.file.len() + } +} + +struct FileSegmentReader { + file: File, + start: u64, + length: usize, +} + +impl FileSegmentReader { + fn new(file: File, start: u64, length: usize) -> Self { + Self { + file, + start, + length, + } + } +} + +impl Read for FileSegmentReader { + fn read(&mut self, buf: &mut [u8]) -> std::result::Result { + let mut file_source = self.file.get_read(self.start, self.length)?; + file_source.read(buf) + } +} + +fn list_all(root_path: &str, ext: &str) -> Result> { + let mut filenames: Vec = Vec::new(); + list_all_files(root_path, &mut filenames, ext)?; + Ok(filenames) +} + +/// Recursively build a list of files in a directory with a given extension with an accumulator list +fn list_all_files(dir: &str, filenames: &mut Vec, ext: &str) -> Result<()> { + let metadata = metadata(dir)?; + if metadata.is_file() { + if dir.ends_with(ext) { + filenames.push(dir.to_string()); + } + } else { + for entry in fs::read_dir(dir)? 
{ + let entry = entry?; + let path = entry.path(); + if let Some(path_name) = path.to_str() { + if path.is_dir() { + list_all_files(path_name, filenames, ext)?; + } else if path_name.ends_with(ext) { + filenames.push(path_name.to_string()); + } + } else { + return Err(DataFusionError::Plan("Invalid path".to_string())); + } + } + } + Ok(()) +} diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs index 9699a997caa1..64e84c8e5611 100644 --- a/datafusion/src/datasource/mod.rs +++ b/datafusion/src/datasource/mod.rs @@ -21,13 +21,24 @@ pub mod csv; pub mod datasource; pub mod empty; pub mod json; +pub mod local; pub mod memory; +pub mod object_store; pub mod parquet; pub use self::csv::{CsvFile, CsvReadOptions}; pub use self::datasource::{TableProvider, TableType}; pub use self::memory::MemTable; +use crate::arrow::datatypes::{Schema, SchemaRef}; +use crate::datasource::datasource::{ColumnStatistics, Statistics}; +use crate::datasource::object_store::ObjectStore; +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; +use crate::physical_plan::Accumulator; +use crate::scalar::ScalarValue; +use std::sync::Arc; + /// Source for table input data pub(crate) enum Source> { /// Path to a single file or a directory containing one of more files @@ -36,3 +47,245 @@ pub(crate) enum Source> { /// Read data from a reader Reader(std::sync::Mutex>), } + +#[derive(Debug, Clone)] +/// A single file that should be read, along with its schema, statistics +/// and partition column values that need to be appended to each row. +pub struct PartitionedFile { + /// Path for the file (e.g. URL, filesystem path, etc) + pub file_path: String, + /// Schema of the file + pub schema: Schema, + /// Statistics of the file + pub statistics: Statistics, + /// Values of partition columns to be appended to each row + pub partition_value: Option>, + /// Schema of partition columns + pub partition_schema: Option, + // We may include row group range here for a more fine-grained parallel execution +} + +impl From for PartitionedFile { + fn from(file_path: String) -> Self { + Self { + file_path, + schema: Schema::empty(), + statistics: Default::default(), + partition_value: None, + partition_schema: None, + } + } +} + +impl std::fmt::Display for PartitionedFile { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "PartitionedFile(file_path: {}, schema: {}, statistics: {:?},\ + partition_value: {:?}, partition_schema: {:?})", + self.file_path, + self.schema, + self.statistics, + self.partition_value, + self.partition_schema + ) + } +} + +#[derive(Debug, Clone)] +/// A collection of files that should be read in a single task +pub struct FilePartition { + /// The index of the partition among all partitions + pub index: usize, + /// The contained files of the partition + pub files: Vec, +} + +impl std::fmt::Display for FilePartition { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let files: Vec = self.files.iter().map(|f| format!("{}", f)).collect(); + write!( + f, + "FilePartition[{}], files: {}", + self.index, + files.join(", ") + ) + } +} + +#[derive(Debug, Clone)] +/// All source files with same schema exists in a path +pub struct SourceRootDescriptor { + /// All source files in the path + pub partition_files: Vec, + /// The schema of the files + pub schema: SchemaRef, +} + +/// Builder for ['SourceRootDescriptor'] inside given path +pub trait SourceRootDescBuilder { + /// Construct a 
['SourceRootDescriptor'] from the provided path + fn get_source_desc( + path: &str, + object_store: Arc, + ext: &str, + ) -> Result { + let filenames = object_store.list_all_files(path, ext)?; + if filenames.is_empty() { + return Err(DataFusionError::Plan(format!( + "No file (with .{} extension) found at path {}", + ext, path + ))); + } + + // build a list of Parquet partitions with statistics and gather all unique schemas + // used in this data set + let mut schemas: Vec = vec![]; + + let partitioned_files = filenames + .iter() + .map(|file_path| { + let pf = Self::get_file_meta(file_path, object_store.clone())?; + let schema = pf.schema.clone(); + if schemas.is_empty() { + schemas.push(schema); + } else if schema != schemas[0] { + // we currently get the schema information from the first file rather than do + // schema merging and this is a limitation. + // See https://issues.apache.org/jira/browse/ARROW-11017 + return Err(DataFusionError::Plan(format!( + "The file {} have different schema from the first file and DataFusion does \ + not yet support schema merging", + file_path + ))); + } + Ok(pf) + }).collect::>>(); + + Ok(SourceRootDescriptor { + partition_files: partitioned_files?, + schema: Arc::new(schemas.pop().unwrap()), + }) + } + + /// Get all metadata for a source file, including schema, statistics, partitions, etc. + fn get_file_meta( + file_path: &str, + object_store: Arc, + ) -> Result; +} + +/// Get all files as well as the summary statistics when a limit is provided +pub fn get_statistics_with_limit( + source_desc: &SourceRootDescriptor, + limit: Option, +) -> (Vec, Statistics) { + let mut all_files = source_desc.partition_files.clone(); + let schema = source_desc.schema.clone(); + + let mut total_byte_size = 0; + let mut null_counts = vec![0; schema.fields().len()]; + let mut has_statistics = false; + let (mut max_values, mut min_values) = create_max_min_accs(&schema); + + let mut num_rows = 0; + let mut num_files = 0; + for file in &all_files { + num_files += 1; + let file_stats = &file.statistics; + num_rows += file_stats.num_rows.unwrap_or(0); + total_byte_size += file_stats.total_byte_size.unwrap_or(0); + if let Some(vec) = &file_stats.column_statistics { + has_statistics = true; + for (i, cs) in vec.iter().enumerate() { + null_counts[i] += cs.null_count.unwrap_or(0); + + if let Some(max_value) = &mut max_values[i] { + if let Some(file_max) = cs.max_value.clone() { + match max_value.update(&[file_max]) { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + } + + if let Some(min_value) = &mut min_values[i] { + if let Some(file_min) = cs.min_value.clone() { + match min_value.update(&[file_min]) { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + if num_rows > limit.unwrap_or(usize::MAX) { + break; + } + } + all_files.truncate(num_files); + + let column_stats = if has_statistics { + Some(get_col_stats( + &*schema, + null_counts, + &mut max_values, + &mut min_values, + )) + } else { + None + }; + + let statistics = Statistics { + num_rows: Some(num_rows as usize), + total_byte_size: Some(total_byte_size as usize), + column_statistics: column_stats, + }; + (all_files, statistics) +} + +fn create_max_min_accs( + schema: &Schema, +) -> (Vec>, Vec>) { + let max_values: Vec> = schema + .fields() + .iter() + .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) + .collect::>(); + let min_values: Vec> = schema + .fields() + .iter() + .map(|field| MinAccumulator::try_new(field.data_type()).ok()) + .collect::>(); + (max_values, 
min_values) +} + +fn get_col_stats( + schema: &Schema, + null_counts: Vec, + max_values: &mut Vec>, + min_values: &mut Vec>, +) -> Vec { + (0..schema.fields().len()) + .map(|i| { + let max_value = match &max_values[i] { + Some(max_value) => max_value.evaluate().ok(), + None => None, + }; + let min_value = match &min_values[i] { + Some(min_value) => min_value.evaluate().ok(), + None => None, + }; + ColumnStatistics { + null_count: Some(null_counts[i] as usize), + max_value, + min_value, + distinct_count: None, + } + }) + .collect() +} diff --git a/datafusion/src/datasource/object_store.rs b/datafusion/src/datasource/object_store.rs new file mode 100644 index 000000000000..5c7a53215534 --- /dev/null +++ b/datafusion/src/datasource/object_store.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Object Store abstracts access to an underlying file/object storage. + +use crate::datasource::local::LocalFileSystem; +use crate::error::Result; +use std::any::Any; +use std::collections::HashMap; +use std::fmt::Debug; +use std::io::Read; +use std::sync::{Arc, RwLock}; + +/// Objct Reader for one file in a object store +pub trait ObjectReader { + /// Get reader for a part [start, start + length] in the file + fn get_reader(&self, start: u64, length: usize) -> Box; + + /// Get lenght for the file + fn length(&self) -> u64; +} + +/// A ObjectStore abstracts access to an underlying file/object storage. +/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes +pub trait ObjectStore: Sync + Send + Debug { + /// Returns the object store as [`Any`](std::any::Any) + /// so that it can be downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Returns all the files with `ext` in path `prefix` + fn list_all_files(&self, prefix: &str, ext: &str) -> Result>; + + /// Get object reader for one file + fn get_reader(&self, file_path: &str) -> Result>; +} + +static LOCAL_SCHEME: &str = "file"; + +/// A Registry holds all the object stores at runtime with a scheme for each store. +/// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS +/// and query data inside these systems. +pub struct ObjectStoreRegistry { + /// A map from scheme to object store that serve list / read operations for the store + pub object_stores: RwLock>>, +} + +impl ObjectStoreRegistry { + /// Create the registry that object stores can registered into. + /// ['LocalFileSystem'] store is registered in by default to support read from localfs natively. + pub fn new() -> Self { + let mut map: HashMap> = HashMap::new(); + map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem)); + + Self { + object_stores: RwLock::new(map), + } + } + + /// Adds a new store to this registry. 
+ /// If a store of the same prefix existed before, it is replaced in the registry and returned. + pub fn register_store( + &self, + scheme: String, + store: Arc, + ) -> Option> { + let mut stores = self.object_stores.write().unwrap(); + stores.insert(scheme, store) + } + + /// Get the store registered for scheme + pub fn get(&self, scheme: &str) -> Option> { + let stores = self.object_stores.read().unwrap(); + stores.get(scheme).cloned() + } + + /// Get a suitable store for the path based on it's scheme. For example: + /// path with prefix file:/// or no prefix will return the default LocalFS store, + /// path with prefix s3:/// will return the S3 store if it's registered, + /// and will always return LocalFS store when a prefix is not registered in the path. + pub fn store_for_path(&self, path: &str) -> Arc { + if let Some((scheme, _)) = path.split_once(':') { + let stores = self.object_stores.read().unwrap(); + if let Some(store) = stores.get(&*scheme.to_lowercase()) { + return store.clone(); + } + } + self.object_stores + .read() + .unwrap() + .get(LOCAL_SCHEME) + .unwrap() + .clone() + } +} diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index 28f79a6ae8dd..aaec9e83f78c 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -18,39 +18,51 @@ //! Parquet data source use std::any::Any; -use std::string::String; +use std::io::Read; use std::sync::Arc; -use arrow::datatypes::*; +use arrow::datatypes::SchemaRef; +use parquet::arrow::ArrowReader; +use parquet::arrow::ParquetFileArrowReader; +use parquet::file::reader::ChunkReader; +use parquet::file::serialized_reader::SerializedFileReader; +use parquet::file::statistics::Statistics as ParquetStatistics; + +use super::datasource::TableProviderFilterPushDown; +use crate::arrow::datatypes::{DataType, Field}; use crate::datasource::datasource::Statistics; -use crate::datasource::TableProvider; +use crate::datasource::object_store::{ObjectReader, ObjectStore}; +use crate::datasource::{ + create_max_min_accs, get_col_stats, get_statistics_with_limit, PartitionedFile, + SourceRootDescBuilder, SourceRootDescriptor, TableProvider, +}; use crate::error::Result; use crate::logical_plan::{combine_filters, Expr}; +use crate::parquet::file::reader::Length; +use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::parquet::ParquetExec; -use crate::physical_plan::ExecutionPlan; - -use super::datasource::TableProviderFilterPushDown; +use crate::physical_plan::{Accumulator, ExecutionPlan}; +use crate::prelude::ExecutionContext; +use crate::scalar::ScalarValue; /// Table-based representation of a `ParquetFile`. pub struct ParquetTable { path: String, - schema: SchemaRef, - statistics: Statistics, + desc: Arc, max_concurrency: usize, enable_pruning: bool, } impl ParquetTable { /// Attempt to initialize a new `ParquetTable` from a file path. 
- pub fn try_new(path: impl Into, max_concurrency: usize) -> Result { + pub fn try_new(path: impl Into, context: ExecutionContext) -> Result { let path = path.into(); - let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?; - let schema = parquet_exec.schema(); + let max_concurrency = context.state.lock().unwrap().config.concurrency; + let root_desc = ParquetRootDesc::new(path.as_str(), context); Ok(Self { path, - schema, - statistics: parquet_exec.statistics().to_owned(), + desc: Arc::new(root_desc?), max_concurrency, enable_pruning: true, }) @@ -80,7 +92,7 @@ impl TableProvider for ParquetTable { /// Get the schema for this parquet file. fn schema(&self) -> SchemaRef { - self.schema.clone() + self.desc.schema() } fn supports_filter_pushdown( @@ -107,8 +119,8 @@ impl TableProvider for ParquetTable { } else { None }; - Ok(Arc::new(ParquetExec::try_from_path( - &self.path, + Ok(Arc::new(ParquetExec::try_new( + self.desc.clone(), projection.clone(), predicate, limit @@ -120,7 +132,7 @@ impl TableProvider for ParquetTable { } fn statistics(&self) -> Statistics { - self.statistics.clone() + self.desc.statistics() } fn has_exact_statistics(&self) -> bool { @@ -128,6 +140,295 @@ impl TableProvider for ParquetTable { } } +#[derive(Debug)] +/// Descriptor for a parquet root path +pub struct ParquetRootDesc { + /// object store for reading files inside the root path + pub object_store: Arc, + /// metadata for files inside the root path + pub descriptor: SourceRootDescriptor, +} + +impl ParquetRootDesc { + /// Construct a new parquet descriptor for a root path + pub fn new(root_path: &str, context: ExecutionContext) -> Result { + let object_store = context + .state + .lock() + .unwrap() + .object_store_registry + .store_for_path(root_path); + let root_desc = Self::get_source_desc(root_path, object_store.clone(), "parquet"); + Ok(Self { + object_store, + descriptor: root_desc?, + }) + } + + /// Get file schema for all parquet files + pub fn schema(&self) -> SchemaRef { + self.descriptor.schema.clone() + } + + /// Get the summary statistics for all parquet files + pub fn statistics(&self) -> Statistics { + get_statistics_with_limit(&self.descriptor, None).1 + } + + fn summarize_min_max( + max_values: &mut Vec>, + min_values: &mut Vec>, + fields: &Vec, + i: usize, + stat: &ParquetStatistics, + ) { + match stat { + ParquetStatistics::Boolean(s) => { + if let DataType::Boolean = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value + .update(&[ScalarValue::Boolean(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value + .update(&[ScalarValue::Boolean(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Int32(s) => { + if let DataType::Int32 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value.update(&[ScalarValue::Int32(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value.update(&[ScalarValue::Int32(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Int64(s) => { + if let DataType::Int64 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match 
max_value.update(&[ScalarValue::Int64(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value.update(&[ScalarValue::Int64(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Float(s) => { + if let DataType::Float32 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value + .update(&[ScalarValue::Float32(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value + .update(&[ScalarValue::Float32(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Double(s) => { + if let DataType::Float64 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value + .update(&[ScalarValue::Float64(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value + .update(&[ScalarValue::Float64(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + _ => {} + } + } +} + +impl SourceRootDescBuilder for ParquetRootDesc { + fn get_file_meta( + file_path: &str, + object_store: Arc, + ) -> Result { + let reader = object_store.get_reader(file_path)?; + let file_reader = + Arc::new(SerializedFileReader::new(ObjectReaderWrapper::new(reader))?); + let mut arrow_reader = ParquetFileArrowReader::new(file_reader); + let file_path = file_path.to_string(); + let schema = arrow_reader.get_schema()?; + let num_fields = schema.fields().len(); + let fields = schema.fields().to_vec(); + let meta_data = arrow_reader.get_metadata(); + + let mut num_rows = 0; + let mut total_byte_size = 0; + let mut null_counts = vec![0; num_fields]; + let mut has_statistics = false; + + let (mut max_values, mut min_values) = create_max_min_accs(&schema); + + for row_group_meta in meta_data.row_groups() { + num_rows += row_group_meta.num_rows(); + total_byte_size += row_group_meta.total_byte_size(); + + let columns_null_counts = row_group_meta + .columns() + .iter() + .flat_map(|c| c.statistics().map(|stats| stats.null_count())); + + for (i, cnt) in columns_null_counts.enumerate() { + null_counts[i] += cnt as usize + } + + for (i, column) in row_group_meta.columns().iter().enumerate() { + if let Some(stat) = column.statistics() { + has_statistics = true; + ParquetRootDesc::summarize_min_max( + &mut max_values, + &mut min_values, + &fields, + i, + stat, + ) + } + } + } + + let column_stats = if has_statistics { + Some(get_col_stats( + &schema, + null_counts, + &mut max_values, + &mut min_values, + )) + } else { + None + }; + + let statistics = Statistics { + num_rows: Some(num_rows as usize), + total_byte_size: Some(total_byte_size as usize), + column_statistics: column_stats, + }; + + Ok(PartitionedFile { + file_path, + schema, + statistics, + partition_value: None, + partition_schema: None, + }) + } +} + +/// Thin wrapper over object wrapper to work with parquet file read +pub struct ObjectReaderWrapper { + reader: Arc, +} + +impl ObjectReaderWrapper { + /// Construct a wrapper over the provided object reader + pub fn new(reader: Arc) -> Self { + Self { reader } + } +} + +impl ChunkReader for ObjectReaderWrapper { + type T = InnerReaderWrapper; + + fn get_read(&self, start: u64, 
length: usize) -> parquet::errors::Result { + Ok(InnerReaderWrapper { + inner_reader: self.reader.get_reader(start, length), + }) + } +} + +impl Length for ObjectReaderWrapper { + fn len(&self) -> u64 { + self.reader.length() + } +} + +/// Thin wrapper over reader for a parquet file +pub struct InnerReaderWrapper { + inner_reader: Box, +} + +impl Read for InnerReaderWrapper { + fn read(&mut self, buf: &mut [u8]) -> std::result::Result { + self.inner_reader.read(buf) + } +} + #[cfg(test)] mod tests { use super::*; @@ -355,7 +656,8 @@ mod tests { fn load_table(name: &str) -> Result> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, name); - let table = ParquetTable::try_new(&filename, 2)?; + let table = + ParquetTable::try_new(&filename, ExecutionContext::with_concurrency(2))?; Ok(Arc::new(table)) } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 0cf8b3b6c276..4c7b85c1eb26 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -49,6 +49,8 @@ use crate::catalog::{ ResolvedTableReference, TableReference, }; use crate::datasource::csv::CsvFile; +use crate::datasource::object_store::ObjectStore; +use crate::datasource::object_store::ObjectStoreRegistry; use crate::datasource::parquet::ParquetTable; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; @@ -164,10 +166,17 @@ impl ExecutionContext { aggregate_functions: HashMap::new(), config, execution_props: ExecutionProps::new(), + object_store_registry: Arc::new(ObjectStoreRegistry::new()), })), } } + /// Creates a new execution context using the provided concurrency. + pub fn with_concurrency(concurrency: usize) -> ExecutionContext { + let config = ExecutionConfig::new().with_concurrency(concurrency); + ExecutionContext::with_config(config) + } + /// Creates a dataframe that will execute a SQL query. pub fn sql(&mut self, sql: &str) -> Result> { let plan = self.create_logical_plan(sql)?; @@ -288,12 +297,7 @@ impl ExecutionContext { ) -> Result> { Ok(Arc::new(DataFrameImpl::new( self.state.clone(), - &LogicalPlanBuilder::scan_parquet( - filename, - None, - self.state.lock().unwrap().config.concurrency, - )? - .build()?, + &LogicalPlanBuilder::scan_parquet(filename, None, self.clone())?.build()?, ))) } @@ -325,7 +329,7 @@ impl ExecutionContext { pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> { let table = { let m = self.state.lock().unwrap(); - ParquetTable::try_new(filename, m.config.concurrency)? + ParquetTable::try_new(filename, self.clone())? .with_enable_pruning(m.config.parquet_pruning) }; self.register_table(name, Arc::new(table))?; @@ -358,6 +362,25 @@ impl ExecutionContext { state.catalog_list.register_catalog(name, catalog) } + /// Registers a object store with scheme using a custom `ObjectStore` so that + /// an external file system or object storage system could be used against this context. 
+ /// + /// Returns the `ObjectStore` previously registered for this + /// scheme, if any + pub fn register_object_store( + &self, + scheme: impl Into, + object_store: Arc, + ) -> Option> { + let scheme = scheme.into(); + + self.state + .lock() + .unwrap() + .object_store_registry + .register_store(scheme, object_store) + } + /// Retrieves a `CatalogProvider` instance by name pub fn catalog(&self, name: &str) -> Option> { self.state.lock().unwrap().catalog_list.catalog(name) @@ -840,6 +863,8 @@ pub struct ExecutionContextState { pub config: ExecutionConfig, /// Execution properties pub execution_props: ExecutionProps, + /// Object Store that are registered with the context + pub object_store_registry: Arc, } impl ExecutionProps { @@ -867,6 +892,7 @@ impl ExecutionContextState { aggregate_functions: HashMap::new(), config: ExecutionConfig::new(), execution_props: ExecutionProps::new(), + object_store_registry: Arc::new(ObjectStoreRegistry::new()), } } diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 0dfc1e7aa048..77dd65c97ef3 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -40,6 +40,7 @@ use crate::logical_plan::{ columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema, DFSchemaRef, Partitioning, }; +use crate::prelude::ExecutionContext; /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -137,20 +138,20 @@ impl LogicalPlanBuilder { pub fn scan_parquet( path: impl Into, projection: Option>, - max_concurrency: usize, + context: ExecutionContext, ) -> Result { let path = path.into(); - Self::scan_parquet_with_name(path.clone(), projection, max_concurrency, path) + Self::scan_parquet_with_name(path.clone(), projection, context, path) } /// Scan a Parquet data source and register it with a given table name pub fn scan_parquet_with_name( path: impl Into, projection: Option>, - max_concurrency: usize, + context: ExecutionContext, table_name: impl Into, ) -> Result { - let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?); + let provider = Arc::new(ParquetTable::try_new(path, context)?); Self::scan(table_name, provider, projection) } diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 4504c81daa06..30ec896b4e2f 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -110,6 +110,8 @@ mod tests { use super::*; use crate::datasource::datasource::Statistics; + use crate::datasource::local::LocalFileSystem; + use crate::datasource::PartitionedFile; use crate::physical_plan::parquet::{ ParquetExec, ParquetExecMetrics, ParquetPartition, }; @@ -122,11 +124,13 @@ mod tests { vec![], Arc::new(ParquetExec::new( vec![ParquetPartition::new( - vec!["x".to_string()], - Statistics::default(), + vec![PartitionedFile::from("x".to_string())], + 0, )], + Arc::new(LocalFileSystem), schema, None, + Statistics::default(), ParquetExecMetrics::new(), None, 2048, @@ -160,11 +164,13 @@ mod tests { vec![], Arc::new(ParquetExec::new( vec![ParquetPartition::new( - vec!["x".to_string()], - Statistics::default(), + vec![PartitionedFile::from("x".to_string())], + 0, )], + Arc::new(LocalFileSystem), schema, None, + Statistics::default(), ParquetExecMetrics::new(), None, 2048, diff --git a/datafusion/src/physical_plan/common.rs b/datafusion/src/physical_plan/common.rs index 2482bfc0872c..628095c6640c 100644 --- 
a/datafusion/src/physical_plan/common.rs +++ b/datafusion/src/physical_plan/common.rs @@ -27,8 +27,6 @@ use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use futures::channel::mpsc; use futures::{SinkExt, Stream, StreamExt, TryStreamExt}; -use std::fs; -use std::fs::metadata; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::task::JoinHandle; @@ -107,42 +105,6 @@ pub(crate) fn combine_batches( } } -/// Recursively builds a list of files in a directory with a given extension -pub fn build_file_list(dir: &str, ext: &str) -> Result> { - let mut filenames: Vec = Vec::new(); - build_file_list_recurse(dir, &mut filenames, ext)?; - Ok(filenames) -} - -/// Recursively build a list of files in a directory with a given extension with an accumulator list -fn build_file_list_recurse( - dir: &str, - filenames: &mut Vec, - ext: &str, -) -> Result<()> { - let metadata = metadata(dir)?; - if metadata.is_file() { - if dir.ends_with(ext) { - filenames.push(dir.to_string()); - } - } else { - for entry in fs::read_dir(dir)? { - let entry = entry?; - let path = entry.path(); - if let Some(path_name) = path.to_str() { - if path.is_dir() { - build_file_list_recurse(path_name, filenames, ext)?; - } else if path_name.ends_with(ext) { - filenames.push(path_name.to_string()); - } - } else { - return Err(DataFusionError::Plan("Invalid path".to_string())); - } - } - } - Ok(()) -} - /// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender pub(crate) fn spawn_execution( input: Arc, diff --git a/datafusion/src/physical_plan/csv.rs b/datafusion/src/physical_plan/csv.rs index 544f98cba0c6..293f46d7a736 100644 --- a/datafusion/src/physical_plan/csv.rs +++ b/datafusion/src/physical_plan/csv.rs @@ -17,9 +17,11 @@ //! 
Execution plan for reading CSV files +use crate::datasource::local::LocalFileSystem; +use crate::datasource::object_store::ObjectStore; use crate::error::{DataFusionError, Result}; use crate::physical_plan::ExecutionPlan; -use crate::physical_plan::{common, source::Source, Partitioning}; +use crate::physical_plan::{source::Source, Partitioning}; use arrow::csv; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::Result as ArrowResult; @@ -141,7 +143,7 @@ impl CsvExec { ) -> Result { let file_extension = String::from(options.file_extension); - let filenames = common::build_file_list(path, file_extension.as_str())?; + let filenames = LocalFileSystem.list_all_files(path, options.file_extension)?; if filenames.is_empty() { return Err(DataFusionError::Execution(format!( "No files found at {path} with file extension {file_extension}", diff --git a/datafusion/src/physical_plan/json.rs b/datafusion/src/physical_plan/json.rs index ed9b0b03a38e..df7e9e5e5014 100644 --- a/datafusion/src/physical_plan/json.rs +++ b/datafusion/src/physical_plan/json.rs @@ -19,7 +19,9 @@ use async_trait::async_trait; use futures::Stream; -use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream}; +use super::{source::Source, ExecutionPlan, Partitioning, RecordBatchStream}; +use crate::datasource::local::LocalFileSystem; +use crate::datasource::object_store::ObjectStore; use crate::error::{DataFusionError, Result}; use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter}; use arrow::{ @@ -87,7 +89,7 @@ impl NdJsonExec { ) -> Result { let file_extension = options.file_extension.to_string(); - let filenames = common::build_file_list(path, &file_extension)?; + let filenames = LocalFileSystem.list_all_files(path, options.file_extension)?; if filenames.is_empty() { return Err(DataFusionError::Execution(format!( diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index ec5611f96292..bc3e0d597213 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -18,7 +18,6 @@ //! 
Execution plan for reading Parquet files use std::fmt; -use std::fs::File; use std::sync::Arc; use std::task::{Context, Poll}; use std::{any::Any, convert::TryInto}; @@ -28,7 +27,7 @@ use crate::{ logical_plan::{Column, Expr}, physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, physical_plan::{ - common, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, }, scalar::ScalarValue, @@ -36,7 +35,7 @@ use crate::{ use arrow::{ array::ArrayRef, - datatypes::{DataType, Schema, SchemaRef}, + datatypes::{Schema, SchemaRef}, error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; @@ -57,19 +56,23 @@ use tokio::{ }; use tokio_stream::wrappers::ReceiverStream; -use crate::datasource::datasource::{ColumnStatistics, Statistics}; +use crate::datasource::datasource::Statistics; use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; use super::SQLMetric; -use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; -use crate::physical_plan::Accumulator; +use crate::datasource::object_store::ObjectStore; +use crate::datasource::parquet::{ObjectReaderWrapper, ParquetRootDesc}; +use crate::datasource::{get_statistics_with_limit, FilePartition, PartitionedFile}; +use crate::prelude::ExecutionContext; /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] pub struct ParquetExec { /// Parquet partitions to read partitions: Vec, + /// Source used for get reader for partitions + object_store: Arc, /// Schema after projection is applied schema: SchemaRef, /// Projection for which columns to load @@ -98,9 +101,7 @@ pub struct ParquetExec { #[derive(Debug, Clone)] pub struct ParquetPartition { /// The Parquet filename for this partition - pub filenames: Vec, - /// Statistics for this partition - pub statistics: Statistics, + pub file_partition: FilePartition, /// Execution metrics metrics: ParquetPartitionMetrics, } @@ -129,290 +130,44 @@ impl ParquetExec { projection: Option>, predicate: Option, batch_size: usize, - max_concurrency: usize, + context: ExecutionContext, limit: Option, ) -> Result { + let max_concurrency = context.state.lock().unwrap().config.concurrency; // build a list of filenames from the specified path, which could be a single file or // a directory containing one or more parquet files - let filenames = common::build_file_list(path, ".parquet")?; - if filenames.is_empty() { - Err(DataFusionError::Plan(format!( - "No Parquet files (with .parquet extension) found at path {}", - path - ))) - } else { - let filenames = filenames - .iter() - .map(|filename| filename.as_str()) - .collect::>(); - Self::try_from_files( - &filenames, - projection, - predicate, - batch_size, - max_concurrency, - limit, - ) - } + let root_desc = ParquetRootDesc::new(path, context)?; + Self::try_new( + Arc::new(root_desc), + projection, + predicate, + batch_size, + max_concurrency, + limit, + ) } - /// Create a new Parquet reader execution plan based on the specified list of Parquet - /// files - pub fn try_from_files( - filenames: &[&str], + /// Create a new Parquet reader execution plan with root descriptor, provided partitions and schema + pub fn try_new( + desc: Arc, projection: Option>, predicate: Option, batch_size: usize, max_concurrency: usize, limit: Option, ) -> Result { - debug!("Creating ParquetExec, filenames: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", - filenames, projection, predicate, limit); - 
// build a list of Parquet partitions with statistics and gather all unique schemas - // used in this data set - let mut schemas: Vec = vec![]; - let mut partitions = Vec::with_capacity(max_concurrency); - let filenames: Vec = filenames.iter().map(|s| s.to_string()).collect(); - let chunks = split_files(&filenames, max_concurrency); - let mut num_rows = 0; - let mut num_fields = 0; - let mut fields = Vec::new(); - let mut total_byte_size = 0; - let mut null_counts = Vec::new(); - let mut max_values: Vec> = Vec::new(); - let mut min_values: Vec> = Vec::new(); - let mut limit_exhausted = false; - for chunk in chunks { - let mut filenames: Vec = - chunk.iter().map(|x| x.to_string()).collect(); - let mut total_files = 0; - for filename in &filenames { - total_files += 1; - let file = File::open(filename)?; - let file_reader = Arc::new(SerializedFileReader::new(file)?); - let mut arrow_reader = ParquetFileArrowReader::new(file_reader); - let meta_data = arrow_reader.get_metadata(); - // collect all the unique schemas in this data set - let schema = arrow_reader.get_schema()?; - if schemas.is_empty() || schema != schemas[0] { - fields = schema.fields().to_vec(); - num_fields = schema.fields().len(); - null_counts = vec![0; num_fields]; - max_values = schema - .fields() - .iter() - .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - min_values = schema - .fields() - .iter() - .map(|field| MinAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - schemas.push(schema); - } + debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", + desc, projection, predicate, limit); - for row_group_meta in meta_data.row_groups() { - num_rows += row_group_meta.num_rows(); - total_byte_size += row_group_meta.total_byte_size(); + let (all_files, statistics) = get_statistics_with_limit(&desc.descriptor, limit); + let schema = desc.schema(); - // Currently assumes every Parquet file has same schema - // https://issues.apache.org/jira/browse/ARROW-11017 - let columns_null_counts = row_group_meta - .columns() - .iter() - .flat_map(|c| c.statistics().map(|stats| stats.null_count())); - - for (i, cnt) in columns_null_counts.enumerate() { - null_counts[i] += cnt - } - - for (i, column) in row_group_meta.columns().iter().enumerate() { - if let Some(stat) = column.statistics() { - match stat { - ParquetStatistics::Boolean(s) => { - if let DataType::Boolean = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Boolean(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Boolean(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Int32(s) => { - if let DataType::Int32 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Int32(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Int32(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Int64(s) => { - if let DataType::Int64 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match 
max_value.update(&[ - ScalarValue::Int64(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Int64(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Float(s) => { - if let DataType::Float32 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Float32(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Float32(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Double(s) => { - if let DataType::Float64 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Float64(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Float64(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - _ => {} - } - } - } - - if limit.map(|x| num_rows >= x as i64).unwrap_or(false) { - limit_exhausted = true; - break; - } - } - } - let column_stats = (0..num_fields) - .map(|i| { - let max_value = match &max_values[i] { - Some(max_value) => max_value.evaluate().ok(), - None => None, - }; - let min_value = match &min_values[i] { - Some(min_value) => min_value.evaluate().ok(), - None => None, - }; - ColumnStatistics { - null_count: Some(null_counts[i] as usize), - max_value, - min_value, - distinct_count: None, - } - }) - .collect(); - - let statistics = Statistics { - num_rows: Some(num_rows as usize), - total_byte_size: Some(total_byte_size as usize), - column_statistics: Some(column_stats), - }; - // remove files that are not needed in case of limit - filenames.truncate(total_files); - partitions.push(ParquetPartition::new(filenames, statistics)); - if limit_exhausted { - break; - } + let mut partitions = Vec::with_capacity(max_concurrency); + let chunked_files = split_files(&all_files, max_concurrency); + for (index, group) in chunked_files.iter().enumerate() { + partitions.push(ParquetPartition::new(Vec::from(*group), index)); } - // we currently get the schema information from the first file rather than do - // schema merging and this is a limitation. 
- // See https://issues.apache.org/jira/browse/ARROW-11017 - if schemas.len() > 1 { - return Err(DataFusionError::Plan(format!( - "The Parquet files have {} different schemas and DataFusion does \ - not yet support schema merging", - schemas.len() - ))); - } - let schema = Arc::new(schemas.pop().unwrap()); let metrics = ParquetExecMetrics::new(); let predicate_builder = predicate.and_then(|predicate_expr| { @@ -431,8 +186,10 @@ impl ParquetExec { Ok(Self::new( partitions, + desc.object_store.clone(), schema, projection, + statistics, metrics, predicate_builder, batch_size, @@ -443,8 +200,10 @@ impl ParquetExec { /// Create a new Parquet reader execution plan with provided partitions and schema pub fn new( partitions: Vec, + object_store: Arc, schema: SchemaRef, projection: Option>, + statistics: Statistics, metrics: ParquetExecMetrics, predicate_builder: Option, batch_size: usize, @@ -462,96 +221,23 @@ impl ParquetExec { .collect(), ); - // sum the statistics - let mut num_rows: Option = None; - let mut total_byte_size: Option = None; - let mut null_counts: Vec = vec![0; schema.fields().len()]; - let mut has_statistics = false; - let mut max_values = schema - .fields() - .iter() - .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - let mut min_values = schema - .fields() - .iter() - .map(|field| MinAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - for part in &partitions { - if let Some(n) = part.statistics.num_rows { - num_rows = Some(num_rows.unwrap_or(0) + n) - } - if let Some(n) = part.statistics.total_byte_size { - total_byte_size = Some(total_byte_size.unwrap_or(0) + n) + let new_column_statistics = statistics.column_statistics.map(|stats| { + let mut projected_stats = Vec::with_capacity(projection.len()); + for proj in &projection { + projected_stats.push(stats[*proj].clone()); } - if let Some(x) = &part.statistics.column_statistics { - let part_nulls: Vec> = - x.iter().map(|c| c.null_count).collect(); - has_statistics = true; - - let part_max_values: Vec> = - x.iter().map(|c| c.max_value.clone()).collect(); - let part_min_values: Vec> = - x.iter().map(|c| c.min_value.clone()).collect(); - - for &i in projection.iter() { - null_counts[i] = part_nulls[i].unwrap_or(0); - if let Some(part_max_value) = part_max_values[i].clone() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[part_max_value]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - } - if let Some(part_min_value) = part_min_values[i].clone() { - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[part_min_value]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - } - - let column_stats = if has_statistics { - Some( - (0..schema.fields().len()) - .map(|i| { - let max_value = match &max_values[i] { - Some(max_value) => max_value.evaluate().ok(), - None => None, - }; - let min_value = match &min_values[i] { - Some(min_value) => min_value.evaluate().ok(), - None => None, - }; - ColumnStatistics { - null_count: Some(null_counts[i] as usize), - max_value, - min_value, - distinct_count: None, - } - }) - .collect(), - ) - } else { - None - }; + projected_stats + }); let statistics = Statistics { - num_rows, - total_byte_size, - column_statistics: column_stats, + num_rows: statistics.num_rows, + total_byte_size: statistics.total_byte_size, + column_statistics: new_column_statistics, }; + Self { partitions, + object_store, schema: Arc::new(projected_schema), projection, metrics, @@ -585,22 +271,20 
@@ impl ParquetExec { impl ParquetPartition { /// Create a new parquet partition - pub fn new(filenames: Vec, statistics: Statistics) -> Self { + pub fn new(files: Vec, index: usize) -> Self { Self { - filenames, - statistics, + file_partition: FilePartition { index, files }, metrics: ParquetPartitionMetrics::new(), } } /// The Parquet filename for this partition - pub fn filenames(&self) -> &[String] { - &self.filenames - } - - /// Statistics for this partition - pub fn statistics(&self) -> &Statistics { - &self.statistics + pub fn filenames(&self) -> Vec { + self.file_partition + .files + .iter() + .map(|f| f.file_path.clone()) + .collect() } } @@ -666,8 +350,8 @@ impl ExecutionPlan for ParquetExec { Receiver>, ) = channel(2); - let partition = &self.partitions[partition]; - let filenames = partition.filenames.clone(); + let object_store = self.object_store.clone(); + let partition = self.partitions[partition].clone(); let metrics = partition.metrics.clone(); let projection = self.projection.clone(); let predicate_builder = self.predicate_builder.clone(); @@ -676,7 +360,8 @@ impl ExecutionPlan for ParquetExec { task::spawn_blocking(move || { if let Err(e) = read_files( - &filenames, + object_store, + partition, metrics, &projection, &predicate_builder, @@ -704,9 +389,7 @@ impl ExecutionPlan for ParquetExec { let files: Vec<_> = self .partitions .iter() - .map(|pp| pp.filenames.iter()) - .flatten() - .map(|s| s.as_str()) + .map(|pp| format!("{}", pp.file_partition)) .collect(); write!( @@ -726,14 +409,11 @@ impl ExecutionPlan for ParquetExec { .flat_map(|p| { vec![ ( - format!( - "numPredicateEvaluationErrors for {}", - p.filenames.join(",") - ), + format!("numPredicateEvaluationErrors for {}", p.file_partition), p.metrics.predicate_evaluation_errors.as_ref().clone(), ), ( - format!("numRowGroupsPruned for {}", p.filenames.join(",")), + format!("numRowGroupsPruned for {}", p.file_partition), p.metrics.row_groups_pruned.as_ref().clone(), ), ] @@ -857,7 +537,7 @@ fn build_row_group_predicate( match predicate_values { Ok(values) => { // NB: false means don't scan row group - let num_pruned = values.iter().filter(|&v| !v).count(); + let num_pruned = values.iter().filter(|&v| !*v).count(); metrics.row_groups_pruned.add(num_pruned); Box::new(move |_, i| values[i]) } @@ -872,7 +552,8 @@ fn build_row_group_predicate( } fn read_files( - filenames: &[String], + object_store: Arc, + partition: ParquetPartition, metrics: ParquetPartitionMetrics, projection: &[usize], predicate_builder: &Option, @@ -881,9 +562,11 @@ fn read_files( limit: Option, ) -> Result<()> { let mut total_rows = 0; - 'outer: for filename in filenames { - let file = File::open(&filename)?; - let mut file_reader = SerializedFileReader::new(file)?; + let all_files = partition.file_partition.files; + 'outer: for partitioned_file in all_files { + let reader = object_store.get_reader(partitioned_file.file_path.as_str())?; + let mut file_reader = + SerializedFileReader::new(ObjectReaderWrapper::new(reader))?; if let Some(predicate_builder) = predicate_builder { let row_group_predicate = build_row_group_predicate( predicate_builder, @@ -910,7 +593,7 @@ fn read_files( Some(Err(e)) => { let err_msg = format!( "Error reading batch from {}: {}", - filename, + partitioned_file, e.to_string() ); // send error to operator @@ -930,12 +613,15 @@ fn read_files( Ok(()) } -fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> { - let mut chunk_size = filenames.len() / n; - if filenames.len() % n > 0 { +fn split_files( + partitioned_files: 
&[PartitionedFile], + n: usize, +) -> Vec<&[PartitionedFile]> { + let mut chunk_size = partitioned_files.len() / n; + if partitioned_files.len() % n > 0 { chunk_size += 1; } - filenames.chunks(chunk_size).collect() + partitioned_files.chunks(chunk_size).collect() } struct ParquetStream { @@ -973,24 +659,24 @@ mod tests { #[test] fn test_split_files() { - let filenames = vec![ - "a".to_string(), - "b".to_string(), - "c".to_string(), - "d".to_string(), - "e".to_string(), + let files = vec![ + PartitionedFile::from("a".to_string()), + PartitionedFile::from("b".to_string()), + PartitionedFile::from("c".to_string()), + PartitionedFile::from("d".to_string()), + PartitionedFile::from("e".to_string()), ]; - let chunks = split_files(&filenames, 1); + let chunks = split_files(&files, 1); assert_eq!(1, chunks.len()); assert_eq!(5, chunks[0].len()); - let chunks = split_files(&filenames, 2); + let chunks = split_files(&files, 2); assert_eq!(2, chunks.len()); assert_eq!(3, chunks[0].len()); assert_eq!(2, chunks[1].len()); - let chunks = split_files(&filenames, 5); + let chunks = split_files(&files, 5); assert_eq!(5, chunks.len()); assert_eq!(1, chunks[0].len()); assert_eq!(1, chunks[1].len()); @@ -998,7 +684,7 @@ mod tests { assert_eq!(1, chunks[3].len()); assert_eq!(1, chunks[4].len()); - let chunks = split_files(&filenames, 123); + let chunks = split_files(&files, 123); assert_eq!(5, chunks.len()); assert_eq!(1, chunks[0].len()); assert_eq!(1, chunks[1].len()); @@ -1016,7 +702,7 @@ mod tests { Some(vec![0, 1, 2]), None, 1024, - 4, + ExecutionContext::with_concurrency(4), None, )?; assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
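
The following is a minimal usage sketch, not part of the patch, showing how the ObjectStoreRegistry hook introduced above might be wired up from application code. The S3FileSystem type, its stubbed method bodies, and the s3:///bucket/tpch/lineitem path are hypothetical placeholders; the trait and method signatures follow datafusion/src/datasource/object_store.rs and datafusion/src/execution/context.rs in this diff (including generic parameters such as Result<Vec<String>> and Arc<dyn ObjectReader> that the rendered diff drops).

use std::any::Any;
use std::sync::Arc;

use datafusion::datasource::object_store::{ObjectReader, ObjectStore};
use datafusion::error::Result;
use datafusion::prelude::ExecutionContext;

/// Hypothetical store for the "s3" scheme; listing and reads are left as stubs here.
#[derive(Debug)]
struct S3FileSystem;

impl ObjectStore for S3FileSystem {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn list_all_files(&self, prefix: &str, ext: &str) -> Result<Vec<String>> {
        // A real implementation would list objects under `prefix` and keep keys ending in `ext`.
        unimplemented!("list {} with extension {}", prefix, ext)
    }

    fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ObjectReader>> {
        // A real implementation would return an ObjectReader that serves ranged reads of the object.
        unimplemented!("open {}", file_path)
    }
}

fn main() -> Result<()> {
    // Concurrency now travels with the context instead of being passed as a bare usize.
    let mut ctx = ExecutionContext::with_concurrency(8);

    // Paths whose scheme is "s3" are routed to the custom store; everything else falls
    // back to the built-in LocalFileSystem registered under the "file" scheme.
    ctx.register_object_store("s3", Arc::new(S3FileSystem));

    // Once the stubs above are filled in, registration and planning work as before.
    ctx.register_parquet("lineitem", "s3:///bucket/tpch/lineitem")?;
    let _df = ctx.sql("SELECT COUNT(*) FROM lineitem")?;
    Ok(())
}

Passing an ExecutionContext, rather than a bare max_concurrency, into ParquetTable::try_new and LogicalPlanBuilder::scan_parquet is what lets the table provider reach this registry at plan time.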