Return and accept command topic offsets via REST API #2159
Nice job! Took a pass through and left some comments. There's an accompanying set of changes we need to make to use this functionality from the CLI to give a better experience within a session. But we can get to that in a follow-up.
@@ -67,6 +75,7 @@ public CommandStore(
    this.commandProducer = commandProducer;
    this.commandIdAssigner = commandIdAssigner;
    this.commandStatusMap = Maps.newConcurrentMap();
    this.commandOffsetFutures = new ArrayList<>();

    commandConsumer.assign(Collections.singleton(new TopicPartition(commandTopic, 0)));
You can keep this TopicPartition in a member variable and use it when checking the current position. That way we keep the hard-coding of the usage of partition 0 only in 1 place.
Done.
      throw new RuntimeException(
          "Interrupted while waiting for command offset of " + String.valueOf(offset), e);
    } catch (final TimeoutException e) {
      throw new RuntimeException(
By throwing RuntimeException here, we will return a generic 500 error. We should return a specific error class for this condition, with the appropriate HTTP code.
I'd say the appropriate error code would be a 503 Service Unavailable.
We also need to return a specific error indicating that we timed out waiting for the server to catch up on the command topic. Check out io.confluent.ksql.rest.server.resources.Errors. You can define a new error code like:
  public static final int ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT =
      toErrorCode(SERVICE_UNAVAILABLE.getStatusCode()) + 1;

  public static Response commandQueueCatchupTimeout(final String msg) {
    return Response
        .status(SERVICE_UNAVAILABLE)
        .entity(new KsqlErrorMessage(ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT, msg))
        .build();
  }
Done. Thanks for the pointers!
  }

  @Test
  public void shouldNotWaitIfCommandTopicOffsetReached()
nit: this is a bit of a misleading name, no? The test is actually validating that we do wait until the offset is reached.
This test has changed as a result of one of the refactors suggested above, hopefully taking the confusion with it.
Thanks @vcrfxia. This is looking good. There are a lot of comments below, but there's actually nothing majorly wrong with what's there already. It's mainly just suggestions and nits.
However, I would suggest two main things:
- think more about the status codes being returned by the rest endpoint. 500 INTERNAL_SERVER_ERROR is not appropriate for a timeout, IMHO.
- try to avoid 'offset' in the public facing APIs for this. Currently this is implemented using the offset of the Kafka topic. But that's an implementation detail and might change. Better to call it something more generic. Maybe commandId (though that's currently used elsewhere for something else), or commandNumber, or even just correlationId. I'm talking mostly about the fields in KsqlRequest and CommandStatusEntity, but it's worth having this flow down all the way to the BlahCommandStore interface and having only the CommandStore impl know it's an offset. This way, a different implementation of CommandStore could implement it some other way.
I wonder if this could be done by using a similar mechanism to what https://github.com/confluentinc/schema-registry/blob/master/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java does? /cc @ewencp i.e., the command topic writer doesn't write to the topic until the reader has read up to the latest offset. Then we wouldn't need to return an offset that subsequently needs to be sent with the next query.
I think this makes sense as well. More modern version with new clients is in Connect. Connect has an (internal) utility class that helps manage this (and link goes to particularly relevant part re: end offsets): https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L268
@dguy @ewencp what you're describing is a different flavor of the same basic mechanism - it's just that the offset to wait on is inferred to be the current end offset. The thinking behind allowing the client to specify the offset was that a smart client could still partially function even if the cluster is not able to progress on the command topic (e.g. if it can't talk to the schema registry). Any statement that doesn't modify the meta-store could still be run as long as consumption from the command topic is sufficiently far along. Maybe a premature optimization, but it's a pretty small increment over the basic functionality on the server side. Having the client use it correctly is more challenging. One thing we could do is to accept a designated value (e.g. Long.MAX_VALUE) that tells the server to wait until the current end offset. This can just be the default going forward. For now the CLI can make a best effort by trying with this value a few times before asking the server not to wait (by omitting the offset) and adding a warning to the output.
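To make the designated-value idea concrete, here is a minimal Java sketch of how a server might turn the optional value in a request into a concrete offset to wait for. The names WaitTargetSketch, WAIT_FOR_END and resolve are hypothetical and are not part of this PR. The sketch relies on the Kafka convention that a partition's end offset is the offset of the next record to be written, so the last existing record sits at end offset minus one.

```java
import java.util.OptionalLong;

// Hypothetical helper, for illustration only; not code from this PR.
final class WaitTargetSketch {

  // Assumed sentinel meaning "wait until the current end of the command topic".
  static final long WAIT_FOR_END = Long.MAX_VALUE;

  private WaitTargetSketch() {
  }

  // Returns the offset the server should wait for, or empty if it should not wait.
  static OptionalLong resolve(final OptionalLong requested, final long currentEndOffset) {
    if (!requested.isPresent()) {
      // No value supplied: keep the existing behaviour and do not wait.
      return OptionalLong.empty();
    }
    if (requested.getAsLong() == WAIT_FOR_END) {
      // The end offset points one past the last record; an empty topic needs no wait.
      return currentEndOffset > 0
          ? OptionalLong.of(currentEndOffset - 1)
          : OptionalLong.empty();
    }
    // An explicit offset from a "smart" client is used as-is.
    return requested;
  }
}
```

With this shape, a client that tracks offsets itself is unaffected, while a simple client can always send the sentinel and let the server infer the target.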
Thanks @rodesai and @big-andy-coates for the very helpful reviews! Made the suggested changes and left a few comments / questions. Notable changes include:
- Waiting on futures (and associated error handling) now occurs in CommandStore, rather than having CommandStore return a future that the caller then waits on
- QueuedCommandStatus has been split into two classes, QueuedCommandStatus and SequenceNumberFutureStore, to avoid the need for status and command offset setters
- Error code on timeout has been updated from 500 INTERNAL_SERVER_ERROR to 503 SERVICE_UNAVAILABLE
- Added support for requests with offsets to StreamedQueryResource and WSQueryEndpoint (in addition to KsqlResource)
- Renamed “commandOffset” to “commandSequenceNumber”
  private volatile CommandStatus commandStatus;
  private final CompletableFuture<CommandStatus> future;

  public CommandStatusFuture(final CommandId commandId) {
This constructor (and the class itself) only need to be public for test purposes (specifically, in KsqlResourceTest), which feels weird to me, but I couldn't find a way around it.
If the test is from another package then you can add a test utility to create instances for you, e.g.
  package io.confluent.ksql.rest.server.computation;

  public final class CommandStatusFutureTestUtil {
    ...
    public static CommandStatusFuture create(final CommandId commandId) {
      return new CommandStatusFuture(commandId); // <- can access as in same package.
    }
  }
Good to know. Do you think adding a test util into this package is preferable to leaving the constructor (and class) public? I've updated the other methods in the class to be package-private, so even if someone creates an instance of CommandStatusFuture, they can't do anything with it, which makes me think maybe it's not too bad.
For this class... I don't particularly see an issue with the only constructor being public.
LGTM, with one suggested tweak.
Co-Authored-By: vcrfxia <[email protected]>
@@ -190,6 +191,9 @@ public Response handleKsqlStatements(final KsqlRequest request) {
    activenessRegistrar.updateLastRequestTime();

    try {
      CommandStoreUtil.httpWaitForCommandSequenceNumber(
why the prefix http?
CommandStoreUtil has two versions of waitForCommandSequenceNumber: waitForCommandSequenceNumber, which potentially throws a TimeoutException (and is called by WSQueryEndpoint), and httpWaitForCommandSequenceNumber, which catches the TimeoutException and wraps it into a KsqlRestException (and is called by KsqlResource and StreamedQueryResource). If this function name is confusing, we can easily rename it. Got any suggestions?
I wouldn't have the two versions personally. But then again, I don't think this warrants a utility class. I would just inline in WSQueryEndpoint like:
  // Look up the sequence number first so it is in scope for the log message.
  final Optional<Long> seqNo = request.getCommandSequenceNumber();
  try {
    // ensureConsumedUpThrough throws a checked TimeoutException, so call it
    // directly rather than from inside an ifPresent lambda.
    if (seqNo.isPresent()) {
      replayableCommandQueue.ensureConsumedUpThrough(seqNo.get(), timeout);
    }
  } catch (final TimeoutException e) {
    log.debug("Timed out waiting for prior commands to execute. seqNo: " + seqNo.get(), e);
    SessionUtil.closeSilently(session, CloseCodes.TRY_AGAIN_LATER, e.getMessage());
  }
And in KsqlResource and StreamedQueryResource I would move the logic into a function within those classes. There just isn't enough code here to warrant a utility class. The fact you're changing exception types indicates to me it should be in the calling class.
  private void waitForPriorCommandToExecute(final KsqlRequest request) {
    final Optional<Long> seqNo = request.getCommandSequenceNumber();
    if (!seqNo.isPresent()) {
      return;
    }
    try {
      replayableCommandQueue.ensureConsumedUpThrough(seqNo.get(), disconnectCheckInterval);
    } catch (final TimeoutException e) {
      throw new KsqlRestException(Errors.commandQueueCatchupTimeout(e.getMessage()));
    }
  }
Leaving this utility for now. (See explanation below.)
@@ -132,7 +132,7 @@ public void setServerAddress(final String serverAddress) {
  }

  public RestResponse<KsqlEntityList> makeKsqlRequest(final String ksql) {
We need to extend the APIs here to use the offset mechanism. Fine to add that in a follow-up though.
Yup, will do in a follow-up.
    // Given:
    givenCmdStoreUpThroughPosition(2);
    expect(sequenceNumberFutureStore.getFutureForSequenceNumber(EasyMock.anyLong())).andReturn(future);
    expect(future.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class))).andReturn(null);
We should verify that the right timeout is used.
Done. (Also cleaned up and refactored the test to Mockito while I was at it.)
  public void ensureConsumedUpThrough(final long seqNum, final long timeout)
      throws TimeoutException {
    final long consumerPosition = getNextConsumerSequenceNumber();
    if (consumerPosition > seqNum) {
I think this is not quite what we want here. consumerPosition is the consumer's position, which is the offset of the next record to be consumed. What we want here is the offset of the last command that we know we processed. These are not the same for the current batch of records being processed. Imagine the executor is off processing a batch of records consisting of a single record w/ offset 10. consumerPosition would be 11, so we might not wait for seqNum=10, when we should, since we don't know if it's been completed yet. So what we really want is our own bookkeeping of the latest processed command.
I also think you should move this bookkeeping and check into SequenceNumberFutureStore and add some synchronization with completeFuturesUpToSequenceNumber. The reason is that there's a small race here where an API thread could call ensureConsumedUpThrough and the statement executor thread could be just finishing the latest batch of statements. If it completes futures and sleeps on consume between the check and registering the future, this could time out when it shouldn't.
Good catch! Done.
Thanks for the review @rodesai ! Made the requested changes.
Thanks @vcrfxia and sorry for the review delay. Looking real close. Just a few things... ;)
  }

  @Test
  public void shouldReturn503IfTimeoutWhileWaitingForCommandSequenceNumber() throws Exception {
Suggested change:
- public void shouldReturn503IfTimeoutWhileWaitingForCommandSequenceNumber() throws Exception {
+ public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumber() throws Exception {
          commandQueueCatchupTimeout);
    } catch (final TimeoutException e) {
      log.debug("Timeout while processing request", e);
      SessionUtil.closeSilently(session, CloseCodes.TRY_AGAIN_LATER, e.getMessage());
Missing a return here. There should be a test that's failing.
Yikes, good catch. Added a check for this into WSQueryEndpointTest#shouldReturnErrorIfCommandQueueCatchupTimeout().
LGTM, once the in-line comment about the missing return is addressed. Thanks!
Thanks for the second round of reviews, @big-andy-coates and @rodesai ! Gave it another pass.
import io.confluent.ksql.rest.server.computation.CommandId;
import org.junit.Test;

public class CommandStatusEntityTest {
Added tests checking null values in the constructor. Don't think additional serialization/deserialization tests are warranted, but let me know if you disagree.
    try {
      return finalStatusFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (final ExecutionException e) {
      throw new RuntimeException("Error executing command " + commandId, e.getCause());
nit: don't wrap in RuntimeException if the cause is already one
Suggested change:
- throw new RuntimeException("Error executing command " + commandId, e.getCause());
+ if (e.getCause() instanceof RuntimeException) {
+   throw (RuntimeException) e.getCause();
+ }
+ throw new RuntimeException("Error executing command " + commandId, e.getCause());
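As a small illustration of the unwrapping nit above, the Java sketch below pulls the logic into a helper. The class and method names (ExecutionExceptionUnwrapSketch, toRuntimeException) are hypothetical and not part of this PR. It relies on the fact that an ExecutionException is itself a checked exception, so the check has to be made against its cause.

```java
import java.util.concurrent.ExecutionException;

// Hypothetical helper, for illustration only; not code from this PR.
final class ExecutionExceptionUnwrapSketch {

  private ExecutionExceptionUnwrapSketch() {
  }

  // Returns the exception the caller should throw for a failed future.
  static RuntimeException toRuntimeException(final ExecutionException e, final String what) {
    final Throwable cause = e.getCause();
    if (cause instanceof RuntimeException) {
      // Already unchecked: rethrow as-is to avoid a redundant wrapper.
      return (RuntimeException) cause;
    }
    // Checked (or null) cause: wrap it so callers only deal with unchecked exceptions.
    return new RuntimeException("Error executing " + what, cause);
  }
}
```

In a catch block this would be used as `throw ExecutionExceptionUnwrapSketch.toRuntimeException(e, "command " + commandId);`.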
@@ -44,31 +50,53 @@
 * the beginning until now, or any new messages since then), and writing to it.
 */

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
Do you still need to turn this rule off with Hojjat's work to split the CommandTopic out?
Shouldn't be, but I'm merging first. Will coordinate with @hjafarpour to have it removed.
Thanks @vcrfxia - LGTM, except ...
Thinking more on @dguy 's suggestion: #2159 (comment) and then @ewencp's and @rodesai's follow on comments...
The default behaviour, if no command sequence is provided in the request, is to do nothing special right? i.e. don't wait for any previous request to finish. This means the default behaviour can result in requests failing if a previous request, upon which it relies, hasn't been fully handled.
I think a safer default is to wait for all previous requests to have finished. Then more advanced users can use the sequence number to improve throughput for unrelated requests.
It probably makes sense to do this in a follow up PR... so created #2268 and assigned to you.
LGTM!
Sure @big-andy-coates , I'll look at incorporating that into my follow-up PR that will introduce that as the default behavior for the CLI.
Thanks for the review! Added your requested tests/checks. Will merge once the build passes.
import io.confluent.ksql.rest.server.computation.CommandId;
import org.junit.Test;

public class CommandStatusEntityTest {
Done.
Description
This PR adds functionality for users to optionally specify a command topic offset when submitting requests via the REST API. When an offset is provided with a request, the request will not be processed until the server has processed commands from the command topic up through the specified offset. In order to make this functionality meaningful, this PR also augments responses for commands that are successfully written to the command topic to include the offset of the command in the command topic.
Background and motivation: Currently, if a user uses the REST API to interact with a KSQL setup running multiple server threads, there is no guarantee that consecutive requests will hit the same server. This means a user may observe inconsistent responses, for example, if a CREATE STREAM foo (...) request is first received and processed by Server A, and a subsequent INSERT INTO foo (...) request is received by Server B before Server B has fetched and processed the CREATE STREAM command from the command topic. In this scenario, the user would receive an error message reporting that stream foo does not exist.
This PR provides a way to remedy these inconsistencies. When the user sends the CREATE STREAM foo (...) request, the response will include the offset of the CREATE STREAM command in the command topic (assuming the request was successfully parsed and validated). The user can then include this offset as part of their next request, guaranteeing that the server which receives the subsequent request will not try to parse or validate the statement until after it has processed the CREATE STREAM.
Behavior: When a request with an offset specified is received, the server will wait no longer than some specified timeout for that offset to be reached before throwing a TimeoutException. This timeout is currently set to be the same as the timeout used when writing commands to the command topic (or the disconnect check interval, in the case of StreamedQueryResource), but this can be updated in a subsequent PR. When no offset is specified with a request (the default), no waiting happens, consistent with the current behavior.
Documentation: Updated documentation accordingly.
Testing done
Added unit tests.