diff --git a/README.md b/README.md
index 030ee43..12c7284 100644
--- a/README.md
+++ b/README.md
@@ -7,6 +7,7 @@ Allows you to start and stop a Kafka broker + ZooKeeper instance for unit testin
## Versions
| kafka-unit | Kafka broker | Zookeeper |
|------------|-------------------------|-----------|
+| 1.0 | kafka_2.11:0.11.0.0 | 3.4.10 |
| 0.7 | kafka_2.11:0.10.2.0 | 3.4.10 |
| 0.6 | kafka_2.11:0.10.0.0 | 3.4.6 |
| 0.5 | kafka_2.11:0.9.0.1 | 3.4.6 |
@@ -20,7 +21,7 @@ Allows you to start and stop a Kafka broker + ZooKeeper instance for unit testin
<dependency>
    <groupId>info.batey.kafka</groupId>
    <artifactId>kafka-unit</artifactId>
-    <version>0.7</version>
+    <version>1.0</version>
</dependency>
```
@@ -52,7 +53,7 @@ You can then write your own code to interact with Kafka or use the following met
```java
kafkaUnitServer.createTopic(testTopic);
-KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
+ProducerRecord<String, String> keyedMessage = new ProducerRecord<>(testTopic, "key", "value");
kafkaUnitServer.sendMessages(keyedMessage);
```
@@ -60,6 +61,7 @@ And to read messages:
```java
List<String> messages = kafkaUnitServer.readMessages(testTopic, 1);
+List<String> allMessages = kafkaUnitServer.readAllMessages(testTopic);
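+// full ConsumerRecords (key, topic, offset) can also be read:
+List<ConsumerRecord<String, String>> records = kafkaUnitServer.readRecords(testTopic, 1);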
```
Only `String` messages are supported at the moment.
@@ -89,9 +91,9 @@ public class KafkaUnitIntegrationTest {
public void junitRuleShouldHaveStartedKafka() throws Exception {
String testTopic = "TestTopic";
kafkaUnitRule.getKafkaUnit().createTopic(testTopic);
- KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
+ ProducerRecord<String, String> keyedMessage = new ProducerRecord<>(testTopic, "key", "value");
kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
List<String> messages = kafkaUnitRule.getKafkaUnit().readMessages(testTopic, 1);
assertEquals(Arrays.asList("value"), messages);
diff --git a/build.gradle b/build.gradle
index b260c24..008e05d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -98,7 +98,7 @@ repositories {
dependencies {
compile 'junit:junit:4.11'
- compile('org.apache.kafka:kafka_2.11:0.10.2.0') {
+ compile('org.apache.kafka:kafka_2.11:0.11.0.0') {
exclude module: 'slf4j-simple'
exclude module: 'slf4j-log4j12'
exclude module: 'jmxtools'
diff --git a/src/main/java/info/batey/kafka/unit/KafkaUnit.java b/src/main/java/info/batey/kafka/unit/KafkaUnit.java
index 63c5832..62608df 100644
--- a/src/main/java/info/batey/kafka/unit/KafkaUnit.java
+++ b/src/main/java/info/batey/kafka/unit/KafkaUnit.java
@@ -15,54 +15,51 @@
*/
package info.batey.kafka.unit;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import kafka.admin.TopicCommand;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.javaapi.producer.Producer;
-import kafka.message.MessageAndMetadata;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.StringDecoder;
-import kafka.serializer.StringEncoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
-import kafka.utils.VerifiableProperties;
import kafka.utils.ZkUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.JaasUtils;
-
-import org.junit.ComparisonFailure;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Console;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.net.ServerSocket;
-import java.nio.file.Files;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.stream.Collectors;
public class KafkaUnit {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaUnit.class);
- private KafkaServerStartable broker;
-
- private Zookeeper zookeeper;
private final String zookeeperString;
private final String brokerString;
- private int zkPort;
- private int brokerPort;
- private Producer<String, String> producer = null;
- private Properties kafkaBrokerConfig = new Properties();
- private int zkMaxConnections;
+ private final int zkPort;
+ private final int brokerPort;
+ private final Properties kafkaBrokerConfig = new Properties();
+ private final int zkMaxConnections;
+
+ private KafkaServerStartable broker;
+ private Zookeeper zookeeper;
+ private KafkaProducer<String, String> producer;
+ private File logDir;
public KafkaUnit() throws IOException {
this(getEphemeralPort(), getEphemeralPort());
@@ -72,20 +69,29 @@ public KafkaUnit(int zkPort, int brokerPort) {
this(zkPort, brokerPort, 16);
}
+ public KafkaUnit(String zkConnectionString, String kafkaConnectionString) {
+ this(parseConnectionString(zkConnectionString), parseConnectionString(kafkaConnectionString));
+ }
+
+ public KafkaUnit(String zkConnectionString, String kafkaConnectionString, int zkMaxConnections) {
+ this(parseConnectionString(zkConnectionString), parseConnectionString(kafkaConnectionString), zkMaxConnections);
+ }
+
public KafkaUnit(int zkPort, int brokerPort, int zkMaxConnections) {
this.zkPort = zkPort;
this.brokerPort = brokerPort;
this.zookeeperString = "localhost:" + zkPort;
this.brokerString = "localhost:" + brokerPort;
this.zkMaxConnections = zkMaxConnections;
+ this.producer = createProducer();
}
- public KafkaUnit(String zkConnectionString, String kafkaConnectionString) {
- this(parseConnectionString(zkConnectionString), parseConnectionString(kafkaConnectionString));
- }
-
- public KafkaUnit(String zkConnectionString, String kafkaConnectionString, int zkMaxConnections) {
- this(parseConnectionString(zkConnectionString), parseConnectionString(kafkaConnectionString), zkMaxConnections);
+ private KafkaProducer<String, String> createProducer() {
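+ // a single long-lived String/String producer aimed at the embedded broker, reused by sendMessages()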
+ final Properties props = new Properties();
+ props.put("bootstrap.servers", brokerString);
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+ return new KafkaProducer<>(props);
}
private static int parseConnectionString(String connectionString) {
@@ -122,23 +128,13 @@ public void startup() {
zookeeper = new Zookeeper(zkPort, zkMaxConnections);
zookeeper.startup();
- final File logDir;
try {
logDir = Files.createTempDirectory("kafka").toFile();
} catch (IOException e) {
throw new RuntimeException("Unable to start Kafka", e);
}
logDir.deleteOnExit();
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- FileUtils.deleteDirectory(logDir);
- } catch (IOException e) {
- LOGGER.warn("Problems deleting temporary directory " + logDir.getAbsolutePath(), e);
- }
- }
- }));
+ Runtime.getRuntime().addShutdownHook(new Thread(getDeleteLogDirectoryAction()));
kafkaBrokerConfig.setProperty("zookeeper.connect", zookeeperString);
kafkaBrokerConfig.setProperty("broker.id", "1");
kafkaBrokerConfig.setProperty("host.name", "localhost");
@@ -147,11 +143,27 @@ public void run() {
kafkaBrokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
kafkaBrokerConfig.setProperty("delete.topic.enable", String.valueOf(true));
kafkaBrokerConfig.setProperty("offsets.topic.replication.factor", String.valueOf(1));
+ kafkaBrokerConfig.setProperty("auto.create.topics.enable", String.valueOf(false));
broker = new KafkaServerStartable(new KafkaConfig(kafkaBrokerConfig));
broker.startup();
}
+ private Runnable getDeleteLogDirectoryAction() {
+ return new Runnable() {
+ @Override
+ public void run() {
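+ // logDir is only assigned in startup(); if the broker never started there is nothing to delete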
+ if (logDir != null) {
+ try {
+ FileUtils.deleteDirectory(logDir);
+ } catch (IOException e) {
+ LOGGER.warn("Problems deleting temporary directory " + logDir.getAbsolutePath(), e);
+ }
+ }
+ }
+ };
+ }
+
public String getKafkaConnect() {
return brokerString;
}
@@ -168,8 +180,9 @@ public void createTopic(String topicName) {
createTopic(topicName, 1);
}
- public void createTopic(String topicName, Integer numPartitions) {
+ public void createTopic(String topicName, int numPartitions) {
// setup
+
String[] arguments = new String[9];
arguments[0] = "--create";
arguments[1] = "--zookeeper";
@@ -177,7 +190,7 @@ public void createTopic(String topicName, Integer numPartitions) {
arguments[3] = "--replication-factor";
arguments[4] = "1";
arguments[5] = "--partitions";
- arguments[6] = "" + Integer.valueOf(numPartitions);
+ arguments[6] = String.valueOf(numPartitions);
arguments[7] = "--topic";
arguments[8] = topicName;
TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(arguments);
@@ -197,7 +210,7 @@ public void createTopic(String topicName, Integer numPartitions) {
/**
* @return All topic names
*/
- public List<String> listTopics(){
+ public List<String> listTopics() {
String[] arguments = new String[3];
arguments[0] = "--zookeeper";
arguments[1] = zookeeperString;
@@ -207,7 +220,7 @@ public List listTopics(){
ZkUtils zkUtils = ZkUtils.apply(opts.options().valueOf(opts.zkConnectOpt()),
30000, 30000, JaasUtils.isZkSecurityEnabled());
final List<String> topics = new ArrayList<>();
- try{
+ try {
// run
LOGGER.info("Executing: ListTopics " + Arrays.toString(arguments));
@@ -236,11 +249,12 @@ public void print(String s) {
/**
* Delete all topics
*/
- public void deleteAllTopics(){
- for (String topic: listTopics()){
- try{
+ public void deleteAllTopics() {
+ for (String topic: listTopics()) {
+ try {
deleteTopic(topic);
- } catch (Throwable ignored){}
+ } catch (Throwable ignored) {
+ }
}
}
@@ -248,7 +262,7 @@ public void deleteAllTopics(){
* Delete a topic.
* @param topicName The name of the topic to delete
*/
- public void deleteTopic(String topicName){
+ public void deleteTopic(String topicName) {
String[] arguments = new String[5];
arguments[0] = "--zookeeper";
arguments[1] = zookeeperString;
@@ -259,7 +273,7 @@ public void deleteTopic(String topicName){
ZkUtils zkUtils = ZkUtils.apply(opts.options().valueOf(opts.zkConnectOpt()),
30000, 30000, JaasUtils.isZkSecurityEnabled());
- try{
+ try {
// run
LOGGER.info("Executing: DeleteTopic " + Arrays.toString(arguments));
TopicCommand.deleteTopic(zkUtils, opts);
@@ -269,112 +283,87 @@ public void deleteTopic(String topicName){
}
public void shutdown() {
- if (broker != null) broker.shutdown();
+ if (broker != null) {
+ broker.shutdown();
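+ // wait for the broker to finish shutting down before ZooKeeper is stopped below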
+ broker.awaitShutdown();
+ }
if (zookeeper != null) zookeeper.shutdown();
}
- public List<KeyedMessage<String, String>> readKeyedMessages(final String topicName, final int expectedMessages) throws TimeoutException {
- return readMessages(topicName, expectedMessages, new MessageExtractor<KeyedMessage<String, String>>() {
-
- @Override
- public KeyedMessage<String, String> extract(MessageAndMetadata<String, String> messageAndMetadata) {
- return new KeyedMessage<String, String>(topicName, messageAndMetadata.key(), messageAndMetadata.message());
- }
- });
+ public List<ConsumerRecord<String, String>> readRecords(final String topicName, final int maxPoll) {
+ return readMessages(topicName, maxPoll, new PassthroughMessageExtractor());
}
- public List<String> readMessages(String topicName, final int expectedMessages) throws TimeoutException {
- return readMessages(topicName, expectedMessages, new MessageExtractor<String>() {
- @Override
- public String extract(MessageAndMetadata<String, String> messageAndMetadata) {
- return messageAndMetadata.message();
- }
- });
+ public List<String> readMessages(final String topicName, final int maxPoll) {
+ return readMessages(topicName, maxPoll, new ValueMessageExtractor());
}
- public List<String> pollMessages(String topicName) throws TimeoutException {
- return readMessages(topicName, -1, new MessageExtractor<String>() {
- @Override
- public String extract(MessageAndMetadata<String, String> messageAndMetadata) {
- return messageAndMetadata.message();
- }
- });
+ public List<String> readAllMessages(final String topicName) {
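+ // Integer.MAX_VALUE lifts the max.poll.records cap, so a single poll returns everything fetched within the timeout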
+ return readMessages(topicName, Integer.MAX_VALUE, new ValueMessageExtractor());
}
- private <T> List<T> readMessages(String topicName, final int expectedMessages, final MessageExtractor<T> messageExtractor) throws TimeoutException {
- ExecutorService singleThread = Executors.newSingleThreadExecutor();
- Properties consumerProperties = new Properties();
- consumerProperties.put("zookeeper.connect", zookeeperString);
- consumerProperties.put("group.id", "10");
- consumerProperties.put("socket.timeout.ms", "500");
- consumerProperties.put("consumer.id", "test");
- consumerProperties.put("auto.offset.reset", "smallest");
- consumerProperties.put("consumer.timeout.ms", "500");
- ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
- StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties(new Properties()));
- Map<String, Integer> topicMap = new HashMap<>();
- topicMap.put(topicName, 1);
- Map<String, List<KafkaStream<String, String>>> events = javaConsumerConnector.createMessageStreams(topicMap, stringDecoder, stringDecoder);
- List<KafkaStream<String, String>> events1 = events.get(topicName);
- final KafkaStream<String, String> kafkaStreams = events1.get(0);
-
-
- Future<List<T>> submit = singleThread.submit(new Callable<List<T>>() {
- public List<T> call() throws Exception {
- List<T> messages = new ArrayList<>();
- try {
- for (MessageAndMetadata<String, String> kafkaStream : kafkaStreams) {
- T message = messageExtractor.extract(kafkaStream);
- LOGGER.info("Received message: {}", kafkaStream.message());
- messages.add(message);
- }
- } catch (ConsumerTimeoutException e) {
- // always gets throws reaching the end of the stream
- }
- if (expectedMessages >= 0 && messages.size() != expectedMessages) {
- throw new ComparisonFailure("Incorrect number of messages returned", Integer.toString(expectedMessages),
- Integer.toString(messages.size()));
- }
- return messages;
+ private <T> List<T> readMessages(final String topicName, final int maxPoll, final MessageExtractor<T> messageExtractor) {
+ final Properties props = new Properties();
+ props.put("bootstrap.servers", brokerString);
+ props.put("group.id", "test");
+ props.put("enable.auto.commit", "true");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("session.timeout.ms", "30000");
+ props.put("key.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+ props.put("max.poll.records", String.valueOf(maxPoll));
+ try (final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
+ kafkaConsumer.subscribe(Collections.singletonList(topicName));
+ kafkaConsumer.poll(0); // dummy poll to trigger partition assignment so the seek below takes effect
+ kafkaConsumer.seekToBeginning(Collections.singletonList(new TopicPartition(topicName, 0)));
+ final ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
+ final List<T> messages = new ArrayList<>();
+ for (ConsumerRecord<String, String> record : records) {
+ messages.add(messageExtractor.extract(record));
}
- });
-
- try {
- return submit.get(3, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- if (e.getCause() instanceof ComparisonFailure) {
- throw (ComparisonFailure) e.getCause();
- }
- throw new TimeoutException("Timed out waiting for messages");
- } finally {
- singleThread.shutdown();
- javaConsumerConnector.shutdown();
+ return messages;
}
}
+
@SafeVarargs
- public final void sendMessages(KeyedMessage<String, String> message, KeyedMessage<String, String>... messages) {
- if (producer == null) {
- Properties props = new Properties();
- props.put("serializer.class", StringEncoder.class.getName());
- props.put("metadata.broker.list", brokerString);
- ProducerConfig config = new ProducerConfig(props);
- producer = new Producer<>(config);
+ public final void sendMessages(final ProducerRecord<String, String>... records) {
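+ // send synchronously: get() blocks until the broker acks, so records are immediately readable by tests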
+ for (final ProducerRecord<String, String> record : records) {
+ try {
+ producer.send(record).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ } finally {
+ producer.flush();
+ }
}
- producer.send(message);
- producer.send(Arrays.asList(messages));
}
/**
* Set custom broker configuration.
- * See avaliable config keys in the kafka documentation: http://kafka.apache.org/documentation.html#brokerconfigs
+ * See available config keys in the kafka documentation: http://kafka.apache.org/documentation.html#brokerconfigs
*/
public final void setKafkaBrokerConfig(String configKey, String configValue) {
kafkaBrokerConfig.setProperty(configKey, configValue);
}
private interface MessageExtractor<T> {
- T extract(MessageAndMetadata<String, String> messageAndMetadata);
+ T extract(ConsumerRecord<String, String> record);
+ }
+
+ public class ValueMessageExtractor implements MessageExtractor<String> {
+ @Override
+ public String extract(final ConsumerRecord<String, String> record) {
+ return record.value();
+ }
+ }
+
+ public class PassthroughMessageExtractor implements MessageExtractor<ConsumerRecord<String, String>> {
+ @Override
+ public ConsumerRecord<String, String> extract(final ConsumerRecord<String, String> record) {
+ return record;
+ }
}
}
diff --git a/src/test/java/info/batey/kafka/unit/KafkaIntegrationTest.java b/src/test/java/info/batey/kafka/unit/KafkaIntegrationTest.java
index 029a684..c536f42 100644
--- a/src/test/java/info/batey/kafka/unit/KafkaIntegrationTest.java
+++ b/src/test/java/info/batey/kafka/unit/KafkaIntegrationTest.java
@@ -15,28 +15,21 @@
*/
package info.batey.kafka.unit;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.lang.reflect.Field;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
-
-import kafka.producer.KeyedMessage;
import kafka.server.KafkaServerStartable;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
-import org.junit.ComparisonFailure;
import org.junit.Test;
-public class KafkaIntegrationTest {
+public class KafkaIntegrationTest extends KafkaUnitRuleIntegrationTestBase {
private KafkaUnit kafkaUnitServer;
@@ -52,53 +45,34 @@ public void shutdown() throws Exception {
Field f = kafkaUnitServer.getClass().getDeclaredField("broker");
f.setAccessible(true);
KafkaServerStartable broker = (KafkaServerStartable) f.get(kafkaUnitServer);
- assertEquals(1024, (int)broker.serverConfig().logSegmentBytes());
-
+ assertEquals(1024, (int) broker.serverConfig().logSegmentBytes());
+ kafkaUnitServer.deleteAllTopics();
kafkaUnitServer.shutdown();
}
@Test
public void kafkaServerIsAvailable() throws Exception {
- assertKafkaServerIsAvailable(kafkaUnitServer);
- }
-
- @Test(expected = ComparisonFailure.class)
- public void shouldThrowComparisonFailureIfMoreMessagesRequestedThanSent() throws Exception {
- //given
- String testTopic = "TestTopic";
- kafkaUnitServer.createTopic(testTopic);
- KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
-
- //when
- kafkaUnitServer.sendMessages(keyedMessage);
-
- try {
- kafkaUnitServer.readMessages(testTopic, 2);
- fail("Expected ComparisonFailure to be thrown");
- } catch (ComparisonFailure e) {
- assertEquals("Wrong value for 'expected'", "2", e.getExpected());
- assertEquals("Wrong value for 'actual'", "1", e.getActual());
- assertEquals("Wrong error message", "Incorrect number of messages returned", e.getMessage());
- }
+ assertKafkaStartsAndSendsMessage(kafkaUnitServer);
}
@Test
- public void canDeleteTopic() throws Exception{
+ public void canDeleteTopic() throws Exception {
//given
String testTopic = "TestTopic";
kafkaUnitServer.createTopic(testTopic);
- KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
+ ProducerRecord<String, String> keyedMessage = new ProducerRecord<>(testTopic, "key", "value");
//when
kafkaUnitServer.sendMessages(keyedMessage);
- kafkaUnitServer.readMessages(testTopic, 1);
-
+ assertEquals(Collections.singletonList("value"), kafkaUnitServer.readAllMessages(testTopic));
kafkaUnitServer.deleteTopic(testTopic);
- kafkaUnitServer.readMessages(testTopic, 0);
+ assertFalse(kafkaUnitServer.listTopics().contains(testTopic));
}
@Test
- public void canListTopics() throws Exception{
+ public void canListTopics() throws Exception {
String testTopic1 = "TestTopic1";
kafkaUnitServer.createTopic(testTopic1);
String testTopic2 = "TestTopic2";
@@ -111,7 +85,7 @@ public void canListTopics() throws Exception{
}
@Test
- public void canDeleteAllTopics(){
+ public void canDeleteAllTopics() {
String testTopic1 = "TestTopic1";
kafkaUnitServer.createTopic(testTopic1);
String testTopic2 = "TestTopic2";
@@ -131,37 +105,26 @@ public void canDeleteAllTopics(){
public void startKafkaServerWithoutParamsAndSendMessage() throws Exception {
KafkaUnit noParamServer = new KafkaUnit();
noParamServer.startup();
- assertKafkaServerIsAvailable(noParamServer);
+ assertKafkaStartsAndSendsMessage(noParamServer);
assertTrue("Kafka port needs to be non-negative", noParamServer.getBrokerPort() > 0);
assertTrue("Zookeeper port needs to be non-negative", noParamServer.getZkPort() > 0);
}
@Test
- public void canUseKafkaConnectToProduce() throws Exception {
- final String topic = "KafkakConnectTestTopic";
- Properties props = new Properties();
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getCanonicalName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUnitServer.getKafkaConnect());
- Producer<Long, String> producer = new KafkaProducer<>(props);
- ProducerRecord<Long, String> record = new ProducerRecord<>(topic, 1L, "test");
- producer.send(record); // would be good to have KafkaUnit.sendMessages() support the new producer
- assertEquals("test", kafkaUnitServer.readMessages(topic, 1).get(0));
- }
-
- @Test
- public void canReadKeyedMessages() throws Exception {
+ public void canReadRecords() throws Exception {
//given
String testTopic = "TestTopic";
kafkaUnitServer.createTopic(testTopic);
- KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
+ ProducerRecord<String, String> keyedMessage = new ProducerRecord<>(testTopic, "key", "value");
//when
kafkaUnitServer.sendMessages(keyedMessage);
- KeyedMessage<String, String> receivedMessage = kafkaUnitServer.readKeyedMessages(testTopic, 1).get(0);
+ ConsumerRecord<String, String> receivedMessage = kafkaUnitServer.readRecords(testTopic, 1).get(0);
- assertEquals("Received message value is incorrect", "value", receivedMessage.message());
+ assertEquals("Received message value is incorrect", "value", receivedMessage.value());
assertEquals("Received message key is incorrect", "key", receivedMessage.key());
assertEquals("Received message topic is incorrect", testTopic, receivedMessage.topic());
}
@@ -173,18 +136,4 @@ public void closeConnectionBetweenTopicCreations() throws Exception{
kafkaUnitServer.createTopic(topicPrefix + i);
}
}
-
- private void assertKafkaServerIsAvailable(KafkaUnit server) throws TimeoutException {
- //given
- String testTopic = "TestTopic";
- server.createTopic(testTopic);
- KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
-
- //when
- server.sendMessages(keyedMessage);
- List<String> messages = server.readMessages(testTopic, 1);
-
- //then
- assertEquals(Arrays.asList("value"), messages);
- }
}
\ No newline at end of file
diff --git a/src/test/java/info/batey/kafka/unit/KafkaUnitIntegrationTest.java b/src/test/java/info/batey/kafka/unit/KafkaUnitIntegrationTest.java
deleted file mode 100644
index ab3ab54..0000000
--- a/src/test/java/info/batey/kafka/unit/KafkaUnitIntegrationTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2014 Christopher Batey
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package info.batey.kafka.unit;
-
-import kafka.producer.KeyedMessage;
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class KafkaUnitIntegrationTest {
-
- @Rule
- public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(6000, 6001);
-
- @Rule
- public KafkaUnitRule kafkaUnitRuleWithConnectionStrings = new KafkaUnitRule("localhost:5000", "localhost:5001");
-
- @Rule
- public KafkaUnitRule kafkaUnitRuleWithEphemeralPorts = new KafkaUnitRule();
-
- @Test
- public void junitRuleShouldHaveStartedKafka() throws Exception {
- assertKafkaStartsAndSendsMessage(kafkaUnitRule.getKafkaUnit());
- }
-
- @Test
- public void junitRuleShouldHaveStartedKafkaWithConnectionStrings() throws Exception {
- assertKafkaStartsAndSendsMessage(kafkaUnitRuleWithConnectionStrings.getKafkaUnit());
- }
-
- @Test
- public void junitRuleShouldHaveStartedKafkaWithEphemeralPorts() throws Exception {
- assertKafkaStartsAndSendsMessage(kafkaUnitRuleWithEphemeralPorts.getKafkaUnit());
- }
-
- public void assertKafkaStartsAndSendsMessage(final KafkaUnit kafkaUnit) throws Exception {
- //given
- String testTopic = "TestTopic";
- kafkaUnit.createTopic(testTopic);
- KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
-
- //when
- kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
- List<String> messages = kafkaUnitRule.getKafkaUnit().readMessages(testTopic, 1);
-
- //then
- assertEquals(Arrays.asList("value"), messages);
-
- }
-}
diff --git a/src/test/java/info/batey/kafka/unit/KafkaUnitRuleConnectionStringIntegrationTest.java b/src/test/java/info/batey/kafka/unit/KafkaUnitRuleConnectionStringIntegrationTest.java
new file mode 100644
index 0000000..cce1b23
--- /dev/null
+++ b/src/test/java/info/batey/kafka/unit/KafkaUnitRuleConnectionStringIntegrationTest.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (C) 2014 Christopher Batey
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package info.batey.kafka.unit;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+public class KafkaUnitRuleConnectionStringIntegrationTest extends KafkaUnitRuleIntegrationTestBase {
+
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule("localhost:5000", "localhost:5001");
+
+ @Test
+ public void junitRuleShouldHaveStartedKafka() throws Exception {
+ assertKafkaStartsAndSendsMessage(kafkaUnitRule.getKafkaUnit());
+ }
+}
diff --git a/src/test/java/info/batey/kafka/unit/KafkaUnitRuleEphemeralPortsIntegrationTest.java b/src/test/java/info/batey/kafka/unit/KafkaUnitRuleEphemeralPortsIntegrationTest.java
new file mode 100644
index 0000000..ced06da
--- /dev/null
+++ b/src/test/java/info/batey/kafka/unit/KafkaUnitRuleEphemeralPortsIntegrationTest.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2014 Christopher Batey
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package info.batey.kafka.unit;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+public class KafkaUnitRuleEphemeralPortsIntegrationTest extends KafkaUnitRuleIntegrationTestBase {
+
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+ @Test
+ public void junitRuleShouldHaveStartedKafka() throws Exception {
+ assertKafkaStartsAndSendsMessage(kafkaUnitRule.getKafkaUnit());
+ }
+
+}
diff --git a/src/test/java/info/batey/kafka/unit/KafkaUnitRuleIntegrationTest.java b/src/test/java/info/batey/kafka/unit/KafkaUnitRuleIntegrationTest.java
new file mode 100644
index 0000000..bc1fc02
--- /dev/null
+++ b/src/test/java/info/batey/kafka/unit/KafkaUnitRuleIntegrationTest.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2014 Christopher Batey
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package info.batey.kafka.unit;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+public class KafkaUnitRuleIntegrationTest extends KafkaUnitRuleIntegrationTestBase {
+
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(6000, 6001);
+
+ @Test
+ public void junitRuleShouldHaveStartedKafka() throws Exception {
+ assertKafkaStartsAndSendsMessage(kafkaUnitRule.getKafkaUnit());
+ }
+
+}
diff --git a/src/test/java/info/batey/kafka/unit/KafkaUnitRuleIntegrationTestBase.java b/src/test/java/info/batey/kafka/unit/KafkaUnitRuleIntegrationTestBase.java
new file mode 100644
index 0000000..fd8cf09
--- /dev/null
+++ b/src/test/java/info/batey/kafka/unit/KafkaUnitRuleIntegrationTestBase.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2014 Christopher Batey
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package info.batey.kafka.unit;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public abstract class KafkaUnitRuleIntegrationTestBase {
+ protected void assertKafkaStartsAndSendsMessage(final KafkaUnit kafkaUnit) throws Exception {
+ //given
+ String testTopic = "TestTopic";
+ kafkaUnit.createTopic(testTopic);
+ ProducerRecord<String, String> keyedMessage = new ProducerRecord<>(testTopic, "key", "value");
+ //when
+ kafkaUnit.sendMessages(keyedMessage);
+ List<String> messages = kafkaUnit.readMessages(testTopic, 1);
+ //then
+ assertEquals(Collections.singletonList("value"), messages);
+ }
+}