From 6d8190a5167762bea58969fc2e675bb46496dacc Mon Sep 17 00:00:00 2001
From: giwa
Date: Mon, 18 Aug 2014 00:30:17 -0700
Subject: [PATCH] add comments

---
 python/pyspark/java_gateway.py      |  5 ++---
 python/pyspark/streaming/context.py | 13 ++++++-----
 python/pyspark/streaming/dstream.py | 24 ++++++++++++++++++++
 python/pyspark/streaming_tests.py   | 34 ++++++++++++++++++++++-------
 4 files changed, 59 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 9fd59be1456ef..8a4adada9011d 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -84,15 +84,14 @@ def run(self):
     java_import(gateway.jvm, "org.apache.spark.SparkConf")
     java_import(gateway.jvm, "org.apache.spark.api.java.*")
     java_import(gateway.jvm, "org.apache.spark.api.python.*")
-    java_import(gateway.jvm, "org.apache.spark.streaming.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.*")  # do we need this?
     java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
     java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
-    java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*")  # do we need this?
     java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
     java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
     java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
     java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
     java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
     java_import(gateway.jvm, "scala.Tuple2")
-
     return gateway
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 470ed270cdbfb..e380626aa080b 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -64,7 +64,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                                 pyFiles=pyFiles, environment=environment, batchSize=batchSize,
                                 serializer=serializer, conf=conf, gateway=gateway)

-        # Start py4j callback server
+        # Start py4j callback server.
+        # The callback server is needed only by Spark Streaming,
+        # so it is started here in StreamingContext.
         SparkContext._gateway.restart_callback_server()
         self._clean_up_trigger()
         self._jvm = self._sc._jvm
@@ -78,6 +80,8 @@ def _clean_up_trigger(self):
         """Kill py4j callback server properly using signal lib"""

         def clean_up_handler(*args):
+            # Make sure the callback server is stopped.
+            # How to terminate the callback server cleanly still needs improvement.
             SparkContext._gateway._shutdown_callback_server()
             SparkContext._gateway.shutdown()
             sys.exit(0)
@@ -100,7 +104,7 @@ def awaitTermination(self, timeout=None):
         else:
             self._jssc.awaitTermination(timeout)

-    # start from simple one. storageLevel is not passed for now.
+    # TODO: add storageLevel
     def socketTextStream(self, hostname, port):
         """
         Create an input from TCP source hostname:port. Data is received using
@@ -134,7 +138,7 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
     def _testInputStream(self, test_inputs, numSlices=None):
         """
         This function is only for test.
-        This implementation is inpired by QueStream implementation.
+        This implementation is inspired by the QueueStream implementation.
         Give list of RDD to generate DStream which contains the RDD.
         """
         test_rdds = list()
@@ -144,9 +148,6 @@ def _testInputStream(self, test_inputs, numSlices=None):
             test_rdds.append(test_rdd._jrdd)
             test_rdd_deserializers.append(test_rdd._jrdd_deserializer)

-#        if len(set(test_rdd_deserializers)) > 1:
-#            raise IOError("Deserializer should be one type to run test case. "
-#                          "See the SparkContext.parallelize to understand how to decide deserializer")
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index ef0e2258e9922..8ed50d3dd2531 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -331,6 +331,17 @@ def checkpoint(self, interval):
         return self

     def groupByKey(self, numPartitions=None):
+        """
+        Return a new DStream in which the values for each key in this
+        DStream are grouped into a single sequence.
+        Hash-partitions the resulting RDDs in this DStream into
+        numPartitions partitions.
+
+        Note: If you are grouping in order to perform an aggregation (such as a
+        sum or average) over each key, using reduceByKey will provide much
+        better performance.
+
+        """
         def createCombiner(x):
             return [x]

@@ -346,6 +357,10 @@ def mergeCombiners(a, b):
         return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                  numPartitions).mapValues(lambda x: ResultIterable(x))

     def countByValue(self):
+        """
+        Return a new DStream which contains the count of each unique value in this
+        DStream as a (value, count) pair.
+        """
         def countPartition(iterator):
             counts = defaultdict(int)
             for obj in iterator:
@@ -360,6 +375,9 @@ def mergeMaps(m1, m2):
         return self.mapPartitions(countPartition).reduce(mergeMaps).flatMap(lambda x: x.items())

     def saveAsTextFiles(self, prefix, suffix=None):
+        """
+        Save each RDD in this DStream as a text file, using string representations of elements.
+        """
         def saveAsTextFile(rdd, time):
             path = rddToFileName(prefix, suffix, time)
@@ -368,6 +386,11 @@ def saveAsTextFile(rdd, time):
         return self.foreachRDD(saveAsTextFile)

     def saveAsPickledFiles(self, prefix, suffix=None):
+        """
+        Save each RDD in this DStream as a SequenceFile of serialized objects.
+        The serializer used is L{pyspark.serializers.PickleSerializer} with a
+        default batch size of 10.
+        """
         def saveAsTextFile(rdd, time):
             path = rddToFileName(prefix, suffix, time)
@@ -397,6 +420,7 @@ def saveAsTextFile(rdd, time):

 # TODO: implement leftOuterJoin
 # TODO: implemtnt rightOuterJoin
+
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py
index 2bb01ed3a0642..ef308fdd6aa59 100644
--- a/python/pyspark/streaming_tests.py
+++ b/python/pyspark/streaming_tests.py
@@ -18,12 +18,11 @@
 """
 Unit tests for PySpark; additional tests are implemented as doctests in
 individual modules.

-Other option is separate this test case with other tests.
-This makes sense becuase streaming tests takes long time due to waiting time
-for stoping callback server.
-This file will merged to tests.py. But for now, this file is separated due
-to focusing to streaming test case
+This file will be merged into tests.py once all functions are ready.
+For now it is kept separate so that we can focus on the streaming test cases.
+
+The callback server sometimes seems unstable, which causes errors in the test cases.
 """

 from itertools import chain
@@ -43,10 +42,10 @@ def setUp(self):

     def tearDown(self):
         # Do not call pyspark.streaming.context.StreamingContext.stop directly because
-        # we do not wait to shutdowncall back server and py4j client
+        # we do not wait for the callback server and py4j client to shut down
         self.ssc._jssc.stop()
         self.ssc._sc.stop()
-        # Why does it long time to terminaete StremaingContext and SparkContext?
+        # Why does it take a long time to terminate StreamingContext and SparkContext?
         # Should we change the sleep time if this depends on machine spec?
         time.sleep(10)

@@ -68,7 +67,7 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
     I am wondering if these test are enough or not.
     All tests input should have list of lists. This represents stream.
     Every batch interval, the first object of list are chosen to make DStream.
-    Please see the BasicTestSuits in Scala or QueStream which is close to this implementation.
+    Please see the BasicTestSuits in Scala, which is close to this implementation.
     """
     def setUp(self):
         PySparkStreamingTestCase.setUp(self)
@@ -358,5 +357,24 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
         return self.result


+class TestSaveAsFilesSuite(PySparkStreamingTestCase):
+    def setUp(self):
+        PySparkStreamingTestCase.setUp(self)
+        self.timeout = 10  # seconds
+        self.numInputPartitions = 2
+        self.result = list()
+
+    def tearDown(self):
+        PySparkStreamingTestCase.tearDown(self)
+
+    @classmethod
+    def tearDownClass(cls):
+        PySparkStreamingTestCase.tearDownClass()
+
+
+
+
+
+
 if __name__ == "__main__":
     unittest.main()