Skip to content

Commit

Permalink
bypass decoder in scala
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Dec 18, 2014
1 parent 048dbe6 commit 5697a01
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions python/pyspark/streaming/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,26 @@
# limitations under the License.
#


from py4j.java_collections import MapConverter
from py4j.java_gateway import java_import, Py4JError

from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, UTF8Deserializer
from pyspark.serializers import PairDeserializer, NoOpSerializer
from pyspark.streaming import DStream

__all__ = ['KafkaUtils']


def utf8_decoder(s):
return s.decode('utf-8')


class KafkaUtils(object):

@staticmethod
def createStream(ssc, zkQuorum, groupId, topics,
storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
keyDecoder=None, valueDecoder=None):
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
Create an input stream that pulls messages from a Kafka Broker.
Expand All @@ -47,26 +50,32 @@ def createStream(ssc, zkQuorum, groupId, topics,
"""
java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")

param = {
"zookeeper.connect": zkQuorum,
"group.id": groupId,
"zookeeper.connection.timeout.ms": "10000",
}
if not isinstance(topics, dict):
raise TypeError("topics should be dict")
jtopics = MapConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
jparam = MapConverter().convert(param, ssc.sparkContext._gateway._gateway_client)
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)

def getClassByName(name):
return ssc._jvm.org.apache.spark.util.Utils.classForName(name)

try:
jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, zkQuorum, groupId, jtopics,
jlevel)
array = getClassByName("[B")
decoder = getClassByName("kafka.serializer.DefaultDecoder")
jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
jparam, jtopics, jlevel)
except Py4JError, e:
if 'call a package' in e.message:
print "No kafka package, please build it and add it into classpath:"
print " $ sbt/sbt streaming-kafka/package"
print " $ bin/submit --driver-class-path external/kafka/target/scala-2.10/" \
"spark-streaming-kafka_2.10-1.3.0-SNAPSHOT.jar"
raise Exception("No kafka package")
print " $ bin/submit --driver-class-path lib_managed/jars/kafka_2.10-0.8.0.jar:" \
"external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.3.0-SNAPSHOT.jar"
raise e
ser = PairDeserializer(UTF8Deserializer(), UTF8Deserializer())
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
stream = DStream(jstream, ssc, ser)

if keyDecoder is not None:
stream = stream.map(lambda (k, v): (keyDecoder(k), v))
if valueDecoder is not None:
stream = stream.mapValues(valueDecoder)
return stream
return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))

0 comments on commit 5697a01

Please sign in to comment.