Create DAG-level cluster policy #12184

Merged
merged 5 commits into from
Nov 13, 2020
8 changes: 8 additions & 0 deletions UPDATING.md
@@ -77,6 +77,14 @@ session_lifetime_minutes = 43200

## Airflow 2.0.0b1

### Rename policy to task_policy

Because Airflow introduced a DAG-level policy (`dag_policy`), we decided to rename the existing `policy`
function to `task_policy` to make the distinction clearer and avoid confusion.

Users who rely on a cluster policy need to rename their `policy` function in `airflow_local_settings.py`
to `task_policy`.
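As a minimal sketch of this migration (the `queue` assignment is only an illustrative mutation, not something this change requires), the rename in `airflow_local_settings.py` looks like:

```python
# airflow_local_settings.py

# Airflow 1.10.x -- the hook used to be named `policy`
# def policy(task):
#     task.queue = "default"

# Airflow 2.0 -- the same hook must now be named `task_policy`
def task_policy(task):
    task.queue = "default"
```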

### Default value for `[celery] operation_timeout` has changed to `1.0`

From Airflow 2, by default Airflow will retry 3 times to publish task to Celery broker. This is controlled by
5 changes: 4 additions & 1 deletion airflow/models/dagbag.py
@@ -386,8 +386,11 @@ def bag_dag(self, dag, root_dag):
dag.resolve_template_files()
dag.last_loaded = timezone.utcnow()

# Check policies
settings.dag_policy(dag)

for task in dag.tasks:
settings.policy(task)
settings.task_policy(task)
Member:
Here we should check if `policy` is defined and run it as well, but with a deprecation warning.

Member Author:
Done, this is what users will see:

root@817b1471dabe:/opt/airflow# airflow scheduler
/opt/airflow/airflow/models/dag.py:61: DeprecationWarning: Using `policy` in airflow_local_settings.py is deprecated. Please rename your `policy` to `task_policy`.
  from airflow.models.dagbag import DagBag
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-11-08 19:09:09,005] {scheduler_job.py:1248} INFO - Starting the scheduler
[2020-11-08 19:09:09,005] {scheduler_job.py:1253} INFO - Processing each file at most -1 times
[2020-11-08 19:09:09,114] {scheduler_job.py:1275} INFO - Resetting orphaned tasks for active dag runs


subdags = dag.subdags

46 changes: 42 additions & 4 deletions airflow/settings.py
@@ -118,13 +118,15 @@ def custom_show_warning(message, category, filename, lineno, file=None, line=Non
warnings.showwarning = custom_show_warning


def policy(task): # pylint: disable=unused-argument
def task_policy(task) -> None: # pylint: disable=unused-argument
"""
This policy setting allows altering tasks after they are loaded in
the DagBag. It allows administrator to rewire some task parameters.
    the DagBag. It allows administrators to rewire some of the task's parameters.
    Alternatively you can raise an ``AirflowClusterPolicyViolation`` exception
    to stop the DAG from being executed.

    To define a policy, add an ``airflow_local_settings`` module
to your PYTHONPATH that defines this ``policy`` function.
to your PYTHONPATH that defines this ``task_policy`` function.

Here are a few examples of how this can be useful:

@@ -133,7 +135,29 @@ def policy(task):  # pylint: disable=unused-argument
tasks get wired to the right workers
* You could enforce a task timeout policy, making sure that no tasks run
for more than 48 hours
* ...

:param task: task to be mutated
:type task: airflow.models.baseoperator.BaseOperator
"""


def dag_policy(dag) -> None: # pylint: disable=unused-argument
"""
This policy setting allows altering DAGs after they are loaded in
    the DagBag. It allows administrators to rewire some of the DAG's parameters.
    Alternatively you can raise an ``AirflowClusterPolicyViolation`` exception
    to stop the DAG from being executed.

    To define a policy, add an ``airflow_local_settings`` module
to your PYTHONPATH that defines this ``dag_policy`` function.

Here are a few examples of how this can be useful:

    * You could enforce a default user for DAGs
    * You could check that every DAG has tags configured

:param dag: dag to be mutated
:type dag: airflow.models.dag.DAG
"""


@@ -146,6 +170,9 @@ def task_instance_mutation_hook(task_instance):  # pylint: disable=unused-argume
to your PYTHONPATH that defines this ``task_instance_mutation_hook`` function.

This could be used, for instance, to modify the task instance during retries.

:param task_instance: task instance to be mutated
:type task_instance: airflow.models.taskinstance.TaskInstance
"""


@@ -350,6 +377,17 @@ def import_local_settings():
if not k.startswith("__"):
globals()[k] = v

# TODO: Remove once deprecated
if "policy" in globals() and "task_policy" not in globals():
warnings.warn(
"Using `policy` in airflow_local_settings.py is deprecated. "
"Please rename your `policy` to `task_policy`.",
DeprecationWarning,
stacklevel=2,
)
globals()["task_policy"] = globals()["policy"]
del globals()["policy"]

log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__)
except ImportError:
log.debug("Failed to import airflow_local_settings.", exc_info=True)
108 changes: 52 additions & 56 deletions docs/concepts.rst
@@ -1194,72 +1194,67 @@ state.

Cluster Policy
==============
Cluster policies provide an interface for taking action on every Airflow task
either at DAG load time or just before task execution.

Cluster Policies for Task Mutation
-----------------------------------
In case you want to apply cluster-wide mutations to the Airflow tasks,
you can either mutate the task right after the DAG is loaded or
mutate the task instance before task execution.

Mutate tasks after DAG loaded
-----------------------------

To mutate the task right after the DAG is parsed, you can define
a ``policy`` function in ``airflow_local_settings.py`` that mutates the
task based on other task or DAG attributes (through ``task.dag``).
It receives a single argument as a reference to the task object and you can alter
its attributes.

For example, this function could apply a specific queue property when
using a specific operator, or enforce a task timeout policy, making sure
that no tasks run for more than 48 hours. Here's an example of what this
may look like inside your ``airflow_local_settings.py``:
Cluster policies provide an interface for taking action on every Airflow task
or DAG, either at DAG load time or just before task execution. They allow users
to do the following:

- set default arguments on each DAG/task
- check that DAGs/tasks meet required standards
- perform custom routing logic to send tasks to a specific queue

.. code-block:: python
And many other options. To use cluster-wide policies, users can define the following
functions in their ``airflow_local_settings``:

def policy(task):
if task.task_type == 'HivePartitionSensor':
task.queue = "sensor_queue"
if task.timeout > timedelta(hours=48):
task.timeout = timedelta(hours=48)
- ``dag_policy`` - takes a ``dag`` argument of :class:`~airflow.models.dag.DAG` type.
  This function allows users to define a DAG-level policy which is executed for every DAG at load time.
- ``task_policy`` - takes a ``task`` argument of :class:`~airflow.models.baseoperator.BaseOperator`
  type. This function allows users to define a task-level policy which is executed for every task at DAG load time.
- ``task_instance_mutation_hook`` - takes a ``task_instance`` argument of
  :class:`~airflow.models.taskinstance.TaskInstance` type. This function allows users to define a
  task-instance-level policy that is executed right before task execution.

In DAG and task policies, users may raise :class:`~airflow.exceptions.AirflowClusterPolicyViolation`
to prevent a DAG from being imported or to prevent a task from being executed if it is not compliant
with the users' check.
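
As a rough sketch of how these hooks fit together (the specific checks below are illustrative
assumptions, not requirements), an ``airflow_local_settings`` module defining all three functions
might look like this:

.. code-block:: python

    # airflow_local_settings.py -- must be importable from PYTHONPATH
    from airflow.exceptions import AirflowClusterPolicyViolation


    def dag_policy(dag):
        # Reject DAGs whose default_args do not declare an owner.
        if not dag.default_args.get("owner"):
            raise AirflowClusterPolicyViolation(f"DAG {dag.dag_id} has no owner")


    def task_policy(task):
        # Route every sensor to a dedicated queue.
        if task.task_type.endswith("Sensor"):
            task.queue = "sensor_queue"


    def task_instance_mutation_hook(task_instance):
        # Send retries to a separate queue.
        if task_instance.try_number > 1:
            task_instance.queue = "retry_queue"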

Please note, cluster policy will have precedence over task
attributes defined in DAG meaning if ``task.sla`` is defined
in dag and also mutated via cluster policy then later will have precedence.
Please note that cluster policies take precedence over task attributes defined in the DAG: if
``task.sla`` is set in the DAG and also mutated via cluster policy, the latter will take precedence.
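
To illustrate this precedence (a sketch only; the 24-hour cap is an arbitrary example value), a
``task_policy`` such as the following overrides whatever SLA the DAG author configured, because it
runs after the DAG file is parsed:

.. code-block:: python

    from datetime import timedelta


    def task_policy(task):
        # Runs after the DAG is parsed, so this cap takes precedence over
        # the SLA set in the DAG file.
        if task.sla and task.sla > timedelta(hours=24):
            task.sla = timedelta(hours=24)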

The next sections show examples of each type of cluster policy.

Mutate task instances before task execution
Where to put ``airflow_local_settings.py``?
-------------------------------------------
Add an ``airflow_local_settings.py`` file to your ``$PYTHONPATH`` or to the ``$AIRFLOW_HOME/config`` folder.

To mutate the task instance before the task execution, you can define a
``task_instance_mutation_hook`` function in ``airflow_local_settings.py``
that mutates the task instance.
See :doc:`modules_management` for details on how Python and Airflow manage modules.

For example, this function re-routes the task to execute in a different
queue during retries:

.. code-block:: python
DAG level cluster policy
-----------------------------------
In this example, we check that each DAG has at least one tag defined.
Here is what it may look like:

def task_instance_mutation_hook(ti):
if ti.try_number >= 1:
ti.queue = 'retry_queue'
.. literalinclude:: /../tests/cluster_policies/__init__.py
:language: python
:start-after: [START example_dag_cluster_policy]
:end-before: [END example_dag_cluster_policy]

Task level cluster policy
-----------------------------
For example, this function could apply a specific queue property when
using a specific operator, or enforce a task timeout policy, making sure
that no tasks run for more than 48 hours. Here's an example of what this
may look like:

Cluster Policies for Custom Task Checks
-------------------------------------------
You may also use Cluster Policies to apply cluster-wide checks on Airflow
tasks. You can raise :class:`~airflow.exceptions.AirflowClusterPolicyViolation`
in a policy or task mutation hook (described below) to prevent a DAG from being
imported or prevent a task from being executed if the task is not compliant with
your check.
.. literalinclude:: /../tests/cluster_policies/__init__.py
:language: python
:start-after: [START example_task_cluster_policy]
:end-before: [END example_task_cluster_policy]

These checks are intended to help teams using Airflow to protect against common
beginner errors that may get past a code reviewer, rather than as technical
security controls.
As a more advanced example, we may implement checks intended to help teams using Airflow
protect against common beginner errors that may get past a code reviewer, rather than serve
as technical security controls.

For example, don't run tasks without airflow owners:

@@ -1281,14 +1276,15 @@ For Example in ``airflow_local_settings.py``:
:start-after: [START example_list_of_cluster_policy_rules]
:end-before: [END example_list_of_cluster_policy_rules]

Where to put ``airflow_local_settings.py``?
Task instance mutation hook
-------------------------------------------
The task instance mutation hook can be used, for example, to re-route a task to
a different queue during retries:

Add a ``airflow_local_settings.py`` file to your ``$PYTHONPATH``
or to ``$AIRFLOW_HOME/config`` folder.

See :doc:`modules_management` for details on how Python and Airflow manage modules.

.. literalinclude:: /../tests/cluster_policies/__init__.py
:language: python
:start-after: [START example_task_mutation_hook]
:end-before: [END example_task_mutation_hook]

Documentation & Notes
=====================
34 changes: 34 additions & 0 deletions tests/cluster_policies/__init__.py
@@ -15,10 +15,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import timedelta
from typing import Callable, List

from airflow import DAG
from airflow.configuration import conf
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models import TaskInstance
from airflow.models.baseoperator import BaseOperator


@@ -62,3 +65,34 @@ def cluster_policy(task: BaseOperator):


# [END example_list_of_cluster_policy_rules]

# [START example_dag_cluster_policy]
def dag_policy(dag: DAG):
"""Ensure that DAG has at least one tag"""
if not dag.tags:
raise AirflowClusterPolicyViolation(
f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.filepath}"
)


# [END example_dag_cluster_policy]


# [START example_task_cluster_policy]
def task_policy(task: BaseOperator):
if task.task_type == 'HivePartitionSensor':
task.queue = "sensor_queue"
if task.timeout > timedelta(hours=48):
task.timeout = timedelta(hours=48)


# [END example_task_cluster_policy]


# [START example_task_mutation_hook]
def task_instance_mutation_hook(task_instance: TaskInstance):
if task_instance.try_number >= 1:
task_instance.queue = 'retry_queue'


# [END example_task_mutation_hook]
31 changes: 31 additions & 0 deletions tests/dags/test_dag_with_no_tags.py
@@ -0,0 +1,31 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

DEFAULT_DATE = datetime(2016, 1, 1)

default_args = {
"owner": "airflow",
"start_date": DEFAULT_DATE,
}

with DAG(dag_id="test_only_dummy_tasks", default_args=default_args, schedule_interval='@once') as dag:
task_a = DummyOperator(task_id="test_task_a")
22 changes: 16 additions & 6 deletions tests/models/test_dagbag.py
@@ -714,9 +714,10 @@ def test_collect_dags_from_db(self):
self.assertEqual(serialized_dag.dag_id, dag.dag_id)
self.assertEqual(set(serialized_dag.task_dict), set(dag.task_dict))

@patch("airflow.settings.policy", cluster_policies.cluster_policy)
def test_cluster_policy_violation(self):
"""test that file processing results in import error when task does not
@patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
def test_task_cluster_policy_violation(self):
"""
test that file processing results in import error when task does not
obey cluster policy.
"""
dag_file = os.path.join(TEST_DAGS_FOLDER, "test_missing_owner.py")
@@ -732,9 +733,10 @@ def test_cluster_policy_violation(self):
}
self.assertEqual(expected_import_errors, dagbag.import_errors)

@patch("airflow.settings.policy", cluster_policies.cluster_policy)
def test_cluster_policy_obeyed(self):
"""test that dag successfully imported without import errors when tasks
@patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
def test_task_cluster_policy_obeyed(self):
"""
test that dag successfully imported without import errors when tasks
obey cluster policy.
"""
dag_file = os.path.join(TEST_DAGS_FOLDER, "test_with_non_default_owner.py")
@@ -743,3 +745,11 @@ def test_cluster_policy_obeyed(self):
self.assertEqual({"test_with_non_default_owner"}, set(dagbag.dag_ids))

self.assertEqual({}, dagbag.import_errors)

@patch("airflow.settings.dag_policy", cluster_policies.dag_policy)
def test_dag_cluster_policy_obeyed(self):
dag_file = os.path.join(TEST_DAGS_FOLDER, "test_dag_with_no_tags.py")

dagbag = DagBag(dag_folder=dag_file, include_examples=False, include_smart_sensor=False)
assert len(dagbag.dag_ids) == 0
assert "has no tags" in dagbag.import_errors[dag_file]