Skip to content

Commit

Permalink
Return and accept command topic offsets via REST API (#2159)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Dec 13, 2018
1 parent c1f6db5 commit 20d6512
Show file tree
Hide file tree
Showing 29 changed files with 1,221 additions and 339 deletions.
2 changes: 2 additions & 0 deletions docs/developer-guide/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ The KSQL resource runs a sequence of KSQL statements. All statements, except tho
:json string ksql: A semicolon-delimited sequence of KSQL statements to run.
:json map streamsProperties: Property overrides to run the statements with. Refer to the :ref:`Config Reference <ksql-param-reference>` for details on properties that can be set.
:json string streamsProperties[``property-name``]: The value of the property named by ``property-name``. Both the value and ``property-name`` should be strings.
:json long commandSequenceNumber: Optional. If specified, the statements will not be run until all existing commands up to and including the specified sequence number have completed. If unspecified, the statements are run immediately. (Note: When a command is processed, the result object contains its sequence number.)

The response JSON is an array of result objects. The result object contents depend on the statement that it is returning results for. The following sections detail the contents of the result objects by statement.

Expand All @@ -112,6 +113,7 @@ The KSQL resource runs a sequence of KSQL statements. All statements, except tho
:>json string commandId: A string that identifies the requested operation. You can use this ID to poll the result of the operation using the status endpoint.
:>json string commandStatus.status: One of QUEUED, PARSING, EXECUTING, TERMINATED, SUCCESS, or ERROR.
:>json string commandStatus.message: Detailed message regarding the status of the execution statement.
:>json long commandSequenceNumber: The sequence number of the requested operation in the command queue, or -1 if the operation was unsuccessful.

**LIST STREAMS, SHOW STREAMS**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.confluent.ksql.TestTerminal;
import io.confluent.ksql.cli.console.Console.NoOpRowCaptor;
import io.confluent.ksql.rest.entity.ArgumentInfo;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.EntityQueryId;
import io.confluent.ksql.rest.entity.ExecutionPlan;
Expand All @@ -49,6 +50,7 @@
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.entity.TablesList;
import io.confluent.ksql.rest.entity.TopicDescription;
import io.confluent.ksql.rest.server.computation.CommandId;
import io.confluent.ksql.rest.util.EntityUtil;
import io.confluent.ksql.serde.DataSource;
import io.confluent.ksql.util.SchemaUtil;
Expand Down Expand Up @@ -127,7 +129,11 @@ public void testPrintKSqlEntityList() throws IOException {

for (int i = 0; i < 5; i++) {
final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new CommandStatusEntity("e", "topic/1/create", "SUCCESS", "Success Message"),
new CommandStatusEntity(
"e",
CommandId.fromString("topic/1/create"),
new CommandStatus(CommandStatus.Status.SUCCESS, "Success Message"),
0L),
new PropertiesList("e", properties, Collections.emptyList(), Collections.emptyList()),
new Queries("e", queries),
new SourceDescriptionEntity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public RestResponse<ServerInfo> getServerInfo() {
}

public RestResponse<KsqlEntityList> makeKsqlRequest(final String ksql) {
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap());
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), null);
return postRequest("ksql", jsonRequest, Optional.empty(), true,
r -> r.readEntity(KsqlEntityList.class));
}
Expand All @@ -146,13 +146,13 @@ public RestResponse<CommandStatus> makeStatusRequest(final String commandId) {
}

public RestResponse<QueryStream> makeQueryRequest(final String ksql) {
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap());
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), null);
final Optional<Integer> readTimeoutMs = Optional.of(QueryStream.READ_TIMEOUT_MS);
return postRequest("query", jsonRequest, readTimeoutMs, false, QueryStream::new);
}

public RestResponse<InputStream> makePrintTopicRequest(final String ksql) {
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap());
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), null);
return postRequest("query", jsonRequest, Optional.empty(), false,
r -> (InputStream) r.getEntity());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,28 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.confluent.ksql.rest.server.computation.CommandId;
import java.util.Map;
import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
public class CommandStatusEntity extends KsqlEntity {
private final CommandId commandId;
private final CommandStatus commandStatus;
private final long commandSequenceNumber;

@JsonCreator
public CommandStatusEntity(
final String statementText,
final CommandId commandId,
final CommandStatus commandStatus
@JsonProperty("statementText") final String statementText,
@JsonProperty("commandId") final CommandId commandId,
@JsonProperty("commandStatus") final CommandStatus commandStatus,
@JsonProperty("commandSequenceNumber") final Long commandSequenceNumber
) {
super(statementText);
this.commandId = commandId;
this.commandStatus = commandStatus;
}

public CommandStatusEntity(
final String statementText,
final String commandId,
final String status,
final String message
) {
this(
statementText,
CommandId.fromString(commandId),
new CommandStatus(CommandStatus.Status.valueOf(status), message)
);
}

@SuppressWarnings("unchecked") // needs investigating
@JsonCreator
public CommandStatusEntity(final Map<String, Object> properties) {
this(
(String) properties.get("statementText"),
(String) properties.get("commandId"),
(String) ((Map<String, Object>) properties.get("commandStatus")).get("status"),
(String) ((Map<String, Object>) properties.get("commandStatus")).get("message")
);
this.commandId = Objects.requireNonNull(commandId, "commandId");
this.commandStatus = Objects.requireNonNull(commandStatus, "commandStatus");
this.commandSequenceNumber = commandSequenceNumber == null ? -1 : commandSequenceNumber;
}

public CommandId getCommandId() {
Expand All @@ -71,6 +51,10 @@ public CommandStatus getCommandStatus() {
return commandStatus;
}

public Long getCommandSequenceNumber() {
return commandSequenceNumber;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -80,20 +64,22 @@ public boolean equals(final Object o) {
return false;
}
final CommandStatusEntity that = (CommandStatusEntity) o;
return Objects.equals(getCommandId(), that.getCommandId())
&& Objects.equals(getCommandStatus(), that.getCommandStatus());
return Objects.equals(commandId, that.commandId)
&& Objects.equals(commandStatus, that.commandStatus)
&& (commandSequenceNumber == that.commandSequenceNumber);
}

@Override
public int hashCode() {
return Objects.hash(getCommandId(), getCommandStatus());
return Objects.hash(commandId, commandStatus, commandSequenceNumber);
}

@Override
public String toString() {
return "CommandStatusEntity{"
+ "commandId=" + commandId
+ ", commandStatus=" + commandStatus
+ ", commandSequenceNumber=" + commandSequenceNumber
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonSubTypes({})
Expand All @@ -35,16 +36,19 @@ public class KsqlRequest {

private final String ksql;
private final Map<String, Object> streamsProperties;
private final Optional<Long> commandSequenceNumber;

@JsonCreator
public KsqlRequest(
@JsonProperty("ksql") final String ksql,
@JsonProperty("streamsProperties") final Map<String, Object> streamsProperties
@JsonProperty("streamsProperties") final Map<String, Object> streamsProperties,
@JsonProperty("commandSequenceNumber") final Long commandSequenceNumber
) {
this.ksql = ksql == null ? "" : ksql;
this.streamsProperties = streamsProperties == null
? Collections.emptyMap()
: Collections.unmodifiableMap(new HashMap<>(streamsProperties));
this.commandSequenceNumber = Optional.ofNullable(commandSequenceNumber);
}

public String getKsql() {
Expand All @@ -55,6 +59,10 @@ public Map<String, Object> getStreamsProperties() {
return coerceTypes(streamsProperties);
}

public Optional<Long> getCommandSequenceNumber() {
return commandSequenceNumber;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -66,13 +74,14 @@ public boolean equals(final Object o) {
}

final KsqlRequest that = (KsqlRequest) o;
return Objects.equals(getKsql(), that.getKsql())
&& Objects.equals(getStreamsProperties(), that.getStreamsProperties());
return Objects.equals(ksql, that.ksql)
&& Objects.equals(streamsProperties, that.streamsProperties)
&& Objects.equals(commandSequenceNumber, that.commandSequenceNumber);
}

@Override
public int hashCode() {
return Objects.hash(getKsql(), getStreamsProperties());
return Objects.hash(ksql, streamsProperties, commandSequenceNumber);
}

private static Map<String, Object> coerceTypes(final Map<String, Object> streamsProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.confluent.ksql.rest.server.computation.CommandIdAssigner;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.computation.CommandStore;
import io.confluent.ksql.rest.server.computation.ReplayableCommandQueue;
import io.confluent.ksql.rest.server.computation.StatementExecutor;
import io.confluent.ksql.rest.server.resources.KsqlExceptionMapper;
import io.confluent.ksql.rest.server.resources.KsqlResource;
Expand Down Expand Up @@ -109,6 +110,7 @@ public final class KsqlRestApplication extends Application<KsqlRestConfig> imple
private final KsqlConfig ksqlConfig;
private final KsqlEngine ksqlEngine;
private final CommandRunner commandRunner;
private final ReplayableCommandQueue replayableCommandQueue;
private final RootDocument rootDocument;
private final StatusResource statusResource;
private final StreamedQueryResource streamedQueryResource;
Expand All @@ -127,6 +129,7 @@ private KsqlRestApplication(
final KsqlConfig ksqlConfig,
final KsqlRestConfig config,
final CommandRunner commandRunner,
final ReplayableCommandQueue replayableCommandQueue,
final RootDocument rootDocument,
final StatusResource statusResource,
final StreamedQueryResource streamedQueryResource,
Expand All @@ -137,6 +140,7 @@ private KsqlRestApplication(
this.ksqlConfig = ksqlConfig;
this.ksqlEngine = ksqlEngine;
this.commandRunner = commandRunner;
this.replayableCommandQueue = replayableCommandQueue;
this.rootDocument = rootDocument;
this.statusResource = statusResource;
this.streamedQueryResource = streamedQueryResource;
Expand Down Expand Up @@ -264,8 +268,11 @@ public <T> T getEndpointInstance(final Class<T> endpointClass) {
JsonMapper.INSTANCE.mapper,
statementParser,
ksqlEngine,
replayableCommandQueue,
exec,
versionCheckerAgent::updateLastRequestTime
versionCheckerAgent::updateLastRequestTime,
Duration.ofMillis(config.getLong(
KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG))
);
}

Expand Down Expand Up @@ -371,6 +378,7 @@ public static KsqlRestApplication buildApplication(
ksqlConfig,
ksqlEngine,
statementParser,
commandStore,
Duration.ofMillis(
restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)),
versionChecker::updateLastRequestTime
Expand All @@ -390,6 +398,7 @@ public static KsqlRestApplication buildApplication(
ksqlConfig,
restConfig,
commandRunner,
commandStore,
rootDocument,
statusResource,
streamedQueryResource,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.ksql.rest.server.computation;

import io.confluent.ksql.rest.entity.CommandStatus;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CommandStatusFuture {
private static final CommandStatus INITIAL_STATUS = new CommandStatus(
CommandStatus.Status.QUEUED, "Statement written to command topic");

private final CommandId commandId;
private volatile CommandStatus currentStatus;
private final CompletableFuture<CommandStatus> finalStatusFuture;

public CommandStatusFuture(final CommandId commandId) {
this.commandId = Objects.requireNonNull(commandId, "commandId cannot be null");
this.currentStatus = INITIAL_STATUS;
this.finalStatusFuture = new CompletableFuture<>();
}

CommandId getCommandId() {
return commandId;
}

CommandStatus getStatus() {
return currentStatus;
}

CommandStatus tryWaitForFinalStatus(final Duration timeout) throws InterruptedException {
try {
return finalStatusFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (final ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException)e.getCause();
}
throw new RuntimeException("Error executing command " + commandId, e.getCause());
} catch (final TimeoutException e) {
return currentStatus;
}
}

void setStatus(final CommandStatus status) {
this.currentStatus = Objects.requireNonNull(status);
}

void setFinalStatus(final CommandStatus status) {
setStatus(status);
finalStatusFuture.complete(status);
}
}
Loading

0 comments on commit 20d6512

Please sign in to comment.