From 20d651217611ff7ae66f736c88c48e62b34ed7b7 Mon Sep 17 00:00:00 2001 From: vcrfxia Date: Thu, 13 Dec 2018 13:03:03 -0800 Subject: [PATCH] Return and accept command topic offsets via REST API (#2159) --- docs/developer-guide/api.rst | 2 + .../ksql/cli/console/ConsoleTest.java | 8 +- .../ksql/rest/client/KsqlRestClient.java | 6 +- .../ksql/rest/entity/CommandStatusEntity.java | 52 +-- .../ksql/rest/entity/KsqlRequest.java | 17 +- .../ksql/rest/server/KsqlRestApplication.java | 11 +- .../computation/CommandStatusFuture.java | 70 ++++ .../rest/server/computation/CommandStore.java | 79 +++- .../server/computation/QueuedCommand.java | 6 +- .../computation/QueuedCommandStatus.java | 51 +-- .../computation/ReplayableCommandQueue.java | 5 + .../SequenceNumberFutureStore.java | 49 +++ .../server/computation/StatementExecutor.java | 25 +- .../ksql/rest/server/resources/Errors.java | 11 + .../rest/server/resources/KsqlResource.java | 9 +- .../streaming/StreamedQueryResource.java | 9 + .../resources/streaming/WSQueryEndpoint.java | 42 ++- .../ksql/rest/util/CommandStoreUtil.java | 58 +++ .../rest/entity/CommandStatusEntityTest.java | 121 ++++++ .../ksql/rest/entity/KsqlRequestTest.java | 90 ++++- .../ksql/rest/integration/RestApiTest.java | 10 +- .../server/computation/CommandStoreTest.java | 343 ++++++++++-------- .../rest/server/computation/RecoveryTest.java | 10 +- .../SequenceNumberFutureStoreTest.java | 118 ++++++ .../computation/StatementExecutorTest.java | 8 +- .../server/resources/KsqlResourceTest.java | 82 ++++- .../resources/StreamedQueryResourceTest.java | 91 ++++- .../streaming/WSQueryEndpointTest.java | 92 ++++- .../ksql/rest/util/CommandStoreUtilTest.java | 85 +++++ 29 files changed, 1221 insertions(+), 339 deletions(-) create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStatusFuture.java create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/SequenceNumberFutureStore.java create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/CommandStoreUtil.java create mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/CommandStatusEntityTest.java create mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/SequenceNumberFutureStoreTest.java create mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/CommandStoreUtilTest.java diff --git a/docs/developer-guide/api.rst b/docs/developer-guide/api.rst index 9f297ac95900..00e66ccd6ce5 100644 --- a/docs/developer-guide/api.rst +++ b/docs/developer-guide/api.rst @@ -103,6 +103,7 @@ The KSQL resource runs a sequence of KSQL statements. All statements, except tho :json string ksql: A semicolon-delimited sequence of KSQL statements to run. :json map streamsProperties: Property overrides to run the statements with. Refer to the :ref:`Config Reference ` for details on properties that can be set. :json string streamsProperties[``property-name``]: The value of the property named by ``property-name``. Both the value and ``property-name`` should be strings. + :json long commandSequenceNumber: Optional. If specified, the statements will not be run until all existing commands up to and including the specified sequence number have completed. If unspecified, the statements are run immediately. (Note: When a command is processed, the result object contains its sequence number.) The response JSON is an array of result objects. The result object contents depend on the statement that it is returning results for. The following sections detail the contents of the result objects by statement. @@ -112,6 +113,7 @@ The KSQL resource runs a sequence of KSQL statements. All statements, except tho :>json string commandId: A string that identifies the requested operation. You can use this ID to poll the result of the operation using the status endpoint. :>json string commandStatus.status: One of QUEUED, PARSING, EXECUTING, TERMINATED, SUCCESS, or ERROR. :>json string commandStatus.message: Detailed message regarding the status of the execution statement. + :>json long commandSequenceNumber: The sequence number of the requested operation in the command queue, or -1 if the operation was unsuccessful. **LIST STREAMS, SHOW STREAMS** diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index 139d6d3a6436..a543b532724d 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -27,6 +27,7 @@ import io.confluent.ksql.TestTerminal; import io.confluent.ksql.cli.console.Console.NoOpRowCaptor; import io.confluent.ksql.rest.entity.ArgumentInfo; +import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatusEntity; import io.confluent.ksql.rest.entity.EntityQueryId; import io.confluent.ksql.rest.entity.ExecutionPlan; @@ -49,6 +50,7 @@ import io.confluent.ksql.rest.entity.StreamsList; import io.confluent.ksql.rest.entity.TablesList; import io.confluent.ksql.rest.entity.TopicDescription; +import io.confluent.ksql.rest.server.computation.CommandId; import io.confluent.ksql.rest.util.EntityUtil; import io.confluent.ksql.serde.DataSource; import io.confluent.ksql.util.SchemaUtil; @@ -127,7 +129,11 @@ public void testPrintKSqlEntityList() throws IOException { for (int i = 0; i < 5; i++) { final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( - new CommandStatusEntity("e", "topic/1/create", "SUCCESS", "Success Message"), + new CommandStatusEntity( + "e", + CommandId.fromString("topic/1/create"), + new CommandStatus(CommandStatus.Status.SUCCESS, "Success Message"), + 0L), new PropertiesList("e", properties, Collections.emptyList(), Collections.emptyList()), new Queries("e", queries), new SourceDescriptionEntity( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java index b9fc0e51db8b..7a94f6159378 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java @@ -132,7 +132,7 @@ public RestResponse getServerInfo() { } public RestResponse makeKsqlRequest(final String ksql) { - final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap()); + final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), null); return postRequest("ksql", jsonRequest, Optional.empty(), true, r -> r.readEntity(KsqlEntityList.class)); } @@ -146,13 +146,13 @@ public RestResponse makeStatusRequest(final String commandId) { } public RestResponse makeQueryRequest(final String ksql) { - final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap()); + final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), null); final Optional readTimeoutMs = Optional.of(QueryStream.READ_TIMEOUT_MS); return postRequest("query", jsonRequest, readTimeoutMs, false, QueryStream::new); } public RestResponse makePrintTopicRequest(final String ksql) { - final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap()); + final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), null); return postRequest("query", jsonRequest, Optional.empty(), false, r -> (InputStream) r.getEntity()); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/CommandStatusEntity.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/CommandStatusEntity.java index fccb1b4900e9..4135cd97fdcd 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/CommandStatusEntity.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/CommandStatusEntity.java @@ -18,48 +18,28 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.confluent.ksql.rest.server.computation.CommandId; -import java.util.Map; import java.util.Objects; @JsonIgnoreProperties(ignoreUnknown = true) public class CommandStatusEntity extends KsqlEntity { private final CommandId commandId; private final CommandStatus commandStatus; + private final long commandSequenceNumber; + @JsonCreator public CommandStatusEntity( - final String statementText, - final CommandId commandId, - final CommandStatus commandStatus + @JsonProperty("statementText") final String statementText, + @JsonProperty("commandId") final CommandId commandId, + @JsonProperty("commandStatus") final CommandStatus commandStatus, + @JsonProperty("commandSequenceNumber") final Long commandSequenceNumber ) { super(statementText); - this.commandId = commandId; - this.commandStatus = commandStatus; - } - - public CommandStatusEntity( - final String statementText, - final String commandId, - final String status, - final String message - ) { - this( - statementText, - CommandId.fromString(commandId), - new CommandStatus(CommandStatus.Status.valueOf(status), message) - ); - } - - @SuppressWarnings("unchecked") // needs investigating - @JsonCreator - public CommandStatusEntity(final Map properties) { - this( - (String) properties.get("statementText"), - (String) properties.get("commandId"), - (String) ((Map) properties.get("commandStatus")).get("status"), - (String) ((Map) properties.get("commandStatus")).get("message") - ); + this.commandId = Objects.requireNonNull(commandId, "commandId"); + this.commandStatus = Objects.requireNonNull(commandStatus, "commandStatus"); + this.commandSequenceNumber = commandSequenceNumber == null ? -1 : commandSequenceNumber; } public CommandId getCommandId() { @@ -71,6 +51,10 @@ public CommandStatus getCommandStatus() { return commandStatus; } + public Long getCommandSequenceNumber() { + return commandSequenceNumber; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -80,13 +64,14 @@ public boolean equals(final Object o) { return false; } final CommandStatusEntity that = (CommandStatusEntity) o; - return Objects.equals(getCommandId(), that.getCommandId()) - && Objects.equals(getCommandStatus(), that.getCommandStatus()); + return Objects.equals(commandId, that.commandId) + && Objects.equals(commandStatus, that.commandStatus) + && (commandSequenceNumber == that.commandSequenceNumber); } @Override public int hashCode() { - return Objects.hash(getCommandId(), getCommandStatus()); + return Objects.hash(commandId, commandStatus, commandSequenceNumber); } @Override @@ -94,6 +79,7 @@ public String toString() { return "CommandStatusEntity{" + "commandId=" + commandId + ", commandStatus=" + commandStatus + + ", commandSequenceNumber=" + commandSequenceNumber + '}'; } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java index c801490c03c5..64411e7f7ab5 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; @JsonIgnoreProperties(ignoreUnknown = true) @JsonSubTypes({}) @@ -35,16 +36,19 @@ public class KsqlRequest { private final String ksql; private final Map streamsProperties; + private final Optional commandSequenceNumber; @JsonCreator public KsqlRequest( @JsonProperty("ksql") final String ksql, - @JsonProperty("streamsProperties") final Map streamsProperties + @JsonProperty("streamsProperties") final Map streamsProperties, + @JsonProperty("commandSequenceNumber") final Long commandSequenceNumber ) { this.ksql = ksql == null ? "" : ksql; this.streamsProperties = streamsProperties == null ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap<>(streamsProperties)); + this.commandSequenceNumber = Optional.ofNullable(commandSequenceNumber); } public String getKsql() { @@ -55,6 +59,10 @@ public Map getStreamsProperties() { return coerceTypes(streamsProperties); } + public Optional getCommandSequenceNumber() { + return commandSequenceNumber; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -66,13 +74,14 @@ public boolean equals(final Object o) { } final KsqlRequest that = (KsqlRequest) o; - return Objects.equals(getKsql(), that.getKsql()) - && Objects.equals(getStreamsProperties(), that.getStreamsProperties()); + return Objects.equals(ksql, that.ksql) + && Objects.equals(streamsProperties, that.streamsProperties) + && Objects.equals(commandSequenceNumber, that.commandSequenceNumber); } @Override public int hashCode() { - return Objects.hash(getKsql(), getStreamsProperties()); + return Objects.hash(ksql, streamsProperties, commandSequenceNumber); } private static Map coerceTypes(final Map streamsProperties) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index d5d3aacf1246..bd7957c16c2e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -44,6 +44,7 @@ import io.confluent.ksql.rest.server.computation.CommandIdAssigner; import io.confluent.ksql.rest.server.computation.CommandRunner; import io.confluent.ksql.rest.server.computation.CommandStore; +import io.confluent.ksql.rest.server.computation.ReplayableCommandQueue; import io.confluent.ksql.rest.server.computation.StatementExecutor; import io.confluent.ksql.rest.server.resources.KsqlExceptionMapper; import io.confluent.ksql.rest.server.resources.KsqlResource; @@ -109,6 +110,7 @@ public final class KsqlRestApplication extends Application imple private final KsqlConfig ksqlConfig; private final KsqlEngine ksqlEngine; private final CommandRunner commandRunner; + private final ReplayableCommandQueue replayableCommandQueue; private final RootDocument rootDocument; private final StatusResource statusResource; private final StreamedQueryResource streamedQueryResource; @@ -127,6 +129,7 @@ private KsqlRestApplication( final KsqlConfig ksqlConfig, final KsqlRestConfig config, final CommandRunner commandRunner, + final ReplayableCommandQueue replayableCommandQueue, final RootDocument rootDocument, final StatusResource statusResource, final StreamedQueryResource streamedQueryResource, @@ -137,6 +140,7 @@ private KsqlRestApplication( this.ksqlConfig = ksqlConfig; this.ksqlEngine = ksqlEngine; this.commandRunner = commandRunner; + this.replayableCommandQueue = replayableCommandQueue; this.rootDocument = rootDocument; this.statusResource = statusResource; this.streamedQueryResource = streamedQueryResource; @@ -264,8 +268,11 @@ public T getEndpointInstance(final Class endpointClass) { JsonMapper.INSTANCE.mapper, statementParser, ksqlEngine, + replayableCommandQueue, exec, - versionCheckerAgent::updateLastRequestTime + versionCheckerAgent::updateLastRequestTime, + Duration.ofMillis(config.getLong( + KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)) ); } @@ -371,6 +378,7 @@ public static KsqlRestApplication buildApplication( ksqlConfig, ksqlEngine, statementParser, + commandStore, Duration.ofMillis( restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)), versionChecker::updateLastRequestTime @@ -390,6 +398,7 @@ public static KsqlRestApplication buildApplication( ksqlConfig, restConfig, commandRunner, + commandStore, rootDocument, statusResource, streamedQueryResource, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStatusFuture.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStatusFuture.java new file mode 100644 index 000000000000..d435dc48f02d --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStatusFuture.java @@ -0,0 +1,70 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.ksql.rest.server.computation; + +import io.confluent.ksql.rest.entity.CommandStatus; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class CommandStatusFuture { + private static final CommandStatus INITIAL_STATUS = new CommandStatus( + CommandStatus.Status.QUEUED, "Statement written to command topic"); + + private final CommandId commandId; + private volatile CommandStatus currentStatus; + private final CompletableFuture finalStatusFuture; + + public CommandStatusFuture(final CommandId commandId) { + this.commandId = Objects.requireNonNull(commandId, "commandId cannot be null"); + this.currentStatus = INITIAL_STATUS; + this.finalStatusFuture = new CompletableFuture<>(); + } + + CommandId getCommandId() { + return commandId; + } + + CommandStatus getStatus() { + return currentStatus; + } + + CommandStatus tryWaitForFinalStatus(final Duration timeout) throws InterruptedException { + try { + return finalStatusFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (final ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException)e.getCause(); + } + throw new RuntimeException("Error executing command " + commandId, e.getCause()); + } catch (final TimeoutException e) { + return currentStatus; + } + } + + void setStatus(final CommandStatus status) { + this.currentStatus = Objects.requireNonNull(status); + } + + void setFinalStatus(final CommandStatus status) { + setStatus(status); + finalStatusFuture.complete(status); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java index 2dccdf98e59b..6614777f6d31 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java @@ -28,12 +28,18 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; @@ -44,17 +50,21 @@ * the beginning until now, or any new messages since then), and writing to it. */ +// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public class CommandStore implements ReplayableCommandQueue, Closeable { + // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling private static final Logger log = LoggerFactory.getLogger(CommandStore.class); private static final Duration POLLING_TIMEOUT_FOR_COMMAND_TOPIC = Duration.ofMillis(5000); private final String commandTopic; + private final TopicPartition topicPartition; private final Consumer commandConsumer; private final Producer commandProducer; private final CommandIdAssigner commandIdAssigner; - private final Map commandStatusMap; + private final Map commandStatusMap; + private final SequenceNumberFutureStore sequenceNumberFutureStore; public CommandStore( final String commandTopic, @@ -62,13 +72,31 @@ public CommandStore( final Producer commandProducer, final CommandIdAssigner commandIdAssigner ) { - this.commandTopic = commandTopic; - this.commandConsumer = commandConsumer; - this.commandProducer = commandProducer; - this.commandIdAssigner = commandIdAssigner; + this( + commandTopic, + commandConsumer, + commandProducer, + commandIdAssigner, + new SequenceNumberFutureStore()); + } + + CommandStore( + final String commandTopic, + final Consumer commandConsumer, + final Producer commandProducer, + final CommandIdAssigner commandIdAssigner, + final SequenceNumberFutureStore sequenceNumberFutureStore + ) { + this.commandTopic = Objects.requireNonNull(commandTopic, "commandTopic"); + this.topicPartition = new TopicPartition(commandTopic, 0); + this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer"); + this.commandProducer = Objects.requireNonNull(commandProducer, "commandProducer"); + this.commandIdAssigner = Objects.requireNonNull(commandIdAssigner, "commandIdAssigner"); this.commandStatusMap = Maps.newConcurrentMap(); + this.sequenceNumberFutureStore = + Objects.requireNonNull(sequenceNumberFutureStore, "sequenceNumberFutureStore"); - commandConsumer.assign(Collections.singleton(new TopicPartition(commandTopic, 0))); + commandConsumer.assign(Collections.singleton(topicPartition)); } /** @@ -101,12 +129,11 @@ public QueuedCommandStatus enqueueCommand( statementString, overwriteProperties, ksqlConfig.getAllConfigPropsWithSecretsObfuscated()); - final QueuedCommandStatus status = new QueuedCommandStatus(commandId); - this.commandStatusMap.compute( + final CommandStatusFuture statusFuture = this.commandStatusMap.compute( commandId, (k, v) -> { if (v == null) { - return status; + return new CommandStatusFuture(commandId); } // We should fail registration if a future is already registered, to prevent // a caller from receiving a future for a different statement. @@ -118,7 +145,9 @@ public QueuedCommandStatus enqueueCommand( } ); try { - commandProducer.send(new ProducerRecord<>(commandTopic, commandId, command)).get(); + final RecordMetadata recordMetadata = + commandProducer.send(new ProducerRecord<>(commandTopic, commandId, command)).get(); + return new QueuedCommandStatus(recordMetadata.offset(), statusFuture); } catch (final Exception e) { commandStatusMap.remove(commandId); throw new KsqlException( @@ -130,7 +159,6 @@ public QueuedCommandStatus enqueueCommand( e ); } - return status; } /** @@ -139,6 +167,8 @@ public QueuedCommandStatus enqueueCommand( * @return The commands that have been polled from the command topic */ public List getNewCommands() { + completeSatisfiedSequenceNumberFutures(); + final List queuedCommands = Lists.newArrayList(); commandConsumer.poll(Duration.ofMillis(Long.MAX_VALUE)).forEach( c -> { @@ -186,6 +216,28 @@ public List getRestoreCommands() { return restoreCommands; } + @Override + public void ensureConsumedPast(final long seqNum, final Duration timeout) + throws InterruptedException, TimeoutException { + final CompletableFuture future = + sequenceNumberFutureStore.getFutureForSequenceNumber(seqNum); + try { + future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (final ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException)e.getCause(); + } + throw new RuntimeException( + "Error waiting for command sequence number of " + seqNum, e.getCause()); + } catch (final TimeoutException e) { + throw new TimeoutException( + String.format( + "Timeout reached while waiting for command sequence number of %d. (Timeout: %d ms)", + seqNum, + timeout.toMillis())); + } + } + private Collection getTopicPartitionsForTopic(final String topic) { final List partitionInfoList = commandConsumer.partitionsFor(topic); @@ -195,4 +247,9 @@ private Collection getTopicPartitionsForTopic(final String topic } return result; } + + private void completeSatisfiedSequenceNumberFutures() { + final long consumerPosition = commandConsumer.position(topicPartition);; + sequenceNumberFutureStore.completeFuturesUpToAndIncludingSequenceNumber(consumerPosition - 1); + } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/QueuedCommand.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/QueuedCommand.java index 00199f66cb80..8379ae5ecaa5 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/QueuedCommand.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/QueuedCommand.java @@ -22,11 +22,11 @@ public class QueuedCommand { private final CommandId commandId; private final Command command; - private final Optional status; + private final Optional status; public QueuedCommand(final CommandId commandId, final Command command, - final Optional status) { + final Optional status) { this.commandId = Objects.requireNonNull(commandId); this.command = Objects.requireNonNull(command); this.status = Objects.requireNonNull(status); @@ -40,7 +40,7 @@ public CommandId getCommandId() { return commandId; } - public Optional getStatus() { + public Optional getStatus() { return status; } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/QueuedCommandStatus.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/QueuedCommandStatus.java index 2f5a3d3e2bbf..add3c20cc121 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/QueuedCommandStatus.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/QueuedCommandStatus.java @@ -20,50 +20,31 @@ import java.time.Duration; import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; public class QueuedCommandStatus { - private static final CommandStatus INITIAL_STATUS = new CommandStatus( - CommandStatus.Status.QUEUED, "Statement written to command topic"); - - private final CommandId commandId; - private volatile CommandStatus commandStatus; - private final CompletableFuture future; - - public QueuedCommandStatus(final CommandId commandId) { - this.commandId = Objects.requireNonNull(commandId); - this.commandStatus = INITIAL_STATUS; - this.future = new CompletableFuture<>(); - } - - public CommandId getCommandId() { - return commandId; + private final CommandStatusFuture commandStatusFuture; + private final long commandSequenceNumber; + + public QueuedCommandStatus( + final long commandSequenceNumber, final CommandStatusFuture commandStatusFuture) { + this.commandSequenceNumber = + Objects.requireNonNull(commandSequenceNumber, "commandSequenceNumber"); + this.commandStatusFuture = Objects.requireNonNull(commandStatusFuture, "commandStatusFuture"); } public CommandStatus getStatus() { - return commandStatus; + return commandStatusFuture.getStatus(); } - public CommandStatus tryWaitForFinalStatus(final Duration timeout) - throws InterruptedException { - try { - return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - } catch (final ExecutionException e) { - throw new RuntimeException("Error executing command " + commandId, e.getCause()); - } catch (final TimeoutException e) { - return commandStatus; - } + public CommandId getCommandId() { + return commandStatusFuture.getCommandId(); } - public void setStatus(final CommandStatus status) { - this.commandStatus = Objects.requireNonNull(status); + public long getCommandSequenceNumber() { + return commandSequenceNumber; } - public void setFinalStatus(final CommandStatus status) { - setStatus(status); - future.complete(status); + public CommandStatus tryWaitForFinalStatus(final Duration timeout) throws InterruptedException { + return commandStatusFuture.tryWaitForFinalStatus(timeout); } -} \ No newline at end of file +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/ReplayableCommandQueue.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/ReplayableCommandQueue.java index 45a03061bbb0..e84855f9ddbc 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/ReplayableCommandQueue.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/ReplayableCommandQueue.java @@ -20,8 +20,10 @@ import io.confluent.ksql.util.KsqlConfig; import java.io.Closeable; +import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; public interface ReplayableCommandQueue extends Closeable { QueuedCommandStatus enqueueCommand( @@ -34,4 +36,7 @@ QueuedCommandStatus enqueueCommand( List getNewCommands(); List getRestoreCommands(); + + void ensureConsumedPast(long seqNum, Duration timeout) + throws InterruptedException, TimeoutException; } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/SequenceNumberFutureStore.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/SequenceNumberFutureStore.java new file mode 100644 index 000000000000..8ecea895fe71 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/SequenceNumberFutureStore.java @@ -0,0 +1,49 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.ksql.rest.server.computation; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +class SequenceNumberFutureStore { + private final ConcurrentHashMap> sequenceNumberFutures; + private long lastCompletedSequenceNumber; + + SequenceNumberFutureStore() { + sequenceNumberFutures = new ConcurrentHashMap<>(); + lastCompletedSequenceNumber = -1; + } + + synchronized CompletableFuture getFutureForSequenceNumber(final long seqNum) { + if (seqNum <= lastCompletedSequenceNumber) { + return CompletableFuture.completedFuture(null); + } + return sequenceNumberFutures.computeIfAbsent(seqNum, k -> new CompletableFuture<>()); + } + + void completeFuturesUpToAndIncludingSequenceNumber(final long seqNum) { + synchronized (this) { + lastCompletedSequenceNumber = seqNum; + } + sequenceNumberFutures.keySet().stream() + .filter(k -> k <= seqNum) + .forEach(k -> { + sequenceNumberFutures.get(k).complete(null); + sequenceNumberFutures.remove(k); + }); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java index 842b162d907b..353a2de4be14 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java @@ -122,17 +122,17 @@ public Optional getStatus(final CommandId statementId) { } public void putStatus(final CommandId commandId, - final Optional queuedCommandStatus, + final Optional commandStatusFuture, final CommandStatus status) { statusStore.put(commandId, status); - queuedCommandStatus.ifPresent(s -> s.setStatus(status)); + commandStatusFuture.ifPresent(s -> s.setStatus(status)); } public void putFinalStatus(final CommandId commandId, - final Optional queuedCommandStatus, + final Optional commandStatusFuture, final CommandStatus status) { statusStore.put(commandId, status); - queuedCommandStatus.ifPresent(s -> s.setFinalStatus(status)); + commandStatusFuture.ifPresent(s -> s.setFinalStatus(status)); } /** @@ -145,7 +145,7 @@ public void putFinalStatus(final CommandId commandId, void handleStatementWithTerminatedQueries( final Command command, final CommandId commandId, - final Optional queuedCommandStatus, + final Optional commandStatusFuture, final Mode mode ) { try { @@ -153,23 +153,23 @@ void handleStatementWithTerminatedQueries( maybeTerminateQueryForLegacyDropCommand(commandId, command); putStatus( commandId, - queuedCommandStatus, + commandStatusFuture, new CommandStatus(CommandStatus.Status.PARSING, "Parsing statement")); final Statement statement = statementParser.parseSingleStatement(statementString); putStatus( commandId, - queuedCommandStatus, + commandStatusFuture, new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement") ); executeStatement( - statement, command, commandId, queuedCommandStatus, mode); + statement, command, commandId, commandStatusFuture, mode); } catch (final KsqlException exception) { log.error("Failed to handle: " + command, exception); final CommandStatus errorStatus = new CommandStatus( CommandStatus.Status.ERROR, ExceptionUtil.stackTraceToString(exception) ); - putFinalStatus(commandId, queuedCommandStatus, errorStatus); + putFinalStatus(commandId, commandStatusFuture, errorStatus); } } @@ -177,7 +177,7 @@ private void executeStatement( final Statement statement, final Command command, final CommandId commandId, - final Optional queuedCommandStatus, + final Optional commandStatusFuture, final Mode mode ) { final String statementStr = command.getStatement(); @@ -217,7 +217,7 @@ private void executeStatement( CommandStatus.Status.SUCCESS, result != null ? result.getMessage() : successMessage ); - putFinalStatus(commandId, queuedCommandStatus, successStatus); + putFinalStatus(commandId, commandStatusFuture, successStatus); } private void handleRunScript(final Command command, final Mode mode) { @@ -247,7 +247,8 @@ private void handleRunScript(final Command command, final Mode mode) { if (mode == Mode.EXECUTE) { for (final QueryMetadata queryMetadata : queryMetadataList) { if (queryMetadata instanceof PersistentQueryMetadata) { - final PersistentQueryMetadata persistentQueryMd = (PersistentQueryMetadata) queryMetadata; + final PersistentQueryMetadata persistentQueryMd = + (PersistentQueryMetadata) queryMetadata; persistentQueryMd.start(); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/Errors.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/Errors.java index 58770e48d512..e697156aed2c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/Errors.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/Errors.java @@ -20,6 +20,7 @@ import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import static javax.ws.rs.core.Response.Status.NOT_FOUND; +import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE; import static javax.ws.rs.core.Response.Status.UNAUTHORIZED; import io.confluent.ksql.rest.entity.KsqlEntityList; @@ -40,6 +41,9 @@ public final class Errors { public static final int ERROR_CODE_NOT_FOUND = toErrorCode(NOT_FOUND.getStatusCode()); + public static final int ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT = + toErrorCode(SERVICE_UNAVAILABLE.getStatusCode()) + 1; + private Errors() { } @@ -121,4 +125,11 @@ public static Response serverErrorForStatement( .entity(new KsqlStatementErrorMessage(ERROR_CODE_SERVER_ERROR, t, statementText, entities)) .build(); } + + public static Response commandQueueCatchUpTimeout(final String msg) { + return Response + .status(SERVICE_UNAVAILABLE) + .entity(new KsqlErrorMessage(ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT, msg)) + .build(); + } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 204cbfe7bf5a..3e4b20b49488 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -81,6 +81,7 @@ import io.confluent.ksql.rest.server.KsqlRestApplication; import io.confluent.ksql.rest.server.computation.QueuedCommandStatus; import io.confluent.ksql.rest.server.computation.ReplayableCommandQueue; +import io.confluent.ksql.rest.util.CommandStoreUtil; import io.confluent.ksql.rest.util.QueryCapacityUtil; import io.confluent.ksql.serde.DataSource.DataSourceType; import io.confluent.ksql.util.KafkaConsumerGroupClient; @@ -189,11 +190,14 @@ public KsqlResource( public Response handleKsqlStatements(final KsqlRequest request) { activenessRegistrar.updateLastRequestTime(); try { + CommandStoreUtil.httpWaitForCommandSequenceNumber( + replayableCommandQueue, request, Duration.ofMillis(distributedCommandResponseTimeout)); + final List> statements = parseStatements(request.getKsql()); return executeStatements(statements, request.getStreamsProperties()); } catch (final KsqlRestException e) { - return e.getResponse(); + throw e; } catch (final KsqlStatementException e) { return Errors.badStatement(e.getRawMessage(), e.getSqlStatement()); } catch (final KsqlException e) { @@ -586,7 +590,8 @@ private CommandStatusEntity distributeStatement( return new CommandStatusEntity( withSchema.getStatementText(), queuedCommandStatus.getCommandId(), - commandStatus + commandStatus, + queuedCommandStatus.getCommandSequenceNumber() ); } catch (final Exception e) { throw new KsqlException(String.format( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index fc1e8d3035f0..90ca9b98576b 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -24,8 +24,10 @@ import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.Versions; import io.confluent.ksql.rest.server.StatementParser; +import io.confluent.ksql.rest.server.computation.ReplayableCommandQueue; import io.confluent.ksql.rest.server.resources.Errors; import io.confluent.ksql.rest.server.resources.KsqlRestException; +import io.confluent.ksql.rest.util.CommandStoreUtil; import io.confluent.ksql.rest.util.JsonMapper; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; @@ -51,6 +53,7 @@ public class StreamedQueryResource { private final KsqlConfig ksqlConfig; private final KsqlEngine ksqlEngine; private final StatementParser statementParser; + private final ReplayableCommandQueue replayableCommandQueue; private final Duration disconnectCheckInterval; private final ObjectMapper objectMapper; private final ActivenessRegistrar activenessRegistrar; @@ -59,12 +62,15 @@ public StreamedQueryResource( final KsqlConfig ksqlConfig, final KsqlEngine ksqlEngine, final StatementParser statementParser, + final ReplayableCommandQueue replayableCommandQueue, final Duration disconnectCheckInterval, final ActivenessRegistrar activenessRegistrar ) { this.ksqlConfig = ksqlConfig; this.ksqlEngine = ksqlEngine; this.statementParser = statementParser; + this.replayableCommandQueue = + Objects.requireNonNull(replayableCommandQueue, "replayableCommandQueue"); this.disconnectCheckInterval = Objects.requireNonNull(disconnectCheckInterval, "disconnectCheckInterval"); this.objectMapper = JsonMapper.INSTANCE.mapper; @@ -80,6 +86,9 @@ public Response streamQuery(final KsqlRequest request) throws Exception { return Errors.badRequest("\"ksql\" field must be populated"); } activenessRegistrar.updateLastRequestTime(); + + CommandStoreUtil.httpWaitForCommandSequenceNumber( + replayableCommandQueue, request, disconnectCheckInterval); try { statement = statementParser.parseSingleStatement(ksql); } catch (IllegalArgumentException | KsqlException e) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 5a7a99d96f06..ee49f800b2fc 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -28,14 +28,18 @@ import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.rest.entity.Versions; import io.confluent.ksql.rest.server.StatementParser; +import io.confluent.ksql.rest.server.computation.ReplayableCommandQueue; +import io.confluent.ksql.rest.util.CommandStoreUtil; import io.confluent.ksql.util.HandlerMaps; import io.confluent.ksql.util.HandlerMaps.ClassHandlerMap2; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.version.metrics.ActivenessRegistrar; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeoutException; import javax.websocket.CloseReason; import javax.websocket.CloseReason.CloseCodes; import javax.websocket.EndpointConfig; @@ -64,10 +68,12 @@ public class WSQueryEndpoint { private final ObjectMapper mapper; private final StatementParser statementParser; private final KsqlEngine ksqlEngine; + private final ReplayableCommandQueue replayableCommandQueue; private final ListeningScheduledExecutorService exec; private final ActivenessRegistrar activenessRegistrar; private final QueryPublisher queryPublisher; private final PrintTopicPublisher topicPublisher; + private final Duration commandQueueCatchupTimeout; private WebSocketSubscriber subscriber; @@ -76,16 +82,21 @@ public WSQueryEndpoint( final ObjectMapper mapper, final StatementParser statementParser, final KsqlEngine ksqlEngine, + final ReplayableCommandQueue replayableCommandQueue, final ListeningScheduledExecutorService exec, - final ActivenessRegistrar activenessRegistrar + final ActivenessRegistrar activenessRegistrar, + final Duration commandQueueCatchupTimeout ) { this(ksqlConfig, mapper, statementParser, - ksqlEngine, exec, + ksqlEngine, + replayableCommandQueue, + exec, WSQueryEndpoint::startQueryPublisher, WSQueryEndpoint::startPrintPublisher, - activenessRegistrar); + activenessRegistrar, + commandQueueCatchupTimeout); } WSQueryEndpoint( @@ -93,20 +104,26 @@ public WSQueryEndpoint( final ObjectMapper mapper, final StatementParser statementParser, final KsqlEngine ksqlEngine, + final ReplayableCommandQueue replayableCommandQueue, final ListeningScheduledExecutorService exec, final QueryPublisher queryPublisher, final PrintTopicPublisher topicPublisher, - final ActivenessRegistrar activenessRegistrar + final ActivenessRegistrar activenessRegistrar, + final Duration commandQueueCatchupTimeout ) { this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); this.mapper = Objects.requireNonNull(mapper, "mapper"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); + this.replayableCommandQueue = + Objects.requireNonNull(replayableCommandQueue, "replayableCommandQueue"); this.exec = Objects.requireNonNull(exec, "exec"); this.queryPublisher = Objects.requireNonNull(queryPublisher, "queryPublisher"); this.topicPublisher = Objects.requireNonNull(topicPublisher, "topicPublisher"); this.activenessRegistrar = Objects.requireNonNull(activenessRegistrar, "activenessRegistrar"); + this.commandQueueCatchupTimeout = + Objects.requireNonNull(commandQueueCatchupTimeout, "commandQueueCatchupTimeout"); } @SuppressWarnings("unused") @@ -118,12 +135,27 @@ public void onOpen(final Session session, final EndpointConfig unused) { validateVersion(session); final KsqlRequest request = parseRequest(session); + + try { + CommandStoreUtil.waitForCommandSequenceNumber(replayableCommandQueue, request, + commandQueueCatchupTimeout); + } catch (final InterruptedException e) { + log.debug("Interrupted while waiting for command queue " + + "to reach specified command sequence number", + e); + SessionUtil.closeSilently(session, CloseCodes.UNEXPECTED_CONDITION, e.getMessage()); + return; + } catch (final TimeoutException e) { + log.debug("Timeout while processing request", e); + SessionUtil.closeSilently(session, CloseCodes.TRY_AGAIN_LATER, e.getMessage()); + return; + } + final Statement statement = parseStatement(request); HANDLER_MAP .getOrDefault(statement.getClass(), WSQueryEndpoint::handleUnsupportedStatement) .handle(this, new SessionAndRequest(session, request), statement); - } catch (final Exception e) { log.debug("Error processing request", e); SessionUtil.closeSilently(session, CloseCodes.CANNOT_ACCEPT, e.getMessage()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/CommandStoreUtil.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/CommandStoreUtil.java new file mode 100644 index 000000000000..03d5a76da35b --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/CommandStoreUtil.java @@ -0,0 +1,58 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package io.confluent.ksql.rest.util; + +import io.confluent.ksql.rest.entity.KsqlEntityList; +import io.confluent.ksql.rest.entity.KsqlRequest; +import io.confluent.ksql.rest.server.computation.ReplayableCommandQueue; +import io.confluent.ksql.rest.server.resources.Errors; +import io.confluent.ksql.rest.server.resources.KsqlRestException; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.TimeoutException; + +public final class CommandStoreUtil { + private CommandStoreUtil() { + } + + public static void httpWaitForCommandSequenceNumber( + final ReplayableCommandQueue replayableCommandQueue, + final KsqlRequest request, + final Duration timeout) { + try { + waitForCommandSequenceNumber(replayableCommandQueue, request, timeout); + } catch (final InterruptedException e) { + final String errorMsg = "Interrupted while waiting for command queue to reach " + + "specified command sequence number in request: " + request.getKsql(); + throw new KsqlRestException( + Errors.serverErrorForStatement(e, errorMsg, new KsqlEntityList())); + } catch (final TimeoutException e) { + throw new KsqlRestException(Errors.commandQueueCatchUpTimeout(e.getMessage())); + } + } + + public static void waitForCommandSequenceNumber( + final ReplayableCommandQueue replayableCommandQueue, + final KsqlRequest request, + final Duration timeout) throws InterruptedException, TimeoutException { + final Optional commandSequenceNumber = request.getCommandSequenceNumber(); + if (commandSequenceNumber.isPresent()) { + final long seqNum = commandSequenceNumber.get(); + replayableCommandQueue.ensureConsumedPast(seqNum, timeout); + } + } +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/CommandStatusEntityTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/CommandStatusEntityTest.java new file mode 100644 index 000000000000..9e595a23dd8c --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/CommandStatusEntityTest.java @@ -0,0 +1,121 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package io.confluent.ksql.rest.entity; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.confluent.ksql.rest.server.computation.CommandId; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class CommandStatusEntityTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String JSON_ENTITY = "{" + + "\"@type\":\"currentStatus\"," + + "\"statementText\":\"sql\"," + + "\"commandId\":\"topic/1/create\"," + + "\"commandStatus\":{" + + "\"status\":\"SUCCESS\"," + + "\"message\":\"some success message\"" + + "}," + + "\"commandSequenceNumber\":2" + + "}"; + private static final String OLD_JSON_ENTITY = "{" + + "\"@type\":\"currentStatus\"," + + "\"statementText\":\"sql\"," + + "\"commandId\":\"topic/1/create\"," + + "\"commandStatus\":{" + + "\"status\":\"SUCCESS\"," + + "\"message\":\"some success message\"" + + "}}"; + + private static final String STATEMENT_TEXT = "sql"; + private static final CommandId COMMAND_ID = CommandId.fromString("topic/1/create"); + private static final CommandStatus COMMAND_STATUS = + new CommandStatus(CommandStatus.Status.SUCCESS, "some success message"); + private static final long COMMAND_SEQUENCE_NUMBER = 2L; + private static final CommandStatusEntity ENTITY = + new CommandStatusEntity(STATEMENT_TEXT, COMMAND_ID, COMMAND_STATUS, COMMAND_SEQUENCE_NUMBER); + private static final CommandStatusEntity ENTITY_WITHOUT_SEQUENCE_NUMBER = + new CommandStatusEntity(STATEMENT_TEXT, COMMAND_ID, COMMAND_STATUS, null); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Test + public void shouldSerializeToJson() throws Exception { + // When: + final String json = OBJECT_MAPPER.writeValueAsString(ENTITY); + + // Then: + assertThat(json, is(JSON_ENTITY)); + } + + @Test + public void shouldDeserializeFromJson() throws Exception { + // When: + final CommandStatusEntity entity = + OBJECT_MAPPER.readValue(JSON_ENTITY, CommandStatusEntity.class); + + // Then: + assertThat(entity, is(ENTITY)); + } + + @Test + public void shouldBeAbleToDeserializeOlderServerMessage() throws Exception { + // When: + final CommandStatusEntity entity = + OBJECT_MAPPER.readValue(OLD_JSON_ENTITY, CommandStatusEntity.class); + + // Then: + assertThat(entity, is(ENTITY_WITHOUT_SEQUENCE_NUMBER)); + } + + @Test + public void shouldHandleNullSequenceNumber() { + // When: + final CommandStatusEntity entity = + new CommandStatusEntity(STATEMENT_TEXT, COMMAND_ID, COMMAND_STATUS, null); + + // Then: + assertThat(entity.getCommandSequenceNumber(), is(-1L)); + } + + @Test + public void shouldThrowOnNullCommandId() { + // Given: + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("commandId"); + + // When: + new CommandStatusEntity(STATEMENT_TEXT, null, COMMAND_STATUS, COMMAND_SEQUENCE_NUMBER); + } + + @Test + public void shouldThrowOnNullCommandStatus() { + // Given: + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("commandStatus"); + + // When: + new CommandStatusEntity(STATEMENT_TEXT, COMMAND_ID, null, COMMAND_SEQUENCE_NUMBER); + } +} \ No newline at end of file diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java index 194e14c016c1..9bf72a42c7cc 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java @@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.nullValue; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.google.common.collect.ImmutableMap; import com.google.common.testing.EqualsTester; import io.confluent.ksql.util.KsqlConfig; @@ -32,6 +33,8 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.Optional; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -45,24 +48,50 @@ public class KsqlRequestTest { + "\"streamsProperties\":{" + "\"" + KsqlConfig.KSQL_SERVICE_ID_CONFIG + "\":\"some-service-id\"" + "}}"; + private static final String A_JSON_REQUEST_WITH_COMMAND_NUMBER = "{" + + "\"ksql\":\"sql\"," + + "\"streamsProperties\":{" + + "\"" + KsqlConfig.KSQL_SERVICE_ID_CONFIG + "\":\"some-service-id\"" + + "}," + + "\"commandSequenceNumber\":2}"; + private static final String A_JSON_REQUEST_WITH_NULL_COMMAND_NUMBER = "{" + + "\"ksql\":\"sql\"," + + "\"streamsProperties\":{" + + "\"" + KsqlConfig.KSQL_SERVICE_ID_CONFIG + "\":\"some-service-id\"" + + "}," + + "\"commandSequenceNumber\":null}"; private static final ImmutableMap SOME_PROPS = ImmutableMap.of( KsqlConfig.KSQL_SERVICE_ID_CONFIG, "some-service-id" ); + private static final long SOME_COMMAND_NUMBER = 2L; - private static final KsqlRequest A_REQUEST = new KsqlRequest("sql", SOME_PROPS); + private static final KsqlRequest A_REQUEST = new KsqlRequest("sql", SOME_PROPS, null); + private static final KsqlRequest A_REQUEST_WITH_COMMAND_NUMBER = + new KsqlRequest("sql", SOME_PROPS, SOME_COMMAND_NUMBER); + + @BeforeClass + public static void setUpClass() { + OBJECT_MAPPER.registerModule(new Jdk8Module()); + } @Rule public final ExpectedException expectedException = ExpectedException.none(); @Test public void shouldHandleNullStatement() { - assertThat(new KsqlRequest(null, SOME_PROPS).getKsql(), is("")); + assertThat(new KsqlRequest(null, SOME_PROPS, SOME_COMMAND_NUMBER).getKsql(), is("")); } @Test public void shouldHandleNullProps() { - assertThat(new KsqlRequest("sql", null).getStreamsProperties(), is(Collections.emptyMap())); + assertThat(new KsqlRequest("sql", null, SOME_COMMAND_NUMBER).getStreamsProperties(), + is(Collections.emptyMap())); + } + + @Test + public void shouldHandleNullCommandNumber() { + assertThat(new KsqlRequest("sql", SOME_PROPS, null).getCommandSequenceNumber(), is(Optional.empty())); } @Test @@ -74,21 +103,50 @@ public void shouldDeserializeFromJson() { assertThat(request, is(A_REQUEST)); } + @Test + public void shouldDeserializeFromJsonWithCommandNumber() { + // When: + final KsqlRequest request = deserialize(A_JSON_REQUEST_WITH_COMMAND_NUMBER); + + // Then: + assertThat(request, is(A_REQUEST_WITH_COMMAND_NUMBER)); + } + + @Test + public void shouldDeserializeFromJsonWithNullCommandNumber() { + // When: + final KsqlRequest request = deserialize(A_JSON_REQUEST_WITH_NULL_COMMAND_NUMBER); + + // Then: + assertThat(request, is(A_REQUEST)); + } + @Test public void shouldSerializeToJson() { // When: final String jsonRequest = serialize(A_REQUEST); // Then: - assertThat(jsonRequest, is(A_JSON_REQUEST)); + assertThat(jsonRequest, is(A_JSON_REQUEST_WITH_NULL_COMMAND_NUMBER)); + } + + @Test + public void shouldSerializeToJsonWithCommandNumber() { + // When: + final String jsonRequest = serialize(A_REQUEST_WITH_COMMAND_NUMBER); + + // Then: + assertThat(jsonRequest, is(A_JSON_REQUEST_WITH_COMMAND_NUMBER)); } @Test public void shouldImplementHashCodeAndEqualsCorrectly() { new EqualsTester() - .addEqualityGroup(new KsqlRequest("sql", SOME_PROPS), new KsqlRequest("sql", SOME_PROPS)) - .addEqualityGroup(new KsqlRequest("different-sql", SOME_PROPS)) - .addEqualityGroup(new KsqlRequest("sql", ImmutableMap.of())) + .addEqualityGroup(new KsqlRequest("sql", SOME_PROPS, SOME_COMMAND_NUMBER), + new KsqlRequest("sql", SOME_PROPS, SOME_COMMAND_NUMBER)) + .addEqualityGroup(new KsqlRequest("different-sql", SOME_PROPS, SOME_COMMAND_NUMBER)) + .addEqualityGroup(new KsqlRequest("sql", ImmutableMap.of(), SOME_COMMAND_NUMBER)) + .addEqualityGroup(new KsqlRequest("sql", SOME_PROPS, null)) .testEquals(); } @@ -111,9 +169,12 @@ public void shouldHandleShortProperties() { @Test public void shouldThrowOnInvalidPropertyValue() { // Given: - final KsqlRequest request = new KsqlRequest("sql", ImmutableMap.of( - SINK_NUMBER_OF_REPLICAS_PROPERTY, "not-parsable" - )); + final KsqlRequest request = new KsqlRequest( + "sql", + ImmutableMap.of( + SINK_NUMBER_OF_REPLICAS_PROPERTY, "not-parsable" + ), + null); expectedException.expect(KsqlException.class); expectedException.expectMessage(containsString(SINK_NUMBER_OF_REPLICAS_PROPERTY)); @@ -126,9 +187,12 @@ public void shouldThrowOnInvalidPropertyValue() { @Test public void shouldHandleNullPropertyValue() { // Given: - final KsqlRequest request = new KsqlRequest("sql", Collections.singletonMap( - KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, null - )); + final KsqlRequest request = new KsqlRequest( + "sql", + Collections.singletonMap( + KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, null + ), + null); // When: final Map props = request.getStreamsProperties(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java index be7dd0591617..e79f2828b2f2 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java @@ -102,9 +102,8 @@ public void setUp() { @Test public void shouldExecuteStreamingQueryWithV1ContentType() { - final KsqlRequest request = new KsqlRequest(String.format("SELECT * from %s;", - PAGE_VIEW_STREAM), - Collections.emptyMap()); + final KsqlRequest request = new KsqlRequest( + String.format("SELECT * from %s;", PAGE_VIEW_STREAM), Collections.emptyMap(), null); try (final Response response = restClient.target(serverAddress) .path("query") .request(Versions.KSQL_V1_JSON) @@ -116,9 +115,8 @@ public void shouldExecuteStreamingQueryWithV1ContentType() { @Test public void shouldExecuteStreamingQueryWithJsonContentType() { - final KsqlRequest request = new KsqlRequest(String.format("SELECT * from %s;", - PAGE_VIEW_STREAM), - Collections.emptyMap()); + final KsqlRequest request = new KsqlRequest( + String.format("SELECT * from %s;", PAGE_VIEW_STREAM), Collections.emptyMap(), null); try (final Response response = restClient.target(serverAddress) .path("query") .request(MediaType.APPLICATION_JSON_TYPE) diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java index c9963f1bad76..39a3831de776 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java @@ -16,15 +16,6 @@ package io.confluent.ksql.rest.server.computation; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.mock; -import static org.easymock.EasyMock.niceMock; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.reset; -import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; @@ -33,10 +24,14 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import io.confluent.ksql.function.InternalFunctionRegistry; -import io.confluent.ksql.metastore.MetaStoreImpl; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.util.KsqlConfig; @@ -49,8 +44,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -62,34 +60,95 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.EasyMockRunner; +import org.apache.kafka.common.record.RecordBatch; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; @SuppressWarnings("unchecked") -@RunWith(EasyMockRunner.class) +@RunWith(MockitoJUnitRunner.class) public class CommandStoreTest { private static final String COMMAND_TOPIC = "command"; - private static final KsqlConfig KSQL_CONFIG = new KsqlConfig(Collections.emptyMap()); - private static final Map OVERRIDE_PROPERTIES = Collections.emptyMap(); - - private final Consumer commandConsumer = niceMock(Consumer.class); - private final Producer commandProducer = mock(Producer.class); + private static final TopicPartition COMMAND_TOPIC_PARTITION = + new TopicPartition(COMMAND_TOPIC, 0); + private static final KsqlConfig KSQL_CONFIG = new KsqlConfig( + Collections.singletonMap(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, "foo")); + private static final Map OVERRIDE_PROPERTIES = + Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + private static final Duration TIMEOUT = Duration.ofMillis(1000); + private static final AtomicInteger COUNTER = new AtomicInteger(); private static final String statementText = "test-statement"; + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Mock + private Consumer commandConsumer; + @Mock + private Producer commandProducer; + @Mock + private SequenceNumberFutureStore sequenceNumberFutureStore; + @Mock + private CompletableFuture future; + @Mock + private Statement statement; + @Mock + private Future recordMetadataFuture; + @Mock + private Node node; + @Mock + private CommandIdAssigner commandIdAssigner; + + @Captor + private ArgumentCaptor> recordCaptor; + private final CommandId commandId = new CommandId(CommandId.Type.STREAM, "foo", CommandId.Action.CREATE); - private final Statement statement = mock(Statement.class); - private final Future future = niceMock(Future.class); private final Command command = new Command(statementText, Collections.emptyMap(), Collections.emptyMap()); - private final Node node = mock(Node.class); + private final RecordMetadata recordMetadata = new RecordMetadata( + COMMAND_TOPIC_PARTITION, 0, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0); + + private CommandStore commandStore; + + @Before + public void setUp() throws Exception { + when(commandIdAssigner.getCommandId(any())) + .thenAnswer(invocation -> new CommandId( + CommandId.Type.STREAM, "foo" + COUNTER.getAndIncrement(), CommandId.Action.CREATE)); + + when(commandProducer.send(any(ProducerRecord.class))).thenReturn(recordMetadataFuture); + when(recordMetadataFuture.get()).thenReturn(recordMetadata); + + when(commandConsumer.poll(any(Duration.class))).thenReturn(buildRecords(commandId, command)); + when(commandConsumer.partitionsFor(COMMAND_TOPIC)) + .thenReturn(ImmutableList.of( + new PartitionInfo(COMMAND_TOPIC, 0, node, new Node[]{node}, new Node[]{node}) + )); + + when(sequenceNumberFutureStore.getFutureForSequenceNumber(anyLong())).thenReturn(future); + + commandStore = new CommandStore( + COMMAND_TOPIC, + commandConsumer, + commandProducer, + commandIdAssigner, + sequenceNumberFutureStore + ); + } @Test public void shouldHaveAllCreateCommandsInOrder() { + // Given: final CommandId createId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.CREATE); final CommandId dropId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.DROP); final KsqlConfig ksqlConfig = new KsqlConfig(Collections.emptyMap()); @@ -101,19 +160,19 @@ public void shouldHaveAllCreateCommandsInOrder() { "a new statement", Collections.emptyMap(), ksqlConfig.getAllConfigPropsWithSecretsObfuscated()); final ConsumerRecords records = buildRecords( - createId, originalCommand, - dropId, dropCommand, - createId, latestCommand + createId, originalCommand, + dropId, dropCommand, + createId, latestCommand ); - EasyMock.expect(commandConsumer.partitionsFor(COMMAND_TOPIC)).andReturn(Collections.emptyList()); + when(commandConsumer.poll(any())) + .thenReturn(records) + .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); - EasyMock.expect(commandConsumer.poll(anyObject())).andReturn(records) - .andReturn(new ConsumerRecords<>(Collections.emptyMap())); - EasyMock.replay(commandConsumer); + // When: + final List> commands = getPriorCommands(commandStore); - final CommandStore command = createCommandStore(); - final List> commands = getPriorCommands(command); + // Then: assertThat(commands, equalTo(Arrays.asList(new Pair<>(createId, originalCommand), new Pair<>(dropId, dropCommand), new Pair<>(createId, latestCommand)))); @@ -121,207 +180,174 @@ public void shouldHaveAllCreateCommandsInOrder() { @Test public void shouldFailEnqueueIfCommandWithSameIdRegistered() { - final CommandStore commandStore = createCommandStoreThatAssignsSameId(commandId); - // Given: - expect(commandProducer.send(anyObject())).andReturn(future); - replay(commandProducer); + when(commandIdAssigner.getCommandId(any())).thenReturn(commandId); + commandStore.enqueueCommand(statementText, statement, KSQL_CONFIG, OVERRIDE_PROPERTIES); - try { - // When: - commandStore.enqueueCommand(statementText, statement, KSQL_CONFIG, OVERRIDE_PROPERTIES); + expectedException.expect(IllegalStateException.class); - // Then: - fail("Second enqueue call should throw IllegalStateException"); - } catch (final IllegalStateException e) { - } + // When: + commandStore.enqueueCommand(statementText, statement, KSQL_CONFIG, OVERRIDE_PROPERTIES); } @Test public void shouldCleanupCommandStatusOnProduceError() { - final CommandStore commandStore = createCommandStoreThatAssignsSameId(commandId); - // Given: - expect(commandProducer.send(anyObject())).andThrow(new RuntimeException("oops")).times(1); - expect(commandProducer.send(anyObject())).andReturn(future).times(1); - replay(commandProducer); + when(commandProducer.send(any(ProducerRecord.class))) + .thenThrow(new RuntimeException("oops")) + .thenReturn(recordMetadataFuture); try { commandStore.enqueueCommand(statementText, statement, KSQL_CONFIG, OVERRIDE_PROPERTIES); fail("enqueueCommand should have raised an exception"); } catch (final KsqlException e) { } - // When: + // Should: commandStore.enqueueCommand(statementText, statement, KSQL_CONFIG, OVERRIDE_PROPERTIES); - - // Then: - // main condition being verified is that the above call doesn't throw - verify(commandProducer); } @Test - public void shouldEnqueueNewAfterHandlingExistingCommand() throws InterruptedException, ExecutionException { - final CommandStore commandStore = createCommandStoreThatAssignsSameId(commandId); - + public void shouldEnqueueNewAfterHandlingExistingCommand() { // Given: - setupConsumerToReturnCommand(commandId, command); - expect(commandProducer.send(anyObject(ProducerRecord.class))).andAnswer( - () -> { - commandStore.getNewCommands(); - return future; - } - ).times(1); - expect(commandProducer.send(anyObject(ProducerRecord.class))).andReturn(future); - future.get(); - expectLastCall().andStubReturn(null); - replay(future, commandProducer); + when(commandIdAssigner.getCommandId(any())).thenReturn(commandId); commandStore.enqueueCommand(statementText, statement, KSQL_CONFIG, OVERRIDE_PROPERTIES); + commandStore.getNewCommands(); - // When: + // Should: commandStore.enqueueCommand(statementText, statement, KSQL_CONFIG, OVERRIDE_PROPERTIES); - - // Then: - verify(future, commandProducer, commandConsumer); } @Test - public void shouldRegisterBeforeDistributeAndReturnStatusOnGetNewCommands() - throws ExecutionException, InterruptedException { - final CommandStore commandStore = createCommandStoreThatAssignsSameId(commandId); - + public void shouldRegisterBeforeDistributeAndReturnStatusOnGetNewCommands() { // Given: - setupConsumerToReturnCommand(commandId, command); - expect(commandProducer.send(anyObject(ProducerRecord.class))).andAnswer( - () -> { + when(commandIdAssigner.getCommandId(any())).thenReturn(commandId); + when(commandProducer.send(any(ProducerRecord.class))).thenAnswer( + invocation -> { + // Set up consumer to return record with matching key, i.e. force a race condition: + final ProducerRecord record = invocation.getArgument(0); + when(commandConsumer.poll(any())).thenReturn(buildRecords(record.key(), record.value())); + final QueuedCommand queuedCommand = commandStore.getNewCommands().get(0); assertThat(queuedCommand.getCommandId(), equalTo(commandId)); assertThat(queuedCommand.getStatus().isPresent(), equalTo(true)); assertThat( queuedCommand.getStatus().get().getStatus().getStatus(), equalTo(CommandStatus.Status.QUEUED)); - return future; + return recordMetadataFuture; } ); - future.get(); - expectLastCall().andReturn(null); - replay(future, commandProducer); // When: commandStore.enqueueCommand(statementText, statement, KSQL_CONFIG, OVERRIDE_PROPERTIES); // Then: - // verifying the commandProducer also verifies the assertions in its IAnswer were run - verify(future, commandProducer, commandConsumer); + // verifying the commandProducer also verifies the assertions in its Answer were run + verify(commandProducer).send(any(ProducerRecord.class)); } @Test public void shouldFilterNullCommands() { // Given: - final CommandId id = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.CREATE); - final Command command = new Command( - "some statement", Collections.emptyMap(), Collections.emptyMap()); final ConsumerRecords records = buildRecords( - id, null, - id, command); - expect(commandConsumer.poll(anyObject())).andReturn(records); - replay(commandConsumer); + commandId, null, + commandId, command); + when(commandConsumer.poll(any())).thenReturn(records); // When: - final List commands = createCommandStore().getNewCommands(); + final List commands = commandStore.getNewCommands(); // Then: assertThat(commands, hasSize(1)); - assertThat(commands.get(0).getCommandId(), equalTo(id)); + assertThat(commands.get(0).getCommandId(), equalTo(commandId)); assertThat(commands.get(0).getCommand(), equalTo(command)); } @Test public void shouldFilterNullPriorCommand() { // Given: - final CommandId id = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.CREATE); - final Command command = new Command( - "some statement", Collections.emptyMap(), Collections.emptyMap()); final ConsumerRecords records = buildRecords( - id, null, - id, command); - expect(commandConsumer.partitionsFor(COMMAND_TOPIC)).andStubReturn( - ImmutableList.of( - new PartitionInfo(COMMAND_TOPIC, 0, node, new Node[]{node}, new Node[]{node}) - ) - ); - expect(commandConsumer.poll(anyObject())).andReturn(records); - expect(commandConsumer.poll(anyObject())).andReturn(ConsumerRecords.empty()); - replay(commandConsumer); + commandId, null, + commandId, command); + when(commandConsumer.poll(any())) + .thenReturn(records) + .thenReturn(ConsumerRecords.empty()); // When: - final List commands = createCommandStore().getRestoreCommands(); + final List commands = commandStore.getRestoreCommands(); // Then: assertThat(commands, hasSize(1)); - assertThat(commands.get(0).getCommandId(), equalTo(id)); + assertThat(commands.get(0).getCommandId(), equalTo(commandId)); assertThat(commands.get(0).getCommand(), equalTo(command)); } @Test - public void shouldDistributeCommand() throws ExecutionException, InterruptedException { - final KsqlConfig ksqlConfig = new KsqlConfig( - Collections.singletonMap(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, "foo")); - final Map overrideProperties = Collections.singletonMap( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - final String statementText = "test-statement"; - - final Statement statement = mock(Statement.class); - final Capture> recordCapture = Capture.newInstance(); - final Future future = mock(Future.class); + public void shouldDistributeCommand() { + when(commandIdAssigner.getCommandId(any())).thenReturn(commandId); + when(commandProducer.send(recordCaptor.capture())).thenReturn(recordMetadataFuture); - expect(commandProducer.send(capture(recordCapture))).andReturn(future); - future.get(); - expectLastCall().andReturn(null); - replay(commandProducer, future); - - final CommandStore commandStore = createCommandStoreThatAssignsSameId(commandId); - commandStore.enqueueCommand(statementText, statement, ksqlConfig, overrideProperties); - - verify(commandProducer, future); + // When: + commandStore.enqueueCommand(statementText, statement, KSQL_CONFIG, OVERRIDE_PROPERTIES); - final ProducerRecord record = recordCapture.getValue(); + // Then: + final ProducerRecord record = recordCaptor.getValue(); assertThat(record.key(), equalTo(commandId)); assertThat(record.value().getStatement(), equalTo(statementText)); - assertThat(record.value().getOverwriteProperties(), equalTo(overrideProperties)); - assertThat(record.value().getOriginalProperties(), equalTo(ksqlConfig.getAllConfigPropsWithSecretsObfuscated())); + assertThat(record.value().getOverwriteProperties(), equalTo(OVERRIDE_PROPERTIES)); + assertThat(record.value().getOriginalProperties(), + equalTo(KSQL_CONFIG.getAllConfigPropsWithSecretsObfuscated())); } - private void setupConsumerToReturnCommand(final CommandId commandId, final Command command) { - reset(commandConsumer); - expect(commandConsumer.poll(anyObject(Duration.class))).andReturn( - buildRecords(commandId, command) - ).times(1); - replay(commandConsumer); + @Test + public void shouldIncludeCommandSequenceNumberInSuccessfulQueuedCommandStatus() { + // When: + final QueuedCommandStatus commandStatus = + commandStore.enqueueCommand(statementText, statement, KSQL_CONFIG, OVERRIDE_PROPERTIES); + + // Then: + assertThat(commandStatus.getCommandSequenceNumber(), equalTo(recordMetadata.offset())); } - private CommandStore createCommandStoreThatAssignsSameId(final CommandId commandId) { - final CommandIdAssigner commandIdAssigner = mock(CommandIdAssigner.class); - expect(commandIdAssigner.getCommandId(anyObject())).andStubAnswer( - () -> new CommandId(commandId.getType(), commandId.getEntity(), commandId.getAction()) - ); - replay(commandIdAssigner); - return createCommandStore(commandIdAssigner); + @Test + public void shouldWaitOnSequenceNumberFuture() throws Exception { + // When: + commandStore.ensureConsumedPast(2, TIMEOUT); + + // Then: + verify(future).get(eq(TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); } - private CommandStore createCommandStore() { - return createCommandStore(new CommandIdAssigner(new MetaStoreImpl(new InternalFunctionRegistry()))); + @Test + public void shouldThrowExceptionOnTimeout() throws Exception { + // Given: + when(future.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException()); + + expectedException.expect(TimeoutException.class); + expectedException.expectMessage(String.format( + "Timeout reached while waiting for command sequence number of 2. (Timeout: %d ms)", + TIMEOUT.toMillis())); + + // When: + commandStore.ensureConsumedPast(2, TIMEOUT); } - private CommandStore createCommandStore(final CommandIdAssigner commandIdAssigner) { - return new CommandStore( - COMMAND_TOPIC, - commandConsumer, - commandProducer, - commandIdAssigner); + @Test + public void shouldCompleteFuturesWhenGettingNewCommands() { + // Given: + when(commandConsumer.position(COMMAND_TOPIC_PARTITION)).thenReturn(22L); + + // When: + commandStore.getNewCommands(); + + // Then: + final InOrder inOrder = inOrder(sequenceNumberFutureStore, commandConsumer); + inOrder.verify(sequenceNumberFutureStore) + .completeFuturesUpToAndIncludingSequenceNumber(eq(21L)); + inOrder.verify(commandConsumer).poll(any()); } - private List> getPriorCommands(final CommandStore commandStore) { + private static List> getPriorCommands(final CommandStore commandStore) { return commandStore.getRestoreCommands().stream() .map( queuedCommand -> new Pair<>( @@ -329,20 +355,15 @@ private List> getPriorCommands(final CommandStore comma .collect(Collectors.toList()); } - private ConsumerRecords buildRecords(final Object ...args) { + private static ConsumerRecords buildRecords(final Object ...args) { assertThat(args.length % 2, equalTo(0)); final List> records = new ArrayList<>(); for (int i = 0; i < args.length; i += 2) { assertThat(args[i], instanceOf(CommandId.class)); assertThat(args[i + 1], anyOf(is(nullValue()), instanceOf(Command.class))); records.add( - new ConsumerRecord<>("topic", 0, 0, (CommandId) args[i], (Command) args[i + 1])); + new ConsumerRecord<>(COMMAND_TOPIC, 0, 0, (CommandId) args[i], (Command) args[i + 1])); } - return new ConsumerRecords<>( - Collections.singletonMap( - new TopicPartition("topic", 0), - records - ) - ); + return new ConsumerRecords<>(Collections.singletonMap(COMMAND_TOPIC_PARTITION, records)); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index f7e3909f4ef4..56d267585788 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -45,6 +45,7 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -105,6 +106,7 @@ public QueuedCommandStatus enqueueCommand( final KsqlConfig ksqlConfig, final Map overwriteProperties) { final CommandId commandId = commandIdAssigner.getCommandId(statement); + final long commandSequenceNumber = commandLog.size(); commandLog.add( new QueuedCommand( commandId, @@ -113,7 +115,7 @@ public QueuedCommandStatus enqueueCommand( Collections.emptyMap(), ksqlConfig.getAllConfigPropsWithSecretsObfuscated()), Optional.empty())); - return new QueuedCommandStatus(commandId); + return new QueuedCommandStatus(commandSequenceNumber, new CommandStatusFuture(commandId)); } @Override @@ -130,6 +132,10 @@ public List getRestoreCommands() { return restoreCommands; } + @Override + public void ensureConsumedPast(final long seqNum, final Duration timeout) { + } + @Override public void close() { } @@ -178,7 +184,7 @@ void executeCommands() { void submitCommands(final String ...statements) { for (final String statement : statements) { final Response response = ksqlResource.handleKsqlStatements( - new KsqlRequest(statement, Collections.emptyMap())); + new KsqlRequest(statement, Collections.emptyMap(), null)); assertThat(response.getStatus(), equalTo(200)); executeCommands(); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/SequenceNumberFutureStoreTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/SequenceNumberFutureStoreTest.java new file mode 100644 index 000000000000..f4378a7d741a --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/SequenceNumberFutureStoreTest.java @@ -0,0 +1,118 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.ksql.rest.server.computation; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class SequenceNumberFutureStoreTest { + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + private SequenceNumberFutureStore futureStore; + + @Before + public void setUp() { + futureStore = new SequenceNumberFutureStore(); + } + + @Test + public void shouldReturnFutureForNewSequenceNumber() { + // When: + final CompletableFuture future = futureStore.getFutureForSequenceNumber(2); + + // Then: + assertFutureIsNotCompleted(future); + } + + @Test + public void shouldReturnFutureForExistingSequenceNumber() { + // Given: + final CompletableFuture existingFuture = futureStore.getFutureForSequenceNumber(2); + + // When: + final CompletableFuture newFuture = futureStore.getFutureForSequenceNumber(2); + + // Then: + assertThat(newFuture, is(sameInstance(existingFuture))); + } + + @Test + public void shouldReturnFutureForCompletedSequenceNumber() { + // Given: + futureStore.completeFuturesUpToAndIncludingSequenceNumber(2); + + // When: + final CompletableFuture future = futureStore.getFutureForSequenceNumber(2); + + // Then: + assertFutureIsCompleted(future); + } + + @Test + public void shouldCompleteFutures() { + // Given: + final CompletableFuture firstFuture = futureStore.getFutureForSequenceNumber(2); + final CompletableFuture secondFuture = futureStore.getFutureForSequenceNumber(3); + + // When: + futureStore.completeFuturesUpToAndIncludingSequenceNumber(2); + + // Then: + assertFutureIsCompleted(firstFuture); + assertFutureIsNotCompleted(secondFuture); + } + + @Test + public void shouldBeThreadSafe() { + // When: + final List> futures = IntStream.range(1, 11).parallel() + .mapToObj(idx -> { + final CompletableFuture f = futureStore.getFutureForSequenceNumber(idx); + if (idx % 10 == 0) { + futureStore.completeFuturesUpToAndIncludingSequenceNumber(idx); + } + return f; + }) + .collect(Collectors.toList()); + + // Then: + assertThat(futures.stream().allMatch(CompletableFuture::isDone), is(true)); + } + + private static void assertFutureIsCompleted(CompletableFuture future) { + assertThat(future.isDone(), is(true)); + assertThat(future.isCancelled(), is(false)); + assertThat(future.isCompletedExceptionally(), is(false)); + } + + private static void assertFutureIsNotCompleted(CompletableFuture future) { + assertThat(future.isDone(), is(false)); + } +} \ No newline at end of file diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java index 531492049873..e70b7e9a131e 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java @@ -132,7 +132,7 @@ public void tearDown() { private void handleStatement( final Command command, final CommandId commandId, - final Optional commandStatus) { + final Optional commandStatus) { handleStatement(statementExecutor, command, commandId, commandStatus); } @@ -140,7 +140,7 @@ private void handleStatement( final StatementExecutor statementExecutor, final Command command, final CommandId commandId, - final Optional commandStatus) { + final Optional commandStatus) { statementExecutor.handleStatement(new QueuedCommand(commandId, command, commandStatus)); } @@ -359,7 +359,7 @@ public void shouldCompleteFutureOnSuccess() { final CommandId commandId = new CommandId(CommandId.Type.STREAM, "foo", CommandId.Action.CREATE); - final QueuedCommandStatus status = mock(QueuedCommandStatus.class); + final CommandStatusFuture status = mock(CommandStatusFuture.class); status.setStatus(sameStatus(CommandStatus.Status.PARSING)); expectLastCall(); status.setStatus(sameStatus(CommandStatus.Status.EXECUTING)); @@ -388,7 +388,7 @@ public void shouldCompleteFutureOnFailure() { final CommandId commandId = new CommandId(CommandId.Type.STREAM, "foo", CommandId.Action.CREATE); - final QueuedCommandStatus status = mock(QueuedCommandStatus.class); + final CommandStatusFuture status = mock(CommandStatusFuture.class); status.setStatus(sameStatus(CommandStatus.Status.PARSING)); expectLastCall(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 530302e7ef65..b11a01694af8 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -33,9 +33,11 @@ import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -82,6 +84,7 @@ import io.confluent.ksql.rest.entity.TablesList; import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.rest.server.computation.CommandId; +import io.confluent.ksql.rest.server.computation.CommandStatusFuture; import io.confluent.ksql.rest.server.computation.CommandStore; import io.confluent.ksql.rest.server.computation.QueuedCommandStatus; import io.confluent.ksql.rest.util.EntityUtil; @@ -103,6 +106,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.ws.rs.core.Response; import org.apache.avro.Schema.Type; @@ -132,7 +136,8 @@ public class KsqlResourceTest { private static final long DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT = 1000; private static final KsqlRequest VALID_EXECUTABLE_REQUEST = new KsqlRequest( "CREATE STREAM S AS SELECT * FROM test_stream;", - ImmutableMap.of(KsqlConfig.KSQL_WINDOWED_SESSION_KEY_LEGACY_CONFIG, true)); + ImmutableMap.of(KsqlConfig.KSQL_WINDOWED_SESSION_KEY_LEGACY_CONFIG, true), + 0L); @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -153,8 +158,8 @@ public class KsqlResourceTest { @Before public void setUp() throws IOException, RestClientException { - commandStatus - = new QueuedCommandStatus(new CommandId(TOPIC, "whateva", CREATE)); + commandStatus = new QueuedCommandStatus( + 0, new CommandStatusFuture(new CommandId(TOPIC, "whateva", CREATE))); schemaRegistryClient = new MockSchemaRegistryClient(); registerSchema(schemaRegistryClient); ksqlRestConfig = new KsqlRestConfig(getDefaultKsqlConfig()); @@ -194,7 +199,7 @@ public void shouldInstantRegisterTopic() { // Then: assertThat(result, is(new CommandStatusEntity( "REGISTER TOPIC FOO WITH (kafka_topic='bar', value_format='json');", - commandStatus.getCommandId(), commandStatus.getStatus()))); + commandStatus.getCommandId(), commandStatus.getStatus(), 0L))); } @Test @@ -481,7 +486,7 @@ public void shouldReturnStatusEntityFromPersistentQuery() { // Then: assertThat(result, is(new CommandStatusEntity( "CREATE STREAM S AS SELECT * FROM test_stream;", - commandStatus.getCommandId(), commandStatus.getStatus()))); + commandStatus.getCommandId(), commandStatus.getStatus(), 0L))); } @Test @@ -817,7 +822,7 @@ public void shouldListPropertiesWithOverrides() { // When: final PropertiesList props = makeSingleRequest( - new KsqlRequest("list properties;", overrides), PropertiesList.class); + new KsqlRequest("list properties;", overrides, null), PropertiesList.class); // Then: assertThat(props.getProperties().get("ksql.streams.auto.offset.reset"), is("latest")); @@ -969,10 +974,44 @@ public void shouldNotIncludeSslPropertiesInListPropertiesOutput() { not(hasItems(KsqlConfig.SSL_CONFIG_NAMES.toArray(new String[0])))); } + @Test + public void shouldNotWaitIfNoCommandSequenceNumberSpecified() throws Exception { + // When: + makeSingleRequestWithSequenceNumber("list properties;", null, PropertiesList.class); + + // Then: + verify(commandStore, never()).ensureConsumedPast(anyLong(), any()); + } + + @Test + public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { + // When: + makeSingleRequestWithSequenceNumber("list properties;", 2L, PropertiesList.class); + + // Then: + verify(commandStore).ensureConsumedPast(eq(2L), any()); + } + + @Test + public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumber() + throws Exception { + // Given: + doThrow(new TimeoutException("timed out!")) + .when(commandStore).ensureConsumedPast(anyLong(), any()); + + // When: + final KsqlErrorMessage result = + makeFailingRequestWithSequenceNumber("list properties;", 2L, Code.SERVICE_UNAVAILABLE); + + // Then: + assertThat(result.getErrorCode(), is(Errors.ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT)); + assertThat(result.getMessage(), is("timed out!")); + } + @Test public void shouldUpdateTheLastRequestTime() { // When: - ksqlResource.handleKsqlStatements(new KsqlRequest("foo", Collections.emptyMap())); + ksqlResource.handleKsqlStatements(VALID_EXECUTABLE_REQUEST); // Then: verify(activenessRegistrar).updateLastRequestTime(); @@ -1028,18 +1067,39 @@ private List createRunningQueries( } private KsqlErrorMessage makeFailingRequest(final String ksql, final Code errorCode) { - final Response response = ksqlResource.handleKsqlStatements( - new KsqlRequest(ksql, Collections.emptyMap())); + return makeFailingRequestWithSequenceNumber(ksql, null, errorCode); + } + + private KsqlErrorMessage makeFailingRequestWithSequenceNumber( + final String ksql, + final Long seqNum, + final Code errorCode) { + return makeFailingRequest(new KsqlRequest(ksql, Collections.emptyMap(), seqNum), errorCode); + } + + private KsqlErrorMessage makeFailingRequest(final KsqlRequest ksqlRequest, final Code errorCode) { + try { + final Response response = ksqlResource.handleKsqlStatements(ksqlRequest); assertThat(response.getStatus(), is(errorCode.getCode())); assertThat(response.getEntity(), instanceOf(KsqlErrorMessage.class)); return (KsqlErrorMessage) response.getEntity(); - + } catch (KsqlRestException e) { + return (KsqlErrorMessage) e.getResponse().getEntity(); + } } private T makeSingleRequest( final String sql, final Class expectedEntityType) { - return makeSingleRequest(new KsqlRequest(sql, Collections.emptyMap()), expectedEntityType); + return makeSingleRequestWithSequenceNumber(sql, null, expectedEntityType); + } + + private T makeSingleRequestWithSequenceNumber( + final String sql, + final Long seqNum, + final Class expectedEntityType) { + return makeSingleRequest( + new KsqlRequest(sql, Collections.emptyMap(), seqNum), expectedEntityType); } private T makeSingleRequest( diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java index b73ea44bec20..0d7bb6377a2a 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java @@ -16,8 +16,10 @@ package io.confluent.ksql.rest.server.resources; +import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; @@ -28,8 +30,10 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; @@ -43,11 +47,11 @@ import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.rest.server.StatementParser; +import io.confluent.ksql.rest.server.computation.ReplayableCommandQueue; import io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource; import io.confluent.ksql.rest.util.JsonMapper; import io.confluent.ksql.serde.DataSource; import io.confluent.ksql.util.KafkaTopicClient; -import io.confluent.ksql.util.KafkaTopicClientImpl; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.QueuedQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; @@ -61,6 +65,7 @@ import java.util.Map; import java.util.Scanner; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; @@ -89,6 +94,8 @@ public class StreamedQueryResourceTest { private KafkaTopicClient mockKafkaTopicClient; @Mock(MockType.NICE) private StatementParser mockStatementParser; + @Mock + private ReplayableCommandQueue replayableCommandQueue; @Mock(MockType.NICE) private ActivenessRegistrar activenessRegistrar; private StreamedQueryResource testResource; @@ -104,7 +111,11 @@ public void setup() { replay(mockKsqlEngine, mockStatementParser); testResource = new StreamedQueryResource( - ksqlConfig, mockKsqlEngine, mockStatementParser, DISCONNECT_CHECK_INTERVAL, + ksqlConfig, + mockKsqlEngine, + mockStatementParser, + replayableCommandQueue, + DISCONNECT_CHECK_INTERVAL, activenessRegistrar); } @@ -118,8 +129,8 @@ public void shouldReturn400OnBadStatement() throws Exception { replay(mockStatementParser); // When: - final Response response = testResource - .streamQuery(new KsqlRequest("query", Collections.emptyMap())); + final Response response = testResource. + streamQuery(new KsqlRequest("query", Collections.emptyMap(), null)); // Then: verify(mockStatementParser); @@ -130,6 +141,59 @@ public void shouldReturn400OnBadStatement() throws Exception { assertThat(errorMessage.getMessage(), containsString("some error message")); } + @Test + public void shouldNotWaitIfCommandSequenceNumberSpecified() throws Exception { + // Given: + replay(replayableCommandQueue); + + // When: + testResource.streamQuery(new KsqlRequest(queryString, Collections.emptyMap(), null)); + + // Then: + verify(replayableCommandQueue); + } + + @Test + public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { + // Given: + replayableCommandQueue.ensureConsumedPast(eq(3L), anyObject()); + expectLastCall(); + + replay(replayableCommandQueue); + + // When: + testResource.streamQuery(new KsqlRequest(queryString, Collections.emptyMap(), 3L)); + + // Then: + verify(replayableCommandQueue); + } + + @Test + public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumber() + throws Exception { + // Given: + replayableCommandQueue.ensureConsumedPast(anyLong(), anyObject()); + expectLastCall().andThrow(new TimeoutException("whoops")); + + replay(replayableCommandQueue); + + try { + // When: + testResource.streamQuery(new KsqlRequest(queryString, Collections.emptyMap(), 3L)); + + // Then: + fail("Should throw KsqlRestException in response to timeout."); + } catch (KsqlRestException e) { + final Response response = e.getResponse(); + assertThat(response.getStatus(), is(Response.Status.SERVICE_UNAVAILABLE.getStatusCode())); + assertThat(response.getEntity(), instanceOf(KsqlErrorMessage.class)); + final KsqlErrorMessage errorMessage = (KsqlErrorMessage)response.getEntity(); + assertThat( + errorMessage.getErrorCode(), is(Errors.ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT)); + assertThat(errorMessage.getMessage(), equalTo("whoops")); + } + } + @SuppressWarnings("unchecked") @Test public void shouldStreamRowsCorrectly() throws Throwable { @@ -180,14 +244,11 @@ public void run() { final Map requestStreamsProperties = Collections.emptyMap(); - final KsqlConfig mockKsqlConfig = mock(KsqlConfig.class); - final KsqlEngine mockKsqlEngine = mock(KsqlEngine.class); - final KafkaTopicClient mockKafkaTopicClient = mock(KafkaTopicClientImpl.class); + reset(mockKsqlEngine); expect(mockKsqlEngine.getTopicClient()).andReturn(mockKafkaTopicClient); expect(mockKsqlEngine.getSchemaRegistryClient()).andReturn(new MockSchemaRegistryClient()); expect(mockKsqlEngine.hasActiveQueries()).andReturn(false); - replay(mockOutputNode, mockKafkaStreams); final QueuedQueryMetadata queuedQueryMetadata = new QueuedQueryMetadata(queryString, mockKafkaStreams, mockOutputNode, "", rowQueue, DataSource.DataSourceType.KSTREAM, "", @@ -195,21 +256,17 @@ public void run() { reset(mockOutputNode); expect(mockOutputNode.getSchema()) .andReturn(SchemaBuilder.struct().field("f1", SchemaBuilder.OPTIONAL_INT32_SCHEMA)); - expect(mockKsqlEngine.execute(queryString, mockKsqlConfig, requestStreamsProperties)) + expect(mockKsqlEngine.execute(queryString, ksqlConfig, requestStreamsProperties)) .andReturn(Collections.singletonList(queuedQueryMetadata)); mockKsqlEngine.removeTemporaryQuery(queuedQueryMetadata); expectLastCall(); - final StatementParser mockStatementParser = mock(StatementParser.class); + reset(mockStatementParser); expect(mockStatementParser.parseSingleStatement(queryString)).andReturn(mock(Query.class)); - replay(mockKsqlEngine, mockStatementParser, mockOutputNode); - - final StreamedQueryResource testResource = new StreamedQueryResource( - mockKsqlConfig, mockKsqlEngine, mockStatementParser, DISCONNECT_CHECK_INTERVAL, () -> { - }); + replay(mockKsqlEngine, mockStatementParser, mockKafkaStreams, mockOutputNode); final Response response = - testResource.streamQuery(new KsqlRequest(queryString, requestStreamsProperties)); + testResource.streamQuery(new KsqlRequest(queryString, requestStreamsProperties, null)); final PipedOutputStream responseOutputStream = new EOFPipedOutputStream(); final PipedInputStream responseInputStream = new PipedInputStream(responseOutputStream, 1); final StreamingOutput responseStream = (StreamingOutput) response.getEntity(); @@ -334,7 +391,7 @@ public void shouldUpdateTheLastRequestTime() throws Exception { EasyMock.replay(activenessRegistrar); // When: - testResource.streamQuery(new KsqlRequest(queryString, Collections.emptyMap())); + testResource.streamQuery(new KsqlRequest(queryString, Collections.emptyMap(), null)); // Then: EasyMock.verify(activenessRegistrar); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java index 7866c83d611e..ecff4f7f09a6 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java @@ -19,13 +19,16 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; @@ -41,21 +44,25 @@ import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.Versions; import io.confluent.ksql.rest.server.StatementParser; +import io.confluent.ksql.rest.server.computation.ReplayableCommandQueue; import io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint.PrintTopicPublisher; import io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint.QueryPublisher; import io.confluent.ksql.util.KafkaTopicClient; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.version.metrics.ActivenessRegistrar; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.websocket.CloseReason; import javax.websocket.CloseReason.CloseCodes; import javax.websocket.Session; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; @@ -70,14 +77,20 @@ public class WSQueryEndpointTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final KsqlRequest VALID_REQUEST = new KsqlRequest("test-sql", - ImmutableMap.of(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test-id")); + ImmutableMap.of(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test-id"), null); private static final KsqlRequest ANOTHER_REQUEST = new KsqlRequest("other-sql", - ImmutableMap.of()); + ImmutableMap.of(), null); + + private static final long SEQUENCE_NUMBER = 2L; + private static final KsqlRequest REQUEST_WITHOUT_SEQUENCE_NUMBER = VALID_REQUEST; + private static final KsqlRequest REQUEST_WITH_SEQUENCE_NUMBER = new KsqlRequest("test-sql", + ImmutableMap.of(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test-id"), SEQUENCE_NUMBER); private static final String VALID_VERSION = Versions.KSQL_V1_WS; private static final String[] NO_VERSION_PROPERTY = null; private static final KsqlRequest[] NO_REQUEST_PROPERTY = (KsqlRequest[]) null; + private static final Duration COMMAND_QUEUE_CATCHUP_TIMEOUT = Duration.ofMillis(5000L); @Mock private KsqlConfig ksqlConfig; @@ -96,17 +109,23 @@ public class WSQueryEndpointTest { @Mock private QueryBody queryBody; @Mock + private ReplayableCommandQueue replayableCommandQueue; + @Mock private QueryPublisher queryPublisher; @Mock private PrintTopicPublisher topicPublisher; + @Mock + private ActivenessRegistrar activenessRegistrar; @Captor private ArgumentCaptor closeReasonCaptor; private Query query; private WSQueryEndpoint wsQueryEndpoint; - @Mock - private ActivenessRegistrar activenessRegistrar; + @BeforeClass + public static void setUpClass() { + OBJECT_MAPPER.registerModule(new Jdk8Module()); + } @Before public void setUp() { @@ -119,8 +138,8 @@ public void setUp() { givenRequest(VALID_REQUEST); wsQueryEndpoint = new WSQueryEndpoint( - ksqlConfig, OBJECT_MAPPER, statementParser, ksqlEngine, exec, - queryPublisher, topicPublisher, activenessRegistrar); + ksqlConfig, OBJECT_MAPPER, statementParser, ksqlEngine, replayableCommandQueue, exec, + queryPublisher, topicPublisher, activenessRegistrar, COMMAND_QUEUE_CATCHUP_TIMEOUT); } @Test @@ -132,7 +151,7 @@ public void shouldReturnErrorOnBadVersion() throws Exception { wsQueryEndpoint.onOpen(session, null); // Then: - verifyClosedWithReason("Received invalid api version: [bad-version]"); + verifyClosedWithReason("Received invalid api version: [bad-version]", CloseCodes.CANNOT_ACCEPT); } @Test @@ -144,7 +163,7 @@ public void shouldReturnErrorOnMultipleVersions() throws Exception { wsQueryEndpoint.onOpen(session, null); // Then: - verifyClosedWithReason("Received multiple api versions: [1, 2]"); + verifyClosedWithReason("Received multiple api versions: [1, 2]", CloseCodes.CANNOT_ACCEPT); } @Test @@ -192,7 +211,8 @@ public void shouldReturnErrorOnNoRequest() throws Exception { wsQueryEndpoint.onOpen(session, null); // Then: - verifyClosedWithReason("Error parsing request: missing request parameter"); + verifyClosedWithReason( + "Error parsing request: missing request parameter", CloseCodes.CANNOT_ACCEPT); } @Test @@ -204,7 +224,8 @@ public void shouldReturnErrorOnEmptyRequests() throws Exception { wsQueryEndpoint.onOpen(session, null); // Then: - verifyClosedWithReason("Error parsing request: missing request parameter"); + verifyClosedWithReason( + "Error parsing request: missing request parameter", CloseCodes.CANNOT_ACCEPT); } @Test @@ -241,7 +262,7 @@ public void shouldReturnErrorOnBadStatement() throws Exception { wsQueryEndpoint.onOpen(session, null); // Then: - verifyClosedWithReason("Error parsing query: Boom!"); + verifyClosedWithReason("Error parsing query: Boom!", CloseCodes.CANNOT_ACCEPT); } @Test @@ -259,7 +280,9 @@ public void shouldReturnErrorOnInvalidStreamProperty() throws Exception { wsQueryEndpoint.onOpen(session, null); // Then: - verifyClosedWithReason("Error parsing request: Failed to set 'unknown-property' to 'true'"); + verifyClosedWithReason( + "Error parsing request: Failed to set 'unknown-property' to 'true'", + CloseCodes.CANNOT_ACCEPT); } @Test @@ -310,7 +333,46 @@ public void shouldReturnErrorIfTopicDoesNotExist() throws Exception { wsQueryEndpoint.onOpen(session, null); // Then: - verifyClosedWithReason("topic does not exist: bob"); + verifyClosedWithReason("topic does not exist: bob", CloseCodes.CANNOT_ACCEPT); + } + + @Test + public void shouldNotWaitIfNoSequenceNumberSpecified() throws Exception { + // Given: + givenRequest(REQUEST_WITHOUT_SEQUENCE_NUMBER); + + // When: + wsQueryEndpoint.onOpen(session, null); + + // Then: + verify(replayableCommandQueue, never()).ensureConsumedPast(anyLong(), any()); + } + + @Test + public void shouldWaitIfSequenceNumberSpecified() throws Exception { + // Given: + givenRequest(REQUEST_WITH_SEQUENCE_NUMBER); + + // When: + wsQueryEndpoint.onOpen(session, null); + + // Then: + verify(replayableCommandQueue).ensureConsumedPast(eq(SEQUENCE_NUMBER), any()); + } + + @Test + public void shouldReturnErrorIfCommandQueueCatchupTimeout() throws Exception { + // Given: + givenRequest(REQUEST_WITH_SEQUENCE_NUMBER); + doThrow(new TimeoutException("yikes")) + .when(replayableCommandQueue).ensureConsumedPast(eq(SEQUENCE_NUMBER), any()); + + // When: + wsQueryEndpoint.onOpen(session, null); + + // Then: + verifyClosedWithReason("yikes", CloseCodes.TRY_AGAIN_LATER); + verify(statementParser, never()).parseSingleStatement(any()); } private PrintTopic printTopic(final String name, final boolean fromBeginning) { @@ -378,11 +440,11 @@ private static String serialize(final KsqlRequest request) { } } - private void verifyClosedWithReason(final String reason) throws Exception { + private void verifyClosedWithReason(final String reason, final CloseCodes code) throws Exception { verify(session).close(closeReasonCaptor.capture()); final CloseReason closeReason = closeReasonCaptor.getValue(); assertThat(closeReason.getReasonPhrase(), is(reason)); - assertThat(closeReason.getCloseCode(), is(CloseCodes.CANNOT_ACCEPT)); + assertThat(closeReason.getCloseCode(), is(code)); } @Test diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/CommandStoreUtilTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/CommandStoreUtilTest.java new file mode 100644 index 000000000000..45f8178fca79 --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/CommandStoreUtilTest.java @@ -0,0 +1,85 @@ +package io.confluent.ksql.rest.util; + + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import io.confluent.ksql.rest.entity.KsqlRequest; +import io.confluent.ksql.rest.server.computation.ReplayableCommandQueue; +import io.confluent.ksql.rest.server.resources.KsqlRestException; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.TimeoutException; +import javax.ws.rs.core.Response; +import org.eclipse.jetty.http.HttpStatus.Code; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class CommandStoreUtilTest { + + private static final Duration TIMEOUT = Duration.ofMillis(5000L); + private static final long SEQUENCE_NUMBER = 2; + + @Mock + private ReplayableCommandQueue replayableCommandQueue; + @Mock + private KsqlRequest request; + + @Test + public void shouldNotWaitIfNoSequenceNumberSpecified() throws Exception { + // Given: + when(request.getCommandSequenceNumber()).thenReturn(Optional.empty()); + + // When: + CommandStoreUtil.waitForCommandSequenceNumber(replayableCommandQueue, request, TIMEOUT); + + // Then: + verify(replayableCommandQueue, never()).ensureConsumedPast(anyLong(), any()); + } + + @Test + public void shouldWaitIfSequenceNumberSpecified() throws Exception { + // Given: + when(request.getCommandSequenceNumber()).thenReturn(Optional.of(SEQUENCE_NUMBER)); + + // When: + CommandStoreUtil.waitForCommandSequenceNumber(replayableCommandQueue, request, TIMEOUT); + + // Then: + verify(replayableCommandQueue).ensureConsumedPast(SEQUENCE_NUMBER, TIMEOUT); + } + + @Test + public void shouldThrowKsqlRestExceptionOnTimeout() throws Exception { + // Given: + when(request.getCommandSequenceNumber()).thenReturn(Optional.of(SEQUENCE_NUMBER)); + doThrow(new TimeoutException("uh oh")) + .when(replayableCommandQueue).ensureConsumedPast(SEQUENCE_NUMBER, TIMEOUT); + + try { + // When: + CommandStoreUtil.httpWaitForCommandSequenceNumber(replayableCommandQueue, request, TIMEOUT); + + // Then: + fail("Should propagate error."); + } catch (final KsqlRestException e) { + final Response response = e.getResponse(); + assertThat(response.getStatus(), is(Code.SERVICE_UNAVAILABLE.getCode())); + assertThat(response.getEntity(), is(instanceOf(KsqlErrorMessage.class))); + final KsqlErrorMessage message = (KsqlErrorMessage) (response.getEntity()); + assertThat(message.getMessage(), is("uh oh")); + } + } +} \ No newline at end of file