Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify CeleryKubernetesExecutor tests #13307

Merged
merged 3 commits into from
Dec 31, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 98 additions & 167 deletions tests/executors/test_celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@
# under the License.
from unittest import mock

from parameterized import parameterized

from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.executors.kubernetes_executor import KubernetesExecutor

KUBERNETES_QUEUE = CeleryKubernetesExecutor.KUBERNETES_QUEUE


class TestCeleryKubernetesExecutor:
Expand Down Expand Up @@ -58,182 +64,107 @@ def test_start(self):
celery_executor_mock.start.assert_called()
k8s_executor_mock.start.assert_called()

def test_queue_command(self):
command = ['airflow', 'run', 'dag']
priority = 1
queue = 'default'

def when_using_k8s_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

simple_task_instance = mock.MagicMock()
simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE

cke.queue_command(simple_task_instance, command, priority, queue)

k8s_executor_mock.queue_command.assert_called_once_with(
simple_task_instance, command, priority, queue
)
celery_executor_mock.queue_command.assert_not_called()

def when_using_celery_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

simple_task_instance = mock.MagicMock()
simple_task_instance.queue = 'non-kubernetes-queue'

cke.queue_command(simple_task_instance, command, priority, queue)

celery_executor_mock.queue_command.assert_called_once_with(
simple_task_instance, command, priority, queue
)
k8s_executor_mock.queue_command.assert_not_called()

when_using_k8s_executor()
when_using_celery_executor()

def test_queue_task_instance(self):
mark_success = False
pickle_id = None
ignore_all_deps = False
ignore_depends_on_past = False
ignore_task_deps = False
ignore_ti_state = False
pool = None
cfg_path = None

def when_using_k8s_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
@parameterized.expand(
[
('any-other-queue',),
(KUBERNETES_QUEUE,),
]
)
@mock.patch.object(CeleryExecutor, 'queue_command')
@mock.patch.object(KubernetesExecutor, 'queue_command')
def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
kwargs = dict(
command=['airflow', 'run', 'dag'],
priority=1,
queue='default',
)
kwarg_values = kwargs.values()
cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())

simple_task_instance = mock.MagicMock()
simple_task_instance.queue = test_queue

cke.queue_command(simple_task_instance, **kwargs)

if test_queue == KUBERNETES_QUEUE:
k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
celery_queue_cmd.assert_not_called()
else:
celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
k8s_queue_cmd.assert_not_called()

@parameterized.expand(
[
('any-other-queue',),
(KUBERNETES_QUEUE,),
]
)
def test_queue_task_instance(self, test_queue):
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

ti = mock.MagicMock()
ti.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE

cke.queue_task_instance(
ti,
mark_success,
pickle_id,
ignore_all_deps,
ignore_depends_on_past,
ignore_task_deps,
ignore_ti_state,
pool,
cfg_path,
)

k8s_executor_mock.queue_task_instance.assert_called_once_with(
ti,
mark_success,
pickle_id,
ignore_all_deps,
ignore_depends_on_past,
ignore_task_deps,
ignore_ti_state,
pool,
cfg_path,
)
ti = mock.MagicMock()
ti.queue = test_queue

kwargs = dict(
task_instance=ti,
mark_success=False,
pickle_id=None,
ignore_all_deps=False,
ignore_depends_on_past=False,
ignore_task_deps=False,
ignore_ti_state=False,
pool=None,
cfg_path=None,
)
kwarg_values = kwargs.values()
cke.queue_task_instance(**kwargs)
if test_queue == KUBERNETES_QUEUE:
k8s_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
celery_executor_mock.queue_task_instance.assert_not_called()

def when_using_celery_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

ti = mock.MagicMock()
ti.queue = 'non-kubernetes-queue'

cke.queue_task_instance(
ti,
mark_success,
pickle_id,
ignore_all_deps,
ignore_depends_on_past,
ignore_task_deps,
ignore_ti_state,
pool,
cfg_path,
)

else:
celery_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
k8s_executor_mock.queue_task_instance.assert_not_called()
celery_executor_mock.queue_task_instance.assert_called_once_with(
ti,
mark_success,
pickle_id,
ignore_all_deps,
ignore_depends_on_past,
ignore_task_deps,
ignore_ti_state,
pool,
cfg_path,
)

when_using_k8s_executor()
when_using_celery_executor()

def test_has_tasks(self):
ti = mock.MagicMock

def when_ti_in_k8s_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

celery_executor_mock.has_task.return_value = False
k8s_executor_mock.has_task.return_value = True

assert cke.has_task(ti)
celery_executor_mock.has_task.assert_called_once_with(ti)
k8s_executor_mock.has_task.assert_called_once_with(ti)

def when_ti_in_celery_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

celery_executor_mock.has_task.return_value = True

assert cke.has_task(ti)
celery_executor_mock.has_task.assert_called_once_with(ti)

when_ti_in_k8s_executor()
when_ti_in_celery_executor()

def test_adopt_tasks(self):
ti = mock.MagicMock

def when_ti_in_k8s_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
ti.queue = "kubernetes"
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

celery_executor_mock.try_adopt_task_instances.return_value = []
k8s_executor_mock.try_adopt_task_instances.return_value = []
@parameterized.expand(
[
(True, True, True),
(False, True, True),
(True, False, True),
(False, False, False),
]
)
def test_has_tasks(self, celery_has, k8s_has, cke_has):
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

cke.try_adopt_task_instances([ti])
celery_executor_mock.try_adopt_task_instances.assert_called_once_with([])
k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([ti])
celery_executor_mock.has_task.return_value = celery_has
k8s_executor_mock.has_task.return_value = k8s_has
ti = mock.MagicMock()
assert cke.has_task(ti) == cke_has
celery_executor_mock.has_task.assert_called_once_with(ti)
if not celery_has:
k8s_executor_mock.has_task.assert_called_once_with(ti)

def when_ti_in_celery_executor():
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
ti.queue = "default"
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
@parameterized.expand([(1, 0), (0, 1), (2, 1)])
def test_adopt_tasks(self, num_k8s, num_celery):
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()

celery_executor_mock.try_adopt_task_instances.return_value = []
k8s_executor_mock.try_adopt_task_instances.return_value = []
def mock_ti(queue):
ti = mock.MagicMock()
ti.queue = queue
return ti

cke.try_adopt_task_instances([ti])
celery_executor_mock.try_adopt_task_instances.assert_called_once_with([ti])
k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([])
celery_tis = [mock_ti('default') for _ in range(num_celery)]
k8s_tis = [mock_ti(KUBERNETES_QUEUE) for _ in range(num_k8s)]

when_ti_in_k8s_executor()
when_ti_in_celery_executor()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
cke.try_adopt_task_instances(celery_tis + k8s_tis)
celery_executor_mock.try_adopt_task_instances.assert_called_once_with(celery_tis)
k8s_executor_mock.try_adopt_task_instances.assert_called_once_with(k8s_tis)

def test_get_event_buffer(self):
celery_executor_mock = mock.MagicMock()
Expand Down