From 242a32ecef79545436de3ae0b6cc5486a0b3cd95 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg <57688982+AlanConfluent@users.noreply.github.com> Date: Mon, 19 Oct 2020 11:09:27 -0700 Subject: [PATCH] fix: Properly clean up state when executing a command fails (#6437) * fix: Properly clean up state when executing a command fails --- .../rest/server/computation/CommandQueue.java | 6 ++++ .../rest/server/computation/CommandStore.java | 13 +++++++ .../computation/DistributingExecutor.java | 11 ++++-- .../server/computation/CommandStoreTest.java | 10 ++++++ .../computation/DistributingExecutorTest.java | 34 +++++++++++++++++++ .../rest/server/computation/RecoveryTest.java | 4 +++ 6 files changed, 76 insertions(+), 2 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java index cffaebb41dd2..763a9599261d 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java @@ -87,6 +87,12 @@ void ensureConsumedPast(long seqNum, Duration timeout) */ Producer createTransactionalProducer(); + /** + * If a command created with enqueueCommand has had errors and the transaction has failed or been + * aborted, this should be called to ensure state is cleaned up. + */ + void abortCommand(CommandId commandId); + /** * Blocks until the command topic consumer has processed all records up to * the current offset when this method is called. diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java index a74283d41426..3fdfed1a16a6 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java @@ -303,6 +303,19 @@ public Producer createTransactionalProducer() { ); } + @Override + public void abortCommand(final CommandId commandId) { + commandStatusMap.compute( + commandId, + (k, v) -> { + if (v != null) { + LOG.info("Aborting existing command {}", commandId); + } + return null; + } + ); + } + @Override public void waitForCommandConsumer() { try { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index 2e6f85d0d061..e3a7ab3ceceb 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -135,12 +135,13 @@ public Optional execute( "Could not write the statement '%s' into the command topic: " + e.getMessage(), statement.getStatementText()), e); } - + + CommandId commandId = null; try { transactionalProducer.beginTransaction(); commandQueue.waitForCommandConsumer(); - final CommandId commandId = commandIdAssigner.getCommandId(statement.getStatement()); + commandId = commandIdAssigner.getCommandId(statement.getStatement()); final Command command = validatedCommandFactory.create( injected, executionContext.createSandbox(executionContext.getServiceContext()) @@ -164,11 +165,17 @@ public Optional execute( ) { // We can't recover from these exceptions, so our only option is close producer and exit. // This catch doesn't abortTransaction() since doing that would throw another exception. + if (commandId != null) { + commandQueue.abortCommand(commandId); + } throw new KsqlServerException(String.format( "Could not write the statement '%s' into the command topic.", statement.getStatementText()), e); } catch (final Exception e) { transactionalProducer.abortTransaction(); + if (commandId != null) { + commandQueue.abortCommand(commandId); + } throw new KsqlServerException(String.format( "Could not write the statement '%s' into the command topic.", statement.getStatementText()), e); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java index 5e20387fa2b3..975cfb9ab58f 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java @@ -368,6 +368,16 @@ public void shouldStartCommandTopicOnStart() { verify(commandTopic).start(); } + @Test + public void shouldSuccessfullyAbortAndRetry() { + // Given: + commandStore.enqueueCommand(commandId, command, transactionalProducer); + + // When/Then: + commandStore.abortCommand(commandId); + commandStore.enqueueCommand(commandId, command, transactionalProducer); + } + private static ConsumerRecords buildRecords(final Object... args) { assertThat(args.length % 2, equalTo(0)); final List> records = new ArrayList<>(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java index 05cfd3e0d00a..2cb234f51172 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java @@ -73,6 +73,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; import org.junit.Before; import org.junit.Test; @@ -401,4 +403,36 @@ public void shouldThrowExceptionWhenInsertIntoProcessingLogTopic() { "Cannot insert into read-only topic: " + "default_ksql_processing_log")); } + + @Test + public void shouldAbortOnError_ProducerFencedException() { + // When: + doThrow(new ProducerFencedException("Error!")).when(transactionalProducer).commitTransaction(); + final Exception e = assertThrows( + KsqlServerException.class, + () -> distributor.execute(CONFIGURED_STATEMENT, executionContext, securityContext) + ); + + assertThat(e.getMessage(), containsString("Could not write the statement " + + "'statement' into the command topic.")); + + // Then: + verify(queue).abortCommand(IDGEN.getCommandId(CONFIGURED_STATEMENT.getStatement())); + } + + @Test + public void shouldAbortOnError_Exception() { + // When: + doThrow(new RuntimeException("Error!")).when(transactionalProducer).commitTransaction(); + final Exception e = assertThrows( + KsqlServerException.class, + () -> distributor.execute(CONFIGURED_STATEMENT, executionContext, securityContext) + ); + + assertThat(e.getMessage(), containsString("Could not write the statement " + + "'statement' into the command topic.")); + + // Then: + verify(queue).abortCommand(IDGEN.getCommandId(CONFIGURED_STATEMENT.getStatement())); + } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index f4f912df6550..68adb5cd1d69 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -173,6 +173,10 @@ public Producer createTransactionalProducer() { return transactionalProducer; } + @Override + public void abortCommand(CommandId commandId) { + } + @Override public boolean corruptionDetected() { return false;