[BUG] Propagate errors when hitting them in parquet byte stream (#2214)
* Wait on errors for the first chunk to finish in the parquet reader
* No longer panic on errors when encountering them in the byte reader
* Unwrap cached error when exporting
samster25 authored May 1, 2024
1 parent ce7cec5 commit 339e4a1
Showing 4 changed files with 49 additions and 16 deletions.
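
The thread running through these changes: steps that previously called .unwrap() (or panicked inside a stream) now return a DaftResult, so a failure inside the spawned read task travels back to whoever awaits it. A minimal caller-side sketch of that flow, using a hypothetical read_columns function and a simulated io::Error rather than the actual Daft types:

    use std::io;

    // Hypothetical stand-in for the column-read task: it returns a Result
    // instead of unwrapping, so failures are values rather than panics.
    async fn read_columns() -> io::Result<Vec<u8>> {
        let handle = tokio::task::spawn(async move {
            Err::<Vec<u8>, _>(io::Error::new(
                io::ErrorKind::NotFound,
                "simulated byte-stream failure",
            ))
        });

        // Awaiting the JoinHandle yields a nested Result: the outer layer
        // reports a panic or cancellation, the inner layer is the task's own
        // error, which now carries the byte-stream failure to the caller.
        handle
            .await
            .map_err(|join_err| io::Error::new(io::ErrorKind::Other, join_err))?
    }
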
12 changes: 12 additions & 0 deletions src/daft-io/src/lib.rs
@@ -133,6 +133,18 @@ impl From<Error> for DaftError {
ReadTimeout { .. } => DaftError::ReadTimeout(err.into()),
UnableToReadBytes { .. } => DaftError::ByteStreamError(err.into()),
SocketError { .. } => DaftError::SocketError(err.into()),
// We have to repeat everything above for the case we have an Arc since we can't move the error.
CachedError { ref source } => match source.as_ref() {
NotFound { path, source: _ } => DaftError::FileNotFound {
path: path.clone(),
source: err.into(),
},
ConnectTimeout { .. } => DaftError::ConnectTimeout(err.into()),
ReadTimeout { .. } => DaftError::ReadTimeout(err.into()),
UnableToReadBytes { .. } => DaftError::ByteStreamError(err.into()),
SocketError { .. } => DaftError::SocketError(err.into()),
_ => DaftError::External(err.into()),
},
_ => DaftError::External(err.into()),
}
}
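
The new CachedError arm has to re-match the wrapped error by reference because it lives behind an Arc: fields such as path cannot be moved out of shared data, only cloned, which is why the variants are repeated. A small self-contained sketch of the pattern, with simplified Error/DaftError shapes that are stand-ins rather than the real daft-io definitions:

    use std::sync::Arc;

    #[derive(Debug)]
    enum Error {
        NotFound { path: String },
        CachedError { source: Arc<Error> },
    }

    #[derive(Debug)]
    enum DaftError {
        FileNotFound { path: String },
        External(String),
    }

    fn convert(err: Error) -> DaftError {
        match err {
            // Owned error: `path` can simply be moved into the new variant.
            Error::NotFound { path } => DaftError::FileNotFound { path },
            // Cached error behind an Arc: it can only be inspected through a
            // shared reference, so the match repeats and `path` is cloned.
            Error::CachedError { ref source } => match source.as_ref() {
                Error::NotFound { path } => DaftError::FileNotFound { path: path.clone() },
                _ => DaftError::External(format!("{err:?}")),
            },
        }
    }

The arm in lib.rs repeats every variant it already handles for the owned case (ConnectTimeout, ReadTimeout, and so on); the sketch keeps just one to show the shape.
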
2 changes: 1 addition & 1 deletion src/daft-io/src/object_io.rs
@@ -31,10 +31,10 @@
None => Ok(first),
Some(second) => {
let size_hint = size_hint.unwrap_or_else(|| first.len() + second.len());

let mut buf = Vec::with_capacity(size_hint);
buf.extend_from_slice(&first);
buf.extend_from_slice(&second);

while let Some(maybe_bytes) = stream.next().await {
buf.extend_from_slice(&maybe_bytes?);
}
37 changes: 24 additions & 13 deletions src/daft-parquet/src/file.rs
@@ -423,21 +423,28 @@ impl ParquetFileReader {
.map(|(i, _)| i)
.collect::<Vec<_>>();

let range_readers = filtered_cols_idx
let metadata = metadata.clone();

let needed_byte_ranges = filtered_cols_idx
.iter()
.map(|i| {
let c = columns.get(*i).unwrap();
let (start, len) = c.byte_range();
let end: u64 = start + len;
let range_reader = ranges
.get_range_reader(start as usize..end as usize)
.unwrap();

Box::pin(range_reader)
start as usize..end as usize
})
.collect::<Vec<_>>();
let metadata = metadata.clone();

let ranges = ranges.clone();

let handle = tokio::task::spawn(async move {
let mut range_readers = Vec::with_capacity(filtered_cols_idx.len());

for range in needed_byte_ranges.into_iter() {
let range_reader = ranges.get_range_reader(range).await?;
range_readers.push(Box::pin(range_reader))
}

let mut decompressed_iters =
Vec::with_capacity(filtered_cols_idx.len());
let mut ptypes = Vec::with_capacity(filtered_cols_idx.len());
@@ -599,21 +606,25 @@ impl ParquetFileReader {
.map(|(i, _)| i)
.collect::<Vec<_>>();

let range_readers = filtered_cols_idx
let needed_byte_ranges = filtered_cols_idx
.iter()
.map(|i| {
let c = columns.get(*i).unwrap();
let (start, len) = c.byte_range();
let end: u64 = start + len;
let range_reader = ranges
.get_range_reader(start as usize..end as usize)
.unwrap();

Box::pin(range_reader)
start as usize..end as usize
})
.collect::<Vec<_>>();
let metadata = metadata.clone();
let ranges = ranges.clone();
let handle = tokio::task::spawn(async move {
let mut range_readers = Vec::with_capacity(filtered_cols_idx.len());

for range in needed_byte_ranges.into_iter() {
let range_reader = ranges.get_range_reader(range).await?;
range_readers.push(Box::pin(range_reader))
}

let mut decompressed_iters =
Vec::with_capacity(filtered_cols_idx.len());
let mut ptypes = Vec::with_capacity(filtered_cols_idx.len());
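
Both hunks in file.rs make the same change: the byte ranges are computed up front (which cannot fail), while the fallible get_range_reader calls move inside the spawned task, where `?` returns the error through the task's result instead of the previous `.unwrap()`. A rough sketch of that shape, with a placeholder open_range function standing in for the real range reader:

    use std::ops::Range;

    // Placeholder for `ranges.get_range_reader(range).await`; the real reader
    // type in daft-parquet differs, this only models the fallibility.
    async fn open_range(range: Range<usize>) -> std::io::Result<Vec<u8>> {
        Ok(vec![0u8; range.len()])
    }

    fn spawn_column_reads(
        byte_ranges: Vec<Range<usize>>,
    ) -> tokio::task::JoinHandle<std::io::Result<Vec<Vec<u8>>>> {
        tokio::task::spawn(async move {
            let mut readers = Vec::with_capacity(byte_ranges.len());
            for range in byte_ranges {
                // `?` hands a failure back through the JoinHandle; previously
                // the readers were built eagerly with `.unwrap()`, which
                // panicked on any I/O error.
                readers.push(open_range(range).await?);
            }
            Ok(readers)
        })
    }
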
14 changes: 12 additions & 2 deletions src/daft-parquet/src/read_planner.rs
@@ -198,7 +198,10 @@ pub(crate) struct RangesContainer {
}

impl RangesContainer {
pub fn get_range_reader(&self, range: Range<usize>) -> DaftResult<impl futures::AsyncRead> {
pub async fn get_range_reader(
&self,
range: Range<usize>,
) -> DaftResult<impl futures::AsyncRead> {
let mut current_pos = range.start;
let mut curr_index;
let start_point = self.ranges.binary_search_by_key(&current_pos, |e| e.start);
@@ -256,9 +259,16 @@ impl RangesContainer {

assert_eq!(current_pos, range.end);

// We block on the first entry so we can surface up the error. This shouldn't cause any performance issues since we have to wait for this to complete anyways
if let Some(entry) = needed_entries.first()
&& let Some(range) = ranges_to_slice.first()
{
entry.get_or_wait(range.clone()).await?;
}

let bytes_iter = tokio_stream::iter(needed_entries.into_iter().zip(ranges_to_slice))
.then(|(e, r)| async move { e.get_or_wait(r).await })
.inspect_err(|e| panic!("Reading a range of Parquet bytes failed: {}", e));
.inspect_err(|e| log::warn!("Encountered error while streaming bytes into parquet reader. This may show up as a Thrift Error Downstream: {}", e));

let stream_reader = tokio_util::io::StreamReader::new(bytes_iter);
let convert = async_compat::Compat::new(stream_reader);
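
Two things change in this streaming path: the first cached range is awaited up front so an early failure comes back as a DaftResult error (the comment in the diff notes this costs nothing, since nothing can be read before that chunk completes anyway), and later failures are logged and forwarded through the stream instead of panicking, which lets the downstream reader see them as ordinary read errors rather than a panic. A condensed sketch of the idea, with a hypothetical fetch_chunk standing in for get_or_wait on the cached entries:

    use std::io;
    use bytes::Bytes;
    use futures::StreamExt;
    use tokio_util::io::StreamReader;

    // Hypothetical cached-chunk lookup standing in for
    // `entry.get_or_wait(range)`, which waits on an already-scheduled
    // download, so awaiting the first entry early does not add extra work.
    async fn fetch_chunk(i: usize) -> io::Result<Bytes> {
        Ok(Bytes::from(vec![i as u8; 16]))
    }

    async fn range_reader(num_chunks: usize) -> io::Result<impl tokio::io::AsyncRead> {
        // Await the first chunk eagerly: the caller gets an Err here instead
        // of a confusing decode failure further downstream.
        fetch_chunk(0).await?;

        // The remaining chunks are pulled lazily; an Err item flows through
        // the stream and StreamReader surfaces it as an io::Error on read.
        let chunks = futures::stream::iter(0..num_chunks).then(fetch_chunk);
        Ok(StreamReader::new(chunks))
    }
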