Skip to content

Commit

Permalink
Addressing review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Oct 10, 2023
1 parent 97b191c commit da14c0f
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 59 deletions.
12 changes: 6 additions & 6 deletions rust/lance/src/format/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ impl From<&Fragment> for pb::DataFragment {
}
}

pub struct RowId(u64);
pub struct RowAddress(u64);

impl RowId {
impl RowAddress {
pub const FRAGMENT_SIZE: u64 = 1 << 32;
// A fragment id that will never be used
pub const TOMBSTONE_FRAG: u32 = 0xffffffff;
Expand Down Expand Up @@ -240,19 +240,19 @@ impl RowId {
}
}

impl From<RowId> for u64 {
fn from(row_id: RowId) -> Self {
impl From<RowAddress> for u64 {
fn from(row_id: RowAddress) -> Self {
row_id.0
}
}

impl std::fmt::Debug for RowId {
impl std::fmt::Debug for RowAddress {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self) // use Display
}
}

impl std::fmt::Display for RowId {
impl std::fmt::Display for RowAddress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "({}, {})", self.fragment_id(), self.row_id())
}
Expand Down
82 changes: 41 additions & 41 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,47 @@ pub trait IndexParams: Send + Sync {
fn as_any(&self) -> &dyn Any;
}

/// Remaps the index identified by `index_id` through `row_id_map` and returns
/// the uuid of the newly written index.
///
/// The dataset's indices are loaded, the entry whose uuid equals `index_id`
/// is selected, its single field id is resolved to a column name through the
/// dataset schema, and the work is delegated to `remap_vector_index` together
/// with a freshly generated v4 uuid.
///
/// # Errors
///
/// Returns `Error::Index` when:
/// * no index with `index_id` exists,
/// * the index does not cover exactly one field, or
/// * the indexed field id is not present in the dataset schema.
///
/// Errors from loading the indices or from `remap_vector_index` are
/// propagated unchanged.
// NOTE(review): no caller is visible in this view, hence the lint allowance —
// confirm it is still needed once a caller exists.
#[allow(dead_code)]
async fn remap_index(
    dataset: &Dataset,
    index_id: &Uuid,
    row_id_map: &HashMap<u64, Option<u64>>,
) -> Result<Uuid> {
    // Load indices from the disk.
    let indices = dataset.load_indices().await?;
    let matched = indices
        .iter()
        .find(|i| i.uuid == *index_id)
        .ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", index_id),
        })?;

    // Exactly one field is supported. The field list comes from stored
    // metadata, so a malformed entry is reported as an error, not a panic.
    let field_id = match &matched.fields[..] {
        [field_id] => *field_id,
        [] => {
            return Err(Error::Index {
                message: format!("Index with id {} has no fields", index_id),
            })
        }
        _ => {
            return Err(Error::Index {
                message: "Remapping indices with multiple fields is not supported".to_string(),
            })
        }
    };

    let field = dataset
        .schema()
        .field_by_id(field_id)
        .ok_or_else(|| Error::Index {
            message: format!(
                "Index with id {} references field id {} which is not in the dataset schema",
                index_id, field_id
            ),
        })?;

    let new_id = Uuid::new_v4();

    remap_vector_index(
        Arc::new(dataset.clone()),
        &field.name,
        index_id,
        &new_id,
        row_id_map,
    )
    .await?;

    Ok(new_id)
}

/// Extends Dataset with secondary index.
#[async_trait]
pub trait DatasetIndexExt {
Expand All @@ -103,12 +144,6 @@ pub trait DatasetIndexExt {
params: &dyn IndexParams,
replace: bool,
) -> Result<()>;

/// Remaps the index identified by `index_id` through `row_id_map` and
/// returns the uuid of the resulting index.
///
/// `row_id_map` is keyed by old row id; the value is the new row id.
/// NOTE(review): a `None` value appears to mark a deleted row (per the
/// comment in the PQ remap code) — confirm.
async fn remap_index(
&self,
index_id: &Uuid,
row_id_map: &HashMap<u64, Option<u64>>,
) -> Result<Uuid>;
}

#[async_trait]
Expand Down Expand Up @@ -199,41 +234,6 @@ impl DatasetIndexExt for Dataset {

Ok(())
}

/// Remaps the index identified by `index_id` through `row_id_map` and returns
/// the uuid of the newly written index.
///
/// # Errors
///
/// Returns `Error::Index` when no index with `index_id` exists, when the
/// index does not cover exactly one field, or when the indexed field id is
/// not present in the schema. Errors from loading the indices or from
/// `remap_vector_index` are propagated unchanged.
async fn remap_index(
    &self,
    index_id: &Uuid,
    row_id_map: &HashMap<u64, Option<u64>>,
) -> Result<Uuid> {
    // Load indices from the disk.
    let indices = self.load_indices().await?;
    let matched = indices
        .iter()
        .find(|i| i.uuid == *index_id)
        .ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", index_id),
        })?;

    // Exactly one field is supported. The field list comes from stored
    // metadata, so a malformed entry is reported as an error, not a panic.
    let field_id = match &matched.fields[..] {
        [field_id] => *field_id,
        [] => {
            return Err(Error::Index {
                message: format!("Index with id {} has no fields", index_id),
            })
        }
        _ => {
            return Err(Error::Index {
                message: "Remapping indices with multiple fields is not supported".to_string(),
            })
        }
    };

    let field = self
        .schema()
        .field_by_id(field_id)
        .ok_or_else(|| Error::Index {
            message: format!(
                "Index with id {} references field id {} which is not in the dataset schema",
                index_id, field_id
            ),
        })?;

    let new_id = Uuid::new_v4();

    let arc_self = Arc::new(self.clone());

    remap_vector_index(arc_self, &field.name, index_id, &new_id, row_id_map).await?;

    Ok(new_id)
}
}

#[cfg(test)]
Expand Down
11 changes: 9 additions & 2 deletions rust/lance/src/index/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,13 @@ pub(crate) async fn remap_vector_index(
) -> Result<()> {
let old_index = open_index(dataset.clone(), column, &old_uuid.to_string()).await?;
old_index.check_can_remap()?;
let ivf_index: &IVFIndex = old_index.as_any().downcast_ref().unwrap();
let ivf_index: &IVFIndex =
old_index
.as_any()
.downcast_ref()
.ok_or_else(|| Error::NotSupported {
source: "Only IVF indexes can be remapped currently".into(),
})?;

remap_index_file(
dataset.as_ref(),
Expand All @@ -287,7 +293,8 @@ pub(crate) async fn remap_vector_index(
ivf_index,
mapping,
)
.await
.await?;
Ok(())
}

/// Open the Vector index on dataset, specified by the `uuid`.
Expand Down
6 changes: 3 additions & 3 deletions rust/lance/src/index/vector/ivf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ fn generate_remap_tasks(
Ok(tasks)
}

pub async fn remap_index_file(
pub(crate) async fn remap_index_file(
dataset: &Dataset,
old_uuid: &str,
new_uuid: &str,
Expand Down Expand Up @@ -957,7 +957,7 @@ mod tests {
use std::collections::HashMap;

use crate::{
format::RowId,
format::RowAddress,
index::{
vector::{open_index, VectorIndexParams},
DatasetIndexExt, IndexType,
Expand Down Expand Up @@ -1173,7 +1173,7 @@ mod tests {
// The invalid row id should never show up in results
assert!(!found_ids
.iter()
.any(|f_id| f_id.unwrap() == RowId::TOMBSTONE_ROW));
.any(|f_id| f_id.unwrap() == RowAddress::TOMBSTONE_ROW));
}
}
}
Expand Down
23 changes: 16 additions & 7 deletions rust/lance/src/index/vector/pq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::index::prefilter::PreFilter;
use crate::index::Index;
use crate::index::{pb, vector::kmeans::train_kmeans, vector::DIST_COL};
use crate::io::object_reader::{read_fixed_stride_array, ObjectReader};
use crate::{arrow::*, format::RowId};
use crate::{arrow::*, format::RowAddress};
use crate::{Error, Result};

/// Product Quantization Index.
Expand Down Expand Up @@ -251,14 +251,23 @@ impl Index for PQIndex {
}

// Helper struct for zipped distance + row id that sorts by distance
struct DistanceRowId((f32, u64));
/// Pairs a distance with a row id.
struct DistanceRowId {
    /// Distance component of the pair.
    distance: f32,
    /// Row id component of the pair.
    row_id: u64,
}

impl DistanceRowId {
    /// Builds a pair from `distance` and `row_id`.
    fn new(distance: f32, row_id: u64) -> Self {
        DistanceRowId { distance, row_id }
    }
}

impl DistanceRowId {
fn distance(&self) -> f32 {
self.0 .0
self.distance
}
fn row_id(&self) -> u64 {
self.0 .1
self.row_id
}
}

Expand Down Expand Up @@ -341,8 +350,8 @@ impl VectorIndex for PQIndex {
.iter()
.copied()
.zip(row_ids.values().iter().copied())
.filter(|(_, row_id)| *row_id != RowId::TOMBSTONE_ROW)
.map(|(distance, row_id)| DistanceRowId((distance, row_id)));
.filter(|(_, row_id)| *row_id != RowAddress::TOMBSTONE_ROW)
.map(|(distance, row_id)| DistanceRowId::new(distance, row_id));

let limit = query.k * query.refine_factor.unwrap_or(1) as usize;

Expand Down Expand Up @@ -407,7 +416,7 @@ impl VectorIndex for PQIndex {
.unwrap_or(Some(*old_row_id))
// If the row is in the mapping, but maps to None, then it is deleted, and we insert
// a tombstone in its place
.unwrap_or(RowId::TOMBSTONE_ROW)
.unwrap_or(RowAddress::TOMBSTONE_ROW)
},
));

Expand Down
4 changes: 4 additions & 0 deletions rust/lance/src/io/object_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ impl ObjectWriter {
Ok(())
}

/// Returns the constant `16`.
// NOTE(review): presumably the byte length of the file's magic section — confirm.
pub fn sizeof_magic() -> usize {
    const MAGIC_SIZE: usize = 16;
    MAGIC_SIZE
}

pub async fn shutdown(&mut self) -> Result<()> {
Ok(self.writer.shutdown().await?)
}
Expand Down

0 comments on commit da14c0f

Please sign in to comment.