Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't execute dropped streams/tables, but update metastore #532

Merged
merged 1 commit into from
Dec 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,7 @@ RestoreCommands getRestoreCommands() {
while (!records.isEmpty()) {
log.debug("Received {} records from poll", records.count());
for (ConsumerRecord<CommandId, Command> record : records) {
final CommandId key = record.key();
if (key.getAction() != CommandId.Action.DROP) {
restoreCommands.addCommand(record.key(), record.value());
} else if (key.getAction() == CommandId.Action.DROP){
if(!restoreCommands.remove(new CommandId(key.getType(),
key.getEntity(),
CommandId.Action.CREATE))) {
log.warn("drop command {} found without a corresponding create command for"
+ " {} {}", key, key.getType(), key.getAction());
}
}
restoreCommands.addCommand(record.key(), record.value());
}
records = commandConsumer.poll(POLLING_TIMEOUT_FOR_COMMAND_TOPIC);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.util.Pair;

class RestoreCommands {
private final Map<CommandId, Command> toRestore = new LinkedHashMap<>();

private final Map<Pair<Integer, CommandId>, Command> toRestore = new LinkedHashMap<>();
private final Map<QueryId, CommandId> allTerminatedQueries = new HashMap<>();
private final Map<String, CommandId> dropped = new HashMap<>();
private final List<CommandId> allCommandIds = new ArrayList<>();

void addCommand(final CommandId key, final Command value) {
Expand All @@ -35,35 +40,37 @@ void addCommand(final CommandId key, final Command value) {
if (allCommandIds.contains(key)) {
allCommandIds.remove(key);
}
} else if (!toRestore.containsKey(key)){
toRestore.put(key, value);
} else {
toRestore.put(new Pair<>(allCommandIds.size(), key), value);
if (key.getAction() == CommandId.Action.DROP) {
dropped.put(key.getEntity(), key);
}
}
allCommandIds.add(key);
}

boolean remove(final CommandId commandId) {
if (toRestore.remove(commandId) != null) {
allCommandIds.remove(commandId);
return true;
}
return false;
}

interface ForEach {
void apply(final CommandId commandId,
final Command command,
final Map<QueryId, CommandId> terminatedQueries);
final Map<QueryId, CommandId> terminatedQueries,
final boolean dropped);
}

void forEach(final ForEach action) {
toRestore.forEach((commandId, command) -> {
final int commandIdIdx = allCommandIds.indexOf(commandId);
toRestore.forEach((commandIdIndexPair, command) -> {
final Map<QueryId, CommandId> terminatedAfter = new HashMap<>();
allTerminatedQueries.entrySet().stream()
.filter(entry -> allCommandIds.indexOf(entry.getValue()) > commandIdIdx)
.filter(entry -> allCommandIds.indexOf(entry.getValue()) > commandIdIndexPair.left)
.forEach(queryIdCommandIdEntry ->
terminatedAfter.put(queryIdCommandIdEntry.getKey(), queryIdCommandIdEntry.getValue()));
action.apply(commandId, command, terminatedAfter);
final Set<String> droppedEntities = this.dropped.entrySet().stream()
.filter(entry -> allCommandIds.indexOf(entry.getValue()) > commandIdIndexPair.left)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
action.apply(commandIdIndexPair.right, command,
terminatedAfter,
droppedEntities.contains(commandIdIndexPair.right.getEntity()));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,14 @@ public StatementExecutor(
}

void handleRestoration(final RestoreCommands restoreCommands) throws Exception {
restoreCommands.forEach(((commandId, command, terminatedQueries) -> {
restoreCommands.forEach(((commandId, command, terminatedQueries, wasDropped) -> {
log.info("Executing prior statement: '{}'", command);
try {
handleStatementWithTerminatedQueries(
command,
commandId,
terminatedQueries);
terminatedQueries,
wasDropped);
} catch (Exception exception) {
log.warn("Failed to execute statement due to exception", exception);
}
Expand All @@ -96,7 +97,7 @@ void handleStatement(
Command command,
CommandId commandId
) throws Exception {
handleStatementWithTerminatedQueries(command, commandId, null);
handleStatementWithTerminatedQueries(command, commandId, null, false);
}

/**
Expand Down Expand Up @@ -162,13 +163,14 @@ private void completeStatusFuture(CommandId commandId, CommandStatus commandStat
* @param commandId The ID to be used to track the status of the command
* @param terminatedQueries An optional map from terminated query IDs to the commands that
* requested their termination
* @param wasDropped was this table/stream subsequently dropped
* @throws Exception TODO: Refine this.
*/
private void handleStatementWithTerminatedQueries(
Command command,
CommandId commandId,
Map<QueryId, CommandId> terminatedQueries
) throws Exception {
Map<QueryId, CommandId> terminatedQueries,
boolean wasDropped) throws Exception {
try {
String statementString = command.getStatement();
statusStore.put(
Expand All @@ -180,7 +182,7 @@ private void handleStatementWithTerminatedQueries(
commandId,
new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement")
);
executeStatement(statement, command, commandId, terminatedQueries);
executeStatement(statement, command, commandId, terminatedQueries, wasDropped);
} catch (WakeupException exception) {
throw exception;
} catch (Exception exception) {
Expand All @@ -195,8 +197,8 @@ private void executeStatement(
Statement statement,
Command command,
CommandId commandId,
Map<QueryId, CommandId> terminatedQueries
) throws Exception {
Map<QueryId, CommandId> terminatedQueries,
boolean wasDropped) throws Exception {
String statementStr = command.getStatement();

DDLCommandResult result = null;
Expand All @@ -210,7 +212,8 @@ private void executeStatement(
command,
commandId,
terminatedQueries,
statementStr);
statementStr,
wasDropped);
if (successMessage == null) return;
} else if (statement instanceof TerminateQuery) {
terminateQuery((TerminateQuery) statement);
Expand Down Expand Up @@ -251,7 +254,8 @@ private String handleCreateAsSelect(final CreateAsSelect statement,
final Command command,
final CommandId commandId,
final Map<QueryId, CommandId> terminatedQueries,
final String statementStr) throws Exception {
final String statementStr,
final boolean wasDropped) throws Exception {
QuerySpecification querySpecification =
(QuerySpecification) statement.getQuery().getQueryBody();
Query query = ksqlEngine.addInto(
Expand All @@ -261,7 +265,7 @@ private String handleCreateAsSelect(final CreateAsSelect statement,
statement.getProperties(),
statement.getPartitionByColumn()
);
if (startQuery(statementStr, query, commandId, terminatedQueries, command)) {
if (startQuery(statementStr, query, commandId, terminatedQueries, command, wasDropped)) {
return statement instanceof CreateTableAsSelect
? "Table created and running"
: "Stream created and running";
Expand All @@ -275,8 +279,8 @@ private boolean startQuery(
Query query,
CommandId commandId,
Map<QueryId, CommandId> terminatedQueries,
Command command
) throws Exception {
Command command,
boolean wasDropped) throws Exception {
if (query.getQueryBody() instanceof QuerySpecification) {
QuerySpecification querySpecification = (QuerySpecification) query.getQueryBody();
Relation into = querySpecification.getInto();
Expand Down Expand Up @@ -312,6 +316,9 @@ private boolean startQuery(
);
ksqlEngine.terminateQuery(queryId, false);
return false;
} else if (wasDropped){
ksqlEngine.terminateQuery(queryId, false);
return false;
} else {
persistentQueryMetadata.getKafkaStreams().start();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.query.QueryId;
Expand All @@ -54,30 +53,7 @@ public class CommandStoreTest {


@Test
public void shouldUseFirstCommandForSameIdIfNoDropBetweenThem() {
final CommandId commandId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.CREATE);
final Command originalCommand = new Command("some statement", Collections.emptyMap());
final Command latestCommand = new Command("a new statement", Collections.emptyMap());
final ConsumerRecords<CommandId, Command> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("topic", 0), Arrays.asList(
new ConsumerRecord<>("topic", 0, 0, commandId,
originalCommand),
new ConsumerRecord<>("topic", 0, 0, commandId,
latestCommand))
));

EasyMock.expect(commandConsumer.partitionsFor(COMMAND_TOPIC)).andReturn(Collections.emptyList());

EasyMock.expect(commandConsumer.poll(anyLong())).andReturn(records)
.andReturn(new ConsumerRecords<>(Collections.emptyMap()));
EasyMock.replay(commandConsumer);

final CommandStore command = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl()));
final Map<CommandId, Command> commands = getPriorCommands(command);
assertThat(commands, equalTo(Collections.singletonMap(commandId, originalCommand)));
}

@Test
public void shouldReplaceCommandWithNewCommandAfterDrop() {
public void shouldHaveAllCreateCommandsInOrder() {
final CommandId createId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.CREATE);
final CommandId dropId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.DROP);
final Command originalCommand = new Command("some statement", Collections.emptyMap());
Expand All @@ -97,31 +73,12 @@ public void shouldReplaceCommandWithNewCommandAfterDrop() {
EasyMock.replay(commandConsumer);

final CommandStore command = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl()));
final Map<CommandId, Command> commands = getPriorCommands(command);
assertThat(commands, equalTo(Collections.singletonMap(createId, latestCommand)));
final List<Pair<CommandId, Command>> commands = getPriorCommands(command);
assertThat(commands, equalTo(Arrays.asList(new Pair<>(createId, originalCommand),
new Pair<>(dropId, dropCommand),
new Pair<>(createId, latestCommand))));
}

@Test
public void shouldRemoveCreateCommandIfItHasBeenDropped() {
final CommandId createId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.CREATE);
final CommandId dropId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.DROP);
final Command originalCommand = new Command("some statement", Collections.emptyMap());
final Command dropCommand = new Command("drop", Collections.emptyMap());

final ConsumerRecords<CommandId, Command> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("topic", 0), Arrays.asList(
new ConsumerRecord<>("topic", 0, 0, createId, originalCommand),
new ConsumerRecord<>("topic", 0, 0, dropId, dropCommand)
)));

EasyMock.expect(commandConsumer.partitionsFor(COMMAND_TOPIC)).andReturn(Collections.emptyList());

EasyMock.expect(commandConsumer.poll(anyLong())).andReturn(records)
.andReturn(new ConsumerRecords<>(Collections.emptyMap()));
EasyMock.replay(commandConsumer);

final CommandStore commandStore = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl()));
assertThat(getPriorCommands(commandStore), equalTo(Collections.emptyMap()));
}

@Test
public void shouldCollectTerminatedQueries() {
Expand All @@ -141,10 +98,10 @@ public void shouldCollectTerminatedQueries() {
assertThat(restoreCommands.terminatedQueries(), equalTo(Collections.singletonMap(new QueryId("queryId"), terminated)));
}

private Map<CommandId, Command> getPriorCommands(CommandStore command) {
private List<Pair<CommandId, Command>> getPriorCommands(CommandStore command) {
final RestoreCommands priorCommands = command.getRestoreCommands();
final Map<CommandId, Command> commands = new HashMap<>();
priorCommands.forEach(((id, cmd, terminatedQueries) -> commands.put(id, cmd)));
final List<Pair<CommandId, Command>> commands = new ArrayList<>();
priorCommands.forEach(((id, cmd, terminatedQueries, droppedEntities) -> commands.add(new Pair<>(id, cmd))));
return commands;
}

Expand Down
Loading