Use tournament loser tree for k-way sort-merging, increase merge speed by 50% #4301
cc @tustvold
This looks really good, thank you. I left some comments on how to make the implementation a little easier to follow, but I would be happy for this to go in as is.
if self.in_progress.len() == self.batch_size {
    return Poll::Ready(Some(self.build_record_batch()));
let mut cmp_node = (num_streams + winner) / 2;
- let mut cmp_node = (num_streams + winner) / 2;
+ // Replace overall winner by walking tree of losers
+ let mut cmp_node = (num_streams + winner) / 2;
/// the loser nodes
loser_tree: Vec<usize>,

/// Identify whether the loser tree is adjusted
- /// Identify whether the loser tree is adjusted
+ /// Identify whether the most recently yielded overall winner has been replaced
+ /// within the loser tree. A value of `false` indicates that the overall winner
+ /// has been yielded but the loser tree has not been updated
Or something to make it clearer what "adjusted" actually means.
FWIW, a boolean named should_replace_winner or something similar might be clearer.
self.current()
    .cmp(&other.current())
    .then_with(|| self.stream_idx.cmp(&other.stream_idx))
match (self.is_finished(), other.is_finished()) {
- match (self.is_finished(), other.is_finished()) {
+ // Order finished cursors last
+ match (self.is_finished(), other.is_finished()) {
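To illustrate what "order finished cursors last" could look like, here is a minimal sketch. The `Cursor` struct, its fields, and the `compare` method are hypothetical stand-ins built over plain `i32` rows, not the PR's actual cursor type; only the `match` on `is_finished()` and the `stream_idx` tie-break mirror the hunks shown above.

```rust
use std::cmp::Ordering;

/// Hypothetical stand-in for a per-stream cursor (not the PR's real type).
struct Cursor {
    stream_idx: usize,
    rows: Vec<i32>,
    pos: usize,
}

impl Cursor {
    /// True once every row of this stream has been consumed.
    fn is_finished(&self) -> bool {
        self.pos >= self.rows.len()
    }

    /// Current row; only valid while the cursor is not finished.
    fn current(&self) -> i32 {
        self.rows[self.pos]
    }

    /// Finished cursors compare greater than live ones, so they can never
    /// win a comparison against a stream that still has rows.
    fn compare(&self, other: &Cursor) -> Ordering {
        match (self.is_finished(), other.is_finished()) {
            (true, true) => Ordering::Equal,
            (true, false) => Ordering::Greater,
            (false, true) => Ordering::Less,
            // Both live: compare rows, then break ties by stream index
            (false, false) => self
                .current()
                .cmp(&other.current())
                .then_with(|| self.stream_idx.cmp(&other.stream_idx)),
        }
    }
}
```

With this ordering, an exhausted stream naturally sinks to the losing side of every comparison, so the merge does not need a separate code path to skip it.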
// Init all cursors and the loser tree in the first poll
if self.loser_tree.is_empty() {
    // Ensure all non-exhausted streams have a cursor from which
It might be easier to follow if this method were split into a method called init_loser_tree with a doc comment explaining what it does.
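As a rough idea of what such an extracted method might contain, the sketch below builds a loser tree as a free function. It assumes the layout implied by the diff: a `Vec<usize>` with one slot per stream, slot 0 holding the overall winner, and the leaf for stream `i` sitting at virtual position `num_streams + i`. The `keys` slice (the current head of each stream, `None` meaning exhausted) and the `is_gt` helper are assumptions made for illustration.

```rust
/// Hypothetical comparison: is stream `a`'s head greater than stream `b`'s?
/// Exhausted streams (`None`) sort last; ties are broken by stream index.
fn is_gt(keys: &[Option<i32>], a: usize, b: usize) -> bool {
    let key = |i: usize| (keys[i].is_none(), keys[i], i);
    key(a) > key(b)
}

/// Builds the loser tree by inserting the streams one at a time.
/// `tree[0]` ends up holding the overall winner (smallest head);
/// every other slot holds the loser of the match played at that node.
fn init_loser_tree(keys: &[Option<i32>]) -> Vec<usize> {
    let n = keys.len();
    // usize::MAX marks a node where no match has been played yet
    let mut tree = vec![usize::MAX; n];
    for i in 0..n {
        let mut winner = i;
        // Parent of the virtual leaf for stream `i`
        let mut cmp_node = (n + i) / 2;
        while cmp_node != 0 && tree[cmp_node] != usize::MAX {
            let challenger = tree[cmp_node];
            if is_gt(keys, winner, challenger) {
                // Current candidate loses: it stays here, challenger moves up
                tree[cmp_node] = winner;
                winner = challenger;
            }
            cmp_node /= 2;
        }
        // Either an empty node or the root was reached
        tree[cmp_node] = winner;
    }
    tree
}
```

For four streams with heads 5, 3, 8 and 1, this yields `[3, 1, 0, 2]`: stream 3 is the overall winner, and the remaining slots record the losers of the final and of the two first-round matches.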
self.cursor_finished[stream_idx] = true;
// Adjust the loser tree if necessary
if !self.loser_tree_adjusted {
    let mut winner = self.loser_tree[0];
It might be easier to follow if this was moved into a method called replace_loser_tree_winner, perhaps with a link to this GIF: https://en.wikipedia.org/wiki/K-way_merge_algorithm#/media/File:Loser_tree_replacement_selection.gif
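A sketch of what that extracted replacement step might look like follows. It is written as a free function over a slice rather than a method, and the `is_gt` closure parameter is an assumption standing in for the PR's cursor comparison; the index arithmetic `(num_streams + winner) / 2` is taken from the diff.

```rust
/// Replays the matches on the path from the previous winner's leaf up to
/// the root, after that stream has advanced to its next row.
/// `is_gt(a, b)` is a hypothetical callback that returns true when stream
/// `a`'s current row sorts after stream `b`'s.
fn replace_loser_tree_winner(tree: &mut [usize], is_gt: impl Fn(usize, usize) -> bool) {
    let num_streams = tree.len();
    if num_streams == 0 {
        return;
    }
    let mut winner = tree[0];
    // Parent of the virtual leaf belonging to the previous winner
    let mut cmp_node = (num_streams + winner) / 2;
    while cmp_node != 0 {
        let challenger = tree[cmp_node];
        if is_gt(winner, challenger) {
            // The stored loser beats the candidate, so they swap roles.
            // Otherwise the node already holds the loser; nothing to write.
            tree[cmp_node] = winner;
            winner = challenger;
        }
        cmp_node /= 2;
    }
    tree[0] = winner;
}
```

Only the nodes between one leaf and the root are touched, so each replacement costs about log2(k) comparisons for k streams, with a single comparison per level.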
}
}
}
let min_cursor_idx = self.loser_tree[0];
I think this could be made easier to follow if it were written along the lines of:
let min_cursor = self.cursors[min_cursor_idx];
if min_cursor.is_finished() {
    // All streams are exhausted
    return Poll::Ready((!self.in_progress.is_empty()).then(|| self.build_record_batch()));
}
self.loser_tree_adjusted = false;
self.in_progress.push(...)
if self.in_progress.len() == self.batch_size {
    return Poll::Ready(Some(self.build_record_batch()));
}
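The `(!self.in_progress.is_empty()).then(|| ...)` idiom in this suggestion can be tried in isolation. The sketch below is only an illustration: the `emit_remaining` function and the `Vec<i32>` buffer are hypothetical stand-ins for the stream's `in_progress` state and `build_record_batch`.

```rust
use std::task::Poll;

/// Hypothetical stand-in for the "all streams exhausted" branch:
/// yields the buffered rows if there are any, otherwise signals the end.
fn emit_remaining(in_progress: &mut Vec<i32>) -> Poll<Option<Vec<i32>>> {
    // `bool::then` returns Some(closure result) when true, None when false
    Poll::Ready((!in_progress.is_empty()).then(|| std::mem::take(in_progress)))
}
```

Calling it with a non-empty buffer returns `Poll::Ready(Some(..))` and drains the buffer; a second call returns `Poll::Ready(None)`, which is how a stream conventionally reports completion.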
I couldn't make this work in #4407
if challenger_win {
    self.loser_tree[cmp_node] = winner;
    winner = challenger;
} else {
    self.loser_tree[cmp_node] = challenger;
}
- if challenger_win {
-     self.loser_tree[cmp_node] = winner;
-     winner = challenger;
- } else {
-     self.loser_tree[cmp_node] = challenger;
- }
+ if challenger_win {
+     self.loser_tree[cmp_node] = winner;
+     winner = challenger;
+ }
I will have a follow-on PR up shortly.
Benchmark runs are scheduled for baseline = 52e198e and contender = 0d334cf. 0d334cf is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Follow-on PR: #4407
Which issue does this PR close?
Closes #4300.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Running benchmarks with cargo bench --bench merge:
Are there any user-facing changes?