From 82584cbaf6576a221adb6f15126373ece7819cf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= Date: Sun, 20 Oct 2019 07:28:29 +0200 Subject: [PATCH] [AIRFLOW-5702] Fix common docstring issues (#6372) --- .../api/common/experimental/get_dag_runs.py | 3 +- airflow/api/common/experimental/mark_tasks.py | 13 +++-- airflow/bin/cli.py | 2 + airflow/configuration.py | 1 + .../contrib/executors/kubernetes_executor.py | 5 +- airflow/contrib/hooks/aws_athena_hook.py | 1 + airflow/contrib/hooks/aws_firehose_hook.py | 1 + airflow/contrib/hooks/grpc_hook.py | 25 +++++----- airflow/contrib/hooks/qubole_hook.py | 4 ++ airflow/contrib/hooks/sftp_hook.py | 7 +++ airflow/contrib/hooks/slack_webhook_hook.py | 1 + airflow/contrib/hooks/spark_submit_hook.py | 1 + airflow/contrib/kubernetes/secret.py | 4 +- airflow/executors/local_executor.py | 1 + airflow/jobs/scheduler_job.py | 4 +- airflow/lineage/backend/__init__.py | 1 + ...hange_datetime_to_datetime2_6_on_mssql_.py | 5 +- airflow/models/baseoperator.py | 5 +- airflow/models/dagrun.py | 1 + airflow/task/task_runner/base_task_runner.py | 9 ++-- airflow/ti_deps/deps/dagrun_id_dep.py | 1 + .../ti_deps/deps/pool_slots_available_dep.py | 1 + airflow/utils/cli.py | 3 +- airflow/utils/cli_action_loggers.py | 5 ++ airflow/utils/dag_processing.py | 49 +++++++++---------- airflow/utils/log/es_task_handler.py | 10 ++-- airflow/utils/log/file_task_handler.py | 4 ++ airflow/utils/log/gcs_task_handler.py | 3 ++ airflow/utils/log/s3_task_handler.py | 4 ++ airflow/utils/log/wasb_task_handler.py | 4 ++ airflow/utils/timezone.py | 5 +- airflow/www/api/experimental/endpoints.py | 3 +- airflow/www/utils.py | 19 +++---- airflow/www_rbac/static_config.py | 1 + .../utils/base_gcp_system_test_case.py | 1 + tests/test_core.py | 1 + tests/test_utils/get_all_tests.py | 2 + 37 files changed, 135 insertions(+), 75 deletions(-) diff --git a/airflow/api/common/experimental/get_dag_runs.py b/airflow/api/common/experimental/get_dag_runs.py index 635286aedea3e..c58e650bac8f9 100644 --- a/airflow/api/common/experimental/get_dag_runs.py +++ b/airflow/api/common/experimental/get_dag_runs.py @@ -29,10 +29,11 @@ def get_dag_runs(dag_id, state=None, run_url_route='Airflow.graph'): # type: (str, Optional[str], str) -> List[Dict[str, Any]] """ Returns a list of Dag Runs for a specific DAG ID. + :param dag_id: String identifier of a DAG :param state: queued|running|success... :return: List of DAG runs of a DAG with requested state, - or all runs if the state is not specified + or all runs if the state is not specified """ check_and_get_dag(dag_id=dag_id) diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py index e512fcb364c01..9cc48521a2fcf 100644 --- a/airflow/api/common/experimental/mark_tasks.py +++ b/airflow/api/common/experimental/mark_tasks.py @@ -34,6 +34,7 @@ def _create_dagruns(dag, execution_dates, state, run_id_template): """ Infers from the dates which dag runs need to be created and does so. + :param dag: the dag to create dag runs for :param execution_dates: list of execution dates to evaluate :param state: the state to set the dag run to @@ -75,6 +76,7 @@ def set_state( for past tasks. Will verify integrity of past dag runs in order to create tasks that did not exist. It will not create dag runs that are missing on the schedule (but it will as for subdag dag runs if needed). + :param tasks: the iterable of tasks from which to work. 
task.task.dag needs to be set :param execution_date: the execution date from which to start looking :param upstream: Mark all parents (upstream tasks) @@ -211,9 +213,10 @@ def verify_dagruns(dag_runs, commit, state, session, current_task): def verify_dag_run_integrity(dag, dates): - """Verify the integrity of the dag runs in case a task was added or removed - set the confirmed execution dates as they might be different - from what was provided + """ + Verify the integrity of the dag runs in case a task was added or removed + set the confirmed execution dates as they might be different + from what was provided """ confirmed_dates = [] dag_runs = DagRun.find(dag_id=dag.dag_id, execution_date=dates) @@ -266,6 +269,7 @@ def get_execution_dates(dag, execution_date, future, past): def _set_dag_run_state(dag_id, execution_date, state, session=None): """ Helper method that set dag run state in the DB. + :param dag_id: dag_id of target dag run :param execution_date: the execution date from which to start looking :param state: target state @@ -289,6 +293,7 @@ def set_dag_run_state_to_success(dag, execution_date, commit=False, session=None """ Set the dag run for a specific execution date and its task instances to success. + :param dag: the DAG of which to alter state :param execution_date: the execution date from which to start looking :param commit: commit DAG and tasks to be altered to the database @@ -316,6 +321,7 @@ def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None) """ Set the dag run for a specific execution date and its running task instances to failed. + :param dag: the DAG of which to alter state :param execution_date: the execution date from which to start looking :param commit: commit DAG and tasks to be altered to the database @@ -354,6 +360,7 @@ def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None) def set_dag_run_state_to_running(dag, execution_date, commit=False, session=None): """ Set the dag run for a specific execution date to running. 
+ :param dag: the DAG of which to alter state :param execution_date: the execution date from which to start looking :param commit: commit DAG and tasks to be altered to the database diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index e956dd8408e54..e12bb076dee38 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -226,6 +226,7 @@ def backfill(args, dag=None): def trigger_dag(args): """ Creates a dag run for the specified dag + :param args: :return: """ @@ -243,6 +244,7 @@ def trigger_dag(args): def delete_dag(args): """ Deletes all DB records related to the specified dag + :param args: :return: """ diff --git a/airflow/configuration.py b/airflow/configuration.py index 68149eb6a4416..89ecf9229643e 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -549,6 +549,7 @@ def parameterized_config(template): """ Generates a configuration from the provided template + variables defined in current scope + :param template: a config content templated with {{variables}} """ all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()} diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 02711bd62a14e..0fe2c3f1337e4 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -529,11 +529,11 @@ def _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid): """ Kubernetes pod names must be <= 253 chars and must pass the following regex for validation - "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$" + ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`` :param safe_dag_id: a dag_id with only alphanumeric characters :param safe_task_id: a task_id with only alphanumeric characters - :param random_uuid: a uuid + :param safe_uuid: a uuid :return: ``str`` valid Pod name of appropriate length """ safe_key = safe_dag_id + safe_task_id @@ -590,6 +590,7 @@ def _datetime_to_label_safe_datestring(datetime_obj): Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but not "_" let's replace ":" with "_" + :param datetime_obj: datetime.datetime object :return: ISO-like string representing the datetime """ diff --git a/airflow/contrib/hooks/aws_athena_hook.py b/airflow/contrib/hooks/aws_athena_hook.py index b54cbe64cb77f..6dc2a023aa945 100644 --- a/airflow/contrib/hooks/aws_athena_hook.py +++ b/airflow/contrib/hooks/aws_athena_hook.py @@ -96,6 +96,7 @@ def check_query_status(self, query_execution_id): def get_state_change_reason(self, query_execution_id): """ Fetch the reason for a state change (e.g. error message). Returns None or reason string. + :param query_execution_id: Id of submitted athena query :type query_execution_id: str :return: str diff --git a/airflow/contrib/hooks/aws_firehose_hook.py b/airflow/contrib/hooks/aws_firehose_hook.py index 2885d775e176e..3f91c27011e91 100644 --- a/airflow/contrib/hooks/aws_firehose_hook.py +++ b/airflow/contrib/hooks/aws_firehose_hook.py @@ -25,6 +25,7 @@ class AwsFirehoseHook(AwsHook): """ Interact with AWS Kinesis Firehose. + :param delivery_stream: Name of the delivery stream :type delivery_stream: str :param region_name: AWS region name (example: us-east-1) diff --git a/airflow/contrib/hooks/grpc_hook.py b/airflow/contrib/hooks/grpc_hook.py index 7ad4e596b7d44..3a57afc1b1c15 100644 --- a/airflow/contrib/hooks/grpc_hook.py +++ b/airflow/contrib/hooks/grpc_hook.py @@ -32,22 +32,21 @@ class GrpcHook(BaseHook): """ General interaction with gRPC servers. 
+ + :param grpc_conn_id: The connection ID to use when fetching connection info. + :type grpc_conn_id: str + :param interceptors: a list of gRPC interceptor objects which would be applied + to the connected gRPC channel. None by default. + :type interceptors: a list of gRPC interceptors based on or extends the four + official gRPC interceptors, eg, UnaryUnaryClientInterceptor, + UnaryStreamClientInterceptor, StreamUnaryClientInterceptor, + StreamStreamClientInterceptor. + :param custom_connection_func: The customized connection function to return gRPC channel. + :type custom_connection_func: python callable objects that accept the connection as + its only arg. Could be partial or lambda. """ def __init__(self, grpc_conn_id, interceptors=None, custom_connection_func=None): - """ - :param grpc_conn_id: The connection ID to use when fetching connection info. - :type grpc_conn_id: str - :param interceptors: a list of gRPC interceptor objects which would be applied - to the connected gRPC channel. None by default. - :type interceptors: a list of gRPC interceptors based on or extends the four - official gRPC interceptors, eg, UnaryUnaryClientInterceptor, - UnaryStreamClientInterceptor, StreamUnaryClientInterceptor, - StreamStreamClientInterceptor. - ::param custom_connection_func: The customized connection function to return gRPC channel. - :type custom_connection_func: python callable objects that accept the connection as - its only arg. Could be partial or lambda. - """ self.grpc_conn_id = grpc_conn_id self.conn = self.get_connection(self.grpc_conn_id) self.extras = self.conn.extra_dejson diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py index 1cc579e4ecf87..34353c21b2611 100644 --- a/airflow/contrib/hooks/qubole_hook.py +++ b/airflow/contrib/hooks/qubole_hook.py @@ -154,6 +154,7 @@ def execute(self, context): def kill(self, ti): """ Kill (cancel) a Qubole command + :param ti: Task Instance of the dag, used to determine the Quboles command id :return: response from Qubole """ @@ -171,6 +172,7 @@ def kill(self, ti): def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True): """ Get results (or just s3 locations) of a command from Qubole and save into a file + :param ti: Task Instance of the dag, used to determine the Quboles command id :param fp: Optional file pointer, will create one and return if None passed :param inline: True to download actual results, False to get s3 locations only @@ -199,6 +201,7 @@ def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True): def get_log(self, ti): """ Get Logs of a command from Qubole + :param ti: Task Instance of the dag, used to determine the Quboles command id :return: command log as text """ @@ -209,6 +212,7 @@ def get_log(self, ti): def get_jobs_id(self, ti): """ Get jobs associated with a Qubole commands + :param ti: Task Instance of the dag, used to determine the Quboles command id :return: Job information associated with command """ diff --git a/airflow/contrib/hooks/sftp_hook.py b/airflow/contrib/hooks/sftp_hook.py index 6fc01babe5ab2..2b2338774429e 100644 --- a/airflow/contrib/hooks/sftp_hook.py +++ b/airflow/contrib/hooks/sftp_hook.py @@ -127,6 +127,7 @@ def describe_directory(self, path): """ Returns a dictionary of {filename: {attributes}} for all files on the remote system (where the MLSD command is supported). 
+ :param path: full path to the remote directory :type path: str """ @@ -145,6 +146,7 @@ def describe_directory(self, path): def list_directory(self, path): """ Returns a list of files on the remote system. + :param path: full path to the remote directory to list :type path: str """ @@ -155,6 +157,7 @@ def list_directory(self, path): def create_directory(self, path, mode=777): """ Creates a directory on the remote system. + :param path: full path to the remote directory to create :type path: str :param mode: int representation of octal mode for directory @@ -165,6 +168,7 @@ def create_directory(self, path, mode=777): def delete_directory(self, path): """ Deletes a directory on the remote system. + :param path: full path to the remote directory to delete :type path: str """ @@ -176,6 +180,7 @@ def retrieve_file(self, remote_full_path, local_full_path): Transfers the remote file to a local location. If local_full_path is a string path, the file will be put at that location + :param remote_full_path: full path to the remote file :type remote_full_path: str :param local_full_path: full path to the local file @@ -191,6 +196,7 @@ def store_file(self, remote_full_path, local_full_path): Transfers a local file to the remote location. If local_full_path_or_buffer is a string path, the file will be read from that location + :param remote_full_path: full path to the remote file :type remote_full_path: str :param local_full_path: full path to the local file @@ -202,6 +208,7 @@ def store_file(self, remote_full_path, local_full_path): def delete_file(self, path): """ Removes a file on the FTP Server + :param path: full path to the remote file :type path: str """ diff --git a/airflow/contrib/hooks/slack_webhook_hook.py b/airflow/contrib/hooks/slack_webhook_hook.py index 1b3aaa46358c4..13d8a4e81847e 100644 --- a/airflow/contrib/hooks/slack_webhook_hook.py +++ b/airflow/contrib/hooks/slack_webhook_hook.py @@ -84,6 +84,7 @@ def __init__(self, def _get_token(self, token, http_conn_id): """ Given either a manually set token or a conn_id, return the webhook_token to use + :param token: The manually provided token :type token: str :param http_conn_id: The conn_id provided diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index 3cb993d65ca69..61714ec3a1b75 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -220,6 +220,7 @@ def _get_spark_binary_path(self): def _build_spark_submit_command(self, application): """ Construct the spark-submit command to execute. + :param application: command to append to the spark-submit command :type application: str :return: full command to be executed diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py index 73c51e900acf9..fde1ded38d275 100644 --- a/airflow/contrib/kubernetes/secret.py +++ b/airflow/contrib/kubernetes/secret.py @@ -21,8 +21,10 @@ class Secret(object): """Defines Kubernetes Secret Volume""" def __init__(self, deploy_type, deploy_target, secret, key=None): - """Initialize a Kubernetes Secret Object. Used to track requested secrets from + """ + Initialize a Kubernetes Secret Object. Used to track requested secrets from the user. 
+ :param deploy_type: The type of secret deploy in Kubernetes, either `env` or `volume` :type deploy_type: str diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 99558da8182f9..2c3ba4046c9c0 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -74,6 +74,7 @@ def __init__(self, result_queue): def execute_work(self, key, command): """ Executes command received and stores result state in queue. + :param key: the key to identify the TI :type key: tuple(dag_id, task_id, execution_date) :param command: the command to execute diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 9afebd95cc6f1..1012a31323df9 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -121,10 +121,10 @@ def _run_file_processor(result_channel, :type dag_id_white_list: list[unicode] :param thread_name: the name to use for the process that is launched :type thread_name: unicode - :return: the process that was launched - :rtype: multiprocessing.Process :param zombies: zombie task instances to kill :type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance] + :return: the process that was launched + :rtype: multiprocessing.Process """ # This helper runs in the newly created process log = logging.getLogger("airflow.processor") diff --git a/airflow/lineage/backend/__init__.py b/airflow/lineage/backend/__init__.py index d0c2ec5601880..4b9cb9cf1445c 100644 --- a/airflow/lineage/backend/__init__.py +++ b/airflow/lineage/backend/__init__.py @@ -25,6 +25,7 @@ def send_lineage(self, operator=None, inlets=None, outlets=None, context=None): """ Sends lineage metadata to a backend + :param operator: the operator executing a transformation on the inlets and outlets :param inlets: the inlets to this operator :param outlets: the outlets from this operator diff --git a/airflow/migrations/versions/74effc47d867_change_datetime_to_datetime2_6_on_mssql_.py b/airflow/migrations/versions/74effc47d867_change_datetime_to_datetime2_6_on_mssql_.py index 30a4365e32262..a9ef785131d91 100644 --- a/airflow/migrations/versions/74effc47d867_change_datetime_to_datetime2_6_on_mssql_.py +++ b/airflow/migrations/versions/74effc47d867_change_datetime_to_datetime2_6_on_mssql_.py @@ -243,7 +243,8 @@ def get_table_constraints(conn, table_name): def reorder_columns(columns): """ Reorder the columns for creating constraint, preserve primary key ordering - ['task_id', 'dag_id', 'execution_date'] + ``['task_id', 'dag_id', 'execution_date']`` + :param columns: columns retrieved from DB related to constraint :return: ordered column """ @@ -262,6 +263,7 @@ def reorder_columns(columns): def drop_constraint(operator, constraint_dict): """ Drop a primary key or unique constraint + :param operator: batch_alter_table for the table :param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table """ @@ -282,6 +284,7 @@ def drop_constraint(operator, constraint_dict): def create_constraint(operator, constraint_dict): """ Create a primary key or unique constraint + :param operator: batch_alter_table for the table :param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table """ diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 2ef905b99e864..7da8c01daedda 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -1066,11 +1066,12 @@ def get_extra_links(self, dttm, link_name): """ For an operator, gets the 
URL that the external links specified in `extra_links` should point to. + :raise ValueError: The error message of a ValueError will be passed on through to - the fronted to show up as a tooltip on the disabled link + the fronted to show up as a tooltip on the disabled link :param dttm: The datetime parsed execution date for the URL being searched for :param link_name: The name of the link we're looking for the URL for. Should be - one of the options specified in `extra_links` + one of the options specified in `extra_links` :return: A URL """ if link_name in self.operator_extra_link_dict: diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 46e8113fba934..a4991d7814167 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -96,6 +96,7 @@ def id_for_date(cls, date, prefix=ID_FORMAT_PREFIX): def refresh_from_db(self, session=None): """ Reloads the current dagrun from the database + :param session: database session """ DR = DagRun diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py index 97da5c8ab6b0b..d2b5d8bd4c73b 100644 --- a/airflow/task/task_runner/base_task_runner.py +++ b/airflow/task/task_runner/base_task_runner.py @@ -36,14 +36,13 @@ class BaseTaskRunner(LoggingMixin): """ Runs Airflow task instances by invoking the `airflow run` command with raw mode enabled in a subprocess. + + :param local_task_job: The local task job associated with running the + associated task instance. + :type local_task_job: airflow.jobs.LocalTaskJob """ def __init__(self, local_task_job): - """ - :param local_task_job: The local task job associated with running the - associated task instance. - :type local_task_job: airflow.jobs.LocalTaskJob - """ # Pass task instance context into log handlers to setup the logger. super(BaseTaskRunner, self).__init__(local_task_job.task_instance) self._task_instance = local_task_job.task_instance diff --git a/airflow/ti_deps/deps/dagrun_id_dep.py b/airflow/ti_deps/deps/dagrun_id_dep.py index e2888e8e7826f..641fe84131858 100644 --- a/airflow/ti_deps/deps/dagrun_id_dep.py +++ b/airflow/ti_deps/deps/dagrun_id_dep.py @@ -36,6 +36,7 @@ class DagrunIdDep(BaseTIDep): def _get_dep_statuses(self, ti, session, dep_context=None): """ Determines if the DagRun ID is valid for scheduling from scheduler. + :param ti: the task instance to get the dependency status for :type ti: airflow.models.TaskInstance :param session: database session diff --git a/airflow/ti_deps/deps/pool_slots_available_dep.py b/airflow/ti_deps/deps/pool_slots_available_dep.py index 62638c3a096cb..9a83efb801f01 100644 --- a/airflow/ti_deps/deps/pool_slots_available_dep.py +++ b/airflow/ti_deps/deps/pool_slots_available_dep.py @@ -37,6 +37,7 @@ class PoolSlotsAvailableDep(BaseTIDep): def _get_dep_statuses(self, ti, session, dep_context=None): """ Determines if the pool task instance is in has available slots + :param ti: the task instance to get the dependency status for :type ti: airflow.models.TaskInstance :param session: database session diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py index 063d67c03f480..297da404d19b2 100644 --- a/airflow/utils/cli.py +++ b/airflow/utils/cli.py @@ -60,8 +60,9 @@ def wrapper(*args, **kwargs): """ An wrapper for cli functions. It assumes to have Namespace instance at 1st positional argument + :param args: Positional argument. 
It assumes to have Namespace instance - at 1st positional argument + at 1st positional argument :param kwargs: A passthrough keyword argument """ assert args diff --git a/airflow/utils/cli_action_loggers.py b/airflow/utils/cli_action_loggers.py index 1c96e5f55c89e..3a711a002013b 100644 --- a/airflow/utils/cli_action_loggers.py +++ b/airflow/utils/cli_action_loggers.py @@ -35,6 +35,7 @@ def register_pre_exec_callback(action_logger): This function callback is expected to be called with keyword args. For more about the arguments that is being passed to the callback, refer to airflow.utils.cli.action_logging() + :param action_logger: An action logger function :return: None """ @@ -48,6 +49,7 @@ def register_post_exec_callback(action_logger): This function callback is expected to be called with keyword args. For more about the arguments that is being passed to the callback, refer to airflow.utils.cli.action_logging() + :param action_logger: An action logger function :return: None """ @@ -59,6 +61,7 @@ def on_pre_execution(**kwargs): """ Calls callbacks before execution. Note that any exception from callback will be logged but won't be propagated. + :param kwargs: :return: None """ @@ -76,6 +79,7 @@ def on_post_execution(**kwargs): As it's being called after execution, it can capture status of execution, duration, etc. Note that any exception from callback will be logged but won't be propagated. + :param kwargs: :return: None """ @@ -91,6 +95,7 @@ def default_action_log(log, **_): """ A default action logger callback that behave same as www.utils.action_logging which uses global session and pushes log ORM object. + :param log: An log ORM instance :param **_: other keyword arguments that is not being used by this function :return: None diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 135146189e0c0..03463da9f5a32 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -65,15 +65,14 @@ class SimpleDag(BaseDag): """ A simplified representation of a DAG that contains all attributes required for instantiating and scheduling its associated tasks. + + :param dag: the DAG + :type dag: airflow.models.DAG + :param pickle_id: ID associated with the pickled version of this DAG. + :type pickle_id: unicode """ def __init__(self, dag, pickle_id=None): - """ - :param dag: the DAG - :type dag: airflow.models.DAG - :param pickle_id: ID associated with the pickled version of this DAG. - :type pickle_id: unicode - """ self._dag_id = dag.dag_id self._task_ids = [task.task_id for task in dag.tasks] self._full_filepath = dag.full_filepath @@ -717,8 +716,23 @@ class DagFileProcessorManager(LoggingMixin): processors finish, more are launched. The files are processed over and over again, but no more often than the specified interval. - :type _file_path_queue: list[unicode] - :type _processors: dict[unicode, AbstractDagFileProcessor] + :param dag_directory: Directory where DAG definitions are kept. All + files in file_paths should be under this directory + :type dag_directory: unicode + :param file_paths: list of file paths that contain DAG definitions + :type file_paths: list[unicode] + :param max_runs: The number of times to parse and schedule each file. -1 + for unlimited. + :type max_runs: int + :param processor_factory: function that creates processors for DAG + definition files. 
Arguments are (dag_definition_path) + :type processor_factory: (unicode, unicode, list) -> (AbstractDagFileProcessor) + :param processor_timeout: How long to wait before timing out a DAG file processor + :type processor_timeout: timedelta + :param signal_conn: connection to communicate signal with processor agent. + :type signal_conn: airflow.models.connection.Connection + :param async_mode: whether to start the manager in async mode + :type async_mode: bool """ def __init__(self, @@ -729,25 +743,6 @@ def __init__(self, processor_timeout, signal_conn, async_mode=True): - """ - :param dag_directory: Directory where DAG definitions are kept. All - files in file_paths should be under this directory - :type dag_directory: unicode - :param file_paths: list of file paths that contain DAG definitions - :type file_paths: list[unicode] - :param max_runs: The number of times to parse and schedule each file. -1 - for unlimited. - :type max_runs: int - :param processor_factory: function that creates processors for DAG - definition files. Arguments are (dag_definition_path) - :type processor_factory: (unicode, unicode, list) -> (AbstractDagFileProcessor) - :param processor_timeout: How long to wait before timing out a DAG file processor - :type processor_timeout: timedelta - :param signal_conn: connection to communicate signal with processor agent. - :type signal_conn: airflow.models.connection.Connection - :param async_mode: whether to start the manager in async mode - :type async_mode: bool - """ self._file_paths = file_paths self._file_path_queue = [] self._dag_directory = dag_directory diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py index 9982a5a63c49f..b4798138c0c4d 100644 --- a/airflow/utils/log/es_task_handler.py +++ b/airflow/utils/log/es_task_handler.py @@ -33,9 +33,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin): - PAGE = 0 - MAX_LINE_PER_PAGE = 1000 - """ ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch. Note logs are not directly @@ -52,6 +49,9 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin): might have the same timestamp. """ + PAGE = 0 + MAX_LINE_PER_PAGE = 1000 + def __init__(self, base_log_folder, filename_template, log_id_template, end_of_log_mark, write_stdout, json_format, json_fields, @@ -99,6 +99,7 @@ def _clean_execution_date(execution_date): Clean up an execution date so that it is safe to query in elasticsearch by removing reserved characters. # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters + :param execution_date: execution date of the dag run. """ return execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f") @@ -106,6 +107,7 @@ def _clean_execution_date(execution_date): def _read(self, ti, try_number, metadata=None): """ Endpoint for streaming log. + :param ti: task instance object :param try_number: try_number of the task instance :param metadata: log metadata, @@ -158,6 +160,7 @@ def es_read(self, log_id, offset, metadata): """ Returns the logs matching log_id in Elasticsearch and next offset. Returns '' if no log is found or there was an error. + :param log_id: the log_id of the log to read. :type log_id: str :param offset: the offset start to read log from. @@ -193,6 +196,7 @@ def es_read(self, log_id, offset, metadata): def set_context(self, ti): """ Provide task_instance context to airflow task handler. 
+ :param ti: task instance object """ super(ElasticsearchTaskHandler, self).set_context(ti) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 7862ab6868408..4496a5bfdd0fe 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -35,6 +35,7 @@ class FileTaskHandler(logging.Handler): task instance logs. It creates and delegates log handling to `logging.FileHandler` after receiving task instance context. It reads logs from task instance's host machine. + :param base_log_folder: Base log folder to place logs. :param filename_template: template filename string """ @@ -84,6 +85,7 @@ def _read(self, ti, try_number, metadata=None): """ Template method that contains custom logic of reading logs given the try_number. + :param ti: task instance record :param try_number: current try_number to read log from :param metadata: log metadata, @@ -137,6 +139,7 @@ def _read(self, ti, try_number, metadata=None): def read(self, task_instance, try_number=None, metadata=None): """ Read logs of given task instance from local machine. + :param task_instance: task instance object :param try_number: task instance try_number to read logs from. If None it returns all logs separated by try_number @@ -172,6 +175,7 @@ def read(self, task_instance, try_number=None, metadata=None): def _init_file(self, ti): """ Create log directory and give it correct permissions. + :param ti: task instance object :return: relative log path of the given task instance """ diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py index 0eb621d33fe48..4d0bccaeec01c 100644 --- a/airflow/utils/log/gcs_task_handler.py +++ b/airflow/utils/log/gcs_task_handler.py @@ -95,6 +95,7 @@ def _read(self, ti, try_number, metadata=None): """ Read logs of given task instance and try_number from GCS. If failed, read the log from task instance host machine. + :param ti: task instance object :param try_number: task instance try_number to read logs from :param metadata: log metadata, @@ -122,6 +123,7 @@ def _read(self, ti, try_number, metadata=None): def gcs_read(self, remote_log_location): """ Returns the log found at the remote_log_location. + :param remote_log_location: the log's location in remote storage :type remote_log_location: str (path) """ @@ -132,6 +134,7 @@ def gcs_write(self, log, remote_log_location, append=True): """ Writes the log to the remote_log_location. Fails silently if no hook was created. + :param log: the log to write to the remote_log_location :type log: str :param remote_log_location: the log's location in remote storage diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py index 0f5bf7c90facf..609a9da7d26a7 100644 --- a/airflow/utils/log/s3_task_handler.py +++ b/airflow/utils/log/s3_task_handler.py @@ -90,6 +90,7 @@ def _read(self, ti, try_number, metadata=None): """ Read logs of given task instance and try_number from S3 remote storage. If failed, read the log from task instance host machine. 
+ :param ti: task instance object :param try_number: task instance try_number to read logs from :param metadata: log metadata, @@ -115,6 +116,7 @@ def _read(self, ti, try_number, metadata=None): def s3_log_exists(self, remote_log_location): """ Check if remote_log_location exists in remote storage + :param remote_log_location: log's location in remote storage :return: True if location exists else False """ @@ -128,6 +130,7 @@ def s3_read(self, remote_log_location, return_error=False): """ Returns the log found at the remote_log_location. Returns '' if no logs are found or there is an error. + :param remote_log_location: the log's location in remote storage :type remote_log_location: str (path) :param return_error: if True, returns a string error message if an @@ -147,6 +150,7 @@ def s3_write(self, log, remote_log_location, append=True): """ Writes the log to the remote_log_location. Fails silently if no hook was created. + :param log: the log to write to the remote_log_location :type log: str :param remote_log_location: the log's location in remote storage diff --git a/airflow/utils/log/wasb_task_handler.py b/airflow/utils/log/wasb_task_handler.py index c6a22099059f2..d8b470599dae8 100644 --- a/airflow/utils/log/wasb_task_handler.py +++ b/airflow/utils/log/wasb_task_handler.py @@ -98,6 +98,7 @@ def _read(self, ti, try_number, metadata=None): """ Read logs of given task instance and try_number from Wasb remote storage. If failed, read the log from task instance host machine. + :param ti: task instance object :param try_number: task instance try_number to read logs from :param metadata: log metadata, @@ -123,6 +124,7 @@ def _read(self, ti, try_number, metadata=None): def wasb_log_exists(self, remote_log_location): """ Check if remote_log_location exists in remote storage + :param remote_log_location: log's location in remote storage :return: True if location exists else False """ @@ -136,6 +138,7 @@ def wasb_read(self, remote_log_location, return_error=False): """ Returns the log found at the remote_log_location. Returns '' if no logs are found or there is an error. + :param remote_log_location: the log's location in remote storage :type remote_log_location: str (path) :param return_error: if True, returns a string error message if an @@ -155,6 +158,7 @@ def wasb_write(self, log, remote_log_location, append=True): """ Writes the log to the remote_log_location. Fails silently if no hook was created. 
+ :param log: the log to write to the remote_log_location :type log: str :param remote_log_location: the log's location in remote storage diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index fd3dffa908b4d..6fe753efe2bce 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -52,6 +52,7 @@ def is_naive(value): def utcnow(): """ Get the current date and time in UTC + :return: """ @@ -67,6 +68,7 @@ def utcnow(): def utc_epoch(): """ Gets the epoch in the users timezone + :return: """ @@ -83,6 +85,7 @@ def convert_to_utc(value): """ Returns the datetime with the default timezone added if timezone information was not associated + :param value: datetime :return: datetime with tzinfo """ @@ -102,7 +105,6 @@ def make_aware(value, timezone=None): :param value: datetime :param timezone: timezone :return: localized datetime in settings.TIMEZONE or timezone - """ if timezone is None: timezone = TIMEZONE @@ -172,6 +174,7 @@ def datetime(*args, **kwargs): def parse(string, timezone=None): """ Parse a time string and return an aware datetime + :param string: time string """ return pendulum.parse(string, tz=timezone or TIMEZONE) diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py index f88dd47a0e972..0084eede0b95e 100644 --- a/airflow/www/api/experimental/endpoints.py +++ b/airflow/www/api/experimental/endpoints.py @@ -116,9 +116,10 @@ def dag_runs(dag_id): """ Returns a list of Dag Runs for a specific DAG ID. :query param state: a query string parameter '?state=queued|running|success...' + :param dag_id: String identifier of a DAG :return: List of DAG runs of a DAG with requested state, - or all runs if the state is not specified + or all runs if the state is not specified """ try: state = request.args.get('state') diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 3aac6b9f8e726..4c324d7bfcc84 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -137,18 +137,13 @@ def generate_pages(current_page, num_of_pages, This component takes into account custom parameters such as search and showPaused, which could be added to the pages link in order to maintain the state between client and server. It also allows to make a bookmark on a specific paging state. - :param current_page: - the current page number, 0-indexed - :param num_of_pages: - the total number of pages - :param search: - the search query string, if any - :param showPaused: - false if paused dags will be hidden, otherwise true to show them - :param window: - the number of pages to be shown in the paging component (7 default) - :return: - the HTML string of the paging component + + :param current_page: the current page number, 0-indexed + :param num_of_pages: the total number of pages + :param search: the search query string, if any + :param showPaused: false if paused dags will be hidden, otherwise true to show them + :param window: the number of pages to be shown in the paging component (7 default) + :return: the HTML string of the paging component """ void_link = 'javascript:void(0)' diff --git a/airflow/www_rbac/static_config.py b/airflow/www_rbac/static_config.py index 7b5560c193d57..aaa075f6ce77f 100644 --- a/airflow/www_rbac/static_config.py +++ b/airflow/www_rbac/static_config.py @@ -30,6 +30,7 @@ def configure_manifest_files(app): """ Loads the manifest file and register the `url_for_asset_` template tag. 
+ :param app: :return: """ diff --git a/tests/contrib/utils/base_gcp_system_test_case.py b/tests/contrib/utils/base_gcp_system_test_case.py index 62d99e4ba373a..6d3a296fa66de 100644 --- a/tests/contrib/utils/base_gcp_system_test_case.py +++ b/tests/contrib/utils/base_gcp_system_test_case.py @@ -143,6 +143,7 @@ def _get_files_to_link(path): """ Returns all file names (note - file names not paths) that have the same base name as the .py dag file (for example dag_name.sql etc.) + :param path: path to the dag file. :return: list of files matching the base name """ diff --git a/tests/test_core.py b/tests/test_core.py index 558e5eee4f1be..d7be582a3887e 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -2446,6 +2446,7 @@ def __init__(self): def ls(self, path, include_toplevel=False): """ the fake snakebite client + :param path: the array of path to test :param include_toplevel: to return the toplevel directory info :return: a list for path for the matching queries diff --git a/tests/test_utils/get_all_tests.py b/tests/test_utils/get_all_tests.py index 513314a06baa6..989210a7c27d1 100644 --- a/tests/test_utils/get_all_tests.py +++ b/tests/test_utils/get_all_tests.py @@ -26,6 +26,7 @@ def last_replace(s, old, new, number_of_occurrences): """ Replaces last n occurrences of the old string with the new one within the string provided + :param s: string to replace occurrences with :param old: old string :param new: new string @@ -39,6 +40,7 @@ def last_replace(s, old, new, number_of_occurrences): def print_all_cases(xunit_test_file_path): """ Prints all test cases read from the xunit test file + :param xunit_test_file_path: path of the xunit file :return: None """
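For reference, a minimal sketch of the docstring layout this patch applies throughout the hunks above: a blank line separates the one-line summary from the Sphinx field list (so the ``:param``/``:type`` fields render as a proper field list), and constructor parameters are documented on the class docstring rather than on ``__init__`` (as done here for GrpcHook, BaseTaskRunner, SimpleDag, and DagFileProcessorManager). The ``ExampleHook`` class below is hypothetical and illustrative only; it is not part of the patch.

    # Illustrative sketch only -- ExampleHook is a made-up class, not from this patch.
    # Summary line, blank line, then the Sphinx field list; constructor parameters
    # are documented at class level instead of inside __init__.
    class ExampleHook(object):
        """
        Interact with an example service.

        :param example_conn_id: The connection ID to use when fetching connection info.
        :type example_conn_id: str
        :param timeout: How long to wait for a response, in seconds.
        :type timeout: int
        """

        def __init__(self, example_conn_id, timeout=10):
            self.example_conn_id = example_conn_id
            self.timeout = timeout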