From 6d099cec191e889036cbbdc4c55b88abe1eec87c Mon Sep 17 00:00:00 2001 From: rodesai Date: Fri, 13 Dec 2019 16:28:27 -0800 Subject: [PATCH 1/2] fix: change query id generation to work with planned commands This patch changes up how we generate query IDs to play nice with planned commands. Before this change, statements would get the current offset as their query id. However planned commands get their query IDs before being enqueued, so they should really get the _next_ expected offset as their ID. This patch changes up the id generation to work this way. The next ID is set _after_ statemetns/plans are executed, and is set to the next expected offset. --- .../query/id/SpecificQueryIdGenerator.java | 4 +-- .../id/SpecificQueryIdGeneratorTest.java | 7 +++-- .../InteractiveStatementExecutor.java | 16 ++++++---- .../InteractiveStatementExecutorTest.java | 31 +++++++++++++++++++ .../rest/server/computation/RecoveryTest.java | 16 +++++----- 5 files changed, 57 insertions(+), 17 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/query/id/SpecificQueryIdGenerator.java b/ksql-engine/src/main/java/io/confluent/ksql/query/id/SpecificQueryIdGenerator.java index 512dec937217..2b99fd97c6ad 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/query/id/SpecificQueryIdGenerator.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/query/id/SpecificQueryIdGenerator.java @@ -30,7 +30,7 @@ public class SpecificQueryIdGenerator implements QueryIdGenerator { public SpecificQueryIdGenerator() { this.nextId = 0L; - this.alreadyUsed = true; + this.alreadyUsed = false; } public void setNextId(final long nextId) { @@ -50,6 +50,6 @@ public String getNext() { @Override public QueryIdGenerator createSandbox() { - return new SequentialQueryIdGenerator(nextId + 1); + return new SequentialQueryIdGenerator(nextId); } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/query/id/SpecificQueryIdGeneratorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/query/id/SpecificQueryIdGeneratorTest.java index b5115b27e29f..92aa0a3e0c35 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/query/id/SpecificQueryIdGeneratorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/query/id/SpecificQueryIdGeneratorTest.java @@ -40,7 +40,10 @@ public void shouldGenerateIdBasedOnSetNextId() { assertThat(generator.getNext(), is("5")); } - + @Test + public void shouldReturnZeroIdForFirstQuery() { + assertThat(generator.getNext(), is("0")); + } @Test(expected = KsqlServerException.class) public void shouldThrowWhenGetNextBeforeSet() { @@ -54,6 +57,6 @@ public void shouldReturnSequentialGeneratorFromLastId() { generator.setNextId(3L); final QueryIdGenerator copy = generator.createSandbox(); assertThat(copy, instanceOf(SequentialQueryIdGenerator.class)); - assertThat(copy.getNext(), is("4")); + assertThat(copy.getNext(), is("3")); } } \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java index 89ca6eb41a94..7e7761f0b0db 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java @@ -191,7 +191,7 @@ private void handleStatementWithTerminatedQueries( ) { try { if (command.getPlan().isPresent()) { - executePlan(command, commandId, commandStatusFuture, command.getPlan().get(), mode); + executePlan(command, commandId, commandStatusFuture, command.getPlan().get(), mode, offset); return; } final String statementString = command.getStatement(); @@ -224,7 +224,8 @@ private void executePlan( final CommandId commandId, final Optional commandStatusFuture, final KsqlPlan plan, - final Mode mode + final Mode mode, + final long offset ) { final KsqlConfig mergedConfig = buildMergedConfig(command); final ConfiguredKsqlPlan configured = ConfiguredKsqlPlan.of( @@ -238,8 +239,11 @@ private void executePlan( new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement") ); final ExecuteResult result = ksqlEngine.execute(serviceContext, configured); - if (mode == Mode.EXECUTE) { - result.getQuery().ifPresent(QueryMetadata::start); + if (result.getQuery().isPresent()) { + queryIdGenerator.setNextId(offset + 1); + if (mode == Mode.EXECUTE) { + result.getQuery().get().start(); + } } final String successMessage = getSuccessMessage(result); final CommandStatus successStatus = @@ -317,8 +321,6 @@ private PersistentQueryMetadata startQuery( final ConfiguredStatement configured = ConfiguredStatement.of( statement, command.getOverwriteProperties(), mergedConfig); - queryIdGenerator.setNextId(offset); - final KsqlPlan plan = ksqlEngine.plan(serviceContext, configured); final QueryMetadata queryMetadata = ksqlEngine @@ -328,6 +330,8 @@ private PersistentQueryMetadata startQuery( .getQuery() .orElseThrow(() -> new IllegalStateException("Statement did not return a query")); + queryIdGenerator.setNextId(offset + 1); + if (!(queryMetadata instanceof PersistentQueryMetadata)) { throw new KsqlException(String.format( "Unexpected query metadata type: %s; was expecting %s", diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java index d3ebec489c5b..e508b1a61e89 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java @@ -345,6 +345,18 @@ public void shouldExecutePlannedCommand() { verify(mockEngine).execute(serviceContext, ConfiguredKsqlPlan.of(plan, emptyMap(), ksqlConfig)); } + @Test + public void shouldSetNextQueryIdToNextOffsetWhenExecutingPlannedCommand() { + // Given: + givenMockPlannedQuery(); + + // When: + handleStatement(statementExecutorWithMocks, plannedCommand, COMMAND_ID, Optional.empty(), 2L); + + // Then: + verify(mockQueryIdGenerator).setNextId(3L); + } + @Test public void shouldUpdateStatusOnCompletedPlannedCommand() { // Given: @@ -570,6 +582,25 @@ public void shouldEnforceReferentialIntegrity() { CoreMatchers.equalTo(CommandStatus.Status.SUCCESS)); } + @Test + public void shouldSetNextQueryIdToNextOffsetWhenExecutingStatementCommand() { + // Given: + mockReplayCSAS(new QueryId("csas-query-id")); + + // When: + statementExecutorWithMocks.handleRestore( + new QueuedCommand( + new CommandId(Type.STREAM, "foo", Action.CREATE), + new Command("CSAS", emptyMap(), emptyMap(), Optional.empty()), + Optional.empty(), + 2L + ) + ); + + // Then: + verify(mockQueryIdGenerator).setNextId(3L); + } + @Test public void shouldSkipStartWhenReplayingLog() { // Given: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index a39474f64f47..7ba669196d14 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -37,6 +37,7 @@ import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.query.id.QueryIdGenerator; import io.confluent.ksql.query.id.SpecificQueryIdGenerator; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandId.Action; @@ -105,7 +106,7 @@ public void tearDown() { serviceContext.close(); } - private KsqlEngine createKsqlEngine() { + private KsqlEngine createKsqlEngine(final QueryIdGenerator queryIdGenerator) { final KsqlEngineMetrics engineMetrics = mock(KsqlEngineMetrics.class); return KsqlEngineTestUtil.createKsqlEngine( serviceContext, @@ -190,7 +191,8 @@ private class KsqlServer { final ServerState serverState; KsqlServer(final List commandLog) { - this.ksqlEngine = createKsqlEngine(); + final SpecificQueryIdGenerator queryIdGenerator = new SpecificQueryIdGenerator(); + this.ksqlEngine = createKsqlEngine(queryIdGenerator); this.fakeCommandQueue = new FakeCommandQueue(commandLog, transactionalProducer); serverState = new ServerState(); serverState.setReady(); @@ -560,7 +562,7 @@ public void shouldRecoverRecreates() { server1.submitCommands( "CREATE STREAM A (C1 STRING, C2 INT) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT C1 FROM A;", - "TERMINATE CSAS_B_1;", + "TERMINATE CSAS_B_0;", "DROP STREAM B;", "CREATE STREAM B AS SELECT C2 FROM A;" ); @@ -572,7 +574,7 @@ public void shouldRecoverTerminates() { server1.submitCommands( "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", - "TERMINATE CSAS_B_1;" + "TERMINATE CSAS_B_0;" ); shouldRecover(commands); } @@ -582,7 +584,7 @@ public void shouldRecoverDrop() { server1.submitCommands( "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", - "TERMINATE CSAS_B_1;", + "TERMINATE CSAS_B_0;", "DROP STREAM B;" ); shouldRecover(commands); @@ -594,7 +596,7 @@ public void shouldNotDeleteTopicsOnRecovery() { server1.submitCommands( "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", - "TERMINATE CSAS_B_1;", + "TERMINATE CSAS_B_0;", "DROP STREAM B DELETE TOPIC;" ); @@ -656,7 +658,7 @@ public void shouldRecoverQueryIDs() { final Set queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries()) .keySet(); - assertThat(queryIdNames, contains(new QueryId("CSAS_C_7"))); + assertThat(queryIdNames, contains(new QueryId("CSAS_C_0"))); } } From a485e5cff4ccbad3134c81b83b86f93213ef2bbd Mon Sep 17 00:00:00 2001 From: Rohan Date: Thu, 19 Dec 2019 09:36:07 -0800 Subject: [PATCH 2/2] Update ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java Co-Authored-By: Victoria Xia --- .../server/computation/InteractiveStatementExecutorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java index e508b1a61e89..44bd43a9933e 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java @@ -583,7 +583,7 @@ public void shouldEnforceReferentialIntegrity() { } @Test - public void shouldSetNextQueryIdToNextOffsetWhenExecutingStatementCommand() { + public void shouldSetNextQueryIdToNextOffsetWhenExecutingRestoreCommand() { // Given: mockReplayCSAS(new QueryId("csas-query-id"));