From 42acd78416cb37a55be434d2ac0942183c112ebd Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Thu, 8 Aug 2019 15:35:33 -0700 Subject: [PATCH] feat: wrap timestamps in ROWTIME expressions with STRINGTOTIMESTAMP (#3160) * docs(syntax): update docs for rowtime timestamp literals * feat: wrap timestamps in rowtime expressions with converter * docs: address review comments * refactor: add qtt tests and time zone support, change rowtime detection --- docs/developer-guide/syntax-reference.rst | 15 ++ .../udf/datetime/StringToTimestamp.java | 1 + .../ksql/structured/SqlPredicate.java | 15 +- .../query-validation-tests/rowtime.json | 101 +++++++++++ .../rewrite/StatementRewriteForRowtime.java | 165 ++++++++++++++++++ .../StatementRewriteForRowtimeTest.java | 100 +++++++++++ 6 files changed, 394 insertions(+), 3 deletions(-) create mode 100644 ksql-functional-tests/src/test/resources/query-validation-tests/rowtime.json create mode 100644 ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriteForRowtime.java create mode 100644 ksql-parser/src/test/java/io/confluent/ksql/parser/rewrite/StatementRewriteForRowtimeTest.java diff --git a/docs/developer-guide/syntax-reference.rst b/docs/developer-guide/syntax-reference.rst index 4b8ff472b383..88319c3fbe2c 100644 --- a/docs/developer-guide/syntax-reference.rst +++ b/docs/developer-guide/syntax-reference.rst @@ -1153,6 +1153,21 @@ Example: WHERE ROWTIME >= 1510923225000 AND ROWTIME <= 1510923228000; +When writing logical expressions using ``ROWTIME``, ISO-8601 formatted datestrings can also be used to represent dates. +For example, the above query is equivalent to the following: + +.. code:: sql + + SELECT * FROM pageviews + WHERE ROWTIME >= '2017-11-17T04:53:45' + AND ROWTIME <= '2017-11-17T04:53:48'; + +If the datestring is inexact, the rest of the timestamp is assumed to be padded with 0's. +For example, ``ROWTIME = '2019-07-30T11:00'`` is equivalent to ``ROWTIME = '2019-07-30T11:00:00.0000'``. + +Timezones can be specified within the datestring. For example, `2017-11-17T04:53:45-0330` is in the Newfoundland time +zone. If no timezone is specified within the datestring, then timestamps are interperted in the UTC timezone. + A ``LIMIT`` can be used to limit the number of rows returned. Once the limit is reached the query will terminate. Example: diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/datetime/StringToTimestamp.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/datetime/StringToTimestamp.java index 63f8f48d22f0..1726d1a0a856 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/datetime/StringToTimestamp.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/datetime/StringToTimestamp.java @@ -24,6 +24,7 @@ import io.confluent.ksql.function.udf.UdfParameter; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.timestamp.StringToTimestampParser; + import java.time.ZoneId; import java.util.concurrent.ExecutionException; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SqlPredicate.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SqlPredicate.java index edb1c9740280..3a2a54e44242 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SqlPredicate.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SqlPredicate.java @@ -23,6 +23,7 @@ import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.udf.Kudf; import io.confluent.ksql.logging.processing.ProcessingLogger; +import io.confluent.ksql.parser.rewrite.StatementRewriteForRowtime; import io.confluent.ksql.parser.tree.Expression; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.util.EngineProcessingLogMessageFactory; @@ -54,7 +55,7 @@ class SqlPredicate { final FunctionRegistry functionRegistry, final ProcessingLogger processingLogger ) { - this.filterExpression = requireNonNull(filterExpression, "filterExpression"); + this.filterExpression = rewriteFilter(requireNonNull(filterExpression, "filterExpression")); this.schema = requireNonNull(schema, "schema"); this.genericRowValueTypeEnforcer = new GenericRowValueTypeEnforcer(schema); this.functionRegistry = requireNonNull(functionRegistry, "functionRegistry"); @@ -63,7 +64,7 @@ class SqlPredicate { final CodeGenRunner codeGenRunner = new CodeGenRunner(schema, ksqlConfig, functionRegistry); final Set parameters - = codeGenRunner.getParameterInfo(filterExpression); + = codeGenRunner.getParameterInfo(this.filterExpression); final String[] parameterNames = new String[parameters.size()]; final Class[] parameterTypes = new Class[parameters.size()]; @@ -86,7 +87,7 @@ class SqlPredicate { final String expressionStr = new SqlToJavaVisitor( schema, functionRegistry - ).process(filterExpression); + ).process(this.filterExpression); ee.cook(expressionStr); } catch (final Exception e) { @@ -99,6 +100,14 @@ class SqlPredicate { } } + private Expression rewriteFilter(final Expression expression) { + if (StatementRewriteForRowtime.requiresRewrite(expression)) { + return new StatementRewriteForRowtime(expression).rewriteForRowtime(); + } + return expression; + } + + Predicate getPredicate() { final ExpressionMetadata expressionEvaluator = createExpressionMetadata(); diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/rowtime.json b/ksql-functional-tests/src/test/resources/query-validation-tests/rowtime.json new file mode 100644 index 000000000000..66c560fa4e8d --- /dev/null +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/rowtime.json @@ -0,0 +1,101 @@ +{ + "comments": [ + "Tests covering filters using ROWTIME" + ], + "tests": [ + { + "name": "test ROWTIME", + "statements": [ + "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME>'2018-01-01T00:00:00';" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0}, + {"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 0}, + {"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 1546300808000}, + {"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 1546300800000}, + {"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0}, + {"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 0} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 2, "value": {"THING": 2}, "timestamp": 1546300808000}, + {"topic": "OUTPUT", "key": 3, "value": {"THING": 3}, "timestamp": 1546300800000} + ] + }, + { + "name": "test ROWTIME with BETWEEN", + "statements": [ + "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME BETWEEN '2018-01-01T00:00:00' AND '2019-12-31T23:59:59';" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0}, + {"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300808000}, + {"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0}, + {"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 1536307808000}, + {"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0}, + {"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"THING": 1}, "timestamp": 1546300808000}, + {"topic": "OUTPUT", "key": 3, "value": {"THING": 3}, "timestamp": 1536307808000} + ] + }, + { + "name": "test ROWTIME with timezone", + "statements": [ + "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME > '2019-01-01T00:00:00+0445';" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0}, + {"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300800000}, + {"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0}, + {"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 0}, + {"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0}, + {"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"THING": 1}, "timestamp": 1546300800000}, + {"topic": "OUTPUT", "key": 5, "value": {"THING": 5}, "timestamp": 1600000000000} + ] + }, + { + "name": "test ROWTIME with AND", + "statements": [ + "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME >= '2019-01-01T00:00:00' AND SOURCE=5;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0}, + {"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300800000}, + {"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0}, + {"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 0}, + {"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0}, + {"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 5, "value": {"THING": 5}, "timestamp": 1600000000000} + ] + }, + { + "name": "test ROWTIME with inexact timestring", + "statements": [ + "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME >= '2018';" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0}, + {"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300800000}, + {"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0}, + {"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 0}, + {"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0}, + {"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"THING": 1}, "timestamp": 1546300800000}, + {"topic": "OUTPUT", "key": 5, "value": {"THING": 5}, "timestamp": 1600000000000} + ] + } + ] +} \ No newline at end of file diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriteForRowtime.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriteForRowtime.java new file mode 100644 index 000000000000..6e7208a7d1f3 --- /dev/null +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriteForRowtime.java @@ -0,0 +1,165 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.rewrite; + +import io.confluent.ksql.parser.tree.BetweenPredicate; +import io.confluent.ksql.parser.tree.ComparisonExpression; +import io.confluent.ksql.parser.tree.DereferenceExpression; +import io.confluent.ksql.parser.tree.Expression; +import io.confluent.ksql.parser.tree.FunctionCall; +import io.confluent.ksql.parser.tree.Node; +import io.confluent.ksql.parser.tree.QualifiedName; +import io.confluent.ksql.parser.tree.StringLiteral; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class StatementRewriteForRowtime { + private final Expression expression; + + public StatementRewriteForRowtime(final Expression expression) { + this.expression = Objects.requireNonNull(expression, "expression"); + } + + public static boolean requiresRewrite(final Expression expression) { + return expression.toString().contains("ROWTIME"); + } + + public Expression rewriteForRowtime() { + return (Expression) new RewriteWithTimestampTransform().process(expression, null); + } + + private static class TimestampRewriter extends StatementRewriter { + private static final String DATE_PATTERN = "yyyy-MM-dd"; + private static final String TIME_PATTERN = "HH:mm:ss.SSS"; + private static final String PATTERN = DATE_PATTERN + "'T'" + TIME_PATTERN; + + @Override + public Expression visitFunctionCall(final FunctionCall node, final Object context) { + return (Expression) new StatementRewriter().process(node, context); + } + + @Override + public Node visitStringLiteral(final StringLiteral node, final Object context) { + if (!node.getValue().equals("ROWTIME")) { + return new FunctionCall( + QualifiedName.of("STRINGTOTIMESTAMP"), + getFunctionArgs(node.getValue())); + } + return node; + } + + private List getFunctionArgs(final String datestring) { + final List args = new ArrayList<>(); + final String date; + final String time; + final String timezone; + if (datestring.contains("T")) { + date = datestring.substring(0, datestring.indexOf('T')); + final String withTimezone = completeTime(datestring.substring(datestring.indexOf('T') + 1)); + timezone = getTimezone(withTimezone); + time = completeTime(withTimezone.substring(0, timezone.length())); + } else { + date = completeDate(datestring); + time = completeTime(""); + timezone = ""; + } + + if (timezone.length() > 0) { + args.add(new StringLiteral(date + "T" + time)); + args.add(new StringLiteral(PATTERN)); + args.add(new StringLiteral(timezone)); + } else { + args.add(new StringLiteral(date + "T" + time)); + args.add(new StringLiteral(PATTERN)); + } + return args; + } + + private String getTimezone(final String time) { + if (time.contains("+")) { + return time.substring(time.indexOf('+')); + } else if (time.contains("-")) { + return time.substring(time.indexOf('-')); + } else { + return ""; + } + } + + private String completeDate(final String date) { + final String[] parts = date.split("-"); + if (parts.length == 1) { + return date + "-01-01"; + } else if (parts.length == 2) { + return date + "-01"; + } else { + // It is either a complete date or an incorrectly formatted one. + // In the latter case, we can pass the incorrectly formed string + // to STRINGTITIMESTAMP which will deal with the error handling. + return date; + } + } + + private String completeTime(final String time) { + if (time.length() >= TIME_PATTERN.length()) { + return time; + } + return time + TIME_PATTERN.substring(time.length()).replaceAll("[a-zA-Z]", "0"); + } + } + + private static class RewriteWithTimestampTransform extends StatementRewriter { + @Override + public Expression visitBetweenPredicate(final BetweenPredicate node, final Object context) { + if (StatementRewriteForRowtime.requiresRewrite(node)) { + return new BetweenPredicate( + node.getLocation(), + (Expression) new TimestampRewriter().process(node.getValue(), context), + (Expression) new TimestampRewriter().process(node.getMin(), context), + (Expression) new TimestampRewriter().process(node.getMax(), context)); + } + return new BetweenPredicate( + node.getLocation(), + (Expression) process(node.getValue(), context), + (Expression) process(node.getMin(), context), + (Expression) process(node.getMax(), context)); + } + + @Override + public Expression visitComparisonExpression( + final ComparisonExpression node, + final Object context) { + if (expressionIsRowtime(node.getLeft()) || expressionIsRowtime(node.getRight())) { + return new ComparisonExpression( + node.getLocation(), + node.getType(), + (Expression) new TimestampRewriter().process(node.getLeft(), context), + (Expression) new TimestampRewriter().process(node.getRight(), context)); + } + return new ComparisonExpression( + node.getLocation(), + node.getType(), + (Expression) process(node.getLeft(), context), + (Expression) process(node.getRight(), context)); + } + } + + private static boolean expressionIsRowtime(final Expression node) { + return (node instanceof DereferenceExpression) + && ((DereferenceExpression) node).getFieldName().equals("ROWTIME"); + } +} \ No newline at end of file diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/rewrite/StatementRewriteForRowtimeTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/rewrite/StatementRewriteForRowtimeTest.java new file mode 100644 index 000000000000..cf2e939be102 --- /dev/null +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/rewrite/StatementRewriteForRowtimeTest.java @@ -0,0 +1,100 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.rewrite; + +import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.metastore.MetaStore; +import io.confluent.ksql.parser.KsqlParserTestUtil; +import io.confluent.ksql.parser.tree.*; +import io.confluent.ksql.util.MetaStoreFixture; +import org.junit.Before; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; + +public class StatementRewriteForRowtimeTest { + private MetaStore metaStore; + + @Before + public void init() { + metaStore = MetaStoreFixture.getNewMetaStore(mock(FunctionRegistry.class)); + } + + @Test + public void shouldWrapDatestring() { + final String query = "SELECT * FROM orders where ROWTIME > '2017-01-01T00:00:00.000';"; + final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement(); + final Expression predicate = statement.getWhere().get(); + final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime(); + + assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME > STRINGTOTIMESTAMP('2017-01-01T00:00:00.000', 'yyyy-MM-dd''T''HH:mm:ss.SSS'))")); + } + + @Test + public void shouldHandleInexactTimestamp() { + final String query = "SELECT * FROM orders where ROWTIME = '2017';"; + final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement(); + final Expression predicate = statement.getWhere().get(); + final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime(); + + assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME = STRINGTOTIMESTAMP('2017-01-01T00:00:00.000', 'yyyy-MM-dd''T''HH:mm:ss.SSS'))")); + } + + @Test + public void shouldHandleBetweenExpression() { + final String query = "SELECT * FROM orders where ROWTIME BETWEEN '2017-01-01' AND '2017-02-01';"; + final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement(); + final Expression predicate = statement.getWhere().get(); + final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime(); + + assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME BETWEEN" + + " STRINGTOTIMESTAMP('2017-01-01T00:00:00.000', 'yyyy-MM-dd''T''HH:mm:ss.SSS') AND" + + " STRINGTOTIMESTAMP('2017-02-01T00:00:00.000', 'yyyy-MM-dd''T''HH:mm:ss.SSS'))")); + } + + @Test + public void shouldNotProcessStringsInFunctions() { + final String query = "SELECT * FROM orders where ROWTIME = foo('2017-01-01');"; + final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement(); + final Expression predicate = statement.getWhere().get(); + final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime(); + + assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME = FOO('2017-01-01'))")); + } + + @Test + public void shouldIgnoreNonRowtimeStrings() { + final String query = "SELECT * FROM orders where ROWTIME > '2017-01-01' AND ROWKEY = '2017-01-01';"; + final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement(); + final Expression predicate = statement.getWhere().get(); + final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime(); + + assertThat(rewritten.toString(), equalTo("((ORDERS.ROWTIME > STRINGTOTIMESTAMP('2017-01-01T00:00:00.000', 'yyyy-MM-dd''T''HH:mm:ss.SSS')) AND (ORDERS.ROWKEY = '2017-01-01'))")); + } + + @Test + public void shouldHandleTimezones() { + final String simpleQuery = "SELECT * FROM orders where ROWTIME = '2017-01-01T00:00:00.000+0100';"; + final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(simpleQuery, metaStore).getStatement(); + final Expression predicate = statement.getWhere().get(); + final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime(); + + assertThat(rewritten.toString(), containsString("(ORDERS.ROWTIME = STRINGTOTIMESTAMP('2017-01-01T00:00:00.000', 'yyyy-MM-dd''T''HH:mm:ss.SSS', '+0100'))")); + } +} \ No newline at end of file