Add splice column API (apache#4155) (apache#4269)
* Add splice column API (apache#4155)

* Review feedback

* Re-encode offset index
tustvold authored and alamb committed May 30, 2023
1 parent aef9ab0 commit 62c6cbb
Showing 2 changed files with 201 additions and 36 deletions.
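The core addition is `SerializedRowGroupWriter::append_column`, which splices an already-encoded column chunk into the file being written by copying the compressed bytes and rebasing page offsets, instead of decoding and re-encoding values. A minimal sketch of the call shape, modeled on the test added below; `buffer` (the pages previously encoded for one column) and `close` (the `ColumnCloseResult` captured when that column writer closed) are assumed:

    // Sketch: splice a pre-encoded column chunk into the next row group.
    // `buffer: Vec<u8>` and `close: ColumnCloseResult` are assumed to come
    // from a column writer closed over an in-memory TrackedWrite.
    let bytes = Bytes::from(buffer); // Bytes implements ChunkReader
    let mut row_group = file_writer.next_row_group()?;
    row_group.append_column(&bytes, close)?; // copies bytes, no re-encode
    row_group.close()?;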
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -104,12 +104,12 @@ impl<'a> ByteArrayWriter<'a> {
     /// Returns a new [`ByteArrayWriter`]
     pub fn new(
         descr: ColumnDescPtr,
-        props: &'a WriterPropertiesPtr,
+        props: WriterPropertiesPtr,
         page_writer: Box<dyn PageWriter + 'a>,
         on_close: OnCloseColumnChunk<'a>,
     ) -> Result<Self> {
         Ok(Self {
-            writer: GenericColumnWriter::new(descr, props.clone(), page_writer),
+            writer: GenericColumnWriter::new(descr, props, page_writer),
             on_close: Some(on_close),
         })
     }
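(`WriterPropertiesPtr` is an `Arc<WriterProperties>`, so taking it by value costs only a reference-count bump, and it frees the column writer from borrowing the properties for the `'a` lifetime.)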
233 changes: 199 additions & 34 deletions parquet/src/file/writer.rs
@@ -21,7 +21,7 @@
 use crate::bloom_filter::Sbbf;
 use crate::format as parquet;
 use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
-use std::io::{BufWriter, IoSlice};
+use std::io::{BufWriter, IoSlice, Read};
 use std::{io::Write, sync::Arc};
 use thrift::protocol::{TCompactOutputProtocol, TSerializable};
 
@@ -35,6 +35,7 @@ use crate::column::{
 };
 use crate::data_type::DataType;
 use crate::errors::{ParquetError, Result};
+use crate::file::reader::ChunkReader;
 use crate::file::{
     metadata::*, properties::WriterPropertiesPtr,
     statistics::to_thrift as statistics_to_thrift, PARQUET_MAGIC,
@@ -423,27 +424,15 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
         }
     }
 
-    /// Returns the next column writer, if available, using the factory function;
-    /// otherwise returns `None`.
-    pub(crate) fn next_column_with_factory<'b, F, C>(
-        &'b mut self,
-        factory: F,
-    ) -> Result<Option<C>>
-    where
-        F: FnOnce(
-            ColumnDescPtr,
-            &'b WriterPropertiesPtr,
-            Box<dyn PageWriter + 'b>,
-            OnCloseColumnChunk<'b>,
-        ) -> Result<C>,
-    {
-        self.assert_previous_writer_closed()?;
-
-        if self.column_index >= self.descr.num_columns() {
-            return Ok(None);
-        }
-        let page_writer = Box::new(SerializedPageWriter::new(self.buf));
+    /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
+    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
+        let ret = self.descr.columns().get(self.column_index)?.clone();
+        self.column_index += 1;
+        Some(ret)
+    }
 
+    /// Returns [`OnCloseColumnChunk`] for the next writer
+    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
         let total_bytes_written = &mut self.total_bytes_written;
         let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
         let total_rows_written = &mut self.total_rows_written;
@@ -475,28 +464,115 @@
 
             Ok(())
         };
+        (self.buf, Box::new(on_close))
+    }
 
-        let column = self.descr.column(self.column_index);
-        self.column_index += 1;
-
-        Ok(Some(factory(
-            column,
-            &self.props,
-            page_writer,
-            Box::new(on_close),
-        )?))
+    /// Returns the next column writer, if available, using the factory function;
+    /// otherwise returns `None`.
+    pub(crate) fn next_column_with_factory<'b, F, C>(
+        &'b mut self,
+        factory: F,
+    ) -> Result<Option<C>>
+    where
+        F: FnOnce(
+            ColumnDescPtr,
+            WriterPropertiesPtr,
+            Box<dyn PageWriter + 'b>,
+            OnCloseColumnChunk<'b>,
+        ) -> Result<C>,
+    {
+        self.assert_previous_writer_closed()?;
+        Ok(match self.next_column_desc() {
+            Some(column) => {
+                let props = self.props.clone();
+                let (buf, on_close) = self.get_on_close();
+                let page_writer = Box::new(SerializedPageWriter::new(buf));
+                Some(factory(column, props, page_writer, Box::new(on_close))?)
+            }
+            None => None,
+        })
     }
 
     /// Returns the next column writer, if available; otherwise returns `None`.
     /// In case of any IO error or Thrift error, or if row group writer has already been
     /// closed returns `Err`.
     pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
         self.next_column_with_factory(|descr, props, page_writer, on_close| {
-            let column_writer = get_column_writer(descr, props.clone(), page_writer);
+            let column_writer = get_column_writer(descr, props, page_writer);
             Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
         })
     }
+
+    /// Append an encoded column chunk from another source without decoding it
+    ///
+    /// This can be used for efficiently concatenating or projecting parquet data,
+    /// or encoding parquet data to temporary in-memory buffers
+    ///
+    /// See [`Self::next_column`] for writing data that isn't already encoded
+    pub fn append_column<R: ChunkReader>(
+        &mut self,
+        reader: &R,
+        mut close: ColumnCloseResult,
+    ) -> Result<()> {
+        self.assert_previous_writer_closed()?;
+        let desc = self.next_column_desc().ok_or_else(|| {
+            general_err!("exhausted columns in SerializedRowGroupWriter")
+        })?;
+
+        let metadata = close.metadata;
+
+        if metadata.column_descr() != desc.as_ref() {
+            return Err(general_err!(
+                "column descriptor mismatch, expected {:?} got {:?}",
+                desc,
+                metadata.column_descr()
+            ));
+        }
+
+        let src_dictionary_offset = metadata.dictionary_page_offset();
+        let src_data_offset = metadata.data_page_offset();
+        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
+        let src_length = metadata.compressed_size();
+
+        let write_offset = self.buf.bytes_written();
+        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
+        let write_length = std::io::copy(&mut read, &mut self.buf)?;
+
+        if src_length as u64 != write_length {
+            return Err(general_err!(
+                "Failed to splice column data, expected {src_length} got {write_length}"
+            ));
+        }
+
+        let file_offset = self.buf.bytes_written() as i64;
+
+        // Rebase source-relative offsets onto this file's layout
+        let map_offset = |x| x - src_offset + write_offset as i64;
+        let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
+            .set_compression(metadata.compression())
+            .set_encodings(metadata.encodings().clone())
+            .set_file_offset(file_offset)
+            .set_total_compressed_size(metadata.compressed_size())
+            .set_total_uncompressed_size(metadata.uncompressed_size())
+            .set_num_values(metadata.num_values())
+            .set_data_page_offset(map_offset(src_data_offset))
+            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset));
+
+        if let Some(statistics) = metadata.statistics() {
+            builder = builder.set_statistics(statistics.clone())
+        }
+        close.metadata = builder.build()?;
+
+        if let Some(offsets) = close.offset_index.as_mut() {
+            for location in &mut offsets.page_locations {
+                location.offset = map_offset(location.offset)
+            }
+        }
+
+        SerializedPageWriter::new(self.buf).write_metadata(&metadata)?;
+        let (_, on_close) = self.get_on_close();
+        on_close(close)
+    }
 
     /// Closes this row group writer and returns row group metadata.
     pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
         if self.row_group_metadata.is_none() {
@@ -516,9 +592,9 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             if let Some(on_close) = self.on_close.take() {
                 on_close(
                     metadata,
-                    self.bloom_filters.clone(),
-                    self.column_indexes.clone(),
-                    self.offset_indexes.clone(),
+                    self.bloom_filters,
+                    self.column_indexes,
+                    self.offset_indexes,
                 )?
             }
         }
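(Since `close` already takes `self` by value, as shown in the context above, the collected bloom filters and indexes can simply be moved into `on_close` rather than cloned.)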
@@ -720,9 +796,11 @@ mod tests {
 
     use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
     use crate::column::page::PageReader;
+    use crate::column::reader::get_typed_column_reader;
     use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
     use crate::data_type::{BoolType, Int32Type};
     use crate::file::reader::ChunkReader;
+    use crate::file::serialized_reader::ReadOptionsBuilder;
     use crate::file::{
         properties::{ReaderProperties, WriterProperties, WriterVersion},
         reader::{FileReader, SerializedFileReader, SerializedPageReader},
@@ -1540,4 +1618,91 @@ mod tests {
         assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
         assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
     }
+
+    #[test]
+    fn test_spliced_write() {
+        let message_type = "
+            message test_schema {
+                REQUIRED INT32 i32 (INTEGER(32,true));
+                REQUIRED INT32 u32 (INTEGER(32,false));
+            }
+        ";
+        let schema = Arc::new(parse_message_type(message_type).unwrap());
+        let props = Arc::new(WriterProperties::builder().build());
+
+        let mut file = Vec::with_capacity(1024);
+        let mut file_writer =
+            SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
+
+        let columns = file_writer.descr.columns();
+        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
+            .iter()
+            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
+            .collect();
+
+        let mut column_state_slice = column_state.as_mut_slice();
+        let mut column_writers = Vec::with_capacity(columns.len());
+        for c in columns {
+            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
+            column_state_slice = tail;
+
+            let page_writer = Box::new(SerializedPageWriter::new(buf));
+            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
+            column_writers.push(SerializedColumnWriter::new(
+                col_writer,
+                Some(Box::new(|on_close| {
+                    *out = Some(on_close);
+                    Ok(())
+                })),
+            ));
+        }
+
+        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
+
+        // Interleaved writing to the column writers
+        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
+            let writer = writer.typed::<Int32Type>();
+            writer.write_batch(&batch, None, None).unwrap();
+        }
+
+        // Close the column writers
+        for writer in column_writers {
+            writer.close().unwrap()
+        }
+
+        // Splice column data into a row group
+        let mut row_group_writer = file_writer.next_row_group().unwrap();
+        for (write, close) in column_state {
+            let buf = Bytes::from(write.into_inner().unwrap());
+            row_group_writer
+                .append_column(&buf, close.unwrap())
+                .unwrap();
+        }
+        row_group_writer.close().unwrap();
+        file_writer.close().unwrap();
+
+        // Check data was written correctly
+        let file = Bytes::from(file);
+        let test_read = |reader: SerializedFileReader<Bytes>| {
+            let row_group = reader.get_row_group(0).unwrap();
+
+            let mut out = [0; 4];
+            let c1 = row_group.get_column_reader(0).unwrap();
+            let mut c1 = get_typed_column_reader::<Int32Type>(c1);
+            c1.read_batch(4, None, None, &mut out).unwrap();
+            assert_eq!(out, column_data[0]);
+
+            let c2 = row_group.get_column_reader(1).unwrap();
+            let mut c2 = get_typed_column_reader::<Int32Type>(c2);
+            c2.read_batch(4, None, None, &mut out).unwrap();
+            assert_eq!(out, column_data[1]);
+        };
+
+        let reader = SerializedFileReader::new(file.clone()).unwrap();
+        test_read(reader);
+
+        let options = ReadOptionsBuilder::new().with_page_index().build();
+        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
+        test_read(reader);
+    }
 }
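Beyond the in-memory round trip exercised by `test_spliced_write`, the concatenation use case named in the doc comment falls out directly: the row groups of an existing file can be copied into a new one without decoding any values. The following is a sketch under stated assumptions, not part of this commit; it rebuilds a `ColumnCloseResult` per chunk from the source footer using that struct's public fields, and omits bloom filters and page indexes for brevity:

    use std::sync::Arc;

    use bytes::Bytes;
    use parquet::column::writer::ColumnCloseResult;
    use parquet::errors::Result;
    use parquet::file::footer::parse_metadata;
    use parquet::file::properties::WriterProperties;
    use parquet::file::writer::SerializedFileWriter;

    /// Copy every row group of `src` into `out`, splicing each encoded
    /// column chunk with `append_column` instead of re-encoding it.
    fn concat_file(src: Bytes, out: &mut Vec<u8>) -> Result<()> {
        let metadata = parse_metadata(&src)?;
        let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
        let props = Arc::new(WriterProperties::builder().build());
        let mut writer = SerializedFileWriter::new(out, schema, props)?;

        for rg in metadata.row_groups() {
            let mut rg_writer = writer.next_row_group()?;
            for column in rg.columns() {
                // Rebuild the close result from the source footer; the
                // spliced chunk keeps its original encoding and statistics.
                let close = ColumnCloseResult {
                    bytes_written: column.compressed_size() as _,
                    rows_written: rg.num_rows() as _,
                    metadata: column.clone(),
                    bloom_filter: None,
                    column_index: None,
                    offset_index: None,
                };
                rg_writer.append_column(&src, close)?;
            }
            rg_writer.close()?;
        }
        writer.close()?;
        Ok(())
    }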
