From b3806aba729842bd7c5fff806c59d4820c2b430a Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 14 May 2015 12:29:33 -0700 Subject: [PATCH] Fix test --- .../scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index bda7831d2bc38..185cbd3ba1b16 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -233,7 +233,8 @@ object KafkaUtils { case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port)) }.toMap } - new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler) + val cleanedHandler = sc.clean(messageHandler) + new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler) } /**