From 3d917a0efd2688233d435d0235d9e6360cf79a63 Mon Sep 17 00:00:00 2001 From: devinjdangelo Date: Tue, 8 Aug 2023 09:18:28 -0400 Subject: [PATCH] Extend insert into support to include Json backed tables (#7212) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * jsonsink and test simplemented * fix tests and clean up * clippy * minor refactor * comments + append existing file test check no new files added * format comments Co-authored-by: Metehan Yıldırım <100111937+metesynnada@users.noreply.github.com> --------- Co-authored-by: Metehan Yıldırım <100111937+metesynnada@users.noreply.github.com> --- .../core/src/datasource/file_format/csv.rs | 78 ++---- .../core/src/datasource/file_format/json.rs | 236 ++++++++++++++++- .../core/src/datasource/file_format/mod.rs | 75 +++++- .../src/datasource/file_format/options.rs | 20 ++ .../core/src/datasource/listing/table.rs | 247 +++++++++++++----- 5 files changed, 518 insertions(+), 138 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index cbcdc2f112b0..0d8641a4645e 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -36,9 +36,9 @@ use bytes::{Buf, Bytes}; use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; -use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::io::AsyncWrite; -use super::FileFormat; +use super::{stateless_serialize_and_write_files, FileFormat}; use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::FileWriterMode; use crate::datasource::file_format::{ @@ -274,6 +274,12 @@ impl FileFormat for CsvFormat { "Overwrites are not implemented yet for CSV".into(), )); } + + if self.file_compression_type != FileCompressionType::UNCOMPRESSED { + return Err(DataFusionError::NotImplemented( + "Inserting compressed CSV is not implemented yet.".into(), + )); + } let sink_schema = conf.output_schema().clone(); let sink = Arc::new(CsvSink::new( conf, @@ -439,28 +445,6 @@ impl BatchSerializer for CsvSerializer { } } -async fn check_for_errors( - result: Result, - writers: &mut [AbortableWrite], -) -> Result { - match result { - Ok(value) => Ok(value), - Err(e) => { - // Abort all writers before returning the error: - for writer in writers { - let mut abort_future = writer.abort_writer(); - if let Ok(abort_future) = &mut abort_future { - let _ = abort_future.await; - } - // Ignore errors that occur during abortion, - // We do try to abort all writers before returning error. - } - // After aborting writers return original error. - Err(e) - } - } -} - /// Implements [`DataSink`] for writing to a CSV file. 
struct CsvSink { /// Config options for writing data @@ -566,7 +550,7 @@ impl CsvSink { impl DataSink for CsvSink { async fn write_all( &self, - mut data: Vec, + data: Vec, context: &Arc, ) -> Result { let num_partitions = data.len(); @@ -576,7 +560,7 @@ impl DataSink for CsvSink { .object_store(&self.config.object_store_url)?; // Construct serializer and writer for each file group - let mut serializers = vec![]; + let mut serializers: Vec> = vec![]; let mut writers = vec![]; match self.config.writer_mode { FileWriterMode::Append => { @@ -590,7 +574,7 @@ impl DataSink for CsvSink { let serializer = CsvSerializer::new() .with_builder(builder) .with_header(header); - serializers.push(serializer); + serializers.push(Box::new(serializer)); let file = file_group.clone(); let writer = self @@ -608,9 +592,9 @@ impl DataSink for CsvSink { )) } FileWriterMode::PutMultipart => { - //currently assuming only 1 partition path (i.e. not hive style partitioning on a column) + // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column) let base_path = &self.config.table_paths[0]; - //uniquely identify this batch of files with a random string, to prevent collisions overwriting files + // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); for part_idx in 0..num_partitions { let header = true; @@ -618,7 +602,7 @@ impl DataSink for CsvSink { let serializer = CsvSerializer::new() .with_builder(builder) .with_header(header); - serializers.push(serializer); + serializers.push(Box::new(serializer)); let file_path = base_path .prefix() .child(format!("/{}_{}.csv", write_id, part_idx)); @@ -636,39 +620,7 @@ impl DataSink for CsvSink { } } - let mut row_count = 0; - // Map errors to DatafusionError. 
- let err_converter = - |_| DataFusionError::Internal("Unexpected FileSink Error".to_string()); - // TODO parallelize serialization accross partitions and batches within partitions - // see: https://github.com/apache/arrow-datafusion/issues/7079 - for idx in 0..num_partitions { - while let Some(maybe_batch) = data[idx].next().await { - // Write data to files in a round robin fashion: - let serializer = &mut serializers[idx]; - let batch = check_for_errors(maybe_batch, &mut writers).await?; - row_count += batch.num_rows(); - let bytes = - check_for_errors(serializer.serialize(batch).await, &mut writers) - .await?; - let writer = &mut writers[idx]; - check_for_errors( - writer.write_all(&bytes).await.map_err(err_converter), - &mut writers, - ) - .await?; - } - } - // Perform cleanup: - let n_writers = writers.len(); - for idx in 0..n_writers { - check_for_errors( - writers[idx].shutdown().await.map_err(err_converter), - &mut writers, - ) - .await?; - } - Ok(row_count as u64) + stateless_serialize_and_write_files(data, serializers, writers).await } } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 6247e85ba879..dae3a18f96ff 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -19,28 +19,52 @@ use std::any::Any; +use bytes::Bytes; +use datafusion_common::DataFusionError; +use datafusion_execution::TaskContext; +use rand::distributions::Alphanumeric; +use rand::distributions::DistString; +use std::fmt; +use std::fmt::Debug; use std::io::BufReader; use std::sync::Arc; +use tokio::io::AsyncWrite; use arrow::datatypes::Schema; use arrow::datatypes::SchemaRef; +use arrow::json; use arrow::json::reader::infer_json_schema_from_iterator; use arrow::json::reader::ValueIter; +use arrow_array::RecordBatch; use async_trait::async_trait; use bytes::Buf; use datafusion_physical_expr::PhysicalExpr; use object_store::{GetResult, ObjectMeta, ObjectStore}; +use crate::datasource::physical_plan::FileGroupDisplay; +use crate::datasource::physical_plan::FileMeta; +use crate::physical_plan::insert::DataSink; +use crate::physical_plan::insert::InsertExec; +use crate::physical_plan::SendableRecordBatchStream; +use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; + +use super::stateless_serialize_and_write_files; +use super::AbortMode; +use super::AbortableWrite; +use super::AsyncPutWriter; +use super::BatchSerializer; use super::FileFormat; use super::FileScanConfig; +use super::FileWriterMode; +use super::MultiPart; use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; +use crate::datasource::physical_plan::FileSinkConfig; use crate::datasource::physical_plan::NdJsonExec; use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::ExecutionPlan; -use crate::physical_plan::Statistics; /// The default file extension of json files pub const DEFAULT_JSON_EXTENSION: &str = ".json"; @@ -148,6 +172,216 @@ impl FileFormat for JsonFormat { let exec = NdJsonExec::new(conf, self.file_compression_type.to_owned()); Ok(Arc::new(exec)) } + + async fn create_writer_physical_plan( + &self, + input: Arc, + _state: &SessionState, + conf: FileSinkConfig, + ) -> Result> { + if conf.overwrite { + return Err(DataFusionError::NotImplemented( + "Overwrites are not implemented yet for Json".into(), + )); + } + + if self.file_compression_type != 
FileCompressionType::UNCOMPRESSED { + return Err(DataFusionError::NotImplemented( + "Inserting compressed JSON is not implemented yet.".into(), + )); + } + let sink_schema = conf.output_schema().clone(); + let sink = Arc::new(JsonSink::new(conf, self.file_compression_type.clone())); + + Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _) + } +} + +impl Default for JsonSerializer { + fn default() -> Self { + Self::new() + } +} + +/// Define a struct for serializing Json records to a stream +pub struct JsonSerializer { + // Inner buffer for avoiding reallocation + buffer: Vec, +} + +impl JsonSerializer { + /// Constructor for the JsonSerializer object + pub fn new() -> Self { + Self { + buffer: Vec::with_capacity(4096), + } + } +} + +#[async_trait] +impl BatchSerializer for JsonSerializer { + async fn serialize(&mut self, batch: RecordBatch) -> Result { + let mut writer = json::LineDelimitedWriter::new(&mut self.buffer); + writer.write(&batch)?; + //drop(writer); + Ok(Bytes::from(self.buffer.drain(..).collect::>())) + } +} + +/// Implements [`DataSink`] for writing to a Json file. +struct JsonSink { + /// Config options for writing data + config: FileSinkConfig, + file_compression_type: FileCompressionType, +} + +impl Debug for JsonSink { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("JsonSink") + .field("file_compression_type", &self.file_compression_type) + .finish() + } +} + +impl DisplayAs for JsonSink { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "JsonSink(writer_mode={:?}, file_groups=", + self.config.writer_mode + )?; + FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; + write!(f, ")") + } + } + } +} + +impl JsonSink { + fn new(config: FileSinkConfig, file_compression_type: FileCompressionType) -> Self { + Self { + config, + file_compression_type, + } + } + + // Create a write for Json files + async fn create_writer( + &self, + file_meta: FileMeta, + object_store: Arc, + ) -> Result>> { + let object = &file_meta.object_meta; + match self.config.writer_mode { + // If the mode is append, call the store's append method and return wrapped in + // a boxed trait object. + FileWriterMode::Append => { + let writer = object_store + .append(&object.location) + .await + .map_err(DataFusionError::ObjectStore)?; + let writer = AbortableWrite::new( + self.file_compression_type.convert_async_writer(writer)?, + AbortMode::Append, + ); + Ok(writer) + } + // If the mode is put, create a new AsyncPut writer and return it wrapped in + // a boxed trait object + FileWriterMode::Put => { + let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store)); + let writer = AbortableWrite::new( + self.file_compression_type.convert_async_writer(writer)?, + AbortMode::Put, + ); + Ok(writer) + } + // If the mode is put multipart, call the store's put_multipart method and + // return the writer wrapped in a boxed trait object. 
+ FileWriterMode::PutMultipart => { + let (multipart_id, writer) = object_store + .put_multipart(&object.location) + .await + .map_err(DataFusionError::ObjectStore)?; + Ok(AbortableWrite::new( + self.file_compression_type.convert_async_writer(writer)?, + AbortMode::MultiPart(MultiPart::new( + object_store, + multipart_id, + object.location.clone(), + )), + )) + } + } + } +} + +#[async_trait] +impl DataSink for JsonSink { + async fn write_all( + &self, + data: Vec, + context: &Arc, + ) -> Result { + let num_partitions = data.len(); + + let object_store = context + .runtime_env() + .object_store(&self.config.object_store_url)?; + + // Construct serializer and writer for each file group + let mut serializers: Vec> = vec![]; + let mut writers = vec![]; + match self.config.writer_mode { + FileWriterMode::Append => { + for file_group in &self.config.file_groups { + let serializer = JsonSerializer::new(); + serializers.push(Box::new(serializer)); + + let file = file_group.clone(); + let writer = self + .create_writer( + file.object_meta.clone().into(), + object_store.clone(), + ) + .await?; + writers.push(writer); + } + } + FileWriterMode::Put => { + return Err(DataFusionError::NotImplemented( + "Put Mode is not implemented for Json Sink yet".into(), + )) + } + FileWriterMode::PutMultipart => { + // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column) + let base_path = &self.config.table_paths[0]; + // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files + let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + for part_idx in 0..num_partitions { + let serializer = JsonSerializer::new(); + serializers.push(Box::new(serializer)); + let file_path = base_path + .prefix() + .child(format!("/{}_{}.json", write_id, part_idx)); + let object_meta = ObjectMeta { + location: file_path, + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + let writer = self + .create_writer(object_meta.into(), object_store.clone()) + .await?; + writers.push(writer); + } + } + } + + stateless_serialize_and_write_files(data, serializers, writers).await + } } #[cfg(test)] diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 4cc6e8706ab9..97492276a27d 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -39,7 +39,7 @@ use crate::arrow::datatypes::SchemaRef; use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::{ExecutionPlan, Statistics}; +use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream, Statistics}; use arrow_array::RecordBatch; use datafusion_common::DataFusionError; @@ -48,11 +48,11 @@ use datafusion_physical_expr::PhysicalExpr; use async_trait::async_trait; use bytes::Bytes; use futures::future::BoxFuture; -use futures::ready; use futures::FutureExt; +use futures::{ready, StreamExt}; use object_store::path::Path; use object_store::{MultipartId, ObjectMeta, ObjectStore}; -use tokio::io::AsyncWrite; +use tokio::io::{AsyncWrite, AsyncWriteExt}; /// This trait abstracts all the file format specific implementations /// from the [`TableProvider`]. This helps code re-utilization across /// providers that support the the same file formats. 
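For reference, the `JsonSerializer` added above is intentionally thin: each incoming `RecordBatch` is run through arrow's `LineDelimitedWriter` and the buffered bytes are handed to the sink's writer. A minimal standalone sketch of that step, separate from the patch itself (the single-column schema and values are made up for illustration, and the `arrow` crate's `json` feature is assumed):

    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::error::Result;
    use arrow::json::LineDelimitedWriter;
    use arrow::record_batch::RecordBatch;

    fn main() -> Result<()> {
        // A tiny batch standing in for one partition's worth of data.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        // JsonSerializer keeps one reusable buffer per serializer; a single
        // batch is enough here to show the shape of the output.
        let mut buffer: Vec<u8> = Vec::with_capacity(4096);
        let mut writer = LineDelimitedWriter::new(&mut buffer);
        writer.write(&batch)?;
        drop(writer); // release the &mut borrow on the buffer

        // One JSON object per row, newline-delimited:
        // {"a":1}
        // {"a":2}
        // {"a":3}
        assert_eq!(buffer.iter().filter(|b| **b == b'\n').count(), 3);
        Ok(())
    }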
@@ -313,6 +313,75 @@ pub trait BatchSerializer: Unpin + Send { async fn serialize(&mut self, batch: RecordBatch) -> Result; } +/// Checks if any of the passed writers have encountered an error +/// and if so, all writers are aborted. +async fn check_for_errors( + result: Result, + writers: &mut [AbortableWrite], +) -> Result { + match result { + Ok(value) => Ok(value), + Err(e) => { + // Abort all writers before returning the error: + for writer in writers { + let mut abort_future = writer.abort_writer(); + if let Ok(abort_future) = &mut abort_future { + let _ = abort_future.await; + } + // Ignore errors that occur during abortion, + // We do try to abort all writers before returning error. + } + // After aborting writers return original error. + Err(e) + } + } +} + +/// Contains the common logic for serializing RecordBatches and +/// writing the resulting bytes to an ObjectStore. +/// Serialization is assumed to be stateless, i.e. +/// each RecordBatch can be serialized without any +/// dependency on the RecordBatches before or after. +async fn stateless_serialize_and_write_files( + mut data: Vec, + mut serializers: Vec>, + mut writers: Vec>>, +) -> Result { + let num_partitions = data.len(); + let mut row_count = 0; + // Map errors to DatafusionError. + let err_converter = + |_| DataFusionError::Internal("Unexpected FileSink Error".to_string()); + // TODO parallelize serialization accross partitions and batches within partitions + // see: https://github.com/apache/arrow-datafusion/issues/7079 + for idx in 0..num_partitions { + while let Some(maybe_batch) = data[idx].next().await { + // Write data to files in a round robin fashion: + let serializer = &mut serializers[idx]; + let batch = check_for_errors(maybe_batch, &mut writers).await?; + row_count += batch.num_rows(); + let bytes = + check_for_errors(serializer.serialize(batch).await, &mut writers).await?; + let writer = &mut writers[idx]; + check_for_errors( + writer.write_all(&bytes).await.map_err(err_converter), + &mut writers, + ) + .await?; + } + } + // Perform cleanup: + let n_writers = writers.len(); + for idx in 0..n_writers { + check_for_errors( + writers[idx].shutdown().await.map_err(err_converter), + &mut writers, + ) + .await?; + } + Ok(row_count as u64) +} + #[cfg(test)] pub(crate) mod test_util { use std::ops::Range; diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index b8499065bd69..73c20d3b0c3a 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -373,6 +373,10 @@ pub struct NdJsonReadOptions<'a> { pub file_compression_type: FileCompressionType, /// Flag indicating whether this file may be unbounded (as in a FIFO file). 
pub infinite: bool, + /// Indicates how the file is sorted + pub file_sort_order: Vec>, + /// Setting controls how inserts to this file should be handled + pub insert_mode: ListingTableInsertMode, } impl<'a> Default for NdJsonReadOptions<'a> { @@ -384,6 +388,8 @@ impl<'a> Default for NdJsonReadOptions<'a> { table_partition_cols: vec![], file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, + file_sort_order: vec![], + insert_mode: ListingTableInsertMode::AppendToFile, } } } @@ -424,6 +430,18 @@ impl<'a> NdJsonReadOptions<'a> { self.schema = Some(schema); self } + + /// Configure if file has known sort order + pub fn file_sort_order(mut self, file_sort_order: Vec>) -> Self { + self.file_sort_order = file_sort_order; + self + } + + /// Configure how insertions to this table should be handled + pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { + self.insert_mode = insert_mode; + self + } } #[async_trait] @@ -535,6 +553,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) .with_infinite_source(self.infinite) + .with_file_sort_order(self.file_sort_order.clone()) + .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 4085dac48404..b47d25d1f951 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -805,6 +805,19 @@ impl TableProvider for ListingTable { ); } + // TODO support inserts to sorted tables which preserve sort_order + // Inserts currently make no effort to preserve sort_order. This could lead to + // incorrect query results on the table after inserting incorrectly sorted data. + let unsorted: Vec> = vec![]; + if self.options.file_sort_order != unsorted { + return Err( + DataFusionError::NotImplemented( + "Writing to a sorted listing table via insert into is not supported yet. \ + To write to this table in the meantime, register an equivalent table with \ + file_sort_order = vec![]".into()) + ); + } + let table_path = &self.table_paths()[0]; // Get the object store for the table path. let store = state.runtime_env().object_store(table_path)?; @@ -838,10 +851,9 @@ impl TableProvider for ListingTable { writer_mode = crate::datasource::file_format::FileWriterMode::PutMultipart } ListingTableInsertMode::Error => { - return Err(DataFusionError::Plan( + return plan_err!( "Invalid plan attempting write to table with TableWriteMode::Error!" 
- .into(), - )) + ) } } @@ -935,6 +947,7 @@ mod tests { use super::*; use crate::datasource::file_format::file_type::GetExt; use crate::datasource::{provider_as_source, MemTable}; + use crate::execution::options::ArrowReadOptions; use crate::physical_plan::collect; use crate::prelude::*; use crate::{ @@ -944,9 +957,7 @@ mod tests { logical_expr::{col, lit}, test::{columns, object_store::register_test_store}, }; - use arrow::csv; use arrow::datatypes::{DataType, Schema}; - use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use chrono::DateTime; use datafusion_common::assert_contains; @@ -1434,26 +1445,6 @@ mod tests { Ok(Arc::new(table)) } - fn load_empty_schema_csv_table( - schema: SchemaRef, - temp_path: &str, - insert_mode: ListingTableInsertMode, - ) -> Result> { - File::create(temp_path)?; - let table_path = ListingTableUrl::parse(temp_path).unwrap(); - - let file_format = CsvFormat::default(); - let listing_options = - ListingOptions::new(Arc::new(file_format)).with_insert_mode(insert_mode); - - let config = ListingTableConfig::new(table_path) - .with_listing_options(listing_options) - .with_schema(schema); - - let table = ListingTable::try_new(config)?; - Ok(Arc::new(table)) - } - /// Check that the files listed by the table match the specified `output_partitioning` /// when the object store contains `files`. async fn assert_list_files_for_scan_grouping( @@ -1559,10 +1550,72 @@ mod tests { } #[tokio::test] - async fn test_append_plan_to_external_table_stored_as_csv() -> Result<()> { - let file_type = FileType::CSV; - let file_compression_type = FileCompressionType::UNCOMPRESSED; + async fn test_insert_into_append_to_json_file() -> Result<()> { + helper_test_insert_into_append_to_existing_files( + FileType::JSON, + FileCompressionType::UNCOMPRESSED, + ) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_insert_into_append_new_json_files() -> Result<()> { + helper_test_append_new_files_to_table( + FileType::JSON, + FileCompressionType::UNCOMPRESSED, + ) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_insert_into_append_to_csv_file() -> Result<()> { + helper_test_insert_into_append_to_existing_files( + FileType::CSV, + FileCompressionType::UNCOMPRESSED, + ) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_insert_into_append_new_csv_files() -> Result<()> { + helper_test_append_new_files_to_table( + FileType::CSV, + FileCompressionType::UNCOMPRESSED, + ) + .await?; + Ok(()) + } + + fn load_empty_schema_table( + schema: SchemaRef, + temp_path: &str, + insert_mode: ListingTableInsertMode, + file_format: Arc, + ) -> Result> { + File::create(temp_path)?; + let table_path = ListingTableUrl::parse(temp_path).unwrap(); + + let listing_options = + ListingOptions::new(file_format.clone()).with_insert_mode(insert_mode); + + let config = ListingTableConfig::new(table_path) + .with_listing_options(listing_options) + .with_schema(schema); + + let table = ListingTable::try_new(config)?; + Ok(Arc::new(table)) + } + /// Logic of testing inserting into listing table by Appending to existing files + /// is the same for all formats/options which support this. This helper allows + /// passing different options to execute the same test with different settings. + async fn helper_test_insert_into_append_to_existing_files( + file_type: FileType, + file_compression_type: FileCompressionType, + ) -> Result<()> { // Create the initial context, schema, and batch. 
let session_ctx = SessionContext::new(); // Create a new schema with one field called "a" of type Int32 @@ -1587,17 +1640,27 @@ mod tests { .unwrap() ); - // Define batch size for file reader - let batch_size = batch.num_rows(); - // Create a temporary directory and a CSV file within it. let tmp_dir = TempDir::new()?; let path = tmp_dir.path().join(filename); - let initial_table = load_empty_schema_csv_table( + let file_format: Arc = match file_type { + FileType::CSV => Arc::new( + CsvFormat::default().with_file_compression_type(file_compression_type), + ), + FileType::JSON => Arc::new( + JsonFormat::default().with_file_compression_type(file_compression_type), + ), + FileType::PARQUET => Arc::new(ParquetFormat::default()), + FileType::AVRO => Arc::new(AvroFormat {}), + FileType::ARROW => Arc::new(ArrowFormat {}), + }; + + let initial_table = load_empty_schema_table( schema.clone(), path.to_str().unwrap(), ListingTableInsertMode::AppendToFile, + file_format, )?; session_ctx.register_table("t", initial_table)?; // Create and register the source table with the provided schema and inserted data @@ -1632,19 +1695,9 @@ mod tests { // Assert that the batches read from the file match the expected result. assert_batches_eq!(expected, &res); - // Open the CSV file, read its contents as a record batch, and collect the batches into a vector. - let file = File::open(path.clone())?; - let reader = csv::ReaderBuilder::new(schema.clone()) - .has_header(true) - .with_batch_size(batch_size) - .build(file) - .map_err(|e| DataFusionError::Internal(e.to_string()))?; - - let batches = reader - .collect::>>() - .into_iter() - .collect::>>() - .map_err(|e| DataFusionError::Internal(e.to_string()))?; + + // Read the records in the table + let batches = session_ctx.sql("select * from t").await?.collect().await?; // Define the expected result as a vector of strings. let expected = vec![ @@ -1663,6 +1716,10 @@ mod tests { // Assert that the batches read from the file match the expected result. assert_batches_eq!(expected, &batches); + // Assert that only 1 file was added to the table + let num_files = tmp_dir.path().read_dir()?.count(); + assert_eq!(num_files, 1); + // Create a physical plan from the insert plan let plan = session_ctx .state() @@ -1684,18 +1741,7 @@ mod tests { assert_batches_eq!(expected, &res); // Open the CSV file, read its contents as a record batch, and collect the batches into a vector. - let file = File::open(path.clone())?; - let reader = csv::ReaderBuilder::new(schema.clone()) - .has_header(true) - .with_batch_size(batch_size) - .build(file) - .map_err(|e| DataFusionError::Internal(e.to_string()))?; - - let batches = reader - .collect::>>() - .into_iter() - .collect::>>() - .map_err(|e| DataFusionError::Internal(e.to_string())); + let batches = session_ctx.sql("select * from t").await?.collect().await?; // Define the expected result after the second append. let expected = vec![ @@ -1718,14 +1764,20 @@ mod tests { ]; // Assert that the batches read from the file after the second append match the expected result. 
- assert_batches_eq!(expected, &batches?); + assert_batches_eq!(expected, &batches); + + // Assert that no additional files were added to the table + let num_files = tmp_dir.path().read_dir()?.count(); + assert_eq!(num_files, 1); // Return Ok if the function Ok(()) } - #[tokio::test] - async fn test_append_new_files_to_csv_table() -> Result<()> { + async fn helper_test_append_new_files_to_table( + file_type: FileType, + file_compression_type: FileCompressionType, + ) -> Result<()> { // Create the initial context, schema, and batch. let session_ctx = SessionContext::new(); // Create a new schema with one field called "a" of type Int32 @@ -1741,17 +1793,70 @@ mod tests { vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))], )?; - // Create a temporary directory and a CSV file within it. + // Register appropriate table depending on file_type we want to test let tmp_dir = TempDir::new()?; - session_ctx - .register_csv( - "t", - tmp_dir.path().to_str().unwrap(), - CsvReadOptions::new() - .insert_mode(ListingTableInsertMode::AppendNewFiles) - .schema(schema.as_ref()), - ) - .await?; + match file_type { + FileType::CSV => { + session_ctx + .register_csv( + "t", + tmp_dir.path().to_str().unwrap(), + CsvReadOptions::new() + .insert_mode(ListingTableInsertMode::AppendNewFiles) + .schema(schema.as_ref()) + .file_compression_type(file_compression_type), + ) + .await?; + } + FileType::JSON => { + session_ctx + .register_json( + "t", + tmp_dir.path().to_str().unwrap(), + NdJsonReadOptions::default() + .insert_mode(ListingTableInsertMode::AppendNewFiles) + .schema(schema.as_ref()) + .file_compression_type(file_compression_type), + ) + .await?; + } + FileType::PARQUET => { + session_ctx + .register_parquet( + "t", + tmp_dir.path().to_str().unwrap(), + ParquetReadOptions::default(), // TODO implement insert_mode for parquet + //.insert_mode(ListingTableInsertMode::AppendNewFiles) + //.schema(schema.as_ref()), + ) + .await?; + } + FileType::AVRO => { + session_ctx + .register_avro( + "t", + tmp_dir.path().to_str().unwrap(), + AvroReadOptions::default() + // TODO implement insert_mode for avro + //.insert_mode(ListingTableInsertMode::AppendNewFiles) + .schema(schema.as_ref()), + ) + .await?; + } + FileType::ARROW => { + session_ctx + .register_arrow( + "t", + tmp_dir.path().to_str().unwrap(), + ArrowReadOptions::default() + // TODO implement insert_mode for arrow + //.insert_mode(ListingTableInsertMode::AppendNewFiles) + .schema(schema.as_ref()), + ) + .await?; + } + } + // Create and register the source table with the provided schema and inserted data let source_table = Arc::new(MemTable::try_new( schema.clone(), @@ -1804,7 +1909,7 @@ mod tests { // Assert that the batches read from the file match the expected result. assert_batches_eq!(expected, &batches); - //asert that 6 files were added to the table + // Assert that 6 files were added to the table let num_files = tmp_dir.path().read_dir()?.count(); assert_eq!(num_files, 6);
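Putting the `NdJsonReadOptions` and listing-table changes together, the JSON insert path can be exercised end to end much like `helper_test_append_new_files_to_table` does. A condensed sketch follows; the table name, directory, and data are made up for illustration, the `ListingTableInsertMode` import path and the SQL `INSERT INTO ... SELECT` route are assumptions rather than something shown in this patch (the tests above build the insert plan programmatically), and a `tokio` runtime is assumed:

    use std::sync::Arc;

    use datafusion::arrow::array::Int32Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::datasource::listing::ListingTableInsertMode; // assumed export path
    use datafusion::datasource::MemTable;
    use datafusion::error::Result;
    use datafusion::prelude::{NdJsonReadOptions, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        // Register a directory of newline-delimited JSON files as a listing table,
        // asking inserts to write new files rather than appending to an existing one.
        ctx.register_json(
            "t",
            "/tmp/json_table/", // hypothetical, pre-existing directory
            NdJsonReadOptions::default()
                .schema(schema.as_ref())
                .insert_mode(ListingTableInsertMode::AppendNewFiles),
        )
        .await?;

        // A small in-memory source table to copy from.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;
        let source = Arc::new(MemTable::try_new(schema.clone(), vec![vec![batch]])?);
        ctx.register_table("source", source)?;

        // The insert plans through the new JsonSink; with AppendNewFiles it writes
        // one fresh `<write_id>_<partition>.json` file per input partition, each
        // file holding one JSON object per row (e.g. {"a":1}).
        ctx.sql("INSERT INTO t SELECT * FROM source").await?.collect().await?;
        ctx.sql("SELECT * FROM t").await?.show().await?;

        Ok(())
    }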