Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update BucketUtils#suggestShardSideQueueSize signature #33210

Merged
merged 2 commits into from
Aug 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,21 @@ private BucketUtils() {}
*
* @param finalSize
* The number of terms required in the final reduce phase.
* @param numberOfShards
* The number of shards being queried.
* @param singleShard
* whether a single shard is being queried, or multiple shards
* @return A suggested default for the size of any shard-side PriorityQueues
*/
public static int suggestShardSideQueueSize(int finalSize, int numberOfShards) {
public static int suggestShardSideQueueSize(int finalSize, boolean singleShard) {
if (finalSize < 1) {
throw new IllegalArgumentException("size must be positive, got " + finalSize);
}
if (numberOfShards < 1) {
throw new IllegalArgumentException("number of shards must be positive, got " + numberOfShards);
}

if (numberOfShards == 1) {
if (singleShard) {
// In the case of a single shard, we do not need to over-request
return finalSize;
}

// Request 50% more buckets on the shards in order to improve accuracy
// as well as a small constant that should help with small values of 'size'
final long shardSampleSize = (long) (finalSize * 1.5 + 10);
return (int) Math.min(Integer.MAX_VALUE, shardSampleSize);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public int shardSize() {
if (shardSize < 0) {
// Use default heuristic to avoid any wrong-ranking caused by
// distributed counting
shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards());
shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards() == 1);
}

if (requiredSize <= 0 || shardSize <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
// such are impossible to differentiate from non-significant terms
// at that early stage.
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards()));
context.numberOfShards() == 1));
}

if (valuesSource instanceof ValuesSource.Bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingl
// such are impossible to differentiate from non-significant terms
// at that early stage.
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards()));
context.numberOfShards() == 1));
}

// TODO - need to check with mapping that this is indeed a text field....
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
// heuristic to avoid any wrong-ranking caused by distributed
// counting
bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards()));
context.numberOfShards() == 1));
}
bucketCountThresholds.ensureValidity();
if (valuesSource instanceof ValuesSource.Bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,22 @@ public class BucketUtilsTests extends ESTestCase {

public void testBadInput() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> BucketUtils.suggestShardSideQueueSize(0, 10));
() -> BucketUtils.suggestShardSideQueueSize(0, randomBoolean()));
assertEquals(e.getMessage(), "size must be positive, got 0");

e = expectThrows(IllegalArgumentException.class,
() -> BucketUtils.suggestShardSideQueueSize(10, 0));
assertEquals(e.getMessage(), "number of shards must be positive, got 0");
}

public void testOptimizesSingleShard() {
for (int iter = 0; iter < 10; ++iter) {
final int size = randomIntBetween(1, Integer.MAX_VALUE);
assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, 1));
assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, true));
}
}

public void testOverFlow() {
for (int iter = 0; iter < 10; ++iter) {
final int size = Integer.MAX_VALUE - randomInt(10);
final int numberOfShards = randomIntBetween(1, 10);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1);
assertThat(shardSize, greaterThanOrEqualTo(shardSize));
}
}
Expand All @@ -55,7 +51,7 @@ public void testShardSizeIsGreaterThanGlobalSize() {
for (int iter = 0; iter < 10; ++iter) {
final int size = randomIntBetween(1, Integer.MAX_VALUE);
final int numberOfShards = randomIntBetween(1, 10);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1);
assertThat(shardSize, greaterThanOrEqualTo(size));
}
}
Expand Down