From 7e91f92530ecf8875484357e700007cf70c0b147 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= Date: Wed, 2 Nov 2022 12:41:39 +0100 Subject: [PATCH] fixup! fixup! Add backward compatibility with old versions of Apache Beam --- .../providers/apache/beam/hooks/test_beam.py | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/tests/providers/apache/beam/hooks/test_beam.py b/tests/providers/apache/beam/hooks/test_beam.py index 9daca1a6ccf893..bc9e8187f008b0 100644 --- a/tests/providers/apache/beam/hooks/test_beam.py +++ b/tests/providers/apache/beam/hooks/test_beam.py @@ -60,7 +60,8 @@ class TestBeamHook(unittest.TestCase): @mock.patch(BEAM_STRING.format("BeamCommandRunner")) - def test_start_python_pipeline(self, mock_runner): + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0") + def test_start_python_pipeline(self, mock_check_output, mock_runner): hook = BeamHook(runner=DEFAULT_RUNNER) wait_for_done = mock_runner.return_value.wait_for_done process_line_callback = MagicMock() @@ -85,16 +86,13 @@ def test_start_python_pipeline(self, mock_runner): ) wait_for_done.assert_called_once_with() - @mock.patch("subprocess.check_output", return_value=b"2.35.0") + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.35.0") def test_start_python_pipeline_unsupported_option(self, mock_check_output): hook = BeamHook(runner=DEFAULT_RUNNER) with pytest.raises( AirflowException, - match=re.escape( - "The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer. " - "Current version: 2.35.0" - ), + match=re.escape("The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer."), ): hook.start_python_pipeline( variables={ @@ -117,7 +115,10 @@ def test_start_python_pipeline_unsupported_option(self, mock_check_output): ] ) @mock.patch(BEAM_STRING.format("BeamCommandRunner")) - def test_start_python_pipeline_with_custom_interpreter(self, _, py_interpreter, mock_runner): + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0") + def test_start_python_pipeline_with_custom_interpreter( + self, _, py_interpreter, mock_check_output, mock_runner + ): hook = BeamHook(runner=DEFAULT_RUNNER) wait_for_done = mock_runner.return_value.wait_for_done process_line_callback = MagicMock() @@ -152,8 +153,14 @@ def test_start_python_pipeline_with_custom_interpreter(self, _, py_interpreter, ) @mock.patch(BEAM_STRING.format("prepare_virtualenv")) @mock.patch(BEAM_STRING.format("BeamCommandRunner")) + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0") def test_start_python_pipeline_with_non_empty_py_requirements_and_without_system_packages( - self, current_py_requirements, current_py_system_site_packages, mock_runner, mock_virtualenv + self, + current_py_requirements, + current_py_system_site_packages, + mock_check_output, + mock_runner, + mock_virtualenv, ): hook = BeamHook(runner=DEFAULT_RUNNER) wait_for_done = mock_runner.return_value.wait_for_done @@ -189,7 +196,10 @@ def test_start_python_pipeline_with_non_empty_py_requirements_and_without_system ) @mock.patch(BEAM_STRING.format("BeamCommandRunner")) - def test_start_python_pipeline_with_empty_py_requirements_and_without_system_packages(self, mock_runner): + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0") + def test_start_python_pipeline_with_empty_py_requirements_and_without_system_packages( + self, mock_check_output, mock_runner + ): hook = BeamHook(runner=DEFAULT_RUNNER) wait_for_done = mock_runner.return_value.wait_for_done process_line_callback = MagicMock()