diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 5011b3ebb2b35d..ace9650f50f28c 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -57,6 +57,7 @@ import re2 import sqlalchemy_jsonfield from dateutil.relativedelta import relativedelta +from packaging import version as packaging_version from sqlalchemy import ( Boolean, Column, @@ -116,6 +117,7 @@ clear_task_instances, ) from airflow.models.tasklog import LogTemplate +from airflow.providers.fab import __version__ as FAB_VERSION from airflow.secrets.local_filesystem import LocalFilesystemBackend from airflow.security import permissions from airflow.settings import json @@ -940,16 +942,26 @@ def update_old_perm(permission: str): updated_access_control = {} for role, perms in access_control.items(): - updated_access_control[role] = updated_access_control.get(role, {}) - if isinstance(perms, (set, list)): - # Support for old-style access_control where only the actions are specified - updated_access_control[role][permissions.RESOURCE_DAG] = set(perms) + if packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0"): + updated_access_control[role] = updated_access_control.get(role, {}) + if isinstance(perms, (set, list)): + # Support for old-style access_control where only the actions are specified + updated_access_control[role][permissions.RESOURCE_DAG] = set(perms) + else: + updated_access_control[role] = perms + if permissions.RESOURCE_DAG in updated_access_control[role]: + updated_access_control[role][permissions.RESOURCE_DAG] = { + update_old_perm(perm) + for perm in updated_access_control[role][permissions.RESOURCE_DAG] + } + elif isinstance(perms, dict): + # Not allow new access control format with old FAB versions + raise AirflowException( + "Please upgrade the FAB provider to a version >= 1.3.0 to allow " + "use the Dag Level Access Control new format." + ) else: - updated_access_control[role] = perms - if permissions.RESOURCE_DAG in updated_access_control[role]: - updated_access_control[role][permissions.RESOURCE_DAG] = { - update_old_perm(perm) for perm in updated_access_control[role][permissions.RESOURCE_DAG] - } + updated_access_control[role] = {update_old_perm(perm) for perm in perms} return updated_access_control diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index d2b23c02654f31..831990fd68b1ec 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -2789,6 +2789,63 @@ def test_replace_outdated_access_control_actions(self): assert "permission is deprecated" in str(deprecation_warnings[0].message) assert "permission is deprecated" in str(deprecation_warnings[1].message) + @pytest.mark.parametrize( + "fab_version, perms, expected_exception, expected_perms", + [ + pytest.param( + "1.2.0", + { + "role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}, + "role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, + # will raise error in old FAB with new access control format + }, + AirflowException, + None, + id="old_fab_new_access_control_format", + ), + pytest.param( + "1.2.0", + { + "role1": [ + permissions.ACTION_CAN_READ, + permissions.ACTION_CAN_EDIT, + permissions.ACTION_CAN_READ, + ], + }, + None, + {"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}}, + id="old_fab_old_access_control_format", + ), + pytest.param( + "1.3.0", + { + "role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}, # old format + "role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, # new format + }, + None, + { + "role1": { + permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT} + }, + "role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, + }, + id="new_fab_mixed_access_control_format", + ), + ], + ) + def test_access_control_format(self, fab_version, perms, expected_exception, expected_perms): + if expected_exception: + with patch("airflow.models.dag.FAB_VERSION", fab_version): + with pytest.raises( + expected_exception, + match="Please upgrade the FAB provider to a version >= 1.3.0 to allow use the Dag Level Access Control new format.", + ): + DAG(dag_id="dag_test", schedule=None, access_control=perms) + else: + with patch("airflow.models.dag.FAB_VERSION", fab_version): + dag = DAG(dag_id="dag_test", schedule=None, access_control=perms) + assert dag.access_control == expected_perms + def test_validate_executor_field_executor_not_configured(self): dag = DAG("test-dag", schedule=None) EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor")