Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Rust side exceptions for Transient Errors #2197

Merged
merged 6 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions daft/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Do not modify or delete these exceptions before checking where they are used in rust
# src/common/error/src/python.rs


class DaftCoreException(ValueError):
"""DaftCore Base Exception"""

pass


class DaftTypeError(DaftCoreException):
"""Type Error that occurred in Daft Core"""

pass


class DaftTransientError(DaftCoreException):
"""Daft Transient Error
This is typically raised when there is a network issue such as timeout or throttling. This can usually be retried.
"""

pass


class ConnectTimeoutError(DaftTransientError):
"""Daft Connection Timeout Error
Daft client was not able to make a connection to the server in the connect timeout time.
"""

pass


class ReadTimeoutError(DaftTransientError):
"""Daft Read Timeout Error
Daft client was not able to read bytes from server under the read timeout time.
"""

pass


class ByteStreamError(DaftTransientError):
"""Daft Byte Stream Error
Daft client had an error while reading bytes in a stream from the server.
"""

pass


class SocketError(DaftTransientError):
"""Daft Socket Error
Daft client had a socket error while reading bytes in a stream from the server.
"""

pass
36 changes: 14 additions & 22 deletions src/common/error/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ pub enum DaftError {
source: GenericError,
},
InternalError(String),
ConnectTimeout(GenericError),
ReadTimeout(GenericError),
ByteStreamError(GenericError),
SocketError(GenericError),
External(GenericError),
}

Expand All @@ -35,7 +39,12 @@ impl std::error::Error for DaftError {
| DaftError::ValueError(_)
| DaftError::InternalError(_) => None,
DaftError::IoError(io_error) => Some(io_error),
DaftError::FileNotFound { source, .. } | DaftError::External(source) => Some(&**source),
DaftError::FileNotFound { source, .. }
| DaftError::SocketError(source)
| DaftError::External(source)
| DaftError::ReadTimeout(source)
| DaftError::ConnectTimeout(source)
| DaftError::ByteStreamError(source) => Some(&**source),
#[cfg(feature = "python")]
DaftError::PyO3Error(pyerr) => Some(pyerr),
}
Expand All @@ -48,13 +57,6 @@ impl From<arrow2::error::Error> for DaftError {
}
}

#[cfg(feature = "python")]
impl From<pyo3::PyErr> for DaftError {
fn from(error: pyo3::PyErr) -> Self {
DaftError::PyO3Error(error)
}
}

impl From<serde_json::Error> for DaftError {
fn from(error: serde_json::Error) -> Self {
DaftError::IoError(error.into())
Expand All @@ -73,20 +75,6 @@ impl From<regex::Error> for DaftError {
}
}

#[cfg(feature = "python")]
impl std::convert::From<DaftError> for pyo3::PyErr {
fn from(err: DaftError) -> pyo3::PyErr {
use pyo3::exceptions::{PyFileNotFoundError, PyValueError};
match err {
DaftError::PyO3Error(pyerr) => pyerr,
DaftError::FileNotFound { path, source } => {
PyFileNotFoundError::new_err(format!("File: {path} not found\n{source}"))
}
_ => PyValueError::new_err(err.to_string()),
}
}
}

impl From<std::fmt::Error> for DaftError {
fn from(error: std::fmt::Error) -> Self {
DaftError::ComputeError(error.to_string())
Expand All @@ -113,6 +101,10 @@ impl Display for DaftError {
Self::FileNotFound { path, source } => {
write!(f, "DaftError::FileNotFound {path}: {source}")
}
Self::ByteStreamError(e) => write!(f, "ByteStreamError: {}", e),
Self::ConnectTimeout(e) => write!(f, "ConnectTimeout: {}", e),
Self::ReadTimeout(e) => write!(f, "ReadTimeout: {}", e),
Self::SocketError(e) => write!(f, "SocketError: {}", e),
}
}
}
2 changes: 2 additions & 0 deletions src/common/error/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod error;
pub use error::DaftError;
pub use error::DaftResult;
#[cfg(feature = "python")]
mod python;
35 changes: 35 additions & 0 deletions src/common/error/src/python.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use pyo3::import_exception;

use crate::DaftError;

impl From<pyo3::PyErr> for DaftError {
fn from(error: pyo3::PyErr) -> Self {
DaftError::PyO3Error(error)
}
}

import_exception!(daft.exceptions, DaftCoreException);
import_exception!(daft.exceptions, DaftTypeError);
import_exception!(daft.exceptions, ConnectTimeoutError);
import_exception!(daft.exceptions, ReadTimeoutError);
import_exception!(daft.exceptions, ByteStreamError);
import_exception!(daft.exceptions, SocketError);

impl std::convert::From<DaftError> for pyo3::PyErr {
fn from(err: DaftError) -> pyo3::PyErr {
use pyo3::exceptions::PyFileNotFoundError;

match err {
DaftError::PyO3Error(pyerr) => pyerr,
DaftError::FileNotFound { path, source } => {
PyFileNotFoundError::new_err(format!("File: {path} not found\n{source}"))
}
DaftError::TypeError(err) => DaftTypeError::new_err(err),
DaftError::ConnectTimeout(err) => ConnectTimeoutError::new_err(err.to_string()),
DaftError::ReadTimeout(err) => ReadTimeoutError::new_err(err.to_string()),
DaftError::ByteStreamError(err) => ByteStreamError::new_err(err.to_string()),
DaftError::SocketError(err) => SocketError::new_err(err.to_string()),
_ => DaftCoreException::new_err(err.to_string()),
}
}
}
21 changes: 21 additions & 0 deletions src/daft-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ pub enum Error {
source: std::io::Error,
},

#[snafu(display(
"Connection timed out when trying to connect to {}\nDetails:\n{:?}",
path,
source
))]
ConnectTimeout { path: String, source: DynError },

#[snafu(display("Read timed out when trying to read {}\nDetails:\n{:?}", path, source))]
ReadTimeout { path: String, source: DynError },

#[snafu(display(
"Socket error occurred when trying to read {}\nDetails:\n{:?}",
path,
source
))]
SocketError { path: String, source: DynError },

#[snafu(display("Unable to convert URL \"{}\" to path", path))]
InvalidUrl {
path: String,
Expand Down Expand Up @@ -112,6 +129,10 @@ impl From<Error> for DaftError {
use Error::*;
match err {
NotFound { path, source } => DaftError::FileNotFound { path, source },
ConnectTimeout { .. } => DaftError::ConnectTimeout(err.into()),
ReadTimeout { .. } => DaftError::ReadTimeout(err.into()),
UnableToReadBytes { .. } => DaftError::ByteStreamError(err.into()),
SocketError { .. } => DaftError::SocketError(err.into()),
_ => DaftError::External(err.into()),
}
}
Expand Down
133 changes: 103 additions & 30 deletions src/daft-io/src/s3_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,47 +108,120 @@ enum Error {
impl From<Error> for super::Error {
fn from(error: Error) -> Self {
use Error::*;

match error {
UnableToOpenFile { path, source } => match source.into_service_error() {
GetObjectError::NoSuchKey(no_such_key) => super::Error::NotFound {
path,
source: no_such_key.into(),
},
GetObjectError::Unhandled(v) => super::Error::Unhandled {
UnableToOpenFile { path, source } => match source {
SdkError::TimeoutError(_) => super::Error::ReadTimeout {
path,
msg: DisplayErrorContext(v).to_string(),
source: source.into(),
},
err => super::Error::UnableToOpenFile {
path,
source: err.into(),
SdkError::DispatchFailure(ref dispatch) => {
if dispatch.is_timeout() {
super::Error::ConnectTimeout {
path,
source: source.into(),
}
} else if dispatch.is_io() {
super::Error::SocketError {
path,
source: source.into(),
}
} else {
super::Error::UnableToOpenFile {
path,
source: source.into(),
}
}
}
_ => match source.into_service_error() {
GetObjectError::NoSuchKey(no_such_key) => super::Error::NotFound {
path,
source: no_such_key.into(),
},
GetObjectError::Unhandled(v) => super::Error::Unhandled {
path,
msg: DisplayErrorContext(v).to_string(),
},
err => super::Error::UnableToOpenFile {
path,
source: err.into(),
},
},
},
UnableToHeadFile { path, source } => match source.into_service_error() {
HeadObjectError::NotFound(no_such_key) => super::Error::NotFound {
UnableToHeadFile { path, source } => match source {
SdkError::TimeoutError(_) => super::Error::ReadTimeout {
path,
source: no_such_key.into(),
source: source.into(),
},
HeadObjectError::Unhandled(v) => super::Error::Unhandled {
path,
msg: DisplayErrorContext(v).to_string(),
},
err => super::Error::UnableToOpenFile {
path,
source: err.into(),
SdkError::DispatchFailure(ref dispatch) => {
if dispatch.is_timeout() {
super::Error::ConnectTimeout {
path,
source: source.into(),
}
} else if dispatch.is_io() {
super::Error::SocketError {
path,
source: source.into(),
}
} else {
super::Error::UnableToOpenFile {
path,
source: source.into(),
}
}
}
_ => match source.into_service_error() {
HeadObjectError::NotFound(no_such_key) => super::Error::NotFound {
path,
source: no_such_key.into(),
},
HeadObjectError::Unhandled(v) => super::Error::Unhandled {
path,
msg: DisplayErrorContext(v).to_string(),
},
err => super::Error::UnableToOpenFile {
path,
source: err.into(),
},
},
},
UnableToListObjects { path, source } => match source.into_service_error() {
ListObjectsV2Error::NoSuchBucket(no_such_key) => super::Error::NotFound {
path,
source: no_such_key.into(),
},
ListObjectsV2Error::Unhandled(v) => super::Error::Unhandled {
UnableToListObjects { path, source } => match source {
SdkError::TimeoutError(_) => super::Error::ReadTimeout {
path,
msg: DisplayErrorContext(v).to_string(),
source: source.into(),
},
err => super::Error::UnableToOpenFile {
path,
source: err.into(),
SdkError::DispatchFailure(ref dispatch) => {
if dispatch.is_timeout() {
super::Error::ConnectTimeout {
path,
source: source.into(),
}
} else if dispatch.is_io() {
super::Error::SocketError {
path,
source: source.into(),
}
} else {
super::Error::UnableToOpenFile {
path,
source: source.into(),
}
}
}
_ => match source.into_service_error() {
ListObjectsV2Error::NoSuchBucket(no_such_key) => super::Error::NotFound {
path,
source: no_such_key.into(),
},
ListObjectsV2Error::Unhandled(v) => super::Error::Unhandled {
path,
msg: DisplayErrorContext(v).to_string(),
},
err => super::Error::UnableToOpenFile {
path,
source: err.into(),
},
},
},
InvalidUrl { path, source } => super::Error::InvalidUrl { path, source },
Expand Down
6 changes: 2 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ pub mod pylib {
#[pymodule]
fn daft(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
pyo3_log::init();

common_daft_config::register_modules(_py, m)?;
common_system_info::register_modules(_py, m)?;
daft_core::register_modules(_py, m)?;
daft_core::python::register_modules(_py, m)?;
daft_dsl::register_modules(_py, m)?;
Expand All @@ -60,9 +61,6 @@ pub mod pylib {
daft_plan::register_modules(_py, m)?;
daft_micropartition::register_modules(_py, m)?;
daft_scan::register_modules(_py, m)?;
common_daft_config::register_modules(_py, m)?;
common_system_info::register_modules(_py, m)?;

m.add_wrapped(wrap_pyfunction!(version))?;
m.add_wrapped(wrap_pyfunction!(build_type))?;
Ok(())
Expand Down
5 changes: 3 additions & 2 deletions tests/integration/io/parquet/test_reads_public_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pyarrow import parquet as pq

import daft
from daft.exceptions import ConnectTimeoutError, ReadTimeoutError
from daft.filesystem import get_filesystem, get_protocol_from_path
from daft.table import MicroPartition, Table

Expand Down Expand Up @@ -413,7 +414,7 @@ def test_connect_timeout(multithreaded_io):
)
)

with pytest.raises(ValueError, match="HTTP connect timeout"):
with pytest.raises((ReadTimeoutError, ConnectTimeoutError), match=f"timed out when trying to connect to {url}"):
MicroPartition.read_parquet(url, io_config=connect_timeout_config, multithreaded_io=multithreaded_io).to_arrow()


Expand All @@ -434,5 +435,5 @@ def test_read_timeout(multithreaded_io):
)
)

with pytest.raises(ValueError, match="HTTP read timeout"):
with pytest.raises((ReadTimeoutError, ConnectTimeoutError), match=f"timed out when trying to connect to {url}"):
MicroPartition.read_parquet(url, io_config=read_timeout_config, multithreaded_io=multithreaded_io).to_arrow()
Loading
Loading