Skip to content

Commit

Permalink
Finish refactor of DAG resource name helper (apache#15511)
Browse files Browse the repository at this point in the history
This finishes moving `prefixed_dag_id` into `airflow.security.permissions.resource_name_for_dag`.
  • Loading branch information
jedcunningham authored Apr 26, 2021
1 parent 53fc1a9 commit 0b4c67d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 30 deletions.
6 changes: 3 additions & 3 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,17 +582,17 @@ def _sync_perm_for_dag(self, dag, session: Optional[Session] = None):
"""Sync DAG specific permissions, if necessary"""
from flask_appbuilder.security.sqla import models as sqla_models

from airflow.security.permissions import DAG_PERMS, permission_name_for_dag
from airflow.security.permissions import DAG_PERMS, resource_name_for_dag

def needs_perm_views(dag_id: str) -> bool:
view_menu_name = permission_name_for_dag(dag_id)
dag_resource_name = resource_name_for_dag(dag_id)
for permission_name in DAG_PERMS:
if not (
session.query(sqla_models.PermissionView)
.join(sqla_models.Permission)
.join(sqla_models.ViewMenu)
.filter(sqla_models.Permission.name == permission_name)
.filter(sqla_models.ViewMenu.name == view_menu_name)
.filter(sqla_models.ViewMenu.name == dag_resource_name)
.one_or_none()
):
return True
Expand Down
4 changes: 2 additions & 2 deletions airflow/security/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@
DAG_PERMS = {ACTION_CAN_READ, ACTION_CAN_EDIT}


def permission_name_for_dag(dag_id):
"""Returns the permission name for a DAG id."""
def resource_name_for_dag(dag_id):
"""Returns the resource name for a DAG id."""
if dag_id == RESOURCE_DAG:
return dag_id

Expand Down
40 changes: 23 additions & 17 deletions airflow/www/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def get_accessible_dags(self, user_actions, user, session=None):
def can_access_some_dags(self, action: str, dag_id: Optional[str] = None) -> bool:
"""Checks if user has read or write access to some dags."""
if dag_id and dag_id != '~':
return self.has_access(action, self.prefixed_dag_id(dag_id))
return self.has_access(action, permissions.resource_name_for_dag(dag_id))

user = g.user
if action == permissions.ACTION_CAN_READ:
Expand All @@ -345,24 +345,30 @@ def can_read_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG read access."""
if not user:
user = g.user
prefixed_dag_id = self.prefixed_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag_id)
return self._has_view_access(
user, permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG
) or self._has_view_access(user, permissions.ACTION_CAN_READ, prefixed_dag_id)
) or self._has_view_access(user, permissions.ACTION_CAN_READ, dag_resource_name)

def can_edit_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG edit access."""
if not user:
user = g.user
prefixed_dag_id = self.prefixed_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag_id)

return self._has_view_access(
user, permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG
) or self._has_view_access(user, permissions.ACTION_CAN_EDIT, prefixed_dag_id)
) or self._has_view_access(user, permissions.ACTION_CAN_EDIT, dag_resource_name)

def prefixed_dag_id(self, dag_id):
"""Returns the permission name for a DAG id."""
return permissions.permission_name_for_dag(dag_id)
warnings.warn(
"`prefixed_dag_id` has been deprecated. "
"Please use `airflow.security.permissions.resource_name_for_dag` instead.",
DeprecationWarning,
stacklevel=2,
)
return permissions.resource_name_for_dag(dag_id)

def is_dag_resource(self, resource_name):
"""Determines if a permission belongs to a DAG or all DAGs."""
Expand Down Expand Up @@ -548,7 +554,7 @@ def create_dag_specific_permissions(self) -> None:
dags = dagbag.dags.values()

for dag in dags:
dag_resource_name = self.prefixed_dag_id(dag.dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
for perm_name in self.DAG_PERMS:
if (perm_name, dag_resource_name) not in perms:
self._merge_perm(perm_name, dag_resource_name)
Expand Down Expand Up @@ -624,12 +630,12 @@ def sync_perm_for_dag(self, dag_id, access_control=None):
:type access_control: dict
:return:
"""
prefixed_dag_id = self.prefixed_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag_id)
for dag_perm in self.DAG_PERMS:
self.add_permission_view_menu(dag_perm, prefixed_dag_id)
self.add_permission_view_menu(dag_perm, dag_resource_name)

if access_control:
self._sync_dag_view_permissions(prefixed_dag_id, access_control)
self._sync_dag_view_permissions(dag_resource_name, access_control)

def _sync_dag_view_permissions(self, dag_id, access_control):
"""Set the access policy on the given DAG's ViewModel.
Expand All @@ -641,13 +647,13 @@ def _sync_dag_view_permissions(self, dag_id, access_control):
{'can_read'}
:type access_control: dict
"""
prefixed_dag_id = self.prefixed_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag_id)

def _get_or_create_dag_permission(perm_name):
dag_perm = self.find_permission_view_menu(perm_name, prefixed_dag_id)
dag_perm = self.find_permission_view_menu(perm_name, dag_resource_name)
if not dag_perm:
self.log.info("Creating new permission '%s' on view '%s'", perm_name, prefixed_dag_id)
dag_perm = self.add_permission_view_menu(perm_name, prefixed_dag_id)
self.log.info("Creating new permission '%s' on view '%s'", perm_name, dag_resource_name)
dag_perm = self.add_permission_view_menu(perm_name, dag_resource_name)

return dag_perm

Expand All @@ -661,12 +667,12 @@ def _revoke_stale_permissions(dag_view):
self.log.info(
"Revoking '%s' on DAG '%s' for role '%s'",
perm.permission,
prefixed_dag_id,
dag_resource_name,
role.name,
)
self.del_permission_role(role, perm)

dag_view = self.find_view_menu(prefixed_dag_id)
dag_view = self.find_view_menu(dag_resource_name)
if dag_view:
_revoke_stale_permissions(dag_view)

Expand All @@ -684,7 +690,7 @@ def _revoke_stale_permissions(dag_view):
raise AirflowException(
"The access_control map for DAG '{}' includes the following "
"invalid permissions: {}; The set of valid permissions "
"is: {}".format(prefixed_dag_id, invalid_perms, self.DAG_PERMS)
"is: {}".format(dag_resource_name, invalid_perms, self.DAG_PERMS)
)

for perm_name in perms:
Expand Down
26 changes: 18 additions & 8 deletions tests/www/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def assert_user_does_not_have_dag_perms(self, dag_id, perms, user=None):
def _has_dag_perm(self, perm, dag_id, user):
# if not user:
# user = self.user
return self.security_manager.has_access(perm, self.security_manager.prefixed_dag_id(dag_id), user)
return self.security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user)

def _create_dag(self, dag_id):
dag_model = DagModel(dag_id=dag_id)
Expand Down Expand Up @@ -581,24 +581,24 @@ def test_create_dag_specific_permissions(self, dagbag_mock):
self.security_manager._sync_dag_view_permissions = mock.Mock()

for dag in dags:
prefixed_dag_id = self.security_manager.prefixed_dag_id(dag.dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
all_perms = self.security_manager.get_all_permissions()
assert ('can_read', prefixed_dag_id) not in all_perms
assert ('can_edit', prefixed_dag_id) not in all_perms
assert ('can_read', dag_resource_name) not in all_perms
assert ('can_edit', dag_resource_name) not in all_perms

self.security_manager.create_dag_specific_permissions()

dagbag_mock.assert_called_once_with(read_dags_from_db=True)
collect_dags_from_db_mock.assert_called_once_with()

for dag in dags:
prefixed_dag_id = self.security_manager.prefixed_dag_id(dag.dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
all_perms = self.security_manager.get_all_permissions()
assert ('can_read', prefixed_dag_id) in all_perms
assert ('can_edit', prefixed_dag_id) in all_perms
assert ('can_read', dag_resource_name) in all_perms
assert ('can_edit', dag_resource_name) in all_perms

self.security_manager._sync_dag_view_permissions.assert_called_once_with(
self.security_manager.prefixed_dag_id('has_access_control'), access_control
permissions.resource_name_for_dag('has_access_control'), access_control
)

del dagbag.dags["has_access_control"]
Expand Down Expand Up @@ -638,3 +638,13 @@ def test_get_all_roles_with_permissions(self):
assert isinstance(role, self.security_manager.role_model)

assert 'Admin' in roles

def test_prefixed_dag_id_is_deprecated(self):
with pytest.warns(
DeprecationWarning,
match=(
"`prefixed_dag_id` has been deprecated. "
"Please use `airflow.security.permissions.resource_name_for_dag` instead."
),
):
self.security_manager.prefixed_dag_id("hello")

0 comments on commit 0b4c67d

Please sign in to comment.