From c7aaca98df597b12f7f44612041e60cfa5663d02 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Tue, 22 Sep 2020 15:32:35 -0700 Subject: [PATCH 1/8] rebase --- .../server/computation/CommandRunner.java | 5 ++- .../InteractiveStatementExecutor.java | 39 ++++++++++++++---- .../computation/RestoreCommandsCompactor.java | 12 ++++++ .../server/computation/CommandRunnerTest.java | 40 +++++++++---------- .../InteractiveStatementExecutorTest.java | 12 ++++-- 5 files changed, 75 insertions(+), 33 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 0bb68edced0a..35491a3f4555 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -17,6 +17,7 @@ import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.rest.Errors; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException; import io.confluent.ksql.rest.server.state.ServerState; @@ -266,7 +267,7 @@ public void processPriorCommands() { } final List compacted = compactor.apply(compatibleCommands); - + final QueryId lastTerminateQueryId = RestoreCommandsCompactor.lastTerminateQueryId; compacted.forEach( command -> { currentCommandRef.set(new Pair<>(command, clock.instant())); @@ -274,7 +275,7 @@ public void processPriorCommands() { maxRetries, STATEMENT_RETRY_MS, MAX_STATEMENT_RETRY_MS, - () -> statementExecutor.handleRestore(command), + () -> statementExecutor.handleRestore(command, lastTerminateQueryId), WakeupException.class ); currentCommandRef.set(null); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java index abc8f48af5e0..f21143f7ad1d 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java @@ -125,19 +125,23 @@ void handleStatement(final QueuedCommand queuedCommand) { queuedCommand.getAndDeserializeCommandId(), queuedCommand.getStatus(), Mode.EXECUTE, - queuedCommand.getOffset() + queuedCommand.getOffset(), + Optional.empty() ); } - void handleRestore(final QueuedCommand queuedCommand) { + void handleRestore(final QueuedCommand queuedCommand, final QueryId lastTerminateQueryId) { throwIfNotConfigured(); + final Optional queryId = lastTerminateQueryId == null + ? Optional.empty() : Optional.of(lastTerminateQueryId); handleStatementWithTerminatedQueries( queuedCommand.getAndDeserializeCommand(commandDeserializer), queuedCommand.getAndDeserializeCommandId(), queuedCommand.getStatus(), Mode.RESTORE, - queuedCommand.getOffset() + queuedCommand.getOffset(), + queryId ); } @@ -191,11 +195,19 @@ private void handleStatementWithTerminatedQueries( final CommandId commandId, final Optional commandStatusFuture, final Mode mode, - final long offset + final long offset, + final Optional lastTerminateQueryId ) { try { if (command.getPlan().isPresent()) { - executePlan(command, commandId, commandStatusFuture, command.getPlan().get(), mode, offset); + executePlan( + command, + commandId, + commandStatusFuture, + command.getPlan().get(), + mode, + offset, + lastTerminateQueryId); return; } final String statementString = command.getStatement(); @@ -228,7 +240,8 @@ private void executePlan( final Optional commandStatusFuture, final KsqlPlan plan, final Mode mode, - final long offset + final long offset, + final Optional lastTerminateQueryId ) { final KsqlConfig mergedConfig = buildMergedConfig(command); final ConfiguredKsqlPlan configured = ConfiguredKsqlPlan.of( @@ -242,7 +255,19 @@ private void executePlan( ); final ExecuteResult result = ksqlEngine.execute(serviceContext, configured); if (result.getQuery().isPresent()) { - queryIdGenerator.setNextId(offset + 1); + long queryID = Long.MIN_VALUE; + if (lastTerminateQueryId.isPresent()) { + final String ltq = lastTerminateQueryId.get().toString(); + final int lastIndex = ltq.lastIndexOf("_"); + queryID = Long.parseLong(ltq.substring(lastIndex + 1)); + } + // We increase the queryID by 1 if the last command was a terminate, + // to avoid the new command getting the same queryId. + if (offset > 0 && offset == queryID - 1) { + queryIdGenerator.setNextId(queryID + 1); + } else { + queryIdGenerator.setNextId(offset + 1); + } if (mode == Mode.EXECUTE) { result.getQuery().get().start(); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java index 5630feca18d0..255181bcdfc4 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java @@ -24,12 +24,17 @@ import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Util for compacting the restore commands */ public final class RestoreCommandsCompactor { + static QueryId lastTerminateQueryId; + private static final Logger LOG = LoggerFactory.getLogger(RestoreCommandsCompactor.class); + private RestoreCommandsCompactor() { } @@ -86,6 +91,8 @@ public static CompactedNode maybeAppend( if (queued.getAndDeserializeCommandId().getType() == Type.TERMINATE) { final QueryId queryId = new QueryId(queued.getAndDeserializeCommandId().getEntity()); markShouldSkip(queryId, latestNodeWithId); + //keep track of the last terminate command + lastTerminateQueryId = queryId; // terminate commands don't get added to the list of commands to execute // because we "execute" them in this class by removing query plans from @@ -151,4 +158,9 @@ private static Optional compact(final CompactedNode node) { node.queued.getOffset() )); } + + static QueryId getLastTerminateQueryId() { + return lastTerminateQueryId; + } + } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 7d3dc09f1851..349d5b43d71c 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -54,7 +54,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; - import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; @@ -166,9 +165,9 @@ public void shouldRunThePriorCommandsCorrectly() { // Then: final InOrder inOrder = inOrder(statementExecutor); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1)); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2)); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3)); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1), any()); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2), any()); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3), any()); } @Test @@ -186,7 +185,7 @@ public void shouldRunThePriorCommandsWithTerminateCorrectly() { inOrder.verify(commandStore).wakeup(); inOrder.verify(clusterTerminator).terminateCluster(anyList()); - verify(statementExecutor, never()).handleRestore(any()); + verify(statementExecutor, never()).handleRestore(any(), any()); } @Test @@ -199,7 +198,7 @@ public void shouldEarlyOutIfRestoreContainsTerminate() { commandRunner.processPriorCommands(); // Then: - verify(statementExecutor, never()).handleRestore(any()); + verify(statementExecutor, never()).handleRestore(any(), any()); } @Test @@ -224,10 +223,10 @@ public void shouldOnlyRestoreCompacted() { // Then: final InOrder inOrder = inOrder(statementExecutor); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1)); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3)); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1), any()); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3), any()); - verify(statementExecutor, never()).handleRestore(queuedCommand2); + verify(statementExecutor, never()).handleRestore(eq(queuedCommand2), any()); } @Test @@ -244,13 +243,14 @@ public void shouldProcessPartialListOfCommandsOnDeserializationExceptionInRestor // Then: final InOrder inOrder = inOrder(statementExecutor); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1)); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2)); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1), any()); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2), any()); assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED)); assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE)); assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.INCOMPATIBLE_COMMAND)); - verify(statementExecutor, never()).handleRestore(queuedCommand3); + + verify(statementExecutor, never()).handleRestore(eq(queuedCommand3), any()); } @Test @@ -287,13 +287,13 @@ public void shouldProcessPartialListOfCommandsOnIncompatibleCommandInRestore() { // Then: final InOrder inOrder = inOrder(statementExecutor); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1)); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2)); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1), any()); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2), any()); assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED)); assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE)); assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.INCOMPATIBLE_COMMAND)); - verify(statementExecutor, never()).handleRestore(queuedCommand3); + verify(statementExecutor, never()).handleRestore(eq(queuedCommand3), any()); } @Test @@ -414,9 +414,9 @@ public void shouldEarlyOutIfNewCommandsContainsTerminate() { commandRunner.fetchAndRunCommands(); // Then: - verify(statementExecutor, never()).handleRestore(queuedCommand1); - verify(statementExecutor, never()).handleRestore(queuedCommand2); - verify(statementExecutor, never()).handleRestore(queuedCommand3); + verify(statementExecutor, never()).handleRestore(eq(queuedCommand1), any()); + verify(statementExecutor, never()).handleRestore(eq(queuedCommand2), any()); + verify(statementExecutor, never()).handleRestore(eq(queuedCommand3), any()); } @Test @@ -483,7 +483,7 @@ public void shouldEarlyOutOnShutdown() { commandRunner.fetchAndRunCommands(); // Then: - verify(statementExecutor, never()).handleRestore(queuedCommand2); + verify(statementExecutor, never()).handleRestore(eq(queuedCommand2), any()); } @Test @@ -525,7 +525,7 @@ public void shouldNotStartCommandRunnerThreadIfSerializationExceptionInRestore() inOrder.verify(executor).awaitTermination(anyLong(), any()); inOrder.verify(commandStore).close(); verify(commandStore, never()).getNewCommands(any()); - verify(statementExecutor, times(2)).handleRestore(any()); + verify(statementExecutor, times(2)).handleRestore(any(), any()); } @Test diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java index bdd32fb80157..cf492f2ae0ba 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java @@ -227,7 +227,7 @@ public void shouldThrowOnHandleRestoreIfNotConfigured() { ); // When: - statementExecutor.handleRestore(queuedCommand); + statementExecutor.handleRestore(queuedCommand, null); } @Test @@ -548,7 +548,9 @@ public void shouldHandlePriorStatements() { for (int i = 0; i < priorCommands.size(); i++) { final Pair pair = priorCommands.get(i); statementExecutor.handleRestore( - new QueuedCommand(pair.left, pair.right, Optional.empty(), (long) i) + new QueuedCommand(pair.left, pair.right, Optional.empty(), (long) i), + null + ); } @@ -632,7 +634,8 @@ public void shouldSetNextQueryIdToNextOffsetWhenExecutingRestoreCommand() { command, Optional.empty(), 2L - ) + ), + null ); // Then: @@ -655,7 +658,8 @@ public void shouldSkipStartWhenReplayingLog() { command, Optional.empty(), 0L - ) + ), + null ); // Then: From 814c5fe8497d7f479ed469c8a5700004af58ea31 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Tue, 22 Sep 2020 21:52:32 -0700 Subject: [PATCH 2/8] increase queryID only for queries, handle all terminate --- .../server/computation/CommandRunner.java | 7 ++-- .../InteractiveStatementExecutor.java | 33 ++++++++---------- .../computation/RestoreCommandsCompactor.java | 15 +++++--- .../rest/server/computation/RecoveryTest.java | 34 ++++++++++++++++++- 4 files changed, 61 insertions(+), 28 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 35491a3f4555..ed63219116c6 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -16,8 +16,8 @@ package io.confluent.ksql.rest.server.computation; import com.google.common.annotations.VisibleForTesting; -import io.confluent.ksql.rest.Errors; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException; import io.confluent.ksql.rest.server.state.ServerState; @@ -43,7 +43,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; - import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.WakeupException; @@ -267,7 +266,7 @@ public void processPriorCommands() { } final List compacted = compactor.apply(compatibleCommands); - final QueryId lastTerminateQueryId = RestoreCommandsCompactor.lastTerminateQueryId; + final QueryId greatestQueryId = RestoreCommandsCompactor.greatestQueryId; compacted.forEach( command -> { currentCommandRef.set(new Pair<>(command, clock.instant())); @@ -275,7 +274,7 @@ public void processPriorCommands() { maxRetries, STATEMENT_RETRY_MS, MAX_STATEMENT_RETRY_MS, - () -> statementExecutor.handleRestore(command, lastTerminateQueryId), + () -> statementExecutor.handleRestore(command, greatestQueryId), WakeupException.class ); currentCommandRef.set(null); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java index f21143f7ad1d..5a66ef763015 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java @@ -130,11 +130,11 @@ void handleStatement(final QueuedCommand queuedCommand) { ); } - void handleRestore(final QueuedCommand queuedCommand, final QueryId lastTerminateQueryId) { + void handleRestore(final QueuedCommand queuedCommand, final QueryId greatestQueryId) { throwIfNotConfigured(); - final Optional queryId = lastTerminateQueryId == null - ? Optional.empty() : Optional.of(lastTerminateQueryId); + final Optional queryId = greatestQueryId == null + ? Optional.empty() : Optional.of(greatestQueryId); handleStatementWithTerminatedQueries( queuedCommand.getAndDeserializeCommand(commandDeserializer), queuedCommand.getAndDeserializeCommandId(), @@ -196,7 +196,7 @@ private void handleStatementWithTerminatedQueries( final Optional commandStatusFuture, final Mode mode, final long offset, - final Optional lastTerminateQueryId + final Optional greatestQueryId ) { try { if (command.getPlan().isPresent()) { @@ -207,7 +207,7 @@ private void handleStatementWithTerminatedQueries( command.getPlan().get(), mode, offset, - lastTerminateQueryId); + greatestQueryId); return; } final String statementString = command.getStatement(); @@ -241,7 +241,7 @@ private void executePlan( final KsqlPlan plan, final Mode mode, final long offset, - final Optional lastTerminateQueryId + final Optional greatestQueryId ) { final KsqlConfig mergedConfig = buildMergedConfig(command); final ConfiguredKsqlPlan configured = ConfiguredKsqlPlan.of( @@ -254,20 +254,15 @@ private void executePlan( new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement") ); final ExecuteResult result = ksqlEngine.execute(serviceContext, configured); + long queryID = Long.MIN_VALUE; + if (greatestQueryId.isPresent() && mode == Mode.RESTORE) { + final String ltq = greatestQueryId.get().toString(); + final int lastIndex = ltq.lastIndexOf("_"); + queryID = Long.parseLong(ltq.substring(lastIndex + 1)); + queryIdGenerator.setNextId(queryID + 1); + } if (result.getQuery().isPresent()) { - long queryID = Long.MIN_VALUE; - if (lastTerminateQueryId.isPresent()) { - final String ltq = lastTerminateQueryId.get().toString(); - final int lastIndex = ltq.lastIndexOf("_"); - queryID = Long.parseLong(ltq.substring(lastIndex + 1)); - } - // We increase the queryID by 1 if the last command was a terminate, - // to avoid the new command getting the same queryId. - if (offset > 0 && offset == queryID - 1) { - queryIdGenerator.setNextId(queryID + 1); - } else { - queryIdGenerator.setNextId(offset + 1); - } + queryIdGenerator.setNextId(offset + 1); if (mode == Mode.EXECUTE) { result.getQuery().get().start(); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java index 255181bcdfc4..4bce4fd74e19 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java @@ -32,7 +32,7 @@ */ public final class RestoreCommandsCompactor { - static QueryId lastTerminateQueryId; + static QueryId greatestQueryId; private static final Logger LOG = LoggerFactory.getLogger(RestoreCommandsCompactor.class); private RestoreCommandsCompactor() { @@ -92,7 +92,7 @@ public static CompactedNode maybeAppend( final QueryId queryId = new QueryId(queued.getAndDeserializeCommandId().getEntity()); markShouldSkip(queryId, latestNodeWithId); //keep track of the last terminate command - lastTerminateQueryId = queryId; + //lastTerminateQueryId = queryId; // terminate commands don't get added to the list of commands to execute // because we "execute" them in this class by removing query plans from @@ -134,6 +134,13 @@ private CompactedNode( private static Optional compact(final CompactedNode node) { final Command command = node.command; + + if (command.getPlan().get().getQueryPlan().isPresent()) { + if (greatestQueryId == null) { + greatestQueryId = command.getPlan().get().getQueryPlan().get().getQueryId(); + } + } + if (!node.shouldSkip) { return Optional.of(node.queued); } @@ -159,8 +166,8 @@ private static Optional compact(final CompactedNode node) { )); } - static QueryId getLastTerminateQueryId() { - return lastTerminateQueryId; + static QueryId getGreatestQueryId() { + return greatestQueryId; } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 6c7eb843b17e..7a154a18c177 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -21,7 +21,6 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.not; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -713,6 +712,39 @@ public void shouldRecoverQueryIDs() { assertThat(queryIdNames, contains(new QueryId("CSAS_C_0"))); } + @Test + public void shouldIncrementQueryIDsNoPlans() { + server1.submitCommands( + "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", + "CREATE STREAM B AS SELECT * FROM A;", + "TERMINATE CSAS_B_0;"); + + final KsqlServer server = new KsqlServer(commands); + server.recover(); + server.submitCommands("CREATE STREAM C AS SELECT * FROM A;"); + final Set queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries()) + .keySet(); + + assertThat(queryIdNames, contains(new QueryId("CSAS_C_1"))); + } + + @Test + public void shouldIncrementQueryIDsWithPlan() { + server1.submitCommands( + "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", + "CREATE STREAM B AS SELECT * FROM A;", + "TERMINATE CSAS_B_0;"); + + final KsqlServer server = new KsqlServer(commands); + server.recover(); + server.submitCommands("CREATE STREAM C AS SELECT * FROM A;"); + final Set queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries()) + .keySet(); + + assertThat(queryIdNames, contains(new QueryId("CSAS_C_1"))); + } + + // Simulate bad commands that have been introduced due to race condition in logic producing to cmd topic private void addDuplicateOfLastCommand() { final QueuedCommand original = commands.get(commands.size() - 1); From eea5ef3f007de0d429c13df28eb5d2bbaf3f6b33 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Wed, 23 Sep 2020 15:45:38 -0700 Subject: [PATCH 3/8] always increase query id --- .../integration/ClientIntegrationTest.java | 4 +-- .../server/computation/CommandRunner.java | 3 +- .../InteractiveStatementExecutor.java | 28 +++++-------------- .../computation/RestoreCommandsCompactor.java | 18 ------------ .../InteractiveStatementExecutorTest.java | 2 +- .../rest/server/computation/RecoveryTest.java | 24 ++++++++-------- 6 files changed, 23 insertions(+), 56 deletions(-) diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index 6f21ec449807..41b108587b06 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -862,7 +862,7 @@ public void shouldListQueries() { // Then assertThat(queries.get(0).getQueryType(), is(QueryType.PERSISTENT)); - assertThat(queries.get(0).getId(), is("CTAS_" + AGG_TABLE + "_0")); + assertThat(queries.get(0).getId(), is("CTAS_" + AGG_TABLE + "_5")); assertThat(queries.get(0).getSql(), is( "CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n" + " " + TEST_STREAM + ".STR STR,\n" @@ -894,7 +894,7 @@ public void shouldDescribeSource() throws Exception { assertThat(description.valueFormat(), is("JSON")); assertThat(description.readQueries(), hasSize(1)); assertThat(description.readQueries().get(0).getQueryType(), is(QueryType.PERSISTENT)); - assertThat(description.readQueries().get(0).getId(), is("CTAS_" + AGG_TABLE + "_0")); + assertThat(description.readQueries().get(0).getId(), is("CTAS_" + AGG_TABLE + "_5")); assertThat(description.readQueries().get(0).getSql(), is( "CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n" + " " + TEST_STREAM + ".STR STR,\n" diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index ed63219116c6..5eb5692cd222 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -266,7 +266,6 @@ public void processPriorCommands() { } final List compacted = compactor.apply(compatibleCommands); - final QueryId greatestQueryId = RestoreCommandsCompactor.greatestQueryId; compacted.forEach( command -> { currentCommandRef.set(new Pair<>(command, clock.instant())); @@ -274,7 +273,7 @@ public void processPriorCommands() { maxRetries, STATEMENT_RETRY_MS, MAX_STATEMENT_RETRY_MS, - () -> statementExecutor.handleRestore(command, greatestQueryId), + () -> statementExecutor.handleRestore(command), WakeupException.class ); currentCommandRef.set(null); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java index 5a66ef763015..88b0ac7d504e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java @@ -125,23 +125,19 @@ void handleStatement(final QueuedCommand queuedCommand) { queuedCommand.getAndDeserializeCommandId(), queuedCommand.getStatus(), Mode.EXECUTE, - queuedCommand.getOffset(), - Optional.empty() + queuedCommand.getOffset() ); } - void handleRestore(final QueuedCommand queuedCommand, final QueryId greatestQueryId) { + void handleRestore(final QueuedCommand queuedCommand) { throwIfNotConfigured(); - final Optional queryId = greatestQueryId == null - ? Optional.empty() : Optional.of(greatestQueryId); handleStatementWithTerminatedQueries( queuedCommand.getAndDeserializeCommand(commandDeserializer), queuedCommand.getAndDeserializeCommandId(), queuedCommand.getStatus(), Mode.RESTORE, - queuedCommand.getOffset(), - queryId + queuedCommand.getOffset() ); } @@ -195,8 +191,7 @@ private void handleStatementWithTerminatedQueries( final CommandId commandId, final Optional commandStatusFuture, final Mode mode, - final long offset, - final Optional greatestQueryId + final long offset ) { try { if (command.getPlan().isPresent()) { @@ -206,8 +201,7 @@ private void handleStatementWithTerminatedQueries( commandStatusFuture, command.getPlan().get(), mode, - offset, - greatestQueryId); + offset); return; } final String statementString = command.getStatement(); @@ -240,8 +234,7 @@ private void executePlan( final Optional commandStatusFuture, final KsqlPlan plan, final Mode mode, - final long offset, - final Optional greatestQueryId + final long offset ) { final KsqlConfig mergedConfig = buildMergedConfig(command); final ConfiguredKsqlPlan configured = ConfiguredKsqlPlan.of( @@ -254,15 +247,8 @@ private void executePlan( new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement") ); final ExecuteResult result = ksqlEngine.execute(serviceContext, configured); - long queryID = Long.MIN_VALUE; - if (greatestQueryId.isPresent() && mode == Mode.RESTORE) { - final String ltq = greatestQueryId.get().toString(); - final int lastIndex = ltq.lastIndexOf("_"); - queryID = Long.parseLong(ltq.substring(lastIndex + 1)); - queryIdGenerator.setNextId(queryID + 1); - } + queryIdGenerator.setNextId(offset + 1); if (result.getQuery().isPresent()) { - queryIdGenerator.setNextId(offset + 1); if (mode == Mode.EXECUTE) { result.getQuery().get().start(); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java index 4bce4fd74e19..ace30e67ffd7 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java @@ -24,17 +24,12 @@ import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Util for compacting the restore commands */ public final class RestoreCommandsCompactor { - static QueryId greatestQueryId; - private static final Logger LOG = LoggerFactory.getLogger(RestoreCommandsCompactor.class); - private RestoreCommandsCompactor() { } @@ -91,8 +86,6 @@ public static CompactedNode maybeAppend( if (queued.getAndDeserializeCommandId().getType() == Type.TERMINATE) { final QueryId queryId = new QueryId(queued.getAndDeserializeCommandId().getEntity()); markShouldSkip(queryId, latestNodeWithId); - //keep track of the last terminate command - //lastTerminateQueryId = queryId; // terminate commands don't get added to the list of commands to execute // because we "execute" them in this class by removing query plans from @@ -135,12 +128,6 @@ private CompactedNode( private static Optional compact(final CompactedNode node) { final Command command = node.command; - if (command.getPlan().get().getQueryPlan().isPresent()) { - if (greatestQueryId == null) { - greatestQueryId = command.getPlan().get().getQueryPlan().get().getQueryId(); - } - } - if (!node.shouldSkip) { return Optional.of(node.queued); } @@ -165,9 +152,4 @@ private static Optional compact(final CompactedNode node) { node.queued.getOffset() )); } - - static QueryId getGreatestQueryId() { - return greatestQueryId; - } - } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java index cf492f2ae0ba..ff3d36827550 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java @@ -960,7 +960,7 @@ private void handleStatement( private void terminateQueries() { final Command terminateCommand1 = new Command( - "TERMINATE CSAS_USER1PV_0;", + "TERMINATE CSAS_USER1PV_1;", emptyMap(), ksqlConfig.getAllConfigPropsWithSecretsObfuscated(), Optional.empty() diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 7a154a18c177..9c1bd8ba892a 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -585,7 +585,7 @@ public void shouldRecoverRecreates() { server1.submitCommands( "CREATE STREAM A (ROWKEY STRING KEY, C1 STRING, C2 INT) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT ROWKEY, C1 FROM A;", - "TERMINATE CsAs_b_0;", + "TERMINATE CsAs_b_1;", "DROP STREAM B;", "CREATE STREAM B AS SELECT ROWKEY, C2 FROM A;" ); @@ -608,7 +608,7 @@ public void shouldRecoverReplacesWithTerminates() { "CREATE STREAM A (ROWKEY STRING KEY, C1 STRING, C2 INT) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT ROWKEY, C1 FROM A;", "CREATE OR REPLACE STREAM B AS SELECT ROWKEY, C1, C2 FROM A;", - "TERMINATE CSAS_B_0;", + "TERMINATE CSAS_B_1;", "DROP STREAM B;", "CREATE STREAM B AS SELECT ROWKEY, C1 FROM A;" ); @@ -632,7 +632,7 @@ public void shouldRecoverInsertIntosRecreates() { "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B (COLUMN STRING) WITH (KAFKA_TOPIC='B', VALUE_FORMAT='JSON', PARTITIONS=1);", "INSERT INTO B SELECT * FROM A;", - "TERMINATE InsertQuery_0;", + "TERMINATE InsertQuery_2;", "INSERT INTO B SELECT * FROM A;" ); shouldRecover(commands); @@ -644,7 +644,7 @@ public void shouldRecoverTerminates() { "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", "INSERT INTO B SELECT * FROM A;", - "TERMINATE CSAS_B_0;", + "TERMINATE CSAS_B_1;", "TERMINATE InsertQuery_2;" ); shouldRecover(commands); @@ -655,7 +655,7 @@ public void shouldRecoverDrop() { server1.submitCommands( "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", - "TERMINATE CSAS_B_0;", + "TERMINATE CSAS_B_1;", "DROP STREAM B;" ); shouldRecover(commands); @@ -666,7 +666,7 @@ public void shouldRecoverWithDuplicateTerminateAndDrop() { server1.submitCommands( "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", - "TERMINATE CSAS_B_0;" + "TERMINATE CSAS_B_1;" ); addDuplicateOfLastCommand(); // Add duplicate of "TERMINATE CSAS_B_0;" @@ -686,7 +686,7 @@ public void shouldNotDeleteTopicsOnRecovery() { server1.submitCommands( "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", - "TERMINATE CSAS_B_0;", + "TERMINATE CSAS_B_1;", "DROP STREAM B DELETE TOPIC;" ); @@ -709,7 +709,7 @@ public void shouldRecoverQueryIDs() { final Set queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries()) .keySet(); - assertThat(queryIdNames, contains(new QueryId("CSAS_C_0"))); + assertThat(queryIdNames, contains(new QueryId("CSAS_C_1"))); } @Test @@ -717,7 +717,7 @@ public void shouldIncrementQueryIDsNoPlans() { server1.submitCommands( "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", - "TERMINATE CSAS_B_0;"); + "TERMINATE CSAS_B_1;"); final KsqlServer server = new KsqlServer(commands); server.recover(); @@ -725,7 +725,7 @@ public void shouldIncrementQueryIDsNoPlans() { final Set queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries()) .keySet(); - assertThat(queryIdNames, contains(new QueryId("CSAS_C_1"))); + assertThat(queryIdNames, contains(new QueryId("CSAS_C_2"))); } @Test @@ -733,7 +733,7 @@ public void shouldIncrementQueryIDsWithPlan() { server1.submitCommands( "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", - "TERMINATE CSAS_B_0;"); + "TERMINATE CSAS_B_1;"); final KsqlServer server = new KsqlServer(commands); server.recover(); @@ -741,7 +741,7 @@ public void shouldIncrementQueryIDsWithPlan() { final Set queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries()) .keySet(); - assertThat(queryIdNames, contains(new QueryId("CSAS_C_1"))); + assertThat(queryIdNames, contains(new QueryId("CSAS_C_2"))); } From db65dcd19ab6214a775dc891e7c251db984ea36f Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Wed, 23 Sep 2020 15:54:40 -0700 Subject: [PATCH 4/8] fix compilation --- .../server/computation/CommandRunnerTest.java | 38 +++++++++---------- .../InteractiveStatementExecutorTest.java | 12 ++---- .../rest/server/computation/RecoveryTest.java | 2 - 3 files changed, 23 insertions(+), 29 deletions(-) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 349d5b43d71c..0dc7068f824a 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -165,9 +165,9 @@ public void shouldRunThePriorCommandsCorrectly() { // Then: final InOrder inOrder = inOrder(statementExecutor); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1), any()); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2), any()); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3), any()); + inOrder.verify(statementExecutor).handleRestore(queuedCommand1); + inOrder.verify(statementExecutor).handleRestore(queuedCommand2); + inOrder.verify(statementExecutor).handleRestore(queuedCommand3); } @Test @@ -185,7 +185,7 @@ public void shouldRunThePriorCommandsWithTerminateCorrectly() { inOrder.verify(commandStore).wakeup(); inOrder.verify(clusterTerminator).terminateCluster(anyList()); - verify(statementExecutor, never()).handleRestore(any(), any()); + verify(statementExecutor, never()).handleRestore(any()); } @Test @@ -198,7 +198,7 @@ public void shouldEarlyOutIfRestoreContainsTerminate() { commandRunner.processPriorCommands(); // Then: - verify(statementExecutor, never()).handleRestore(any(), any()); + verify(statementExecutor, never()).handleRestore(any()); } @Test @@ -223,10 +223,10 @@ public void shouldOnlyRestoreCompacted() { // Then: final InOrder inOrder = inOrder(statementExecutor); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1), any()); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3), any()); + inOrder.verify(statementExecutor).handleRestore(queuedCommand1); + inOrder.verify(statementExecutor).handleRestore(queuedCommand3); - verify(statementExecutor, never()).handleRestore(eq(queuedCommand2), any()); + verify(statementExecutor, never()).handleRestore(queuedCommand2); } @Test @@ -243,14 +243,14 @@ public void shouldProcessPartialListOfCommandsOnDeserializationExceptionInRestor // Then: final InOrder inOrder = inOrder(statementExecutor); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1), any()); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2), any()); + inOrder.verify(statementExecutor).handleRestore(queuedCommand1); + inOrder.verify(statementExecutor).handleRestore(queuedCommand2); assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED)); assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE)); assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.INCOMPATIBLE_COMMAND)); - verify(statementExecutor, never()).handleRestore(eq(queuedCommand3), any()); + verify(statementExecutor, never()).handleRestore(queuedCommand3); } @Test @@ -287,13 +287,13 @@ public void shouldProcessPartialListOfCommandsOnIncompatibleCommandInRestore() { // Then: final InOrder inOrder = inOrder(statementExecutor); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1), any()); - inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2), any()); + inOrder.verify(statementExecutor).handleRestore(queuedCommand1); + inOrder.verify(statementExecutor).handleRestore(queuedCommand2); assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED)); assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE)); assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.INCOMPATIBLE_COMMAND)); - verify(statementExecutor, never()).handleRestore(eq(queuedCommand3), any()); + verify(statementExecutor, never()).handleRestore(queuedCommand3); } @Test @@ -414,9 +414,9 @@ public void shouldEarlyOutIfNewCommandsContainsTerminate() { commandRunner.fetchAndRunCommands(); // Then: - verify(statementExecutor, never()).handleRestore(eq(queuedCommand1), any()); - verify(statementExecutor, never()).handleRestore(eq(queuedCommand2), any()); - verify(statementExecutor, never()).handleRestore(eq(queuedCommand3), any()); + verify(statementExecutor, never()).handleRestore(queuedCommand1); + verify(statementExecutor, never()).handleRestore(queuedCommand2); + verify(statementExecutor, never()).handleRestore(queuedCommand3); } @Test @@ -483,7 +483,7 @@ public void shouldEarlyOutOnShutdown() { commandRunner.fetchAndRunCommands(); // Then: - verify(statementExecutor, never()).handleRestore(eq(queuedCommand2), any()); + verify(statementExecutor, never()).handleRestore(queuedCommand2); } @Test @@ -525,7 +525,7 @@ public void shouldNotStartCommandRunnerThreadIfSerializationExceptionInRestore() inOrder.verify(executor).awaitTermination(anyLong(), any()); inOrder.verify(commandStore).close(); verify(commandStore, never()).getNewCommands(any()); - verify(statementExecutor, times(2)).handleRestore(any(), any()); + verify(statementExecutor, times(2)).handleRestore(any()); } @Test diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java index ff3d36827550..70dc6947d7fc 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java @@ -227,7 +227,7 @@ public void shouldThrowOnHandleRestoreIfNotConfigured() { ); // When: - statementExecutor.handleRestore(queuedCommand, null); + statementExecutor.handleRestore(queuedCommand); } @Test @@ -548,9 +548,7 @@ public void shouldHandlePriorStatements() { for (int i = 0; i < priorCommands.size(); i++) { final Pair pair = priorCommands.get(i); statementExecutor.handleRestore( - new QueuedCommand(pair.left, pair.right, Optional.empty(), (long) i), - null - + new QueuedCommand(pair.left, pair.right, Optional.empty(), (long) i) ); } @@ -634,8 +632,7 @@ public void shouldSetNextQueryIdToNextOffsetWhenExecutingRestoreCommand() { command, Optional.empty(), 2L - ), - null + ) ); // Then: @@ -658,8 +655,7 @@ public void shouldSkipStartWhenReplayingLog() { command, Optional.empty(), 0L - ), - null + ) ); // Then: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 9c1bd8ba892a..792ba800b60f 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -44,7 +44,6 @@ import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.KsqlRequest; -import io.confluent.ksql.rest.server.CommandTopicBackup; import io.confluent.ksql.rest.server.resources.KsqlResource; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; @@ -66,7 +65,6 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; -import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.streams.StreamsConfig; From ae22c0466d1e62d2166bed55c2800402070076ca Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Wed, 23 Sep 2020 16:01:38 -0700 Subject: [PATCH 5/8] remove my changes --- .../server/computation/CommandRunnerTest.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 0dc7068f824a..8f4c08b27107 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -165,9 +165,9 @@ public void shouldRunThePriorCommandsCorrectly() { // Then: final InOrder inOrder = inOrder(statementExecutor); - inOrder.verify(statementExecutor).handleRestore(queuedCommand1); - inOrder.verify(statementExecutor).handleRestore(queuedCommand2); - inOrder.verify(statementExecutor).handleRestore(queuedCommand3); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1)); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2)); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3)); } @Test @@ -223,8 +223,8 @@ public void shouldOnlyRestoreCompacted() { // Then: final InOrder inOrder = inOrder(statementExecutor); - inOrder.verify(statementExecutor).handleRestore(queuedCommand1); - inOrder.verify(statementExecutor).handleRestore(queuedCommand3); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1)); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3)); verify(statementExecutor, never()).handleRestore(queuedCommand2); } @@ -243,8 +243,8 @@ public void shouldProcessPartialListOfCommandsOnDeserializationExceptionInRestor // Then: final InOrder inOrder = inOrder(statementExecutor); - inOrder.verify(statementExecutor).handleRestore(queuedCommand1); - inOrder.verify(statementExecutor).handleRestore(queuedCommand2); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1)); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2)); assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED)); assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE)); @@ -287,8 +287,8 @@ public void shouldProcessPartialListOfCommandsOnIncompatibleCommandInRestore() { // Then: final InOrder inOrder = inOrder(statementExecutor); - inOrder.verify(statementExecutor).handleRestore(queuedCommand1); - inOrder.verify(statementExecutor).handleRestore(queuedCommand2); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1)); + inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2)); assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED)); assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE)); From fcc8bec7281cd37bb8e3b12ec5b394102224289e Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Wed, 23 Sep 2020 16:03:01 -0700 Subject: [PATCH 6/8] remove my changes --- .../ksql/rest/server/computation/CommandRunnerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 8f4c08b27107..5a372b5e3349 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -249,7 +249,6 @@ public void shouldProcessPartialListOfCommandsOnDeserializationExceptionInRestor assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED)); assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE)); assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.INCOMPATIBLE_COMMAND)); - verify(statementExecutor, never()).handleRestore(queuedCommand3); } From e82785dc14342eb65bb60d16b7e7846376549bb7 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Wed, 23 Sep 2020 16:11:47 -0700 Subject: [PATCH 7/8] remove my changes, fix test case --- .../ksql/rest/server/computation/CommandRunner.java | 1 - .../server/computation/InteractiveStatementExecutor.java | 8 +------- .../rest/server/computation/RestoreCommandsCompactor.java | 1 - .../ksql/rest/server/computation/CommandRunnerTest.java | 1 + .../ksql/rest/server/computation/RecoveryTest.java | 7 ++++--- 5 files changed, 6 insertions(+), 12 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 5eb5692cd222..2a4939493014 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -16,7 +16,6 @@ package io.confluent.ksql.rest.server.computation; import com.google.common.annotations.VisibleForTesting; -import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException; diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java index 88b0ac7d504e..23001145eb07 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java @@ -195,13 +195,7 @@ private void handleStatementWithTerminatedQueries( ) { try { if (command.getPlan().isPresent()) { - executePlan( - command, - commandId, - commandStatusFuture, - command.getPlan().get(), - mode, - offset); + executePlan(command, commandId, commandStatusFuture, command.getPlan().get(), mode, offset); return; } final String statementString = command.getStatement(); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java index ace30e67ffd7..5630feca18d0 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java @@ -127,7 +127,6 @@ private CompactedNode( private static Optional compact(final CompactedNode node) { final Command command = node.command; - if (!node.shouldSkip) { return Optional.of(node.queued); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 5a372b5e3349..7d3dc09f1851 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -54,6 +54,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; + import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 792ba800b60f..baed435073d7 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -731,15 +731,16 @@ public void shouldIncrementQueryIDsWithPlan() { server1.submitCommands( "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", + "CREATE STREAM C AS SELECT * FROM A;", "TERMINATE CSAS_B_1;"); final KsqlServer server = new KsqlServer(commands); server.recover(); - server.submitCommands("CREATE STREAM C AS SELECT * FROM A;"); + server.submitCommands("CREATE STREAM D AS SELECT * FROM A;"); final Set queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries()) .keySet(); - - assertThat(queryIdNames, contains(new QueryId("CSAS_C_2"))); + System.out.println(queryIdNames); + assertThat(queryIdNames, contains(new QueryId("CSAS_C_2"), new QueryId("CSAS_D_3"))); } From 44f9da0e755091c357e7a1e5e813ee70d3996118 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Wed, 23 Sep 2020 16:13:36 -0700 Subject: [PATCH 8/8] remove print --- .../confluent/ksql/rest/server/computation/CommandRunner.java | 2 ++ .../io/confluent/ksql/rest/server/computation/RecoveryTest.java | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 2a4939493014..0bb68edced0a 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -42,6 +42,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; + import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.WakeupException; @@ -265,6 +266,7 @@ public void processPriorCommands() { } final List compacted = compactor.apply(compatibleCommands); + compacted.forEach( command -> { currentCommandRef.set(new Pair<>(command, clock.instant())); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index baed435073d7..36d03bc73b4d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -739,7 +739,6 @@ public void shouldIncrementQueryIDsWithPlan() { server.submitCommands("CREATE STREAM D AS SELECT * FROM A;"); final Set queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries()) .keySet(); - System.out.println(queryIdNames); assertThat(queryIdNames, contains(new QueryId("CSAS_C_2"), new QueryId("CSAS_D_3"))); }