From 70d633a71d0096b03bd8f173b000fdca82671d3f Mon Sep 17 00:00:00 2001
From: elijah <30852919+e1ijah1@users.noreply.github.com>
Date: Tue, 6 Jun 2023 20:06:33 +0800
Subject: [PATCH] feat: support type coercion in Parquet Reader (#6458)

* feat: support type coercion in Parquet Reader

* fix: schema adapter map schema with projection

* fix: map schema with projection

* fix: fix test evolved_schema_incompatible_types

* make ci happy

* improve the code

* add e2e test

* make ci happy

* empty commit

* improve the code

* add docs for the fields

* fix test evolved_schema_projection

* fix inconsistent_order mapping

* make ci happy

* make ci happy
---
 .../core/src/physical_plan/file_format/mod.rs | 324 +++++++-----------
 .../src/physical_plan/file_format/parquet.rs  |  28 +-
 datafusion/core/tests/parquet/mod.rs          |   1 +
 .../core/tests/parquet/schema_coercion.rs     | 196 +++++++++++
 4 files changed, 345 insertions(+), 204 deletions(-)
 create mode 100644 datafusion/core/tests/parquet/schema_coercion.rs

diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 3b737f03d82c..f67636e6af1d 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -57,7 +57,7 @@ use arrow::compute::can_cast_types;
 use arrow::record_batch::RecordBatchOptions;
 use datafusion_common::tree_node::{TreeNode, VisitRecursion};
 use datafusion_physical_expr::expressions::Column;
-use log::{debug, info, warn};
+use log::{debug, warn};
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use std::fmt::Debug;
@@ -392,32 +392,10 @@ impl SchemaAdapter {
         file_schema.index_of(field.name()).ok()
     }
 
-    /// Map projected column indexes to the file schema. This will fail if the table schema
-    /// and the file schema contain a field with the same name and different types.
-    pub fn map_projections(
-        &self,
-        file_schema: &Schema,
-        projections: &[usize],
-    ) -> Result<Vec<usize>> {
-        let mut mapped: Vec<usize> = vec![];
-        for idx in projections {
-            let field = self.table_schema.field(*idx);
-            if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
-                if file_schema.field(mapped_idx).data_type() == field.data_type() {
-                    mapped.push(mapped_idx)
-                } else {
-                    let msg = format!("Failed to map column projection for field {}. Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type());
-                    info!("{}", msg);
-                    return Err(DataFusionError::Execution(msg));
-                }
-            }
-        }
-        Ok(mapped)
-    }
-
     /// Re-order projected columns by index in record batch to match table schema column ordering. If the record
     /// batch does not contain a column for an expected field, insert a null-valued column at the
     /// required column index.
+    #[allow(dead_code)]
     pub fn adapt_batch(
         &self,
         batch: RecordBatch,
@@ -458,36 +436,38 @@ impl SchemaAdapter {
     /// If the provided `file_schema` contains columns of a different type to the expected
     /// `table_schema`, the method will attempt to cast the array data from the file schema
     /// to the table schema where possible.
-    #[allow(dead_code)]
-    pub fn map_schema(&self, file_schema: &Schema) -> Result<SchemaMapping> {
-        let mut field_mappings = Vec::new();
+    pub fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> Result<(SchemaMapping, Vec<usize>)> {
+        let mut field_mappings: Vec<bool> = vec![false; self.table_schema.fields().len()];
+        let mut mapped: Vec<usize> = vec![];
         for (idx, field) in self.table_schema.fields().iter().enumerate() {
-            match file_schema.field_with_name(field.name()) {
-                Ok(file_field) => {
-                    if can_cast_types(file_field.data_type(), field.data_type()) {
-                        field_mappings.push((idx, field.data_type().clone()))
-                    } else {
-                        return Err(DataFusionError::Plan(format!(
-                            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
-                            field.name(),
-                            file_field.data_type(),
-                            field.data_type()
-                        )));
-                    }
-                }
-                Err(_) => {
+            if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
+                if can_cast_types(
+                    file_schema.field(mapped_idx).data_type(),
+                    field.data_type(),
+                ) {
+                    field_mappings[idx] = true;
+                    mapped.push(mapped_idx);
+                } else {
                     return Err(DataFusionError::Plan(format!(
-                        "File schema does not contain expected field {}",
-                        field.name()
+                        "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
+                        field.name(),
+                        file_schema.field(mapped_idx).data_type(),
+                        field.data_type()
                     )));
                 }
             }
         }
-        Ok(SchemaMapping {
-            table_schema: self.table_schema.clone(),
-            field_mappings,
-        })
+        Ok((
+            SchemaMapping {
+                table_schema: self.table_schema.clone(),
+                field_mappings,
+            },
+            mapped,
+        ))
     }
 }
 
@@ -495,32 +475,46 @@ impl SchemaAdapter {
 /// and any necessary type conversions that need to be applied.
 #[derive(Debug)]
 pub struct SchemaMapping {
-    #[allow(dead_code)]
+    /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
     table_schema: SchemaRef,
-    #[allow(dead_code)]
-    field_mappings: Vec<(usize, DataType)>,
+    /// In `field_mappings`, a `true` value indicates that the corresponding field in `table_schema` exists in `file_schema`,
+    /// while a `false` value indicates that the corresponding field does not exist.
+    field_mappings: Vec<bool>,
 }
 
 impl SchemaMapping {
     /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
-    #[allow(dead_code)]
     fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        let mut mapped_cols = Vec::with_capacity(self.field_mappings.len());
+        let batch_rows = batch.num_rows();
+        let batch_cols = batch.columns().to_vec();
+        let batch_schema = batch.schema();
 
-        for (idx, data_type) in &self.field_mappings {
-            let array = batch.column(*idx);
-            let casted_array = arrow::compute::cast(array, data_type)?;
-            mapped_cols.push(casted_array);
-        }
+        let cols = self
+            .table_schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(idx, field)| {
+                if self.field_mappings[idx] {
+                    match batch_schema.index_of(field.name()) {
+                        Ok(batch_idx) => arrow::compute::cast(
+                            &batch_cols[batch_idx],
+                            field.data_type(),
+                        )
+                        .map_err(DataFusionError::ArrowError),
+                        Err(_) => Ok(new_null_array(field.data_type(), batch_rows)),
+                    }
+                } else {
+                    Ok(new_null_array(field.data_type(), batch_rows))
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
 
         // Necessary to handle empty batches
         let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-        let record_batch = RecordBatch::try_new_with_options(
-            self.table_schema.clone(),
-            mapped_cols,
-            &options,
-        )?;
+        let record_batch =
+            RecordBatch::try_new_with_options(self.table_schema.clone(), cols, &options)?;
         Ok(record_batch)
     }
 }
 
@@ -881,8 +875,11 @@ fn get_projected_output_ordering(
 #[cfg(test)]
 mod tests {
     use arrow_array::cast::AsArray;
-    use arrow_array::types::{Float64Type, UInt32Type};
-    use arrow_array::{Float32Array, StringArray, UInt64Array};
+    use arrow_array::types::{Float32Type, Float64Type, UInt32Type};
+    use arrow_array::{
+        BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, StringArray,
+        UInt64Array,
+    };
     use chrono::Utc;
 
     use crate::{
@@ -1147,132 +1144,6 @@ mod tests {
         crate::assert_batches_eq!(expected, &[projected_batch]);
     }
 
-    #[test]
-    fn schema_adapter_adapt_projections() {
-        let table_schema = Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::Utf8, true),
-            Field::new("c2", DataType::Int64, true),
-            Field::new("c3", DataType::Int8, true),
-        ]));
-
-        let file_schema = Schema::new(vec![
-            Field::new("c1", DataType::Utf8, true),
-            Field::new("c2", DataType::Int64, true),
-        ]);
-
-        let file_schema_2 = Arc::new(Schema::new(vec![
-            Field::new("c3", DataType::Int8, true),
-            Field::new("c2", DataType::Int64, true),
-        ]));
-
-        let file_schema_3 =
-            Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32, true)]));
-
-        let adapter = SchemaAdapter::new(table_schema);
-
-        let projections1: Vec<usize> = vec![0, 1, 2];
-        let projections2: Vec<usize> = vec![2];
-
-        let mapped = adapter
-            .map_projections(&file_schema, projections1.as_slice())
-            .expect("mapping projections");
-
-        assert_eq!(mapped, vec![0, 1]);
-
-        let mapped = adapter
-            .map_projections(&file_schema, projections2.as_slice())
-            .expect("mapping projections");
-
-        assert!(mapped.is_empty());
-
-        let mapped = adapter
-            .map_projections(&file_schema_2, projections1.as_slice())
-            .expect("mapping projections");
-
-        assert_eq!(mapped, vec![1, 0]);
-
-        let mapped = adapter
-            .map_projections(&file_schema_2, projections2.as_slice())
-            .expect("mapping projections");
-
-        assert_eq!(mapped, vec![0]);
-
-        let mapped = adapter.map_projections(&file_schema_3, projections1.as_slice());
-
-        assert!(mapped.is_err());
-    }
-
-    #[test]
-    fn schema_adapter_map_schema() {
-        let table_schema = Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::Utf8, true),
-            Field::new("c2", DataType::UInt64, true),
-            Field::new("c3", DataType::Float64, true),
-        ]));
-
-        let adapter = SchemaAdapter::new(table_schema.clone());
-
-        // file schema matches table schema
-        let file_schema = Schema::new(vec![
-            Field::new("c1", DataType::Utf8, true),
-            Field::new("c2", DataType::UInt64, true),
-            Field::new("c3", DataType::Float64, true),
-        ]);
-
-        let mapping = adapter.map_schema(&file_schema).unwrap();
-
-        assert_eq!(
-            mapping.field_mappings,
-            vec![
-                (0, DataType::Utf8),
-                (1, DataType::UInt64),
-                (2, DataType::Float64),
-            ]
-        );
-        assert_eq!(mapping.table_schema, table_schema);
-
-        // file schema has columns of a different but castable type
-        let file_schema = Schema::new(vec![
-            Field::new("c1", DataType::Utf8, true),
-            Field::new("c2", DataType::Int32, true), // can be casted to UInt64
-            Field::new("c3", DataType::Float32, true), // can be casted to Float64
-        ]);
-
-        let mapping = adapter.map_schema(&file_schema).unwrap();
-
-        assert_eq!(
-            mapping.field_mappings,
-            vec![
-                (0, DataType::Utf8),
-                (1, DataType::UInt64),
-                (2, DataType::Float64),
-            ]
-        );
-        assert_eq!(mapping.table_schema, table_schema);
-
-        // file schema lacks necessary columns
-        let file_schema = Schema::new(vec![
-            Field::new("c1", DataType::Utf8, true),
-            Field::new("c2", DataType::Int32, true),
-        ]);
-
-        let err = adapter.map_schema(&file_schema).unwrap_err();
-
-        assert!(err
-            .to_string()
-            .contains("File schema does not contain expected field"));
-
-        // file schema has columns of a different and non-castable type
-        let file_schema = Schema::new(vec![
-            Field::new("c1", DataType::Utf8, true),
-            Field::new("c2", DataType::Int32, true),
-            Field::new("c3", DataType::Date64, true), // cannot be casted to Float64
-        ]);
-        let err = adapter.map_schema(&file_schema).unwrap_err();
-
-        assert!(err.to_string().contains("Cannot cast file schema field"));
-    }
-
     #[test]
     fn schema_mapping_map_batch() {
         let table_schema = Arc::new(Schema::new(vec![
             Field::new("c1", DataType::Utf8, true),
             Field::new("c2", DataType::UInt32, true),
             Field::new("c3", DataType::Float64, true),
         ]));
@@ -1289,7 +1160,7 @@ mod tests {
             Field::new("c3", DataType::Float32, true),
         ]);
 
-        let mapping = adapter.map_schema(&file_schema).expect("map schema failed");
+        let (mapping, _) = adapter.map_schema(&file_schema).expect("map schema failed");
 
         let c1 = StringArray::from(vec!["hello", "world"]);
         let c2 = UInt64Array::from(vec![9_u64, 5_u64]);
@@ -1318,6 +1189,77 @@ mod tests {
         assert_eq!(c3.value(1), 7.0_f64);
     }
 
+    #[test]
+    fn schema_adapter_map_schema_with_projection() {
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c0", DataType::Utf8, true),
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Float64, true),
+            Field::new("c3", DataType::Int32, true),
+            Field::new("c4", DataType::Float32, true),
+        ]));
+
+        let file_schema = Schema::new(vec![
+            Field::new("id", DataType::Int32, true),
+            Field::new("c1", DataType::Boolean, true),
+            Field::new("c2", DataType::Float32, true),
+            Field::new("c3", DataType::Binary, true),
+            Field::new("c4", DataType::Int64, true),
+        ]);
+
+        let indices = vec![1, 2, 4];
+        let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
+        let adapter = SchemaAdapter::new(schema);
+        let (mapping, _) = adapter.map_schema(&file_schema).unwrap();
+
+        let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
+        let c1 = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
+        let c2 = Float32Array::from(vec![Some(2.0_f32), Some(7.0_f32), Some(3.0_f32)]);
+        let c3 = BinaryArray::from_opt_vec(vec![
+            Some(b"hallo"),
+            Some(b"danke"),
+            Some(b"super"),
+        ]);
+        let c4 = Int64Array::from(vec![1, 2, 3]);
+        let batch = RecordBatch::try_new(
+            Arc::new(file_schema),
+            vec![
+                Arc::new(id),
+                Arc::new(c1),
+                Arc::new(c2),
+                Arc::new(c3),
+                Arc::new(c4),
+            ],
+        )
+        .unwrap();
+
+        let rows_num = batch.num_rows();
+        let mapped_batch = mapping.map_batch(batch).unwrap();
+
+        assert_eq!(
+            mapped_batch.schema(),
+            Arc::new(table_schema.project(&indices).unwrap())
+        );
+        assert_eq!(mapped_batch.num_columns(), indices.len());
+        assert_eq!(mapped_batch.num_rows(), rows_num);
+
+        let c1 = mapped_batch.column(0).as_string::<i32>();
+        let c2 = mapped_batch.column(1).as_primitive::<Float64Type>();
+        let c4 = mapped_batch.column(2).as_primitive::<Float32Type>();
+
+        assert_eq!(c1.value(0), "1");
+        assert_eq!(c1.value(1), "0");
+        assert_eq!(c1.value(2), "1");
+
+        assert_eq!(c2.value(0), 2.0_f64);
+        assert_eq!(c2.value(1), 7.0_f64);
+        assert_eq!(c2.value(2), 3.0_f64);
+
+        assert_eq!(c4.value(0), 1.0_f32);
+        assert_eq!(c4.value(1), 2.0_f32);
+        assert_eq!(c4.value(2), 3.0_f32);
+    }
+
     // sets default for configs that play no role in projections
     fn config_for_projection(
         file_schema: SchemaRef,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 800da3a177c8..a281f60a940a 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -485,9 +485,10 @@ impl FileOpener for ParquetOpener {
             &self.metrics,
         )?;
 
-        let schema_adapter = SchemaAdapter::new(self.table_schema.clone());
         let batch_size = self.batch_size;
         let projection = self.projection.clone();
+        let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
+        let schema_adapter = SchemaAdapter::new(projected_schema);
         let predicate = self.predicate.clone();
         let pruning_predicate = self.pruning_predicate.clone();
         let page_pruning_predicate = self.page_pruning_predicate.clone();
@@ -505,8 +506,9 @@
             let mut builder =
                 ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
                     .await?;
-            let adapted_projections =
-                schema_adapter.map_projections(builder.schema(), &projection)?;
+
+            let (schema_mapping, adapted_projections) =
+                schema_adapter.map_schema(builder.schema())?;
             // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?;
 
             let mask = ProjectionMask::roots(
@@ -575,11 +577,8 @@
             let adapted = stream
                 .map_err(|e| ArrowError::ExternalError(Box::new(e)))
                 .map(move |maybe_batch| {
-                    maybe_batch.and_then(|b| {
-                        schema_adapter
-                            .adapt_batch(b, &projection)
-                            .map_err(Into::into)
-                    })
+                    maybe_batch
+                        .and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
                 });
 
             Ok(adapted.boxed())
@@ -795,13 +794,14 @@ mod tests {
         datasource::file_format::{parquet::ParquetFormat, FileFormat},
         physical_plan::collect,
     };
-    use arrow::array::{ArrayRef, Float32Array, Int32Array};
+    use arrow::array::{ArrayRef, Int32Array};
    use arrow::datatypes::Schema;
    use arrow::record_batch::RecordBatch;
    use arrow::{
        array::{Int64Array, Int8Array, StringArray},
        datatypes::{DataType, Field, SchemaBuilder},
    };
+    use arrow_array::Date64Array;
    use chrono::{TimeZone, Utc};
    use datafusion_common::ScalarValue;
    use datafusion_common::{assert_contains, ToDFSchema};
@@ -892,7 +892,6 @@ mod tests {
                 .unwrap(),
             ),
         };
-
         // If testing with page_index_predicate, write parquet
         // files with multiple pages
        let multi_page = page_index_predicate;
@@ -1465,8 +1464,11 @@

        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));

-        let c4: ArrayRef =
-            Arc::new(Float32Array::from(vec![Some(1.0_f32), Some(2.0_f32), None]));
+        let c4: ArrayRef = Arc::new(Date64Array::from(vec![
+            Some(86400000),
+            None,
+            Some(259200000),
+        ]));

        // batch1: c1(string), c2(int64), c3(int8)
        let batch1 = create_batch(vec![
@@ -1490,7 +1492,7 @@ mod tests {
            .round_trip_to_batches(vec![batch1, batch2])
            .await;
        assert_contains!(read.unwrap_err().to_string(),
-            "Execution error: Failed to map column projection for field c3. Incompatible data types Float32 and Int8");
+            "Cannot cast file schema field c3 of type Date64 to table schema field of type Int8");
    }

    #[tokio::test]
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index f4da1efb80ed..3fbf41303263 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -46,6 +46,7 @@ mod custom_reader;
 mod filter_pushdown;
 mod page_pruning;
 mod row_group_pruning;
+mod schema_coercion;

 #[cfg(test)]
 #[ctor::ctor]
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs
new file mode 100644
index 000000000000..b5ceabad1d3e
--- /dev/null
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{Field, Schema};
+use arrow::record_batch::RecordBatch;
+use arrow_array::types::Int32Type;
+use arrow_array::{ArrayRef, DictionaryArray, Float32Array, Int64Array, StringArray};
+use arrow_schema::DataType;
+use datafusion::assert_batches_sorted_eq;
+use datafusion::physical_plan::collect;
+use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
+use datafusion::prelude::SessionContext;
+use datafusion_common::Result;
+use datafusion_common::Statistics;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+use std::sync::Arc;
+use tempfile::NamedTempFile;
+
+/// Test for reading data from multiple parquet files with different schemas and coercing them into a single schema.
+#[tokio::test]
+async fn multi_parquet_coercion() {
+    let d1: DictionaryArray<Int32Type> =
+        vec![Some("one"), None, Some("three")].into_iter().collect();
+    let c1: ArrayRef = Arc::new(d1);
+    let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+    let c3: ArrayRef = Arc::new(Float32Array::from(vec![Some(10.0), Some(20.0), None]));
+
+    // batch1: c1(dict), c2(int64)
+    let batch1 =
+        RecordBatch::try_from_iter(vec![("c1", c1), ("c2", c2.clone())]).unwrap();
+    // batch2: c2(int64), c3(float32)
+    let batch2 = RecordBatch::try_from_iter(vec![("c2", c2), ("c3", c3)]).unwrap();
+
+    let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap();
+    let file_groups = meta.into_iter().map(Into::into).collect();
+
+    // cast c1 to utf8, c2 to int32, c3 to float64
+    let file_schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Utf8, true),
+        Field::new("c2", DataType::Int32, true),
+        Field::new("c3", DataType::Float64, true),
+    ]));
+    let parquet_exec = ParquetExec::new(
+        FileScanConfig {
+            object_store_url: ObjectStoreUrl::local_filesystem(),
+            file_groups: vec![file_groups],
+            file_schema,
+            statistics: Statistics::default(),
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+            infinite_source: false,
+        },
+        None,
+        None,
+    );
+
+    let session_ctx = SessionContext::new();
+    let task_ctx = session_ctx.task_ctx();
+    let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+    let expected = vec![
+        "+-------+----+------+",
+        "| c1    | c2 | c3   |",
+        "+-------+----+------+",
+        "|       |    |      |",
+        "|       | 1  | 10.0 |",
+        "|       | 2  |      |",
+        "|       | 2  | 20.0 |",
+        "| one   | 1  |      |",
+        "| three |    |      |",
+        "+-------+----+------+",
+    ];
+    assert_batches_sorted_eq!(expected, &read);
+}
+
+#[tokio::test]
+async fn multi_parquet_coercion_projection() {
+    let d1: DictionaryArray<Int32Type> =
+        vec![Some("one"), None, Some("three")].into_iter().collect();
+    let c1: ArrayRef = Arc::new(d1);
+    let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+    let c3: ArrayRef = Arc::new(Float32Array::from(vec![Some(10.0), Some(20.0), None]));
+    let c1s = Arc::new(StringArray::from(vec![
+        Some("baz"),
+        Some("Boo"),
+        Some("foo"),
+    ]));
+
+    // batch1: c2(int64), c1(dict)
+    let batch1 =
+        RecordBatch::try_from_iter(vec![("c2", c2.clone()), ("c1", c1)]).unwrap();
+    // batch2: c2(int64), c1(str), c3(float32)
+    let batch2 =
+        RecordBatch::try_from_iter(vec![("c2", c2), ("c1", c1s), ("c3", c3)]).unwrap();
+
+    let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap();
+    let file_groups = meta.into_iter().map(Into::into).collect();
+
+    // cast c1 to utf8, c2 to int32, c3 to float64
+    let file_schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Utf8, true),
+        Field::new("c2", DataType::Int32, true),
+        Field::new("c3", DataType::Float64, true),
+    ]));
+    let parquet_exec = ParquetExec::new(
+        FileScanConfig {
+            object_store_url: ObjectStoreUrl::local_filesystem(),
+            file_groups: vec![file_groups],
+            file_schema,
+            statistics: Statistics::default(),
+            projection: Some(vec![1, 0, 2]),
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+            infinite_source: false,
+        },
+        None,
+        None,
+    );
+
+    let session_ctx = SessionContext::new();
+    let task_ctx = session_ctx.task_ctx();
+    let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+    let expected = vec![
+        "+----+-------+------+",
+        "| c2 | c1    | c3   |",
+        "+----+-------+------+",
+        "|    | foo   |      |",
+        "|    | three |      |",
+        "| 1  | baz   | 10.0 |",
+        "| 1  | one   |      |",
|", + "| 2 | | |", + "| 2 | Boo | 20.0 |", + "+----+-------+------+", + ]; + assert_batches_sorted_eq!(expected, &read); +} + +/// Writes `batches` to a temporary parquet file +pub async fn store_parquet( + batches: Vec, +) -> Result<(Vec, Vec)> { + // Each batch writes to their own file + let files: Vec<_> = batches + .into_iter() + .map(|batch| { + let mut output = NamedTempFile::new().expect("creating temp file"); + + let builder = WriterProperties::builder(); + let props = builder.build(); + + let mut writer = + ArrowWriter::try_new(&mut output, batch.schema(), Some(props)) + .expect("creating writer"); + + writer.write(&batch).expect("Writing batch"); + writer.close().unwrap(); + output + }) + .collect(); + + let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect(); + Ok((meta, files)) +} + +/// Helper method to fetch the file size and date at given path and create a `ObjectMeta` +pub fn local_unpartitioned_file(path: impl AsRef) -> ObjectMeta { + let location = Path::from_filesystem_path(path.as_ref()).unwrap(); + let metadata = std::fs::metadata(path).expect("Local file metadata"); + ObjectMeta { + location, + last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), + size: metadata.len() as usize, + } +}