Skip to content

Commit

Permalink
D401 Support - airflow/example_dags thru airflow/listeners (#33336)
Browse files Browse the repository at this point in the history
(cherry picked from commit d0c94d6)
  • Loading branch information
ferruzzi authored and ephraimbuddy committed Aug 28, 2023
1 parent 448cf69 commit 85d75a2
Show file tree
Hide file tree
Showing 21 changed files with 94 additions and 84 deletions.
18 changes: 9 additions & 9 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def queue_task_instance(

def has_task(self, task_instance: TaskInstance) -> bool:
"""
Checks if a task is either queued or running in this executor.
Check if a task is either queued or running in this executor.
:param task_instance: TaskInstance
:return: True if the task is known to this executor
Expand Down Expand Up @@ -250,7 +250,7 @@ def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, QueuedTa

def trigger_tasks(self, open_slots: int) -> None:
"""
Initiates async execution of the queued tasks, up to the number of available slots.
Initiate async execution of the queued tasks, up to the number of available slots.
:param open_slots: Number of open slots
"""
Expand Down Expand Up @@ -298,7 +298,7 @@ def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:

def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None) -> None:
"""
Changes state of the task.
Change state of the task.
:param info: Executor information for the task instance
:param key: Unique key for the task instance
Expand Down Expand Up @@ -358,7 +358,7 @@ def execute_async(
executor_config: Any | None = None,
) -> None: # pragma: no cover
"""
This method will execute the command asynchronously.
Execute the command asynchronously.
:param key: Unique key for the task instance
:param command: Command to run
Expand All @@ -369,7 +369,7 @@ def execute_async(

def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
"""
This method can be implemented by any child class to return the task logs.
Return the task logs.
:param ti: A TaskInstance object
:param try_number: current try_number to read log from
Expand All @@ -382,7 +382,7 @@ def end(self) -> None: # pragma: no cover
raise NotImplementedError()

def terminate(self):
"""This method is called when the daemon receives a SIGTERM."""
"""Get called when the daemon receives a SIGTERM."""
raise NotImplementedError()

def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover
Expand Down Expand Up @@ -458,7 +458,7 @@ def validate_airflow_tasks_run_command(command: list[str]) -> tuple[str | None,
return None, None

def debug_dump(self):
"""Called in response to SIGUSR2 by the scheduler."""
"""Get called in response to SIGUSR2 by the scheduler."""
self.log.info(
"executor.queued (%d)\n\t%s",
len(self.queued_tasks),
Expand All @@ -472,7 +472,7 @@ def debug_dump(self):
)

def send_callback(self, request: CallbackRequest) -> None:
"""Sends callback for execution.
"""Send callback for execution.
Provides a default implementation which sends the callback to the `callback_sink` object.
Expand All @@ -493,7 +493,7 @@ def get_cli_commands() -> list[GroupCommand]:

@classmethod
def _get_parser(cls) -> argparse.ArgumentParser:
"""This method is used by Sphinx argparse to generate documentation.
"""Generate documentation; used by Sphinx argparse.
:meta private:
"""
Expand Down
13 changes: 7 additions & 6 deletions airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class ExecutorLoader:

@classmethod
def get_default_executor_name(cls) -> str:
"""Returns the default executor name from Airflow configuration.
"""Return the default executor name from Airflow configuration.
:return: executor name from Airflow configuration
"""
Expand All @@ -82,7 +82,7 @@ def get_default_executor_name(cls) -> str:

@classmethod
def get_default_executor(cls) -> BaseExecutor:
"""Creates a new instance of the configured executor if none exists and returns it."""
"""Create a new instance of the configured executor if none exists and returns it."""
if cls._default_executor is not None:
return cls._default_executor

Expand All @@ -91,7 +91,7 @@ def get_default_executor(cls) -> BaseExecutor:
@classmethod
def load_executor(cls, executor_name: str) -> BaseExecutor:
"""
Loads the executor.
Load the executor.
This supports the following formats:
* by executor name for core executor
Expand Down Expand Up @@ -123,7 +123,7 @@ def import_executor_cls(
cls, executor_name: str, validate: bool = True
) -> tuple[type[BaseExecutor], ConnectorSource]:
"""
Imports the executor class.
Import the executor class.
Supports the same formats as ExecutorLoader.load_executor.
Expand Down Expand Up @@ -159,7 +159,7 @@ def _import_and_validate(path: str) -> type[BaseExecutor]:
@classmethod
def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]:
"""
Imports the default executor class.
Import the default executor class.
:param validate: Whether or not to validate the executor before returning
Expand All @@ -172,7 +172,8 @@ def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseEx
@classmethod
@functools.lru_cache(maxsize=None)
def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) -> None:
"""Validate database and executor compatibility.
"""
Validate database and executor compatibility.
Most of the databases work universally, but SQLite can only work with
single-threaded executors (e.g. Sequential).
Expand Down
22 changes: 13 additions & 9 deletions airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def run(self):

def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
"""
Executes command received and stores result state in queue.
Execute command received and stores result state in queue.
:param key: the key to identify the task instance
:param command: the command to execute
Expand Down Expand Up @@ -141,7 +141,7 @@ def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState:

@abstractmethod
def do_work(self):
"""Called in the subprocess and should then execute tasks."""
"""Execute tasks; called in the subprocess."""
raise NotImplementedError()


Expand Down Expand Up @@ -236,7 +236,7 @@ def __init__(self, executor: LocalExecutor):
self.executor: LocalExecutor = executor

def start(self) -> None:
"""Starts the executor."""
"""Start the executor."""
self.executor.workers_used = 0
self.executor.workers_active = 0

Expand All @@ -248,7 +248,7 @@ def execute_async(
executor_config: Any | None = None,
) -> None:
"""
Executes task asynchronously.
Execute task asynchronously.
:param key: the key to identify the task instance
:param command: the command to execute
Expand Down Expand Up @@ -291,7 +291,7 @@ def __init__(self, executor: LocalExecutor):
self.queue: Queue[ExecutorWorkType] | None = None

def start(self) -> None:
"""Starts limited parallelism implementation."""
"""Start limited parallelism implementation."""
if TYPE_CHECKING:
assert self.executor.manager
assert self.executor.result_queue
Expand All @@ -315,7 +315,7 @@ def execute_async(
executor_config: Any | None = None,
) -> None:
"""
Executes task asynchronously.
Execute task asynchronously.
:param key: the key to identify the task instance
:param command: the command to execute
Expand All @@ -340,7 +340,11 @@ def sync(self):
break

def end(self):
"""Ends the executor. Sends the poison pill to all workers."""
"""
End the executor.
Sends the poison pill to all workers.
"""
for _ in self.executor.workers:
self.queue.put((None, None))

Expand All @@ -349,7 +353,7 @@ def end(self):
self.executor.sync()

def start(self) -> None:
"""Starts the executor."""
"""Start the executor."""
old_proctitle = getproctitle()
setproctitle("airflow executor -- LocalExecutor")
self.manager = Manager()
Expand Down Expand Up @@ -389,7 +393,7 @@ def sync(self) -> None:
self.impl.sync()

def end(self) -> None:
"""Ends the executor."""
"""End the executor."""
if TYPE_CHECKING:
assert self.impl
assert self.manager
Expand Down
6 changes: 3 additions & 3 deletions airflow/hooks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def get_connection(cls, conn_id: str) -> Connection:
@classmethod
def get_hook(cls, conn_id: str) -> BaseHook:
"""
Returns default hook for this connection id.
Return default hook for this connection id.
:param conn_id: connection id
:return: default hook for this connection
Expand All @@ -85,7 +85,7 @@ def get_hook(cls, conn_id: str) -> BaseHook:
return connection.get_hook()

def get_conn(self) -> Any:
"""Returns connection for the hook."""
"""Return connection for the hook."""
raise NotImplementedError()

@classmethod
Expand Down Expand Up @@ -144,7 +144,7 @@ def get_ui_field_behaviour(cls):
@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
"""
Returns dictionary of widgets to be added for the hook to handle extra values.
Return dictionary of widgets to be added for the hook to handle extra values.
If you have class hierarchy, usually the widgets needed by your class are already
added by the base class, so there is no need to implement this method. It might
Expand Down
2 changes: 1 addition & 1 deletion airflow/hooks/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

class FSHook(BaseHook):
"""
Allows for interaction with an file server.
Allow for interaction with an file server.
Connection should have a name and a path specified under extra:
Expand Down
2 changes: 1 addition & 1 deletion airflow/hooks/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def pre_exec():
return SubprocessResult(exit_code=return_code, output=line)

def send_sigterm(self):
"""Sends SIGTERM signal to ``self.sub_process`` if one exists."""
"""Send SIGTERM signal to ``self.sub_process`` if one exists."""
self.log.info("Sending SIGTERM signal to process group")
if self.sub_process and hasattr(self.sub_process, "pid"):
os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)
2 changes: 1 addition & 1 deletion airflow/jobs/backfill_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def __init__(

def _update_counters(self, ti_status: _DagRunTaskStatus, session: Session) -> None:
"""
Updates the counters per state of the tasks that were running.
Update the counters per state of the tasks that were running.
Can re-add to tasks to run when required.
Expand Down
12 changes: 9 additions & 3 deletions airflow/jobs/base_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ def __init__(self, job: J) -> None:

def _execute(self) -> int | None:
"""
Executes the logic connected to the runner. This method should be overridden by subclasses.
Execute the logic connected to the runner.
This method should be overridden by subclasses.
:meta private:
:return: return code if available, otherwise None
Expand All @@ -55,12 +57,16 @@ def _execute(self) -> int | None:

@provide_session
def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
"""Callback that is called during heartbeat. This method can be overwritten by the runners."""
"""
Execute callback during heartbeat.
This method can be overwritten by the runners.
"""

@classmethod
@provide_session
def most_recent_job(cls, session: Session = NEW_SESSION) -> Job | None:
"""Returns the most recent job of this type, if any, based on last heartbeat received."""
"""Return the most recent job of this type, if any, based on last heartbeat received."""
from airflow.jobs.job import most_recent_job

return most_recent_job(cls.job_type, session=session)
12 changes: 6 additions & 6 deletions airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def is_alive(self, grace_multiplier=2.1):

@provide_session
def kill(self, session: Session = NEW_SESSION) -> NoReturn:
"""Handles on_kill callback and updates state in database."""
"""Handle on_kill callback and updates state in database."""
job = session.scalar(select(Job).where(Job.id == self.id).limit(1))
job.end_date = timezone.utcnow()
try:
Expand Down Expand Up @@ -222,7 +222,7 @@ def heartbeat(

@provide_session
def prepare_for_execution(self, session: Session = NEW_SESSION):
"""Prepares the job for execution."""
"""Prepare the job for execution."""
Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
self.state = JobState.RUNNING
self.start_date = timezone.utcnow()
Expand All @@ -240,7 +240,7 @@ def complete_execution(self, session: Session = NEW_SESSION):

@provide_session
def most_recent_job(self, session: Session = NEW_SESSION) -> Job | None:
"""Returns the most recent job of this type, if any, based on last heartbeat received."""
"""Return the most recent job of this type, if any, based on last heartbeat received."""
return most_recent_job(self.job_type, session=session)


Expand Down Expand Up @@ -272,7 +272,7 @@ def run_job(
job: Job | JobPydantic, execute_callable: Callable[[], int | None], session: Session = NEW_SESSION
) -> int | None:
"""
Runs the job.
Run the job.
The Job is always an ORM object and setting the state is happening within the
same DB session and the session is kept open throughout the whole execution.
Expand All @@ -293,7 +293,7 @@ def run_job(

def execute_job(job: Job | JobPydantic, execute_callable: Callable[[], int | None]) -> int | None:
"""
Executes the job.
Execute the job.
Job execution requires no session as generally executing session does not require an
active database connection. The session might be temporary acquired and used if the job
Expand Down Expand Up @@ -331,7 +331,7 @@ def perform_heartbeat(
job: Job | JobPydantic, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
) -> None:
"""
Performs heartbeat for the Job passed to it,optionally checking if it is necessary.
Perform heartbeat for the Job passed to it,optionally checking if it is necessary.
:param job: job to perform heartbeat for
:param heartbeat_callback: callback to run by the heartbeat
Expand Down
4 changes: 2 additions & 2 deletions airflow/jobs/local_task_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ def _execute(self) -> int | None:
self.task_runner = get_task_runner(self)

def signal_handler(signum, frame):
"""Setting kill signal handler."""
"""Set kill signal handler."""
self.log.error("Received SIGTERM. Terminating subprocesses")
self.task_runner.terminate()
self.handle_task_exit(128 + signum)

def segfault_signal_handler(signum, frame):
"""Setting sigmentation violation signal handler."""
"""Set sigmentation violation signal handler."""
self.log.critical(SIGSEGV_MESSAGE)
self.task_runner.terminate()
self.handle_task_exit(128 + signum)
Expand Down
Loading

0 comments on commit 85d75a2

Please sign in to comment.