Skip to content

Commit

Permalink
feat: wrap timestamps in ROWTIME expressions with STRINGTOTIMESTAMP (#…
Browse files Browse the repository at this point in the history
…3160)

* docs(syntax): update docs for rowtime timestamp literals

* feat: wrap timestamps in rowtime expressions with converter

* docs: address review comments

* refactor: add qtt tests and time zone support, change rowtime detection
  • Loading branch information
Zara Lim authored Aug 8, 2019
1 parent 2bf8c70 commit 42acd78
Show file tree
Hide file tree
Showing 6 changed files with 394 additions and 3 deletions.
15 changes: 15 additions & 0 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,21 @@ Example:
WHERE ROWTIME >= 1510923225000
AND ROWTIME <= 1510923228000;
When writing logical expressions using ``ROWTIME``, ISO-8601 formatted datestrings can also be used to represent dates.
For example, the above query is equivalent to the following:

.. code:: sql
SELECT * FROM pageviews
WHERE ROWTIME >= '2017-11-17T04:53:45'
AND ROWTIME <= '2017-11-17T04:53:48';
If the datestring is inexact, the rest of the timestamp is assumed to be padded with 0's.
For example, ``ROWTIME = '2019-07-30T11:00'`` is equivalent to ``ROWTIME = '2019-07-30T11:00:00.0000'``.

Timezones can be specified within the datestring. For example, `2017-11-17T04:53:45-0330` is in the Newfoundland time
zone. If no timezone is specified within the datestring, then timestamps are interperted in the UTC timezone.

A ``LIMIT`` can be used to limit the number of rows returned. Once the limit is reached the query will terminate.

Example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.timestamp.StringToTimestampParser;

import java.time.ZoneId;
import java.util.concurrent.ExecutionException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.udf.Kudf;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.parser.rewrite.StatementRewriteForRowtime;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.EngineProcessingLogMessageFactory;
Expand Down Expand Up @@ -54,7 +55,7 @@ class SqlPredicate {
final FunctionRegistry functionRegistry,
final ProcessingLogger processingLogger
) {
this.filterExpression = requireNonNull(filterExpression, "filterExpression");
this.filterExpression = rewriteFilter(requireNonNull(filterExpression, "filterExpression"));
this.schema = requireNonNull(schema, "schema");
this.genericRowValueTypeEnforcer = new GenericRowValueTypeEnforcer(schema);
this.functionRegistry = requireNonNull(functionRegistry, "functionRegistry");
Expand All @@ -63,7 +64,7 @@ class SqlPredicate {

final CodeGenRunner codeGenRunner = new CodeGenRunner(schema, ksqlConfig, functionRegistry);
final Set<CodeGenRunner.ParameterType> parameters
= codeGenRunner.getParameterInfo(filterExpression);
= codeGenRunner.getParameterInfo(this.filterExpression);

final String[] parameterNames = new String[parameters.size()];
final Class[] parameterTypes = new Class[parameters.size()];
Expand All @@ -86,7 +87,7 @@ class SqlPredicate {
final String expressionStr = new SqlToJavaVisitor(
schema,
functionRegistry
).process(filterExpression);
).process(this.filterExpression);

ee.cook(expressionStr);
} catch (final Exception e) {
Expand All @@ -99,6 +100,14 @@ class SqlPredicate {
}
}

private Expression rewriteFilter(final Expression expression) {
if (StatementRewriteForRowtime.requiresRewrite(expression)) {
return new StatementRewriteForRowtime(expression).rewriteForRowtime();
}
return expression;
}


<K> Predicate<K, GenericRow> getPredicate() {
final ExpressionMetadata expressionEvaluator = createExpressionMetadata();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
{
"comments": [
"Tests covering filters using ROWTIME"
],
"tests": [
{
"name": "test ROWTIME",
"statements": [
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME>'2018-01-01T00:00:00';"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0},
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 0},
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 1546300808000},
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 1546300800000},
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0},
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 0}
],
"outputs": [
{"topic": "OUTPUT", "key": 2, "value": {"THING": 2}, "timestamp": 1546300808000},
{"topic": "OUTPUT", "key": 3, "value": {"THING": 3}, "timestamp": 1546300800000}
]
},
{
"name": "test ROWTIME with BETWEEN",
"statements": [
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME BETWEEN '2018-01-01T00:00:00' AND '2019-12-31T23:59:59';"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0},
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300808000},
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0},
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 1536307808000},
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0},
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"THING": 1}, "timestamp": 1546300808000},
{"topic": "OUTPUT", "key": 3, "value": {"THING": 3}, "timestamp": 1536307808000}
]
},
{
"name": "test ROWTIME with timezone",
"statements": [
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME > '2019-01-01T00:00:00+0445';"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0},
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300800000},
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0},
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 0},
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0},
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"THING": 1}, "timestamp": 1546300800000},
{"topic": "OUTPUT", "key": 5, "value": {"THING": 5}, "timestamp": 1600000000000}
]
},
{
"name": "test ROWTIME with AND",
"statements": [
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME >= '2019-01-01T00:00:00' AND SOURCE=5;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0},
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300800000},
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0},
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 0},
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0},
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000}
],
"outputs": [
{"topic": "OUTPUT", "key": 5, "value": {"THING": 5}, "timestamp": 1600000000000}
]
},
{
"name": "test ROWTIME with inexact timestring",
"statements": [
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME >= '2018';"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0},
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300800000},
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0},
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 0},
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0},
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"THING": 1}, "timestamp": 1546300800000},
{"topic": "OUTPUT", "key": 5, "value": {"THING": 5}, "timestamp": 1600000000000}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.rewrite;

import io.confluent.ksql.parser.tree.BetweenPredicate;
import io.confluent.ksql.parser.tree.ComparisonExpression;
import io.confluent.ksql.parser.tree.DereferenceExpression;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.parser.tree.FunctionCall;
import io.confluent.ksql.parser.tree.Node;
import io.confluent.ksql.parser.tree.QualifiedName;
import io.confluent.ksql.parser.tree.StringLiteral;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class StatementRewriteForRowtime {
private final Expression expression;

public StatementRewriteForRowtime(final Expression expression) {
this.expression = Objects.requireNonNull(expression, "expression");
}

public static boolean requiresRewrite(final Expression expression) {
return expression.toString().contains("ROWTIME");
}

public Expression rewriteForRowtime() {
return (Expression) new RewriteWithTimestampTransform().process(expression, null);
}

private static class TimestampRewriter extends StatementRewriter {
private static final String DATE_PATTERN = "yyyy-MM-dd";
private static final String TIME_PATTERN = "HH:mm:ss.SSS";
private static final String PATTERN = DATE_PATTERN + "'T'" + TIME_PATTERN;

@Override
public Expression visitFunctionCall(final FunctionCall node, final Object context) {
return (Expression) new StatementRewriter().process(node, context);
}

@Override
public Node visitStringLiteral(final StringLiteral node, final Object context) {
if (!node.getValue().equals("ROWTIME")) {
return new FunctionCall(
QualifiedName.of("STRINGTOTIMESTAMP"),
getFunctionArgs(node.getValue()));
}
return node;
}

private List<Expression> getFunctionArgs(final String datestring) {
final List<Expression> args = new ArrayList<>();
final String date;
final String time;
final String timezone;
if (datestring.contains("T")) {
date = datestring.substring(0, datestring.indexOf('T'));
final String withTimezone = completeTime(datestring.substring(datestring.indexOf('T') + 1));
timezone = getTimezone(withTimezone);
time = completeTime(withTimezone.substring(0, timezone.length()));
} else {
date = completeDate(datestring);
time = completeTime("");
timezone = "";
}

if (timezone.length() > 0) {
args.add(new StringLiteral(date + "T" + time));
args.add(new StringLiteral(PATTERN));
args.add(new StringLiteral(timezone));
} else {
args.add(new StringLiteral(date + "T" + time));
args.add(new StringLiteral(PATTERN));
}
return args;
}

private String getTimezone(final String time) {
if (time.contains("+")) {
return time.substring(time.indexOf('+'));
} else if (time.contains("-")) {
return time.substring(time.indexOf('-'));
} else {
return "";
}
}

private String completeDate(final String date) {
final String[] parts = date.split("-");
if (parts.length == 1) {
return date + "-01-01";
} else if (parts.length == 2) {
return date + "-01";
} else {
// It is either a complete date or an incorrectly formatted one.
// In the latter case, we can pass the incorrectly formed string
// to STRINGTITIMESTAMP which will deal with the error handling.
return date;
}
}

private String completeTime(final String time) {
if (time.length() >= TIME_PATTERN.length()) {
return time;
}
return time + TIME_PATTERN.substring(time.length()).replaceAll("[a-zA-Z]", "0");
}
}

private static class RewriteWithTimestampTransform extends StatementRewriter {
@Override
public Expression visitBetweenPredicate(final BetweenPredicate node, final Object context) {
if (StatementRewriteForRowtime.requiresRewrite(node)) {
return new BetweenPredicate(
node.getLocation(),
(Expression) new TimestampRewriter().process(node.getValue(), context),
(Expression) new TimestampRewriter().process(node.getMin(), context),
(Expression) new TimestampRewriter().process(node.getMax(), context));
}
return new BetweenPredicate(
node.getLocation(),
(Expression) process(node.getValue(), context),
(Expression) process(node.getMin(), context),
(Expression) process(node.getMax(), context));
}

@Override
public Expression visitComparisonExpression(
final ComparisonExpression node,
final Object context) {
if (expressionIsRowtime(node.getLeft()) || expressionIsRowtime(node.getRight())) {
return new ComparisonExpression(
node.getLocation(),
node.getType(),
(Expression) new TimestampRewriter().process(node.getLeft(), context),
(Expression) new TimestampRewriter().process(node.getRight(), context));
}
return new ComparisonExpression(
node.getLocation(),
node.getType(),
(Expression) process(node.getLeft(), context),
(Expression) process(node.getRight(), context));
}
}

private static boolean expressionIsRowtime(final Expression node) {
return (node instanceof DereferenceExpression)
&& ((DereferenceExpression) node).getFieldName().equals("ROWTIME");
}
}
Loading

0 comments on commit 42acd78

Please sign in to comment.