feat: allow externally generated shuffle buffer and multiple of them #1801
43a9fdf to 3bea032
/// # Safety
///
/// user must ensure the buffers are valid.
pub unsafe fn set_unsorted_buffers(&mut self, unsorted_buffers: Vec<String>) {
Why is this an unsafe method?
Btw, can we just hold a reference to &[String]?
This is meant to signal to the user that passing invalid buffers here can cause undefined behavior.
changed it to &[impl Into<String>]
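To make the signature change discussed above concrete, here is a minimal sketch of a setter that borrows a slice and copies each entry into an owned String. The ShufflerSketch type and its field are hypothetical stand-ins, not the PR's actual struct, and the extra Clone bound is an assumption: because the items are only borrowed, each one has to be cloned before it can be converted. Whether the real method remained unsafe after the change is not shown in this thread.

```rust
/// Hypothetical stand-in for the shuffler; only the buffer list is modeled.
#[derive(Default)]
struct ShufflerSketch {
    unsorted_buffers: Vec<String>,
}

impl ShufflerSketch {
    /// Accepts any slice whose items convert into `String` (for example
    /// `&str` or `String`). The `Clone` bound is an assumption of this
    /// sketch: items are borrowed, so each is cloned before conversion.
    fn set_unsorted_buffers(&mut self, buffers: &[impl Into<String> + Clone]) {
        self.unsorted_buffers = buffers.iter().cloned().map(Into::into).collect();
    }
}
```

With this shape a caller can pass either `&["a.lance", "b.lance"]` or a borrowed `Vec<String>` without building a new vector first.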
.expect("part id should exist");

let mut stream = stream::iter(start..end)
    .map(|i| reader.read_batch(i as i32, ReadBatchParams::RangeFull, &lance_schema))
You can just do reader.read_batch(i as i32, .., &lance_schema), I think.
🤯
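The `..` shorthand suggested above can work when the parameter is declared as `impl Into<...>` and the parameter type implements `From<RangeFull>`. The sketch below shows that pattern in isolation. BatchParams and describe are hypothetical names used only for illustration; they are not Lance's actual types or signatures.

```rust
use std::ops::{Range, RangeFull};

/// Hypothetical stand-in for a "which rows to read" parameter type.
#[derive(Debug, PartialEq)]
enum BatchParams {
    Range(Range<usize>),
    RangeFull,
}

// `..` is a value of type `RangeFull`, so this impl lets callers pass it directly.
impl From<RangeFull> for BatchParams {
    fn from(_: RangeFull) -> Self {
        BatchParams::RangeFull
    }
}

impl From<Range<usize>> for BatchParams {
    fn from(r: Range<usize>) -> Self {
        BatchParams::Range(r)
    }
}

/// Accepts anything convertible into `BatchParams` and returns the converted value.
fn describe(params: impl Into<BatchParams>) -> BatchParams {
    params.into()
}
```

Under these assumptions `describe(..)` and `describe(BatchParams::RangeFull)` produce the same value, which is why the shorter call site reads the same way.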
    .map(|i| reader.read_batch(i as i32, ReadBatchParams::RangeFull, &lance_schema))
    .buffered(16);

while let Some(batch) = stream.next().await {
Foreach?
We need to update partition_sizes; Fn closures can't do that easily because Stream::for_each captures multiple FnMut.
.expect("part id should exist");

let mut stream = stream::iter(start..end)
    .map(|i| reader.read_batch(i as i32, ReadBatchParams::RangeFull, &lance_schema))
Also might consider putting read_batch into a tokio task. Otherwise progress on these IO calls will be blocked on the CPU-bound work happening in your while let loop below.
pq_codes.values()[i * num_sub_vectors..(i + 1) * num_sub_vectors]
    .iter(),
Could be wrong, but I think you can make this a slice and it can do a contiguous mem copy instead of running through the iterator.
- pq_codes.values()[i * num_sub_vectors..(i + 1) * num_sub_vectors]
-     .iter(),
+ &pq_codes.values()[i * num_sub_vectors..(i + 1) * num_sub_vectors],
Btw, delete
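As a rough illustration of the slice suggestion above, the sketch below appends one row of PQ codes from a flat buffer to a destination vector by passing a sub-slice to `Vec::extend_from_slice`, rather than driving an iterator element by element. The function names and the use of `Vec<u8>` as the destination are assumptions made for this example; the PR's actual destination type is not shown in the thread.

```rust
/// Returns the codes of row `i` as a contiguous sub-slice of the flat buffer.
/// Panics if the row lies outside `values`, like any out-of-range slice index.
fn row_codes(values: &[u8], i: usize, num_sub_vectors: usize) -> &[u8] {
    &values[i * num_sub_vectors..(i + 1) * num_sub_vectors]
}

/// Appends row `i` to `dst` by handing the whole sub-slice to the vector at once.
fn append_row(dst: &mut Vec<u8>, values: &[u8], i: usize, num_sub_vectors: usize) {
    dst.extend_from_slice(row_codes(values, i, num_sub_vectors));
}
```

For `Copy` element types such as `u8`, `extend_from_slice` sees the full length up front, so it can reserve once and copy the block in one step.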
70ebbe0 to 87af1c7
87af1c7 to f3b7288
This PR does a few things to the IVF shuffler, including adding a set_unsorted_buffers method. This method allows the caller to set a list of buffers that is different from the unsorted.lance buffer used by default.
I will wire up Python in the next PR to keep this one smaller and more reviewable.