Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support for tunable retention, grace period for windowed tables #4733

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs-md/concepts/time-and-windows-in-ksqldb-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,42 @@ SELECT o.order_id, o.total_amount, o.customer_name, s.shipment_id, s.warehouse
For more information on joins, see
[Join Event Streams with ksqlDB](../developer-guide/joins/join-streams-and-tables.md).

### Late Arriving Events

Frequently, events that belong to a window can arrive late, for example, over slow networks,
and a grace period may be required to ensure the events are accepted into the window.
ksqlDB enables configuring this behavior, for each of the window types.

For example, to allow events to be accepted for up to two hours after the window ends,
you might run a query like:

```sql
SELECT orderzip_code, TOPK(order_total, 5) FROM orders
WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 2 HOURS)
GROUP BY order_zipcode
EMIT CHANGES;
```

Events that arrive later than the grace period are dropped and not included in the aggregate result.

### Window Retention

For each window type, you can configure the number of windows in the past that ksqlDB retains. This
capability is very useful for interactive applications that use ksqlDB as their primary
serving data store.

For example, to retain the computed windowed aggregation results for a week,
you might run the following query:

```sql
SELECT regionid, COUNT(*) FROM pageviews
WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS, RETENTION 7 DAYS, GRACE PERIOD 30 MINUTES)
WHERE UCASE(gender)='FEMALE' AND LCASE (regionid) LIKE '%_6'
GROUP BY regionid
EMIT CHANGES;
```

Note that the specified retention period should be larger than the sum of window size and any grace
period.
agavra marked this conversation as resolved.
Show resolved Hide resolved

Page last revised on: {{ git_revision_date }}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static io.confluent.ksql.serde.FormatFactory.JSON;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand All @@ -44,17 +45,24 @@
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.test.util.KsqlIdentifierTestUtil;
import io.confluent.ksql.util.PageViewDataProvider;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.UserDataProvider;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -80,18 +88,26 @@ public class KsMaterializationFunctionalTest {
private static final String USER_TABLE = "users_table";
private static final String USER_STREAM = "users_stream";

private static final String PAGE_VIEWS_TOPIC = "page_views_topic";
private static final String PAGE_VIEWS_STREAM = "page_views_stream";

private static final Format VALUE_FORMAT = JSON;
private static final UserDataProvider USER_DATA_PROVIDER = new UserDataProvider();
private static final PageViewDataProvider PAGE_VIEW_DATA_PROVIDER = new PageViewDataProvider();

private static final Duration WINDOW_SIZE = Duration.ofSeconds(5);
private static final Duration WINDOW_SEGMENT_DURATION = Duration.ofSeconds(60);
private static final int NUM_WINDOWS = 4;
private static final List<Instant> WINDOW_START_INSTANTS = LongStream.range(1, NUM_WINDOWS + 1)
// records have to be apart by at-least a segment for retention to enforced
.mapToObj(i -> Instant.ofEpochMilli(i * WINDOW_SEGMENT_DURATION.toMillis()))
.collect(Collectors.toList());

private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();

private static final Deserializer<Windowed<String>> TIME_WINDOWED_DESERIALIZER =
WindowedSerdes
.timeWindowedSerdeFrom(String.class, WINDOW_SIZE.toMillis())
.deserializer();

private static final Deserializer<Windowed<String>> SESSION_WINDOWED_DESERIALIZER =
WindowedSerdes
.sessionWindowedSerdeFrom(String.class)
Expand Down Expand Up @@ -120,13 +136,22 @@ public class KsMaterializationFunctionalTest {

@BeforeClass
public static void classSetUp() {
TEST_HARNESS.ensureTopics(USERS_TOPIC);
TEST_HARNESS.ensureTopics(USERS_TOPIC, PAGE_VIEWS_TOPIC);

TEST_HARNESS.produceRows(
USERS_TOPIC,
USER_DATA_PROVIDER,
VALUE_FORMAT
);

for (final Instant windowTime : WINDOW_START_INSTANTS) {
TEST_HARNESS.produceRows(
PAGE_VIEWS_TOPIC,
PAGE_VIEW_DATA_PROVIDER,
VALUE_FORMAT,
windowTime::toEpochMilli
);
}
}

@Before
Expand Down Expand Up @@ -205,7 +230,7 @@ public void shouldQueryMaterializedTableForAggregatedTable() {

final LogicalSchema schema = schema("KSQL_COL_0", SqlTypes.BIGINT);

final Map<String, GenericRow> rows = waitForTableRows(STRING_DESERIALIZER, schema);
final Map<String, GenericRow> rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
Expand Down Expand Up @@ -239,7 +264,7 @@ public void shouldQueryMaterializedTableForAggregatedStream() {

final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT);

final Map<String, GenericRow> rows = waitForTableRows(STRING_DESERIALIZER, schema);
final Map<String, GenericRow> rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
Expand Down Expand Up @@ -275,7 +300,7 @@ public void shouldQueryMaterializedTableForTumblingWindowed() {
final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT);

final Map<Windowed<String>, GenericRow> rows =
waitForTableRows(TIME_WINDOWED_DESERIALIZER, schema);
waitForUniqueUserRows(TIME_WINDOWED_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
Expand Down Expand Up @@ -321,7 +346,7 @@ public void shouldQueryMaterializedTableForHoppingWindowed() {
final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT);

final Map<Windowed<String>, GenericRow> rows =
waitForTableRows(TIME_WINDOWED_DESERIALIZER, schema);
waitForUniqueUserRows(TIME_WINDOWED_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
Expand Down Expand Up @@ -365,7 +390,7 @@ public void shouldQueryMaterializedTableForSessionWindowed() {
final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT);

final Map<Windowed<String>, GenericRow> rows =
waitForTableRows(SESSION_WINDOWED_DESERIALIZER, schema);
waitForUniqueUserRows(SESSION_WINDOWED_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
Expand Down Expand Up @@ -396,6 +421,103 @@ public void shouldQueryMaterializedTableForSessionWindowed() {
});
}

@Test(expected = IllegalArgumentException.class)
public void shouldFailQueryWithRetentionSmallerThanGracePeriod() {
// Given:
executeQuery("CREATE TABLE " + output + " AS"
+ " SELECT COUNT(*) AS COUNT FROM " + PAGE_VIEWS_STREAM
+ " WINDOW TUMBLING (SIZE " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS,"
+ " RETENTION " + (WINDOW_SEGMENT_DURATION.getSeconds() * 2) + " SECONDS)"
+ " GROUP BY PAGEID;"
);
}

@Test
public void shouldQueryTumblingWindowMaterializedTableWithRetention() {
// Given:
final PersistentQueryMetadata query = executeQuery(
"CREATE TABLE " + output + " AS"
+ " SELECT COUNT(*) AS COUNT FROM " + PAGE_VIEWS_STREAM
+ " WINDOW TUMBLING (SIZE " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS,"
+ " RETENTION " + (WINDOW_SEGMENT_DURATION.getSeconds() * 2) + " SECONDS,"
+ " GRACE PERIOD 0 SECONDS)"
+ " GROUP BY PAGEID;"
);

final List<ConsumerRecord<Windowed<String>, GenericRow>> rows =
waitForPageViewRows(TIME_WINDOWED_DESERIALIZER, query.getPhysicalSchema());

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();

// Then:
assertThat(materialization.windowType(), is(Optional.of(WindowType.TUMBLING)));
final MaterializedWindowedTable table = materialization.windowed();
final Set<Optional<Window>> expectedWindows = Stream.of(
Window.of(WINDOW_START_INSTANTS.get(1), WINDOW_START_INSTANTS.get(1).plusSeconds(WINDOW_SEGMENT_DURATION.getSeconds())),
Window.of(WINDOW_START_INSTANTS.get(2), WINDOW_START_INSTANTS.get(2).plusSeconds(WINDOW_SEGMENT_DURATION.getSeconds())),
Window.of(WINDOW_START_INSTANTS.get(3), WINDOW_START_INSTANTS.get(3).plusSeconds(WINDOW_SEGMENT_DURATION.getSeconds()))
).map(Optional::of).collect(Collectors.toSet());
verifyRetainedWindows(rows, table, query, expectedWindows);
}

@Test
public void shouldQueryHoppingWindowMaterializedTableWithRetention() {
// Given:
final PersistentQueryMetadata query = executeQuery(
"CREATE TABLE " + output + " AS"
+ " SELECT COUNT(*) AS COUNT FROM " + PAGE_VIEWS_STREAM
+ " WINDOW HOPPING (SIZE " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS,"
+ " ADVANCE BY " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS, "
+ " RETENTION " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS,"
+ " GRACE PERIOD 0 SECONDS"
+ ") GROUP BY PAGEID;"
);

final List<ConsumerRecord<Windowed<String>, GenericRow>> rows =
waitForPageViewRows(TIME_WINDOWED_DESERIALIZER, query.getPhysicalSchema());

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();

// Then:
assertThat(materialization.windowType(), is(Optional.of(WindowType.HOPPING)));
final MaterializedWindowedTable table = materialization.windowed();
final Set<Optional<Window>> expectedWindows = Stream.of(
Window.of(WINDOW_START_INSTANTS.get(2), WINDOW_START_INSTANTS.get(2).plusSeconds(WINDOW_SEGMENT_DURATION.getSeconds())),
Window.of(WINDOW_START_INSTANTS.get(3), WINDOW_START_INSTANTS.get(3).plusSeconds(WINDOW_SEGMENT_DURATION.getSeconds()))
).map(Optional::of).collect(Collectors.toSet());
verifyRetainedWindows(rows, table, query, expectedWindows);
}

@Test
public void shouldQuerySessionWindowMaterializedTableWithRetention() {
// Given:
final PersistentQueryMetadata query = executeQuery(
"CREATE TABLE " + output + " AS"
+ " SELECT COUNT(*) AS COUNT FROM " + PAGE_VIEWS_STREAM
+ " WINDOW SESSION (" + WINDOW_SEGMENT_DURATION.getSeconds()/2 + " SECONDS,"
+ " RETENTION " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS,"
+ " GRACE PERIOD 0 SECONDS"
+ ") GROUP BY USERID;"
);

final List<ConsumerRecord<Windowed<String>, GenericRow>> rows =
waitForPageViewRows(SESSION_WINDOWED_DESERIALIZER, query.getPhysicalSchema());

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();

// Then:
assertThat(materialization.windowType(), is(Optional.of(WindowType.SESSION)));
final MaterializedWindowedTable table = materialization.windowed();
final Set<Optional<Window>> expectedWindows = Stream.of(
Window.of(WINDOW_START_INSTANTS.get(2), WINDOW_START_INSTANTS.get(2)),
Window.of(WINDOW_START_INSTANTS.get(3), WINDOW_START_INSTANTS.get(3))
).map(Optional::of).collect(Collectors.toSet());
verifyRetainedWindows(rows, table, query, expectedWindows);
}

@Test
public void shouldQueryMaterializedTableWithKeyFieldsInProjection() {
// Given:
Expand All @@ -405,14 +527,13 @@ public void shouldQueryMaterializedTableWithKeyFieldsInProjection() {
+ " GROUP BY USERID;"
);


final LogicalSchema schema = schema(
"USERID", SqlTypes.STRING,
"KSQL_COL_1", SqlTypes.BIGINT,
"USERID_2", SqlTypes.STRING
);

final Map<String, GenericRow> rows = waitForTableRows(STRING_DESERIALIZER, schema);
final Map<String, GenericRow> rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema);


// When:
Expand All @@ -434,7 +555,7 @@ public void shouldQueryMaterializedTableWithKeyFieldsInProjection() {
}

@Test
public void shouldQueryMaterializedTableWitMultipleAggregationColumns() {
public void shouldQueryMaterializedTableWithMultipleAggregationColumns() {
// Given:
final PersistentQueryMetadata query = executeQuery(
"CREATE TABLE " + output + " AS"
Expand All @@ -447,7 +568,7 @@ public void shouldQueryMaterializedTableWitMultipleAggregationColumns() {
"SUM", SqlTypes.BIGINT
);

final Map<String, GenericRow> rows = waitForTableRows(STRING_DESERIALIZER, schema);
final Map<String, GenericRow> rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
Expand Down Expand Up @@ -481,7 +602,7 @@ public void shouldIgnoreHavingClause() {

final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT);

final Map<String, GenericRow> rows = waitForTableRows(STRING_DESERIALIZER, schema);
final Map<String, GenericRow> rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
Expand All @@ -502,7 +623,26 @@ public void shouldIgnoreHavingClause() {
});
}

private <T> Map<T, GenericRow> waitForTableRows(
private void verifyRetainedWindows(
final List<ConsumerRecord<Windowed<String>, GenericRow>> rows,
final MaterializedWindowedTable table,
final PersistentQueryMetadata query,
final Set<Optional<Window>> expectedWindows
) {
rows.forEach(record -> {
final Struct key = asKeyStruct(record.key().key(), query.getPhysicalSchema());
final List<WindowedRow> resultAtWindowStart = table.get(key, Range.all());
assertThat("Should have fewer windows retained",
resultAtWindowStart,
hasSize(expectedWindows.size()));
final Set<Optional<Window>> actualWindows = resultAtWindowStart.stream()
.map(WindowedRow::window)
.collect(Collectors.toSet());
assertThat("Should retain the latest windows", actualWindows, equalTo(expectedWindows));
});
}

private <T> Map<T, GenericRow> waitForUniqueUserRows(
final Deserializer<T> keyDeserializer,
final LogicalSchema aggregateSchema
) {
Expand All @@ -515,6 +655,18 @@ private <T> Map<T, GenericRow> waitForTableRows(
);
}

private <T> List<ConsumerRecord<T, GenericRow>> waitForPageViewRows(
final Deserializer<T> keyDeserializer,
final PhysicalSchema aggregateSchema) {
return TEST_HARNESS.verifyAvailableRows(
output.toUpperCase(),
hasSize(PAGE_VIEW_DATA_PROVIDER.data().size() * NUM_WINDOWS),
VALUE_FORMAT,
aggregateSchema,
keyDeserializer
);
}

private PersistentQueryMetadata executeQuery(final String statement) {
return executeQuery(ksqlContext, statement);
}
Expand Down Expand Up @@ -601,6 +753,16 @@ private static void initializeKsql(final TestKsqlContext ksqlContext) {
+ " key = '" + USER_DATA_PROVIDER.key() + "'"
+ ");"
);

ksqlContext.sql("CREATE STREAM " + PAGE_VIEWS_STREAM + " "
+ " (" + PAGE_VIEW_DATA_PROVIDER.ksqlSchemaString() + ")"
+ " WITH ("
+ " kafka_topic='" + PAGE_VIEWS_TOPIC + "', "
+ " value_format='" + VALUE_FORMAT.name() + "', "
+ " key = '" + PAGE_VIEW_DATA_PROVIDER.key() + "'"
+ ");"
);

}
}

Loading