Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: change query id generation to work with planned commands #4149

Merged
merged 2 commits into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class SpecificQueryIdGenerator implements QueryIdGenerator {

public SpecificQueryIdGenerator() {
this.nextId = 0L;
this.alreadyUsed = true;
this.alreadyUsed = false;
}

public void setNextId(final long nextId) {
Expand All @@ -50,6 +50,6 @@ public String getNext() {

@Override
public QueryIdGenerator createSandbox() {
return new SequentialQueryIdGenerator(nextId + 1);
return new SequentialQueryIdGenerator(nextId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ public void shouldGenerateIdBasedOnSetNextId() {
assertThat(generator.getNext(), is("5"));
}


@Test
public void shouldReturnZeroIdForFirstQuery() {
assertThat(generator.getNext(), is("0"));
}

@Test(expected = KsqlServerException.class)
public void shouldThrowWhenGetNextBeforeSet() {
Expand All @@ -54,6 +57,6 @@ public void shouldReturnSequentialGeneratorFromLastId() {
generator.setNextId(3L);
final QueryIdGenerator copy = generator.createSandbox();
assertThat(copy, instanceOf(SequentialQueryIdGenerator.class));
assertThat(copy.getNext(), is("4"));
assertThat(copy.getNext(), is("3"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private void handleStatementWithTerminatedQueries(
) {
try {
if (command.getPlan().isPresent()) {
executePlan(command, commandId, commandStatusFuture, command.getPlan().get(), mode);
executePlan(command, commandId, commandStatusFuture, command.getPlan().get(), mode, offset);
return;
}
final String statementString = command.getStatement();
Expand Down Expand Up @@ -224,7 +224,8 @@ private void executePlan(
final CommandId commandId,
final Optional<CommandStatusFuture> commandStatusFuture,
final KsqlPlan plan,
final Mode mode
final Mode mode,
final long offset
) {
final KsqlConfig mergedConfig = buildMergedConfig(command);
final ConfiguredKsqlPlan configured = ConfiguredKsqlPlan.of(
Expand All @@ -238,8 +239,11 @@ private void executePlan(
new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement")
);
final ExecuteResult result = ksqlEngine.execute(serviceContext, configured);
if (mode == Mode.EXECUTE) {
result.getQuery().ifPresent(QueryMetadata::start);
if (result.getQuery().isPresent()) {
queryIdGenerator.setNextId(offset + 1);
if (mode == Mode.EXECUTE) {
result.getQuery().get().start();
}
}
final String successMessage = getSuccessMessage(result);
final CommandStatus successStatus =
Expand Down Expand Up @@ -317,8 +321,6 @@ private PersistentQueryMetadata startQuery(
final ConfiguredStatement<?> configured = ConfiguredStatement.of(
statement, command.getOverwriteProperties(), mergedConfig);

queryIdGenerator.setNextId(offset);

final KsqlPlan plan = ksqlEngine.plan(serviceContext, configured);
final QueryMetadata queryMetadata =
ksqlEngine
Expand All @@ -328,6 +330,8 @@ private PersistentQueryMetadata startQuery(
.getQuery()
.orElseThrow(() -> new IllegalStateException("Statement did not return a query"));

queryIdGenerator.setNextId(offset + 1);

if (!(queryMetadata instanceof PersistentQueryMetadata)) {
throw new KsqlException(String.format(
"Unexpected query metadata type: %s; was expecting %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,18 @@ public void shouldExecutePlannedCommand() {
verify(mockEngine).execute(serviceContext, ConfiguredKsqlPlan.of(plan, emptyMap(), ksqlConfig));
}

@Test
public void shouldSetNextQueryIdToNextOffsetWhenExecutingPlannedCommand() {
// Given:
givenMockPlannedQuery();

// When:
handleStatement(statementExecutorWithMocks, plannedCommand, COMMAND_ID, Optional.empty(), 2L);

// Then:
verify(mockQueryIdGenerator).setNextId(3L);
}

@Test
public void shouldUpdateStatusOnCompletedPlannedCommand() {
// Given:
Expand Down Expand Up @@ -570,6 +582,25 @@ public void shouldEnforceReferentialIntegrity() {
CoreMatchers.equalTo(CommandStatus.Status.SUCCESS));
}

@Test
public void shouldSetNextQueryIdToNextOffsetWhenExecutingStatementCommand() {
rodesai marked this conversation as resolved.
Show resolved Hide resolved
// Given:
mockReplayCSAS(new QueryId("csas-query-id"));

// When:
statementExecutorWithMocks.handleRestore(
new QueuedCommand(
new CommandId(Type.STREAM, "foo", Action.CREATE),
new Command("CSAS", emptyMap(), emptyMap(), Optional.empty()),
Optional.empty(),
2L
)
);

// Then:
verify(mockQueryIdGenerator).setNextId(3L);
}

@Test
public void shouldSkipStartWhenReplayingLog() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.QueryIdGenerator;
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandId.Action;
Expand Down Expand Up @@ -105,7 +106,7 @@ public void tearDown() {
serviceContext.close();
}

private KsqlEngine createKsqlEngine() {
private KsqlEngine createKsqlEngine(final QueryIdGenerator queryIdGenerator) {
final KsqlEngineMetrics engineMetrics = mock(KsqlEngineMetrics.class);
return KsqlEngineTestUtil.createKsqlEngine(
serviceContext,
Expand Down Expand Up @@ -190,7 +191,8 @@ private class KsqlServer {
final ServerState serverState;

KsqlServer(final List<QueuedCommand> commandLog) {
this.ksqlEngine = createKsqlEngine();
final SpecificQueryIdGenerator queryIdGenerator = new SpecificQueryIdGenerator();
this.ksqlEngine = createKsqlEngine(queryIdGenerator);
this.fakeCommandQueue = new FakeCommandQueue(commandLog, transactionalProducer);
serverState = new ServerState();
serverState.setReady();
Expand Down Expand Up @@ -560,7 +562,7 @@ public void shouldRecoverRecreates() {
server1.submitCommands(
"CREATE STREAM A (C1 STRING, C2 INT) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT C1 FROM A;",
"TERMINATE CSAS_B_1;",
"TERMINATE CSAS_B_0;",
"DROP STREAM B;",
"CREATE STREAM B AS SELECT C2 FROM A;"
);
Expand All @@ -572,7 +574,7 @@ public void shouldRecoverTerminates() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_1;"
"TERMINATE CSAS_B_0;"
);
shouldRecover(commands);
}
Expand All @@ -582,7 +584,7 @@ public void shouldRecoverDrop() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_1;",
"TERMINATE CSAS_B_0;",
"DROP STREAM B;"
);
shouldRecover(commands);
Expand All @@ -594,7 +596,7 @@ public void shouldNotDeleteTopicsOnRecovery() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_1;",
"TERMINATE CSAS_B_0;",
"DROP STREAM B DELETE TOPIC;"
);

Expand Down Expand Up @@ -656,7 +658,7 @@ public void shouldRecoverQueryIDs() {
final Set<QueryId> queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries())
.keySet();

assertThat(queryIdNames, contains(new QueryId("CSAS_C_7")));
assertThat(queryIdNames, contains(new QueryId("CSAS_C_0")));
}

}