Skip to content

Commit

Permalink
implementing transform function in Python
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Takagiwa authored and giwa committed Sep 20, 2014
1 parent 38adf95 commit 4bcb318
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
1 change: 0 additions & 1 deletion python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ def saveAsTextFiles(self, prefix, suffix=None):
"""
Save this DStream as a text file, using string representations of elements.
"""

# Per-batch save callback: writes one RDD of this DStream as a text file.
# NOTE(review): `prefix`/`suffix` are captured from the enclosing
# saveAsTextFiles(...) call; `rddToFileName` builds the time-stamped
# output path "<prefix>-<time-ms>[.<suffix>]" — confirm against helper.
def saveAsTextFile(rdd, time):
# Derive the batch-specific output directory from the batch time.
path = rddToFileName(prefix, suffix, time)
# Delegate to the RDD's own text-file writer for this batch.
rdd.saveAsTextFile(path)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.apache.spark.streaming.api.python

import java.util.{List => JList, Map => JMap}

import scala.reflect.ClassTag

import org.apache.spark.Accumulator
import org.apache.spark.api.python.PythonRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.streaming.dstream.DStream

/**
 * A [[DStream]] whose batches are produced by running a serialized Python
 * function over the RDDs of one or more parent streams.
 *
 * @param parents parent streams whose per-batch RDDs feed the Python function
 * @param command pickled Python function to execute for each batch
 * @param envVars environment variables for the Python worker
 * @param pythonIncludes files to add to the Python worker's path
 * @param preservePartitoning whether the Python RDD keeps the parent's partitioning
 * @param pythonExec path to the Python executable to launch
 * @param broadcastVars broadcast variables referenced by the Python function
 * @param accumulator accumulator used to ship Python-side updates back to the driver
 */
class PythonTransformedDStream[T: ClassTag](
    parents: Seq[DStream[T]],
    command: Array[Byte],
    envVars: JMap[String, String],
    pythonIncludes: JList[String],
    preservePartitoning: Boolean,
    pythonExec: String,
    broadcastVars: JList[Broadcast[Array[Byte]]],
    accumulator: Accumulator[JList[Array[Byte]]]
  ) extends DStream[Array[Byte]](parents.head.ssc) {

  // This stream depends on every parent stream, not just the first.
  override def dependencies = parents.toList

  // All parents share one StreamingContext, hence one batch duration.
  override def slideDuration: Duration = parents.head.slideDuration

  /**
   * Computes the batch for `validTime` by wrapping the parent RDD in a
   * [[PythonRDD]] that runs `command`. Returns None if any parent has no
   * RDD for this batch (e.g. before the stream has started producing).
   */
  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
    if (parentRDDs.contains(null)) {
      None
    } else {
      // TODO: support multiple parents; only the first parent RDD is
      // handed to the Python function for now.
      Some(new PythonRDD(parentRDDs.head, command, envVars, pythonIncludes,
        preservePartitoning, pythonExec, broadcastVars, accumulator))
    }
  }

  // Java-friendly view of this stream for the Py4J bridge.
  val asJavaDStream = JavaDStream.fromDStream(this)
}
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ abstract class DStream[T: ClassTag] (
val cleanedF = context.sparkContext.clean(transformFunc, false)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 1)
// if transformfunc is fine, it is okay
cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
}
new TransformedDStream[U](Seq(this), realTransformFunc)
Expand Down

0 comments on commit 4bcb318

Please sign in to comment.