diff --git a/python/python/tests/test_optimize.py b/python/python/tests/test_optimize.py
index e1ae6aa7c3..5e29583577 100644
--- a/python/python/tests/test_optimize.py
+++ b/python/python/tests/test_optimize.py
@@ -49,7 +49,10 @@ def test_dataset_optimize(tmp_path: Path):
 def create_table(min, max, nvec, ndim=8):
     mat = np.random.uniform(min, max, (nvec, ndim))
     tbl = vec_to_table(data=mat)
-    print(tbl)
+    # Add id column for filtering
+    tbl = pa.Table.from_pydict(
+        {"vector": tbl.column(0).chunk(0), "id": np.arange(0, nvec)}
+    )
     return tbl
 
 
@@ -123,6 +126,45 @@ def check_index(has_knn_combined):
     check_index(has_knn_combined=True)
 
 
+def test_index_remapping_multiple_rewrite_tasks(tmp_path: Path):
+    base_dir = tmp_path / "dataset"
+    ds = lance.write_dataset(
+        create_table(min=0, max=1, nvec=300), base_dir, max_rows_per_file=150
+    )
+    ds = ds.create_index(
+        "vector",
+        index_type="IVF_PQ",
+        num_partitions=4,
+        num_sub_vectors=2,
+    )
+    assert ds.has_index
+    ds = lance.write_dataset(
+        create_table(min=0, max=1, nvec=300),
+        base_dir,
+        mode="append",
+        max_rows_per_file=150,
+    )
+
+    ds.delete("id % 4 == 0")
+    fragments = list(ds.get_fragments())
+    assert len(fragments) == 4
+
+    # We have a dataset with 4 small fragments. 2 are indexed and
+    # 2 are not. The indexed fragments and the non-indexed fragments
+    # cannot be combined and so we should end up with 2 fragments after
+    # compaction
+    ds.optimize.compact_files()
+
+    fragments = list(ds.get_fragments())
+    assert len(fragments) == 2
+
+    index = ds.list_indices()[0]
+    frag_ids = index["fragment_ids"]
+
+    assert len(frag_ids) == 1
+    assert list(frag_ids)[0] == fragments[0].fragment_id
+
+
 def test_dataset_distributed_optimize(tmp_path: Path):
     base_dir = tmp_path / "dataset"
     data = pa.table({"a": range(800), "b": range(800)})
diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs
index 589006dabe..fa81996cb1 100644
--- a/rust/lance/src/dataset/transaction.rs
+++ b/rust/lance/src/dataset/transaction.rs
@@ -468,19 +468,33 @@ impl Transaction {
 
     fn recalculate_fragment_bitmap(
         old: &RoaringBitmap,
-        removed: &[u32],
-        added: &[u32],
-        index_id: &Uuid,
+        groups: &[RewriteGroup],
     ) -> Result<RoaringBitmap> {
         let mut new_bitmap = old.clone();
-        for remove in removed {
-            if !new_bitmap.remove(*remove) {
-                return Err(Error::invalid_input(format!("The compaction plan modified the fragment with id {} and rewrote the index with id {} but that fragment was not part of that index", remove, index_id)));
+        for group in groups {
+            let any_in_index = group
+                .old_fragments
+                .iter()
+                .any(|frag| old.contains(frag.id as u32));
+            let all_in_index = group
+                .old_fragments
+                .iter()
+                .all(|frag| old.contains(frag.id as u32));
+            // Any rewrite group may or may not be covered by the index.  However, if any fragment
+            // in a rewrite group was previously covered by the index then all fragments in the rewrite
+            // group must have been previously covered by the index.  plan_compaction takes care of
+            // this for us so this should be safe to assume.
+            if any_in_index {
+                if all_in_index {
+                    for frag_id in group.old_fragments.iter().map(|frag| frag.id as u32) {
+                        new_bitmap.remove(frag_id);
+                    }
+                    new_bitmap.extend(group.new_fragments.iter().map(|frag| frag.id as u32));
+                } else {
+                    return Err(Error::invalid_input("The compaction plan included a rewrite group that was a split of indexed and non-indexed data"));
+                }
             }
         }
-        for add in added {
-            new_bitmap.insert(*add);
-        }
         Ok(new_bitmap)
     }
 
@@ -490,14 +504,6 @@ impl Transaction {
         groups: &[RewriteGroup],
     ) -> Result<()> {
         let mut modified_indices = HashSet::new();
-        let old_frag_ids = groups
-            .iter()
-            .flat_map(|group| group.old_fragments.iter().map(|frag| frag.id as u32))
-            .collect::<Vec<_>>();
-        let new_frag_ids = groups
-            .iter()
-            .flat_map(|group| group.new_fragments.iter().map(|frag| frag.id as u32))
-            .collect::<Vec<_>>();
 
         for rewritten_index in rewritten_indices {
             if !modified_indices.insert(rewritten_index.old_id) {
@@ -521,9 +527,7 @@ impl Transaction {
                         index.uuid
                     ))
                 })?,
-                &old_frag_ids,
-                &new_frag_ids,
-                &index.uuid,
+                groups,
             )?);
             index.uuid = rewritten_index.new_id;
         }