diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java index eb6de0522ccb..32196024b759 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java @@ -28,6 +28,7 @@ import io.confluent.ksql.execution.streams.ExecutionStepFactory; import io.confluent.ksql.execution.windows.KsqlWindowExpression; import io.confluent.ksql.execution.windows.SessionWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.metastore.model.KeyField; @@ -78,7 +79,7 @@ public class SchemaKGroupedStreamTest { ); private static final KsqlWindowExpression KSQL_WINDOW_EXP = new SessionWindowExpression( - 100, TimeUnit.SECONDS + new WindowTimeClause(100, TimeUnit.SECONDS) ); private static final List NON_AGGREGATE_COLUMNS = ImmutableList.of( diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/HoppingWindowExpression.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/HoppingWindowExpression.java index 6bfb6a0f947e..19f0fa6ea536 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/HoppingWindowExpression.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/HoppingWindowExpression.java @@ -21,71 +21,51 @@ import io.confluent.ksql.model.WindowType; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; -import java.time.Duration; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; @Immutable public class HoppingWindowExpression extends KsqlWindowExpression { - private final long size; - private final TimeUnit sizeUnit; - private final long advanceBy; - private final TimeUnit advanceByUnit; + private final WindowTimeClause size; + private final WindowTimeClause advanceBy; public HoppingWindowExpression( - final long size, - final TimeUnit sizeUnit, - final long advanceBy, - final TimeUnit advanceByUnit + final WindowTimeClause size, + final WindowTimeClause advanceBy ) { this(Optional.empty(), size, - sizeUnit, advanceBy, - advanceByUnit, Optional.empty(), Optional.empty()); } public HoppingWindowExpression( final Optional location, - final long size, - final TimeUnit sizeUnit, - final long advanceBy, - final TimeUnit advanceByUnit, - final Optional retention, - final Optional gracePeriod + final WindowTimeClause size, + final WindowTimeClause advanceBy, + final Optional retention, + final Optional gracePeriod ) { super(location, retention, gracePeriod); - this.size = size; - this.sizeUnit = requireNonNull(sizeUnit, "sizeUnit"); - this.advanceBy = advanceBy; - this.advanceByUnit = requireNonNull(advanceByUnit, "advanceByUnit"); + this.size = requireNonNull(size, "size"); + this.advanceBy = requireNonNull(advanceBy, "advanceBy"); } @Override public WindowInfo getWindowInfo() { return WindowInfo.of( WindowType.HOPPING, - Optional.of(Duration.ofNanos(sizeUnit.toNanos(size))) + Optional.of(size.toDuration()) ); } - public TimeUnit getSizeUnit() { - return sizeUnit; - } - - public long getSize() { + public WindowTimeClause getSize() { return size; } - public TimeUnit getAdvanceByUnit() { - return advanceByUnit; - } - - public long getAdvanceBy() { + public WindowTimeClause getAdvanceBy() { return advanceBy; } @@ -96,13 +76,16 @@ public R accept(final WindowVisitor visitor, final C context) { @Override public String toString() { - return " HOPPING ( SIZE " + size + " " + sizeUnit + " , ADVANCE BY " - + advanceBy + " " + "" + advanceByUnit + " ) "; + return " HOPPING ( SIZE " + size + " , " + + "ADVANCE BY " + advanceBy + + retention.map(w -> " , RETENTION " + w).orElse("") + + gracePeriod.map(g -> " , GRACE PERIOD " + g).orElse("") + + " ) "; } @Override public int hashCode() { - return Objects.hash(size, sizeUnit, advanceBy, advanceByUnit); + return Objects.hash(size, advanceBy, retention, gracePeriod); } @Override @@ -114,8 +97,9 @@ public boolean equals(final Object o) { return false; } final HoppingWindowExpression hoppingWindowExpression = (HoppingWindowExpression) o; - return hoppingWindowExpression.size == size && hoppingWindowExpression.sizeUnit == sizeUnit - && hoppingWindowExpression.advanceBy == advanceBy && hoppingWindowExpression - .advanceByUnit == advanceByUnit; + return Objects.equals(size, hoppingWindowExpression.size) + && Objects.equals(advanceBy, hoppingWindowExpression.advanceBy) + && Objects.equals(gracePeriod, hoppingWindowExpression.gracePeriod) + && Objects.equals(retention, hoppingWindowExpression.retention); } } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/KsqlWindowExpression.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/KsqlWindowExpression.java index d320349638aa..5ac31175bc55 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/KsqlWindowExpression.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/KsqlWindowExpression.java @@ -19,28 +19,27 @@ import io.confluent.ksql.parser.Node; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; -import java.time.Duration; import java.util.Optional; @Immutable public abstract class KsqlWindowExpression extends Node { - protected final Optional retention; - protected final Optional gracePeriod; + protected final Optional retention; + protected final Optional gracePeriod; KsqlWindowExpression(final Optional nodeLocation, - final Optional retention, - final Optional gracePeriod) { + final Optional retention, + final Optional gracePeriod) { super(nodeLocation); this.retention = retention; this.gracePeriod = gracePeriod; } - public Optional getRetention() { + public Optional getRetention() { return retention; } - public Optional getGracePeriod() { + public Optional getGracePeriod() { return gracePeriod; } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/SessionWindowExpression.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/SessionWindowExpression.java index 13182526584b..a671160f0ed2 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/SessionWindowExpression.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/SessionWindowExpression.java @@ -21,38 +21,29 @@ import io.confluent.ksql.model.WindowType; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; -import java.time.Duration; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; @Immutable public class SessionWindowExpression extends KsqlWindowExpression { - private final long gap; - private final TimeUnit sizeUnit; + private final WindowTimeClause gap; - public SessionWindowExpression(final long gap, final TimeUnit sizeUnit) { - this(Optional.empty(), gap, sizeUnit, Optional.empty(), Optional.empty()); + public SessionWindowExpression(final WindowTimeClause gap) { + this(Optional.empty(), gap, Optional.empty(), Optional.empty()); } public SessionWindowExpression( final Optional location, - final long gap, - final TimeUnit sizeUnit, - final Optional retention, - final Optional gracePeriod + final WindowTimeClause gap, + final Optional retention, + final Optional gracePeriod ) { super(location, retention, gracePeriod); - this.gap = gap; - this.sizeUnit = requireNonNull(sizeUnit, "sizeUnit"); + this.gap = requireNonNull(gap, "gap"); } - public TimeUnit getSizeUnit() { - return sizeUnit; - } - - public long getGap() { + public WindowTimeClause getGap() { return gap; } @@ -68,12 +59,15 @@ public R accept(final WindowVisitor visitor, final C context) { @Override public String toString() { - return " SESSION ( " + gap + " " + sizeUnit + " ) "; + return " SESSION ( " + gap + + retention.map(w -> " , RETENTION " + w).orElse("") + + gracePeriod.map(g -> " , GRACE PERIOD " + g).orElse("") + + " ) "; } @Override public int hashCode() { - return Objects.hash(gap, sizeUnit); + return Objects.hash(gap, retention, gracePeriod); } @Override @@ -85,6 +79,8 @@ public boolean equals(final Object o) { return false; } final SessionWindowExpression sessionWindowExpression = (SessionWindowExpression) o; - return sessionWindowExpression.gap == gap && sessionWindowExpression.sizeUnit == sizeUnit; + return Objects.equals(gap, sessionWindowExpression.gap) + && Objects.equals(gracePeriod, sessionWindowExpression.gracePeriod) + && Objects.equals(retention, sessionWindowExpression.retention); } } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/TumblingWindowExpression.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/TumblingWindowExpression.java index 75c12c337524..dfd4a4f4cc1d 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/TumblingWindowExpression.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/TumblingWindowExpression.java @@ -21,46 +21,37 @@ import io.confluent.ksql.model.WindowType; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; -import java.time.Duration; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; @Immutable public class TumblingWindowExpression extends KsqlWindowExpression { - private final long size; - private final TimeUnit sizeUnit; + private final WindowTimeClause size; - public TumblingWindowExpression(final long size, final TimeUnit sizeUnit) { - this(Optional.empty(), size, sizeUnit, Optional.empty(), Optional.empty()); + public TumblingWindowExpression(final WindowTimeClause size) { + this(Optional.empty(), size, Optional.empty(), Optional.empty()); } public TumblingWindowExpression( final Optional location, - final long size, - final TimeUnit sizeUnit, - final Optional retention, - final Optional gracePeriod + final WindowTimeClause size, + final Optional retention, + final Optional gracePeriod ) { super(location, retention, gracePeriod); - this.size = size; - this.sizeUnit = requireNonNull(sizeUnit, "sizeUnit"); + this.size = requireNonNull(size, "size"); } @Override public WindowInfo getWindowInfo() { return WindowInfo.of( WindowType.TUMBLING, - Optional.of(Duration.ofNanos(sizeUnit.toNanos(size))) + Optional.of(size.toDuration()) ); } - public TimeUnit getSizeUnit() { - return sizeUnit; - } - - public long getSize() { + public WindowTimeClause getSize() { return size; } @@ -71,12 +62,15 @@ public R accept(final WindowVisitor visitor, final C context) { @Override public String toString() { - return " TUMBLING ( SIZE " + size + " " + sizeUnit + " ) "; + return " TUMBLING ( SIZE " + size + + retention.map(w -> " , RETENTION " + w).orElse("") + + gracePeriod.map(g -> " , GRACE PERIOD " + g).orElse("") + + " ) "; } @Override public int hashCode() { - return Objects.hash(size, sizeUnit); + return Objects.hash(size, retention, gracePeriod); } @Override @@ -88,6 +82,8 @@ public boolean equals(final Object o) { return false; } final TumblingWindowExpression tumblingWindowExpression = (TumblingWindowExpression) o; - return tumblingWindowExpression.size == size && tumblingWindowExpression.sizeUnit == sizeUnit; + return Objects.equals(size, tumblingWindowExpression.size) + && Objects.equals(gracePeriod, tumblingWindowExpression.gracePeriod) + && Objects.equals(retention, tumblingWindowExpression.retention); } } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java new file mode 100644 index 000000000000..1009bf617055 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.windows; + +import static java.util.Objects.requireNonNull; + +import com.google.errorprone.annotations.Immutable; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * Pojo for a time clause added to a window expression + */ +@Immutable +public class WindowTimeClause { + + private final long value; + + private final TimeUnit unit; + + public WindowTimeClause(final long value, final TimeUnit unit) { + this.value = value; + this.unit = requireNonNull(unit, "unit"); + } + + public Duration toDuration() { + return Duration.ofMillis(unit.toMillis(value)); + } + + public String toString() { + return value + " " + unit; + } + + @Override + public int hashCode() { + return Objects.hash(value, unit); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final WindowTimeClause otherClause = (WindowTimeClause) o; + return otherClause.value == value && otherClause.unit == unit; + } +} diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index fcaadc43df95..446e3b10a19d 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -58,6 +58,7 @@ import io.confluent.ksql.execution.windows.HoppingWindowExpression; import io.confluent.ksql.execution.windows.SessionWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.metastore.TypeRegistry; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.FunctionName; @@ -143,7 +144,6 @@ import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.ParserUtil; import java.math.BigDecimal; -import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -455,24 +455,26 @@ public Node visitWindowExpression(final SqlBaseParser.WindowExpressionContext ct throw new KsqlException("Window description is not correct."); } - private static Duration getDuration(final NumberContext number, - final WindowUnitContext unitCtx) { - final TimeUnit retainUnit = WindowExpression.getWindowUnit(unitCtx.getText().toUpperCase()); - if (retainUnit == null) { - throw new KsqlException("Units is not correct"); - } - return Duration.ofMillis(retainUnit.toMillis(Long.parseLong(number.getText()))); + private static WindowTimeClause getTimeClause( + final NumberContext number, + final WindowUnitContext unitCtx) { + return new WindowTimeClause( + Long.parseLong(number.getText()), + WindowExpression.getWindowUnit(unitCtx.getText().toUpperCase()) + ); } - private static Optional gracePeriodDuration(final GracePeriodClauseContext graceCtx) { + private static Optional gracePeriodClause( + final GracePeriodClauseContext graceCtx) { return graceCtx != null - ? Optional.of(getDuration(graceCtx.number(), graceCtx.windowUnit())) + ? Optional.of(getTimeClause(graceCtx.number(), graceCtx.windowUnit())) : Optional.empty(); } - private static Optional retentionDuration(final RetentionClauseContext retentionCtx) { + private static Optional retentionClause( + final RetentionClauseContext retentionCtx) { return retentionCtx != null - ? Optional.of(getDuration(retentionCtx.number(), retentionCtx.windowUnit())) + ? Optional.of(getTimeClause(retentionCtx.number(), retentionCtx.windowUnit())) : Optional.empty(); } @@ -482,19 +484,12 @@ public Node visitHoppingWindowExpression( final List numberList = ctx.number(); final List windowUnits = ctx.windowUnit(); - final String sizeStr = numberList.get(0).getText(); - final String advanceByStr = numberList.get(1).getText(); - - final String sizeUnit = windowUnits.get(0).getText(); - final String advanceByUnit = windowUnits.get(1).getText(); return new HoppingWindowExpression( getLocation(ctx), - Long.parseLong(sizeStr), - WindowExpression.getWindowUnit(sizeUnit.toUpperCase()), - Long.parseLong(advanceByStr), - WindowExpression.getWindowUnit(advanceByUnit.toUpperCase()), - retentionDuration(ctx.retentionClause()), - gracePeriodDuration(ctx.gracePeriodClause()) + getTimeClause(numberList.get(0), windowUnits.get(0)), + getTimeClause(numberList.get(1), windowUnits.get(1)), + retentionClause(ctx.retentionClause()), + gracePeriodClause(ctx.gracePeriodClause()) ); } @@ -502,14 +497,11 @@ public Node visitHoppingWindowExpression( public Node visitTumblingWindowExpression( final SqlBaseParser.TumblingWindowExpressionContext ctx ) { - final String sizeStr = ctx.number().getText(); - final String sizeUnit = ctx.windowUnit().getText(); return new TumblingWindowExpression( getLocation(ctx), - Long.parseLong(sizeStr), - WindowExpression.getWindowUnit(sizeUnit.toUpperCase()), - retentionDuration(ctx.retentionClause()), - gracePeriodDuration(ctx.gracePeriodClause()) + getTimeClause(ctx.number(), ctx.windowUnit()), + retentionClause(ctx.retentionClause()), + gracePeriodClause(ctx.gracePeriodClause()) ); } @@ -517,14 +509,11 @@ public Node visitTumblingWindowExpression( public Node visitSessionWindowExpression( final SqlBaseParser.SessionWindowExpressionContext ctx ) { - final String sizeStr = ctx.number().getText(); - final String sizeUnit = ctx.windowUnit().getText(); return new SessionWindowExpression( getLocation(ctx), - Long.parseLong(sizeStr), - WindowExpression.getWindowUnit(sizeUnit.toUpperCase()), - retentionDuration(ctx.retentionClause()), - gracePeriodDuration(ctx.gracePeriodClause()) + getTimeClause(ctx.number(), ctx.windowUnit()), + retentionClause(ctx.retentionClause()), + gracePeriodClause(ctx.gracePeriodClause()) ); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java index 8f8d319a7d4f..bea24dfcee9c 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/ExpressionParserTest.java @@ -24,6 +24,7 @@ import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.windows.KsqlWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.Operator; import java.util.Optional; @@ -101,8 +102,11 @@ public void shouldParseWindowExpression() { // Then: assertThat( parsed, - equalTo(new TumblingWindowExpression(parsed.getLocation(), 1, TimeUnit.DAYS, - Optional.empty(), Optional.empty())) + equalTo(new TumblingWindowExpression( + parsed.getLocation(), + new WindowTimeClause(1, TimeUnit.DAYS), + Optional.empty(), + Optional.empty())) ); } } \ No newline at end of file diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/json/WindowExpressionTestCase.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/json/WindowExpressionTestCase.java index aea8a70734b5..5e3b755f0875 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/json/WindowExpressionTestCase.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/json/WindowExpressionTestCase.java @@ -17,12 +17,13 @@ import io.confluent.ksql.execution.windows.KsqlWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import java.util.concurrent.TimeUnit; public class WindowExpressionTestCase { static final KsqlWindowExpression WINDOW_EXPRESSION = new TumblingWindowExpression( - 123, TimeUnit.DAYS + new WindowTimeClause(123, TimeUnit.DAYS) ); static final String WINDOW_EXPRESSION_TXT = "\" TUMBLING ( SIZE 123 DAYS ) \""; diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java index 15c525aa481c..c48dc8946ee8 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/HoppingWindowExpressionTest.java @@ -24,6 +24,7 @@ import com.google.common.testing.EqualsTester; import io.confluent.ksql.execution.windows.HoppingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.serde.WindowInfo; @@ -44,29 +45,54 @@ public void shouldImplementHashCodeAndEqualsProperty() { new EqualsTester() .addEqualityGroup( // Note: At the moment location does not take part in equality testing - new HoppingWindowExpression(10, SECONDS, 20, MINUTES), - new HoppingWindowExpression(10, SECONDS, 20, MINUTES), - new HoppingWindowExpression(Optional.of(SOME_LOCATION), 10, SECONDS, 20, MINUTES, Optional.empty(), Optional.empty()), - new HoppingWindowExpression(Optional.of(OTHER_LOCATION), 10, SECONDS, 20, MINUTES, Optional.empty(), Optional.empty()) + new HoppingWindowExpression( + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(20, MINUTES)), + new HoppingWindowExpression( + new WindowTimeClause(10, SECONDS), + new WindowTimeClause( 20, MINUTES)), + new HoppingWindowExpression( + Optional.of(SOME_LOCATION), + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(20, MINUTES), + Optional.empty(), + Optional.empty()), + new HoppingWindowExpression( + Optional.of(OTHER_LOCATION), + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(20, MINUTES), + Optional.empty(), + Optional.empty()) ) .addEqualityGroup( - new HoppingWindowExpression(30, SECONDS, 20, MINUTES) + new HoppingWindowExpression( + new WindowTimeClause(30, SECONDS), + new WindowTimeClause(20, MINUTES)) ) .addEqualityGroup( - new HoppingWindowExpression(10, HOURS, 20, MINUTES) + new HoppingWindowExpression( + new WindowTimeClause(10, HOURS), + new WindowTimeClause(20, MINUTES)) ) .addEqualityGroup( - new HoppingWindowExpression(10, SECONDS, 1, MINUTES) + new HoppingWindowExpression( + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(1, MINUTES)) ) .addEqualityGroup( - new HoppingWindowExpression(10, SECONDS, 20, MILLISECONDS) + new HoppingWindowExpression( + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(20, MILLISECONDS)) ) .testEquals(); } @Test public void shouldReturnWindowInfo() { - assertThat(new HoppingWindowExpression(10, SECONDS, 20, MINUTES).getWindowInfo(), + assertThat(new HoppingWindowExpression( + new WindowTimeClause(10, SECONDS), + new WindowTimeClause(20, MINUTES) + ).getWindowInfo(), is(WindowInfo.of(WindowType.HOPPING, Optional.of(Duration.ofSeconds(10))))); } } \ No newline at end of file diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ParserModelTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ParserModelTest.java index 72fd1410fb60..12c8d8197226 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ParserModelTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ParserModelTest.java @@ -32,6 +32,7 @@ import io.confluent.ksql.execution.expression.tree.Type; import io.confluent.ksql.execution.windows.KsqlWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties; @@ -70,7 +71,8 @@ public class ParserModelTest { ., Object>builder() .put(ColumnName.class, ColumnName.of("bob")) .put(Expression.class, DEFAULT_TYPE) - .put(KsqlWindowExpression.class, new TumblingWindowExpression(1, TimeUnit.SECONDS)) + .put(KsqlWindowExpression.class, new TumblingWindowExpression( + new WindowTimeClause(1, TimeUnit.SECONDS))) .put(Relation.class, DEFAULT_RELATION) .put(JoinCriteria.class, new JoinOn(DEFAULT_TYPE)) .put(Select.class, DEFAULT_SELECT) diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/SessionWindowExpressionTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/SessionWindowExpressionTest.java index 5429ba93e9df..46a1aa146785 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/SessionWindowExpressionTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/SessionWindowExpressionTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.is; import io.confluent.ksql.execution.windows.SessionWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.serde.WindowInfo; import java.util.Optional; @@ -35,7 +36,7 @@ public class SessionWindowExpressionTest { @Before public void setUp() { - windowExpression = new SessionWindowExpression(5, TimeUnit.SECONDS); + windowExpression = new SessionWindowExpression(new WindowTimeClause(5, TimeUnit.SECONDS)); } @Test diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TumblingWindowExpressionTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TumblingWindowExpressionTest.java index deaf61d8c49e..50a532905d49 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TumblingWindowExpressionTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TumblingWindowExpressionTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.Matchers.is; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.serde.WindowInfo; import java.time.Duration; @@ -32,7 +33,7 @@ public class TumblingWindowExpressionTest { @Test public void shouldReturnWindowInfo() { - assertThat(new TumblingWindowExpression(11, SECONDS).getWindowInfo(), + assertThat(new TumblingWindowExpression(new WindowTimeClause(11, SECONDS)).getWindowInfo(), is(WindowInfo.of(WindowType.TUMBLING, Optional.of(Duration.ofSeconds(11))))); } } \ No newline at end of file diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java index ca442f104e11..f9bede7beac2 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java @@ -33,11 +33,11 @@ import io.confluent.ksql.execution.windows.KsqlWindowExpression; import io.confluent.ksql.execution.windows.SessionWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.execution.windows.WindowVisitor; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; -import java.time.Duration; import java.util.List; import java.util.Objects; import org.apache.kafka.common.serialization.Serde; @@ -258,11 +258,10 @@ public KTable, GenericRow> visitHoppingWindowExpression( final HoppingWindowExpression window, final Void ctx) { TimeWindows windows = TimeWindows - .of(Duration.ofMillis(window.getSizeUnit().toMillis(window.getSize()))) - .advanceBy(Duration.ofMillis(window.getAdvanceByUnit().toMillis(window.getAdvanceBy()))); - windows = window.getGracePeriod().isPresent() - ? windows.grace(window.getGracePeriod().get()) - : windows; + .of(window.getSize().toDuration()) + .advanceBy(window.getAdvanceBy().toDuration()); + windows = window.getGracePeriod().map(WindowTimeClause::toDuration).map(windows::grace) + .orElse(windows); return groupedStream .windowedBy(windows) @@ -272,7 +271,7 @@ public KTable, GenericRow> visitHoppingWindowExpression( materializedFactory.create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext), - window.getRetention()) + window.getRetention().map(WindowTimeClause::toDuration)) ); } @@ -280,12 +279,10 @@ public KTable, GenericRow> visitHoppingWindowExpression( public KTable, GenericRow> visitSessionWindowExpression( final SessionWindowExpression window, final Void ctx) { - SessionWindows windows = SessionWindows.with( - Duration.ofMillis(window.getSizeUnit().toMillis(window.getGap())) - ); - windows = window.getGracePeriod().isPresent() - ? windows.grace(window.getGracePeriod().get()) - : windows; + SessionWindows windows = SessionWindows.with(window.getGap().toDuration()); + windows = window.getGracePeriod().map(WindowTimeClause::toDuration).map(windows::grace) + .orElse(windows); + return groupedStream .windowedBy(windows) .aggregate( @@ -295,7 +292,7 @@ public KTable, GenericRow> visitSessionWindowExpression( materializedFactory.create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext), - window.getRetention()) + window.getRetention().map(WindowTimeClause::toDuration)) ); } @@ -303,11 +300,9 @@ public KTable, GenericRow> visitSessionWindowExpression( public KTable, GenericRow> visitTumblingWindowExpression( final TumblingWindowExpression window, final Void ctx) { - TimeWindows windows = TimeWindows.of( - Duration.ofMillis(window.getSizeUnit().toMillis(window.getSize()))); - windows = window.getGracePeriod().isPresent() - ? windows.grace(window.getGracePeriod().get()) - : windows; + TimeWindows windows = TimeWindows.of(window.getSize().toDuration()); + windows = window.getGracePeriod().map(WindowTimeClause::toDuration).map(windows::grace) + .orElse(windows); return groupedStream .windowedBy(windows) @@ -317,7 +312,7 @@ public KTable, GenericRow> visitTumblingWindowExpression( materializedFactory.create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext), - window.getRetention()) + window.getRetention().map(WindowTimeClause::toDuration)) ); } } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java index 0dcff5b05fe4..3f4a6e96579c 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java @@ -53,6 +53,7 @@ import io.confluent.ksql.execution.plan.WindowedStreamSource; import io.confluent.ksql.execution.plan.WindowedTableSource; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.KsqlAggregateFunction; import io.confluent.ksql.function.KsqlTableFunction; @@ -153,7 +154,7 @@ public void shouldResolveSchemaForStreamWindowedAggregate() { formats, ImmutableList.of(ColumnName.of("ORANGE")), ImmutableList.of(functionCall("COUNT", "APPLE")), - new TumblingWindowExpression(10, TimeUnit.SECONDS) + new TumblingWindowExpression(new WindowTimeClause(10, TimeUnit.SECONDS)) ); // When: diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java index 90f7725a7d78..4424dc13fd09 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java @@ -52,6 +52,7 @@ import io.confluent.ksql.execution.windows.HoppingWindowExpression; import io.confluent.ksql.execution.windows.SessionWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; +import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.FunctionName; @@ -257,7 +258,7 @@ private void givenTumblingWindowedAggregate() { io.confluent.ksql.execution.plan.Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()), NON_AGG_COLUMNS, FUNCTIONS, - new TumblingWindowExpression(WINDOW.getSeconds(), TimeUnit.SECONDS) + new TumblingWindowExpression(new WindowTimeClause(WINDOW.getSeconds(), TimeUnit.SECONDS)) ); } @@ -270,10 +271,8 @@ private void givenHoppingWindowedAggregate() { NON_AGG_COLUMNS, FUNCTIONS, new HoppingWindowExpression( - WINDOW.getSeconds(), - TimeUnit.SECONDS, - HOP.getSeconds(), - TimeUnit.SECONDS + new WindowTimeClause(WINDOW.getSeconds(), TimeUnit.SECONDS), + new WindowTimeClause(HOP.getSeconds(), TimeUnit.SECONDS) ) ); } @@ -296,7 +295,7 @@ private void givenSessionWindowedAggregate() { io.confluent.ksql.execution.plan.Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()), NON_AGG_COLUMNS, FUNCTIONS, - new SessionWindowExpression(WINDOW.getSeconds(), TimeUnit.SECONDS) + new SessionWindowExpression(new WindowTimeClause(WINDOW.getSeconds(), TimeUnit.SECONDS)) ); }