Deprecate MetadataLoader #6474

Merged · 6 commits · Sep 30, 2024
12 changes: 10 additions & 2 deletions parquet/src/arrow/async_reader/metadata.rs
@@ -52,6 +52,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
/// Create a new [`MetadataLoader`] by reading the footer information
///
/// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
if file_size < FOOTER_SIZE {
return Err(ParquetError::EOF(format!(
@@ -108,6 +109,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
}

/// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
Self {
fetch,
@@ -120,6 +122,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
///
/// * `column_index`: if true will load column index
/// * `offset_index`: if true will load offset index
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
if !column_index && !offset_index {
return Ok(());
@@ -226,6 +229,7 @@ where
/// in the first request, instead of 8, and only issue further requests
/// if additional bytes are needed. Providing a `prefetch` hint can therefore
/// significantly reduce the number of `fetch` requests, and consequently latency
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn fetch_parquet_metadata<F, Fut>(
fetch: F,
file_size: usize,
@@ -236,10 +240,14 @@
Fut: Future<Output = Result<Bytes>> + Send,
{
let fetch = MetadataFetchFn(fetch);
let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
Ok(loader.finish())
ParquetMetaDataReader::new()
.with_prefetch_hint(prefetch)
.load_and_finish(fetch, file_size)
.await
}

// these tests are all replicated in parquet::file::metadata::reader
Contributor: 👍

#[allow(deprecated)]
#[cfg(test)]
mod tests {
use super::*;
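
The hunk above re-implements `fetch_parquet_metadata` on top of `ParquetMetaDataReader`, and its doc comment explains that a prefetch hint can cut the number of fetch requests. Below is a minimal migration sketch, not part of this PR, that calls the replacement API directly. It assumes a `tokio::fs::File` can be passed by mutable reference through the crate's blanket `AsyncFileReader`/`MetadataFetch` impls (the same pattern the updated bad_data.rs test uses with a `Cursor`), and the 64 KiB hint is an arbitrary illustrative value.

```rust
use parquet::errors::Result;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

/// Sketch: read a file's footer metadata with the replacement API.
async fn read_footer(path: &str) -> Result<ParquetMetaData> {
    let mut file = tokio::fs::File::open(path).await?;
    let file_size = file.metadata().await?.len() as usize;

    // With a prefetch hint larger than footer + metadata, the reader can
    // usually satisfy everything in one fetch instead of two (an 8-byte
    // footer read followed by a metadata read).
    ParquetMetaDataReader::new()
        .with_prefetch_hint(Some(64 * 1024)) // illustrative value
        .load_and_finish(&mut file, file_size)
        .await
}
```

Without a hint, the reader first issues the 8-byte footer read to learn the metadata length and then a second request for the metadata itself, which is the behaviour the doc comment above describes.
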
8 changes: 5 additions & 3 deletions parquet/src/arrow/async_reader/mod.rs
@@ -212,16 +212,18 @@ impl ArrowReaderMetadata {
input: &mut T,
options: ArrowReaderOptions,
) -> Result<Self> {
// TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
Contributor: Not only is this awkward, it is also a common source of confusion and bugs: when someone supplies the ParquetMetaData to the arrow reader options to avoid a second object store request, it often turns out the second fetch happens anyway to read the page index, which defeats the attempted optimization.

To avoid this, they need to ensure that when they read the metadata in the first place, they also read the page index (see the sketch after this file's diff).

This is (in a roundabout way) what is happening to @progval in apache/datafusion#12593.

I will try and file a ticket explaining the issue.

Contributor: Filed #6476

// took an argument to fetch the page indexes.
let mut metadata = input.get_metadata().await?;

if options.page_index
&& metadata.column_index().is_none()
&& metadata.offset_index().is_none()
{
let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
let mut loader = MetadataLoader::new(input, m);
loader.load_page_index(true, true).await?;
metadata = Arc::new(loader.finish())
let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
reader.load_page_index(input).await?;
metadata = Arc::new(reader.finish()?)
}
Self::try_new(metadata, options)
}
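
As the review comment above points out, the way to avoid the redundant second fetch is to read the page indexes at the same time as the rest of the metadata. The following sketch is not part of this PR; it assumes `ArrowReaderMetadata::try_new` and `ArrowReaderOptions::with_page_index` as currently exported from `parquet::arrow::arrow_reader`, and again passes a `tokio::fs::File` through the blanket `MetadataFetch` impl.

```rust
use std::sync::Arc;

use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaDataReader;

/// Sketch: fetch metadata *and* page indexes once, so the arrow reader
/// does not have to go back to storage for them.
async fn preload_metadata(path: &str) -> Result<ArrowReaderMetadata> {
    let mut file = tokio::fs::File::open(path).await?;
    let file_size = file.metadata().await?.len() as usize;

    // Load footer, metadata, and both page index structures in one pass
    let metadata = ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(&mut file, file_size)
        .await?;

    // column_index()/offset_index() are now populated, so the page-index
    // branch shown in the diff above is skipped and no extra I/O is issued
    // when a reader is later built from this ArrowReaderMetadata.
    let options = ArrowReaderOptions::new().with_page_index(true);
    ArrowReaderMetadata::try_new(Arc::new(metadata), options)
}
```
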
17 changes: 8 additions & 9 deletions parquet/src/arrow/async_reader/store.rs
@@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt};

use object_store::{ObjectMeta, ObjectStore};

use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::Result;
use crate::file::metadata::ParquetMetaData;
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

/// Reads Parquet files in object storage using [`ObjectStore`].
///
@@ -124,15 +124,14 @@ impl AsyncFileReader for ParquetObjectReader {

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let preload_column_index = self.preload_column_index;
let preload_offset_index = self.preload_offset_index;
let file_size = self.meta.size;
let prefetch = self.metadata_size_hint;
let mut loader = MetadataLoader::load(self, file_size, prefetch).await?;
loader
.load_page_index(preload_column_index, preload_offset_index)
let metadata = ParquetMetaDataReader::new()
Contributor: so-beautiful

.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint)
.load_and_finish(self, file_size)
.await?;
Ok(Arc::new(loader.finish()))
Ok(Arc::new(metadata))
})
}
}
48 changes: 22 additions & 26 deletions parquet/src/file/metadata/reader.rs
@@ -329,13 +329,18 @@ impl ParquetMetaDataReader {
return Ok(());
}

self.load_page_index(fetch, remainder).await
self.load_page_index_with_remainder(fetch, remainder).await
}

/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
/// been obtained. See [`Self::new_with_metadata()`].
#[cfg(feature = "async")]
pub async fn load_page_index<F: MetadataFetch>(
pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
Contributor (author): This change would be breaking if it were in 52.2.0. I'll revert this if need be. I found having `remainder` in the public API confusing.

self.load_page_index_with_remainder(fetch, None).await
}

#[cfg(feature = "async")]
async fn load_page_index_with_remainder<F: MetadataFetch>(
&mut self,
mut fetch: F,
remainder: Option<(usize, Bytes)>,
@@ -836,7 +841,7 @@ mod async_tests {

struct MetadataFetchFn<F>(F);

impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn<F>
Contributor: I think now that the API has been changed back, these test changes are also not needed.

Contributor (author): This change allows me to wrap the fetch function once, rather than for each invocation of load, but that could be done inline or with a `From` impl, I suppose. Happy to revert if you'd like.

Contributor (alamb), Sep 30, 2024: in #6484

Contributor: I thought reverting them was nice to demonstrate that the same API can still be used.

I did notice that the metadata loader tests actually have a copy/paste MetadataFetchFn adapter. Maybe if the arrow crate needs it in two places, we should make it easier for actual users to write them 🤔 Not sure. (A sketch of such an adapter follows the writer.rs diff below.)

where
F: FnMut(Range<usize>) -> Fut + Send,
Fut: Future<Output = Result<Bytes>> + Send,
@@ -865,74 +870,68 @@
let expected = expected.file_metadata().schema();
let fetch_count = AtomicUsize::new(0);

let mut fetch = |range| {
let fetch = |range| {
fetch_count.fetch_add(1, Ordering::SeqCst);
futures::future::ready(read_range(&mut file, range))
};

let input = MetadataFetchFn(&mut fetch);
let mut f = MetadataFetchFn(fetch);
let actual = ParquetMetaDataReader::new()
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too small - below footer size
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(7))
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too small
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(10))
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too large
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(500))
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

// Metadata hint exactly correct
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(428))
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

let input = MetadataFetchFn(&mut fetch);
let err = ParquetMetaDataReader::new()
.load_and_finish(input, 4)
.load_and_finish(&mut f, 4)
.await
.unwrap_err()
.to_string();
assert_eq!(err, "EOF: file size of 4 is less than footer");

let input = MetadataFetchFn(&mut fetch);
let err = ParquetMetaDataReader::new()
.load_and_finish(input, 20)
.load_and_finish(&mut f, 20)
.await
.unwrap_err()
.to_string();
@@ -949,42 +948,39 @@
futures::future::ready(read_range(&mut file, range))
};

let f = MetadataFetchFn(&mut fetch);
let mut f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
loader.try_load(f, len).await.unwrap();
loader.try_load(&mut f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch just footer exactly
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(1729));
loader.try_load(f, len).await.unwrap();
loader.try_load(&mut f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than footer but not enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(130649));
loader.try_load(f, len).await.unwrap();
loader.try_load(&mut f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch exactly enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(130650))
.load_and_finish(f, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
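
The author's comment above notes that the public `load_page_index` now takes only the fetch source, with the `remainder` buffer moved into the private `load_page_index_with_remainder`. Here is a sketch, not part of this PR, of the resulting two-phase flow, mirroring what the mod.rs hunk does; the `tokio::fs::File` source (passed by mutable reference, as in the bad_data.rs test) and the deferred-index logic are illustrative assumptions.

```rust
use parquet::errors::Result;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

/// Sketch: fetch the "thin" metadata first, and pull in the page indexes
/// later only if they turn out to be needed.
async fn metadata_then_indexes(path: &str, need_indexes: bool) -> Result<ParquetMetaData> {
    let mut file = tokio::fs::File::open(path).await?;
    let file_size = file.metadata().await?.len() as usize;

    // Phase 1: footer + metadata only
    let metadata = ParquetMetaDataReader::new()
        .load_and_finish(&mut file, file_size)
        .await?;

    if !need_indexes {
        return Ok(metadata);
    }

    // Phase 2: re-wrap the metadata and fetch the column/offset indexes,
    // mirroring the mod.rs hunk above
    let mut reader = ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
    reader.load_page_index(&mut file).await?;
    reader.finish()
}
```
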
12 changes: 5 additions & 7 deletions parquet/src/file/metadata/writer.rs
@@ -516,7 +516,7 @@ mod tests {
/// Temporary function so we can test loading metadata with page indexes
/// while we haven't fully figured out how to load it cleanly
async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
use crate::arrow::async_reader::MetadataFetch;
use crate::errors::Result as ParquetResult;
use futures::future::BoxFuture;
use futures::FutureExt;
@@ -569,13 +569,11 @@
Box::new(AsyncBytes::new(data)),
file_size - metadata_length..file_size,
);
let metadata = MetadataLoader::load(&mut reader, file_size, None)
ParquetMetaDataReader::new()
.with_page_indexes(true)
.load_and_finish(&mut reader, file_size)
.await
.unwrap();
let loaded_metadata = metadata.finish();
let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
metadata.load_page_index(true, true).await.unwrap();
metadata.finish()
.unwrap()
}

fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
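
A reviewer above observes that both test modules carry a copy/paste `MetadataFetchFn` adapter and wonders whether writing such adapters should be easier for users. The sketch below is not part of this PR; it shows one possible adapter over an in-memory buffer, assuming the `MetadataFetch` trait keeps the `fetch(&mut self, Range<usize>) -> BoxFuture<'_, Result<Bytes>>` shape these tests rely on, and `InMemoryFetch`/`metadata_from_bytes` are hypothetical names rather than crate APIs.

```rust
use std::ops::Range;

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use parquet::arrow::async_reader::MetadataFetch;
use parquet::errors::Result;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

/// Hypothetical adapter: serve byte ranges out of a fully buffered file.
struct InMemoryFetch(Bytes);

impl MetadataFetch for InMemoryFetch {
    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        // Bytes::slice is a cheap, reference-counted sub-slice
        let data = self.0.slice(range);
        async move { Ok(data) }.boxed()
    }
}

/// Hypothetical helper: parse metadata (with page indexes) from an in-memory file.
async fn metadata_from_bytes(data: Bytes) -> Result<ParquetMetaData> {
    let file_size = data.len();
    ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(InMemoryFetch(data), file_size)
        .await
}
```

For a source that already implements `AsyncFileReader`, no adapter should be needed at all; passing `&mut reader` appears to work through the blanket impls, as the updated bad_data.rs test does with a `Cursor`.
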
17 changes: 10 additions & 7 deletions parquet/tests/arrow_reader/bad_data.rs
@@ -140,20 +140,23 @@ fn read_file(name: &str) -> Result<usize, ParquetError> {
#[tokio::test]
async fn bad_metadata_err() {
use bytes::Bytes;
use parquet::arrow::async_reader::MetadataLoader;
use parquet::file::metadata::ParquetMetaDataReader;

let metadata_buffer = Bytes::from_static(include_bytes!("bad_raw_metadata.bin"));

let metadata_length = metadata_buffer.len();

let mut reader = std::io::Cursor::new(&metadata_buffer);
let mut loader = MetadataLoader::load(&mut reader, metadata_length, None)
.await
.unwrap();
loader.load_page_index(false, false).await.unwrap();
loader.load_page_index(false, true).await.unwrap();
let mut loader = ParquetMetaDataReader::new();
loader.try_load(&mut reader, metadata_length).await.unwrap();
loader = loader.with_page_indexes(false);
loader.load_page_index(&mut reader).await.unwrap();

let err = loader.load_page_index(true, false).await.unwrap_err();
loader = loader.with_offset_indexes(true);
loader.load_page_index(&mut reader).await.unwrap();

loader = loader.with_column_indexes(true);
let err = loader.load_page_index(&mut reader).await.unwrap_err();

assert_eq!(
err.to_string(),