diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 2d8b1e468dc4c..fe67250604d8e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -129,7 +129,7 @@ class PythonDStream[T: ClassTag](
   }
 }
 
-
+/*
 private class PairwiseDStream(prev:DStream[Array[Byte]]) extends
 DStream[(Long, Array[Byte])](prev.ssc){
   override def dependencies = List(prev)
@@ -146,6 +146,7 @@ DStream[(Long, Array[Byte])](prev.ssc){
   }
   val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream(this)
 }
+*/
 
 
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index b24109074e816..d9d5446b62e9f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -620,10 +620,7 @@ abstract class DStream[T: ClassTag] (
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
 
-
-
-
-
+//TODO move pyprint to PythonDStream
   /**
    * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
    * operator, so this PythonDStream will be registered as an output stream and there materialized.
@@ -644,6 +641,7 @@ abstract class DStream[T: ClassTag] (
     tempFileStream.close()
 
     // This value has to be passed from python
+    // Python currently does not do cluster deployment. But what happened
     val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
     val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
     //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
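
On the `// why this fails to compile???` question in the last hunk: `java.lang.ProcessBuilder` has no constructor that accepts a Scala `Seq`; its constructors take either `String...` varargs or a `java.util.List[String]`, so the Scala collection must be expanded or converted. A minimal sketch of both fixes, using stand-in values for the environment lookups and the temp file path from the diff:

```scala
import scala.collection.JavaConverters._

// Stand-ins for the values the diff reads from the process environment
// and from tempFile.getAbsolutePath().
val pythonExec = sys.env.getOrElse("PYSPARK_PYTHON", "python")
val sparkHome = sys.env.getOrElse("SPARK_HOME", ".")
val command = Seq(
  pythonExec,
  sparkHome + "/python/pyspark/streaming/pyprint.py",
  "/tmp/example-temp-file")

// Option 1: expand the Seq into the String... varargs constructor.
val pb1 = new ProcessBuilder(command: _*)

// Option 2: convert the Seq to a java.util.List[String].
val pb2 = new ProcessBuilder(command.asJava)
```

As a side note, `sys.env` used above is the idiomatic Scala way to read environment variables; it returns the same values as the `new ProcessBuilder().environment().get(...)` calls in the diff.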