diff --git a/examples/src/main/python/streaming/hdfs_wordcount.py b/examples/src/main/python/streaming/hdfs_wordcount.py
index 8c08ff0c89850..40faff0ccc7db 100644
--- a/examples/src/main/python/streaming/hdfs_wordcount.py
+++ b/examples/src/main/python/streaming/hdfs_wordcount.py
@@ -1,3 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in new text files created in the given directory
+ Usage: hdfs_wordcount.py <directory>
+   <directory> is the directory that Spark Streaming will use to find and read new text files.
+
+ To run this on your local machine on directory `localdir`, run this example
+    $ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py localdir
+
+ Then create a text file in `localdir` and the words in the file will get counted.
+"""
+
 import sys
 
 from pyspark import SparkContext
@@ -5,10 +33,10 @@
 
 if __name__ == "__main__":
     if len(sys.argv) != 2:
-        print >> sys.stderr, "Usage: wordcount <directory>"
+        print >> sys.stderr, "Usage: hdfs_wordcount.py <directory>"
         exit(-1)
 
-    sc = SparkContext(appName="PythonStreamingWordCount")
+    sc = SparkContext(appName="PythonStreamingHDFSWordCount")
     ssc = StreamingContext(sc, 1)
 
     lines = ssc.textFileStream(sys.argv[1])
diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py
index e3b6248c82a12..cfa9c1ff5bfbc 100644
--- a/examples/src/main/python/streaming/network_wordcount.py
+++ b/examples/src/main/python/streaming/network_wordcount.py
@@ -1,3 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ Usage: network_wordcount.py <hostname> <port>
+   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
+"""
+
 import sys
 
 from pyspark import SparkContext
@@ -5,7 +33,7 @@
 
 if __name__ == "__main__":
     if len(sys.argv) != 3:
-        print >> sys.stderr, "Usage: wordcount <hostname> <port>"
+        print >> sys.stderr, "Usage: network_wordcount.py <hostname> <port>"
         exit(-1)
     sc = SparkContext(appName="PythonStreamingNetworkWordCount")
     ssc = StreamingContext(sc, 1)
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index da645a6201503..9808361eb664f 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -234,11 +234,11 @@ def transform(self, dstreams, transformFunc):
         jdstreams = ListConverter().convert([d._jdstream for d in dstreams],
                                             SparkContext._gateway._gateway_client)
         # change the final serializer to sc.serializer
-        jfunc = RDDFunction(self._sc,
-                            lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
-                            *[d._jrdd_deserializer for d in dstreams])
-
-        jdstream = self._jvm.PythonDStream.callTransform(self._jssc, jdstreams, jfunc)
+        func = RDDFunction(self._sc,
+                           lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
+                           *[d._jrdd_deserializer for d in dstreams])
+        jfunc = self._jvm.RDDFunction(func)
+        jdstream = self._jssc.transform(jdstreams, jfunc)
         return DStream(jdstream, self, self._sc.serializer)
 
     def union(self, *dstreams):
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 4e3f07e26953b..87d5bb4906bd5 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -150,9 +150,6 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
         """
         return self.transform(lambda rdd: rdd.partitionBy(numPartitions, partitionFunc))
 
-    # def foreach(self, func):
-    #     return self.foreachRDD(lambda _, rdd: rdd.foreach(func))
-
     def foreachRDD(self, func):
         """
         Apply a function to each RDD in this DStream.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index ddbbf107abb3e..4a19f27fe9c7d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -47,12 +47,12 @@ private[python] class RDDFunction(@transient var pfunc: PythonRDDFunction)
   extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {
 
   def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
-    PythonDStream.some(pfunc.call(time.milliseconds, List(PythonDStream.wrapRDD(rdd)).asJava))
+    Option(pfunc.call(time.milliseconds, List(rdd.map(JavaRDD.fromRDD(_)).orNull).asJava)).map(_.rdd)
   }
 
   def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
-    val rdds = List(PythonDStream.wrapRDD(rdd), PythonDStream.wrapRDD(rdd2)).asJava
-    PythonDStream.some(pfunc.call(time.milliseconds, rdds))
+    val rdds = List(rdd.map(JavaRDD.fromRDD(_)).orNull, rdd2.map(JavaRDD.fromRDD(_)).orNull).asJava
+    Option(pfunc.call(time.milliseconds, rdds)).map(_.rdd)
   }
 
   // for function.Function2
@@ -115,39 +115,13 @@ private[python] object PythonDStream {
     serializer = new RDDFunctionSerializer(ser)
   }
 
-  // convert Option[RDD[_]] to JavaRDD, handle null gracefully
-  def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {
-    if (rdd.isDefined) {
-      JavaRDD.fromRDD(rdd.get)
-    } else {
-      null
-    }
-  }
-
-  // convert JavaRDD to Option[RDD[Array[Byte]]] to , handle null gracefully
-  def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = {
-    if (jrdd != null) {
-      Some(jrdd.rdd)
-    } else {
-      None
-    }
-  }
-
   // helper function for DStream.foreachRDD(),
   // cannot be `foreachRDD`, it will confusing py4j
-  def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pfunc: PythonRDDFunction){
+  def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pfunc: PythonRDDFunction) {
     val func = new RDDFunction((pfunc))
     jdstream.dstream.foreachRDD((rdd, time) => func(Some(rdd), time))
   }
 
-  // helper function for ssc.transform()
-  def callTransform(ssc: JavaStreamingContext, jdsteams: JList[JavaDStream[_]],
-                    pyfunc: PythonRDDFunction)
-    :JavaDStream[Array[Byte]] = {
-    val func = new RDDFunction(pyfunc)
-    ssc.transform(jdsteams, func)
-  }
-
   // convert list of RDD into queue of RDDs, for ssc.queueStream()
   def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): java.util.Queue[JavaRDD[Array[Byte]]] = {
     val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
@@ -232,7 +206,7 @@ class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_],
     func(parent.getOrCompute(validTime), parent2.getOrCompute(validTime), validTime)
   }
 
-  val asJavaDStream  = JavaDStream.fromDStream(this)
+  val asJavaDStream = JavaDStream.fromDStream(this)
 }
 
 /**
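
Note on the context.py change above: StreamingContext.transform() now wraps the Python RDDFunction in a JVM-side RDDFunction and calls JavaStreamingContext.transform() directly, instead of going through the removed PythonDStream.callTransform helper. A minimal sketch of the user-facing API this exercises, assuming a local SparkContext and using queueStream inputs purely for illustration:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="TransformSketch")
    ssc = StreamingContext(sc, 1)

    # Two toy input streams; each queued RDD becomes one batch.
    stream1 = ssc.queueStream([sc.parallelize([1, 2, 3])])
    stream2 = ssc.queueStream([sc.parallelize([4, 5, 6])])

    # The user function receives one RDD per input stream for each batch;
    # the wrapped RDDFunction forwards them to JavaStreamingContext.transform().
    unioned = ssc.transform([stream1, stream2],
                            lambda rdds: rdds[0].union(rdds[1]))
    unioned.pprint()

    ssc.start()
    ssc.awaitTermination()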
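
Similarly, callForeachRDD above is the JVM entry point behind DStream.foreachRDD(). A hedged sketch of its Python-side use, assuming the callback takes (time, rdd), as the commented-out foreach wrapper removed from dstream.py suggests, and that `lines` is an existing DStream:

    # Print a per-batch record count; `time` is the batch time and `rdd`
    # holds that batch's data. The callback runs on the driver each interval.
    def process(time, rdd):
        print "batch %s: %d records" % (time, rdd.count())

    lines.foreachRDD(process)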