Skip to content

Commit

Permalink
Fix: Keep compatibility with old FAB versions (apache#41549)
Browse files Browse the repository at this point in the history
(cherry picked from commit d7d944e)
  • Loading branch information
joaopamaral authored and potiuk committed Aug 27, 2024
1 parent 6fe8e8f commit 8c37105
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 9 deletions.
30 changes: 21 additions & 9 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import re2
import sqlalchemy_jsonfield
from dateutil.relativedelta import relativedelta
from packaging import version as packaging_version
from sqlalchemy import (
Boolean,
Column,
Expand Down Expand Up @@ -116,6 +117,7 @@
clear_task_instances,
)
from airflow.models.tasklog import LogTemplate
from airflow.providers.fab import __version__ as FAB_VERSION
from airflow.secrets.local_filesystem import LocalFilesystemBackend
from airflow.security import permissions
from airflow.settings import json
Expand Down Expand Up @@ -940,16 +942,26 @@ def update_old_perm(permission: str):

updated_access_control = {}
for role, perms in access_control.items():
updated_access_control[role] = updated_access_control.get(role, {})
if isinstance(perms, (set, list)):
# Support for old-style access_control where only the actions are specified
updated_access_control[role][permissions.RESOURCE_DAG] = set(perms)
if packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0"):
updated_access_control[role] = updated_access_control.get(role, {})
if isinstance(perms, (set, list)):
# Support for old-style access_control where only the actions are specified
updated_access_control[role][permissions.RESOURCE_DAG] = set(perms)
else:
updated_access_control[role] = perms
if permissions.RESOURCE_DAG in updated_access_control[role]:
updated_access_control[role][permissions.RESOURCE_DAG] = {
update_old_perm(perm)
for perm in updated_access_control[role][permissions.RESOURCE_DAG]
}
elif isinstance(perms, dict):
# Not allow new access control format with old FAB versions
raise AirflowException(
"Please upgrade the FAB provider to a version >= 1.3.0 to allow "
"use the Dag Level Access Control new format."
)
else:
updated_access_control[role] = perms
if permissions.RESOURCE_DAG in updated_access_control[role]:
updated_access_control[role][permissions.RESOURCE_DAG] = {
update_old_perm(perm) for perm in updated_access_control[role][permissions.RESOURCE_DAG]
}
updated_access_control[role] = {update_old_perm(perm) for perm in perms}

return updated_access_control

Expand Down
57 changes: 57 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2789,6 +2789,63 @@ def test_replace_outdated_access_control_actions(self):
assert "permission is deprecated" in str(deprecation_warnings[0].message)
assert "permission is deprecated" in str(deprecation_warnings[1].message)

@pytest.mark.parametrize(
"fab_version, perms, expected_exception, expected_perms",
[
pytest.param(
"1.2.0",
{
"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT},
"role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}},
# will raise error in old FAB with new access control format
},
AirflowException,
None,
id="old_fab_new_access_control_format",
),
pytest.param(
"1.2.0",
{
"role1": [
permissions.ACTION_CAN_READ,
permissions.ACTION_CAN_EDIT,
permissions.ACTION_CAN_READ,
],
},
None,
{"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}},
id="old_fab_old_access_control_format",
),
pytest.param(
"1.3.0",
{
"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}, # old format
"role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, # new format
},
None,
{
"role1": {
permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}
},
"role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}},
},
id="new_fab_mixed_access_control_format",
),
],
)
def test_access_control_format(self, fab_version, perms, expected_exception, expected_perms):
if expected_exception:
with patch("airflow.models.dag.FAB_VERSION", fab_version):
with pytest.raises(
expected_exception,
match="Please upgrade the FAB provider to a version >= 1.3.0 to allow use the Dag Level Access Control new format.",
):
DAG(dag_id="dag_test", schedule=None, access_control=perms)
else:
with patch("airflow.models.dag.FAB_VERSION", fab_version):
dag = DAG(dag_id="dag_test", schedule=None, access_control=perms)
assert dag.access_control == expected_perms

def test_validate_executor_field_executor_not_configured(self):
dag = DAG("test-dag", schedule=None)
EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor")
Expand Down

0 comments on commit 8c37105

Please sign in to comment.