reduceByKey is working
Ken Takagiwa authored and giwa committed Aug 18, 2014
1 parent d25d5cf commit 0b8b7d0
Showing 4 changed files with 41 additions and 11 deletions.
Binary file added examples/src/main/python/streaming/wordcount.pyc
6 changes: 2 additions & 4 deletions python/pyspark/streaming/dstream.py
@@ -118,11 +118,9 @@ def add_shuffle_key(split, iterator):
         keyed = PipelinedDStream(self, add_shuffle_key)
         keyed._bypass_serializer = True
         with _JavaStackTrace(self.ctx) as st:
-            #JavaDStream
-            pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream()
             partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
-                                                         id(partitionFunc))
-            jdstream = pairDStream.partitionBy(partitioner).values()
+                                                          id(partitionFunc))
+            jdstream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream()
         dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
         # This is required so that id(partitionFunc) remains unique, even if
         # partitionFunc is a lambda:
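For orientation: reduceByKey on a DStream comes down to this partitionBy plumbing, since reduceByKey is conventionally built as combineByKey (local pre-aggregation per partition, a hash shuffle via partitionBy, then a final merge). The following self-contained Python sketch simulates that pattern with plain lists standing in for partitions; it illustrates the algorithm and is not code from this branch.

    from collections import defaultdict

    def combine_locally(partition, func):
        # Pre-aggregate (key, value) pairs inside one partition, before any shuffle.
        out = {}
        for k, v in partition:
            out[k] = func(out[k], v) if k in out else v
        return list(out.items())

    def partition_by(pairs, num_partitions):
        # Route each key to a partition by hash, as PythonPartitioner does on the JVM side.
        parts = defaultdict(list)
        for k, v in pairs:
            parts[hash(k) % num_partitions].append((k, v))
        return [parts[i] for i in range(num_partitions)]

    def reduce_by_key(partitions, func, num_partitions=2):
        locally = [combine_locally(p, func) for p in partitions]
        flat = [kv for part in locally for kv in part]
        shuffled = partition_by(flat, num_partitions)           # the shuffle step fixed above
        return [combine_locally(p, func) for p in shuffled]     # final merge per partition

    batch = [[("a", 1), ("b", 1)], [("a", 1), ("a", 1)]]        # one micro-batch, two input partitions
    print(reduce_by_key(batch, lambda x, y: x + y))             # e.g. [[('a', 3)], [('b', 1)]], depending on hash placement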
@@ -59,3 +59,30 @@ class PythonDStream[T: ClassTag](
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
+
+
+private class PairwiseDStream(prev: DStream[Array[Byte]], partitioner: Partitioner) extends
+  DStream[Array[Byte]](prev.ssc) {
+  override def dependencies = List(prev)
+
+  override def slideDuration: Duration = prev.slideDuration
+
+  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    prev.getOrCompute(validTime) match {
+      case Some(rdd) =>
+        val pairwiseRDD = new PairwiseRDD(rdd)
+        /*
+         * This is equivalent to the following Python code:
+         * with _JavaStackTrace(self.context) as st:
+         *     pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
+         *     partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
+         *                                                   id(partitionFunc))
+         *     jrdd = pairRDD.partitionBy(partitioner).values()
+         *     rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
+         */
+        Some(pairwiseRDD.asJavaPairRDD.partitionBy(partitioner).values().rdd)
+      case None => None
+    }
+  }
+  val asJavaDStream = JavaDStream.fromDStream(this)
+  //val asJavaPairDStream: JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
+}
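End to end, PairwiseDStream is what a Python streaming word count exercises when it calls reduceByKey. A hedged usage sketch follows; the StreamingContext constructor, Seconds, textFileStream, and pyprint names are assumptions modeled on the Scala streaming API and the bundled examples/src/main/python/streaming/wordcount.py, not verbatim from this commit.

    import sys
    from operator import add

    # Assumed module paths for this era of the branch; see the note above.
    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Seconds

    if __name__ == "__main__":
        ssc = StreamingContext(appName="PythonStreamingWordCount",
                               duration=Seconds(1))
        lines = ssc.textFileStream(sys.argv[1])
        counts = (lines.flatMap(lambda line: line.split(" "))
                       .map(lambda word: (word, 1))
                       .reduceByKey(add))       # shuffles through PairwiseDStream
        counts.pyprint()
        ssc.start()
        ssc.awaitTermination()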
@@ -1,3 +1,5 @@
+/*
 package org.apache.spark.streaming.api.python
 import org.apache.spark.Accumulator
@@ -10,11 +12,8 @@ import org.apache.spark.streaming.dstream.DStream
 import scala.reflect.ClassTag
-/**
- * Created by ken on 7/15/14.
- */
 class PythonTransformedDStream[T: ClassTag](
-    parents: Seq[DStream[T]],
+    parent: DStream[T],
     command: Array[Byte],
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
@@ -30,8 +29,14 @@ class PythonTransformedDStream[T: ClassTag](
   //pythonDStream compute
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
-    Some()
+    // val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+    // parents.map(_.getOrCompute(validTime).orNull).to
+    // parent = parents.head.asInstanceOf[RDD]
+    // Some()
   }
-    val asJavaDStream = JavaDStream.fromDStream(this)
+  val asJavaDStream = JavaDStream.fromDStream(this)
 }
+*/
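The whole PythonTransformedDStream file stays fenced off behind /* ... */ for now; it is the eventual hook for arbitrary per-batch transformations. A hedged sketch of the Python-level call it would serve, continuing the word-count sketch above (transform as a method name is an assumption mirroring Scala's DStream.transform, and is not available in this commit):

    # Assumed future API, mirroring Scala's DStream.transform: apply an
    # arbitrary RDD-to-RDD function to every micro-batch.
    frequent = counts.transform(lambda rdd: rdd.filter(lambda kv: kv[1] > 1))
    frequent.pyprint()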
