From 5697a012def1b8508d21d96ccccf2afb7d6705cf Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 18 Dec 2014 15:44:33 -0800
Subject: [PATCH] bypass decoder in scala

---
 python/pyspark/streaming/kafka.py | 39 +++++++++++++++++++------------
 1 file changed, 24 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 24d62db1979b9..6d0cb5d4c1f63 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -15,23 +15,26 @@
 # limitations under the License.
 #
-
 
 from py4j.java_collections import MapConverter
 from py4j.java_gateway import java_import, Py4JError
 
 from pyspark.storagelevel import StorageLevel
-from pyspark.serializers import PairDeserializer, UTF8Deserializer
+from pyspark.serializers import PairDeserializer, NoOpSerializer
 from pyspark.streaming import DStream
 
 __all__ = ['KafkaUtils']
 
 
+def utf8_decoder(s):
+    return s.decode('utf-8')
+
+
 class KafkaUtils(object):
 
     @staticmethod
     def createStream(ssc, zkQuorum, groupId, topics,
                      storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
-                     keyDecoder=None, valueDecoder=None):
+                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
         """
         Create an input stream that pulls messages from a Kafka Broker.
 
@@ -47,26 +50,32 @@ def createStream(ssc, zkQuorum, groupId, topics,
         """
         java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
 
+        param = {
+            "zookeeper.connect": zkQuorum,
+            "group.id": groupId,
+            "zookeeper.connection.timeout.ms": "10000",
+        }
         if not isinstance(topics, dict):
             raise TypeError("topics should be dict")
         jtopics = MapConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
+        jparam = MapConverter().convert(param, ssc.sparkContext._gateway._gateway_client)
         jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+
+        def getClassByName(name):
+            return ssc._jvm.org.apache.spark.util.Utils.classForName(name)
+
         try:
-            jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, zkQuorum, groupId, jtopics,
-                                                       jlevel)
+            array = getClassByName("[B")
+            decoder = getClassByName("kafka.serializer.DefaultDecoder")
+            jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
+                                                       jparam, jtopics, jlevel)
         except Py4JError, e:
             if 'call a package' in e.message:
                 print "No kafka package, please build it and add it into classpath:"
                 print "  $ sbt/sbt streaming-kafka/package"
-                print "  $ bin/submit --driver-class-path external/kafka/target/scala-2.10/" \
-                      "spark-streaming-kafka_2.10-1.3.0-SNAPSHOT.jar"
-                raise Exception("No kafka package")
+                print "  $ bin/submit --driver-class-path lib_managed/jars/kafka_2.10-0.8.0.jar:" \
+                      "external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.3.0-SNAPSHOT.jar"
             raise e
-        ser = PairDeserializer(UTF8Deserializer(), UTF8Deserializer())
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
         stream = DStream(jstream, ssc, ser)
-
-        if keyDecoder is not None:
-            stream = stream.map(lambda (k, v): (keyDecoder(k), v))
-        if valueDecoder is not None:
-            stream = stream.mapValues(valueDecoder)
-        return stream
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
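
For reference, a minimal driver sketch of how the patched createStream might be exercised: the JVM side now uses kafka.serializer.DefaultDecoder and hands raw byte arrays to Python via NoOpSerializer, and keyDecoder/valueDecoder (defaulting to utf8_decoder) decode them on the Python side. The Zookeeper address, group id, and topic dict below are placeholder values, and the sketch assumes the Kafka jars are on the driver classpath as described in the error message above.

# Hypothetical usage sketch (Python 2, matching the patch era).
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaUsageSketch")
ssc = StreamingContext(sc, 2)  # 2-second batch interval

# One receiver thread for the "test" topic; messages arrive as raw bytes
# from the JVM and are decoded here by the default utf8_decoder.
stream = KafkaUtils.createStream(ssc, "localhost:2181", "my-group", {"test": 1})

# Values are already unicode strings after utf8_decoder runs.
stream.map(lambda (k, v): v).pprint()

ssc.start()
ssc.awaitTermination()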