diff --git a/examples/src/main/python/streaming/wordcount.pyc b/examples/src/main/python/streaming/wordcount.pyc
new file mode 100644
index 0000000000000..db93702361f47
Binary files /dev/null and b/examples/src/main/python/streaming/wordcount.pyc differ
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 5766cca39bdee..4e18cbacf3eba 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -118,11 +118,9 @@ def add_shuffle_key(split, iterator):
         keyed = PipelinedDStream(self, add_shuffle_key)
         keyed._bypass_serializer = True
         with _JavaStackTrace(self.ctx) as st:
-            #JavaDStream
-            pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream()
             partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
-                                                          id(partitionFunc))
-            jdstream = pairDStream.partitionBy(partitioner).values()
+                                                           id(partitionFunc))
+            jdstream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream()
         dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
         # This is required so that id(partitionFunc) remains unique, even if
         # partitionFunc is a lambda:
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index c95c81d994a95..d305797bb4a0f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -59,3 +59,30 @@ class PythonDStream[T: ClassTag](
 
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
+
+private class PairwiseDStream(prev: DStream[Array[Byte]], partitioner: Partitioner) extends
+  DStream[Array[Byte]](prev.ssc) {
+  override def dependencies = List(prev)
+
+  override def slideDuration: Duration = prev.slideDuration
+
+  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    prev.getOrCompute(validTime) match {
+      case Some(rdd) =>
+        val pairwiseRDD = new PairwiseRDD(rdd)
+        /*
+         * This is equivalent to the following Python code:
+         * with _JavaStackTrace(self.context) as st:
+         *     pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
+         *     partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
+         *                                                   id(partitionFunc))
+         *     jrdd = pairRDD.partitionBy(partitioner).values()
+         *     rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
+         */
+        Some(pairwiseRDD.asJavaPairRDD.partitionBy(partitioner).values().rdd)
+      case None => None
+    }
+  }
+  val asJavaDStream = JavaDStream.fromDStream(this)
+  // val asJavaPairDStream: JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala
index ff70483b771a4..bc07e09ec6d03 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala
@@ -1,3 +1,5 @@
+/*
+
 package org.apache.spark.streaming.api.python
 
 import org.apache.spark.Accumulator
@@ -10,11 +12,8 @@ import org.apache.spark.streaming.dstream.DStream
 
 import scala.reflect.ClassTag
 
-/**
- * Created by ken on 7/15/14.
- */
 class PythonTransformedDStream[T: ClassTag](
-    parents: Seq[DStream[T]],
+    parent: DStream[T],
     command: Array[Byte],
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
@@ -30,8 +29,14 @@ class PythonTransformedDStream[T: ClassTag](
 
   //pythonDStream compute
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
-    Some()
+
+//    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+//    parents.map(_.getOrCompute(validTime).orNull).to
+//    parent = parents.head.asInstanceOf[RDD]
+//    Some()
   }
-  val asJavaDStream = JavaDStream.fromDStream(this)
+
+  val asJavaDStream  = JavaDStream.fromDStream(this)
 }
+
+*/
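
For reference, a minimal, hypothetical PySpark Streaming word count that would exercise the new code path: on this branch, a stateless reduceByKey() is expected to shuffle through DStream.partitionBy(), which now builds the Scala-side PairwiseDStream(keyed._jdstream.dstream(), partitioner) and takes its asJavaDStream, instead of partitioning a JavaPairDStream from Python. The API names used below (StreamingContext, socketTextStream, pprint) are assumptions based on the in-progress Python streaming API and may differ from what this branch currently exposes.

    # network_wordcount_sketch.py -- illustrative only, not part of this diff
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="PythonStreamingWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    lines = ssc.socketTextStream("localhost", 9999)
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   # reduceByKey shuffles via DStream.partitionBy, i.e. through
                   # the PairwiseDStream added in PythonDStream.scala
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

Keeping the partitioning inside PairwiseDStream means the Python side makes a single Py4J call to set up the shuffle, rather than constructing a JavaPairDStream and chaining partitionBy()/values() from Python as the removed lines in dstream.py did.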