Skip to content

Commit

Permalink
fix: Properly clean up state when executing a command fails (#6437)
Browse files Browse the repository at this point in the history
* fix: Properly clean up state when executing a command fails
  • Loading branch information
AlanConfluent authored Oct 19, 2020
1 parent 97e5ff3 commit 242a32e
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ void ensureConsumedPast(long seqNum, Duration timeout)
*/
Producer<CommandId, Command> createTransactionalProducer();

/**
* If a command created with enqueueCommand has had errors and the transaction has failed or been
* aborted, this should be called to ensure state is cleaned up.
*/
void abortCommand(CommandId commandId);

/**
* Blocks until the command topic consumer has processed all records up to
* the current offset when this method is called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,19 @@ public Producer<CommandId, Command> createTransactionalProducer() {
);
}

@Override
public void abortCommand(final CommandId commandId) {
commandStatusMap.compute(
commandId,
(k, v) -> {
if (v != null) {
LOG.info("Aborting existing command {}", commandId);
}
return null;
}
);
}

@Override
public void waitForCommandConsumer() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,13 @@ public Optional<KsqlEntity> execute(
"Could not write the statement '%s' into the command topic: " + e.getMessage(),
statement.getStatementText()), e);
}


CommandId commandId = null;
try {
transactionalProducer.beginTransaction();
commandQueue.waitForCommandConsumer();

final CommandId commandId = commandIdAssigner.getCommandId(statement.getStatement());
commandId = commandIdAssigner.getCommandId(statement.getStatement());
final Command command = validatedCommandFactory.create(
injected,
executionContext.createSandbox(executionContext.getServiceContext())
Expand All @@ -164,11 +165,17 @@ public Optional<KsqlEntity> execute(
) {
// We can't recover from these exceptions, so our only option is close producer and exit.
// This catch doesn't abortTransaction() since doing that would throw another exception.
if (commandId != null) {
commandQueue.abortCommand(commandId);
}
throw new KsqlServerException(String.format(
"Could not write the statement '%s' into the command topic.",
statement.getStatementText()), e);
} catch (final Exception e) {
transactionalProducer.abortTransaction();
if (commandId != null) {
commandQueue.abortCommand(commandId);
}
throw new KsqlServerException(String.format(
"Could not write the statement '%s' into the command topic.",
statement.getStatementText()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,16 @@ public void shouldStartCommandTopicOnStart() {
verify(commandTopic).start();
}

@Test
public void shouldSuccessfullyAbortAndRetry() {
// Given:
commandStore.enqueueCommand(commandId, command, transactionalProducer);

// When/Then:
commandStore.abortCommand(commandId);
commandStore.enqueueCommand(commandId, command, transactionalProducer);
}

private static ConsumerRecords<byte[], byte[]> buildRecords(final Object... args) {
assertThat(args.length % 2, equalTo(0));
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -401,4 +403,36 @@ public void shouldThrowExceptionWhenInsertIntoProcessingLogTopic() {
"Cannot insert into read-only topic: "
+ "default_ksql_processing_log"));
}

@Test
public void shouldAbortOnError_ProducerFencedException() {
// When:
doThrow(new ProducerFencedException("Error!")).when(transactionalProducer).commitTransaction();
final Exception e = assertThrows(
KsqlServerException.class,
() -> distributor.execute(CONFIGURED_STATEMENT, executionContext, securityContext)
);

assertThat(e.getMessage(), containsString("Could not write the statement "
+ "'statement' into the command topic."));

// Then:
verify(queue).abortCommand(IDGEN.getCommandId(CONFIGURED_STATEMENT.getStatement()));
}

@Test
public void shouldAbortOnError_Exception() {
// When:
doThrow(new RuntimeException("Error!")).when(transactionalProducer).commitTransaction();
final Exception e = assertThrows(
KsqlServerException.class,
() -> distributor.execute(CONFIGURED_STATEMENT, executionContext, securityContext)
);

assertThat(e.getMessage(), containsString("Could not write the statement "
+ "'statement' into the command topic."));

// Then:
verify(queue).abortCommand(IDGEN.getCommandId(CONFIGURED_STATEMENT.getStatement()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ public Producer<CommandId, Command> createTransactionalProducer() {
return transactionalProducer;
}

@Override
public void abortCommand(CommandId commandId) {
}

@Override
public boolean corruptionDetected() {
return false;
Expand Down

0 comments on commit 242a32e

Please sign in to comment.