Skip to content

Commit

Permalink
fixup! fixup! Add backward compatibility with old versions of Apache …
Browse files Browse the repository at this point in the history
…Beam
  • Loading branch information
mik-laj authored and potiuk committed Nov 7, 2022
1 parent 687ed1d commit 7e91f92
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions tests/providers/apache/beam/hooks/test_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@

class TestBeamHook(unittest.TestCase):
@mock.patch(BEAM_STRING.format("BeamCommandRunner"))
def test_start_python_pipeline(self, mock_runner):
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0")
def test_start_python_pipeline(self, mock_check_output, mock_runner):
hook = BeamHook(runner=DEFAULT_RUNNER)
wait_for_done = mock_runner.return_value.wait_for_done
process_line_callback = MagicMock()
Expand All @@ -85,16 +86,13 @@ def test_start_python_pipeline(self, mock_runner):
)
wait_for_done.assert_called_once_with()

@mock.patch("subprocess.check_output", return_value=b"2.35.0")
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.35.0")
def test_start_python_pipeline_unsupported_option(self, mock_check_output):
hook = BeamHook(runner=DEFAULT_RUNNER)

with pytest.raises(
AirflowException,
match=re.escape(
"The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer. "
"Current version: 2.35.0"
),
match=re.escape("The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer."),
):
hook.start_python_pipeline(
variables={
Expand All @@ -117,7 +115,10 @@ def test_start_python_pipeline_unsupported_option(self, mock_check_output):
]
)
@mock.patch(BEAM_STRING.format("BeamCommandRunner"))
def test_start_python_pipeline_with_custom_interpreter(self, _, py_interpreter, mock_runner):
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0")
def test_start_python_pipeline_with_custom_interpreter(
self, _, py_interpreter, mock_check_output, mock_runner
):
hook = BeamHook(runner=DEFAULT_RUNNER)
wait_for_done = mock_runner.return_value.wait_for_done
process_line_callback = MagicMock()
Expand Down Expand Up @@ -152,8 +153,14 @@ def test_start_python_pipeline_with_custom_interpreter(self, _, py_interpreter,
)
@mock.patch(BEAM_STRING.format("prepare_virtualenv"))
@mock.patch(BEAM_STRING.format("BeamCommandRunner"))
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0")
def test_start_python_pipeline_with_non_empty_py_requirements_and_without_system_packages(
self, current_py_requirements, current_py_system_site_packages, mock_runner, mock_virtualenv
self,
current_py_requirements,
current_py_system_site_packages,
mock_check_output,
mock_runner,
mock_virtualenv,
):
hook = BeamHook(runner=DEFAULT_RUNNER)
wait_for_done = mock_runner.return_value.wait_for_done
Expand Down Expand Up @@ -189,7 +196,10 @@ def test_start_python_pipeline_with_non_empty_py_requirements_and_without_system
)

@mock.patch(BEAM_STRING.format("BeamCommandRunner"))
def test_start_python_pipeline_with_empty_py_requirements_and_without_system_packages(self, mock_runner):
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0")
def test_start_python_pipeline_with_empty_py_requirements_and_without_system_packages(
self, mock_check_output, mock_runner
):
hook = BeamHook(runner=DEFAULT_RUNNER)
wait_for_done = mock_runner.return_value.wait_for_done
process_line_callback = MagicMock()
Expand Down

0 comments on commit 7e91f92

Please sign in to comment.