From 396fd3cae91df2d16eeb9d399784687e87e47035 Mon Sep 17 00:00:00 2001 From: "D. Ferruzzi" Date: Sat, 12 Aug 2023 02:32:43 -0700 Subject: [PATCH] D401 Support - airflow/callbacks thru airflow/decorators (#33335) --- airflow/callbacks/base_callback_sink.py | 2 +- airflow/callbacks/database_callback_sink.py | 2 +- airflow/callbacks/pipe_callback_sink.py | 2 +- airflow/cli/cli_config.py | 4 +-- airflow/cli/cli_parser.py | 2 +- airflow/cli/commands/celery_command.py | 8 +++--- airflow/cli/commands/connection_command.py | 12 ++++----- airflow/cli/commands/dag_command.py | 26 +++++++++---------- airflow/cli/commands/dag_processor_command.py | 4 +-- airflow/cli/commands/db_command.py | 12 ++++----- airflow/cli/commands/info_command.py | 6 ++--- airflow/cli/commands/internal_api_command.py | 2 +- airflow/cli/commands/jobs_command.py | 2 +- airflow/cli/commands/kubernetes_command.py | 8 ++++-- airflow/cli/commands/legacy_commands.py | 2 +- airflow/cli/commands/pool_command.py | 16 ++++++------ airflow/cli/commands/provider_command.py | 20 +++++++------- airflow/cli/commands/role_command.py | 12 ++++----- airflow/cli/commands/scheduler_command.py | 6 ++--- airflow/cli/commands/standalone_command.py | 17 ++++++------ airflow/cli/commands/sync_perm_command.py | 2 +- airflow/cli/commands/task_command.py | 18 ++++++------- airflow/cli/commands/triggerer_command.py | 4 +-- airflow/cli/commands/user_command.py | 12 ++++----- airflow/cli/commands/variable_command.py | 12 ++++----- airflow/cli/commands/version_command.py | 2 +- airflow/cli/commands/webserver_command.py | 10 +++---- airflow/cli/simple_table.py | 10 +++---- airflow/dag_processing/manager.py | 26 +++++++++---------- airflow/dag_processing/processor.py | 4 +-- airflow/decorators/base.py | 2 +- airflow/decorators/branch_python.py | 2 +- airflow/decorators/external_python.py | 3 ++- airflow/decorators/python.py | 3 ++- airflow/decorators/python_virtualenv.py | 3 ++- airflow/decorators/sensor.py | 2 +- airflow/decorators/short_circuit.py | 3 ++- 37 files changed, 145 insertions(+), 138 deletions(-) diff --git a/airflow/callbacks/base_callback_sink.py b/airflow/callbacks/base_callback_sink.py index c243f0fbd640f1..2f02ce054cf903 100644 --- a/airflow/callbacks/base_callback_sink.py +++ b/airflow/callbacks/base_callback_sink.py @@ -24,5 +24,5 @@ class BaseCallbackSink: """Base class for Callbacks Sinks.""" def send(self, callback: CallbackRequest) -> None: - """Sends callback for execution.""" + """Send callback for execution.""" raise NotImplementedError() diff --git a/airflow/callbacks/database_callback_sink.py b/airflow/callbacks/database_callback_sink.py index 24306170dfea23..0be8127f6f5562 100644 --- a/airflow/callbacks/database_callback_sink.py +++ b/airflow/callbacks/database_callback_sink.py @@ -30,6 +30,6 @@ class DatabaseCallbackSink(BaseCallbackSink): @provide_session def send(self, callback: CallbackRequest, session: Session = NEW_SESSION) -> None: - """Sends callback for execution.""" + """Send callback for execution.""" db_callback = DbCallbackRequest(callback=callback, priority_weight=10) session.add(db_callback) diff --git a/airflow/callbacks/pipe_callback_sink.py b/airflow/callbacks/pipe_callback_sink.py index d702a781fa57c0..0c9433b057ca5f 100644 --- a/airflow/callbacks/pipe_callback_sink.py +++ b/airflow/callbacks/pipe_callback_sink.py @@ -36,7 +36,7 @@ def __init__(self, get_sink_pipe: Callable[[], MultiprocessingConnection]): def send(self, callback: CallbackRequest): """ - Sends information about the callback to be executed by 
Pipe. + Send information about the callback to be executed by Pipe. :param callback: Callback request to be executed. """ diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 8e48a5535134a1..2ec55ff79498f2 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -131,12 +131,12 @@ def _check(value): def string_list_type(val): - """Parses comma-separated list and returns list of string (strips whitespace).""" + """Parse comma-separated list and returns list of string (strips whitespace).""" return [x.strip() for x in val.split(",")] def string_lower_type(val): - """Lowers arg.""" + """Lower arg.""" if not val: return return val.strip().lower() diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 3e1e2bd5c1329e..8e4d819098c5ab 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -110,7 +110,7 @@ def add_argument(self, action: Action) -> None: @lru_cache(maxsize=None) def get_parser(dag_parser: bool = False) -> argparse.ArgumentParser: - """Creates and returns command line argument parser.""" + """Create and returns command line argument parser.""" parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter) subparsers = parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND") subparsers.required = True diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py index af7dcbc6077a8c..eb53d6f60db680 100644 --- a/airflow/cli/commands/celery_command.py +++ b/airflow/cli/commands/celery_command.py @@ -45,7 +45,7 @@ @cli_utils.action_cli @providers_configuration_loaded def flower(args): - """Starts Flower, Celery monitoring tool.""" + """Start Flower, Celery monitoring tool.""" # This needs to be imported locally to not trigger Providers Manager initialization from airflow.providers.celery.executors.celery_executor import app as celery_app @@ -94,7 +94,7 @@ def flower(args): @contextmanager def _serve_logs(skip_serve_logs: bool = False): - """Starts serve_logs sub-process.""" + """Start serve_logs sub-process.""" sub_proc = None if skip_serve_logs is False: sub_proc = Process(target=serve_logs) @@ -137,7 +137,7 @@ def filter(self, record): @cli_utils.action_cli @providers_configuration_loaded def worker(args): - """Starts Airflow Celery worker.""" + """Start Airflow Celery worker.""" # This needs to be imported locally to not trigger Providers Manager initialization from airflow.providers.celery.executors.celery_executor import app as celery_app @@ -245,7 +245,7 @@ def worker(args): @cli_utils.action_cli @providers_configuration_loaded def stop_worker(args): - """Sends SIGTERM to Celery worker.""" + """Send SIGTERM to Celery worker.""" # Read PID from file if args.pid: pid_file_path = args.pid diff --git a/airflow/cli/commands/connection_command.py b/airflow/cli/commands/connection_command.py index f68dd83f02ac8e..e32abda1f8ae00 100644 --- a/airflow/cli/commands/connection_command.py +++ b/airflow/cli/commands/connection_command.py @@ -80,7 +80,7 @@ def connections_get(args): @suppress_logs_and_warning @providers_configuration_loaded def connections_list(args): - """Lists all connections at the command line.""" + """List all connections at the command line.""" with create_session() as session: query = select(Connection) if args.conn_id: @@ -145,7 +145,7 @@ def _valid_uri(uri: str) -> bool: @cache def _get_connection_types() -> list[str]: - """Returns connection types available.""" + """Return connection types available.""" _connection_types = [] 
providers_manager = ProvidersManager() for connection_type, provider_info in providers_manager.hooks.items(): @@ -156,7 +156,7 @@ def _get_connection_types() -> list[str]: @providers_configuration_loaded def connections_export(args): - """Exports all connections to a file.""" + """Export all connections to a file.""" file_formats = [".yaml", ".json", ".env"] if args.format: warnings.warn("Option `--format` is deprecated. Use `--file-format` instead.", DeprecationWarning) @@ -205,7 +205,7 @@ def connections_export(args): @cli_utils.action_cli @providers_configuration_loaded def connections_add(args): - """Adds new connection.""" + """Add new connection.""" has_uri = bool(args.conn_uri) has_json = bool(args.conn_json) has_type = bool(args.conn_type) @@ -297,7 +297,7 @@ def connections_add(args): @cli_utils.action_cli @providers_configuration_loaded def connections_delete(args): - """Deletes connection from DB.""" + """Delete connection from DB.""" with create_session() as session: try: to_delete = session.scalars(select(Connection).where(Connection.conn_id == args.conn_id)).one() @@ -313,7 +313,7 @@ def connections_delete(args): @cli_utils.action_cli(check_db=False) @providers_configuration_loaded def connections_import(args): - """Imports connections from a file.""" + """Import connections from a file.""" if os.path.exists(args.file): _import_helper(args.file, args.overwrite) else: diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index f965e24b5ef776..ea06619917f8f8 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -123,7 +123,7 @@ def _run_dag_backfill(dags: list[DAG], args) -> None: @cli_utils.action_cli @providers_configuration_loaded def dag_backfill(args, dag: list[DAG] | DAG | None = None) -> None: - """Creates backfill job or dry run for a DAG or list of DAGs using regex.""" + """Create backfill job or dry run for a DAG or list of DAGs using regex.""" logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT) signal.signal(signal.SIGTERM, sigint_handler) warnings.warn( @@ -154,7 +154,7 @@ def dag_backfill(args, dag: list[DAG] | DAG | None = None) -> None: @cli_utils.action_cli @providers_configuration_loaded def dag_trigger(args) -> None: - """Creates a dag run for the specified dag.""" + """Create a dag run for the specified dag.""" api_client = get_current_api_client() try: message = api_client.trigger_dag( @@ -175,7 +175,7 @@ def dag_trigger(args) -> None: @cli_utils.action_cli @providers_configuration_loaded def dag_delete(args) -> None: - """Deletes all DB records related to the specified dag.""" + """Delete all DB records related to the specified dag.""" api_client = get_current_api_client() if ( args.yes @@ -207,7 +207,7 @@ def dag_unpause(args) -> None: @providers_configuration_loaded def set_is_paused(is_paused: bool, args) -> None: - """Sets is_paused for DAG by a given dag_id.""" + """Set is_paused for DAG by a given dag_id.""" dag = DagModel.get_dagmodel(args.dag_id) if not dag: @@ -220,7 +220,7 @@ def set_is_paused(is_paused: bool, args) -> None: @providers_configuration_loaded def dag_dependencies_show(args) -> None: - """Displays DAG dependencies, save to file or show as imgcat image.""" + """Display DAG dependencies, save to file or show as imgcat image.""" dot = render_dag_dependencies(SerializedDagModel.get_dag_dependencies()) filename = args.save imgcat = args.imgcat @@ -240,7 +240,7 @@ def dag_dependencies_show(args) -> None: @providers_configuration_loaded 
def dag_show(args) -> None: - """Displays DAG or saves it's graphic representation to the file.""" + """Display DAG or save its graphic representation to the file.""" dag = get_dag(args.subdir, args.dag_id) dot = render_dag(dag) filename = args.save @@ -286,7 +286,7 @@ def _save_dot_to_file(dot: Dot, filename: str) -> None: @provide_session def dag_state(args, session: Session = NEW_SESSION) -> None: """ - Returns the state (and conf if exists) of a DagRun at the command line. + Return the state (and conf if exists) of a DagRun at the command line. >>> airflow dags state tutorial 2015-01-01T00:00:00.000000 running @@ -309,7 +309,7 @@ def dag_state(args, session: Session = NEW_SESSION) -> None: @providers_configuration_loaded def dag_next_execution(args) -> None: """ - Returns the next execution datetime of a DAG at the command line. + Return the next execution datetime of a DAG at the command line. >>> airflow dags next-execution tutorial 2018-08-31 10:38:00 @@ -348,7 +348,7 @@ def print_execution_interval(interval: DataInterval | None): @suppress_logs_and_warning @providers_configuration_loaded def dag_list_dags(args) -> None: - """Displays dags with or without stats at the command line.""" + """Display dags with or without stats at the command line.""" dagbag = DagBag(process_subdir(args.subdir)) if dagbag.import_errors: from rich import print as rich_print @@ -396,7 +396,7 @@ def dag_details(args, session=NEW_SESSION): @suppress_logs_and_warning @providers_configuration_loaded def dag_list_import_errors(args) -> None: - """Displays dags with import errors on the command line.""" + """Display dags with import errors on the command line.""" dagbag = DagBag(process_subdir(args.subdir)) data = [] for filename, errors in dagbag.import_errors.items(): @@ -411,7 +411,7 @@ def dag_list_import_errors(args) -> None: @suppress_logs_and_warning @providers_configuration_loaded def dag_report(args) -> None: - """Displays dagbag stats at the command line.""" + """Display dagbag stats at the command line.""" dagbag = DagBag(process_subdir(args.subdir)) AirflowConsole().print_as( data=dagbag.dagbag_stats, @@ -431,7 +431,7 @@ def dag_report(args) -> None: @providers_configuration_loaded @provide_session def dag_list_jobs(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> None: - """Lists latest n jobs.""" + """List latest n jobs.""" queries = [] if dag: args.dag_id = dag.dag_id @@ -462,7 +462,7 @@ def dag_list_jobs(args, dag: DAG | None = None, session: Session = NEW_SESSION) @providers_configuration_loaded @provide_session def dag_list_dag_runs(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> None: - """Lists dag runs for a given DAG.""" + """List dag runs for a given DAG.""" if dag: args.dag_id = dag.dag_id else: diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py index 757bd778cca5d4..cf880f6622e913 100644 --- a/airflow/cli/commands/dag_processor_command.py +++ b/airflow/cli/commands/dag_processor_command.py @@ -37,7 +37,7 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner: - """Creates DagFileProcessorProcess instance.""" + """Create DagFileProcessorProcess instance.""" processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout") processor_timeout = timedelta(seconds=processor_timeout_seconds) @@ -56,7 +56,7 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner: @cli_utils.action_cli @providers_configuration_loaded def dag_processor(args): -
"""Starts Airflow Dag Processor Job.""" + """Start Airflow Dag Processor Job.""" if not conf.getboolean("scheduler", "standalone_dag_processor"): raise SystemExit("The option [scheduler/standalone_dag_processor] must be True.") diff --git a/airflow/cli/commands/db_command.py b/airflow/cli/commands/db_command.py index 2fb21dfd229564..f0d3ac4edc9448 100644 --- a/airflow/cli/commands/db_command.py +++ b/airflow/cli/commands/db_command.py @@ -39,7 +39,7 @@ @providers_configuration_loaded def initdb(args): - """Initializes the metadata database.""" + """Initialize the metadata database.""" warnings.warn( "`db init` is deprecated. Use `db migrate` instead to migrate the db and/or " "airflow connections create-default-connections to create the default connections", @@ -52,7 +52,7 @@ def initdb(args): @providers_configuration_loaded def resetdb(args): - """Resets the metadata database.""" + """Reset the metadata database.""" print("DB: " + repr(settings.engine.url)) if not (args.yes or input("This will drop existing tables if they exist. Proceed? (y/n)").upper() == "Y"): raise SystemExit("Cancelled") @@ -161,7 +161,7 @@ def downgrade(args): @providers_configuration_loaded def check_migrations(args): - """Function to wait for all airflow migrations to complete. Used for launching airflow in k8s.""" + """Wait for all airflow migrations to complete. Used for launching airflow in k8s.""" db.check_migrations(timeout=args.migration_wait_timeout) @@ -212,7 +212,7 @@ def shell(args): @cli_utils.action_cli(check_db=False) @providers_configuration_loaded def check(args): - """Runs a check command that checks if db is available.""" + """Run a check command that checks if db is available.""" retries: int = args.retry retry_delay: int = args.retry_delay @@ -251,7 +251,7 @@ def cleanup_tables(args): @cli_utils.action_cli(check_db=False) @providers_configuration_loaded def export_archived(args): - """Exports archived records from metadata database.""" + """Export archived records from metadata database.""" export_archived_records( export_format=args.export_format, output_path=args.output_path, @@ -264,7 +264,7 @@ def export_archived(args): @cli_utils.action_cli(check_db=False) @providers_configuration_loaded def drop_archived(args): - """Drops archived tables from metadata database.""" + """Drop archived tables from metadata database.""" drop_archived_tables( table_names=args.tables, needs_confirm=not args.yes, diff --git a/airflow/cli/commands/info_command.py b/airflow/cli/commands/info_command.py index 2e60d80b271d8c..3a4ba396e4cd25 100644 --- a/airflow/cli/commands/info_command.py +++ b/airflow/cli/commands/info_command.py @@ -209,7 +209,7 @@ def _get_version(cmd: list[str], grep: bytes | None = None): @staticmethod def _task_logging_handler(): - """Returns task logging handler.""" + """Return task logging handler.""" def get_fullname(o): module = o.__class__.__module__ @@ -314,7 +314,7 @@ def _providers_info(self): return [(p.data["package-name"], p.version) for p in ProvidersManager().providers.values()] def show(self, output: str, console: AirflowConsole | None = None) -> None: - """Shows information about Airflow instance.""" + """Show information about Airflow instance.""" all_info = { "Apache Airflow": self._airflow_info, "System info": self._system_info, @@ -336,7 +336,7 @@ def show(self, output: str, console: AirflowConsole | None = None) -> None: ) def render_text(self, output: str) -> str: - """Exports the info to string.""" + """Export the info to string.""" console = AirflowConsole(record=True) 
with console.capture(): self.show(output=output, console=console) diff --git a/airflow/cli/commands/internal_api_command.py b/airflow/cli/commands/internal_api_command.py index 72fe57c206a0e3..b9b77c093446d7 100644 --- a/airflow/cli/commands/internal_api_command.py +++ b/airflow/cli/commands/internal_api_command.py @@ -61,7 +61,7 @@ @cli_utils.action_cli @providers_configuration_loaded def internal_api(args): - """Starts Airflow Internal API.""" + """Start Airflow Internal API.""" print(settings.HEADER) access_logfile = args.access_logfile or "-" diff --git a/airflow/cli/commands/jobs_command.py b/airflow/cli/commands/jobs_command.py index b6509ea6424e9f..3f22241db9cb73 100644 --- a/airflow/cli/commands/jobs_command.py +++ b/airflow/cli/commands/jobs_command.py @@ -29,7 +29,7 @@ @providers_configuration_loaded @provide_session def check(args, session: Session = NEW_SESSION) -> None: - """Checks if job(s) are still alive.""" + """Check if job(s) are still alive.""" if args.allow_multiple and not args.limit > 1: raise SystemExit("To use option --allow-multiple, you must set the limit to a value greater than 1.") if args.hostname and args.local: diff --git a/airflow/cli/commands/kubernetes_command.py b/airflow/cli/commands/kubernetes_command.py index 056465577fcd3f..038f53e3f37076 100644 --- a/airflow/cli/commands/kubernetes_command.py +++ b/airflow/cli/commands/kubernetes_command.py @@ -39,7 +39,7 @@ @cli_utils.action_cli @providers_configuration_loaded def generate_pod_yaml(args): - """Generates yaml files for each task in the DAG. Used for testing output of KubernetesExecutor.""" + """Generate yaml files for each task in the DAG. Used for testing output of KubernetesExecutor.""" execution_date = args.execution_date dag = get_dag(subdir=args.subdir, dag_id=args.dag_id) yaml_output_path = args.output_path @@ -151,7 +151,11 @@ def cleanup_pods(args): def _delete_pod(name, namespace): - """Helper Function for cleanup_pods.""" + """ + Delete a namespaced pod. + + Helper Function for cleanup_pods. 
+ """ kube_client = get_kube_client() delete_options = client.V1DeleteOptions() print(f'Deleting POD "{name}" from "{namespace}" namespace') diff --git a/airflow/cli/commands/legacy_commands.py b/airflow/cli/commands/legacy_commands.py index 910c6e442703f7..4338cf1c339d99 100644 --- a/airflow/cli/commands/legacy_commands.py +++ b/airflow/cli/commands/legacy_commands.py @@ -50,7 +50,7 @@ def check_legacy_command(action, value): - """Checks command value and raise error if value is in removed command.""" + """Check command value and raise error if value is in removed command.""" new_command = COMMAND_MAP.get(value) if new_command is not None: msg = f"`airflow {value}` command, has been removed, please use `airflow {new_command}`" diff --git a/airflow/cli/commands/pool_command.py b/airflow/cli/commands/pool_command.py index 7e7c63640cb6eb..b26f2032a85bc7 100644 --- a/airflow/cli/commands/pool_command.py +++ b/airflow/cli/commands/pool_command.py @@ -46,7 +46,7 @@ def _show_pools(pools, output): @suppress_logs_and_warning @providers_configuration_loaded def pool_list(args): - """Displays info of all the pools.""" + """Display info of all the pools.""" api_client = get_current_api_client() pools = api_client.get_pools() _show_pools(pools=pools, output=args.output) @@ -55,7 +55,7 @@ def pool_list(args): @suppress_logs_and_warning @providers_configuration_loaded def pool_get(args): - """Displays pool info by a given name.""" + """Display pool info by a given name.""" api_client = get_current_api_client() try: pools = [api_client.get_pool(name=args.pool)] @@ -68,7 +68,7 @@ def pool_get(args): @suppress_logs_and_warning @providers_configuration_loaded def pool_set(args): - """Creates new pool with a given name and slots.""" + """Create new pool with a given name and slots.""" api_client = get_current_api_client() api_client.create_pool( name=args.pool, slots=args.slots, description=args.description, include_deferred=args.include_deferred @@ -80,7 +80,7 @@ def pool_set(args): @suppress_logs_and_warning @providers_configuration_loaded def pool_delete(args): - """Deletes pool by a given name.""" + """Delete pool by a given name.""" api_client = get_current_api_client() try: api_client.delete_pool(name=args.pool) @@ -93,7 +93,7 @@ def pool_delete(args): @suppress_logs_and_warning @providers_configuration_loaded def pool_import(args): - """Imports pools from the file.""" + """Import pools from the file.""" if not os.path.exists(args.file): raise SystemExit(f"Missing pools file {args.file}") pools, failed = pool_import_helper(args.file) @@ -104,13 +104,13 @@ def pool_import(args): @providers_configuration_loaded def pool_export(args): - """Exports all the pools to the file.""" + """Export all the pools to the file.""" pools = pool_export_helper(args.file) print(f"Exported {len(pools)} pools to {args.file}") def pool_import_helper(filepath): - """Helps import pools from the json file.""" + """Help import pools from the json file.""" api_client = get_current_api_client() with open(filepath) as poolfile: @@ -137,7 +137,7 @@ def pool_import_helper(filepath): def pool_export_helper(filepath): - """Helps export all the pools to the json file.""" + """Help export all the pools to the json file.""" api_client = get_current_api_client() pool_dict = {} pools = api_client.get_pools() diff --git a/airflow/cli/commands/provider_command.py b/airflow/cli/commands/provider_command.py index a246575e76211a..460c11bbbdddcd 100644 --- a/airflow/cli/commands/provider_command.py +++ b/airflow/cli/commands/provider_command.py 
@@ -58,7 +58,7 @@ def provider_get(args): @suppress_logs_and_warning @providers_configuration_loaded def providers_list(args): - """Lists all providers at the command line.""" + """List all providers at the command line.""" AirflowConsole().print_as( data=list(ProvidersManager().providers.values()), output=args.output, @@ -73,7 +73,7 @@ def providers_list(args): @suppress_logs_and_warning @providers_configuration_loaded def hooks_list(args): - """Lists all hooks at the command line.""" + """List all hooks at the command line.""" AirflowConsole().print_as( data=list(ProvidersManager().hooks.items()), output=args.output, @@ -116,7 +116,7 @@ def notifications_list(args): @suppress_logs_and_warning @providers_configuration_loaded def connection_form_widget_list(args): - """Lists all custom connection form fields at the command line.""" + """List all custom connection form fields at the command line.""" AirflowConsole().print_as( data=sorted(ProvidersManager().connection_form_widgets.items()), output=args.output, @@ -132,7 +132,7 @@ def connection_form_widget_list(args): @suppress_logs_and_warning @providers_configuration_loaded def connection_field_behaviours(args): - """Lists field behaviours.""" + """List field behaviours.""" AirflowConsole().print_as( data=list(ProvidersManager().field_behaviours), output=args.output, @@ -145,7 +145,7 @@ def connection_field_behaviours(args): @suppress_logs_and_warning @providers_configuration_loaded def extra_links_list(args): - """Lists all extra links at the command line.""" + """List all extra links at the command line.""" AirflowConsole().print_as( data=ProvidersManager().extra_links_class_names, output=args.output, @@ -158,7 +158,7 @@ def extra_links_list(args): @suppress_logs_and_warning @providers_configuration_loaded def logging_list(args): - """Lists all log task handlers at the command line.""" + """List all log task handlers at the command line.""" AirflowConsole().print_as( data=list(ProvidersManager().logging_class_names), output=args.output, @@ -171,7 +171,7 @@ def logging_list(args): @suppress_logs_and_warning @providers_configuration_loaded def secrets_backends_list(args): - """Lists all secrets backends at the command line.""" + """List all secrets backends at the command line.""" AirflowConsole().print_as( data=list(ProvidersManager().secrets_backend_class_names), output=args.output, @@ -184,7 +184,7 @@ def secrets_backends_list(args): @suppress_logs_and_warning @providers_configuration_loaded def auth_backend_list(args): - """Lists all API auth backend modules at the command line.""" + """List all API auth backend modules at the command line.""" AirflowConsole().print_as( data=list(ProvidersManager().auth_backend_module_names), output=args.output, @@ -197,7 +197,7 @@ def auth_backend_list(args): @suppress_logs_and_warning @providers_configuration_loaded def executors_list(args): - """Lists all executors at the command line.""" + """List all executors at the command line.""" AirflowConsole().print_as( data=list(ProvidersManager().executor_class_names), output=args.output, @@ -210,7 +210,7 @@ def executors_list(args): @suppress_logs_and_warning @providers_configuration_loaded def config_list(args): - """Lists all configurations at the command line.""" + """List all configurations at the command line.""" AirflowConsole().print_as( data=list(ProvidersManager().provider_configs), output=args.output, diff --git a/airflow/cli/commands/role_command.py b/airflow/cli/commands/role_command.py index 180e2dd2a7cbd5..a582b331953202 100644 --- 
a/airflow/cli/commands/role_command.py +++ b/airflow/cli/commands/role_command.py @@ -34,7 +34,7 @@ @suppress_logs_and_warning @providers_configuration_loaded def roles_list(args): - """Lists all existing roles.""" + """List all existing roles.""" from airflow.utils.cli_app_builder import get_application_builder with get_application_builder() as appbuilder: @@ -62,7 +62,7 @@ def roles_list(args): @suppress_logs_and_warning @providers_configuration_loaded def roles_create(args): - """Creates new empty role in DB.""" + """Create new empty role in DB.""" from airflow.utils.cli_app_builder import get_application_builder with get_application_builder() as appbuilder: @@ -75,7 +75,7 @@ def roles_create(args): @suppress_logs_and_warning @providers_configuration_loaded def roles_delete(args): - """Deletes role in DB.""" + """Delete role in DB.""" from airflow.utils.cli_app_builder import get_application_builder with get_application_builder() as appbuilder: @@ -144,7 +144,7 @@ def __roles_add_or_remove_permissions(args): @suppress_logs_and_warning @providers_configuration_loaded def roles_add_perms(args): - """Adds permissions to role in DB.""" + """Add permissions to role in DB.""" __roles_add_or_remove_permissions(args) @@ -152,7 +152,7 @@ def roles_add_perms(args): @suppress_logs_and_warning @providers_configuration_loaded def roles_del_perms(args): - """Deletes permissions from role in DB.""" + """Delete permissions from role in DB.""" __roles_add_or_remove_permissions(args) @@ -160,7 +160,7 @@ def roles_del_perms(args): @providers_configuration_loaded def roles_export(args): """ - Exports all the roles from the database to a file. + Export all the roles from the database to a file. Note, this function does not export the permissions associated for each role. Strictly, it exports the role names into the passed role json file. 
diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py index ad0063ffa3d044..fd25951ad3228d 100644 --- a/airflow/cli/commands/scheduler_command.py +++ b/airflow/cli/commands/scheduler_command.py @@ -52,7 +52,7 @@ def _run_scheduler_job(job_runner: SchedulerJobRunner, *, skip_serve_logs: bool) @cli_utils.action_cli @providers_configuration_loaded def scheduler(args): - """Starts Airflow Scheduler.""" + """Start Airflow Scheduler.""" print(settings.HEADER) job_runner = SchedulerJobRunner( @@ -87,7 +87,7 @@ def scheduler(args): @contextmanager def _serve_logs(skip_serve_logs: bool = False): - """Starts serve_logs sub-process.""" + """Start serve_logs sub-process.""" from airflow.utils.serve_logs import serve_logs sub_proc = None @@ -103,7 +103,7 @@ def _serve_logs(skip_serve_logs: bool = False): @contextmanager def _serve_health_check(enable_health_check: bool = False): - """Starts serve_health_check sub-process.""" + """Start serve_health_check sub-process.""" sub_proc = None if enable_health_check: sub_proc = Process(target=serve_health_check) diff --git a/airflow/cli/commands/standalone_command.py b/airflow/cli/commands/standalone_command.py index 68abfdd0002b37..ceae1c6dcaec63 100644 --- a/airflow/cli/commands/standalone_command.py +++ b/airflow/cli/commands/standalone_command.py @@ -59,7 +59,6 @@ def __init__(self): @providers_configuration_loaded def run(self): - """Main run loop.""" self.print_output("standalone", "Starting Airflow Standalone") # Silence built-in logging at INFO logging.getLogger("").setLevel(logging.WARNING) @@ -130,7 +129,7 @@ def update_output(self): def print_output(self, name: str, output): """ - Prints an output line with name and colouring. + Print an output line with name and colouring. You can pass multiple lines to output if you wish; it will be split for you. """ @@ -146,7 +145,7 @@ def print_output(self, name: str, output): def print_error(self, name: str, output): """ - Prints an error message to the console. + Print an error message to the console. This is the same as print_output but with the text red """ @@ -172,7 +171,7 @@ def calculate_env(self): return env def initialize_database(self): - """Makes sure all the tables are created.""" + """Make sure all the tables are created.""" # Set up DB tables self.print_output("standalone", "Checking database is initialized") db.initdb() @@ -214,7 +213,7 @@ def initialize_database(self): def is_ready(self): """ - Detects when all Airflow components are ready to serve. + Detect when all Airflow components are ready to serve. For now, it's simply time-based. """ @@ -226,7 +225,7 @@ def is_ready(self): def port_open(self, port): """ - Checks if the given port is listening on the local machine. + Check if the given port is listening on the local machine. Used to tell if webserver is alive. """ @@ -242,7 +241,7 @@ def port_open(self, port): def job_running(self, job_runner_class: type[BaseJobRunner]): """ - Checks if the given job name is running and heartbeating correctly. + Check if the given job name is running and heartbeating correctly. Used to tell if scheduler is alive. """ @@ -253,7 +252,7 @@ def job_running(self, job_runner_class: type[BaseJobRunner]): def print_ready(self): """ - Prints the banner shown when Airflow is ready to go. + Print the banner shown when Airflow is ready to go. Include with login details. 
""" @@ -288,7 +287,7 @@ def __init__(self, parent, name: str, command: list[str], env: dict[str, str]): self.env = env def run(self): - """Runs the actual process and captures it output to a queue.""" + """Run the actual process and captures it output to a queue.""" self.process = subprocess.Popen( ["airflow"] + self.command, stdout=subprocess.PIPE, diff --git a/airflow/cli/commands/sync_perm_command.py b/airflow/cli/commands/sync_perm_command.py index ab458b2d93b41c..4d4e280637f9c8 100644 --- a/airflow/cli/commands/sync_perm_command.py +++ b/airflow/cli/commands/sync_perm_command.py @@ -25,7 +25,7 @@ @cli_utils.action_cli @providers_configuration_loaded def sync_perm(args): - """Updates permissions for existing roles and DAGs.""" + """Update permissions for existing roles and DAGs.""" from airflow.utils.cli_app_builder import get_application_builder with get_application_builder() as appbuilder: diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index da47e8dd8b375e..205d87b2f03458 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -196,7 +196,7 @@ def _get_ti( def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None | TaskReturnCode: """ - Runs the task based on a mode. + Run the task based on a mode. Any of the 3 modes are available: @@ -214,7 +214,7 @@ def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None | Tas def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None: """ - Sends the task to the executor for execution. + Send the task to the executor for execution. This can result in the task being started by another host if the executor implementation does. """ @@ -285,7 +285,7 @@ def _run_task_by_local_task_job(args, ti: TaskInstance) -> TaskReturnCode | None def _run_raw_task(args, ti: TaskInstance) -> None | TaskReturnCode: - """Runs the main task handling code.""" + """Run the main task handling code.""" return ti._run_raw_task( mark_success=args.mark_success, job_id=args.job_id, @@ -473,7 +473,7 @@ def task_failed_deps(args) -> None: @providers_configuration_loaded def task_state(args) -> None: """ - Returns the state of a TaskInstance at the command line. + Return the state of a TaskInstance at the command line. >>> airflow tasks state tutorial sleep 2015-01-01 success @@ -488,7 +488,7 @@ def task_state(args) -> None: @suppress_logs_and_warning @providers_configuration_loaded def task_list(args, dag: DAG | None = None) -> None: - """Lists the tasks within a DAG at the command line.""" + """List the tasks within a DAG at the command line.""" dag = dag or get_dag(args.subdir, args.dag_id) if args.tree: dag.tree_view() @@ -512,7 +512,7 @@ def post_mortem(self) -> None: def _guess_debugger() -> _SupportedDebugger: """ - Trying to guess the debugger used by the user. + Try to guess the debugger used by the user. When it doesn't find any user-installed debugger, returns ``pdb``. @@ -576,7 +576,7 @@ def format_task_instance(ti: TaskInstance) -> dict[str, str]: @cli_utils.action_cli(check_db=False) def task_test(args, dag: DAG | None = None) -> None: - """Tests task for a given dag_id.""" + """Test task for a given dag_id.""" # We want to log output from operators etc to show up here. Normally # airflow.task would redirect to a file, but here we want it to propagate # up to the normal airflow handler. 
@@ -638,7 +638,7 @@ def task_test(args, dag: DAG | None = None) -> None: @suppress_logs_and_warning @providers_configuration_loaded def task_render(args, dag: DAG | None = None) -> None: - """Renders and displays templated fields for a given task.""" + """Render and displays templated fields for a given task.""" if not dag: dag = get_dag(args.subdir, args.dag_id) task = dag.get_task(task_id=args.task_id) @@ -661,7 +661,7 @@ def task_render(args, dag: DAG | None = None) -> None: @cli_utils.action_cli(check_db=False) @providers_configuration_loaded def task_clear(args) -> None: - """Clears all task instances or only those matched by regex for a DAG(s).""" + """Clear all task instances or only those matched by regex for a DAG(s).""" logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT) if args.dag_id and not args.subdir and not args.dag_regex and not args.task_regex: diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py index c7d0827bd8e589..2288f1537fbb97 100644 --- a/airflow/cli/commands/triggerer_command.py +++ b/airflow/cli/commands/triggerer_command.py @@ -38,7 +38,7 @@ @contextmanager def _serve_logs(skip_serve_logs: bool = False) -> Generator[None, None, None]: - """Starts serve_logs sub-process.""" + """Start serve_logs sub-process.""" sub_proc = None if skip_serve_logs is False: port = conf.getint("logging", "trigger_log_server_port", fallback=8794) @@ -54,7 +54,7 @@ def _serve_logs(skip_serve_logs: bool = False) -> Generator[None, None, None]: @cli_utils.action_cli @providers_configuration_loaded def triggerer(args): - """Starts Airflow Triggerer.""" + """Start Airflow Triggerer.""" settings.MASK_SECRETS_IN_LOGS = True print(settings.HEADER) triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") diff --git a/airflow/cli/commands/user_command.py b/airflow/cli/commands/user_command.py index 1553d27a015661..bc982719c94dfb 100644 --- a/airflow/cli/commands/user_command.py +++ b/airflow/cli/commands/user_command.py @@ -49,7 +49,7 @@ class UserSchema(Schema): @suppress_logs_and_warning @providers_configuration_loaded def users_list(args): - """Lists users at the command line.""" + """List users at the command line.""" from airflow.utils.cli_app_builder import get_application_builder with get_application_builder() as appbuilder: @@ -64,7 +64,7 @@ def users_list(args): @cli_utils.action_cli(check_db=True) @providers_configuration_loaded def users_create(args): - """Creates new user in the DB.""" + """Create new user in the DB.""" from airflow.utils.cli_app_builder import get_application_builder with get_application_builder() as appbuilder: @@ -113,7 +113,7 @@ def _find_user(args): @cli_utils.action_cli @providers_configuration_loaded def users_delete(args): - """Deletes user from DB.""" + """Delete user from DB.""" user = _find_user(args) # Clear the associated user roles first. 
@@ -131,7 +131,7 @@ def users_delete(args): @cli_utils.action_cli @providers_configuration_loaded def users_manage_role(args, remove=False): - """Deletes or appends user roles.""" + """Delete or appends user roles.""" user = _find_user(args) from airflow.utils.cli_app_builder import get_application_builder @@ -160,7 +160,7 @@ def users_manage_role(args, remove=False): @providers_configuration_loaded def users_export(args): - """Exports all users to the json file.""" + """Export all users to the json file.""" from airflow.utils.cli_app_builder import get_application_builder with get_application_builder() as appbuilder: @@ -190,7 +190,7 @@ def remove_underscores(s): @cli_utils.action_cli @providers_configuration_loaded def users_import(args): - """Imports users from the json file.""" + """Import users from the json file.""" json_file = getattr(args, "import") if not os.path.exists(json_file): raise SystemExit(f"File '{json_file}' does not exist") diff --git a/airflow/cli/commands/variable_command.py b/airflow/cli/commands/variable_command.py index 78275666b62934..eb3d353e41b177 100644 --- a/airflow/cli/commands/variable_command.py +++ b/airflow/cli/commands/variable_command.py @@ -37,7 +37,7 @@ @suppress_logs_and_warning @providers_configuration_loaded def variables_list(args): - """Displays all the variables.""" + """Display all the variables.""" with create_session() as session: variables = session.scalars(select(Variable)).all() AirflowConsole().print_as(data=variables, output=args.output, mapper=lambda x: {"key": x.key}) @@ -46,7 +46,7 @@ def variables_list(args): @suppress_logs_and_warning @providers_configuration_loaded def variables_get(args): - """Displays variable by a given name.""" + """Display variable by a given name.""" try: if args.default is None: var = Variable.get(args.key, deserialize_json=args.json) @@ -61,7 +61,7 @@ def variables_get(args): @cli_utils.action_cli @providers_configuration_loaded def variables_set(args): - """Creates new variable with a given name and value.""" + """Create new variable with a given name and value.""" Variable.set(args.key, args.value, serialize_json=args.json) print(f"Variable {args.key} created") @@ -69,7 +69,7 @@ def variables_set(args): @cli_utils.action_cli @providers_configuration_loaded def variables_delete(args): - """Deletes variable by a given name.""" + """Delete variable by a given name.""" Variable.delete(args.key) print(f"Variable {args.key} deleted") @@ -77,7 +77,7 @@ def variables_delete(args): @cli_utils.action_cli @providers_configuration_loaded def variables_import(args): - """Imports variables from a given file.""" + """Import variables from a given file.""" if not os.path.exists(args.file): raise SystemExit("Missing variables file.") with open(args.file) as varfile: @@ -101,7 +101,7 @@ def variables_import(args): @providers_configuration_loaded def variables_export(args): - """Exports all the variables to the file.""" + """Export all the variables to the file.""" var_dict = {} with create_session() as session: qry = session.scalars(select(Variable)) diff --git a/airflow/cli/commands/version_command.py b/airflow/cli/commands/version_command.py index d3b735951c1fd7..81f56f3062135c 100644 --- a/airflow/cli/commands/version_command.py +++ b/airflow/cli/commands/version_command.py @@ -21,5 +21,5 @@ def version(args): - """Displays Airflow version at the command line.""" + """Display Airflow version at the command line.""" print(airflow.__version__) diff --git a/airflow/cli/commands/webserver_command.py 
b/airflow/cli/commands/webserver_command.py index 17d48ad8dae4fc..00ac66372465cc 100644 --- a/airflow/cli/commands/webserver_command.py +++ b/airflow/cli/commands/webserver_command.py @@ -132,7 +132,7 @@ def _get_file_hash(fname: str): return hash_md5.hexdigest() def _get_num_ready_workers_running(self) -> int: - """Returns number of ready Gunicorn workers by looking for READY_PREFIX in process name.""" + """Return number of ready Gunicorn workers by looking for READY_PREFIX in process name.""" workers = psutil.Process(self.gunicorn_master_proc.pid).children() def ready_prefix_on_cmdline(proc): @@ -148,12 +148,12 @@ def ready_prefix_on_cmdline(proc): return len(ready_workers) def _get_num_workers_running(self) -> int: - """Returns number of running Gunicorn workers processes.""" + """Return number of running Gunicorn worker processes.""" workers = psutil.Process(self.gunicorn_master_proc.pid).children() return len(workers) def _wait_until_true(self, fn, timeout: int = 0) -> None: - """Sleeps until fn is true.""" + """Sleep until fn is true.""" start_time = time.monotonic() while not fn(): if 0 < timeout <= time.monotonic() - start_time: @@ -207,7 +207,7 @@ def _reload_gunicorn(self) -> None: ) def start(self) -> NoReturn: - """Starts monitoring the webserver.""" + """Start monitoring the webserver.""" try: self._wait_until_true( lambda: self.num_workers_expected == self._get_num_workers_running(), @@ -323,7 +323,7 @@ def _check_workers(self) -> None: @cli_utils.action_cli @providers_configuration_loaded def webserver(args): - """Starts Airflow Webserver.""" + """Start Airflow Webserver.""" print(settings.HEADER) # Check for old/insecure config, and fail safe (i.e. don't launch) if the config is wildly insecure. diff --git a/airflow/cli/simple_table.py b/airflow/cli/simple_table.py index b9338eb6a38de0..f4f418742bfd2f 100644 --- a/airflow/cli/simple_table.py +++ b/airflow/cli/simple_table.py @@ -48,17 +48,17 @@ def __init__(self, show_header: bool = True, *args, **kwargs): self.show_header = show_header def print_as_json(self, data: dict): - """Renders dict as json text representation.""" + """Render dict as json text representation.""" json_content = json.dumps(data) self.print(Syntax(json_content, "json", theme="ansi_dark"), soft_wrap=True) def print_as_yaml(self, data: dict): - """Renders dict as yaml text representation.""" + """Render dict as yaml text representation.""" yaml_content = yaml.dump(data) self.print(Syntax(yaml_content, "yaml", theme="ansi_dark"), soft_wrap=True) def print_as_table(self, data: list[dict]): - """Renders list of dictionaries as table.""" + """Render list of dictionaries as table.""" if not data: self.print("No data found") return @@ -72,7 +72,7 @@ def print_as_table(self, data: list[dict]): self.print(table) def print_as_plain_table(self, data: list[dict]): - """Renders list of dictionaries as a simple table than can be easily piped.""" + """Render list of dictionaries as a simple table that can be easily piped.""" if not data: self.print("No data found") return @@ -99,7 +99,7 @@ def print_as( output: str, mapper: Callable[[Any], dict] | None = None, ) -> None: - """Prints provided using format specified by output argument.""" + """Print provided data using the format specified by the output argument.""" output_to_renderer: dict[str, Callable[[Any], None]] = { "json": self.print_as_json, "yaml": self.print_as_yaml, diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 04697650db03c1..53476384379110 100644 ---
a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -167,10 +167,10 @@ def start(self) -> None: def run_single_parsing_loop(self) -> None: """ - Should only be used when launched DAG file processor manager in sync mode. - Send agent heartbeat signal to the manager, requesting that it runs one processing "loop". + Should only be used when launched DAG file processor manager in sync mode. + Call wait_until_finished to ensure that any launched processors have finished before continuing. """ if not self._parent_signal_conn or not self._process: @@ -186,13 +186,13 @@ def run_single_parsing_loop(self) -> None: pass def get_callbacks_pipe(self) -> MultiprocessingConnection: - """Returns the pipe for sending Callbacks to DagProcessorManager.""" + """Return the pipe for sending Callbacks to DagProcessorManager.""" if not self._parent_signal_conn: raise ValueError("Process not started.") return self._parent_signal_conn def wait_until_finished(self) -> None: - """Waits until DAG parsing is finished.""" + """Wait until DAG parsing is finished.""" if not self._parent_signal_conn: raise ValueError("Process not started.") if self._async_mode: @@ -466,7 +466,7 @@ def register_exit_signals(self): signal.signal(signal.SIGUSR2, signal.SIG_IGN) def _exit_gracefully(self, signum, frame): - """Helper method to clean up DAG file processors to avoid leaving orphan processes.""" + """Clean up DAG file processors to avoid leaving orphan processes.""" self.log.info("Exiting gracefully upon receiving signal %s", signum) self.log.debug("Current Stacktrace is: %s", "\n".join(map(str, inspect.stack()))) self.terminate() @@ -519,7 +519,7 @@ def deactivate_stale_dags( session: Session = NEW_SESSION, ): """ - Detects DAGs which are no longer present in files. + Detect DAGs which are no longer present in files. Deactivate them and remove them in the serialized_dag table. """ @@ -695,7 +695,7 @@ def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION): @retry_db_transaction def _fetch_callbacks_with_retries(self, max_callbacks: int, session: Session): - """Fetches callbacks from database and add them to the internal queue for execution.""" + """Fetch callbacks from database and add them to the internal queue for execution.""" self.log.debug("Fetching callbacks from the database.") with prohibit_commit(session) as guard: query = session.query(DbCallbackRequest) @@ -811,7 +811,7 @@ def _print_stat(self): @provide_session def clear_nonexistent_import_errors(file_paths: list[str] | None, session=NEW_SESSION): """ - Clears import errors for files that no longer exist. + Clear import errors for files that no longer exist. :param file_paths: list of paths to DAG definition files :param session: session for ORM operations @@ -960,7 +960,7 @@ def get_start_time(self, file_path) -> datetime | None: def get_run_count(self, file_path) -> int: """ - The number of times the given file has been parsed. + Return the number of times the given file has been parsed. :param file_path: the path to the file that's being processed. 
""" @@ -968,7 +968,7 @@ def get_run_count(self, file_path) -> int: return stat.run_count if stat else 0 def get_dag_directory(self) -> str: - """Returns the dag_director as a string.""" + """Return the dag_director as a string.""" if isinstance(self._dag_directory, Path): return str(self._dag_directory.resolve()) else: @@ -1063,7 +1063,7 @@ def collect_results(self) -> None: @staticmethod def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_requests): - """Creates DagFileProcessorProcess instance.""" + """Create DagFileProcessorProcess instance.""" return DagFileProcessorProcess( file_path=file_path, pickle_dags=pickle_dags, @@ -1240,7 +1240,7 @@ def _kill_timed_out_processors(self): self._processors.pop(proc) def _add_paths_to_queue(self, file_paths_to_enqueue: list[str], add_at_front: bool): - """Adds stuff to the back or front of the file queue, unless it's already present.""" + """Add stuff to the back or front of the file queue, unless it's already present.""" new_file_paths = list(p for p in file_paths_to_enqueue if p not in self._file_path_queue) if add_at_front: self._file_path_queue.extendleft(new_file_paths) @@ -1260,7 +1260,7 @@ def max_runs_reached(self): return True def terminate(self): - """Stops all running processors.""" + """Stop all running processors.""" for processor in self._processors.values(): Stats.decr( "dag_processing.processes", tags={"file_path": processor.file_path, "action": "terminate"} diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 8b0ecb36407032..162fc5889c89d6 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -415,7 +415,7 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L @provide_session def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) -> None: """ - Finding all tasks that have SLAs defined, and sending alert emails when needed. + Find all tasks that have SLAs defined, and send alert emails when needed. New SLA misses are also recorded in the database. @@ -645,7 +645,7 @@ def update_import_errors( @provide_session def _validate_task_pools(self, *, dagbag: DagBag, session: Session = NEW_SESSION): - """Validates and raise exception if any task in a dag is using a non-existent pool.""" + """Validate and raise exception if any task in a dag is using a non-existent pool.""" from airflow.models.pool import Pool def check_pools(dag): diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py index af37f191be2dc3..cf8e650034f979 100644 --- a/airflow/decorators/base.py +++ b/airflow/decorators/base.py @@ -223,7 +223,7 @@ def execute(self, context: Context): def _handle_output(self, return_value: Any, context: Context, xcom_push: Callable): """ - Handles logic for whether a decorator needs to push a single return value or multiple return values. + Handle logic for whether a decorator needs to push a single return value or multiple return values. It sets outlets if any datasets are found in the returned value(s) diff --git a/airflow/decorators/branch_python.py b/airflow/decorators/branch_python.py index 4dcff0a361f982..39105b0072752b 100644 --- a/airflow/decorators/branch_python.py +++ b/airflow/decorators/branch_python.py @@ -33,7 +33,7 @@ def branch_task( python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs ) -> TaskDecorator: """ - Wraps a python function into a BranchPythonOperator. + Wrap a python function into a BranchPythonOperator. 
For more information on how to use this operator, take a look at the guide: :ref:`concepts:branching` diff --git a/airflow/decorators/external_python.py b/airflow/decorators/external_python.py index 1f083a144a7770..f17a9f3f1caf3e 100644 --- a/airflow/decorators/external_python.py +++ b/airflow/decorators/external_python.py @@ -35,7 +35,8 @@ def external_python_task( multiple_outputs: bool | None = None, **kwargs, ) -> TaskDecorator: - """Wraps a callable into an Airflow operator to run via a Python virtual environment. + """ + Wrap a callable into an Airflow operator to run via a Python virtual environment. Accepts kwargs for operator kwarg. Can be reused in a single DAG. diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py index f79631da57af42..d4423a2092b666 100644 --- a/airflow/decorators/python.py +++ b/airflow/decorators/python.py @@ -60,7 +60,8 @@ def python_task( multiple_outputs: bool | None = None, **kwargs, ) -> TaskDecorator: - """Wraps a function into an Airflow operator. + """ + Wrap a function into an Airflow operator. Accepts kwargs for operator kwarg. Can be reused in a single DAG. diff --git a/airflow/decorators/python_virtualenv.py b/airflow/decorators/python_virtualenv.py index 123378d43c2c0e..9b632e0c87bbfc 100644 --- a/airflow/decorators/python_virtualenv.py +++ b/airflow/decorators/python_virtualenv.py @@ -34,7 +34,8 @@ def virtualenv_task( multiple_outputs: bool | None = None, **kwargs, ) -> TaskDecorator: - """Wraps a callable into an Airflow operator to run via a Python virtual environment. + """ + Wrap a callable into an Airflow operator to run via a Python virtual environment. Accepts kwargs for operator kwarg. Can be reused in a single DAG. diff --git a/airflow/decorators/sensor.py b/airflow/decorators/sensor.py index f25d1610fe191d..e8363317db8eec 100644 --- a/airflow/decorators/sensor.py +++ b/airflow/decorators/sensor.py @@ -60,7 +60,7 @@ def __init__( def sensor_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator: """ - Wraps a function into an Airflow operator. + Wrap a function into an Airflow operator. Accepts kwargs for operator kwarg. Can be reused in a single DAG. :param python_callable: Function to decorate diff --git a/airflow/decorators/short_circuit.py b/airflow/decorators/short_circuit.py index b6b2de5632889d..dd94daddd6a39f 100644 --- a/airflow/decorators/short_circuit.py +++ b/airflow/decorators/short_circuit.py @@ -34,7 +34,8 @@ def short_circuit_task( multiple_outputs: bool | None = None, **kwargs, ) -> TaskDecorator: - """Wraps a function into an ShortCircuitOperator. + """ + Wrap a function into a ShortCircuitOperator. Accepts kwargs for operator kwarg. Can be reused in a single DAG.
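For readers unfamiliar with the rule named in the subject line: pydocstyle's D401 check requires the first line of a docstring to be written in the imperative mood ("Return ...", "Start ...") rather than the indicative ("Returns ...", "Starts ..."), which is the kind of change the hunks above make. A minimal sketch of the pattern on a hypothetical helper (not a function from this patch):

def load_config(path: str) -> dict:
    """Return the configuration parsed from ``path``."""  # D401-compliant: imperative first line
    ...


def load_config_old(path: str) -> dict:
    """Returns the configuration parsed from ``path``."""  # D401 violation: indicative first line
    ...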