Skip to content

Commit

Permalink
feat: avoid spurious tombstones
Browse files Browse the repository at this point in the history
fixes: confluentinc#3558
  • Loading branch information
big-andy-coates committed Oct 12, 2020
1 parent a10e5eb commit cc7f1da
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 71 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ public KudafAggregator(

@Override
public GenericRow apply(final K k, final GenericRow rowValue, final GenericRow aggRowValue) {
final GenericRow result = GenericRow.fromList(aggRowValue.values());

// copy over group-by and aggregate parameter columns into the output row
for (int idx = 0; idx < nonAggColumnCount; idx++) {
aggRowValue.set(idx, rowValue.get(idx));
result.set(idx, rowValue.get(idx));
}

// compute the aggregation and write it into the output row. It's assumed that
Expand All @@ -63,12 +65,12 @@ public GenericRow apply(final K k, final GenericRow rowValue, final GenericRow a
for (int idx = nonAggColumnCount; idx < columnCount; idx++) {
final KsqlAggregateFunction<Object, Object, Object> func = aggregateFunctionForColumn(idx);
final Object currentValue = rowValue.get(func.getArgIndexInValue());
final Object currentAggregate = aggRowValue.get(idx);
final Object currentAggregate = result.get(idx);
final Object newAggregate = func.aggregate(currentValue, currentAggregate);
aggRowValue.set(idx, newAggregate);
result.set(idx, newAggregate);
}

return aggRowValue;
return result;
}

public KsqlTransformer<K, GenericRow> getResultMapper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,19 @@ public KudafUndoAggregator(
/**
 * Undoes the contribution of {@code rowValue} to {@code aggRowValue}.
 *
 * <p>The leading {@code nonAggColumnCount} columns are copied from {@code rowValue}; every
 * remaining column up to {@code columnCount} is recomputed by calling {@code undo} on the
 * matching entry of {@code aggregateFunctions}.
 *
 * <p>NOTE(review): this listing is a rendered diff. Each {@code aggRowValue.…} statement that is
 * directly followed by an equivalent {@code result.…} statement looks like the removed
 * pre-change line shown next to its replacement - confirm against the committed file.
 *
 * @param k the record key; not read by this method
 * @param rowValue the row whose contribution is undone; supplies the non-aggregate columns and
 *     the argument handed to each function's {@code undo}
 * @param aggRowValue the current aggregate row; its values seed {@code result}
 * @return the row holding the copied non-aggregate columns and the undone aggregate values
 */
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public GenericRow apply(final Struct k, final GenericRow rowValue, final GenericRow aggRowValue) {
// Build the output row from the aggregate's values so the writes below can target it;
// presumably fromList yields a row independent of aggRowValue - TODO confirm.
final GenericRow result = GenericRow.fromList(aggRowValue.values());

// Columns [0, nonAggColumnCount) are taken verbatim from the incoming row.
for (int idx = 0; idx < nonAggColumnCount; idx++) {
aggRowValue.set(idx, rowValue.get(idx));
result.set(idx, rowValue.get(idx));
}

// Columns [nonAggColumnCount, columnCount) each pair with the function at
// position (idx - nonAggColumnCount) in aggregateFunctions.
for (int idx = nonAggColumnCount; idx < columnCount; idx++) {
final TableAggregationFunction function = aggregateFunctions.get(idx - nonAggColumnCount);
// The function names which column of the incoming row holds its argument.
final Object argument = rowValue.get(function.getArgIndexInValue());
final Object previous = aggRowValue.get(idx);
aggRowValue.set(idx, function.undo(argument, previous));
final Object previous = result.get(idx);
result.set(idx, function.undo(argument, previous));
}

return aggRowValue;
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.function.udaf;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import io.confluent.ksql.function.KsqlAggregateFunction;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.Merger;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Unit tests for {@link KudafAggregator}.
 *
 * <p>Each test hands rows to one of the aggregator's entry points (apply, merger, result mapper)
 * and then checks that those rows still hold the values they were created with.
 */
@RunWith(MockitoJUnitRunner.class)
public class KudafAggregatorTest {

  @Mock
  private KsqlAggregateFunction<Long, String, String> func1;
  @Mock
  private Struct key;
  @Mock
  private Merger<Struct, String> func1Merger;
  @Mock
  private Function<String, String> func1ResultMapper;
  @Mock
  private KsqlProcessingContext ctx;
  private KudafAggregator<String> aggregator;

  @Before
  public void setUp() {
    aggregator = new KudafAggregator<>(2, ImmutableList.of(func1));

    // Expose the mocked merger and result mapper through the mocked function.
    when(func1.getMerger()).thenReturn(func1Merger);
    when(func1.getResultMapper()).thenReturn(func1ResultMapper);

    // Every operation yields a String, which differs from the Integer held in the
    // aggregate column of the rows used by the tests.
    when(func1.aggregate(any(), any())).thenReturn("func1-result");
    when(func1Merger.apply(any(), any(), any())).thenReturn("func1-merged");
    when(func1ResultMapper.apply(any())).thenReturn("func1-result");
  }

  @Test
  public void shouldNotMutateParametersOnApply() {
    // Given:
    final GenericRow incoming = GenericRow.genericRow(1, 2L);
    final GenericRow aggregate = GenericRow.genericRow(1, 2L, 3);

    // When:
    final GenericRow outcome = aggregator.apply("key", incoming, aggregate);

    // Then:
    final GenericRow originalAggregate = GenericRow.genericRow(1, 2L, 3);
    assertThat(incoming, is(GenericRow.genericRow(1, 2L)));
    assertThat(aggregate, is(originalAggregate));
    // Sanity check: an outcome equal to the input would make the checks above meaningless.
    assertThat("invalid test", outcome, is(not(originalAggregate)));
  }

  @Test
  public void shouldNotMutateParametersOnMerge() {
    // Given:
    final GenericRow left = GenericRow.genericRow(1, 2L, 4);
    final GenericRow right = GenericRow.genericRow(1, 2L, 3);

    // When:
    final GenericRow merged = aggregator.getMerger().apply(key, left, right);

    // Then:
    final GenericRow originalLeft = GenericRow.genericRow(1, 2L, 4);
    final GenericRow originalRight = GenericRow.genericRow(1, 2L, 3);
    assertThat(left, is(originalLeft));
    assertThat(right, is(originalRight));
    // Sanity check: the merged row must differ from both inputs.
    assertThat("invalid test", merged, is(not(originalLeft)));
    assertThat("invalid test", merged, is(not(originalRight)));
  }

  @Test
  public void shouldNotMutateParametersOnResultsMap() {
    // Given:
    final GenericRow aggregate = GenericRow.genericRow(1, 2L, 4);

    // When:
    final GenericRow mapped = aggregator.getResultMapper().transform("k", aggregate, ctx);

    // Then:
    final GenericRow originalAggregate = GenericRow.genericRow(1, 2L, 4);
    assertThat(aggregate, is(originalAggregate));
    // Sanity check: the mapped row must differ from the input.
    assertThat("invalid test", mapped, is(not(originalAggregate)));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.function.udaf;

import static io.confluent.ksql.GenericRow.genericRow;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.function.TableAggregationFunction;
import org.apache.kafka.connect.data.Struct;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Unit tests for {@link KudafUndoAggregator}.
 *
 * <p>Covers two things: that {@code apply} leaves the rows passed to it unchanged, and that the
 * value produced by the function's {@code undo} ends up in the aggregate column of the result.
 */
@RunWith(MockitoJUnitRunner.class)
public class KudafUndoAggregatorTest {

  @Mock
  private TableAggregationFunction<Long, String, String> func1;
  @Mock
  private Struct key;
  private KudafUndoAggregator aggregator;

  @Before
  public void setUp() {
    aggregator = new KudafUndoAggregator(2, ImmutableList.of(func1));

    // Any undo call yields a fixed String, distinct from the Integer in the test rows.
    when(func1.undo(any(), any())).thenReturn("func1-undone");
  }

  @Test
  public void shouldNotMutateParametersOnApply() {
    // Given:
    final GenericRow incoming = genericRow(1, 2L);
    final GenericRow aggregate = genericRow(1, 2L, 3);

    // When:
    final GenericRow outcome = aggregator.apply(key, incoming, aggregate);

    // Then:
    final GenericRow originalAggregate = genericRow(1, 2L, 3);
    assertThat(incoming, is(genericRow(1, 2L)));
    assertThat(aggregate, is(originalAggregate));
    // Sanity check: an outcome equal to the input would make the checks above meaningless.
    assertThat("invalid test", outcome, is(not(originalAggregate)));
  }

  @Test
  public void shouldApplyUndoableAggregateFunctions() {
    // Given:
    final GenericRow incoming = genericRow(1, 2L);
    final GenericRow aggregate = genericRow(1, 2L, 3);

    // When:
    final GenericRow outcome = aggregator.apply(key, incoming, aggregate);

    // Then:
    final GenericRow expected = genericRow(1, 2L, "func1-undone");
    assertThat(outcome, equalTo(expected));
  }
}

0 comments on commit cc7f1da

Please sign in to comment.