diff --git a/python/src/dataset/commit.rs b/python/src/dataset/commit.rs index 631726778d..844ca6c184 100644 --- a/python/src/dataset/commit.rs +++ b/python/src/dataset/commit.rs @@ -14,6 +14,8 @@ use std::fmt::Debug; +use snafu::{location, Location}; + use lance_core::io::commit::{CommitError, CommitLease, CommitLock}; use lance_core::Error; @@ -36,6 +38,7 @@ fn handle_error(py_err: PyErr, py: Python) -> CommitError { Err(import_error) => { return CommitError::OtherError(Error::Internal { message: format!("Error importing from pylance {}", import_error), + location: location!(), }) } }; @@ -45,6 +48,7 @@ fn handle_error(py_err: PyErr, py: Python) -> CommitError { } else { CommitError::OtherError(Error::Internal { message: format!("Error from commit handler: {}", py_err), + location: location!(), }) } } diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index a999e6306d..0dd10b3dd8 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -77,6 +77,7 @@ fn parse_timeunit(unit: &str) -> Result { "ns" => Ok(TimeUnit::Nanosecond), _ => Err(Error::Arrow { message: format!("Unsupported TimeUnit: {unit}"), + location: location!(), }), } } diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index db40417912..b783400e8f 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -310,6 +310,7 @@ impl Field { "Attempt to intersect different fields: {} and {}", self.name, other.name, ), + location: location!(), }); } let self_type = self.data_type(); @@ -347,6 +348,7 @@ impl Field { "Attempt to intersect different fields: ({}, {}) and ({}, {})", self.name, self_type, other.name, other_type ), + location: location!(), }); } diff --git a/rust/lance-core/src/encodings/dictionary.rs b/rust/lance-core/src/encodings/dictionary.rs index 9654534a34..87f71c28f4 100644 --- a/rust/lance-core/src/encodings/dictionary.rs +++ b/rust/lance-core/src/encodings/dictionary.rs @@ -143,6 +143,7 @@ impl<'a> DictionaryDecoder<'a> { } else { return Err(Error::Arrow { message: format!("Not a dictionary type: {}", self.data_type), + location: location!(), }); }; @@ -160,6 +161,7 @@ impl<'a> DictionaryDecoder<'a> { DataType::UInt64 => self.make_dict_array::(keys).await, _ => Err(Error::Arrow { message: format!("Dictionary encoding does not support index type: {index_type}",), + location: location!(), }), } } @@ -196,6 +198,7 @@ impl<'a> AsyncIndex for DictionaryDecoder<'a> { source: "DictionaryDecoder does not support get()" .to_string() .into(), + location: location!(), }) } } diff --git a/rust/lance-core/src/error.rs b/rust/lance-core/src/error.rs index bb3e9fee12..cc01ef5963 100644 --- a/rust/lance-core/src/error.rs +++ b/rust/lance-core/src/error.rs @@ -27,56 +27,77 @@ pub fn box_error(e: impl std::error::Error + Send + Sync + 'static) -> BoxedErro #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Invalid user input: {source}"))] - InvalidInput { source: BoxedError }, - #[snafu(display("Dataset already exists: {uri}"))] - DatasetAlreadyExists { uri: String }, + #[snafu(display("Invalid user input: {source}, {location}"))] + InvalidInput { + source: BoxedError, + location: Location, + }, + #[snafu(display("Dataset already exists: {uri}, {location}"))] + DatasetAlreadyExists { uri: String, location: Location }, // #[snafu(display("Append with different schema: original={original} new={new}"))] #[snafu(display("Append with different schema:"))] SchemaMismatch {}, - #[snafu(display("Dataset at path {path} was not found: {source}"))] - DatasetNotFound { path: String, source: BoxedError }, - #[snafu(display("Encountered corrupt file {path}: {source}"))] + #[snafu(display("Dataset at path {path} was not found: {source}, {location}"))] + DatasetNotFound { + path: String, + source: BoxedError, + location: Location, + }, + #[snafu(display("Encountered corrupt file {path}: {source}, {location}"))] CorruptFile { path: object_store::path::Path, source: BoxedError, + location: Location, // TODO: add backtrace? }, - #[snafu(display("Not supported: {source}"))] - NotSupported { source: BoxedError }, - #[snafu(display("Commit conflict for version {version}: {source}"))] - CommitConflict { version: u64, source: BoxedError }, - #[snafu(display("Encountered internal error. Please file a bug report at https://github.com/lancedb/lance/issues. {message}"))] - Internal { message: String }, - #[snafu(display("A prerequisite task failed: {message}"))] - PrerequisiteFailed { message: String }, - #[snafu(display("LanceError(Arrow): {message}"))] - Arrow { message: String }, + #[snafu(display("Not supported: {source}, {location}"))] + NotSupported { + source: BoxedError, + location: Location, + }, + #[snafu(display("Commit conflict for version {version}: {source}, {location}"))] + CommitConflict { + version: u64, + source: BoxedError, + location: Location, + }, + #[snafu(display("Encountered internal error. Please file a bug report at https://github.com/lancedb/lance/issues. {message}, {location}"))] + Internal { message: String, location: Location }, + #[snafu(display("A prerequisite task failed: {message}, {location}"))] + PrerequisiteFailed { message: String, location: Location }, + #[snafu(display("LanceError(Arrow): {message}, {location}"))] + Arrow { message: String, location: Location }, #[snafu(display("LanceError(Schema): {message}, {location}"))] Schema { message: String, location: Location }, #[snafu(display("Not found: {uri}, {location}"))] NotFound { uri: String, location: Location }, #[snafu(display("LanceError(IO): {message}, {location}"))] IO { message: String, location: Location }, - #[snafu(display("LanceError(Index): {message}"))] - Index { message: String }, + #[snafu(display("LanceError(Index): {message}, {location}"))] + Index { message: String, location: Location }, /// Stream early stop Stop, } impl Error { - pub fn corrupt_file(path: object_store::path::Path, message: impl Into) -> Self { + pub fn corrupt_file( + path: object_store::path::Path, + message: impl Into, + location: Location, + ) -> Self { let message: String = message.into(); Self::CorruptFile { path, source: message.into(), + location, } } - pub fn invalid_input(message: impl Into) -> Self { + pub fn invalid_input(message: impl Into, location: Location) -> Self { let message: String = message.into(); Self::InvalidInput { source: message.into(), + location, } } } @@ -87,6 +108,7 @@ impl From for Error { fn from(e: ArrowError) -> Self { Self::Arrow { message: e.to_string(), + location: location!(), } } } @@ -95,6 +117,7 @@ impl From<&ArrowError> for Error { fn from(e: &ArrowError) -> Self { Self::Arrow { message: e.to_string(), + location: location!(), } } } @@ -157,6 +180,7 @@ impl From for Error { fn from(e: serde_json::Error) -> Self { Self::Arrow { message: e.to_string(), + location: location!(), } } } @@ -171,10 +195,10 @@ fn arrow_io_error_from_msg(message: String) -> ArrowError { impl From for ArrowError { fn from(value: Error) -> Self { match value { - Error::Arrow { message } => arrow_io_error_from_msg(message), // we lose the error type converting to LanceError + Error::Arrow { message, .. } => arrow_io_error_from_msg(message), // we lose the error type converting to LanceError Error::IO { message, .. } => arrow_io_error_from_msg(message), Error::Schema { message, .. } => Self::SchemaError(message), - Error::Index { message } => arrow_io_error_from_msg(message), + Error::Index { message, .. } => arrow_io_error_from_msg(message), Error::Stop => arrow_io_error_from_msg("early stop".to_string()), e => arrow_io_error_from_msg(e.to_string()), // Find a more scalable way of doing this } diff --git a/rust/lance-core/src/format/page_table.rs b/rust/lance-core/src/format/page_table.rs index 408b4f9dc0..2fba049ac2 100644 --- a/rust/lance-core/src/format/page_table.rs +++ b/rust/lance-core/src/format/page_table.rs @@ -15,6 +15,7 @@ use arrow_array::builder::Int64Builder; use arrow_array::{Array, Int64Array}; use arrow_schema::DataType; +use snafu::{location, Location}; use std::collections::BTreeMap; use tokio::io::AsyncWriteExt; @@ -96,6 +97,7 @@ impl PageTable { if self.pages.is_empty() { return Err(Error::InvalidInput { source: "empty page table".into(), + location: location!(), }); } diff --git a/rust/lance-core/src/io/commit.rs b/rust/lance-core/src/io/commit.rs index 9d69678553..1ca6d68de3 100644 --- a/rust/lance-core/src/io/commit.rs +++ b/rust/lance-core/src/io/commit.rs @@ -138,6 +138,7 @@ pub fn parse_version_from_path(path: &Path) -> Result { .and_then(|(version, _)| version.parse::().ok()) .ok_or(crate::Error::Internal { message: format!("Expected manifest file, but found {}", path), + location: location!(), }) } @@ -248,6 +249,7 @@ impl From for Error { match e { CommitError::CommitConflict => Self::Internal { message: "Commit conflict".to_string(), + location: location!(), }, CommitError::OtherError(e) => e, } diff --git a/rust/lance-core/src/io/deletion.rs b/rust/lance-core/src/io/deletion.rs index 03c1e8d79c..26f4ef663d 100644 --- a/rust/lance-core/src/io/deletion.rs +++ b/rust/lance-core/src/io/deletion.rs @@ -25,7 +25,7 @@ use bytes::Buf; use object_store::path::Path; use rand::Rng; use roaring::bitmap::RoaringBitmap; -use snafu::ResultExt; +use snafu::{location, Location, ResultExt}; use super::object_store::ObjectStore; use crate::error::{box_error, CorruptFileSnafu}; @@ -315,6 +315,7 @@ pub async fn read_deletion_file( "Expected exactly one batch in deletion file, got {}", batches.len() ), + location!(), )); } @@ -327,6 +328,7 @@ pub async fn read_deletion_file( deletion_arrow_schema(), batch.schema() ), + location!(), )); } @@ -343,6 +345,7 @@ pub async fn read_deletion_file( return Err(Error::corrupt_file( path, "Null values are not allowed in deletion files", + location!(), )); } } diff --git a/rust/lance-core/src/io/object_store.rs b/rust/lance-core/src/io/object_store.rs index 12c152447b..65a2276d87 100644 --- a/rust/lance-core/src/io/object_store.rs +++ b/rust/lance-core/src/io/object_store.rs @@ -462,6 +462,7 @@ impl ObjectStore { source: "`s3+ddb://` scheme and custom commit handler are mutually exclusive" .into(), + location: location!(), }); } @@ -473,6 +474,7 @@ impl ObjectStore { source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`" .into(), + location: location!(), }); } match url.query_pairs().next() { @@ -484,13 +486,15 @@ impl ObjectStore { source: "`s3+ddb://` scheme requires non empty dynamodb table name" .into(), + location: location!(), }); } Some(table_name) } _ => { return Err(Error::InvalidInput { - source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into() + source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(), + location: location!(), }); } } @@ -519,6 +523,7 @@ impl ObjectStore { return Err(Error::InvalidInput { source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled" .into(), + location: location!(), }); } None => params @@ -530,6 +535,7 @@ impl ObjectStore { // before creating the OSObjectStore we need to rewrite the url to drop ddb related parts url.set_scheme("s3").map_err(|()| Error::Internal { message: "could not set scheme".into(), + location: location!(), })?; url.set_query(None); diff --git a/rust/lance-core/src/io/reader.rs b/rust/lance-core/src/io/reader.rs index 07cb212313..8c0aaaa9a4 100644 --- a/rust/lance-core/src/io/reader.rs +++ b/rust/lance-core/src/io/reader.rs @@ -268,6 +268,7 @@ impl FileReader { "File {} not found in fragment {:?}", path, fragment ), + location: location!(), })? .fields, ) diff --git a/rust/lance-index/src/vector/ivf.rs b/rust/lance-index/src/vector/ivf.rs index 237a873afc..864893f952 100644 --- a/rust/lance-index/src/vector/ivf.rs +++ b/rust/lance-index/src/vector/ivf.rs @@ -164,6 +164,7 @@ impl Ivf { ) -> Result { let vector_arr = batch.column_by_name(column).ok_or(Error::Index { message: format!("Column {} does not exist.", column), + location: location!(), })?; let data = vector_arr.as_fixed_size_list_opt().ok_or(Error::Index { message: format!( @@ -171,6 +172,7 @@ impl Ivf { column, vector_arr.data_type() ), + location: location!(), })?; let matrix = MatrixView::::try_from(data)?; let part_ids = self.compute_partitions(&matrix).await; diff --git a/rust/lance-index/src/vector/kmeans.rs b/rust/lance-index/src/vector/kmeans.rs index 792b33374d..5fbebe2fcc 100644 --- a/rust/lance-index/src/vector/kmeans.rs +++ b/rust/lance-index/src/vector/kmeans.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use arrow_array::{builder::Float32Builder, FixedSizeListArray, Float32Array}; use lance_arrow::FixedSizeListArrayExt; use log::info; use rand::{seq::IteratorRandom, Rng}; +use snafu::{location, Location}; +use std::sync::Arc; use lance_core::{Error, Result}; use lance_linalg::{ @@ -42,7 +42,7 @@ pub async fn train_kmeans( if num_rows < k { return Err(Error::Index{message: format!( "KMeans: can not train {k} centroids with {num_rows} vectors, choose a smaller K (< {num_rows}) instead" - )}); + ),location: location!()}); } // Ony sample sample_rate * num_clusters. See Faiss let data = if num_rows > sample_rate * k { diff --git a/rust/lance-index/src/vector/pq.rs b/rust/lance-index/src/vector/pq.rs index 15f45341ce..b04180a7a2 100644 --- a/rust/lance-index/src/vector/pq.rs +++ b/rust/lance-index/src/vector/pq.rs @@ -25,6 +25,7 @@ use lance_linalg::distance::{cosine_distance_batch, dot_distance_batch, l2_dista use lance_linalg::kernels::argmin; use lance_linalg::{distance::MetricType, MatrixView}; use rand::SeedableRng; +use snafu::{location, Location}; pub mod transform; use super::kmeans::train_kmeans; @@ -357,6 +358,7 @@ impl ProductQuantizer { data.num_columns(), params.num_sub_vectors ), + location: location!(), }); } assert_eq!(data.data().null_count(), 0); diff --git a/rust/lance-index/src/vector/pq/transform.rs b/rust/lance-index/src/vector/pq/transform.rs index 19095b8893..eac2fe4cf1 100644 --- a/rust/lance-index/src/vector/pq/transform.rs +++ b/rust/lance-index/src/vector/pq/transform.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use lance_arrow::RecordBatchExt; use lance_core::{Error, Result}; use lance_linalg::MatrixView; +use snafu::{location, Location}; use super::ProductQuantizer; use crate::vector::transform::Transformer; @@ -64,6 +65,7 @@ impl Transformer for PQTransformer { "PQ Transform: column {} not found in batch", self.input_column ), + location: location!(), })?; let data: MatrixView = input_arr .as_fixed_size_list_opt() @@ -73,6 +75,7 @@ impl Transformer for PQTransformer { self.input_column, input_arr.data_type(), ), + location: location!(), })? .try_into()?; let pq_code = self.quantizer.transform(&data).await?; diff --git a/rust/lance-index/src/vector/residual.rs b/rust/lance-index/src/vector/residual.rs index 360434f49e..19eeb6ebc9 100644 --- a/rust/lance-index/src/vector/residual.rs +++ b/rust/lance-index/src/vector/residual.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use arrow_array::types::UInt32Type; use arrow_array::{ cast::AsArray, types::Float32Type, Array, FixedSizeListArray, Float32Array, RecordBatch, @@ -23,6 +21,8 @@ use async_trait::async_trait; use lance_arrow::{FixedSizeListArrayExt, RecordBatchExt}; use lance_core::{Error, Result}; use lance_linalg::MatrixView; +use snafu::{location, Location}; +use std::sync::Arc; use super::transform::Transformer; @@ -70,12 +70,14 @@ impl Transformer for ResidualTransform { "Compute residual vector: partition id column not found: {}", self.part_col ), + location: location!(), })?; let original = batch.column_by_name(&self.vec_col).ok_or(Error::Index { message: format!( "Compute residual vector: original vector column not found: {}", self.vec_col ), + location: location!(), })?; let original_vectors = original.as_fixed_size_list_opt().ok_or(Error::Index { message: format!( @@ -83,6 +85,7 @@ impl Transformer for ResidualTransform { self.vec_col, original.data_type(), ), + location: location!(), })?; let original_matrix = MatrixView::::try_from(original_vectors)?; let mut residual_arr: Vec = diff --git a/rust/lance-index/src/vector/utils.rs b/rust/lance-index/src/vector/utils.rs index e51b7cd738..34e3f58a41 100644 --- a/rust/lance-index/src/vector/utils.rs +++ b/rust/lance-index/src/vector/utils.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use arrow_array::{Array, FixedSizeListArray}; use arrow_schema::{DataType, Field}; use lance_arrow::{ArrowFloatType, FloatType}; @@ -22,6 +20,8 @@ use lance_linalg::MatrixView; use prost::bytes; use rand::distributions::Standard; use rand::prelude::*; +use snafu::{location, Location}; +use std::sync::Arc; use super::pb; use crate::pb::Tensor; @@ -64,6 +64,7 @@ impl TryFrom<&DataType> for pb::tensor::DataType { DataType::Float64 => Ok(Self::Float64), _ => Err(Error::Index { message: format!("pb tensor type not supported: {:?}", dt), + location: location!(), }), } } @@ -112,6 +113,7 @@ impl TryFrom<&pb::Tensor> for FixedSizeListArray { if tensor.shape.len() != 2 { return Err(Error::Index { message: format!("only accept 2-D tensor shape, got: {:?}", tensor.shape), + location: location!(), }); } let dim = tensor.shape[1] as usize; @@ -132,6 +134,7 @@ impl TryFrom<&pb::Tensor> for FixedSizeListArray { tensor.shape, flat_array.len() ), + location: location!(), }); } diff --git a/rust/lance/src/arrow/json.rs b/rust/lance/src/arrow/json.rs index 2b6d9160cc..d7dbc0619c 100644 --- a/rust/lance/src/arrow/json.rs +++ b/rust/lance/src/arrow/json.rs @@ -17,6 +17,8 @@ use std::collections::HashMap; use std::sync::Arc; +use snafu::{location, Location}; + use arrow_schema::{DataType, Field, Schema}; use serde::{Deserialize, Serialize}; @@ -99,6 +101,7 @@ impl TryFrom<&DataType> for JsonDataType { _ => { return Err(Error::Arrow { message: format!("Json conversion: Unsupported type: {dt}"), + location: location!(), }) } }; @@ -138,6 +141,7 @@ impl TryFrom<&JsonDataType> for DataType { .as_ref() .ok_or_else(|| Error::Arrow { message: "Json conversion: List type requires a field".to_string(), + location: location!(), })? .iter() .map(Field::try_from) @@ -150,6 +154,7 @@ impl TryFrom<&JsonDataType> for DataType { let length = value.length.ok_or_else(|| Error::Arrow { message: "Json conversion: FixedSizeList type requires a length" .to_string(), + location: location!(), })?; Ok(Self::FixedSizeList( Arc::new(fields[0].clone()), @@ -162,6 +167,7 @@ impl TryFrom<&JsonDataType> for DataType { } _ => Err(Error::Arrow { message: format!("Json conversion: Unsupported type: {value:?}"), + location: location!(), }), } } diff --git a/rust/lance/src/bin/lq.rs b/rust/lance/src/bin/lq.rs index 165298366f..71ec19f7ee 100644 --- a/rust/lance/src/bin/lq.rs +++ b/rust/lance/src/bin/lq.rs @@ -17,6 +17,7 @@ use arrow_array::RecordBatch; use clap::{Parser, Subcommand, ValueEnum}; use futures::stream::StreamExt; use futures::TryStreamExt; +use snafu::{location, Location}; use lance::dataset::Dataset; use lance::index::{vector::VectorIndexParams, DatasetIndexExt}; @@ -169,9 +170,11 @@ async fn create_index( ) -> Result<()> { let col = column.as_ref().ok_or_else(|| Error::Index { message: "Must specify column".to_string(), + location: location!(), })?; let _ = index_type.ok_or_else(|| Error::Index { message: "Must specify index type".to_string(), + location: location!(), })?; let mt = match metric_type.as_ref().unwrap_or(&"l2".to_string()).as_str() { "l2" => MetricType::L2, @@ -182,6 +185,7 @@ async fn create_index( "Only l2 and cosine metric type are supported, got: {}", metric_type.as_ref().unwrap_or(&"N/A".to_string()) ), + location: location!(), }); } }; @@ -191,6 +195,7 @@ async fn create_index( true => { return Err(Error::Index { message: "Feature 'opq' not installed.".to_string(), + location: location!(), }); } }; diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index bb2a1e9cec..029438a1c3 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -15,11 +15,6 @@ //! Lance Dataset //! -use std::collections::{BTreeMap, HashMap}; -use std::default::Default; -use std::ops::Range; -use std::sync::Arc; - use arrow_array::cast::AsArray; use arrow_array::types::UInt64Type; use arrow_array::Array; @@ -42,6 +37,11 @@ use lance_core::io::{ }; use log::warn; use object_store::path::Path; +use snafu::{location, Location}; +use std::collections::{BTreeMap, HashMap}; +use std::default::Default; +use std::ops::Range; +use std::sync::Arc; use tracing::instrument; mod chunker; @@ -196,6 +196,7 @@ impl Dataset { .map_err(|e| Error::DatasetNotFound { path: base_path.to_string(), source: Box::new(e), + location: location!(), })?; let session = if let Some(session) = params.session.as_ref() { @@ -279,6 +280,7 @@ impl Dataset { Error::NotFound { uri, .. } => Error::DatasetNotFound { path: uri.clone(), source: box_error(e), + location: location!(), }, _ => e, })?; @@ -291,6 +293,7 @@ impl Dataset { object_store::Error::NotFound { path: _, source } => Error::DatasetNotFound { path: base_path.to_string(), source, + location: location!(), }, _ => e.into(), })?; @@ -306,6 +309,7 @@ impl Dataset { ); return Err(Error::NotSupported { source: message.into(), + location: location!(), }); } @@ -351,6 +355,7 @@ impl Dataset { if dataset_exists && matches!(params.mode, WriteMode::Create) { return Err(Error::DatasetAlreadyExists { uri: uri.to_owned(), + location: location!(), }); } @@ -405,6 +410,7 @@ impl Dataset { ); return Err(Error::NotSupported { source: message.into(), + location: location!(), }); } } @@ -644,6 +650,7 @@ impl Dataset { Operation::Overwrite { .. } | Operation::Restore { .. } => Ok(0), _ => Err(Error::invalid_input( "read_version must be specified for this operation", + location!(), )), }, Ok, @@ -668,6 +675,7 @@ impl Dataset { return Err(Error::DatasetNotFound { path: base.to_string(), source: "The dataset must already exist unless the operation is Overwrite".into(), + location: location!(), }); } @@ -728,17 +736,20 @@ impl Dataset { ) -> Result<()> { // Sanity check. if self.schema().field(left_on).is_none() { - return Err(Error::invalid_input(format!( - "Column {} does not exist in the left side dataset", - left_on - ))); + return Err(Error::invalid_input( + format!("Column {} does not exist in the left side dataset", left_on), + location!(), + )); }; let right_schema = stream.schema(); if right_schema.field_with_name(right_on).is_err() { - return Err(Error::invalid_input(format!( - "Column {} does not exist in the right side dataset", - right_on - ))); + return Err(Error::invalid_input( + format!( + "Column {} does not exist in the right side dataset", + right_on + ), + location!(), + )); }; for field in right_schema.fields() { if field.name() == right_on { @@ -747,10 +758,13 @@ impl Dataset { continue; } if self.schema().field(field.name()).is_some() { - return Err(Error::invalid_input(format!( - "Column {} exists in both sides of the dataset", - field.name() - ))); + return Err(Error::invalid_input( + format!( + "Column {} exists in both sides of the dataset", + field.name() + ), + location!(), + )); } } @@ -841,6 +855,7 @@ impl Dataset { let mut fragments_iter = fragments.iter(); let mut current_fragment = fragments_iter.next().ok_or_else(|| Error::InvalidInput { source: "Called take on an empty dataset.".to_string().into(), + location: location!(), })?; let mut current_fragment_len = current_fragment.count_rows().await?; let mut curr_fragment_offset: u64 = 0; @@ -869,6 +884,7 @@ impl Dataset { row_index ) .into(), + location: location!(), })?; curr_fragment_offset += current_fragment_len as u64; current_fragment_len = current_fragment.count_rows().await?; @@ -942,7 +958,10 @@ impl Dataset { let range = range_start..(range_end + 1); let fragment = self.get_fragment(fragment_id).ok_or_else(|| { - Error::invalid_input(format!("row_id belongs to non-existant fragment: {start}")) + Error::invalid_input( + format!("row_id belongs to non-existant fragment: {start}"), + location!(), + ) })?; let reader = fragment.open(projection.as_ref()).await?; @@ -974,10 +993,13 @@ impl Dataset { }; let fragment = self.get_fragment(fragment_id as usize).ok_or_else(|| { - Error::invalid_input(format!( - "row_id belongs to non-existant fragment: {}", - row_ids[current_start] - )) + Error::invalid_input( + format!( + "row_id belongs to non-existant fragment: {}", + row_ids[current_start] + ), + location!(), + ) })?; let row_ids: Vec = row_ids[range].iter().map(|x| *x as u32).collect(); @@ -1040,6 +1062,7 @@ impl Dataset { .column_by_name(ROW_ID) .ok_or_else(|| Error::Internal { message: "ROW_ID column not found".into(), + location: location!(), })? .as_primitive::() .values(); @@ -1318,6 +1341,7 @@ impl Dataset { "Duplicate fragment id {} found in dataset {:?}", id, self.base ), + location!(), )); } } diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 7e1f140a25..28caa48ce4 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -44,12 +44,6 @@ //! (which should only be done if the caller can guarantee there are no updates //! happening at the same time) -use std::{ - collections::HashSet, - future, - sync::{Mutex, MutexGuard}, -}; - use chrono::{DateTime, Duration, Utc}; use futures::{stream, StreamExt, TryStreamExt}; use lance_core::{ @@ -62,6 +56,11 @@ use lance_core::{ Error, Result, }; use object_store::path::Path; +use std::{ + collections::HashSet, + future, + sync::{Mutex, MutexGuard}, +}; use crate::{utils::temporal::utc_now, Dataset}; @@ -461,6 +460,7 @@ mod tests { }; use lance_linalg::distance::MetricType; use lance_testing::datagen::{some_batch, BatchGenerator, IncrementingInt32}; + use snafu::{location, Location}; use tokio::io::AsyncWriteExt; use crate::{ @@ -652,6 +652,7 @@ mod tests { if op.contains("copy") { return Err(Error::Internal { message: "Copy blocked".to_string(), + location: location!(), }); } Ok(()) @@ -667,6 +668,7 @@ mod tests { if op.contains("delete") && path.extension() == Some("manifest") { Err(Error::Internal { message: "Delete manifest blocked".to_string(), + location: location!(), }) } else { Ok(()) diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 994e8db637..7f629b6a76 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -81,7 +81,10 @@ impl FileFragment { let (stream, schema) = reader_to_stream(reader)?; if schema.fields.is_empty() { - return Err(Error::invalid_input("Cannot write with an empty schema.")); + return Err(Error::invalid_input( + "Cannot write with an empty schema.", + location!(), + )); } let (object_store, base_path) = ObjectStore::from_uri(dataset_uri).await?; @@ -287,6 +290,7 @@ impl FileFragment { "data file has incorrect length. Expected: {} Got: {}", expected_length, length ), + location!(), )); } } @@ -302,6 +306,7 @@ impl FileFragment { &deletion_file_meta, ), format!("deletion vector contains row id that is out of range. Row id: {} Fragment length: {}", row_id, expected_length), + location!(), )); } } @@ -526,6 +531,7 @@ impl FileFragment { Examples: {:?}", physical_rows, dv_len, examples ), + location: location!(), }); } diff --git a/rust/lance/src/dataset/hash_joiner.rs b/rust/lance/src/dataset/hash_joiner.rs index aa5e85c151..07a8bf1ba6 100644 --- a/rust/lance/src/dataset/hash_joiner.rs +++ b/rust/lance/src/dataset/hash_joiner.rs @@ -140,11 +140,14 @@ impl HashJoiner { /// Will run in parallel over columns using all available cores. pub(super) async fn collect(&self, index_column: ArrayRef) -> Result { if index_column.data_type() != &self.index_type { - return Err(Error::invalid_input(format!( - "Index column type mismatch: expected {}, got {}", - self.index_type, - index_column.data_type() - ))); + return Err(Error::invalid_input( + format!( + "Index column type mismatch: expected {}, got {}", + self.index_type, + index_column.data_type() + ), + location!(), + )); } // Index to use for null values @@ -196,7 +199,7 @@ impl HashJoiner { "Found rows on LHS that do not match any rows on RHS. Lance would need to write \ nulls on the RHS, but Lance does not yet support nulls for type {:?}.", array.data_type() - ))); + ), location!())); } Ok(array) }, diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index a41cf7845a..5490a7b253 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -344,6 +344,7 @@ impl Transaction { } else { return Err(Error::Internal { message: "Cannot create a new dataset without a schema".to_string(), + location: location!(), }); } } @@ -368,6 +369,7 @@ impl Transaction { "No current manifest was provided while building manifest for operation {}", self.operation.name() ), + location: location!(), }); match &self.operation { @@ -490,7 +492,7 @@ impl Transaction { } new_bitmap.extend(group.new_fragments.iter().map(|frag| frag.id as u32)); } else { - return Err(Error::invalid_input("The compaction plan included a rewrite group that was a split of indexed and non-indexed data")); + return Err(Error::invalid_input("The compaction plan included a rewrite group that was a split of indexed and non-indexed data", location!())); } } } @@ -506,25 +508,31 @@ impl Transaction { for rewritten_index in rewritten_indices { if !modified_indices.insert(rewritten_index.old_id) { - return Err(Error::invalid_input(format!("An invalid compaction plan must have been generated because multiple tasks modified the same index: {}", rewritten_index.old_id))); + return Err(Error::invalid_input(format!("An invalid compaction plan must have been generated because multiple tasks modified the same index: {}", rewritten_index.old_id), location!())); } let index = indices .iter_mut() .find(|idx| idx.uuid == rewritten_index.old_id) .ok_or_else(|| { - Error::invalid_input(format!( - "Invalid compaction plan refers to index {} which does not exist", - rewritten_index.old_id - )) + Error::invalid_input( + format!( + "Invalid compaction plan refers to index {} which does not exist", + rewritten_index.old_id + ), + location!(), + ) })?; index.fragment_bitmap = Some(Self::recalculate_fragment_bitmap( index.fragment_bitmap.as_ref().ok_or_else(|| { - Error::invalid_input(format!( - "Cannot rewrite index {} which did not store fragment bitmap", - index.uuid - )) + Error::invalid_input( + format!( + "Cannot rewrite index {} which did not store fragment bitmap", + index.uuid + ), + location!(), + ) })?, groups, )?); @@ -544,7 +552,7 @@ impl Transaction { let replace_range = { let start = final_fragments.iter().enumerate().find(|(_, f)| f.id == group.old_fragments[0].id) .ok_or_else(|| Error::CommitConflict { version, source: - format!("dataset does not contain a fragment a rewrite operation wants to replace: id={}", group.old_fragments[0].id).into() })?.0; + format!("dataset does not contain a fragment a rewrite operation wants to replace: id={}", group.old_fragments[0].id).into() , location:location!()})?.0; // Verify old_fragments matches contiguous range let mut i = 1; @@ -661,6 +669,7 @@ impl TryFrom<&pb::Transaction> for Transaction { None => { return Err(Error::Internal { message: "Transaction message did not contain an operation".to_string(), + location: location!(), }); } }; diff --git a/rust/lance/src/dataset/updater.rs b/rust/lance/src/dataset/updater.rs index 92c1ce0df9..b266ac4dad 100644 --- a/rust/lance/src/dataset/updater.rs +++ b/rust/lance/src/dataset/updater.rs @@ -178,6 +178,7 @@ impl Updater { batch.num_rows(), row_id_stride ), + location: location!(), }); } @@ -221,6 +222,7 @@ pub(crate) fn add_blanks( // This is difficult because we need to create a batch for arbitrary schemas. return Err(Error::NotSupported { source: "Missing many rows in merge".into(), + location: location!(), }); } @@ -246,6 +248,7 @@ pub(crate) fn add_blanks( arrow::compute::take(array.as_ref(), &selection_vector, None).map_err(|e| { Error::Arrow { message: format!("Failed to add blanks: {}", e), + location: location!(), } }) }) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index fc09a6a784..fbcf5ec30b 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -21,6 +21,7 @@ use std::fmt; use std::sync::Arc; use async_trait::async_trait; +use snafu::{location, Location}; use uuid::Uuid; pub(crate) mod append; @@ -89,11 +90,13 @@ pub(crate) async fn remap_index( .find(|i| i.uuid == *index_id) .ok_or_else(|| Error::Index { message: format!("Index with id {} does not exist", index_id), + location: location!(), })?; if matched.fields.len() > 1 { return Err(Error::Index { message: "Remapping indices with multiple fields is not supported".to_string(), + location: location!(), }); } let field = matched @@ -159,12 +162,14 @@ impl DatasetIndexExt for Dataset { if columns.len() != 1 { return Err(Error::Index { message: "Only support building index on 1 column at the moment".to_string(), + location: location!(), }); } let column = columns[0]; let Some(field) = self.schema().field(column) else { return Err(Error::Index { message: format!("CreateIndex: column '{column}' does not exist"), + location: location!(), }); }; @@ -178,6 +183,7 @@ impl DatasetIndexExt for Dataset { "Index name '{index_name} already exists, \ please specify a different name or use replace=True" ), + location: location!(), }); }; if idx.fields != [field.id] { @@ -186,6 +192,7 @@ impl DatasetIndexExt for Dataset { "Index name '{index_name} already exists with different fields, \ please specify a different name" ), + location: location!(), }); } } @@ -199,6 +206,7 @@ impl DatasetIndexExt for Dataset { .downcast_ref::() .ok_or_else(|| Error::Index { message: "Vector index type must take a VectorIndexParams".to_string(), + location: location!(), })?; build_vector_index(self, column, &index_name, &index_id.to_string(), vec_params) diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 72a962e502..ebc94ced21 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use lance_core::{format::Index as IndexMetadata, Error, Result}; use log::info; +use snafu::{location, Location}; use uuid::Uuid; use crate::dataset::index::unindexed_fragments; @@ -40,6 +41,7 @@ pub async fn append_index( "Append index: column {} does not exist", old_index.fields[0] ), + location: location!(), })?; let index = open_index( diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index 406d23f63f..962af29dc7 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -15,11 +15,10 @@ //! Vector Index for Fast Approximate Nearest Neighbor (ANN) Search //! +use arrow_array::Float32Array; use std::sync::Arc; use std::{any::Any, collections::HashMap}; -use arrow_array::Float32Array; - pub mod diskann; #[allow(dead_code)] mod graph; @@ -33,6 +32,7 @@ mod utils; use lance_core::io::{read_message, Reader}; use lance_index::vector::pq::{PQBuildParams, ProductQuantizer}; use lance_linalg::distance::*; +use snafu::{location, Location}; use tracing::instrument; use uuid::Uuid; @@ -175,6 +175,7 @@ pub(crate) async fn build_vector_index( if stages.is_empty() { return Err(Error::Index { message: "Build Vector Index: must have at least 1 stage".to_string(), + location: location!(), }); }; @@ -184,11 +185,13 @@ pub(crate) async fn build_vector_index( let StageParams::Ivf(ivf_params) = &stages[len - 2] else { return Err(Error::Index { message: format!("Build Vector Index: invalid stages: {:?}", stages), + location: location!(), }); }; let StageParams::PQ(pq_params) = &stages[len - 1] else { return Err(Error::Index { message: format!("Build Vector Index: invalid stages: {:?}", stages), + location: location!(), }); }; build_ivf_pq_index( @@ -207,12 +210,14 @@ pub(crate) async fn build_vector_index( let StageParams::DiskANN(params) = stages.last().unwrap() else { return Err(Error::Index { message: format!("Build Vector Index: invalid stages: {:?}", stages), + location: location!(), }); }; build_diskann_index(dataset, column, name, uuid, params.clone()).await?; } else { return Err(Error::Index { message: format!("Build Vector Index: invalid stages: {:?}", stages), + location: location!(), }); } @@ -236,6 +241,7 @@ pub(crate) async fn remap_vector_index( .downcast_ref() .ok_or_else(|| Error::NotSupported { source: "Only IVF indexes can be remapped currently".into(), + location: location!(), })?; remap_index_file( @@ -293,6 +299,7 @@ pub(crate) async fn open_index( if proto.columns.len() != 1 { return Err(Error::Index { message: "VectorIndex only supports 1 column".to_string(), + location: location!(), }); } assert_eq!(proto.index_type, pb::IndexType::Vector as i32); @@ -300,6 +307,7 @@ pub(crate) async fn open_index( let Some(idx_impl) = proto.implementation.as_ref() else { return Err(Error::Index { message: "Invalid protobuf for VectorIndex metadata".to_string(), + location: location!(), }); }; @@ -316,6 +324,7 @@ pub(crate) async fn open_index( if last_stage.is_none() { return Err(Error::Index { message: format!("Invalid vector index stages: {:?}", vec_idx.stages), + location: location!(), }); } #[cfg(feature = "opq")] @@ -342,6 +351,7 @@ pub(crate) async fn open_index( if last_stage.is_none() { return Err(Error::Index { message: format!("Invalid vector index stages: {:?}", vec_idx.stages), + location: location!(), }); } let ivf = Ivf::try_from(ivf_pb)?; @@ -358,6 +368,7 @@ pub(crate) async fn open_index( if last_stage.is_some() { return Err(Error::Index { message: format!("Invalid vector index stages: {:?}", vec_idx.stages), + location: location!(), }); }; let pq = Arc::new(ProductQuantizer::new( @@ -378,6 +389,7 @@ pub(crate) async fn open_index( "DiskANN should be the only stage, but we got stages: {:?}", vec_idx.stages ), + location: location!(), }); }; let graph_path = index_dir.child(diskann_proto.filename.as_str()); @@ -392,6 +404,7 @@ pub(crate) async fn open_index( if last_stage.is_none() { return Err(Error::Index { message: format!("Invalid index stages: {:?}", vec_idx.stages), + location: location!(), }); } let idx = last_stage.unwrap(); diff --git a/rust/lance/src/index/vector/diskann/builder.rs b/rust/lance/src/index/vector/diskann/builder.rs index 5d7cb670f5..d8532f0dd7 100644 --- a/rust/lance/src/index/vector/diskann/builder.rs +++ b/rust/lance/src/index/vector/diskann/builder.rs @@ -31,6 +31,7 @@ use lance_linalg::{ }; use ordered_float::OrderedFloat; use rand::{distributions::Uniform, prelude::*, Rng, SeedableRng}; +use snafu::{location, Location}; use crate::dataset::{Dataset, ROW_ID}; use crate::index::pb; @@ -146,12 +147,14 @@ async fn init_graph( .column_by_qualified_name(ROW_ID) .ok_or(Error::Index { message: "row_id not found".to_string(), + location: location!(), })? .as_primitive::(); let vectors = batch .column_by_qualified_name(column) .ok_or(Error::Index { message: format!("column {} not found", column), + location: location!(), })? .as_fixed_size_list(); let matrix = MatrixView::::try_from(vectors)?; @@ -188,9 +191,11 @@ async fn init_graph( fn distance(matrix: &MatrixView, i: usize, j: usize) -> Result { let vector_i = matrix.row(i).ok_or(Error::Index { message: "Invalid row index".to_string(), + location: location!(), })?; let vector_j = matrix.row(j).ok_or(Error::Index { message: "Invalid row index".to_string(), + location: location!(), })?; Ok(l2_distance(vector_i, vector_j)) @@ -257,6 +262,7 @@ async fn robust_prune( async fn find_medoid(vectors: &MatrixView, metric_type: MetricType) -> Result { let centroid = vectors.centroid().ok_or_else(|| Error::Index { message: "Cannot find the medoid of an empty matrix".to_string(), + location: location!(), })?; // Find the closest vertex to the centroid. @@ -287,6 +293,7 @@ async fn index_once( for (i, &id) in ids.iter().enumerate() { let vector = graph.data.row(i).ok_or_else(|| Error::Index { message: format!("Cannot find vector with id {}", id), + location: location!(), })?; let state = greedy_search(graph, medoid, vector, 1, l).await?; diff --git a/rust/lance/src/index/vector/diskann/search.rs b/rust/lance/src/index/vector/diskann/search.rs index f9c1ee5ecb..21363758c9 100644 --- a/rust/lance/src/index/vector/diskann/search.rs +++ b/rust/lance/src/index/vector/diskann/search.rs @@ -27,6 +27,7 @@ use lance_index::vector::{Query, DIST_COL}; use object_store::path::Path; use ordered_float::OrderedFloat; use serde::Serialize; +use snafu::{location, Location}; use tracing::instrument; use super::row_vertex::{RowVertex, RowVertexSerDe}; @@ -263,18 +264,21 @@ impl VectorIndex for DiskANNIndex { ) -> Result> { Err(Error::Index { message: "DiskANNIndex is not loadable".to_string(), + location: location!(), }) } fn check_can_remap(&self) -> Result<()> { Err(Error::NotSupported { source: "DiskANNIndex does not yet support remap".into(), + location: location!(), }) } fn remap(&mut self, _mapping: &HashMap>) -> Result<()> { Err(Error::NotSupported { source: "DiskANNIndex does not yet support remap".into(), + location: location!(), }) } } diff --git a/rust/lance/src/index/vector/graph/builder.rs b/rust/lance/src/index/vector/graph/builder.rs index 48a3325ce4..c99fdab135 100644 --- a/rust/lance/src/index/vector/graph/builder.rs +++ b/rust/lance/src/index/vector/graph/builder.rs @@ -21,6 +21,7 @@ use arrow_array::UInt32Array; use async_trait::async_trait; use lance_linalg::distance::{DistanceFunc, MetricType}; use lance_linalg::matrix::MatrixView; +use snafu::{location, Location}; use super::{Graph, Vertex}; use crate::{Error, Result}; @@ -99,6 +100,7 @@ impl Graph for GraphBuilder { a, self.data.num_rows() ), + location: location!(), })?; let vector_b = self.data.row(b).ok_or_else(|| Error::Index { @@ -107,6 +109,7 @@ impl Graph for GraphBuilder { b, self.data.num_rows() ), + location: location!(), })?; Ok((self.distance_func)(vector_a, vector_b)) } @@ -118,6 +121,7 @@ impl Graph for GraphBuilder { idx, self.data.num_rows() ), + location: location!(), })?; Ok((self.distance_func)(query, vector)) } diff --git a/rust/lance/src/index/vector/graph/persisted.rs b/rust/lance/src/index/vector/graph/persisted.rs index e7e33626ea..c5308b989c 100644 --- a/rust/lance/src/index/vector/graph/persisted.rs +++ b/rust/lance/src/index/vector/graph/persisted.rs @@ -31,6 +31,7 @@ use lance_core::{ use lance_linalg::distance::l2::L2; use lru_time_cache::LruCache; use object_store::path::Path; +use snafu::{location, Location}; use super::{builder::GraphBuilder, Graph}; use super::{Vertex, VertexSerDe}; @@ -114,12 +115,14 @@ impl PersistedGraph { "Vertex column must be of fixed size binary, got: {}", field.data_type() ), + location: location!(), }) } } } else { return Err(Error::Index { message: "Vertex column does not exist in the graph".to_string(), + location: location!(), }); }; let neighbors_projection = schema.project(&[NEIGHBORS_COL])?; @@ -210,6 +213,7 @@ impl PersistedGraph { if array.len() < 1 { return Err(Error::Index { message: "Invalid graph".to_string(), + location: location!(), }); } let value = array.value(0); @@ -252,6 +256,7 @@ impl Graph for PersistedGraph { if array.len() < 1 { return Err(Error::Index { message: "Invalid graph".to_string(), + location: location!(), }); } let value = array.value(0); @@ -285,6 +290,7 @@ pub async fn write_graph( if graph.is_empty() { return Err(Error::Index { message: "Invalid graph".to_string(), + location: location!(), }); } let binary_size = serde.size(); diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 7f7d282b09..af05a9c041 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -104,6 +104,7 @@ impl IVFIndex { if !sub_index.is_loadable() { return Err(Error::Index { message: format!("IVF sub index must be loadable, got: {:?}", sub_index), + location: location!(), }); } Ok(Self { @@ -178,6 +179,7 @@ impl IVFIndex { .downcast_ref::() .ok_or(Error::Index { message: "Only support append to IVF_PQ".to_string(), + location: location!(), })?; // TODO: merge two IVF implementations. @@ -315,6 +317,7 @@ impl VectorIndex for IVFIndex { ) -> Result> { Err(Error::Index { message: "Flat index does not support load".to_string(), + location: location!(), }) } @@ -331,6 +334,7 @@ impl VectorIndex for IVFIndex { // mirrors some of the other IVF routines like build_ivf_pq_index Err(Error::Index { message: "Remapping IVF in this way not supported".to_string(), + location: location!(), }) } } @@ -557,10 +561,13 @@ fn sanity_check<'a>(dataset: &'a Dataset, column: &str) -> Result<&'a Field> { }; if let DataType::FixedSizeList(elem_type, _) = field.data_type() { if !matches!(elem_type.data_type(), DataType::Float32) { - return Err( - Error::Index{message: - format!("VectorIndex requires the column data type to be fixed size list of float32s, got {}", - elem_type.data_type())}); + return Err(Error::Index{ + message:format!( + "VectorIndex requires the column data type to be fixed size list of float32s, got {}", + elem_type.data_type() + ), + location: location!() + }); } } else { return Err(Error::Index { @@ -568,6 +575,7 @@ fn sanity_check<'a>(dataset: &'a Dataset, column: &str) -> Result<&'a Field> { "VectorIndex requires the column data type to be fixed size list of float32s, got {}", field.data_type() ), + location: location!(), }); } Ok(field) @@ -621,6 +629,7 @@ impl IvfBuildParams { num_partitions, centroids.len() ), + location: location!(), }); } Ok(Self { @@ -658,6 +667,7 @@ pub async fn build_ivf_pq_index( "VectorIndex requires the column data type to be fixed size list of floats, got {}", field.data_type() ), + location: location!(), }); }; @@ -683,6 +693,7 @@ pub async fn build_ivf_pq_index( centroids.len(), ivf_params.num_partitions * dim, ), + location: location!(), }); } Ivf::new(centroids.clone()) @@ -692,6 +703,7 @@ pub async fn build_ivf_pq_index( #[cfg(not(feature = "opq"))] return Err(Error::Index { message: "Feature 'opq' is not installed.".to_string(), + location: location!(), }); #[cfg(feature = "opq")] { @@ -873,6 +885,7 @@ pub(crate) async fn remap_index_file( .downcast_ref::() .ok_or_else(|| Error::NotSupported { source: "Remapping a non-pq sub-index".into(), + location: location!(), })?; let metadata = IvfPQIndexMetadata { diff --git a/rust/lance/src/index/vector/ivf/builder.rs b/rust/lance/src/index/vector/ivf/builder.rs index 659b0ecce1..3dd20ac3b7 100644 --- a/rust/lance/src/index/vector/ivf/builder.rs +++ b/rust/lance/src/index/vector/ivf/builder.rs @@ -92,6 +92,7 @@ pub async fn shuffle_dataset( .await .map_err(|e| Error::Index { message: e.to_string(), + location: location!(), }) }) .buffer_unordered(num_cpus::get()) diff --git a/rust/lance/src/index/vector/pq.rs b/rust/lance/src/index/vector/pq.rs index 64867ac93c..33d2364647 100644 --- a/rust/lance/src/index/vector/pq.rs +++ b/rust/lance/src/index/vector/pq.rs @@ -36,6 +36,7 @@ use lance_linalg::{ matrix::MatrixView, }; use serde::Serialize; +use snafu::{location, Location}; use tracing::{instrument, Instrument}; use super::VectorIndex; @@ -243,6 +244,7 @@ impl VectorIndex for PQIndex { if self.code.is_none() || self.row_ids.is_none() { return Err(Error::Index { message: "PQIndex::search: PQ is not initialized".to_string(), + location: location!(), }); } pre_filter.wait_for_ready().await?; diff --git a/rust/lance/src/index/vector/utils.rs b/rust/lance/src/index/vector/utils.rs index e24ab07bf6..34b82bfcc3 100644 --- a/rust/lance/src/index/vector/utils.rs +++ b/rust/lance/src/index/vector/utils.rs @@ -18,6 +18,7 @@ use arrow_array::{cast::AsArray, types::Float32Type}; use arrow_schema::Schema as ArrowSchema; use arrow_select::concat::concat_batches; use futures::stream::TryStreamExt; +use snafu::{location, Location}; use lance_linalg::MatrixView; @@ -50,6 +51,7 @@ pub async fn maybe_sample_training_data( "Sample training data: column {} does not exist in return", column ), + location: location!(), })?; Ok(array.as_fixed_size_list().try_into()?) } diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 4469bb5bb2..3b9e999432 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -35,6 +35,8 @@ use std::sync::Arc; +use snafu::{location, Location}; + use futures::future::Either; use futures::{StreamExt, TryStreamExt}; use lance_core::{ @@ -100,6 +102,7 @@ fn check_transaction( and it was missing transaction metadata.", other_version ), + location: location!(), }); } @@ -113,6 +116,7 @@ fn check_transaction( transaction, other_transaction ) .into(), + location: location!(), }); } @@ -363,6 +367,7 @@ pub(crate) async fn commit_transaction( commit_config.num_retries ) .into(), + location: location!(), }) } diff --git a/rust/lance/src/io/exec/planner.rs b/rust/lance/src/io/exec/planner.rs index c1f46a4937..d51134d569 100644 --- a/rust/lance/src/io/exec/planner.rs +++ b/rust/lance/src/io/exec/planner.rs @@ -417,10 +417,10 @@ impl Planner { if can_cast_types(&right_data_type, &left_data_type) { Arc::new(CastExpr::new(right, left_data_type, None)) } else { - return Err(Error::invalid_input(format!( - "Cannot compare {} and {}", - left_data_type, right_data_type - ))); + return Err(Error::invalid_input( + format!("Cannot compare {} and {}", left_data_type, right_data_type), + location!(), + )); } } else { right diff --git a/rust/lance/src/utils/future.rs b/rust/lance/src/utils/future.rs index ee1a362d56..b511396583 100644 --- a/rust/lance/src/utils/future.rs +++ b/rust/lance/src/utils/future.rs @@ -1,7 +1,7 @@ -use std::sync::Arc; - use async_cell::sync::AsyncCell; use futures::Future; +use snafu::{location, Location}; +use std::sync::Arc; /// An async background task whose output can be shared across threads (via cloning) /// @@ -22,7 +22,10 @@ impl SharedPrerequisite { .get() .await .clone() - .map_err(|err| crate::Error::PrerequisiteFailed { message: err }) + .map_err(|err| crate::Error::PrerequisiteFailed { + message: err, + location: location!(), + }) } /// Synchronously get a cloned copy of the cached output @@ -45,7 +48,10 @@ impl SharedPrerequisite { .get() .await .map(|_| ()) - .map_err(|err| crate::Error::PrerequisiteFailed { message: err }) + .map_err(|err| crate::Error::PrerequisiteFailed { + message: err, + location: location!(), + }) } /// Launch a background task (using tokio::spawn) and get a shareable handle to the eventual result @@ -90,7 +96,10 @@ mod tests { } // On error - let fut = future::ready(crate::Result::Err(crate::Error::invalid_input("xyz"))); + let fut = future::ready(crate::Result::Err(crate::Error::invalid_input( + "xyz", + location!(), + ))); let prereq = SharedPrerequisite::::spawn(fut); let mut tasks = Vec::with_capacity(10);