-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Return and accept command topic offsets via REST API #2159
Changes from 4 commits
c3a70c9
7241858
baffaf8
8890ad9
838e102
4606a96
fcc4a82
b343130
e9af7fc
d89059e
44f5b28
3463a22
c516723
8e15a75
64fca27
c0f9886
aa2d38a
807cbd5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,67 @@ | ||||||||||||
/* | ||||||||||||
* Copyright 2018 Confluent Inc. | ||||||||||||
* | ||||||||||||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||
* you may not use this file except in compliance with the License. | ||||||||||||
* You may obtain a copy of the License at | ||||||||||||
* | ||||||||||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||
* | ||||||||||||
* Unless required by applicable law or agreed to in writing, software | ||||||||||||
* distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||
* See the License for the specific language governing permissions and | ||||||||||||
* limitations under the License. | ||||||||||||
*/ | ||||||||||||
|
||||||||||||
package io.confluent.ksql.rest.server.computation; | ||||||||||||
|
||||||||||||
import io.confluent.ksql.rest.entity.CommandStatus; | ||||||||||||
import java.time.Duration; | ||||||||||||
import java.util.Objects; | ||||||||||||
import java.util.concurrent.CompletableFuture; | ||||||||||||
import java.util.concurrent.ExecutionException; | ||||||||||||
import java.util.concurrent.TimeUnit; | ||||||||||||
import java.util.concurrent.TimeoutException; | ||||||||||||
|
||||||||||||
public class CommandStatusFuture { | ||||||||||||
private static final CommandStatus INITIAL_STATUS = new CommandStatus( | ||||||||||||
CommandStatus.Status.QUEUED, "Statement written to command topic"); | ||||||||||||
|
||||||||||||
private final CommandId commandId; | ||||||||||||
private volatile CommandStatus commandStatus; | ||||||||||||
big-andy-coates marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
private final CompletableFuture<CommandStatus> future; | ||||||||||||
big-andy-coates marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
|
||||||||||||
public CommandStatusFuture(final CommandId commandId) { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This constructor (and the class itself) only need to be public for test purposes (specifically, in KsqlResourceTest), which feels weird to me, but I couldn't find a way around it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the test is from another package then you can add a test utility to create instances for you e.g. package io.confluent.ksql.rest.server.computation
public final class CommandStatusFutureTestUtil {
...
public static CommandStatusFuture create(final commandId commandId) {
return new CommandStatusFuture(commandId); // <- can access as in same package.
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good to know. Do you think adding a test util into this package is preferable to leaving the constructor (and class) public? I've updated the other methods in the class to be package-private, so even if someone creates an instance of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For this class... I don't particularly see an issue with the only constructor being public. |
||||||||||||
this.commandId = Objects.requireNonNull(commandId, "commandId cannot be null"); | ||||||||||||
this.commandStatus = INITIAL_STATUS; | ||||||||||||
this.future = new CompletableFuture<>(); | ||||||||||||
} | ||||||||||||
|
||||||||||||
public CommandId getCommandId() { | ||||||||||||
return commandId; | ||||||||||||
} | ||||||||||||
|
||||||||||||
public CommandStatus getStatus() { | ||||||||||||
return commandStatus; | ||||||||||||
} | ||||||||||||
|
||||||||||||
public CommandStatus tryWaitForFinalStatus(final Duration timeout) throws InterruptedException { | ||||||||||||
try { | ||||||||||||
return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); | ||||||||||||
} catch (final ExecutionException e) { | ||||||||||||
throw new RuntimeException("Error executing command " + commandId, e.getCause()); | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: don't wrap in
Suggested change
|
||||||||||||
} catch (final TimeoutException e) { | ||||||||||||
return commandStatus; | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
public void setStatus(final CommandStatus status) { | ||||||||||||
this.commandStatus = Objects.requireNonNull(status); | ||||||||||||
} | ||||||||||||
|
||||||||||||
public void setFinalStatus(final CommandStatus status) { | ||||||||||||
setStatus(status); | ||||||||||||
future.complete(status); | ||||||||||||
} | ||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to extend the APIs here to use the offset mechanism. Fine to add that in a follow-up though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, will do in a follow-up.