Reproducing bug 25: confluentinc#25
JorgenRingen committed Nov 10, 2020
commit 01782c6 (1 parent: e7146ad)
Showing 4 changed files with 109 additions and 11 deletions.
.gitignore (5 changes: 4 additions & 1 deletion)
@@ -29,4 +29,7 @@ hs_err_pid*
 *.iml
 target/
 .DS_Store
-*.versionsBackup
+*.versionsBackup
+
+# JENV
+.java-version
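Note: .java-version is the per-directory JDK pin that jenv writes; it is machine-specific, which is why it is ignored here. Assuming jenv is installed, the file is produced by, for example:

    jenv local 15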
parallel-consumer-examples/parallel-consumer-example-core/pom.xml (15 changes: 15 additions & 0 deletions)
@@ -36,6 +36,21 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
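The three Testcontainers dependencies above carry no <version> tags, so their versions are presumably managed in a parent POM or an existing dependencyManagement section. For a standalone reproduction outside this repository, one conventional way to pin them is to import the Testcontainers BOM; a hedged sketch, with an illustrative version roughly contemporary with this commit:

    <!-- Hypothetical snippet for standalone use; this repository's own build
         presumably manages these versions already. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>testcontainers-bom</artifactId>
                <version>1.15.0</version> <!-- illustrative version -->
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>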
parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java (30 changes: 20 additions & 10 deletions)
@@ -19,6 +19,7 @@
 import pl.tlinkowski.unij.api.UniLists;
 
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;

@@ -41,6 +42,9 @@ Producer<String, String> getKafkaProducer() {

     ParallelStreamProcessor<String, String> parallelConsumer;
 
+    public AtomicInteger messagesProcessed = new AtomicInteger(0);
+    public AtomicInteger messagesProduced = new AtomicInteger(0);
+
     @SuppressWarnings("UnqualifiedFieldAccess")
     void run() {
         this.parallelConsumer = setupConsumer();
@@ -57,16 +61,19 @@ ParallelStreamProcessor<String, String> setupConsumer() {
         // tag::exampleSetup[]
         var options = ParallelConsumerOptions.builder()
                 .ordering(KEY) // <1>
-                .maxConcurrency(1000) // <2>
-                .maxUncommittedMessagesToHandlePerPartition(10000) // <3>
+//                .maxConcurrency(1000) // <2>
+//                .numberOfThreads(1000) // <2>
+//                .maxUncommittedMessagesToHandlePerPartition(100) // <3>
                 .build();
 
         Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // <4>
-        if (!(kafkaConsumer instanceof MockConsumer)) {
-            kafkaConsumer.subscribe(UniLists.of(inputTopic)); // <5>
-        }
+//        if (!(kafkaConsumer instanceof MockConsumer)) {
+//            kafkaConsumer.subscribe(UniLists.of(inputTopic)); // <5>
+//        }
 
-        return ParallelStreamProcessor.createEosStreamProcessor(kafkaConsumer, getKafkaProducer(), options);
+        ParallelStreamProcessor<String, String> processor = ParallelStreamProcessor.createEosStreamProcessor(kafkaConsumer, getKafkaProducer(), options);
+        processor.subscribe(UniLists.of(inputTopic));
+        return processor;
         // end::exampleSetup[]
     }
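Net effect of this hunk: the raw KafkaConsumer is no longer subscribed directly (the MockConsumer guard is commented out along with the tuning options), and subscription moves onto the ParallelStreamProcessor itself. For reference, a minimal usage sketch of the returned processor, reusing the imports already present in this file; the handler lambda is a placeholder, not the app's real logic:

    // Hedged sketch: drive the processor returned by setupConsumer().
    ParallelStreamProcessor<String, String> processor = setupConsumer();
    processor.poll(record ->
            log.info("Concurrently processing offset {}: {}", record.offset(), record.value()));
    // ... and on shutdown:
    processor.close();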

@@ -82,11 +89,14 @@ void runPollAndProduce() {
                     var result = processBrokerRecord(record);
                     ProducerRecord<String, String> produceRecord =
                             new ProducerRecord<>(outputTopic, "a-key", result.payload);
+                    messagesProcessed.incrementAndGet();
                     return UniLists.of(produceRecord);
-                }, consumeProduceResult ->
-                        log.info("Message {} saved to broker at offset {}",
-                                consumeProduceResult.getOut(),
-                                consumeProduceResult.getMeta().offset())
+                }, consumeProduceResult -> {
+                    messagesProduced.incrementAndGet();
+                    log.info("Message {} saved to broker at offset {}",
+                            consumeProduceResult.getOut(),
+                            consumeProduceResult.getMeta().offset());
+                }
         );
         // end::exampleProduce[]
     }
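A note on the two counters incremented above: pollAndProduce runs the user function and the produce-result callback on the processor's worker thread pool, so a plain int counter would lose updates to racing read-modify-write increments; AtomicInteger makes each increment a single atomic operation. A standalone illustration (not part of the commit):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    // 1000 concurrent increments are all retained by AtomicInteger; the
    // equivalent count++ on a plain int is three separate steps (read, add,
    // write) and can silently drop updates under contention.
    public class AtomicCounterDemo {
        public static void main(String[] args) throws InterruptedException {
            AtomicInteger counter = new AtomicInteger(0);
            ExecutorService pool = Executors.newFixedThreadPool(16);
            for (int i = 0; i < 1000; i++) {
                pool.execute(counter::incrementAndGet);
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
            System.out.println(counter.get()); // reliably prints 1000
        }
    }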
parallel-consumer-examples/parallel-consumer-example-core/src/test/java/io/confluent/parallelconsumer/examples/core/Bug25AppTest.java (new file: 70 additions & 0 deletions)
@@ -0,0 +1,70 @@
package io.confluent.parallelconsumer.examples.core;

/*-
 * Copyright (C) 2020 Confluent, Inc.
 */

import io.confluent.parallelconsumer.integrationTests.KafkaTest;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Properties;

@Slf4j
public class Bug25AppTest extends KafkaTest<String, String> {

    @SneakyThrows
    @Test
    public void test() {
        log.info("Test start");
        ensureTopic(CoreApp.inputTopic, 1);
        ensureTopic(CoreApp.outputTopic, 1);

        AppUnderTest coreApp = new AppUnderTest();

        log.info("Producing 1000 messages before starting application");
        try (Producer<String, String> kafkaProducer = kcu.createNewProducer(false)) {
            for (int i = 0; i < 1000; i++) {
                kafkaProducer.send(new ProducerRecord<>(CoreApp.inputTopic, "key-" + i, "value-" + i));
            }
        }

        log.info("Starting application...");
        coreApp.runPollAndProduce();

        Awaitility.waitAtMost(Duration.ofSeconds(30)).untilAsserted(() -> {
            log.info("Processed-count: " + coreApp.messagesProcessed.get());
            log.info("Produced-count: " + coreApp.messagesProduced.get());
            Assertions.assertThat(coreApp.messagesProcessed.get()).isEqualTo(1000);
            Assertions.assertThat(coreApp.messagesProduced.get()).isEqualTo(1000);
        });

        coreApp.close();
    }

    class AppUnderTest extends CoreApp {

        @Override
        Consumer<String, String> getKafkaConsumer() {
            Properties props = kcu.props;
            // props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); // Sometimes causes test to fail
            // props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000); // fails with: "Invalid transition attempted from state READY to state COMMITTING_TRANSACTION"
            return new KafkaConsumer<>(props);
        }

        @Override
        Producer<String, String> getKafkaProducer() {
            return kcu.createNewProducer(true);
        }
    }
}
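To run the reproduction, a Docker daemon must be available for Testcontainers to start Kafka (the KafkaTest base class presumably manages the container). Assuming the standard Maven Surefire setup, from the example module:

    mvn test -Dtest=Bug25AppTest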
