From b3b0362fb162b9029ff14a5896f2b82ace191de5 Mon Sep 17 00:00:00 2001
From: giwa
Date: Mon, 11 Aug 2014 03:21:22 -0700
Subject: [PATCH] added basic operation test cases

---
 .../main/python/streaming/test_oprations.py   | 19 +++++-----
 python/pyspark/streaming/context.py           | 36 +++++++++++++++++++
 python/pyspark/streaming/dstream.py           |  1 -
 python/pyspark/streaming_tests.py             |  5 ++-
 .../streaming/api/python/PythonDStream.scala  |  2 --
 5 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/examples/src/main/python/streaming/test_oprations.py b/examples/src/main/python/streaming/test_oprations.py
index 24ebe23d63166..70a62058286e9 100644
--- a/examples/src/main/python/streaming/test_oprations.py
+++ b/examples/src/main/python/streaming/test_oprations.py
@@ -9,22 +9,23 @@
     conf = SparkConf()
     conf.setAppName("PythonStreamingNetworkWordCount")
     ssc = StreamingContext(conf=conf, duration=Seconds(1))
-
-    test_input = ssc._testInputStream([1,2,3])
-    class buff:
+    class Buff:
+        result = list()
         pass
+    Buff.result = list()
+
+    test_input = ssc._testInputStream([range(1,4), range(4,7), range(7,10)])
 
     fm_test = test_input.map(lambda x: (x, 1))
-    fm_test.test_output(buff)
+    fm_test.pyprint()
+    fm_test._test_output(Buff.result)
 
     ssc.start()
     while True:
         ssc.awaitTermination(50)
-        try:
-            buff.result
+        if len(Buff.result) == 3:
             break
-        except AttributeError:
-            pass
 
     ssc.stop()
-    print buff.result
+    print Buff.result
+
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 765c4d5b96c74..e1ea6bda7adac 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -150,6 +150,7 @@ def _testInputStream(self, test_inputs, numSlices=None):
         This implementation is inspired by QueStream implementation.
         Give list of RDD to generate DStream which contains the RDD.
         """
+<<<<<<< HEAD
         test_rdds = list()
         test_rdd_deserializers = list()
         for test_input in test_inputs:
@@ -161,3 +162,38 @@ def _testInputStream(self, test_inputs, numSlices=None):
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
 
         return DStream(jinput_stream, self, test_rdd_deserializers[0])
+=======
+        self._jssc.checkpoint(directory)
+
+    def _testInputStream(self, test_inputs, numSlices=None):
+        """
+        Generate multiple files to make "stream" in Scala side for test.
+        Scala chooses one of the files and generates RDD using PythonRDD.readRDDFromFile.
+        """
+        numSlices = numSlices or self._sc.defaultParallelism
+        # Calling the Java parallelize() method with an ArrayList is too slow,
+        # because it sends O(n) Py4J commands.  As an alternative, serialized
+        # objects are written to a file and loaded through textFile().
+
+        tempFiles = list()
+        for test_input in test_inputs:
+            tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
+
+            # Make sure we distribute data evenly if it's smaller than self.batchSize
+            if "__len__" not in dir(test_input):
+                c = list(test_input)    # Make it a list so we can compute its length
+            batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
+            if batchSize > 1:
+                serializer = BatchedSerializer(self._sc._unbatched_serializer,
+                                               batchSize)
+            else:
+                serializer = self._sc._unbatched_serializer
+            serializer.dump_stream(test_input, tempFile)
+            tempFiles.append(tempFile.name)
+
+        jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
+        jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
+                                                        jtempFiles,
+                                                        numSlices).asJavaDStream()
+        return DStream(jinput_stream, self, PickleSerializer())
+>>>>>>> added basic operation test cases
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 4c51a862a58fa..c1ace350b1bf5 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -236,7 +236,6 @@ def pyprint(self):
         operator, so this DStream will be registered as an output stream and there materialized.
         """
         def takeAndPrint(rdd, time):
-            print "take and print ==================="
             taken = rdd.take(11)
             print "-------------------------------------------"
             print "Time: %s" % (str(time))
diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py
index d81f9b023ada2..0682a68a419ac 100644
--- a/python/pyspark/streaming_tests.py
+++ b/python/pyspark/streaming_tests.py
@@ -449,12 +449,11 @@ def tearDownClass(cls):
             current_time = time.time()
             # check time out
             if (current_time - start_time) > self.timeout:
-                self.ssc.stop()
                 break
             self.ssc.awaitTermination(50)
-            if buff.result is not None:
+            if len(expected_output) == len(StreamOutput.result):
                 break
-        return buff.result
+        return StreamOutput.result
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 289363206ca5a..6db7e0cfa6846 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -55,8 +55,6 @@ class PythonDStream[T: ClassTag](
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
     parent.getOrCompute(validTime) match{
       case Some(rdd) =>
-        logInfo("RDD ID in python DStream ===========")
-        logInfo("RDD id " + rdd.id)
         val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
         Some(pythonRDD.asJavaRDD.rdd)
       case None => None
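
Aside (not part of the patch above): the comment added to _testInputStream describes batch-serializing each test input into a temp file so the Scala-side PythonTestInputStream can read it back, instead of pushing elements through Py4J one by one. Below is a minimal standalone sketch of that hand-off, assuming only a local PySpark install; the helper name write_batch is illustrative and not a Spark API.

    # Sketch: batch-serialize one test input to a temp file, roughly as the
    # patched _testInputStream does before handing file names to the JVM side.
    from tempfile import NamedTemporaryFile

    from pyspark.serializers import BatchedSerializer, PickleSerializer

    def write_batch(test_input, batch_size=2):
        # Dump the input through a batched pickle serializer and return the file path.
        serializer = BatchedSerializer(PickleSerializer(), batch_size)
        temp_file = NamedTemporaryFile(delete=False)
        serializer.dump_stream(test_input, temp_file)
        temp_file.close()
        return temp_file.name

    paths = [write_batch(r) for r in (range(1, 4), range(4, 7), range(7, 10))]
    # `paths` plays the role of the tempFiles list that the patch converts to a
    # Java list and passes to PythonTestInputStream.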