Skip to content

Commit

Permalink
feat: avoid spurious tombstones in table output (#6405)
Browse files Browse the repository at this point in the history
* feat: avoid spurious tombstones in table output

fixes: #3558

AK commit apache/kafka#9156 enhances Kafka Streams so that filters on tables now avoid emitting spurious tombstones. ksqlDB now benefits from this.  Tombstones are no longer emitted to the sink topic when a HAVING clause excludes a row from the result _that has never been in the result table_.

BREAKING CHANGE: This change fixes a _bug_ where unnecessary tombstones where being emitted when a `HAVING` clause filtered out a row from the source that is not in the output table

For example, given:

```sql
-- source stream:
CREATE STREAM FOO (ID INT KEY, VAL INT) WITH (...);

-- aggregate into a table:
CREATE TABLE BAR AS
    SELECT ID, SUM(VAL) AS SUM
    FROM FOO
    GROUP BY ID
    HAVING SUM(VAL) > 0;


-- insert some values into the stream:
INSERT INTO FOO VALUES(1, -5); 
INSERT INTO FOO VALUES(1, 6); 
INSERT INTO FOO VALUES(1, -2); 
INSERT INTO FOO VALUES(1, -1); 
```

Where previously the contents of the sink topic `BAR` would have contained records:

| Key | Value | Notes |
|-----|-------|------|
| 1.     | null.   | Spurious tombstone: the table does not contain a row with key `1`, so no tombstone is required. |
| 1.     | {sum=1} | Row added as HAVING criteria now met |
| 1.     | null.   | Row deleted as HAVING criteria now not met |
| 1.     | null.   | Spurious tombstone: the table does not contain a row with key `1`, so no tombstone is required. |

Note: the first record in the tom

The topic will now contain:

| Key | Value |
|-----|-------|
| 1.     | {sum=1} |
| 1.     | null.   |

Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
big-andy-coates and big-andy-coates authored Oct 12, 2020
1 parent 1168e26 commit 4c7c9b5
Show file tree
Hide file tree
Showing 49 changed files with 260 additions and 545 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,9 @@ public void shouldQueryMaterializedTableWithMultipleAggregationColumns() {
}

@Test
public void shouldIgnoreHavingClause() {
// Note: HAVING clause are handled centrally by KsqlMaterialization
public void shouldHandleHavingClause() {
// Note: HAVING clause are handled centrally by KsqlMaterialization. This logic will have been
// installed as part of building the below statement:

// Given:
final PersistentQueryMetadata query = executeQuery(
Expand All @@ -632,7 +633,11 @@ public void shouldIgnoreHavingClause() {

final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT);

final Map<String, GenericRow> rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema);
final int matches = (int) USER_DATA_PROVIDER.data().values().stream()
.filter(row -> ((Long) row.get(0)) > 2)
.count();

final Map<String, GenericRow> rows = waitForUniqueUserRows(matches, STRING_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
Expand All @@ -641,16 +646,22 @@ public void shouldIgnoreHavingClause() {
final MaterializedTable table = materialization.nonWindowed();

rows.forEach((rowKey, value) -> {
// Rows passing the HAVING clause:
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema());

final Optional<Row> expected = Optional.ofNullable(value)
.map(v -> Row.of(schema, key, v, -1L));

final Optional<Row> row = withRetry(() -> table.get(key));
assertThat(row.map(Row::schema), is(expected.map(Row::schema)));
assertThat(row.map(Row::key), is(expected.map(Row::key)));
assertThat(row.map(Row::value), is(expected.map(Row::value)));
assertThat(row.map(Row::schema), is(Optional.of(schema)));
assertThat(row.map(Row::key), is(Optional.of(key)));
assertThat(row.map(Row::value), is(Optional.of(value)));
});

USER_DATA_PROVIDER.data().entries().stream()
.filter(e -> !rows.containsKey(e.getKey().getString("USERID")))
.forEach(e -> {
// Rows filtered by the HAVING clause:
final Optional<Row> row = withRetry(() -> table.get(e.getKey()));
assertThat(row, is(Optional.empty()));
});
}

private static void verifyRetainedWindows(
Expand All @@ -677,10 +688,22 @@ private static void verifyRetainedWindows(
private <T> Map<T, GenericRow> waitForUniqueUserRows(
final Deserializer<T> keyDeserializer,
final LogicalSchema aggregateSchema
) {
return waitForUniqueUserRows(
USER_DATA_PROVIDER.data().size(),
keyDeserializer,
aggregateSchema
);
}

private <T> Map<T, GenericRow> waitForUniqueUserRows(
final int count,
final Deserializer<T> keyDeserializer,
final LogicalSchema aggregateSchema
) {
return TEST_HARNESS.verifyAvailableUniqueRows(
output.toUpperCase(),
USER_DATA_PROVIDER.data().size(),
count,
VALUE_FORMAT,
PhysicalSchema.from(aggregateSchema, SerdeFeatures.of(), SerdeFeatures.of()),
keyDeserializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ public KudafAggregator(

@Override
public GenericRow apply(final K k, final GenericRow rowValue, final GenericRow aggRowValue) {
final GenericRow result = GenericRow.fromList(aggRowValue.values());

// copy over group-by and aggregate parameter columns into the output row
for (int idx = 0; idx < nonAggColumnCount; idx++) {
aggRowValue.set(idx, rowValue.get(idx));
result.set(idx, rowValue.get(idx));
}

// compute the aggregation and write it into the output row. Its assumed that
Expand All @@ -63,12 +65,12 @@ public GenericRow apply(final K k, final GenericRow rowValue, final GenericRow a
for (int idx = nonAggColumnCount; idx < columnCount; idx++) {
final KsqlAggregateFunction<Object, Object, Object> func = aggregateFunctionForColumn(idx);
final Object currentValue = rowValue.get(func.getArgIndexInValue());
final Object currentAggregate = aggRowValue.get(idx);
final Object currentAggregate = result.get(idx);
final Object newAggregate = func.aggregate(currentValue, currentAggregate);
aggRowValue.set(idx, newAggregate);
result.set(idx, newAggregate);
}

return aggRowValue;
return result;
}

public KsqlTransformer<K, GenericRow> getResultMapper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,19 @@ public KudafUndoAggregator(
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public GenericRow apply(final Struct k, final GenericRow rowValue, final GenericRow aggRowValue) {
final GenericRow result = GenericRow.fromList(aggRowValue.values());

for (int idx = 0; idx < nonAggColumnCount; idx++) {
aggRowValue.set(idx, rowValue.get(idx));
result.set(idx, rowValue.get(idx));
}

for (int idx = nonAggColumnCount; idx < columnCount; idx++) {
final TableAggregationFunction function = aggregateFunctions.get(idx - nonAggColumnCount);
final Object argument = rowValue.get(function.getArgIndexInValue());
final Object previous = aggRowValue.get(idx);
aggRowValue.set(idx, function.undo(argument, previous));
final Object previous = result.get(idx);
result.set(idx, function.undo(argument, previous));
}

return aggRowValue;
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.function.udaf;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import io.confluent.ksql.function.KsqlAggregateFunction;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.Merger;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class KudafAggregatorTest {

@Mock
private KsqlAggregateFunction<Long, String, String> func1;
@Mock
private Struct key;
@Mock
private Merger<Struct, String> func1Merger;
@Mock
private Function<String, String> func1ResultMapper;
@Mock
private KsqlProcessingContext ctx;
private KudafAggregator<String> aggregator;

@Before
public void setUp() {
aggregator = new KudafAggregator<>(2, ImmutableList.of(func1));

when(func1.getMerger()).thenReturn(func1Merger);
when(func1.getResultMapper()).thenReturn(func1ResultMapper);

when(func1.aggregate(any(), any())).thenReturn("func1-result");
when(func1Merger.apply(any(), any(), any())).thenReturn("func1-merged");
when(func1ResultMapper.apply(any())).thenReturn("func1-result");
}

@Test
public void shouldNotMutateParametersOnApply() {
// Given:
final GenericRow value = GenericRow.genericRow(1, 2L);
final GenericRow agg = GenericRow.genericRow(1, 2L, 3);

// When:
final GenericRow result = aggregator.apply("key", value, agg);

// Then:
assertThat(value, is(GenericRow.genericRow(1, 2L)));
assertThat(agg, is(GenericRow.genericRow(1, 2L, 3)));
assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 3))));
}

@Test
public void shouldNotMutateParametersOnMerge() {
// Given:
final GenericRow aggOne = GenericRow.genericRow(1, 2L, 4);
final GenericRow aggTwo = GenericRow.genericRow(1, 2L, 3);

// When:
final GenericRow result = aggregator.getMerger().apply(key, aggOne, aggTwo);

// Then:
assertThat(aggOne, is(GenericRow.genericRow(1, 2L, 4)));
assertThat(aggTwo, is(GenericRow.genericRow(1, 2L, 3)));
assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 4))));
assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 3))));
}

@Test
public void shouldNotMutateParametersOnResultsMap() {
// Given:
final GenericRow agg = GenericRow.genericRow(1, 2L, 4);

// When:
final GenericRow result = aggregator.getResultMapper().transform("k", agg, ctx);

// Then:
assertThat(agg, is(GenericRow.genericRow(1, 2L, 4)));
assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 4))));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.function.udaf;

import static io.confluent.ksql.GenericRow.genericRow;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.function.TableAggregationFunction;
import org.apache.kafka.connect.data.Struct;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class KudafUndoAggregatorTest {

@Mock
private TableAggregationFunction<Long, String, String> func1;
@Mock
private Struct key;
private KudafUndoAggregator aggregator;

@Before
public void setUp() {
aggregator = new KudafUndoAggregator(2, ImmutableList.of(func1));

when(func1.undo(any(), any())).thenReturn("func1-undone");
}

@Test
public void shouldNotMutateParametersOnApply() {
// Given:
final GenericRow value = GenericRow.genericRow(1, 2L);
final GenericRow agg = GenericRow.genericRow(1, 2L, 3);

// When:
final GenericRow result = aggregator.apply(key, value, agg);

// Then:
assertThat(value, is(GenericRow.genericRow(1, 2L)));
assertThat(agg, is(GenericRow.genericRow(1, 2L, 3)));
assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 3))));
}

@Test
public void shouldApplyUndoableAggregateFunctions() {
// Given:
final GenericRow value = genericRow(1, 2L);
final GenericRow aggRow = genericRow(1, 2L, 3);

// When:
final GenericRow resultRow = aggregator.apply(key, value, aggRow);

// Then:
assertThat(resultRow, equalTo(genericRow(1, 2L, "func1-undone")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public PlannedTestLoader(

public Stream<TestCase> loadTests() {
return planLoader.load(predicate)
.filter(t -> t.getSpecNode().getTestCase().isEnabled())
.map(PlannedTestUtils::buildPlannedTestCase);
}

Expand Down
Loading

0 comments on commit 4c7c9b5

Please sign in to comment.