From 2debf6deb61d6d628b63bad6c51ab24fa61b978c Mon Sep 17 00:00:00 2001 From: "D. Ferruzzi" Date: Sat, 12 Aug 2023 11:53:19 -0700 Subject: [PATCH] D401 Support - Macros to Operators (Inclusive) (#33337) * D401 Support - airflow/macros thru airflow/operators * fix static checks --- airflow/metrics/otel_logger.py | 18 ++++---- airflow/metrics/statsd_logger.py | 2 +- airflow/metrics/validators.py | 2 +- airflow/migrations/utils.py | 2 +- airflow/models/abstractoperator.py | 6 ++- airflow/models/baseoperator.py | 32 +++++++------- airflow/models/connection.py | 16 +++---- airflow/models/dag.py | 50 +++++++++++----------- airflow/models/dagbag.py | 8 ++-- airflow/models/dagcode.py | 12 +++--- airflow/models/dagrun.py | 22 +++++----- airflow/models/mappedoperator.py | 6 +-- airflow/models/param.py | 6 +-- airflow/models/serialized_dag.py | 12 +++--- airflow/models/skipmixin.py | 6 +-- airflow/models/taskinstance.py | 44 +++++++++---------- airflow/models/taskinstancekey.py | 2 +- airflow/models/taskmixin.py | 12 +++--- airflow/models/taskreschedule.py | 4 +- airflow/models/trigger.py | 8 ++-- airflow/models/variable.py | 8 ++-- airflow/models/xcom.py | 10 ++--- airflow/models/xcom_arg.py | 12 +++--- airflow/notifications/basenotifier.py | 2 +- airflow/operators/bash.py | 2 +- airflow/operators/datetime.py | 2 +- airflow/operators/python.py | 10 ++--- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 8 ++-- 29 files changed, 165 insertions(+), 161 deletions(-) diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index 3e4701ca607795..8092fbe76e357b 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -87,7 +87,7 @@ def _generate_key_name(name: str, attributes: Attributes = None): def name_is_otel_safe(prefix: str, name: str) -> bool: """ - Returns True if the provided name and prefix would result in a name that meets the OpenTelemetry standard. + Return True if the provided name and prefix would result in a name that meets the OpenTelemetry standard. Legal names are defined here: https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax @@ -110,7 +110,7 @@ def _type_as_str(obj: Instrument) -> str: def _get_otel_safe_name(name: str) -> str: """ - Verifies that the provided name does not exceed OpenTelemetry's maximum length for metric names. + Verify that the provided name does not exceed OpenTelemetry's maximum length for metric names. :param name: The original metric name :returns: The name, truncated to an OTel-acceptable length if required. @@ -290,7 +290,7 @@ def clear(self) -> None: self.map.clear() def _create_counter(self, name): - """Creates a new counter or up_down_counter for the provided name.""" + """Create a new counter or up_down_counter for the provided name.""" otel_safe_name = _get_otel_safe_name(name) if _is_up_down_counter(name): @@ -303,7 +303,7 @@ def _create_counter(self, name): def get_counter(self, name: str, attributes: Attributes = None): """ - Returns the counter; creates a new one if it did not exist. + Return the counter; creates a new one if it did not exist. :param name: The name of the counter to fetch or create. :param attributes: Counter attributes, used to generate a unique key to store the counter. @@ -315,7 +315,7 @@ def get_counter(self, name: str, attributes: Attributes = None): def del_counter(self, name: str, attributes: Attributes = None) -> None: """ - Deletes a counter. + Delete a counter. 
:param name: The name of the counter to delete. :param attributes: Counter attributes which were used to generate a unique key to store the counter. @@ -326,7 +326,7 @@ def del_counter(self, name: str, attributes: Attributes = None) -> None: def set_gauge_value(self, name: str, value: float | None, delta: bool, tags: Attributes): """ - Overrides the last reading for a Gauge with a new value. + Override the last reading for a Gauge with a new value. :param name: The name of the gauge to record. :param value: The new reading to record. @@ -344,7 +344,7 @@ def set_gauge_value(self, name: str, value: float | None, delta: bool, tags: Att def _create_gauge(self, name: str, attributes: Attributes = None): """ - Creates a new Observable Gauge with the provided name and the default value. + Create a new Observable Gauge with the provided name and the default value. :param name: The name of the gauge to fetch or create. :param attributes: Gauge attributes, used to generate a unique key to store the gauge. @@ -361,12 +361,12 @@ def _create_gauge(self, name: str, attributes: Attributes = None): return gauge def read_gauge(self, key: str, *args) -> Iterable[Observation]: - """Callback for the Observable Gauges, returns the Observation for the provided key.""" + """Return the Observation for the provided key; callback for the Observable Gauges.""" yield self.map[key] def poke_gauge(self, name: str, attributes: Attributes = None) -> GaugeValues: """ - Returns the value of the gauge; creates a new one with the default value if it did not exist. + Return the value of the gauge; creates a new one with the default value if it did not exist. :param name: The name of the gauge to fetch or create. :param attributes: Gauge attributes, used to generate a unique key to store the gauge. diff --git a/airflow/metrics/statsd_logger.py b/airflow/metrics/statsd_logger.py index f54e00f488982a..2e69d6e60590cb 100644 --- a/airflow/metrics/statsd_logger.py +++ b/airflow/metrics/statsd_logger.py @@ -150,7 +150,7 @@ def timer( def get_statsd_logger(cls) -> SafeStatsdLogger: - """Returns logger for StatsD.""" + """Return logger for StatsD.""" # no need to check for the scheduler/statsd_on -> this method is only called when it is set # and previously it would crash with None is callable if it was called without it. from statsd import StatsClient diff --git a/airflow/metrics/validators.py b/airflow/metrics/validators.py index 0fd5fd1adef8cd..501229578d2b0a 100644 --- a/airflow/metrics/validators.py +++ b/airflow/metrics/validators.py @@ -107,7 +107,7 @@ def stat_name_otel_handler( max_length: int = OTEL_NAME_MAX_LENGTH, ) -> str: """ - Verifies that a proposed prefix and name combination will meet OpenTelemetry naming standards. + Verify that a proposed prefix and name combination will meet OpenTelemetry naming standards. See: https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax diff --git a/airflow/migrations/utils.py b/airflow/migrations/utils.py index a5a65c6745a59f..bc31c8f70c5edd 100644 --- a/airflow/migrations/utils.py +++ b/airflow/migrations/utils.py @@ -24,7 +24,7 @@ def get_mssql_table_constraints(conn, table_name) -> dict[str, dict[str, list[str]]]: """ - Returns the primary and unique constraint along with column name. + Return the primary and unique constraint along with column name. 
Some tables like `task_instance` are missing the primary key constraint name and the name is auto-generated by the SQL server, so this function diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index 458d55ef5da123..ba357c0bd1bbf9 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -463,7 +463,8 @@ def get_extra_links(self, ti: TaskInstance, link_name: str) -> str | None: @cache def get_parse_time_mapped_ti_count(self) -> int: - """Number of mapped task instances that can be created on DAG run creation. + """ + Return the number of mapped task instances that can be created on DAG run creation. This only considers literal mapped arguments, and would return *None* when any non-literal values are used for mapping. @@ -479,7 +480,8 @@ def get_parse_time_mapped_ti_count(self) -> int: return group.get_parse_time_mapped_ti_count() def get_mapped_ti_count(self, run_id: str, *, session: Session) -> int: - """Number of mapped TaskInstances that can be created at run time. + """ + Return the number of mapped TaskInstances that can be created at run time. This considers both literal and non-literal mapped arguments, and the result is therefore available when all depended tasks have finished. The diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 45462bf7261991..47e6c07ee20665 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -991,7 +991,7 @@ def __hash__(self): # including lineage information def __or__(self, other): """ - Called for [This Operator] | [Operator]. + Return [This Operator] | [Operator]. The inlets of other will be set to pick up the outlets from this operator. Other will be set as a downstream task of this operator. @@ -1010,7 +1010,7 @@ def __or__(self, other): def __gt__(self, other): """ - Called for [Operator] > [Outlet]. + Return [Operator] > [Outlet]. If other is an attr annotated object it is set as an outlet of this Operator. """ @@ -1026,7 +1026,7 @@ def __gt__(self, other): def __lt__(self, other): """ - Called for [Inlet] > [Operator] or [Operator] < [Inlet]. + Return [Inlet] > [Operator] or [Operator] < [Inlet]. If other is an attr annotated object it is set as an inlet to this operator. """ @@ -1054,22 +1054,22 @@ def __setattr__(self, key, value): self.set_xcomargs_dependencies() def add_inlets(self, inlets: Iterable[Any]): - """Sets inlets to this operator.""" + """Set inlets to this operator.""" self.inlets.extend(inlets) def add_outlets(self, outlets: Iterable[Any]): - """Defines the outlets of this operator.""" + """Define the outlets of this operator.""" self.outlets.extend(outlets) def get_inlet_defs(self): - """Gets inlet definitions on this task. + """Get inlet definitions on this task. :meta private: """ return self.inlets def get_outlet_defs(self): - """Gets outlet definitions on this task. + """Get outlet definitions on this task. :meta private: """ @@ -1109,7 +1109,7 @@ def dag(self, dag: DAG | None): self._dag = dag def has_dag(self): - """Returns True if the Operator has been assigned to a DAG.""" + """Return True if the Operator has been assigned to a DAG.""" return self._dag is not None deps: frozenset[BaseTIDep] = frozenset( @@ -1134,7 +1134,7 @@ def prepare_for_execution(self) -> BaseOperator: def set_xcomargs_dependencies(self) -> None: """ - Resolves upstream dependencies of a task. + Resolve upstream dependencies of a task. 
In this way passing an ``XComArg`` as value for a template field will result in creating upstream relation between two tasks. @@ -1163,13 +1163,13 @@ def set_xcomargs_dependencies(self) -> None: @prepare_lineage def pre_execute(self, context: Any): - """This hook is triggered right before self.execute() is called.""" + """Execute right before self.execute() is called.""" if self._pre_execute_hook is not None: self._pre_execute_hook(context) def execute(self, context: Context) -> Any: """ - This is the main method to derive when creating an operator. + Derive when creating an operator. Context is the same dictionary used as when rendering jinja templates. @@ -1180,7 +1180,7 @@ def execute(self, context: Context) -> Any: @apply_lineage def post_execute(self, context: Any, result: Any = None): """ - This hook is triggered right after self.execute() is called. + Execute right after self.execute() is called. It is passed the execution context and any results returned by the operator. """ @@ -1252,7 +1252,7 @@ def clear( downstream: bool = False, session: Session = NEW_SESSION, ): - """Clears the state of task instances associated with the task, following the parameters specified.""" + """Clear the state of task instances associated with the task, following the parameters specified.""" qry = select(TaskInstance).where(TaskInstance.dag_id == self.dag_id) if start_date: @@ -1355,7 +1355,7 @@ def run( ) def dry_run(self) -> None: - """Performs dry run for the operator - just render template fields.""" + """Perform dry run for the operator - just render template fields.""" self.log.info("Dry run") for field in self.template_fields: try: @@ -1563,7 +1563,7 @@ def get_serialized_fields(cls): return cls.__serialized_fields def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]: - """Required by DAGNode.""" + """Serialize; required by DAGNode.""" return DagAttributeTypes.OP, self.task_id @property @@ -1837,7 +1837,7 @@ def cross_downstream( def chain_linear(*elements: DependencyMixin | Sequence[DependencyMixin]): """ - Helper to simplify task dependency definition. + Simplify task dependency definition. E.g.: suppose you want precedence like so:: diff --git a/airflow/models/connection.py b/airflow/models/connection.py index fe514cdf48c71b..e0ba22edd79f2b 100644 --- a/airflow/models/connection.py +++ b/airflow/models/connection.py @@ -39,7 +39,7 @@ def parse_netloc_to_hostname(*args, **kwargs): - """This method is deprecated.""" + """Do not use, this method is deprecated.""" warnings.warn("This method is deprecated.", RemovedInAirflow3Warning) return _parse_netloc_to_hostname(*args, **kwargs) @@ -142,7 +142,7 @@ def __init__( @staticmethod def _validate_extra(extra, conn_id) -> None: """ - Here we verify that ``extra`` is a JSON-encoded Python dict. + Verify that ``extra`` is a JSON-encoded Python dict. From Airflow 3.0, we should no longer suppress these errors but raise instead. """ @@ -173,7 +173,7 @@ def on_db_load(self): mask_secret(self.password) def parse_from_uri(self, **uri): - """This method is deprecated. Please use uri parameter in constructor.""" + """Use uri parameter in constructor, this method is deprecated.""" warnings.warn( "This method is deprecated. 
Please use uri parameter in constructor.", RemovedInAirflow3Warning, @@ -219,7 +219,7 @@ def _parse_from_uri(self, uri: str): @staticmethod def _create_host(protocol, host) -> str | None: - """Returns the connection host with the protocol.""" + """Return the connection host with the protocol.""" if not host: return host if protocol: @@ -378,9 +378,9 @@ def __repr__(self): def log_info(self): """ - This method is deprecated. + Read each field individually or use the default representation (`__repr__`). - You can read each field individually or use the default representation (`__repr__`). + This method is deprecated. """ warnings.warn( "This method is deprecated. You can read each field individually or " @@ -396,9 +396,9 @@ def log_info(self): def debug_info(self): """ - This method is deprecated. + Read each field individually or use the default representation (`__repr__`). - You can read each field individually or use the default representation (`__repr__`). + This method is deprecated. """ warnings.warn( "This method is deprecated. You can read each field individually or " diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 0371ed9192c4df..28dfca48eba7d5 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -219,7 +219,7 @@ def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone) -> Timet def get_last_dagrun(dag_id, session, include_externally_triggered=False): """ - Returns the last dag run for a dag, None if there was none. + Return the last dag run for a dag, None if there was none. Last dag run can be any type of run e.g. scheduled or backfilled. Overridden DagRuns are ignored. @@ -825,7 +825,7 @@ def is_fixed_time_schedule(self): def following_schedule(self, dttm): """ - Calculates the following schedule for this dag in UTC. + Calculate the following schedule for this dag in UTC. :param dttm: utc datetime :return: utc datetime @@ -1103,7 +1103,7 @@ def iter_dagrun_infos_between( def get_run_dates(self, start_date, end_date=None) -> list: """ - Returns a list of dates between the interval received as parameter using this dag's schedule interval. + Return a list of dates between the interval received as parameter using this dag's schedule interval. Returned dates can be used for execution dates. @@ -1318,7 +1318,7 @@ def allow_future_exec_dates(self) -> bool: @provide_session def get_concurrency_reached(self, session=NEW_SESSION) -> bool: - """Returns a boolean indicating whether the max_active_tasks limit for this DAG has been reached.""" + """Return a boolean indicating whether the max_active_tasks limit for this DAG has been reached.""" TI = TaskInstance total_tasks = session.scalar( select(func.count(TI.task_id)).where( @@ -1330,7 +1330,7 @@ def get_concurrency_reached(self, session=NEW_SESSION) -> bool: @property def concurrency_reached(self): - """This attribute is deprecated. Please use `airflow.models.DAG.get_concurrency_reached` method.""" + """Use `airflow.models.DAG.get_concurrency_reached`, this attribute is deprecated.""" warnings.warn( "This attribute is deprecated. 
Please use `airflow.models.DAG.get_concurrency_reached` method.", RemovedInAirflow3Warning, @@ -1340,17 +1340,17 @@ def concurrency_reached(self): @provide_session def get_is_active(self, session=NEW_SESSION) -> None: - """Returns a boolean indicating whether this DAG is active.""" + """Return a boolean indicating whether this DAG is active.""" return session.scalar(select(DagModel.is_active).where(DagModel.dag_id == self.dag_id)) @provide_session def get_is_paused(self, session=NEW_SESSION) -> None: - """Returns a boolean indicating whether this DAG is paused.""" + """Return a boolean indicating whether this DAG is paused.""" return session.scalar(select(DagModel.is_paused).where(DagModel.dag_id == self.dag_id)) @property def is_paused(self): - """This attribute is deprecated. Please use `airflow.models.DAG.get_is_paused` method.""" + """Use `airflow.models.DAG.get_is_paused`, this attribute is deprecated.""" warnings.warn( "This attribute is deprecated. Please use `airflow.models.DAG.get_is_paused` method.", RemovedInAirflow3Warning, @@ -1408,7 +1408,7 @@ def handle_callback(self, dagrun, success=True, reason=None, session=NEW_SESSION def get_active_runs(self): """ - Returns a list of dag run execution dates currently running. + Return a list of dag run execution dates currently running. :return: List of execution dates """ @@ -1423,7 +1423,7 @@ def get_active_runs(self): @provide_session def get_num_active_runs(self, external_trigger=None, only_running=True, session=NEW_SESSION): """ - Returns the number of active "running" dag runs. + Return the number of active "running" dag runs. :param external_trigger: True for externally triggered active dag runs :param session: @@ -1450,7 +1450,7 @@ def get_dagrun( session: Session = NEW_SESSION, ): """ - Returns the dag run for a given execution date or run_id if it exists, otherwise none. + Return the dag run for a given execution date or run_id if it exists, otherwise none. :param execution_date: The execution date of the DagRun to find. :param run_id: The run_id of the DagRun to find. @@ -1469,7 +1469,7 @@ def get_dagrun( @provide_session def get_dagruns_between(self, start_date, end_date, session=NEW_SESSION): """ - Returns the list of dag runs between start_date (inclusive) and end_date (inclusive). + Return the list of dag runs between start_date (inclusive) and end_date (inclusive). :param start_date: The starting execution date of the DagRun to find. :param end_date: The ending execution date of the DagRun to find. @@ -1488,12 +1488,12 @@ def get_dagruns_between(self, start_date, end_date, session=NEW_SESSION): @provide_session def get_latest_execution_date(self, session: Session = NEW_SESSION) -> pendulum.DateTime | None: - """Returns the latest date for which at least one dag run exists.""" + """Return the latest date for which at least one dag run exists.""" return session.scalar(select(func.max(DagRun.execution_date)).where(DagRun.dag_id == self.dag_id)) @property def latest_execution_date(self): - """This attribute is deprecated. Please use `airflow.models.DAG.get_latest_execution_date`.""" + """Use `airflow.models.DAG.get_latest_execution_date`, this attribute is deprecated.""" warnings.warn( "This attribute is deprecated. 
Please use `airflow.models.DAG.get_latest_execution_date`.", RemovedInAirflow3Warning, @@ -1503,7 +1503,7 @@ def latest_execution_date(self): @property def subdags(self): - """Returns a list of the subdag objects associated to this DAG.""" + """Return a list of the subdag objects associated to this DAG.""" # Check SubDag for class but don't check class directly from airflow.operators.subdag import SubDagOperator @@ -2147,7 +2147,7 @@ def clear( exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), ) -> int | Iterable[TaskInstance]: """ - Clears a set of task instances associated with the current dag for a specified date range. + Clear a set of task instances associated with the current dag for a specified date range. :param task_ids: List of task ids or (``task_id``, ``map_index``) tuples to clear :param start_date: The minimum execution_date to clear @@ -2312,7 +2312,7 @@ def __deepcopy__(self, memo): return result def sub_dag(self, *args, **kwargs): - """This method is deprecated in favor of partial_subset.""" + """Use `airflow.models.DAG.partial_subset`, this method is deprecated.""" warnings.warn( "This method is deprecated and will be removed in a future version. Please use partial_subset", RemovedInAirflow3Warning, @@ -2595,7 +2595,7 @@ def run( disable_retry=False, ): """ - Runs the DAG. + Run the DAG. :param start_date: the start date of the range to run :param end_date: the end date of the range to run @@ -2761,7 +2761,7 @@ def create_dagrun( data_interval: tuple[datetime, datetime] | None = None, ): """ - Creates a dag run from this dag including the tasks associated with this dag. + Create a dag run from this dag including the tasks associated with this dag. Returns the dag run. @@ -2867,7 +2867,7 @@ def bulk_sync_to_db( dags: Collection[DAG], session=NEW_SESSION, ): - """This method is deprecated in favor of bulk_write_to_db.""" + """Use `airflow.models.DAG.bulk_write_to_db`, this method is deprecated.""" warnings.warn( "This method is deprecated and will be removed in a future version. Please use bulk_write_to_db", RemovedInAirflow3Warning, @@ -3122,7 +3122,7 @@ def sync_to_db(self, processor_subdir: str | None = None, session=NEW_SESSION): self.bulk_write_to_db([self], processor_subdir=processor_subdir, session=session) def get_default_view(self): - """This is only there for backward compatible jinja2 templates.""" + """Allow backward compatible jinja2 templates.""" if self.default_view is None: return airflow_conf.get("webserver", "dag_default_view").lower() else: @@ -3171,7 +3171,7 @@ def deactivate_stale_dags(expiration_date, session=NEW_SESSION): @provide_session def get_num_task_instances(dag_id, run_id=None, task_ids=None, states=None, session=NEW_SESSION) -> int: """ - Returns the number of task instances in the given DAG. + Return the number of task instances in the given DAG. :param session: ORM session :param dag_id: ID of the DAG to get the task concurrency of @@ -3249,7 +3249,7 @@ def get_edge_info(self, upstream_task_id: str, downstream_task_id: str) -> EdgeI def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: EdgeInfoType): """ - Sets the given edge information on the DAG. + Set the given edge information on the DAG. Note that this will overwrite, rather than merge with, existing info. """ @@ -3257,7 +3257,7 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed def validate_schedule_and_params(self): """ - Validates Param values when the schedule_interval is not None. 
+ Validate Param values when the schedule_interval is not None. Raise exception if there are any Params in the DAG which neither have a default value nor have the null in schema['type'] list, but the DAG have a schedule_interval which is not None. @@ -3274,7 +3274,7 @@ def validate_schedule_and_params(self): def iter_invalid_owner_links(self) -> Iterator[tuple[str, str]]: """ - Parses a given link, and verifies if it's a valid URL, or a 'mailto' link. + Parse a given link, and verifies if it's a valid URL, or a 'mailto' link. Returns an iterator of invalid (owner, link) pairs. """ diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index a8f5b4d6fc28c8..8608bcd13824da 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -174,7 +174,7 @@ def dag_ids(self) -> list[str]: @provide_session def get_dag(self, dag_id, session: Session = None): """ - Gets the DAG out of the dictionary, and refreshes it if expired. + Get the DAG out of the dictionary, and refreshes it if expired. :param dag_id: DAG ID """ @@ -453,7 +453,7 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk): def bag_dag(self, dag, root_dag): """ - Adds the DAG into the bag, recurses into sub dags. + Add the DAG into the bag, recurses into sub dags. :raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags. :raises: AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag. @@ -568,7 +568,7 @@ def collect_dags( self.dagbag_stats = sorted(stats, key=lambda x: x.duration, reverse=True) def collect_dags_from_db(self): - """Collects DAGs from database.""" + """Collect DAGs from database.""" from airflow.models.serialized_dag import SerializedDagModel with Stats.timer("collect_db_dags"): @@ -588,7 +588,7 @@ def collect_dags_from_db(self): self.dags.update(subdags) def dagbag_report(self): - """Prints a report around DagBag loading stats.""" + """Print a report around DagBag loading stats.""" stats = self.dagbag_stats dag_folder = self.dag_folder duration = sum((o.duration for o in stats), timedelta()).total_seconds() diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py index 61b007bf6ce15e..206c97e08f29a9 100644 --- a/airflow/models/dagcode.py +++ b/airflow/models/dagcode.py @@ -61,7 +61,7 @@ def __init__(self, full_filepath: str, source_code: str | None = None): @provide_session def sync_to_db(self, session: Session = NEW_SESSION) -> None: - """Writes code into database. + """Write code into database. :param session: ORM Session """ @@ -70,7 +70,7 @@ def sync_to_db(self, session: Session = NEW_SESSION) -> None: @classmethod @provide_session def bulk_sync_to_db(cls, filelocs: Iterable[str], session: Session = NEW_SESSION) -> None: - """Writes code in bulk into database. + """Write code in bulk into database. :param filelocs: file paths of DAGs to sync :param session: ORM Session @@ -126,7 +126,7 @@ def bulk_sync_to_db(cls, filelocs: Iterable[str], session: Session = NEW_SESSION @classmethod @provide_session def remove_deleted_code(cls, alive_dag_filelocs: Collection[str], session: Session = NEW_SESSION) -> None: - """Deletes code not included in alive_dag_filelocs. + """Delete code not included in alive_dag_filelocs. 
:param alive_dag_filelocs: file paths of alive DAGs :param session: ORM Session @@ -144,7 +144,7 @@ def remove_deleted_code(cls, alive_dag_filelocs: Collection[str], session: Sessi @classmethod @provide_session def has_dag(cls, fileloc: str, session: Session = NEW_SESSION) -> bool: - """Checks a file exist in dag_code table. + """Check a file exist in dag_code table. :param fileloc: the file to check :param session: ORM Session @@ -157,7 +157,7 @@ def has_dag(cls, fileloc: str, session: Session = NEW_SESSION) -> bool: @classmethod def get_code_by_fileloc(cls, fileloc: str) -> str: - """Returns source code for a given fileloc. + """Return source code for a given fileloc. :param fileloc: file path of a DAG :return: source code as string @@ -166,7 +166,7 @@ def get_code_by_fileloc(cls, fileloc: str) -> str: @classmethod def code(cls, fileloc) -> str: - """Returns source code for this DagCode object. + """Return source code for this DagCode object. :return: source code as string """ diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index a6e1da6a0ec19f..923d6f3d8af1a6 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -91,7 +91,7 @@ class TISchedulingDecision(NamedTuple): def _creator_note(val): - """Custom creator for the ``note`` association proxy.""" + """Create the ``note`` association proxy.""" if isinstance(val, str): return DagRunNote(content=val) elif isinstance(val, dict): @@ -281,7 +281,7 @@ def state(self): @provide_session def refresh_from_db(self, session: Session = NEW_SESSION) -> None: """ - Reloads the current dagrun from the database. + Reload the current dagrun from the database. :param session: database session """ @@ -380,7 +380,7 @@ def find( execution_end_date: datetime | None = None, ) -> list[DagRun]: """ - Returns a set of dag runs for the given search criteria. + Return a set of dag runs for the given search criteria. :param dag_id: the dag_id or list of dag_id to find dag runs for :param run_id: defines the run id for this dag run @@ -462,7 +462,7 @@ def get_task_instances( state: Iterable[TaskInstanceState | None] | None = None, session: Session = NEW_SESSION, ) -> list[TI]: - """Returns the task instances for this dag run.""" + """Return the task instances for this dag run.""" tis = ( select(TI) .options(joinedload(TI.dag_run)) @@ -499,7 +499,7 @@ def get_task_instance( map_index: int = -1, ) -> TI | None: """ - Returns the task instance specified by task_id for this dag run. + Return the task instance specified by task_id for this dag run. :param task_id: the task id :param session: Sqlalchemy ORM Session @@ -510,7 +510,7 @@ def get_task_instance( def get_dag(self) -> DAG: """ - Returns the Dag associated with this DagRun. + Return the Dag associated with this DagRun. 
:return: DAG """ @@ -523,7 +523,7 @@ def get_dag(self) -> DAG: def get_previous_dagrun( self, state: DagRunState | None = None, session: Session = NEW_SESSION ) -> DagRun | None: - """The previous DagRun, if there is one.""" + """Return the previous DagRun, if there is one.""" filters = [ DagRun.dag_id == self.dag_id, DagRun.execution_date < self.execution_date, @@ -534,7 +534,7 @@ def get_previous_dagrun( @provide_session def get_previous_scheduled_dagrun(self, session: Session = NEW_SESSION) -> DagRun | None: - """The previous, SCHEDULED DagRun, if there is one.""" + """Return the previous SCHEDULED DagRun, if there is one.""" return session.scalar( select(DagRun) .where( @@ -575,7 +575,7 @@ def update_state( self, session: Session = NEW_SESSION, execute_callbacks: bool = True ) -> tuple[list[TI], DagCallbackRequest | None]: """ - Determines the overall state of the DagRun based on the state of its TaskInstances. + Determine the overall state of the DagRun based on the state of its TaskInstances. :param session: Sqlalchemy ORM Session :param execute_callbacks: Should dag callbacks (success/failure, SLA etc.) be invoked @@ -973,7 +973,7 @@ def _emit_duration_stats_for_finished_state(self): @provide_session def verify_integrity(self, *, session: Session = NEW_SESSION) -> None: """ - Verifies the DagRun by checking for removed tasks or tasks that are not in the database yet. + Verify the DagRun by checking for removed tasks or tasks that are not in the database yet. It will set state to removed or add the task if required. @@ -1297,7 +1297,7 @@ def is_backfill(self) -> bool: @classmethod @provide_session def get_latest_runs(cls, session: Session = NEW_SESSION) -> list[DagRun]: - """Returns the latest DagRun for each DAG.""" + """Return the latest DagRun for each DAG.""" subquery = ( select(cls.dag_id, func.max(cls.execution_date).label("execution_date")) .group_by(cls.dag_id) diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py index 82dcc82aa0602a..cbdc8ca0563451 100644 --- a/airflow/models/mappedoperator.py +++ b/airflow/models/mappedoperator.py @@ -561,18 +561,18 @@ def doc_rst(self) -> str | None: return self.partial_kwargs.get("doc_rst") def get_dag(self) -> DAG | None: - """Implementing Operator.""" + """Implement Operator.""" return self.dag @property def output(self) -> XComArg: - """Returns reference to XCom pushed by current operator.""" + """Return reference to XCom pushed by current operator.""" from airflow.models.xcom_arg import XComArg return XComArg(operator=self) def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]: - """Implementing DAGNode.""" + """Implement DAGNode.""" return DagAttributeTypes.OP, self.task_id def _expand_mapped_kwargs(self, context: Context, session: Session) -> tuple[Mapping[str, Any], set[int]]: diff --git a/airflow/models/param.py b/airflow/models/param.py index 5bb1db3d4412e3..82a49f715d8ac5 100644 --- a/airflow/models/param.py +++ b/airflow/models/param.py @@ -100,7 +100,7 @@ def _warn_if_not_rfc3339_dt(value): def resolve(self, value: Any = NOTSET, suppress_exception: bool = False) -> Any: """ - Runs the validations and returns the Param's final value. + Run the validations and returns the Param's final value. May raise ValueError on failed validations, or TypeError if no value is passed and no value already exists. 
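As context for the ``Param.resolve`` hunk above, here is a minimal sketch of the documented behavior. It is illustrative only (not part of the patch) and assumes a working Airflow 2.x installation where ``airflow.models.param.Param`` is importable:

    from airflow.models.param import Param

    # A Param carries a default plus JSON-schema constraints; resolve() runs
    # the validations and returns the Param's final value.
    age = Param(5, type="integer", minimum=0)
    print(age.resolve())          # 5 -- the default passes validation
    print(age.resolve(value=42))  # 42 -- an explicit value is validated and returned

    # A failing validation raises (ValueError per the docstring above) unless
    # suppressed, in which case resolve() returns None instead.
    print(age.resolve(value=-1, suppress_exception=True))  # None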
@@ -262,11 +262,11 @@ def update(self, *args, **kwargs) -> None: super().update(*args, **kwargs) def dump(self) -> dict[str, Any]: - """Dumps the ParamsDict object as a dictionary, while suppressing exceptions.""" + """Dump the ParamsDict object as a dictionary, while suppressing exceptions.""" return {k: v.resolve(suppress_exception=True) for k, v in self.items()} def validate(self) -> dict[str, Any]: - """Validates & returns all the Params object stored in the dictionary.""" + """Validate & returns all the Params object stored in the dictionary.""" resolved_dict = {} try: for k, v in self.items(): diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 82d4c6e31c3cb1..62a56cf7734e4e 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -129,7 +129,7 @@ def write_dag( session: Session = NEW_SESSION, ) -> bool: """ - Serializes a DAG and writes it into database. + Serialize a DAG and writes it into database. If the record already exists, it checks if the Serialized DAG changed or not. If it is changed, it updates the record, ignores otherwise. @@ -174,7 +174,7 @@ def write_dag( @classmethod @provide_session def read_all_dags(cls, session: Session = NEW_SESSION) -> dict[str, SerializedDAG]: - """Reads all DAGs in serialized_dag table. + """Read all DAGs in serialized_dag table. :param session: ORM Session :returns: a dict of DAGs read from database @@ -224,7 +224,7 @@ def dag(self) -> SerializedDAG: @provide_session def remove_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> None: """ - Deletes a DAG with given dag_id. + Delete a DAG with given dag_id. :param dag_id: dag_id to be deleted :param session: ORM Session. @@ -239,7 +239,7 @@ def remove_deleted_dags( processor_subdir: str | None = None, session: Session = NEW_SESSION, ) -> None: - """Deletes DAGs not included in alive_dag_filelocs. + """Delete DAGs not included in alive_dag_filelocs. :param alive_dag_filelocs: file paths of alive DAGs :param session: ORM Session @@ -266,7 +266,7 @@ def remove_deleted_dags( @classmethod @provide_session def has_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> bool: - """Checks a DAG exist in serialized_dag table. + """Check a DAG exist in serialized_dag table. :param dag_id: the DAG to check :param session: ORM Session @@ -310,7 +310,7 @@ def bulk_sync_to_db( session: Session = NEW_SESSION, ) -> None: """ - Saves DAGs as Serialized DAG objects in the database. + Save DAGs as Serialized DAG objects in the database. Each DAG is saved in a separate database query. diff --git a/airflow/models/skipmixin.py b/airflow/models/skipmixin.py index 5ddc80308a38aa..c8feb58d91d45a 100644 --- a/airflow/models/skipmixin.py +++ b/airflow/models/skipmixin.py @@ -64,7 +64,7 @@ def _set_state_to_skipped( tasks: Sequence[str] | Sequence[tuple[str, int]], session: Session, ) -> None: - """Used internally to set state of task instances to skipped from the same dag run.""" + """Set state of task instances to skipped from the same dag run.""" if tasks: now = timezone.utcnow() TI = TaskInstance @@ -96,7 +96,7 @@ def skip( map_index: int = -1, ): """ - Sets tasks instances to skipped from the same dag run. + Set tasks instances to skipped from the same dag run. 
If this instance has a `task_id` attribute, store the list of skipped task IDs to XCom so that NotPreviouslySkippedDep knows these tasks should be skipped when they @@ -162,7 +162,7 @@ def skip_all_except( branch_task_ids: None | str | Iterable[str], ): """ - This method implements the logic for a branching operator. + Implement the logic for a branching operator. Given a single task ID or list of task IDs to follow, this skips all other tasks immediately downstream of this operator. diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 72105c6a9c5fb0..74cc5e45ffda64 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -169,7 +169,7 @@ class TaskReturnCode(Enum): @contextlib.contextmanager def set_current_context(context: Context) -> Generator[Context, None, None]: """ - Sets the current execution context to the provided context object. + Set the current execution context to the provided context object. This method should be called once per Task execution, before calling operator.execute. """ @@ -222,7 +222,7 @@ def clear_task_instances( dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED, ) -> None: """ - Clears a set of task instances, but makes sure the running ones get killed. + Clear a set of task instances, but make sure the running ones get killed. Also sets Dagrun's `state` to QUEUED and `start_date` to the time of execution. But only for finished DRs (SUCCESS and FAILED). @@ -362,7 +362,7 @@ def _is_mappable_value(value: Any) -> TypeGuard[Collection]: def _creator_note(val): - """Custom creator for the ``note`` association proxy.""" + """Create the ``note`` association proxy.""" if isinstance(val, str): return TaskInstanceNote(content=val) elif isinstance(val, dict): @@ -653,7 +653,7 @@ def command_as_list( cfg_path=None, ) -> list[str]: """ - Returns a command that can be executed anywhere where airflow is installed. + Return a command that can be executed anywhere where airflow is installed. This command is part of the message sent to executors by the orchestrator. """ @@ -719,7 +719,7 @@ def generate_command( map_index: int = -1, ) -> list[str]: """ - Generates the shell command required to execute this task instance. + Generate the shell command required to execute this task instance. :param dag_id: DAG ID :param task_id: Task ID @@ -822,7 +822,7 @@ def current_state(self, session: Session = NEW_SESSION) -> str: @provide_session def error(self, session: Session = NEW_SESSION) -> None: """ - Forces the task instance's state to FAILED in the database. + Force the task instance's state to FAILED in the database. :param session: SQLAlchemy ORM Session """ @@ -834,7 +834,7 @@ def error(self, session: Session = NEW_SESSION) -> None: @provide_session def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool = False) -> None: """ - Refreshes the task instance from the database based on the primary key. + Refresh the task instance from the database based on the primary key. :param session: SQLAlchemy ORM Session :param lock_for_update: if True, indicates that the database should @@ -971,7 +971,7 @@ def is_premature(self) -> bool: @provide_session def are_dependents_done(self, session: Session = NEW_SESSION) -> bool: """ - Checks whether the immediate dependents of this task instance have succeeded or have been skipped. + Check whether the immediate dependents of this task instance have succeeded or have been skipped. This is meant to be used by wait_for_downstream. 
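The ``skip_all_except`` change above documents the branching contract: the branch callable names the task(s) to follow, and every other directly downstream task is skipped. A small illustrative DAG, with invented names and assuming the Airflow 2.x built-in operators, would look like:

    import pendulum
    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.operators.python import BranchPythonOperator

    with DAG(dag_id="branch_demo", start_date=pendulum.datetime(2023, 8, 1), schedule=None):
        # The callable's return value selects which downstream task survives;
        # skip_all_except() marks the remaining direct downstream tasks skipped.
        branch = BranchPythonOperator(task_id="branch", python_callable=lambda: "fast_path")
        fast = EmptyOperator(task_id="fast_path")
        slow = EmptyOperator(task_id="slow_path")  # will be set to skipped
        branch >> [fast, slow]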
@@ -1001,7 +1001,7 @@ def get_previous_dagrun( state: DagRunState | None = None, session: Session | None = None, ) -> DagRun | None: - """The DagRun that ran before this task instance's DagRun. + """Return the DagRun that ran before this task instance's DagRun. :param state: If passed, it only take into account instances of a specific state. :param session: SQLAlchemy ORM Session. @@ -1035,7 +1035,7 @@ def get_previous_ti( session: Session = NEW_SESSION, ) -> TaskInstance | None: """ - The task instance for the task that ran before this task instance. + Return the task instance for the task that ran before this task instance. :param state: If passed, it only take into account instances of a specific state. :param session: SQLAlchemy ORM Session @@ -1086,7 +1086,7 @@ def get_previous_execution_date( session: Session = NEW_SESSION, ) -> pendulum.DateTime | None: """ - The execution date from property previous_ti_success. + Return the execution date from property previous_ti_success. :param state: If passed, it only take into account instances of a specific state. :param session: SQLAlchemy ORM Session @@ -1100,7 +1100,7 @@ def get_previous_start_date( self, state: DagRunState | None = None, session: Session = NEW_SESSION ) -> pendulum.DateTime | None: """ - The start date from property previous_ti_success. + Return the start date from property previous_ti_success. :param state: If passed, it only take into account instances of a specific state. :param session: SQLAlchemy ORM Session @@ -1225,13 +1225,13 @@ def next_retry_datetime(self): return self.end_date + delay def ready_for_retry(self) -> bool: - """Checks on whether the task instance is in the right state and timeframe to be retried.""" + """Check on whether the task instance is in the right state and timeframe to be retried.""" return self.state == TaskInstanceState.UP_FOR_RETRY and self.next_retry_datetime() < timezone.utcnow() @provide_session def get_dagrun(self, session: Session = NEW_SESSION) -> DagRun: """ - Returns the DagRun for this TaskInstance. + Return the DagRun for this TaskInstance. :param session: SQLAlchemy ORM Session :return: DagRun @@ -1269,7 +1269,7 @@ def check_and_change_state_before_execution( session: Session = NEW_SESSION, ) -> bool: """ - Checks dependencies and then sets state to RUNNING if they are met. + Check dependencies and then sets state to RUNNING if they are met. Returns True if and only if state is set to RUNNING, which implies that task should be executed, in preparation for _run_raw_task. @@ -1407,7 +1407,7 @@ def _log_state(self, lead_msg: str = "") -> None: def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: """ - Sends a time metric representing how much time a given state transition took. + Send a time metric representing how much time a given state transition took. The previous state and metric name is deduced from the state the task was put in. 
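For reference, the ``ready_for_retry`` context shown above boils down to a simple predicate: a task is retried only once it is UP_FOR_RETRY and its computed next-retry time has passed. Restated standalone (the function name and plain-datetime signature are invented for illustration):

    from datetime import datetime, timezone

    def is_ready_for_retry(state: str, next_retry_at: datetime) -> bool:
        # Mirrors TaskInstance.ready_for_retry(): the right state AND the
        # delay computed by next_retry_datetime() has already elapsed.
        return state == "up_for_retry" and next_retry_at < datetime.now(timezone.utc)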
@@ -1705,7 +1705,7 @@ def _run_finished_callback( ) def _execute_task(self, context, task_orig): - """Executes Task (optionally with a Timeout) and pushes Xcom results.""" + """Execute Task (optionally with a Timeout) and push Xcom results.""" task_to_execute = self.task # If the task has been deferred and is being executed due to a trigger, # then we need to pick the right method to come back to, otherwise @@ -1760,7 +1760,7 @@ def _execute_task(self, context, task_orig): @provide_session def _defer_task(self, session: Session, defer: TaskDeferred) -> None: - """Marks the task as deferred and sets up the trigger that is needed to resume it.""" + """Mark the task as deferred and sets up the trigger that is needed to resume it.""" from airflow.models.trigger import Trigger # First, make the trigger entry @@ -1905,7 +1905,7 @@ def _handle_reschedule( @staticmethod def get_truncated_error_traceback(error: BaseException, truncate_to: Callable) -> TracebackType | None: """ - Truncates the traceback of an exception to the first frame called from within a given function. + Truncate the traceback of an exception to the first frame called from within a given function. :param error: exception to get traceback from :param truncate_to: Function to truncate TB to. Must have a ``__code__`` attribute @@ -2605,13 +2605,13 @@ def get_num_running_task_instances(self, session: Session, same_dagrun=False) -> return num_running_task_instances_query.scalar() def init_run_context(self, raw: bool = False) -> None: - """Sets the log context.""" + """Set the log context.""" self.raw = raw self._set_context(self) @staticmethod def filter_for_tis(tis: Iterable[TaskInstance | TaskInstanceKey]) -> BooleanClauseList | None: - """Returns SQLAlchemy filter to query selected task instances.""" + """Return SQLAlchemy filter to query selected task instances.""" # DictKeys type, (what we often pass here from the scheduler) is not directly indexable :( # Or it might be a generator, but we need to be able to iterate over it more than once tis = list(tis) @@ -2726,7 +2726,7 @@ def ti_selector_condition(cls, vals: Collection[str | tuple[str, int]]) -> Colum @provide_session def schedule_downstream_tasks(self, session: Session = NEW_SESSION, max_tis_per_query: int | None = None): """ - The mini-scheduler for scheduling downstream tasks of this task instance. + Schedule downstream tasks of this task instance. 
:meta: private """ diff --git a/airflow/models/taskinstancekey.py b/airflow/models/taskinstancekey.py index 47ed1d4d5422fc..50906e47b0a318 100644 --- a/airflow/models/taskinstancekey.py +++ b/airflow/models/taskinstancekey.py @@ -42,7 +42,7 @@ def reduced(self) -> TaskInstanceKey: ) def with_try_number(self, try_number: int) -> TaskInstanceKey: - """Returns TaskInstanceKey with provided ``try_number``.""" + """Return TaskInstanceKey with provided ``try_number``.""" return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, try_number, self.map_index) @property diff --git a/airflow/models/taskmixin.py b/airflow/models/taskmixin.py index c5e7cea3b92eab..98c29cf3a37e58 100644 --- a/airflow/models/taskmixin.py +++ b/airflow/models/taskmixin.py @@ -94,22 +94,22 @@ def update_relative( """ def __lshift__(self, other: DependencyMixin | Sequence[DependencyMixin]): - """Implements Task << Task.""" + """Implement Task << Task.""" self.set_upstream(other) return other def __rshift__(self, other: DependencyMixin | Sequence[DependencyMixin]): - """Implements Task >> Task.""" + """Implement Task >> Task.""" self.set_downstream(other) return other def __rrshift__(self, other: DependencyMixin | Sequence[DependencyMixin]): - """Called for Task >> [Task] because list don't have __rshift__ operators.""" + """Implement Task >> [Task] because list don't have __rshift__ operators.""" self.__lshift__(other) return self def __rlshift__(self, other: DependencyMixin | Sequence[DependencyMixin]): - """Called for Task << [Task] because list don't have __lshift__ operators.""" + """Implement Task << [Task] because list don't have __lshift__ operators.""" self.__rshift__(other) return self @@ -201,7 +201,7 @@ def _set_relatives( upstream: bool = False, edge_modifier: EdgeModifier | None = None, ) -> None: - """Sets relatives for the task or task list.""" + """Set relatives for the task or task list.""" from airflow.models.baseoperator import BaseOperator from airflow.models.mappedoperator import MappedOperator @@ -297,5 +297,5 @@ def get_direct_relatives(self, upstream: bool = False) -> Iterable[DAGNode]: return self.downstream_list def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]: - """This is used by TaskGroupSerialization to serialize a task group's content.""" + """Serialize a task group's content; used by TaskGroupSerialization.""" raise NotImplementedError() diff --git a/airflow/models/taskreschedule.py b/airflow/models/taskreschedule.py index 7b642f0a266bf4..f4ac7408f415ea 100644 --- a/airflow/models/taskreschedule.py +++ b/airflow/models/taskreschedule.py @@ -103,7 +103,7 @@ def query_for_task_instance( try_number: int | None = None, ) -> Query: """ - Returns query for task reschedules for a given the task instance. + Return query for task reschedules for a given the task instance. :param session: the database session object :param task_instance: the task instance to find task reschedules for @@ -135,7 +135,7 @@ def find_for_task_instance( try_number: int | None = None, ) -> list[TaskReschedule]: """ - Returns all task reschedules for the task instance and try number, in ascending order. + Return all task reschedules for the task instance and try number, in ascending order. 
:param session: the database session object :param task_instance: the task instance to find task reschedules for diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py index 0161d5c9fe56e6..6ba1d45852b063 100644 --- a/airflow/models/trigger.py +++ b/airflow/models/trigger.py @@ -92,7 +92,7 @@ def from_object(cls, trigger: BaseTrigger) -> Trigger: @internal_api_call @provide_session def bulk_fetch(cls, ids: Iterable[int], session: Session = NEW_SESSION) -> dict[int, Trigger]: - """Fetches all the Triggers by ID and returns a dict mapping ID -> Trigger instance.""" + """Fetch all the Triggers by ID and return a dict mapping ID -> Trigger instance.""" query = session.scalars( select(cls) .where(cls.id.in_(ids)) @@ -108,7 +108,7 @@ def bulk_fetch(cls, ids: Iterable[int], session: Session = NEW_SESSION) -> dict[ @internal_api_call @provide_session def clean_unused(cls, session: Session = NEW_SESSION) -> None: - """Deletes all triggers that have no tasks dependent on them. + """Delete all triggers that have no tasks dependent on them. Triggers have a one-to-many relationship to task instances, so we need to clean those up first. Afterwards we can drop the triggers not @@ -141,7 +141,7 @@ def clean_unused(cls, session: Session = NEW_SESSION) -> None: @internal_api_call @provide_session def submit_event(cls, trigger_id, event, session: Session = NEW_SESSION) -> None: - """Takes an event from an instance of itself, and triggers all dependent tasks to resume.""" + """Take an event from an instance of itself, and trigger all dependent tasks to resume.""" for task_instance in session.scalars( select(TaskInstance).where( TaskInstance.trigger_id == trigger_id, TaskInstance.state == TaskInstanceState.DEFERRED @@ -193,7 +193,7 @@ def submit_failure(cls, trigger_id, exc=None, session: Session = NEW_SESSION) -> @internal_api_call @provide_session def ids_for_triggerer(cls, triggerer_id, session: Session = NEW_SESSION) -> list[int]: - """Retrieves a list of triggerer_ids.""" + """Retrieve a list of triggerer_ids.""" return session.scalars(select(cls.id).where(cls.triggerer_id == triggerer_id)).all() @classmethod diff --git a/airflow/models/variable.py b/airflow/models/variable.py index a434a29e2b5732..5ca7774cbf064d 100644 --- a/airflow/models/variable.py +++ b/airflow/models/variable.py @@ -126,7 +126,7 @@ def get( default_var: Any = __NO_DEFAULT_SENTINEL, deserialize_json: bool = False, ) -> Any: - """Gets a value for an Airflow Variable Key. + """Get a value for an Airflow Variable Key. :param key: Variable Key :param default_var: Default value of the Variable if the Variable doesn't exist @@ -157,7 +157,7 @@ def set( serialize_json: bool = False, session: Session = None, ) -> None: - """Sets a value for an Airflow Variable with a given Key. + """Set a value for an Airflow Variable with a given Key. This operation overwrites an existing variable. @@ -190,7 +190,7 @@ def update( serialize_json: bool = False, session: Session = None, ) -> None: - """Updates a given Airflow Variable with the Provided value. + """Update a given Airflow Variable with the Provided value. :param key: Variable Key :param value: Value to set for the Variable @@ -227,7 +227,7 @@ def rotate_fernet_key(self): @staticmethod def check_for_write_conflict(key: str) -> None: - """Logs a warning if a variable exists outside of the metastore. + """Log a warning if a variable exists outside the metastore. 
If we try to write a variable to the metastore while the same key exists in an environment variable or custom secrets backend, then diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py index 50e62ea29ff679..637da3504df6d5 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -122,7 +122,7 @@ class BaseXCom(Base, LoggingMixin): @reconstructor def init_on_load(self): """ - Called by the ORM after the instance has been loaded from the DB or otherwise reconstituted. + Execute after the instance has been loaded from the DB or otherwise reconstituted; called by the ORM. i.e automatically deserialize Xcom value when loading from DB. """ @@ -838,7 +838,7 @@ def _shim(**kwargs): def _get_function_params(function) -> list[str]: """ - Returns the list of variables names of a function. + Return the list of variables names of a function. :param function: The function to inspect """ @@ -850,10 +850,10 @@ def _get_function_params(function) -> list[str]: def resolve_xcom_backend() -> type[BaseXCom]: - """Resolves custom XCom class. + """Resolve custom XCom class. - Confirms that custom XCom class extends the BaseXCom. - Compares the function signature of the custom XCom serialize_value to the base XCom serialize_value. + Confirm that custom XCom class extends the BaseXCom. + Compare the function signature of the custom XCom serialize_value to the base XCom serialize_value. """ clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}") if not clazz: diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py index 0a54d18ff7b987..a19aa6703f3a42 100644 --- a/airflow/models/xcom_arg.py +++ b/airflow/models/xcom_arg.py @@ -85,11 +85,11 @@ class XComArg(ResolveMixin, DependencyMixin): @overload def __new__(cls: type[XComArg], operator: Operator, key: str = XCOM_RETURN_KEY) -> XComArg: - """Called when the user writes ``XComArg(...)`` directly.""" + """Execute when the user writes ``XComArg(...)`` directly.""" @overload def __new__(cls: type[XComArg]) -> XComArg: - """Called by Python internals from subclasses.""" + """Execute when called by Python internals from subclasses.""" def __new__(cls, *args, **kwargs) -> XComArg: if cls is XComArg: @@ -155,7 +155,8 @@ def set_downstream( operator.set_downstream(task_or_task_list, edge_modifier) def _serialize(self) -> dict[str, Any]: - """Called by DAG serialization. + """ + Serialize this XComArg; called by DAG serialization. The implementation should be the inverse function to ``deserialize``, returning a data dict converted from this XComArg derivative. DAG @@ -167,7 +168,8 @@ def _serialize(self) -> dict[str, Any]: @classmethod def _deserialize(cls, data: dict[str, Any], dag: DAG) -> XComArg: - """Called when deserializing a DAG. + """ + Deserialize an XComArg; called when deserializing a DAG. 
The implementation should be the inverse function to ``serialize``, implementing given a data dict converted from this XComArg derivative, @@ -246,7 +248,7 @@ def __eq__(self, other: Any) -> bool: return self.operator == other.operator and self.key == other.key def __getitem__(self, item: str) -> XComArg: - """Implements xcomresult['some_result_key'].""" + """Implement xcomresult['some_result_key'].""" if not isinstance(item, str): raise ValueError(f"XComArg only supports str lookup, received {type(item).__name__}") return PlainXComArg(operator=self.operator, key=item) diff --git a/airflow/notifications/basenotifier.py b/airflow/notifications/basenotifier.py index 7ef0603be14edc..81552404e15179 100644 --- a/airflow/notifications/basenotifier.py +++ b/airflow/notifications/basenotifier.py @@ -73,7 +73,7 @@ def render_template_fields( @abstractmethod def notify(self, context: Context) -> None: """ - Sends a notification. + Send a notification. :param context: The airflow context """ diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py index 6ec1b0e80d6fc0..9a188c2cc5656e 100644 --- a/airflow/operators/bash.py +++ b/airflow/operators/bash.py @@ -172,7 +172,7 @@ def subprocess_hook(self): return SubprocessHook() def get_env(self, context): - """Builds the set of environment variables to be exposed for the bash command.""" + """Build the set of environment variables to be exposed for the bash command.""" system_env = os.environ.copy() env = self.env if env is None: diff --git a/airflow/operators/datetime.py b/airflow/operators/datetime.py index 7c3c648130c158..e6f47912ad9af9 100644 --- a/airflow/operators/datetime.py +++ b/airflow/operators/datetime.py @@ -99,7 +99,7 @@ def target_times_as_dates( lower: datetime.datetime | datetime.time | None, upper: datetime.datetime | datetime.time | None, ): - """Ensures upper and lower time targets are datetimes by combining them with base_date.""" + """Ensure upper and lower time targets are datetimes by combining them with base_date.""" if isinstance(lower, datetime.datetime) and isinstance(upper, datetime.datetime): return lower, upper diff --git a/airflow/operators/python.py b/airflow/operators/python.py index b35e16f8b8d810..0593b8de78f711 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -57,7 +57,7 @@ def is_venv_installed() -> bool: """ - Checks if the virtualenv package is installed via checking if it is on the path or installed as package. + Check if the virtualenv package is installed via checking if it is on the path or installed as package. :return: True if it is. Whichever way of checking it works, is fine. """ @@ -67,7 +67,7 @@ def is_venv_installed() -> bool: def task(python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs): - """Deprecated. Use :func:`airflow.decorators.task` instead. + """Use :func:`airflow.decorators.task` instead, this is deprecated. Calls ``@task.python`` and allows users to turn a Python function into an Airflow task. @@ -202,7 +202,7 @@ def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: def execute_callable(self) -> Any: """ - Calls the python callable with the given arguments. + Call the python callable with the given arguments. :return: the return value of the call. 
""" @@ -586,7 +586,7 @@ def __init__( ) def _requirements_list(self) -> list[str]: - """Prepares a list of requirements that need to be installed for the venv.""" + """Prepare a list of requirements that need to be installed for the venv.""" requirements = [str(dependency) for dependency in self.requirements] if not self.system_site_packages and self.use_dill and "dill" not in requirements: requirements.append("dill") @@ -594,7 +594,7 @@ def _requirements_list(self) -> list[str]: return requirements def _prepare_venv(self, venv_path: Path) -> None: - """Prepares the requirements and installs the venv.""" + """Prepare the requirements and installs the venv.""" requirements_file = venv_path / "requirements.txt" requirements_file.write_text("\n".join(self._requirements_list())) prepare_virtualenv( diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 0594dd21e83bd4..2a70c2746a490a 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -266a8533dca6d9c7e984341bfe29e99f29217e44529455a49e7c4a5faccdcdf3 \ No newline at end of file +52cd8c81c9f586992515a903d882e72e8df0a55da5021d2614ba1807b8b63f7e \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 8bf922245e2593..ba548c60fdddf1 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -1228,28 +1228,28 @@ task_instance--xcom -1 +0..N 1 task_instance--xcom -0..N +1 1 task_instance--xcom -1 +0..N 1 task_instance--xcom -0..N +1 1