fix: circumvent KAFKA-10179 by forcing changelog topics for tables (#…
agavra authored Jul 8, 2020
1 parent c50a3e4 commit ef8fa4f
Showing 394 changed files with 50,856 additions and 32 deletions.
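
For context: KAFKA-10179 is a Kafka Streams bug in which the source-topic optimization (reusing a table's input topic as its state-store changelog) can hand the store serde a topic name that does not match the topic the records actually came from, breaking serdes that key off the topic name (for example Schema Registry subjects) during restore. This commit works around it by forcing affected table sources to materialize into a dedicated changelog topic instead of reusing the input topic. Below is a minimal Kafka Streams sketch of the two topology shapes; the topic, store, and class names are hypothetical, and the commit itself threads a forceChangelog flag through ksqlDB's physical plan rather than using this exact code.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;

public final class ForcedChangelogSketch {

  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();

    // Shape that can hit KAFKA-10179: with topology optimization enabled,
    // builder.table() may reuse the source topic "i2" as the store's changelog.
    // builder.table("i2", Consumed.with(Serdes.Integer(), Serdes.String()));

    // Workaround shape: consume the topic as a stream, then materialize it
    // into a table, so Streams creates its own
    // "<application.id>-i2-forced-store-changelog" topic.
    builder.stream("i2", Consumed.with(Serdes.Integer(), Serdes.String()))
        .toTable(Materialized.as("i2-forced-store"));

    System.out.println(builder.build().describe());
  }
}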

@@ -138,7 +138,7 @@ public void shouldAggregateWithNoWindow() {
// Then:
assertOutputOf(resultStream0, expected, is(expected));
assertTableCanBeUsedAsSource(expected, is(expected));
- assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT, 4, resultStream0, resultStream1);
+ assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT, 5, resultStream0, resultStream1);
}

@Test

@@ -68,6 +68,7 @@
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
+ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyDescription;

@@ -171,6 +172,7 @@ public void before() {
when(ksqlStreamBuilder.buildValueSerde(any(), any(), any())).thenReturn(rowSerde);
when(ksqlStreamBuilder.getFunctionRegistry()).thenReturn(functionRegistry);

+ when(rowSerde.serializer()).thenReturn(mock(Serializer.class));
when(rowSerde.deserializer()).thenReturn(mock(Deserializer.class));

when(dataSource.getKsqlTopic()).thenReturn(topic);

@@ -20,6 +20,7 @@
import static io.confluent.ksql.planner.plan.JoinNode.JoinType.LEFT;
import static io.confluent.ksql.planner.plan.JoinNode.JoinType.OUTER;
import static io.confluent.ksql.planner.plan.PlanTestUtil.SOURCE_NODE;
+ import static io.confluent.ksql.planner.plan.PlanTestUtil.SOURCE_NODE_FORCE_CHANGELOG;
import static io.confluent.ksql.planner.plan.PlanTestUtil.getNodeByName;
import static java.util.Optional.empty;
import static org.hamcrest.CoreMatchers.equalTo;

@@ -216,11 +217,11 @@ public void shouldBuildSourceNode() {
setupTopicClientExpectations(1, 1);
buildJoin();
final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(
- builder.build(), SOURCE_NODE);
+ builder.build(), SOURCE_NODE_FORCE_CHANGELOG);
final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name)
.collect(Collectors.toList());
assertThat(node.predecessors(), equalTo(Collections.emptySet()));
- assertThat(successors, equalTo(Collections.singletonList("KTABLE-SOURCE-0000000001")));
+ assertThat(successors, equalTo(Collections.singletonList("KTABLE-SOURCE-0000000002")));
assertThat(node.topicSet(), equalTo(ImmutableSet.of("test2")));
}


@@ -31,6 +31,7 @@ final class PlanTestUtil {

static final String TRANSFORM_NODE = "KSTREAM-TRANSFORMVALUES-0000000001";
static final String SOURCE_NODE = "KSTREAM-SOURCE-0000000000";
+ static final String SOURCE_NODE_FORCE_CHANGELOG = "KSTREAM-SOURCE-0000000001";

private PlanTestUtil() {
}

@@ -26,15 +26,23 @@
@Immutable
public final class TableSource extends SourceStep<KTableHolder<Struct>> {

+ private final Boolean forceChangelog;
+
public TableSource(
@JsonProperty(value = "properties", required = true)
final ExecutionStepPropertiesV1 properties,
@JsonProperty(value = "topicName", required = true) final String topicName,
@JsonProperty(value = "formats", required = true) final Formats formats,
@JsonProperty("timestampColumn") final Optional<TimestampColumn> timestampColumn,
@JsonProperty(value = "sourceSchema", required = true) final LogicalSchema sourceSchema
@JsonProperty(value = "sourceSchema", required = true) final LogicalSchema sourceSchema,
@JsonProperty(value = "forceChangelog") final Optional<Boolean> forceChangelog
) {
super(properties, topicName, formats, timestampColumn, sourceSchema);
+ this.forceChangelog = forceChangelog.orElse(false);
}

+ public Boolean isForceChangelog() {
+   return forceChangelog;
+ }

@Override

@@ -55,11 +63,13 @@ public boolean equals(final Object o) {
&& Objects.equals(topicName, that.topicName)
&& Objects.equals(formats, that.formats)
&& Objects.equals(timestampColumn, that.timestampColumn)
&& Objects.equals(sourceSchema, that.sourceSchema);
&& Objects.equals(sourceSchema, that.sourceSchema)
&& Objects.equals(forceChangelog, that.forceChangelog);
}

@Override
public int hashCode() {
- return Objects.hash(properties, topicName, formats, timestampColumn, sourceSchema);
+ return Objects.hash(
+     properties, topicName, formats, timestampColumn, sourceSchema, forceChangelog);
}
}
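
A compatibility note on the change above: the new constructor parameter is declared as Optional<Boolean> rather than a bare boolean, so query plans serialized before this change, which contain no forceChangelog property, still deserialize and default to false. A standalone sketch of that defaulting pattern (the class name is hypothetical, not from this commit):

import java.util.Optional;

final class OptionalFlagSketch {

  private final boolean forceChangelog;

  OptionalFlagSketch(final Optional<Boolean> forceChangelog) {
    // Optional.empty() models a field that is absent from an older plan.
    this.forceChangelog = forceChangelog.orElse(false);
  }

  boolean isForceChangelog() {
    return forceChangelog;
  }

  public static void main(final String[] args) {
    System.out.println(new OptionalFlagSketch(Optional.empty()).isForceChangelog());  // false
    System.out.println(new OptionalFlagSketch(Optional.of(true)).isForceChangelog()); // true
  }
}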

@@ -48,24 +48,27 @@ public void shouldImplementEquals() {
new EqualsTester()
.addEqualityGroup(
new TableSource(
properties1, "topic1", formats1, Optional.of(timestamp1), schema1),
properties1, "topic1", formats1, Optional.of(timestamp1), schema1, Optional.of(false)),
new TableSource(
properties1, "topic1", formats1, Optional.of(timestamp1), schema1))
properties1, "topic1", formats1, Optional.of(timestamp1), schema1, Optional.of(false)))
.addEqualityGroup(
new TableSource(
- properties2, "topic1", formats1, Optional.of(timestamp1), schema1))
+ properties2, "topic1", formats1, Optional.of(timestamp1), schema1, Optional.of(false)))
.addEqualityGroup(
new TableSource(
properties1, "topic2", formats1, Optional.of(timestamp1), schema1))
properties1, "topic2", formats1, Optional.of(timestamp1), schema1, Optional.of(false)))
.addEqualityGroup(
new TableSource(
properties1, "topic1", formats2, Optional.of(timestamp1), schema1))
properties1, "topic1", formats2, Optional.of(timestamp1), schema1, Optional.of(false)))
.addEqualityGroup(
new TableSource(
properties1, "topic1", formats1, Optional.of(timestamp2), schema1))
properties1, "topic1", formats1, Optional.of(timestamp2), schema1, Optional.of(false)))
.addEqualityGroup(
new TableSource(
properties1, "topic1", formats1, Optional.of(timestamp1), schema2))
properties1, "topic1", formats1, Optional.of(timestamp1), schema2, Optional.of(false)))
+ .addEqualityGroup(
+     new TableSource(
+         properties1, "topic1", formats1, Optional.of(timestamp1), schema1, Optional.of(true)))
.testEquals();
}
}

@@ -0,0 +1,203 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM I1 (ID INTEGER KEY, V0 INTEGER, V1 INTEGER) WITH (KAFKA_TOPIC='i1', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "I1",
"schema" : "`ID` INTEGER KEY, `V0` INTEGER, `V1` INTEGER",
"topicName" : "i1",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE TABLE I2 (ID INTEGER PRIMARY KEY, V0 INTEGER, V1 INTEGER) WITH (KAFKA_TOPIC='i2', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createTableV1",
"sourceName" : "I2",
"schema" : "`ID` INTEGER KEY, `V0` INTEGER, `V1` INTEGER",
"topicName" : "i2",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM I1 I1\nINNER JOIN I2 I2 ON ((AS_VALUE(I1.ID) = I2.ID))\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`I2_ID` INTEGER KEY, `I1_ID` INTEGER, `I1_V0` INTEGER, `I1_V1` INTEGER, `I2_V0` INTEGER, `I2_V1` INTEGER",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "I1", "I2" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamTableJoinV1",
"properties" : {
"queryContext" : "Join"
},
"joinType" : "INNER",
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"leftSource" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "PrependAliasLeft"
},
"source" : {
"@type" : "streamSelectKeyV2",
"properties" : {
"queryContext" : "LeftSourceKeyed"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KafkaTopic_Left/Source"
},
"topicName" : "i1",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ID` INTEGER KEY, `V0` INTEGER, `V1` INTEGER"
},
"keyExpression" : "AS_VALUE(ID)"
},
"keyColumnNames" : [ "I1_KSQL_COL_0" ],
"selectExpressions" : [ "V0 AS I1_V0", "V1 AS I1_V1", "ROWTIME AS I1_ROWTIME", "ID AS I1_ID", "KSQL_COL_0 AS I1_KSQL_COL_0" ]
},
"rightSource" : {
"@type" : "tableSelectV1",
"properties" : {
"queryContext" : "PrependAliasRight"
},
"source" : {
"@type" : "tableSourceV1",
"properties" : {
"queryContext" : "KafkaTopic_Right/Source"
},
"topicName" : "i2",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ID` INTEGER KEY, `V0` INTEGER, `V1` INTEGER",
"forceChangelog" : true
},
"keyColumnNames" : [ "I2_ID" ],
"selectExpressions" : [ "V0 AS I2_V0", "V1 AS I2_V1", "ROWTIME AS I2_ROWTIME", "ID AS I2_ID" ]
},
"keyColName" : "I2_ID"
},
"keyColumnNames" : [ "I2_ID" ],
"selectExpressions" : [ "I1_ID AS I1_ID", "I1_V0 AS I1_V0", "I1_V1 AS I1_V1", "I2_V0 AS I2_V0", "I2_V1 AS I2_V1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.create.or.replace.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.error.classifier.regex" : ""
}
}
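
The generated plan above shows where the flag lands: only the table-side source of the stream-table join (the tableSourceV1 step over topic i2) carries "forceChangelog" : true, while the stream source over i1 is untouched. A hypothetical Jackson check of that structure (the file path and class name are illustrative, not part of this commit):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;

public final class PlanFlagCheck {

  public static void main(final String[] args) throws Exception {
    // Hypothetical path to the plan JSON shown above.
    final JsonNode plan = new ObjectMapper().readTree(new File("stream-table-join.json"));

    // Walk from the sink through the projection and join to the right-hand
    // (table) source step.
    final JsonNode right = plan.at(
        "/plan/2/queryPlan/physicalPlan/source/source/rightSource/source");

    System.out.println(right.get("@type").asText());              // tableSourceV1
    System.out.println(right.get("forceChangelog").asBoolean());  // true
  }
}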