diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 25b24da39eff51..275903fd8984d7 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -53,8 +53,8 @@ - name: executor description: | The executor class that airflow should use. Choices include - SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor - or the full import path to the class when using a custom executor. + SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor, + CeleryKubernetesExecutor or the full import path to the class when using a custom executor. version_added: ~ type: string example: ~ @@ -1175,6 +1175,20 @@ type: string example: ~ default: "" +- name: celery_kubernetes_executor + description: | + This section only applies if you are using the CeleryKubernetesExecutor in + the ``[core]`` section above + options: + - name: kubernetes_queue + description: | + Define the queue that routes a task to KubernetesExecutor when using CeleryKubernetesExecutor. + When the queue of a task is kubernetes_queue, the task is executed via KubernetesExecutor; + otherwise it is executed via CeleryExecutor + version_added: ~ + type: string + example: ~ + default: "kubernetes" - name: celery description: | This section only applies if you are using the CeleryExecutor in diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 24ba5a1ba763c7..68284d3eb0adde 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -48,8 +48,8 @@ hostname_callable = socket.getfqdn default_timezone = utc # The executor class that airflow should use. Choices include -# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor -# or the full import path to the class when using a custom executor. +# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor, +# CeleryKubernetesExecutor or the full import path to the class when using a custom executor. executor = SequentialExecutor # The SqlAlchemy connection string to the metadata database. @@ -577,6 +577,15 @@ smtp_mail_from = airflow@example.com sentry_on = false sentry_dsn = +[celery_kubernetes_executor] + +# This section only applies if you are using the CeleryKubernetesExecutor in +# the ``[core]`` section above +# Define the queue that routes a task to KubernetesExecutor when using CeleryKubernetesExecutor. +# When the queue of a task is kubernetes_queue, the task is executed via KubernetesExecutor; +# otherwise it is executed via CeleryExecutor +kubernetes_queue = kubernetes + [celery] # This section only applies if you are using the CeleryExecutor in diff --git a/airflow/executors/celery_kubernetes_executor.py b/airflow/executors/celery_kubernetes_executor.py new file mode 100644 index 00000000000000..353474b377bfab --- /dev/null +++ b/airflow/executors/celery_kubernetes_executor.py @@ -0,0 +1,158 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Dict, Optional, Set, Union + +from airflow.configuration import conf +from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType +from airflow.executors.celery_executor import CeleryExecutor +from airflow.executors.kubernetes_executor import KubernetesExecutor +from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey +from airflow.utils.log.logging_mixin import LoggingMixin + + +class CeleryKubernetesExecutor(LoggingMixin): + """ + CeleryKubernetesExecutor consists of a CeleryExecutor and a KubernetesExecutor. + It chooses an executor to use based on the queue defined on the task. + When the queue is ``kubernetes``, KubernetesExecutor is selected to run the task; + otherwise, CeleryExecutor is used. + """ + + KUBERNETES_QUEUE = conf.get('celery_kubernetes_executor', 'kubernetes_queue') + + def __init__(self, celery_executor, kubernetes_executor): + super().__init__() + self.celery_executor = celery_executor + self.kubernetes_executor = kubernetes_executor + + @property + def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]: + """ + Return queued tasks from the celery and kubernetes executors + """ + queued_tasks = self.celery_executor.queued_tasks.copy() + queued_tasks.update(self.kubernetes_executor.queued_tasks) + + return queued_tasks + + @property + def running(self) -> Set[TaskInstanceKey]: + """ + Return running tasks from the celery and kubernetes executors + """ + return self.celery_executor.running.union(self.kubernetes_executor.running) + + def start(self) -> None: + """Start both the celery and kubernetes executors""" + self.celery_executor.start() + self.kubernetes_executor.start() + + def queue_command(self, + simple_task_instance: SimpleTaskInstance, + command: CommandType, + priority: int = 1, + queue: Optional[str] = None): + """Queues a command via either the celery or the kubernetes executor""" + executor = self._router(simple_task_instance) + self.log.debug("Using executor: %s for %s", + executor.__class__.__name__, simple_task_instance.key + ) + executor.queue_command(simple_task_instance, command, priority, queue) + + def queue_task_instance( + self, + task_instance: TaskInstance, + mark_success: bool = False, + pickle_id: Optional[str] = None, + ignore_all_deps: bool = False, + ignore_depends_on_past: bool = False, + ignore_task_deps: bool = False, + ignore_ti_state: bool = False, + pool: Optional[str] = None, + cfg_path: Optional[str] = None) -> None: + """Queues a task instance via either the celery or the kubernetes executor""" + executor = self._router(SimpleTaskInstance(task_instance)) + self.log.debug("Using executor: %s to queue_task_instance for %s", + executor.__class__.__name__, task_instance.key + ) + executor.queue_task_instance( + task_instance, + mark_success, + pickle_id, + ignore_all_deps, + ignore_depends_on_past, + ignore_task_deps, + ignore_ti_state, + pool, + cfg_path + ) + + def has_task(self, task_instance: TaskInstance) -> bool: + """ + Checks if a task is either queued or running in either the celery or the kubernetes executor. 
+ + :param task_instance: TaskInstance + :return: True if the task is known to this executor + """ + return self.celery_executor.has_task(task_instance) \ + or self.kubernetes_executor.has_task(task_instance) + + def heartbeat(self) -> None: + """ + Heartbeat sent to trigger new jobs in the celery and kubernetes executors + """ + self.celery_executor.heartbeat() + self.kubernetes_executor.heartbeat() + + def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKey, EventBufferValueType]: + """ + Return and flush the event buffer from the celery and kubernetes executors + + :param dag_ids: dag_ids to return events for; if None, returns events for all dags + :return: a dict of events + """ + cleared_events_from_celery = self.celery_executor.get_event_buffer(dag_ids) + cleared_events_from_kubernetes = self.kubernetes_executor.get_event_buffer(dag_ids) + + return {**cleared_events_from_celery, **cleared_events_from_kubernetes} + + def end(self) -> None: + """ + End both the celery and kubernetes executors + """ + self.celery_executor.end() + self.kubernetes_executor.end() + + def terminate(self) -> None: + """ + Terminate both the celery and kubernetes executors + """ + self.celery_executor.terminate() + self.kubernetes_executor.terminate() + + def _router(self, simple_task_instance: SimpleTaskInstance) -> Union[CeleryExecutor, KubernetesExecutor]: + """ + Return either celery_executor or kubernetes_executor based on the task's queue + + :param simple_task_instance: SimpleTaskInstance + :return: celery_executor or kubernetes_executor + :rtype: Union[CeleryExecutor, KubernetesExecutor] + """ + if simple_task_instance.queue == self.KUBERNETES_QUEUE: + return self.kubernetes_executor + return self.celery_executor diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py index db49cbde71805a..d5a8a51ff8d69c 100644 --- a/airflow/executors/executor_loader.py +++ b/airflow/executors/executor_loader.py @@ -17,7 +17,7 @@ """All executors.""" import logging from contextlib import suppress -from typing import Optional, Type +from typing import Optional from airflow.exceptions import AirflowConfigException from airflow.executors.base_executor import BaseExecutor @@ -34,6 +34,7 @@ class ExecutorLoader: LOCAL_EXECUTOR = "LocalExecutor" SEQUENTIAL_EXECUTOR = "SequentialExecutor" CELERY_EXECUTOR = "CeleryExecutor" + CELERY_KUBERNETES_EXECUTOR = "CeleryKubernetesExecutor" DASK_EXECUTOR = "DaskExecutor" KUBERNETES_EXECUTOR = "KubernetesExecutor" DEBUG_EXECUTOR = "DebugExecutor" @@ -43,6 +44,7 @@ class ExecutorLoader: LOCAL_EXECUTOR: 'airflow.executors.local_executor.LocalExecutor', SEQUENTIAL_EXECUTOR: 'airflow.executors.sequential_executor.SequentialExecutor', CELERY_EXECUTOR: 'airflow.executors.celery_executor.CeleryExecutor', + CELERY_KUBERNETES_EXECUTOR: 'airflow.executors.celery_kubernetes_executor.CeleryKubernetesExecutor', DASK_EXECUTOR: 'airflow.executors.dask_executor.DaskExecutor', KUBERNETES_EXECUTOR: 'airflow.executors.kubernetes_executor.KubernetesExecutor', DEBUG_EXECUTOR: 'airflow.executors.debug_executor.DebugExecutor' @@ -57,12 +59,12 @@ def get_default_executor(cls) -> BaseExecutor: from airflow.configuration import conf executor_name = conf.get('core', 'EXECUTOR') - cls._default_executor = cls.load_executor(executor_name)() + cls._default_executor = cls.load_executor(executor_name) return cls._default_executor @classmethod - def load_executor(cls, executor_name: str) -> Type[BaseExecutor]: + def load_executor(cls, executor_name: str) -> BaseExecutor: """ Loads the executor. 
@@ -70,10 +72,15 @@ def load_executor(cls, executor_name: str) -> Type[BaseExecutor]: * by executor name for core executor * by ``{plugin_name}.{class_name}`` for executor from plugins * by import path. + + :return: an instance of the executor class resolved from executor_name """ + if executor_name == cls.CELERY_KUBERNETES_EXECUTOR: + return cls.__load_celery_kubernetes_executor() + if executor_name in cls.executors: log.debug("Loading core executor: %s", executor_name) - return import_string(cls.executors[executor_name]) + return import_string(cls.executors[executor_name])() # If the executor name looks like "plugin executor path" then try to load plugins. if executor_name.count(".") == 1: log.debug( @@ -85,11 +92,11 @@ def load_executor(cls, executor_name: str) -> Type[BaseExecutor]: # initialized yet from airflow import plugins_manager plugins_manager.integrate_executor_plugins() - return import_string(f"airflow.executors.{executor_name}") + return import_string(f"airflow.executors.{executor_name}")() log.debug("Loading executor from custom path: %s", executor_name) try: - executor = import_string(executor_name) + executor = import_string(executor_name)() except ImportError as e: log.error(e) raise AirflowConfigException( @@ -100,6 +107,17 @@ def load_executor(cls, executor_name: str) -> Type[BaseExecutor]: return executor + @classmethod + def __load_celery_kubernetes_executor(cls) -> BaseExecutor: + """ + Build a CeleryKubernetesExecutor that wraps freshly instantiated celery and kubernetes executors. + + :return: an instance of CeleryKubernetesExecutor + """ + celery_executor = import_string(cls.executors[cls.CELERY_EXECUTOR])() + kubernetes_executor = import_string(cls.executors[cls.KUBERNETES_EXECUTOR])() + + celery_kubernetes_executor_cls = import_string(cls.executors[cls.CELERY_KUBERNETES_EXECUTOR]) + return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor) + UNPICKLEABLE_EXECUTORS = ( ExecutorLoader.LOCAL_EXECUTOR, diff --git a/docs/executor/celery_kubernetes.rst b/docs/executor/celery_kubernetes.rst new file mode 100644 index 00000000000000..76fdf3c8750396 --- /dev/null +++ b/docs/executor/celery_kubernetes.rst @@ -0,0 +1,46 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +.. _executor:CeleryKubernetesExecutor: + +CeleryKubernetes Executor +========================= + +The :class:`~airflow.executors.celery_kubernetes_executor.CeleryKubernetesExecutor` allows users +to run a ``CeleryExecutor`` and a ``KubernetesExecutor`` simultaneously. +An executor is chosen to run a task based on the task's queue. + +``CeleryKubernetesExecutor`` combines the scalability of ``CeleryExecutor``, which handles +high load at peak times, with the runtime isolation of ``KubernetesExecutor``. 
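The following DAG is a minimal sketch of the routing behaviour, not part of this change: the DAG id, task ids, and bash commands are illustrative, and the ``airflow.operators.bash`` import path may differ between Airflow versions. Any task whose ``queue`` matches the configured ``[celery_kubernetes_executor] kubernetes_queue`` (default ``kubernetes``) is routed to ``KubernetesExecutor``; every other task stays on ``CeleryExecutor``:

.. code-block:: python

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.utils.dates import days_ago

    with DAG(dag_id="example_celery_kubernetes",
             start_date=days_ago(1),
             schedule_interval=None) as dag:
        # No queue set: the task lands on the default queue,
        # so CeleryExecutor runs it on a Celery worker.
        on_celery = BashOperator(
            task_id="on_celery",
            bash_command="echo 'runs on a Celery worker'",
        )

        # Queue matches kubernetes_queue, so KubernetesExecutor
        # runs this task in its own pod.
        on_kubernetes = BashOperator(
            task_id="on_kubernetes",
            queue="kubernetes",
            bash_command="echo 'runs in a Kubernetes pod'",
        )

        on_celery >> on_kubernetes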
+ + When to use CeleryKubernetesExecutor + #################################### + + ``CeleryKubernetesExecutor`` should only be used in certain cases, given that + it requires setting up both ``CeleryExecutor`` and ``KubernetesExecutor``. + + We recommend considering ``CeleryKubernetesExecutor`` when your use case meets the following: + + 1. The number of tasks that need to be scheduled at peak exceeds the scale that your kubernetes cluster + can comfortably handle + + 2. A relatively small portion of your tasks requires runtime isolation. + + 3. You have plenty of small tasks that can be executed on Celery workers, + but you also have resource-hungry tasks that are better run in predefined environments. diff --git a/docs/executor/index.rst b/docs/executor/index.rst index bde64dd8fb54a6..6dcf71ac234d57 100644 --- a/docs/executor/index.rst +++ b/docs/executor/index.rst @@ -48,3 +48,4 @@ Supported Backends dask celery kubernetes + celery_kubernetes diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py new file mode 100644 index 00000000000000..31b73366690207 --- /dev/null +++ b/tests/executors/test_celery_kubernetes_executor.py @@ -0,0 +1,242 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import mock + +from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor + + +class TestCeleryKubernetesExecutor: + def test_queued_tasks(self): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + celery_queued_tasks = {('dag_id', 'task_id', '2020-08-30', 1): 'queued_command'} + k8s_queued_tasks = {('dag_id_2', 'task_id_2', '2020-08-30', 2): 'queued_command'} + + celery_executor_mock.queued_tasks = celery_queued_tasks + k8s_executor_mock.queued_tasks = k8s_queued_tasks + + expected_queued_tasks = {**celery_queued_tasks, **k8s_queued_tasks} + assert cke.queued_tasks == expected_queued_tasks + + def test_running(self): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + celery_running_tasks = {('dag_id', 'task_id', '2020-08-30', 1)} + k8s_running_tasks = {('dag_id_2', 'task_id_2', '2020-08-30', 2)} + + celery_executor_mock.running = celery_running_tasks + k8s_executor_mock.running = k8s_running_tasks + + assert cke.running == celery_running_tasks.union(k8s_running_tasks) + + def test_start(self): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + cke.start() + + celery_executor_mock.start.assert_called() + k8s_executor_mock.start.assert_called() + + def test_queue_command(self): + command = ['airflow', 'run', 'dag'] + priority = 1 + queue = 'default' + + def when_using_k8s_executor(): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + simple_task_instance = mock.MagicMock() + simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE + + cke.queue_command(simple_task_instance, command, priority, queue) + + k8s_executor_mock.queue_command.assert_called_once_with( + simple_task_instance, command, priority, queue) + celery_executor_mock.queue_command.assert_not_called() + + def when_using_celery_executor(): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + simple_task_instance = mock.MagicMock() + simple_task_instance.queue = 'non-kubernetes-queue' + + cke.queue_command(simple_task_instance, command, priority, queue) + + celery_executor_mock.queue_command.assert_called_once_with( + simple_task_instance, command, priority, queue) + k8s_executor_mock.queue_command.assert_not_called() + + when_using_k8s_executor() + when_using_celery_executor() + + def test_queue_task_instance(self): + mark_success = False + pickle_id = None + ignore_all_deps = False + ignore_depends_on_past = False + ignore_task_deps = False + ignore_ti_state = False + pool = None + cfg_path = None + + def when_using_k8s_executor(): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + ti = mock.MagicMock() + ti.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE + + cke.queue_task_instance( + ti, + mark_success, + pickle_id, + ignore_all_deps, + ignore_depends_on_past, + ignore_task_deps, + ignore_ti_state, + pool, + cfg_path + ) + + k8s_executor_mock.queue_task_instance.assert_called_once_with( + ti, + mark_success, + pickle_id, + ignore_all_deps, + 
ignore_depends_on_past, + ignore_task_deps, + ignore_ti_state, + pool, + cfg_path + ) + celery_executor_mock.queue_task_instance.assert_not_called() + + def when_using_celery_executor(): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + ti = mock.MagicMock() + ti.queue = 'non-kubernetes-queue' + + cke.queue_task_instance( + ti, + mark_success, + pickle_id, + ignore_all_deps, + ignore_depends_on_past, + ignore_task_deps, + ignore_ti_state, + pool, + cfg_path + ) + + k8s_executor_mock.queue_task_instance.assert_not_called() + celery_executor_mock.queue_task_instance.assert_called_once_with( + ti, + mark_success, + pickle_id, + ignore_all_deps, + ignore_depends_on_past, + ignore_task_deps, + ignore_ti_state, + pool, + cfg_path + ) + + when_using_k8s_executor() + when_using_celery_executor() + + def test_has_tasks(self): + ti = mock.MagicMock() + + def when_ti_in_k8s_executor(): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + celery_executor_mock.has_task.return_value = False + k8s_executor_mock.has_task.return_value = True + + assert cke.has_task(ti) + celery_executor_mock.has_task.assert_called_once_with(ti) + k8s_executor_mock.has_task.assert_called_once_with(ti) + + def when_ti_in_celery_executor(): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + celery_executor_mock.has_task.return_value = True + + assert cke.has_task(ti) + celery_executor_mock.has_task.assert_called_once_with(ti) + + when_ti_in_k8s_executor() + when_ti_in_celery_executor() + + def test_get_event_buffer(self): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + dag_ids = ['dag_ids'] + + events_in_celery = {('dag_id', 'task_id', '2020-08-30', 1): ('failed', 'failed task')} + events_in_k8s = {('dag_id_2', 'task_id_2', '2020-08-30', 1): ('success', None)} + + celery_executor_mock.get_event_buffer.return_value = events_in_celery + k8s_executor_mock.get_event_buffer.return_value = events_in_k8s + + events = cke.get_event_buffer(dag_ids) + + assert events == {**events_in_celery, **events_in_k8s} + + celery_executor_mock.get_event_buffer.assert_called_once_with(dag_ids) + k8s_executor_mock.get_event_buffer.assert_called_once_with(dag_ids) + + def test_end(self): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + cke.end() + + celery_executor_mock.end.assert_called_once() + k8s_executor_mock.end.assert_called_once() + + def test_terminate(self): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + cke.terminate() + + celery_executor_mock.terminate.assert_called_once() + k8s_executor_mock.terminate.assert_called_once() diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py index 1a644b9f957ce5..8c0a4e8d73d71a 100644 --- a/tests/executors/test_executor_loader.py +++ b/tests/executors/test_executor_loader.py @@ -46,8 +46,11 @@ def tearDown(self) -> None: ExecutorLoader._default_executor = None @parameterized.expand([ - ("LocalExecutor", ), + 
("CeleryExecutor", ), + ("CeleryKubernetesExecutor", ), ("DebugExecutor", ), + ("KubernetesExecutor", ), + ("LocalExecutor", ), ]) def test_should_support_executor_from_core(self, executor_name): with conf_vars({ diff --git a/tests/test_config_templates.py b/tests/test_config_templates.py index 7b513d9e3d1352..374e8c2a364c18 100644 --- a/tests/test_config_templates.py +++ b/tests/test_config_templates.py @@ -39,6 +39,7 @@ 'email', 'smtp', 'sentry', + 'celery_kubernetes_executor', 'celery', 'celery_broker_transport_options', 'dask', diff --git a/tests/utils/perf/scheduler_dag_execution_timing.py b/tests/utils/perf/scheduler_dag_execution_timing.py index 6756366a13020e..8b9b979357e44b 100755 --- a/tests/utils/perf/scheduler_dag_execution_timing.py +++ b/tests/utils/perf/scheduler_dag_execution_timing.py @@ -107,9 +107,10 @@ def get_executor_under_test(dotted_path): else: executor = ExecutorLoader.load_executor(dotted_path) + executor_cls = type(executor) # Change this to try other executors - class ShortCircuitExecutor(ShortCircuitExecutorMixin, executor): + class ShortCircuitExecutor(ShortCircuitExecutorMixin, executor_cls): """ Placeholder class that implements the inheritance hierarchy """