feature: Choose between Consumer commit or Producer transactional commits

- Choose either Consumer sync or async commits
- Fixes confluentinc#25:
-- Sometimes a transaction error occurs - Cannot call send in state COMMITTING_TRANSACTION
- A ReentrantReadWriteLock protects the non-thread-safe transactional producer from incorrect multithreaded use (see the sketch after this list)
- Wider lock scope prevents transactions from containing produced messages that they shouldn't
- Implement non-transactional synchronous commit properly
- Adapt selected tests to the non-transactional modes as well
- Must start tx in MockProducer as well
- Adds supervision to poller
- Fixes a performance issue with the async committer not being woken up
- Enhances tests to run under multiple commit modes
- Fixes example app tests - they were testing the wrong thing, and the MockProducer was not configured to auto complete
- Make committer thread revoke partitions and commit
- Have onPartitionsRevoked be responsible for committing on close, instead of an explicit call to commit by the controller
- Make sure Broker Poller now drains properly, committing any waiting work
- Add missing revoke flow to MockConsumer wrapper
- Add missing latch timeout check
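
The locking bullets above can be illustrated with a sketch (an illustration of the approach only, not the patch's actual producer-manager code): worker threads share the read lock while producing, and the committer takes the exclusive write lock, so no `send` can interleave with a commit - the root cause of the `Cannot call send in state COMMITTING_TRANSACTION` error.

[source,java]
----
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch - not the actual class from this commit.
class TransactionalProducerGuard<K, V> {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Producer<K, V> producer;

    TransactionalProducerGuard(Producer<K, V> producer) {
        this.producer = producer;
    }

    // Many workers may produce concurrently under the shared (read) lock.
    void send(ProducerRecord<K, V> record) {
        lock.readLock().lock();
        try {
            producer.send(record);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Committing takes the exclusive (write) lock: no new sends can start,
    // so the transaction cannot pick up records it shouldn't contain.
    void commitTransaction() {
        lock.writeLock().lock();
        try {
            producer.commitTransaction();
            producer.beginTransaction(); // open the next transaction
        } finally {
            lock.writeLock().unlock();
        }
    }
}
----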
JorgenRingen authored and astubbs committed Nov 20, 2020
1 parent 0b96d5e commit 893ae25
Showing 42 changed files with 2,079 additions and 518 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -29,4 +29,7 @@ hs_err_pid*
 *.iml
 target/
 .DS_Store
-*.versionsBackup
+*.versionsBackup
+
+# JENV
+.java-version
2 changes: 1 addition & 1 deletion .idea/runConfigurations/All_examples.xml

Some generated files are not rendered by default.

43 changes: 29 additions & 14 deletions README.adoc
@@ -313,18 +313,17 @@ Where `${project.version}` is the version to be used:
 .Setup the client
 [source,java,indent=0]
 ----
-var options = ParallelConsumerOptions.builder()
-        .ordering(KEY) // <1>
-        .maxConcurrency(1000) // <2>
-        .maxUncommittedMessagesToHandlePerPartition(10000) // <3>
-        .build();
+ParallelConsumerOptions options = getOptions();
 Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // <4>
+Producer<String, String> kafkaProducer = getKafkaProducer();
+ParallelStreamProcessor<String, String> eosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(kafkaConsumer, kafkaProducer, options);
 if (!(kafkaConsumer instanceof MockConsumer)) {
-    kafkaConsumer.subscribe(UniLists.of(inputTopic)); // <5>
+    eosStreamProcessor.subscribe(UniLists.of(inputTopic)); // <5>
 }
-return ParallelStreamProcessor.createEosStreamProcessor(kafkaConsumer, getKafkaProducer(), options);
+return eosStreamProcessor;
 ----
<1> Choose your ordering type, `KEY` in this case.
This ensures maximum concurrency while guaranteeing that messages are processed and committed in `KEY` order: no offset is committed unless all offsets before it in its partition have also been completed.
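
The snippet now factors option construction into a `getOptions()` helper. Judging from the builder lines removed above, that helper is presumably along these lines (a sketch reconstructed from the diff, not the repository's actual code):

[source,java]
----
static ParallelConsumerOptions getOptions() {
    return ParallelConsumerOptions.builder()
            .ordering(KEY) // process and commit in key order
            .maxConcurrency(1000)
            .maxUncommittedMessagesToHandlePerPartition(10000)
            .build();
}
----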
@@ -385,12 +384,17 @@ You can also optionally provide a callback function to be run after the message(
 this.parallelConsumer.pollAndProduce(record -> {
     var result = processBrokerRecord(record);
     ProducerRecord<String, String> produceRecord =
-            new ProducerRecord<>(outputTopic, "a-key", result.payload);
+            new ProducerRecord<>(outputTopic, record.key(), result.payload);
 
+    processedCount.incrementAndGet();
     return UniLists.of(produceRecord);
-}, consumeProduceResult ->
-        log.info("Message {} saved to broker at offset {}",
-                consumeProduceResult.getOut(),
-                consumeProduceResult.getMeta().offset())
+}, consumeProduceResult -> {
+    producedCount.incrementAndGet();
+    processedAndProducedKeys.add(consumeProduceResult.getIn().key());
+    log.debug("Message {} saved to broker at offset {}",
+            consumeProduceResult.getOut(),
+            consumeProduceResult.getMeta().offset());
+}
 );


@@ -412,7 +416,7 @@ In future versions, we plan to look at supporting other streaming systems like h
 var resultStream = parallelConsumer.vertxHttpReqInfoStream(record -> {
     log.info("Concurrently constructing and returning RequestInfo from record: {}", record);
     Map<String, String> params = UniMaps.of("recordKey", record.key(), "payload", record.value());
-    return new RequestInfo("localhost", "/api", params); // <1>
+    return new RequestInfo("localhost", port, "/api", params); // <1>
 });
 ----
<1> Simply return an object representing the request; the Vert.x HTTP engine will handle the rest, using its non-blocking engine
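
`RequestInfo` here is just a plain value object describing the request to make. Inferred from the constructor call above, its shape is roughly as follows (a hypothetical sketch - the real class lives in the example module):

[source,java]
----
import java.util.Map;

// Hypothetical shape, inferred from the constructor call in the snippet above.
public class RequestInfo {
    final String host;
    final int port;
    final String path;
    final Map<String, String> params;

    public RequestInfo(String host, int port, String path, Map<String, String> params) {
        this.host = host;
        this.port = port;
        this.path = path;
        this.params = params;
    }
}
----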
@@ -453,7 +457,7 @@ image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fb
 }
 void concurrentProcess() {
-    setupConsumer();
+    setupParallelConsumer();
     parallelConsumer.poll(record -> {
         log.info("Concurrently processing a record: {}", record);
@@ -562,6 +566,13 @@ your processing, you create side effects in other systems, this pertains to the

CAUTION: This cannot be true for any externally integrated third party system, unless that system is __idempotent__.

You can choose whether or not to use the transactional system.

Pros: produced records and offset commits are tied together atomically in a single transaction.
Cons: commits are more expensive - every commit must first flush all in-flight produced records.

For implementation details, see the <<Transactional System Architecture>> section.
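
As a sketch of what the choice looks like in code - the option and enum names below are assumptions for illustration, not confirmed API from this commit:

[source,java]
----
// Assumed API for illustration: a commit-mode option choosing between
// consumer-based commits and producer transactional commits.
var options = ParallelConsumerOptions.builder()
        .commitMode(CommitMode.TRANSACTIONAL_PRODUCER) // or CONSUMER_SYNC / CONSUMER_ASYNCHRONOUS
        .build();
----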

[[streams-usage]]
== Using with Kafka Streams

@@ -748,6 +759,10 @@ Instead of the work thread pool count being the degree of concurrency, it is con
.Vert.x Architecture
image::https://lucid.app/publicSegments/view/509df410-5997-46be-98e7-ac7f241780b4/image.png[Vert.x Architecture, align="center"]

=== Transactional System Architecture

image::https://lucid.app/publicSegments/view/7480d948-ed7d-4370-a308-8ec12e6b453b/image.png[]

=== Offset Map

==== Storage
@@ -9,7 +9,11 @@
 public class StringUtils {
 
     public static String msg(String s, Object... args) {
-        String message = MessageFormatter.basicArrayFormat(s, args);
-        return message;
+        return MessageFormatter.basicArrayFormat(s, args);
     }
+
+    public static boolean isBlank(final String property) {
+        if (property == null) return true;
+        else return property.trim().isEmpty(); // String#isBlank is only available @since Java 11
+    }
 }
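
For reference, `msg` performs SLF4J-style `{}` placeholder substitution:

[source,java]
----
String s = StringUtils.msg("Committing {} offsets for {}", 3, "my-topic");
// -> "Committing 3 offsets for my-topic"
----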
@@ -0,0 +1,59 @@
package io.confluent.parallelconsumer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

@Slf4j
@RequiredArgsConstructor
public abstract class AbstractOffsetCommitter<K, V> implements OffsetCommitter {

    protected final ConsumerManager<K, V> consumerMgr;
    protected final WorkManager<K, V> wm;

    /**
     * Get offsets from {@link WorkManager} that are ready to commit
     */
    @Override
    public void retrieveOffsetsAndCommit() {
        log.debug("Commit starting - find completed work to commit offsets");
        // todo shouldn't be removed until commit succeeds (there's no harm in committing the same offset twice)
        preAcquireWork();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsetsToSend = wm.findCompletedEligibleOffsetsAndRemove();
            if (offsetsToSend.isEmpty()) {
                log.trace("No offsets ready");
            } else {
                log.debug("Will commit offsets for {} partition(s): {}", offsetsToSend.size(), offsetsToSend);
                ConsumerGroupMetadata groupMetadata = consumerMgr.groupMetadata();

                log.debug("Begin commit");
                commitOffsets(offsetsToSend, groupMetadata);

                log.debug("On commit success");
                onOffsetCommitSuccess(offsetsToSend);
            }
        } finally {
            postCommit();
        }
    }

    protected void postCommit() {
        // default noop
    }

    protected void preAcquireWork() {
        // default noop
    }

    private void onOffsetCommitSuccess(final Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
        wm.onOffsetCommitSuccess(offsetsToSend);
    }

    protected abstract void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsToSend, final ConsumerGroupMetadata groupMetadata);

}
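
`AbstractOffsetCommitter` is a template method: subclasses supply only the actual commit call, optionally overriding the `preAcquireWork`/`postCommit` hooks. A minimal hypothetical subclass might look as follows (assuming, for illustration only, that `ConsumerManager` exposes a `commitSync` passthrough):

[source,java]
----
// Hypothetical sketch - the concrete committers added in this commit may differ.
public class SyncOffsetCommitter<K, V> extends AbstractOffsetCommitter<K, V> {

    public SyncOffsetCommitter(ConsumerManager<K, V> consumerMgr, WorkManager<K, V> wm) {
        super(consumerMgr, wm);
    }

    @Override
    protected void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsToSend,
                                 final ConsumerGroupMetadata groupMetadata) {
        // Assumed passthrough to Consumer#commitSync - blocks until the broker acknowledges.
        consumerMgr.commitSync(offsetsToSend);
    }
}
----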
