From febc520ea4e4e4b0905e0eb4db925cc803aed3dd Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Wed, 11 Mar 2020 16:31:37 -0700 Subject: [PATCH] docs: document retention,grace period for window queries --- .../time-and-windows-in-ksqldb-queries.md | 38 +++++++++++++++++++ .../execution/windows/WindowTimeClause.java | 0 .../streams/MaterializedFactoryTest.java | 4 +- 3 files changed, 39 insertions(+), 3 deletions(-) rename {ksql-execution => ksqldb-execution}/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java (100%) diff --git a/docs-md/concepts/time-and-windows-in-ksqldb-queries.md b/docs-md/concepts/time-and-windows-in-ksqldb-queries.md index 4355cf0fac28..a6c6524a9e80 100644 --- a/docs-md/concepts/time-and-windows-in-ksqldb-queries.md +++ b/docs-md/concepts/time-and-windows-in-ksqldb-queries.md @@ -365,4 +365,42 @@ SELECT o.order_id, o.total_amount, o.customer_name, s.shipment_id, s.warehouse For more information on joins, see [Join Event Streams with ksqlDB](../developer-guide/joins/join-streams-and-tables.md). +### Late Arriving Events + +Frequently, events that belong to a window can arrive late, for example, over slow networks, +and a grace period may be required to ensure the events are accepted into the window. +ksqlDB enables configuring this behavior for each of the window types. + +For example, to allow events to be accepted for up to two hours after the window ends, +you might run a query like: + +```sql +SELECT order_zipcode, TOPK(order_total, 5) FROM orders + WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 2 HOURS) + GROUP BY order_zipcode + EMIT CHANGES; +``` + +Events that arrive later than the grace period are dropped and not included in the aggregate result. + +### Window Retention + +For each window type, you can configure the number of windows in the past that ksqlDB retains. This +capability is very useful for interactive applications that use ksqlDB as their primary +serving data store. 
+ +For example, to retain the computed windowed aggregation results for a week, +you might run the following query: + +```sql +SELECT regionid, COUNT(*) FROM pageviews + WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS, RETENTION 7 DAYS, GRACE PERIOD 30 MINUTES) + WHERE UCASE(gender)='FEMALE' AND LCASE (regionid) LIKE '%_6' + GROUP BY regionid + EMIT CHANGES; +``` + +Note that the specified retention period should be larger than the sum of window size and any grace +period. + Page last revised on: {{ git_revision_date }} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java similarity index 100% rename from ksql-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java rename to ksqldb-execution/src/main/java/io/confluent/ksql/execution/windows/WindowTimeClause.java diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java index e3eb9cc97c2d..75f6d7503624 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java @@ -76,11 +76,9 @@ public void shouldSetupRetentionWhenNonEmpty() { when(retention.get()).thenReturn(Duration.ofSeconds(10)); // When: - final Materialized returned - = MaterializedFactory.create().create(keySerde, rowSerde, OP_NAME, retention); + MaterializedFactory.create().create(keySerde, rowSerde, OP_NAME, retention); // Then: - verify(retention).isPresent(); verify(retention).get(); } }