
Commit

WIP added test case
giwa committed Sep 20, 2014
1 parent bd3ba53 commit 9cde7c9
Showing 8 changed files with 57 additions and 13 deletions.
@@ -350,8 +350,6 @@ private[spark] object PythonRDD extends Logging {
} catch {
case eof: EOFException => {}
}
println("RDDDD ==================")
println(objs)
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}

25 changes: 16 additions & 9 deletions examples/src/main/python/streaming/test_oprations.py
@@ -9,15 +9,22 @@
conf = SparkConf()
conf.setAppName("PythonStreamingNetworkWordCount")
ssc = StreamingContext(conf=conf, duration=Seconds(1))
ssc.checkpoint("/tmp/spark_ckp")

test_input = ssc._testInputStream([[1],[1],[1]])
# ssc.checkpoint("/tmp/spark_ckp")
fm_test = test_input.flatMap(lambda x: x.split(" "))
mapped_test = fm_test.map(lambda x: (x, 1))
test_input = ssc._testInputStream([1,2,3])
class buff:
    pass

fm_test = test_input.map(lambda x: (x, 1))
fm_test.test_output(buff)


mapped_test.print_()
ssc.start()
# ssc.awaitTermination()
# ssc.stop()
while True:
    ssc.awaitTermination(50)
    try:
        buff.result
        break
    except AttributeError:
        pass

ssc.stop()
print buff.result
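The updated example drives the job without print_(): test_output(buff) is expected to stash the batch result on the buff holder, and the main thread polls awaitTermination(50) until the attribute appears. A minimal sketch of that polling step factored into a reusable helper — the wait_for_result name and the 5-second cap are illustrative assumptions, not part of this commit:

import time

def wait_for_result(ssc, holder, timeout=5.0):
    # Poll the holder until the streaming job publishes a result, letting the
    # context wait up to 50 ms per iteration, or give up after `timeout` seconds.
    # (Hypothetical helper; only awaitTermination(50) and the holder pattern
    # come from the example above.)
    deadline = time.time() + timeout
    while time.time() < deadline:
        ssc.awaitTermination(50)
        if getattr(holder, "result", None) is not None:
            return holder.result
    ssc.stop()
    raise AssertionError("no streaming output within %.1f seconds" % timeout)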
2 changes: 1 addition & 1 deletion python/pyspark/streaming/dstream.py
@@ -236,6 +236,7 @@ def pyprint(self):
operator, so this DStream will be registered as an output stream and there materialized.
"""
def takeAndPrint(rdd, time):
    print "take and print ==================="
    taken = rdd.take(11)
    print "-------------------------------------------"
    print "Time: %s" % (str(time))
@@ -420,7 +421,6 @@ def saveAsTextFile(rdd, time):
# TODO: implemtnt rightOuterJoin



class PipelinedDStream(DStream):
def __init__(self, prev, func, preservesPartitioning=False):
if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
11 changes: 11 additions & 0 deletions python/pyspark/streaming_tests.py
@@ -444,6 +444,17 @@ def tearDown(self):
def tearDownClass(cls):
PySparkStreamingTestCase.tearDownClass()

start_time = time.time()
while True:
    current_time = time.time()
    # check time out
    if (current_time - start_time) > self.timeout:
        self.ssc.stop()
        break
    self.ssc.awaitTermination(50)
    if buff.result is not None:
        break
return buff.result

if __name__ == "__main__":
unittest.main()
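The new loop above is evidently the body of a result-collection helper on the test case (its def line falls outside this hunk): it polls awaitTermination(50) until buff.result is populated or self.timeout elapses, then returns the collected output. A hedged sketch of how an individual test might build on such a helper — _run_stream, the input data, and the expected output below are illustrative assumptions rather than code from this commit:

def test_map(self):
    # Hypothetical test built on the polling helper shown above.
    test_input = [range(1, 5)]

    def test_func(dstream):
        return dstream.map(lambda x: x * 2)

    # _run_stream is assumed to wire test_input into a DStream via
    # _testInputStream, apply test_func, start the context, and return
    # buff.result once the loop above sees output or times out.
    output = self._run_stream(test_input, test_func)
    self.assertEqual([[2, 4, 6, 8]], output)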
2 changes: 1 addition & 1 deletion python/pyspark/worker.py
@@ -61,7 +61,7 @@ def main(infile, outfile):
SparkFiles._is_running_on_worker = True

# fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
sys.path.append(spark_files_dir) # *.py files that were added will be copied here
sys.path.append(spark_files_dir) # *.py files that were added will be copied here
num_python_includes = read_int(infile)
for _ in range(num_python_includes):
filename = utf8_deserializer.loads(infile)
@@ -54,6 +54,15 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
dstream.print()
}

def print(label: String = null): Unit = {
  dstream.print(label)
}

def outputToFile(): Unit = {
  dstream.outputToFile()
}


/**
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
@@ -55,6 +55,8 @@ class PythonDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
  parent.getOrCompute(validTime) match{
    case Some(rdd) =>
      logInfo("RDD ID in python DStream ===========")
      logInfo("RDD id " + rdd.id)
      val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
      Some(pythonRDD.asJavaRDD.rdd)
    case None => None
@@ -620,6 +620,23 @@ abstract class DStream[T: ClassTag] (
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}


def print(label: String = null) {
  def foreachFunc = (rdd: RDD[T], time: Time) => {
    val first11 = rdd.take(11)
    println ("-------------------------------------------")
    println ("Time: " + time)
    println ("-------------------------------------------")
    if(label != null){
      println (label)
    }
    first11.take(10).foreach(println)
    if (first11.size > 10) println("...")
    println()
  }
  new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}

/**
* Return a new DStream in which each RDD contains all the elements in seen in a
* sliding window of time over this DStream. The new DStream generates RDDs with
