Skip to content

Commit

Permalink
Simplify summing up algorithm for aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
liketic committed Jan 3, 2018
1 parent 15cdec2 commit 1f07cca
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,8 @@ public void collect(int doc, long bucket) throws IOException {

for (int i = 0; i < valueCount; i++) {
double value = values.nextValue();
if (Double.isNaN(value) || Double.isInfinite(value)) {
if (Double.isFinite(value) == false) {
sum += value;
if (Double.isNaN(sum))
break;
} else if (Double.isFinite(sum)) {
double corrected = value - compensation;
double newSum = sum + corrected;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,13 @@ public InternalAvg doReduce(List<InternalAggregation> aggregations, ReduceContex
for (InternalAggregation aggregation : aggregations) {
InternalAvg avg = (InternalAvg) aggregation;
count += avg.count;
if (Double.isNaN(sum) == false) {
if (Double.isNaN(avg.sum) || Double.isInfinite(avg.sum)) {
sum += avg.sum;
} else if (Double.isFinite(sum)) {
double corrected = avg.sum - compensation;
double newSum = sum + corrected;
compensation = (newSum - sum) - corrected;
sum = newSum;
}
if (Double.isFinite(avg.sum) == false) {
sum += avg.sum;
} else if (Double.isFinite(sum)) {
double corrected = avg.sum - compensation;
double newSum = sum + corrected;
compensation = (newSum - sum) - corrected;
sum = newSum;
}
}
return new InternalAvg(getName(), sum, count, format, pipelineAggregators(), getMetaData());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,14 @@ public InternalStats doReduce(List<InternalAggregation> aggregations, ReduceCont
max = Math.max(max, stats.getMax());
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
if (Double.isNaN(sum) == false) {
double value = stats.getSum();
if (Double.isNaN(value) || Double.isInfinite(value)) {
sum += value;
} else if (Double.isFinite(sum)) {
double corrected = value - compensation;
double newSum = sum + corrected;
compensation = (newSum - sum) - corrected;
sum = newSum;
}
double value = stats.getSum();
if (Double.isFinite(value) == false) {
sum += value;
} else if (Double.isFinite(sum)) {
double corrected = value - compensation;
double newSum = sum + corrected;
compensation = (newSum - sum) - corrected;
sum = newSum;
}
}
return new InternalStats(name, count, sum, min, max, format, pipelineAggregators(), getMetaData());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ public class StatsAggregator extends NumericMetricsAggregator.MultiValue {

LongArray counts;
DoubleArray sums;
DoubleArray compensations;
DoubleArray mins;
DoubleArray maxes;

private DoubleArray compensations;


public StatsAggregator(String name, ValuesSource.Numeric valuesSource, DocValueFormat format,
SearchContext context,
Expand Down Expand Up @@ -110,15 +109,13 @@ public void collect(int doc, long bucket) throws IOException {

for (int i = 0; i < valuesCount; i++) {
double value = values.nextValue();
if (Double.isNaN(sum) == false) {
if (Double.isNaN(value) || Double.isInfinite(value)) {
sum += value;
} else if (Double.isFinite(sum)) {
double corrected = value - compensation;
double newSum = sum + corrected;
compensation = (newSum - sum) - corrected;
sum = newSum;
}
if (Double.isFinite(value) == false) {
sum += value;
} else if (Double.isFinite(sum)) {
double corrected = value - compensation;
double newSum = sum + corrected;
compensation = (newSum - sum) - corrected;
sum = newSum;
}
min = Math.min(min, value);
max = Math.max(max, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void collect(int doc, long bucket) throws IOException {
double compensationOfSqr = compensationOfSqrs.get(bucket);
for (int i = 0; i < valuesCount; i++) {
double value = values.nextValue();
if (Double.isNaN(value) || Double.isInfinite(value)) {
if (Double.isFinite(value) == false) {
sum += value;
sumOfSqr += value * value;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static Metrics resolve(String name) {
private final double sigma;

public InternalExtendedStats(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma,
DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, count, sum, min, max, formatter, pipelineAggregators, metaData);
this.sumOfSqrs = sumOfSqrs;
this.sigma = sigma;
Expand Down Expand Up @@ -148,21 +148,19 @@ public InternalExtendedStats doReduce(List<InternalAggregation> aggregations, Re
if (stats.sigma != sigma) {
throw new IllegalStateException("Cannot reduce other stats aggregations that have a different sigma");
}
if (Double.isNaN(sumOfSqrs) == false) {
double value = stats.getSumOfSquares();
if (Double.isNaN(value) || Double.isInfinite(value)) {
sumOfSqrs += value;
} else if (Double.isFinite(sumOfSqrs)) {
double correctedOfSqrs = value - compensationOfSqrs;
double newSumOfSqrs = sumOfSqrs + correctedOfSqrs;
compensationOfSqrs = (newSumOfSqrs - sumOfSqrs) - correctedOfSqrs;
sumOfSqrs = newSumOfSqrs;
}
double value = stats.getSumOfSquares();
if (Double.isFinite(value) == false) {
sumOfSqrs += value;
} else if (Double.isFinite(sumOfSqrs)) {
double correctedOfSqrs = value - compensationOfSqrs;
double newSumOfSqrs = sumOfSqrs + correctedOfSqrs;
compensationOfSqrs = (newSumOfSqrs - sumOfSqrs) - correctedOfSqrs;
sumOfSqrs = newSumOfSqrs;
}
}
final InternalStats stats = super.doReduce(aggregations, reduceContext);
return new InternalExtendedStats(name, stats.getCount(), stats.getSum(), stats.getMin(), stats.getMax(), sumOfSqrs, sigma,
format, pipelineAggregators(), getMetaData());
format, pipelineAggregators(), getMetaData());
}

static class Fields {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,8 @@ public InternalSum doReduce(List<InternalAggregation> aggregations, ReduceContex
double compensation = 0;
for (InternalAggregation aggregation : aggregations) {
double value = ((InternalSum) aggregation).sum;
if (Double.isNaN(value) || Double.isInfinite(value)) {
if (Double.isFinite(value) == false) {
sum += value;
if (Double.isNaN(sum))
break;
} else if (Double.isFinite(sum)) {
double corrected = value - compensation;
double newSum = sum + corrected;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ public void collect(int doc, long bucket) throws IOException {
double compensation = compensations.get(bucket);
for (int i = 0; i < valuesCount; i++) {
double value = values.nextValue();
if (Double.isNaN(value) || Double.isInfinite(value)) {
if (Double.isFinite(value) == false) {
sum += value;
if (Double.isNaN(sum))
break;
} else if (Double.isFinite(sum)) {
double corrected = value - compensation;
double newSum = sum + corrected;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@

public class InternalSumTests extends InternalAggregationTestCase<InternalSum> {

private static final double TOLERANCE = 1e-10;

@Override
protected InternalSum createTestInstance(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
double value = frequently() ? randomDouble() : randomFrom(Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY);
double value = frequently() ? randomDouble() : randomFrom(Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY, Double.NaN);
DocValueFormat formatter = randomFrom(new DocValueFormat.Decimal("###.##"), DocValueFormat.BOOLEAN, DocValueFormat.RAW);
return new InternalSum(name, value, formatter, pipelineAggregators, metaData);
}
Expand All @@ -54,15 +56,45 @@ protected void assertReduced(InternalSum reduced, List<InternalSum> inputs) {
}

public void testSummationAccuracy() throws IOException {
// Summing up a normal array and expect an accurate value
double[] values = new double[]{0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.9, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7};
verifySummationOfDoubles(values, 13.5, 0);

// Summing up an array which contains NaN and infinities and expect a result same as naive summation
int n = randomIntBetween(5, 10);
values = new double[n];
double sum = 0;
for (int i = 0; i < n; i++) {
values[i] = frequently()
? randomFrom(Double.NaN, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY)
: randomDoubleBetween(Double.MIN_VALUE, Double.MAX_VALUE, true);
sum += values[i];
}
verifySummationOfDoubles(values, sum, TOLERANCE);

// Summing up some big double values and expect infinity result
n = randomIntBetween(5, 10);
double[] bigPositiveDoubles = new double[n];
for (int i = 0; i < n; i++) {
bigPositiveDoubles[i] = Double.MAX_VALUE;
}
verifySummationOfDoubles(bigPositiveDoubles, Double.POSITIVE_INFINITY, 0d);

double[] bigNegativeDoubles = new double[n];
for (int i = 0; i < n; i++) {
bigNegativeDoubles[i] = -Double.MAX_VALUE;
}
verifySummationOfDoubles(bigNegativeDoubles, Double.NEGATIVE_INFINITY, 0d);
}

private void verifySummationOfDoubles(double[] values, double expected, double delta) throws IOException {
List<InternalAggregation> aggregations = new ArrayList<>(values.length);
for (double value : values) {
aggregations.add(new InternalSum("dummy1", value, null, null, null));
}
InternalSum internalSum = new InternalSum("dummy", 0, null, null, null);
InternalSum reduced = internalSum.doReduce(aggregations, null);
assertEquals(13.5, reduced.value(), 0d);
assertEquals("dummy", reduced.getName());
assertEquals(expected, reduced.value(), delta);
}

@Override
Expand Down

0 comments on commit 1f07cca

Please sign in to comment.