From d3de115b92e1d71850901e6069be462731849875 Mon Sep 17 00:00:00 2001 From: Daniel Standish Date: Thu, 24 Dec 2020 10:22:09 -0800 Subject: [PATCH 1/3] Simplify CeleryKubernetesExecutor tests --- .../test_celery_kubernetes_executor.py | 262 +++++++----------- 1 file changed, 95 insertions(+), 167 deletions(-) diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py index cc8a958b2ed5a3..588785626f2952 100644 --- a/tests/executors/test_celery_kubernetes_executor.py +++ b/tests/executors/test_celery_kubernetes_executor.py @@ -17,7 +17,13 @@ # under the License. from unittest import mock +from parameterized import parameterized + +from airflow.executors.celery_executor import CeleryExecutor from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor +from airflow.executors.kubernetes_executor import KubernetesExecutor + +KUBERNETES_QUEUE = CeleryKubernetesExecutor.KUBERNETES_QUEUE class TestCeleryKubernetesExecutor: @@ -58,182 +64,104 @@ def test_start(self): celery_executor_mock.start.assert_called() k8s_executor_mock.start.assert_called() - def test_queue_command(self): - command = ['airflow', 'run', 'dag'] - priority = 1 - queue = 'default' - - def when_using_k8s_executor(): - celery_executor_mock = mock.MagicMock() - k8s_executor_mock = mock.MagicMock() - cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) - - simple_task_instance = mock.MagicMock() - simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE - - cke.queue_command(simple_task_instance, command, priority, queue) - - k8s_executor_mock.queue_command.assert_called_once_with( - simple_task_instance, command, priority, queue - ) - celery_executor_mock.queue_command.assert_not_called() - - def when_using_celery_executor(): - celery_executor_mock = mock.MagicMock() - k8s_executor_mock = mock.MagicMock() - cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) - - simple_task_instance = mock.MagicMock() - simple_task_instance.queue = 'non-kubernetes-queue' + @parameterized.expand( + [ + ('other-queue',), + (KUBERNETES_QUEUE,), + ] + ) + @mock.patch.object(CeleryExecutor, 'queue_command') + @mock.patch.object(KubernetesExecutor, 'queue_command') + def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd): + kwargs = dict( + command=['airflow', 'run', 'dag'], + priority=1, + queue='default', + ) + kwarg_values = kwargs.values() + cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor()) + + simple_task_instance = mock.MagicMock() + simple_task_instance.queue = test_queue + + cke.queue_command(simple_task_instance, **kwargs) + + if test_queue == KUBERNETES_QUEUE: + k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values) + celery_queue_cmd.assert_not_called() + else: + celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values) + k8s_queue_cmd.assert_not_called() + + @parameterized.expand( + [ + ('non-kubernetes-queue',), + (KUBERNETES_QUEUE,), + ] + ) + def test_queue_task_instance(self, test_queue): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) - cke.queue_command(simple_task_instance, command, priority, queue) + ti = mock.MagicMock() + ti.queue = test_queue + + kwargs = dict( + task_instance=ti, + mark_success=False, + pickle_id=None, + ignore_all_deps=False, + ignore_depends_on_past=False, + ignore_task_deps=False, + ignore_ti_state=False, + pool=None, + cfg_path=None, + ) + kwarg_values = kwargs.values() + cke.queue_task_instance(**kwargs) + if test_queue == KUBERNETES_QUEUE: + k8s_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values) + celery_executor_mock.queue_task_instance.assert_not_called() + else: + celery_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values) + k8s_executor_mock.queue_task_instance.assert_not_called() - celery_executor_mock.queue_command.assert_called_once_with( - simple_task_instance, command, priority, queue - ) - k8s_executor_mock.queue_command.assert_not_called() + @parameterized.expand( + [ + (True, True, True), + (False, True, True), + (True, False, True), + (False, False, False), + ] + ) + def test_has_tasks(self, celery_has, k8s_has, cke_has): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) - when_using_k8s_executor() - when_using_celery_executor() + celery_executor_mock.has_task.return_value = celery_has + k8s_executor_mock.has_task.return_value = k8s_has - def test_queue_task_instance(self): - mark_success = False - pickle_id = None - ignore_all_deps = False - ignore_depends_on_past = False - ignore_task_deps = False - ignore_ti_state = False - pool = None - cfg_path = None + assert cke.has_task(None) == cke_has - def when_using_k8s_executor(): - celery_executor_mock = mock.MagicMock() - k8s_executor_mock = mock.MagicMock() - cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + @parameterized.expand([(1, 0), (0, 1), (2, 1)]) + def test_adopt_tasks(self, num_k8s, num_celery): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + def mock_ti(queue): ti = mock.MagicMock() - ti.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE - - cke.queue_task_instance( - ti, - mark_success, - pickle_id, - ignore_all_deps, - ignore_depends_on_past, - ignore_task_deps, - ignore_ti_state, - pool, - cfg_path, - ) - - k8s_executor_mock.queue_task_instance.assert_called_once_with( - ti, - mark_success, - pickle_id, - ignore_all_deps, - ignore_depends_on_past, - ignore_task_deps, - ignore_ti_state, - pool, - cfg_path, - ) - celery_executor_mock.queue_task_instance.assert_not_called() - - def when_using_celery_executor(): - celery_executor_mock = mock.MagicMock() - k8s_executor_mock = mock.MagicMock() - cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + ti.queue = queue + return ti - ti = mock.MagicMock() - ti.queue = 'non-kubernetes-queue' - - cke.queue_task_instance( - ti, - mark_success, - pickle_id, - ignore_all_deps, - ignore_depends_on_past, - ignore_task_deps, - ignore_ti_state, - pool, - cfg_path, - ) + celery_tis = [mock_ti('default') for _ in range(num_celery)] + k8s_tis = [mock_ti(KUBERNETES_QUEUE) for _ in range(num_k8s)] - k8s_executor_mock.queue_task_instance.assert_not_called() - celery_executor_mock.queue_task_instance.assert_called_once_with( - ti, - mark_success, - pickle_id, - ignore_all_deps, - ignore_depends_on_past, - ignore_task_deps, - ignore_ti_state, - pool, - cfg_path, - ) - - when_using_k8s_executor() - when_using_celery_executor() - - def test_has_tasks(self): - ti = mock.MagicMock - - def when_ti_in_k8s_executor(): - celery_executor_mock = mock.MagicMock() - k8s_executor_mock = mock.MagicMock() - cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) - - celery_executor_mock.has_task.return_value = False - k8s_executor_mock.has_task.return_value = True - - assert cke.has_task(ti) - celery_executor_mock.has_task.assert_called_once_with(ti) - k8s_executor_mock.has_task.assert_called_once_with(ti) - - def when_ti_in_celery_executor(): - celery_executor_mock = mock.MagicMock() - k8s_executor_mock = mock.MagicMock() - cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) - - celery_executor_mock.has_task.return_value = True - - assert cke.has_task(ti) - celery_executor_mock.has_task.assert_called_once_with(ti) - - when_ti_in_k8s_executor() - when_ti_in_celery_executor() - - def test_adopt_tasks(self): - ti = mock.MagicMock - - def when_ti_in_k8s_executor(): - celery_executor_mock = mock.MagicMock() - k8s_executor_mock = mock.MagicMock() - ti.queue = "kubernetes" - cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) - - celery_executor_mock.try_adopt_task_instances.return_value = [] - k8s_executor_mock.try_adopt_task_instances.return_value = [] - - cke.try_adopt_task_instances([ti]) - celery_executor_mock.try_adopt_task_instances.assert_called_once_with([]) - k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([ti]) - - def when_ti_in_celery_executor(): - celery_executor_mock = mock.MagicMock() - k8s_executor_mock = mock.MagicMock() - ti.queue = "default" - cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) - - celery_executor_mock.try_adopt_task_instances.return_value = [] - k8s_executor_mock.try_adopt_task_instances.return_value = [] - - cke.try_adopt_task_instances([ti]) - celery_executor_mock.try_adopt_task_instances.assert_called_once_with([ti]) - k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([]) - - when_ti_in_k8s_executor() - when_ti_in_celery_executor() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + cke.try_adopt_task_instances(celery_tis + k8s_tis) + celery_executor_mock.try_adopt_task_instances.assert_called_once_with(celery_tis) + k8s_executor_mock.try_adopt_task_instances.assert_called_once_with(k8s_tis) def test_get_event_buffer(self): celery_executor_mock = mock.MagicMock() From 7420599e09d3d1ee167cb6c523d7ac21112111eb Mon Sep 17 00:00:00 2001 From: Daniel Standish Date: Wed, 30 Dec 2020 13:29:47 -0800 Subject: [PATCH 2/3] !fixup --- tests/executors/test_celery_kubernetes_executor.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py index 588785626f2952..788c445bd0d1da 100644 --- a/tests/executors/test_celery_kubernetes_executor.py +++ b/tests/executors/test_celery_kubernetes_executor.py @@ -66,7 +66,7 @@ def test_start(self): @parameterized.expand( [ - ('other-queue',), + ('any-other-queue',), (KUBERNETES_QUEUE,), ] ) @@ -142,8 +142,11 @@ def test_has_tasks(self, celery_has, k8s_has, cke_has): celery_executor_mock.has_task.return_value = celery_has k8s_executor_mock.has_task.return_value = k8s_has - - assert cke.has_task(None) == cke_has + ti = mock.MagicMock() + assert cke.has_task(ti) == cke_has + celery_executor_mock.has_task.assert_called_once_with(ti) + if not celery_has: + k8s_executor_mock.has_task.assert_called_once_with(ti) @parameterized.expand([(1, 0), (0, 1), (2, 1)]) def test_adopt_tasks(self, num_k8s, num_celery): From 85bfc50eadfd055035396b13feaa051fd2467f1c Mon Sep 17 00:00:00 2001 From: Daniel Standish Date: Wed, 30 Dec 2020 15:21:27 -0800 Subject: [PATCH 3/3] !fixup --- tests/executors/test_celery_kubernetes_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py index 788c445bd0d1da..13dd0e91cca13e 100644 --- a/tests/executors/test_celery_kubernetes_executor.py +++ b/tests/executors/test_celery_kubernetes_executor.py @@ -95,7 +95,7 @@ def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd): @parameterized.expand( [ - ('non-kubernetes-queue',), + ('any-other-queue',), (KUBERNETES_QUEUE,), ] )