Skip to content

Commit

Permalink
fix: add location in corrupt_file and invalid_input
Browse files Browse the repository at this point in the history
  • Loading branch information
Weijun-H committed Nov 1, 2023
1 parent 665b735 commit c4afebe
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 44 deletions.
12 changes: 8 additions & 4 deletions rust/lance-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,24 @@ pub enum Error {
}

impl Error {
pub fn corrupt_file(path: object_store::path::Path, message: impl Into<String>) -> Self {
pub fn corrupt_file(
path: object_store::path::Path,
message: impl Into<String>,
location: Location,
) -> Self {
let message: String = message.into();
Self::CorruptFile {
path,
source: message.into(),
location: location!(),
location,
}
}

pub fn invalid_input(message: impl Into<String>) -> Self {
pub fn invalid_input(message: impl Into<String>, location: Location) -> Self {
let message: String = message.into();
Self::InvalidInput {
source: message.into(),
location: location!(),
location,
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion rust/lance-core/src/io/deletion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use bytes::Buf;
use object_store::path::Path;
use rand::Rng;
use roaring::bitmap::RoaringBitmap;
use snafu::ResultExt;
use snafu::{location, Location, ResultExt};

use super::object_store::ObjectStore;
use crate::error::{box_error, CorruptFileSnafu};
Expand Down Expand Up @@ -315,6 +315,7 @@ pub async fn read_deletion_file(
"Expected exactly one batch in deletion file, got {}",
batches.len()
),
location!(),
));
}

Expand All @@ -327,6 +328,7 @@ pub async fn read_deletion_file(
deletion_arrow_schema(),
batch.schema()
),
location!(),
));
}

Expand All @@ -343,6 +345,7 @@ pub async fn read_deletion_file(
return Err(Error::corrupt_file(
path,
"Null values are not allowed in deletion files",
location!(),
));
}
}
Expand Down
48 changes: 31 additions & 17 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ impl Dataset {
Operation::Overwrite { .. } | Operation::Restore { .. } => Ok(0),
_ => Err(Error::invalid_input(
"read_version must be specified for this operation",
location!(),
)),
},
Ok,
Expand Down Expand Up @@ -735,17 +736,20 @@ impl Dataset {
) -> Result<()> {
// Sanity check.
if self.schema().field(left_on).is_none() {
return Err(Error::invalid_input(format!(
"Column {} does not exist in the left side dataset",
left_on
)));
return Err(Error::invalid_input(
format!("Column {} does not exist in the left side dataset", left_on),
location!(),
));
};
let right_schema = stream.schema();
if right_schema.field_with_name(right_on).is_err() {
return Err(Error::invalid_input(format!(
"Column {} does not exist in the right side dataset",
right_on
)));
return Err(Error::invalid_input(
format!(
"Column {} does not exist in the right side dataset",
right_on
),
location!(),
));
};
for field in right_schema.fields() {
if field.name() == right_on {
Expand All @@ -754,10 +758,13 @@ impl Dataset {
continue;
}
if self.schema().field(field.name()).is_some() {
return Err(Error::invalid_input(format!(
"Column {} exists in both sides of the dataset",
field.name()
)));
return Err(Error::invalid_input(
format!(
"Column {} exists in both sides of the dataset",
field.name()
),
location!(),
));
}
}

Expand Down Expand Up @@ -951,7 +958,10 @@ impl Dataset {
let range = range_start..(range_end + 1);

let fragment = self.get_fragment(fragment_id).ok_or_else(|| {
Error::invalid_input(format!("row_id belongs to non-existant fragment: {start}"))
Error::invalid_input(
format!("row_id belongs to non-existant fragment: {start}"),
location!(),
)
})?;

let reader = fragment.open(projection.as_ref()).await?;
Expand Down Expand Up @@ -983,10 +993,13 @@ impl Dataset {
};

let fragment = self.get_fragment(fragment_id as usize).ok_or_else(|| {
Error::invalid_input(format!(
"row_id belongs to non-existant fragment: {}",
row_ids[current_start]
))
Error::invalid_input(
format!(
"row_id belongs to non-existant fragment: {}",
row_ids[current_start]
),
location!(),
)
})?;
let row_ids: Vec<u32> = row_ids[range].iter().map(|x| *x as u32).collect();

Expand Down Expand Up @@ -1328,6 +1341,7 @@ impl Dataset {
"Duplicate fragment id {} found in dataset {:?}",
id, self.base
),
location!(),
));
}
}
Expand Down
7 changes: 6 additions & 1 deletion rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ impl FileFragment {
let (stream, schema) = reader_to_stream(reader)?;

if schema.fields.is_empty() {
return Err(Error::invalid_input("Cannot write with an empty schema."));
return Err(Error::invalid_input(
"Cannot write with an empty schema.",
location!(),
));
}

let (object_store, base_path) = ObjectStore::from_uri(dataset_uri).await?;
Expand Down Expand Up @@ -288,6 +291,7 @@ impl FileFragment {
"data file has incorrect length. Expected: {} Got: {}",
expected_length, length
),
location!(),
));
}
}
Expand All @@ -303,6 +307,7 @@ impl FileFragment {
&deletion_file_meta,
),
format!("deletion vector contains row id that is out of range. Row id: {} Fragment length: {}", row_id, expected_length),
location!(),
));
}
}
Expand Down
15 changes: 9 additions & 6 deletions rust/lance/src/dataset/hash_joiner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,14 @@ impl HashJoiner {
/// Will run in parallel over columns using all available cores.
pub(super) async fn collect(&self, index_column: ArrayRef) -> Result<RecordBatch> {
if index_column.data_type() != &self.index_type {
return Err(Error::invalid_input(format!(
"Index column type mismatch: expected {}, got {}",
self.index_type,
index_column.data_type()
)));
return Err(Error::invalid_input(
format!(
"Index column type mismatch: expected {}, got {}",
self.index_type,
index_column.data_type()
),
location!(),
));
}

// Index to use for null values
Expand Down Expand Up @@ -196,7 +199,7 @@ impl HashJoiner {
"Found rows on LHS that do not match any rows on RHS. Lance would need to write \
nulls on the RHS, but Lance does not yet support nulls for type {:?}.",
array.data_type()
)));
), location!()));
}
Ok(array)
},
Expand Down
26 changes: 16 additions & 10 deletions rust/lance/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ impl Transaction {
}
new_bitmap.extend(group.new_fragments.iter().map(|frag| frag.id as u32));
} else {
return Err(Error::invalid_input("The compaction plan included a rewrite group that was a split of indexed and non-indexed data"));
return Err(Error::invalid_input("The compaction plan included a rewrite group that was a split of indexed and non-indexed data", location!()));
}
}
}
Expand All @@ -508,25 +508,31 @@ impl Transaction {

for rewritten_index in rewritten_indices {
if !modified_indices.insert(rewritten_index.old_id) {
return Err(Error::invalid_input(format!("An invalid compaction plan must have been generated because multiple tasks modified the same index: {}", rewritten_index.old_id)));
return Err(Error::invalid_input(format!("An invalid compaction plan must have been generated because multiple tasks modified the same index: {}", rewritten_index.old_id), location!()));
}

let index = indices
.iter_mut()
.find(|idx| idx.uuid == rewritten_index.old_id)
.ok_or_else(|| {
Error::invalid_input(format!(
"Invalid compaction plan refers to index {} which does not exist",
rewritten_index.old_id
))
Error::invalid_input(
format!(
"Invalid compaction plan refers to index {} which does not exist",
rewritten_index.old_id
),
location!(),
)
})?;

index.fragment_bitmap = Some(Self::recalculate_fragment_bitmap(
index.fragment_bitmap.as_ref().ok_or_else(|| {
Error::invalid_input(format!(
"Cannot rewrite index {} which did not store fragment bitmap",
index.uuid
))
Error::invalid_input(
format!(
"Cannot rewrite index {} which did not store fragment bitmap",
index.uuid
),
location!(),
)
})?,
groups,
)?);
Expand Down
8 changes: 4 additions & 4 deletions rust/lance/src/io/exec/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,10 @@ impl Planner {
if can_cast_types(&right_data_type, &left_data_type) {
Arc::new(CastExpr::new(right, left_data_type, None))
} else {
return Err(Error::invalid_input(format!(
"Cannot compare {} and {}",
left_data_type, right_data_type
)));
return Err(Error::invalid_input(
format!("Cannot compare {} and {}", left_data_type, right_data_type),
location!(),
));
}
} else {
right
Expand Down
5 changes: 4 additions & 1 deletion rust/lance/src/utils/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ mod tests {
}

// On error
let fut = future::ready(crate::Result::Err(crate::Error::invalid_input("xyz")));
let fut = future::ready(crate::Result::Err(crate::Error::invalid_input(
"xyz",
location!(),
)));
let prereq = SharedPrerequisite::<u32>::spawn(fut);

let mut tasks = Vec::with_capacity(10);
Expand Down

0 comments on commit c4afebe

Please sign in to comment.