[SPARK-42944][FOLLOWUP][SS][CONNECT] Reenable ApplyInPandasWithState tests

### What changes were proposed in this pull request?

The tests for ApplyInPandasWithState were previously skipped in Spark Connect because they use foreachBatch, which was not yet supported when they were written. Now that foreachBatch is supported, this PR re-enables them.
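
For context, the re-enabled tests follow the usual applyInPandasWithState pattern and verify each micro-batch's output inside a foreachBatch callback. A minimal sketch of that pattern (illustrative only, not code from this PR; the connect URL, input path, and schemas are placeholders):

```python
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

spark = SparkSession.builder.remote("sc://localhost").getOrCreate()  # Spark Connect session

def func(key, pdf_iter, state):
    # Stateful per-key function: count the rows seen for this key and emit one row.
    assert isinstance(state, GroupState)
    total = sum(len(pdf) for pdf in pdf_iter)
    state.update((total,))
    yield pd.DataFrame({"key": [key[0]], "countAsString": [str(total)]})

def check_results(batch_df, batch_id):
    # Runs once per micro-batch; a failed assert here fails the streaming query.
    assert batch_df.count() >= 0

q = (
    spark.readStream.format("text")
    .load("/tmp/apply_in_pandas_with_state_input")  # hypothetical input directory
    .groupBy("value")
    .applyInPandasWithState(
        func,
        outputStructType="key string, countAsString string",
        stateStructType="count long",
        outputMode="Update",
        timeoutConf=GroupStateTimeout.NoTimeout,
    )
    .writeStream.outputMode("update")
    .foreachBatch(check_results)
    .start()
)
q.processAllAvailable()
assert q.exception() is None  # any assertion failure inside check_results surfaces here
q.stop()
```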

### Why are the changes needed?

Restores test coverage for ApplyInPandasWithState in Spark Connect.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Test-only change.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46853 from WweiL/apply-in-pandas-with-state-test.

Authored-by: Wei Liu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
WweiL authored and HyukjinKwon committed Jun 7, 2024
1 parent cf3051b commit 9de0a2e
Showing 2 changed files with 21 additions and 51 deletions.
@@ -25,29 +25,7 @@
class GroupedApplyInPandasWithStateTests(
    GroupedApplyInPandasWithStateTestsMixin, ReusedConnectTestCase
):
-    @unittest.skip("foreachBatch will be supported in SPARK-42944.")
-    def test_apply_in_pandas_with_state_basic(self):
-        super().test_apply_in_pandas_with_state_basic()
-
-    @unittest.skip("foreachBatch will be supported in SPARK-42944.")
-    def test_apply_in_pandas_with_state_basic_no_state(self):
-        super().test_apply_in_pandas_with_state_basic()
-
-    @unittest.skip("foreachBatch will be supported in SPARK-42944.")
-    def test_apply_in_pandas_with_state_basic_no_state_no_data(self):
-        super().test_apply_in_pandas_with_state_basic()
-
-    @unittest.skip("foreachBatch will be supported in SPARK-42944.")
-    def test_apply_in_pandas_with_state_basic_more_data(self):
-        super().test_apply_in_pandas_with_state_basic()
-
-    @unittest.skip("foreachBatch will be supported in SPARK-42944.")
-    def test_apply_in_pandas_with_state_basic_fewer_data(self):
-        super().test_apply_in_pandas_with_state_basic()
-
-    @unittest.skip("foreachBatch will be supported in SPARK-42944.")
-    def test_apply_in_pandas_with_state_basic_with_null(self):
-        super().test_apply_in_pandas_with_state_basic()
+    pass


if __name__ == "__main__":
@@ -95,6 +95,7 @@ def prepare_test_resource():
        self.assertEqual(q.name, "this_query")
        self.assertTrue(q.isActive)
        q.processAllAvailable()
+        self.assertTrue(q.exception() is None)

    def test_apply_in_pandas_with_state_basic(self):
        def func(key, pdf_iter, state):
@@ -109,10 +110,10 @@ def func(key, pdf_iter, state):
yield pd.DataFrame({"key": [key[0]], "countAsString": [str(total_len)]})

def check_results(batch_df, _):
self.assertEqual(
set(batch_df.sort("key").collect()),
{Row(key="hello", countAsString="1"), Row(key="this", countAsString="1")},
)
assert set(batch_df.sort("key").collect()) == {
Row(key="hello", countAsString="1"),
Row(key="this", countAsString="1"),
}

self._test_apply_in_pandas_with_state_basic(func, check_results)

@@ -123,14 +124,11 @@ def func(key, pdf_iter, state):
yield pd.DataFrame({"key": [key[0], "foo"], "countAsString": ["100", "222"]})

def check_results(batch_df, _):
self.assertEqual(
set(batch_df.sort("key").collect()),
{
Row(key="hello", countAsString="100"),
Row(key="this", countAsString="100"),
Row(key="foo", countAsString="222"),
},
)
assert set(batch_df.sort("key").collect()) == {
Row(key="hello", countAsString="100"),
Row(key="this", countAsString="100"),
Row(key="foo", countAsString="222"),
}

self._test_apply_in_pandas_with_state_basic(func, check_results)

@@ -141,7 +139,7 @@ def func(key, pdf_iter, state):
yield pd.DataFrame({"key": [], "countAsString": []})

def check_results(batch_df, _):
self.assertTrue(len(set(batch_df.sort("key").collect())) == 0)
assert len(set(batch_df.sort("key").collect())) == 0

self._test_apply_in_pandas_with_state_basic(func, check_results)

@@ -156,16 +154,13 @@ def func(key, pdf_iter, state):
            )

        def check_results(batch_df, _):
-            self.assertEqual(
-                set(batch_df.sort("key").collect()),
-                {
-                    Row(key="hello", countAsString="1"),
-                    Row(key="foo", countAsString="666"),
-                    Row(key="hello_2", countAsString="2"),
-                    Row(key="this", countAsString="1"),
-                    Row(key="this_2", countAsString="2"),
-                },
-            )
+            assert set(batch_df.sort("key").collect()) == {
+                Row(key="hello", countAsString="1"),
+                Row(key="foo", countAsString="666"),
+                Row(key="hello_2", countAsString="2"),
+                Row(key="this", countAsString="1"),
+                Row(key="this_2", countAsString="2"),
+            }

        self._test_apply_in_pandas_with_state_basic(func, check_results)

@@ -177,7 +172,7 @@ def func(key, pdf_iter, state):
yield pd.DataFrame({"key": [], "countAsString": []})

def check_results(batch_df, _):
self.assertTrue(len(set(batch_df.sort("key").collect())) == 0)
assert len(set(batch_df.sort("key").collect())) == 0

self._test_apply_in_pandas_with_state_basic(func, check_results)

@@ -194,10 +189,7 @@ def func(key, pdf_iter, state):
yield pd.DataFrame({"key": [None], "countAsString": [str(total_len)]})

def check_results(batch_df, _):
self.assertEqual(
set(batch_df.sort("key").collect()),
{Row(key=None, countAsString="1")},
)
assert set(batch_df.sort("key").collect()) == {Row(key=None, countAsString="1")}

self._test_apply_in_pandas_with_state_basic(func, check_results)


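One detail worth noting in the diff above: `check_results` runs inside a `foreachBatch` callback, so it uses plain `assert` rather than `self.assertEqual` (likely so the callback does not capture the TestCase when it is serialized for Spark Connect), and the newly added `self.assertTrue(q.exception() is None)` is what surfaces any assertion failure, since an exception raised in the callback terminates the streaming query. A small illustrative sketch of that propagation (not from this PR; uses the built-in rate source):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def check_results(batch_df, batch_id):
    # Deliberately failing assertion, to show how it propagates.
    assert batch_df.count() == -1

q = spark.readStream.format("rate").load().writeStream.foreachBatch(check_results).start()
try:
    q.processAllAvailable()  # may itself raise once the failing batch is processed
except Exception:
    pass
print(q.exception() is not None)  # True: the assertion error terminated the query
q.stop()
```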