Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return and accept command topic offsets via REST API #2159

Merged
merged 18 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/developer-guide/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ The KSQL resource runs a sequence of KSQL statements. All statements, except tho
:json string ksql: A semicolon-delimited sequence of KSQL statements to run.
:json map streamsProperties: Property overrides to run the statements with. Refer to the :ref:`Config Reference <ksql-param-reference>` for details on properties that can be set.
:json string streamsProperties[``property-name``]: The value of the property named by ``property-name``. Both the value and ``property-name`` should be strings.
:json long commandSequenceNumber: Optional. If specified, the statements will not be run until all existing commands up through the specified sequence number have completed. If unspecified, the statements will be run immediately.
vcrfxia marked this conversation as resolved.
Show resolved Hide resolved

The response JSON is an array of result objects. The result object contents depend on the statement that it is returning results for. The following sections detail the contents of the result objects by statement.

Expand All @@ -112,6 +113,7 @@ The KSQL resource runs a sequence of KSQL statements. All statements, except tho
:>json string commandId: A string that identifies the requested operation. You can use this ID to poll the result of the operation using the status endpoint.
:>json string commandStatus.status: One of QUEUED, PARSING, EXECUTING, TERMINATED, SUCCESS, or ERROR.
:>json string commandStatus.message: Detailed message regarding the status of the execution statement.
:>json long commandSequenceNumber: The sequence number of the requested operation in the command queue.

**LIST STREAMS, SHOW STREAMS**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.confluent.ksql.TestTerminal;
import io.confluent.ksql.cli.console.Console.NoOpRowCaptor;
import io.confluent.ksql.rest.entity.ArgumentInfo;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.EntityQueryId;
import io.confluent.ksql.rest.entity.ExecutionPlan;
Expand All @@ -49,6 +50,7 @@
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.entity.TablesList;
import io.confluent.ksql.rest.entity.TopicDescription;
import io.confluent.ksql.rest.server.computation.CommandId;
import io.confluent.ksql.rest.util.EntityUtil;
import io.confluent.ksql.serde.DataSource;
import io.confluent.ksql.util.SchemaUtil;
Expand Down Expand Up @@ -127,7 +129,11 @@ public void testPrintKSqlEntityList() throws IOException {

for (int i = 0; i < 5; i++) {
final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new CommandStatusEntity("e", "topic/1/create", "SUCCESS", "Success Message"),
new CommandStatusEntity(
"e",
CommandId.fromString("topic/1/create"),
new CommandStatus(CommandStatus.Status.SUCCESS, "Success Message"),
0),
new PropertiesList("e", properties, Collections.emptyList(), Collections.emptyList()),
new Queries("e", queries),
new SourceDescriptionEntity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public RestResponse<ServerInfo> getServerInfo() {
}

public RestResponse<KsqlEntityList> makeKsqlRequest(final String ksql) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to extend the APIs here to use the offset mechanism. Fine to add that in a follow-up though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, will do in a follow-up.

final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap());
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), null);
return postRequest("ksql", jsonRequest, Optional.empty(), true,
r -> r.readEntity(KsqlEntityList.class));
}
Expand All @@ -146,13 +146,13 @@ public RestResponse<CommandStatus> makeStatusRequest(final String commandId) {
}

public RestResponse<QueryStream> makeQueryRequest(final String ksql) {
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap());
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), null);
final Optional<Integer> readTimeoutMs = Optional.of(QueryStream.READ_TIMEOUT_MS);
return postRequest("query", jsonRequest, readTimeoutMs, false, QueryStream::new);
}

public RestResponse<InputStream> makePrintTopicRequest(final String ksql) {
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap());
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), null);
return postRequest("query", jsonRequest, Optional.empty(), false,
r -> (InputStream) r.getEntity());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.confluent.ksql.rest.server.computation.CommandId;
import java.util.Map;
Expand All @@ -27,38 +28,34 @@
public class CommandStatusEntity extends KsqlEntity {
private final CommandId commandId;
private final CommandStatus commandStatus;
private final long commandSequenceNumber;

public CommandStatusEntity(
final String statementText,
final CommandId commandId,
final CommandStatus commandStatus
final CommandStatus commandStatus,
final long commandSequenceNumber
) {
super(statementText);
this.commandId = commandId;
this.commandStatus = commandStatus;
this.commandSequenceNumber = commandSequenceNumber;
}

@JsonCreator
public CommandStatusEntity(
final String statementText,
final String commandId,
final String status,
final String message
@JsonProperty("statementText") final String statementText,
@JsonProperty("commandId") final String commandId,
@JsonProperty("commandStatus") final Map<String, Object> commandStatus,
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
@JsonProperty("commandSequenceNumber") final Long commandSequenceNumber
) {
this(
statementText,
CommandId.fromString(commandId),
new CommandStatus(CommandStatus.Status.valueOf(status), message)
);
}

@SuppressWarnings("unchecked") // needs investigating
@JsonCreator
public CommandStatusEntity(final Map<String, Object> properties) {
this(
(String) properties.get("statementText"),
(String) properties.get("commandId"),
(String) ((Map<String, Object>) properties.get("commandStatus")).get("status"),
(String) ((Map<String, Object>) properties.get("commandStatus")).get("message")
new CommandStatus(
CommandStatus.Status.valueOf((String) commandStatus.get("status")),
(String) commandStatus.get("message")),
commandSequenceNumber == null ? -1 : commandSequenceNumber
);
}

Expand All @@ -71,6 +68,10 @@ public CommandStatus getCommandStatus() {
return commandStatus;
}

public long getCommandSequenceNumber() {
return commandSequenceNumber;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -80,20 +81,22 @@ public boolean equals(final Object o) {
return false;
}
final CommandStatusEntity that = (CommandStatusEntity) o;
return Objects.equals(getCommandId(), that.getCommandId())
&& Objects.equals(getCommandStatus(), that.getCommandStatus());
return Objects.equals(commandId, that.commandId)
&& Objects.equals(commandStatus, that.commandStatus)
&& (commandSequenceNumber == that.commandSequenceNumber);
}

@Override
public int hashCode() {
return Objects.hash(getCommandId(), getCommandStatus());
return Objects.hash(getCommandId(), getCommandStatus(), getCommandSequenceNumber());
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public String toString() {
return "CommandStatusEntity{"
+ "commandId=" + commandId
+ ", commandStatus=" + commandStatus
+ ", commandSequenceNumber=" + commandSequenceNumber
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonSubTypes({})
Expand All @@ -35,16 +36,19 @@ public class KsqlRequest {

private final String ksql;
private final Map<String, Object> streamsProperties;
private final Optional<Long> commandSequenceNumber;

@JsonCreator
public KsqlRequest(
@JsonProperty("ksql") final String ksql,
@JsonProperty("streamsProperties") final Map<String, Object> streamsProperties
@JsonProperty("streamsProperties") final Map<String, Object> streamsProperties,
@JsonProperty("commandSequenceNumber") final Long commandSequenceNumber
) {
this.ksql = ksql == null ? "" : ksql;
this.streamsProperties = streamsProperties == null
? Collections.emptyMap()
: Collections.unmodifiableMap(new HashMap<>(streamsProperties));
this.commandSequenceNumber = Optional.ofNullable(commandSequenceNumber);
}

public String getKsql() {
Expand All @@ -55,6 +59,10 @@ public Map<String, Object> getStreamsProperties() {
return coerceTypes(streamsProperties);
}

public Optional<Long> getCommandSequenceNumber() {
return commandSequenceNumber;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -67,12 +75,13 @@ public boolean equals(final Object o) {

final KsqlRequest that = (KsqlRequest) o;
return Objects.equals(getKsql(), that.getKsql())
&& Objects.equals(getStreamsProperties(), that.getStreamsProperties());
&& Objects.equals(getStreamsProperties(), that.getStreamsProperties())
&& Objects.equals(getCommandSequenceNumber(), that.getCommandSequenceNumber());
}

@Override
public int hashCode() {
return Objects.hash(getKsql(), getStreamsProperties());
return Objects.hash(getKsql(), getStreamsProperties(), getCommandSequenceNumber());
}

private static Map<String, Object> coerceTypes(final Map<String, Object> streamsProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,10 @@ public <T> T getEndpointInstance(final Class<T> endpointClass) {
JsonMapper.INSTANCE.mapper,
statementParser,
ksqlEngine,
commandRunner.getCommandStore(),
exec,
versionCheckerAgent::updateLastRequestTime
versionCheckerAgent::updateLastRequestTime,
config.getLong(KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)
);
}

Expand Down Expand Up @@ -371,6 +373,7 @@ public static KsqlRestApplication buildApplication(
ksqlConfig,
ksqlEngine,
statementParser,
commandStore,
Duration.ofMillis(
restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)),
versionChecker::updateLastRequestTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public void close() {
}
}

public ReplayableCommandQueue getCommandStore() {
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
return commandStore;
}

void fetchAndRunCommands() {
final List<QueuedCommand> commands = commandStore.getNewCommands();
log.trace("Found {} new writes to command topic", commands.size());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.ksql.rest.server.computation;

import io.confluent.ksql.rest.entity.CommandStatus;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CommandStatusFuture {
private static final CommandStatus INITIAL_STATUS = new CommandStatus(
CommandStatus.Status.QUEUED, "Statement written to command topic");

private final CommandId commandId;
private volatile CommandStatus commandStatus;
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
private final CompletableFuture<CommandStatus> future;
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved

public CommandStatusFuture(final CommandId commandId) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor (and the class itself) only need to be public for test purposes (specifically, in KsqlResourceTest), which feels weird to me, but I couldn't find a way around it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the test is from another package then you can add a test utility to create instances for you e.g.

package io.confluent.ksql.rest.server.computation

public final class CommandStatusFutureTestUtil { 
   ...

   public static CommandStatusFuture create(final commandId commandId) {
      return new CommandStatusFuture(commandId); // <- can access as in same package.
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to know. Do you think adding a test util into this package is preferable to leaving the constructor (and class) public? I've updated the other methods in the class to be package-private, so even if someone creates an instance of CommandStatusFuture, they can't do anything with it, which makes me think maybe it's not too bad.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this class... I don't particularly see an issue with the only constructor being public.

this.commandId = Objects.requireNonNull(commandId, "commandId cannot be null");
this.commandStatus = INITIAL_STATUS;
this.future = new CompletableFuture<>();
}

public CommandId getCommandId() {
return commandId;
}

public CommandStatus getStatus() {
return commandStatus;
}

public CommandStatus tryWaitForFinalStatus(final Duration timeout) throws InterruptedException {
try {
return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (final ExecutionException e) {
throw new RuntimeException("Error executing command " + commandId, e.getCause());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: don't wrap in RuntimeException if already one

Suggested change
throw new RuntimeException("Error executing command " + commandId, e.getCause());
if (e instanceof RuntimeException) {
throw (RuntimeException)e;
}
throw new RuntimeException("Error executing command " + commandId, e.getCause());

} catch (final TimeoutException e) {
return commandStatus;
}
}

public void setStatus(final CommandStatus status) {
this.commandStatus = Objects.requireNonNull(status);
}

public void setFinalStatus(final CommandStatus status) {
setStatus(status);
future.complete(status);
}
}
Loading