diff --git a/src/daft-parquet/src/read_planner.rs b/src/daft-parquet/src/read_planner.rs index 8cc1757f54..f1e994f6fd 100644 --- a/src/daft-parquet/src/read_planner.rs +++ b/src/daft-parquet/src/read_planner.rs @@ -4,11 +4,8 @@ use bytes::Bytes; use common_error::DaftResult; use daft_io::IOClient; use futures::StreamExt; -use snafu::ResultExt; use tokio::task::JoinHandle; -use crate::JoinSnafu; - type RangeList = Vec>; pub trait ReadPlanPass: Send { @@ -88,7 +85,7 @@ impl ReadPlanPass for SplitLargeRequestPass { } enum RangeCacheState { - InFlight(JoinHandle>), + InFlight(JoinHandle>), Ready(Bytes), } @@ -107,9 +104,7 @@ impl RangeCacheEntry { // TODO(sammy): thread in url for join error let v = f .await - .context(JoinSnafu { path: "UNKNOWN" }) - .unwrap() - .unwrap(); + .map_err(|e| daft_io::Error::JoinError { source: e })??; *_guard = RangeCacheState::Ready(v.clone()); Ok(v.slice(range)) } @@ -164,7 +159,7 @@ impl ReadPlanner { let get_result = owned_io_client .single_url_get(owned_url, Some(range.clone())) .await?; - Ok(get_result.bytes().await?) + get_result.bytes().await }); let state = RangeCacheState::InFlight(join_handle); let entry = RangeCacheEntry {