From 5ed68718d934caf2dc3462343ec1bcb55847e416 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Wed, 4 Mar 2020 11:03:16 -0800 Subject: [PATCH 1/6] feat: support for tunable retention, grace period for windowed tables - Fixes #4157, adding SQL syntax for specifying retention, grace period - Added functional tests feat: initial commit for tunable retention feat: hacky test now works --- .../ks/KsMaterializationFunctionalTest.java | 190 ++++++++++++++++-- .../windows/HoppingWindowExpression.java | 14 +- .../windows/KsqlWindowExpression.java | 18 +- .../windows/SessionWindowExpression.java | 9 +- .../windows/TumblingWindowExpression.java | 8 +- .../io/confluent/ksql/parser/SqlBase.g4 | 17 +- .../io/confluent/ksql/parser/AstBuilder.java | 39 +++- .../ksql/parser/ExpressionParserTest.java | 4 +- .../tree/HoppingWindowExpressionTest.java | 4 +- .../streams/AggregateBuilderUtils.java | 3 +- .../streams/MaterializedFactory.java | 39 ++-- .../ksql/execution/streams/SourceBuilder.java | 6 +- .../streams/StreamAggregateBuilder.java | 38 ++-- .../streams/MaterializedFactoryTest.java | 7 +- .../execution/streams/SourceBuilderTest.java | 4 +- .../streams/StreamAggregateBuilderTest.java | 14 +- .../streams/TableAggregateBuilderTest.java | 6 +- 17 files changed, 339 insertions(+), 81 deletions(-) diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java index 145d80dec95f..3640cedda087 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java @@ -18,6 +18,7 @@ import static io.confluent.ksql.serde.FormatFactory.JSON; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -44,17 +45,24 @@ import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.test.util.KsqlIdentifierTestUtil; +import io.confluent.ksql.util.PageViewDataProvider; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; import io.confluent.ksql.util.SchemaUtil; import io.confluent.ksql.util.UserDataProvider; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import java.util.stream.Stream; import kafka.zookeeper.ZooKeeperClientException; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.connect.data.Struct; @@ -80,18 +88,26 @@ public class KsMaterializationFunctionalTest { private static final String USER_TABLE = "users_table"; private static final String USER_STREAM = "users_stream"; + private static final String PAGEVIEWS_TOPIC = "pageviews_topic"; + private static final String PAGEVIEWS_STREAM = "pageviews_stream"; + private static final Format VALUE_FORMAT = JSON; private static final UserDataProvider USER_DATA_PROVIDER = new UserDataProvider(); + private static final PageViewDataProvider PAGE_VIEW_DATA_PROVIDER = new PageViewDataProvider(); private static final Duration WINDOW_SIZE = Duration.ofSeconds(5); + private static final Duration WINDOW_SEGMENT_DURATION = Duration.ofSeconds(60); + private static final int NUM_WINDOWS = 4; + private static final List WINDOW_START_INSTANTS = LongStream.range(1, NUM_WINDOWS + 1) + // records have to be apart by at-least a segment for retention to enforced + .mapToObj(i -> Instant.ofEpochMilli(i * WINDOW_SEGMENT_DURATION.toMillis())) + .collect(Collectors.toList()); private static final Deserializer STRING_DESERIALIZER = new StringDeserializer(); - private static final Deserializer> TIME_WINDOWED_DESERIALIZER = WindowedSerdes .timeWindowedSerdeFrom(String.class, WINDOW_SIZE.toMillis()) .deserializer(); - private static final Deserializer> SESSION_WINDOWED_DESERIALIZER = WindowedSerdes .sessionWindowedSerdeFrom(String.class) @@ -120,13 +136,22 @@ public class KsMaterializationFunctionalTest { @BeforeClass public static void classSetUp() { - TEST_HARNESS.ensureTopics(USERS_TOPIC); + TEST_HARNESS.ensureTopics(USERS_TOPIC, PAGEVIEWS_TOPIC); TEST_HARNESS.produceRows( USERS_TOPIC, USER_DATA_PROVIDER, VALUE_FORMAT ); + + for (final Instant windowTime : WINDOW_START_INSTANTS) { + TEST_HARNESS.produceRows( + PAGEVIEWS_TOPIC, + PAGE_VIEW_DATA_PROVIDER, + VALUE_FORMAT, + windowTime::toEpochMilli + ); + } } @Before @@ -205,7 +230,7 @@ public void shouldQueryMaterializedTableForAggregatedTable() { final LogicalSchema schema = schema("KSQL_COL_0", SqlTypes.BIGINT); - final Map rows = waitForTableRows(STRING_DESERIALIZER, schema); + final Map rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema); // When: final Materialization materialization = query.getMaterialization(queryId, contextStacker).get(); @@ -239,7 +264,7 @@ public void shouldQueryMaterializedTableForAggregatedStream() { final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT); - final Map rows = waitForTableRows(STRING_DESERIALIZER, schema); + final Map rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema); // When: final Materialization materialization = query.getMaterialization(queryId, contextStacker).get(); @@ -275,7 +300,7 @@ public void shouldQueryMaterializedTableForTumblingWindowed() { final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT); final Map, GenericRow> rows = - waitForTableRows(TIME_WINDOWED_DESERIALIZER, schema); + waitForUniqueUserRows(TIME_WINDOWED_DESERIALIZER, schema); // When: final Materialization materialization = query.getMaterialization(queryId, contextStacker).get(); @@ -321,7 +346,7 @@ public void shouldQueryMaterializedTableForHoppingWindowed() { final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT); final Map, GenericRow> rows = - waitForTableRows(TIME_WINDOWED_DESERIALIZER, schema); + waitForUniqueUserRows(TIME_WINDOWED_DESERIALIZER, schema); // When: final Materialization materialization = query.getMaterialization(queryId, contextStacker).get(); @@ -365,7 +390,7 @@ public void shouldQueryMaterializedTableForSessionWindowed() { final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT); final Map, GenericRow> rows = - waitForTableRows(SESSION_WINDOWED_DESERIALIZER, schema); + waitForUniqueUserRows(SESSION_WINDOWED_DESERIALIZER, schema); // When: final Materialization materialization = query.getMaterialization(queryId, contextStacker).get(); @@ -396,6 +421,103 @@ public void shouldQueryMaterializedTableForSessionWindowed() { }); } + @Test(expected = IllegalArgumentException.class) + public void shouldFailQueryWithRetentionSmallerThanGracePeriod() { + // Given: + executeQuery("CREATE TABLE " + output + " AS" + + " SELECT COUNT(*) AS COUNT FROM " + PAGEVIEWS_STREAM + + " WINDOW TUMBLING (SIZE " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS," + + " RETENTION " + (WINDOW_SEGMENT_DURATION.getSeconds() * 2) + " SECONDS)" + + " GROUP BY PAGEID;" + ); + } + + @Test + public void shouldQueryTumblingWindowMaterializedTableWithRetention() { + // Given: + final PersistentQueryMetadata query = executeQuery( + "CREATE TABLE " + output + " AS" + + " SELECT COUNT(*) AS COUNT FROM " + PAGEVIEWS_STREAM + + " WINDOW TUMBLING (SIZE " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS," + + " RETENTION " + (WINDOW_SEGMENT_DURATION.getSeconds() * 2) + " SECONDS," + + " GRACE PERIOD 0 SECONDS)" + + " GROUP BY PAGEID;" + ); + + final List, GenericRow>> rows = + waitForPageViewRows(TIME_WINDOWED_DESERIALIZER, query.getPhysicalSchema()); + + // When: + final Materialization materialization = query.getMaterialization(queryId, contextStacker).get(); + + // Then: + assertThat(materialization.windowType(), is(Optional.of(WindowType.TUMBLING))); + final MaterializedWindowedTable table = materialization.windowed(); + final Set> expectedWindows = Stream.of( + Window.of(WINDOW_START_INSTANTS.get(1), WINDOW_START_INSTANTS.get(1).plusSeconds(WINDOW_SEGMENT_DURATION.getSeconds())), + Window.of(WINDOW_START_INSTANTS.get(2), WINDOW_START_INSTANTS.get(2).plusSeconds(WINDOW_SEGMENT_DURATION.getSeconds())), + Window.of(WINDOW_START_INSTANTS.get(3), WINDOW_START_INSTANTS.get(3).plusSeconds(WINDOW_SEGMENT_DURATION.getSeconds())) + ).map(Optional::of).collect(Collectors.toSet()); + verifyRetainedWindows(rows, table, query, expectedWindows); + } + + @Test + public void shouldQueryHoppingWindowMaterializedTableWithRetention() { + // Given: + final PersistentQueryMetadata query = executeQuery( + "CREATE TABLE " + output + " AS" + + " SELECT COUNT(*) AS COUNT FROM " + PAGEVIEWS_STREAM + + " WINDOW HOPPING (SIZE " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS," + + " ADVANCE BY " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS, " + + " RETENTION " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS," + + " GRACE PERIOD 0 SECONDS" + + ") GROUP BY PAGEID;" + ); + + final List, GenericRow>> rows = + waitForPageViewRows(TIME_WINDOWED_DESERIALIZER, query.getPhysicalSchema()); + + // When: + final Materialization materialization = query.getMaterialization(queryId, contextStacker).get(); + + // Then: + assertThat(materialization.windowType(), is(Optional.of(WindowType.HOPPING))); + final MaterializedWindowedTable table = materialization.windowed(); + final Set> expectedWindows = Stream.of( + Window.of(WINDOW_START_INSTANTS.get(2), WINDOW_START_INSTANTS.get(2).plusSeconds(WINDOW_SEGMENT_DURATION.getSeconds())), + Window.of(WINDOW_START_INSTANTS.get(3), WINDOW_START_INSTANTS.get(3).plusSeconds(WINDOW_SEGMENT_DURATION.getSeconds())) + ).map(Optional::of).collect(Collectors.toSet()); + verifyRetainedWindows(rows, table, query, expectedWindows); + } + + @Test + public void shouldQuerySessionWindowMaterializedTableWithRetention() { + // Given: + final PersistentQueryMetadata query = executeQuery( + "CREATE TABLE " + output + " AS" + + " SELECT COUNT(*) AS COUNT FROM " + PAGEVIEWS_STREAM + + " WINDOW SESSION (" + WINDOW_SEGMENT_DURATION.getSeconds()/2 + " SECONDS," + + " RETENTION " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS," + + " GRACE PERIOD 0 SECONDS" + + ") GROUP BY USERID;" + ); + + final List, GenericRow>> rows = + waitForPageViewRows(SESSION_WINDOWED_DESERIALIZER, query.getPhysicalSchema()); + + // When: + final Materialization materialization = query.getMaterialization(queryId, contextStacker).get(); + + // Then: + assertThat(materialization.windowType(), is(Optional.of(WindowType.SESSION))); + final MaterializedWindowedTable table = materialization.windowed(); + final Set> expectedWindows = Stream.of( + Window.of(WINDOW_START_INSTANTS.get(2), WINDOW_START_INSTANTS.get(2)), + Window.of(WINDOW_START_INSTANTS.get(3), WINDOW_START_INSTANTS.get(3)) + ).map(Optional::of).collect(Collectors.toSet()); + verifyRetainedWindows(rows, table, query, expectedWindows); + } + @Test public void shouldQueryMaterializedTableWithKeyFieldsInProjection() { // Given: @@ -405,14 +527,13 @@ public void shouldQueryMaterializedTableWithKeyFieldsInProjection() { + " GROUP BY USERID;" ); - final LogicalSchema schema = schema( "USERID", SqlTypes.STRING, "KSQL_COL_1", SqlTypes.BIGINT, "USERID_2", SqlTypes.STRING ); - final Map rows = waitForTableRows(STRING_DESERIALIZER, schema); + final Map rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema); // When: @@ -434,7 +555,7 @@ public void shouldQueryMaterializedTableWithKeyFieldsInProjection() { } @Test - public void shouldQueryMaterializedTableWitMultipleAggregationColumns() { + public void shouldQueryMaterializedTableWithMultipleAggregationColumns() { // Given: final PersistentQueryMetadata query = executeQuery( "CREATE TABLE " + output + " AS" @@ -447,7 +568,7 @@ public void shouldQueryMaterializedTableWitMultipleAggregationColumns() { "SUM", SqlTypes.BIGINT ); - final Map rows = waitForTableRows(STRING_DESERIALIZER, schema); + final Map rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema); // When: final Materialization materialization = query.getMaterialization(queryId, contextStacker).get(); @@ -481,7 +602,7 @@ public void shouldIgnoreHavingClause() { final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT); - final Map rows = waitForTableRows(STRING_DESERIALIZER, schema); + final Map rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema); // When: final Materialization materialization = query.getMaterialization(queryId, contextStacker).get(); @@ -502,7 +623,26 @@ public void shouldIgnoreHavingClause() { }); } - private Map waitForTableRows( + private void verifyRetainedWindows( + final List, GenericRow>> rows, + final MaterializedWindowedTable table, + final PersistentQueryMetadata query, + final Set> expectedWindows + ) { + rows.forEach(record -> { + final Struct key = asKeyStruct(record.key().key(), query.getPhysicalSchema()); + final List resultAtWindowStart = table.get(key, Range.all()); + assertThat("Should have fewer windows retained", + resultAtWindowStart, + hasSize(expectedWindows.size())); + final Set> actualWindows = resultAtWindowStart.stream() + .map(WindowedRow::window) + .collect(Collectors.toSet()); + assertThat("Should retain the latest windows", actualWindows, equalTo(expectedWindows)); + }); + } + + private Map waitForUniqueUserRows( final Deserializer keyDeserializer, final LogicalSchema aggregateSchema ) { @@ -515,6 +655,18 @@ private Map waitForTableRows( ); } + private List> waitForPageViewRows( + final Deserializer keyDeserializer, + final PhysicalSchema aggregateSchema) { + return TEST_HARNESS.verifyAvailableRows( + output.toUpperCase(), + hasSize(PAGE_VIEW_DATA_PROVIDER.data().size() * NUM_WINDOWS), + VALUE_FORMAT, + aggregateSchema, + keyDeserializer + ); + } + private PersistentQueryMetadata executeQuery(final String statement) { return executeQuery(ksqlContext, statement); } @@ -601,6 +753,16 @@ private static void initializeKsql(final TestKsqlContext ksqlContext) { + " key = '" + USER_DATA_PROVIDER.key() + "'" + ");" ); + + ksqlContext.sql("CREATE STREAM " + PAGEVIEWS_STREAM + " " + + " (" + PAGE_VIEW_DATA_PROVIDER.ksqlSchemaString() + ")" + + " WITH (" + + " kafka_topic='" + PAGEVIEWS_TOPIC + "', " + + " value_format='" + VALUE_FORMAT.name() + "', " + + " key = '" + PAGE_VIEW_DATA_PROVIDER.key() + "'" + + ");" + ); + } } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/HoppingWindowExpression.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/HoppingWindowExpression.java index 01f4ce062884..6bfb6a0f947e 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/HoppingWindowExpression.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/HoppingWindowExpression.java @@ -40,7 +40,13 @@ public HoppingWindowExpression( final long advanceBy, final TimeUnit advanceByUnit ) { - this(Optional.empty(), size, sizeUnit, advanceBy, advanceByUnit); + this(Optional.empty(), + size, + sizeUnit, + advanceBy, + advanceByUnit, + Optional.empty(), + Optional.empty()); } public HoppingWindowExpression( @@ -48,9 +54,11 @@ public HoppingWindowExpression( final long size, final TimeUnit sizeUnit, final long advanceBy, - final TimeUnit advanceByUnit + final TimeUnit advanceByUnit, + final Optional retention, + final Optional gracePeriod ) { - super(location); + super(location, retention, gracePeriod); this.size = size; this.sizeUnit = requireNonNull(sizeUnit, "sizeUnit"); this.advanceBy = advanceBy; diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/KsqlWindowExpression.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/KsqlWindowExpression.java index f5de6ccee405..d320349638aa 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/KsqlWindowExpression.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/KsqlWindowExpression.java @@ -19,13 +19,29 @@ import io.confluent.ksql.parser.Node; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; +import java.time.Duration; import java.util.Optional; @Immutable public abstract class KsqlWindowExpression extends Node { - KsqlWindowExpression(final Optional nodeLocation) { + protected final Optional retention; + protected final Optional gracePeriod; + + KsqlWindowExpression(final Optional nodeLocation, + final Optional retention, + final Optional gracePeriod) { super(nodeLocation); + this.retention = retention; + this.gracePeriod = gracePeriod; + } + + public Optional getRetention() { + return retention; + } + + public Optional getGracePeriod() { + return gracePeriod; } public abstract WindowInfo getWindowInfo(); diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/SessionWindowExpression.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/SessionWindowExpression.java index ed22f3f4d06e..13182526584b 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/SessionWindowExpression.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/SessionWindowExpression.java @@ -21,6 +21,7 @@ import io.confluent.ksql.model.WindowType; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; +import java.time.Duration; import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -32,15 +33,17 @@ public class SessionWindowExpression extends KsqlWindowExpression { private final TimeUnit sizeUnit; public SessionWindowExpression(final long gap, final TimeUnit sizeUnit) { - this(Optional.empty(), gap, sizeUnit); + this(Optional.empty(), gap, sizeUnit, Optional.empty(), Optional.empty()); } public SessionWindowExpression( final Optional location, final long gap, - final TimeUnit sizeUnit + final TimeUnit sizeUnit, + final Optional retention, + final Optional gracePeriod ) { - super(location); + super(location, retention, gracePeriod); this.gap = gap; this.sizeUnit = requireNonNull(sizeUnit, "sizeUnit"); } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/TumblingWindowExpression.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/TumblingWindowExpression.java index 45b5602dcad2..75c12c337524 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/TumblingWindowExpression.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/TumblingWindowExpression.java @@ -33,15 +33,17 @@ public class TumblingWindowExpression extends KsqlWindowExpression { private final TimeUnit sizeUnit; public TumblingWindowExpression(final long size, final TimeUnit sizeUnit) { - this(Optional.empty(), size, sizeUnit); + this(Optional.empty(), size, sizeUnit, Optional.empty(), Optional.empty()); } public TumblingWindowExpression( final Optional location, final long size, - final TimeUnit sizeUnit + final TimeUnit sizeUnit, + final Optional retention, + final Optional gracePeriod ) { - super(location); + super(location, retention, gracePeriod); this.size = size; this.sizeUnit = requireNonNull(sizeUnit, "sizeUnit"); } diff --git a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 049de85a48a3..6ea9921d18e2 100644 --- a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -114,21 +114,29 @@ limitClause : LIMIT number ; +retentionClause + : ',' RETENTION number windowUnit + ; + +gracePeriodClause + : ',' GRACE PERIOD number windowUnit + ; + windowExpression : (IDENTIFIER)? ( tumblingWindowExpression | hoppingWindowExpression | sessionWindowExpression ) ; tumblingWindowExpression - : TUMBLING '(' SIZE number windowUnit')' + : TUMBLING '(' SIZE number windowUnit (retentionClause)? (gracePeriodClause)?')' ; hoppingWindowExpression - : HOPPING '(' SIZE number windowUnit ',' ADVANCE BY number windowUnit ')' + : HOPPING '(' SIZE number windowUnit ',' ADVANCE BY number windowUnit (retentionClause)? (gracePeriodClause)?')' ; sessionWindowExpression - : SESSION '(' number windowUnit ')' + : SESSION '(' number windowUnit (retentionClause)? (gracePeriodClause)?')' ; windowUnit @@ -387,6 +395,9 @@ TUMBLING: 'TUMBLING'; HOPPING: 'HOPPING'; SIZE: 'SIZE'; ADVANCE: 'ADVANCE'; +RETENTION: 'RETENTION'; +GRACE: 'GRACE'; +PERIOD: 'PERIOD'; CASE: 'CASE'; WHEN: 'WHEN'; THEN: 'THEN'; diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index adaf6f4dbd84..d86a91a75bfd 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -69,16 +69,20 @@ import io.confluent.ksql.parser.SqlBaseParser.DropTypeContext; import io.confluent.ksql.parser.SqlBaseParser.ExpressionContext; import io.confluent.ksql.parser.SqlBaseParser.FloatLiteralContext; +import io.confluent.ksql.parser.SqlBaseParser.GracePeriodClauseContext; import io.confluent.ksql.parser.SqlBaseParser.IdentifierContext; import io.confluent.ksql.parser.SqlBaseParser.InsertValuesContext; import io.confluent.ksql.parser.SqlBaseParser.IntervalClauseContext; import io.confluent.ksql.parser.SqlBaseParser.LimitClauseContext; import io.confluent.ksql.parser.SqlBaseParser.ListConnectorsContext; import io.confluent.ksql.parser.SqlBaseParser.ListTypesContext; +import io.confluent.ksql.parser.SqlBaseParser.NumberContext; import io.confluent.ksql.parser.SqlBaseParser.RegisterTypeContext; +import io.confluent.ksql.parser.SqlBaseParser.RetentionClauseContext; import io.confluent.ksql.parser.SqlBaseParser.SourceNameContext; import io.confluent.ksql.parser.SqlBaseParser.TablePropertiesContext; import io.confluent.ksql.parser.SqlBaseParser.TablePropertyContext; +import io.confluent.ksql.parser.SqlBaseParser.WindowUnitContext; import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties; import io.confluent.ksql.parser.properties.with.CreateSourceProperties; import io.confluent.ksql.parser.tree.AliasedRelation; @@ -139,6 +143,7 @@ import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.ParserUtil; import java.math.BigDecimal; +import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -450,6 +455,27 @@ public Node visitWindowExpression(final SqlBaseParser.WindowExpressionContext ct throw new KsqlException("Window description is not correct."); } + private static Duration getDuration(final NumberContext number, + final WindowUnitContext unitCtx) { + final TimeUnit retainUnit = WindowExpression.getWindowUnit(unitCtx.getText().toUpperCase()); + if (retainUnit == null) { + throw new KsqlException("Units is not correct"); + } + return Duration.ofMillis(retainUnit.toMillis(Long.parseLong(number.getText()))); + } + + private static Optional gracePeriodDuration(final GracePeriodClauseContext graceCtx) { + return graceCtx != null + ? Optional.of(getDuration(graceCtx.number(), graceCtx.windowUnit())) + : Optional.empty(); + } + + private static Optional retentionDuration(final RetentionClauseContext retentionCtx) { + return retentionCtx != null + ? Optional.of(getDuration(retentionCtx.number(), retentionCtx.windowUnit())) + : Optional.empty(); + } + @Override public Node visitHoppingWindowExpression( final SqlBaseParser.HoppingWindowExpressionContext ctx) { @@ -461,12 +487,15 @@ public Node visitHoppingWindowExpression( final String sizeUnit = windowUnits.get(0).getText(); final String advanceByUnit = windowUnits.get(1).getText(); + return new HoppingWindowExpression( getLocation(ctx), Long.parseLong(sizeStr), WindowExpression.getWindowUnit(sizeUnit.toUpperCase()), Long.parseLong(advanceByStr), - WindowExpression.getWindowUnit(advanceByUnit.toUpperCase()) + WindowExpression.getWindowUnit(advanceByUnit.toUpperCase()), + retentionDuration(ctx.retentionClause()), + gracePeriodDuration(ctx.gracePeriodClause()) ); } @@ -479,7 +508,9 @@ public Node visitTumblingWindowExpression( return new TumblingWindowExpression( getLocation(ctx), Long.parseLong(sizeStr), - WindowExpression.getWindowUnit(sizeUnit.toUpperCase()) + WindowExpression.getWindowUnit(sizeUnit.toUpperCase()), + retentionDuration(ctx.retentionClause()), + gracePeriodDuration(ctx.gracePeriodClause()) ); } @@ -492,7 +523,9 @@ public Node visitSessionWindowExpression( return new SessionWindowExpression( getLocation(ctx), Long.parseLong(sizeStr), - WindowExpression.getWindowUnit(sizeUnit.toUpperCase()) + WindowExpression.getWindowUnit(sizeUnit.toUpperCase()), + retentionDuration(ctx.retentionClause()), + gracePeriodDuration(ctx.gracePeriodClause()) ); } diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java index 25d1d3ba7ca0..8f8d319a7d4f 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java @@ -26,6 +26,7 @@ import io.confluent.ksql.execution.windows.TumblingWindowExpression; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.Operator; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.junit.Rule; import org.junit.Test; @@ -100,7 +101,8 @@ public void shouldParseWindowExpression() { // Then: assertThat( parsed, - equalTo(new TumblingWindowExpression(parsed.getLocation(), 1, TimeUnit.DAYS)) + equalTo(new TumblingWindowExpression(parsed.getLocation(), 1, TimeUnit.DAYS, + Optional.empty(), Optional.empty())) ); } } \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java index 60cbe69ca945..15c525aa481c 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java @@ -46,8 +46,8 @@ public void shouldImplementHashCodeAndEqualsProperty() { // Note: At the moment location does not take part in equality testing new HoppingWindowExpression(10, SECONDS, 20, MINUTES), new HoppingWindowExpression(10, SECONDS, 20, MINUTES), - new HoppingWindowExpression(Optional.of(SOME_LOCATION), 10, SECONDS, 20, MINUTES), - new HoppingWindowExpression(Optional.of(OTHER_LOCATION), 10, SECONDS, 20, MINUTES) + new HoppingWindowExpression(Optional.of(SOME_LOCATION), 10, SECONDS, 20, MINUTES, Optional.empty(), Optional.empty()), + new HoppingWindowExpression(Optional.of(OTHER_LOCATION), 10, SECONDS, 20, MINUTES, Optional.empty(), Optional.empty()) ) .addEqualityGroup( new HoppingWindowExpression(30, SECONDS, 20, MINUTES) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateBuilderUtils.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateBuilderUtils.java index 3f39217f632e..c84ac43aae47 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateBuilderUtils.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateBuilderUtils.java @@ -26,6 +26,7 @@ import io.confluent.ksql.execution.plan.Formats; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; +import java.util.Optional; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.connect.data.Struct; @@ -80,7 +81,7 @@ static Materialized> buildMater buildValueSerde(formats, queryBuilder, physicalAggregationSchema, queryContext); return materializedFactory - .create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext)); + .create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext), Optional.empty()); } static MaterializationInfo.Builder materializationInfoBuilder( diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java index 35eeb835176a..681728487dd1 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java @@ -16,28 +16,32 @@ package io.confluent.ksql.execution.streams; import io.confluent.ksql.GenericRow; +import java.time.Duration; +import java.util.Optional; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.StateStore; public interface MaterializedFactory { - Materialized create( - Serde keySerde, Serde valSerde, String name); + Materialized create(Serde keySerde, + Serde valSerde, + String name, + Optional retention); static MaterializedFactory create() { return create( new Materializer() { @Override - public Materialized materializedWith( - final Serde keySerde, - final Serde valueSerde) { - return Materialized.with(keySerde, valueSerde); - } - - @Override + @SuppressWarnings("unchecked") public Materialized materializedAs( - final String storeName) { - return Materialized.as(storeName); + final String storeName, + final Optional retention) { + if (retention.isPresent()) { + return (Materialized) Materialized.as(storeName) + .withRetention(retention.get()); + } else { + return Materialized.as(storeName); + } } } ); @@ -49,8 +53,9 @@ static MaterializedFactory create(final Materializer materializer) { public Materialized create( final Serde keySerde, final Serde valSerde, - final String name) { - return materializer.materializedAs(name) + final String name, + final Optional retention) { + return materializer.materializedAs(name, retention) .withKeySerde(keySerde) .withValueSerde(valSerde); } @@ -58,10 +63,8 @@ public Materialized create( } interface Materializer { - Materialized materializedWith( - Serde keySerde, - Serde valueSerde); - - Materialized materializedAs(String storeName); + Materialized materializedAs( + String storeName, + Optional retention); } } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java index d830842be011..0687156d8dcc 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java @@ -180,7 +180,8 @@ public static KTableHolder buildTable( materializedFactory.create( keySerde, valueSerde, - tableChangeLogOpName(source.getProperties()) + tableChangeLogOpName(source.getProperties()), + Optional.empty() ); final KTable ktable = buildKTable( @@ -229,7 +230,8 @@ static KTableHolder> buildWindowedTable( materializedFactory.create( keySerde, valueSerde, - tableChangeLogOpName(source.getProperties()) + tableChangeLogOpName(source.getProperties()), + Optional.empty() ); final KTable, GenericRow> ktable = buildKTable( diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java index b86f01f2a8a0..ca442f104e11 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java @@ -257,19 +257,22 @@ private static class WindowedAggregator public KTable, GenericRow> visitHoppingWindowExpression( final HoppingWindowExpression window, final Void ctx) { - final TimeWindows windows = TimeWindows + TimeWindows windows = TimeWindows .of(Duration.ofMillis(window.getSizeUnit().toMillis(window.getSize()))) - .advanceBy( - Duration.ofMillis(window.getAdvanceByUnit().toMillis(window.getAdvanceBy())) - ); + .advanceBy(Duration.ofMillis(window.getAdvanceByUnit().toMillis(window.getAdvanceBy()))); + windows = window.getGracePeriod().isPresent() + ? windows.grace(window.getGracePeriod().get()) + : windows; return groupedStream .windowedBy(windows) .aggregate( aggregateParams.getInitializer(), aggregateParams.getAggregator(), - materializedFactory.create( - keySerde, valueSerde, StreamsUtil.buildOpName(queryContext)) + materializedFactory.create(keySerde, + valueSerde, + StreamsUtil.buildOpName(queryContext), + window.getRetention()) ); } @@ -277,17 +280,22 @@ public KTable, GenericRow> visitHoppingWindowExpression( public KTable, GenericRow> visitSessionWindowExpression( final SessionWindowExpression window, final Void ctx) { - final SessionWindows windows = SessionWindows.with( + SessionWindows windows = SessionWindows.with( Duration.ofMillis(window.getSizeUnit().toMillis(window.getGap())) ); + windows = window.getGracePeriod().isPresent() + ? windows.grace(window.getGracePeriod().get()) + : windows; return groupedStream .windowedBy(windows) .aggregate( aggregateParams.getInitializer(), aggregateParams.getAggregator(), aggregateParams.getAggregator().getMerger(), - materializedFactory.create( - keySerde, valueSerde, StreamsUtil.buildOpName(queryContext)) + materializedFactory.create(keySerde, + valueSerde, + StreamsUtil.buildOpName(queryContext), + window.getRetention()) ); } @@ -295,15 +303,21 @@ public KTable, GenericRow> visitSessionWindowExpression( public KTable, GenericRow> visitTumblingWindowExpression( final TumblingWindowExpression window, final Void ctx) { - final TimeWindows windows = TimeWindows.of( + TimeWindows windows = TimeWindows.of( Duration.ofMillis(window.getSizeUnit().toMillis(window.getSize()))); + windows = window.getGracePeriod().isPresent() + ? windows.grace(window.getGracePeriod().get()) + : windows; + return groupedStream .windowedBy(windows) .aggregate( aggregateParams.getInitializer(), aggregateParams.getAggregator(), - materializedFactory.create( - keySerde, valueSerde, StreamsUtil.buildOpName(queryContext)) + materializedFactory.create(keySerde, + valueSerde, + StreamsUtil.buildOpName(queryContext), + window.getRetention()) ); } } diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java index 04a244359188..456465dbd8b3 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.when; import io.confluent.ksql.GenericRow; +import java.util.Optional; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.StateStore; @@ -47,7 +48,7 @@ public class MaterializedFactoryTest { public void shouldCreateJoinedCorrectlyWhenOptimizationsEnabled() { // Given: final Materialized asName = mock(Materialized.class); - when(materializer.materializedAs(OP_NAME)).thenReturn(asName); + when(materializer.materializedAs(OP_NAME, Optional.empty())).thenReturn(asName); final Materialized withKeySerde = mock(Materialized.class); when(asName.withKeySerde(keySerde)).thenReturn(withKeySerde); final Materialized withRowSerde = mock(Materialized.class); @@ -56,11 +57,11 @@ public void shouldCreateJoinedCorrectlyWhenOptimizationsEnabled() { // When: final Materialized returned = MaterializedFactory.create(materializer).create( - keySerde, rowSerde, OP_NAME); + keySerde, rowSerde, OP_NAME, Optional.empty()); // Then: assertThat(returned, is(withRowSerde)); - verify(materializer).materializedAs(OP_NAME); + verify(materializer).materializedAs(OP_NAME, Optional.empty()); verify(asName).withKeySerde(keySerde); verify(withKeySerde).withValueSerde(rowSerde); } diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java index 90c1fbcfda5a..a766b71ab6e4 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java @@ -216,7 +216,7 @@ public void setup() { when(processingLoggerFactory.getLogger(anyString())).thenReturn(processingLogger); when(streamsFactories.getConsumedFactory()).thenReturn(consumedFactory); when(streamsFactories.getMaterializedFactory()).thenReturn(materializationFactory); - when(materializationFactory.create(any(), any(), any())) + when(materializationFactory.create(any(), any(), any(), any())) .thenReturn((Materialized) materialized); planBuilder = new KSPlanBuilder( @@ -656,7 +656,7 @@ public void shouldBuildTableWithCorrectStoreName() { tableSource.build(planBuilder); // Then: - verify(materializationFactory).create(keySerde, valueSerde, "base-Reduce"); + verify(materializationFactory).create(keySerde, valueSerde, "base-Reduce", Optional.empty()); } @SuppressWarnings("unchecked") diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java index 80169270d97e..38408a10601c 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java @@ -222,7 +222,7 @@ public void init() { @SuppressWarnings("unchecked") private void givenUnwindowedAggregate() { - when(materializedFactory.>create(any(), any(), any())) + when(materializedFactory.>create(any(), any(), any(), any())) .thenReturn(materialized); when(groupedStream.aggregate(any(), any(), any(Materialized.class))).thenReturn(aggregated); when(aggregated.transformValues(any(), any(Named.class))) @@ -238,7 +238,7 @@ private void givenUnwindowedAggregate() { @SuppressWarnings("unchecked") private void givenTimeWindowedAggregate() { - when(materializedFactory.>create(any(), any(), any())) + when(materializedFactory.>create(any(), any(), any(), any())) .thenReturn(timeWindowMaterialized); when(groupedStream.windowedBy(any(Windows.class))).thenReturn(timeWindowedStream); when(timeWindowedStream.aggregate(any(), any(), any(Materialized.class))) @@ -280,7 +280,7 @@ private void givenHoppingWindowedAggregate() { @SuppressWarnings("unchecked") private void givenSessionWindowedAggregate() { - when(materializedFactory.>create(any(), any(), any())) + when(materializedFactory.>create(any(), any(), any(), any())) .thenReturn(sessionWindowMaterialized); when(groupedStream.windowedBy(any(SessionWindows.class))).thenReturn(sessionWindowedStream); when(sessionWindowedStream.aggregate(any(), any(), any(), any(Materialized.class))) @@ -349,7 +349,7 @@ public void shouldBuildMaterializedWithCorrectSerdesForUnwindowedAggregate() { aggregate.build(planBuilder); // Then: - verify(materializedFactory).create(same(keySerde), same(valueSerde), any()); + verify(materializedFactory).create(same(keySerde), same(valueSerde), any(), any()); } @Test @@ -361,7 +361,7 @@ public void shouldBuildMaterializedWithCorrectNameForUnwindowedAggregate() { aggregate.build(planBuilder); // Then: - verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize")); + verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize"), any()); } @Test @@ -536,7 +536,7 @@ public void shouldBuildMaterializedWithCorrectSerdesForWindowedAggregate() { windowedAggregate.build(planBuilder); // Then: - verify(materializedFactory).create(same(keySerde), same(valueSerde), any()); + verify(materializedFactory).create(same(keySerde), same(valueSerde), any(), any()); } } @@ -551,7 +551,7 @@ public void shouldBuildMaterializedWithCorrectNameForWindowedAggregate() { windowedAggregate.build(planBuilder); // Then: - verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize")); + verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize"), any()); } } diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java index f54f733485c7..0d38af1701bb 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java @@ -162,7 +162,7 @@ public void init() { when(aggregateParams.getAggregateSchema()).thenReturn(AGGREGATE_SCHEMA); when(aggregateParams.getSchema()).thenReturn(AGGREGATE_SCHEMA); when(aggregator.getResultMapper()).thenReturn(resultMapper); - when(materializedFactory.>create(any(), any(), any())) + when(materializedFactory.>create(any(), any(), any(), any())) .thenReturn(materialized); when(groupedTable.aggregate(any(), any(), any(), any(Materialized.class))).thenReturn( aggregated); @@ -218,7 +218,7 @@ public void shouldBuildMaterializedWithCorrectSerdesForAggregate() { aggregate.build(planBuilder); // Then: - verify(materializedFactory).create(same(keySerde), same(valueSerde), any()); + verify(materializedFactory).create(same(keySerde), same(valueSerde), any(), any()); } @Test @@ -227,7 +227,7 @@ public void shouldBuildMaterializedWithCorrectNameForAggregate() { aggregate.build(planBuilder); // Then: - verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize")); + verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize"), any()); } @Test From 724b2bb8bac81f6d0db2f63bd41871bad1bcfb8c Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Mon, 9 Mar 2020 15:56:21 -0700 Subject: [PATCH 2/6] refactor: add an overload in MaterializedFactory --- .../ks/KsMaterializationFunctionalTest.java | 20 ++++++++-------- .../io/confluent/ksql/parser/AstBuilder.java | 1 - .../streams/AggregateBuilderUtils.java | 3 +-- .../streams/MaterializedFactory.java | 24 +++++++++++++++---- .../ksql/execution/streams/SourceBuilder.java | 6 ++--- .../execution/streams/SourceBuilderTest.java | 4 ++-- .../streams/StreamAggregateBuilderTest.java | 4 ++-- .../streams/TableAggregateBuilderTest.java | 6 ++--- 8 files changed, 40 insertions(+), 28 deletions(-) diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java index 3640cedda087..ff24f279a0dc 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java @@ -88,8 +88,8 @@ public class KsMaterializationFunctionalTest { private static final String USER_TABLE = "users_table"; private static final String USER_STREAM = "users_stream"; - private static final String PAGEVIEWS_TOPIC = "pageviews_topic"; - private static final String PAGEVIEWS_STREAM = "pageviews_stream"; + private static final String PAGE_VIEWS_TOPIC = "page_views_topic"; + private static final String PAGE_VIEWS_STREAM = "page_views_stream"; private static final Format VALUE_FORMAT = JSON; private static final UserDataProvider USER_DATA_PROVIDER = new UserDataProvider(); @@ -136,7 +136,7 @@ public class KsMaterializationFunctionalTest { @BeforeClass public static void classSetUp() { - TEST_HARNESS.ensureTopics(USERS_TOPIC, PAGEVIEWS_TOPIC); + TEST_HARNESS.ensureTopics(USERS_TOPIC, PAGE_VIEWS_TOPIC); TEST_HARNESS.produceRows( USERS_TOPIC, @@ -146,7 +146,7 @@ public static void classSetUp() { for (final Instant windowTime : WINDOW_START_INSTANTS) { TEST_HARNESS.produceRows( - PAGEVIEWS_TOPIC, + PAGE_VIEWS_TOPIC, PAGE_VIEW_DATA_PROVIDER, VALUE_FORMAT, windowTime::toEpochMilli @@ -425,7 +425,7 @@ public void shouldQueryMaterializedTableForSessionWindowed() { public void shouldFailQueryWithRetentionSmallerThanGracePeriod() { // Given: executeQuery("CREATE TABLE " + output + " AS" - + " SELECT COUNT(*) AS COUNT FROM " + PAGEVIEWS_STREAM + + " SELECT COUNT(*) AS COUNT FROM " + PAGE_VIEWS_STREAM + " WINDOW TUMBLING (SIZE " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS," + " RETENTION " + (WINDOW_SEGMENT_DURATION.getSeconds() * 2) + " SECONDS)" + " GROUP BY PAGEID;" @@ -437,7 +437,7 @@ public void shouldQueryTumblingWindowMaterializedTableWithRetention() { // Given: final PersistentQueryMetadata query = executeQuery( "CREATE TABLE " + output + " AS" - + " SELECT COUNT(*) AS COUNT FROM " + PAGEVIEWS_STREAM + + " SELECT COUNT(*) AS COUNT FROM " + PAGE_VIEWS_STREAM + " WINDOW TUMBLING (SIZE " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS," + " RETENTION " + (WINDOW_SEGMENT_DURATION.getSeconds() * 2) + " SECONDS," + " GRACE PERIOD 0 SECONDS)" @@ -466,7 +466,7 @@ public void shouldQueryHoppingWindowMaterializedTableWithRetention() { // Given: final PersistentQueryMetadata query = executeQuery( "CREATE TABLE " + output + " AS" - + " SELECT COUNT(*) AS COUNT FROM " + PAGEVIEWS_STREAM + + " SELECT COUNT(*) AS COUNT FROM " + PAGE_VIEWS_STREAM + " WINDOW HOPPING (SIZE " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS," + " ADVANCE BY " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS, " + " RETENTION " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS," @@ -495,7 +495,7 @@ public void shouldQuerySessionWindowMaterializedTableWithRetention() { // Given: final PersistentQueryMetadata query = executeQuery( "CREATE TABLE " + output + " AS" - + " SELECT COUNT(*) AS COUNT FROM " + PAGEVIEWS_STREAM + + " SELECT COUNT(*) AS COUNT FROM " + PAGE_VIEWS_STREAM + " WINDOW SESSION (" + WINDOW_SEGMENT_DURATION.getSeconds()/2 + " SECONDS," + " RETENTION " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS," + " GRACE PERIOD 0 SECONDS" @@ -754,10 +754,10 @@ private static void initializeKsql(final TestKsqlContext ksqlContext) { + ");" ); - ksqlContext.sql("CREATE STREAM " + PAGEVIEWS_STREAM + " " + ksqlContext.sql("CREATE STREAM " + PAGE_VIEWS_STREAM + " " + " (" + PAGE_VIEW_DATA_PROVIDER.ksqlSchemaString() + ")" + " WITH (" - + " kafka_topic='" + PAGEVIEWS_TOPIC + "', " + + " kafka_topic='" + PAGE_VIEWS_TOPIC + "', " + " value_format='" + VALUE_FORMAT.name() + "', " + " key = '" + PAGE_VIEW_DATA_PROVIDER.key() + "'" + ");" diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index d86a91a75bfd..fcaadc43df95 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -487,7 +487,6 @@ public Node visitHoppingWindowExpression( final String sizeUnit = windowUnits.get(0).getText(); final String advanceByUnit = windowUnits.get(1).getText(); - return new HoppingWindowExpression( getLocation(ctx), Long.parseLong(sizeStr), diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateBuilderUtils.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateBuilderUtils.java index c84ac43aae47..3f39217f632e 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateBuilderUtils.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateBuilderUtils.java @@ -26,7 +26,6 @@ import io.confluent.ksql.execution.plan.Formats; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; -import java.util.Optional; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.connect.data.Struct; @@ -81,7 +80,7 @@ static Materialized> buildMater buildValueSerde(formats, queryBuilder, physicalAggregationSchema, queryContext); return materializedFactory - .create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext), Optional.empty()); + .create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext)); } static MaterializationInfo.Builder materializationInfoBuilder( diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java index 681728487dd1..df960effeff8 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java @@ -23,10 +23,18 @@ import org.apache.kafka.streams.processor.StateStore; public interface MaterializedFactory { - Materialized create(Serde keySerde, - Serde valSerde, - String name, - Optional retention); + Materialized create( + Serde keySerde, + Serde valSerde, + String name, + Optional retention + ); + + Materialized create( + Serde keySerde, + Serde valSerde, + String name + ); static MaterializedFactory create() { return create( @@ -59,6 +67,14 @@ public Materialized create( .withKeySerde(keySerde) .withValueSerde(valSerde); } + + @Override + public Materialized create( + final Serde keySerde, + final Serde valSerde, + final String name) { + return create(keySerde, valSerde, name, Optional.empty()); + } }; } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java index 0687156d8dcc..d830842be011 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java @@ -180,8 +180,7 @@ public static KTableHolder buildTable( materializedFactory.create( keySerde, valueSerde, - tableChangeLogOpName(source.getProperties()), - Optional.empty() + tableChangeLogOpName(source.getProperties()) ); final KTable ktable = buildKTable( @@ -230,8 +229,7 @@ static KTableHolder> buildWindowedTable( materializedFactory.create( keySerde, valueSerde, - tableChangeLogOpName(source.getProperties()), - Optional.empty() + tableChangeLogOpName(source.getProperties()) ); final KTable, GenericRow> ktable = buildKTable( diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java index a766b71ab6e4..90c1fbcfda5a 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java @@ -216,7 +216,7 @@ public void setup() { when(processingLoggerFactory.getLogger(anyString())).thenReturn(processingLogger); when(streamsFactories.getConsumedFactory()).thenReturn(consumedFactory); when(streamsFactories.getMaterializedFactory()).thenReturn(materializationFactory); - when(materializationFactory.create(any(), any(), any(), any())) + when(materializationFactory.create(any(), any(), any())) .thenReturn((Materialized) materialized); planBuilder = new KSPlanBuilder( @@ -656,7 +656,7 @@ public void shouldBuildTableWithCorrectStoreName() { tableSource.build(planBuilder); // Then: - verify(materializationFactory).create(keySerde, valueSerde, "base-Reduce", Optional.empty()); + verify(materializationFactory).create(keySerde, valueSerde, "base-Reduce"); } @SuppressWarnings("unchecked") diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java index 38408a10601c..90f7725a7d78 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java @@ -222,7 +222,7 @@ public void init() { @SuppressWarnings("unchecked") private void givenUnwindowedAggregate() { - when(materializedFactory.>create(any(), any(), any(), any())) + when(materializedFactory.>create(any(), any(), any())) .thenReturn(materialized); when(groupedStream.aggregate(any(), any(), any(Materialized.class))).thenReturn(aggregated); when(aggregated.transformValues(any(), any(Named.class))) @@ -361,7 +361,7 @@ public void shouldBuildMaterializedWithCorrectNameForUnwindowedAggregate() { aggregate.build(planBuilder); // Then: - verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize"), any()); + verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize")); } @Test diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java index 0d38af1701bb..f54f733485c7 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java @@ -162,7 +162,7 @@ public void init() { when(aggregateParams.getAggregateSchema()).thenReturn(AGGREGATE_SCHEMA); when(aggregateParams.getSchema()).thenReturn(AGGREGATE_SCHEMA); when(aggregator.getResultMapper()).thenReturn(resultMapper); - when(materializedFactory.>create(any(), any(), any(), any())) + when(materializedFactory.>create(any(), any(), any())) .thenReturn(materialized); when(groupedTable.aggregate(any(), any(), any(), any(Materialized.class))).thenReturn( aggregated); @@ -218,7 +218,7 @@ public void shouldBuildMaterializedWithCorrectSerdesForAggregate() { aggregate.build(planBuilder); // Then: - verify(materializedFactory).create(same(keySerde), same(valueSerde), any(), any()); + verify(materializedFactory).create(same(keySerde), same(valueSerde), any()); } @Test @@ -227,7 +227,7 @@ public void shouldBuildMaterializedWithCorrectNameForAggregate() { aggregate.build(planBuilder); // Then: - verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize"), any()); + verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize")); } @Test From d70e8b86fbc9afe027c09d251a3d65b8f0a80e8a Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Tue, 10 Mar 2020 10:56:07 -0700 Subject: [PATCH 3/6] refactor: introduce abstraction for passing window's time clauses --- .../execution/windows/WindowTimeClause.java | 65 +++++++++++++++++++ .../structured/SchemaKGroupedStreamTest.java | 3 +- .../windows/HoppingWindowExpression.java | 62 +++++++----------- .../windows/KsqlWindowExpression.java | 13 ++-- .../windows/SessionWindowExpression.java | 36 +++++----- .../windows/TumblingWindowExpression.java | 38 +++++------ .../io/confluent/ksql/parser/AstBuilder.java | 59 +++++++---------- .../ksql/parser/ExpressionParserTest.java | 8 ++- .../parser/json/WindowExpressionTestCase.java | 3 +- .../tree/HoppingWindowExpressionTest.java | 44 ++++++++++--- .../ksql/parser/tree/ParserModelTest.java | 4 +- .../tree/SessionWindowExpressionTest.java | 3 +- .../tree/TumblingWindowExpressionTest.java | 3 +- .../streams/StreamAggregateBuilder.java | 35 +++++----- .../streams/StepSchemaResolverTest.java | 3 +- .../streams/StreamAggregateBuilderTest.java | 13 ++-- 16 files changed, 226 insertions(+), 166 deletions(-) create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java new file mode 100644 index 000000000000..1009bf617055 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.windows; + +import static java.util.Objects.requireNonNull; + +import com.google.errorprone.annotations.Immutable; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * Pojo for a time clause added to a window expression + */ +@Immutable +public class WindowTimeClause { + + private final long value; + + private final TimeUnit unit; + + public WindowTimeClause(final long value, final TimeUnit unit) { + this.value = value; + this.unit = requireNonNull(unit, "unit"); + } + + public Duration toDuration() { + return Duration.ofMillis(unit.toMillis(value)); + } + + public String toString() { + return value + " " + unit; + } + + @Override + public int hashCode() { + return Objects.hash(value, unit); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final WindowTimeClause otherClause = (WindowTimeClause) o; + return otherClause.value == value && otherClause.unit == unit; + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java index eb6de0522ccb..32196024b759 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java @@ -28,6 +28,7 @@ import io.confluent.ksql.execution.streams.ExecutionStepFactory; import io.confluent.ksql.execution.windows.KsqlWindowExpression; import io.confluent.ksql.execution.windows.SessionWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.metastore.model.KeyField; @@ -78,7 +79,7 @@ public class SchemaKGroupedStreamTest { ); private static final KsqlWindowExpression KSQL_WINDOW_EXP = new SessionWindowExpression( - 100, TimeUnit.SECONDS + new WindowTimeClause(100, TimeUnit.SECONDS) ); private static final List NON_AGGREGATE_COLUMNS = ImmutableList.of( diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/HoppingWindowExpression.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/HoppingWindowExpression.java index 6bfb6a0f947e..19f0fa6ea536 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/HoppingWindowExpression.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/HoppingWindowExpression.java @@ -21,71 +21,51 @@ import io.confluent.ksql.model.WindowType; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; -import java.time.Duration; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; @Immutable public class HoppingWindowExpression extends KsqlWindowExpression { - private final long size; - private final TimeUnit sizeUnit; - private final long advanceBy; - private final TimeUnit advanceByUnit; + private final WindowTimeClause size; + private final WindowTimeClause advanceBy; public HoppingWindowExpression( - final long size, - final TimeUnit sizeUnit, - final long advanceBy, - final TimeUnit advanceByUnit + final WindowTimeClause size, + final WindowTimeClause advanceBy ) { this(Optional.empty(), size, - sizeUnit, advanceBy, - advanceByUnit, Optional.empty(), Optional.empty()); } public HoppingWindowExpression( final Optional location, - final long size, - final TimeUnit sizeUnit, - final long advanceBy, - final TimeUnit advanceByUnit, - final Optional retention, - final Optional gracePeriod + final WindowTimeClause size, + final WindowTimeClause advanceBy, + final Optional retention, + final Optional gracePeriod ) { super(location, retention, gracePeriod); - this.size = size; - this.sizeUnit = requireNonNull(sizeUnit, "sizeUnit"); - this.advanceBy = advanceBy; - this.advanceByUnit = requireNonNull(advanceByUnit, "advanceByUnit"); + this.size = requireNonNull(size, "size"); + this.advanceBy = requireNonNull(advanceBy, "advanceBy"); } @Override public WindowInfo getWindowInfo() { return WindowInfo.of( WindowType.HOPPING, - Optional.of(Duration.ofNanos(sizeUnit.toNanos(size))) + Optional.of(size.toDuration()) ); } - public TimeUnit getSizeUnit() { - return sizeUnit; - } - - public long getSize() { + public WindowTimeClause getSize() { return size; } - public TimeUnit getAdvanceByUnit() { - return advanceByUnit; - } - - public long getAdvanceBy() { + public WindowTimeClause getAdvanceBy() { return advanceBy; } @@ -96,13 +76,16 @@ public R accept(final WindowVisitor visitor, final C context) { @Override public String toString() { - return " HOPPING ( SIZE " + size + " " + sizeUnit + " , ADVANCE BY " - + advanceBy + " " + "" + advanceByUnit + " ) "; + return " HOPPING ( SIZE " + size + " , " + + "ADVANCE BY " + advanceBy + + retention.map(w -> " , RETENTION " + w).orElse("") + + gracePeriod.map(g -> " , GRACE PERIOD " + g).orElse("") + + " ) "; } @Override public int hashCode() { - return Objects.hash(size, sizeUnit, advanceBy, advanceByUnit); + return Objects.hash(size, advanceBy, retention, gracePeriod); } @Override @@ -114,8 +97,9 @@ public boolean equals(final Object o) { return false; } final HoppingWindowExpression hoppingWindowExpression = (HoppingWindowExpression) o; - return hoppingWindowExpression.size == size && hoppingWindowExpression.sizeUnit == sizeUnit - && hoppingWindowExpression.advanceBy == advanceBy && hoppingWindowExpression - .advanceByUnit == advanceByUnit; + return Objects.equals(size, hoppingWindowExpression.size) + && Objects.equals(advanceBy, hoppingWindowExpression.advanceBy) + && Objects.equals(gracePeriod, hoppingWindowExpression.gracePeriod) + && Objects.equals(retention, hoppingWindowExpression.retention); } } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/KsqlWindowExpression.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/KsqlWindowExpression.java index d320349638aa..5ac31175bc55 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/KsqlWindowExpression.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/KsqlWindowExpression.java @@ -19,28 +19,27 @@ import io.confluent.ksql.parser.Node; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; -import java.time.Duration; import java.util.Optional; @Immutable public abstract class KsqlWindowExpression extends Node { - protected final Optional retention; - protected final Optional gracePeriod; + protected final Optional retention; + protected final Optional gracePeriod; KsqlWindowExpression(final Optional nodeLocation, - final Optional retention, - final Optional gracePeriod) { + final Optional retention, + final Optional gracePeriod) { super(nodeLocation); this.retention = retention; this.gracePeriod = gracePeriod; } - public Optional getRetention() { + public Optional getRetention() { return retention; } - public Optional getGracePeriod() { + public Optional getGracePeriod() { return gracePeriod; } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/SessionWindowExpression.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/SessionWindowExpression.java index 13182526584b..a671160f0ed2 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/SessionWindowExpression.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/SessionWindowExpression.java @@ -21,38 +21,29 @@ import io.confluent.ksql.model.WindowType; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; -import java.time.Duration; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; @Immutable public class SessionWindowExpression extends KsqlWindowExpression { - private final long gap; - private final TimeUnit sizeUnit; + private final WindowTimeClause gap; - public SessionWindowExpression(final long gap, final TimeUnit sizeUnit) { - this(Optional.empty(), gap, sizeUnit, Optional.empty(), Optional.empty()); + public SessionWindowExpression(final WindowTimeClause gap) { + this(Optional.empty(), gap, Optional.empty(), Optional.empty()); } public SessionWindowExpression( final Optional location, - final long gap, - final TimeUnit sizeUnit, - final Optional retention, - final Optional gracePeriod + final WindowTimeClause gap, + final Optional retention, + final Optional gracePeriod ) { super(location, retention, gracePeriod); - this.gap = gap; - this.sizeUnit = requireNonNull(sizeUnit, "sizeUnit"); + this.gap = requireNonNull(gap, "gap"); } - public TimeUnit getSizeUnit() { - return sizeUnit; - } - - public long getGap() { + public WindowTimeClause getGap() { return gap; } @@ -68,12 +59,15 @@ public R accept(final WindowVisitor visitor, final C context) { @Override public String toString() { - return " SESSION ( " + gap + " " + sizeUnit + " ) "; + return " SESSION ( " + gap + + retention.map(w -> " , RETENTION " + w).orElse("") + + gracePeriod.map(g -> " , GRACE PERIOD " + g).orElse("") + + " ) "; } @Override public int hashCode() { - return Objects.hash(gap, sizeUnit); + return Objects.hash(gap, retention, gracePeriod); } @Override @@ -85,6 +79,8 @@ public boolean equals(final Object o) { return false; } final SessionWindowExpression sessionWindowExpression = (SessionWindowExpression) o; - return sessionWindowExpression.gap == gap && sessionWindowExpression.sizeUnit == sizeUnit; + return Objects.equals(gap, sessionWindowExpression.gap) + && Objects.equals(gracePeriod, sessionWindowExpression.gracePeriod) + && Objects.equals(retention, sessionWindowExpression.retention); } } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/TumblingWindowExpression.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/TumblingWindowExpression.java index 75c12c337524..dfd4a4f4cc1d 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/TumblingWindowExpression.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/TumblingWindowExpression.java @@ -21,46 +21,37 @@ import io.confluent.ksql.model.WindowType; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; -import java.time.Duration; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; @Immutable public class TumblingWindowExpression extends KsqlWindowExpression { - private final long size; - private final TimeUnit sizeUnit; + private final WindowTimeClause size; - public TumblingWindowExpression(final long size, final TimeUnit sizeUnit) { - this(Optional.empty(), size, sizeUnit, Optional.empty(), Optional.empty()); + public TumblingWindowExpression(final WindowTimeClause size) { + this(Optional.empty(), size, Optional.empty(), Optional.empty()); } public TumblingWindowExpression( final Optional location, - final long size, - final TimeUnit sizeUnit, - final Optional retention, - final Optional gracePeriod + final WindowTimeClause size, + final Optional retention, + final Optional gracePeriod ) { super(location, retention, gracePeriod); - this.size = size; - this.sizeUnit = requireNonNull(sizeUnit, "sizeUnit"); + this.size = requireNonNull(size, "size"); } @Override public WindowInfo getWindowInfo() { return WindowInfo.of( WindowType.TUMBLING, - Optional.of(Duration.ofNanos(sizeUnit.toNanos(size))) + Optional.of(size.toDuration()) ); } - public TimeUnit getSizeUnit() { - return sizeUnit; - } - - public long getSize() { + public WindowTimeClause getSize() { return size; } @@ -71,12 +62,15 @@ public R accept(final WindowVisitor visitor, final C context) { @Override public String toString() { - return " TUMBLING ( SIZE " + size + " " + sizeUnit + " ) "; + return " TUMBLING ( SIZE " + size + + retention.map(w -> " , RETENTION " + w).orElse("") + + gracePeriod.map(g -> " , GRACE PERIOD " + g).orElse("") + + " ) "; } @Override public int hashCode() { - return Objects.hash(size, sizeUnit); + return Objects.hash(size, retention, gracePeriod); } @Override @@ -88,6 +82,8 @@ public boolean equals(final Object o) { return false; } final TumblingWindowExpression tumblingWindowExpression = (TumblingWindowExpression) o; - return tumblingWindowExpression.size == size && tumblingWindowExpression.sizeUnit == sizeUnit; + return Objects.equals(size, tumblingWindowExpression.size) + && Objects.equals(gracePeriod, tumblingWindowExpression.gracePeriod) + && Objects.equals(retention, tumblingWindowExpression.retention); } } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index fcaadc43df95..446e3b10a19d 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -58,6 +58,7 @@ import io.confluent.ksql.execution.windows.HoppingWindowExpression; import io.confluent.ksql.execution.windows.SessionWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.metastore.TypeRegistry; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.FunctionName; @@ -143,7 +144,6 @@ import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.ParserUtil; import java.math.BigDecimal; -import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -455,24 +455,26 @@ public Node visitWindowExpression(final SqlBaseParser.WindowExpressionContext ct throw new KsqlException("Window description is not correct."); } - private static Duration getDuration(final NumberContext number, - final WindowUnitContext unitCtx) { - final TimeUnit retainUnit = WindowExpression.getWindowUnit(unitCtx.getText().toUpperCase()); - if (retainUnit == null) { - throw new KsqlException("Units is not correct"); - } - return Duration.ofMillis(retainUnit.toMillis(Long.parseLong(number.getText()))); + private static WindowTimeClause getTimeClause( + final NumberContext number, + final WindowUnitContext unitCtx) { + return new WindowTimeClause( + Long.parseLong(number.getText()), + WindowExpression.getWindowUnit(unitCtx.getText().toUpperCase()) + ); } - private static Optional gracePeriodDuration(final GracePeriodClauseContext graceCtx) { + private static Optional gracePeriodClause( + final GracePeriodClauseContext graceCtx) { return graceCtx != null - ? Optional.of(getDuration(graceCtx.number(), graceCtx.windowUnit())) + ? Optional.of(getTimeClause(graceCtx.number(), graceCtx.windowUnit())) : Optional.empty(); } - private static Optional retentionDuration(final RetentionClauseContext retentionCtx) { + private static Optional retentionClause( + final RetentionClauseContext retentionCtx) { return retentionCtx != null - ? Optional.of(getDuration(retentionCtx.number(), retentionCtx.windowUnit())) + ? Optional.of(getTimeClause(retentionCtx.number(), retentionCtx.windowUnit())) : Optional.empty(); } @@ -482,19 +484,12 @@ public Node visitHoppingWindowExpression( final List numberList = ctx.number(); final List windowUnits = ctx.windowUnit(); - final String sizeStr = numberList.get(0).getText(); - final String advanceByStr = numberList.get(1).getText(); - - final String sizeUnit = windowUnits.get(0).getText(); - final String advanceByUnit = windowUnits.get(1).getText(); return new HoppingWindowExpression( getLocation(ctx), - Long.parseLong(sizeStr), - WindowExpression.getWindowUnit(sizeUnit.toUpperCase()), - Long.parseLong(advanceByStr), - WindowExpression.getWindowUnit(advanceByUnit.toUpperCase()), - retentionDuration(ctx.retentionClause()), - gracePeriodDuration(ctx.gracePeriodClause()) + getTimeClause(numberList.get(0), windowUnits.get(0)), + getTimeClause(numberList.get(1), windowUnits.get(1)), + retentionClause(ctx.retentionClause()), + gracePeriodClause(ctx.gracePeriodClause()) ); } @@ -502,14 +497,11 @@ public Node visitHoppingWindowExpression( public Node visitTumblingWindowExpression( final SqlBaseParser.TumblingWindowExpressionContext ctx ) { - final String sizeStr = ctx.number().getText(); - final String sizeUnit = ctx.windowUnit().getText(); return new TumblingWindowExpression( getLocation(ctx), - Long.parseLong(sizeStr), - WindowExpression.getWindowUnit(sizeUnit.toUpperCase()), - retentionDuration(ctx.retentionClause()), - gracePeriodDuration(ctx.gracePeriodClause()) + getTimeClause(ctx.number(), ctx.windowUnit()), + retentionClause(ctx.retentionClause()), + gracePeriodClause(ctx.gracePeriodClause()) ); } @@ -517,14 +509,11 @@ public Node visitTumblingWindowExpression( public Node visitSessionWindowExpression( final SqlBaseParser.SessionWindowExpressionContext ctx ) { - final String sizeStr = ctx.number().getText(); - final String sizeUnit = ctx.windowUnit().getText(); return new SessionWindowExpression( getLocation(ctx), - Long.parseLong(sizeStr), - WindowExpression.getWindowUnit(sizeUnit.toUpperCase()), - retentionDuration(ctx.retentionClause()), - gracePeriodDuration(ctx.gracePeriodClause()) + getTimeClause(ctx.number(), ctx.windowUnit()), + retentionClause(ctx.retentionClause()), + gracePeriodClause(ctx.gracePeriodClause()) ); } diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java index 8f8d319a7d4f..bea24dfcee9c 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java @@ -24,6 +24,7 @@ import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.windows.KsqlWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.Operator; import java.util.Optional; @@ -101,8 +102,11 @@ public void shouldParseWindowExpression() { // Then: assertThat( parsed, - equalTo(new TumblingWindowExpression(parsed.getLocation(), 1, TimeUnit.DAYS, - Optional.empty(), Optional.empty())) + equalTo(new TumblingWindowExpression( + parsed.getLocation(), + new WindowTimeClause(1, TimeUnit.DAYS), + Optional.empty(), + Optional.empty())) ); } } \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/json/WindowExpressionTestCase.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/json/WindowExpressionTestCase.java index aea8a70734b5..5e3b755f0875 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/json/WindowExpressionTestCase.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/json/WindowExpressionTestCase.java @@ -17,12 +17,13 @@ import io.confluent.ksql.execution.windows.KsqlWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import java.util.concurrent.TimeUnit; public class WindowExpressionTestCase { static final KsqlWindowExpression WINDOW_EXPRESSION = new TumblingWindowExpression( - 123, TimeUnit.DAYS + new WindowTimeClause(123, TimeUnit.DAYS) ); static final String WINDOW_EXPRESSION_TXT = "\" TUMBLING ( SIZE 123 DAYS ) \""; diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java index 15c525aa481c..c48dc8946ee8 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java @@ -24,6 +24,7 @@ import com.google.common.testing.EqualsTester; import io.confluent.ksql.execution.windows.HoppingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; @@ -44,29 +45,54 @@ public void shouldImplementHashCodeAndEqualsProperty() { new EqualsTester() .addEqualityGroup( // Note: At the moment location does not take part in equality testing - new HoppingWindowExpression(10, SECONDS, 20, MINUTES), - new HoppingWindowExpression(10, SECONDS, 20, MINUTES), - new HoppingWindowExpression(Optional.of(SOME_LOCATION), 10, SECONDS, 20, MINUTES, Optional.empty(), Optional.empty()), - new HoppingWindowExpression(Optional.of(OTHER_LOCATION), 10, SECONDS, 20, MINUTES, Optional.empty(), Optional.empty()) + new HoppingWindowExpression( + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(20, MINUTES)), + new HoppingWindowExpression( + new WindowTimeClause(10, SECONDS), + new WindowTimeClause( 20, MINUTES)), + new HoppingWindowExpression( + Optional.of(SOME_LOCATION), + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(20, MINUTES), + Optional.empty(), + Optional.empty()), + new HoppingWindowExpression( + Optional.of(OTHER_LOCATION), + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(20, MINUTES), + Optional.empty(), + Optional.empty()) ) .addEqualityGroup( - new HoppingWindowExpression(30, SECONDS, 20, MINUTES) + new HoppingWindowExpression( + new WindowTimeClause(30, SECONDS), + new WindowTimeClause(20, MINUTES)) ) .addEqualityGroup( - new HoppingWindowExpression(10, HOURS, 20, MINUTES) + new HoppingWindowExpression( + new WindowTimeClause(10, HOURS), + new WindowTimeClause(20, MINUTES)) ) .addEqualityGroup( - new HoppingWindowExpression(10, SECONDS, 1, MINUTES) + new HoppingWindowExpression( + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(1, MINUTES)) ) .addEqualityGroup( - new HoppingWindowExpression(10, SECONDS, 20, MILLISECONDS) + new HoppingWindowExpression( + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(20, MILLISECONDS)) ) .testEquals(); } @Test public void shouldReturnWindowInfo() { - assertThat(new HoppingWindowExpression(10, SECONDS, 20, MINUTES).getWindowInfo(), + assertThat(new HoppingWindowExpression( + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(20, MINUTES) + ).getWindowInfo(), is(WindowInfo.of(WindowType.HOPPING, Optional.of(Duration.ofSeconds(10))))); } } \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/ParserModelTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/ParserModelTest.java index 72fd1410fb60..12c8d8197226 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/ParserModelTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/ParserModelTest.java @@ -32,6 +32,7 @@ import io.confluent.ksql.execution.expression.tree.Type; import io.confluent.ksql.execution.windows.KsqlWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties; @@ -70,7 +71,8 @@ public class ParserModelTest { ., Object>builder() .put(ColumnName.class, ColumnName.of("bob")) .put(Expression.class, DEFAULT_TYPE) - .put(KsqlWindowExpression.class, new TumblingWindowExpression(1, TimeUnit.SECONDS)) + .put(KsqlWindowExpression.class, new TumblingWindowExpression( + new WindowTimeClause(1, TimeUnit.SECONDS))) .put(Relation.class, DEFAULT_RELATION) .put(JoinCriteria.class, new JoinOn(DEFAULT_TYPE)) .put(Select.class, DEFAULT_SELECT) diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/SessionWindowExpressionTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/SessionWindowExpressionTest.java index 5429ba93e9df..46a1aa146785 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/SessionWindowExpressionTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/SessionWindowExpressionTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.is; import io.confluent.ksql.execution.windows.SessionWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.serde.WindowInfo; import java.util.Optional; @@ -35,7 +36,7 @@ public class SessionWindowExpressionTest { @Before public void setUp() { - windowExpression = new SessionWindowExpression(5, TimeUnit.SECONDS); + windowExpression = new SessionWindowExpression(new WindowTimeClause(5, TimeUnit.SECONDS)); } @Test diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/TumblingWindowExpressionTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/TumblingWindowExpressionTest.java index deaf61d8c49e..50a532905d49 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/TumblingWindowExpressionTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/TumblingWindowExpressionTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.Matchers.is; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.serde.WindowInfo; import java.time.Duration; @@ -32,7 +33,7 @@ public class TumblingWindowExpressionTest { @Test public void shouldReturnWindowInfo() { - assertThat(new TumblingWindowExpression(11, SECONDS).getWindowInfo(), + assertThat(new TumblingWindowExpression(new WindowTimeClause(11, SECONDS)).getWindowInfo(), is(WindowInfo.of(WindowType.TUMBLING, Optional.of(Duration.ofSeconds(11))))); } } \ No newline at end of file diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java index ca442f104e11..f9bede7beac2 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java @@ -33,11 +33,11 @@ import io.confluent.ksql.execution.windows.KsqlWindowExpression; import io.confluent.ksql.execution.windows.SessionWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.execution.windows.WindowVisitor; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; -import java.time.Duration; import java.util.List; import java.util.Objects; import org.apache.kafka.common.serialization.Serde; @@ -258,11 +258,10 @@ public KTable, GenericRow> visitHoppingWindowExpression( final HoppingWindowExpression window, final Void ctx) { TimeWindows windows = TimeWindows - .of(Duration.ofMillis(window.getSizeUnit().toMillis(window.getSize()))) - .advanceBy(Duration.ofMillis(window.getAdvanceByUnit().toMillis(window.getAdvanceBy()))); - windows = window.getGracePeriod().isPresent() - ? windows.grace(window.getGracePeriod().get()) - : windows; + .of(window.getSize().toDuration()) + .advanceBy(window.getAdvanceBy().toDuration()); + windows = window.getGracePeriod().map(WindowTimeClause::toDuration).map(windows::grace) + .orElse(windows); return groupedStream .windowedBy(windows) @@ -272,7 +271,7 @@ public KTable, GenericRow> visitHoppingWindowExpression( materializedFactory.create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext), - window.getRetention()) + window.getRetention().map(WindowTimeClause::toDuration)) ); } @@ -280,12 +279,10 @@ public KTable, GenericRow> visitHoppingWindowExpression( public KTable, GenericRow> visitSessionWindowExpression( final SessionWindowExpression window, final Void ctx) { - SessionWindows windows = SessionWindows.with( - Duration.ofMillis(window.getSizeUnit().toMillis(window.getGap())) - ); - windows = window.getGracePeriod().isPresent() - ? windows.grace(window.getGracePeriod().get()) - : windows; + SessionWindows windows = SessionWindows.with(window.getGap().toDuration()); + windows = window.getGracePeriod().map(WindowTimeClause::toDuration).map(windows::grace) + .orElse(windows); + return groupedStream .windowedBy(windows) .aggregate( @@ -295,7 +292,7 @@ public KTable, GenericRow> visitSessionWindowExpression( materializedFactory.create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext), - window.getRetention()) + window.getRetention().map(WindowTimeClause::toDuration)) ); } @@ -303,11 +300,9 @@ public KTable, GenericRow> visitSessionWindowExpression( public KTable, GenericRow> visitTumblingWindowExpression( final TumblingWindowExpression window, final Void ctx) { - TimeWindows windows = TimeWindows.of( - Duration.ofMillis(window.getSizeUnit().toMillis(window.getSize()))); - windows = window.getGracePeriod().isPresent() - ? windows.grace(window.getGracePeriod().get()) - : windows; + TimeWindows windows = TimeWindows.of(window.getSize().toDuration()); + windows = window.getGracePeriod().map(WindowTimeClause::toDuration).map(windows::grace) + .orElse(windows); return groupedStream .windowedBy(windows) @@ -317,7 +312,7 @@ public KTable, GenericRow> visitTumblingWindowExpression( materializedFactory.create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext), - window.getRetention()) + window.getRetention().map(WindowTimeClause::toDuration)) ); } } diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java index 0dcff5b05fe4..3f4a6e96579c 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java @@ -53,6 +53,7 @@ import io.confluent.ksql.execution.plan.WindowedStreamSource; import io.confluent.ksql.execution.plan.WindowedTableSource; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.KsqlAggregateFunction; import io.confluent.ksql.function.KsqlTableFunction; @@ -153,7 +154,7 @@ public void shouldResolveSchemaForStreamWindowedAggregate() { formats, ImmutableList.of(ColumnName.of("ORANGE")), ImmutableList.of(functionCall("COUNT", "APPLE")), - new TumblingWindowExpression(10, TimeUnit.SECONDS) + new TumblingWindowExpression(new WindowTimeClause(10, TimeUnit.SECONDS)) ); // When: diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java index 90f7725a7d78..351272c41709 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java @@ -52,6 +52,7 @@ import io.confluent.ksql.execution.windows.HoppingWindowExpression; import io.confluent.ksql.execution.windows.SessionWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.FunctionName; @@ -257,7 +258,7 @@ private void givenTumblingWindowedAggregate() { io.confluent.ksql.execution.plan.Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()), NON_AGG_COLUMNS, FUNCTIONS, - new TumblingWindowExpression(WINDOW.getSeconds(), TimeUnit.SECONDS) + new TumblingWindowExpression(new WindowTimeClause(WINDOW.getSeconds(), TimeUnit.SECONDS)) ); } @@ -270,10 +271,8 @@ private void givenHoppingWindowedAggregate() { NON_AGG_COLUMNS, FUNCTIONS, new HoppingWindowExpression( - WINDOW.getSeconds(), - TimeUnit.SECONDS, - HOP.getSeconds(), - TimeUnit.SECONDS + new WindowTimeClause(WINDOW.getSeconds(), TimeUnit.SECONDS), + new WindowTimeClause(HOP.getSeconds(), TimeUnit.SECONDS) ) ); } @@ -296,7 +295,7 @@ private void givenSessionWindowedAggregate() { io.confluent.ksql.execution.plan.Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()), NON_AGG_COLUMNS, FUNCTIONS, - new SessionWindowExpression(WINDOW.getSeconds(), TimeUnit.SECONDS) + new SessionWindowExpression(new WindowTimeClause(WINDOW.getSeconds(), TimeUnit.SECONDS)) ); } @@ -349,7 +348,7 @@ public void shouldBuildMaterializedWithCorrectSerdesForUnwindowedAggregate() { aggregate.build(planBuilder); // Then: - verify(materializedFactory).create(same(keySerde), same(valueSerde), any(), any()); + verify(materializedFactory).create(same(keySerde), same(valueSerde), any()); } @Test From f08fe813e5d44fe325e69814574bcb759e57a1b5 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Wed, 11 Mar 2020 15:48:58 -0700 Subject: [PATCH 4/6] test: enhancing unit tests around retention/grace period parameters --- .../ksql/parser/ExpressionParserTest.java | 19 +++++++++ .../tree/HoppingWindowExpressionTest.java | 13 ++++++ .../tree/SessionWindowExpressionTest.java | 27 ++++++++++++ .../tree/TumblingWindowExpressionTest.java | 35 ++++++++++++++++ .../streams/StreamAggregateBuilder.java | 9 ++-- .../streams/MaterializedFactoryTest.java | 20 ++++++++- .../streams/StreamAggregateBuilderTest.java | 42 ++++++++++++++++--- 7 files changed, 155 insertions(+), 10 deletions(-) diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java index bea24dfcee9c..381e65228a76 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java @@ -109,4 +109,23 @@ public void shouldParseWindowExpression() { Optional.empty())) ); } + + @Test + public void shouldParseWindowExpressionWithRetention() { + // When: + final KsqlWindowExpression parsed = ExpressionParser.parseWindowExpression( + "TUMBLING (SIZE 1 DAYS, RETENTION 2 DAYS, GRACE PERIOD 2 DAYS)" + ); + + // Then: + assertThat( + parsed, + equalTo(new TumblingWindowExpression( + parsed.getLocation(), + new WindowTimeClause(1, TimeUnit.DAYS), + Optional.of(new WindowTimeClause(2, TimeUnit.DAYS)), + Optional.of(new WindowTimeClause(2, TimeUnit.DAYS))) + ) + ); + } } \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java index c48dc8946ee8..eabcbb3c2c13 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java @@ -83,6 +83,19 @@ public void shouldImplementHashCodeAndEqualsProperty() { new HoppingWindowExpression( new WindowTimeClause(10, SECONDS), new WindowTimeClause(20, MILLISECONDS)) + ).addEqualityGroup( + new HoppingWindowExpression( + Optional.of(SOME_LOCATION), + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(20, MILLISECONDS), + Optional.of(new WindowTimeClause(40, MINUTES)), + Optional.of(new WindowTimeClause(0, MINUTES))), + new HoppingWindowExpression( + Optional.of(OTHER_LOCATION), + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(20, MILLISECONDS), + Optional.of(new WindowTimeClause(40, MINUTES)), + Optional.of(new WindowTimeClause(0, MINUTES))) ) .testEquals(); } diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/SessionWindowExpressionTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/SessionWindowExpressionTest.java index 46a1aa146785..4ad3d5762622 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/SessionWindowExpressionTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/SessionWindowExpressionTest.java @@ -18,9 +18,11 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import com.google.common.testing.EqualsTester; import io.confluent.ksql.execution.windows.SessionWindowExpression; import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.model.WindowType; +import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -44,4 +46,29 @@ public void shouldReturnWindowInfo() { assertThat(windowExpression.getWindowInfo(), is(WindowInfo.of(WindowType.SESSION, Optional.empty()))); } + + @Test + public void shouldImplementHashCodeAndEqualsProperty() { + new EqualsTester() + .addEqualityGroup( + new SessionWindowExpression( + new WindowTimeClause(10, TimeUnit.SECONDS) + ) + ) + .addEqualityGroup( + new SessionWindowExpression( + Optional.empty(), + new WindowTimeClause(10, TimeUnit.SECONDS), + Optional.of(new WindowTimeClause(20, TimeUnit.SECONDS)), + Optional.of(new WindowTimeClause(0, TimeUnit.SECONDS)) + ), + new SessionWindowExpression( + Optional.of(new NodeLocation(0, 0)), + new WindowTimeClause(10, TimeUnit.SECONDS), + Optional.of(new WindowTimeClause(20, TimeUnit.SECONDS)), + Optional.of(new WindowTimeClause(0, TimeUnit.SECONDS)) + ) + ) + .testEquals(); + } } \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/TumblingWindowExpressionTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/TumblingWindowExpressionTest.java index 50a532905d49..4e4e91a663ac 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/TumblingWindowExpressionTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/TumblingWindowExpressionTest.java @@ -19,12 +19,15 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import com.google.common.testing.EqualsTester; import io.confluent.ksql.execution.windows.TumblingWindowExpression; import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.model.WindowType; +import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; import java.time.Duration; import java.util.Optional; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; @@ -35,5 +38,37 @@ public class TumblingWindowExpressionTest { public void shouldReturnWindowInfo() { assertThat(new TumblingWindowExpression(new WindowTimeClause(11, SECONDS)).getWindowInfo(), is(WindowInfo.of(WindowType.TUMBLING, Optional.of(Duration.ofSeconds(11))))); + assertThat(new TumblingWindowExpression( + Optional.empty(), + new WindowTimeClause(20, SECONDS), + Optional.of(new WindowTimeClause(20, SECONDS)), + Optional.of(new WindowTimeClause(10, SECONDS)) + ).getWindowInfo(), + is(WindowInfo.of(WindowType.TUMBLING, Optional.of(Duration.ofSeconds(20))))); + } + + @Test + public void shouldImplementHashCodeAndEqualsProperty() { + new EqualsTester() + .addEqualityGroup( + new TumblingWindowExpression( + new WindowTimeClause(50, TimeUnit.SECONDS) + ) + ) + .addEqualityGroup( + new TumblingWindowExpression( + Optional.empty(), + new WindowTimeClause(40, TimeUnit.SECONDS), + Optional.of(new WindowTimeClause(80, TimeUnit.SECONDS)), + Optional.of(new WindowTimeClause(40, TimeUnit.SECONDS)) + ), + new TumblingWindowExpression( + Optional.of(new NodeLocation(0,0)), + new WindowTimeClause(40, TimeUnit.SECONDS), + Optional.of(new WindowTimeClause(80, TimeUnit.SECONDS)), + Optional.of(new WindowTimeClause(40, TimeUnit.SECONDS)) + ) + ) + .testEquals(); } } \ No newline at end of file diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java index f9bede7beac2..9eac3eb5abd0 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java @@ -260,7 +260,8 @@ public KTable, GenericRow> visitHoppingWindowExpression( TimeWindows windows = TimeWindows .of(window.getSize().toDuration()) .advanceBy(window.getAdvanceBy().toDuration()); - windows = window.getGracePeriod().map(WindowTimeClause::toDuration).map(windows::grace) + windows = window.getGracePeriod().map(WindowTimeClause::toDuration) + .map(windows::grace) .orElse(windows); return groupedStream @@ -280,7 +281,8 @@ public KTable, GenericRow> visitSessionWindowExpression( final SessionWindowExpression window, final Void ctx) { SessionWindows windows = SessionWindows.with(window.getGap().toDuration()); - windows = window.getGracePeriod().map(WindowTimeClause::toDuration).map(windows::grace) + windows = window.getGracePeriod().map(WindowTimeClause::toDuration) + .map(windows::grace) .orElse(windows); return groupedStream @@ -301,7 +303,8 @@ public KTable, GenericRow> visitTumblingWindowExpression( final TumblingWindowExpression window, final Void ctx) { TimeWindows windows = TimeWindows.of(window.getSize().toDuration()); - windows = window.getGracePeriod().map(WindowTimeClause::toDuration).map(windows::grace) + windows = window.getGracePeriod().map(WindowTimeClause::toDuration) + .map(windows::grace) .orElse(windows); return groupedStream diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java index 456465dbd8b3..e3eb9cc97c2d 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.when; import io.confluent.ksql.GenericRow; +import java.time.Duration; import java.util.Optional; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Materialized; @@ -42,6 +43,8 @@ public class MaterializedFactoryTest { private Serde rowSerde; @Mock private MaterializedFactory.Materializer materializer; + @Mock + private Optional retention; @Test @SuppressWarnings("unchecked") @@ -57,7 +60,7 @@ public void shouldCreateJoinedCorrectlyWhenOptimizationsEnabled() { // When: final Materialized returned = MaterializedFactory.create(materializer).create( - keySerde, rowSerde, OP_NAME, Optional.empty()); + keySerde, rowSerde, OP_NAME); // Then: assertThat(returned, is(withRowSerde)); @@ -65,4 +68,19 @@ public void shouldCreateJoinedCorrectlyWhenOptimizationsEnabled() { verify(asName).withKeySerde(keySerde); verify(withKeySerde).withValueSerde(rowSerde); } + + @Test + public void shouldSetupRetentionWhenNonEmpty() { + // Given: + when(retention.isPresent()).thenReturn(true); + when(retention.get()).thenReturn(Duration.ofSeconds(10)); + + // When: + final Materialized returned + = MaterializedFactory.create().create(keySerde, rowSerde, OP_NAME, retention); + + // Then: + verify(retention).isPresent(); + verify(retention).get(); + } } diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java index 351272c41709..0e16457a2552 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java @@ -64,6 +64,7 @@ import io.confluent.ksql.serde.SerdeOption; import java.time.Duration; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; @@ -88,6 +89,7 @@ import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; @SuppressWarnings("unchecked") @@ -186,6 +188,10 @@ public class StreamAggregateBuilderTest { private ExecutionStep sourceStep; @Mock private KsqlProcessingContext ctx; + @Spy + private WindowTimeClause retentionClause = new WindowTimeClause(10, TimeUnit.SECONDS); + @Spy + private WindowTimeClause gracePeriodClause = new WindowTimeClause(0, TimeUnit.SECONDS); private PlanBuilder planBuilder; private StreamAggregate aggregate; @@ -258,7 +264,12 @@ private void givenTumblingWindowedAggregate() { io.confluent.ksql.execution.plan.Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()), NON_AGG_COLUMNS, FUNCTIONS, - new TumblingWindowExpression(new WindowTimeClause(WINDOW.getSeconds(), TimeUnit.SECONDS)) + new TumblingWindowExpression( + Optional.empty(), + new WindowTimeClause(WINDOW.getSeconds(), TimeUnit.SECONDS), + Optional.of(retentionClause), + Optional.of(gracePeriodClause) + ) ); } @@ -271,8 +282,11 @@ private void givenHoppingWindowedAggregate() { NON_AGG_COLUMNS, FUNCTIONS, new HoppingWindowExpression( + Optional.empty(), new WindowTimeClause(WINDOW.getSeconds(), TimeUnit.SECONDS), - new WindowTimeClause(HOP.getSeconds(), TimeUnit.SECONDS) + new WindowTimeClause(HOP.getSeconds(), TimeUnit.SECONDS), + Optional.of(retentionClause), + Optional.of(gracePeriodClause) ) ); } @@ -295,7 +309,12 @@ private void givenSessionWindowedAggregate() { io.confluent.ksql.execution.plan.Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()), NON_AGG_COLUMNS, FUNCTIONS, - new SessionWindowExpression(new WindowTimeClause(WINDOW.getSeconds(), TimeUnit.SECONDS)) + new SessionWindowExpression( + Optional.empty(), + new WindowTimeClause(WINDOW.getSeconds(), TimeUnit.SECONDS), + Optional.of(retentionClause), + Optional.of(gracePeriodClause) + ) ); } @@ -419,6 +438,9 @@ public void shouldBuildTumblingWindowedAggregateCorrectly() { // Then: assertThat(result.getTable(), is(windowedWithWindowBounds)); + verify(gracePeriodClause).toDuration(); + verify(retentionClause).toDuration(); + final InOrder inOrder = Mockito.inOrder( groupedStream, timeWindowedStream, @@ -426,7 +448,7 @@ public void shouldBuildTumblingWindowedAggregateCorrectly() { windowedWithResults, windowedWithWindowBounds ); - inOrder.verify(groupedStream).windowedBy(TimeWindows.of(WINDOW)); + inOrder.verify(groupedStream).windowedBy(TimeWindows.of(WINDOW).grace(gracePeriodClause.toDuration())); inOrder.verify(timeWindowedStream).aggregate(initializer, aggregator, timeWindowMaterialized); inOrder.verify(windowed).transformValues(any(), any(Named.class)); inOrder.verify(windowedWithResults).transformValues(any(), any(Named.class)); @@ -445,6 +467,8 @@ public void shouldBuildHoppingWindowedAggregateCorrectly() { // Then: assertThat(result.getTable(), is(windowedWithWindowBounds)); + verify(gracePeriodClause).toDuration(); + verify(retentionClause).toDuration(); final InOrder inOrder = Mockito.inOrder( groupedStream, timeWindowedStream, @@ -452,7 +476,9 @@ public void shouldBuildHoppingWindowedAggregateCorrectly() { windowedWithResults, windowedWithWindowBounds ); - inOrder.verify(groupedStream).windowedBy(TimeWindows.of(WINDOW).advanceBy(HOP)); + + inOrder.verify(groupedStream).windowedBy(TimeWindows.of(WINDOW).advanceBy(HOP) + .grace(gracePeriodClause.toDuration())); inOrder.verify(timeWindowedStream).aggregate(initializer, aggregator, timeWindowMaterialized); inOrder.verify(windowed).transformValues(any(), any(Named.class)); inOrder.verify(windowedWithResults).transformValues(any(), any(Named.class)); @@ -471,6 +497,8 @@ public void shouldBuildSessionWindowedAggregateCorrectly() { // Then: assertThat(result.getTable(), is(windowedWithWindowBounds)); + verify(gracePeriodClause).toDuration(); + verify(retentionClause).toDuration(); final InOrder inOrder = Mockito.inOrder( groupedStream, sessionWindowedStream, @@ -478,7 +506,9 @@ public void shouldBuildSessionWindowedAggregateCorrectly() { windowedWithResults, windowedWithWindowBounds ); - inOrder.verify(groupedStream).windowedBy(SessionWindows.with(WINDOW)); + inOrder.verify(groupedStream).windowedBy(SessionWindows.with(WINDOW) + .grace(gracePeriodClause.toDuration()) + ); inOrder.verify(sessionWindowedStream).aggregate( initializer, aggregator, From febc520ea4e4e4b0905e0eb4db925cc803aed3dd Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Wed, 11 Mar 2020 16:31:37 -0700 Subject: [PATCH 5/6] docs: document retention,grace period for window queries --- .../time-and-windows-in-ksqldb-queries.md | 38 +++++++++++++++++++ .../execution/windows/WindowTimeClause.java | 0 .../streams/MaterializedFactoryTest.java | 4 +- 3 files changed, 39 insertions(+), 3 deletions(-) rename {ksql-execution => ksqldb-execution}/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java (100%) diff --git a/docs-md/concepts/time-and-windows-in-ksqldb-queries.md b/docs-md/concepts/time-and-windows-in-ksqldb-queries.md index 4355cf0fac28..a6c6524a9e80 100644 --- a/docs-md/concepts/time-and-windows-in-ksqldb-queries.md +++ b/docs-md/concepts/time-and-windows-in-ksqldb-queries.md @@ -365,4 +365,42 @@ SELECT o.order_id, o.total_amount, o.customer_name, s.shipment_id, s.warehouse For more information on joins, see [Join Event Streams with ksqlDB](../developer-guide/joins/join-streams-and-tables.md). +### Late Arriving Events + +Frequently, events that belong to a window can arrive late, for example, over slow networks, +and a grace period may be required to ensure the events are accepted into the window. +ksqlDB enables configuring this behavior, for each of the window types. + +For example, to allow events to be accepted for up to two hours after the window ends, +you might run a query like: + +```sql +SELECT orderzip_code, TOPK(order_total, 5) FROM orders + WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 2 HOURS) + GROUP BY order_zipcode + EMIT CHANGES; +``` + +Events that arrive later than the grace period are dropped and not included in the aggregate result. + +### Window Retention + +For each window type, you can configure the number of windows in the past that ksqlDB retains. This +capability is very useful for interactive applications that use ksqlDB as their primary +serving data store. + +For example, to retain the computed windowed aggregation results for a week, +you might run the following query: + +```sql +SELECT regionid, COUNT(*) FROM pageviews + WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS, RETENTION 7 DAYS, GRACE PERIOD 30 MINUTES) + WHERE UCASE(gender)='FEMALE' AND LCASE (regionid) LIKE '%_6' + GROUP BY regionid + EMIT CHANGES; +``` + +Note that the specified retention period should be larger than the sum of window size and any grace +period. + Page last revised on: {{ git_revision_date }} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java similarity index 100% rename from ksql-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java rename to ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java index e3eb9cc97c2d..75f6d7503624 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java @@ -76,11 +76,9 @@ public void shouldSetupRetentionWhenNonEmpty() { when(retention.get()).thenReturn(Duration.ofSeconds(10)); // When: - final Materialized returned - = MaterializedFactory.create().create(keySerde, rowSerde, OP_NAME, retention); + MaterializedFactory.create().create(keySerde, rowSerde, OP_NAME, retention); // Then: - verify(retention).isPresent(); verify(retention).get(); } } From e83b08c05eebdd1111e8b641510bcf770e946b8c Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Fri, 13 Mar 2020 16:22:24 -0700 Subject: [PATCH 6/6] test: add grace period/retention cases for sql formatter test --- .../ksql/parser/SqlFormatterTest.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java index 08a7a9556ef6..0720734478cf 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java @@ -732,6 +732,23 @@ public void shouldFormatTumblingWindow() { + "EMIT CHANGES")); } + @Test + public void shouldFormatTumblingWindowWithRetention() { + // Given: + final String statementString = "CREATE STREAM S AS SELECT ITEMID, COUNT(*) FROM ORDERS WINDOW TUMBLING (SIZE 7 DAYS, RETENTION 14 DAYS) GROUP BY ITEMID;"; + final Statement statement = parseSingle(statementString); + + final String result = SqlFormatter.formatSql(statement); + + assertThat(result, is("CREATE STREAM S AS SELECT\n" + + " ITEMID,\n" + + " COUNT(*)\n" + + "FROM ORDERS ORDERS\n" + + "WINDOW TUMBLING ( SIZE 7 DAYS , RETENTION 14 DAYS ) \n" + + "GROUP BY ITEMID\n" + + "EMIT CHANGES")); + } + @Test public void shouldFormatHoppingWindow() { // Given: @@ -749,6 +766,23 @@ public void shouldFormatHoppingWindow() { + "EMIT CHANGES")); } + @Test + public void shouldFormatHoppingWindowWithGracePeriod() { + // Given: + final String statementString = "CREATE STREAM S AS SELECT ITEMID, COUNT(*) FROM ORDERS WINDOW HOPPING (SIZE 20 SECONDS, ADVANCE BY 5 SECONDS, GRACE PERIOD 2 HOURS) GROUP BY ITEMID;"; + final Statement statement = parseSingle(statementString); + + final String result = SqlFormatter.formatSql(statement); + + assertThat(result, is("CREATE STREAM S AS SELECT\n" + + " ITEMID,\n" + + " COUNT(*)\n" + + "FROM ORDERS ORDERS\n" + + "WINDOW HOPPING ( SIZE 20 SECONDS , ADVANCE BY 5 SECONDS , GRACE PERIOD 2 HOURS ) \n" + + "GROUP BY ITEMID\n" + + "EMIT CHANGES")); + } + @Test public void shouldFormatSessionWindow() { // Given: @@ -766,6 +800,23 @@ public void shouldFormatSessionWindow() { + "EMIT CHANGES")); } + @Test + public void shouldFormatSessionWindowWithRetentionAndGracePeriod() { + // Given: + final Statement statement = parseSingle( + "CREATE STREAM S AS SELECT ITEMID, COUNT(*) FROM ORDERS WINDOW SESSION (15 MINUTES, RETENTION 2 DAYS, GRACE PERIOD 1 HOUR) GROUP BY ITEMID;"); + + final String result = SqlFormatter.formatSql(statement); + + assertThat(result, is("CREATE STREAM S AS SELECT\n" + + " ITEMID,\n" + + " COUNT(*)\n" + + "FROM ORDERS ORDERS\n" + + "WINDOW SESSION ( 15 MINUTES , RETENTION 2 DAYS , GRACE PERIOD 1 HOURS ) \n" + + "GROUP BY ITEMID\n" + + "EMIT CHANGES")); + } + @Test public void shouldFormatDescribeSource() { // Given: