Skip to content

Commit

Permalink
Simplify CeleryKubernetesExecutor tests (#13307)
Browse files Browse the repository at this point in the history
* Simplify CeleryKubernetesExecutor tests

Co-authored-by: Daniel Standish <[email protected]>
  • Loading branch information
dstandish and Daniel Standish authored Dec 31, 2020
1 parent dcedb81 commit 10be375
Showing 1 changed file with 98 additions and 167 deletions.
265 changes: 98 additions & 167 deletions tests/executors/test_celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@
# under the License.
from unittest import mock

from parameterized import parameterized

from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.executors.kubernetes_executor import KubernetesExecutor

KUBERNETES_QUEUE = CeleryKubernetesExecutor.KUBERNETES_QUEUE


class TestCeleryKubernetesExecutor:
Expand Down Expand Up @@ -58,182 +64,107 @@ def test_start(self):
celery_executor_mock.start.assert_called()
k8s_executor_mock.start.assert_called()

def test_queue_command(self):
command = ['airflow', 'run', 'dag']
priority = 1
queue = 'default'

def when_using_k8s_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

simple_task_instance = mock.MagicMock()
simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE

cke.queue_command(simple_task_instance, command, priority, queue)

k8s_executor_mock.queue_command.assert_called_once_with(
simple_task_instance, command, priority, queue
)
celery_executor_mock.queue_command.assert_not_called()

def when_using_celery_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

simple_task_instance = mock.MagicMock()
simple_task_instance.queue = 'non-kubernetes-queue'

cke.queue_command(simple_task_instance, command, priority, queue)

celery_executor_mock.queue_command.assert_called_once_with(
simple_task_instance, command, priority, queue
)
k8s_executor_mock.queue_command.assert_not_called()

when_using_k8s_executor()
when_using_celery_executor()

def test_queue_task_instance(self):
mark_success = False
pickle_id = None
ignore_all_deps = False
ignore_depends_on_past = False
ignore_task_deps = False
ignore_ti_state = False
pool = None
cfg_path = None

def when_using_k8s_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
@parameterized.expand(
[
('any-other-queue',),
(KUBERNETES_QUEUE,),
]
)
@mock.patch.object(CeleryExecutor, 'queue_command')
@mock.patch.object(KubernetesExecutor, 'queue_command')
def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
kwargs = dict(
command=['airflow', 'run', 'dag'],
priority=1,
queue='default',
)
kwarg_values = kwargs.values()
cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())

simple_task_instance = mock.MagicMock()
simple_task_instance.queue = test_queue

cke.queue_command(simple_task_instance, **kwargs)

if test_queue == KUBERNETES_QUEUE:
k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
celery_queue_cmd.assert_not_called()
else:
celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
k8s_queue_cmd.assert_not_called()

@parameterized.expand(
[
('any-other-queue',),
(KUBERNETES_QUEUE,),
]
)
def test_queue_task_instance(self, test_queue):
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

ti = mock.MagicMock()
ti.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE

cke.queue_task_instance(
ti,
mark_success,
pickle_id,
ignore_all_deps,
ignore_depends_on_past,
ignore_task_deps,
ignore_ti_state,
pool,
cfg_path,
)

k8s_executor_mock.queue_task_instance.assert_called_once_with(
ti,
mark_success,
pickle_id,
ignore_all_deps,
ignore_depends_on_past,
ignore_task_deps,
ignore_ti_state,
pool,
cfg_path,
)
ti = mock.MagicMock()
ti.queue = test_queue

kwargs = dict(
task_instance=ti,
mark_success=False,
pickle_id=None,
ignore_all_deps=False,
ignore_depends_on_past=False,
ignore_task_deps=False,
ignore_ti_state=False,
pool=None,
cfg_path=None,
)
kwarg_values = kwargs.values()
cke.queue_task_instance(**kwargs)
if test_queue == KUBERNETES_QUEUE:
k8s_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
celery_executor_mock.queue_task_instance.assert_not_called()

def when_using_celery_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

ti = mock.MagicMock()
ti.queue = 'non-kubernetes-queue'

cke.queue_task_instance(
ti,
mark_success,
pickle_id,
ignore_all_deps,
ignore_depends_on_past,
ignore_task_deps,
ignore_ti_state,
pool,
cfg_path,
)

else:
celery_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
k8s_executor_mock.queue_task_instance.assert_not_called()
celery_executor_mock.queue_task_instance.assert_called_once_with(
ti,
mark_success,
pickle_id,
ignore_all_deps,
ignore_depends_on_past,
ignore_task_deps,
ignore_ti_state,
pool,
cfg_path,
)

when_using_k8s_executor()
when_using_celery_executor()

def test_has_tasks(self):
ti = mock.MagicMock

def when_ti_in_k8s_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

celery_executor_mock.has_task.return_value = False
k8s_executor_mock.has_task.return_value = True

assert cke.has_task(ti)
celery_executor_mock.has_task.assert_called_once_with(ti)
k8s_executor_mock.has_task.assert_called_once_with(ti)

def when_ti_in_celery_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

celery_executor_mock.has_task.return_value = True

assert cke.has_task(ti)
celery_executor_mock.has_task.assert_called_once_with(ti)

when_ti_in_k8s_executor()
when_ti_in_celery_executor()

def test_adopt_tasks(self):
ti = mock.MagicMock

def when_ti_in_k8s_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
ti.queue = "kubernetes"
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

celery_executor_mock.try_adopt_task_instances.return_value = []
k8s_executor_mock.try_adopt_task_instances.return_value = []
@parameterized.expand(
[
(True, True, True),
(False, True, True),
(True, False, True),
(False, False, False),
]
)
def test_has_tasks(self, celery_has, k8s_has, cke_has):
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

cke.try_adopt_task_instances([ti])
celery_executor_mock.try_adopt_task_instances.assert_called_once_with([])
k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([ti])
celery_executor_mock.has_task.return_value = celery_has
k8s_executor_mock.has_task.return_value = k8s_has
ti = mock.MagicMock()
assert cke.has_task(ti) == cke_has
celery_executor_mock.has_task.assert_called_once_with(ti)
if not celery_has:
k8s_executor_mock.has_task.assert_called_once_with(ti)

def when_ti_in_celery_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
ti.queue = "default"
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
@parameterized.expand([(1, 0), (0, 1), (2, 1)])
def test_adopt_tasks(self, num_k8s, num_celery):
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()

celery_executor_mock.try_adopt_task_instances.return_value = []
k8s_executor_mock.try_adopt_task_instances.return_value = []
def mock_ti(queue):
ti = mock.MagicMock()
ti.queue = queue
return ti

cke.try_adopt_task_instances([ti])
celery_executor_mock.try_adopt_task_instances.assert_called_once_with([ti])
k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([])
celery_tis = [mock_ti('default') for _ in range(num_celery)]
k8s_tis = [mock_ti(KUBERNETES_QUEUE) for _ in range(num_k8s)]

when_ti_in_k8s_executor()
when_ti_in_celery_executor()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
cke.try_adopt_task_instances(celery_tis + k8s_tis)
celery_executor_mock.try_adopt_task_instances.assert_called_once_with(celery_tis)
k8s_executor_mock.try_adopt_task_instances.assert_called_once_with(k8s_tis)

def test_get_event_buffer(self):
celery_executor_mock = mock.MagicMock()
Expand Down

0 comments on commit 10be375

Please sign in to comment.