feat: Add consistency vector handling to CLI and Java client #8264
Thanks for the PR @vpapavas ! Just added a few comments.
Hey @vpapavas , thanks for this, and I'm sorry it took me so long to review it.
I found a couple of public API "leaks" that we might want to avoid until the feature is ready. Also, it seems like the HTTP/2 (aka query-stream) API changes are both risky and complicated. What if we just skip HTTP/2 for now and plan to add consistency as part of a future API revision?
Thanks,
-John
Thanks @vpapavas !
One mega comment is whether relying on Java Serializable would be sufficient for any possible future evolution of the vector serde, if we ever want to remove any fields, for example. Otherwise it looks promising.
added consistency token to rest client and cli http2 not working added CT to queue rebased and tests pass rename consistencytoken javadoc added test for ws, made ct in client string rebase and fix conflicts merge different pr's address comments fixed test after rename fix test after rename address comments and fix
Thanks @vpapavas !
I suspect we might want to tweak some of the serialization stuff before the next release, but it seems like this PR has accomplished what we wanted from it.
Just leaving a few comments, nothing that's a showstopper for this PR.
@VisibleForTesting
public String getSerializedConsistencyVector() {
  return serializedConsistencyVector.get();
Isn't the idea of VisibleForTesting that you can keep the method package-private?
It is not used by a test in the same package. It is used by an integration test that lives in a different package, because that test relies on functionality that is only available there.
if (json instanceof JsonObject) {
  final JsonObject jsonObject = (JsonObject) json;
  // This is the serialized consistency vector
  // Don't add it to the result list since the user should not see it
We talked before about supporting an older client. It seems like the server will just return the consistency token if it's enabled on the server. We should only send it down if it's enabled on the client which can be done by indicating that it's enabled in the request property.
The consistency token will be included only if it is enabled on both the client and the server. The requirements are:
Client side:
- Set the server property KsqlConfig.KSQL_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_ENABLED to true and include it in the request properties
- Add the actual consistency token to the request properties (may be an empty string the first time a request is made)
Server side:
- The server config KsqlConfig.KSQL_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_ENABLED must be set to true
- The request properties must contain a consistency token (may be an empty string the first time a request is made)
So the token is sent back and forth only if both the client and the server know about consistency tokens and have enabled them.
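As a rough illustration of the server-side rule described above, here is a minimal sketch. It is not the ksqlDB implementation: the class name, the method name, and the property key are hypothetical placeholders, and the only behavior it models is "server flag on, and the request carries a token (possibly empty)".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the server-side gate; not taken from the ksqlDB code base.
final class ConsistencyGateSketch {
  // Placeholder key for the request property that carries the serialized token.
  static final String TOKEN_KEY = "example.request.consistency.vector";

  private ConsistencyGateSketch() {
  }

  // Returns true only when the server flag is on AND the request carries a token.
  // An empty string still counts as a token: it marks the client's first request.
  static boolean shouldReturnToken(
      final boolean serverEnabled,
      final Map<String, Object> requestProperties
  ) {
    return serverEnabled && requestProperties.get(TOKEN_KEY) instanceof String;
  }

  public static void main(final String[] args) {
    final Map<String, Object> oldClient = new HashMap<>();    // sends no token at all
    final Map<String, Object> firstRequest = new HashMap<>(); // enabled, nothing seen yet
    firstRequest.put(TOKEN_KEY, "");

    System.out.println(shouldReturnToken(true, oldClient));
    System.out.println(shouldReturnToken(true, firstRequest));
    System.out.println(shouldReturnToken(false, firstRequest));
  }
}
```

Under this sketch an older client that never sends the property gets no token back, which is the compatibility concern raised in the review.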
public static boolean isConsistencyVectorEnabled(final Map<String, Object> requestProperties) {
  final Object consistencyEnabled
Normally request properties are defined in KsqlRequestConfig. We don't usually send KsqlConfigs up as a request property, but rather a normal property.
I wanted to avoid creating two different configs for the same thing, so I reused the server config in the client as well. I can change it if it is too ugly or not the right way to do it.
@@ -489,6 +491,18 @@ private EndpointResponse handleTablePullQuery(
    pullBandRateLimiter.allow(KsqlQueryType.PULL);

    final Optional<Decrementer> optionalDecrementer = Optional.ofNullable(decrementer);
    Optional<ConsistencyOffsetVector> consistencyOffsetVector = Optional.empty();
    if (ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_ENABLED)
Just to make sure I understand, if the client doesn't send up anything, the feature is not enabled, right?
There are four main cases:
1. The feature is disabled on the server
2. The feature is enabled on the server, but not the client
3. The feature is enabled on the server and the client, but the client has provided no consistency token
4. The feature is enabled on the server and client, and the client provides a consistency token for the pull query.
This seems to cover all cases, but I just want to be sure. 2 is covered because the client indicates that the feature is enabled but no consistency token is provided by using an empty string?
The client needs to set the config to true; only then will the consistency token be included in the request properties. So, if it is disabled on the client, no consistency token will be added to the request properties and the server won't do anything.
The empty string is used only when the feature is enabled, and signals an empty, not yet initialized consistency token.
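The client-side behavior described in this reply can be sketched as follows. This is an illustrative sketch under stated assumptions, not the actual ClientImpl code: the class, the method names, and both property keys are hypothetical, and the enabled flag is assumed to arrive as either a Boolean or the string "true".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical client-side sketch; names and keys are placeholders.
final class ClientTokenSketch {
  static final String ENABLED_KEY = "example.consistency.vector.enabled";
  static final String TOKEN_KEY = "example.request.consistency.vector";

  // Starts as the empty string, meaning "enabled, but no token received yet".
  private final AtomicReference<String> serializedToken = new AtomicReference<>("");

  // Builds the request properties for one query from the user-supplied properties.
  Map<String, Object> buildRequestProperties(final Map<String, Object> properties) {
    final Map<String, Object> requestProperties = new HashMap<>();
    final boolean enabled =
        Boolean.parseBoolean(String.valueOf(properties.get(ENABLED_KEY)));
    if (enabled) {
      // Only an enabled client sends a token; a disabled client sends nothing.
      requestProperties.put(TOKEN_KEY, serializedToken.get());
    }
    return requestProperties;
  }

  // Remembers the token returned by the server so the next request can send it.
  void onTokenReceived(final String token) {
    serializedToken.set(token);
  }
}
```

With this shape, the first request of an enabled client carries an empty string, and later requests carry whatever the server last returned.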
@@ -136,13 +153,19 @@ public BatchedQueryResult executeQuery(
    final String sql,
    final Map<String, Object> properties
) {
  if (ConsistencyOffsetVector.isConsistencyVectorEnabled(requestProperties)) {
Are you meaning to check this using properties rather than requestProperties since it's defined in KsqlConfig?
Added to properties
if (ConsistencyOffsetVector.isConsistencyVectorEnabled(requestProperties)) {
  requestProperties.put(
      KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR,
      serializedConsistencyVector.get());
Can this be null rather than empty string if it's been unset?
Yeah, I had it as null before but the code for JsonObject.put throws an NPE
We talked about things offline. A couple of those smaller things would be great to get in this PR, but if not a followup is good too.
Description
Add consistency vector handling to the CLI and Java clients. The usage is hidden behind a config flag that is false by default. Both the client and the server need to enable the config. The client needs to both enable the config and add the consistency vector in the request properties.
The consistency vector is added to the queue that stores the query results after the query has completed successfully. This design is not needed for HTTP/1, where we could pass the token back to the client using the headers of the StreamedRow. However, this is not possible in HTTP/2 (at least not without heavy refactoring), since the QueryResponseMetadata is sent back to the client before the query even executes. Moreover, the only way for the Publisher to send something to the Subscriber is through the queue. Hence, to keep the design the same in both endpoints, the vector is added to the queue as the last item.
The vector is serialized before being written to the wire, and the clients only "see" the serialized version. In practice, the CLI does not print the vector, so it is hidden from the user.
Testing done
I created the ConsistencyOffsetVectorFunctionalTest, which tests that the consistency vector is correctly sent back and forth, and that it is not sent when the flag is not enabled.
I did manual tests for the CLI to make sure the vector is not printed but used.
With the following curl command, one can see the vector:
There is no way to avoid this since the response contains the vector and there is no client where I could remove it as I do with the CLI.