Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated to Kafka 0.10.1.1 and deprecated old API. #43

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Allows you to start and stop a Kafka broker + ZooKeeper instance for unit testin
## Versions
| kafka-unit | Kafka broker | Zookeeper |
|------------|-------------------------|-----------|
| 0.7 | kafka_2.11:0.10.1.1 | 3.4.6 |
| 0.6 | kafka_2.11:0.10.0.0 | 3.4.6 |
| 0.5 | kafka_2.11:0.9.0.1 | 3.4.6 |
| 0.4 | kafka_2.11:0.9.0.1 | 3.4.6 |
Expand All @@ -19,7 +20,7 @@ Allows you to start and stop a Kafka broker + ZooKeeper instance for unit testin
<dependency>
<groupId>info.batey.kafka</groupId>
<artifactId>kafka-unit</artifactId>
<version>0.6</version>
<version>0.7</version>
</dependency>
```

Expand Down Expand Up @@ -51,8 +52,8 @@ You can then write your own code to interact with Kafka or use the following met

```java
kafkaUnitServer.createTopic(testTopic);
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
kafkaUnitServer.sendMessages(keyedMessage);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(testTopic, "key", "value");
kafkaUnitServer.sendRecord(producerRecord);
```

And to read messages:
Expand Down Expand Up @@ -88,9 +89,9 @@ public class KafkaUnitIntegrationTest {
public void junitRuleShouldHaveStartedKafka() throws Exception {
String testTopic = "TestTopic";
kafkaUnitRule.getKafkaUnit().createTopic(testTopic);
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(testTopic, "key", "value");

kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
kafkaUnitRule.getKafkaUnit().sendRecords(producerRecord);
List<String> messages = kafkaUnitRule.getKafkaUnit().readMessages(testTopic, 1);

assertEquals(Arrays.asList("value"), messages);
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ repositories {

dependencies {
compile 'junit:junit:4.11'
compile('org.apache.kafka:kafka_2.11:0.10.0.0') {
compile('org.apache.kafka:kafka_2.11:0.10.1.1') {
exclude module: 'slf4j-simple'
exclude module: 'slf4j-log4j12'
exclude module: 'jmxtools'
Expand Down
75 changes: 62 additions & 13 deletions src/main/java/info/batey/kafka/unit/KafkaUnit.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringDecoder;
import kafka.serializer.StringEncoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.VerifiableProperties;
import kafka.utils.ZkUtils;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.security.JaasUtils;

import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ComparisonFailure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,8 +43,19 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.file.Files;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class KafkaUnit {

Expand All @@ -57,7 +68,7 @@ public class KafkaUnit {
private final String brokerString;
private int zkPort;
private int brokerPort;
private Producer<String, String> producer = null;
private KafkaProducer<String, String> producer = null;
private Properties kafkaBrokerConfig = new Properties();

public KafkaUnit() throws IOException {
Expand Down Expand Up @@ -172,7 +183,11 @@ public void createTopic(String topicName, Integer numPartitions) {

// run
LOGGER.info("Executing: CreateTopic " + Arrays.toString(arguments));
TopicCommand.createTopic(zkUtils, opts);
try
{
TopicCommand.createTopic(zkUtils, opts);
} catch (TopicExistsException e) {
}
}


Expand All @@ -181,6 +196,8 @@ public void shutdown() {
if (zookeeper != null) zookeeper.shutdown();
}

@Deprecated
@SuppressWarnings("deprecation")
public List<KeyedMessage<String, String>> readKeyedMessages(final String topicName, final int expectedMessages) throws TimeoutException {
return readMessages(topicName, expectedMessages, new MessageExtractor<KeyedMessage<String, String>>() {

Expand All @@ -191,6 +208,16 @@ public KeyedMessage<String, String> extract(MessageAndMetadata<String, String> m
});
}

/**
 * Reads records from the given topic, returning them as {@link ConsumerRecord}s
 * so callers can inspect key, value, topic, partition and offset.
 *
 * @param topicName        topic to consume from
 * @param expectedMessages number of records to wait for
 * @return the consumed records, in the order they were received
 * @throws TimeoutException if fewer than {@code expectedMessages} records arrive in time
 */
public List<ConsumerRecord<String, String>> readConsumerRecords(final String topicName, final int expectedMessages) throws TimeoutException {
    // Adapt the old-consumer MessageAndMetadata into the new-client ConsumerRecord shape.
    MessageExtractor<ConsumerRecord<String, String>> extractor = new MessageExtractor<ConsumerRecord<String, String>>() {
        @Override
        public ConsumerRecord<String, String> extract(MessageAndMetadata<String, String> md) {
            return new ConsumerRecord<>(topicName, md.partition(), md.offset(), md.key(), md.message());
        }
    };
    return readMessages(topicName, expectedMessages, extractor);
}

public List<String> readMessages(String topicName, final int expectedMessages) throws TimeoutException {
return readMessages(topicName, expectedMessages, new MessageExtractor<String>() {
@Override
Expand Down Expand Up @@ -251,17 +278,39 @@ public List<T> call() throws Exception {
}
}

/**
 * Sends one or more messages expressed with the deprecated old-producer
 * {@link KeyedMessage} API by converting them to {@link ProducerRecord}s.
 *
 * @param message  first message to send (required)
 * @param messages any additional messages
 * @deprecated use {@link #sendRecords(ProducerRecord, ProducerRecord[])} instead
 * @see #sendRecords(ProducerRecord, ProducerRecord[])
 */
@SafeVarargs
@Deprecated
@SuppressWarnings({"deprecation", "unchecked"})
public final void sendMessages(KeyedMessage<String, String> message, KeyedMessage<String, String>... messages) {
    List<ProducerRecord<String, String>> records = new ArrayList<>(messages.length);
    for (KeyedMessage<String, String> m : messages) {
        records.add(createProducerRecord(m));
    }
    // toArray with a zero-length seed array; cast is safe because only
    // ProducerRecord<String, String> instances are ever added above.
    sendRecords(createProducerRecord(message), records.toArray((ProducerRecord<String, String>[]) new ProducerRecord[0]));
}

/**
 * Converts an old-producer {@link KeyedMessage} to a new-client {@link ProducerRecord},
 * preserving topic, key and value.
 */
@Deprecated
@SuppressWarnings("deprecation")
private static <K, V> ProducerRecord<K, V> createProducerRecord(KeyedMessage<K, V> message) {
    // Parameterized construction instead of a raw type to avoid an unchecked warning.
    return new ProducerRecord<>(message.topic(), message.key(), message.message());
}

/**
 * Sends one or more {@link ProducerRecord}s to the embedded broker and blocks
 * until they have been delivered.
 *
 * <p>The new-client {@link KafkaProducer} buffers sends asynchronously, unlike the
 * old synchronous producer this class previously used; {@code flush()} is called
 * after sending so that test code can immediately read the records back.
 *
 * @param message  first record to send (required)
 * @param messages any additional records
 */
@SafeVarargs
public final void sendRecords(ProducerRecord<String, String> message, ProducerRecord<String, String>... messages) {
    // Lazily create a single shared producer pointed at the embedded broker.
    if (producer == null) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerString);
        producer = new KafkaProducer<String, String>(props,
                new StringSerializer(),
                new StringSerializer());
    }
    producer.send(message);
    for (ProducerRecord<String, String> m : messages) {
        producer.send(m);
    }
    // KafkaProducer.send is asynchronous; flush so records are on the broker
    // before this method returns (matches the old sync-producer semantics).
    producer.flush();
}

/**
Expand Down
27 changes: 15 additions & 12 deletions src/test/java/info/batey/kafka/unit/KafkaIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package info.batey.kafka.unit;

import kafka.producer.KeyedMessage;
import kafka.server.KafkaServerStartable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -34,7 +34,9 @@
import java.util.Properties;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class KafkaIntegrationTest {

Expand Down Expand Up @@ -67,10 +69,10 @@ public void shouldThrowComparisonFailureIfMoreMessagesRequestedThanSent() throws
//given
String testTopic = "TestTopic";
kafkaUnitServer.createTopic(testTopic);
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(testTopic, "key", "value");

//when
kafkaUnitServer.sendMessages(keyedMessage);
kafkaUnitServer.sendRecords(producerRecord);

try {
kafkaUnitServer.readMessages(testTopic, 2);
Expand Down Expand Up @@ -109,26 +111,27 @@ public void canReadKeyedMessages() throws Exception {
//given
String testTopic = "TestTopic";
kafkaUnitServer.createTopic(testTopic);
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(testTopic, "key", "value");

//when
kafkaUnitServer.sendMessages(keyedMessage);
kafkaUnitServer.sendRecords(producerRecord);

KeyedMessage<String, String> receivedMessage = kafkaUnitServer.readKeyedMessages(testTopic, 1).get(0);
ConsumerRecord<String, String> receivedConsumerRecord = kafkaUnitServer
.readConsumerRecords(testTopic, 1).get(0);

assertEquals("Received message value is incorrect", "value", receivedMessage.message());
assertEquals("Received message key is incorrect", "key", receivedMessage.key());
assertEquals("Received message topic is incorrect", testTopic, receivedMessage.topic());
assertEquals("Received message value is incorrect", "value", receivedConsumerRecord.value());
assertEquals("Received message key is incorrect", "key", receivedConsumerRecord.key());
assertEquals("Received message topic is incorrect", testTopic, receivedConsumerRecord.topic());
}

private void assertKafkaServerIsAvailable(KafkaUnit server) throws TimeoutException {
//given
String testTopic = "TestTopic";
server.createTopic(testTopic);
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(testTopic, "key", "value");

//when
server.sendMessages(keyedMessage);
server.sendRecords(producerRecord);
List<String> messages = server.readMessages(testTopic, 1);

//then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package info.batey.kafka.unit;

import kafka.producer.KeyedMessage;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -54,10 +54,10 @@ public void assertKafkaStartsAndSendsMessage(final KafkaUnit kafkaUnit) throws E
//given
String testTopic = "TestTopic";
kafkaUnit.createTopic(testTopic);
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(testTopic, "key", "value");

//when
kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
kafkaUnitRule.getKafkaUnit().sendRecords(producerRecord);
List<String> messages = kafkaUnitRule.getKafkaUnit().readMessages(testTopic, 1);

//then
Expand Down