diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java index 3075f6d3f1c6..9d3d5b292d42 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java @@ -127,17 +127,7 @@ RestoreCommands getRestoreCommands() { while (!records.isEmpty()) { log.debug("Received {} records from poll", records.count()); for (ConsumerRecord record : records) { - final CommandId key = record.key(); - if (key.getAction() != CommandId.Action.DROP) { - restoreCommands.addCommand(record.key(), record.value()); - } else if (key.getAction() == CommandId.Action.DROP){ - if(!restoreCommands.remove(new CommandId(key.getType(), - key.getEntity(), - CommandId.Action.CREATE))) { - log.warn("drop command {} found without a corresponding create command for" - + " {} {}", key, key.getType(), key.getAction()); - } - } + restoreCommands.addCommand(record.key(), record.value()); } records = commandConsumer.poll(POLLING_TIMEOUT_FOR_COMMAND_TOPIC); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommands.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommands.java index a25b100dbd06..67e20393bce1 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommands.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommands.java @@ -21,12 +21,17 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.util.Pair; class RestoreCommands { - private final Map toRestore = new LinkedHashMap<>(); + + private final Map, Command> toRestore = new LinkedHashMap<>(); private final Map allTerminatedQueries = new HashMap<>(); + private final Map dropped = new HashMap<>(); private final List allCommandIds = new ArrayList<>(); void addCommand(final CommandId key, final Command value) { @@ -35,35 +40,37 @@ void addCommand(final CommandId key, final Command value) { if (allCommandIds.contains(key)) { allCommandIds.remove(key); } - } else if (!toRestore.containsKey(key)){ - toRestore.put(key, value); + } else { + toRestore.put(new Pair<>(allCommandIds.size(), key), value); + if (key.getAction() == CommandId.Action.DROP) { + dropped.put(key.getEntity(), key); + } } allCommandIds.add(key); } - boolean remove(final CommandId commandId) { - if (toRestore.remove(commandId) != null) { - allCommandIds.remove(commandId); - return true; - } - return false; - } interface ForEach { void apply(final CommandId commandId, final Command command, - final Map terminatedQueries); + final Map terminatedQueries, + final boolean dropped); } void forEach(final ForEach action) { - toRestore.forEach((commandId, command) -> { - final int commandIdIdx = allCommandIds.indexOf(commandId); + toRestore.forEach((commandIdIndexPair, command) -> { final Map terminatedAfter = new HashMap<>(); allTerminatedQueries.entrySet().stream() - .filter(entry -> allCommandIds.indexOf(entry.getValue()) > commandIdIdx) + .filter(entry -> allCommandIds.indexOf(entry.getValue()) > commandIdIndexPair.left) .forEach(queryIdCommandIdEntry -> terminatedAfter.put(queryIdCommandIdEntry.getKey(), queryIdCommandIdEntry.getValue())); - action.apply(commandId, command, terminatedAfter); + final Set droppedEntities = this.dropped.entrySet().stream() + .filter(entry -> allCommandIds.indexOf(entry.getValue()) > commandIdIndexPair.left) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + action.apply(commandIdIndexPair.right, command, + terminatedAfter, + droppedEntities.contains(commandIdIndexPair.right.getEntity())); }); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java index 03eb4fc9a4f0..ae80aada4771 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java @@ -73,13 +73,14 @@ public StatementExecutor( } void handleRestoration(final RestoreCommands restoreCommands) throws Exception { - restoreCommands.forEach(((commandId, command, terminatedQueries) -> { + restoreCommands.forEach(((commandId, command, terminatedQueries, wasDropped) -> { log.info("Executing prior statement: '{}'", command); try { handleStatementWithTerminatedQueries( command, commandId, - terminatedQueries); + terminatedQueries, + wasDropped); } catch (Exception exception) { log.warn("Failed to execute statement due to exception", exception); } @@ -96,7 +97,7 @@ void handleStatement( Command command, CommandId commandId ) throws Exception { - handleStatementWithTerminatedQueries(command, commandId, null); + handleStatementWithTerminatedQueries(command, commandId, null, false); } /** @@ -162,13 +163,14 @@ private void completeStatusFuture(CommandId commandId, CommandStatus commandStat * @param commandId The ID to be used to track the status of the command * @param terminatedQueries An optional map from terminated query IDs to the commands that * requested their termination + * @param wasDropped was this table/stream subsequently dropped * @throws Exception TODO: Refine this. */ private void handleStatementWithTerminatedQueries( Command command, CommandId commandId, - Map terminatedQueries - ) throws Exception { + Map terminatedQueries, + boolean wasDropped) throws Exception { try { String statementString = command.getStatement(); statusStore.put( @@ -180,7 +182,7 @@ private void handleStatementWithTerminatedQueries( commandId, new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement") ); - executeStatement(statement, command, commandId, terminatedQueries); + executeStatement(statement, command, commandId, terminatedQueries, wasDropped); } catch (WakeupException exception) { throw exception; } catch (Exception exception) { @@ -195,8 +197,8 @@ private void executeStatement( Statement statement, Command command, CommandId commandId, - Map terminatedQueries - ) throws Exception { + Map terminatedQueries, + boolean wasDropped) throws Exception { String statementStr = command.getStatement(); DDLCommandResult result = null; @@ -210,7 +212,8 @@ private void executeStatement( command, commandId, terminatedQueries, - statementStr); + statementStr, + wasDropped); if (successMessage == null) return; } else if (statement instanceof TerminateQuery) { terminateQuery((TerminateQuery) statement); @@ -251,7 +254,8 @@ private String handleCreateAsSelect(final CreateAsSelect statement, final Command command, final CommandId commandId, final Map terminatedQueries, - final String statementStr) throws Exception { + final String statementStr, + final boolean wasDropped) throws Exception { QuerySpecification querySpecification = (QuerySpecification) statement.getQuery().getQueryBody(); Query query = ksqlEngine.addInto( @@ -261,7 +265,7 @@ private String handleCreateAsSelect(final CreateAsSelect statement, statement.getProperties(), statement.getPartitionByColumn() ); - if (startQuery(statementStr, query, commandId, terminatedQueries, command)) { + if (startQuery(statementStr, query, commandId, terminatedQueries, command, wasDropped)) { return statement instanceof CreateTableAsSelect ? "Table created and running" : "Stream created and running"; @@ -275,8 +279,8 @@ private boolean startQuery( Query query, CommandId commandId, Map terminatedQueries, - Command command - ) throws Exception { + Command command, + boolean wasDropped) throws Exception { if (query.getQueryBody() instanceof QuerySpecification) { QuerySpecification querySpecification = (QuerySpecification) query.getQueryBody(); Relation into = querySpecification.getInto(); @@ -312,6 +316,9 @@ private boolean startQuery( ); ksqlEngine.terminateQuery(queryId, false); return false; + } else if (wasDropped){ + ksqlEngine.terminateQuery(queryId, false); + return false; } else { persistentQueryMetadata.getKafkaStreams().start(); return true; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java index 8b4a245e72a7..6ed80d65d4cc 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java @@ -28,11 +28,10 @@ import org.junit.Test; import org.junit.runner.RunWith; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import io.confluent.ksql.metastore.MetaStoreImpl; import io.confluent.ksql.query.QueryId; @@ -54,30 +53,7 @@ public class CommandStoreTest { @Test - public void shouldUseFirstCommandForSameIdIfNoDropBetweenThem() { - final CommandId commandId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.CREATE); - final Command originalCommand = new Command("some statement", Collections.emptyMap()); - final Command latestCommand = new Command("a new statement", Collections.emptyMap()); - final ConsumerRecords records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("topic", 0), Arrays.asList( - new ConsumerRecord<>("topic", 0, 0, commandId, - originalCommand), - new ConsumerRecord<>("topic", 0, 0, commandId, - latestCommand)) - )); - - EasyMock.expect(commandConsumer.partitionsFor(COMMAND_TOPIC)).andReturn(Collections.emptyList()); - - EasyMock.expect(commandConsumer.poll(anyLong())).andReturn(records) - .andReturn(new ConsumerRecords<>(Collections.emptyMap())); - EasyMock.replay(commandConsumer); - - final CommandStore command = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl())); - final Map commands = getPriorCommands(command); - assertThat(commands, equalTo(Collections.singletonMap(commandId, originalCommand))); - } - - @Test - public void shouldReplaceCommandWithNewCommandAfterDrop() { + public void shouldHaveAllCreateCommandsInOrder() { final CommandId createId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.CREATE); final CommandId dropId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.DROP); final Command originalCommand = new Command("some statement", Collections.emptyMap()); @@ -97,31 +73,12 @@ public void shouldReplaceCommandWithNewCommandAfterDrop() { EasyMock.replay(commandConsumer); final CommandStore command = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl())); - final Map commands = getPriorCommands(command); - assertThat(commands, equalTo(Collections.singletonMap(createId, latestCommand))); + final List> commands = getPriorCommands(command); + assertThat(commands, equalTo(Arrays.asList(new Pair<>(createId, originalCommand), + new Pair<>(dropId, dropCommand), + new Pair<>(createId, latestCommand)))); } - @Test - public void shouldRemoveCreateCommandIfItHasBeenDropped() { - final CommandId createId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.CREATE); - final CommandId dropId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.DROP); - final Command originalCommand = new Command("some statement", Collections.emptyMap()); - final Command dropCommand = new Command("drop", Collections.emptyMap()); - - final ConsumerRecords records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("topic", 0), Arrays.asList( - new ConsumerRecord<>("topic", 0, 0, createId, originalCommand), - new ConsumerRecord<>("topic", 0, 0, dropId, dropCommand) - ))); - - EasyMock.expect(commandConsumer.partitionsFor(COMMAND_TOPIC)).andReturn(Collections.emptyList()); - - EasyMock.expect(commandConsumer.poll(anyLong())).andReturn(records) - .andReturn(new ConsumerRecords<>(Collections.emptyMap())); - EasyMock.replay(commandConsumer); - - final CommandStore commandStore = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl())); - assertThat(getPriorCommands(commandStore), equalTo(Collections.emptyMap())); - } @Test public void shouldCollectTerminatedQueries() { @@ -141,10 +98,10 @@ public void shouldCollectTerminatedQueries() { assertThat(restoreCommands.terminatedQueries(), equalTo(Collections.singletonMap(new QueryId("queryId"), terminated))); } - private Map getPriorCommands(CommandStore command) { + private List> getPriorCommands(CommandStore command) { final RestoreCommands priorCommands = command.getRestoreCommands(); - final Map commands = new HashMap<>(); - priorCommands.forEach(((id, cmd, terminatedQueries) -> commands.put(id, cmd))); + final List> commands = new ArrayList<>(); + priorCommands.forEach(((id, cmd, terminatedQueries, droppedEntities) -> commands.add(new Pair<>(id, cmd)))); return commands; } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RestoreCommandsTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RestoreCommandsTest.java index 995fba43a67e..b89e594691fe 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RestoreCommandsTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RestoreCommandsTest.java @@ -27,19 +27,24 @@ import java.util.Map; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.util.Pair; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class RestoreCommandsTest { private final RestoreCommands restoreCommands = new RestoreCommands(); private final CommandId createId = new CommandId(CommandId.Type.TABLE, "foo", CommandId.Action.CREATE); + private final CommandId dropId = new CommandId(CommandId.Type.TABLE, "foo", CommandId.Action.DROP); private final CommandId terminateId = new CommandId(CommandId.Type.TERMINATE, "queryId", CommandId.Action.EXECUTE); private final Command createCommand = new Command("create table foo", Collections.emptyMap()); + private final Command dropCommand = new Command("drop table foo", Collections.emptyMap()); private final Command terminateCommand = new Command("terminate query 'queryId'", Collections.emptyMap()); @Test @@ -49,7 +54,7 @@ public void shouldHaveMapContainingTerminatedQueriesThatWereIssuedAfterCreate() restoreCommands.addCommand(terminateId, terminateCommand); - restoreCommands.forEach((commandId, command, terminatedQueries) -> { + restoreCommands.forEach((commandId, command, terminatedQueries, droppedEntities) -> { assertThat(commandId, equalTo(createId)); assertThat(command, equalTo(createCommand)); assertThat(terminatedQueries, equalTo(Collections.singletonMap(new QueryId("queryId"), @@ -64,14 +69,18 @@ public void shouldNotHaveTerminatedQueryWhenDroppedAndRecreatedAfterTerminate() restoreCommands.addCommand(terminateId, terminateCommand); // drop - restoreCommands.remove(createId); + restoreCommands.addCommand(dropId, dropCommand); // recreate restoreCommands.addCommand(createId, createCommand); - restoreCommands.forEach((commandId, command, terminatedQueries) -> { - assertThat(commandId, equalTo(createId)); - assertThat(command, equalTo(createCommand)); - assertThat(terminatedQueries, equalTo(Collections.emptyMap())); + final List>> results = new ArrayList<>(); + restoreCommands.forEach((commandId, command, terminatedQueries, droppedEntities) -> { + results.add(new Pair<>(commandId, terminatedQueries)); }); + + assertThat(results, equalTo(Arrays.asList( + new Pair<>(createId, Collections.singletonMap(new QueryId("queryId"), terminateId)), + new Pair<>(dropId, Collections.emptyMap()), + new Pair<>(createId, Collections.emptyMap())))); } @Test @@ -81,7 +90,7 @@ public void shouldNotHaveTerminatedQueriesIssuedBeforeCreate() { restoreCommands.addCommand(createId, createCommand); - restoreCommands.forEach((commandId, command, terminatedQueries) -> { + restoreCommands.forEach((commandId, command, terminatedQueries, dropped) -> { assertThat(commandId, equalTo(createId)); assertThat(command, equalTo(createCommand)); assertThat(terminatedQueries, equalTo(Collections.emptyMap())); @@ -100,7 +109,7 @@ public void shouldIterateCommandsInOrderTheyWereIssued() { new Command("create stream one", Collections.emptyMap())); final List results = new ArrayList<>(); - restoreCommands.forEach((commandId, command, terminatedQueries) -> { + restoreCommands.forEach((commandId, command, terminatedQueries, dropped) -> { results.add(commandId); }); assertThat(results, equalTo(Arrays.asList(createId, createStreamOneId))); @@ -113,7 +122,7 @@ public void shouldHaveTerminatedQueriesWhenMultipleCreateDropTerminateForCommand // terminate restoreCommands.addCommand(terminateId, terminateCommand); // drop - restoreCommands.remove(createId); + restoreCommands.addCommand(dropId, dropCommand); // recreate restoreCommands.addCommand(createId, createCommand); // another one for good measure @@ -122,16 +131,35 @@ public void shouldHaveTerminatedQueriesWhenMultipleCreateDropTerminateForCommand restoreCommands.addCommand(terminateId, terminateCommand); final Map> commandIdToTerminate = new HashMap<>(); - restoreCommands.forEach((commandId, command, terminatedQueries) -> { - if (commandIdToTerminate.containsKey(commandId)) { - fail("Should not have same commandId twice. CommandId=" + commandId); - } - commandIdToTerminate.put(commandId, terminatedQueries); - }); + restoreCommands.forEach((commandId, command, terminatedQueries, dropped) + -> commandIdToTerminate.put(commandId, terminatedQueries)); assertThat(commandIdToTerminate.get(createId), equalTo(Collections.singletonMap(new QueryId("queryId"), terminateId))); } + @Test + public void shouldBeDroppedWhenDropCommandAfterCreate() { + restoreCommands.addCommand(createId, createCommand); + restoreCommands.addCommand(dropId, dropCommand); + + final Map results = new HashMap<>(); + restoreCommands.forEach((commandId, command, terminatedQueries, dropped) + -> results.put(commandId, dropped)); + + assertTrue(results.get(createId)); + } + + @Test + public void shouldNotBeDroppedWhenDropCommandBeforeCreate() { + restoreCommands.addCommand(dropId, dropCommand); + restoreCommands.addCommand(createId, createCommand); + + final Map results = new HashMap<>(); + restoreCommands.forEach((commandId, command, terminatedQueries, dropped) + -> results.put(commandId, dropped)); + + assertFalse(results.get(createId)); + } } \ No newline at end of file