Extend insert into support to include Json backed tables (apache#7212)
* jsonsink and tests implemented

* fix tests and clean up

* clippy

* minor refactor

* comments + append-to-existing-file test checks that no new files are added

* format comments

Co-authored-by: Metehan Yıldırım <[email protected]>

devinjdangelo and metesynnada authored Aug 8, 2023
1 parent 99e2cd4 commit 3d917a0
Showing 5 changed files with 518 additions and 138 deletions.
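For context, the net effect of this change is that INSERT INTO works against newline-delimited JSON (NDJSON) backed listing tables, mirroring the existing CSV support. A rough usage sketch of what that enables (table name, schema, and file path are invented, and the exact SQL accepted depends on the DataFusion version):

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Hypothetical NDJSON-backed external table; path and schema are made up.
    ctx.sql(
        "CREATE EXTERNAL TABLE events (a INT, b VARCHAR) \
         STORED AS JSON LOCATION 'data/events.json'",
    )
    .await?;

    // With a JsonSink in place, INSERT INTO appends rows to the table's backing file.
    ctx.sql("INSERT INTO events VALUES (1, 'hello'), (2, 'world')")
        .await?
        .collect()
        .await?;

    Ok(())
}

The new append test in this commit checks that such an insert appends to the existing file rather than creating new ones.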
78 changes: 15 additions & 63 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -36,9 +36,9 @@ use bytes::{Buf, Bytes};
 use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
-use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::io::AsyncWrite;
 
-use super::FileFormat;
+use super::{stateless_serialize_and_write_files, FileFormat};
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::FileWriterMode;
 use crate::datasource::file_format::{
@@ -274,6 +274,12 @@ impl FileFormat for CsvFormat {
                 "Overwrites are not implemented yet for CSV".into(),
             ));
         }
+
+        if self.file_compression_type != FileCompressionType::UNCOMPRESSED {
+            return Err(DataFusionError::NotImplemented(
+                "Inserting compressed CSV is not implemented yet.".into(),
+            ));
+        }
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(CsvSink::new(
             conf,
@@ -439,28 +445,6 @@ impl BatchSerializer for CsvSerializer {
     }
 }
 
-async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
-    result: Result<T>,
-    writers: &mut [AbortableWrite<W>],
-) -> Result<T> {
-    match result {
-        Ok(value) => Ok(value),
-        Err(e) => {
-            // Abort all writers before returning the error:
-            for writer in writers {
-                let mut abort_future = writer.abort_writer();
-                if let Ok(abort_future) = &mut abort_future {
-                    let _ = abort_future.await;
-                }
-                // Ignore errors that occur during abortion,
-                // We do try to abort all writers before returning error.
-            }
-            // After aborting writers return original error.
-            Err(e)
-        }
-    }
-}
-
 /// Implements [`DataSink`] for writing to a CSV file.
 struct CsvSink {
     /// Config options for writing data
@@ -566,7 +550,7 @@ impl CsvSink {
 impl DataSink for CsvSink {
     async fn write_all(
         &self,
-        mut data: Vec<SendableRecordBatchStream>,
+        data: Vec<SendableRecordBatchStream>,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
         let num_partitions = data.len();
@@ -576,7 +560,7 @@ impl DataSink for CsvSink {
             .object_store(&self.config.object_store_url)?;
 
         // Construct serializer and writer for each file group
-        let mut serializers = vec![];
+        let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
         let mut writers = vec![];
         match self.config.writer_mode {
             FileWriterMode::Append => {
@@ -590,7 +574,7 @@
                     let serializer = CsvSerializer::new()
                         .with_builder(builder)
                         .with_header(header);
-                    serializers.push(serializer);
+                    serializers.push(Box::new(serializer));
 
                     let file = file_group.clone();
                     let writer = self
@@ -608,17 +592,17 @@
                 ))
             }
             FileWriterMode::PutMultipart => {
-                //currently assuming only 1 partition path (i.e. not hive style partitioning on a column)
+                // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
                 let base_path = &self.config.table_paths[0];
-                //uniquely identify this batch of files with a random string, to prevent collisions overwriting files
+                // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
                 let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
                 for part_idx in 0..num_partitions {
                     let header = true;
                     let builder = WriterBuilder::new().with_delimiter(self.delimiter);
                     let serializer = CsvSerializer::new()
                         .with_builder(builder)
                         .with_header(header);
-                    serializers.push(serializer);
+                    serializers.push(Box::new(serializer));
                     let file_path = base_path
                         .prefix()
                         .child(format!("/{}_{}.csv", write_id, part_idx));
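
In PutMultipart mode, each insert gets a fresh random write id and every partition writes to its own object named from that id, which is how the comment above prevents collisions between files. A small sketch of the naming scheme (it reuses the same rand API as the diff; the base-path handling is simplified and all names here are illustrative):

use rand::distributions::{Alphanumeric, DistString};

fn main() {
    // One random id per INSERT, shared by every partition written in that insert.
    let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
    let num_partitions = 3;

    // One object per input partition, e.g. "<table path>/<write_id>_<part_idx>.csv".
    let file_names: Vec<String> = (0..num_partitions)
        .map(|part_idx| format!("{}_{}.csv", write_id, part_idx))
        .collect();

    for name in &file_names {
        println!("{name}");
    }
}
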
@@ -636,39 +620,7 @@
             }
         }
 
-        let mut row_count = 0;
-        // Map errors to DatafusionError.
-        let err_converter =
-            |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
-        // TODO parallelize serialization accross partitions and batches within partitions
-        // see: https://github.com/apache/arrow-datafusion/issues/7079
-        for idx in 0..num_partitions {
-            while let Some(maybe_batch) = data[idx].next().await {
-                // Write data to files in a round robin fashion:
-                let serializer = &mut serializers[idx];
-                let batch = check_for_errors(maybe_batch, &mut writers).await?;
-                row_count += batch.num_rows();
-                let bytes =
-                    check_for_errors(serializer.serialize(batch).await, &mut writers)
-                        .await?;
-                let writer = &mut writers[idx];
-                check_for_errors(
-                    writer.write_all(&bytes).await.map_err(err_converter),
-                    &mut writers,
-                )
-                .await?;
-            }
-        }
-        // Perform cleanup:
-        let n_writers = writers.len();
-        for idx in 0..n_writers {
-            check_for_errors(
-                writers[idx].shutdown().await.map_err(err_converter),
-                &mut writers,
-            )
-            .await?;
-        }
-        Ok(row_count as u64)
+        stateless_serialize_and_write_files(data, serializers, writers).await
     }
 }
 
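The per-partition loop deleted above (round-robin serialization, error handling via check_for_errors, writer shutdown, and row counting) now lives in the shared stateless_serialize_and_write_files helper imported from the parent module, so the new JsonSink can reuse the same write path. The real helper is async and operates on SendableRecordBatchStream and AbortableWrite; the following is only a rough synchronous sketch of the same control flow, with made-up stand-in types:

use std::io::Write;

// Stand-ins for DataFusion's RecordBatch stream, serializer, and abortable writer types.
type RecordBatch = Vec<(i64, String)>;
type Result<T> = std::result::Result<T, String>;

trait BatchSerializer {
    fn serialize(&mut self, batch: &RecordBatch) -> Result<Vec<u8>>;
}

fn serialize_and_write_files(
    data: Vec<Vec<RecordBatch>>,                    // one batch list per partition
    mut serializers: Vec<Box<dyn BatchSerializer>>, // one serializer per partition
    mut writers: Vec<Box<dyn Write>>,               // one writer per partition
) -> Result<u64> {
    let mut row_count = 0u64;
    // Drain each partition's batches through that partition's serializer/writer pair;
    // any error stops the loop and propagates to the caller.
    for (idx, batches) in data.into_iter().enumerate() {
        for batch in batches {
            row_count += batch.len() as u64;
            let bytes = serializers[idx].serialize(&batch)?;
            writers[idx]
                .write_all(&bytes)
                .map_err(|e| format!("unexpected file sink error: {e}"))?;
        }
    }
    // Cleanup: flush every writer (the real helper shuts down, or aborts on error,
    // its AbortableWrite writers instead).
    for writer in &mut writers {
        writer.flush().map_err(|e| format!("flush failed: {e}"))?;
    }
    Ok(row_count)
}

struct LineSerializer;

impl BatchSerializer for LineSerializer {
    fn serialize(&mut self, batch: &RecordBatch) -> Result<Vec<u8>> {
        let mut out = Vec::new();
        for (a, b) in batch {
            out.extend_from_slice(format!("{a},{b}\n").as_bytes());
        }
        Ok(out)
    }
}

fn main() -> Result<()> {
    let data = vec![vec![vec![(1, "x".to_string()), (2, "y".to_string())]]];
    let serializers: Vec<Box<dyn BatchSerializer>> = vec![Box::new(LineSerializer)];
    let writers: Vec<Box<dyn Write>> = vec![Box::new(std::io::stdout())];
    let rows = serialize_and_write_files(data, serializers, writers)?;
    println!("wrote {rows} rows");
    Ok(())
}
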
(Diffs for the remaining four changed files were not loaded in this view.)
