chore: add support for windowed SR-key joins (#6661)
agavra authored Nov 24, 2020
1 parent 599c4ee commit a54c114
Showing 5 changed files with 216 additions and 27 deletions.
@@ -323,23 +323,16 @@ public SchemaKStream<K> selectKey(
final boolean keyFormatChange = forceInternalKeyFormat.isPresent()
&& !forceInternalKeyFormat.get().equals(keyFormat.getFormatInfo());

if (!keyFormatChange
&& !forceRepartition
&& repartitionNotNeeded(ImmutableList.of(keyExpression))
) {
final boolean repartitionNeeded = repartitionNeeded(ImmutableList.of(keyExpression));
if (!keyFormatChange && !forceRepartition && !repartitionNeeded) {
return this;
}

if (keyFormat.isWindowed()) {
final String errorMsg = "Implicit repartitioning of windowed sources is not supported. "
+ "See https://github.com/confluentinc/ksql/issues/4385.";
final String additionalMsg = forceRepartition
? " As a result, ksqlDB does not support joins on windowed sources with "
+ "Schema-Registry-enabled key formats (AVRO, JSON_SR, PROTOBUF) at this time. "
+ "Please repartition your sources to use a different key format before performing "
+ "the join."
: "";
throw new KsqlException(errorMsg + additionalMsg);
if ((repartitionNeeded || !forceRepartition) && keyFormat.isWindowed()) {
throw new KsqlException(
"Implicit repartitioning of windowed sources is not supported. "
+ "See https://github.com/confluentinc/ksql/issues/4385."
);
}

final ExecutionStep<KStreamHolder<K>> step = ExecutionStepFactory
@@ -361,16 +354,16 @@ && repartitionNotNeeded(ImmutableList.of(keyExpression))
);
}
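For context, a minimal sketch (not part of the diff) of the behaviour the new guard gives a windowed source. It assumes repartitionNeeded means the key expression differs from the source's existing key column; the class and method names are hypothetical:

// Sketch only: the single case in which selectKey may proceed on a windowed source is a
// forced repartition that does not change the key column, i.e. a key-format change for a join.
public final class WindowedSelectKeyGuardSketch {

  static boolean allowed(final boolean forceRepartition, final boolean repartitionNeeded) {
    return forceRepartition && !repartitionNeeded;
  }

  public static void main(final String[] args) {
    System.out.println(allowed(false, true));  // false -> "Implicit repartitioning ..." error
    System.out.println(allowed(true, true));   // false -> still unsupported: key column would change
    System.out.println(allowed(true, false));  // true  -> same key, only its serialized format changes
  }
}

In other words, a windowed stream may now be re-keyed when the key value is untouched and only its serialization changes, which is exactly what a join on Schema-Registry-enabled key formats needs.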

boolean repartitionNotNeeded(final List<Expression> expressions) {
return !Repartitioning.repartitionNeeded(schema, expressions);
boolean repartitionNeeded(final List<Expression> expressions) {
return Repartitioning.repartitionNeeded(schema, expressions);
}

public SchemaKGroupedStream groupBy(
final FormatInfo valueFormat,
final List<Expression> groupByExpressions,
final Stacker contextStacker
) {
if (repartitionNotNeeded(groupByExpressions)) {
if (!repartitionNeeded(groupByExpressions)) {
return groupByKey(keyFormat.getFormatInfo(), valueFormat, contextStacker);
}

@@ -152,7 +152,8 @@ public SchemaKTable<K> selectKey(
final Stacker contextStacker,
final boolean forceRepartition
) {
if (!forceRepartition && repartitionNotNeeded(ImmutableList.of(keyExpression))) {
final boolean repartitionNeeded = repartitionNeeded(ImmutableList.of(keyExpression));
if (!forceRepartition && !repartitionNeeded) {
return this;
}

@@ -163,7 +164,9 @@ }
}

// Table repartitioning is only supported for internal use in enabling joins
if (!forceRepartition) {
// where we know that the key will be semantically equivalent, but may be serialized
// differently (thus ensuring all keys are routed to the same partitions)
if (!forceRepartition || repartitionNeeded) {
throw new UnsupportedOperationException("Cannot repartition a TABLE source. "
+ "If this is a join, make sure that the criteria uses the TABLE's key column "
+ Iterables.getOnlyElement(schema.key()).name().text() + " instead of "
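The comment above is the crux of the change: the internal re-key exists so that both join inputs serialize their keys identically and therefore hash to the same partitions. A rough, illustrative sketch of why byte-level equality matters, assuming Kafka's default murmur2-based partitioning of keyed records (class name and key values are made up):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

// Illustrative only: the "same" key value serialized by two different key formats yields
// different bytes, so the default partitioner will generally route the records to
// different partitions -- which would silently break a join.
public final class CoPartitioningSketch {
  public static void main(final String[] args) {
    final int numPartitions = 6;
    final byte[] kafkaIntKey = ByteBuffer.allocate(4).putInt(1).array();    // KAFKA-format INT key
    final byte[] textStyleKey = "1".getBytes(StandardCharsets.UTF_8);       // text/schema-style key
    System.out.println(Utils.toPositive(Utils.murmur2(kafkaIntKey)) % numPartitions);
    System.out.println(Utils.toPositive(Utils.murmur2(textStyleKey)) % numPartitions);
  }
}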
@@ -16,6 +16,7 @@
package io.confluent.ksql.structured;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
@@ -82,6 +83,8 @@ public class SchemaKStreamTest {
.getNewMetaStore(new InternalFunctionRegistry());
private final KeyFormat keyFormat = KeyFormat
.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of());
private final KeyFormat windowedKeyFormat = KeyFormat
.windowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of(), WindowInfo.of(WindowType.SESSION, Optional.empty()));
private final ValueFormat valueFormat = ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()),
SerdeFeatures.of());
private final QueryContext.Stacker queryContext
@@ -154,6 +157,28 @@ public void shouldNotRepartitionIfSameKeyField() {
assertThat(result, is(initialSchemaKStream));
}

@Test
public void shouldFailIfForceRepartitionWindowedStream() {
// Given:
givenInitialKStreamOf(
"SELECT col0, col2, col3 FROM test1 PARTITION BY col0 EMIT CHANGES;",
windowedKeyFormat);

// When:
final KsqlException e = assertThrows(KsqlException.class, () -> initialSchemaKStream
.selectKey(
valueFormat.getFormatInfo(),
new UnqualifiedColumnReferenceExp(ColumnName.of("COL1")),
Optional.empty(),
childContextStacker,
true)
);

// Then:
assertThat(e.getMessage(), containsString(
"Implicit repartitioning of windowed sources is not supported. See https://github.com/confluentinc/ksql/issues/4385."));
}

@Test
public void shouldRepartitionIfForced() {
// Given:
@@ -541,6 +566,10 @@ private SchemaKStream buildSchemaKStreamForJoin(final KsqlStream ksqlStream) {
}

private PlanNode givenInitialKStreamOf(final String selectQuery) {
return givenInitialKStreamOf(selectQuery, keyFormat);
}

private PlanNode givenInitialKStreamOf(final String selectQuery, final KeyFormat keyFormat) {
final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(
ksqlConfig,
selectQuery,
@@ -89,6 +89,7 @@
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.testutils.AnalysisTestUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MetaStoreFixture;
import io.confluent.ksql.util.Pair;
import java.util.Arrays;
@@ -301,7 +302,7 @@ public void shouldBuildSchemaForSelectKey() {
// When:
final SchemaKTable<?> resultSchemaKTable = initialSchemaKTable.selectKey(
valueFormat.getFormatInfo(),
new UnqualifiedColumnReferenceExp(ColumnName.of("COL1")),
new UnqualifiedColumnReferenceExp(ColumnName.of("COL0")),
Optional.empty(),
childContextStacker,
true
@@ -325,7 +326,7 @@ public void shouldBuildStepForSelectKey() {
// When:
final SchemaKTable<?> resultSchemaKTable = initialSchemaKTable.selectKey(
valueFormat.getFormatInfo(),
new UnqualifiedColumnReferenceExp(ColumnName.of("COL1")),
new UnqualifiedColumnReferenceExp(ColumnName.of("COL0")),
Optional.empty(),
childContextStacker,
true
@@ -339,12 +340,34 @@ public void shouldBuildStepForSelectKey() {
childContextStacker,
initialSchemaKTable.getSourceTableStep(),
InternalFormats.of(keyFormat.getFormatInfo(), valueFormat.getFormatInfo()),
new UnqualifiedColumnReferenceExp(ColumnName.of("COL1"))
new UnqualifiedColumnReferenceExp(ColumnName.of("COL0"))
)
)
);
}

@Test
public void shouldFailSelectKeyForceRepartitionOnNonKeyColumn() {
// Given:
final String selectQuery = "SELECT col0, col2, col3 FROM test2 WHERE col0 > 100 EMIT CHANGES;";
final PlanNode logicalPlan = buildLogicalPlan(selectQuery);
initialSchemaKTable = buildSchemaKTableFromPlan(logicalPlan);

// When:
final UnsupportedOperationException e = assertThrows(
UnsupportedOperationException.class,
() -> initialSchemaKTable.selectKey(
valueFormat.getFormatInfo(),
new UnqualifiedColumnReferenceExp(ColumnName.of("COL1")),
Optional.empty(),
childContextStacker,
true
));

// Then:
assertThat(e.getMessage(), containsString("Cannot repartition a TABLE source."));
}

@Test
public void shouldFailSelectKeyIfNotForced() {
// Given:
@@ -2972,18 +2972,159 @@
}
},
{
"name": "windowed - SR-enabled key format",
"name": "matching session-windowed - SR-enabled key format",
"comments": [
"Note: the first record on the right topic intersects with the session on the right side, but no row is output as keys must",
"be an EXACT BINARY match"
],
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM S1 (ID INT KEY, V bigint) WITH (kafka_topic='left_topic', key_format='JSON_SR', value_format='JSON', WINDOW_TYPE='SESSION');",
"CREATE STREAM S2 (ID INT KEY, V bigint) WITH (kafka_topic='right_topic', key_format='JSON_SR', value_format='JSON', WINDOW_TYPE='SESSION');",
"CREATE STREAM OUTPUT as SELECT S1.ID, S1.V, S2.V FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ID = S2.ID;"
],
"inputs": [
{"topic": "left_topic", "key": 1, "value": {"V": 1}, "timestamp": 765, "window": {"start": 234, "end": 765, "type": "session"}},
{"topic": "right_topic", "key": 1, "value": {"V": 2}, "timestamp": 567, "window": {"start": 234, "end": 567, "type": "session"}},
{"topic": "right_topic", "key": 1, "value": {"V": 3}, "timestamp": 765, "window": {"start": 234, "end": 765, "type": "session"}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"S1_V": 1, "S2_V": 3}, "timestamp": 765, "window": {"start": 234, "end": 765, "type": "session"}}
],
"post": {
"sources": [
{
"name": "OUTPUT",
"type": "stream",
"keyFormat": {"format": "JSON_SR", "windowType": "SESSION"},
"schema": "S1_ID INT KEY, S1_V BIGINT, S2_V BIGINT"
}
],
"topics" : {
"topics" : [
{
"name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-Join-left-repartition",
"keyFormat" : {"format" : "JSON_SR", "features": ["UNWRAP_SINGLES"], "windowInfo": {"type": "SESSION"}},
"keySchema" : {"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]},
"valueFormat" : {"format" : "JSON"}
},
{
"name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-Join-right-repartition",
"keyFormat" : {"format" : "JSON_SR", "features": ["UNWRAP_SINGLES"], "windowInfo": {"type": "SESSION"}},
"keySchema" : {"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]},
"valueFormat" : {"format" : "JSON"}
},
{
"name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KSTREAM-JOINTHIS-0000000016-store-changelog",
"keyFormat" : {"format" : "JSON_SR", "features": ["UNWRAP_SINGLES"], "windowInfo": {"type": "SESSION"}},
"keySchema" : {"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]},
"valueFormat" : {"format" : "JSON"}
},
{
"name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KSTREAM-JOINOTHER-0000000017-store-changelog",
"keyFormat" : {"format" : "JSON_SR", "features": ["UNWRAP_SINGLES"], "windowInfo": {"type": "SESSION"}},
"keySchema" : {"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]},
"valueFormat" : {"format" : "JSON"}
}
]
}
}
},
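A sketch of the binary-match rule the comment in the test above refers to. It assumes a session-windowed key is serialized together with both the window start and end, so an overlapping-but-different session never matches; the class and record names are hypothetical:

import java.util.Objects;

// Hypothetical model of a session-windowed key: the window end is part of the key,
// so only records from the exact same session can join.
public final class SessionWindowedKeySketch {
  record SessionKey(int key, long windowStart, long windowEnd) {}

  public static void main(final String[] args) {
    final SessionKey left   = new SessionKey(1, 234, 765);  // left_topic record
    final SessionKey right1 = new SessionKey(1, 234, 567);  // overlaps in time, different end
    final SessionKey right2 = new SessionKey(1, 234, 765);  // identical session

    System.out.println(Objects.equals(left, right1)); // false -> no output row
    System.out.println(Objects.equals(left, right2)); // true  -> joined row is emitted
  }
}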
{
"name": "matching time-windowed - SR-enabled key format",
"properties": {
"ksql.key.format.enabled": true
},
"comments": [
"Note: the two streams use a different window size. However, only the start of the window is serialized, so its possible to get a matching binary key",
"This may meet users requirements, hence KSQL allows such joins",
"Note: the key format is currently taken from the left source."
],
"statements": [
"CREATE STREAM S1 (ID INT KEY, V bigint) WITH (kafka_topic='s1', key_format='AVRO', value_format='JSON', WINDOW_TYPE='Hopping', WINDOW_SIZE='2 SECOND');",
"CREATE STREAM S2 (ID INT KEY, V bigint) WITH (kafka_topic='s2', key_format='AVRO', value_format='JSON', WINDOW_TYPE='Tumbling', WINDOW_SIZE='2 SECOND');",
"CREATE STREAM OUTPUT AS SELECT S1.ID, S2.V FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ID = S2.ID;"
"CREATE STREAM S1 (ID INT KEY, V bigint) WITH (kafka_topic='left_topic', key_format='JSON_SR', value_format='JSON', WINDOW_TYPE='Hopping', WINDOW_SIZE='5 SECONDS');",
"CREATE STREAM S2 (ID INT KEY, V bigint) WITH (kafka_topic='right_topic', key_format='JSON_SR', value_format='JSON', WINDOW_TYPE='Tumbling', WINDOW_SIZE='2 SECOND');",
"CREATE STREAM OUTPUT as SELECT *, S1.ROWTIME, S2.ROWTIME FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ID = S2.ID;"
],
"inputs": [
{"topic": "left_topic", "key": 1, "value": {"V": 1}, "timestamp": 0, "window": {"start": 0, "end": 5000, "type": "time"}},
{"topic": "left_topic", "key": 1, "value": {"V": 2}, "timestamp": 1000, "window": {"start": 1000, "end": 6000, "type": "time"}},
{"topic": "left_topic", "key": 1, "value": {"V": 3}, "timestamp": 2000, "window": {"start": 2000, "end": 7000, "type": "time"}},
{"topic": "right_topic", "key": 1, "value": {"V": 4}, "timestamp": 0, "window": {"start": 0, "end": 2000, "type": "time"}},
{"topic": "right_topic", "key": 1, "value": {"V": 5}, "timestamp": 2000, "window": {"start": 2000, "end": 4000, "type": "time"}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"S1_ROWTIME": 0, "S1_WINDOWSTART": 0, "S1_WINDOWEND": 5000, "S1_V": 1, "S2_ROWTIME": 0, "S2_WINDOWSTART": 0, "S2_WINDOWEND": 2000, "S2_ID": 1, "S2_V": 4}, "timestamp": 0, "window": {"start": 0, "end":5000, "type": "time"}},
{"topic": "OUTPUT", "key": 1, "value": {"S1_ROWTIME": 2000, "S1_WINDOWSTART": 2000, "S1_WINDOWEND": 7000, "S1_V": 3, "S2_ROWTIME": 2000, "S2_WINDOWSTART": 2000, "S2_WINDOWEND": 4000, "S2_ID": 1, "S2_V": 5}, "timestamp": 2000, "window": {"start": 2000, "end":7000, "type": "time"}}
],
"post": {
"sources": [
{
"name": "OUTPUT",
"type": "stream",
"keyFormat": {"format": "JSON_SR", "windowType": "HOPPING", "windowSize": 5000},
"schema": "S1_ID INT KEY, `S1_WINDOWSTART` BIGINT, `S1_WINDOWEND` BIGINT, `S1_V` BIGINT, S2_ID INTEGER, `S2_WINDOWSTART` BIGINT, `S2_WINDOWEND` BIGINT, `S2_V` BIGINT, S1_ROWTIME BIGINT, S2_ROWTIME BIGINT"
}
],
"topics" : {
"topics" : [
{
"name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-Join-left-repartition",
"keyFormat" : {"format" : "JSON_SR", "features": ["UNWRAP_SINGLES"], "windowInfo": {"type": "HOPPING", "size": 5}},
"keySchema" : {"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]},
"valueFormat" : {"format" : "JSON"}
},
{
"name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-Join-right-repartition",
"keyFormat" : {"format" : "JSON_SR", "features": ["UNWRAP_SINGLES"], "windowInfo": {"type": "HOPPING", "size": 5}},
"keySchema" : {"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]},
"valueFormat" : {"format" : "JSON"}
},
{
"name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KSTREAM-JOINTHIS-0000000016-store-changelog",
"keyFormat" : {"format" : "JSON_SR", "features": ["UNWRAP_SINGLES"], "windowInfo": {"type": "HOPPING", "size": 5}},
"keySchema" : {"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]},
"valueFormat" : {"format" : "JSON"}
},
{
"name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KSTREAM-JOINOTHER-0000000017-store-changelog",
"keyFormat" : {"format" : "JSON_SR", "features": ["UNWRAP_SINGLES"], "windowInfo": {"type": "HOPPING", "size": 5}},
"keySchema" : {"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]},
"valueFormat" : {"format" : "JSON"}
}
]
}
}
},
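The "only the start of the window is serialized" note above is what makes this join possible despite the mismatched window sizes. A hypothetical sketch (names are made up) of the time-windowed key:

// Hypothetical model of a time-windowed key: only the window start participates,
// so a 5s hopping window and a 2s tumbling window beginning at the same instant
// produce identical keys and their rows can join.
public final class TimeWindowedKeySketch {
  record TimeWindowKey(int key, long windowStart) {}   // window end is NOT part of the key

  public static void main(final String[] args) {
    final TimeWindowKey hopping  = new TimeWindowKey(1, 0);  // window [0, 5000)
    final TimeWindowKey tumbling = new TimeWindowKey(1, 0);  // window [0, 2000)
    System.out.println(hopping.equals(tumbling)); // true -> rows join, as in the test above
  }
}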
{
"name": "matching time-windowed - SR-enabled key format - fails if join on non-key",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM S1 (ID INT KEY, V int) WITH (kafka_topic='left_topic', key_format='JSON_SR', value_format='JSON', WINDOW_TYPE='Hopping', WINDOW_SIZE='5 SECONDS');",
"CREATE STREAM S2 (ID INT KEY, V int) WITH (kafka_topic='right_topic', key_format='JSON_SR', value_format='JSON', WINDOW_TYPE='Tumbling', WINDOW_SIZE='2 SECOND');",
"CREATE STREAM OUTPUT as SELECT *, S1.ROWTIME, S2.ROWTIME FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ID = S2.V;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Implicit repartitioning of windowed sources is not supported. See https://github.com/confluentinc/ksql/issues/4385."
}
},
{
"name": "stream-table key-to-key with necessary repartition - SR-enabled key format",
"statements": [
"CREATE TABLE T (ID INT PRIMARY KEY, VAL INT) WITH (kafka_topic='t', key_format='AVRO', value_format='JSON');",
"CREATE STREAM S (ID INT KEY, FOO INT) WITH (kafka_topic='s', key_format='AVRO', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT S.ID, VAL FROM S JOIN T ON S.ID = T.VAL;"
],
"properties": {
"ksql.key.format.enabled": true
},
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Implicit repartitioning of windowed sources is not supported. See https://github.com/confluentinc/ksql/issues/4385. As a result, ksqlDB does not support joins on windowed sources with Schema-Registry-enabled key formats (AVRO, JSON_SR, PROTOBUF) at this time. Please repartition your sources to use a different key format before performing the join."
"message": "Cannot repartition a TABLE source. If this is a join, make sure that the criteria uses the TABLE's key column ID instead of VAL"
}
}
]