diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 363f57bf5bc2..92b5b5cfad16 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -270,38 +270,16 @@ impl ArrowWriter { self.writer.append_key_value_metadata(kv_metadata) } - /// Returns a reference to the underlying writer. - pub fn inner(&self) -> &W { - self.writer.inner() - } - - /// Returns a mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer, doing so - /// will likely result in a corrupt parquet file - pub fn inner_mut(&mut self) -> &mut W { - self.writer.inner_mut() - } - /// Flushes any outstanding data and returns the underlying writer. pub fn into_inner(mut self) -> Result { self.flush()?; self.writer.into_inner() } - /// Close and finalize the underlying Parquet writer - /// - /// Unlike [`Self::close`] this does not consume self - /// - /// Attempting to write after calling finish will result in an error - pub fn finish(&mut self) -> Result { - self.flush()?; - self.writer.finish() - } - /// Close and finalize the underlying Parquet writer pub fn close(mut self) -> Result { - self.finish() + self.flush()?; + self.writer.close() } pub fn get_trailing_bytes(&mut self, target: W) -> Result { self.writer.write_trailing_bytes(target) diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 784959fbd9fb..264b05dc5cef 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -51,6 +51,8 @@ //! # } //! 
``` +use std::{io::Write, sync::Arc}; + use crate::{ arrow::arrow_writer::ArrowWriterOptions, arrow::ArrowWriter, @@ -92,11 +94,14 @@ use tokio::io::{AsyncWrite, AsyncWriteExt}; /// ``` pub struct AsyncArrowWriter { /// Underlying sync writer - sync_writer: ArrowWriter>, + sync_writer: ArrowWriter, /// Async writer provided by caller async_writer: W, + /// The inner buffer shared by the `sync_writer` and the `async_writer` + shared_buffer: SharedBuffer, + /// Trigger forced flushing once buffer size reaches this value buffer_size: usize, } @@ -130,15 +135,14 @@ impl AsyncArrowWriter { buffer_size: usize, options: ArrowWriterOptions, ) -> Result { - let sync_writer = ArrowWriter::try_new_with_options( - Vec::with_capacity(buffer_size), - arrow_schema, - options, - )?; + let shared_buffer = SharedBuffer::new(buffer_size); + let sync_writer = + ArrowWriter::try_new_with_options(shared_buffer.clone(), arrow_schema, options)?; Ok(Self { sync_writer, async_writer: writer, + shared_buffer, buffer_size, }) } @@ -169,13 +173,18 @@ impl AsyncArrowWriter { /// checked and flush if at least half full pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> { self.sync_writer.write(batch)?; - self.try_flush(false).await + Self::try_flush( + &mut self.shared_buffer, + &mut self.async_writer, + self.buffer_size, + ) + .await } /// Flushes all buffered rows into a new row group pub async fn flush(&mut self) -> Result<()> { self.sync_writer.flush()?; - self.try_flush(false).await?; + Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, 0).await?; Ok(()) } @@ -191,29 +200,34 @@ impl AsyncArrowWriter { /// /// All the data in the inner buffer will be force flushed. pub async fn close(mut self) -> Result { - let metadata = self.sync_writer.finish()?; + let metadata = self.sync_writer.close()?; // Force to flush the remaining data. 
- self.try_flush(true).await?; + Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, 0).await?; self.async_writer.shutdown().await?; Ok(metadata) } - /// Flush the buffered data into the `async_writer` - async fn try_flush(&mut self, force: bool) -> Result<()> { - let buffer = self.sync_writer.inner_mut(); - if !force && (buffer.is_empty() || buffer.len() < self.buffer_size) { + /// Flush the data in the [`SharedBuffer`] into the `async_writer` if its size + /// exceeds the threshold. + async fn try_flush( + shared_buffer: &mut SharedBuffer, + async_writer: &mut W, + buffer_size: usize, + ) -> Result<()> { + let mut buffer = shared_buffer.buffer.try_lock().unwrap(); + if buffer.is_empty() || buffer.len() < buffer_size { // no need to flush return Ok(()); } - self.async_writer + async_writer .write_all(buffer.as_slice()) .await .map_err(|e| ParquetError::External(Box::new(e)))?; - self.async_writer + async_writer .flush() .await .map_err(|e| ParquetError::External(Box::new(e)))?; @@ -225,12 +239,42 @@ impl AsyncArrowWriter { } } +/// A buffer with interior mutability shared by the [`ArrowWriter`] and +/// [`AsyncArrowWriter`]. +#[derive(Clone)] +struct SharedBuffer { + /// The inner buffer for reading and writing + /// + /// The lock is used to obtain interior mutability, so there is no need to worry + /// about lock contention. 
+ buffer: Arc>>, +} + +impl SharedBuffer { + pub fn new(capacity: usize) -> Self { + Self { + buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))), + } + } +} + +impl Write for SharedBuffer { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let mut buffer = self.buffer.try_lock().unwrap(); + Write::write(&mut *buffer, buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + let mut buffer = self.buffer.try_lock().unwrap(); + Write::flush(&mut *buffer) + } +} + #[cfg(test)] mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader}; use bytes::Bytes; - use std::sync::Arc; use tokio::pin; use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder}; diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index fa23623d6148..1afbfa7a1090 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -61,19 +61,6 @@ impl TrackedWrite { self.bytes_written } - /// Returns a reference to the underlying writer. - pub fn inner(&self) -> &W { - self.inner.get_ref() - } - - /// Returns a mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer, doing so - /// will likely result in data corruption - pub fn inner_mut(&mut self) -> &mut W { - self.inner.get_mut() - } - /// Returns the underlying writer. 
pub fn into_inner(self) -> Result { self.inner.into_inner().map_err(|err| { @@ -150,7 +137,6 @@ pub struct SerializedFileWriter { row_group_index: usize, // kv_metadatas will be appended to `props` when `write_metadata` kv_metadatas: Vec, - finished: bool, } impl Debug for SerializedFileWriter { @@ -181,7 +167,6 @@ impl SerializedFileWriter { offset_indexes: Vec::new(), row_group_index: 0, kv_metadatas: Vec::new(), - finished: false, }) } @@ -225,23 +210,13 @@ impl SerializedFileWriter { &self.row_groups } - /// Close and finalize the underlying Parquet writer - /// - /// Unlike [`Self::close`] this does not consume self - /// - /// Attempting to write after calling finish will result in an error - pub fn finish(&mut self) -> Result { + /// Closes and finalises file writer, returning the file metadata. + pub fn close(mut self) -> Result { self.assert_previous_writer_closed()?; let metadata = self.write_metadata()?; - self.buf.flush()?; Ok(metadata) } - /// Closes and finalises file writer, returning the file metadata. - pub fn close(mut self) -> Result { - self.finish() - } - /// Writes magic bytes at the beginning of the file. fn start_file(buf: &mut TrackedWrite) -> Result<()> { buf.write_all(&PARQUET_MAGIC)?; @@ -328,7 +303,6 @@ impl SerializedFileWriter { /// Assembles and writes metadata at the end of the file. fn write_metadata(&mut self) -> Result { - self.finished = true; let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum(); let mut row_groups = self @@ -392,10 +366,6 @@ impl SerializedFileWriter { #[inline] fn assert_previous_writer_closed(&self) -> Result<()> { - if self.finished { - return Err(general_err!("SerializedFileWriter already finished")); - } - if self.row_group_index != self.row_groups.len() { Err(general_err!("Previous row group writer was not closed")) } else { @@ -417,18 +387,6 @@ impl SerializedFileWriter { &self.props } - /// Returns a reference to the underlying writer. 
- pub fn inner(&self) -> &W { - self.buf.inner() - } - - /// Returns a mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn inner_mut(&mut self) -> &mut W { - self.buf.inner_mut() - } - /// Writes the file footer and returns the underlying writer. pub fn into_inner(mut self) -> Result { self.assert_previous_writer_closed()?; @@ -1809,7 +1767,7 @@ mod tests { b_writer.close().unwrap(); row_group_writer.close().unwrap(); - let metadata = file_writer.finish().unwrap(); + let metadata = file_writer.close().unwrap(); assert_eq!(metadata.row_groups.len(), 1); let row_group = &metadata.row_groups[0]; assert_eq!(row_group.columns.len(), 2); @@ -1820,11 +1778,6 @@ mod tests { assert!(row_group.columns[1].offset_index_offset.is_some()); assert!(row_group.columns[1].column_index_offset.is_none()); - let err = file_writer.next_row_group().err().unwrap().to_string(); - assert_eq!(err, "Parquet error: SerializedFileWriter already finished"); - - drop(file_writer); - let options = ReadOptionsBuilder::new().with_page_index().build(); let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();