perf: make sure we use multiple threads when scanning #1705
Conversation
```rust
// If contiguous, continue
if indices[i + 1] == indices[i] {
    continue;
}
```
This fixes a performance bug for large vectors. If we have vectors larger than the block size (1536-dim f32 vectors are 6,144 bytes while the default block size on local fs is 4K), then without this line we make a range request for every vector we take. 😱
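The coalescing idea can be sketched with a minimal, self-contained helper. This is illustrative code, not the PR's actual implementation (which compares request offsets rather than row indices): adjacent indices are merged into one `Range`, so a run of neighboring rows becomes a single range request instead of one request per row.

```rust
/// Coalesce a sorted list of row indices into contiguous ranges, so one
/// range read covers a whole run of adjacent rows. Hypothetical helper;
/// names are illustrative only.
fn coalesce(indices: &[u64]) -> Vec<std::ops::Range<u64>> {
    let mut ranges = Vec::new();
    let mut iter = indices.iter().copied();
    let Some(first) = iter.next() else { return ranges };
    let (mut start, mut end) = (first, first + 1);
    for idx in iter {
        if idx == end {
            // Contiguous with the current run: extend it instead of
            // starting a new request.
            end += 1;
        } else {
            ranges.push(start..end);
            start = idx;
            end = idx + 1;
        }
    }
    ranges.push(start..end);
    ranges
}

fn main() {
    // Rows 10..=13 are adjacent, so they collapse into a single range.
    let ranges = coalesce(&[10, 11, 12, 13, 40, 41, 99]);
    assert_eq!(ranges, vec![10..14, 40..42, 99..100]);
    println!("{ranges:?}");
}
```

With 6,144-byte vectors on a 4K-block filesystem, every row already spans multiple blocks, so skipping this merge step means one request per vector; with it, a contiguous take is one request total.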
Just the one question
```diff
@@ -294,7 +294,12 @@ impl FragmentScanner {
         let stream = futures::stream::iter(simplified_predicates.into_iter().enumerate()).map(
             move |(batch_id, predicate)| {
                 let scanner_ref = scanner.clone();
-                async move { scanner_ref.read_batch(batch_id, predicate).await }
+                tokio::task::spawn(async move { scanner_ref.read_batch(batch_id, predicate).await })
```
Would `buffered` / `buffer_unordered` not work here?
We do call it below; it's just not sufficient on its own. Each of the `read_batch()` tasks has some CPU-bound work, which can block the IO of other tasks when they run on the same thread. By spawning, we distribute the tasks across worker threads, ensuring we actually get concurrency.