diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/PartialStringToTimestampParser.java b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/PartialStringToTimestampParser.java
new file mode 100644
index 000000000000..2679982a69b9
--- /dev/null
+++ b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/PartialStringToTimestampParser.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.util.timestamp;
+
+import static io.confluent.ksql.util.KsqlConstants.TIME_PATTERN;
+
+import io.confluent.ksql.util.KsqlConstants;
+import io.confluent.ksql.util.KsqlException;
+import java.time.ZoneId;
+
+/**
+ * A parser that can handle partially complete date-times.
+ *
+ *
A hack around the fact we do not as yet have a DATETIME type.
+ */
+public class PartialStringToTimestampParser {
+
+ private static final String HELP_MESSAGE = System.lineSeparator()
+ + "Required format is: \"" + KsqlConstants.DATE_TIME_PATTERN + "\", "
+ + "with an optional numeric timezone. "
+ + "Partials are also supported, for example \"2020-05-26\"";
+
+ private static final StringToTimestampParser PARSER =
+ new StringToTimestampParser(KsqlConstants.DATE_TIME_PATTERN);
+
+ @SuppressWarnings("MethodMayBeStatic") // Non-static to support DI.
+ public long parse(final String text) {
+
+ final String date;
+ final String time;
+ final String timezone;
+
+ if (text.contains("T")) {
+ date = text.substring(0, text.indexOf('T'));
+ final String withTimezone = completeTime(
+ text.substring(text.indexOf('T') + 1)
+ );
+ timezone = getTimezone(withTimezone);
+ time = completeTime(withTimezone.substring(0, withTimezone.length() - timezone.length()));
+ } else {
+ date = completeDate(text);
+ time = completeTime("");
+ timezone = "";
+ }
+
+ try {
+ if (timezone.length() > 0) {
+ return PARSER.parse(date + "T" + time, ZoneId.of(timezone));
+ } else {
+ return PARSER.parse(date + "T" + time);
+ }
+ } catch (final RuntimeException e) {
+ throw new KsqlException("Failed to parse timestamp '" + text
+ + "': " + e.getMessage()
+ + HELP_MESSAGE,
+ e
+ );
+ }
+ }
+
+ private static String getTimezone(final String time) {
+ if (time.contains("+")) {
+ return time.substring(time.indexOf('+'));
+ }
+
+ if (time.contains("-")) {
+ return time.substring(time.indexOf('-'));
+ }
+
+ return "";
+ }
+
+ private static String completeDate(final String date) {
+ final String[] parts = date.split("-");
+ if (parts.length == 1) {
+ return date + "-01-01";
+ }
+
+ if (parts.length == 2) {
+ return date + "-01";
+ }
+
+ // It is either a complete date or an incorrectly formatted one.
+ // In the latter case, we can pass the incorrectly formed string
+ // to the timestamp parser which will deal with the error handling.
+ return date;
+ }
+
+ private static String completeTime(final String time) {
+ if (time.length() >= TIME_PATTERN.length()) {
+ return time;
+ }
+
+ return time + TIME_PATTERN
+ .substring(time.length())
+ .replaceAll("[a-zA-Z]", "0");
+ }
+}
diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/PartialStringToTimestampParserTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/PartialStringToTimestampParserTest.java
new file mode 100644
index 000000000000..39d779c8e636
--- /dev/null
+++ b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/PartialStringToTimestampParserTest.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.util.timestamp;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import io.confluent.ksql.util.KsqlConstants;
+import io.confluent.ksql.util.KsqlException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class PartialStringToTimestampParserTest {
+
+ private static final StringToTimestampParser FULL_PARSER =
+ new StringToTimestampParser(KsqlConstants.DATE_TIME_PATTERN);
+
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
+ private PartialStringToTimestampParser parser;
+
+ @Before
+ public void init() {
+ parser = new PartialStringToTimestampParser();
+ }
+
+ @Test
+ public void shouldParseYear() {
+ // When:
+ assertThat(parser.parse("2017"), is(fullParse("2017-01-01T00:00:00.000")));
+ }
+
+ @Test
+ public void shouldParseYearMonth() {
+ // When:
+ assertThat(parser.parse("2020-02"), is(fullParse("2020-02-01T00:00:00.000")));
+ }
+
+ @Test
+ public void shouldParseFullDate() {
+ // When:
+ assertThat(parser.parse("2020-01-02"), is(fullParse("2020-01-02T00:00:00.000")));
+ assertThat(parser.parse("2020-01-02T"), is(fullParse("2020-01-02T00:00:00.000")));
+ }
+
+ @Test
+ public void shouldParseDateWithHour() {
+ // When:
+ assertThat(parser.parse("2020-12-02T13"), is(fullParse("2020-12-02T13:00:00.000")));
+ }
+
+ @Test
+ public void shouldParseDateWithHourMinute() {
+ // When:
+ assertThat(parser.parse("2020-12-02T13:59"), is(fullParse("2020-12-02T13:59:00.000")));
+ }
+
+ @Test
+ public void shouldParseDateWithHourMinuteSecond() {
+ // When:
+ assertThat(parser.parse("2020-12-02T13:59:58"), is(fullParse("2020-12-02T13:59:58.000")));
+ }
+
+ @Test
+ public void shouldParseFullDateTime() {
+ // When:
+ assertThat(parser.parse("2020-12-02T13:59:58.123"), is(fullParse("2020-12-02T13:59:58.123")));
+ }
+
+ @Test
+ public void shouldParseDateTimeWithPositiveTimezones() {
+ assertThat(parser.parse("2017-11-13T23:59:58.999+0100"), is(1510613998999L));
+ }
+
+ @Test
+ public void shouldParseDateTimeWithNegativeTimezones() {
+ assertThat(parser.parse("2017-11-13T23:59:58.999-0100"), is(1510621198999L));
+ }
+
+ @Test
+ public void shouldThrowOnIncorrectlyFormattedDateTime() {
+ // Expect:
+ expectedException.expect(KsqlException.class);
+ expectedException.expectMessage("Failed to parse timestamp '2017-1-1'");
+
+ // When:
+ parser.parse("2017-1-1");
+ }
+
+ @Test
+ public void shouldThrowOnTimezoneParseError() {
+ // Expect:
+ expectedException.expect(KsqlException.class);
+ expectedException.expectMessage("Failed to parse timestamp '2017-01-01T00:00:00.000+foo'");
+
+ // When:
+ parser.parse("2017-01-01T00:00:00.000+foo");
+ }
+
+ @Test
+ public void shouldIncludeRequiredFormatInErrorMessage() {
+ // Expect:
+ expectedException.expectMessage("Required format is: \"yyyy-MM-dd'T'HH:mm:ss.SSS\", "
+ + "with an optional numeric timezone. Partials are also supported, for example \"2020-05-26\"");
+
+ // When:
+ parser.parse("2017-01-01T00:00:00.000+foo");
+ }
+
+ private static long fullParse(final String text) {
+ return FULL_PARSER.parse(text);
+ }
+}
\ No newline at end of file
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtime.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtime.java
index 753a59fe9aa7..8608bf53f241 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtime.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtime.java
@@ -15,9 +15,7 @@
package io.confluent.ksql.engine.rewrite;
-import static io.confluent.ksql.util.KsqlConstants.DATE_TIME_PATTERN;
-import static io.confluent.ksql.util.KsqlConstants.TIME_PATTERN;
-
+import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context;
import io.confluent.ksql.execution.expression.tree.BetweenPredicate;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
@@ -26,29 +24,39 @@
import io.confluent.ksql.execution.expression.tree.LongLiteral;
import io.confluent.ksql.execution.expression.tree.StringLiteral;
import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor;
-import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
-import io.confluent.ksql.util.timestamp.StringToTimestampParser;
-import java.time.ZoneId;
+import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser;
+import java.util.Objects;
import java.util.Optional;
public class StatementRewriteForRowtime {
- @SuppressWarnings("MethodMayBeStatic") // Used for DI
+ private final PartialStringToTimestampParser parser;
+
+ public StatementRewriteForRowtime() {
+ this(new PartialStringToTimestampParser());
+ }
+
+ @VisibleForTesting
+ StatementRewriteForRowtime(final PartialStringToTimestampParser parser) {
+ this.parser = Objects.requireNonNull(parser, "parser");
+ }
+
public Expression rewriteForRowtime(final Expression expression) {
- if (!requiresRewrite(expression)) {
+ if (noRewriteRequired(expression)) {
return expression;
}
- return new ExpressionTreeRewriter<>(
- new OperatorPlugin()::process).rewrite(expression, null);
+ return new ExpressionTreeRewriter<>(new OperatorPlugin()::process)
+ .rewrite(expression, null);
}
- private static boolean requiresRewrite(final Expression expression) {
- return expression.toString().contains("ROWTIME");
+ private static boolean noRewriteRequired(final Expression expression) {
+ return !expression.toString().contains("ROWTIME");
}
- private static final class OperatorPlugin
+ private final class OperatorPlugin
extends VisitParentExpressionVisitor, Context> {
+
private OperatorPlugin() {
super(Optional.empty());
}
@@ -56,24 +64,27 @@ private OperatorPlugin() {
@Override
public Optional visitBetweenPredicate(
final BetweenPredicate node,
- final Context context) {
- if (requiresRewrite(node.getValue())) {
- return Optional.of(
- new BetweenPredicate(
- node.getLocation(),
- node.getValue(),
- rewriteTimestamp(((StringLiteral) node.getMin()).getValue()),
- rewriteTimestamp(((StringLiteral) node.getMax()).getValue())
- )
- );
+ final Context context
+ ) {
+ if (noRewriteRequired(node.getValue())) {
+ return Optional.empty();
}
- return Optional.empty();
+
+ return Optional.of(
+ new BetweenPredicate(
+ node.getLocation(),
+ node.getValue(),
+ rewriteTimestamp(((StringLiteral) node.getMin()).getValue()),
+ rewriteTimestamp(((StringLiteral) node.getMax()).getValue())
+ )
+ );
}
@Override
public Optional visitComparisonExpression(
final ComparisonExpression node,
- final Context context) {
+ final Context context
+ ) {
if (expressionIsRowtime(node.getLeft()) && node.getRight() instanceof StringLiteral) {
return Optional.of(
new ComparisonExpression(
@@ -83,7 +94,9 @@ public Optional visitComparisonExpression(
rewriteTimestamp(((StringLiteral) node.getRight()).getValue())
)
);
- } else if (expressionIsRowtime(node.getRight()) && node.getLeft() instanceof StringLiteral) {
+ }
+
+ if (expressionIsRowtime(node.getRight()) && node.getLeft() instanceof StringLiteral) {
return Optional.of(
new ComparisonExpression(
node.getLocation(),
@@ -93,6 +106,7 @@ public Optional visitComparisonExpression(
)
);
}
+
return Optional.empty();
}
}
@@ -102,68 +116,7 @@ private static boolean expressionIsRowtime(final Expression node) {
&& ((ColumnReferenceExp) node).getReference().name().equals(SchemaUtil.ROWTIME_NAME);
}
- private static LongLiteral rewriteTimestamp(final String timestamp) {
- final StringToTimestampParser parser = new StringToTimestampParser(DATE_TIME_PATTERN);
-
- final String date;
- final String time;
- final String timezone;
-
- if (timestamp.contains("T")) {
- date = timestamp.substring(0, timestamp.indexOf('T'));
- final String withTimezone = completeTime(
- timestamp.substring(timestamp.indexOf('T') + 1)
- );
- timezone = getTimezone(withTimezone);
- time = completeTime(withTimezone.substring(0, timezone.length()));
- } else {
- date = completeDate(timestamp);
- time = completeTime("");
- timezone = "";
- }
-
- try {
- if (timezone.length() > 0) {
- return new LongLiteral(parser.parse(date + "T" + time, ZoneId.of(timezone)));
- } else {
- return new LongLiteral(parser.parse(date + "T" + time));
- }
- } catch (final RuntimeException e) {
- throw new KsqlException("Failed to parse timestamp '"
- + timestamp + "': " + e.getMessage(), e);
- }
- }
-
- private static String getTimezone(final String time) {
- if (time.contains("+")) {
- return time.substring(time.indexOf('+'));
- } else if (time.contains("-")) {
- return time.substring(time.indexOf('-'));
- } else {
- return "";
- }
- }
-
- private static String completeDate(final String date) {
- final String[] parts = date.split("-");
- if (parts.length == 1) {
- return date + "-01-01";
- } else if (parts.length == 2) {
- return date + "-01";
- } else {
- // It is either a complete date or an incorrectly formatted one.
- // In the latter case, we can pass the incorrectly formed string
- // to the timestamp parser which will deal with the error handling.
- return date;
- }
- }
-
- private static String completeTime(final String time) {
- if (time.length() >= TIME_PATTERN.length()) {
- return time;
- }
-
- return time
- + TIME_PATTERN.substring(time.length()).replaceAll("[a-zA-Z]", "0");
+ private LongLiteral rewriteTimestamp(final String timestamp) {
+ return new LongLiteral(parser.parse(timestamp));
}
}
\ No newline at end of file
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtimeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtimeTest.java
index 25f5d07e375c..60b8a249672c 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtimeTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtimeTest.java
@@ -16,9 +16,13 @@
package io.confluent.ksql.engine.rewrite;
import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.function.FunctionRegistry;
@@ -26,60 +30,68 @@
import io.confluent.ksql.parser.KsqlParserTestUtil;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
-import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MetaStoreFixture;
+import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser;
import io.confluent.ksql.util.timestamp.StringToTimestampParser;
-import java.time.ZoneId;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
@SuppressWarnings("OptionalGetWithoutIsPresent")
+@RunWith(MockitoJUnitRunner.class)
public class StatementRewriteForRowtimeTest {
private static final StringToTimestampParser PARSER =
new StringToTimestampParser("yyyy-MM-dd'T'HH:mm:ss.SSS");
+ private static final long A_TIMESTAMP = 1234567890L;
+
@Rule
public final ExpectedException expectedException = ExpectedException.none();
+ @Mock
+ private PartialStringToTimestampParser parser;
private MetaStore metaStore;
private StatementRewriteForRowtime rewritter;
@Before
public void init() {
metaStore = MetaStoreFixture.getNewMetaStore(mock(FunctionRegistry.class));
- rewritter = new StatementRewriteForRowtime();
+ rewritter = new StatementRewriteForRowtime(parser);
+
+ when(parser.parse(any())).thenReturn(A_TIMESTAMP);
}
@Test
- public void shouldReplaceDatestring() {
+ public void shouldPassRowTimeStringsToTheParser() {
// Given:
final Query statement = buildSingleAst(
- "SELECT * FROM orders where ROWTIME > '2017-01-01T00:00:00.000';");
+ "SELECT * FROM orders where ROWTIME = '2017-01-01T00:44:00.000';");
final Expression predicate = statement.getWhere().get();
// When:
- final Expression rewritten = rewritter.rewriteForRowtime(predicate);
+ rewritter.rewriteForRowtime(predicate);
// Then:
- assertThat(rewritten.toString(),
- equalTo(String.format("(ORDERS.ROWTIME > %d)", PARSER.parse("2017-01-01T00:00:00.000"))));
+ verify(parser).parse("2017-01-01T00:44:00.000");
}
@Test
- public void shouldHandleInexactTimestamp() {
+ public void shouldReplaceDateString() {
// Given:
- final Query statement = buildSingleAst("SELECT * FROM orders where ROWTIME = '2017';");
+ final Query statement = buildSingleAst(
+ "SELECT * FROM orders where ROWTIME > '2017-01-01T00:00:00.000';");
final Expression predicate = statement.getWhere().get();
// When:
final Expression rewritten = rewritter.rewriteForRowtime(predicate);
// Then:
- assertThat(rewritten.toString(),
- equalTo(String.format("(ORDERS.ROWTIME = %d)", PARSER.parse("2017-01-01T00:00:00.000"))));
+ assertThat(rewritten.toString(), is(String.format("(ORDERS.ROWTIME > %d)", A_TIMESTAMP)));
}
@Test
@@ -89,14 +101,18 @@ public void shouldHandleBetweenExpression() {
"SELECT * FROM orders where ROWTIME BETWEEN '2017-01-01' AND '2017-02-01';");
final Expression predicate = statement.getWhere().get();
+ when(parser.parse(any()))
+ .thenReturn(A_TIMESTAMP)
+ .thenReturn(7654L);
+
// When:
final Expression rewritten = rewritter.rewriteForRowtime(predicate);
// Then:
- assertThat(rewritten.toString(), equalTo(String.format(
- "(ORDERS.ROWTIME BETWEEN %d AND %d)",
- PARSER.parse("2017-01-01T00:00:00.000"),
- PARSER.parse("2017-02-01T00:00:00.000"))));
+ assertThat(
+ rewritten.toString(),
+ is(String.format("(ORDERS.ROWTIME BETWEEN %d AND %d)", A_TIMESTAMP, 7654L))
+ );
}
@Test
@@ -110,7 +126,8 @@ public void shouldNotProcessStringsInFunctions() {
final Expression rewritten = rewritter.rewriteForRowtime(predicate);
// Then:
- assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME = FOO('2017-01-01'))"));
+ verify(parser, never()).parse(any());
+ assertThat(rewritten.toString(), is("(ORDERS.ROWTIME = FOO('2017-01-01'))"));
}
@Test
@@ -125,23 +142,8 @@ public void shouldIgnoreNonRowtimeStrings() {
// Then:
assertThat(rewritten.toString(),
- equalTo(String.format("((ORDERS.ROWTIME > %d) AND (ORDERS.ROWKEY = '2017-01-01'))",
- PARSER.parse("2017-01-01T00:00:00.000"))));
- }
-
- @Test
- public void shouldHandleTimezones() {
- // Given:
- final Query statement = buildSingleAst(
- "SELECT * FROM orders where ROWTIME = '2017-01-01T00:00:00.000+0100';");
- final Expression predicate = statement.getWhere().get();
-
- // When:
- final Expression rewritten = rewritter.rewriteForRowtime(predicate);
-
- // Then:
- assertThat(rewritten.toString(), equalTo(String.format("(ORDERS.ROWTIME = %d)", PARSER
- .parse("2017-01-01T00:00:00.000", ZoneId.of("+0100")))));
+ is(String.format("((ORDERS.ROWTIME > %d) AND (ORDERS.ROWKEY = '2017-01-01'))",
+ A_TIMESTAMP)));
}
@Test
@@ -155,6 +157,7 @@ public void shouldNotProcessWhenRowtimeInFunction() {
final Expression rewritten = rewritter.rewriteForRowtime(predicate);
// Then:
+ verify(parser, never()).parse(any());
assertThat(rewritten.toString(), containsString("(FOO(ORDERS.ROWTIME) = '2017-01-01')"));
}
@@ -169,33 +172,20 @@ public void shouldNotProcessArithmetic() {
final Expression rewritten = rewritter.rewriteForRowtime(predicate);
// Then:
+ verify(parser, never()).parse(any());
assertThat(rewritten.toString(), containsString("(('2017-01-01' + 10000) > ORDERS.ROWTIME)"));
}
@Test
public void shouldThrowParseError() {
// Given:
- final Query statement = buildSingleAst("SELECT * FROM orders where ROWTIME = '2oo17-01-01';");
- final Expression predicate = statement.getWhere().get();
-
- // Expect:
- expectedException.expect(KsqlException.class);
- expectedException.expectMessage("Failed to parse timestamp '2oo17-01-01'");
-
- // When:
- rewritter.rewriteForRowtime(predicate);
- }
-
- @Test
- public void shouldThrowTimezoneParseError() {
- // Given:
- final Query statement = buildSingleAst(
- "SELECT * FROM orders where ROWTIME = '2017-01-01T00:00:00.000+foo';");
+ final Query statement = buildSingleAst("SELECT * FROM orders where ROWTIME = '2017-01-01';");
final Expression predicate = statement.getWhere().get();
+ when(parser.parse(any())).thenThrow(new IllegalArgumentException("it no good"));
// Expect:
- expectedException.expect(KsqlException.class);
- expectedException.expectMessage("Failed to parse timestamp '2017-01-01T00:00:00.000+foo'");
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("it no good");
// When:
rewritter.rewriteForRowtime(predicate);
diff --git a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json
index 1e9eecef7134..ca9567fb9cf9 100644
--- a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json
+++ b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json
@@ -280,6 +280,48 @@
]}
]
},
+ {
+ "name": "text datetime + timezone window bounds",
+ "statements": [
+ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
+ "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;",
+ "SELECT * FROM AGGREGATE WHERE ROWKEY='10' AND WindowStart='2020-02-23T22:45:12.000-0100';"
+ ],
+ "inputs": [
+ {"topic": "test_topic", "timestamp": 1582501512456, "key": "11", "value": {}},
+ {"topic": "test_topic", "timestamp": 1582501512456, "key": "10", "value": {}}
+ ],
+ "responses": [
+ {"@type": "currentStatus"},
+ {"@type": "currentStatus"},
+ {"@type": "rows", "rows": [
+ ["10", 1582501512000, 1]
+ ]}
+ ]
+ },
+ {
+ "name": "partial text datetime window bounds",
+ "statements": [
+ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
+ "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;",
+ "SELECT * FROM AGGREGATE WHERE '2020-02-23T23:45' <= WindowStart AND WindowStart < '2020-02-23T24' AND ROWKEY='10';"
+ ],
+ "inputs": [
+ {"topic": "test_topic", "timestamp": 1582501512456, "key": "11", "value": {}},
+ {"topic": "test_topic", "timestamp": 1582501512456, "key": "10", "value": {}},
+ {"topic": "test_topic", "timestamp": 1582501552456, "key": "10", "value": {}}
+ ],
+ "responses": [
+ {"@type": "currentStatus"},
+ {"@type": "currentStatus"},
+ {
+ "@type": "rows",
+ "rows": [
+ ["10", 1582501512000, 1],
+ ["10", 1582501552000, 1]
+ ]}
+ ]
+ },
{
"name": "aliased table",
"statements": [
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java
index 88f955fb7f6a..5dc7979255d7 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java
@@ -76,7 +76,7 @@
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.SchemaUtil;
-import io.confluent.ksql.util.timestamp.StringToTimestampParser;
+import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
@@ -408,7 +408,7 @@ private static Instant asInstant(final Expression other) {
if (other instanceof StringLiteral) {
final String text = ((StringLiteral) other).getValue();
try {
- final long timestamp = new StringToTimestampParser(KsqlConstants.DATE_TIME_PATTERN)
+ final long timestamp = new PartialStringToTimestampParser()
.parse(text);
return Instant.ofEpochMilli(timestamp);