From af336b776c0586e1d8e2dbe4df2073ee8f7ca566 Mon Sep 17 00:00:00 2001 From: giwa Date: Mon, 18 Aug 2014 00:30:17 -0700 Subject: [PATCH] add comments --- python/pyspark/java_gateway.py | 2 +- python/pyspark/streaming/context.py | 3 --- python/pyspark/streaming/dstream.py | 1 + 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 2af917efc40a3..f3c6d231ab777 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -111,7 +111,7 @@ def run(self): java_import(gateway.jvm, "org.apache.spark.streaming.*") # do we need this? java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*") java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*") - java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*") + java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*") # do we need this? java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 2cc9d03a87e9e..3f455a3e06072 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -152,9 +152,6 @@ def _testInputStream(self, test_inputs, numSlices=None): test_rdds.append(test_rdd._jrdd) test_rdd_deserializers.append(test_rdd._jrdd_deserializer) -# if len(set(test_rdd_deserializers)) > 1: -# raise IOError("Deserializer should be one type to run test case. " -# "See the SparkContext.parallelize to understand how to decide deserializer") jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client) jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream() diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index bfe639e3ef791..66024d539ce5c 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -425,6 +425,7 @@ def saveAsTextFile(rdd, time): # TODO: implement leftOuterJoin # TODO: implemtnt rightOuterJoin + class PipelinedDStream(DStream): def __init__(self, prev, func, preservesPartitioning=False): if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():