From 71a9813f7fd46f53b49d0bc7ccfd0239eaa2815b Mon Sep 17 00:00:00 2001 From: "D. Ferruzzi" Date: Sat, 12 Aug 2023 02:49:17 -0700 Subject: [PATCH] D401 Support - airflow/example_dags thru airflow/listeners (#33336) --- airflow/executors/base_executor.py | 18 ++++++------ airflow/executors/executor_loader.py | 13 +++++---- airflow/executors/local_executor.py | 22 ++++++++------ airflow/hooks/base.py | 6 ++-- airflow/hooks/filesystem.py | 4 +-- airflow/hooks/package_index.py | 8 ++--- airflow/hooks/subprocess.py | 2 +- airflow/jobs/backfill_job_runner.py | 2 +- airflow/jobs/base_job_runner.py | 12 ++++++-- airflow/jobs/job.py | 12 ++++---- airflow/jobs/local_task_job_runner.py | 4 +-- airflow/jobs/scheduler_job_runner.py | 10 +++---- airflow/jobs/triggerer_job_runner.py | 29 +++++++++---------- .../pre_7_4_0_compatibility/kube_client.py | 4 +-- .../pre_7_4_0_compatibility/pod_generator.py | 8 ++--- .../pod_generator_deprecated.py | 6 ++-- .../pre_7_4_0_compatibility/secret.py | 6 ++-- airflow/lineage/__init__.py | 4 +-- airflow/lineage/backend.py | 2 +- airflow/listeners/spec/dagrun.py | 6 ++-- airflow/listeners/spec/lifecycle.py | 4 +-- airflow/listeners/spec/taskinstance.py | 6 ++-- 22 files changed, 99 insertions(+), 89 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 97de6be1faca1..81c441b521f83 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -188,7 +188,7 @@ def queue_task_instance( def has_task(self, task_instance: TaskInstance) -> bool: """ - Checks if a task is either queued or running in this executor. + Check if a task is either queued or running in this executor. :param task_instance: TaskInstance :return: True if the task is known to this executor @@ -250,7 +250,7 @@ def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, QueuedTa def trigger_tasks(self, open_slots: int) -> None: """ - Initiates async execution of the queued tasks, up to the number of available slots. + Initiate async execution of the queued tasks, up to the number of available slots. :param open_slots: Number of open slots """ @@ -298,7 +298,7 @@ def _process_tasks(self, task_tuples: list[TaskTuple]) -> None: def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None) -> None: """ - Changes state of the task. + Change state of the task. :param info: Executor information for the task instance :param key: Unique key for the task instance @@ -358,7 +358,7 @@ def execute_async( executor_config: Any | None = None, ) -> None: # pragma: no cover """ - This method will execute the command asynchronously. + Execute the command asynchronously. :param key: Unique key for the task instance :param command: Command to run @@ -369,7 +369,7 @@ def execute_async( def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]: """ - This method can be implemented by any child class to return the task logs. + Return the task logs. 
:param ti: A TaskInstance object :param try_number: current try_number to read log from @@ -382,7 +382,7 @@ def end(self) -> None: # pragma: no cover raise NotImplementedError() def terminate(self): - """This method is called when the daemon receives a SIGTERM.""" + """Get called when the daemon receives a SIGTERM.""" raise NotImplementedError() def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover @@ -458,7 +458,7 @@ def validate_airflow_tasks_run_command(command: list[str]) -> tuple[str | None, return None, None def debug_dump(self): - """Called in response to SIGUSR2 by the scheduler.""" + """Get called in response to SIGUSR2 by the scheduler.""" self.log.info( "executor.queued (%d)\n\t%s", len(self.queued_tasks), @@ -472,7 +472,7 @@ def debug_dump(self): ) def send_callback(self, request: CallbackRequest) -> None: - """Sends callback for execution. + """Send callback for execution. Provides a default implementation which sends the callback to the `callback_sink` object. @@ -493,7 +493,7 @@ def get_cli_commands() -> list[GroupCommand]: @classmethod def _get_parser(cls) -> argparse.ArgumentParser: - """This method is used by Sphinx argparse to generate documentation. + """Generate documentation; used by Sphinx argparse. :meta private: """ diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py index 4a4bda85831a3..73c1149515d0d 100644 --- a/airflow/executors/executor_loader.py +++ b/airflow/executors/executor_loader.py @@ -72,7 +72,7 @@ class ExecutorLoader: @classmethod def get_default_executor_name(cls) -> str: - """Returns the default executor name from Airflow configuration. + """Return the default executor name from Airflow configuration. :return: executor name from Airflow configuration """ @@ -82,7 +82,7 @@ def get_default_executor_name(cls) -> str: @classmethod def get_default_executor(cls) -> BaseExecutor: - """Creates a new instance of the configured executor if none exists and returns it.""" + """Create a new instance of the configured executor if none exists and returns it.""" if cls._default_executor is not None: return cls._default_executor @@ -91,7 +91,7 @@ def get_default_executor(cls) -> BaseExecutor: @classmethod def load_executor(cls, executor_name: str) -> BaseExecutor: """ - Loads the executor. + Load the executor. This supports the following formats: * by executor name for core executor @@ -123,7 +123,7 @@ def import_executor_cls( cls, executor_name: str, validate: bool = True ) -> tuple[type[BaseExecutor], ConnectorSource]: """ - Imports the executor class. + Import the executor class. Supports the same formats as ExecutorLoader.load_executor. @@ -159,7 +159,7 @@ def _import_and_validate(path: str) -> type[BaseExecutor]: @classmethod def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]: """ - Imports the default executor class. + Import the default executor class. :param validate: Whether or not to validate the executor before returning @@ -172,7 +172,8 @@ def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseEx @classmethod @functools.lru_cache(maxsize=None) def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) -> None: - """Validate database and executor compatibility. + """ + Validate database and executor compatibility. Most of the databases work universally, but SQLite can only work with single-threaded executors (e.g. Sequential). 
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 7f83f8c7a2550..cf88ca13b2f5e 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -75,7 +75,7 @@ def run(self): def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None: """ - Executes command received and stores result state in queue. + Execute command received and stores result state in queue. :param key: the key to identify the task instance :param command: the command to execute @@ -141,7 +141,7 @@ def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState: @abstractmethod def do_work(self): - """Called in the subprocess and should then execute tasks.""" + """Execute tasks; called in the subprocess.""" raise NotImplementedError() @@ -236,7 +236,7 @@ def __init__(self, executor: LocalExecutor): self.executor: LocalExecutor = executor def start(self) -> None: - """Starts the executor.""" + """Start the executor.""" self.executor.workers_used = 0 self.executor.workers_active = 0 @@ -248,7 +248,7 @@ def execute_async( executor_config: Any | None = None, ) -> None: """ - Executes task asynchronously. + Execute task asynchronously. :param key: the key to identify the task instance :param command: the command to execute @@ -291,7 +291,7 @@ def __init__(self, executor: LocalExecutor): self.queue: Queue[ExecutorWorkType] | None = None def start(self) -> None: - """Starts limited parallelism implementation.""" + """Start limited parallelism implementation.""" if TYPE_CHECKING: assert self.executor.manager assert self.executor.result_queue @@ -315,7 +315,7 @@ def execute_async( executor_config: Any | None = None, ) -> None: """ - Executes task asynchronously. + Execute task asynchronously. :param key: the key to identify the task instance :param command: the command to execute @@ -340,7 +340,11 @@ def sync(self): break def end(self): - """Ends the executor. Sends the poison pill to all workers.""" + """ + End the executor. + + Sends the poison pill to all workers. + """ for _ in self.executor.workers: self.queue.put((None, None)) @@ -349,7 +353,7 @@ def end(self): self.executor.sync() def start(self) -> None: - """Starts the executor.""" + """Start the executor.""" old_proctitle = getproctitle() setproctitle("airflow executor -- LocalExecutor") self.manager = Manager() @@ -389,7 +393,7 @@ def sync(self) -> None: self.impl.sync() def end(self) -> None: - """Ends the executor.""" + """End the executor.""" if TYPE_CHECKING: assert self.impl assert self.manager diff --git a/airflow/hooks/base.py b/airflow/hooks/base.py index c1c758a756151..0974813c5a7ad 100644 --- a/airflow/hooks/base.py +++ b/airflow/hooks/base.py @@ -76,7 +76,7 @@ def get_connection(cls, conn_id: str) -> Connection: @classmethod def get_hook(cls, conn_id: str) -> BaseHook: """ - Returns default hook for this connection id. + Return default hook for this connection id. :param conn_id: connection id :return: default hook for this connection @@ -85,7 +85,7 @@ def get_hook(cls, conn_id: str) -> BaseHook: return connection.get_hook() def get_conn(self) -> Any: - """Returns connection for the hook.""" + """Return connection for the hook.""" raise NotImplementedError() @classmethod @@ -144,7 +144,7 @@ def get_ui_field_behaviour(cls): @staticmethod def get_connection_form_widgets() -> dict[str, Any]: """ - Returns dictionary of widgets to be added for the hook to handle extra values. + Return dictionary of widgets to be added for the hook to handle extra values. 
If you have class hierarchy, usually the widgets needed by your class are already added by the base class, so there is no need to implement this method. It might diff --git a/airflow/hooks/filesystem.py b/airflow/hooks/filesystem.py index d436e0050caa0..a47fba7225cdc 100644 --- a/airflow/hooks/filesystem.py +++ b/airflow/hooks/filesystem.py @@ -43,7 +43,7 @@ class FSHook(BaseHook): @staticmethod def get_connection_form_widgets() -> dict[str, Any]: - """Returns connection widgets to add to connection form.""" + """Return connection widgets to add to connection form.""" from flask_appbuilder.fieldwidgets import BS3TextFieldWidget from flask_babel import lazy_gettext from wtforms import StringField @@ -52,7 +52,7 @@ def get_connection_form_widgets() -> dict[str, Any]: @staticmethod def get_ui_field_behaviour() -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["host", "schema", "port", "login", "password", "extra"], "relabeling": {}, diff --git a/airflow/hooks/package_index.py b/airflow/hooks/package_index.py index 5c940506a1f14..87e830f68a881 100644 --- a/airflow/hooks/package_index.py +++ b/airflow/hooks/package_index.py @@ -40,7 +40,7 @@ def __init__(self, pi_conn_id: str = default_conn_name) -> None: @staticmethod def get_ui_field_behaviour() -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["schema", "port", "extra"], "relabeling": {"host": "Package Index URL"}, @@ -53,7 +53,7 @@ def get_ui_field_behaviour() -> dict[str, Any]: @staticmethod def _get_basic_auth_conn_url(index_url: str, user: str | None, password: str | None) -> str: - """Returns a connection URL with basic auth credentials based on connection config.""" + """Return a connection URL with basic auth credentials based on connection config.""" url = urlparse(index_url) host = url.netloc.split("@")[-1] if user: @@ -64,11 +64,11 @@ def _get_basic_auth_conn_url(index_url: str, user: str | None, password: str | N return url._replace(netloc=host).geturl() def get_conn(self) -> Any: - """Returns connection for the hook.""" + """Return connection for the hook.""" return self.get_connection_url() def get_connection_url(self) -> Any: - """Returns a connection URL with embedded credentials.""" + """Return a connection URL with embedded credentials.""" conn = self.get_connection(self.pi_conn_id) index_url = conn.host if not index_url: diff --git a/airflow/hooks/subprocess.py b/airflow/hooks/subprocess.py index af901789e4338..051b4cf66280f 100644 --- a/airflow/hooks/subprocess.py +++ b/airflow/hooks/subprocess.py @@ -100,7 +100,7 @@ def pre_exec(): return SubprocessResult(exit_code=return_code, output=line) def send_sigterm(self): - """Sends SIGTERM signal to ``self.sub_process`` if one exists.""" + """Send SIGTERM signal to ``self.sub_process`` if one exists.""" self.log.info("Sending SIGTERM signal to process group") if self.sub_process and hasattr(self.sub_process, "pid"): os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 3d2c20c61253d..2508ba69ab3b7 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -170,7 +170,7 @@ def __init__( def _update_counters(self, ti_status: _DagRunTaskStatus, session: Session) -> None: """ - Updates the counters per state of the tasks that were running. + Update the counters per state of the tasks that were running. 
Can re-add to tasks to run when required. diff --git a/airflow/jobs/base_job_runner.py b/airflow/jobs/base_job_runner.py index fd3060db813c5..611579b239e9c 100644 --- a/airflow/jobs/base_job_runner.py +++ b/airflow/jobs/base_job_runner.py @@ -46,7 +46,9 @@ def __init__(self, job: J) -> None: def _execute(self) -> int | None: """ - Executes the logic connected to the runner. This method should be overridden by subclasses. + Execute the logic connected to the runner. + + This method should be overridden by subclasses. :meta private: :return: return code if available, otherwise None @@ -55,12 +57,16 @@ def _execute(self) -> int | None: @provide_session def heartbeat_callback(self, session: Session = NEW_SESSION) -> None: - """Callback that is called during heartbeat. This method can be overwritten by the runners.""" + """ + Execute callback during heartbeat. + + This method can be overwritten by the runners. + """ @classmethod @provide_session def most_recent_job(cls, session: Session = NEW_SESSION) -> Job | None: - """Returns the most recent job of this type, if any, based on last heartbeat received.""" + """Return the most recent job of this type, if any, based on last heartbeat received.""" from airflow.jobs.job import most_recent_job return most_recent_job(cls.job_type, session=session) diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py index 264eed15aa9f7..f20808868db1d 100644 --- a/airflow/jobs/job.py +++ b/airflow/jobs/job.py @@ -147,7 +147,7 @@ def is_alive(self, grace_multiplier=2.1): @provide_session def kill(self, session: Session = NEW_SESSION) -> NoReturn: - """Handles on_kill callback and updates state in database.""" + """Handle on_kill callback and updates state in database.""" job = session.scalar(select(Job).where(Job.id == self.id).limit(1)) job.end_date = timezone.utcnow() try: @@ -222,7 +222,7 @@ def heartbeat( @provide_session def prepare_for_execution(self, session: Session = NEW_SESSION): - """Prepares the job for execution.""" + """Prepare the job for execution.""" Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1) self.state = JobState.RUNNING self.start_date = timezone.utcnow() @@ -240,7 +240,7 @@ def complete_execution(self, session: Session = NEW_SESSION): @provide_session def most_recent_job(self, session: Session = NEW_SESSION) -> Job | None: - """Returns the most recent job of this type, if any, based on last heartbeat received.""" + """Return the most recent job of this type, if any, based on last heartbeat received.""" return most_recent_job(self.job_type, session=session) @@ -272,7 +272,7 @@ def run_job( job: Job | JobPydantic, execute_callable: Callable[[], int | None], session: Session = NEW_SESSION ) -> int | None: """ - Runs the job. + Run the job. The Job is always an ORM object and setting the state is happening within the same DB session and the session is kept open throughout the whole execution. @@ -293,7 +293,7 @@ def run_job( def execute_job(job: Job | JobPydantic, execute_callable: Callable[[], int | None]) -> int | None: """ - Executes the job. + Execute the job. Job execution requires no session as generally executing session does not require an active database connection. The session might be temporary acquired and used if the job @@ -331,7 +331,7 @@ def perform_heartbeat( job: Job | JobPydantic, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool ) -> None: """ - Performs heartbeat for the Job passed to it,optionally checking if it is necessary. 
+    Perform heartbeat for the Job passed to it, optionally checking if it is necessary.
 
     :param job: job to perform heartbeat for
     :param heartbeat_callback: callback to run by the heartbeat
diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py
index 6184a3e7fc42e..c142504639328 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -111,13 +111,13 @@ def _execute(self) -> int | None:
         self.task_runner = get_task_runner(self)
 
         def signal_handler(signum, frame):
-            """Setting kill signal handler."""
+            """Set kill signal handler."""
             self.log.error("Received SIGTERM. Terminating subprocesses")
             self.task_runner.terminate()
             self.handle_task_exit(128 + signum)
 
         def segfault_signal_handler(signum, frame):
-            """Setting sigmentation violation signal handler."""
+            """Set segmentation violation signal handler."""
             self.log.critical(SIGSEGV_MESSAGE)
             self.task_runner.terminate()
             self.handle_task_exit(128 + signum)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 0ed7aae5ceb21..ffcaf4f1a384e 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -240,7 +240,7 @@ def register_signals(self) -> None:
         signal.signal(signal.SIGUSR2, self._debug_dump)
 
     def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None:
-        """Helper method to clean up processor_agent to avoid leaving orphan processes."""
+        """Clean up processor_agent to avoid leaving orphan processes."""
         if not _is_parent_process():
             # Only the parent process should perform the cleanup.
             return
@@ -905,7 +905,7 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
 
     def _run_scheduler_loop(self) -> None:
         """
-        The actual scheduler loop.
+        Harvest DAG parsing results, queue tasks, and perform executor heartbeat; the actual scheduler loop.
 
         The main steps in the loop are:
             #. Harvest DAG parsing results through DagFileProcessorAgent
@@ -1021,7 +1021,7 @@ def _run_scheduler_loop(self) -> None:
 
     def _do_scheduling(self, session: Session) -> int:
         """
-        This function is where the main scheduling decisions take places.
+        Make the main scheduling decisions.
 
         It:
         - Creates any necessary DAG runs by examining the next_dagrun_create_after column of DagModel
@@ -1378,7 +1378,7 @@ def _schedule_all_dag_runs(
         dag_runs: Iterable[DagRun],
         session: Session,
     ) -> list[tuple[DagRun, DagCallbackRequest | None]]:
-        """Makes scheduling decisions for all `dag_runs`."""
+        """Make scheduling decisions for all `dag_runs`."""
         callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
         guard.commit()
         return callback_tuples
@@ -1504,7 +1504,7 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackReques
             self.log.debug("callback is empty")
 
     def _send_sla_callbacks_to_processor(self, dag: DAG) -> None:
-        """Sends SLA Callbacks to DagFileProcessor if tasks have SLAs set and check_slas=True."""
+        """Send SLA Callbacks to DagFileProcessor if tasks have SLAs set and check_slas=True."""
         if not settings.CHECK_SLAS:
             return
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index 77fe95ed7f30f..81ac8c178f776 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -294,14 +294,18 @@ def register_signals(self) -> None:
     @provide_session
     def is_needed(cls, session) -> bool:
         """
-        Tests if the triggerer job needs to be run (i.e., if there are triggers in the trigger table).
+        Test if the triggerer job needs to be run (i.e., if there are triggers in the trigger table).
 
         This is used for the warning boxes in the UI.
         """
         return session.query(func.count(Trigger.id)).scalar() > 0
 
     def on_kill(self):
-        """Called when there is an external kill command (via the heartbeat mechanism, for example)."""
+        """
+        Stop the trigger runner.
+
+        Called when there is an external kill command (via the heartbeat mechanism, for example).
+        """
         self.trigger_runner.stop = True
 
     def _kill_listener(self):
@@ -311,7 +315,7 @@ def _kill_listener(self):
         self.listener.stop()
 
     def _exit_gracefully(self, signum, frame) -> None:
-        """Helper method to clean up processor_agent to avoid leaving orphan processes."""
+        """Clean up processor_agent to avoid leaving orphan processes."""
         # The first time, try to exit nicely
         if not self.trigger_runner.stop:
             self.log.info("Exiting gracefully upon receiving signal %s", signum)
@@ -345,11 +349,7 @@ def _execute(self) -> int | None:
 
         return None
 
     def _run_trigger_loop(self) -> None:
-        """
-        The main-thread trigger loop.
-
-        This runs synchronously and handles all database reads/writes.
-        """
+        """Run synchronously and handle all database reads/writes; the main-thread trigger loop."""
         while not self.trigger_runner.stop:
             if not self.trigger_runner.is_alive():
                 self.log.error("Trigger runner thread has died! Exiting.")
@@ -386,7 +386,7 @@ def handle_events(self):
 
     def handle_failed_triggers(self):
         """
-        Handles "failed" triggers. - ones that errored or exited before they sent an event.
+        Handle "failed" triggers - ones that errored or exited before they sent an event.
 
         Task Instances that depend on them need failing.
         """
@@ -454,15 +454,14 @@ def __init__(self):
         self.job_id = None
 
     def run(self):
-        """Sync entrypoint - just runs arun in an async loop."""
+        """Sync entrypoint - just run arun in an async loop."""
         asyncio.run(self.arun())
 
     async def arun(self):
         """
-        Main (asynchronous) logic loop.
+        Run trigger addition/deletion/cleanup; main (asynchronous) logic loop.
 
-        The loop in here runs trigger addition/deletion/cleanup. Actual
-        triggers run in their own separate coroutines.
+        Actual triggers run in their own separate coroutines.
         """
         watchdog = asyncio.create_task(self.block_watchdog())
         last_status = time.time()
@@ -639,7 +638,7 @@ def mark_trigger_end(trigger):
 
     def update_triggers(self, requested_trigger_ids: set[int]):
         """
-        Called from the main thread to request that we update what triggers we're running.
+        Request that we update what triggers we're running.
 
         Works out the differences - ones to add, and ones to remove - then
         adds them to the deques so the subthread can actually mutate the running
@@ -706,7 +705,7 @@ def set_trigger_logging_metadata(self, ti: TaskInstance, trigger_id, trigger):
 
     def get_trigger_by_classpath(self, classpath: str) -> type[BaseTrigger]:
         """
-        Gets a trigger class by its classpath ("path.to.module.classname").
+        Get a trigger class by its classpath ("path.to.module.classname").
 
         Uses a cache dictionary to speed up lookups after the first time.
         """
diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/kube_client.py b/airflow/kubernetes/pre_7_4_0_compatibility/kube_client.py
index d2e791dbfdf67..393a3ce94fade 100644
--- a/airflow/kubernetes/pre_7_4_0_compatibility/kube_client.py
+++ b/airflow/kubernetes/pre_7_4_0_compatibility/kube_client.py
@@ -52,7 +52,7 @@ def _disable_verify_ssl() -> None:
 
 def _enable_tcp_keepalive() -> None:
     """
-    This function enables TCP keepalive mechanism.
+    Enable TCP keepalive mechanism.
This prevents urllib3 connection to hang indefinitely when idle connection is time-outed on services like cloud load balancers or firewalls. @@ -95,7 +95,7 @@ def get_kube_client( config_file: str | None = None, ) -> client.CoreV1Api: """ - Retrieves Kubernetes client. + Retrieve Kubernetes client. :param in_cluster: whether we are in cluster :param cluster_context: context of the cluster diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py b/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py index aaacc8ce45fb2..a4cfda193cc96 100644 --- a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py +++ b/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py @@ -164,7 +164,7 @@ def __init__( self.extract_xcom = extract_xcom def gen_pod(self) -> k8s.V1Pod: - """Generates pod.""" + """Generate pod.""" warnings.warn("This function is deprecated. ", RemovedInAirflow3Warning) result = self.ud_pod @@ -177,7 +177,7 @@ def gen_pod(self) -> k8s.V1Pod: @staticmethod def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: - """Adds sidecar.""" + """Add sidecar.""" warnings.warn( "This function is deprecated. " "Please use airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar instead" @@ -193,7 +193,7 @@ def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: @staticmethod def from_obj(obj) -> dict | k8s.V1Pod | None: - """Converts to pod from obj.""" + """Convert to pod from obj.""" if obj is None: return None @@ -227,7 +227,7 @@ def from_obj(obj) -> dict | k8s.V1Pod | None: @staticmethod def from_legacy_obj(obj) -> k8s.V1Pod | None: - """Converts to pod from obj.""" + """Convert to pod from obj.""" if obj is None: return None diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator_deprecated.py b/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator_deprecated.py index 8876556a8d748..df1b78c0252e0 100644 --- a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator_deprecated.py +++ b/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator_deprecated.py @@ -216,7 +216,7 @@ def __init__( self.extract_xcom = extract_xcom def gen_pod(self) -> k8s.V1Pod: - """Generates pod.""" + """Generate pod.""" result = None if result is None: @@ -234,7 +234,7 @@ def gen_pod(self) -> k8s.V1Pod: @staticmethod def add_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: - """Adds sidecar.""" + """Add sidecar.""" pod_cp = copy.deepcopy(pod) pod_cp.spec.volumes = pod.spec.volumes or [] pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME) @@ -246,7 +246,7 @@ def add_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: @staticmethod def from_obj(obj) -> k8s.V1Pod | None: - """Converts to pod from obj.""" + """Convert to pod from obj.""" if obj is None: return None diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/secret.py b/airflow/kubernetes/pre_7_4_0_compatibility/secret.py index 14295f5c7a89c..b02bbb3dd70e7 100644 --- a/airflow/kubernetes/pre_7_4_0_compatibility/secret.py +++ b/airflow/kubernetes/pre_7_4_0_compatibility/secret.py @@ -65,7 +65,7 @@ def __init__(self, deploy_type, deploy_target, secret, key=None, items=None): self.key = key def to_env_secret(self) -> k8s.V1EnvVar: - """Stores es environment secret.""" + """Store es environment secret.""" return k8s.V1EnvVar( name=self.deploy_target, value_from=k8s.V1EnvVarSource( @@ -74,11 +74,11 @@ def to_env_secret(self) -> k8s.V1EnvVar: ) def to_env_from_secret(self) -> k8s.V1EnvFromSource: - """Reads from environment to secret.""" + """Read from environment to secret.""" return 
k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name=self.secret)) def to_volume_secret(self) -> tuple[k8s.V1Volume, k8s.V1VolumeMount]: - """Converts to volume secret.""" + """Convert to volume secret.""" vol_id = f"secretvol{uuid.uuid4()}" volume = k8s.V1Volume(name=vol_id, secret=k8s.V1SecretVolumeSource(secret_name=self.secret)) if self.items: diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py index 22e7d82c092a6..e22f264fdb4e1 100644 --- a/airflow/lineage/__init__.py +++ b/airflow/lineage/__init__.py @@ -38,7 +38,7 @@ def get_backend() -> LineageBackend | None: - """Gets the lineage backend if defined in the configs.""" + """Get the lineage backend if defined in the configs.""" clazz = conf.getimport("lineage", "backend", fallback=None) if clazz: @@ -99,7 +99,7 @@ def wrapper(self, context, *args, **kwargs): def prepare_lineage(func: T) -> T: """ - Prepares the lineage inlets and outlets. + Prepare the lineage inlets and outlets. Inlets can be: diff --git a/airflow/lineage/backend.py b/airflow/lineage/backend.py index 29a755109c64f..1ccfa78b890be 100644 --- a/airflow/lineage/backend.py +++ b/airflow/lineage/backend.py @@ -35,7 +35,7 @@ def send_lineage( context: dict | None = None, ): """ - Sends lineage metadata to a backend. + Send lineage metadata to a backend. :param operator: the operator executing a transformation on the inlets and outlets :param inlets: the inlets to this operator diff --git a/airflow/listeners/spec/dagrun.py b/airflow/listeners/spec/dagrun.py index d2ae1a6b78cb5..3337f4b9a16ce 100644 --- a/airflow/listeners/spec/dagrun.py +++ b/airflow/listeners/spec/dagrun.py @@ -29,14 +29,14 @@ @hookspec def on_dag_run_running(dag_run: DagRun, msg: str): - """Called when dag run state changes to RUNNING.""" + """Execute when dag run state changes to RUNNING.""" @hookspec def on_dag_run_success(dag_run: DagRun, msg: str): - """Called when dag run state changes to SUCCESS.""" + """Execute when dag run state changes to SUCCESS.""" @hookspec def on_dag_run_failed(dag_run: DagRun, msg: str): - """Called when dag run state changes to FAIL.""" + """Execute when dag run state changes to FAIL.""" diff --git a/airflow/listeners/spec/lifecycle.py b/airflow/listeners/spec/lifecycle.py index 6ab0aa3b5cde1..c5e3bb52e4ddf 100644 --- a/airflow/listeners/spec/lifecycle.py +++ b/airflow/listeners/spec/lifecycle.py @@ -25,7 +25,7 @@ @hookspec def on_starting(component): """ - Called before Airflow component - jobs like scheduler, worker, or task runner starts. + Execute before Airflow component - jobs like scheduler, worker, or task runner starts. It's guaranteed this will be called before any other plugin method. @@ -36,7 +36,7 @@ def on_starting(component): @hookspec def before_stopping(component): """ - Called before Airflow component - jobs like scheduler, worker, or task runner stops. + Execute before Airflow component - jobs like scheduler, worker, or task runner stops. It's guaranteed this will be called after any other plugin method. diff --git a/airflow/listeners/spec/taskinstance.py b/airflow/listeners/spec/taskinstance.py index b87043a99d8f0..03f0a00478931 100644 --- a/airflow/listeners/spec/taskinstance.py +++ b/airflow/listeners/spec/taskinstance.py @@ -34,18 +34,18 @@ def on_task_instance_running( previous_state: TaskInstanceState | None, task_instance: TaskInstance, session: Session | None ): - """Called when task state changes to RUNNING. previous_state can be None.""" + """Execute when task state changes to RUNNING. 
previous_state can be None.""" @hookspec def on_task_instance_success( previous_state: TaskInstanceState | None, task_instance: TaskInstance, session: Session | None ): - """Called when task state changes to SUCCESS. previous_state can be None.""" + """Execute when task state changes to SUCCESS. previous_state can be None.""" @hookspec def on_task_instance_failed( previous_state: TaskInstanceState | None, task_instance: TaskInstance, session: Session | None ): - """Called when task state changes to FAIL. previous_state can be None.""" + """Execute when task state changes to FAIL. previous_state can be None."""
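
Note on the rule the patch applies: every docstring change above follows pydocstyle rule D401, which requires the first line of a docstring to be in the imperative mood. A minimal sketch of the before/after pattern, using a hypothetical hook class (the class names are illustrative; the get_conn wording mirrors airflow/hooks/base.py above):

class ExampleHook:
    def get_conn(self):
        """Returns connection for the hook."""  # flagged by D401: third-person summary
        raise NotImplementedError()


class ExampleHookImperative:
    def get_conn(self):
        """Return connection for the hook."""  # D401-compliant: imperative first line
        raise NotImplementedError()

Assuming the rule is checked with ruff's pydocstyle implementation, a command along the lines of ``ruff check --select D401 airflow/`` would report the first form and accept the second; the exact linter invocation used by the project is an assumption here, not part of the patch.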