Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds a new RoutingFilter, FreshnessFilter that looks at offset lags #4

Open
wants to merge 17 commits into
base: confluent-ha-routing
Choose a base branch
from

Conversation

AlanConfluent
Copy link

Description

What behavior do you want to change, why, how does your patch achieve the changes?

Testing done

Describe the testing strategy. Unit and integration tests are expected for any behavior changes.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

rodesai and others added 2 commits February 5, 2020 09:01
* chore: drop ColumnRef

this patch drops ColumnRef in favor of just using ColumnName.
ColumnRef became pointless once we dropped qualifiers from the schema
@AlanConfluent AlanConfluent changed the base branch from master to confluent-ha-routing February 5, 2020 23:39
Copy link

@vinothchandar vinothchandar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made some cosmetics comments.. But overall looks good!

Very clever use of a factory to contain all the random options. :)

/**
* These are options used for locating the host to retrieve data from.
*/
public interface RoutingOptions {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we consolidate other routing options like ; should route to standbys into this itself?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This object is useful for query time options more than server options. Maybe I can change the name?

Which other flags do you mean? It would required adding this to many call-site parameters if I were to do that for something like KSQL_QUERY_PULL_ENABLE_STANDBY_READS. (That has more to do with state stores than routing, per se).

@@ -127,7 +129,8 @@ KsLocator create(
String stateStoreName,
KafkaStreams kafkaStreams,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too bad, we can't get the applicationId from this kafkaStreams instance. do you think we should push for that?

Copy link
Author

@AlanConfluent AlanConfluent Feb 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be nice. It makes sense for a KafkaStreams instance to have. Does KafkaStreams currently have any notion of an id? Many setups probably don't have more than one or two applications at a time.

public static Optional<FreshnessFilter> create(
final Optional<LagReportingAgent> lagReportingAgent,
final RoutingOptions routingOptions,
final List<KsqlHost> hosts,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC.. we have a stream already in KsLocator and we make it a list and pass it in? we seem to be recreating a stream out of it anyway.. So change hosts to Stream<KsqlHost>?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can only process a stream once, so I have to stick with a list here. I removed the stream in KsLocator anyway and just produce a single list.

@vinothchandar
Copy link

@AlanConfluent oh and just making sure, we will be adding some HostRanker interface to rank the hosts based on most to least caught up. correct?

big-andy-coates and others added 2 commits February 6, 2020 16:10
…confluentinc#4450)

KSQL currently lets you take a non-windowed stream and perform a windowed group by:

```sql
CREATE TABLE T as SELECT stuff FROM S WINDOW TUMBLING (SIZE 1 SECOND) group by something;
```

Which is essentially grouping by not just `something`, but also implicitly by the window bounds.

This might be more correctly written with a Tumbling table function:

```sql
CREATE TABLE T as SELECT stuff FROM Tumbling(S, SIZE 1 SECOND) group by something, windowstart, windowend;
```

Where the Tumbling table function returns one row for each row in `S`, with the addition of the `windowstart` and `windowend` columns.   (Note: Hopping and session table functions are also possible, though in the case of the latter the table function would also emit retractions).

In a correct SQL model `windowstart` and `windowend` would therefore be available as fields within the selection, e.g.

```
CREATE TABLE T as SELECT windowstart, windowend, something, count() FROM Tumbling(S, SIZE 1 SECOND) group by something, windowstart, windowend;
```

This change makes such awesomeness possible.
@AlanConfluent
Copy link
Author

@AlanConfluent oh and just making sure, we will be adding some HostRanker interface to rank the hosts based on most to least caught up. correct?

Yes, that's right. After filtering, ranking would occur. Though, as we mentioned, with active always first, regardless.

@AlanConfluent AlanConfluent changed the base branch from confluent-ha-routing to master February 6, 2020 22:15
@AlanConfluent AlanConfluent changed the base branch from master to confluent-ha-routing February 6, 2020 22:15
vpapavas and others added 10 commits February 6, 2020 15:09
* feat: remove WindowStart() and WindowEnd() UDAFs

These two UDAFs were introduced to allow access to the start and end times of the window in a windowed source. `WINDOWSTART` and `WINDOWEND` are now accessible as columns to be used in the SELECT of a query, (outside of UDAFs).  This makes the two UDAFs redundant.

BREAKING CHANGE: The `WindowStart()` and `WindowEnd()` UDAFs have been removed from KSQL. Use the `WindowStart` and `WindowEnd` system columns to access the window bounds within the SELECT expression instead.
* test: add test to prove confluentinc#596 is fixed

fixes: confluentinc#596

Functionality to join on an expression was added previously.
* feat: primitive key support

ksqlDB now supports the following primitive key types: `INT`, `BIGINT`, `DOUBLE` as well as the existing `STRING` type.

The key type can be defined in the CREATE TABLE or CREATE STREAM statement by including a column definition for `ROWKEY` in the form `ROWKEY <primitive-key-type> KEY,`, for example:

```sql
CREATE TABLE USERS (ROWKEY BIGINT KEY, NAME STRING, RATING DOUBLE) WITH (kafka_topic='users', VALUE_FORMAT='json');
```

ksqlDB currently requires the name of the key column to be `ROWKEY`. Support for arbitrary key names is tracked by confluentinc#3536.

ksqlDB currently requires keys to use the `KAFKA` format. Support for additional formats is tracked by https://github.com/confluentinc/ksql/projects/3.

Schema inference currently only works with `STRING` keys, Support for additional key types is tracked by confluentinc#4462. (Schema inference is where ksqlDB infers the schema of a CREATE TABLE and CREATE STREAM statements from the schema registered in the Schema Registry, as opposed to the user supplying the set of columns in the statement).

Apache Kafka Connect can be configured to output keys in the `KAFKA` format by using a Converter, e.g. `"key.converter": "org.apache.kafka.connect.converters.IntegerConverter"`. Details of which converter to use for which key type can be found here: https://docs.confluent.io/current/ksql/docs/developer-guide/serialization.html#kafka in the `Connect Converter` column.

@rmoff has written an introductory blog about primitive keys: https://rmoff.net/2020/02/07/primitive-keys-in-ksqldb/

BREAKING CHANGE: existing queries that perform a PARTITION BY or GROUP BY on a single column of one of the above supported primitive key types will now set the key to the appropriate type, not a `STRING` as previously.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants