From 11cb2b11897a9f0e91d7f5d07da5a61040515a73 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 8 Aug 2023 09:19:31 -0400 Subject: [PATCH] Minor: remove duplication in `create_writer` --- .../core/src/datasource/file_format/csv.rs | 83 ++++--------------- .../src/datasource/file_format/file_type.rs | 2 +- .../core/src/datasource/file_format/json.rs | 80 ++++-------------- .../core/src/datasource/file_format/mod.rs | 59 ++++++++++++- 4 files changed, 91 insertions(+), 133 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 0d8641a4645e..44525ce2b6a4 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -36,17 +36,13 @@ use bytes::{Buf, Bytes}; use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; -use tokio::io::AsyncWrite; -use super::{stateless_serialize_and_write_files, FileFormat}; +use super::{create_writer, stateless_serialize_and_write_files, FileFormat}; use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::FileWriterMode; -use crate::datasource::file_format::{ - AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart, - DEFAULT_SCHEMA_INFER_MAX_RECORD, -}; +use crate::datasource::file_format::{BatchSerializer, DEFAULT_SCHEMA_INFER_MAX_RECORD}; use crate::datasource::physical_plan::{ - CsvExec, FileGroupDisplay, FileMeta, FileScanConfig, FileSinkConfig, + CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig, }; use crate::error::Result; use crate::execution::context::SessionState; @@ -494,56 +490,6 @@ impl CsvSink { file_compression_type, } } - - // Create a write for Csv files - async fn create_writer( - &self, - file_meta: FileMeta, - object_store: Arc, - ) -> Result>> { - let object = &file_meta.object_meta; - match self.config.writer_mode { - // If the mode is append, call the store's append method and return wrapped in - // a boxed trait object. - FileWriterMode::Append => { - let writer = object_store - .append(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - let writer = AbortableWrite::new( - self.file_compression_type.convert_async_writer(writer)?, - AbortMode::Append, - ); - Ok(writer) - } - // If the mode is put, create a new AsyncPut writer and return it wrapped in - // a boxed trait object - FileWriterMode::Put => { - let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store)); - let writer = AbortableWrite::new( - self.file_compression_type.convert_async_writer(writer)?, - AbortMode::Put, - ); - Ok(writer) - } - // If the mode is put multipart, call the store's put_multipart method and - // return the writer wrapped in a boxed trait object. - FileWriterMode::PutMultipart => { - let (multipart_id, writer) = object_store - .put_multipart(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - Ok(AbortableWrite::new( - self.file_compression_type.convert_async_writer(writer)?, - AbortMode::MultiPart(MultiPart::new( - object_store, - multipart_id, - object.location.clone(), - )), - )) - } - } - } } #[async_trait] @@ -577,12 +523,13 @@ impl DataSink for CsvSink { serializers.push(Box::new(serializer)); let file = file_group.clone(); - let writer = self - .create_writer( - file.object_meta.clone().into(), - object_store.clone(), - ) - .await?; + let writer = create_writer( + self.config.writer_mode, + self.file_compression_type, + file.object_meta.clone().into(), + object_store.clone(), + ) + .await?; writers.push(writer); } } @@ -612,9 +559,13 @@ impl DataSink for CsvSink { size: 0, e_tag: None, }; - let writer = self - .create_writer(object_meta.into(), object_store.clone()) - .await?; + let writer = create_writer( + self.config.writer_mode, + self.file_compression_type, + object_meta.into(), + object_store.clone(), + ) + .await?; writers.push(writer); } } diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs index 567fffb32367..68967221ee6c 100644 --- a/datafusion/core/src/datasource/file_format/file_type.rs +++ b/datafusion/core/src/datasource/file_format/file_type.rs @@ -62,7 +62,7 @@ pub trait GetExt { } /// Readable file compression type -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct FileCompressionType { variant: CompressionTypeVariant, } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index dae3a18f96ff..14bffc4721f2 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -28,7 +28,6 @@ use std::fmt; use std::fmt::Debug; use std::io::BufReader; use std::sync::Arc; -use tokio::io::AsyncWrite; use arrow::datatypes::Schema; use arrow::datatypes::SchemaRef; @@ -43,21 +42,17 @@ use datafusion_physical_expr::PhysicalExpr; use object_store::{GetResult, ObjectMeta, ObjectStore}; use crate::datasource::physical_plan::FileGroupDisplay; -use crate::datasource::physical_plan::FileMeta; use crate::physical_plan::insert::DataSink; use crate::physical_plan::insert::InsertExec; use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; +use super::create_writer; use super::stateless_serialize_and_write_files; -use super::AbortMode; -use super::AbortableWrite; -use super::AsyncPutWriter; use super::BatchSerializer; use super::FileFormat; use super::FileScanConfig; use super::FileWriterMode; -use super::MultiPart; use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::datasource::physical_plan::FileSinkConfig; @@ -266,56 +261,6 @@ impl JsonSink { file_compression_type, } } - - // Create a write for Json files - async fn create_writer( - &self, - file_meta: FileMeta, - object_store: Arc, - ) -> Result>> { - let object = &file_meta.object_meta; - match self.config.writer_mode { - // If the mode is append, call the store's append method and return wrapped in - // a boxed trait object. - FileWriterMode::Append => { - let writer = object_store - .append(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - let writer = AbortableWrite::new( - self.file_compression_type.convert_async_writer(writer)?, - AbortMode::Append, - ); - Ok(writer) - } - // If the mode is put, create a new AsyncPut writer and return it wrapped in - // a boxed trait object - FileWriterMode::Put => { - let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store)); - let writer = AbortableWrite::new( - self.file_compression_type.convert_async_writer(writer)?, - AbortMode::Put, - ); - Ok(writer) - } - // If the mode is put multipart, call the store's put_multipart method and - // return the writer wrapped in a boxed trait object. - FileWriterMode::PutMultipart => { - let (multipart_id, writer) = object_store - .put_multipart(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - Ok(AbortableWrite::new( - self.file_compression_type.convert_async_writer(writer)?, - AbortMode::MultiPart(MultiPart::new( - object_store, - multipart_id, - object.location.clone(), - )), - )) - } - } - } } #[async_trait] @@ -341,12 +286,13 @@ impl DataSink for JsonSink { serializers.push(Box::new(serializer)); let file = file_group.clone(); - let writer = self - .create_writer( - file.object_meta.clone().into(), - object_store.clone(), - ) - .await?; + let writer = create_writer( + self.config.writer_mode, + self.file_compression_type, + file.object_meta.clone().into(), + object_store.clone(), + ) + .await?; writers.push(writer); } } @@ -372,9 +318,13 @@ impl DataSink for JsonSink { size: 0, e_tag: None, }; - let writer = self - .create_writer(object_meta.into(), object_store.clone()) - .await?; + let writer = create_writer( + self.config.writer_mode, + self.file_compression_type, + object_meta.into(), + object_store.clone(), + ) + .await?; writers.push(writer); } } diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 97492276a27d..6d57621126a4 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -53,6 +53,10 @@ use futures::{ready, StreamExt}; use object_store::path::Path; use object_store::{MultipartId, ObjectMeta, ObjectStore}; use tokio::io::{AsyncWrite, AsyncWriteExt}; + +use self::file_type::FileCompressionType; + +use super::physical_plan::FileMeta; /// This trait abstracts all the file format specific implementations /// from the [`TableProvider`]. This helps code re-utilization across /// providers that support the the same file formats. @@ -235,7 +239,7 @@ pub(crate) enum AbortMode { } /// A wrapper struct with abort method and writer -struct AbortableWrite { +pub(crate) struct AbortableWrite { writer: W, mode: AbortMode, } @@ -306,6 +310,59 @@ pub enum FileWriterMode { /// Data is written to a new file in multiple parts. PutMultipart, } + +/// return an [`AbortableWrite`] that writes to the specified object +/// store location and compression +pub(crate) async fn create_writer( + writer_mode: FileWriterMode, + file_compression_type: FileCompressionType, + file_meta: FileMeta, + object_store: Arc, +) -> Result>> { + let object = &file_meta.object_meta; + match writer_mode { + // If the mode is append, call the store's append method and return wrapped in + // a boxed trait object. + FileWriterMode::Append => { + let writer = object_store + .append(&object.location) + .await + .map_err(DataFusionError::ObjectStore)?; + let writer = AbortableWrite::new( + file_compression_type.convert_async_writer(writer)?, + AbortMode::Append, + ); + Ok(writer) + } + // If the mode is put, create a new AsyncPut writer and return it wrapped in + // a boxed trait object + FileWriterMode::Put => { + let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store)); + let writer = AbortableWrite::new( + file_compression_type.convert_async_writer(writer)?, + AbortMode::Put, + ); + Ok(writer) + } + // If the mode is put multipart, call the store's put_multipart method and + // return the writer wrapped in a boxed trait object. + FileWriterMode::PutMultipart => { + let (multipart_id, writer) = object_store + .put_multipart(&object.location) + .await + .map_err(DataFusionError::ObjectStore)?; + Ok(AbortableWrite::new( + file_compression_type.convert_async_writer(writer)?, + AbortMode::MultiPart(MultiPart::new( + object_store, + multipart_id, + object.location.clone(), + )), + )) + } + } +} + /// A trait that defines the methods required for a RecordBatch serializer. #[async_trait] pub trait BatchSerializer: Unpin + Send {