Skip to content

Commit

Permalink
[AIP-51] Executors vending CLI commands (apache#29055)
Browse files Browse the repository at this point in the history
* Base implementation of executor vending cli commands

Update the cli_parser construction (and associated tests) to now get cli
methods from the executor modules.

* Move existing Celery and Kubernetes cli commands to their module

Move the existing cli commands out of the base/core cli module to the
respective executor modules
  • Loading branch information
o-nikolas authored Jul 29, 2023
1 parent 8940453 commit fcbbf47
Show file tree
Hide file tree
Showing 17 changed files with 370 additions and 173 deletions.
151 changes: 0 additions & 151 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@
import json
import os
import textwrap
from argparse import ArgumentError
from typing import Callable, Iterable, NamedTuple, Union

import lazy_object_proxy

from airflow import settings
from airflow.cli.commands.legacy_commands import check_legacy_command
from airflow.configuration import conf
from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.cli import ColorMode
from airflow.utils.module_loading import import_string
Expand Down Expand Up @@ -61,46 +58,6 @@ class DefaultHelpParser(argparse.ArgumentParser):

def _check_value(self, action, value):
"""Override _check_value and check conditionally added command."""
if action.dest == "subcommand" and value == "celery":
executor = conf.get("core", "EXECUTOR")
if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
classes = ()
try:
from airflow.providers.celery.executors.celery_executor import CeleryExecutor

classes += (CeleryExecutor,)
except ImportError:
message = (
"The celery subcommand requires that you pip install the celery module. "
"To do it, run: pip install 'apache-airflow[celery]'"
)
raise ArgumentError(action, message)
try:
from airflow.providers.celery.executors.celery_kubernetes_executor import (
CeleryKubernetesExecutor,
)

classes += (CeleryKubernetesExecutor,)
except ImportError:
pass
if not issubclass(executor_cls, classes):
message = (
f"celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and "
f"executors derived from them, your current executor: {executor}, subclassed from: "
f'{", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])}'
)
raise ArgumentError(action, message)
if action.dest == "subcommand" and value == "kubernetes":
try:
import kubernetes.client # noqa: F401
except ImportError:
message = (
"The kubernetes subcommand requires that you pip install the kubernetes python client. "
"To do it, run: pip install 'apache-airflow[cncf.kubernetes]'"
)
raise ArgumentError(action, message)

if action.choices is not None and value not in action.choices:
check_legacy_command(action, value)

Expand Down Expand Up @@ -823,25 +780,6 @@ def string_lower_type(val):
action="store_true",
)

ARG_QUEUES = Arg(
("-q", "--queues"),
help="Comma delimited list of queues to serve",
default=conf.get("operators", "DEFAULT_QUEUE"),
)
ARG_CONCURRENCY = Arg(
("-c", "--concurrency"),
type=int,
help="The number of worker processes",
default=conf.getint("celery", "worker_concurrency"),
)
ARG_CELERY_HOSTNAME = Arg(
("-H", "--celery-hostname"),
help="Set the hostname of celery worker if you have multiple workers on a single machine",
)
ARG_UMASK = Arg(
("-u", "--umask"),
help="Set the umask of celery worker in daemon mode",
)
ARG_WITHOUT_MINGLE = Arg(
("--without-mingle",),
default=False,
Expand All @@ -855,34 +793,6 @@ def string_lower_type(val):
action="store_true",
)

# flower
ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API")
ARG_FLOWER_HOSTNAME = Arg(
("-H", "--hostname"),
default=conf.get("celery", "FLOWER_HOST"),
help="Set the hostname on which to run the server",
)
ARG_FLOWER_PORT = Arg(
("-p", "--port"),
default=conf.getint("celery", "FLOWER_PORT"),
type=int,
help="The port on which to run the server",
)
ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for flower")
ARG_FLOWER_URL_PREFIX = Arg(
("-u", "--url-prefix"),
default=conf.get("celery", "FLOWER_URL_PREFIX"),
help="URL prefix for Flower",
)
ARG_FLOWER_BASIC_AUTH = Arg(
("-A", "--basic-auth"),
default=conf.get("celery", "FLOWER_BASIC_AUTH"),
help=(
"Securing Flower with Basic Authentication. "
"Accepts user:password pairs separated by a comma. "
"Example: flower_basic_auth = user1:password1,user2:password2"
),
)
ARG_TASK_PARAMS = Arg(("-t", "--task-params"), help="Sends a JSON params dict to the task")
ARG_POST_MORTEM = Arg(
("-m", "--post-mortem"), action="store_true", help="Open debugger on uncaught exception"
Expand Down Expand Up @@ -1978,55 +1888,6 @@ class GroupCommand(NamedTuple):
),
)

CELERY_COMMANDS = (
ActionCommand(
name="worker",
help="Start a Celery worker node",
func=lazy_load_command("airflow.cli.commands.celery_command.worker"),
args=(
ARG_QUEUES,
ARG_CONCURRENCY,
ARG_CELERY_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_UMASK,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
ARG_AUTOSCALE,
ARG_SKIP_SERVE_LOGS,
ARG_WITHOUT_MINGLE,
ARG_WITHOUT_GOSSIP,
ARG_VERBOSE,
),
),
ActionCommand(
name="flower",
help="Start a Celery Flower",
func=lazy_load_command("airflow.cli.commands.celery_command.flower"),
args=(
ARG_FLOWER_HOSTNAME,
ARG_FLOWER_PORT,
ARG_FLOWER_CONF,
ARG_FLOWER_URL_PREFIX,
ARG_FLOWER_BASIC_AUTH,
ARG_BROKER_API,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
ARG_VERBOSE,
),
),
ActionCommand(
name="stop",
help="Stop the Celery worker gracefully",
func=lazy_load_command("airflow.cli.commands.celery_command.stop_worker"),
args=(ARG_PID, ARG_VERBOSE),
),
)

CONFIG_COMMANDS = (
ActionCommand(
name="get-value",
Expand Down Expand Up @@ -2109,9 +1970,6 @@ class GroupCommand(NamedTuple):
help="Manage DAGs",
subcommands=DAGS_COMMANDS,
),
GroupCommand(
name="kubernetes", help="Tools to help run the KubernetesExecutor", subcommands=KUBERNETES_COMMANDS
),
GroupCommand(
name="tasks",
help="Manage tasks",
Expand Down Expand Up @@ -2298,15 +2156,6 @@ class GroupCommand(NamedTuple):
func=lazy_load_command("airflow.cli.commands.plugins_command.dump_plugins"),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
GroupCommand(
name="celery",
help="Celery components",
description=(
"Start celery components. Works only when using CeleryExecutor. For more information, see "
"https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html"
),
subcommands=CELERY_COMMANDS,
),
ActionCommand(
name="standalone",
help="Run an all-in-one copy of Airflow",
Expand Down
18 changes: 18 additions & 0 deletions airflow/cli/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from __future__ import annotations

import argparse
import logging
from argparse import Action
from functools import lru_cache
from typing import Iterable
Expand All @@ -41,10 +42,27 @@
core_commands,
)
from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.helpers import partition

airflow_commands = core_commands

log = logging.getLogger(__name__)
try:
executor, _ = ExecutorLoader.import_default_executor_cls(validate=False)
airflow_commands.extend(executor.get_cli_commands())
except Exception:
executor_name = ExecutorLoader.get_default_executor_name()
log.exception("Failed to load CLI commands from executor: %s", executor_name)
log.error(
"Ensure all dependencies are met and try again. If using a Celery based executor install "
"a 3.3.0+ version of the Celery provider. If using a Kubernetes executor, install a "
"7.4.0+ version of the CNCF provider"
)
# Do no re-raise the exception since we want the CLI to still function for
# other commands.


ALL_COMMANDS_DICT: dict[str, CLICommand] = {sp.name: sp for sp in airflow_commands}


Expand Down
10 changes: 10 additions & 0 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import pendulum

from airflow.cli.cli_config import GroupCommand
from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.stats import Stats
Expand Down Expand Up @@ -479,3 +480,12 @@ def send_callback(self, request: CallbackRequest) -> None:
if not self.callback_sink:
raise ValueError("Callback sink is not ready.")
self.callback_sink.send(request)

@staticmethod
def get_cli_commands() -> list[GroupCommand]:
"""Vends CLI commands to be included in Airflow CLI.
Override this method to expose commands via Airflow CLI to manage this executor. This can
be commands to setup/teardown the executor, inspect state, etc.
"""
return []
16 changes: 12 additions & 4 deletions airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,24 @@ def load_executor(cls, executor_name: str) -> BaseExecutor:
return executor_cls()

@classmethod
def import_executor_cls(cls, executor_name: str) -> tuple[type[BaseExecutor], ConnectorSource]:
def import_executor_cls(
cls, executor_name: str, validate: bool = True
) -> tuple[type[BaseExecutor], ConnectorSource]:
"""
Imports the executor class.
Supports the same formats as ExecutorLoader.load_executor.
:param executor_name: Name of core executor or module path to provider provided as a plugin.
:param validate: Whether or not to validate the executor before returning
:return: executor class via executor_name and executor import source
"""

def _import_and_validate(path: str) -> type[BaseExecutor]:
executor = import_string(path)
cls.validate_database_executor_compatibility(executor)
if validate:
cls.validate_database_executor_compatibility(executor)
return executor

if executor_name in cls.executors:
Expand All @@ -151,14 +157,16 @@ def _import_and_validate(path: str) -> type[BaseExecutor]:
return _import_and_validate(executor_name), ConnectorSource.CUSTOM_PATH

@classmethod
def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]:
def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]:
"""
Imports the default executor class.
:param validate: Whether or not to validate the executor before returning
:return: executor class and executor import source
"""
executor_name = cls.get_default_executor_name()
executor, source = cls.import_executor_cls(executor_name)
executor, source = cls.import_executor_cls(executor_name, validate=validate)
return executor, source

@classmethod
Expand Down
Loading

0 comments on commit fcbbf47

Please sign in to comment.