Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

START: DLQ draft #8

Draft
wants to merge 8 commits into
base: features/retry-exception
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Producer;
Expand All @@ -19,6 +20,7 @@

import static io.confluent.csid.utils.StringUtils.msg;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.DLQ;

/**
* The options for the {@link AbstractParallelEoSStreamProcessor} system.
Expand All @@ -43,6 +45,11 @@ public class ParallelConsumerOptions<K, V> {
*/
private final Producer<K, V> producer;

/**
 * Supplying an admin client is only needed for some functions (e.g. the DLQ terminal failure reaction), which will cause an error if used when it's missing.
 */
private final AdminClient adminClient;

/**
* Path to Managed executor service for Java EE
*/
Expand Down Expand Up @@ -228,7 +235,7 @@ public boolean isUsingBatching() {
/**
 * The reaction to take when a record fails terminally (presumably once retries are exhausted - confirm with caller).
 */
public enum TerminalFailureReaction {

    /** Shut the system down upon terminal failure. */
    SHUTDOWN,

    /** Skip the failed record and continue processing. */
    SKIP,

    /** Send the failed record to a Dead Letter Queue - requires a Producer or Admin client to be supplied (see validate). */
    DLQ
}

/**
Expand All @@ -248,8 +255,17 @@ public void validate() {

//
WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay());

//
// todo should admin be required?
if (!isProducerSupplied() && getAdminClient() == null && getTerminalFailureReaction() == DLQ) {
throw new IllegalArgumentException(msg("Wanting to use DLQ failure mode ({}) without supplying either a Producer or Admin client (both are needed) instance",
getTerminalFailureReaction()));
}

}


/**
 * @return true when the configured commit mode is {@link CommitMode#PERIODIC_TRANSACTIONAL_PRODUCER}
 */
public boolean isUsingTransactionalProducer() {
    // == is the idiomatic, null-safe way to compare enum constants (equals would NPE on a null commitMode)
    return commitMode == PERIODIC_TRANSACTIONAL_PRODUCER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,14 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.internal.ProducerManager;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import pl.tlinkowski.unij.api.UniLists;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -68,22 +63,9 @@ public void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K
}
log.trace("asyncPoll and Stream - Consumed a record ({}), and returning a derivative result record to be produced: {}", context, recordListToProduce);

List<ConsumeProduceResult<K, V, K, V>> results = new ArrayList<>();
log.trace("Producing {} messages in result...", recordListToProduce.size());

var futures = super.getProducerManager().get().produceMessages(recordListToProduce);
try {
for (Tuple<ProducerRecord<K, V>, Future<RecordMetadata>> future : futures) {
var recordMetadata = TimeUtils.time(() ->
future.getRight().get(options.getSendTimeout().toMillis(), TimeUnit.MILLISECONDS));
var result = new ConsumeProduceResult<>(context.getPollContext(), future.getLeft(), recordMetadata);
results.add(result);
}
} catch (Exception e) {
throw new InternalRuntimeError("Error while waiting for produce results", e);
}

return results;
ProducerManager<K, V> producer = super.getProducerManager().get();
return producer.produceMessagesSyncWithContext(context.getPollContext(), recordListToProduce);
};

supervisorLoop(wrappedUserFunc, callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,16 @@ public abstract class AbstractParallelEoSStreamProcessor<K, V> implements Parall
@Getter(PROTECTED)
private final BlockingQueue<ControllerEventMessage<K, V>> workMailBox = new LinkedBlockingQueue<>(); // Thread safe, highly performant, non blocking

private final UserFunctionRunner<K, V> runner;

/**
* An inbound message to the controller.
* <p>
* Currently, an Either type class, representing either newly polled records to ingest, or a work result.
*/
@Value
@RequiredArgsConstructor(access = PRIVATE)
// todo refactor - this has been extracted in PR#270
private static class ControllerEventMessage<K, V> {
WorkContainer<K, V> workContainer;
EpochAndRecordsMap<K, V> consumerRecords;
Expand Down Expand Up @@ -256,6 +259,8 @@ public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptio
this.producerManager = Optional.empty();
this.committer = this.brokerPollSubsystem;
}

this.runner = new UserFunctionRunner<>(this, clock, getProducerManager(), options.getAdminClient());
}

private void checkGroupIdConfigured(final org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
Expand Down Expand Up @@ -771,8 +776,7 @@ private <R> void submitWorkToPoolInner(final Function<PollContextInternal<K, V>,
log.trace("Sending work ({}) to pool", batch);
Future outputRecordFuture = workerThreadPool.submit(() -> {
addInstanceMDC();
UserFunctionRunner<K, V> runner = new UserFunctionRunner<>(this);
return runner.runUserFunction(usersFunction, callback, batch);
return this.runner.runUserFunction(usersFunction, callback, batch);
});
// for a batch, each message in the batch shares the same result
for (final WorkContainer<K, V> workContainer : batch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
*/

import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumer.Tuple;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -20,15 +22,24 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.confluent.csid.utils.StringUtils.msg;

/**
 * Central control point for {@link KafkaProducer} where it's needed in some scenarios, like transactions.
 * <p>
 * Also has useful wrapping methods, such as synchronous produce wrappers that block on send results.
 */
@Slf4j
public class ProducerManager<K, V> extends AbstractOffsetCommitter<K, V> implements OffsetCommitter {

Expand All @@ -38,9 +49,8 @@ public class ProducerManager<K, V> extends AbstractOffsetCommitter<K, V> impleme

private final boolean producerIsConfiguredForTransactions;


/**
* The {@link KafkaProducer) isn't actually completely thread safe, at least when using it transactionally. We must
* The {@link KafkaProducer} isn't actually completely thread safe, at least when using it transactionally. We must
* be careful not to send messages to the producer, while we are committing a transaction - "Cannot call send in
* state COMMITTING_TRANSACTION".
*/
Expand Down Expand Up @@ -123,7 +133,7 @@ private TransactionManager getTransactionManager() {
* @see ParallelConsumer#poll
* @see ParallelStreamProcessor#pollAndProduceMany
*/
public List<ParallelConsumer.Tuple<ProducerRecord<K, V>, Future<RecordMetadata>>> produceMessages(List<ProducerRecord<K, V>> outMsgs) {
public List<Tuple<ProducerRecord<K, V>, Future<RecordMetadata>>> produceMessages(List<ProducerRecord<K, V>> outMsgs) {
// only needed if not using tx
Callback callback = (RecordMetadata metadata, Exception exception) -> {
if (exception != null) {
Expand All @@ -134,18 +144,42 @@ public List<ParallelConsumer.Tuple<ProducerRecord<K, V>, Future<RecordMetadata>>

ReentrantReadWriteLock.ReadLock readLock = producerTransactionLock.readLock();
readLock.lock();
List<ParallelConsumer.Tuple<ProducerRecord<K, V>, Future<RecordMetadata>>> futures = new ArrayList<>(outMsgs.size());
try {
for (ProducerRecord<K, V> rec : outMsgs) {
return outMsgs.stream().map(rec -> {
log.trace("Producing {}", rec);
var future = producer.send(rec, callback);
futures.add(ParallelConsumer.Tuple.pairOf(rec, future));
}
return Tuple.pairOf(rec, future);
}).collect(Collectors.toList());
} finally {
readLock.unlock();
}
}

/**
 * Synchronously produces the given records (see {@link #produceMessagesSync}) and pairs each send
 * result with the supplied {@link PollContext}, yielding one
 * {@link ParallelStreamProcessor.ConsumeProduceResult} per produced record.
 */
public List<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> produceMessagesSyncWithContext(PollContext<K, V> context, List<ProducerRecord<K, V>> outMsgs) {
    return produceMessagesSync(outMsgs)
            .map(sendResult -> new ParallelStreamProcessor.ConsumeProduceResult<>(context, sendResult.getLeft(), sendResult.getRight()))
            .collect(Collectors.toList());
}

/**
 * Produces the given records and blocks until every send has completed, or the configured
 * {@code sendTimeout} elapses for one of them.
 *
 * @param outMsgs the records to send
 * @return a stream of (sent record, broker acknowledgement metadata) pairs, in input order
 * @throws InternalRuntimeError if waiting on any individual send fails, times out, or is interrupted
 */
public Stream<Tuple<ProducerRecord<K, V>, RecordMetadata>> produceMessagesSync(List<ProducerRecord<K, V>> outMsgs) {
    var futures = produceMessages(outMsgs);

    // Resolve the futures eagerly: a lazy Stream#map would defer both the blocking waits and
    // their error wrapping until the caller runs a terminal operation, so a try/catch around
    // the stream construction here would never observe the produce failures.
    List<Tuple<ProducerRecord<K, V>, RecordMetadata>> results = new ArrayList<>(futures.size());
    for (Tuple<ProducerRecord<K, V>, Future<RecordMetadata>> future : futures) {
        ProducerRecord<K, V> producerRecord = future.getLeft();
        try {
            RecordMetadata recordMetadata = future.getRight().get(options.getSendTimeout().toMillis(), TimeUnit.MILLISECONDS);
            results.add(new Tuple<>(producerRecord, recordMetadata));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status before wrapping
            throw new InternalRuntimeError(msg("Interrupted while waiting on individual produce result {}", producerRecord), e);
        } catch (Exception e) {
            throw new InternalRuntimeError(msg("Error while waiting on individual produce result {}", producerRecord), e);
        }
    }
    return results.stream();
}

@Override
Expand Down
Loading