Skip to content

Commit

Permalink
refactor(integration/parquet): Use ParquetMetaDataReader instead (#5170)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Oct 8, 2024
1 parent a0e93b5 commit 13cb7a6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 36 deletions.
4 changes: 2 additions & 2 deletions integrations/parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ async-trait = "0.1"
bytes = "1"
futures = "0.3"
opendal = { version = "0.50.0", path = "../../core" }
parquet = { version = "53.0", default-features = false, features = [
parquet = { version = "53.1", default-features = false, features = [
"async",
"arrow",
] }

[dev-dependencies]
arrow = { version = "53.0" }
arrow = { version = "53.1" }
opendal = { version = "0.50.0", path = "../../core", features = [
"services-memory",
"services-s3",
Expand Down
40 changes: 6 additions & 34 deletions integrations/parquet/src/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use futures::FutureExt;
use opendal::Reader;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::errors::{ParquetError, Result as ParquetResult};
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::FOOTER_SIZE;

const PREFETCH_FOOTER_SIZE: usize = 512 * 1024;
Expand Down Expand Up @@ -156,40 +156,12 @@ impl AsyncFileReader for AsyncReader {

fn get_metadata(&mut self) -> BoxFuture<'_, ParquetResult<std::sync::Arc<ParquetMetaData>>> {
async move {
let prefetched_footer_content = self
.inner
.read(self.content_length - self.prefetch_footer_size as u64..self.content_length)
.await
.map_err(|err| ParquetError::External(Box::new(err)))?;
let prefetched_footer_length = prefetched_footer_content.len();

// Decode the metadata length from the last 8 bytes of the file.
let metadata_length = {
let buf = &prefetched_footer_content
.slice((prefetched_footer_length - FOOTER_SIZE)..prefetched_footer_length);
// Safety: checked above.
let buf: [u8; 8] = buf.to_vec().try_into().unwrap();
decode_footer(&buf)?
};

// Try to read the metadata from the `prefetched_footer_content`.
// Otherwise, fetch exact metadata from the remote.
let buf = if prefetched_footer_length >= metadata_length + FOOTER_SIZE {
prefetched_footer_content.slice(
(prefetched_footer_length - metadata_length - FOOTER_SIZE)
..(prefetched_footer_length - FOOTER_SIZE),
)
} else {
self.inner
.read(
self.content_length - metadata_length as u64 - FOOTER_SIZE as u64
..self.content_length - FOOTER_SIZE as u64,
)
.await
.map_err(|err| ParquetError::External(Box::new(err)))?
};
let reader =
ParquetMetaDataReader::new().with_prefetch_hint(Some(self.prefetch_footer_size));
let size = self.content_length as usize;
let meta = reader.load_and_finish(self, size).await?;

Ok(Arc::new(decode_metadata(&buf.to_vec())?))
Ok(Arc::new(meta))
}
.boxed()
}
Expand Down

0 comments on commit 13cb7a6

Please sign in to comment.