Skip to content

Commit

Permalink
[BUG] Pass parquet2 io errors correctly into arrow2 (#3012)
Browse files Browse the repository at this point in the history
In
7fe3dbc,
two changes were made that caused parquet2 io errors to be mishandled.

Firstly, a `IoError` for parquet2 was introduced. However in the
implementation of `parquet2::error::Error` for `arrow2::error:Error`,
any parquet2 error that is not `FeatureNotActive` or `Transport` is
transformed into an `ExternalFormat` error.

Secondly, arrow2 IO errors are intended to be marked as
`ByteStreamError`s, primarily so that they are handled as transient
errors and can be retried appropriately. However this special case was
removed.

This PR makes now classifies parquet2 io errors as arrow2 io errors, and
arrow2 io errors are now marked as `ByteStreeamError`s.

---------

Co-authored-by: Raunak Bhagat <[email protected]>
  • Loading branch information
desmondcheongzx and raunakab authored Oct 7, 2024
1 parent 272163f commit 648b804
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/arrow2/src/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ impl From<parquet2::error::Error> for Error {
parquet2::error::Error::Transport(msg) => {
Error::Io(std::io::Error::new(std::io::ErrorKind::Other, msg))
}
parquet2::error::Error::IoError(msg) => {
Error::Io(std::io::Error::new(std::io::ErrorKind::Other, msg))
}
_ => Error::ExternalFormat(error.to_string()),
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/common/error/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
[dependencies]
arrow2 = {workspace = true}
arrow2 = {workspace = true, features = ["io_parquet"]}
pyo3 = {workspace = true, optional = true}
regex = {workspace = true}
serde_json = {workspace = true}
thiserror = {workspace = true}

[dev-dependencies]
parquet2 = {workspace = true}

[features]
python = ["dep:pyo3"]

Expand Down
50 changes: 49 additions & 1 deletion src/common/error/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub enum DaftError {
#[error("DaftError::ComputeError {0}")]
ComputeError(String),
#[error("DaftError::ArrowError {0}")]
ArrowError(#[from] arrow2::error::Error),
ArrowError(arrow2::error::Error),
#[error("DaftError::ValueError {0}")]
ValueError(String),
#[cfg(feature = "python")]
Expand Down Expand Up @@ -47,3 +47,51 @@ pub enum DaftError {
#[error("DaftError::RegexError {0}")]
RegexError(#[from] regex::Error),
}

impl From<arrow2::error::Error> for DaftError {
fn from(error: arrow2::error::Error) -> Self {
match error {
arrow2::error::Error::Io(_) => Self::ByteStreamError(error.into()),
_ => Self::ArrowError(error),
}
}
}

#[cfg(test)]
mod tests {
use std::io::ErrorKind;

use super::*;

#[test]
fn test_arrow_io_error_conversion() {
// Ensure that arrow2 IO errors get converted into transient Byte Stream errors.
let error_message = "IO error occurred";
let arrow_io_error =
arrow2::error::Error::Io(std::io::Error::new(ErrorKind::Other, error_message));
let daft_error: DaftError = arrow_io_error.into();
match daft_error {
DaftError::ByteStreamError(e) => {
assert_eq!(e.to_string(), format!("Io error: {error_message}"));
}
_ => panic!("Expected ByteStreamError"),
}
}

#[test]
fn test_parquet_io_error_conversion() {
// Ensure that parquet2 IO errors get converted into transient Byte Stream errors.
let error_message = "IO error occurred";
let parquet_io_error =
parquet2::error::Error::IoError(std::io::Error::new(ErrorKind::Other, error_message));
let arrow_error: arrow2::error::Error = parquet_io_error.into();
//let arrow_error = arrow2::error::Error::from(parquet_io_error);
let daft_error: DaftError = arrow_error.into();
match daft_error {
DaftError::ByteStreamError(e) => {
assert_eq!(e.to_string(), format!("Io error: {error_message}"));
}
_ => panic!("Expected ByteStreamError"),
}
}
}

0 comments on commit 648b804

Please sign in to comment.