
[SPARK-5154] [PySpark] [Streaming] Kafka streaming support in Python #3715

Closed · wants to merge 22 commits
69 changes: 22 additions & 47 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -313,6 +313,7 @@ private object SpecialLengths {
val PYTHON_EXCEPTION_THROWN = -2
val TIMING_DATA = -3
val END_OF_STREAM = -4
val NULL = -5
Contributor:
So this patch tries to fix a bug in Python regarding null values? If so, that probably should be a different patch from this Kafka patch.

Contributor:
@tdas I think that this same null-handling change has been proposed before but until now I don't think we had a great reason to pull it in, since none of our internal APIs relied on it and we were worried that it might mask the presence of bugs. Now that we have a need for it, though, it might be okay to pull in here.

Contributor:
@JoshRosen I am more or less okay with this; I just fear any regressions in core PySpark that this would cause. I don't understand the implications well enough. Do you have any thoughts on this?

Contributor:
Also, if this is a core Python change, there should be unit tests that capture it.

Contributor (Author):
Good point, I will do that.

Contributor:
Offline conversation: this change is related to SPARK-1630, which #1551 attempted to solve. Since that PR was rejected by @mateiz, it's better that he takes a look and comments on the validity of this change in this PR.

Contributor:
So what was the discussion here? Before, the motivation was that Nones from Python would be encoded as a pickled version of None. Is it that we now want to send nulls back, and we are not using the pickle library for Java that would map them? Then it would be okay.

Contributor (Author):
For Kafka, each record is a (K, V) pair, but K can be null if the producer did not provide one. We pass the raw bytes from the JVM to Python and do the deserialization on the Python side, so no pickling is needed. Hence we need some way to tell whether the key is null or an empty string.
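As a rough illustration of how the framing can distinguish the two cases on the wire (a minimal Python sketch; the helper name is made up here, only the constant mirrors SpecialLengths.NULL above):

import struct

NULL = -5  # same value as SpecialLengths.NULL

def frame_key(key):
    """Return the bytes written for one Kafka key (bytes or None)."""
    if key is None:
        return struct.pack("!i", NULL)          # marker only, no payload
    return struct.pack("!i", len(key)) + key    # 4-byte length + payload

print(frame_key(None))    # b'\xff\xff\xff\xfb' -> reader yields None
print(frame_key(b""))     # b'\x00\x00\x00\x00' -> reader yields an empty key
print(frame_key(b"k1"))   # b'\x00\x00\x00\x02k1'

A length of 0 still means "an empty key was sent", while the NULL marker means "no key at all", which is exactly the distinction Kafka needs.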

Contributor:
@mateiz, could you take a call on this?

Contributor:
Alright, it sounds good to allow nulls then.

}

private[spark] object PythonRDD extends Logging {
@@ -371,54 +372,28 @@ private[spark] object PythonRDD extends Logging {
}

def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
// The right way to implement this would be to use TypeTags to get the full
// type of T. Since I don't want to introduce breaking changes throughout the
// entire Spark API, I have to use this hacky approach:
if (iter.hasNext) {
val first = iter.next()
val newIter = Seq(first).iterator ++ iter
first match {
case arr: Array[Byte] =>
newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes =>
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
case string: String =>
newIter.asInstanceOf[Iterator[String]].foreach { str =>
writeUTF(str, dataOut)
}
case stream: PortableDataStream =>
newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
val bytes = stream.toArray()
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
case (key: String, stream: PortableDataStream) =>
newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
case (key, stream) =>
writeUTF(key, dataOut)
val bytes = stream.toArray()
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
case (key: String, value: String) =>
newIter.asInstanceOf[Iterator[(String, String)]].foreach {
case (key, value) =>
writeUTF(key, dataOut)
writeUTF(value, dataOut)
}
case (key: Array[Byte], value: Array[Byte]) =>
newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
case (key, value) =>
dataOut.writeInt(key.length)
dataOut.write(key)
dataOut.writeInt(value.length)
dataOut.write(value)
}
case other =>
throw new SparkException("Unexpected element type " + first.getClass)
}

def write(obj: Any): Unit = obj match {
Contributor:
nit: this method can be made a final method in Scala and use tail-recursion optimization (http://tech.pro/blog/2112/scala-tail-recursion-optimisation-and-comparison-to-java), which might help performance for large nested objects. This can be a completely separate PR.

Contributor (Author):
Thanks! In this particular case, it is at most 2 levels deep.

Contributor:
I thought it could be as deep as the nesting of the tuples, e.g. (1, (2, (3, 3))) is 3 levels deep. Yeah, that's probably not a very common use case.

Contributor (Author):
This JavaRDD can only contain String, Array[Byte], PortableDataStream, or a pair of them.
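As a rough Python analogue of the dispatch in the new write method, just to illustrate why the recursion stays shallow here (sketch only; these names are not part of the patch):

import struct

NULL = -5  # SpecialLengths.NULL

def write(stream, obj):
    # Dispatch mirroring the Scala match: null, byte array, string, or pair.
    if obj is None:
        stream.write(struct.pack("!i", NULL))
    elif isinstance(obj, bytes):
        stream.write(struct.pack("!i", len(obj)) + obj)
    elif isinstance(obj, str):
        data = obj.encode("utf-8")
        stream.write(struct.pack("!i", len(data)) + data)
    elif isinstance(obj, tuple) and len(obj) == 2:
        write(stream, obj[0])   # a pair recurses once into each element,
        write(stream, obj[1])   # and the elements themselves are leaf types
    else:
        raise TypeError("Unexpected element type %s" % type(obj))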

case null =>
dataOut.writeInt(SpecialLengths.NULL)

Contributor:
Why the extra line?

Contributor (Author):
To group them into different categories.

case arr: Array[Byte] =>
dataOut.writeInt(arr.length)
dataOut.write(arr)
case str: String =>
writeUTF(str, dataOut)

Contributor:
extra line?

case stream: PortableDataStream =>
write(stream.toArray())
case (key, value) =>
write(key)
write(value)

Contributor:
extra line?

case other =>
throw new SparkException("Unexpected element type " + other.getClass)
}

iter.foreach(write)
}

/**
core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -22,6 +22,7 @@ import java.io.{File, InputStream, IOException, OutputStream}
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}

private[spark] object PythonUtils {
/** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
@@ -39,4 +40,8 @@ private[spark] object PythonUtils {
def mergePythonPaths(paths: String*): String = {
paths.filter(_ != "").mkString(File.pathSeparator)
}

def generateRDDWithNull(sc: JavaSparkContext): JavaRDD[String] = {
sc.parallelize(List("a", null, "b"))
}
}
core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
@@ -23,11 +23,21 @@ import org.scalatest.FunSuite

class PythonRDDSuite extends FunSuite {

test("Writing large strings to the worker") {
val input: List[String] = List("a"*100000)
val buffer = new DataOutputStream(new ByteArrayOutputStream)
PythonRDD.writeIteratorToStream(input.iterator, buffer)
}
test("Writing large strings to the worker") {
val input: List[String] = List("a"*100000)
val buffer = new DataOutputStream(new ByteArrayOutputStream)
PythonRDD.writeIteratorToStream(input.iterator, buffer)
}

}
test("Handle nulls gracefully") {
val buffer = new DataOutputStream(new ByteArrayOutputStream)
PythonRDD.writeIteratorToStream(List("a", null).iterator, buffer)
PythonRDD.writeIteratorToStream(List(null, "a").iterator, buffer)
Contributor:
nit: List(...).iterator ---> Iterator(....)

PythonRDD.writeIteratorToStream(List("a".getBytes, null).iterator, buffer)
PythonRDD.writeIteratorToStream(List(null, "a".getBytes).iterator, buffer)

PythonRDD.writeIteratorToStream(List((null, null), ("a", null), (null, "b")).iterator, buffer)
Contributor:
There are no asserts/checks on whether the data was written correctly?

Contributor:
This issue still has not been addressed. There are no asserts to check whether the nulls can be read back properly.

Contributor (Author):
There is a test in Python that verifies this (though it does not cover all the cases). Does that work for you?
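For reference, a self-contained sketch of the kind of round-trip assert being asked for, covering pairs with null members (the helpers here are illustrative, not the actual test code):

import io
import struct

NULL = -5  # SpecialLengths.NULL

def write_value(stream, v):
    if v is None:
        stream.write(struct.pack("!i", NULL))
    else:
        stream.write(struct.pack("!i", len(v)) + v)

def read_value(stream):
    length = struct.unpack("!i", stream.read(4))[0]
    if length == NULL:
        return None
    return stream.read(length)

pairs = [(None, None), (b"a", None), (None, b"b")]
buf = io.BytesIO()
for k, v in pairs:
    write_value(buf, k)
    write_value(buf, v)

buf.seek(0)
decoded = [(read_value(buf), read_value(buf)) for _ in pairs]
assert decoded == pairs  # nulls round-trip as None, not as empty strings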

PythonRDD.writeIteratorToStream(
List((null, null), ("a".getBytes, null), (null, "b".getBytes)).iterator, buffer)
}
}
54 changes: 54 additions & 0 deletions examples/src/main/python/streaming/kafka_wordcount.py
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in UTF8 encoded, '\n' delimited text received from Kafka every second.
Usage: kafka_wordcount.py <zk> <topic>

To run this on your local machine, you need to set up Kafka and create a producer first, see
http://kafka.apache.org/documentation.html#quickstart

Contributor (Author):
Good point, I will remove these, and put a link here.

Contributor:
Are all of the above commands meant to be run from Kafka's bin/? That still creates some confusion about which directory we intend, since in all the Spark examples bin/ refers to Spark's bin/.

and then run the example
`$ bin/spark-submit --driver-class-path external/kafka-assembly/target/scala-*/\
spark-streaming-kafka-assembly-*.jar examples/src/main/python/streaming/kafka_wordcount.py \
localhost:2181 test`
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: kafka_wordcount.py <zk> <topic>"
exit(-1)

sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)

zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
106 changes: 106 additions & 0 deletions external/kafka-assembly/pom.xml
@@ -0,0 +1,106 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.3.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-assembly_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project External Kafka Assembly</name>
<url>http://spark.apache.org/</url>

<properties>
<sbt.project.name>streaming-kafka-assembly</sbt.project.name>
<spark.jar.dir>scala-${scala.binary.version}</spark.jar.dir>
<spark.jar.basename>spark-streaming-kafka-assembly-${project.version}.jar</spark.jar.basename>
<spark.jar>${project.build.directory}/${spark.jar.dir}/${spark.jar.basename}</spark.jar>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${spark.jar}</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
Contributor:
@pwendell Could you do one more pass on this pom.xml? Most importantly, will this be published to Maven, or is there something I need to add to make sure of that?

Contributor:
Yes - this will be published by default (unless we explicitly change something).

<resource>log4j.properties</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

1 change: 1 addition & 0 deletions make-distribution.sh
@@ -196,6 +196,7 @@ echo "Build flags: $@" >> "$DISTDIR/RELEASE"
# Copy jars
cp "$SPARK_HOME"/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/"
cp "$SPARK_HOME"/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/"
cp "$SPARK_HOME"/external/kafka/scala*/*kafka*assembly*.jar "$DISTDIR/lib/"
Contributor:
Rather than packaging this with the release, can we just ask users to add the Maven coordinates when launching it? This would add a fairly large amount to the binary size of Spark (especially if we add other assemblies in the future).

Contributor:
I'm assuming that #4215 gets merged in all this.

Contributor:
I'm not inclined to block this PR on #4215. What we can do, rather than putting it in the distribution, is give instructions to download it from Maven Central. Later, when #4215 goes in, we can update the docs. Does that sound good?

Contributor (Author):
The motivation for the assembly jar is to simplify the process for Python programmers; can #4215 help in this case?

Contributor:
We will publish the jar to maven central. It's fine IMO in this patch to recommend downloading the published jar. And later on we can just suggest linking against the maven coordinates. But I'd like to avoid actually including this JAR in the binary distribution.

Contributor:
@tdas @davies - so can we remove this then? We can assume that we are publishing the assembly jar to maven (so that they can just add it with a flag to spark submit).

Contributor:
Yes, we can remove this. @davies, can you please update the PR? It's a small change.

# This will fail if the -Pyarn profile is not provided
# In this case, silence the error and ignore the return code of this command
cp "$SPARK_HOME"/network/yarn/target/scala*/spark-*-yarn-shuffle.jar "$DISTDIR/lib/" &> /dev/null || :
1 change: 1 addition & 0 deletions pom.xml
@@ -1597,6 +1597,7 @@
</properties>
<modules>
<module>external/kafka</module>
<module>external/kafka-assembly</module>
</modules>
</profile>

14 changes: 11 additions & 3 deletions project/SparkBuild.scala
@@ -44,8 +44,9 @@ object BuildCommons {
sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
"kinesis-asl").map(ProjectRef(buildLocation, _))

val assemblyProjects@Seq(assembly, examples, networkYarn) =
Seq("assembly", "examples", "network-yarn").map(ProjectRef(buildLocation, _))
val assemblyProjects@Seq(assembly, examples, networkYarn, streamingKafkaAssembly) =
Seq("assembly", "examples", "network-yarn", "streaming-kafka-assembly")
.map(ProjectRef(buildLocation, _))

val tools = ProjectRef(buildLocation, "tools")
// Root project.
@@ -300,7 +301,14 @@ object Assembly {
sys.props.get("hadoop.version")
.getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
},
jarName in assembly := s"${moduleName.value}-${version.value}-hadoop${hadoopVersion.value}.jar",
jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
if (mName.contains("streaming-kafka-assembly")) {
// This must match the same name used in maven (see external/kafka-assembly/pom.xml)
s"${mName}-${v}.jar"
} else {
s"${mName}-${v}-hadoop${hv}.jar"
}
},
mergeStrategy in assembly := {
case PathList("org", "datanucleus", xs @ _*) => MergeStrategy.discard
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
9 changes: 8 additions & 1 deletion python/pyspark/serializers.py
@@ -70,6 +70,7 @@ class SpecialLengths(object):
PYTHON_EXCEPTION_THROWN = -2
TIMING_DATA = -3
END_OF_STREAM = -4
NULL = -5


class Serializer(object):
@@ -133,6 +134,8 @@ def load_stream(self, stream):

def _write_with_length(self, obj, stream):
serialized = self.dumps(obj)
if serialized is None:
raise ValueError("serialized value should not be None")
if len(serialized) > (1 << 31):
raise ValueError("can not serialize object larger than 2G")
write_int(len(serialized), stream)
@@ -145,8 +148,10 @@ def _read_with_length(self, stream):
length = read_int(stream)
if length == SpecialLengths.END_OF_DATA_SECTION:
raise EOFError
elif length == SpecialLengths.NULL:
return None
obj = stream.read(length)
if obj == "":
if len(obj) < length:
raise EOFError
return self.loads(obj)

@@ -484,6 +489,8 @@ def loads(self, stream):
length = read_int(stream)
if length == SpecialLengths.END_OF_DATA_SECTION:
raise EOFError
elif length == SpecialLengths.NULL:
return None
s = stream.read(length)
return s.decode("utf-8") if self.use_unicode else s
