Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't use queryId of last terminate command after restore #6278

Merged
merged 8 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ public void shouldListQueries() {

// Then
assertThat(queries.get(0).getQueryType(), is(QueryType.PERSISTENT));
assertThat(queries.get(0).getId(), is("CTAS_" + AGG_TABLE + "_0"));
assertThat(queries.get(0).getId(), is("CTAS_" + AGG_TABLE + "_5"));
assertThat(queries.get(0).getSql(), is(
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n"
+ " " + TEST_STREAM + ".STR STR,\n"
Expand Down Expand Up @@ -894,7 +894,7 @@ public void shouldDescribeSource() throws Exception {
assertThat(description.valueFormat(), is("JSON"));
assertThat(description.readQueries(), hasSize(1));
assertThat(description.readQueries().get(0).getQueryType(), is(QueryType.PERSISTENT));
assertThat(description.readQueries().get(0).getId(), is("CTAS_" + AGG_TABLE + "_0"));
assertThat(description.readQueries().get(0).getId(), is("CTAS_" + AGG_TABLE + "_5"));
assertThat(description.readQueries().get(0).getSql(), is(
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n"
+ " " + TEST_STREAM + ".STR STR,\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ private void executePlan(
new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement")
);
final ExecuteResult result = ksqlEngine.execute(serviceContext, configured);
queryIdGenerator.setNextId(offset + 1);
if (result.getQuery().isPresent()) {
queryIdGenerator.setNextId(offset + 1);
if (mode == Mode.EXECUTE) {
result.getQuery().get().start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ private void handleStatement(

private void terminateQueries() {
final Command terminateCommand1 = new Command(
"TERMINATE CSAS_USER1PV_0;",
"TERMINATE CSAS_USER1PV_1;",
emptyMap(),
ksqlConfig.getAllConfigPropsWithSecretsObfuscated(),
Optional.empty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -45,7 +44,6 @@
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.server.CommandTopicBackup;
import io.confluent.ksql.rest.server.resources.KsqlResource;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
Expand All @@ -67,7 +65,6 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -586,7 +583,7 @@ public void shouldRecoverRecreates() {
server1.submitCommands(
"CREATE STREAM A (ROWKEY STRING KEY, C1 STRING, C2 INT) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT ROWKEY, C1 FROM A;",
"TERMINATE CsAs_b_0;",
"TERMINATE CsAs_b_1;",
"DROP STREAM B;",
"CREATE STREAM B AS SELECT ROWKEY, C2 FROM A;"
);
Expand All @@ -609,7 +606,7 @@ public void shouldRecoverReplacesWithTerminates() {
"CREATE STREAM A (ROWKEY STRING KEY, C1 STRING, C2 INT) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT ROWKEY, C1 FROM A;",
"CREATE OR REPLACE STREAM B AS SELECT ROWKEY, C1, C2 FROM A;",
"TERMINATE CSAS_B_0;",
"TERMINATE CSAS_B_1;",
"DROP STREAM B;",
"CREATE STREAM B AS SELECT ROWKEY, C1 FROM A;"
);
Expand All @@ -633,7 +630,7 @@ public void shouldRecoverInsertIntosRecreates() {
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B (COLUMN STRING) WITH (KAFKA_TOPIC='B', VALUE_FORMAT='JSON', PARTITIONS=1);",
"INSERT INTO B SELECT * FROM A;",
"TERMINATE InsertQuery_0;",
"TERMINATE InsertQuery_2;",
"INSERT INTO B SELECT * FROM A;"
);
shouldRecover(commands);
Expand All @@ -645,7 +642,7 @@ public void shouldRecoverTerminates() {
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"INSERT INTO B SELECT * FROM A;",
"TERMINATE CSAS_B_0;",
"TERMINATE CSAS_B_1;",
"TERMINATE InsertQuery_2;"
);
shouldRecover(commands);
Expand All @@ -656,7 +653,7 @@ public void shouldRecoverDrop() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_0;",
"TERMINATE CSAS_B_1;",
"DROP STREAM B;"
);
shouldRecover(commands);
Expand All @@ -667,7 +664,7 @@ public void shouldRecoverWithDuplicateTerminateAndDrop() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_0;"
"TERMINATE CSAS_B_1;"
);

    addDuplicateOfLastCommand(); // Add duplicate of "TERMINATE CSAS_B_1;"
Expand All @@ -687,7 +684,7 @@ public void shouldNotDeleteTopicsOnRecovery() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_0;",
"TERMINATE CSAS_B_1;",
"DROP STREAM B DELETE TOPIC;"
);

Expand All @@ -710,9 +707,42 @@ public void shouldRecoverQueryIDs() {
final Set<QueryId> queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries())
.keySet();

assertThat(queryIdNames, contains(new QueryId("CSAS_C_0")));
assertThat(queryIdNames, contains(new QueryId("CSAS_C_1")));
}

@Test
public void shouldIncrementQueryIDsNoPlans() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_1;");

final KsqlServer server = new KsqlServer(commands);
server.recover();
server.submitCommands("CREATE STREAM C AS SELECT * FROM A;");
final Set<QueryId> queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries())
.keySet();

assertThat(queryIdNames, contains(new QueryId("CSAS_C_2")));
}

@Test
public void shouldIncrementQueryIDsWithPlan() {
vpapavas marked this conversation as resolved.
Show resolved Hide resolved
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"CREATE STREAM C AS SELECT * FROM A;",
"TERMINATE CSAS_B_1;");

final KsqlServer server = new KsqlServer(commands);
server.recover();
server.submitCommands("CREATE STREAM D AS SELECT * FROM A;");
final Set<QueryId> queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries())
.keySet();
assertThat(queryIdNames, contains(new QueryId("CSAS_C_2"), new QueryId("CSAS_D_3")));
}


// Simulate bad commands that have been introduced due to race condition in logic producing to cmd topic
private void addDuplicateOfLastCommand() {
final QueuedCommand original = commands.get(commands.size() - 1);
Expand Down