From 3e459d75d5ae90b1c1b7396fd8498d649b804bb7 Mon Sep 17 00:00:00 2001 From: Tomasz Urbaszek Date: Sun, 8 Nov 2020 17:50:09 +0100 Subject: [PATCH 1/5] Create DAG-level cluster policy This commit adds a new concept, dag_policy, which is checked once for every DAG when creating a DagBag. It also improves documentation around cluster policies. closes: #12179 --- UPDATING.md | 8 +++ airflow/models/dagbag.py | 5 +- airflow/settings.py | 35 +++++++-- docs/concepts.rst | 108 ++++++++++++++-------------- tests/cluster_policies/__init__.py | 34 +++++++++ tests/dags/test_dag_with_no_tags.py | 31 ++++++++ tests/models/test_dagbag.py | 22 ++++-- 7 files changed, 176 insertions(+), 67 deletions(-) create mode 100644 tests/dags/test_dag_with_no_tags.py diff --git a/UPDATING.md b/UPDATING.md index 2f6dc43577778f..b23aad3685e8cb 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -77,6 +77,14 @@ session_lifetime_minutes = 43200 ## Airflow 2.0.0b1 +### Rename policy to task_policy + +Because Airflow introduced DAG level policy (`dag_policy`) we decided to rename the existing `policy` +function to `task_policy` to make the distinction clearer and avoid any confusion. + +Users using cluster policy need to rename their `policy` functions in `airflow_local_settings.py` +to `task_policy`. + ### Default value for `[celery] operation_timeout` has changed to `1.0` From Airflow 2, by default Airflow will retry 3 times to publish task to Celery broker. 
This is controlled by diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 0b1688f453a965..d8207c570bf1cb 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -386,8 +386,11 @@ def bag_dag(self, dag, root_dag): dag.resolve_template_files() dag.last_loaded = timezone.utcnow() + # Check policies + settings.dag_policy(dag) + for task in dag.tasks: - settings.policy(task) + settings.task_policy(task) subdags = dag.subdags diff --git a/airflow/settings.py b/airflow/settings.py index d6e4749197de10..1c12aeb106383f 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -118,13 +118,15 @@ def custom_show_warning(message, category, filename, lineno, file=None, line=Non warnings.showwarning = custom_show_warning -def policy(task): # pylint: disable=unused-argument +def task_policy(task) -> None: # pylint: disable=unused-argument """ This policy setting allows altering tasks after they are loaded in - the DagBag. It allows administrator to rewire some task parameters. + the DagBag. It allows administrator to rewire some task's parameters. + Alternatively you can raise ``AirflowClusterPolicyViolation`` exception + to stop DAG from being executed. To define policy, add a ``airflow_local_settings`` module - to your PYTHONPATH that defines this ``policy`` function. + to your PYTHONPATH that defines this ``task_policy`` function. Here are a few examples of how this can be useful: @@ -133,7 +135,29 @@ def policy(task): # pylint: disable=unused-argument tasks get wired to the right workers * You could enforce a task timeout policy, making sure that no tasks run for more than 48 hours - * ... + + :param task: task to be mutated + :type task: airflow.models.baseoperator.BaseOperator + """ + + +def dag_policy(dag) -> None: # pylint: disable=unused-argument + """ + This policy setting allows altering DAGs after they are loaded in + the DagBag. It allows administrator to rewire some DAG's parameters. 
+ Alternatively you can raise ``AirflowClusterPolicyViolation`` exception + to stop DAG from being executed. + + To define policy, add a ``airflow_local_settings`` module + to your PYTHONPATH that defines this ``dag_policy`` function. + + Here are a few examples of how this can be useful: + + * You could enforce default user for DAGs + * Check if every DAG has configured tags + + :param dag: dag to be mutated + :type dag: airflow.models.dag.DAG """ @@ -146,6 +170,9 @@ def task_instance_mutation_hook(task_instance): # pylint: disable=unused-argume to your PYTHONPATH that defines this ``task_instance_mutation_hook`` function. This could be used, for instance, to modify the task instance during retries. + + :param task_instance: task instance to be mutated + :type task_instance: airflow.models.taskinstance.TaskInstance """ diff --git a/docs/concepts.rst b/docs/concepts.rst index b05f73fbbe1ad4..67542f1bd27e06 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -1194,72 +1194,67 @@ state. Cluster Policy ============== -Cluster policies provide an interface for taking action on every Airflow task -either at DAG load time or just before task execution. - -Cluster Policies for Task Mutation ------------------------------------ -In case you want to apply cluster-wide mutations to the Airflow tasks, -you can either mutate the task right after the DAG is loaded or -mutate the task instance before task execution. - -Mutate tasks after DAG loaded ------------------------------ - -To mutate the task right after the DAG is parsed, you can define -a ``policy`` function in ``airflow_local_settings.py`` that mutates the -task based on other task or DAG attributes (through ``task.dag``). -It receives a single argument as a reference to the task object and you can alter -its attributes. -For example, this function could apply a specific queue property when -using a specific operator, or enforce a task timeout policy, making sure -that no tasks run for more than 48 hours. 
Here's an example of what this -may look like inside your ``airflow_local_settings.py``: +Cluster policies provide an interface for taking action on every Airflow task +or DAG either at DAG load time or just before task execution. In this way users +are able to do the following: +- set default arguments on each DAG/task +- checks that DAG/task meets required standards +- perform custom logic of routing task to a queue -.. code-block:: python +Any many other options. To use cluster-wide policies users can define in their +``airflow_local_settings`` the following functions - def policy(task): - if task.task_type == 'HivePartitionSensor': - task.queue = "sensor_queue" - if task.timeout > timedelta(hours=48): - task.timeout = timedelta(hours=48) +- ``dag_policy`` - which as an input takes ``dag`` argument of :class:`~airflow.models.dag.DAG` type. + This function allows users to define dag-level policy which is executed for every DAG at loading time. +- ``task_policy`` - which as an input takes ``task`` argument of :class:`~airflow.models.baseoperator.BaseOperator` + type. This function allows users to define task-level policy which is executed for every task at DAG loading time. +- ``task_instance_mutation_hook`` - which as an input takes ``task_instance`` argument of + :class:`~airflow.models.taskinstance.TaskInstance` type. This function allows users to define task-level + policy that is executed right before the task execution. +In case of DAG and task policies users may raise :class:`~airflow.exceptions.AirflowClusterPolicyViolation` +to prevent a DAG from being imported or prevent a task from being executed if the task is not compliant with +users' check. -Please note, cluster policy will have precedence over task -attributes defined in DAG meaning if ``task.sla`` is defined -in dag and also mutated via cluster policy then later will have precedence. 
+Please note, cluster policy will have precedence over task attributes defined in the DAG, meaning +if ``task.sla`` is defined in dag and also mutated via cluster policy then the latter will have precedence. +In the next sections we show examples of each type of cluster policy. -Mutate task instances before task execution +Where to put ``airflow_local_settings.py``? ------------------------------------------- +Add an ``airflow_local_settings.py`` file to your ``$PYTHONPATH`` or to ``$AIRFLOW_HOME/config`` folder. -To mutate the task instance before the task execution, you can define a -``task_instance_mutation_hook`` function in ``airflow_local_settings.py`` -that mutates the task instance. +See :doc:`modules_management` for details on how Python and Airflow manage modules. -For example, this function re-routes the task to execute in a different -queue during retries: -.. code-block:: python +DAG level cluster policy +----------------------------------- +In this example we check if each DAG has at least one tag defined. +Here is what it may look like: - def task_instance_mutation_hook(ti): - if ti.try_number >= 1: - ti.queue = 'retry_queue' +.. literalinclude:: /../tests/cluster_policies/__init__.py + :language: python + :start-after: [START example_dag_cluster_policy] + :end-before: [END example_dag_cluster_policy] +Task level cluster policy +----------------------------- +For example, this function could apply a specific queue property when +using a specific operator, or enforce a task timeout policy, making sure +that no tasks run for more than 48 hours. Here's an example of what this +may look like: -Cluster Policies for Custom Task Checks -------------------------------------------- -You may also use Cluster Policies to apply cluster-wide checks on Airflow -tasks. 
You can raise :class:`~airflow.exceptions.AirflowClusterPolicyViolation` -in a policy or task mutation hook (described below) to prevent a DAG from being -imported or prevent a task from being executed if the task is not compliant with -your check. +.. literalinclude:: /../tests/cluster_policies/__init__.py + :language: python + :start-after: [START example_task_cluster_policy] + :end-before: [END example_task_cluster_policy] -These checks are intended to help teams using Airflow to protect against common -beginner errors that may get past a code reviewer, rather than as technical -security controls. +As a more advanced example we may consider implementing checks that are intended to help +teams using Airflow to protect against common beginner errors that may get past a code +reviewer, rather than as technical security controls. For example, don't run tasks without airflow owners: @@ -1281,14 +1276,15 @@ For Example in ``airflow_local_settings.py``: :start-after: [START example_list_of_cluster_policy_rules] :end-before: [END example_list_of_cluster_policy_rules] -Where to put ``airflow_local_settings.py``? +Task instance mutation hook ------------------------------------------- +Task instance mutation hook can be used, for example, to re-route the task to +execute in a different queue during retries: -Add a ``airflow_local_settings.py`` file to your ``$PYTHONPATH`` -or to ``$AIRFLOW_HOME/config`` folder. - -See :doc:`modules_management` for details on how Python and Airflow manage modules. - +.. 
literalinclude:: /../tests/cluster_policies/__init__.py + :language: python + :start-after: [START example_task_mutation_hook] + :end-before: [END example_task_mutation_hook] Documentation & Notes ===================== diff --git a/tests/cluster_policies/__init__.py b/tests/cluster_policies/__init__.py index 77e5253f2254b7..9d2818117187e9 100644 --- a/tests/cluster_policies/__init__.py +++ b/tests/cluster_policies/__init__.py @@ -15,10 +15,13 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from datetime import timedelta from typing import Callable, List +from airflow import DAG from airflow.configuration import conf from airflow.exceptions import AirflowClusterPolicyViolation +from airflow.models import TaskInstance from airflow.models.baseoperator import BaseOperator @@ -62,3 +65,34 @@ def cluster_policy(task: BaseOperator): # [END example_list_of_cluster_policy_rules] + +# [START example_dag_cluster_policy] +def dag_policy(dag: DAG): + """Ensure that DAG has at least one tag""" + if not dag.tags: + raise AirflowClusterPolicyViolation( + f"DAG {dag.dag_id} has no tags. At least one tag required. 
File path: {dag.filepath}" + ) + + +# [END example_dag_cluster_policy] + + +# [START example_task_cluster_policy] +def task_policy(task: BaseOperator): + if task.task_type == 'HivePartitionSensor': + task.queue = "sensor_queue" + if task.timeout > timedelta(hours=48): + task.timeout = timedelta(hours=48) + + +# [END example_task_cluster_policy] + + +# [START example_task_mutation_hook] +def task_instance_mutation_hook(task_instance: TaskInstance): + if task_instance.try_number >= 1: + task_instance.queue = 'retry_queue' + + +# [END example_task_mutation_hook] diff --git a/tests/dags/test_dag_with_no_tags.py b/tests/dags/test_dag_with_no_tags.py new file mode 100644 index 00000000000000..aae5256d28617e --- /dev/null +++ b/tests/dags/test_dag_with_no_tags.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from datetime import datetime + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator + +DEFAULT_DATE = datetime(2016, 1, 1) + +default_args = { + "owner": "airflow", + "start_date": DEFAULT_DATE, +} + +with DAG(dag_id="test_only_dummy_tasks", default_args=default_args, schedule_interval='@once') as dag: + task_a = DummyOperator(task_id="test_task_a") diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index a0d7fbed5f0629..abbb4b1271db19 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -714,9 +714,10 @@ def test_collect_dags_from_db(self): self.assertEqual(serialized_dag.dag_id, dag.dag_id) self.assertEqual(set(serialized_dag.task_dict), set(dag.task_dict)) - @patch("airflow.settings.policy", cluster_policies.cluster_policy) - def test_cluster_policy_violation(self): - """test that file processing results in import error when task does not + @patch("airflow.settings.task_policy", cluster_policies.cluster_policy) + def test_task_cluster_policy_violation(self): + """ + test that file processing results in import error when task does not obey cluster policy. """ dag_file = os.path.join(TEST_DAGS_FOLDER, "test_missing_owner.py") @@ -732,9 +733,10 @@ def test_cluster_policy_violation(self): } self.assertEqual(expected_import_errors, dagbag.import_errors) - @patch("airflow.settings.policy", cluster_policies.cluster_policy) - def test_cluster_policy_obeyed(self): - """test that dag successfully imported without import errors when tasks + @patch("airflow.settings.task_policy", cluster_policies.cluster_policy) + def test_task_cluster_policy_obeyed(self): + """ + test that dag successfully imported without import errors when tasks obey cluster policy. 
""" dag_file = os.path.join(TEST_DAGS_FOLDER, "test_with_non_default_owner.py") @@ -743,3 +745,11 @@ def test_cluster_policy_obeyed(self): self.assertEqual({"test_with_non_default_owner"}, set(dagbag.dag_ids)) self.assertEqual({}, dagbag.import_errors) + + @patch("airflow.settings.dag_policy", cluster_policies.dag_policy) + def test_dag_cluster_policy_obeyed(self): + dag_file = os.path.join(TEST_DAGS_FOLDER, "test_dag_with_no_tags.py") + + dagbag = DagBag(dag_folder=dag_file, include_examples=False, include_smart_sensor=False) + assert len(dagbag.dag_ids) == 0 + assert "has no tags" in dagbag.import_errors[dag_file] From 4bee507994ae3016e4251c8d53e3643213d47b27 Mon Sep 17 00:00:00 2001 From: Tomasz Urbaszek Date: Sun, 8 Nov 2020 20:11:44 +0100 Subject: [PATCH 2/5] fixup! Create DAG-level cluster policy --- airflow/models/dagbag.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index d8207c570bf1cb..908d7b879c9823 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -47,6 +47,16 @@ from airflow.utils.session import provide_session from airflow.utils.timeout import timeout +# TODO: Remove once deprecated +if hasattr(settings, "policy"): + warnings.warn( + "Using `policy` in airflow_local_settings.py is deprecated. 
" + "Please rename your `policy` to `task_policy`.", + DeprecationWarning, + stacklevel=2, + ) + setattr(settings, "task_policy", settings.policy) # pylint: disable=no-member + class FileLoadStat(NamedTuple): """Information about single file""" From 39b3202673637380dae54fc433e1679a598bf712 Mon Sep 17 00:00:00 2001 From: Tomek Urbaszek Date: Mon, 9 Nov 2020 21:19:58 +0100 Subject: [PATCH 3/5] Update docs/concepts.rst Co-authored-by: Kaxil Naik --- docs/concepts.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/concepts.rst b/docs/concepts.rst index 67542f1bd27e06..4c94f3da3b5dd7 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -1203,7 +1203,7 @@ are able to do the following: - checks that DAG/task meets required standards - perform custom logic of routing task to a queue -Any many other options. To use cluster-wide policies users can define in their +And many other options. To use cluster-wide policies users can define in their ``airflow_local_settings`` the following functions - ``dag_policy`` - which as an input takes ``dag`` argument of :class:`~airflow.models.dag.DAG` type. From c7dab70c705c5743121eccf0c4e08709b4a826c0 Mon Sep 17 00:00:00 2001 From: Tomasz Urbaszek Date: Thu, 12 Nov 2020 15:17:48 +0100 Subject: [PATCH 4/5] fixup! Update docs/concepts.rst --- airflow/models/dagbag.py | 10 ---------- airflow/settings.py | 11 +++++++++++ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 908d7b879c9823..d8207c570bf1cb 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -47,16 +47,6 @@ from airflow.utils.session import provide_session from airflow.utils.timeout import timeout -# TODO: Remove once deprecated -if hasattr(settings, "policy"): - warnings.warn( - "Using `policy` in airflow_local_settings.py is deprecated. 
" - "Please rename your `policy` to `task_policy`.", - DeprecationWarning, - stacklevel=2, - ) - setattr(settings, "task_policy", settings.policy) # pylint: disable=no-member - class FileLoadStat(NamedTuple): """Information about single file""" diff --git a/airflow/settings.py b/airflow/settings.py index 1c12aeb106383f..6ce402fac83807 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -377,6 +377,17 @@ def import_local_settings(): if not k.startswith("__"): globals()[k] = v + # TODO: Remove once deprecated + if "policy" in globals(): + warnings.warn( + "Using `policy` in airflow_local_settings.py is deprecated. " + "Please rename your `policy` to `task_policy`.", + DeprecationWarning, + stacklevel=2, + ) + globals()["task_policy"] = globals()["policy"] + del globals()["policy"] + log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__) except ImportError: log.debug("Failed to import airflow_local_settings.", exc_info=True) From 340c13062a1aa8906afaa5765bd3ff3141b2f852 Mon Sep 17 00:00:00 2001 From: Tomek Urbaszek Date: Thu, 12 Nov 2020 19:24:14 +0100 Subject: [PATCH 5/5] Update airflow/settings.py Co-authored-by: Ash Berlin-Taylor --- airflow/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/settings.py b/airflow/settings.py index 6ce402fac83807..41642f428e065e 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -378,7 +378,7 @@ def import_local_settings(): globals()[k] = v # TODO: Remove once deprecated - if "policy" in globals(): + if "policy" in globals() and "task_policy" not in globals(): warnings.warn( "Using `policy` in airflow_local_settings.py is deprecated. " "Please rename your `policy` to `task_policy`.",