[PERF] Local filesystem parquet reader (#1461)
* Improves performance of local parquet file reads, both into a Daft Table and into pyarrow.
samster25 authored Oct 4, 2023
1 parent a317dd7 commit 1d4248a
Showing 14 changed files with 523 additions and 146 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -73,6 +73,7 @@ bytes = "1.4.0"
 futures = "0.3.28"
 html-escape = "0.2.13"
 indexmap = "2.0.0"
+itertools = "0.11"
 num-derive = "0.3.3"
 num-traits = "0.2"
 prettytable-rs = "0.10"
2 changes: 1 addition & 1 deletion src/daft-io/Cargo.toml
@@ -20,7 +20,7 @@ globset = "0.4"
 google-cloud-storage = {version = "0.13.0", default-features = false, features = ["default-tls", "auth"]}
 hyper = "0.14.27"
 hyper-tls = "0.5.0"
-itertools = "0.11"
+itertools = {workspace = true}
 lazy_static = {workspace = true}
 log = {workspace = true}
 openssl-sys = {version = "0.9.93", features = ["vendored"]}
2 changes: 1 addition & 1 deletion src/daft-io/src/lib.rs
@@ -231,7 +231,7 @@ impl std::fmt::Display for SourceType {
     }
 }

-fn parse_url(input: &str) -> Result<(SourceType, Cow<'_, str>)> {
+pub fn parse_url(input: &str) -> Result<(SourceType, Cow<'_, str>)> {
     let mut fixed_input = Cow::Borrowed(input);

     let url = match url::Url::parse(input) {
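Making `parse_url` public lets downstream crates such as `daft-parquet` resolve a path to a source type before choosing a reader, which is what enables the local-filesystem fast path in this commit. Below is a minimal, self-contained sketch of that kind of scheme dispatch; the `SourceType` variants and the bare-path fallback are stand-ins inferred from the diff, not the crate's exact implementation:

```rust
use std::borrow::Cow;

#[derive(Debug, PartialEq)]
enum SourceType {
    File,
    Http,
    S3,
}

// Stand-in for daft_io::parse_url: split off the scheme and map it to a
// source type, treating bare paths as local files.
fn parse_url(input: &str) -> Result<(SourceType, Cow<'_, str>), String> {
    match input.split_once("://") {
        Some(("file", _)) => Ok((SourceType::File, Cow::Borrowed(input))),
        Some(("http" | "https", _)) => Ok((SourceType::Http, Cow::Borrowed(input))),
        Some(("s3", _)) => Ok((SourceType::S3, Cow::Borrowed(input))),
        // No scheme at all: assume a local filesystem path.
        None => Ok((SourceType::File, Cow::Owned(format!("file://{input}")))),
        Some((scheme, _)) => Err(format!("unsupported scheme: {scheme}")),
    }
}

fn main() {
    assert_eq!(parse_url("/tmp/data.parquet").unwrap().0, SourceType::File);
    assert_eq!(parse_url("s3://bucket/key").unwrap().0, SourceType::S3);
}
```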
1 change: 1 addition & 0 deletions src/daft-parquet/Cargo.toml
@@ -8,6 +8,7 @@ daft-core = {path = "../daft-core", default-features = false}
 daft-io = {path = "../daft-io", default-features = false}
 daft-table = {path = "../daft-table", default-features = false}
 futures = {workspace = true}
+itertools = {workspace = true}
 log = {workspace = true}
 parquet2 = {workspace = true}
 pyo3 = {workspace = true, optional = true}
126 changes: 68 additions & 58 deletions src/daft-parquet/src/file.rs
@@ -91,6 +91,56 @@ where
     }
 }

+pub(crate) fn build_row_ranges(
+    num_rows: usize,
+    row_start_offset: usize,
+    row_groups: Option<&[i64]>,
+    metadata: &parquet2::metadata::FileMetaData,
+    uri: &str,
+) -> super::Result<Vec<RowGroupRange>> {
+    let mut row_ranges = vec![];
+    let mut curr_row_index = 0;
+    let mut rows_to_add = num_rows;
+    if let Some(row_groups) = row_groups {
+        for i in row_groups {
+            let i = *i as usize;
+            if !(0..metadata.row_groups.len()).contains(&i) {
+                return Err(super::Error::ParquetRowGroupOutOfIndex {
+                    path: uri.to_string(),
+                    row_group: i as i64,
+                    total_row_groups: metadata.row_groups.len() as i64,
+                });
+            }
+            let rg = metadata.row_groups.get(i).unwrap();
+            let range_to_add = RowGroupRange {
+                row_group_index: i,
+                start: 0,
+                num_rows: rg.num_rows(),
+            };
+            row_ranges.push(range_to_add);
+        }
+    } else {
+        for (i, rg) in metadata.row_groups.iter().enumerate() {
+            if (curr_row_index + rg.num_rows()) < row_start_offset {
+                curr_row_index += rg.num_rows();
+                continue;
+            } else if rows_to_add > 0 {
+                let range_to_add = RowGroupRange {
+                    row_group_index: i,
+                    start: row_start_offset.saturating_sub(curr_row_index),
+                    num_rows: rg.num_rows().min(rows_to_add),
+                };
+                rows_to_add = rows_to_add.saturating_sub(rg.num_rows().min(rows_to_add));
+                row_ranges.push(range_to_add);
+            } else {
+                break;
+            }
+            curr_row_index += rg.num_rows();
+        }
+    }
+    Ok(row_ranges)
+}
+
 impl ParquetReaderBuilder {
     pub async fn from_uri(uri: &str, io_client: Arc<daft_io::IOClient>) -> super::Result<Self> {
         // TODO(sammy): We actually don't need this since we can do negative offsets when reading the metadata
@@ -156,65 +206,25 @@ impl ParquetReaderBuilder {
         Ok(self)
     }

-    pub fn set_infer_schema_options(mut self, opts: &ParquetSchemaInferenceOptions) -> Self {
-        self.schema_inference_options = opts.clone();
+    pub fn set_infer_schema_options(mut self, opts: ParquetSchemaInferenceOptions) -> Self {
+        self.schema_inference_options = opts;
         self
     }

     pub fn build(self) -> super::Result<ParquetFileReader> {
-        let mut row_ranges = vec![];
-
-        let mut curr_row_index = 0;
-        let mut rows_to_add = self.num_rows;
-        if let Some(row_groups) = self.row_groups {
-            for i in row_groups {
-                let i = i as usize;
-                if !(0..self.metadata.row_groups.len()).contains(&i) {
-                    return Err(super::Error::ParquetRowGroupOutOfIndex {
-                        path: self.uri,
-                        row_group: i as i64,
-                        total_row_groups: self.metadata.row_groups.len() as i64,
-                    });
-                }
-                let rg = self.metadata.row_groups.get(i).unwrap();
-                let range_to_add = RowGroupRange {
-                    row_group_index: i,
-                    start: 0,
-                    num_rows: rg.num_rows(),
-                };
-                row_ranges.push(range_to_add);
-            }
-        } else {
-            for (i, rg) in self.metadata.row_groups.iter().enumerate() {
-                if (curr_row_index + rg.num_rows()) < self.row_start_offset {
-                    curr_row_index += rg.num_rows();
-                    continue;
-                } else if rows_to_add > 0 {
-                    let range_to_add = RowGroupRange {
-                        row_group_index: i,
-                        start: self.row_start_offset.saturating_sub(curr_row_index),
-                        num_rows: rg.num_rows().min(rows_to_add),
-                    };
-                    rows_to_add = rows_to_add.saturating_sub(rg.num_rows().min(rows_to_add));
-                    row_ranges.push(range_to_add);
-                } else {
-                    break;
-                }
-                curr_row_index += rg.num_rows();
-            }
-        }
-        let mut arrow_schema = infer_schema_with_options(
+        let row_ranges = build_row_ranges(
+            self.num_rows,
+            self.row_start_offset,
+            self.row_groups.as_deref(),
             &self.metadata,
-            &Some(arrow2::io::parquet::read::schema::SchemaInferenceOptions {
-                int96_coerce_to_timeunit: self
-                    .schema_inference_options
-                    .coerce_int96_timestamp_unit
-                    .to_arrow(),
-            }),
-        )
-        .context(UnableToParseSchemaFromMetadataSnafu::<String> {
-            path: self.uri.clone(),
-        })?;
+            &self.uri,
+        )?;
+
+        let mut arrow_schema =
+            infer_schema_with_options(&self.metadata, &Some(self.schema_inference_options.into()))
+                .context(UnableToParseSchemaFromMetadataSnafu::<String> {
+                    path: self.uri.clone(),
+                })?;

         if let Some(names_to_keep) = self.selected_columns {
             arrow_schema

@@ -227,10 +237,10 @@
 }

 #[derive(Copy, Clone)]
-struct RowGroupRange {
-    row_group_index: usize,
-    start: usize,
-    num_rows: usize,
+pub(crate) struct RowGroupRange {
+    pub row_group_index: usize,
+    pub start: usize,
+    pub num_rows: usize,
 }

 pub(crate) struct ParquetFileReader {
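The extracted `build_row_ranges` is the core of the skip/limit logic: it walks row groups in order, skips whole groups that fall entirely before `row_start_offset`, and caps the total at `num_rows`. Here is a self-contained sketch of the same walk, with row-group sizes passed as plain `usize`s instead of parquet2 metadata (the types and sizes below are illustrative stand-ins; the branch logic mirrors the diff):

```rust
#[derive(Debug, PartialEq)]
struct RowGroupRange {
    row_group_index: usize,
    start: usize,    // first row to read within this row group
    num_rows: usize, // rows from this group counted against the limit
}

// Mirrors the offset/limit branch of build_row_ranges above.
fn build_row_ranges(num_rows: usize, row_start_offset: usize, sizes: &[usize]) -> Vec<RowGroupRange> {
    let mut ranges = vec![];
    let mut curr_row_index = 0;
    let mut rows_to_add = num_rows;
    for (i, &size) in sizes.iter().enumerate() {
        if curr_row_index + size < row_start_offset {
            // The whole row group lies before the requested offset: skip it.
            curr_row_index += size;
            continue;
        } else if rows_to_add > 0 {
            let take = size.min(rows_to_add);
            ranges.push(RowGroupRange {
                row_group_index: i,
                // Nonzero only for the group that straddles the offset.
                start: row_start_offset.saturating_sub(curr_row_index),
                num_rows: take,
            });
            rows_to_add -= take;
        } else {
            break;
        }
        curr_row_index += size;
    }
    ranges
}

fn main() {
    // Ask for 150 rows starting at row 120, over three 100-row groups:
    // group 0 is skipped entirely; group 1 starts at its local row 20;
    // group 2 supplies the remaining 50 rows of the limit.
    let ranges = build_row_ranges(150, 120, &[100, 100, 100]);
    assert_eq!(
        ranges,
        vec![
            RowGroupRange { row_group_index: 1, start: 20, num_rows: 100 },
            RowGroupRange { row_group_index: 2, start: 0, num_rows: 50 },
        ]
    );
}
```

Pulling this out of `ParquetFileReader::build` into a free function (and widening `RowGroupRange` to `pub(crate)`) is what lets the new local streaming reader reuse the same range planning.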
28 changes: 24 additions & 4 deletions src/daft-parquet/src/lib.rs
@@ -9,19 +9,31 @@ pub mod metadata;
 pub mod python;
 pub mod read;
 mod read_planner;
+mod stream_reader;
 #[cfg(feature = "python")]
 pub use python::register_modules;

 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("{source}"))]
-    IOError { source: daft_io::Error },
+    DaftIOError { source: daft_io::Error },

+    #[snafu(display("Internal IO Error when Opening: {path}:\nDetails:\n{source}"))]
+    InternalIOError {
+        path: String,
+        source: std::io::Error,
+    },
+
     #[snafu(display("Unable to parse parquet metadata for file {}: {}", path, source))]
     UnableToParseMetadata {
         path: String,
         source: parquet2::error::Error,
     },
+    #[snafu(display("Unable to parse parquet metadata for file {}: {}", path, source))]
+    UnableToParseMetadataFromLocalFile {
+        path: String,
+        source: arrow2::error::Error,
+    },

     #[snafu(display(
         "Unable to create arrow arrays from parquet pages {}: {}",

@@ -38,7 +50,15 @@ pub enum Error {
         path: String,
         source: parquet2::error::Error,
     },

+    #[snafu(display(
+        "Unable to create arrow chunk from streaming file reader{}: {}",
+        path,
+        source
+    ))]
+    UnableToCreateChunkFromStreamingFileReader {
+        path: String,
+        source: arrow2::error::Error,
+    },
     #[snafu(display(
         "Unable to parse parquet metadata to arrow schema for file {}: {}",
         path,

@@ -136,15 +156,15 @@
 impl From<Error> for DaftError {
     fn from(err: Error) -> DaftError {
         match err {
-            Error::IOError { source } => source.into(),
+            Error::DaftIOError { source } => source.into(),
             _ => DaftError::External(err.into()),
         }
     }
 }

 impl From<daft_io::Error> for Error {
     fn from(err: daft_io::Error) -> Self {
-        Error::IOError { source: err }
+        Error::DaftIOError { source: err }
     }
 }
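The `IOError` → `DaftIOError` rename keeps the wrapped `daft_io` error from being confused with the new `InternalIOError` (which wraps a raw `std::io::Error` from opening local files), and the pair of `From` impls lets `?` convert errors in both directions. A minimal sketch of that conversion pattern with stand-in types (`IoError` and `DaftError` below are placeholders for `daft_io::Error` and the real `DaftError`):

```rust
#[derive(Debug)]
struct IoError(String); // stand-in for daft_io::Error

#[derive(Debug)]
enum DaftError {
    External(String), // stand-in for the real DaftError
}

#[derive(Debug)]
enum Error {
    DaftIOError { source: IoError },
}

// Incoming direction: `?` wraps any IO error into this crate's Error.
impl From<IoError> for Error {
    fn from(err: IoError) -> Self {
        Error::DaftIOError { source: err }
    }
}

// Outgoing direction: unwrap the IO error instead of double-wrapping it.
impl From<Error> for DaftError {
    fn from(err: Error) -> DaftError {
        match err {
            Error::DaftIOError { source } => DaftError::External(source.0),
        }
    }
}

fn fallible_io() -> Result<(), IoError> {
    Err(IoError("connection reset".to_string()))
}

fn read_file() -> Result<(), Error> {
    fallible_io()?; // From<IoError> is applied automatically here
    Ok(())
}

fn main() {
    let daft_err: DaftError = read_file().unwrap_err().into();
    println!("{daft_err:?}");
}
```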
12 changes: 6 additions & 6 deletions src/daft-parquet/src/python.rs
@@ -38,10 +38,10 @@ pub mod pylib {
             columns.as_deref(),
             start_offset,
             num_rows,
-            row_groups.as_deref(),
+            row_groups,
             io_client,
             multithreaded_io.unwrap_or(true),
-            &schema_infer_options,
+            schema_infer_options,
         )?
         .into())
     })

@@ -98,10 +98,10 @@ pub mod pylib {
             columns.as_deref(),
             start_offset,
             num_rows,
-            row_groups.as_deref(),
+            row_groups,
             io_client,
             multithreaded_io.unwrap_or(true),
-            &schema_infer_options,
+            schema_infer_options,
         )
     })?;
     let (schema, all_arrays) = read_parquet_result;

@@ -180,7 +180,7 @@ pub mod pylib {
         io_client,
         num_parallel_tasks.unwrap_or(128) as usize,
         multithreaded_io.unwrap_or(true),
-        &schema_infer_options,
+        schema_infer_options,
     )
     })?;
     let pyarrow = py.import("pyarrow")?;

@@ -211,7 +211,7 @@ pub mod pylib {
     Ok(Arc::new(crate::read::read_parquet_schema(
         uri,
         io_client,
-        &schema_infer_options,
+        schema_infer_options,
     )?)
     .into())
     })
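Dropping the `&` on `schema_infer_options` at every call site matches the `set_infer_schema_options` change above: the options now flow by value all the way down, with no `clone()` inside. A tiny sketch of why passing a small copyable config by value makes the simpler API (the struct and its field are illustrative, not Daft's real definition):

```rust
#[derive(Debug, Clone, Copy, Default)]
struct SchemaInferenceOptions {
    coerce_int96_to_ms: bool, // illustrative field only
}

// By-value parameter: callers pass the struct directly, with no borrow,
// no lifetime on the signature, and no clone() needed inside.
fn read_with_options(uri: &str, opts: SchemaInferenceOptions) {
    println!("reading {uri} with {opts:?}");
}

fn main() {
    let opts = SchemaInferenceOptions {
        coerce_int96_to_ms: true,
    };
    read_with_options("a.parquet", opts);
    // `opts` is Copy, so it remains usable after being passed by value.
    read_with_options("b.parquet", opts);
}
```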
(Diffs for the remaining changed files are not shown.)
