diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py
index f44cd696894ba..3996991109d60 100644
--- a/examples/src/main/python/streaming/wordcount.py
+++ b/examples/src/main/python/streaming/wordcount.py
@@ -1,6 +1,7 @@
 import sys
 from operator import add
 
+from pyspark.conf import SparkConf
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.duration import *
 
@@ -8,15 +9,22 @@
     if len(sys.argv) != 2:
         print >> sys.stderr, "Usage: wordcount <directory>"
         exit(-1)
-    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
+    conf = SparkConf()
+    conf.setAppName("PythonStreamingWordCount")
+    conf.set("spark.default.parallelism", 1)
+
+#    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
+    ssc = StreamingContext(conf=conf, duration=Seconds(1))
 
     lines = ssc.textFileStream(sys.argv[1])
     fm_lines = lines.flatMap(lambda x: x.split(" "))
     filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
     mapped_lines = fm_lines.map(lambda x: (x, 1))
+    reduced_lines = mapped_lines.reduce(add)
 
     fm_lines.pyprint()
     filtered_lines.pyprint()
     mapped_lines.pyprint()
+    reduced_lines.pyprint()
     ssc.start()
     ssc.awaitTermination()
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index cd28184274c9a..f0a3342876e4c 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -9,6 +9,7 @@
 
 __all__ = ["DStream"]
 
+
 class DStream(object):
     def __init__(self, jdstream, ssc, jrdd_deserializer):
         self._jdstream = jdstream
@@ -69,7 +70,7 @@ def _combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         """
         """
         if numPartitions is None:
-            numPartitions = self.ctx._defaultParallelism()
+            numPartitions = self._defaultReducePartitions()
         def combineLocally(iterator):
             combiners = {}
             for x in iterator:
@@ -130,8 +131,31 @@ def add_shuffle_key(split, iterator):
         return dstream
 
     def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+        """
+        Return a new DStream by applying a function to each partition of this DStream, tracking the partition index.
+        """
         return PipelinedDStream(self, f, preservesPartitioning)
 
+    def _defaultReducePartitions(self):
+        """
+        Return the number of partitions to use for reduce tasks when numPartitions is not specified.
+        """
+        # hard-coded for now to avoid an error; the branch below is the intended logic
+        return 2
+        if self.ctx._conf.contains("spark.default.parallelism"):
+            return self.ctx.defaultParallelism
+        else:
+            return self.getNumPartitions()
+
+    def getNumPartitions(self):
+        """
+        Return the number of partitions of the RDDs in this DStream
+        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
+        >>> rdd.getNumPartitions()
+        2
+        """
+        return self._jdstream.partitions().size()
+
 
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 76b88385e095a..83e4eaa8b5e4e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -55,6 +55,91 @@ class PythonDStream[T: ClassTag](
       case None => None
     }
   }
+<<<<<<< HEAD
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
+=======
+  val asJavaDStream = JavaDStream.fromDStream(this)
+
+  /**
+   * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an
+   * output operator, so this PythonDStream will be registered as an output stream and there
+   * materialized.
+   * Since serialized Python objects are readable by Python, pyprint writes the binary data out to
+   * a temporary file and runs a Python script to deserialize and print the first ten elements.
+   */
+  private[streaming] def ppyprint() {
+    def foreachFunc = (rdd: RDD[Array[Byte]], time: Time) => {
+      val iter = rdd.take(11).iterator
+
+      // make a temporary file
+      val prefix = "spark"
+      val suffix = ".tmp"
+      val tempFile = File.createTempFile(prefix, suffix)
+      val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
+      // write out the serialized python objects
+      PythonRDD.writeIteratorToStream(iter, tempFileStream)
+      tempFileStream.close()
+
+      // TODO: this value has to be passed in from Python
+      val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
+      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
+      // val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath()))
+      // (the line above does not compile: ProcessBuilder takes varargs or a java.util.List, not a Scala Seq)
+      // NOTE: the absolute path to the python script has to change once we stop using the separate pysparkstreaming package
+      val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pysparkstreaming/streaming/pyprint.py", tempFile.getAbsolutePath)
+      val workerEnv = pb.environment()
+
+      // envVars also needs to be passed
+      // workerEnv.putAll(envVars)
+      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
+      workerEnv.put("PYTHONPATH", pythonPath)
+      val worker = pb.start()
+      val is = worker.getInputStream()
+      val isr = new InputStreamReader(is)
+      val br = new BufferedReader(isr)
+
+      println("-------------------------------------------")
+      println("Time: " + time)
+      println("-------------------------------------------")
+
+      // print the values from the python process's stdout
+      var line = ""
+      breakable {
+        while (true) {
+          line = br.readLine()
+          if (line == null) break()
+          println(line)
+        }
+      }
+      // delete the temporary file
+      tempFile.delete()
+      println()
+    }
+    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+  }
+}
+
+
+private class PairwiseDStream(prev: DStream[Array[Byte]]) extends
+  DStream[(Long, Array[Byte])](prev.ssc) {
+  override def dependencies = List(prev)
+
+  override def slideDuration: Duration = prev.slideDuration
+
+  override def compute(validTime: Time): Option[RDD[(Long, Array[Byte])]] = {
+    prev.getOrCompute(validTime) match {
+      case Some(rdd) =>
+        val pairwiseRDD = new PairwiseRDD(rdd)
+        Some(pairwiseRDD.asJavaPairRDD.rdd)
+      case None => None
+    }
+  }
+
+  val asJavaPairDStream: JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
+}
+>>>>>>> added reducedByKey not working yet
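A note on the ppyprint() output operator above: the Scala side dumps the first elements of each RDD to a temporary file with PythonRDD.writeIteratorToStream and then launches python/pysparkstreaming/streaming/pyprint.py to deserialize and print them. That script is not part of this diff; the sketch below is a hypothetical version of it, assuming the temp file holds length-framed pickled objects that PySpark's PickleSerializer can read back (the real script and on-disk format may differ).

# Hypothetical sketch of pyprint.py (not part of this diff). Assumes the temp file
# written by PythonRDD.writeIteratorToStream contains length-framed pickled objects,
# so PickleSerializer.load_stream can read them back; the real script may differ.
import sys

from pyspark.serializers import PickleSerializer


def print_first_ten(path):
    ser = PickleSerializer()
    with open(path, 'rb') as stream:
        for i, obj in enumerate(ser.load_stream(stream)):
            if i >= 10:
                # the Scala side takes 11 elements so we can hint that more exist
                print "..."
                break
            print obj


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: pyprint.py <serialized_rdd_file>"
        exit(-1)
    print_first_ten(sys.argv[1])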
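The commit marker ("added reducedByKey not working yet") and the new PairwiseDStream/_defaultReducePartitions plumbing point at per-key aggregation as the next step. As a rough illustration only, this is what the wordcount driver could look like once a reduceByKey transformation with the usual Spark semantics exists on DStreams; it will not run against this diff, and the imports simply mirror the example above.

# Hypothetical wordcount once reduceByKey works on DStreams (it does not in this diff).
import sys
from operator import add

from pyspark.conf import SparkConf
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import Seconds

if __name__ == "__main__":
    conf = SparkConf().setAppName("PythonStreamingWordCount")
    ssc = StreamingContext(conf=conf, duration=Seconds(1))

    lines = ssc.textFileStream(sys.argv[1])
    counts = lines.flatMap(lambda x: x.split(" ")) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)  # per-word totals instead of one global sum
    counts.pyprint()

    ssc.start()
    ssc.awaitTermination()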