Skip to content

Commit

Permalink
feat: don't start queries when corruption is detected during startup (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang authored Jul 22, 2021
1 parent 1d9402f commit 4c0c181
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public interface QueryMetadata {

List<QueryError> getQueryErrors();

void setCorruptionQueryError();

KafkaStreams getKafkaStreams();

void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class QueryMetadataImpl implements QueryMetadata {
// the object is made available to other threads.
private KafkaStreams kafkaStreams;
private boolean initialized = false;
private boolean corruptionCommandTopic = false;

private static final Ticker CURRENT_TIME_MILLIS_TICKER = new Ticker() {
@Override
Expand Down Expand Up @@ -225,6 +226,9 @@ public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler ha
}

/**
 * Reports the state of this query.
 *
 * <p>When the command-topic corruption flag is set, {@code State.ERROR} is returned without
 * consulting the Kafka Streams instance; otherwise the value of {@code kafkaStreams.state()}
 * is returned.
 *
 * @return {@code State.ERROR} if the corruption flag is set, else the Kafka Streams state
 */
public State getState() {
  return corruptionCommandTopic ? State.ERROR : kafkaStreams.state();
}

Expand Down Expand Up @@ -290,6 +294,17 @@ public List<QueryError> getQueryErrors() {
return queryErrors.toImmutableList();
}

/**
 * Flags this query as not started because the command topic is corrupted.
 *
 * <p>A {@code Type.USER} error stamped with the current wall-clock time is created, handed to
 * the listener, appended to the query's error collection, and finally the corruption flag read
 * by {@code getState()} is raised. The statements run in exactly that order.
 */
public void setCorruptionQueryError() {
  final long detectedAtMs = System.currentTimeMillis();
  final String reason = "Query not started due to corruption in the command topic.";
  final QueryError error = new QueryError(detectedAtMs, reason, Type.USER);

  // Notify first, then record, then flip the flag.
  listener.onError(this, error);
  queryErrors.add(error);
  corruptionCommandTopic = true;
}

/**
 * Exposes the {@code closed} flag to this class and its subclasses.
 *
 * @return the current value of {@code closed}
 */
protected boolean isClosed() {
  return this.closed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.RetryUtil;
import java.io.Closeable;
import java.time.Clock;
Expand Down Expand Up @@ -287,9 +288,14 @@ public void processPriorCommands(final PersistentQueryCleanupImpl queryCleanup)
.getPersistentQueries();

queryCleanup.cleanupLeakedQueries(queries);
LOG.info("Restarting {} queries.", queries.size());

queries.forEach(PersistentQueryMetadata::start);
if (commandStore.corruptionDetected()) {
LOG.info("Corruption detected, queries will not be started.");
queries.forEach(QueryMetadata::setCorruptionQueryError);
} else {
LOG.info("Restarting {} queries.", queries.size());
queries.forEach(PersistentQueryMetadata::start);
}

LOG.info("Restore complete");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
import io.confluent.ksql.rest.util.TerminateCluster;

import io.confluent.ksql.util.PersistentQueryMetadata;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -115,7 +115,13 @@ public class CommandRunnerTest {
@Mock
private Errors errorHandler;
@Mock
PersistentQueryCleanupImpl persistentQueryCleanupImpl;
private PersistentQueryMetadata queryMetadata1;
@Mock
private PersistentQueryMetadata queryMetadata2;
@Mock
private PersistentQueryMetadata queryMetadata3;
@Mock
private PersistentQueryCleanupImpl persistentQueryCleanupImpl;
@Captor
private ArgumentCaptor<Runnable> threadTaskCaptor;
private CommandRunner commandRunner;
Expand Down Expand Up @@ -167,6 +173,30 @@ public void setup() {
public void shouldRunThePriorCommandsCorrectly() {
  // Given: three queued commands and three persistent queries reported by the engine.
  givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);
  final ImmutableList<PersistentQueryMetadata> queries =
      ImmutableList.of(queryMetadata1, queryMetadata2, queryMetadata3);
  when(ksqlEngine.getPersistentQueries()).thenReturn(queries);

  // When:
  commandRunner.processPriorCommands(persistentQueryCleanupImpl);

  // Then: the commands are restored in queue order...
  final InOrder restoreOrder = inOrder(statementExecutor);
  restoreOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1));
  restoreOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2));
  restoreOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3));

  // ...and every query is started without being marked with a corruption error.
  for (final PersistentQueryMetadata query : queries) {
    verify(query).start();
    verify(query, never()).setCorruptionQueryError();
  }
}

@Test
public void shouldNotStartQueriesDuringRestoreWhenCorrupted() {
// Given:
givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);
when(ksqlEngine.getPersistentQueries()).thenReturn(ImmutableList.of(queryMetadata1, queryMetadata2, queryMetadata3));
when(commandStore.corruptionDetected()).thenReturn(true);

// When:
commandRunner.processPriorCommands(persistentQueryCleanupImpl);
Expand All @@ -176,6 +206,12 @@ public void shouldRunThePriorCommandsCorrectly() {
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3));
verify(queryMetadata1, never()).start();
verify(queryMetadata2, never()).start();
verify(queryMetadata3, never()).start();
verify(queryMetadata1).setCorruptionQueryError();
verify(queryMetadata2).setCorruptionQueryError();
verify(queryMetadata3).setCorruptionQueryError();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class DefaultErrorMessages implements ErrorMessages {
+ System.lineSeparator()
+ "DDL statements will not be processed any further."
+ System.lineSeparator()
+ "Queries may not run properly while the server is in a corrupted state."
+ System.lineSeparator()
+ "If a backup of the command topic is available, "
+ "restore the command topic using the backup file."
+ System.lineSeparator()
Expand Down

0 comments on commit 4c0c181

Please sign in to comment.