diff --git a/README.md b/README.md index aabd40e..3f71d23 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ Allows you to start and stop a Kafka broker + ZooKeeper instance for unit testin ## Versions | kafka-unit | Kafka broker | Zookeeper | |------------|-------------------------|-----------| +| 0.7 | kafka_2.11:0.10.1.1 | 3.4.6 | | 0.6 | kafka_2.11:0.10.0.0 | 3.4.6 | | 0.5 | kafka_2.11:0.9.0.1 | 3.4.6 | | 0.4 | kafka_2.11:0.9.0.1 | 3.4.6 | @@ -19,7 +20,7 @@ Allows you to start and stop a Kafka broker + ZooKeeper instance for unit testin info.batey.kafka kafka-unit - 0.6 + 0.7 ``` @@ -51,8 +52,8 @@ You can then write your own code to interact with Kafka or use the following met ```java kafkaUnitServer.createTopic(testTopic); -KeyedMessage keyedMessage = new KeyedMessage<>(testTopic, "key", "value"); -kafkaUnitServer.sendMessages(keyedMessage); +ProducerRecord producerRecord = new ProducerRecord<>(testTopic, "key", "value"); +kafkaUnitServer.sendRecord(producerRecord); ``` And to read messages: @@ -88,9 +89,9 @@ public class KafkaUnitIntegrationTest { public void junitRuleShouldHaveStartedKafka() throws Exception { String testTopic = "TestTopic"; kafkaUnitRule.getKafkaUnit().createTopic(testTopic); - KeyedMessage keyedMessage = new KeyedMessage<>(testTopic, "key", "value"); + ProducerRecord producerRecord = new ProducerRecord<>(testTopic, "key", "value"); - kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage); + kafkaUnitRule.getKafkaUnit().sendRecords(producerRecord); List messages = kafkaUnitRule.getKafkaUnit().readMessages(testTopic, 1); assertEquals(Arrays.asList("value"), messages); diff --git a/build.gradle b/build.gradle index 03cc7d4..ba84a48 100644 --- a/build.gradle +++ b/build.gradle @@ -98,7 +98,7 @@ repositories { dependencies { compile 'junit:junit:4.11' - compile('org.apache.kafka:kafka_2.11:0.10.0.0') { + compile('org.apache.kafka:kafka_2.11:0.10.1.1') { exclude module: 'slf4j-simple' exclude module: 'slf4j-log4j12' exclude module: 'jmxtools' diff --git a/src/main/java/info/batey/kafka/unit/KafkaUnit.java b/src/main/java/info/batey/kafka/unit/KafkaUnit.java index 0fda327..a12cec4 100644 --- a/src/main/java/info/batey/kafka/unit/KafkaUnit.java +++ b/src/main/java/info/batey/kafka/unit/KafkaUnit.java @@ -21,20 +21,20 @@ import kafka.consumer.ConsumerTimeoutException; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; -import kafka.javaapi.producer.Producer; import kafka.message.MessageAndMetadata; import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; import kafka.serializer.StringDecoder; -import kafka.serializer.StringEncoder; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; import kafka.utils.VerifiableProperties; import kafka.utils.ZkUtils; import org.apache.commons.io.FileUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.security.JaasUtils; - +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.ComparisonFailure; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +43,19 @@ import java.io.IOException; import java.net.ServerSocket; import java.nio.file.Files; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class KafkaUnit { @@ -57,7 +68,7 @@ public class KafkaUnit { private final String brokerString; private int zkPort; private int brokerPort; - private Producer producer = null; + private KafkaProducer producer = null; private Properties kafkaBrokerConfig = new Properties(); public KafkaUnit() throws IOException { @@ -172,7 +183,11 @@ public void createTopic(String topicName, Integer numPartitions) { // run LOGGER.info("Executing: CreateTopic " + Arrays.toString(arguments)); - TopicCommand.createTopic(zkUtils, opts); + try + { + TopicCommand.createTopic(zkUtils, opts); + } catch (TopicExistsException e) { + } } @@ -181,6 +196,8 @@ public void shutdown() { if (zookeeper != null) zookeeper.shutdown(); } + @Deprecated + @SuppressWarnings("deprecation") public List> readKeyedMessages(final String topicName, final int expectedMessages) throws TimeoutException { return readMessages(topicName, expectedMessages, new MessageExtractor>() { @@ -191,6 +208,16 @@ public KeyedMessage extract(MessageAndMetadata m }); } + public List> readConsumerRecords(final String topicName, final int expectedMessages) throws TimeoutException { + return readMessages(topicName, expectedMessages, new MessageExtractor>() { + + @Override + public ConsumerRecord extract(MessageAndMetadata messageAndMetadata) { + return new ConsumerRecord(topicName, messageAndMetadata.partition(), messageAndMetadata.offset(), messageAndMetadata.key(), messageAndMetadata.message()); + } + }); + } + public List readMessages(String topicName, final int expectedMessages) throws TimeoutException { return readMessages(topicName, expectedMessages, new MessageExtractor() { @Override @@ -251,17 +278,39 @@ public List call() throws Exception { } } + /** + * @see #sendRecords(ProducerRecord, ProducerRecord[]) + */ @SafeVarargs + @Deprecated + @SuppressWarnings({"deprecation", "unchecked"}) public final void sendMessages(KeyedMessage message, KeyedMessage... messages) { + List> records = new ArrayList<>(messages.length); + for (KeyedMessage m: messages) { + records.add(crteateProducerRecord(m)); + } + sendRecords(crteateProducerRecord(message), records.toArray((ProducerRecord[])new ProducerRecord[0])); + } + + @Deprecated + @SuppressWarnings("deprecation") + private static ProducerRecord crteateProducerRecord(KeyedMessage message) { + return new ProducerRecord(message.topic(), message.key(), message.message()); + } + + @SafeVarargs + public final void sendRecords(ProducerRecord message, ProducerRecord... messages) { if (producer == null) { Properties props = new Properties(); - props.put("serializer.class", StringEncoder.class.getName()); - props.put("metadata.broker.list", brokerString); - ProducerConfig config = new ProducerConfig(props); - producer = new Producer<>(config); + props.put("bootstrap.servers", brokerString); + producer = new KafkaProducer(props, + new StringSerializer(), + new StringSerializer()); } producer.send(message); - producer.send(Arrays.asList(messages)); + for (ProducerRecord m: messages) { + producer.send(m); + } } /** diff --git a/src/test/java/info/batey/kafka/unit/KafkaIntegrationTest.java b/src/test/java/info/batey/kafka/unit/KafkaIntegrationTest.java index 6a32695..e5fbeaa 100644 --- a/src/test/java/info/batey/kafka/unit/KafkaIntegrationTest.java +++ b/src/test/java/info/batey/kafka/unit/KafkaIntegrationTest.java @@ -15,8 +15,8 @@ */ package info.batey.kafka.unit; -import kafka.producer.KeyedMessage; import kafka.server.KafkaServerStartable; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -34,7 +34,9 @@ import java.util.Properties; import java.util.concurrent.TimeoutException; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class KafkaIntegrationTest { @@ -67,10 +69,10 @@ public void shouldThrowComparisonFailureIfMoreMessagesRequestedThanSent() throws //given String testTopic = "TestTopic"; kafkaUnitServer.createTopic(testTopic); - KeyedMessage keyedMessage = new KeyedMessage<>(testTopic, "key", "value"); + ProducerRecord producerRecord = new ProducerRecord<>(testTopic, "key", "value"); //when - kafkaUnitServer.sendMessages(keyedMessage); + kafkaUnitServer.sendRecords(producerRecord); try { kafkaUnitServer.readMessages(testTopic, 2); @@ -109,26 +111,27 @@ public void canReadKeyedMessages() throws Exception { //given String testTopic = "TestTopic"; kafkaUnitServer.createTopic(testTopic); - KeyedMessage keyedMessage = new KeyedMessage<>(testTopic, "key", "value"); + ProducerRecord producerRecord = new ProducerRecord<>(testTopic, "key", "value"); //when - kafkaUnitServer.sendMessages(keyedMessage); + kafkaUnitServer.sendRecords(producerRecord); - KeyedMessage receivedMessage = kafkaUnitServer.readKeyedMessages(testTopic, 1).get(0); + ConsumerRecord receivedConsumerRecord = kafkaUnitServer + .readConsumerRecords(testTopic, 1).get(0); - assertEquals("Received message value is incorrect", "value", receivedMessage.message()); - assertEquals("Received message key is incorrect", "key", receivedMessage.key()); - assertEquals("Received message topic is incorrect", testTopic, receivedMessage.topic()); + assertEquals("Received message value is incorrect", "value", receivedConsumerRecord.value()); + assertEquals("Received message key is incorrect", "key", receivedConsumerRecord.key()); + assertEquals("Received message topic is incorrect", testTopic, receivedConsumerRecord.topic()); } private void assertKafkaServerIsAvailable(KafkaUnit server) throws TimeoutException { //given String testTopic = "TestTopic"; server.createTopic(testTopic); - KeyedMessage keyedMessage = new KeyedMessage<>(testTopic, "key", "value"); + ProducerRecord producerRecord = new ProducerRecord<>(testTopic, "key", "value"); //when - server.sendMessages(keyedMessage); + server.sendRecords(producerRecord); List messages = server.readMessages(testTopic, 1); //then diff --git a/src/test/java/info/batey/kafka/unit/KafkaUnitIntegrationTest.java b/src/test/java/info/batey/kafka/unit/KafkaUnitIntegrationTest.java index ab3ab54..7e43972 100644 --- a/src/test/java/info/batey/kafka/unit/KafkaUnitIntegrationTest.java +++ b/src/test/java/info/batey/kafka/unit/KafkaUnitIntegrationTest.java @@ -15,7 +15,7 @@ */ package info.batey.kafka.unit; -import kafka.producer.KeyedMessage; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.Rule; import org.junit.Test; @@ -54,10 +54,10 @@ public void assertKafkaStartsAndSendsMessage(final KafkaUnit kafkaUnit) throws E //given String testTopic = "TestTopic"; kafkaUnit.createTopic(testTopic); - KeyedMessage keyedMessage = new KeyedMessage<>(testTopic, "key", "value"); + ProducerRecord producerRecord = new ProducerRecord<>(testTopic, "key", "value"); //when - kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage); + kafkaUnitRule.getKafkaUnit().sendRecords(producerRecord); List messages = kafkaUnitRule.getKafkaUnit().readMessages(testTopic, 1); //then