Skip to content

Commit

Permalink
feat: DataFusionError::find_root
Browse files Browse the repository at this point in the history
Closes apache#4435.
  • Loading branch information
crepererum committed Nov 30, 2022
1 parent 49166ea commit a1c5e07
Showing 1 changed file with 157 additions and 0 deletions.
157 changes: 157 additions & 0 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::error;
use std::fmt::{Display, Formatter};
use std::io;
use std::result;
use std::sync::Arc;

use crate::{Column, DFSchema};
#[cfg(feature = "avro")]
Expand Down Expand Up @@ -333,8 +334,101 @@ impl From<DataFusionError> for io::Error {
}
}

/// Helper for [`DataFusionError::find_root`].
///
/// Represents a link in an error chain that is *not* (yet known to be) a `DataFusionError`, i.e. the error that
/// `find_root` still has to look into before it can continue with the next `DataFusionError`.
enum OtherErr<'a> {
    /// A borrowed Arrow error; `find_root` only descends further if it is an `ArrowError::ExternalError`.
    Arrow(&'a ArrowError),
    /// A borrowed type-erased error; `find_root` probes its concrete type via `downcast_ref`.
    Dyn(&'a (dyn std::error::Error + Send + Sync + 'static)),
}

impl DataFusionError {
    /// Return the innermost [`DataFusionError`] reachable from `self`.
    ///
    /// The search looks through `Context`, `ArrowError` and `External` wrappers, through
    /// `ArrowError::ExternalError`, and through errors that were placed into an `Arc`. When nothing can be
    /// unwrapped the result is `self`.
    pub fn find_root(&self) -> &Self {
        // Deliberately iterative: arbitrarily long error chains must not exhaust the stack. Termination is
        // guaranteed because every step follows ownership from a wrapper to the error it wraps. Several wrappers
        // may share one inner error (fan-in, e.g. via `Arc`), but no wrapper can lead to multiple children
        // (fan-out, e.g. via `Weak` or interior mutability via `Mutex`).

        // deepest DataFusionError seen so far
        let mut root: &Self = self;

        // foreign (non-DataFusion) error that still has to be inspected, if any
        let mut pending: Option<OtherErr<'_>> = None;

        loop {
            // `take()` hands out the pending foreign error and clears the slot in one go; each arm then either
            // yields the next foreign error to inspect, yields `None` to resume at `root`, or stops the search.
            pending = match pending.take() {
                // Arrow errors only carry further errors in their `ExternalError` variant
                Some(OtherErr::Arrow(ArrowError::ExternalError(boxed))) => {
                    Some(OtherErr::Dyn(boxed.as_ref()))
                }
                // any other Arrow error is a dead end
                Some(OtherErr::Arrow(_)) => break,
                Some(OtherErr::Dyn(erased)) => {
                    if let Some(df) = erased.downcast_ref::<Self>() {
                        root = df;
                        None
                    } else if let Some(arrow) = erased.downcast_ref::<ArrowError>() {
                        Some(OtherErr::Arrow(arrow))
                    } else if let Some(shared) = erased.downcast_ref::<Arc<Self>>() {
                        // errors are sometimes put into an `Arc` so several receivers can share them
                        root = shared.as_ref();
                        None
                    } else if let Some(shared) = erased.downcast_ref::<Arc<ArrowError>>() {
                        Some(OtherErr::Arrow(shared.as_ref()))
                    } else {
                        // unknown concrete type: dead end
                        break;
                    }
                }
                // nothing foreign pending: look at what the current DataFusionError wraps
                None => match root {
                    // step down the context chain
                    Self::Context(_, wrapped) => {
                        root = wrapped;
                        None
                    }
                    // an Arrow error may in turn contain a DataFusion error again,
                    // see https://github.com/apache/arrow-datafusion/issues/4172
                    Self::ArrowError(arrow) => Some(OtherErr::Arrow(arrow)),
                    // directly embedded external errors are introspected as well
                    Self::External(boxed) => Some(OtherErr::Dyn(boxed.as_ref())),
                    // every other variant ends the traversal
                    _ => break,
                },
            };
        }

        // may still be the error the search started from
        root
    }
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use crate::error::DataFusionError;
use arrow::error::ArrowError;

Expand All @@ -353,6 +447,61 @@ mod test {
assert_eq!(res.to_string(), "Arrow error: Schema error: bar");
}

/// Checks that `find_root` digs out the innermost `DataFusionError` for a range of wrapper combinations.
#[test]
fn test_find_root_error() {
    // every chain below bottoms out in this very error
    let root = || DataFusionError::ResourcesExhausted("foo".to_string());

    let chains = vec![
        // DataFusion context wrapper
        DataFusionError::Context("it happened!".to_string(), Box::new(root())),
        // DataFusion -> Arrow external -> DataFusion
        DataFusionError::ArrowError(ArrowError::ExternalError(Box::new(root()))),
        // DataFusion external -> DataFusion
        DataFusionError::External(Box::new(root())),
        // DataFusion external -> Arrow external -> DataFusion
        DataFusionError::External(Box::new(ArrowError::ExternalError(Box::new(root())))),
        // DataFusion -> Arrow external -> Arrow external -> DataFusion
        DataFusionError::ArrowError(ArrowError::ExternalError(Box::new(
            ArrowError::ExternalError(Box::new(root())),
        ))),
        // DataFusion external -> Arc<DataFusion>
        DataFusionError::External(Box::new(Arc::new(root()))),
        // DataFusion external -> Arc<Arrow external> -> DataFusion
        DataFusionError::External(Box::new(Arc::new(ArrowError::ExternalError(
            Box::new(root()),
        )))),
    ];

    for chain in chains {
        do_root_test(chain, root());
    }
}

/// Model what happens when implementing SendableRecordBatchStream:
/// DataFusion code needs to return an ArrowError
#[allow(clippy::try_err)]
Expand All @@ -370,6 +519,14 @@ mod test {
Err(ArrowError::SchemaError("bar".to_string()))?;
Ok(())
}

/// Asserts that the root of `e` (as reported by `find_root`) matches `exp`.
fn do_root_test(e: DataFusionError, exp: DataFusionError) {
    let root = e.find_root();

    // `DataFusionError` has no `Eq` impl, hence the comparison is done on the rendered message ...
    let actual_msg = root.to_string();
    let expected_msg = exp.to_string();
    assert_eq!(actual_msg, expected_msg);

    // ... plus a cheap check that both values are the same enum variant
    let actual_variant = std::mem::discriminant(root);
    let expected_variant = std::mem::discriminant(&exp);
    assert_eq!(actual_variant, expected_variant);
}
}

#[macro_export]
Expand Down

0 comments on commit a1c5e07

Please sign in to comment.