
More realistic sort benchmarks #5881

Merged: 3 commits into apache:main on Apr 6, 2023

Conversation

@tustvold (Contributor) commented on Apr 5, 2023:

Which issue does this PR close?

Relates to #5879

Rationale for this change

This combines the merge and sort benchmarks to avoid code duplication. It also makes a number of changes to make the benchmarks more realistic:

  • Each partition / stream now contains multiple batches, an average of 13
  • The sort_merge bench now performs a partitioned sort followed by a SortPreservingMerge, instead of sorting into a single partition and then running a SortPreservingMerge on that single partition (which is a no-op); see the sketch below
  • Tuples are sliced and then collected into their output arrays, which ensures that DictionaryArrays aren't all using identical dictionary values
  • The merge benchmark is run on pre-sorted data, while the other benchmarks are now run on unsorted data
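
For reference, the plan built by the sort_merge case now looks roughly like the following (a minimal sketch mirroring the code in this PR; imports are elided, `partitions: &[Vec<RecordBatch>]` is the generated input, `schema` its SchemaRef, and `sort` the Vec<PhysicalSortExpr> describing the sort keys):

// Feed the unsorted partitions in as-is
let exec = MemoryExec::try_new(partitions, schema, None).unwrap();

// Sort each input partition independently (preserve_partitioning = true),
// producing one sorted stream per input partition
let exec = SortExec::new_with_partitioning(sort.clone(), Arc::new(exec), true, None);

// Merge the per-partition sorted streams into a single, fully sorted output
let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));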

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

// parameters:
//
// Input schemas
lazy_static! {

@tustvold (author) commented:

Lazy static seemed overkill for what this was doing, so I opted to keep things simple.
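
As an illustration of the simpler alternative (a sketch only -- the field names here are hypothetical, not the ones used in this PR), a plain function can replace a lazy_static! schema:

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Build the benchmark input schema on demand rather than caching it in a
// lazy_static! global; constructing a small Schema is cheap.
fn test_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("f64", DataType::Float64, true), // hypothetical column
        Field::new("utf8", DataType::Utf8, true),   // hypothetical column
    ]))
}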

@alamb (Contributor) left a comment:

Thank you @tustvold -- I reviewed this and it makes sense to me. I also spot-checked the inputs (by printing them out to standard out) and it looked good.

cc @jaylmiller

plan: Arc<dyn ExecutionPlan>,
partition_count: usize,
}
fn sort(partitions: &[Vec<RecordBatch>]) -> Self {

Suggested change
-fn sort(partitions: &[Vec<RecordBatch>]) -> Self {
+/// Test SortExec in "non partitioned" mode which sorts the input streams
+/// into a single sorted output stream
+fn sort(partitions: &[Vec<RecordBatch>]) -> Self {

}
}

fn sort_partitioned(partitions: &[Vec<RecordBatch>]) -> Self {

Suggested change
-fn sort_partitioned(partitions: &[Vec<RecordBatch>]) -> Self {
+/// Test SortExec in "partitioned" mode which sorts the input streams
+/// individually into some number of output streams
+fn sort_partitioned(partitions: &[Vec<RecordBatch>]) -> Self {

fn run(&self) {
let plan = Arc::clone(&self.plan);
let task_ctx = Arc::clone(&self.task_ctx);

fn sort_merge(partitions: &[Vec<RecordBatch>]) -> Self {

Suggested change
-fn sort_merge(partitions: &[Vec<RecordBatch>]) -> Self {
+/// Test SortExec in "partitioned" mode followed by a SortPreservingMerge
+fn sort_merge(partitions: &[Vec<RecordBatch>]) -> Self {

let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
let exec = SortExec::new_with_partitioning(sort.clone(), Arc::new(exec), true, None);
let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

@alamb (Contributor) commented:

I would expect this to behave the same, performance wise, as sort -- is that your expectation too?

@tustvold (author) replied:

Eventually yes; currently, for single-column cases, it performs worse.

runtime: Runtime,
task_ctx: Arc<TaskContext>,

// The plan to run
plan: Arc<dyn ExecutionPlan>,
}

-impl SortBenchCase {
+impl BenchCase {
/// Prepare to run a benchmark that merges the specified
/// partitions (streams) together using all keyes

Suggested change
-/// partitions (streams) together using all keyes
+/// pre-sorted partitions (streams) together using all keys

@jaylmiller (Contributor) commented:

> Thank you @tustvold -- I reviewed this and it makes sense to me. I also spot-checked the inputs (by printing them out to standard out) and it looked good.
>
> cc @jaylmiller

Looks good to me as well!

@tustvold merged commit 5c0fe0d into apache:main on Apr 6, 2023
Labels: core (Core DataFusion crate)