fix: use transactions to restore the commands
spena committed Oct 15, 2020
1 parent cfbf1f5 commit a6dfc98
Showing 3 changed files with 104 additions and 36 deletions.
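In outline: the restore tool used to fire every backed-up command at the command topic with a plain producer and wait on the returned futures afterwards. This change writes each command inside its own Kafka transaction instead: initTransactions() once, then beginTransaction(), send().get(), and commitTransaction() per command, with abortTransaction() on recoverable failures and a close-and-exit on the unrecoverable fenced, out-of-order-sequence, and authorization errors. Below is a minimal, self-contained sketch of that producer lifecycle; it is not ksqlDB code, and the broker address, transactional id, topic name, and the RestoreSketch class are made-up placeholders.

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public final class RestoreSketch {
  public static void main(final String[] args) throws Exception {
    final Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "restore-sketch");  // placeholder id
    props.put(ProducerConfig.ACKS_CONFIG, "all");

    final byte[][] commands = {
        "command-1".getBytes(StandardCharsets.UTF_8),
        "command-2".getBytes(StandardCharsets.UTF_8),
    };

    try (Producer<byte[], byte[]> producer = new KafkaProducer<>(
        props, new ByteArraySerializer(), new ByteArraySerializer())) {
      // Registers the transactional id with the broker and fences out any
      // older producer instance still using the same id.
      producer.initTransactions();

      for (int i = 0; i < commands.length; i++) {
        try {
          producer.beginTransaction();
          // Block on the future so a send failure surfaces before the commit.
          producer.send(new ProducerRecord<>("command_topic", 0, commands[i], commands[i])).get();
          producer.commitTransaction();
        } catch (final ProducerFencedException
            | OutOfOrderSequenceException
            | AuthorizationException e) {
          // Unrecoverable: the producer cannot continue, and abortTransaction()
          // would only throw again, so let try-with-resources close it and exit.
          throw new RuntimeException("Failed restoring command " + (i + 1), e);
        } catch (final Exception e) {
          producer.abortTransaction(); // recoverable: roll back this command only
          throw new RuntimeException("Failed restoring command " + (i + 1), e);
        }
      }
    }
  }
}

The transactional id doubles as a fence: if two restores run with the same id, the broker rejects the older producer instead of letting the two interleave writes.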
(File 1 of 3: KsqlRestoreCommandTopic)
@@ -17,7 +17,9 @@
 
 import static java.util.Objects.requireNonNull;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.confluent.ksql.properties.PropertiesUtil;
+import io.confluent.ksql.rest.DefaultErrorMessages;
 import io.confluent.ksql.rest.entity.CommandId;
 import io.confluent.ksql.rest.server.BackupInputFile;
 import io.confluent.ksql.rest.server.computation.Command;
@@ -35,16 +37,21 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.NoSuchFileException;
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serializer;
 
@@ -201,32 +208,52 @@ public static void main(final String[] args) throws Exception {
   private final KsqlConfig serverConfig;
   private final String commandTopicName;
   private final KafkaTopicClient topicClient;
-  private final Producer<byte[], byte[]> kafkaProducer;
+  private final Supplier<Producer<byte[], byte[]>> kafkaProducerSupplier;
+
+  private static KafkaProducer<byte[], byte[]> transactionalProducer(
+      final KsqlConfig serverConfig
+  ) {
+    final Map<String, Object> transactionalProperties =
+        new HashMap<>(serverConfig.getProducerClientConfigProps());
+
+    transactionalProperties.put(
+        ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+        serverConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
+    );
+
+    transactionalProperties.put(
+        ProducerConfig.ACKS_CONFIG,
+        "all"
+    );
+
+    return new KafkaProducer<>(
+        transactionalProperties,
+        BYTES_SERIALIZER,
+        BYTES_SERIALIZER
+    );
+  }
 
   KsqlRestoreCommandTopic(final KsqlConfig serverConfig) {
     this(
         serverConfig,
         ReservedInternalTopics.commandTopic(serverConfig),
         ServiceContextFactory.create(serverConfig,
             () -> /* no ksql client */ null).getTopicClient(),
-        new KafkaProducer<>(
-            serverConfig.getProducerClientConfigProps(),
-            BYTES_SERIALIZER,
-            BYTES_SERIALIZER
-        )
+        () -> transactionalProducer(serverConfig)
     );
   }
 
+  @VisibleForTesting
   KsqlRestoreCommandTopic(
       final KsqlConfig serverConfig,
       final String commandTopicName,
       final KafkaTopicClient topicClient,
-      final Producer<byte[], byte[]> kafkaProducer
+      final Supplier<Producer<byte[], byte[]>> kafkaProducerSupplier
   ) {
     this.serverConfig = requireNonNull(serverConfig, "serverConfig");
     this.commandTopicName = requireNonNull(commandTopicName, "commandTopicName");
     this.topicClient = requireNonNull(topicClient, "topicClient");
-    this.kafkaProducer = requireNonNull(kafkaProducer, "kafkaProducer");
+    this.kafkaProducerSupplier = requireNonNull(kafkaProducerSupplier, "kafkaProducerSupplier");
   }
 
   public void restore(final List<Pair<byte[], byte[]>> backupCommands) {
@@ -254,35 +281,60 @@ private void deleteCommandTopicIfExists() {
   }
 
   private void restoreCommandTopic(final List<Pair<byte[], byte[]>> commands) {
-    final List<Future<RecordMetadata>> futures = new ArrayList<>(commands.size());
-
-    for (final Pair<byte[], byte[]> command : commands) {
-      futures.add(enqueueCommand(command.getLeft(), command.getRight()));
-    }
-
-    int i = 0;
-    for (final Future<RecordMetadata> future : futures) {
-      try {
-        future.get();
-      } catch (final InterruptedException e) {
-        throw new KsqlException("Restore process was interrupted.", e);
-      } catch (final Exception e) {
-        throw new KsqlException(
-            String.format("Failed restoring command (line %d): %s",
-                i + 1, new String(commands.get(i).getLeft(), StandardCharsets.UTF_8)), e);
-      }
-
-      i++;
-    }
+    try (Producer<byte[], byte[]> kafkaProducer = createTransactionalProducer()) {
+      for (int i = 0; i < commands.size(); i++) {
+        final Pair<byte[], byte[]> command = commands.get(i);
+
+        try {
+          kafkaProducer.beginTransaction();
+          enqueueCommand(kafkaProducer, command.getLeft(), command.getRight());
+          kafkaProducer.commitTransaction();
+        } catch (final ProducerFencedException
+            | OutOfOrderSequenceException
+            | AuthorizationException e
+        ) {
+          // We can't recover from these exceptions, so our only option is close producer and exit.
+          // This catch doesn't abortTransaction() since doing that would throw another exception.
+          throw new KsqlException(
+              String.format("Failed restoring command (line %d): %s",
+                  i + 1, new String(commands.get(i).getLeft(), StandardCharsets.UTF_8)), e);
+        } catch (final InterruptedException e) {
+          kafkaProducer.abortTransaction();
+          throw new KsqlException("Restore process was interrupted.", e);
+        } catch (final Exception e) {
+          kafkaProducer.abortTransaction();
+          throw new KsqlException(
+              String.format("Failed restoring command (line %d): %s",
+                  i + 1, new String(commands.get(i).getLeft(), StandardCharsets.UTF_8)), e);
+        }
+      }
+    }
   }
 
-  public Future<RecordMetadata> enqueueCommand(final byte[] commandId, final byte[] command) {
+  private void enqueueCommand(
+      final Producer<byte[], byte[]> kafkaProducer,
+      final byte[] commandId,
+      final byte[] command
+  ) throws ExecutionException, InterruptedException {
     final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
         commandTopicName,
         COMMAND_TOPIC_PARTITION,
         commandId,
         command);
 
-    return kafkaProducer.send(producerRecord);
+    kafkaProducer.send(producerRecord).get();
   }
+
+  private Producer<byte[], byte[]> createTransactionalProducer() {
+    try {
+      final Producer<byte[], byte[]> kafkaProducer = kafkaProducerSupplier.get();
+      kafkaProducer.initTransactions();
+      return kafkaProducer;
+    } catch (final TimeoutException e) {
+      final DefaultErrorMessages errorMessages = new DefaultErrorMessages();
+      throw new KsqlException(errorMessages.transactionInitTimeoutErrorMessage(e), e);
+    } catch (final Exception e) {
+      throw new KsqlException("Failed to initialize topic transactions.", e);
+    }
+  }
 }
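Because each command commits in its own transaction, a restore that dies midway leaves only whole, committed commands visible to transactional readers. A hypothetical way to spot-check a restored topic under that assumption (again placeholder names, not part of this commit):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public final class ReadCommittedCheck {
  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "restore-check");           // placeholder
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Skip any records left behind by aborted (uncommitted) transactions.
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
        props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
      consumer.subscribe(Collections.singletonList("command_topic")); // placeholder topic
      final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
      for (final ConsumerRecord<byte[], byte[]> record : records) {
        System.out.printf("offset %d: %d-byte command%n", record.offset(), record.value().length);
      }
    }
  }
}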
(File 2 of 3: KsqlRestoreCommandTopic tests)
@@ -41,7 +41,6 @@
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -115,7 +114,7 @@ public void setup() {
         serverConfig,
         COMMAND_TOPIC_NAME,
         topicClient,
-        kafkaProducer
+        () -> kafkaProducer
     );
 
     when(kafkaProducer.send(RECORD_1)).thenReturn(future1);
@@ -133,12 +132,16 @@ public void shouldCreateAndRestoreCommandTopic() throws ExecutionException, InterruptedException {
 
     // Then:
     verifyCreateTopic(COMMAND_TOPIC_NAME);
+    verify(kafkaProducer).initTransactions();
+    verify(kafkaProducer, times(3)).beginTransaction();
     verify(kafkaProducer).send(RECORD_1);
     verify(kafkaProducer).send(RECORD_2);
     verify(kafkaProducer).send(RECORD_3);
     verify(future1).get();
     verify(future2).get();
     verify(future3).get();
+    verify(kafkaProducer, times(3)).commitTransaction();
+    verify(kafkaProducer).close();
     verifyNoMoreInteractions(kafkaProducer, future1, future2, future3);
   }

@@ -156,11 +159,15 @@ public void shouldThrowWhenRestoreIsInterrupted() throws Exception {
     // Then:
     assertThat(e.getMessage(), containsString("Restore process was interrupted."));
     verifyCreateTopic(COMMAND_TOPIC_NAME);
+    verify(kafkaProducer).initTransactions();
+    verify(kafkaProducer, times(2)).beginTransaction();
     verify(kafkaProducer).send(RECORD_1);
     verify(kafkaProducer).send(RECORD_2);
-    verify(kafkaProducer).send(RECORD_3);
     verify(future1).get();
     verify(future2).get();
+    verify(kafkaProducer).commitTransaction();
+    verify(kafkaProducer).abortTransaction();
+    verify(kafkaProducer).close();
     verifyNoMoreInteractions(kafkaProducer, future1, future2);
     verifyZeroInteractions(future3);
   }
@@ -182,11 +189,15 @@ public void shouldThrowWhenRestoreExecutionFails() throws Exception {
         new String(RECORD_2.key(), StandardCharsets.UTF_8))));
 
     verifyCreateTopic(COMMAND_TOPIC_NAME);
+    verify(kafkaProducer).initTransactions();
+    verify(kafkaProducer, times(2)).beginTransaction();
     verify(kafkaProducer).send(RECORD_1);
     verify(kafkaProducer).send(RECORD_2);
-    verify(kafkaProducer).send(RECORD_3);
     verify(future1).get();
     verify(future2).get();
+    verify(kafkaProducer).commitTransaction();
+    verify(kafkaProducer).abortTransaction();
+    verify(kafkaProducer).close();
     verifyNoMoreInteractions(kafkaProducer, future1, future2);
     verifyZeroInteractions(future3);
   }
@@ -201,6 +212,8 @@ public void shouldRestoreCommandTopicWithEmptyCommands() {
 
     // Then:
     verifyCreateTopic(COMMAND_TOPIC_NAME);
+    verify(kafkaProducer).initTransactions();
+    verify(kafkaProducer).close();
     verifyZeroInteractions(kafkaProducer, future1, future2, future3);
   }

@@ -215,8 +228,12 @@ public void shouldDeleteAndCreateCommandTopicOnRestore() throws Exception {
     // Then:
     verifyDeleteTopic(COMMAND_TOPIC_NAME);
     verifyCreateTopic(COMMAND_TOPIC_NAME);
+    verify(kafkaProducer).initTransactions();
+    verify(kafkaProducer).beginTransaction();
     verify(kafkaProducer).send(RECORD_1);
     verify(future1).get();
+    verify(kafkaProducer).commitTransaction();
+    verify(kafkaProducer).close();
     verifyNoMoreInteractions(kafkaProducer, future1);
   }

(File 3 of 3)
@@ -431,7 +431,6 @@ public void shouldBuildAggregatorParamsCorrectlyForUnwindowedAggregate() {
   }
 
   @Test
-  @SuppressWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
   public void shouldBuildTumblingWindowedAggregateCorrectly() {
     // Given:
     givenTumblingWindowedAggregate();
