Refactor 1: Achieve deadlock-free lock ordering by subclassing BlockGenerator and taking receiver lock first in BlockGenerator.updateCurrentBuffer #7

Open · wants to merge 7 commits into base: kafka-refactor
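For context, here is a minimal, self-contained sketch of the lock ordering this refactor enforces. The object and method bodies below are illustrative stand-ins, not the actual Spark classes: the point is that after the change both the message-handler path and the block-generator timer path acquire the receiver lock before the generator lock, so the two threads can no longer deadlock by taking the locks in opposite orders.

// Illustrative sketch (not Spark code): receiverLock stands in for the
// ReliableKafkaReceiver instance, generatorLock for the BlockGenerator's own lock.
object LockOrderingSketch {
  private val receiverLock = new Object
  private val generatorLock = new Object

  // Message-handler path: take the receiver lock, then the generator lock,
  // while appending data and recording the latest offsets.
  def storeMessageAndOffset(): Unit = receiverLock.synchronized {
    generatorLock.synchronized {
      // append the message to the current buffer and update the offset map
    }
  }

  // Timer path after this refactor: CustomBlockGenerator.updateCurrentBuffer
  // takes the receiver lock first, then delegates to the generator (which takes
  // its own lock), matching the order used by the message-handler path above.
  def updateCurrentBuffer(time: Long): Unit = receiverLock.synchronized {
    generatorLock.synchronized {
      // swap the buffer into a new block and snapshot offsets for that block
    }
  }
}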
ReliableKafkaReceiver.scala

@@ -18,7 +18,7 @@
package org.apache.spark.streaming.kafka

import java.util.Properties
import java.util.concurrent.{ThreadPoolExecutor, ConcurrentHashMap}
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}

import scala.collection.{Map, mutable}
import scala.reflect.{ClassTag, classTag}
@@ -75,7 +75,8 @@ class ReliableKafkaReceiver[
private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null

/** A concurrent HashMap to store the stream block id and related offset snapshot. */
private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
private var blockOffsetMap:
ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null

/**
* Manage the BlockGenerator in receiver itself for better managing block store and offset
@@ -86,6 +87,8 @@
/** Thread pool running the handlers for receiving message from multiple topics and partitions. */
private var messageHandlerThreadPool: ThreadPoolExecutor = null

private var currentBlockId: StreamBlockId = null

override def onStart(): Unit = {
logInfo(s"Starting Kafka Consumer Stream with group: $groupId")

@@ -96,7 +99,7 @@
blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

// Initialize the block generator for storing Kafka message.
blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)
blockGenerator = new CustomBlockGenerator(new GeneratedBlockHandler)

if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
@@ -145,11 +148,6 @@
}

override def onStop(): Unit = {
if (messageHandlerThreadPool != null) {
messageHandlerThreadPool.shutdown()
messageHandlerThreadPool = null
}

if (consumerConnector != null) {
consumerConnector.shutdown()
consumerConnector = null
@@ -160,6 +158,11 @@
zkClient = null
}

if (messageHandlerThreadPool != null) {
messageHandlerThreadPool.shutdown()
messageHandlerThreadPool = null
}

if (blockGenerator != null) {
blockGenerator.stop()
blockGenerator = null
@@ -199,6 +202,7 @@
private def storeBlockAndCommitOffset(
blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])

Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
blockOffsetMap.remove(blockId)
}
@@ -221,10 +225,10 @@

ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
} catch {
case t: Throwable => logWarning(s"Exception during commit offset $offset for topic" +
s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t)
case e: Exception =>
logWarning(s"Exception during commit offset $offset for topic" +
s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
}

logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
s"partition ${topicAndPart.partition}")
}
@@ -241,12 +245,23 @@
}
} catch {
case e: Exception =>
logError("Error handling message", e)
restart("Error handling message", e)
}
}
}
}

/**
* Custom BlockGenerator class that synchronizes on the Receiver object before
* switching the receiving buffer.
*/
private final class CustomBlockGenerator(listener: BlockGeneratorListener)
extends BlockGenerator(listener, streamId, conf) {
override def updateCurrentBuffer(time: Long): Unit = ReliableKafkaReceiver.this.synchronized {
super.updateCurrentBuffer(time)
}
}

/** Class to handle blocks generated by the block generator. */
private final class GeneratedBlockHandler extends BlockGeneratorListener {

@@ -261,7 +276,7 @@
}

override def onError(message: String, throwable: Throwable): Unit = {
reportError(message, throwable)
restart(message, throwable)
}
}
}
JavaKafkaStreamSuite.java

@@ -22,6 +22,8 @@
import java.util.List;
import java.util.Random;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;
@@ -43,23 +45,25 @@

public class JavaKafkaStreamSuite implements Serializable {
private transient JavaStreamingContext ssc = null;
private Random random = new Random();
private transient Random random = new Random();
private transient KafkaStreamSuiteBase suiteBase = null;

@Before
public void setUp() {
suiteBase = new KafkaStreamSuiteBase() { };
suiteBase.beforeFunction();
suiteBase.setupKafka();
System.clearProperty("spark.driver.port");
ssc = new JavaStreamingContext(suiteBase.sparkConf(), suiteBase.batchDuration());
SparkConf sparkConf = new SparkConf()
.setMaster("local[4]").setAppName(this.getClass().getSimpleName());
ssc = new JavaStreamingContext(sparkConf, new Duration(500));
}

@After
public void tearDown() {
ssc.stop();
ssc = null;
System.clearProperty("spark.driver.port");
suiteBase.afterFunction();
suiteBase.tearDownKafka();
}

@Test
@@ -76,8 +80,8 @@ public void testKafkaStream() throws InterruptedException {
suiteBase.createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
suiteBase.produceAndSendMessage(topic,
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
KafkaStreamSuite.scala

@@ -45,12 +45,6 @@ import org.apache.spark.util.Utils
abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
import KafkaTestUtils._

val sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
val batchDuration = Milliseconds(500)
var ssc: StreamingContext = _

var zkAddress: String = _
var zkClient: ZkClient = _

@@ -64,7 +58,7 @@
private var server: KafkaServer = _
private var producer: Producer[String, String] = _

def beforeFunction() {
def setupKafka() {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
@@ -100,12 +94,7 @@
logInfo("==================== 4 ====================")
}

def afterFunction() {
if (ssc != null) {
ssc.stop()
ssc = null
}

def tearDownKafka() {
if (producer != null) {
producer.close()
producer = null
@@ -146,21 +135,31 @@

def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
if (producer == null) {
producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
}
producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
producer.send(createTestMessage(topic, sent): _*)
producer.close()
logInfo("==================== 6 ====================")
}
}

class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
var ssc: StreamingContext = _

before { beforeFunction() }
after { afterFunction() }
before {
setupKafka()
}

after {
if (ssc != null) {
ssc.stop()
ssc = null
}
tearDownKafka()
}

test("Kafka input stream") {
ssc = new StreamingContext(sparkConf, batchDuration)
val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
ssc = new StreamingContext(sparkConf, Milliseconds(500))
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
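For reference, a sketch of how a suite would use the refactored KafkaStreamSuiteBase: Kafka and ZooKeeper setup comes from setupKafka()/tearDownKafka(), while the SparkConf and StreamingContext are now created and stopped by the concrete suite itself. The suite name, topic, and message counts below are illustrative; the base-class methods used are the ones shown in the diff above.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.scalatest.BeforeAndAfter

// Illustrative suite built on the refactored base class; setupKafka(),
// tearDownKafka(), createTopic() and produceAndSendMessage() come from
// KafkaStreamSuiteBase, while the StreamingContext lifecycle is owned here.
class ExampleKafkaSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
  private var ssc: StreamingContext = _

  before { setupKafka() }

  after {
    if (ssc != null) {
      ssc.stop()
      ssc = null
    }
    tearDownKafka()
  }

  test("example Kafka stream") {
    val conf = new SparkConf().setMaster("local[4]").setAppName("ExampleKafkaSuite")
    ssc = new StreamingContext(conf, Milliseconds(500))
    val topic = "example-topic"
    createTopic(topic)
    produceAndSendMessage(topic, Map("a" -> 1, "b" -> 2))
    // build a Kafka input stream against zkAddress and assert on the results here
  }
}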