Skip to content

Commit

Permalink
improve testcases
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Aug 19, 2014
1 parent 58150f5 commit 09a28bf
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions python/pyspark/streaming_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def setUp(self):

def tearDown(self):
# Do not call pyspark.streaming.context.StreamingContext.stop directly because
# we do not wait to shutdown call back server and py4j client
# we do not wait to shutdown py4j client.
self.ssc._jssc.stop()
self.ssc._sc.stop()
# Why does it long time to terminate StremaingContext and SparkContext?
Expand Down Expand Up @@ -74,7 +74,6 @@ def setUp(self):
PySparkStreamingTestCase.setUp(self)
self.timeout = 10 # seconds
self.numInputPartitions = 2
self.result = list()

def tearDown(self):
PySparkStreamingTestCase.tearDown(self)
Expand Down Expand Up @@ -426,7 +425,8 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
# Apply test function to stream.
test_stream = test_func(test_input_stream)
# Add job to get output from stream.
test_stream._test_output(self.result)
result = list()
test_stream._test_output(result)
self.ssc.start()

start_time = time.time()
Expand All @@ -438,10 +438,10 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
break
self.ssc.awaitTermination(50)
# Check if the output is the same length of expexted output.
if len(expected_output) == len(self.result):
if len(expected_output) == len(result):
break

return self.result
return result

class TestSaveAsFilesSuite(PySparkStreamingTestCase):
def setUp(self):
Expand Down

0 comments on commit 09a28bf

Please sign in to comment.