From 1344a7578df3214518a07226c8ba67efb043aa1c Mon Sep 17 00:00:00 2001 From: feluelle Date: Wed, 7 Aug 2019 20:33:02 +0200 Subject: [PATCH] [AIRFLOW-4686] Make dags Pylint compatible --- ...mple_azure_container_instances_operator.py | 5 +++- .../example_databricks_operator.py | 28 ++++++++++--------- .../example_dags/example_dingding_operator.py | 10 ++++++- .../example_emr_job_flow_automatic_steps.py | 4 ++- .../example_emr_job_flow_manual_steps.py | 5 ++++ .../example_kubernetes_executor.py | 19 +++++++++---- .../example_kubernetes_executor_config.py | 14 +++++++--- .../example_kubernetes_operator.py | 4 ++- .../example_dags/example_pubsub_flow.py | 2 ++ .../example_dags/example_qubole_operator.py | 23 ++++++++++----- .../example_dags/example_twitter_dag.py | 24 ++++++++++++---- .../example_dags/example_winrm_operator.py | 10 ++++--- airflow/contrib/example_dags/libs/helper.py | 1 + dags/test_dag.py | 7 +++-- scripts/ci/pylint_todo.txt | 23 --------------- scripts/perf/dags/perf_dag_1.py | 8 ++++-- scripts/perf/dags/perf_dag_2.py | 7 +++-- tests/dags/test_cli_triggered_dags.py | 1 - tests/dags/test_default_impersonation.py | 4 +-- tests/dags/test_example_bash_operator.py | 6 ++-- tests/dags/test_impersonation.py | 4 +-- tests/dags/test_impersonation_custom.py | 8 +++--- tests/dags/test_no_impersonation.py | 4 +-- tests/dags/test_retry_handling_job.py | 3 +- 24 files changed, 137 insertions(+), 87 deletions(-) diff --git a/airflow/contrib/example_dags/example_azure_container_instances_operator.py b/airflow/contrib/example_dags/example_azure_container_instances_operator.py index 023d395fa31bf..553b76a144ccf 100644 --- a/airflow/contrib/example_dags/example_azure_container_instances_operator.py +++ b/airflow/contrib/example_dags/example_azure_container_instances_operator.py @@ -16,10 +16,13 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +""" +This is an example dag for using the AzureContainerInstancesOperator. +""" +from datetime import datetime, timedelta from airflow import DAG from airflow.contrib.operators.azure_container_instances_operator import AzureContainerInstancesOperator -from datetime import datetime, timedelta default_args = { 'owner': 'Airflow', diff --git a/airflow/contrib/example_dags/example_databricks_operator.py b/airflow/contrib/example_dags/example_databricks_operator.py index f1b18fe5244f0..b8ec5cc2d4f59 100644 --- a/airflow/contrib/example_dags/example_databricks_operator.py +++ b/airflow/contrib/example_dags/example_databricks_operator.py @@ -16,25 +16,27 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +""" +This is an example DAG which uses the DatabricksSubmitRunOperator. +In this example, we create two tasks which execute sequentially. +The first task is to run a notebook at the workspace path "/test" +and the second task is to run a JAR uploaded to DBFS. Both, +tasks use new clusters. + +Because we have set a downstream dependency on the notebook task, +the spark jar task will NOT run until the notebook task completes +successfully. + +The definition of a successful run is if the run has a result_state of "SUCCESS". +For more information about the state of a run refer to +https://docs.databricks.com/api/latest/jobs.html#runstate +""" import airflow from airflow import DAG from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator -# This is an example DAG which uses the DatabricksSubmitRunOperator. -# In this example, we create two tasks which execute sequentially. -# The first task is to run a notebook at the workspace path "/test" -# and the second task is to run a JAR uploaded to DBFS. Both, -# tasks use new clusters. -# -# Because we have set a downstream dependency on the notebook task, -# the spark jar task will NOT run until the notebook task completes -# successfully. -# -# The definition of a successful run is if the run has a result_state of "SUCCESS". -# For more information about the state of a run refer to -# https://docs.databricks.com/api/latest/jobs.html#runstate args = { 'owner': 'Airflow', diff --git a/airflow/contrib/example_dags/example_dingding_operator.py b/airflow/contrib/example_dags/example_dingding_operator.py index e7c8cb87e9bee..79be9f6a1660e 100644 --- a/airflow/contrib/example_dags/example_dingding_operator.py +++ b/airflow/contrib/example_dags/example_dingding_operator.py @@ -16,7 +16,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +""" +This is an example dag for using the DingdingOperator. +""" from datetime import timedelta import airflow @@ -32,6 +34,12 @@ # [START howto_operator_dingding_failure_callback] def failure_callback(context): + """ + The function that will be executed on failure. + + :param context: The context of the executed task. + :type context: dict + """ message = 'AIRFLOW TASK FAILURE TIPS:\n' \ 'DAG: {}\n' \ 'TASKS: {}\n' \ diff --git a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py index 5c64409a60c27..91f368652c68c 100644 --- a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py +++ b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py @@ -16,7 +16,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +""" +This is an example dag for a AWS EMR Pipeline with auto steps. +""" from datetime import timedelta import airflow from airflow import DAG diff --git a/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py b/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py index 5c79daecf8787..fba531fb475fa 100644 --- a/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py +++ b/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py @@ -16,7 +16,12 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +""" +This is an example dag for a AWS EMR Pipeline. +Starting by creating a cluster, adding steps/operations, checking steps and finally when finished +terminating the cluster. +""" from datetime import timedelta import airflow diff --git a/airflow/contrib/example_dags/example_kubernetes_executor.py b/airflow/contrib/example_dags/example_kubernetes_executor.py index ff1952b09a87f..309200d3ff89e 100644 --- a/airflow/contrib/example_dags/example_kubernetes_executor.py +++ b/airflow/contrib/example_dags/example_kubernetes_executor.py @@ -16,11 +16,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +""" +This is an example dag for using the Kubernetes Executor. +""" +import os import airflow -from airflow.operators.python_operator import PythonOperator from airflow.models import DAG -import os +from airflow.operators.python_operator import PythonOperator args = { 'owner': 'Airflow', @@ -58,13 +61,19 @@ }] -def print_stuff(): +def print_stuff(): # pylint: disable=missing-docstring print("stuff!") def use_zip_binary(): - rc = os.system("zip") - assert rc == 0 + """ + Checks whether Zip is installed. + + :return: True if it is installed, False if not. + :rtype: bool + """ + return_code = os.system("zip") + assert return_code == 0 # You don't have to use any special KubernetesExecutor configuration if you don't want to diff --git a/airflow/contrib/example_dags/example_kubernetes_executor_config.py b/airflow/contrib/example_dags/example_kubernetes_executor_config.py index a1dc23e329d3f..46460d6fa5061 100644 --- a/airflow/contrib/example_dags/example_kubernetes_executor_config.py +++ b/airflow/contrib/example_dags/example_kubernetes_executor_config.py @@ -16,12 +16,15 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +""" +This is an example dag for using a Kubernetes Executor Configuration. +""" +import os import airflow -from airflow.operators.python_operator import PythonOperator from airflow.contrib.example_dags.libs.helper import print_stuff from airflow.models import DAG -import os +from airflow.operators.python_operator import PythonOperator args = { 'owner': 'Airflow', @@ -35,11 +38,14 @@ def test_volume_mount(): + """ + Tests whether the volume has been mounted. + """ with open('/foo/volume_mount_test.txt', 'w') as foo: foo.write('Hello') - rc = os.system("cat /foo/volume_mount_test.txt") - assert rc == 0 + return_code = os.system("cat /foo/volume_mount_test.txt") + assert return_code == 0 # You can use annotations on your kubernetes pods! diff --git a/airflow/contrib/example_dags/example_kubernetes_operator.py b/airflow/contrib/example_dags/example_kubernetes_operator.py index 611852a66b88f..77354a59e2cc7 100644 --- a/airflow/contrib/example_dags/example_kubernetes_operator.py +++ b/airflow/contrib/example_dags/example_kubernetes_operator.py @@ -16,7 +16,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +""" +This is an example dag for using the KubernetesPodOperator. +""" from airflow.utils.dates import days_ago from airflow.utils.log.logging_mixin import LoggingMixin from airflow.models import DAG diff --git a/airflow/contrib/example_dags/example_pubsub_flow.py b/airflow/contrib/example_dags/example_pubsub_flow.py index a1b78290b0e2f..0a32e04db4220 100644 --- a/airflow/contrib/example_dags/example_pubsub_flow.py +++ b/airflow/contrib/example_dags/example_pubsub_flow.py @@ -70,6 +70,8 @@ with DAG('pubsub-end-to-end', default_args=default_args, schedule_interval=datetime.timedelta(days=1)) as dag: + # pylint: disable=no-value-for-parameter + # Note that parameters gets passed via dags default_args above. t1 = PubSubTopicCreateOperator(task_id='create-topic') t2 = PubSubSubscriptionCreateOperator( task_id='create-subscription', topic_project=project, diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py index b07f2734e8ff0..1f7e2a8ce9d8f 100644 --- a/airflow/contrib/example_dags/example_qubole_operator.py +++ b/airflow/contrib/example_dags/example_qubole_operator.py @@ -28,13 +28,14 @@ example. Also be aware that it might spin up clusters to run these examples.* """ +import filecmp +import random + import airflow from airflow import DAG +from airflow.contrib.operators.qubole_operator import QuboleOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator, BranchPythonOperator -from airflow.contrib.operators.qubole_operator import QuboleOperator -import filecmp -import random default_args = { 'owner': 'Airflow', @@ -50,11 +51,19 @@ dag.doc_md = __doc__ -def compare_result(ds, **kwargs): +def compare_result(**kwargs): + """ + Compares the results of two QuboleOperator tasks. + + :param kwargs: The context of the executed task. + :type kwargs: dict + :return: True if the files are the same, False otherwise. + :rtype: bool + """ ti = kwargs['ti'] - r1 = t1.get_results(ti) - r2 = t2.get_results(ti) - return filecmp.cmp(r1, r2) + qubole_result_1 = t1.get_results(ti) + qubole_result_2 = t2.get_results(ti) + return filecmp.cmp(qubole_result_1, qubole_result_2) t1 = QuboleOperator( diff --git a/airflow/contrib/example_dags/example_twitter_dag.py b/airflow/contrib/example_dags/example_twitter_dag.py index c2c8f4dd071d7..fa4210881292a 100644 --- a/airflow/contrib/example_dags/example_twitter_dag.py +++ b/airflow/contrib/example_dags/example_twitter_dag.py @@ -26,13 +26,17 @@ # -------------------------------------------------------------------------------- # Load The Dependencies # -------------------------------------------------------------------------------- +""" +This is an example dag for managing twitter data. +""" +from datetime import date, timedelta import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator -from airflow.operators.python_operator import PythonOperator from airflow.operators.hive_operator import HiveOperator -from datetime import date, timedelta +from airflow.operators.python_operator import PythonOperator + # -------------------------------------------------------------------------------- # Create a few placeholder scripts. In practice these would be different python @@ -41,19 +45,27 @@ def fetchtweets(): - return None + """ + This is a placeholder for fetchtweets. + """ def cleantweets(): - return None + """ + This is a placeholder for cleantweets. + """ def analyzetweets(): - return None + """ + This is a placeholder for analyzetweets. + """ def transfertodb(): - return None + """ + This is a placeholder for transfertodb. + """ # -------------------------------------------------------------------------------- diff --git a/airflow/contrib/example_dags/example_winrm_operator.py b/airflow/contrib/example_dags/example_winrm_operator.py index 271dd605f6081..a0eb6f3a8f8b6 100644 --- a/airflow/contrib/example_dags/example_winrm_operator.py +++ b/airflow/contrib/example_dags/example_winrm_operator.py @@ -26,14 +26,16 @@ # -------------------------------------------------------------------------------- # Load The Dependencies # -------------------------------------------------------------------------------- -import airflow -from airflow.operators.dummy_operator import DummyOperator -from airflow.models import DAG +""" +This is an example dag for using the WinRMOperator. +""" from datetime import timedelta +import airflow from airflow.contrib.hooks.winrm_hook import WinRMHook from airflow.contrib.operators.winrm_operator import WinRMOperator - +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator args = { 'owner': 'Airflow', diff --git a/airflow/contrib/example_dags/libs/helper.py b/airflow/contrib/example_dags/libs/helper.py index d7b62e65c7c88..9991954698583 100644 --- a/airflow/contrib/example_dags/libs/helper.py +++ b/airflow/contrib/example_dags/libs/helper.py @@ -18,5 +18,6 @@ # under the License. +# pylint: disable=missing-docstring def print_stuff(): print("annotated!") diff --git a/dags/test_dag.py b/dags/test_dag.py index a133dd5a1263b..245764ae1999c 100644 --- a/dags/test_dag.py +++ b/dags/test_dag.py @@ -16,11 +16,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +""" +This dag only runs some simple tasks to test Airflow's task execution. +""" +from datetime import datetime, timedelta -from airflow import utils from airflow import DAG +from airflow import utils from airflow.operators.dummy_operator import DummyOperator -from datetime import datetime, timedelta now = datetime.now() now_to_the_hour = ( diff --git a/scripts/ci/pylint_todo.txt b/scripts/ci/pylint_todo.txt index 0b02a3434b8dd..1229cf848297a 100644 --- a/scripts/ci/pylint_todo.txt +++ b/scripts/ci/pylint_todo.txt @@ -8,19 +8,6 @@ ./airflow/contrib/auth/backends/kerberos_auth.py ./airflow/contrib/auth/backends/ldap_auth.py ./airflow/contrib/auth/backends/password_auth.py -./airflow/contrib/example_dags/example_azure_container_instances_operator.py -./airflow/contrib/example_dags/example_databricks_operator.py -./airflow/contrib/example_dags/example_dingding_operator.py -./airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py -./airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py -./airflow/contrib/example_dags/example_kubernetes_executor.py -./airflow/contrib/example_dags/example_kubernetes_executor_config.py -./airflow/contrib/example_dags/example_kubernetes_operator.py -./airflow/contrib/example_dags/example_pubsub_flow.py -./airflow/contrib/example_dags/example_qubole_operator.py -./airflow/contrib/example_dags/example_twitter_dag.py -./airflow/contrib/example_dags/example_winrm_operator.py -./airflow/contrib/example_dags/libs/helper.py ./airflow/contrib/hooks/azure_container_instance_hook.py ./airflow/contrib/hooks/azure_container_registry_hook.py ./airflow/contrib/hooks/azure_container_volume_hook.py @@ -401,13 +388,10 @@ ./airflow/www/views.py ./airflow/www/widgets.py ./airflow/__init__.py -./dags/test_dag.py ./docs/conf.py ./docs/exts/docroles.py ./docs/exts/exampleinclude.py ./docs/exts/removemarktransform.py -./scripts/perf/dags/perf_dag_1.py -./scripts/perf/dags/perf_dag_2.py ./scripts/perf/scheduler_ops_metrics.py ./tests/cli/test_cli.py ./tests/cli/test_worker_initialisation.py @@ -473,13 +457,6 @@ ./tests/contrib/utils/test_sendgrid.py ./tests/contrib/utils/test_weekday.py ./tests/core.py -./tests/dags/test_cli_triggered_dags.py -./tests/dags/test_default_impersonation.py -./tests/dags/test_example_bash_operator.py -./tests/dags/test_impersonation.py -./tests/dags/test_impersonation_custom.py -./tests/dags/test_no_impersonation.py -./tests/dags/test_retry_handling_job.py ./tests/executors/test_base_executor.py ./tests/executors/test_celery_executor.py ./tests/executors/test_dask_executor.py diff --git a/scripts/perf/dags/perf_dag_1.py b/scripts/perf/dags/perf_dag_1.py index 83960dc06b9ac..4ac25c226aa30 100644 --- a/scripts/perf/dags/perf_dag_1.py +++ b/scripts/perf/dags/perf_dag_1.py @@ -16,10 +16,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +""" +This dag tests performance of simple bash commands executed with Airflow. +""" +from datetime import timedelta + import airflow -from airflow.operators.bash_operator import BashOperator from airflow.models import DAG -from datetime import timedelta +from airflow.operators.bash_operator import BashOperator args = { 'owner': 'Airflow', diff --git a/scripts/perf/dags/perf_dag_2.py b/scripts/perf/dags/perf_dag_2.py index bfbc6917f5541..b4f4370fa0222 100644 --- a/scripts/perf/dags/perf_dag_2.py +++ b/scripts/perf/dags/perf_dag_2.py @@ -16,11 +16,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +""" +This dag tests performance of simple bash commands executed with Airflow. +""" +from datetime import timedelta import airflow -from airflow.operators.bash_operator import BashOperator from airflow.models import DAG -from datetime import timedelta +from airflow.operators.bash_operator import BashOperator args = { 'owner': 'Airflow', diff --git a/tests/dags/test_cli_triggered_dags.py b/tests/dags/test_cli_triggered_dags.py index 9f53ca4c3ab0b..7747d20710b1d 100644 --- a/tests/dags/test_cli_triggered_dags.py +++ b/tests/dags/test_cli_triggered_dags.py @@ -37,7 +37,6 @@ def fail(): def success(ti=None, *args, **kwargs): if ti.execution_date != DEFAULT_DATE + timedelta(days=1): fail() - return # DAG tests that tasks ignore all dependencies diff --git a/tests/dags/test_default_impersonation.py b/tests/dags/test_default_impersonation.py index 537fee9a7cb93..a9b83a1a2d9fe 100644 --- a/tests/dags/test_default_impersonation.py +++ b/tests/dags/test_default_impersonation.py @@ -17,11 +17,11 @@ # specific language governing permissions and limitations # under the License. -from airflow.models import DAG -from airflow.operators.bash_operator import BashOperator from datetime import datetime from textwrap import dedent +from airflow.models import DAG +from airflow.operators.bash_operator import BashOperator DEFAULT_DATE = datetime(2016, 1, 1) diff --git a/tests/dags/test_example_bash_operator.py b/tests/dags/test_example_bash_operator.py index d8c36f0bfdecc..e839e16e6747e 100644 --- a/tests/dags/test_example_bash_operator.py +++ b/tests/dags/test_example_bash_operator.py @@ -17,12 +17,12 @@ # specific language governing permissions and limitations # under the License. +from datetime import timedelta + import airflow +from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator -from airflow.models import DAG -from datetime import timedelta - args = { 'owner': 'airflow', diff --git a/tests/dags/test_impersonation.py b/tests/dags/test_impersonation.py index bc327d2953844..95887f36e7a89 100644 --- a/tests/dags/test_impersonation.py +++ b/tests/dags/test_impersonation.py @@ -17,11 +17,11 @@ # specific language governing permissions and limitations # under the License. -from airflow.models import DAG -from airflow.operators.bash_operator import BashOperator from datetime import datetime from textwrap import dedent +from airflow.models import DAG +from airflow.operators.bash_operator import BashOperator DEFAULT_DATE = datetime(2016, 1, 1) diff --git a/tests/dags/test_impersonation_custom.py b/tests/dags/test_impersonation_custom.py index 1157812abca02..c6d0039ded62e 100644 --- a/tests/dags/test_impersonation_custom.py +++ b/tests/dags/test_impersonation_custom.py @@ -17,10 +17,10 @@ # specific language governing permissions and limitations # under the License. -from airflow.models import DAG -from airflow.operators.python_operator import PythonOperator from datetime import datetime +from airflow.models import DAG +from airflow.operators.python_operator import PythonOperator # AIRFLOW-1893 - Originally, impersonation tests were incomplete missing the use case when # DAGs access custom packages usually made available through the PYTHONPATH environment # variable. This file includes a DAG that imports a custom package made available and if @@ -42,8 +42,8 @@ def print_today(): - dt = FakeDatetime.utcnow() - print('Today is {}'.format(dt.strftime('%Y-%m-%d'))) + date_time = FakeDatetime.utcnow() + print('Today is {}'.format(date_time.strftime('%Y-%m-%d'))) def check_hive_conf(): diff --git a/tests/dags/test_no_impersonation.py b/tests/dags/test_no_impersonation.py index 26cb935d8679e..9382733737f2e 100644 --- a/tests/dags/test_no_impersonation.py +++ b/tests/dags/test_no_impersonation.py @@ -17,11 +17,11 @@ # specific language governing permissions and limitations # under the License. -from airflow.models import DAG -from airflow.operators.bash_operator import BashOperator from datetime import datetime from textwrap import dedent +from airflow.models import DAG +from airflow.operators.bash_operator import BashOperator DEFAULT_DATE = datetime(2016, 1, 1) diff --git a/tests/dags/test_retry_handling_job.py b/tests/dags/test_retry_handling_job.py index d8e314dbabde5..54d5a15cac323 100644 --- a/tests/dags/test_retry_handling_job.py +++ b/tests/dags/test_retry_handling_job.py @@ -17,9 +17,10 @@ # specific language governing permissions and limitations # under the License. +from datetime import datetime, timedelta + from airflow import DAG from airflow.operators.bash_operator import BashOperator -from datetime import datetime, timedelta default_args = { 'owner': 'airflow',