fix: list_indices never updated after first call (#2936)
Lance in Rust already caches the index metadata, so there is no need to cache it again in Python.

- also added an option to control which indices to optimize
- also added tests for optimize_indices

---------

Signed-off-by: BubbleCal <[email protected]>
BubbleCal committed Sep 26, 2024
1 parent 51126ab commit ed86ff6
Showing 6 changed files with 123 additions and 9 deletions.
13 changes: 10 additions & 3 deletions python/python/lance/dataset.py
@@ -218,9 +218,7 @@ def tags(self) -> Tags:
return Tags(self._ds)

def list_indices(self) -> List[Dict[str, Any]]:
- if getattr(self, "_list_indices_res", None) is None:
- self._list_indices_res = self._ds.load_indices()
- return self._list_indices_res
return self._ds.load_indices()

def index_statistics(self, index_name: str) -> Dict[str, Any]:
warnings.warn(
@@ -2790,6 +2788,15 @@ def optimize_indices(self, **kwargs):
the new data to existing partitions. This means an update is much quicker
than retraining the entire index but may have less accuracy (especially
if the new data exhibits new patterns, concepts, or trends).

Parameters
----------
num_indices_to_merge: int, default 1
    The number of indices to merge.
    If set to 0, a new delta index will be created.
index_names: List[str], default None
    The names of the indices to optimize.
    If None, all indices will be optimized.
"""
self._dataset._ds.optimize_indices(**kwargs)

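For context, here is a minimal end-to-end sketch of the options documented above, driven through the public Python API. The toy table, the temporary path, and the assumption that each entry returned by `list_indices()` carries a `"name"` field are illustrative, not part of this commit.

```python
import tempfile

import numpy as np
import pyarrow as pa

import lance

# Build a small table with a fixed-size-list vector column.
dim = 16
values = pa.array(np.random.rand(1024 * dim).astype(np.float32))
table = pa.table({"vector": pa.FixedSizeListArray.from_arrays(values, dim)})

uri = tempfile.mkdtemp()
ds = lance.write_dataset(table, uri)
ds.create_index("vector", index_type="IVF_PQ", num_partitions=4, num_sub_vectors=2)

# list_indices() now always reflects the current manifest (no stale Python-side cache).
index_name = ds.list_indices()[0]["name"]

# Append new rows, then index only the new data (0 = create a delta index, no merge),
# restricted to the single named index.
ds = lance.write_dataset(table, uri, mode="append")
ds.optimize.optimize_indices(num_indices_to_merge=0, index_names=[index_name])
print(ds.list_indices())
```

Passing `index_names=None` (the default) keeps the existing behavior of optimizing every index.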
21 changes: 21 additions & 0 deletions python/python/tests/test_vector_index.py
@@ -877,3 +877,24 @@ def test_fragment_scan_disallowed_on_ann_with_index_scan_prefilter(tmp_path):
fragments=[LanceFragment(dataset, 0)],
)
scanner.explain_plan(True)


def test_load_indices(dataset):
indices = dataset.list_indices()
assert len(indices) == 0

dataset.create_index(
"vector", index_type="IVF_PQ", num_partitions=4, num_sub_vectors=16
)
indices = dataset.list_indices()
assert len(indices) == 1


def test_optimize_indices(indexed_dataset):
data = create_table()
indexed_dataset = lance.write_dataset(data, indexed_dataset.uri, mode="append")
indices = indexed_dataset.list_indices()
assert len(indices) == 1
indexed_dataset.optimize.optimize_indices(num_indices_to_merge=0)
indices = indexed_dataset.list_indices()
assert len(indices) == 2
7 changes: 7 additions & 0 deletions python/src/dataset.rs
@@ -1142,6 +1142,13 @@ impl Dataset {
if let Some(num_indices_to_merge) = kwargs.get_item("num_indices_to_merge")? {
options.num_indices_to_merge = num_indices_to_merge.extract()?;
}
if let Some(index_names) = kwargs.get_item("index_names")? {
options.index_names = Some(
index_names
.extract::<Vec<String>>()
.map_err(|err| PyValueError::new_err(err.to_string()))?,
);
}
}
RT.block_on(
None,
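The extraction above maps a failed `Vec<String>` conversion to a Python `ValueError`, so `index_names` has to be a list of strings. A tiny sketch of that error path, reusing the `ds` handle from the previous sketch (illustrative, not a test from this commit):

```python
# index_names must be a list of strings; the binding converts the failed
# extraction into a ValueError before any optimization work starts.
try:
    ds.optimize.optimize_indices(num_indices_to_merge=0, index_names=[1, 2])
except ValueError as err:
    print("rejected:", err)
```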
4 changes: 4 additions & 0 deletions rust/lance-index/src/optimize.rs
@@ -17,12 +17,16 @@ pub struct OptimizeOptions {
/// A common usage pattern will be that, the caller can keep a large snapshot of the index of the base version,
/// and accumulate a few delta indices, then merge them into the snapshot.
pub num_indices_to_merge: usize,

/// The index names to optimize. If None, all indices will be optimized.
pub index_names: Option<Vec<String>>,
}

impl Default for OptimizeOptions {
fn default() -> Self {
Self {
num_indices_to_merge: 1,
index_names: None,
}
}
}
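Seen from Python, `num_indices_to_merge=0` leaves freshly built delta indices unmerged, while a larger value folds that many indices back together. A short continuation of the earlier sketch; it assumes `index_statistics()` exposes the same `num_indices` counter the Rust tests below assert on:

```python
# Fold the delta produced earlier back into the base index. A value of 2 asks
# the optimizer to merge two existing indices for this column into one.
ds.optimize.optimize_indices(num_indices_to_merge=2)
stats = ds.index_statistics(index_name)
print(stats["num_indices"], stats["num_indexed_rows"])
```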
86 changes: 80 additions & 6 deletions rust/lance/src/index.rs
@@ -4,7 +4,7 @@
//! Secondary Index
//!

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use arrow_schema::DataType;
@@ -425,8 +425,17 @@ impl DatasetIndexExt for Dataset {
let dataset = Arc::new(self.clone());
let indices = self.load_indices().await?;

let indices_to_optimize = options
.index_names
.as_ref()
.map(|names| names.iter().collect::<HashSet<_>>());
let name_to_indices = indices
.iter()
.filter(|idx| {
indices_to_optimize
.as_ref()
.map_or(true, |names| names.contains(&idx.name))
})
.map(|idx| (idx.name.clone(), idx))
.into_group_map();

@@ -964,22 +973,33 @@ mod tests {
let test_dir = tempdir().unwrap();
let dimensions = 16;
let column_name = "vec";
- let field = Field::new(
let vec_field = Field::new(
column_name,
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dimensions,
),
false,
);
- let schema = Arc::new(Schema::new(vec![field]));
let other_column_name = "other_vec";
let other_vec_field = Field::new(
other_column_name,
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dimensions,
),
false,
);
let schema = Arc::new(Schema::new(vec![vec_field, other_vec_field]));

let float_arr = generate_random_array(512 * dimensions as usize);

- let vectors =
- arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
let vectors = Arc::new(
arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(),
);

- let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
let record_batch =
RecordBatch::try_new(schema.clone(), vec![vectors.clone(), vectors.clone()]).unwrap();

let reader = RecordBatchIterator::new(
vec![record_batch.clone()].into_iter().map(Ok),
@@ -999,6 +1019,16 @@
)
.await
.unwrap();
dataset
.create_index(
&[other_column_name],
IndexType::Vector,
Some("other_vec_idx".into()),
&params,
true,
)
.await
.unwrap();

let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
@@ -1019,9 +1049,50 @@
assert_eq!(stats["num_unindexed_fragments"], 1);
assert_eq!(stats["num_indices"], 1);

dataset
.optimize_indices(&OptimizeOptions {
num_indices_to_merge: 0, // Just create index for delta
index_names: Some(vec![]), // Optimize nothing
})
.await
.unwrap();
let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
assert_eq!(stats["num_unindexed_rows"], 512);
assert_eq!(stats["num_indexed_rows"], 512);
assert_eq!(stats["num_indexed_fragments"], 1);
assert_eq!(stats["num_unindexed_fragments"], 1);
assert_eq!(stats["num_indices"], 1);

// optimize the other index
dataset
.optimize_indices(&OptimizeOptions {
num_indices_to_merge: 0, // Just create index for delta
index_names: Some(vec!["other_vec_idx".to_string()]),
})
.await
.unwrap();
let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
assert_eq!(stats["num_unindexed_rows"], 512);
assert_eq!(stats["num_indexed_rows"], 512);
assert_eq!(stats["num_indexed_fragments"], 1);
assert_eq!(stats["num_unindexed_fragments"], 1);
assert_eq!(stats["num_indices"], 1);

let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("other_vec_idx").await.unwrap())
.unwrap();
assert_eq!(stats["num_unindexed_rows"], 0);
assert_eq!(stats["num_indexed_rows"], 1024);
assert_eq!(stats["num_indexed_fragments"], 2);
assert_eq!(stats["num_unindexed_fragments"], 0);
assert_eq!(stats["num_indices"], 2);

dataset
.optimize_indices(&OptimizeOptions {
num_indices_to_merge: 0, // Just create index for delta
..Default::default()
})
.await
.unwrap();
@@ -1038,6 +1109,7 @@
dataset
.optimize_indices(&OptimizeOptions {
num_indices_to_merge: 2,
..Default::default()
})
.await
.unwrap();
@@ -1122,6 +1194,7 @@
dataset
.optimize_indices(&OptimizeOptions {
num_indices_to_merge: 0, // Just create index for delta
..Default::default()
})
.await
.unwrap();
@@ -1137,6 +1210,7 @@
dataset
.optimize_indices(&OptimizeOptions {
num_indices_to_merge: 2,
..Default::default()
})
.await
.unwrap();
1 change: 1 addition & 0 deletions rust/lance/src/index/append.rs
@@ -370,6 +370,7 @@ mod tests {
dataset
.optimize_indices(&OptimizeOptions {
num_indices_to_merge: 0,
..Default::default()
})
.await
.unwrap();
