Refactor 2: Removed the receiver's locks and essentially reverted to Saisai's original design #6

Merged · 5 commits · Nov 14, 2014
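A note on the design the title refers to, as read from the diff below: the refactor drops the receiver's own `synchronized` blocks and routes everything through the `BlockGenerator` again. Records are handed over with `addDataWithCallback(data, metadata)`, and a `BlockGeneratorListener` updates the offset map (`onAddData`), snapshots it when a block is cut (`onGenerateBlock`), and stores the block before committing that snapshot (`onPushBlock`). This only stays correct if the generator itself serializes those callbacks, which is assumed here. A minimal, self-contained sketch of the sequence (names such as `offsets` and `blockOffsets` are illustrative, not the receiver's actual fields):

```scala
import scala.collection.mutable

// Standalone sketch of the callback-driven offset tracking this PR reverts to.
// Assumption: the BlockGenerator serializes onAddData / onGenerateBlock / onPushBlock,
// so no receiver-side locking is needed.
object OffsetFlowSketch extends App {
  val offsets = mutable.HashMap[String, Long]()                    // latest offset per partition
  val blockOffsets = mutable.HashMap[String, Map[String, Long]]()  // offset snapshot per block

  def onAddData(partition: String, offset: Long): Unit = offsets(partition) = offset
  def onGenerateBlock(blockId: String): Unit = blockOffsets(blockId) = offsets.toMap
  def onPushBlock(blockId: String): Unit = {
    // In the real receiver: store the block (write-ahead log) first, then commit to ZooKeeper.
    val toCommit = blockOffsets.remove(blockId).getOrElse(Map.empty)
    println(s"stored $blockId, committing offsets $toCommit")
  }

  onAddData("topic-0", 10L)
  onAddData("topic-0", 11L)
  onGenerateBlock("block-1")   // snapshot is Map("topic-0" -> 11)
  onPushBlock("block-1")       // offset 11 is committed only after block-1 is stored
}
```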
ReliableKafkaReceiver.scala
@@ -178,17 +178,23 @@ class ReliableKafkaReceiver[

/** Store a Kafka message and the associated metadata as a tuple. */
private def storeMessageAndMetadata(
msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
val data = (msgAndMetadata.key, msgAndMetadata.message)
val metadata = (topicAndPartition, msgAndMetadata.offset)
blockGenerator.addDataWithCallback(data, metadata)
}

/** Update stored offset */
private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
topicPartitionOffsetMap.put(topicAndPartition, offset)
}

/**
* Remember the current offsets for each topic and partition. This is called when a block is
* generated.
*/
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
// Get a snapshot of current offset map and store with related block id.
val offsetSnapshot = topicPartitionOffsetMap.toMap
blockOffsetMap.put(blockId, offsetSnapshot)
@@ -221,8 +227,9 @@ class ReliableKafkaReceiver[

ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
} catch {
case t: Throwable => logWarning(s"Exception during commit offset $offset for topic" +
s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t)
case e: Exception =>
logWarning(s"Exception during commit offset $offset for topic" +
s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
}

logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
@@ -250,17 +257,25 @@ class ReliableKafkaReceiver[
/** Class to handle blocks generated by the block generator. */
private final class GeneratedBlockHandler extends BlockGeneratorListener {

override def onGenerateBlock(blockId: StreamBlockId): Unit = {
def onAddData(data: Any, metadata: Any): Unit = {
// Update the offset of the data that was added to the generator
if (metadata != null) {
val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
updateOffset(topicAndPartition, offset)
}
}

def onGenerateBlock(blockId: StreamBlockId): Unit = {
// Remember the offsets of topics/partitions when a block has been generated
rememberBlockOffsets(blockId)
}

override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
// Store block and commit the blocks offset
storeBlockAndCommitOffset(blockId, arrayBuffer)
}

override def onError(message: String, throwable: Throwable): Unit = {
def onError(message: String, throwable: Throwable): Unit = {
reportError(message, throwable)
}
}
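One subtlety in `onAddData` above: the metadata comes back as `Any` and is exactly the tuple that `storeMessageAndMetadata` passed to `addDataWithCallback`, so the `asInstanceOf[(TopicAndPartition, Long)]` cast must mirror that tuple, and the null check covers data added without metadata; a mismatch only surfaces at runtime. A tiny illustration with stand-in types (`(String, Long)` in place of `(TopicAndPartition, Long)`):

```scala
// Sketch: untyped metadata round-trip as used by the listener above.
// A null means the record was added without metadata and is simply ignored.
def handleAddData(metadata: Any): Unit = {
  if (metadata != null) {
    val (partition, offset) = metadata.asInstanceOf[(String, Long)]
    println(s"update offset of $partition to $offset")
  }
}

handleAddData(("topic-0", 42L))  // updates the offset
handleAddData(null)              // ignored
// handleAddData("oops")         // would throw ClassCastException at runtime
```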
JavaKafkaStreamSuite.java
@@ -22,6 +22,8 @@
import java.util.List;
import java.util.Random;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;
@@ -43,23 +45,25 @@

public class JavaKafkaStreamSuite implements Serializable {
private transient JavaStreamingContext ssc = null;
private Random random = new Random();
private transient Random random = new Random();
private transient KafkaStreamSuiteBase suiteBase = null;

@Before
public void setUp() {
suiteBase = new KafkaStreamSuiteBase() { };
suiteBase.beforeFunction();
suiteBase.setupKafka();
System.clearProperty("spark.driver.port");
ssc = new JavaStreamingContext(suiteBase.sparkConf(), suiteBase.batchDuration());
SparkConf sparkConf = new SparkConf()
.setMaster("local[4]").setAppName(this.getClass().getSimpleName());
ssc = new JavaStreamingContext(sparkConf, new Duration(500));
}

@After
public void tearDown() {
ssc.stop();
ssc = null;
System.clearProperty("spark.driver.port");
suiteBase.afterFunction();
suiteBase.tearDownKafka();
}

@Test
@@ -76,8 +80,8 @@ public void testKafkaStream() throws InterruptedException {
suiteBase.createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
suiteBase.produceAndSendMessage(topic,
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
KafkaStreamSuite.scala
@@ -42,15 +42,13 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.util.Utils

/**
* This is an abstract base class for Kafka testsuites. This has the functionality to set up
* and tear down local Kafka servers, and to push data using Kafka producers.
*/
abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
import KafkaTestUtils._

val sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
val batchDuration = Milliseconds(500)
var ssc: StreamingContext = _

var zkAddress: String = _
var zkClient: ZkClient = _

@@ -64,7 +62,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
private var server: KafkaServer = _
private var producer: Producer[String, String] = _

def beforeFunction() {
def setupKafka() {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
@@ -100,12 +98,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
logInfo("==================== 4 ====================")
}

def afterFunction() {
if (ssc != null) {
ssc.stop()
ssc = null
}

def tearDownKafka() {
if (producer != null) {
producer.close()
producer = null
@@ -146,21 +139,31 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {

def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
if (producer == null) {
producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
}
producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
producer.send(createTestMessage(topic, sent): _*)
producer.close()
logInfo("==================== 6 ====================")
}
}

class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
var ssc: StreamingContext = _

before {
setupKafka()
}

before { beforeFunction() }
after { afterFunction() }
after {
if (ssc != null) {
ssc.stop()
ssc = null
}
tearDownKafka()
}

test("Kafka input stream") {
ssc = new StreamingContext(sparkConf, batchDuration)
val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
ssc = new StreamingContext(sparkConf, Milliseconds(500))
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
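The net effect of this file's changes: `KafkaStreamSuiteBase` no longer owns a `SparkConf`, `StreamingContext`, or batch duration; it only manages the embedded ZooKeeper and Kafka broker through `setupKafka()` / `tearDownKafka()`, and `produceAndSendMessage` now opens and closes a fresh producer on every call instead of caching one. Each concrete suite builds its own context and stops it before tearing Kafka down. A minimal sketch of a suite written against that split (the suite name and test body are hypothetical):

```scala
import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Hypothetical suite following the layout this PR introduces:
// the base class only knows about Kafka/ZK; the StreamingContext is per-suite state.
class MyKafkaSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
  private var ssc: StreamingContext = _

  before { setupKafka() }                         // start embedded ZK and Kafka broker

  after {
    if (ssc != null) { ssc.stop(); ssc = null }   // stop streaming before the broker goes away
    tearDownKafka()
  }

  test("my Kafka test") {
    val conf = new SparkConf().setMaster("local[4]").setAppName("MyKafkaSuite")
    ssc = new StreamingContext(conf, Milliseconds(500))
    // createTopic(...), produceAndSendMessage(...), assertions go here
  }
}
```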
ReliableKafkaStreamSuite.scala
@@ -17,57 +17,75 @@

package org.apache.spark.streaming.kafka


import java.io.File

import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random

import com.google.common.io.Files
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.util.Utils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
val topic = "topic"

val sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val data = Map("a" -> 10, "b" -> 10, "c" -> 10)


var groupId: String = _
var kafkaParams: Map[String, String] = _
var ssc: StreamingContext = _
var tempDirectory: File = null

before {
beforeFunction() // call this first to start ZK and Kafka
setupKafka()
groupId = s"test-consumer-${Random.nextInt(10000)}"
kafkaParams = Map(
"zookeeper.connect" -> zkAddress,
"group.id" -> groupId,
"auto.offset.reset" -> "smallest"
)

ssc = new StreamingContext(sparkConf, Milliseconds(500))
tempDirectory = Files.createTempDir()
ssc.checkpoint(tempDirectory.getAbsolutePath)
}

after {
afterFunction()
if (ssc != null) {
ssc.stop()
}
if (tempDirectory != null && tempDirectory.exists()) {
FileUtils.deleteDirectory(tempDirectory)
tempDirectory = null
}
tearDownKafka()
}

test("Reliable Kafka input stream") {
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
ssc = new StreamingContext(sparkConf, batchDuration)
val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
s"test-checkpoint${Random.nextInt(10000)}"
Utils.registerShutdownDeleteDir(new File(checkpointDir))
ssc.checkpoint(checkpointDir)

test("Reliable Kafka input stream with single topic") {
var topic = "test-topic"
createTopic(topic)
produceAndSendMessage(topic, data)

// Verify whether the offset of this group/topic/partition is 0 before starting.
assert(getCommitOffset(groupId, topic, 0) === None)

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
Map(topic -> 1),
StorageLevel.MEMORY_ONLY)
ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
val result = new mutable.HashMap[String, Long]()
stream.map { case (k, v) => v }.foreachRDD { r =>
val ret = r.collect()
@@ -77,84 +95,46 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
}
}
ssc.start()
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
// A basic process verification for ReliableKafkaReceiver.
// Verify whether received message number is equal to the sent message number.
assert(data.size === result.size)
// Verify whether each message is the same as the data to be verified.
data.keys.foreach { k => assert(data(k) === result(k).toInt) }
}
ssc.stop()
}
test("Verify the offset commit") {
// Verify the correctness of offset commit mechanism.
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
ssc = new StreamingContext(sparkConf, batchDuration)
val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
s"test-checkpoint${Random.nextInt(10000)}"
Utils.registerShutdownDeleteDir(new File(checkpointDir))
ssc.checkpoint(checkpointDir)

createTopic(topic)
produceAndSendMessage(topic, data)

// Verify whether the offset of this group/topic/partition is 0 before starting.
assert(getCommitOffset(groupId, topic, 0) === 0L)

// Do this to consume all the message of this group/topic.
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
Map(topic -> 1),
StorageLevel.MEMORY_ONLY)
stream.foreachRDD(_ => Unit)
ssc.start()
eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
// Verify the offset number whether it is equal to the total message number.
assert(getCommitOffset(groupId, topic, 0) === 29L)
assert(getCommitOffset(groupId, topic, 0) === Some(29L))
}
ssc.stop()
}

test("Verify multiple topics offset commit") {
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
ssc = new StreamingContext(sparkConf, batchDuration)
val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
s"test-checkpoint${Random.nextInt(10000)}"
Utils.registerShutdownDeleteDir(new File(checkpointDir))
ssc.checkpoint(checkpointDir)

test("Reliable Kafka input stream with multiple topics") {
val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
topics.foreach { case (t, _) =>
createTopic(t)
produceAndSendMessage(t, data)
}

// Before started, verify all the group/topic/partition offsets are 0.
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) }

// Consuming all the data sent to the broker, which will potentially commit the offsets internally.
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topics,
StorageLevel.MEMORY_ONLY)
ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
stream.foreachRDD(_ => Unit)
ssc.start()
eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
// Verify the offset for each group/topic to see whether they are equal to the expected one.
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
}
ssc.stop()
}


/** Getting partition offset from Zookeeper. */
private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
assert(zkClient != null, "Zookeeper client is not initialized")

val topicDirs = new ZKGroupTopicDirs(groupId, topic)
val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"

ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong).getOrElse(0L)
ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
}
}
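Returning `Option[Long]` instead of defaulting to `0L` lets the tests tell "no offset committed yet" apart from "offset 0 was committed", which is why the assertions above change from `=== 0L` to `=== None` before the stream starts and to `=== Some(29L)` after consumption. A small illustration of the distinction (a standalone function mirroring the read-and-map step):

```scala
// Sketch: why Option[Long] is the better return type for getCommitOffset.
// With the old .getOrElse(0L), the first two ZooKeeper states were indistinguishable.
def offsetOf(raw: Option[String]): Option[Long] = raw.map(_.toLong)

offsetOf(None)        // None      -> nothing committed yet (pre-start assertion)
offsetOf(Some("0"))   // Some(0L)  -> an explicit commit of offset 0
offsetOf(Some("29"))  // Some(29L) -> matches the post-consumption assertion
```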