Skip to content

Commit

Permalink
perf: Parquet do not copy uncompressed pages (#18441)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Aug 28, 2024
1 parent f6ef516 commit 54cfc31
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 77 deletions.
14 changes: 0 additions & 14 deletions crates/polars-parquet/src/parquet/page/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,6 @@ pub enum CompressedPage {
}

impl CompressedPage {
pub(crate) fn buffer(&self) -> &[u8] {
match self {
CompressedPage::Data(page) => &page.buffer,
CompressedPage::Dict(page) => &page.buffer,
}
}

pub(crate) fn buffer_mut(&mut self) -> &mut Vec<u8> {
match self {
CompressedPage::Data(page) => page.buffer.to_mut(),
Expand Down Expand Up @@ -303,13 +296,6 @@ impl CompressedPage {
CompressedPage::Dict(_) => Some(0),
}
}

pub(crate) fn uncompressed_size(&self) -> usize {
match self {
CompressedPage::Data(page) => page.uncompressed_page_size,
CompressedPage::Dict(page) => page.uncompressed_page_size,
}
}
}

/// An uncompressed, encoded dictionary page.
Expand Down
123 changes: 60 additions & 63 deletions crates/polars-parquet/src/parquet/read/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,75 +54,72 @@ fn decompress_v2(
Ok(())
}

/// decompresses a [`CompressedDataPage`] into `buffer`.
/// If the page is un-compressed, `buffer` is swapped instead.
/// Returns whether the page was decompressed.
pub fn decompress_buffer(
compressed_page: &mut CompressedPage,
buffer: &mut Vec<u8>,
) -> ParquetResult<bool> {
if compressed_page.compression() != Compression::Uncompressed {
// prepare the compression buffer
let read_size = compressed_page.uncompressed_size();

if read_size > buffer.capacity() {
// dealloc and ignore region, replacing it by a new region.
// This won't reallocate - it frees and calls `alloc_zeroed`
*buffer = vec![0; read_size];
} else if read_size > buffer.len() {
// fill what we need with zeros so that we can use them in `Read`.
// This won't reallocate
buffer.resize(read_size, 0);
} else {
buffer.truncate(read_size);
}
match compressed_page {
CompressedPage::Data(compressed_page) => match compressed_page.header() {
DataPageHeader::V1(_) => {
decompress_v1(&compressed_page.buffer, compressed_page.compression, buffer)?
},
DataPageHeader::V2(header) => decompress_v2(
&compressed_page.buffer,
header,
compressed_page.compression,
buffer,
)?,
},
CompressedPage::Dict(page) => decompress_v1(&page.buffer, page.compression(), buffer)?,
}
Ok(true)
} else {
// page.buffer is already decompressed => swap it with `buffer`, making `page.buffer` the
// decompression buffer and `buffer` the decompressed buffer
std::mem::swap(&mut compressed_page.buffer().to_vec(), buffer);
Ok(false)
}
}

fn create_page(compressed_page: CompressedPage, buffer: Vec<u8>) -> Page {
match compressed_page {
CompressedPage::Data(page) => Page::Data(DataPage::new_read(
/// Decompresses the page, using `buffer` for decompression.
/// If `page.buffer.len() == 0`, there was no decompression and the buffer was moved.
/// Else, decompression took place.
pub fn decompress(compressed_page: CompressedPage, buffer: &mut Vec<u8>) -> ParquetResult<Page> {
Ok(match (compressed_page.compression(), compressed_page) {
(Compression::Uncompressed, CompressedPage::Data(page)) => Page::Data(DataPage::new_read(
page.header,
CowBuffer::Owned(buffer),
page.buffer,
page.descriptor,
)),
CompressedPage::Dict(page) => Page::Dict(DictPage {
buffer: CowBuffer::Owned(buffer),
(_, CompressedPage::Data(page)) => {
// prepare the compression buffer
let read_size = page.uncompressed_size();

if read_size > buffer.capacity() {
// dealloc and ignore region, replacing it by a new region.
// This won't reallocate - it frees and calls `alloc_zeroed`
*buffer = vec![0; read_size];
} else if read_size > buffer.len() {
// fill what we need with zeros so that we can use them in `Read`.
// This won't reallocate
buffer.resize(read_size, 0);
} else {
buffer.truncate(read_size);
}

match page.header() {
DataPageHeader::V1(_) => decompress_v1(&page.buffer, page.compression, buffer)?,
DataPageHeader::V2(header) => {
decompress_v2(&page.buffer, header, page.compression, buffer)?
},
}
let buffer = CowBuffer::Owned(std::mem::take(buffer));

Page::Data(DataPage::new_read(page.header, buffer, page.descriptor))
},
(Compression::Uncompressed, CompressedPage::Dict(page)) => Page::Dict(DictPage {
buffer: page.buffer,
num_values: page.num_values,
is_sorted: page.is_sorted,
}),
}
}

/// Decompresses the page, using `buffer` for decompression.
/// If `page.buffer.len() == 0`, there was no decompression and the buffer was moved.
/// Else, decompression took place.
pub fn decompress(
mut compressed_page: CompressedPage,
buffer: &mut Vec<u8>,
) -> ParquetResult<Page> {
decompress_buffer(&mut compressed_page, buffer)?;
Ok(create_page(compressed_page, std::mem::take(buffer)))
(_, CompressedPage::Dict(page)) => {
// prepare the compression buffer
let read_size = page.uncompressed_page_size;

if read_size > buffer.capacity() {
// dealloc and ignore region, replacing it by a new region.
// This won't reallocate - it frees and calls `alloc_zeroed`
*buffer = vec![0; read_size];
} else if read_size > buffer.len() {
// fill what we need with zeros so that we can use them in `Read`.
// This won't reallocate
buffer.resize(read_size, 0);
} else {
buffer.truncate(read_size);
}
decompress_v1(&page.buffer, page.compression(), buffer)?;
let buffer = CowBuffer::Owned(std::mem::take(buffer));

Page::Dict(DictPage {
buffer,
num_values: page.num_values,
is_sorted: page.is_sorted,
})
},
})
}

type _Decompressor<I> = streaming_decompression::Decompressor<
Expand Down

0 comments on commit 54cfc31

Please sign in to comment.