Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: build ivf partition using disk based shuffler #1312

Merged
merged 13 commits into from
Oct 2, 2023

Conversation

eddyxu
Copy link
Contributor

@eddyxu eddyxu commented Sep 22, 2023

  • Reduce the memory consumption to run IVF_PQ
  • Building block for distributed IVF_PQ indexing
  • Better multi-thread support during IVF assignment

Copy link
Contributor

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some thoughts from a partial review.

@@ -190,64 +185,38 @@ pub struct KMeanMembership {
impl KMeanMembership {
/// Reconstruct a KMeans model from the membership.
async fn to_kmeans(&self) -> Result<KMeans> {
let time = std::time::Instant::now();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe #[instrument] the method instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.iter()
.zip(centroid.values().iter())
.map(|(v, c)| *v - *c)
.collect::<Vec<_>>() // How to avoid one memory allocation here?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flat_map should accept a function that returns an iterator. Can you just remove the collect? Maybe you need to do .copied() to convert from Iterator<Item = &f32> to Iterator<Item = f32>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It complaints returns a value referencing data owned by the current function (centroid) tho.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, fixed in another way: using foreach to append to pre-allocated vector.

@@ -649,6 +635,7 @@ pub async fn build_ivf_pq_index(
#[cfg(not(feature = "opq"))]
let transforms: Vec<Box<dyn Transformer>> = vec![];

let start = std::time::Instant::now();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use a span?

Comment on lines 199 to 203
for i in 0..dimension {
new_centroids[*c as usize * dimension + i] += v[i];
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a bit faster (especially for large dimension) as it can avoid some bounds checking.

Suggested change
for i in 0..dimension {
new_centroids[*c as usize * dimension + i] += v[i];
}
for (old, new) in new_centroids[(*c * dimension)..((*c + 1) * dimension)]
.iter_mut()
.zip(v)
{
*old += new;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 170 to 186
.and_then(|batch_and_range| async move {
// Split batch into per-partition batches
let (batch, range) = batch_and_range;
Ok(stream::iter(range).map(move |part_id| {
let predictions = BooleanArray::from_unary(
batch
.column_by_name(PARTITION_ID_COLUMN)
.unwrap()
.as_primitive::<UInt32Type>(),
|pid| pid == part_id,
);
let parted_batch =
filter_record_batch(&batch, &predictions)?.drop_column(PARTITION_ID_COLUMN)?;
Ok::<(u32, RecordBatch), Error>((part_id, parted_batch))
}))
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is CPU bound, so I think we need to put this into tokio::task::spawn_blocking() to get any concurrency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this piece of code.

Comment on lines 217 to 218
#[cfg(test)]
mod tests {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This going to be filled in? Or removed?

.values()
.chunks_exact(dimension)
.zip(self.cluster_ids.iter())
.for_each(|(v, c)| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: using single variable names makes this harder to read. (I had to cross reference different parts of the file to figure out what they meant.)

Suggested change
.for_each(|(v, c)| {
.for_each(|(vector, cluster_id)| {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.for_each(|(v, c)| {
cluster_cnts[*c as usize] += 1;
for i in 0..dimension {
new_centroids[*c as usize * dimension + i] += v[i];
Copy link
Contributor

@chebbyChefNEQ chebbyChefNEQ Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this feels like it can be an overflow hazard. We probably haven't hit it because of sampling I'm guessing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to use iterator

0..num_partitions,
)
.await?;
println!("Building partitions: {}s", start.elapsed().as_secs_f32());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/building/built

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

warn!("KMeans: cluster {} is empty", i);
} else {
for j in 0..dimension {
new_centroids[i * dimension + j] /= cluster_cnts[i] as f32;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any possibility that cluster_cnts[i] is zero here? Maybe if K is larger than the # of data points?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea. the training algorithm does not work that well to make sure no cluster at the moment.

eddyxu and others added 3 commits October 2, 2023 11:09
parent 8775776
author Lei Xu <[email protected]> 1695332018 -0700
committer Lei Xu <[email protected]> 1695926481 -0700

Ivf::compute_partitions
use instructment
avoid memory copy during residual computation
@eddyxu eddyxu marked this pull request as ready for review October 2, 2023 18:37
Comment on lines 101 to 104
let indices = UInt32Array::from(row_ids.clone());
// Use `take` to select rows.
let str_arr = take(&struct_arr, &indices, None)?;
let parted_batch: RecordBatch = str_arr.as_struct().into();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we have that new take() method now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. Fixed.

@eddyxu eddyxu merged commit fbe5798 into main Oct 2, 2023
15 checks passed
@eddyxu eddyxu deleted the lei/ivf_build_partition branch October 2, 2023 21:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
benchmark vector Vector Search
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants