diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py index c35d352c66ca5..7f6960faed1a0 100644 --- a/python/pyspark/streaming_tests.py +++ b/python/pyspark/streaming_tests.py @@ -332,7 +332,7 @@ def test_groupByKey_batch(self): """Basic operation test for DStream.groupByKey with batch deserializer.""" test_input = [range(1, 5), [1, 1, 1, 2, 2, 3], ["a", "a", "b", "", "", ""]] def test_func(dstream): - return dstream.map(lambda x: (x,1)).groupByKey() + return dstream.map(lambda x: (x, 1)).groupByKey() expected_output = [[(1, [1]), (2, [1]), (3, [1]), (4, [1])], [(1, [1, 1, 1]), (2, [1, 1]), (3, [1])], [("a", [1, 1]), ("b", [1]), ("", [1, 1, 1])]] @@ -345,8 +345,9 @@ def test_func(dstream): def test_groupByKey_unbatch(self): """Basic operation test for DStream.groupByKey with unbatch deserializer.""" test_input = [range(1, 4), [1, 1, ""], ["a", "a", "b"]] + def test_func(dstream): - return dstream.map(lambda x: (x,1)).groupByKey() + return dstream.map(lambda x: (x, 1)).groupByKey() expected_output = [[(1, [1]), (2, [1]), (3, [1])], [(1, [1, 1]), ("", [1])], [("a", [1, 1]), ("b", [1])]] @@ -356,6 +357,36 @@ def test_func(dstream): self._sort_result_based_on_key(result) self.assertEqual(expected_output, output) + def test_combineByKey_batch(self): + """Basic operation test for DStream.combineByKey with batch deserializer.""" + test_input = [range(1, 5), [1, 1, 1, 2, 2, 3], ["a", "a", "b", "", "", ""]] + + def test_func(dstream): + def add(a, b): return a + str(b) + return dstream.map(lambda x: (x, 1)).combineByKey(str, add, add) + expected_output = [[(1, "1"), (2, "1"), (3, "1"), (4, "1")], + [(1, "111"), (2, "11"), (3, "1")], + [("a", "11"), ("b", "1"), ("", "111")]] + output = self._run_stream(test_input, test_func, expected_output) + for result in (output, expected_output): + self._sort_result_based_on_key(result) + self.assertEqual(expected_output, output) + + def test_combineByKey_unbatch(self): + """Basic operation test for DStream.combineByKey with unbatch deserializer.""" + test_input = [range(1, 4), [1, 1, ""], ["a", "a", "b"]] + + def test_func(dstream): + def add(a, b): return a + str(b) + return dstream.map(lambda x: (x, 1)).combineByKey(str, add, add) + expected_output = [[(1, "1"), (2, "1"), (3, "1")], + [(1, "11"), ("", "1")], + [("a", "11"), ("b", "1")]] + output = self._run_stream(test_input, test_func, expected_output) + for result in (output, expected_output): + self._sort_result_based_on_key(result) + self.assertEqual(expected_output, output) + def _convert_iter_value_to_list(self, outputs): """Return key value pair list. Value is converted to iterator to list.""" result = list()