Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix bug in index remapping when plan contained multiple rewrite groups #1415

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion python/python/tests/test_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ def test_dataset_optimize(tmp_path: Path):
def create_table(min, max, nvec, ndim=8):
    """Build a test table of random vectors plus an ``id`` column.

    Parameters
    ----------
    min, max :
        Bounds of the uniform distribution the vector components are
        drawn from.
    nvec : int
        Number of rows (vectors) to generate.
    ndim : int, default 8
        Dimensionality of each vector.

    Returns
    -------
    A table with a "vector" column (from ``vec_to_table``) and an "id"
    column (0..nvec) usable in delete/filter predicates.
    """
    mat = np.random.uniform(min, max, (nvec, ndim))
    tbl = vec_to_table(data=mat)
    # Add id column for filtering
    tbl = pa.Table.from_pydict(
        {"vector": tbl.column(0).chunk(0), "id": np.arange(0, nvec)}
    )
    return tbl


Expand Down Expand Up @@ -123,6 +126,45 @@ def check_index(has_knn_combined):
check_index(has_knn_combined=True)


def test_index_remapping_multiple_rewrite_tasks(tmp_path: Path):
    """Compaction must remap index fragment ids correctly when the plan
    contains more than one rewrite group (indexed + unindexed data)."""
    base_dir = tmp_path / "dataset"
    dataset = lance.write_dataset(
        create_table(min=0, max=1, nvec=300), base_dir, max_rows_per_file=150
    )
    dataset = dataset.create_index(
        "vector",
        index_type="IVF_PQ",
        num_partitions=4,
        num_sub_vectors=2,
    )
    assert dataset.has_index
    dataset = lance.write_dataset(
        create_table(min=0, max=1, nvec=300),
        base_dir,
        mode="append",
        max_rows_per_file=150,
    )

    dataset.delete("id % 4 == 0")
    assert len(list(dataset.get_fragments())) == 4

    # Four small fragments remain: two covered by the index and two not.
    # Indexed and unindexed fragments cannot be merged with each other,
    # so compaction should leave exactly two fragments behind.
    dataset.optimize.compact_files()

    remaining = list(dataset.get_fragments())
    assert len(remaining) == 2

    # The index should now point at exactly the one compacted fragment
    # that was produced from the previously indexed data.
    frag_ids = dataset.list_indices()[0]["fragment_ids"]
    assert len(frag_ids) == 1
    assert next(iter(frag_ids)) == remaining[0].fragment_id


def test_dataset_distributed_optimize(tmp_path: Path):
base_dir = tmp_path / "dataset"
data = pa.table({"a": range(800), "b": range(800)})
Expand Down
44 changes: 24 additions & 20 deletions rust/lance/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,19 +468,33 @@ impl Transaction {

fn recalculate_fragment_bitmap(
old: &RoaringBitmap,
removed: &[u32],
added: &[u32],
index_id: &Uuid,
groups: &[RewriteGroup],
) -> Result<RoaringBitmap> {
let mut new_bitmap = old.clone();
for remove in removed {
if !new_bitmap.remove(*remove) {
return Err(Error::invalid_input(format!("The compaction plan modified the fragment with id {} and rewrote the index with id {} but that fragment was not part of that index", remove, index_id)));
for group in groups {
let any_in_index = group
.old_fragments
.iter()
.any(|frag| old.contains(frag.id as u32));
let all_in_index = group
.old_fragments
.iter()
.all(|frag| old.contains(frag.id as u32));
// Any rewrite group may or may not be covered by the index. However, if any fragment
// in a rewrite group was previously covered by the index then all fragments in the rewrite
// group must have been previously covered by the index. plan_compaction takes care of
// this for us so this should be safe to assume.
if any_in_index {
if all_in_index {
for frag_id in group.old_fragments.iter().map(|frag| frag.id as u32) {
new_bitmap.remove(frag_id);
}
new_bitmap.extend(group.new_fragments.iter().map(|frag| frag.id as u32));
} else {
return Err(Error::invalid_input("The compaction plan included a rewrite group that was a split of indexed and non-indexed data"));
}
}
}
for add in added {
new_bitmap.insert(*add);
}
Ok(new_bitmap)
}

Expand All @@ -490,14 +504,6 @@ impl Transaction {
groups: &[RewriteGroup],
) -> Result<()> {
let mut modified_indices = HashSet::new();
let old_frag_ids = groups
.iter()
.flat_map(|group| group.old_fragments.iter().map(|frag| frag.id as u32))
.collect::<Vec<_>>();
let new_frag_ids = groups
.iter()
.flat_map(|group| group.new_fragments.iter().map(|frag| frag.id as u32))
.collect::<Vec<_>>();

for rewritten_index in rewritten_indices {
if !modified_indices.insert(rewritten_index.old_id) {
Expand All @@ -521,9 +527,7 @@ impl Transaction {
index.uuid
))
})?,
&old_frag_ids,
&new_frag_ids,
&index.uuid,
groups,
)?);
index.uuid = rewritten_index.new_id;
}
Expand Down