Skip to content

Commit

Permalink
Add CeleryKubernetesExecutor (#10901)
Browse files Browse the repository at this point in the history
it consists of CeleryExecutor and KubernetesExecutor, which allows users
to route their tasks to either Kubernetes or Celery based on the queue
defined on a task
  • Loading branch information
pingzh authored Sep 15, 2020
1 parent 83ed6bd commit 9616518
Show file tree
Hide file tree
Showing 10 changed files with 505 additions and 12 deletions.
18 changes: 16 additions & 2 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
- name: executor
description: |
The executor class that airflow should use. Choices include
SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
or the full import path to the class when using a custom executor.
SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor,
CeleryKubernetesExecutor or the full import path to the class when using a custom executor.
version_added: ~
type: string
example: ~
Expand Down Expand Up @@ -1175,6 +1175,20 @@
type: string
example: ~
default: ""
- name: celery_kubernetes_executor
description: |
This section only applies if you are using the CeleryKubernetesExecutor in
``[core]`` section above
options:
- name: kubernetes_queue
description: |
Define when to send a task to KubernetesExecutor when using CeleryKuebernetesExecutor.
When the queue of a task is kubernetes_queue, the task is executed via KubernetesExecutor,
otherwise via CeleryExecutor
version_added: ~
type: string
example: ~
default: "kubernetes"
- name: celery
description: |
This section only applies if you are using the CeleryExecutor in
Expand Down
13 changes: 11 additions & 2 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ hostname_callable = socket.getfqdn
default_timezone = utc

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
# or the full import path to the class when using a custom executor.
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor,
# CeleryKubernetesExecutor or the full import path to the class when using a custom executor.
executor = SequentialExecutor

# The SqlAlchemy connection string to the metadata database.
Expand Down Expand Up @@ -577,6 +577,15 @@ smtp_mail_from = [email protected]
sentry_on = false
sentry_dsn =

[celery_kubernetes_executor]

# This section only applies if you are using the CeleryKubernetesExecutor in
# ``[core]`` section above
# Define when to send a task to KubernetesExecutor when using CeleryKuebernetesExecutor.
# When the queue of a task is kubernetes_queue, the task is executed via KubernetesExecutor,
# otherwise via CeleryExecutor
kubernetes_queue = kubernetes

[celery]

# This section only applies if you are using the CeleryExecutor in
Expand Down
158 changes: 158 additions & 0 deletions airflow/executors/celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Dict, Optional, Set, Union

from airflow.configuration import conf
from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType
from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.kubernetes_executor import KubernetesExecutor
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
from airflow.utils.log.logging_mixin import LoggingMixin


class CeleryKubernetesExecutor(LoggingMixin):
"""
CeleryKubernetesExecutor consists of CeleryExecutor and KubernetesExecutor.
It chooses an executor to use based on the queue defined on the task.
When the queue is `kubernetes`, KubernetesExecutor is selected to run the task,
otherwise, CeleryExecutor is used.
"""

KUBERNETES_QUEUE = conf.get('celery_kubernetes_executor', 'kubernetes_queue')

def __init__(self, celery_executor, kubernetes_executor):
super().__init__()
self.celery_executor = celery_executor
self.kubernetes_executor = kubernetes_executor

@property
def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]:
"""
Return queued tasks from celery and kubernetes executor
"""
queued_tasks = self.celery_executor.queued_tasks.copy()
queued_tasks.update(self.kubernetes_executor.queued_tasks)

return queued_tasks

@property
def running(self) -> Set[TaskInstanceKey]:
"""
Return running tasks from celery and kubernetes executor
"""
return self.celery_executor.running.union(self.kubernetes_executor.running)

def start(self) -> None:
"""Start celery and kubernetes executor"""
self.celery_executor.start()
self.kubernetes_executor.start()

def queue_command(self,
simple_task_instance: SimpleTaskInstance,
command: CommandType,
priority: int = 1,
queue: Optional[str] = None):
"""Queues command via celery or kubernetes executor"""
executor = self._router(simple_task_instance)
self.log.debug("Using executor: %s for %s",
executor.__class__.__name__, simple_task_instance.key
)
executor.queue_command(simple_task_instance, command, priority, queue)

def queue_task_instance(
self,
task_instance: TaskInstance,
mark_success: bool = False,
pickle_id: Optional[str] = None,
ignore_all_deps: bool = False,
ignore_depends_on_past: bool = False,
ignore_task_deps: bool = False,
ignore_ti_state: bool = False,
pool: Optional[str] = None,
cfg_path: Optional[str] = None) -> None:
"""Queues task instance via celery or kubernetes executor"""
executor = self._router(SimpleTaskInstance(task_instance))
self.log.debug("Using executor: %s to queue_task_instance for %s",
executor.__class__.__name__, task_instance.key
)
executor.queue_task_instance(
task_instance,
mark_success,
pickle_id,
ignore_all_deps,
ignore_depends_on_past,
ignore_task_deps,
ignore_ti_state,
pool,
cfg_path
)

def has_task(self, task_instance: TaskInstance) -> bool:
"""
Checks if a task is either queued or running in either celery or kubernetes executor.
:param task_instance: TaskInstance
:return: True if the task is known to this executor
"""
return self.celery_executor.has_task(task_instance) \
or self.kubernetes_executor.has_task(task_instance)

def heartbeat(self) -> None:
"""
Heartbeat sent to trigger new jobs in celery and kubernetes executor
"""
self.celery_executor.heartbeat()
self.kubernetes_executor.heartbeat()

def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKey, EventBufferValueType]:
"""
Returns and flush the event buffer from celery and kubernetes executor
:param dag_ids: to dag_ids to return events for, if None returns all
:return: a dict of events
"""
cleared_events_from_celery = self.celery_executor.get_event_buffer(dag_ids)
cleared_events_from_kubernetes = self.kubernetes_executor.get_event_buffer(dag_ids)

return {**cleared_events_from_celery, **cleared_events_from_kubernetes}

def end(self) -> None:
"""
End celery and kubernetes executor
"""
self.celery_executor.end()
self.kubernetes_executor.end()

def terminate(self) -> None:
"""
Terminate celery and kubernetes executor
"""
self.celery_executor.terminate()
self.kubernetes_executor.terminate()

def _router(self, simple_task_instance: SimpleTaskInstance) -> Union[CeleryExecutor, KubernetesExecutor]:
"""
Return either celery_executor or kubernetes_executor
:param simple_task_instance: SimpleTaskInstance
:return: celery_executor or kubernetes_executor
:rtype: Union[CeleryExecutor, KubernetesExecutor]
"""
if simple_task_instance.queue == self.KUBERNETES_QUEUE:
return self.kubernetes_executor
return self.celery_executor
30 changes: 24 additions & 6 deletions airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"""All executors."""
import logging
from contextlib import suppress
from typing import Optional, Type
from typing import Optional

from airflow.exceptions import AirflowConfigException
from airflow.executors.base_executor import BaseExecutor
Expand All @@ -34,6 +34,7 @@ class ExecutorLoader:
LOCAL_EXECUTOR = "LocalExecutor"
SEQUENTIAL_EXECUTOR = "SequentialExecutor"
CELERY_EXECUTOR = "CeleryExecutor"
CELERY_KUBERNETES_EXECUTOR = "CeleryKubernetesExecutor"
DASK_EXECUTOR = "DaskExecutor"
KUBERNETES_EXECUTOR = "KubernetesExecutor"
DEBUG_EXECUTOR = "DebugExecutor"
Expand All @@ -43,6 +44,7 @@ class ExecutorLoader:
LOCAL_EXECUTOR: 'airflow.executors.local_executor.LocalExecutor',
SEQUENTIAL_EXECUTOR: 'airflow.executors.sequential_executor.SequentialExecutor',
CELERY_EXECUTOR: 'airflow.executors.celery_executor.CeleryExecutor',
CELERY_KUBERNETES_EXECUTOR: 'airflow.executors.celery_kubernetes_executor.CeleryKubernetesExecutor',
DASK_EXECUTOR: 'airflow.executors.dask_executor.DaskExecutor',
KUBERNETES_EXECUTOR: 'airflow.executors.kubernetes_executor.KubernetesExecutor',
DEBUG_EXECUTOR: 'airflow.executors.debug_executor.DebugExecutor'
Expand All @@ -57,23 +59,28 @@ def get_default_executor(cls) -> BaseExecutor:
from airflow.configuration import conf
executor_name = conf.get('core', 'EXECUTOR')

cls._default_executor = cls.load_executor(executor_name)()
cls._default_executor = cls.load_executor(executor_name)

return cls._default_executor

@classmethod
def load_executor(cls, executor_name: str) -> Type[BaseExecutor]:
def load_executor(cls, executor_name: str) -> BaseExecutor:
"""
Loads the executor.
This supports the following formats:
* by executor name for core executor
* by ``{plugin_name}.{class_name}`` for executor from plugins
* by import path.
:return: an instance of executor class via executor_name
"""
if executor_name == cls.CELERY_KUBERNETES_EXECUTOR:
return cls.__load_celery_kubernetes_executor()

if executor_name in cls.executors:
log.debug("Loading core executor: %s", executor_name)
return import_string(cls.executors[executor_name])
return import_string(cls.executors[executor_name])()
# If the executor name looks like "plugin executor path" then try to load plugins.
if executor_name.count(".") == 1:
log.debug(
Expand All @@ -85,11 +92,11 @@ def load_executor(cls, executor_name: str) -> Type[BaseExecutor]:
# initialized yet
from airflow import plugins_manager
plugins_manager.integrate_executor_plugins()
return import_string(f"airflow.executors.{executor_name}")
return import_string(f"airflow.executors.{executor_name}")()

log.debug("Loading executor from custom path: %s", executor_name)
try:
executor = import_string(executor_name)
executor = import_string(executor_name)()
except ImportError as e:
log.error(e)
raise AirflowConfigException(
Expand All @@ -100,6 +107,17 @@ def load_executor(cls, executor_name: str) -> Type[BaseExecutor]:

return executor

@classmethod
def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
"""
:return: an instance of CeleryKubernetesExecutor
"""
celery_executor = import_string(cls.executors[cls.CELERY_EXECUTOR])()
kubernetes_executor = import_string(cls.executors[cls.KUBERNETES_EXECUTOR])()

celery_kubernetes_executor_cls = import_string(cls.executors[cls.CELERY_KUBERNETES_EXECUTOR])
return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor)


UNPICKLEABLE_EXECUTORS = (
ExecutorLoader.LOCAL_EXECUTOR,
Expand Down
46 changes: 46 additions & 0 deletions docs/executor/celery_kubernetes.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
.. _executor:CeleryKubernetesExecutor:

CeleryKubernetes Executor
=========================

The :class:`~airflow.executors.celery_kubernetes_executor.CeleryKubernetesExecutor` allows users
to run simultaneously ``CeleryExecutor`` and a ``KubernetesExecutor``.
An executor is chosen to run a task based on the task's queue.

``CeleryKubernetesExecutor`` inherits the scalability of ``CeleryExecutor`` to
handle the high load at the peak time and runtime isolation of ``KubernetesExecutor``.


When to use CeleryKubernetesExecutor
####################################

``CeleryKubernetesExecutor`` should only be used at certain cases, given that
it requires setting up ``CeleryExecutor`` and ``KubernetesExecutor``.

We recommend considering ``CeleryKubernetesExecutor`` when your use case meets:

1. The number of tasks needed to be scheduled at the peak exceeds the scale that your kubernetes cluster
can comfortably handle

2. A relative small portion of your tasks requires runtime isolation.

3. You have plenty of small tasks that can be executed on Celery workers
but you also have resource-hungry tasks that will be better to run in predefined environments.
1 change: 1 addition & 0 deletions docs/executor/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ Supported Backends
dask
celery
kubernetes
celery_kubernetes
Loading

0 comments on commit 9616518

Please sign in to comment.