[SPARK-5154] [PySpark] [Streaming] Kafka streaming support in Python #3715

Closed
wants to merge 22 commits into from

Conversation

@davies
Contributor

davies commented Dec 16, 2014

This PR adds a Python API for the Spark Streaming Kafka data source.

    class KafkaUtils(__builtin__.object)
     |  Static methods defined here:
     |
     |  createStream(ssc, zkQuorum, groupId, topics, storageLevel=StorageLevel(True, True, False, False, 2), keyDecoder=<function utf8_decoder>, valueDecoder=<function utf8_decoder>)
     |      Create an input stream that pulls messages from a Kafka Broker.
     |
     |      :param ssc:  StreamingContext object
     |      :param zkQuorum:  Zookeeper quorum (hostname:port,hostname:port,..).
     |      :param groupId:  The group id for this consumer.
     |      :param topics:  Dict of (topic_name -> numPartitions) to consume.
     |                      Each partition is consumed in its own thread.
     |      :param storageLevel:  RDD storage level.
     |      :param keyDecoder:  A function used to decode key
     |      :param valueDecoder:  A function used to decode value
     |      :return: A DStream object
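As a rough illustration of the `keyDecoder` and `valueDecoder` parameters: each is a plain function applied to the raw bytes of a message key or value. The sketch below assumes the default decoder behaves like a None-safe UTF-8 decode, and adds a hypothetical `json_decoder`. `build_stream` is also an illustrative helper rather than part of this PR; it only works on Spark versions that ship `pyspark.streaming.kafka`, with the Kafka assembly jar on the classpath.

```python
import json


def utf8_decoder(raw):
    """Decode raw bytes as UTF-8, passing None through (assumed default behaviour)."""
    if raw is None:
        return None
    return raw.decode("utf-8")


def json_decoder(raw):
    """Hypothetical custom decoder: parse a UTF-8 JSON payload into Python objects."""
    text = utf8_decoder(raw)
    return None if text is None else json.loads(text)


def build_stream(ssc, zk_quorum, group_id, topic):
    """Illustrative helper: create a Kafka DStream with one consumer thread for `topic`."""
    # Imported lazily so the decoders above can be used without Spark installed.
    from pyspark.streaming.kafka import KafkaUtils

    return KafkaUtils.createStream(
        ssc, zk_quorum, group_id, {topic: 1},
        keyDecoder=utf8_decoder, valueDecoder=json_decoder)
```

Keeping the decoders as ordinary functions means they can be unit-tested on plain byte strings without starting a streaming context.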

Run the example:

    bin/spark-submit --driver-class-path external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test
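The two trailing arguments in that command are the ZooKeeper quorum and the topic name. A minimal sketch of how a script might validate them follows; `parse_args` is a hypothetical helper and not necessarily how `kafka_wordcount.py` handles its arguments.

```python
import sys


def parse_args(argv):
    """Return (zk_quorum, topic) from argv, or exit with a usage message."""
    if len(argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        sys.exit(2)
    _, zk_quorum, topic = argv
    return zk_quorum, topic
```

A script would typically call `parse_args(sys.argv)` once at startup, before creating the streaming context.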

@SparkQA

SparkQA commented Dec 16, 2014

Test build #24510 has started for PR 3715 at commit 75d485e.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 16, 2014

Test build #24510 has finished for PR 3715 at commit 75d485e.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaUtils(object):
    • class MQTTUtils(object):

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24510/
Test FAILed.

@SparkQA

SparkQA commented Dec 16, 2014

Test build #24511 has started for PR 3715 at commit 048dbe6.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 16, 2014

Test build #24511 has finished for PR 3715 at commit 048dbe6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaUtils(object):
    • class MQTTUtils(object):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24511/
Test PASSed.

@prabeesh
Contributor

LGTM

@SparkQA

SparkQA commented Dec 18, 2014

Test build #24609 has started for PR 3715 at commit 5697a01.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 18, 2014

Test build #24609 has finished for PR 3715 at commit 5697a01.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaUtils(object):
    • class MQTTUtils(object):

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24609/
Test FAILed.

@SparkQA

SparkQA commented Dec 19, 2014

Test build #24610 has started for PR 3715 at commit 98c8d17.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 19, 2014

Test build #24610 has finished for PR 3715 at commit 98c8d17.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaUtils(object):
    • class MQTTUtils(object):
    • class Analyzer(catalog: Catalog,

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24610/
Test FAILed.

@davies changed the title from "[WIP] Kafka and MQTT support in Python" to "[SPARK-5154] [PySpark] [Streaming] Kafka streaming support in Python" on Jan 8, 2015
@SparkQA

SparkQA commented Jan 8, 2015

Test build #25267 has started for PR 3715 at commit eea16a7.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 8, 2015

Test build #25267 has finished for PR 3715 at commit eea16a7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaUtils(object):

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25267/
Test FAILed.

          dataOut.write(bytes)
        }
      }
      def writeS(str: String) {

Contributor

A blank line is missing before this definition.

@SparkQA

SparkQA commented Jan 22, 2015

Test build #25951 has started for PR 3715 at commit adeeb38.

  • This patch does not merge cleanly.

Conflicts:
	make-distribution.sh
	project/SparkBuild.scala
@SparkQA

SparkQA commented Jan 29, 2015

Test build #26331 has started for PR 3715 at commit f257071.

  • This patch does not merge cleanly.

@SparkQA

SparkQA commented Jan 29, 2015

Test build #26331 has finished for PR 3715 at commit f257071.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaUtils(object):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26331/
Test PASSed.

@SparkQA

SparkQA commented Jan 29, 2015

Test build #26339 has started for PR 3715 at commit e6d0427.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 29, 2015

Test build #26339 has finished for PR 3715 at commit e6d0427.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaUtils(object):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26339/
Test PASSed.

    def write(obj: Any): Unit = obj match {
      case null =>
        dataOut.writeInt(SpecialLengths.NULL)

Contributor

Why the extra line?

Contributor Author

To group the cases into different categories.

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26361 has started for PR 3715 at commit 4280d04.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26361 has finished for PR 3715 at commit 4280d04.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26361/
Test PASSed.

@SparkQA

SparkQA commented Feb 3, 2015

Test build #26578 has started for PR 3715 at commit d93bfe0.

  • This patch merges cleanly.

@davies
Contributor Author

davies commented Feb 3, 2015

@tdas @pwendell done

    print "No kafka package, please put the assembly jar into classpath:"
    print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
        "scala-*/spark-streaming-kafka-assembly-*.jar"
    raise e

Contributor

The message that gets printed here is quite scary.


2015-02-02 18:31:31.950 java[76691:5f03] Unable to load realm info from SCDynamicStore
No kafka package, please put the assembly jar into classpath:
 $ bin/spark-submit --driver-class-path external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar
Traceback (most recent call last):
  File "/Users/tdas/Projects/Spark/spark/examples/src/main/python/streaming/kafka_wordcount.py", line 46, in <module>
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
  File "/Users/tdas/Projects/Spark/spark/python/pyspark/streaming/kafka.py", line 80, in createStream
    raise e
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.util.Utils.classForName.
: java.lang.ClassNotFoundException: kafka.serializer.DefaultDecoder
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:153)
    at org.apache.spark.util.Utils.classForName(Utils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:744)

It's easy to miss the real message. Is it possible to quit in such a way that the whole stack trace does not get printed, so that it exits gracefully after printing this message? Perhaps an exit? @JoshRosen, is that a good idea?
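One way to get the behaviour asked for above, sketched under assumptions: catch the failure, print the hint, and leave via `sys.exit` so that Python does not print a traceback. `load_kafka_helper` and its `loader` argument are hypothetical stand-ins for the Py4J class lookup, and matching on the exception text is a simplification. Whether exiting from library code is appropriate is exactly the open question in this comment.

```python
import sys

HINT = (
    "No kafka package, please put the assembly jar into classpath:\n"
    " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/"
    "scala-*/spark-streaming-kafka-assembly-*.jar"
)


def load_kafka_helper(loader):
    """Call `loader`; on a missing class, print the hint and exit without a traceback."""
    try:
        return loader()
    except Exception as exc:
        # Only treat class-loading failures as "jar missing"; re-raise anything else.
        if "ClassNotFoundException" not in str(exc):
            raise
        print(HINT, file=sys.stderr)
        sys.exit(1)  # raises SystemExit, so the interpreter exits without a traceback
```

The trade-off is that callers can no longer catch the original exception, which may matter for anyone embedding this in a larger application.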

@tdas
Contributor

tdas commented Feb 3, 2015

I am merging this despite my small comment above. Thanks @davies and others for helping!

@asfgit closed this in 0561c45 on Feb 3, 2015
@SparkQA

SparkQA commented Feb 3, 2015

Test build #26578 has finished for PR 3715 at commit d93bfe0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaUtils(object):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26578/
Test PASSed.

9 participants