Revert "Provide access to inner Write for parquet writers (apache#5471)"
This reverts commit e2b1f22.
mwylde committed May 9, 2024
1 parent 8560ac8 commit 2d96957
Showing 3 changed files with 66 additions and 91 deletions.
26 changes: 2 additions & 24 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -270,38 +270,16 @@ impl<W: Write + Send> ArrowWriter<W> {
         self.writer.append_key_value_metadata(kv_metadata)
     }
 
-    /// Returns a reference to the underlying writer.
-    pub fn inner(&self) -> &W {
-        self.writer.inner()
-    }
-
-    /// Returns a mutable reference to the underlying writer.
-    ///
-    /// It is inadvisable to directly write to the underlying writer, doing so
-    /// will likely result in a corrupt parquet file
-    pub fn inner_mut(&mut self) -> &mut W {
-        self.writer.inner_mut()
-    }
-
     /// Flushes any outstanding data and returns the underlying writer.
     pub fn into_inner(mut self) -> Result<W> {
         self.flush()?;
         self.writer.into_inner()
     }
 
-    /// Close and finalize the underlying Parquet writer
-    ///
-    /// Unlike [`Self::close`] this does not consume self
-    ///
-    /// Attempting to write after calling finish will result in an error
-    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
-        self.flush()?;
-        self.writer.finish()
-    }
-
     /// Close and finalize the underlying Parquet writer
     pub fn close(mut self) -> Result<crate::format::FileMetaData> {
-        self.finish()
+        self.flush()?;
+        self.writer.close()
     }
     pub fn get_trailing_bytes(&mut self, target: W) -> Result<W> {
         self.writer.write_trailing_bytes(target)
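For context, a minimal sketch of the caller-facing effect of this hunk: with `finish()` and `inner_mut()` removed, an `ArrowWriter` is finalized only by value, via `close()` or `into_inner()`. The file name and batch contents below are illustrative, not part of this commit.

```rust
use std::{fs::File, sync::Arc};

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::arrow::ArrowWriter;

fn main() {
    let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_from_iter([("a", col)]).unwrap();

    let file = File::create("data.parquet").unwrap();
    // `None` selects the default writer properties.
    let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();

    // Finalization consumes the writer: `close()` writes the footer and
    // returns the file metadata, while `into_inner()` would flush and hand
    // back the underlying `File`. The non-consuming `finish()` is gone.
    let metadata = writer.close().unwrap();
    assert_eq!(metadata.num_rows, 3);
}
```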
78 changes: 61 additions & 17 deletions parquet/src/arrow/async_writer/mod.rs
@@ -51,6 +51,8 @@
 //! # }
 //! ```
 
+use std::{io::Write, sync::Arc};
+
 use crate::{
     arrow::arrow_writer::ArrowWriterOptions,
     arrow::ArrowWriter,
@@ -92,11 +94,14 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
 /// ```
 pub struct AsyncArrowWriter<W> {
     /// Underlying sync writer
-    sync_writer: ArrowWriter<Vec<u8>>,
+    sync_writer: ArrowWriter<SharedBuffer>,
 
     /// Async writer provided by caller
     async_writer: W,
 
+    /// The inner buffer shared by the `sync_writer` and the `async_writer`
+    shared_buffer: SharedBuffer,
+
     /// Trigger forced flushing once buffer size reaches this value
     buffer_size: usize,
 }
@@ -130,15 +135,14 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
         buffer_size: usize,
         options: ArrowWriterOptions,
     ) -> Result<Self> {
-        let sync_writer = ArrowWriter::try_new_with_options(
-            Vec::with_capacity(buffer_size),
-            arrow_schema,
-            options,
-        )?;
+        let shared_buffer = SharedBuffer::new(buffer_size);
+        let sync_writer =
+            ArrowWriter::try_new_with_options(shared_buffer.clone(), arrow_schema, options)?;
 
         Ok(Self {
             sync_writer,
             async_writer: writer,
+            shared_buffer,
             buffer_size,
         })
     }
@@ -169,13 +173,18 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
     /// checked and flush if at least half full
     pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
         self.sync_writer.write(batch)?;
-        self.try_flush(false).await
+        Self::try_flush(
+            &mut self.shared_buffer,
+            &mut self.async_writer,
+            self.buffer_size,
+        )
+        .await
     }
 
     /// Flushes all buffered rows into a new row group
     pub async fn flush(&mut self) -> Result<()> {
         self.sync_writer.flush()?;
-        self.try_flush(false).await?;
+        Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, 0).await?;
 
         Ok(())
     }
@@ -191,29 +200,34 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
     ///
     /// All the data in the inner buffer will be force flushed.
     pub async fn close(mut self) -> Result<FileMetaData> {
-        let metadata = self.sync_writer.finish()?;
+        let metadata = self.sync_writer.close()?;
 
         // Force to flush the remaining data.
-        self.try_flush(true).await?;
+        Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, 0).await?;
         self.async_writer.shutdown().await?;
 
         Ok(metadata)
     }
 
-    /// Flush the buffered data into the `async_writer`
-    async fn try_flush(&mut self, force: bool) -> Result<()> {
-        let buffer = self.sync_writer.inner_mut();
-        if !force && (buffer.is_empty() || buffer.len() < self.buffer_size) {
+    /// Flush the data in the [`SharedBuffer`] into the `async_writer` if its size
+    /// exceeds the threshold.
+    async fn try_flush(
+        shared_buffer: &mut SharedBuffer,
+        async_writer: &mut W,
+        buffer_size: usize,
+    ) -> Result<()> {
+        let mut buffer = shared_buffer.buffer.try_lock().unwrap();
+        if buffer.is_empty() || buffer.len() < buffer_size {
             // no need to flush
             return Ok(());
         }
 
-        self.async_writer
+        async_writer
             .write_all(buffer.as_slice())
             .await
             .map_err(|e| ParquetError::External(Box::new(e)))?;
 
-        self.async_writer
+        async_writer
             .flush()
             .await
             .map_err(|e| ParquetError::External(Box::new(e)))?;
@@ -225,12 +239,42 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
     }
 }
 
+/// A buffer with interior mutability shared by the [`ArrowWriter`] and
+/// [`AsyncArrowWriter`].
+#[derive(Clone)]
+struct SharedBuffer {
+    /// The inner buffer for reading and writing
+    ///
+    /// The lock is used to obtain internal mutability, so no worry about the
+    /// lock contention.
+    buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
+}
+
+impl SharedBuffer {
+    pub fn new(capacity: usize) -> Self {
+        Self {
+            buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
+        }
+    }
+}
+
+impl Write for SharedBuffer {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        let mut buffer = self.buffer.try_lock().unwrap();
+        Write::write(&mut *buffer, buf)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        let mut buffer = self.buffer.try_lock().unwrap();
+        Write::flush(&mut *buffer)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
     use bytes::Bytes;
-    use std::sync::Arc;
     use tokio::pin;
 
     use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
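The restored `SharedBuffer` is what lets the synchronous `ArrowWriter` and the async flushing side cooperate: each holds a clone, the producer appends through `Write`, and `try_flush` drains the same `Vec<u8>`. Below is a self-contained sketch of that pattern under stated assumptions: the `MiniSharedBuffer` name is ours, and `std::sync::Mutex` stands in for `futures::lock::Mutex`. The `try_lock().unwrap()` is safe for the same reason as in the real code: the producer and the drain never hold the lock at the same time.

```rust
use std::io::Write;
use std::sync::{Arc, Mutex};

#[derive(Clone, Default)]
struct MiniSharedBuffer {
    buffer: Arc<Mutex<Vec<u8>>>,
}

impl Write for MiniSharedBuffer {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Uncontended by construction, so try_lock never fails here.
        self.buffer.try_lock().unwrap().write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

fn main() {
    let shared = MiniSharedBuffer::default();
    let mut producer = shared.clone(); // plays the ArrowWriter role

    producer.write_all(b"parquet bytes").unwrap();

    // The flushing side drains the same Vec through the other clone and
    // truncates it, mirroring what try_flush does after write_all succeeds.
    let mut buffer = shared.buffer.try_lock().unwrap();
    assert_eq!(buffer.as_slice(), b"parquet bytes");
    buffer.clear();
}
```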
53 changes: 3 additions & 50 deletions parquet/src/file/writer.rs
@@ -61,19 +61,6 @@ impl<W: Write> TrackedWrite<W> {
         self.bytes_written
     }
 
-    /// Returns a reference to the underlying writer.
-    pub fn inner(&self) -> &W {
-        self.inner.get_ref()
-    }
-
-    /// Returns a mutable reference to the underlying writer.
-    ///
-    /// It is inadvisable to directly write to the underlying writer, doing so
-    /// will likely result in data corruption
-    pub fn inner_mut(&mut self) -> &mut W {
-        self.inner.get_mut()
-    }
-
     /// Returns the underlying writer.
     pub fn into_inner(self) -> Result<W> {
         self.inner.into_inner().map_err(|err| {
@@ -150,7 +137,6 @@ pub struct SerializedFileWriter<W: Write> {
     row_group_index: usize,
     // kv_metadatas will be appended to `props` when `write_metadata`
     kv_metadatas: Vec<KeyValue>,
-    finished: bool,
 }
 
 impl<W: Write> Debug for SerializedFileWriter<W> {
@@ -181,7 +167,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
             offset_indexes: Vec::new(),
             row_group_index: 0,
             kv_metadatas: Vec::new(),
-            finished: false,
         })
     }
 
@@ -225,23 +210,13 @@ impl<W: Write + Send> SerializedFileWriter<W> {
         &self.row_groups
     }
 
-    /// Close and finalize the underlying Parquet writer
-    ///
-    /// Unlike [`Self::close`] this does not consume self
-    ///
-    /// Attempting to write after calling finish will result in an error
-    pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
+    /// Closes and finalises file writer, returning the file metadata.
+    pub fn close(mut self) -> Result<parquet::FileMetaData> {
         self.assert_previous_writer_closed()?;
         let metadata = self.write_metadata()?;
         self.buf.flush()?;
         Ok(metadata)
     }
 
-    /// Closes and finalises file writer, returning the file metadata.
-    pub fn close(mut self) -> Result<parquet::FileMetaData> {
-        self.finish()
-    }
-
     /// Writes magic bytes at the beginning of the file.
     fn start_file(buf: &mut TrackedWrite<W>) -> Result<()> {
         buf.write_all(&PARQUET_MAGIC)?;
@@ -328,7 +303,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
 
     /// Assembles and writes metadata at the end of the file.
     fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
-        self.finished = true;
         let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
 
         let mut row_groups = self
@@ -392,10 +366,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
 
     #[inline]
     fn assert_previous_writer_closed(&self) -> Result<()> {
-        if self.finished {
-            return Err(general_err!("SerializedFileWriter already finished"));
-        }
-
         if self.row_group_index != self.row_groups.len() {
             Err(general_err!("Previous row group writer was not closed"))
         } else {
@@ -417,18 +387,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
         &self.props
     }
 
-    /// Returns a reference to the underlying writer.
-    pub fn inner(&self) -> &W {
-        self.buf.inner()
-    }
-
-    /// Returns a mutable reference to the underlying writer.
-    ///
-    /// It is inadvisable to directly write to the underlying writer.
-    pub fn inner_mut(&mut self) -> &mut W {
-        self.buf.inner_mut()
-    }
-
     /// Writes the file footer and returns the underlying writer.
     pub fn into_inner(mut self) -> Result<W> {
         self.assert_previous_writer_closed()?;
@@ -1809,7 +1767,7 @@ mod tests {
         b_writer.close().unwrap();
         row_group_writer.close().unwrap();
 
-        let metadata = file_writer.finish().unwrap();
+        let metadata = file_writer.close().unwrap();
         assert_eq!(metadata.row_groups.len(), 1);
         let row_group = &metadata.row_groups[0];
         assert_eq!(row_group.columns.len(), 2);
@@ -1820,11 +1778,6 @@ mod tests {
         assert!(row_group.columns[1].offset_index_offset.is_some());
         assert!(row_group.columns[1].column_index_offset.is_none());
 
-        let err = file_writer.next_row_group().err().unwrap().to_string();
-        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");
-
-        drop(file_writer);
-
         let options = ReadOptionsBuilder::new().with_page_index().build();
         let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();
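With the `finished` flag and `finish()` removed, `SerializedFileWriter` returns to a consume-on-close lifecycle: the runtime "already finished" error that the deleted test asserted is replaced by the borrow checker rejecting any use after `close()`. A minimal sketch, assuming parquet 51-era APIs (the file name and one-column schema are illustrative):

```rust
use std::{fs::File, sync::Arc};

use parquet::basic::Type as PhysicalType;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::types::Type;

fn main() {
    // Root group with a single INT32 column.
    let schema = Arc::new(
        Type::group_type_builder("schema")
            .with_fields(vec![Arc::new(
                Type::primitive_type_builder("id", PhysicalType::INT32)
                    .build()
                    .unwrap(),
            )])
            .build()
            .unwrap(),
    );
    let props = Arc::new(WriterProperties::builder().build());
    let file = File::create("empty.parquet").unwrap();

    let writer = SerializedFileWriter::new(file, schema, props).unwrap();

    // `close()` takes `self`, so any later call on `writer` is a compile
    // error rather than the removed runtime check.
    let metadata = writer.close().unwrap();
    assert_eq!(metadata.num_rows, 0);
}
```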
