From ee163a5f6ba2533a3fbbf0e2b012e769e9df1464 Mon Sep 17 00:00:00 2001 From: elijah Date: Fri, 26 May 2023 18:11:48 +0800 Subject: [PATCH 01/15] feat: support type coercion in Parquet Reader --- .../core/src/physical_plan/file_format/mod.rs | 43 +++++++++++++++++-- .../src/physical_plan/file_format/parquet.rs | 14 +++--- 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 3b737f03d82c..a2e5039f287e 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -394,6 +394,7 @@ impl SchemaAdapter { /// Map projected column indexes to the file schema. This will fail if the table schema /// and the file schema contain a field with the same name and different types. + #[allow(dead_code)] pub fn map_projections( &self, file_schema: &Schema, @@ -418,6 +419,7 @@ impl SchemaAdapter { /// Re-order projected columns by index in record batch to match table schema column ordering. If the record /// batch does not contain a column for an expected field, insert a null-valued column at the /// required column index. + #[allow(dead_code)] pub fn adapt_batch( &self, batch: RecordBatch, @@ -489,21 +491,56 @@ impl SchemaAdapter { field_mappings, }) } + + /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema + /// to the table schema, taking into account the provided projections. + pub fn map_schema_with_projection( + &self, + file_schema: &Schema, + projections: &[usize], + ) -> Result { + let mut field_mappings = Vec::new(); + + for idx in projections { + let field = self.table_schema.field(*idx); + match file_schema.field_with_name(field.name()) { + Ok(file_field) => { + if can_cast_types(file_field.data_type(), field.data_type()) { + field_mappings.push((*idx, field.data_type().clone())) + } else { + return Err(DataFusionError::Plan(format!( + "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", + field.name(), + file_field.data_type(), + field.data_type() + ))); + } + } + Err(_) => { + return Err(DataFusionError::Plan(format!( + "File schema does not contain expected field {}", + field.name() + ))); + } + } + } + Ok(SchemaMapping { + table_schema: Arc::new(self.table_schema.project(projections)?), + field_mappings, + }) + } } /// The SchemaMapping struct holds a mapping from the file schema to the table schema /// and any necessary type conversions that need to be applied. #[derive(Debug)] pub struct SchemaMapping { - #[allow(dead_code)] table_schema: SchemaRef, - #[allow(dead_code)] field_mappings: Vec<(usize, DataType)>, } impl SchemaMapping { /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. 
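+    /// Each mapped column is cast to its table-schema type via `arrow::compute::cast`.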
- #[allow(dead_code)] fn map_batch(&self, batch: RecordBatch) -> Result { let mut mapped_cols = Vec::with_capacity(self.field_mappings.len()); diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 800da3a177c8..3386830d0001 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -505,13 +505,14 @@ impl FileOpener for ParquetOpener { let mut builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options) .await?; - let adapted_projections = - schema_adapter.map_projections(builder.schema(), &projection)?; + + let schema_mapping = + schema_adapter.map_schema_with_projection(builder.schema(), &projection)?; // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?; let mask = ProjectionMask::roots( builder.parquet_schema(), - adapted_projections.iter().cloned(), + projection.iter().cloned(), ); // Filter pushdown: evaluate predicates during scan @@ -575,11 +576,8 @@ impl FileOpener for ParquetOpener { let adapted = stream .map_err(|e| ArrowError::ExternalError(Box::new(e))) .map(move |maybe_batch| { - maybe_batch.and_then(|b| { - schema_adapter - .adapt_batch(b, &projection) - .map_err(Into::into) - }) + maybe_batch + .and_then(|b| schema_mapping.map_batch(b).map_err(Into::into)) }); Ok(adapted.boxed()) From a5181ac3aef4445d084fab86b7cffaf443068112 Mon Sep 17 00:00:00 2001 From: elijah Date: Sat, 27 May 2023 16:09:06 +0800 Subject: [PATCH 02/15] fix: schema adapter map schema with projection --- .../core/src/physical_plan/file_format/mod.rs | 112 ++++++++++++++++-- .../src/physical_plan/file_format/parquet.rs | 4 +- 2 files changed, 102 insertions(+), 14 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index a2e5039f287e..2753c3532859 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -525,7 +525,7 @@ impl SchemaAdapter { } } Ok(SchemaMapping { - table_schema: Arc::new(self.table_schema.project(projections)?), + table_schema: self.table_schema.clone(), field_mappings, }) } @@ -542,22 +542,35 @@ pub struct SchemaMapping { impl SchemaMapping { /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. 
fn map_batch(&self, batch: RecordBatch) -> Result { - let mut mapped_cols = Vec::with_capacity(self.field_mappings.len()); + let mut cols = Vec::with_capacity(self.field_mappings.len()); + let batch_schema = batch.schema(); + let batch_rows = batch.num_rows(); + let batch_cols = batch.columns().to_vec(); for (idx, data_type) in &self.field_mappings { - let array = batch.column(*idx); - let casted_array = arrow::compute::cast(array, data_type)?; - mapped_cols.push(casted_array); + let table_field = &self.table_schema.fields()[*idx]; + if let Some((batch_idx, _name)) = + batch_schema.column_with_name(table_field.name().as_str()) + { + cols.push(arrow::compute::cast(&batch_cols[batch_idx], data_type)?); + } else { + cols.push(new_null_array(table_field.data_type(), batch_rows)) + } } // Necessary to handle empty batches let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - let record_batch = RecordBatch::try_new_with_options( - self.table_schema.clone(), - mapped_cols, - &options, - )?; + let batch_schema: SchemaRef; + if cols.len() != self.table_schema.fields.len() { + let projection: Vec = + self.field_mappings.iter().map(|(idx, _)| *idx).collect(); + batch_schema = Arc::new(self.table_schema.project(&projection)?); + } else { + batch_schema = self.table_schema.clone(); + } + let record_batch = + RecordBatch::try_new_with_options(batch_schema, cols, &options)?; Ok(record_batch) } } @@ -918,8 +931,11 @@ fn get_projected_output_ordering( #[cfg(test)] mod tests { use arrow_array::cast::AsArray; - use arrow_array::types::{Float64Type, UInt32Type}; - use arrow_array::{Float32Array, StringArray, UInt64Array}; + use arrow_array::types::{Float32Type, Float64Type, UInt32Type}; + use arrow_array::{ + BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, StringArray, + UInt64Array, + }; use chrono::Utc; use crate::{ @@ -1355,6 +1371,78 @@ mod tests { assert_eq!(c3.value(1), 7.0_f64); } + #[test] + fn schema_adapter_map_schema_with_projection() { + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c0", DataType::Utf8, true), + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Float64, true), + Field::new("c3", DataType::Int32, true), + Field::new("c4", DataType::Float32, true), + ])); + + let adapter = SchemaAdapter::new(table_schema.clone()); + + let file_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("c1", DataType::Boolean, true), + Field::new("c2", DataType::Float32, true), + Field::new("c3", DataType::Binary, true), + Field::new("c4", DataType::Int64, true), + ]); + let indices = vec![1, 2, 4]; + let mapping = adapter + .map_schema_with_projection(&file_schema, &indices) + .unwrap(); + + let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); + let c1 = BooleanArray::from(vec![Some(true), Some(false), Some(true)]); + let c2 = Float32Array::from(vec![Some(2.0_f32), Some(7.0_f32), Some(3.0_f32)]); + let c3 = BinaryArray::from_opt_vec(vec![ + Some(b"hallo"), + Some(b"danke"), + Some(b"super"), + ]); + let c4 = Int64Array::from(vec![1, 2, 3]); + let batch = RecordBatch::try_new( + Arc::new(file_schema), + vec![ + Arc::new(id), + Arc::new(c1), + Arc::new(c2), + Arc::new(c3), + Arc::new(c4), + ], + ) + .unwrap(); + + let rows_num = batch.num_rows(); + let mapped_batch = mapping.map_batch(batch).unwrap(); + + assert_eq!( + mapped_batch.schema(), + Arc::new(table_schema.project(&indices).unwrap()) + ); + assert_eq!(mapped_batch.num_columns(), indices.len()); + assert_eq!(mapped_batch.num_rows(), 
rows_num); + + let c1 = mapped_batch.column(0).as_string::(); + let c2 = mapped_batch.column(1).as_primitive::(); + let c4 = mapped_batch.column(2).as_primitive::(); + + assert_eq!(c1.value(0), "1"); + assert_eq!(c1.value(1), "0"); + assert_eq!(c1.value(2), "1"); + + assert_eq!(c2.value(0), 2.0_f64); + assert_eq!(c2.value(1), 7.0_f64); + assert_eq!(c2.value(2), 3.0_f64); + + assert_eq!(c4.value(0), 1.0_f32); + assert_eq!(c4.value(1), 2.0_f32); + assert_eq!(c4.value(2), 3.0_f32); + } + // sets default for configs that play no role in projections fn config_for_projection( file_schema: SchemaRef, diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 3386830d0001..5d6229438ce5 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -506,8 +506,8 @@ impl FileOpener for ParquetOpener { ParquetRecordBatchStreamBuilder::new_with_options(reader, options) .await?; - let schema_mapping = - schema_adapter.map_schema_with_projection(builder.schema(), &projection)?; + let schema_mapping = schema_adapter + .map_schema_with_projection(builder.schema(), &projection)?; // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?; let mask = ProjectionMask::roots( From 121f0176718743f6f3ba0cfc4b9ce18a8276c4b5 Mon Sep 17 00:00:00 2001 From: elijah Date: Sun, 28 May 2023 23:48:05 +0800 Subject: [PATCH 03/15] fix: map schema with projection --- .../core/src/physical_plan/file_format/mod.rs | 80 ++++++++++--------- .../src/physical_plan/file_format/parquet.rs | 5 +- 2 files changed, 46 insertions(+), 39 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 2753c3532859..e11be0a7a807 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -468,7 +468,7 @@ impl SchemaAdapter { match file_schema.field_with_name(field.name()) { Ok(file_field) => { if can_cast_types(file_field.data_type(), field.data_type()) { - field_mappings.push((idx, field.data_type().clone())) + field_mappings.push((idx, Some(field.data_type().clone()))) } else { return Err(DataFusionError::Plan(format!( "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", @@ -498,36 +498,42 @@ impl SchemaAdapter { &self, file_schema: &Schema, projections: &[usize], - ) -> Result { - let mut field_mappings = Vec::new(); + ) -> Result<(SchemaMapping, Vec)> { + let mut field_mappings: Vec<(usize, Option)> = Vec::new(); + let mut mapped: Vec = vec![]; for idx in projections { let field = self.table_schema.field(*idx); - match file_schema.field_with_name(field.name()) { - Ok(file_field) => { - if can_cast_types(file_field.data_type(), field.data_type()) { - field_mappings.push((*idx, field.data_type().clone())) - } else { - return Err(DataFusionError::Plan(format!( - "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", - field.name(), - file_field.data_type(), - field.data_type() - ))); - } + match file_schema.index_of(field.name().as_str()) { + Ok(mapped_idx) + if can_cast_types( + file_schema.field(mapped_idx).data_type(), + field.data_type(), + ) => + { + field_mappings.push((*idx, Some(field.data_type().clone()))); + mapped.push(mapped_idx); } - Err(_) => { + Ok(mapped_idx) => { return Err(DataFusionError::Plan(format!( - "File schema does not 
contain expected field {}", - field.name() + "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", + field.name(), + file_schema.field(mapped_idx).data_type(), + field.data_type() ))); } + Err(_) => { + field_mappings.push((*idx, None)); + } } } - Ok(SchemaMapping { - table_schema: self.table_schema.clone(), - field_mappings, - }) + Ok(( + SchemaMapping { + table_schema: self.table_schema.clone(), + field_mappings, + }, + mapped, + )) } } @@ -536,7 +542,7 @@ impl SchemaAdapter { #[derive(Debug)] pub struct SchemaMapping { table_schema: SchemaRef, - field_mappings: Vec<(usize, DataType)>, + field_mappings: Vec<(usize, Option)>, } impl SchemaMapping { @@ -549,12 +555,14 @@ impl SchemaMapping { let batch_cols = batch.columns().to_vec(); for (idx, data_type) in &self.field_mappings { let table_field = &self.table_schema.fields()[*idx]; - if let Some((batch_idx, _name)) = - batch_schema.column_with_name(table_field.name().as_str()) - { - cols.push(arrow::compute::cast(&batch_cols[batch_idx], data_type)?); - } else { - cols.push(new_null_array(table_field.data_type(), batch_rows)) + match batch_schema.column_with_name(table_field.name().as_str()) { + Some((batch_idx, _name)) if data_type.is_some() => { + cols.push(arrow::compute::cast( + &batch_cols[batch_idx], + data_type.as_ref().unwrap(), + )?); + } + _ => cols.push(new_null_array(table_field.data_type(), batch_rows)), } } @@ -1277,9 +1285,9 @@ mod tests { assert_eq!( mapping.field_mappings, vec![ - (0, DataType::Utf8), - (1, DataType::UInt64), - (2, DataType::Float64), + (0, Some(DataType::Utf8)), + (1, Some(DataType::UInt64)), + (2, Some(DataType::Float64)), ] ); assert_eq!(mapping.table_schema, table_schema); @@ -1296,9 +1304,9 @@ mod tests { assert_eq!( mapping.field_mappings, vec![ - (0, DataType::Utf8), - (1, DataType::UInt64), - (2, DataType::Float64), + (0, Some(DataType::Utf8)), + (1, Some(DataType::UInt64)), + (2, Some(DataType::Float64)), ] ); assert_eq!(mapping.table_schema, table_schema); @@ -1391,7 +1399,7 @@ mod tests { Field::new("c4", DataType::Int64, true), ]); let indices = vec![1, 2, 4]; - let mapping = adapter + let (mapping, _) = adapter .map_schema_with_projection(&file_schema, &indices) .unwrap(); diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 5d6229438ce5..7418f3a501fa 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -506,13 +506,13 @@ impl FileOpener for ParquetOpener { ParquetRecordBatchStreamBuilder::new_with_options(reader, options) .await?; - let schema_mapping = schema_adapter + let (schema_mapping, adapted_projections) = schema_adapter .map_schema_with_projection(builder.schema(), &projection)?; // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?; let mask = ProjectionMask::roots( builder.parquet_schema(), - projection.iter().cloned(), + adapted_projections.iter().cloned(), ); // Filter pushdown: evaluate predicates during scan @@ -890,7 +890,6 @@ mod tests { .unwrap(), ), }; - // If testing with page_index_predicate, write parquet // files with multiple pages let multi_page = page_index_predicate; From bfb5e0eadd0bda1f951497ad7c659f38f0f4de2a Mon Sep 17 00:00:00 2001 From: elijah Date: Mon, 29 May 2023 00:24:59 +0800 Subject: [PATCH 04/15] fix: fix test evolved_schema_incompatible_types --- .../core/src/physical_plan/file_format/mod.rs | 9 ++++----- 
.../core/src/physical_plan/file_format/parquet.rs | 13 ++++++++++--- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index e11be0a7a807..a06a2e5f5dc3 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -569,14 +569,13 @@ impl SchemaMapping { // Necessary to handle empty batches let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - let batch_schema: SchemaRef; - if cols.len() != self.table_schema.fields.len() { + let batch_schema: SchemaRef = if cols.len() != self.table_schema.fields.len() { let projection: Vec = self.field_mappings.iter().map(|(idx, _)| *idx).collect(); - batch_schema = Arc::new(self.table_schema.project(&projection)?); + Arc::new(self.table_schema.project(&projection)?) } else { - batch_schema = self.table_schema.clone(); - } + self.table_schema.clone() + }; let record_batch = RecordBatch::try_new_with_options(batch_schema, cols, &options)?; Ok(record_batch) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 7418f3a501fa..7d8e725019ab 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -793,7 +793,7 @@ mod tests { datasource::file_format::{parquet::ParquetFormat, FileFormat}, physical_plan::collect, }; - use arrow::array::{ArrayRef, Float32Array, Int32Array}; + use arrow::array::{ArrayRef, Int32Array}; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use arrow::{ @@ -812,6 +812,7 @@ mod tests { use object_store::ObjectMeta; use std::fs::File; use std::io::Write; + use arrow_array::Date64Array; use tempfile::TempDir; struct RoundTripResult { @@ -1463,7 +1464,12 @@ mod tests { let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); let c4: ArrayRef = - Arc::new(Float32Array::from(vec![Some(1.0_f32), Some(2.0_f32), None])); + Arc::new(Date64Array::from(vec![ + Some(86400000), + None, + Some(259200000), + ])); + // Arc::new(Float32Array::from(vec![Some(1.0_f32), Some(2.0_f32), None])); // batch1: c1(string), c2(int64), c3(int8) let batch1 = create_batch(vec![ @@ -1487,7 +1493,8 @@ mod tests { .round_trip_to_batches(vec![batch1, batch2]) .await; assert_contains!(read.unwrap_err().to_string(), - "Execution error: Failed to map column projection for field c3. Incompatible data types Float32 and Int8"); + "Cannot cast file schema field c3 of type Date64 to table schema field of type Int8"); + // "Execution error: Failed to map column projection for field c3. 
Incompatible data types Float32 and Int8"); } #[tokio::test] From 0a832b7c17651390cae8451f546bda98bd13453f Mon Sep 17 00:00:00 2001 From: elijah Date: Mon, 29 May 2023 10:06:59 +0800 Subject: [PATCH 05/15] make ci happy --- .../src/physical_plan/file_format/parquet.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 7d8e725019ab..6290d715e428 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -800,6 +800,7 @@ mod tests { array::{Int64Array, Int8Array, StringArray}, datatypes::{DataType, Field, SchemaBuilder}, }; + use arrow_array::Date64Array; use chrono::{TimeZone, Utc}; use datafusion_common::ScalarValue; use datafusion_common::{assert_contains, ToDFSchema}; @@ -812,7 +813,6 @@ mod tests { use object_store::ObjectMeta; use std::fs::File; use std::io::Write; - use arrow_array::Date64Array; use tempfile::TempDir; struct RoundTripResult { @@ -1463,13 +1463,12 @@ mod tests { let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); - let c4: ArrayRef = - Arc::new(Date64Array::from(vec![ - Some(86400000), - None, - Some(259200000), - ])); - // Arc::new(Float32Array::from(vec![Some(1.0_f32), Some(2.0_f32), None])); + let c4: ArrayRef = Arc::new(Date64Array::from(vec![ + Some(86400000), + None, + Some(259200000), + ])); + // Arc::new(Float32Array::from(vec![Some(1.0_f32), Some(2.0_f32), None])); // batch1: c1(string), c2(int64), c3(int8) let batch1 = create_batch(vec![ @@ -1494,7 +1493,7 @@ mod tests { .await; assert_contains!(read.unwrap_err().to_string(), "Cannot cast file schema field c3 of type Date64 to table schema field of type Int8"); - // "Execution error: Failed to map column projection for field c3. Incompatible data types Float32 and Int8"); + // "Execution error: Failed to map column projection for field c3. Incompatible data types Float32 and Int8"); } #[tokio::test] From bb9b9d9f935cda160d25666269648b7e041ca015 Mon Sep 17 00:00:00 2001 From: elijah Date: Mon, 29 May 2023 11:17:27 +0800 Subject: [PATCH 06/15] improve the code --- datafusion/core/src/physical_plan/file_format/parquet.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 6290d715e428..6696075efede 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -1468,7 +1468,6 @@ mod tests { None, Some(259200000), ])); - // Arc::new(Float32Array::from(vec![Some(1.0_f32), Some(2.0_f32), None])); // batch1: c1(string), c2(int64), c3(int8) let batch1 = create_batch(vec![ @@ -1493,7 +1492,6 @@ mod tests { .await; assert_contains!(read.unwrap_err().to_string(), "Cannot cast file schema field c3 of type Date64 to table schema field of type Int8"); - // "Execution error: Failed to map column projection for field c3. 
Incompatible data types Float32 and Int8"); } #[tokio::test] From c236841ba0aa74ccd81caafd8696628a13caab05 Mon Sep 17 00:00:00 2001 From: elijah Date: Wed, 31 May 2023 06:29:06 +0800 Subject: [PATCH 07/15] add e2e test --- datafusion/core/tests/parquet/mod.rs | 1 + .../core/tests/parquet/schema_coercion.rs | 140 ++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 datafusion/core/tests/parquet/schema_coercion.rs diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index f4da1efb80ed..3fbf41303263 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -46,6 +46,7 @@ mod custom_reader; mod filter_pushdown; mod page_pruning; mod row_group_pruning; +mod schema_coercion; #[cfg(test)] #[ctor::ctor] diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs new file mode 100644 index 000000000000..969ea9bb10c7 --- /dev/null +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{Field, Schema}; +use arrow::record_batch::RecordBatch; +use arrow_array::types::Int32Type; +use arrow_array::{ArrayRef, DictionaryArray, Float32Array, Int64Array, ListArray}; +use arrow_schema::DataType; +use datafusion::assert_batches_sorted_eq; +use datafusion::physical_plan::collect; +use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; +use datafusion::prelude::SessionContext; +use datafusion_common::Result; +use datafusion_common::Statistics; +use datafusion_execution::object_store::ObjectStoreUrl; +use object_store::path::Path; +use object_store::ObjectMeta; +use parquet::arrow::ArrowWriter; +use parquet::file::properties::WriterProperties; +use std::sync::Arc; +use tempfile::NamedTempFile; + +/// Test for reading data from multiple parquet files with different schemas and coercing them into a single schema. 
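+/// One file carries (c1: Dictionary<Int32, Utf8>, c2: Int64, c3: Float32) and the
+/// other (c2: Int64, c3: Float32, c4: List<Int32>); the scan declares a merged
+/// table schema of (c1: Utf8, c2: Int32, c3: Float64, c4: Utf8), so each batch
+/// must be cast where types differ and null-padded where a column is absent.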
+#[tokio::test] +async fn multi_parquet_coercion() { + let d1: DictionaryArray = + vec![Some("one"), None, Some("three")].into_iter().collect(); + let c1: ArrayRef = Arc::new(d1); + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + let c3: ArrayRef = Arc::new(Float32Array::from(vec![Some(10.0), Some(20.0), None])); + let c4 = Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), Some(3)]), + Some(vec![Some(4), Some(5)]), + Some(vec![Some(6)]), + ])); + + let batch1 = RecordBatch::try_from_iter(vec![ + ("c1", c1), + ("c2", c2.clone()), + ("c3", c3.clone()), + ]) + .unwrap(); + let batch2 = + RecordBatch::try_from_iter(vec![("c2", c2), ("c3", c3), ("c4", c4)]).unwrap(); + + let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap(); + let file_groups = meta.into_iter().map(Into::into).collect(); + + let file_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Int32, true), + Field::new("c3", DataType::Float64, true), + Field::new("c4", DataType::Utf8, true), + ])); + let parquet_exec = ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::local_filesystem(), + file_groups: vec![file_groups], + file_schema, + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }, + None, + None, + ); + + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap(); + + let expected = vec![ + "+-------+----+------+-----------+", + "| c1 | c2 | c3 | c4 |", + "+-------+----+------+-----------+", + "| | | | [6] |", + "| | 1 | 10.0 | [1, 2, 3] |", + "| | 2 | 20.0 | |", + "| | 2 | 20.0 | [4, 5] |", + "| one | 1 | 10.0 | |", + "| three | | | |", + "+-------+----+------+-----------+", + ]; + assert_batches_sorted_eq!(expected, &read); +} + +/// Writes `batches` to a temporary parquet file +pub async fn store_parquet( + batches: Vec, +) -> Result<(Vec, Vec)> { + // Each batch writes to their own file + let files: Vec<_> = batches + .into_iter() + .map(|batch| { + let mut output = NamedTempFile::new().expect("creating temp file"); + + let builder = WriterProperties::builder(); + let props = builder.build(); + + let mut writer = + ArrowWriter::try_new(&mut output, batch.schema(), Some(props)) + .expect("creating writer"); + + writer.write(&batch).expect("Writing batch"); + writer.close().unwrap(); + output + }) + .collect(); + + let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect(); + Ok((meta, files)) +} + +/// Helper method to fetch the file size and date at given path and create a `ObjectMeta` +pub fn local_unpartitioned_file(path: impl AsRef) -> ObjectMeta { + let location = Path::from_filesystem_path(path.as_ref()).unwrap(); + let metadata = std::fs::metadata(path).expect("Local file metadata"); + ObjectMeta { + location, + last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), + size: metadata.len() as usize, + } +} From 946137611a2870566378ca63236404590693099d Mon Sep 17 00:00:00 2001 From: elijah Date: Wed, 31 May 2023 10:49:13 +0800 Subject: [PATCH 08/15] make ci happy --- datafusion/core/tests/parquet/schema_coercion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs index 969ea9bb10c7..455a92885626 100644 --- 
a/datafusion/core/tests/parquet/schema_coercion.rs +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -75,7 +75,7 @@ async fn multi_parquet_coercion() { projection: None, limit: None, table_partition_cols: vec![], - output_ordering: None, + output_ordering: vec![], infinite_source: false, }, None, From 45d579ad6ad7c661f0b0ea02f7ccd1b6a6a7e13e Mon Sep 17 00:00:00 2001 From: elijah Date: Thu, 1 Jun 2023 10:30:33 +0800 Subject: [PATCH 09/15] empty commit From dfdd8deb5c9c8ce4198129012f1a6623b1f662e0 Mon Sep 17 00:00:00 2001 From: elijah Date: Sat, 3 Jun 2023 01:03:52 +0800 Subject: [PATCH 10/15] improve the code --- .../core/src/physical_plan/file_format/mod.rs | 248 ++---------------- .../src/physical_plan/file_format/parquet.rs | 7 +- 2 files changed, 30 insertions(+), 225 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index a06a2e5f5dc3..f20acc559648 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -57,7 +57,7 @@ use arrow::compute::can_cast_types; use arrow::record_batch::RecordBatchOptions; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_physical_expr::expressions::Column; -use log::{debug, info, warn}; +use log::{debug, warn}; use object_store::path::Path; use object_store::ObjectMeta; use std::fmt::Debug; @@ -392,30 +392,6 @@ impl SchemaAdapter { file_schema.index_of(field.name()).ok() } - /// Map projected column indexes to the file schema. This will fail if the table schema - /// and the file schema contain a field with the same name and different types. - #[allow(dead_code)] - pub fn map_projections( - &self, - file_schema: &Schema, - projections: &[usize], - ) -> Result> { - let mut mapped: Vec = vec![]; - for idx in projections { - let field = self.table_schema.field(*idx); - if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) { - if file_schema.field(mapped_idx).data_type() == field.data_type() { - mapped.push(mapped_idx) - } else { - let msg = format!("Failed to map column projection for field {}. Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type()); - info!("{}", msg); - return Err(DataFusionError::Execution(msg)); - } - } - } - Ok(mapped) - } - /// Re-order projected columns by index in record batch to match table schema column ordering. If the record /// batch does not contain a column for an expected field, insert a null-valued column at the /// required column index. @@ -460,50 +436,15 @@ impl SchemaAdapter { /// If the provided `file_schema` contains columns of a different type to the expected /// `table_schema`, the method will attempt to cast the array data from the file schema /// to the table schema where possible. 
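+    /// Returns the mapping together with the indices of the matching file-schema
+    /// columns, which `ParquetOpener` feeds to `ProjectionMask::roots`.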
- #[allow(dead_code)] - pub fn map_schema(&self, file_schema: &Schema) -> Result { - let mut field_mappings = Vec::new(); - - for (idx, field) in self.table_schema.fields().iter().enumerate() { - match file_schema.field_with_name(field.name()) { - Ok(file_field) => { - if can_cast_types(file_field.data_type(), field.data_type()) { - field_mappings.push((idx, Some(field.data_type().clone()))) - } else { - return Err(DataFusionError::Plan(format!( - "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", - field.name(), - file_field.data_type(), - field.data_type() - ))); - } - } - Err(_) => { - return Err(DataFusionError::Plan(format!( - "File schema does not contain expected field {}", - field.name() - ))); - } - } - } - Ok(SchemaMapping { - table_schema: self.table_schema.clone(), - field_mappings, - }) - } - - /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema - /// to the table schema, taking into account the provided projections. - pub fn map_schema_with_projection( + pub fn map_schema( &self, file_schema: &Schema, - projections: &[usize], ) -> Result<(SchemaMapping, Vec)> { - let mut field_mappings: Vec<(usize, Option)> = Vec::new(); + let mut field_mappings: Vec> = + Vec::with_capacity(self.table_schema.fields().len()); let mut mapped: Vec = vec![]; - for idx in projections { - let field = self.table_schema.field(*idx); + for field in self.table_schema.fields() { match file_schema.index_of(field.name().as_str()) { Ok(mapped_idx) if can_cast_types( @@ -511,7 +452,7 @@ impl SchemaAdapter { field.data_type(), ) => { - field_mappings.push((*idx, Some(field.data_type().clone()))); + field_mappings.push(Some(mapped_idx)); mapped.push(mapped_idx); } Ok(mapped_idx) => { @@ -523,7 +464,7 @@ impl SchemaAdapter { ))); } Err(_) => { - field_mappings.push((*idx, None)); + field_mappings.push(None); } } } @@ -542,42 +483,32 @@ impl SchemaAdapter { #[derive(Debug)] pub struct SchemaMapping { table_schema: SchemaRef, - field_mappings: Vec<(usize, Option)>, + field_mappings: Vec>, } impl SchemaMapping { /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. fn map_batch(&self, batch: RecordBatch) -> Result { - let mut cols = Vec::with_capacity(self.field_mappings.len()); - - let batch_schema = batch.schema(); let batch_rows = batch.num_rows(); let batch_cols = batch.columns().to_vec(); - for (idx, data_type) in &self.field_mappings { - let table_field = &self.table_schema.fields()[*idx]; - match batch_schema.column_with_name(table_field.name().as_str()) { - Some((batch_idx, _name)) if data_type.is_some() => { - cols.push(arrow::compute::cast( - &batch_cols[batch_idx], - data_type.as_ref().unwrap(), - )?); - } - _ => cols.push(new_null_array(table_field.data_type(), batch_rows)), - } - } + + let cols = self + .table_schema + .fields() + .iter() + .zip(&self.field_mappings) + .map(|(field, mapping)| match mapping { + Some(idx) => arrow::compute::cast(&batch_cols[*idx], field.data_type()) + .map_err(DataFusionError::ArrowError), + None => Ok(new_null_array(field.data_type(), batch_rows)), + }) + .collect::>>()?; // Necessary to handle empty batches let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - let batch_schema: SchemaRef = if cols.len() != self.table_schema.fields.len() { - let projection: Vec = - self.field_mappings.iter().map(|(idx, _)| *idx).collect(); - Arc::new(self.table_schema.project(&projection)?) 
- } else { - self.table_schema.clone() - }; let record_batch = - RecordBatch::try_new_with_options(batch_schema, cols, &options)?; + RecordBatch::try_new_with_options(self.table_schema.clone(), cols, &options)?; Ok(record_batch) } } @@ -1207,132 +1138,6 @@ mod tests { crate::assert_batches_eq!(expected, &[projected_batch]); } - #[test] - fn schema_adapter_adapt_projections() { - let table_schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Utf8, true), - Field::new("c2", DataType::Int64, true), - Field::new("c3", DataType::Int8, true), - ])); - - let file_schema = Schema::new(vec![ - Field::new("c1", DataType::Utf8, true), - Field::new("c2", DataType::Int64, true), - ]); - - let file_schema_2 = Arc::new(Schema::new(vec![ - Field::new("c3", DataType::Int8, true), - Field::new("c2", DataType::Int64, true), - ])); - - let file_schema_3 = - Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32, true)])); - - let adapter = SchemaAdapter::new(table_schema); - - let projections1: Vec = vec![0, 1, 2]; - let projections2: Vec = vec![2]; - - let mapped = adapter - .map_projections(&file_schema, projections1.as_slice()) - .expect("mapping projections"); - - assert_eq!(mapped, vec![0, 1]); - - let mapped = adapter - .map_projections(&file_schema, projections2.as_slice()) - .expect("mapping projections"); - - assert!(mapped.is_empty()); - - let mapped = adapter - .map_projections(&file_schema_2, projections1.as_slice()) - .expect("mapping projections"); - - assert_eq!(mapped, vec![1, 0]); - - let mapped = adapter - .map_projections(&file_schema_2, projections2.as_slice()) - .expect("mapping projections"); - - assert_eq!(mapped, vec![0]); - - let mapped = adapter.map_projections(&file_schema_3, projections1.as_slice()); - - assert!(mapped.is_err()); - } - - #[test] - fn schema_adapter_map_schema() { - let table_schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Utf8, true), - Field::new("c2", DataType::UInt64, true), - Field::new("c3", DataType::Float64, true), - ])); - - let adapter = SchemaAdapter::new(table_schema.clone()); - - // file schema matches table schema - let file_schema = Schema::new(vec![ - Field::new("c1", DataType::Utf8, true), - Field::new("c2", DataType::UInt64, true), - Field::new("c3", DataType::Float64, true), - ]); - - let mapping = adapter.map_schema(&file_schema).unwrap(); - - assert_eq!( - mapping.field_mappings, - vec![ - (0, Some(DataType::Utf8)), - (1, Some(DataType::UInt64)), - (2, Some(DataType::Float64)), - ] - ); - assert_eq!(mapping.table_schema, table_schema); - - // file schema has columns of a different but castable type - let file_schema = Schema::new(vec![ - Field::new("c1", DataType::Utf8, true), - Field::new("c2", DataType::Int32, true), // can be casted to UInt64 - Field::new("c3", DataType::Float32, true), // can be casted to Float64 - ]); - - let mapping = adapter.map_schema(&file_schema).unwrap(); - - assert_eq!( - mapping.field_mappings, - vec![ - (0, Some(DataType::Utf8)), - (1, Some(DataType::UInt64)), - (2, Some(DataType::Float64)), - ] - ); - assert_eq!(mapping.table_schema, table_schema); - - // file schema lacks necessary columns - let file_schema = Schema::new(vec![ - Field::new("c1", DataType::Utf8, true), - Field::new("c2", DataType::Int32, true), - ]); - - let err = adapter.map_schema(&file_schema).unwrap_err(); - - assert!(err - .to_string() - .contains("File schema does not contain expected field")); - - // file schema has columns of a different and non-castable type - let file_schema = Schema::new(vec![ - 
Field::new("c1", DataType::Utf8, true), - Field::new("c2", DataType::Int32, true), - Field::new("c3", DataType::Date64, true), // cannot be casted to Float64 - ]); - let err = adapter.map_schema(&file_schema).unwrap_err(); - - assert!(err.to_string().contains("Cannot cast file schema field")); - } - #[test] fn schema_mapping_map_batch() { let table_schema = Arc::new(Schema::new(vec![ @@ -1349,7 +1154,7 @@ mod tests { Field::new("c3", DataType::Float32, true), ]); - let mapping = adapter.map_schema(&file_schema).expect("map schema failed"); + let (mapping, _) = adapter.map_schema(&file_schema).expect("map schema failed"); let c1 = StringArray::from(vec!["hello", "world"]); let c2 = UInt64Array::from(vec![9_u64, 5_u64]); @@ -1388,8 +1193,6 @@ mod tests { Field::new("c4", DataType::Float32, true), ])); - let adapter = SchemaAdapter::new(table_schema.clone()); - let file_schema = Schema::new(vec![ Field::new("id", DataType::Int32, true), Field::new("c1", DataType::Boolean, true), @@ -1397,10 +1200,11 @@ mod tests { Field::new("c3", DataType::Binary, true), Field::new("c4", DataType::Int64, true), ]); + let indices = vec![1, 2, 4]; - let (mapping, _) = adapter - .map_schema_with_projection(&file_schema, &indices) - .unwrap(); + let schema = SchemaRef::from(table_schema.project(&indices).unwrap()); + let adapter = SchemaAdapter::new(schema); + let (mapping, _) = adapter.map_schema(&file_schema).unwrap(); let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); let c1 = BooleanArray::from(vec![Some(true), Some(false), Some(true)]); diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 6696075efede..5a1e53a45410 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -485,9 +485,10 @@ impl FileOpener for ParquetOpener { &self.metrics, )?; - let schema_adapter = SchemaAdapter::new(self.table_schema.clone()); let batch_size = self.batch_size; let projection = self.projection.clone(); + let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); + let schema_adapter = SchemaAdapter::new(projected_schema.clone()); let predicate = self.predicate.clone(); let pruning_predicate = self.pruning_predicate.clone(); let page_pruning_predicate = self.page_pruning_predicate.clone(); @@ -506,8 +507,8 @@ impl FileOpener for ParquetOpener { ParquetRecordBatchStreamBuilder::new_with_options(reader, options) .await?; - let (schema_mapping, adapted_projections) = schema_adapter - .map_schema_with_projection(builder.schema(), &projection)?; + let (schema_mapping, adapted_projections) = + schema_adapter.map_schema(builder.schema())?; // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?; let mask = ProjectionMask::roots( From 030a631936258d3b393520916fb863979c1da55e Mon Sep 17 00:00:00 2001 From: elijah Date: Sat, 3 Jun 2023 01:14:39 +0800 Subject: [PATCH 11/15] add docs for the fields --- datafusion/core/src/physical_plan/file_format/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index f20acc559648..79609a6ef547 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -482,7 +482,10 @@ impl SchemaAdapter { /// and any necessary type conversions that need to be applied. 
#[derive(Debug)] pub struct SchemaMapping { + /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result. table_schema: SchemaRef, + /// The index in the batch schema matching the corresponding field in table_schema + /// i.e. table_schema[i] = file_schema[field_mappings[i].0] field_mappings: Vec>, } From 9e1d3f768c177fa05375cdd3cd033b89b8fb9525 Mon Sep 17 00:00:00 2001 From: elijah Date: Sun, 4 Jun 2023 21:02:30 +0800 Subject: [PATCH 12/15] fix test evolved_schema_projection --- .../core/src/physical_plan/file_format/mod.rs | 8 +- .../src/physical_plan/file_format/parquet.rs | 2 +- .../core/tests/parquet/schema_coercion.rs | 104 ++++++++++++++---- 3 files changed, 86 insertions(+), 28 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 79609a6ef547..feac0f105a6e 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -443,6 +443,7 @@ impl SchemaAdapter { let mut field_mappings: Vec> = Vec::with_capacity(self.table_schema.fields().len()); let mut mapped: Vec = vec![]; + let mut batch_idx = 0; for field in self.table_schema.fields() { match file_schema.index_of(field.name().as_str()) { @@ -452,7 +453,8 @@ impl SchemaAdapter { field.data_type(), ) => { - field_mappings.push(Some(mapped_idx)); + field_mappings.push(Some(batch_idx)); + batch_idx += 1; mapped.push(mapped_idx); } Ok(mapped_idx) => { @@ -484,8 +486,8 @@ impl SchemaAdapter { pub struct SchemaMapping { /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result. table_schema: SchemaRef, - /// The index in the batch schema matching the corresponding field in table_schema - /// i.e. table_schema[i] = file_schema[field_mappings[i].0] + /// The field in table_schema at index i is mapped to the field in batch_schema at the index field_mappings\[i\].0 + /// i.e. 
table_schema\[i\] = batch_schema\[field_mappings\[i\].0\] field_mappings: Vec>, } diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 5a1e53a45410..a281f60a940a 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -488,7 +488,7 @@ impl FileOpener for ParquetOpener { let batch_size = self.batch_size; let projection = self.projection.clone(); let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); - let schema_adapter = SchemaAdapter::new(projected_schema.clone()); + let schema_adapter = SchemaAdapter::new(projected_schema); let predicate = self.predicate.clone(); let pruning_predicate = self.pruning_predicate.clone(); let page_pruning_predicate = self.page_pruning_predicate.clone(); diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs index 455a92885626..b5ceabad1d3e 100644 --- a/datafusion/core/tests/parquet/schema_coercion.rs +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -18,7 +18,7 @@ use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; use arrow_array::types::Int32Type; -use arrow_array::{ArrayRef, DictionaryArray, Float32Array, Int64Array, ListArray}; +use arrow_array::{ArrayRef, DictionaryArray, Float32Array, Int64Array, StringArray}; use arrow_schema::DataType; use datafusion::assert_batches_sorted_eq; use datafusion::physical_plan::collect; @@ -42,29 +42,85 @@ async fn multi_parquet_coercion() { let c1: ArrayRef = Arc::new(d1); let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); let c3: ArrayRef = Arc::new(Float32Array::from(vec![Some(10.0), Some(20.0), None])); - let c4 = Arc::new(ListArray::from_iter_primitive::(vec![ - Some(vec![Some(1), Some(2), Some(3)]), - Some(vec![Some(4), Some(5)]), - Some(vec![Some(6)]), + + // batch1: c1(dict), c2(int64) + let batch1 = + RecordBatch::try_from_iter(vec![("c1", c1), ("c2", c2.clone())]).unwrap(); + // batch2: c2(int64), c3(float32) + let batch2 = RecordBatch::try_from_iter(vec![("c2", c2), ("c3", c3)]).unwrap(); + + let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap(); + let file_groups = meta.into_iter().map(Into::into).collect(); + + // cast c1 to utf8, c2 to int32, c3 to float64 + let file_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Int32, true), + Field::new("c3", DataType::Float64, true), + ])); + let parquet_exec = ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::local_filesystem(), + file_groups: vec![file_groups], + file_schema, + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: vec![], + infinite_source: false, + }, + None, + None, + ); + + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap(); + + let expected = vec![ + "+-------+----+------+", + "| c1 | c2 | c3 |", + "+-------+----+------+", + "| | | |", + "| | 1 | 10.0 |", + "| | 2 | |", + "| | 2 | 20.0 |", + "| one | 1 | |", + "| three | | |", + "+-------+----+------+", + ]; + assert_batches_sorted_eq!(expected, &read); +} + +#[tokio::test] +async fn multi_parquet_coercion_projection() { + let d1: DictionaryArray = + vec![Some("one"), None, Some("three")].into_iter().collect(); + let c1: ArrayRef = 
Arc::new(d1); + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + let c3: ArrayRef = Arc::new(Float32Array::from(vec![Some(10.0), Some(20.0), None])); + let c1s = Arc::new(StringArray::from(vec![ + Some("baz"), + Some("Boo"), + Some("foo"), ])); - let batch1 = RecordBatch::try_from_iter(vec![ - ("c1", c1), - ("c2", c2.clone()), - ("c3", c3.clone()), - ]) - .unwrap(); + // batch1: c2(int64), c1(dict) + let batch1 = + RecordBatch::try_from_iter(vec![("c2", c2.clone()), ("c1", c1)]).unwrap(); + // batch2: c2(int64), c1(str), c3(float32) let batch2 = - RecordBatch::try_from_iter(vec![("c2", c2), ("c3", c3), ("c4", c4)]).unwrap(); + RecordBatch::try_from_iter(vec![("c2", c2), ("c1", c1s), ("c3", c3)]).unwrap(); let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap(); let file_groups = meta.into_iter().map(Into::into).collect(); + // cast c1 to utf8, c2 to int32, c3 to float64 let file_schema = Arc::new(Schema::new(vec![ Field::new("c1", DataType::Utf8, true), Field::new("c2", DataType::Int32, true), Field::new("c3", DataType::Float64, true), - Field::new("c4", DataType::Utf8, true), ])); let parquet_exec = ParquetExec::new( FileScanConfig { @@ -72,7 +128,7 @@ async fn multi_parquet_coercion() { file_groups: vec![file_groups], file_schema, statistics: Statistics::default(), - projection: None, + projection: Some(vec![1, 0, 2]), limit: None, table_partition_cols: vec![], output_ordering: vec![], @@ -87,16 +143,16 @@ async fn multi_parquet_coercion() { let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap(); let expected = vec![ - "+-------+----+------+-----------+", - "| c1 | c2 | c3 | c4 |", - "+-------+----+------+-----------+", - "| | | | [6] |", - "| | 1 | 10.0 | [1, 2, 3] |", - "| | 2 | 20.0 | |", - "| | 2 | 20.0 | [4, 5] |", - "| one | 1 | 10.0 | |", - "| three | | | |", - "+-------+----+------+-----------+", + "+----+-------+------+", + "| c2 | c1 | c3 |", + "+----+-------+------+", + "| | foo | |", + "| | three | |", + "| 1 | baz | 10.0 |", + "| 1 | one | |", + "| 2 | | |", + "| 2 | Boo | 20.0 |", + "+----+-------+------+", ]; assert_batches_sorted_eq!(expected, &read); } From f71238bf756437d149d5600a0f6879b5c4cd9f67 Mon Sep 17 00:00:00 2001 From: elijah Date: Mon, 5 Jun 2023 03:15:19 +0800 Subject: [PATCH 13/15] fix inconsistent_order mapping --- .../core/src/physical_plan/file_format/mod.rs | 73 ++++++++++++------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index feac0f105a6e..7a0a4d448544 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -440,24 +440,18 @@ impl SchemaAdapter { &self, file_schema: &Schema, ) -> Result<(SchemaMapping, Vec)> { - let mut field_mappings: Vec> = - Vec::with_capacity(self.table_schema.fields().len()); + let mut field_mappings: Vec = vec![false; self.table_schema.fields().len()]; let mut mapped: Vec = vec![]; - let mut batch_idx = 0; - for field in self.table_schema.fields() { - match file_schema.index_of(field.name().as_str()) { - Ok(mapped_idx) - if can_cast_types( - file_schema.field(mapped_idx).data_type(), - field.data_type(), - ) => - { - field_mappings.push(Some(batch_idx)); - batch_idx += 1; + for (idx, field) in self.table_schema.fields().iter().enumerate() { + if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) { + if can_cast_types( + 
file_schema.field(mapped_idx).data_type(), + field.data_type(), + ) { + field_mappings[idx] = true; mapped.push(mapped_idx); - } - Ok(mapped_idx) => { + } else { return Err(DataFusionError::Plan(format!( "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", field.name(), @@ -465,9 +459,6 @@ impl SchemaAdapter { field.data_type() ))); } - Err(_) => { - field_mappings.push(None); - } } } Ok(( @@ -486,9 +477,9 @@ impl SchemaAdapter { pub struct SchemaMapping { /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result. table_schema: SchemaRef, - /// The field in table_schema at index i is mapped to the field in batch_schema at the index field_mappings\[i\].0 - /// i.e. table_schema\[i\] = batch_schema\[field_mappings\[i\].0\] - field_mappings: Vec>, + /// In `field_mappings`, a `true` value indicates that the corresponding field in `table_schema` exists in `file_schema`, + /// while a `false` value indicates that the corresponding field does not exist. + field_mappings: Vec, } impl SchemaMapping { @@ -496,16 +487,44 @@ impl SchemaMapping { fn map_batch(&self, batch: RecordBatch) -> Result { let batch_rows = batch.num_rows(); let batch_cols = batch.columns().to_vec(); - + let batch_schema = batch.schema().clone(); + + // let cols = self + // .table_schema + // .fields() + // .iter() + // .zip(&self.field_mappings) + // .map(|(field, mapping)| { + // if mapping { + // match batch_schema.index_of(fiel) { + // Ok(batch_idx) => { + // arrow::compute::cast(&batch_cols[batch_idx], field.data_type()) + // .map_err(DataFusionError::ArrowError) + // } + // Err(_) => Ok(new_null_array(field.data_type(), batch_rows)), + // } + // } + // None => Ok(new_null_array(field.data_type(), batch_rows)), + // }) + // .collect::>>()?; let cols = self .table_schema .fields() .iter() - .zip(&self.field_mappings) - .map(|(field, mapping)| match mapping { - Some(idx) => arrow::compute::cast(&batch_cols[*idx], field.data_type()) - .map_err(DataFusionError::ArrowError), - None => Ok(new_null_array(field.data_type(), batch_rows)), + .enumerate() + .map(|(idx, field)| { + if self.field_mappings[idx] { + match batch_schema.index_of(field.name()) { + Ok(batch_idx) => arrow::compute::cast( + &batch_cols[batch_idx], + field.data_type(), + ) + .map_err(DataFusionError::ArrowError), + Err(_) => Ok(new_null_array(field.data_type(), batch_rows)), + } + } else { + Ok(new_null_array(field.data_type(), batch_rows)) + } }) .collect::>>()?; From b08b8795668c3c1253030cd55aa30b0ee4266b23 Mon Sep 17 00:00:00 2001 From: elijah Date: Mon, 5 Jun 2023 03:17:39 +0800 Subject: [PATCH 14/15] make ci happy --- .../core/src/physical_plan/file_format/mod.rs | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 7a0a4d448544..e694204d6128 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -489,24 +489,6 @@ impl SchemaMapping { let batch_cols = batch.columns().to_vec(); let batch_schema = batch.schema().clone(); - // let cols = self - // .table_schema - // .fields() - // .iter() - // .zip(&self.field_mappings) - // .map(|(field, mapping)| { - // if mapping { - // match batch_schema.index_of(fiel) { - // Ok(batch_idx) => { - // arrow::compute::cast(&batch_cols[batch_idx], field.data_type()) - // .map_err(DataFusionError::ArrowError) - // } - // Err(_) => 
Ok(new_null_array(field.data_type(), batch_rows)), - // } - // } - // None => Ok(new_null_array(field.data_type(), batch_rows)), - // }) - // .collect::>>()?; let cols = self .table_schema .fields() From 684d38da268ad82258254288cc2beb07f3547a30 Mon Sep 17 00:00:00 2001 From: elijah Date: Mon, 5 Jun 2023 03:43:56 +0800 Subject: [PATCH 15/15] make ci happy --- datafusion/core/src/physical_plan/file_format/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index e694204d6128..f67636e6af1d 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -487,7 +487,7 @@ impl SchemaMapping { fn map_batch(&self, batch: RecordBatch) -> Result { let batch_rows = batch.num_rows(); let batch_cols = batch.columns().to_vec(); - let batch_schema = batch.schema().clone(); + let batch_schema = batch.schema(); let cols = self .table_schema
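---

The series settles on the following flow: `ParquetOpener` projects the table
schema up front and builds the `SchemaAdapter` over the projected schema
(PATCH 10); `SchemaAdapter::map_schema(file_schema)` fails with a plan error
whenever `can_cast_types` rejects a column, and otherwise returns a
`SchemaMapping` plus the matching file-column indices that become the
`ProjectionMask` roots; `SchemaMapping::map_batch` then resolves each table
field by name in the decoded batch (PATCH 13), casts it to the table type, and
null-pads fields the file lacks. Below is a minimal, self-contained sketch of
that final `map_schema`/`map_batch` pair written against arrow's public API.
It is illustrative only: it uses a plain `String` error where the series uses
`DataFusionError`, and the free function `map_schema` stands in for the
`SchemaAdapter` method.

use arrow::array::{new_null_array, ArrayRef};
use arrow::compute::{can_cast_types, cast};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};

/// `field_mappings[i]` is true when table field `i` exists in the file schema.
struct SchemaMapping {
    table_schema: SchemaRef,
    field_mappings: Vec<bool>,
}

/// Sketch of `SchemaAdapter::map_schema` as of PATCH 13: reject un-castable
/// columns, record which table fields the file provides, and collect the
/// file-side indices to read (the `ProjectionMask` roots).
fn map_schema(
    table_schema: &SchemaRef,
    file_schema: &Schema,
) -> Result<(SchemaMapping, Vec<usize>), String> {
    let mut field_mappings = vec![false; table_schema.fields().len()];
    let mut projection = vec![];
    for (idx, field) in table_schema.fields().iter().enumerate() {
        // A field missing from the file is simply null-padded in map_batch.
        if let Ok(file_idx) = file_schema.index_of(field.name()) {
            let file_type = file_schema.field(file_idx).data_type();
            if !can_cast_types(file_type, field.data_type()) {
                return Err(format!(
                    "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
                    field.name(), file_type, field.data_type()
                ));
            }
            field_mappings[idx] = true;
            projection.push(file_idx);
        }
    }
    let mapping = SchemaMapping {
        table_schema: table_schema.clone(),
        field_mappings,
    };
    Ok((mapping, projection))
}

impl SchemaMapping {
    /// Sketch of the final `map_batch`: cast the columns the file provides
    /// (looked up by name, since file column order may differ) and null-pad
    /// the rest, preserving the row count so empty batches survive.
    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch, ArrowError> {
        let rows = batch.num_rows();
        let batch_schema = batch.schema();
        let cols = self
            .table_schema
            .fields()
            .iter()
            .enumerate()
            .map(|(idx, field)| {
                if self.field_mappings[idx] {
                    if let Ok(batch_idx) = batch_schema.index_of(field.name()) {
                        return cast(batch.column(batch_idx), field.data_type());
                    }
                }
                Ok(new_null_array(field.data_type(), rows))
            })
            .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
        let options = RecordBatchOptions::new().with_row_count(Some(rows));
        RecordBatch::try_new_with_options(self.table_schema.clone(), cols, &options)
    }
}

The by-name lookup in `map_batch` is what PATCH 13 fixes: the batch-relative
counter introduced in PATCH 12 assumed the file's columns arrive in
table-schema order, which the `multi_parquet_coercion_projection` test added
in PATCH 12 shows is not guaranteed.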