diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 022e2891559d7..41ed95b607161 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -350,6 +350,8 @@ private[spark] object PythonRDD extends Logging {
     } catch {
       case eof: EOFException => {}
     }
+    println("RDDDD ==================")
+    println(objs)
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
 
diff --git a/examples/src/main/python/streaming/test_oprations.py b/examples/src/main/python/streaming/test_oprations.py
index 3338a766b9cc3..5ee0bd4b31253 100644
--- a/examples/src/main/python/streaming/test_oprations.py
+++ b/examples/src/main/python/streaming/test_oprations.py
@@ -9,11 +9,15 @@
     conf = SparkConf()
     conf.setAppName("PythonStreamingNetworkWordCount")
     ssc = StreamingContext(conf=conf, duration=Seconds(1))
+    ssc.checkpoint("/tmp/spark_ckp")
 
-    test_input = ssc._testInputStream([1,1,1,1])
-    mapped = test_input.map(lambda x: (x, 1))
-    mapped.pyprint()
+    test_input = ssc._testInputStream([[1],[1],[1]])
+#    ssc.checkpoint("/tmp/spark_ckp")
+    fm_test = test_input.flatMap(lambda x: x.split(" "))
+    mapped_test = fm_test.map(lambda x: (x, 1))
+
+    mapped_test.print_()
 
     ssc.start()
 #    ssc.awaitTermination()
 #    ssc.stop()
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index d544eab9b8fc7..882db547faa39 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -146,7 +146,10 @@ def _testInputStream(self, test_input, numSlices=None):
         # Calling the Java parallelize() method with an ArrayList is too slow,
         # because it sends O(n) Py4J commands. As an alternative, serialized
         # objects are written to a file and loaded through textFile().
-        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
+
+        #tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
+        tempFile = open("/tmp/spark_rdd", "wb")
+
         # Make sure we distribute data evenly if it's smaller than self.batchSize
         if "__len__" not in dir(test_input):
             c = list(test_input)    # Make it a list so we can compute its length
@@ -157,6 +160,7 @@ def _testInputStream(self, test_input, numSlices=None):
         else:
             serializer = self._sc._unbatched_serializer
         serializer.dump_stream(test_input, tempFile)
+        tempFile.flush()
         tempFile.close()
         print tempFile.name
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
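
Note: the _testInputStream change above swaps the NamedTemporaryFile for a
fixed "/tmp/spark_rdd" path and adds a flush() so the serialized bytes reach
disk before the JVM side reads them back through PythonTestInputStream. A
minimal sketch of the handoff being exercised, assuming PySpark's
PickleSerializer stands in for the serializer the real code selects via
self._sc._unbatched_serializer (or its batched variant):

    from pyspark.serializers import PickleSerializer

    serializer = PickleSerializer()
    test_input = [[1], [1], [1]]    # same shape as the example driver above

    # Serialize the test input to a file instead of issuing O(n) Py4J calls;
    # the Scala-side PythonTestInputStream added in this branch then loads
    # the file's contents as the stream's input batches.
    tempFile = open("/tmp/spark_rdd", "wb")
    serializer.dump_stream(test_input, tempFile)
    tempFile.flush()    # make sure the data is on disk before the JVM reads it
    tempFile.close()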