From 77c3e5064f72c259cc497aa06084da358145cf16 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Mon, 27 Oct 2014 16:55:13 +0800
Subject: [PATCH] Code refactor and add some unit tests

---
 .../streaming/kafka/KafkaInputDStream.scala   |   4 +-
 .../kafka/ReliableKafkaReceiver.scala         |  10 +-
 .../streaming/kafka/KafkaStreamSuite.scala    |  27 ++-
 .../kafka/ReliableKafkaStreamSuite.scala      | 184 ++++++++++++++++++
 4 files changed, 213 insertions(+), 12 deletions(-)
 create mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 92dd3f878a71f..5a8f4a07d0a3d 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -47,12 +47,12 @@ class KafkaInputDStream[
     @transient ssc_ : StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
-    reliableStoreEnabled: Boolean,
+    reliableReceiveEnabled: Boolean,
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
   def getReceiver(): Receiver[(K, V)] = {
-    if (!reliableStoreEnabled) {
+    if (!reliableReceiveEnabled) {
       new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
         .asInstanceOf[Receiver[(K, V)]]
     } else {
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index 6773db67db24c..57a66f4e64fd5 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -71,7 +71,6 @@ class ReliableKafkaReceiver[
         val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
         topicPartitionOffsetMap.put(kafkaMetadata._1, kafkaMetadata._2)
       }
-      println(s"offset map: ${topicPartitionOffsetMap.mkString(":")}")
     }
 
     override def onGenerateBlock(blockId: StreamBlockId): Unit = {
@@ -80,7 +79,6 @@
       val offsetSnapshot = topicPartitionOffsetMap.toMap
       blockOffsetMap.put(blockId, offsetSnapshot)
       topicPartitionOffsetMap.clear()
-      println(s"block generated: $blockId, offset snapshot: ${offsetSnapshot.mkString(":")}")
     }
 
     override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
@@ -101,8 +99,7 @@
 
   /** Manage the BlockGenerator in receiver itself for better managing block store and offset
    * commit */
-  @volatile private lazy val blockGenerator =
-    new BlockGenerator(blockGeneratorListener, streamId, env.conf)
+  private lazy val blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
 
   override def onStop(): Unit = {
     if (consumerConnector != null) {
@@ -134,6 +131,9 @@
     props.setProperty(AUTO_OFFSET_COMMIT, "false")
 
     val consumerConfig = new ConsumerConfig(props)
+
+    assert(consumerConfig.autoCommitEnable == false)
+
     logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
     consumerConnector = Consumer.create(consumerConfig)
     logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
@@ -204,7 +204,7 @@
           s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t)
       }
 
-      println(s"Committed offset ${offset} for topic ${topicAndPart.topic}, " +
+      logInfo(s"Committed offset ${offset} for topic ${topicAndPart.topic}, " +
         s"partition ${topicAndPart.partition}")
     }
   }
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 6943326eb750e..cf58b5cb70cd6 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -93,12 +93,27 @@ class KafkaStreamSuite extends TestSuiteBase {
   }
 
   override def afterFunction() {
-    producer.close()
-    server.shutdown()
+    if (producer != null) {
+      producer.close()
+      producer = null
+    }
+
+    if (server != null) {
+      server.shutdown()
+      server = null
+    }
+
     brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
-    zkClient.close()
-    zookeeper.shutdown()
+    if (zkClient != null) {
+      zkClient.close()
+      zkClient = null
+    }
+
+    if (zookeeper != null) {
+      zookeeper.shutdown()
+      zookeeper = null
+    }
 
     super.afterFunction()
   }
 
@@ -155,7 +170,9 @@ class KafkaStreamSuite extends TestSuiteBase {
 
   def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
     val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
-    producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+    if (producer == null) {
+      producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+    }
     producer.send(createTestMessage(topic, sent): _*)
     logInfo("==================== 6 ====================")
   }
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
new file mode 100644
index 0000000000000..0cf2752ebdb4d
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.collection.mutable
+
+import kafka.serializer.StringDecoder
+import kafka.utils.{ZkUtils, ZKGroupTopicDirs}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+
+class ReliableKafkaStreamSuite extends KafkaStreamSuite {
+  import KafkaTestUtils._
+
+  test("Reliable Kafka input stream") {
+    val ssc = new StreamingContext(master, framework, batchDuration)
+    val topic = "test"
+    val sent = Map("a" -> 1, "b" -> 1, "c" -> 1)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
+      "group.id" -> s"test-consumer-${random.nextInt(10000)}",
+      "auto.offset.reset" -> "smallest")
+
+    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+      ssc,
+      kafkaParams,
+      Map(topic -> 1),
+      StorageLevel.MEMORY_ONLY)
+    val result = new mutable.HashMap[String, Long]()
+    stream.map { case (k, v) => v }
+      .foreachRDD { r =>
+        val ret = r.collect()
+        ret.foreach { v =>
+          val count = result.getOrElseUpdate(v, 0) + 1
+          result.put(v, count)
+        }
+      }
+    ssc.start()
+    ssc.awaitTermination(3000)
+
+    assert(sent.size === result.size)
+    sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
+
+    ssc.stop()
+  }
+
+  test("Verify the offset commit") {
+    val ssc = new StreamingContext(master, framework, batchDuration)
+    val topic = "test"
+    val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val groupId = s"test-consumer-${random.nextInt(10000)}"
+
+    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
+      "group.id" -> groupId,
+      "auto.offset.reset" -> "smallest")
+
+    assert(getCommitOffset(groupId, topic, 0) === 0L)
+
+    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+      ssc,
+      kafkaParams,
+      Map(topic -> 1),
+      StorageLevel.MEMORY_ONLY)
+    stream.foreachRDD(_ => Unit)
+    ssc.start()
+    ssc.awaitTermination(3000)
+    ssc.stop()
+
+    assert(getCommitOffset(groupId, topic, 0) === 29L)
+  }
+
+  test("Verify multiple topics offset commit") {
+    val ssc = new StreamingContext(master, framework, batchDuration)
+    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
+    val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
+    topics.foreach { case (t, _) =>
+      createTopic(t)
+      produceAndSendMessage(t, sent)
+    }
+
+    val groupId = s"test-consumer-${random.nextInt(10000)}"
+
+    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
+      "group.id" -> groupId,
+      "auto.offset.reset" -> "smallest")
+
+    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }
+
+    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+      ssc,
+      kafkaParams,
+      topics,
+      StorageLevel.MEMORY_ONLY)
+    stream.foreachRDD(_ => Unit)
+    ssc.start()
+    ssc.awaitTermination(3000)
+    ssc.stop()
+
+    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
+  }
+
+  test("Verify offset commit when exception is met") {
+    val sparkConf = new SparkConf()
+      .setMaster(master)
+      .setAppName(framework)
+    var ssc = new StreamingContext(
+      sparkConf.clone.set("spark.streaming.blockInterval", "4000"),
+      batchDuration)
+    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
+    val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
+    topics.foreach { case (t, _) =>
+      createTopic(t)
+      produceAndSendMessage(t, sent)
+    }
+
+    val groupId = s"test-consumer-${random.nextInt(10000)}"
s"test-consumer-${random.nextInt(10000)}" + + val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort", + "group.id" -> groupId, + "auto.offset.reset" -> "smallest") + + KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder]( + ssc, + kafkaParams, + topics, + StorageLevel.MEMORY_ONLY).foreachRDD(_ => throw new Exception) + try { + ssc.start() + ssc.awaitTermination(1000) + } catch { + case e: Exception => + if (ssc != null) { + ssc.stop() + ssc = null + } + } + // Failed before putting to BM, so offset is not updated. + topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) } + + // Restart to see if data is consumed from last checkpoint. + ssc = new StreamingContext(sparkConf, batchDuration) + KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder]( + ssc, + kafkaParams, + topics, + StorageLevel.MEMORY_ONLY).foreachRDD(_ => Unit) + ssc.start() + ssc.awaitTermination(3000) + ssc.stop() + + topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) } + } + + private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = { + assert(zkClient != null, "Zookeeper client is not initialized") + + val topicDirs = new ZKGroupTopicDirs(groupId, topic) + val zkPath = s"${topicDirs.consumerOffsetDir}/$partition" + + ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong).getOrElse(0L) + } +}