D401 Support - airflow/callbacks thru airflow/decorators (#33335)
ferruzzi authored Aug 12, 2023
1 parent b657ae9 commit 396fd3c
Showing 37 changed files with 145 additions and 138 deletions.
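For context, D401 is the pydocstyle rule "First line should be in imperative mood": a docstring's summary line should read as a command ("Send ...") rather than a description ("Sends ..."). Every hunk below applies the same one-word fix. A minimal before/after sketch (a hypothetical method, not taken from this commit):

class Sink:
    def send(self, callback):
        """Sends callback for execution."""  # D401 violation: third-person verb

class FixedSink:
    def send(self, callback):
        """Send callback for execution."""  # OK: imperative mood

Assuming the project has ruff's pydocstyle rules configured, a command along the lines of `ruff check --select D401 airflow/` should reproduce the check locally.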
2 changes: 1 addition & 1 deletion airflow/callbacks/base_callback_sink.py
@@ -24,5 +24,5 @@ class BaseCallbackSink:
     """Base class for Callbacks Sinks."""
 
     def send(self, callback: CallbackRequest) -> None:
-        """Sends callback for execution."""
+        """Send callback for execution."""
         raise NotImplementedError()
2 changes: 1 addition & 1 deletion airflow/callbacks/database_callback_sink.py
@@ -30,6 +30,6 @@ class DatabaseCallbackSink(BaseCallbackSink):
 
     @provide_session
     def send(self, callback: CallbackRequest, session: Session = NEW_SESSION) -> None:
-        """Sends callback for execution."""
+        """Send callback for execution."""
         db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
         session.add(db_callback)
2 changes: 1 addition & 1 deletion airflow/callbacks/pipe_callback_sink.py
@@ -36,7 +36,7 @@ def __init__(self, get_sink_pipe: Callable[[], MultiprocessingConnection]):
 
     def send(self, callback: CallbackRequest):
         """
-        Sends information about the callback to be executed by Pipe.
+        Send information about the callback to be executed by Pipe.
 
         :param callback: Callback request to be executed.
         """
4 changes: 2 additions & 2 deletions airflow/cli/cli_config.py
@@ -131,12 +131,12 @@ def _check(value):
 
 
 def string_list_type(val):
-    """Parses comma-separated list and returns list of string (strips whitespace)."""
+    """Parse comma-separated list and returns list of string (strips whitespace)."""
     return [x.strip() for x in val.split(",")]
 
 
 def string_lower_type(val):
-    """Lowers arg."""
+    """Lower arg."""
     if not val:
         return
     return val.strip().lower()
2 changes: 1 addition & 1 deletion airflow/cli/cli_parser.py
@@ -110,7 +110,7 @@ def add_argument(self, action: Action) -> None:
 
 @lru_cache(maxsize=None)
 def get_parser(dag_parser: bool = False) -> argparse.ArgumentParser:
-    """Creates and returns command line argument parser."""
+    """Create and returns command line argument parser."""
     parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter)
     subparsers = parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND")
     subparsers.required = True
8 changes: 4 additions & 4 deletions airflow/cli/commands/celery_command.py
@@ -45,7 +45,7 @@
 @cli_utils.action_cli
 @providers_configuration_loaded
 def flower(args):
-    """Starts Flower, Celery monitoring tool."""
+    """Start Flower, Celery monitoring tool."""
     # This needs to be imported locally to not trigger Providers Manager initialization
     from airflow.providers.celery.executors.celery_executor import app as celery_app
 
@@ -94,7 +94,7 @@ def flower(args):
 
 @contextmanager
 def _serve_logs(skip_serve_logs: bool = False):
-    """Starts serve_logs sub-process."""
+    """Start serve_logs sub-process."""
     sub_proc = None
     if skip_serve_logs is False:
         sub_proc = Process(target=serve_logs)
@@ -137,7 +137,7 @@ def filter(self, record):
 @cli_utils.action_cli
 @providers_configuration_loaded
 def worker(args):
-    """Starts Airflow Celery worker."""
+    """Start Airflow Celery worker."""
     # This needs to be imported locally to not trigger Providers Manager initialization
     from airflow.providers.celery.executors.celery_executor import app as celery_app
 
@@ -245,7 +245,7 @@ def worker(args):
 @cli_utils.action_cli
 @providers_configuration_loaded
 def stop_worker(args):
-    """Sends SIGTERM to Celery worker."""
+    """Send SIGTERM to Celery worker."""
     # Read PID from file
     if args.pid:
         pid_file_path = args.pid
12 changes: 6 additions & 6 deletions airflow/cli/commands/connection_command.py
@@ -80,7 +80,7 @@ def connections_get(args):
 @suppress_logs_and_warning
 @providers_configuration_loaded
 def connections_list(args):
-    """Lists all connections at the command line."""
+    """List all connections at the command line."""
     with create_session() as session:
         query = select(Connection)
         if args.conn_id:
@@ -145,7 +145,7 @@ def _valid_uri(uri: str) -> bool:
 
 @cache
 def _get_connection_types() -> list[str]:
-    """Returns connection types available."""
+    """Return connection types available."""
     _connection_types = []
     providers_manager = ProvidersManager()
     for connection_type, provider_info in providers_manager.hooks.items():
@@ -156,7 +156,7 @@ def _get_connection_types() -> list[str]:
 
 @providers_configuration_loaded
 def connections_export(args):
-    """Exports all connections to a file."""
+    """Export all connections to a file."""
     file_formats = [".yaml", ".json", ".env"]
     if args.format:
         warnings.warn("Option `--format` is deprecated. Use `--file-format` instead.", DeprecationWarning)
@@ -205,7 +205,7 @@ def connections_export(args):
 @cli_utils.action_cli
 @providers_configuration_loaded
 def connections_add(args):
-    """Adds new connection."""
+    """Add new connection."""
     has_uri = bool(args.conn_uri)
     has_json = bool(args.conn_json)
     has_type = bool(args.conn_type)
@@ -297,7 +297,7 @@ def connections_add(args):
 @cli_utils.action_cli
 @providers_configuration_loaded
 def connections_delete(args):
-    """Deletes connection from DB."""
+    """Delete connection from DB."""
     with create_session() as session:
         try:
             to_delete = session.scalars(select(Connection).where(Connection.conn_id == args.conn_id)).one()
@@ -313,7 +313,7 @@ def connections_delete(args):
 @cli_utils.action_cli(check_db=False)
 @providers_configuration_loaded
 def connections_import(args):
-    """Imports connections from a file."""
+    """Import connections from a file."""
     if os.path.exists(args.file):
         _import_helper(args.file, args.overwrite)
     else:
26 changes: 13 additions & 13 deletions airflow/cli/commands/dag_command.py
@@ -123,7 +123,7 @@ def _run_dag_backfill(dags: list[DAG], args) -> None:
 @cli_utils.action_cli
 @providers_configuration_loaded
 def dag_backfill(args, dag: list[DAG] | DAG | None = None) -> None:
-    """Creates backfill job or dry run for a DAG or list of DAGs using regex."""
+    """Create backfill job or dry run for a DAG or list of DAGs using regex."""
     logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
     signal.signal(signal.SIGTERM, sigint_handler)
     warnings.warn(
@@ -154,7 +154,7 @@ def dag_backfill(args, dag: list[DAG] | DAG | None = None) -> None:
 @cli_utils.action_cli
 @providers_configuration_loaded
 def dag_trigger(args) -> None:
-    """Creates a dag run for the specified dag."""
+    """Create a dag run for the specified dag."""
     api_client = get_current_api_client()
     try:
         message = api_client.trigger_dag(
@@ -175,7 +175,7 @@ def dag_trigger(args) -> None:
 @cli_utils.action_cli
 @providers_configuration_loaded
 def dag_delete(args) -> None:
-    """Deletes all DB records related to the specified dag."""
+    """Delete all DB records related to the specified dag."""
     api_client = get_current_api_client()
     if (
         args.yes
@@ -207,7 +207,7 @@ def dag_unpause(args) -> None:
 
 @providers_configuration_loaded
 def set_is_paused(is_paused: bool, args) -> None:
-    """Sets is_paused for DAG by a given dag_id."""
+    """Set is_paused for DAG by a given dag_id."""
     dag = DagModel.get_dagmodel(args.dag_id)
 
     if not dag:
@@ -220,7 +220,7 @@ def set_is_paused(is_paused: bool, args) -> None:
 
 @providers_configuration_loaded
 def dag_dependencies_show(args) -> None:
-    """Displays DAG dependencies, save to file or show as imgcat image."""
+    """Display DAG dependencies, save to file or show as imgcat image."""
     dot = render_dag_dependencies(SerializedDagModel.get_dag_dependencies())
     filename = args.save
     imgcat = args.imgcat
@@ -240,7 +240,7 @@ def dag_dependencies_show(args) -> None:
 
 @providers_configuration_loaded
 def dag_show(args) -> None:
-    """Displays DAG or saves it's graphic representation to the file."""
+    """Display DAG or saves it's graphic representation to the file."""
     dag = get_dag(args.subdir, args.dag_id)
     dot = render_dag(dag)
     filename = args.save
@@ -286,7 +286,7 @@ def _save_dot_to_file(dot: Dot, filename: str) -> None:
 @provide_session
 def dag_state(args, session: Session = NEW_SESSION) -> None:
     """
-    Returns the state (and conf if exists) of a DagRun at the command line.
+    Return the state (and conf if exists) of a DagRun at the command line.
 
     >>> airflow dags state tutorial 2015-01-01T00:00:00.000000
     running
@@ -309,7 +309,7 @@ def dag_state(args, session: Session = NEW_SESSION) -> None:
 @providers_configuration_loaded
 def dag_next_execution(args) -> None:
     """
-    Returns the next execution datetime of a DAG at the command line.
+    Return the next execution datetime of a DAG at the command line.
 
     >>> airflow dags next-execution tutorial
     2018-08-31 10:38:00
@@ -348,7 +348,7 @@ def print_execution_interval(interval: DataInterval | None):
 @suppress_logs_and_warning
 @providers_configuration_loaded
 def dag_list_dags(args) -> None:
-    """Displays dags with or without stats at the command line."""
+    """Display dags with or without stats at the command line."""
     dagbag = DagBag(process_subdir(args.subdir))
     if dagbag.import_errors:
         from rich import print as rich_print
@@ -396,7 +396,7 @@ def dag_details(args, session=NEW_SESSION):
 @suppress_logs_and_warning
 @providers_configuration_loaded
 def dag_list_import_errors(args) -> None:
-    """Displays dags with import errors on the command line."""
+    """Display dags with import errors on the command line."""
     dagbag = DagBag(process_subdir(args.subdir))
     data = []
     for filename, errors in dagbag.import_errors.items():
@@ -411,7 +411,7 @@ def dag_list_import_errors(args) -> None:
 @suppress_logs_and_warning
 @providers_configuration_loaded
 def dag_report(args) -> None:
-    """Displays dagbag stats at the command line."""
+    """Display dagbag stats at the command line."""
     dagbag = DagBag(process_subdir(args.subdir))
     AirflowConsole().print_as(
         data=dagbag.dagbag_stats,
@@ -431,7 +431,7 @@ def dag_report(args) -> None:
 @providers_configuration_loaded
 @provide_session
 def dag_list_jobs(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> None:
-    """Lists latest n jobs."""
+    """List latest n jobs."""
     queries = []
     if dag:
         args.dag_id = dag.dag_id
@@ -462,7 +462,7 @@ def dag_list_jobs(args, dag: DAG | None = None, session: Session = NEW_SESSION)
 @providers_configuration_loaded
 @provide_session
 def dag_list_dag_runs(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> None:
-    """Lists dag runs for a given DAG."""
+    """List dag runs for a given DAG."""
     if dag:
         args.dag_id = dag.dag_id
     else:
4 changes: 2 additions & 2 deletions airflow/cli/commands/dag_processor_command.py
@@ -37,7 +37,7 @@
 
 
 def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
-    """Creates DagFileProcessorProcess instance."""
+    """Create DagFileProcessorProcess instance."""
     processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
     processor_timeout = timedelta(seconds=processor_timeout_seconds)
 
@@ -56,7 +56,7 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
 @cli_utils.action_cli
 @providers_configuration_loaded
 def dag_processor(args):
-    """Starts Airflow Dag Processor Job."""
+    """Start Airflow Dag Processor Job."""
     if not conf.getboolean("scheduler", "standalone_dag_processor"):
         raise SystemExit("The option [scheduler/standalone_dag_processor] must be True.")
12 changes: 6 additions & 6 deletions airflow/cli/commands/db_command.py
@@ -39,7 +39,7 @@
 
 @providers_configuration_loaded
 def initdb(args):
-    """Initializes the metadata database."""
+    """Initialize the metadata database."""
     warnings.warn(
         "`db init` is deprecated. Use `db migrate` instead to migrate the db and/or "
         "airflow connections create-default-connections to create the default connections",
@@ -52,7 +52,7 @@ def initdb(args):
 
 @providers_configuration_loaded
 def resetdb(args):
-    """Resets the metadata database."""
+    """Reset the metadata database."""
     print("DB: " + repr(settings.engine.url))
     if not (args.yes or input("This will drop existing tables if they exist. Proceed? (y/n)").upper() == "Y"):
         raise SystemExit("Cancelled")
@@ -161,7 +161,7 @@ def downgrade(args):
 
 @providers_configuration_loaded
 def check_migrations(args):
-    """Function to wait for all airflow migrations to complete. Used for launching airflow in k8s."""
+    """Wait for all airflow migrations to complete. Used for launching airflow in k8s."""
     db.check_migrations(timeout=args.migration_wait_timeout)
 
 
@@ -212,7 +212,7 @@ def shell(args):
 @cli_utils.action_cli(check_db=False)
 @providers_configuration_loaded
 def check(args):
-    """Runs a check command that checks if db is available."""
+    """Run a check command that checks if db is available."""
     retries: int = args.retry
     retry_delay: int = args.retry_delay
 
@@ -251,7 +251,7 @@ def cleanup_tables(args):
 @cli_utils.action_cli(check_db=False)
 @providers_configuration_loaded
 def export_archived(args):
-    """Exports archived records from metadata database."""
+    """Export archived records from metadata database."""
     export_archived_records(
         export_format=args.export_format,
         output_path=args.output_path,
@@ -264,7 +264,7 @@ def export_archived(args):
 @cli_utils.action_cli(check_db=False)
 @providers_configuration_loaded
 def drop_archived(args):
-    """Drops archived tables from metadata database."""
+    """Drop archived tables from metadata database."""
     drop_archived_tables(
         table_names=args.tables,
         needs_confirm=not args.yes,
6 changes: 3 additions & 3 deletions airflow/cli/commands/info_command.py
@@ -209,7 +209,7 @@ def _get_version(cmd: list[str], grep: bytes | None = None):
 
     @staticmethod
    def _task_logging_handler():
-        """Returns task logging handler."""
+        """Return task logging handler."""
 
        def get_fullname(o):
            module = o.__class__.__module__
@@ -314,7 +314,7 @@ def _providers_info(self):
        return [(p.data["package-name"], p.version) for p in ProvidersManager().providers.values()]
 
    def show(self, output: str, console: AirflowConsole | None = None) -> None:
-        """Shows information about Airflow instance."""
+        """Show information about Airflow instance."""
        all_info = {
            "Apache Airflow": self._airflow_info,
            "System info": self._system_info,
@@ -336,7 +336,7 @@ def show(self, output: str, console: AirflowConsole | None = None) -> None:
        )
 
    def render_text(self, output: str) -> str:
-        """Exports the info to string."""
+        """Export the info to string."""
        console = AirflowConsole(record=True)
        with console.capture():
            self.show(output=output, console=console)
2 changes: 1 addition & 1 deletion airflow/cli/commands/internal_api_command.py
@@ -61,7 +61,7 @@
 @cli_utils.action_cli
 @providers_configuration_loaded
 def internal_api(args):
-    """Starts Airflow Internal API."""
+    """Start Airflow Internal API."""
     print(settings.HEADER)
 
     access_logfile = args.access_logfile or "-"
2 changes: 1 addition & 1 deletion airflow/cli/commands/jobs_command.py
@@ -29,7 +29,7 @@
 @providers_configuration_loaded
 @provide_session
 def check(args, session: Session = NEW_SESSION) -> None:
-    """Checks if job(s) are still alive."""
+    """Check if job(s) are still alive."""
     if args.allow_multiple and not args.limit > 1:
         raise SystemExit("To use option --allow-multiple, you must set the limit to a value greater than 1.")
     if args.hostname and args.local:
8 changes: 6 additions & 2 deletions airflow/cli/commands/kubernetes_command.py
@@ -39,7 +39,7 @@
 @cli_utils.action_cli
 @providers_configuration_loaded
 def generate_pod_yaml(args):
-    """Generates yaml files for each task in the DAG. Used for testing output of KubernetesExecutor."""
+    """Generate yaml files for each task in the DAG. Used for testing output of KubernetesExecutor."""
     execution_date = args.execution_date
     dag = get_dag(subdir=args.subdir, dag_id=args.dag_id)
     yaml_output_path = args.output_path
@@ -151,7 +151,11 @@ def cleanup_pods(args):
 
 
 def _delete_pod(name, namespace):
-    """Helper Function for cleanup_pods."""
+    """
+    Delete a namespaced pod.
+
+    Helper Function for cleanup_pods.
+    """
     kube_client = get_kube_client()
     delete_options = client.V1DeleteOptions()
     print(f'Deleting POD "{name}" from "{namespace}" namespace')