Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add full window bounds support to pull queries #4404

Merged

Conversation

big-andy-coates
Copy link
Contributor

@big-andy-coates big-andy-coates commented Jan 29, 2020

Description

fixes: #3633
fixes: #3619

This change sees pull queries share more functionality and code around the window bounds columns WINDOWSTART and WINDOWEND introduced in #4388 and #4401.

  • pull queries on time windowed sources, i.e. TUMBLING and HOPPING, now have a WINDOWEND in their schema, just like SESSION and the new push query functionality.
  • window bound columns are now accessible within the projection of a pull query, e.g. SELECT WINDOWSTART, WINDOWEND FROM FOO WHERE ROWKEY=1;

Reviewing notes:

  • To support WINDOWEND for hopping and tumbling windows I've passed the window size down to KsMaterializedWindowTable. This also means Windows end time no longer needs to be Optional.
  • The PullQueryExecutor and TableRowEntityFactory have been updated to add WINDOWSTART and WINDOWEND by using LogicalSchema.withMetaAndKeyValueColumns(). This also means we no longer need the referenceValueColumnsOnly flag on ExpressionTypeManager
  • The PullQueryExecutor has been updated to insert the window.end time when processing rows.

So the only files that really have meaningful code changes are:

  • Window.java
  • KsMaterializedWindowTable.java
  • PullQueryExecutor.java
  • TableRowEntityFactory.java
  • ExpressionTypeManager.java
  • and the JSON test files.

The rest is just plumbing or changing related to removed parameter on ExpressionTypeManager or non-optional end time on Window..

Testing done

usual

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

fixes: confluentinc#3871

Is needed to fix:
- confluentinc#3633
- confluentinc#4015

Before this change the version of `ROWKEY` copied to the value schema during processing of data in the Streams topology was always of type `STRING` regardless of the actual key type. This is because windowed keys had a `ROWKEY` in the format `<actual key> : Window{start=<windowStart>, end=<windowEnd>}`. While `ROWKEY` in the value schema was a `STRING`, `ROWKEY` in the key schema was the actual type, e.g. `INT`.  This is confusing and will lead to bugs.  Also, the formated string isn't very friendly for users.

This change looks to introduce the `WINDOWSTART` and `WINDOWEND` columns that were reserved in confluentinc#4388. The obvious approach would be to add `WINDOWSTART` and `WINDOWEND` as columns in the key schema. Unfortunately, this would be a much bigger change as many parts of the code currently rely on there being only a single key column. The planned structured key work will resolve this.

For now, we only add the windows bounds columns when we `LogicalSchema.withMetaAndKeyColsInValue(true)`. This is a bit of a temporary hack, but gets us where we need to be. This will be cleaned up as part of the structured key work.

With this change `ROWKEY` for windowed sources no longer has the format `<actual key> : Window{start=<windowStart>, end=<windowEnd>}`: `ROWKEY` is now only the _actual_ key and the window bounds can be accessed by `WINDOWSTART` and `WINDOWEND`. These two window bounds columns are included in a pull `SELECT *` query. Likewise a join will include the window bounds columns from both sides in the join result if the join is `SELECT *`.

## Examples:

### Push queries

* A select * on a windowed source will not include `WINDOWSTART` and `WINDOWEND`. `ROWKEY` will be the actual key, not a formatted string.

```
ksql> SELECT * FROM windowedSource emit changes

-- old output
+---------------+------------------------------------------------------+--------+---------+------+
| ROWTIME       | ROWKEY                                               | USERID | PAGEID  | TOTAL|
+---------------+------------------------------------------------------+--------+---------+------+
| 1557183929488 | User_9|+|Page_39 : Window{start=1557183900000 end=-} | User_9 | Page_39 | 1    |
| 1557183930211 | User_1|+|Page_79 : Window{start=1557183900000 end=-} | User_1 | Page_79 | 1    |

-- new output
+---------------+---------------+---------------+------------------+--------+---------+------+
| ROWTIME       | WINDOWSTART   | WINDOWEND     | ROWKEY           | USERID | PAGEID  | TOTAL|
+---------------+---------------+---------------+------------------+--------+---------+------+
| 1557183919786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 1    |
| 1557183929488 | 1557183900000 | 1557183960000 | User_9|+|Page_39 | User_9 | Page_39 | 1    |
```

* `WINDOWSTART` and `WINDOWEND` are available in the SELECT, GROUPBY, WHERE, HAVING clauses etc.

For example:

```sql
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss Z') FROM windowedSource emit changes;
```

However, don't get too excited just yet as there is a known limitation that drastically reduces the availability of this syntax:

**KNOWN LIMITATION**
Where a query builds a windowed source from a non-windowed source the window bounds columns are not available.  For example:

```
-- won't yet work:
SELECT WINDOWSTART FROM FROM someSource WINDOW TUMBLING (SIZE 1 SECOND) group by ROWKEY;
```

This issue is tracked by: confluentinc#4397

* Joins of windowed sources include the `WINDOWSTART` and `WINDOWEND` columns from both sides.

### Pull queries

**KNOWN LIMITATION**
Pull queries have not been updated yet. This will be done in a follow up PR confluentinc#3633. This is mainly to keep this PR manageable.

### Persistent queries

Persistent C*AS queries work similar to push queries and have the same known limitation.

BREAKING CHANGE: Any query of a windowed source that uses `ROWKEY` in the SELECT projection will see the contents of `ROWKEY` change from a formatted `STRING` containing the underlying key and the window bounds, to just the underlying key.  Queries can access the window bounds using `WINDOWSTART` and `WINDOWEND`.

BREAKING CHANGE: Joins on windowed sources now include `WINDOWSTART` and `WINDOWEND` columns from both sides on a `SELECT *`.
KSQL does support `<` operator for strings... revert doc change & add test
fixes: confluentinc#3633

This change sees pull queries share more functionality and code around the window bounds columns `WINDOWSTART` and `WINDOWEND` introduced in confluentinc#4388 and confluentinc#4401.

* pull queries on time windowed sources, i.e. `TUMBLING` and `HOPPING`, now have a `WINDOWEND` in their schema, just like `SESSION` and the new push query functionality.
* window bound columns are now accessible within the projection of a pull query, e.g. `SELECT WINDOWSTART, WINDOWEND FROM FOO WHERE ROWKEY=1;`
Conflicting files
docs-md/developer-guide/create-a-stream.md
docs-md/developer-guide/create-a-table.md
Comment on lines +677 to +679
if (input.schema.isKeyColumn(select.getAlias())
|| select.getAlias().equals(SchemaUtil.WINDOWSTART_NAME)
|| select.getAlias().equals(SchemaUtil.WINDOWEND_NAME)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bit hacky, but will work for now.

Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@big-andy-coates big-andy-coates merged commit 87d9fb8 into confluentinc:master Jan 30, 2020
@big-andy-coates big-andy-coates deleted the pull_window_bounds branch January 30, 2020 10:22
big-andy-coates added a commit to big-andy-coates/ksql that referenced this pull request Jan 30, 2020
Previously, when copying the meta and key columns into the value schema to enable processing via a Streams topology the columns where copied to the _start_ of the value schema. For example, given a schema of:

```sql
ROWKEY INT KEY, VAL0 STRING
```

The schema used during processing would of been:

```sql
ROWKEY INT KEY, ROWTIME BIGINT, ROWKEY INT, VAL0 STRING
```

Note: `ROWTIME` and `ROWKEY` columns are _before_ the existing value columns.

This meant the transformer in `SourceBuilder` had to add the value for `ROWTIME` and `ROWKEY`, (and now `WINDOWSTART` and `WINDOWEND` for windowed sources), to the _start_ of the list of values in the `GenericRow`.  Internally, `GenericRow` was using an `ArrayList`. Inserting at the start of an `ArrayList` requires all the existing elements to be shuffled down to make room for the new element, hence processing incurred an array copy operation when inserting each of these system columns.

Additionally, the buffer backing the `ArrayList` would first need to be resized to accomodate the new element, resulting in an allocation and another array-copy op.

Here's the transformer code from `SourceBuilder` and details of the operations happening:

```java
@OverRide
public GenericRow transform(final K key, final GenericRow row) {
  if (row == null) {
    return row;
  }

  // The call below:
  //   - allocates a new buffer of <current-size> + 1 to hold the new element
  //   - performs an array copy to move all existing elements down one slot
  //   - inserts ROWTIME at the start of the list.
  row.getColumns().add(0, processorContext.timestamp());

  // The call below:
  //   - allocates a new buffer of <current-size> + <num-key-columns> to hold the new elements
  //   - performs an array copy to move all the existing elements, except ROWTIME, down by <num-key-columns> slots
  //   - inserts the key columns after ROWTIME.
  row.getColumns().addAll(1, rowKeyGenerator.apply(key));
  return row;
}
```

As you can see this results in two new allocations and two array copies. Given this is in the main processing loop these aren't cheap and are easily avoided.

The new code does this:

```java
@OverRide
public GenericRow transform(final K key, final GenericRow row) {
    if (row == null) {
        return row;
    }

    final long timestamp = processorContext.timestamp();
    final Collection<?> keyColumns = rowKeyGenerator.apply(key);

    // Ensures capacity is large enough for all inserts
    row.ensureAdditionalCapacity(1 + keyColumns.size());
    // Appends ROWTIME at the end of the list
    row.getColumns().add(timestamp);
    // Appends key columns at the end of the list
    row.getColumns().addAll(keyColumns);
    return row;
}
```

This has zero-or-one allocations, (see below for why its generally zero), and not unnecessary array copies.

To avoid the allocation of a new buffer when the transformer calls `row.ensureAdditionalCapacity(1 + keyColumns.size());` there is a change in `GenericRowSerDe` to create the initial `GenericRow` with enough spare capacity to hold `ROWTIME`, `ROWKEY`, `WINDOWSTART` and `WINDOWEND`.

BREAKING CHANGE: the order of columns for internal topics has changed. The `DELIMITED` format can not handle this in a backwards compatible way. Hence this is a breaking change for any existing queries the use the `DELIMITED` format and have internal topics.

This change has been made now for two reasons:
 1. its a breaking change, making it much harder to do later.
 2. The recent  confluentinc#4404 change introduced this same issue for pull queries. This current change corrects pull queries too.

### How to review

There's actually not much code changed, it's mainly tests:

1. `LogicalSchema.withMetaAndKeysInValue`
@big-andy-coates big-andy-coates mentioned this pull request Jan 30, 2020
2 tasks
big-andy-coates added a commit that referenced this pull request Feb 4, 2020
* refactor: re-order value schema used in Streams processing

Previously, when copying the meta and key columns into the value schema to enable processing via a Streams topology the columns where copied to the _start_ of the value schema. For example, given a schema of:

```sql
ROWKEY INT KEY, VAL0 STRING
```

The schema used during processing would of been:

```sql
ROWKEY INT KEY, ROWTIME BIGINT, ROWKEY INT, VAL0 STRING
```

Note: `ROWTIME` and `ROWKEY` columns are _before_ the existing value columns.

This meant the transformer in `SourceBuilder` had to add the value for `ROWTIME` and `ROWKEY`, (and now `WINDOWSTART` and `WINDOWEND` for windowed sources), to the _start_ of the list of values in the `GenericRow`.  Internally, `GenericRow` was using an `ArrayList`. Inserting at the start of an `ArrayList` requires all the existing elements to be shuffled down to make room for the new element, hence processing incurred an array copy operation when inserting each of these system columns.

Additionally, the buffer backing the `ArrayList` would first need to be resized to accomodate the new element, resulting in an allocation and another array-copy op.

Here's the transformer code from `SourceBuilder` and details of the operations happening:

```java
@OverRide
public GenericRow transform(final K key, final GenericRow row) {
  if (row == null) {
    return row;
  }

  // The call below:
  //   - allocates a new buffer of <current-size> + 1 to hold the new element
  //   - performs an array copy to move all existing elements down one slot
  //   - inserts ROWTIME at the start of the list.
  row.getColumns().add(0, processorContext.timestamp());

  // The call below:
  //   - allocates a new buffer of <current-size> + <num-key-columns> to hold the new elements
  //   - performs an array copy to move all the existing elements, except ROWTIME, down by <num-key-columns> slots
  //   - inserts the key columns after ROWTIME.
  row.getColumns().addAll(1, rowKeyGenerator.apply(key));
  return row;
}
```

As you can see this results in two new allocations and two array copies. Given this is in the main processing loop these aren't cheap and are easily avoided.

The new code does this:

```java
@OverRide
public GenericRow transform(final K key, final GenericRow row) {
    if (row == null) {
        return row;
    }

    final long timestamp = processorContext.timestamp();
    final Collection<?> keyColumns = rowKeyGenerator.apply(key);

    // Ensures capacity is large enough for all inserts
    row.ensureAdditionalCapacity(1 + keyColumns.size());
    // Appends ROWTIME at the end of the list
    row.getColumns().add(timestamp);
    // Appends key columns at the end of the list
    row.getColumns().addAll(keyColumns);
    return row;
}
```

This has zero-or-one allocations, (see below for why its generally zero), and not unnecessary array copies.

To avoid the allocation of a new buffer when the transformer calls `row.ensureAdditionalCapacity(1 + keyColumns.size());` there is a change in `GenericRowSerDe` to create the initial `GenericRow` with enough spare capacity to hold `ROWTIME`, `ROWKEY`, `WINDOWSTART` and `WINDOWEND`.

BREAKING CHANGE: the order of columns for internal topics has changed. The `DELIMITED` format can not handle this in a backwards compatible way. Hence this is a breaking change for any existing queries the use the `DELIMITED` format and have internal topics.

This change has been made now for two reasons:
 1. its a breaking change, making it much harder to do later.
 2. The recent  #4404 change introduced this same issue for pull queries. This current change corrects pull queries too.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants