Produce Hive splits for bucketed tables in round-robin fashion #7031

Merged: 2 commits merged into prestodb:master from the splitloader branch on Jan 14, 2017

Conversation

@haozhun (Contributor) commented on Jan 10, 2017

This reduces the likelihood that the scheduler gets blocked when one worker has
more splits queued than the limit while other workers have no splits. Without
round robin, for a bucketed partition, the split loader would produce a series
of splits whose node affinity looks like C, C, ..., C, A, A, ..., A, D, D, ...,
D, B, B, ..., B. If there are more splits for node C than the number of queued
splits allowed, nodes A, B, and D would have no splits to run because the
scheduler is blocked.
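
Below is a rough, hypothetical sketch of the round-robin interleaving described
above; it is not the actual BackgroundHiveSplitLoader code, and the class and
method names are illustrative only.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative helper: turns per-node split lists (C..., A..., D..., B...)
// into round-robin order (C, A, D, B, C, A, D, B, ...) so that no single
// node's splits can fill the queue before other nodes get any work.
public final class RoundRobinSplitsSketch
{
    public static <T> List<T> roundRobin(List<List<T>> splitsPerNode)
    {
        List<Iterator<T>> iterators = new ArrayList<>();
        for (List<T> splits : splitsPerNode) {
            iterators.add(splits.iterator());
        }
        List<T> interleaved = new ArrayList<>();
        boolean progress = true;
        while (progress) {
            progress = false;
            for (Iterator<T> iterator : iterators) {
                if (iterator.hasNext()) {
                    interleaved.add(iterator.next());
                    progress = true;
                }
            }
        }
        return interleaved;
    }
}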

In addition, this commit changes the policy that determines the target size
for initial splits. The original policy tries to produce a number of initial
splits of similar size. For example, assume maxInitialSplitSize = 9K,
maxSplitSize = 30K, and a file size of 48K.

  • When maxInitialSplits = 10, the file will be split into 8K, 8K, 8K, 8K, 8K, 8K.
  • When maxInitialSplits = 2, the file will be split into 8K, 8K, 16K, 16K.

In the new policy,

  • When maxInitialSplits = 10, the file will be split into 9K, 9K, 9K, 9K, 9K, 3K.
  • When maxInitialSplits = 2, the file will be split into 9K, 9K, 30K.

You can see that the old policy is better for case 1, while the new policy is
better for case 2. A smarter policy that is optimal in both cases does exist,
but it would have to know exactly how many initial splits are left, which is
not possible given the parallel nature of BackgroundHiveSplitLoader.
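
The following is a minimal sketch, under the assumptions of the example above
(maxInitialSplitSize = 9K, maxSplitSize = 30K, 48K file), of how the new
target-size policy behaves; the class and method names are hypothetical and do
not appear in the actual change.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: each chunk targets maxInitialSplitSize while initial
// splits remain, then falls back to maxSplitSize, reproducing the
// 9K, 9K, 30K and 9K x 5, 3K examples above.
public final class NewSplitSizePolicySketch
{
    public static List<Long> splitSizes(long fileSize, long maxInitialSplitSize, long maxSplitSize, int maxInitialSplits)
    {
        AtomicInteger remainingInitialSplits = new AtomicInteger(maxInitialSplits);
        List<Long> sizes = new ArrayList<>();
        long offset = 0;
        while (offset < fileSize) {
            long targetSize = remainingInitialSplits.getAndDecrement() > 0 ? maxInitialSplitSize : maxSplitSize;
            long size = Math.min(targetSize, fileSize - offset);
            sizes.add(size);
            offset += size;
        }
        return sizes;
    }

    public static void main(String[] args)
    {
        // maxInitialSplits = 2: prints [9216, 9216, 30720], i.e. 9K, 9K, 30K
        System.out.println(splitSizes(48 * 1024, 9 * 1024, 30 * 1024, 2));
        // maxInitialSplits = 10: prints [9216, 9216, 9216, 9216, 9216, 3072]
        System.out.println(splitSizes(48 * 1024, 9 * 1024, 30 * 1024, 10));
    }
}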


long chunkOffset = 0;
while (chunkOffset < blockLocation.getLength()) {
    if (remainingInitialSplits.decrementAndGet() < 0 && creatingInitialSplits) {
A reviewer (Contributor) commented:

It looks like you removed this decrementing. Unless I'm missing something, you need to add it back to make the initial splits work correctly.

@haozhun (Author) replied:

You're right. I should add the decrementing back.

Do you have any suggestions on verifying the correctness of this code? I tried adding tests yesterday. I guess I can sit down and take the time to manually construct a BackgroundHiveSplitLoader, if that's what we have to do.

@haozhun (Author) left a comment:
Updated


@haozhun merged commit 7aab59d into prestodb:master on Jan 14, 2017
@haozhun deleted the splitloader branch on Mar 11, 2018