[AIRFLOW-4686] Make dags Pylint compatible
feluelle committed Aug 12, 2019
1 parent 47dd4c9 commit 1344a75
Showing 24 changed files with 137 additions and 87 deletions.
airflow/contrib/example_dags/example_azure_container_instances_operator.py
@@ -16,10 +16,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example dag for using the AzureContainerInstancesOperator.
"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.azure_container_instances_operator import AzureContainerInstancesOperator
from datetime import datetime, timedelta

default_args = {
'owner': 'Airflow',
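The pattern here repeats across the whole commit: a module docstring is added to satisfy pylint's missing-docstring check, and standard-library imports are moved above third-party ones to satisfy wrong-import-order. A minimal sketch of the resulting convention (module contents are illustrative):

    """Example module docstring; pylint's missing-docstring check requires one per module."""
    # Standard-library imports come first (wrong-import-order)...
    from datetime import datetime, timedelta

    # ...followed by third-party imports such as airflow.
    from airflow import DAG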
28 changes: 15 additions & 13 deletions airflow/contrib/example_dags/example_databricks_operator.py
@@ -16,25 +16,27 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example DAG which uses the DatabricksSubmitRunOperator.
In this example, we create two tasks which execute sequentially.
The first task is to run a notebook at the workspace path "/test"
and the second task is to run a JAR uploaded to DBFS. Both
tasks use new clusters.
Because we have set a downstream dependency on the notebook task,
the spark jar task will NOT run until the notebook task completes
successfully.
A run is considered successful if it finishes with a result_state of "SUCCESS".
For more information about the state of a run refer to
https://docs.databricks.com/api/latest/jobs.html#runstate
"""

import airflow

from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator

# This is an example DAG which uses the DatabricksSubmitRunOperator.
# In this example, we create two tasks which execute sequentially.
# The first task is to run a notebook at the workspace path "/test"
# and the second task is to run a JAR uploaded to DBFS. Both,
# tasks use new clusters.
#
# Because we have set a downstream dependency on the notebook task,
# the spark jar task will NOT run until the notebook task completes
# successfully.
#
# The definition of a successful run is if the run has a result_state of "SUCCESS".
# For more information about the state of a run refer to
# https://docs.databricks.com/api/latest/jobs.html#runstate

args = {
'owner': 'Airflow',
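For context, a hedged sketch of how the two tasks described by the new docstring are typically wired with DatabricksSubmitRunOperator; the cluster spec, notebook path, JAR location, and main class below are illustrative assumptions, not values from this diff:

    new_cluster = {'spark_version': '2.1.0-db3-scala2.11', 'num_workers': 2}

    notebook_task = DatabricksSubmitRunOperator(
        task_id='notebook_task',
        dag=dag,  # assumes a surrounding DAG object named `dag`
        json={'new_cluster': new_cluster,
              'notebook_task': {'notebook_path': '/test'}})

    spark_jar_task = DatabricksSubmitRunOperator(
        task_id='spark_jar_task',
        dag=dag,
        json={'new_cluster': new_cluster,
              'libraries': [{'jar': 'dbfs:/example.jar'}],  # hypothetical DBFS path
              'spark_jar_task': {'main_class_name': 'com.example.Main'}})  # hypothetical class

    # The downstream dependency the docstring mentions: the JAR task will not
    # run until the notebook task completes successfully.
    notebook_task >> spark_jar_task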
10 changes: 9 additions & 1 deletion airflow/contrib/example_dags/example_dingding_operator.py
@@ -16,7 +16,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
This is an example dag for using the DingdingOperator.
"""
from datetime import timedelta

import airflow
@@ -32,6 +34,12 @@

# [START howto_operator_dingding_failure_callback]
def failure_callback(context):
"""
The function that will be executed on failure.
:param context: The context of the executed task.
:type context: dict
"""
message = 'AIRFLOW TASK FAILURE TIPS:\n' \
'DAG: {}\n' \
'TASKS: {}\n' \
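The docstring added here documents Airflow's callback contract: the function receives the context dict of the failed task. A minimal sketch of how such a callback is usually attached, assuming it is wired through default_args:

    default_args = {
        'owner': 'Airflow',
        # Airflow invokes this hook with the task context dict whenever a task fails.
        'on_failure_callback': failure_callback,
    }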
airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
@@ -16,7 +16,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
This is an example dag for an AWS EMR Pipeline with auto steps.
"""
from datetime import timedelta
import airflow
from airflow import DAG
airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
@@ -16,7 +16,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example dag for an AWS EMR Pipeline.
It starts by creating a cluster, then adds steps/operations, checks the steps, and finally
terminates the cluster once they have finished.
"""
from datetime import timedelta

import airflow
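The four stages in the docstring map onto the contrib EMR operators and sensor roughly as follows; the cluster overrides and step list are assumptions, and the xcom_pull templates follow the usual pattern of handing the job flow id between tasks:

    from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
    from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
    from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
    from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,  # assumed cluster-config dict
        dag=dag)  # assumes a surrounding DAG object named `dag`
    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        steps=SPARK_TEST_STEPS,  # assumed list of EMR step definitions
        dag=dag)
    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
        dag=dag)
    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        dag=dag)

    cluster_creator >> step_adder >> step_checker >> cluster_remover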
19 changes: 14 additions & 5 deletions airflow/contrib/example_dags/example_kubernetes_executor.py
@@ -16,11 +16,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example dag for using the Kubernetes Executor.
"""
import os

import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
import os
from airflow.operators.python_operator import PythonOperator

args = {
'owner': 'Airflow',
@@ -58,13 +61,19 @@
}]


def print_stuff():
def print_stuff(): # pylint: disable=missing-docstring
print("stuff!")


def use_zip_binary():
rc = os.system("zip")
assert rc == 0
"""
Checks whether Zip is installed, failing with an AssertionError if it is not.
"""
return_code = os.system("zip")
assert return_code == 0


# You don't have to use any special KubernetesExecutor configuration if you don't want to
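use_zip_binary only makes sense when the task runs in a pod whose image actually ships the binary, which is what per-task executor configuration is for. A hedged sketch, assuming the surrounding DAG is named dag and using an illustrative image name:

    zip_task = PythonOperator(
        task_id='use_zip_binary',
        python_callable=use_zip_binary,
        # Per-task pod override; only honored by the KubernetesExecutor.
        executor_config={"KubernetesExecutor": {"image": "example/image-with-zip:latest"}},
        dag=dag)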
14 changes: 10 additions & 4 deletions airflow/contrib/example_dags/example_kubernetes_executor_config.py
@@ -16,12 +16,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example dag for using a Kubernetes Executor Configuration.
"""
import os

import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.example_dags.libs.helper import print_stuff
from airflow.models import DAG
import os
from airflow.operators.python_operator import PythonOperator

args = {
'owner': 'Airflow',
@@ -35,11 +38,14 @@


def test_volume_mount():
"""
Tests whether the volume has been mounted.
"""
with open('/foo/volume_mount_test.txt', 'w') as foo:
foo.write('Hello')

rc = os.system("cat /foo/volume_mount_test.txt")
assert rc == 0
return_code = os.system("cat /foo/volume_mount_test.txt")
assert return_code == 0


# You can use annotations on your kubernetes pods!
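test_volume_mount expects a writable /foo inside the pod; under the KubernetesExecutor that is arranged through volumes and volume mounts in executor_config. A minimal sketch with an illustrative hostPath volume (names and paths are assumptions):

    volume_task = PythonOperator(
        task_id='task_with_volume',
        python_callable=test_volume_mount,
        executor_config={
            "KubernetesExecutor": {
                "volumes": [
                    {"name": "test-volume", "hostPath": {"path": "/tmp/"}},
                ],
                "volume_mounts": [
                    {"name": "test-volume", "mountPath": "/foo/"},
                ],
            }
        },
        dag=dag)  # assumes a surrounding DAG object named `dag`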
4 changes: 3 additions & 1 deletion airflow/contrib/example_dags/example_kubernetes_operator.py
@@ -16,7 +16,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
This is an example dag for using the KubernetesPodOperator.
"""
from airflow.utils.dates import days_ago
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.models import DAG
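A hedged sketch of the operator this dag demonstrates; the namespace, image, commands, and names are illustrative:

    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

    pod_task = KubernetesPodOperator(
        namespace='default',
        image='ubuntu:16.04',
        cmds=['bash', '-cx'],
        arguments=['echo hello world'],
        name='airflow-test-pod',
        task_id='pod_task',
        dag=dag)  # assumes a surrounding DAG object named `dag`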
2 changes: 2 additions & 0 deletions airflow/contrib/example_dags/example_pubsub_flow.py
@@ -70,6 +70,8 @@

with DAG('pubsub-end-to-end', default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
# pylint: disable=no-value-for-parameter
# Note that the parameters get passed via the DAG's default_args above.
t1 = PubSubTopicCreateOperator(task_id='create-topic')
t2 = PubSubSubscriptionCreateOperator(
task_id='create-subscription', topic_project=project,
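The pylint disable is needed because the operators' required arguments are not supplied at the call sites; Airflow's apply_defaults decorator falls back to the DAG's default_args for missing operator keyword arguments. A sketch of the pattern, with assumed keys based on the Pub/Sub operators' signatures:

    default_args = {
        'start_date': airflow.utils.dates.days_ago(2),
        # apply_defaults injects these into every operator in the DAG,
        # which is why PubSubTopicCreateOperator can be called without them.
        'project': 'your-gcp-project',           # assumed GCP project id
        'topic': 'example-topic',                # assumed Pub/Sub topic
        'subscription': 'example-subscription',  # assumed subscription name
    }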
23 changes: 16 additions & 7 deletions airflow/contrib/example_dags/example_qubole_operator.py
@@ -28,13 +28,14 @@
example. Also be aware that it might spin up clusters to run these examples.*
"""

import filecmp
import random

import airflow
from airflow import DAG
from airflow.contrib.operators.qubole_operator import QuboleOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.contrib.operators.qubole_operator import QuboleOperator
import filecmp
import random

default_args = {
'owner': 'Airflow',
@@ -50,11 +51,19 @@
dag.doc_md = __doc__


def compare_result(ds, **kwargs):
def compare_result(**kwargs):
"""
Compares the results of two QuboleOperator tasks.
:param kwargs: The context of the executed task.
:type kwargs: dict
:return: True if the files are the same, False otherwise.
:rtype: bool
"""
ti = kwargs['ti']
r1 = t1.get_results(ti)
r2 = t2.get_results(ti)
return filecmp.cmp(r1, r2)
qubole_result_1 = t1.get_results(ti)
qubole_result_2 = t2.get_results(ti)
return filecmp.cmp(qubole_result_1, qubole_result_2)


t1 = QuboleOperator(
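Dropping the unused ds argument works because the context is delivered purely through **kwargs when the task is created with provide_context=True. A minimal sketch of that wiring (the task id is an assumption):

    compare_task = PythonOperator(
        task_id='compare_result',
        provide_context=True,  # passes the runtime context into **kwargs
        python_callable=compare_result,
        dag=dag)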
24 changes: 18 additions & 6 deletions airflow/contrib/example_dags/example_twitter_dag.py
@@ -26,13 +26,17 @@
# --------------------------------------------------------------------------------
# Load The Dependencies
# --------------------------------------------------------------------------------
"""
This is an example dag for managing Twitter data.
"""
from datetime import date, timedelta

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.hive_operator import HiveOperator
from datetime import date, timedelta
from airflow.operators.python_operator import PythonOperator


# --------------------------------------------------------------------------------
# Create a few placeholder scripts. In practice these would be different python
@@ -41,19 +45,27 @@


def fetchtweets():
return None
"""
This is a placeholder for fetchtweets.
"""


def cleantweets():
return None
"""
This is a placeholder for cleantweets.
"""


def analyzetweets():
return None
"""
This is a placeholder for analyzetweets.
"""


def transfertodb():
return None
"""
This is a placeholder for transfertodb.
"""


# --------------------------------------------------------------------------------
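The placeholders above only do something once attached to tasks; a minimal sketch of one such hookup, assuming the surrounding DAG object is named dag:

    fetch_tweets = PythonOperator(
        task_id='fetch_tweets',
        python_callable=fetchtweets,
        dag=dag)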
10 changes: 6 additions & 4 deletions airflow/contrib/example_dags/example_winrm_operator.py
@@ -26,14 +26,16 @@
# --------------------------------------------------------------------------------
# Load The Dependencies
# --------------------------------------------------------------------------------
import airflow
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
"""
This is an example dag for using the WinRMOperator.
"""
from datetime import timedelta

import airflow
from airflow.contrib.hooks.winrm_hook import WinRMHook
from airflow.contrib.operators.winrm_operator import WinRMOperator

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

args = {
'owner': 'Airflow',
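A hedged sketch of the hook/operator pairing this example demonstrates; the connection id, command, and task id are assumptions:

    winrm_hook = WinRMHook(ssh_conn_id='winrm_default')
    wintask = WinRMOperator(
        task_id='wintask',
        command='ls -altr',
        winrm_hook=winrm_hook,
        dag=dag)  # assumes a surrounding DAG object named `dag`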
1 change: 1 addition & 0 deletions airflow/contrib/example_dags/libs/helper.py
@@ -18,5 +18,6 @@
# under the License.


# pylint: disable=missing-docstring
def print_stuff():
print("annotated!")
7 changes: 5 additions & 2 deletions dags/test_dag.py
@@ -16,11 +16,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This dag only runs some simple tasks to test Airflow's task execution.
"""
from datetime import datetime, timedelta

from airflow import utils
from airflow import DAG
from airflow import utils
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

now = datetime.now()
now_to_the_hour = (
23 changes: 0 additions & 23 deletions scripts/ci/pylint_todo.txt
@@ -8,19 +8,6 @@
./airflow/contrib/auth/backends/kerberos_auth.py
./airflow/contrib/auth/backends/ldap_auth.py
./airflow/contrib/auth/backends/password_auth.py
./airflow/contrib/example_dags/example_azure_container_instances_operator.py
./airflow/contrib/example_dags/example_databricks_operator.py
./airflow/contrib/example_dags/example_dingding_operator.py
./airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
./airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
./airflow/contrib/example_dags/example_kubernetes_executor.py
./airflow/contrib/example_dags/example_kubernetes_executor_config.py
./airflow/contrib/example_dags/example_kubernetes_operator.py
./airflow/contrib/example_dags/example_pubsub_flow.py
./airflow/contrib/example_dags/example_qubole_operator.py
./airflow/contrib/example_dags/example_twitter_dag.py
./airflow/contrib/example_dags/example_winrm_operator.py
./airflow/contrib/example_dags/libs/helper.py
./airflow/contrib/hooks/azure_container_instance_hook.py
./airflow/contrib/hooks/azure_container_registry_hook.py
./airflow/contrib/hooks/azure_container_volume_hook.py
@@ -401,13 +388,10 @@
./airflow/www/views.py
./airflow/www/widgets.py
./airflow/__init__.py
./dags/test_dag.py
./docs/conf.py
./docs/exts/docroles.py
./docs/exts/exampleinclude.py
./docs/exts/removemarktransform.py
./scripts/perf/dags/perf_dag_1.py
./scripts/perf/dags/perf_dag_2.py
./scripts/perf/scheduler_ops_metrics.py
./tests/cli/test_cli.py
./tests/cli/test_worker_initialisation.py
@@ -473,13 +457,6 @@
./tests/contrib/utils/test_sendgrid.py
./tests/contrib/utils/test_weekday.py
./tests/core.py
./tests/dags/test_cli_triggered_dags.py
./tests/dags/test_default_impersonation.py
./tests/dags/test_example_bash_operator.py
./tests/dags/test_impersonation.py
./tests/dags/test_impersonation_custom.py
./tests/dags/test_no_impersonation.py
./tests/dags/test_retry_handling_job.py
./tests/executors/test_base_executor.py
./tests/executors/test_celery_executor.py
./tests/executors/test_dask_executor.py