From 3d3782236850b0b9ca2558c52bfe0535881757b0 Mon Sep 17 00:00:00 2001
From: giwa
Date: Mon, 11 Aug 2014 03:41:24 -0700
Subject: [PATCH] fixed PEP 8 violations

---
 python/pyspark/streaming/context.py |  5 ----
 python/pyspark/streaming/dstream.py | 19 +++++++++------
 python/pyspark/streaming_tests.py   | 37 +++++++++++++++--------------
 3 files changed, 31 insertions(+), 30 deletions(-)

diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index be142fd4f327b..088a4965b6b13 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -19,12 +19,7 @@
 from signal import signal, SIGTERM, SIGINT
 from tempfile import NamedTemporaryFile
 
-from pyspark.conf import SparkConf
-from pyspark.files import SparkFiles
-from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
-from pyspark.storagelevel import *
-from pyspark.rdd import RDD
 from pyspark.context import SparkContext
 
 from pyspark.streaming.dstream import DStream
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index ec66af3177826..07429f477d310 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -49,7 +49,7 @@ def _sum(self):
 
     def print_(self, label=None):
         """
-        Since print is reserved name for python, we cannot define a print method function.
+        Since print is a reserved name in Python, we cannot define a "print" method.
         This function prints serialized data in RDD in DStream because Scala and Java cannot
         deserialized pickled python object. Please use DStream.pyprint() instead to print results.
         """
@@ -159,8 +159,9 @@ def partitionBy(self, numPartitions, partitionFunc=None):
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
         outputSerializer = self.ctx._unbatched_serializer
-        def add_shuffle_key(split, iterator):
+
+        def add_shuffle_key(split, iterator):
             buckets = defaultdict(list)
             for (k, v) in iterator:
@@ -218,6 +218,11 @@ def getNumPartitions(self):
 
     def foreachRDD(self, func):
         """
+        Apply a user-defined function to each RDD in this DStream.
+        This Python implementation could be expensive because it uses a callback server
+        in order to apply the function to each RDD in the DStream.
+        This is an output operator, so this DStream will be registered as an output
+        stream and materialized there.
         """
         from utils import RDDFunction
         wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
@@ -227,7 +232,6 @@ def pyprint(self):
         """
         Print the first ten elements of each RDD generated in this DStream.
         This is an output operator, so this DStream will be registered as an output stream and there materialized.
-
         """
         def takeAndPrint(rdd, time):
             taken = rdd.take(11)
@@ -248,14 +252,15 @@ def takeAndPrint(rdd, time):
 
     # jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
     # return DStream(jdstream, self._ssc, ...)  ## DO NOT KNOW HOW
-    def _test_output(self, buff):
+    def _test_output(self, result):
         """
-        This function is only for testcase.
-        Store data in dstream to buffer to valify the result in tesecase
+        This function is only for test cases.
+        Store the data from this DStream into result to verify the output in test cases.
         """
         def get_output(rdd, time):
             taken = rdd.collect()
-            buff.append(taken)
+            result.append(taken)
+
         self.foreachRDD(get_output)
diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py
index d2e638a7d2acc..ef9b87756fcef 100644
--- a/python/pyspark/streaming_tests.py
+++ b/python/pyspark/streaming_tests.py
@@ -23,18 +23,10 @@
 to focusing to streaming test case
 """
-from fileinput import input
-from glob import glob
 from itertools import chain
 import os
-import re
-import shutil
-import subprocess
-import sys
-import tempfile
 import time
 import unittest
-import zipfile
 import operator
 
 from pyspark.context import SparkContext
@@ -44,12 +36,14 @@
 
 SPARK_HOME = os.environ["SPARK_HOME"]
 
+
 class StreamOutput:
     """
     a class to store the output from stream
     """
     result = list()
 
+
 class PySparkStreamingTestCase(unittest.TestCase):
     def setUp(self):
         class_name = self.__class__.__name__
@@ -69,6 +63,7 @@ def tearDownClass(cls):
         time.sleep(5)
         SparkContext._gateway._shutdown_callback_server()
 
+
 class TestBasicOperationsSuite(PySparkStreamingTestCase):
     """
     Input and output of this TestBasicOperationsSuite is the equivalent to
@@ -77,7 +72,7 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
     def setUp(self):
         PySparkStreamingTestCase.setUp(self)
         StreamOutput.result = list()
-        self.timeout = 10 # seconds
+        self.timeout = 10  # seconds
 
     def tearDown(self):
         PySparkStreamingTestCase.tearDown(self)
@@ -88,7 +83,8 @@ def tearDownClass(cls):
 
     def test_map(self):
         """Basic operation test for DStream.map"""
-        test_input = [range(1,5), range(5,9), range(9, 13)]
+        test_input = [range(1, 5), range(5, 9), range(9, 13)]
+
         def test_func(dstream):
             return dstream.map(lambda x: str(x))
         expected_output = map(lambda x: map(lambda y: str(y), x), test_input)
@@ -97,17 +93,19 @@ def test_func(dstream):
 
     def test_flatMap(self):
         """Basic operation test for DStream.faltMap"""
-        test_input = [range(1,5), range(5,9), range(9, 13)]
+        test_input = [range(1, 5), range(5, 9), range(9, 13)]
+
         def test_func(dstream):
             return dstream.flatMap(lambda x: (x, x * 2))
         expected_output = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))),
-                              test_input)
+                             test_input)
         output = self._run_stream(test_input, test_func, expected_output)
         self.assertEqual(expected_output, output)
 
     def test_filter(self):
         """Basic operation test for DStream.filter"""
-        test_input = [range(1,5), range(5,9), range(9, 13)]
+        test_input = [range(1, 5), range(5, 9), range(9, 13)]
+
         def test_func(dstream):
             return dstream.filter(lambda x: x % 2 == 0)
         expected_output = map(lambda x: filter(lambda y: y % 2 == 0, x), test_input)
@@ -116,7 +114,8 @@ def test_func(dstream):
 
     def test_count(self):
         """Basic operation test for DStream.count"""
-        test_input = [[], [1], range(1, 3), range(1,4), range(1,5)]
+        test_input = [[], [1], range(1, 3), range(1, 4), range(1, 5)]
+
         def test_func(dstream):
             return dstream.count()
         expected_output = map(lambda x: [len(x)], test_input)
@@ -125,7 +124,8 @@ def test_func(dstream):
 
     def test_reduce(self):
         """Basic operation test for DStream.reduce"""
-        test_input = [range(1,5), range(5,9), range(9, 13)]
+        test_input = [range(1, 5), range(5, 9), range(9, 13)]
+
         def test_func(dstream):
             return dstream.reduce(operator.add)
         expected_output = map(lambda x: [reduce(operator.add, x)], test_input)
@@ -135,9 +135,10 @@ def test_func(dstream):
 
     def test_reduceByKey(self):
        """Basic operation test for DStream.reduceByKey"""
        test_input = [["a", "a", "b"], ["", ""], []]
+
        def test_func(dstream):
            return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
-        expected_output = [[("a", 2), ("b", 1)],[("", 2)], []]
+        expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
         output = self._run_stream(test_input, test_func, expected_output)
         self.assertEqual(expected_output, output)
 
@@ -145,9 +146,9 @@ def _run_stream(self, test_input, test_func, expected_output):
         """Start stream and return the output"""
         # Generate input stream with user-defined input
         test_input_stream = self.ssc._testInputStream(test_input)
-        # Applyed test function to stream
+        # Apply the test function to the stream
         test_stream = test_func(test_input_stream)
-        # Add job to get outpuf from stream
+        # Add a job to collect output from the stream
         test_stream._test_output(StreamOutput.result)
         self.ssc.start()
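
For context, every test in this harness follows the same three steps: build an input stream from test_input with ssc._testInputStream, apply the operation under test, and collect each batch into StreamOutput.result through _test_output (which registers a foreachRDD callback). Below is a minimal sketch of that flow outside unittest. It assumes an already-running StreamingContext named ssc, created as in PySparkStreamingTestCase.setUp; the buffer name result and the sample map/reduceByKey operation are illustrative, not part of the patch.

    import operator
    import time

    result = []                              # plays the role of StreamOutput.result
    test_input = [range(1, 5), range(5, 9)]  # one inner list per batch

    # Build the test input stream and apply the operation under test
    # (the same map/reduceByKey pair exercised by test_reduceByKey).
    input_stream = ssc._testInputStream(test_input)
    counted = input_stream.map(lambda x: (x % 2, 1)).reduceByKey(operator.add)

    # _test_output registers a foreachRDD callback that appends each
    # collected batch to the buffer; start the context to begin processing.
    counted._test_output(result)
    ssc.start()

    # Poll until every batch has arrived or a timeout expires, mirroring
    # the 10-second self.timeout used by TestBasicOperationsSuite.
    deadline = time.time() + 10
    while len(result) < len(test_input) and time.time() < deadline:
        time.sleep(0.1)

    print result   # e.g. [[(0, 2), (1, 2)], [(0, 2), (1, 2)]]

Polling with a deadline rather than sleeping for a fixed interval keeps a test from hanging indefinitely when a batch never arrives.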