[AIRFLOW-5702] Fix common docstring issues (#6372)
mik-laj authored and kaxil committed Dec 14, 2019
1 parent 3137dda commit 82584cb
Showing 37 changed files with 135 additions and 75 deletions.
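The recurring fix in this commit is separating a docstring's summary from its first Sphinx field with a blank line; a minimal before/after sketch using a hypothetical function (not taken from the diff):

# Before: the field list runs straight into the summary paragraph.
def list_runs_before(dag_id):
    """
    Returns a list of Dag Runs for a specific DAG ID.
    :param dag_id: String identifier of a DAG
    """


# After: a blank line separates the summary from the fields, so the
# reStructuredText field list renders correctly in the generated docs.
def list_runs_after(dag_id):
    """
    Returns a list of Dag Runs for a specific DAG ID.

    :param dag_id: String identifier of a DAG
    """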
3 changes: 2 additions & 1 deletion airflow/api/common/experimental/get_dag_runs.py
@@ -29,10 +29,11 @@ def get_dag_runs(dag_id, state=None, run_url_route='Airflow.graph'):
# type: (str, Optional[str], str) -> List[Dict[str, Any]]
"""
Returns a list of Dag Runs for a specific DAG ID.
:param dag_id: String identifier of a DAG
:param state: queued|running|success...
:return: List of DAG runs of a DAG with requested state,
or all runs if the state is not specified
"""
check_and_get_dag(dag_id=dag_id)

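A hedged usage sketch based on the signature and type comment in this hunk; the DAG id and state are placeholders, and an initialised Airflow metadata DB is assumed:

from airflow.api.common.experimental.get_dag_runs import get_dag_runs

# "example_dag" is a hypothetical DAG id; per the type comment above the call
# returns List[Dict[str, Any]], one dict per matching dag run.
runs = get_dag_runs("example_dag", state="running")
for run in runs:
    print(run)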
13 changes: 10 additions & 3 deletions airflow/api/common/experimental/mark_tasks.py
@@ -34,6 +34,7 @@
def _create_dagruns(dag, execution_dates, state, run_id_template):
"""
Infers from the dates which dag runs need to be created and does so.
:param dag: the dag to create dag runs for
:param execution_dates: list of execution dates to evaluate
:param state: the state to set the dag run to
@@ -75,6 +76,7 @@ def set_state(
for past tasks. Will verify integrity of past dag runs in order to create
tasks that did not exist. It will not create dag runs that are missing
on the schedule (but it will as for subdag dag runs if needed).
:param tasks: the iterable of tasks from which to work. task.task.dag needs to be set
:param execution_date: the execution date from which to start looking
:param upstream: Mark all parents (upstream tasks)
@@ -211,9 +213,10 @@ def verify_dagruns(dag_runs, commit, state, session, current_task):


def verify_dag_run_integrity(dag, dates):
"""Verify the integrity of the dag runs in case a task was added or removed
set the confirmed execution dates as they might be different
from what was provided
"""
Verify the integrity of the dag runs in case a task was added or removed
set the confirmed execution dates as they might be different
from what was provided
"""
confirmed_dates = []
dag_runs = DagRun.find(dag_id=dag.dag_id, execution_date=dates)
@@ -266,6 +269,7 @@ def get_execution_dates(dag, execution_date, future, past):
def _set_dag_run_state(dag_id, execution_date, state, session=None):
"""
Helper method that set dag run state in the DB.
:param dag_id: dag_id of target dag run
:param execution_date: the execution date from which to start looking
:param state: target state
@@ -289,6 +293,7 @@ def set_dag_run_state_to_success(dag, execution_date, commit=False, session=None):
"""
Set the dag run for a specific execution date and its task instances
to success.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking
:param commit: commit DAG and tasks to be altered to the database
@@ -316,6 +321,7 @@ def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None):
"""
Set the dag run for a specific execution date and its running task instances
to failed.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking
:param commit: commit DAG and tasks to be altered to the database
@@ -354,6 +360,7 @@ def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None):
def set_dag_run_state_to_running(dag, execution_date, commit=False, session=None):
"""
Set the dag run for a specific execution date to running.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking
:param commit: commit DAG and tasks to be altered to the database
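A hedged sketch of calling one of the dag-run state helpers documented above; the DAG id and execution date are placeholders, and commit=False keeps it a dry run per the :param commit: description:

from airflow.api.common.experimental.mark_tasks import set_dag_run_state_to_success
from airflow.models import DagBag
from airflow.utils import timezone

# Hypothetical DAG id and execution date; both must refer to an existing dag run.
dag = DagBag().get_dag("example_dag")
execution_date = timezone.datetime(2019, 12, 1)

altered = set_dag_run_state_to_success(dag, execution_date, commit=False)
print("%d task instances would be set to success" % len(altered))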
2 changes: 2 additions & 0 deletions airflow/bin/cli.py
@@ -226,6 +226,7 @@ def backfill(args, dag=None):
def trigger_dag(args):
"""
Creates a dag run for the specified dag
:param args:
:return:
"""
@@ -243,6 +244,7 @@ def trigger_dag(args):
def delete_dag(args):
"""
Deletes all DB records related to the specified dag
:param args:
:return:
"""
1 change: 1 addition & 0 deletions airflow/configuration.py
@@ -549,6 +549,7 @@ def parameterized_config(template):
"""
Generates a configuration from the provided template + variables defined in
current scope
:param template: a config content templated with {{variables}}
"""
all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()}
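The docstring and the dict comprehension above suggest the template is expanded with every name in the current scope; a hedged sketch of that idea using plain str.format substitution (not necessarily the exact implementation):

def parameterized_config_sketch(template):
    """
    Expand {variables} in a config template from the current scope.

    :param template: a config content templated with {{variables}}
    """
    all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()}
    return template.format(**all_vars)


AIRFLOW_HOME = "/tmp/airflow"  # hypothetical variable picked up from the calling scope
print(parameterized_config_sketch("base_log_folder = {AIRFLOW_HOME}/logs"))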
5 changes: 3 additions & 2 deletions airflow/contrib/executors/kubernetes_executor.py
@@ -529,11 +529,11 @@ def _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid):
"""
Kubernetes pod names must be <= 253 chars and must pass the following regex for
validation
"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
:param safe_dag_id: a dag_id with only alphanumeric characters
:param safe_task_id: a task_id with only alphanumeric characters
:param random_uuid: a uuid
:param safe_uuid: a uuid
:return: ``str`` valid Pod name of appropriate length
"""
safe_key = safe_dag_id + safe_task_id
@@ -590,6 +590,7 @@ def _datetime_to_label_safe_datestring(datetime_obj):
Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but
not "_" let's
replace ":" with "_"
:param datetime_obj: datetime.datetime object
:return: ISO-like string representing the datetime
"""
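A hedged illustration of the two behaviours these docstrings describe — truncating and validating a pod id against the 253-character limit and regex, and making a datetime label-safe — not the executor's exact code:

import re
from datetime import datetime

MAX_POD_ID_LEN = 253
POD_NAME_RE = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$")


def make_safe_pod_id_sketch(safe_dag_id, safe_task_id, safe_uuid):
    # Join the sanitised pieces, truncate so the uuid suffix still fits, then validate.
    safe_key = safe_dag_id + safe_task_id
    safe_pod_id = safe_key[:MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid
    assert POD_NAME_RE.match(safe_pod_id), "pod id must satisfy the Kubernetes name regex"
    return safe_pod_id


def datetime_to_label_safe_datestring_sketch(datetime_obj):
    # Kubernetes labels reject ":", so swap it for "_" in the ISO string.
    return datetime_obj.isoformat().replace(":", "_")


print(make_safe_pod_id_sketch("exampledag", "exampletask", "0123456789abcdef"))
print(datetime_to_label_safe_datestring_sketch(datetime(2019, 12, 14, 10, 30)))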
1 change: 1 addition & 0 deletions airflow/contrib/hooks/aws_athena_hook.py
@@ -96,6 +96,7 @@ def check_query_status(self, query_execution_id):
def get_state_change_reason(self, query_execution_id):
"""
Fetch the reason for a state change (e.g. error message). Returns None or reason string.
:param query_execution_id: Id of submitted athena query
:type query_execution_id: str
:return: str
1 change: 1 addition & 0 deletions airflow/contrib/hooks/aws_firehose_hook.py
@@ -25,6 +25,7 @@
class AwsFirehoseHook(AwsHook):
"""
Interact with AWS Kinesis Firehose.
:param delivery_stream: Name of the delivery stream
:type delivery_stream: str
:param region_name: AWS region name (example: us-east-1)
25 changes: 12 additions & 13 deletions airflow/contrib/hooks/grpc_hook.py
@@ -32,22 +32,21 @@
class GrpcHook(BaseHook):
"""
General interaction with gRPC servers.
:param grpc_conn_id: The connection ID to use when fetching connection info.
:type grpc_conn_id: str
:param interceptors: a list of gRPC interceptor objects which would be applied
to the connected gRPC channel. None by default.
:type interceptors: a list of gRPC interceptors based on or extends the four
official gRPC interceptors, eg, UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor, StreamUnaryClientInterceptor,
StreamStreamClientInterceptor.
:param custom_connection_func: The customized connection function to return gRPC channel.
:type custom_connection_func: python callable objects that accept the connection as
its only arg. Could be partial or lambda.
"""

def __init__(self, grpc_conn_id, interceptors=None, custom_connection_func=None):
"""
:param grpc_conn_id: The connection ID to use when fetching connection info.
:type grpc_conn_id: str
:param interceptors: a list of gRPC interceptor objects which would be applied
to the connected gRPC channel. None by default.
:type interceptors: a list of gRPC interceptors based on or extends the four
official gRPC interceptors, eg, UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor, StreamUnaryClientInterceptor,
StreamStreamClientInterceptor.
::param custom_connection_func: The customized connection function to return gRPC channel.
:type custom_connection_func: python callable objects that accept the connection as
its only arg. Could be partial or lambda.
"""
self.grpc_conn_id = grpc_conn_id
self.conn = self.get_connection(self.grpc_conn_id)
self.extras = self.conn.extra_dejson
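A hedged construction sketch limited to the three arguments in the new class docstring; "grpc_default" is a hypothetical connection id that would have to exist, since the hook looks up the connection when instantiated:

from airflow.contrib.hooks.grpc_hook import GrpcHook

# interceptors and custom_connection_func are optional per the docstring above.
hook = GrpcHook(grpc_conn_id="grpc_default", interceptors=None, custom_connection_func=None)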
4 changes: 4 additions & 0 deletions airflow/contrib/hooks/qubole_hook.py
@@ -154,6 +154,7 @@ def execute(self, context):
def kill(self, ti):
"""
Kill (cancel) a Qubole command
:param ti: Task Instance of the dag, used to determine the Quboles command id
:return: response from Qubole
"""
@@ -171,6 +172,7 @@ def kill(self, ti):
def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
"""
Get results (or just s3 locations) of a command from Qubole and save into a file
:param ti: Task Instance of the dag, used to determine the Quboles command id
:param fp: Optional file pointer, will create one and return if None passed
:param inline: True to download actual results, False to get s3 locations only
@@ -199,6 +201,7 @@ def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
def get_log(self, ti):
"""
Get Logs of a command from Qubole
:param ti: Task Instance of the dag, used to determine the Quboles command id
:return: command log as text
"""
@@ -209,6 +212,7 @@ def get_jobs_id(self, ti):
def get_jobs_id(self, ti):
"""
Get jobs associated with a Qubole commands
:param ti: Task Instance of the dag, used to determine the Quboles command id
:return: Job information associated with command
"""
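A hedged sketch of the call pattern these docstrings describe; the hook and task instance come from a live DAG run, so they are only placeholders here, and only the methods documented above are used:

def report_qubole_command(hook, ti):
    # hook is a QuboleHook (airflow.contrib.hooks.qubole_hook) and ti is the task
    # instance whose Qubole command id it resolves; both are assumed to exist.
    print(hook.get_log(ti))                        # command log as text
    print(hook.get_jobs_id(ti))                    # job information for the command
    hook.get_results(ti, inline=True, fetch=True)  # download results (or s3 locations)
    hook.kill(ti)                                  # cancel the command if still running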
7 changes: 7 additions & 0 deletions airflow/contrib/hooks/sftp_hook.py
@@ -127,6 +127,7 @@ def describe_directory(self, path):
"""
Returns a dictionary of {filename: {attributes}} for all files
on the remote system (where the MLSD command is supported).
:param path: full path to the remote directory
:type path: str
"""
@@ -145,6 +146,7 @@ def describe_directory(self, path):
def list_directory(self, path):
"""
Returns a list of files on the remote system.
:param path: full path to the remote directory to list
:type path: str
"""
@@ -155,6 +157,7 @@ def list_directory(self, path):
def create_directory(self, path, mode=777):
"""
Creates a directory on the remote system.
:param path: full path to the remote directory to create
:type path: str
:param mode: int representation of octal mode for directory
@@ -165,6 +168,7 @@ def create_directory(self, path, mode=777):
def delete_directory(self, path):
"""
Deletes a directory on the remote system.
:param path: full path to the remote directory to delete
:type path: str
"""
@@ -176,6 +180,7 @@ def retrieve_file(self, remote_full_path, local_full_path):
Transfers the remote file to a local location.
If local_full_path is a string path, the file will be put
at that location
:param remote_full_path: full path to the remote file
:type remote_full_path: str
:param local_full_path: full path to the local file
@@ -191,6 +196,7 @@ def store_file(self, remote_full_path, local_full_path):
Transfers a local file to the remote location.
If local_full_path_or_buffer is a string path, the file will be read
from that location
:param remote_full_path: full path to the remote file
:type remote_full_path: str
:param local_full_path: full path to the local file
@@ -202,6 +208,7 @@ def store_file(self, remote_full_path, local_full_path):
def delete_file(self, path):
"""
Removes a file on the FTP Server
:param path: full path to the remote file
:type path: str
"""
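A hedged sketch exercising the methods documented above; the remote paths are placeholders, and the connection-id argument name (ftp_conn_id) and the sftp_default connection are assumptions about the hook's constructor, which this diff does not show:

from airflow.contrib.hooks.sftp_hook import SFTPHook

hook = SFTPHook(ftp_conn_id="sftp_default")  # assumed constructor argument

print(hook.list_directory("/upload"))        # names of files in the remote directory
print(hook.describe_directory("/upload"))    # {filename: {attributes}} mapping
hook.create_directory("/upload/incoming", mode=777)
hook.retrieve_file("/upload/report.csv", "/tmp/report.csv")
hook.store_file("/upload/report.csv", "/tmp/report.csv")
hook.delete_file("/upload/report.csv")
hook.delete_directory("/upload/incoming")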
1 change: 1 addition & 0 deletions airflow/contrib/hooks/slack_webhook_hook.py
@@ -84,6 +84,7 @@ def __init__(self,
def _get_token(self, token, http_conn_id):
"""
Given either a manually set token or a conn_id, return the webhook_token to use
:param token: The manually provided token
:type token: str
:param http_conn_id: The conn_id provided
1 change: 1 addition & 0 deletions airflow/contrib/hooks/spark_submit_hook.py
@@ -220,6 +220,7 @@ def _get_spark_binary_path(self):
def _build_spark_submit_command(self, application):
"""
Construct the spark-submit command to execute.
:param application: command to append to the spark-submit command
:type application: str
:return: full command to be executed
4 changes: 3 additions & 1 deletion airflow/contrib/kubernetes/secret.py
@@ -21,8 +21,10 @@ class Secret(object):
"""Defines Kubernetes Secret Volume"""

def __init__(self, deploy_type, deploy_target, secret, key=None):
"""Initialize a Kubernetes Secret Object. Used to track requested secrets from
"""
Initialize a Kubernetes Secret Object. Used to track requested secrets from
the user.
:param deploy_type: The type of secret deploy in Kubernetes, either `env` or
`volume`
:type deploy_type: str
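A hedged sketch of constructing the Secret object with the four arguments in the signature above; every name is a placeholder:

from airflow.contrib.kubernetes.secret import Secret

# Expose one key of the "airflow-secrets" Kubernetes Secret as an environment variable.
env_secret = Secret(
    deploy_type="env",
    deploy_target="SQL_CONN",
    secret="airflow-secrets",
    key="sql_alchemy_conn",
)

# Mount the whole Secret as a volume instead; key is omitted (it defaults to None).
volume_secret = Secret("volume", "/etc/sql-conn", "airflow-secrets")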
1 change: 1 addition & 0 deletions airflow/executors/local_executor.py
@@ -74,6 +74,7 @@ def __init__(self, result_queue):
def execute_work(self, key, command):
"""
Executes command received and stores result state in queue.
:param key: the key to identify the TI
:type key: tuple(dag_id, task_id, execution_date)
:param command: the command to execute
4 changes: 2 additions & 2 deletions airflow/jobs/scheduler_job.py
@@ -121,10 +121,10 @@ def _run_file_processor(result_channel,
:type dag_id_white_list: list[unicode]
:param thread_name: the name to use for the process that is launched
:type thread_name: unicode
:return: the process that was launched
:rtype: multiprocessing.Process
:param zombies: zombie task instances to kill
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
:return: the process that was launched
:rtype: multiprocessing.Process
"""
# This helper runs in the newly created process
log = logging.getLogger("airflow.processor")
1 change: 1 addition & 0 deletions airflow/lineage/backend/__init__.py
@@ -25,6 +25,7 @@ def send_lineage(self,
operator=None, inlets=None, outlets=None, context=None):
"""
Sends lineage metadata to a backend
:param operator: the operator executing a transformation on the inlets and outlets
:param inlets: the inlets to this operator
:param outlets: the outlets from this operator
@@ -243,7 +243,8 @@ def get_table_constraints(conn, table_name):
def reorder_columns(columns):
"""
Reorder the columns for creating constraint, preserve primary key ordering
['task_id', 'dag_id', 'execution_date']
``['task_id', 'dag_id', 'execution_date']``
:param columns: columns retrieved from DB related to constraint
:return: ordered column
"""
@@ -262,6 +263,7 @@ def reorder_columns(columns):
def drop_constraint(operator, constraint_dict):
"""
Drop a primary key or unique constraint
:param operator: batch_alter_table for the table
:param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table
"""
@@ -282,6 +284,7 @@ def drop_constraint(operator, constraint_dict):
def create_constraint(operator, constraint_dict):
"""
Create a primary key or unique constraint
:param operator: batch_alter_table for the table
:param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table
"""
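A hedged illustration of the reordering the reorder_columns docstring describes — primary-key columns first, in the documented order — not necessarily the migration's exact code:

def reorder_columns_sketch(columns):
    # Keep the documented primary-key ordering, then append the remaining columns.
    pk_order = ['task_id', 'dag_id', 'execution_date']
    ordered = [col for col in pk_order if col in columns]
    ordered += [col for col in columns if col not in pk_order]
    return ordered


print(reorder_columns_sketch(['execution_date', 'dag_id', 'state', 'task_id']))
# -> ['task_id', 'dag_id', 'execution_date', 'state']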
5 changes: 3 additions & 2 deletions airflow/models/baseoperator.py
@@ -1066,11 +1066,12 @@ def get_extra_links(self, dttm, link_name):
"""
For an operator, gets the URL that the external links specified in
`extra_links` should point to.
:raise ValueError: The error message of a ValueError will be passed on through to
the fronted to show up as a tooltip on the disabled link
:param dttm: The datetime parsed execution date for the URL being searched for
:param link_name: The name of the link we're looking for the URL for. Should be
one of the options specified in `extra_links`
:return: A URL
"""
if link_name in self.operator_extra_link_dict:
1 change: 1 addition & 0 deletions airflow/models/dagrun.py
@@ -96,6 +96,7 @@ def id_for_date(cls, date, prefix=ID_FORMAT_PREFIX):
def refresh_from_db(self, session=None):
"""
Reloads the current dagrun from the database
:param session: database session
"""
DR = DagRun
9 changes: 4 additions & 5 deletions airflow/task/task_runner/base_task_runner.py
@@ -36,14 +36,13 @@ class BaseTaskRunner(LoggingMixin):
"""
Runs Airflow task instances by invoking the `airflow run` command with raw
mode enabled in a subprocess.
:param local_task_job: The local task job associated with running the
associated task instance.
:type local_task_job: airflow.jobs.LocalTaskJob
"""

def __init__(self, local_task_job):
"""
:param local_task_job: The local task job associated with running the
associated task instance.
:type local_task_job: airflow.jobs.LocalTaskJob
"""
# Pass task instance context into log handlers to setup the logger.
super(BaseTaskRunner, self).__init__(local_task_job.task_instance)
self._task_instance = local_task_job.task_instance
1 change: 1 addition & 0 deletions airflow/ti_deps/deps/dagrun_id_dep.py
@@ -36,6 +36,7 @@ class DagrunIdDep(BaseTIDep):
def _get_dep_statuses(self, ti, session, dep_context=None):
"""
Determines if the DagRun ID is valid for scheduling from scheduler.
:param ti: the task instance to get the dependency status for
:type ti: airflow.models.TaskInstance
:param session: database session