-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add consumer offsets to DESCRIBE EXTENDED #5476
feat: Add consumer offsets to DESCRIBE EXTENDED #5476
Conversation
It looks like @jeqo hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement here. Once you've signed reply with Appreciation of efforts, clabot |
Current output: ksql> describe extended orders;
Name : ORDERS
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : test_topic (partitions: 12, replication: 1)
Statement : CREATE STREAM orders (
ROWKEY INT KEY,
ORDERUNITS double
)
WITH (
kafka_topic='test_topic',
partitions=12,
replicas=1,
value_format='JSON'
);
Field | Type
ROWKEY | INTEGER (key)
ORDERUNITS | DOUBLE
Queries that read from this STREAM
-----------------------------------
CSAS_S1_0 (RUNNING) : CREATE STREAM S1 WITH (KAFKA_TOPIC='S1', PARTITIONS=12, REPLICAS=1) AS SELECT ORDERS.ROWKEY ROWKEY, ORDERS.ORDERUNITS ORDERUNITS, (CASE WHEN (ORDERS.ORDERUNITS < 2.0) THEN 'small' WHEN (ORDERS.ORDERUNITS < 4.0) THEN 'medium' ELSE 'large' END) CASE_RESULT FROM ORDERS ORDERS EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic test_topic)
Consumer Group : _confluent-ksql-default_query_CSAS_S1_0
Kafka topic : test_topic
Partition | Start Offset | End Offset | Offset | Lag
0 | 0 | 8 | 8 | 0
1 | 0 | 0 | 0 | 0
2 | 0 | 0 | 0 | 0
3 | 0 | 0 | 0 | 0
4 | 0 | 2 | 2 | 0
5 | 0 | 0 | 0 | 0
6 | 0 | 0 | 0 | 0
7 | 0 | 0 | 0 | 0
8 | 0 | 0 | 0 | 0
9 | 0 | 0 | 0 | 0
10 | 0 | 0 | 0 | 0
11 | 0 | 0 | 0 | 0 |
31548ac
to
257c4d0
Compare
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jeqo! This is a really good first stab at solving this, and I think it'll be a really useful tool to have when looking at ksqlDB applications operationally. I didn't have time to give this a full review, but I figured I give you some first round comments in the meantime.
ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
d4845b4
to
14eb7a7
Compare
[clabot:check] |
@confluentinc It looks like @jeqo just signed our Contributor License Agreement. 👍 Always at your service, clabot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mostly LGTM, sorry for the delayed review cycles! a few comments still left
ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaConsumerGroupClientImpl.java
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java
Outdated
Show resolved
Hide resolved
46263e2
to
f628d07
Compare
And then a small enhancement request, if you don't mind, to make this much more useful when there are LARGE numbers of partitions... Can we add a 'max lag' summary at the top please? That way if you have 100 or 1000 partitions, you don't need to scan down to find the worst lag: you can just look at this value. Of course, you'd still need to scan down to find which partition was lagging, but at least you can quickly see how badly, or not, the group is lagging. For example:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @jeqo
Awesome that you've created this PR - very much appreciated. I've added some comments, nits and suggestions below.
As it stands, I'm not sure the design is quite right. I'm not sure if the relationship between topic and consumer groups are right. If I'm reading the code correctly, (apologies, I've not had time to pull it locally and debug to check), then it's comparing the partition offsets of a source's sink topic with the consumer group offsets of queries that write into the source. I believe this is wrong.
Let's look at an example to work it through:
CREATE STREAM S1 (... some columns... )
WITH (kafka_topic='s1', ...);
CREATE STREAM S2 (... some columns... )
WITH (kafka_topic='s2', ...);
CREATE STREAM OUTPUT WITH (
kafka_topic='op'
) AS
SELECT *
FROM S1
JOIN S2 WITHIN 10 SECONDS ON S1.id = S2.id;
Then we run:
DESCRIBE EXTENDED OUTPUT;
This means:
- our
DataSource
isOUTPUT
. - it's
kafkaTopicName
isop
. - and it has a single
RunningQuery
.
Importantly, the consumer group of the RunningQuery
will be consuming from topics s1
and s2
, and NOT op
.
So we can not compare the offsets of the consumer group with the offsets of op
. We must compare the offsets of the consumer group with the offsets of the topic-partitions the consumer group is consuming!
If we add in a second query:
CREATE STREAM S3 (... some columns... )
WITH (kafka_topic='s3', ...);
INSET INTO OUTPUT
SELECT * FROM S3;
Now there will be two RunningQuery
s for OUTPUT
. The second consumer group will be consuming from s3
only.
So we need to change the logic to compare offsets of the right topics. I would suggest the following logic:
- Iterate over all
RunningQuery
a. calculate queryApplicationId
b. describe the consumer group and get the map ofTopicPartition
-> offset. - Once you've iterated over all running queries you can:
a. make a single request to get the earliest offset of ALL topic partitions we're interested in, i.e. all partitions ofs1
,s2
ands3
from the example above.
b. make a second single request to get the latest offsets of all the tps.
This way, we minimise calls to the brokers.
ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaConsumerGroupClientImpl.java
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/test/java/io/confluent/ksql/services/FakeKafkaConsumerGroupClient.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/test/java/io/confluent/ksql/services/FakeKafkaConsumerGroupClient.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerGroupOffset.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerGroupOffsets.java
Outdated
Show resolved
Hide resolved
@big-andy-coates It was a bit confusing at the beginning to see how the concepts fit together internally, but your example makes it so much clearer, thanks for this! I'd expect to go through this around next week. |
d7de0de
to
8c33496
Compare
Output starts to look better:
Will go through the specifics later. |
d795bec
to
c681551
Compare
ksqldb-common/src/main/java/io/confluent/ksql/util/QueryApplicationId.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaConsumerGroupClientImpl.java
Outdated
Show resolved
Hide resolved
b5cda02
to
4b3964d
Compare
ac636f8
to
086c4b0
Compare
40fbd3f
to
6f213a8
Compare
@big-andy-coates I managed to cover the new requirements, take a look to the output:
and when topics are empty for a consumer group, the message you recommend is printed. |
Great work @jeqo - thanks for the contribution! |
Extend the `DESCRIBE EXTENDED` output to include information about the consumer groups used to populate the source, if any. For each query populating the source, the consumer group and topic information is shown. This information allows you to see the topic's start and end offset, and the consumer group's lasts committed offset and associated lag, for each partition the consumer group is consuming from. Example new output: ``` Consumer Groups summary: Consumer Group : _confluent-ksql-default_query_CSAS_OUTPUT_0 Kafka topic : S1 Max lag : 0 Partition | Start Offset | End Offset | Offset | Lag 0 | 0 | 0 | 0 | 0 1 | 0 | 0 | 0 | 0 2 | 0 | 0 | 0 | 0 3 | 0 | 3 | 3 | 0 4 | 0 | 0 | 0 | 0 5 | 0 | 0 | 0 | 0 6 | 0 | 0 | 0 | 0 7 | 0 | 0 | 0 | 0 8 | 0 | 0 | 0 | 0 9 | 0 | 0 | 0 | 0 10 | 0 | 0 | 0 | 0 11 | 0 | 0 | 0 | 0 Kafka topic : S2 Max lag : 0 Partition | Start Offset | End Offset | Offset | Lag 0 | 0 | 0 | 0 | 0 1 | 0 | 0 | 0 | 0 2 | 0 | 0 | 0 | 0 3 | 0 | 3 | 3 | 0 4 | 0 | 0 | 0 | 0 5 | 0 | 0 | 0 | 0 6 | 0 | 0 | 0 | 0 7 | 0 | 0 | 0 | 0 8 | 0 | 0 | 0 | 0 9 | 0 | 0 | 0 | 0 10 | 0 | 0 | 0 | 0 11 | 0 | 0 | 0 | 0 Consumer Group : _confluent-ksql-default_query_INSERTQUERY_7 Kafka topic : S3 Max lag : 0 Partition | Start Offset | End Offset | Offset | Lag 0 | 0 | 3 | 3 | 0 ``` Co-authored-by: Andy Coates <[email protected]>
Description
Fix #3604
Testing done
Reviewer checklist
Missing functionality
groupId
from Source