Skip to content

Commit

Permalink
feat: Add window size for time window tables (#3102)
Browse files Browse the repository at this point in the history
* Added WINDOW_SIZE config property for windowed keys where the type is time window.

* Applied review feedback.

* Fix DLS_DEAD_LOCAL_STORE in tests
  • Loading branch information
hjafarpour authored Jul 22, 2019
1 parent 9013a9a commit 6ff07d5
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 19 deletions.
14 changes: 13 additions & 1 deletion docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,16 @@ The WITH clause supports the following properties:
| | ``DELIMITED``, or when the value schema has multiple fields, will result in an error. |
+-------------------------+--------------------------------------------------------------------------------------------+
| WINDOW_TYPE | By default, the topic is assumed to contain non-windowed data. If the data is windowed, |
| | i.e. was created using KSQL using a query that contains a ``WINDOW`` clause, then the |
| | i.e., was created using KSQL using a query that contains a ``WINDOW`` clause, then the |
| | ``WINDOW_TYPE`` property can be used to provide the window type. Valid values are |
| | ``SESSION``, ``HOPPING``, and ``TUMBLING``. |
+-------------------------+--------------------------------------------------------------------------------------------+
| WINDOW_SIZE | By default, the topic is assumed to contain non-windowed data. If the data is windowed, |
| | i.e., was created by KSQL using a query that contains a ``WINDOW`` clause, and the |
| | ``WINDOW_TYPE`` property is ``TUMBLING`` or ``HOPPING``, then the ``WINDOW_SIZE`` property |
| | must be set. The value is a string made of two parts separated by a space: the window size |
| | (a number) and the window size unit (a time unit). For example: ``'10 SECONDS'``. |
+-------------------------+--------------------------------------------------------------------------------------------+

For more information on timestamp formats, see
`DateTimeFormatter <https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html>`__.
Expand Down Expand Up @@ -518,6 +524,12 @@ The WITH clause supports the following properties:
| | ``WINDOW_TYPE`` property can be used to provide the window type. Valid values are |
| | ``SESSION``, ``HOPPING``, and ``TUMBLING``. |
+-------------------------+--------------------------------------------------------------------------------------------+
| WINDOW_SIZE | By default, the topic is assumed to contain non-windowed data. If the data is windowed, |
| | i.e., was created by KSQL using a query that contains a ``WINDOW`` clause, and the |
| | ``WINDOW_TYPE`` property is ``TUMBLING`` or ``HOPPING``, then the ``WINDOW_SIZE`` property |
| | must be set. The value is a string made of two parts separated by a space: the window size |
| | (a number) and the window size unit (a time unit). For example: ``'10 SECONDS'``. |
+-------------------------+--------------------------------------------------------------------------------------------+

.. include:: ../includes/ksql-includes.rst
:start-after: Avro_note_start
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.confluent.ksql.configdef.ConfigValidators.ValidCaseInsensitiveString;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

/**
* 'With Clause' properties for 'CREATE' statements.
Expand All @@ -26,6 +27,7 @@ public final class CreateConfigs {

public static final String KEY_NAME_PROPERTY = "KEY";
public static final String WINDOW_TYPE_PROPERTY = "WINDOW_TYPE";
public static final String WINDOW_SIZE_PROPERTY = "WINDOW_SIZE";
public static final String AVRO_SCHEMA_ID = "AVRO_SCHEMA_ID";

private static final ConfigDef CONFIG_DEF = new ConfigDef()
Expand All @@ -45,6 +47,14 @@ public final class CreateConfigs {
"If the data is windowed, i.e. was created using KSQL using a query that "
+ "contains a ``WINDOW`` clause, then the property can be used to provide the "
+ "window type. Valid values are SESSION, HOPPING or TUMBLING."
).define(
WINDOW_SIZE_PROPERTY,
Type.STRING,
null,
Importance.LOW,
"If the data is windowed, i.e., was created using KSQL via a query that "
+ "contains a ``WINDOW`` clause and the window is a timed window,"
+ " then the property should be used to provide the window size. "
).define(
AVRO_SCHEMA_ID,
ConfigDef.Type.INT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void shouldExtractSessionWindowType() {
givenPropertiesWith(ImmutableMap.of(
CreateConfigs.WINDOW_TYPE_PROPERTY, new StringLiteral("SeSSion")));


// When:
final CreateStreamCommand cmd = createCmd();

Expand All @@ -113,7 +114,8 @@ public void shouldExtractSessionWindowType() {
public void shouldExtractHoppingWindowType() {
// Given:
givenPropertiesWith(ImmutableMap.of(
CreateConfigs.WINDOW_TYPE_PROPERTY, new StringLiteral("HoPPing")));
CreateConfigs.WINDOW_TYPE_PROPERTY, new StringLiteral("HoPPing"),
CreateConfigs.WINDOW_SIZE_PROPERTY, new StringLiteral("5 SECONDS")));

// When:
final CreateStreamCommand cmd = createCmd();
Expand All @@ -127,7 +129,8 @@ public void shouldExtractHoppingWindowType() {
public void shouldExtractTumblingWindowType() {
// Given:
givenPropertiesWith(ImmutableMap.of(
CreateConfigs.WINDOW_TYPE_PROPERTY, new StringLiteral("Tumbling")));
CreateConfigs.WINDOW_TYPE_PROPERTY, new StringLiteral("Tumbling"),
CreateConfigs.WINDOW_SIZE_PROPERTY, new StringLiteral("5 SECONDS")));

// When:
final CreateStreamCommand cmd = createCmd();
Expand All @@ -137,6 +140,37 @@ public void shouldExtractTumblingWindowType() {
is(instanceOf(WindowedSerdes.timeWindowedSerdeFrom(String.class).getClass())));
}

@Test
public void shouldThrowIfHoppingWindowSizeIsNotSet() {
  // Given: a HOPPING window type with no accompanying WINDOW_SIZE
  final StringLiteral hoppingType = new StringLiteral("HoPPing");
  final Map<String, Literal> propsWithoutSize =
      ImmutableMap.of(CreateConfigs.WINDOW_TYPE_PROPERTY, hoppingType);

  // Then: parsing is expected to fail with the missing-size message
  expectedException.expect(KsqlException.class);
  expectedException.expectMessage(
      "Tumbling and Hopping window types should set WINDOW_SIZE in the WITH clause.");

  // When: the properties (plus the mandatory defaults) are parsed
  final Map<String, Literal> withDefaults = getInitialProps(propsWithoutSize);
  CreateSourceProperties.from(withDefaults);
}

@Test
public void shouldThrowIfTumblingWindowSizeIsNotSet() {
  // Given: a TUMBLING window type with no accompanying WINDOW_SIZE
  final StringLiteral tumblingType = new StringLiteral("Tumbling");
  final Map<String, Literal> propsWithoutSize =
      ImmutableMap.of(CreateConfigs.WINDOW_TYPE_PROPERTY, tumblingType);

  // Then: parsing is expected to fail with the missing-size message
  expectedException.expect(KsqlException.class);
  expectedException.expectMessage(
      "Tumbling and Hopping window types should set WINDOW_SIZE in the WITH clause.");

  // When: the properties (plus the mandatory defaults) are parsed
  final Map<String, Literal> withDefaults = getInitialProps(propsWithoutSize);
  CreateSourceProperties.from(withDefaults);
}

@Test
public void shouldThrowIfTopicDoesNotExist() {
// Given:
Expand Down Expand Up @@ -201,10 +235,14 @@ private CreateStreamCommand createCmd() {
);
}

private void givenPropertiesWith(final Map<String, Literal> props) {
private static Map<String, Literal> getInitialProps(final Map<String, Literal> props) {
final Map<String, Literal> allProps = new HashMap<>(props);
allProps.putIfAbsent(CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("Json"));
allProps.putIfAbsent(CommonCreateConfigs.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral("some-topic"));
when(createStreamStatement.getProperties()).thenReturn(CreateSourceProperties.from(allProps));
return allProps;
}

/**
 * Stubs the mocked statement so that it returns source properties built from {@code props}
 * after they have been passed through {@code getInitialProps}.
 */
private void givenPropertiesWith(final Map<String, Literal> props) {
  final Map<String, Literal> withDefaults = getInitialProps(props);
  when(createStreamStatement.getProperties())
      .thenReturn(CreateSourceProperties.from(withDefaults));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public void shouldExtractSessionWindowType() {
public void shouldExtractHoppingWindowType() {
// Given:
givenPropertiesWith(ImmutableMap.of(
CreateConfigs.WINDOW_TYPE_PROPERTY, new StringLiteral("HoPPing")));
CreateConfigs.WINDOW_TYPE_PROPERTY, new StringLiteral("HoPPing"),
CreateConfigs.WINDOW_SIZE_PROPERTY, new StringLiteral("2 MINUTES")));

// When:
final CreateTableCommand cmd = createCmd();
Expand All @@ -125,7 +126,8 @@ public void shouldExtractHoppingWindowType() {
public void shouldExtractTumblingWindowType() {
// Given:
givenPropertiesWith(ImmutableMap.of(
CreateConfigs.WINDOW_TYPE_PROPERTY, new StringLiteral("Tumbling")));
CreateConfigs.WINDOW_TYPE_PROPERTY, new StringLiteral("Tumbling"),
CreateConfigs.WINDOW_SIZE_PROPERTY, new StringLiteral("2 seconds")));

// When:
final CreateTableCommand cmd = createCmd();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
{
"name": "import hopping table",
"statements": [
"CREATE TABLE TEST (ID bigint, VALUE bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID', WINDOW_TYPE='HoPping');",
"CREATE TABLE TEST (ID bigint, VALUE bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID', WINDOW_TYPE='HoPping', WINDOW_SIZE='30 seconds');",
"CREATE TABLE S2 as SELECT *, ROWKEY as KEY FROM test;"
],
"inputs": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
{
"name": "import tumbling table",
"statements": [
"CREATE TABLE TEST (ID bigint, VALUE bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID', WINDOW_TYPE='Tumbling');",
"CREATE TABLE TEST (ID bigint, VALUE bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID', WINDOW_TYPE='Tumbling', WINDOW_SIZE='30 seconds');",
"CREATE TABLE S2 as SELECT *, ROWKEY as KEY FROM test;"
],
"inputs": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package io.confluent.ksql.parser.properties.with;

import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.metastore.SerdeFactory;
import io.confluent.ksql.parser.tree.IntegerLiteral;
Expand All @@ -24,8 +23,10 @@
import io.confluent.ksql.properties.with.CreateConfigs;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.StringUtil;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
Expand All @@ -36,12 +37,9 @@
@Immutable
public final class CreateSourceProperties extends WithClauseProperties {

private static final java.util.Map<String, SerdeFactory<Windowed<String>>> WINDOW_TYPES =
ImmutableMap.of(
"SESSION", () -> WindowedSerdes.sessionWindowedSerdeFrom(String.class),
"TUMBLING", () -> WindowedSerdes.timeWindowedSerdeFrom(String.class),
"HOPPING", () -> WindowedSerdes.timeWindowedSerdeFrom(String.class)
);
private static final String TUMBLING_WINDOW_NAME = "TUMBLING";
private static final String HOPPING_WINDOW_NAME = "HOPPING";
private static final String SESSION_WINDOW_NAME = "SESSION";

public static CreateSourceProperties from(final Map<String, Literal> literals) {
try {
Expand All @@ -60,6 +58,10 @@ private CreateSourceProperties(final Map<String, Literal> originals) {
super(CreateConfigs.CONFIG_METADATA, originals);

validateDateTimeFormat(CommonCreateConfigs.TIMESTAMP_FORMAT_PROPERTY);
validateWindowType();
if (originals.containsKey(CreateConfigs.WINDOW_SIZE_PROPERTY)) {
getTimedWindowSerdeFactory();
}
}

public Format getValueFormat() {
Expand All @@ -82,10 +84,22 @@ public Optional<String> getKeyField() {
return Optional.ofNullable(getString(CreateConfigs.KEY_NAME_PROPERTY));
}

@SuppressWarnings("unchecked")
public Optional<SerdeFactory<Windowed<String>>> getWindowType() {
return Optional.ofNullable(getString(CreateConfigs.WINDOW_TYPE_PROPERTY))
.map(String::toUpperCase)
.map(WINDOW_TYPES::get);
final Optional<String> windowType = Optional.ofNullable(
getString(CreateConfigs.WINDOW_TYPE_PROPERTY))
.map(String::toUpperCase);
if (!windowType.isPresent()) {
return Optional.empty();
}
if (SESSION_WINDOW_NAME.equalsIgnoreCase(windowType.get())) {
return Optional.of(() -> WindowedSerdes.sessionWindowedSerdeFrom(String.class));
}
if (TUMBLING_WINDOW_NAME.equalsIgnoreCase(windowType.get())
|| HOPPING_WINDOW_NAME.equalsIgnoreCase(windowType.get())) {
return getTimedWindowSerdeFactory();
}
throw new KsqlException("Invalid window type: " + windowType.get());
}

public Optional<String> getTimestampColumnName() {
Expand Down Expand Up @@ -125,4 +139,62 @@ public CreateSourceProperties withPartitionsAndReplicas(

return new CreateSourceProperties(originals);
}


/**
 * Checks that the {@code WINDOW_TYPE} property, when present, names a known window type and
 * is consistent with the {@code WINDOW_SIZE} property.
 *
 * <ul>
 *   <li>No window type configured: nothing to validate.</li>
 *   <li>SESSION: {@code WINDOW_SIZE} must not be set.</li>
 *   <li>TUMBLING / HOPPING: the window size is validated by building the timed window serde
 *       factory; the factory itself is discarded.</li>
 * </ul>
 *
 * @throws KsqlException if the window type is unknown, if a size is supplied for a session
 *     window, or if the size check for a timed window fails
 */
private void validateWindowType() {
  final String configuredType = getString(CreateConfigs.WINDOW_TYPE_PROPERTY);
  if (configuredType == null) {
    return;
  }

  final String typeName = configuredType.toUpperCase();
  final boolean sessionWindow = SESSION_WINDOW_NAME.equalsIgnoreCase(typeName);
  final boolean timedWindow = TUMBLING_WINDOW_NAME.equalsIgnoreCase(typeName)
      || HOPPING_WINDOW_NAME.equalsIgnoreCase(typeName);

  if (sessionWindow) {
    final boolean sizeSupplied = getString(CreateConfigs.WINDOW_SIZE_PROPERTY) != null;
    if (sizeSupplied) {
      throw new KsqlException(CreateConfigs.WINDOW_SIZE_PROPERTY
          + " should not be set for SESSION windows.");
    }
    return;
  }

  if (timedWindow) {
    // Called purely for its validation side effect (it throws on a missing or bad size).
    getTimedWindowSerdeFactory();
    return;
  }

  throw new KsqlException("Invalid window type: " + typeName);
}

/**
 * Builds the serde factory for time-windowed (TUMBLING / HOPPING) keys from the
 * {@code WINDOW_SIZE} property.
 *
 * <p>The property value is upper-cased, stripped of quotes via {@code StringUtil.cleanQuotes},
 * and must then consist of exactly two whitespace-separated parts: a positive whole number and
 * a {@link TimeUnit} name. The unit may be given in singular form (e.g. {@code SECOND}); a
 * trailing {@code S} is appended when missing. Example: {@code '10 SECONDS'}.
 *
 * @return a factory producing a time-windowed serde whose window size is the configured size
 *     converted to milliseconds; never empty
 * @throws KsqlException if {@code WINDOW_SIZE} is not set, is not made of exactly two parts,
 *     has a size that is not a positive whole number, or names an unknown time unit
 */
private Optional<SerdeFactory<Windowed<String>>> getTimedWindowSerdeFactory() {
  final String rawWindowSize = getString(CreateConfigs.WINDOW_SIZE_PROPERTY);
  if (rawWindowSize == null) {
    throw new KsqlException("Tumbling and Hopping window types should set "
        + CreateConfigs.WINDOW_SIZE_PROPERTY + " in the WITH clause.");
  }

  // Locale.ROOT keeps the upper-casing independent of the JVM default locale; otherwise a
  // unit such as "minutes" would not map onto the TimeUnit constant under e.g. a Turkish
  // locale.
  final String windowSizeProperty = rawWindowSize.toUpperCase(java.util.Locale.ROOT);

  // Tolerate surrounding and repeated whitespace between the number and the unit.
  final String[] sizeParts =
      StringUtil.cleanQuotes(windowSizeProperty).trim().split("\\s+");
  if (sizeParts.length != 2) {
    throwWindowSizeException(windowSizeProperty);
  }

  final long windowSizeMs;
  try {
    final long size = Long.parseLong(sizeParts[0]);
    if (size <= 0) {
      // A zero or negative window size can never describe a valid time window.
      throwWindowSizeException(windowSizeProperty);
    }
    // Accept both singular and plural unit names: TimeUnit constants are all plural.
    final String unitName = sizeParts[1].endsWith("S") ? sizeParts[1] : sizeParts[1] + "S";
    windowSizeMs = TimeUnit.MILLISECONDS.convert(size, TimeUnit.valueOf(unitName));
  } catch (final IllegalArgumentException e) {
    // Covers NumberFormatException from parseLong and the unknown-constant failure of
    // TimeUnit.valueOf.
    throwWindowSizeException(windowSizeProperty);
    // Not reached: the helper always throws. Present so the compiler knows this branch
    // cannot complete normally.
    throw new KsqlException("Invalid time window: " + windowSizeProperty);
  }

  return Optional.of(
      () -> WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSizeMs));
}

/**
 * Always throws a {@code KsqlException} describing the expected {@code WINDOW_SIZE} format.
 *
 * @param windowSizeProperty the offending property value, echoed in the error message
 * @throws KsqlException unconditionally
 */
private static void throwWindowSizeException(final String windowSizeProperty) {
  final String propertyName = CreateConfigs.WINDOW_SIZE_PROPERTY;
  final StringBuilder message = new StringBuilder();
  message.append("Invalid ").append(propertyName).append(" property : ");
  message.append(windowSizeProperty).append(". ");
  message.append(propertyName).append(" should be a string ");
  message.append("with two literals, window size (a number) and window size unit (a time unit). ");
  message.append("For example: '10 SECONDS'.");
  throw new KsqlException(message.toString());
}
}
Loading

0 comments on commit 6ff07d5

Please sign in to comment.