diff --git a/geonode/base/api/tests.py b/geonode/base/api/tests.py index 9f32e5e6c56..bc9dc434770 100644 --- a/geonode/base/api/tests.py +++ b/geonode/base/api/tests.py @@ -464,15 +464,15 @@ def test_delete_user_profile(self): # Bob can't delete user self.assertTrue(self.client.login(username="bobby", password="bob")) response = self.client.delete(url, format="json") - self.assertEqual(response.status_code, 405) + self.assertEqual(response.status_code, 403) # User can not delete self profile self.assertTrue(self.client.login(username="user_test_delete", password="user")) response = self.client.delete(url, format="json") - self.assertEqual(response.status_code, 405) + self.assertEqual(response.status_code, 403) # Admin can delete user self.assertTrue(self.client.login(username="admin", password="admin")) response = self.client.delete(url, format="json") - self.assertEqual(response.status_code, 405) + self.assertEqual(response.status_code, 204) finally: user.delete() diff --git a/geonode/people/tests.py b/geonode/people/tests.py index b306bbea220..122df063135 100644 --- a/geonode/people/tests.py +++ b/geonode/people/tests.py @@ -19,6 +19,9 @@ import django from django.test.utils import override_settings from mock import MagicMock, PropertyMock, patch +from geonode.base.models import ResourceBase +from geonode.groups.models import GroupMember, GroupProfile +from geonode.people.utils import call_validators from geonode.tests.base import GeoNodeBaseTestSupport from django.core import mail @@ -58,6 +61,7 @@ def setUp(self): self.permission_type = ("view", "download", "edit") self.groups = Group.objects.all()[:3] self.group_ids = ",".join(str(element.pk) for element in self.groups) + self.bar = GroupProfile.objects.get(slug="bar") def test_redirect_on_get_request(self): """ @@ -786,3 +790,121 @@ def test_users_api_patch_username(self): # username cannot be updated self.assertEqual(response.status_code, 400) self.assertTrue("username cannot be updated" in response.json()["errors"]) + + @override_settings(USER_DELETION_RULES=["geonode.people.utils.has_resources", "geonode.people.utils.is_manager"]) + def test_valid_delete(self): + # create a new user + tim = get_user_model().objects.create(username="tim") + + admin = get_user_model().objects.get(username="admin") + + self.assertTrue(self.client.login(username="admin", password="admin")) + + # admin wants to delete tim + # Admin is superuser or staff + self.assertTrue(admin.is_superuser or admin.is_staff) + # check that tim is not manager + # nor has any resources + self.assertFalse(ResourceBase.objects.filter(owner_id=tim.pk).exists()) + self.assertFalse(GroupMember.objects.filter(user_id=tim.pk, role="manager").exists()) + + url = f"{reverse('users-list')}/{tim.pk}" + response = self.client.delete(url, content_type="application/json") + + # admin is permitted to delete + self.assertEqual(response.status_code, 204) + # tim has been deleted + self.assertEqual(get_user_model().objects.filter(username="tim").first(), None) + + @override_settings(USER_DELETION_RULES=[]) + def test_delete_without_validators(self): + # reset global + call_validators(None, reset=True) + + norman = get_user_model().objects.get(username="norman") + admin = get_user_model().objects.get(username="admin") + + self.assertTrue(self.client.login(username="admin", password="admin")) + + # admin wants to delete norman but norman is already promoted + # Admin is superuser or staff + self.assertTrue(admin.is_superuser or admin.is_staff) + + # Make sure norman is not a member + self.assertFalse(self.bar.user_is_member(norman)) + + # Add norman to the self.bar group + self.bar.join(norman) + + # Ensure norman is now a member + self.assertTrue(self.bar.user_is_member(norman)) + + # promotion + self.bar.promote(norman) + # Ensure norman is in the managers queryset + # self.bar.join(norman, role=GroupMember.MANAGER) + self.assertTrue(norman in self.bar.get_managers()) + + url = f"{reverse('users-list')}/{norman.pk}" + response = self.client.delete(url, content_type="application/json") + + # norman can be deleted because validator rules are not applied + self.assertEqual(response.status_code, 204) + self.assertEqual(get_user_model().objects.filter(username="norman").first(), None) + + @override_settings(USER_DELETION_RULES=["geonode.people.utils.has_resources", "geonode.people.utils.is_manager"]) + def test_delete_a_manger(self): + norman = get_user_model().objects.get(username="norman") + admin = get_user_model().objects.get(username="admin") + + self.assertTrue(self.client.login(username="admin", password="admin")) + + # admin wants to delete norman but norman is already promoted + # Admin is superuser or staff + self.assertTrue(admin.is_superuser or admin.is_staff) + + # Make sure norman is not a member + self.assertFalse(self.bar.user_is_member(norman)) + + # Add norman to the self.bar group + self.bar.join(norman) + + # Ensure norman is now a member + self.assertTrue(self.bar.user_is_member(norman)) + + # promotion + self.bar.promote(norman) + # Ensure norman is in the managers queryset + # self.bar.join(norman, role=GroupMember.MANAGER) + self.assertTrue(norman in self.bar.get_managers()) + + url = f"{reverse('users-list')}/{norman.pk}" + response = self.client.delete(url, content_type="application/json") + + # norman cant be deleted + self.assertEqual(response.status_code, 403) + self.assertNotEqual(get_user_model().objects.filter(username="norman").first(), None) + + @override_settings(USER_DELETION_RULES=["geonode.people.utils.has_resources", "geonode.people.utils.is_manager"]) + def test_delete_a_user_with_resource(self): + # create a new user + bobby = get_user_model().objects.get(username="bobby") + admin = get_user_model().objects.get(username="admin") + + self.assertTrue(self.client.login(username="admin", password="admin")) + + # admin wants to delete bobby + # Admin is superuser or staff + self.assertTrue(admin.is_superuser or admin.is_staff) + # check that bobby is not manager + # but he has resources already assigned + self.assertTrue(ResourceBase.objects.filter(owner_id=bobby.pk).exists()) + self.assertFalse(GroupMember.objects.filter(user_id=bobby.pk, role="manager").exists()) + + url = f"{reverse('users-list')}/{bobby.pk}" + response = self.client.delete(url, content_type="application/json") + + # admin is permitted to delete + self.assertEqual(response.status_code, 403) + # bobby cant be deleted + self.assertNotEqual(get_user_model().objects.filter(username="bobby").first(), None) diff --git a/geonode/people/utils.py b/geonode/people/utils.py index 99ec757f220..063e0e1591e 100644 --- a/geonode/people/utils.py +++ b/geonode/people/utils.py @@ -22,8 +22,12 @@ from django.contrib.auth.models import Group from geonode import GeoNodeException +from geonode.base.models import ResourceBase from geonode.groups.models import GroupProfile, GroupMember from geonode.groups.conf import settings as groups_settings +from rest_framework.exceptions import PermissionDenied +from django.conf import settings +from django.utils.module_loading import import_string def get_default_user(): @@ -140,3 +144,22 @@ def get_available_users(user): member_ids.extend(users_ids) return get_user_model().objects.filter(id__in=member_ids) + + +def has_resources(profile) -> bool: + return ResourceBase.objects.filter(owner_id=profile.pk).exists() + + +def is_manager(profile) -> bool: + return GroupMember.objects.filter(user_id=profile.pk, role=GroupMember.MANAGER).exists() + + +def call_validators(profile, reset=False): + if reset: + globals()["user_deletion_modules"] = [] + if not globals().get("user_deletion_modules"): + storer_module_path = settings.USER_DELETION_RULES if hasattr(settings, "USER_DELETION_RULES") else [] + globals()["user_deletion_modules"] = [import_string(storer_path) for storer_path in storer_module_path] + for not_valid in globals().get("user_deletion_modules", []): + if not_valid(profile): + raise PermissionDenied() diff --git a/geonode/people/views.py b/geonode/people/views.py index e62dbdd1327..e9b80c713c7 100644 --- a/geonode/people/views.py +++ b/geonode/people/views.py @@ -54,6 +54,7 @@ from geonode.security.utils import get_visible_resources from guardian.shortcuts import get_objects_for_user from rest_framework.exceptions import PermissionDenied +from geonode.people.utils import call_validators class SetUserLayerPermission(View): @@ -166,7 +167,7 @@ class UserViewSet(DynamicModelViewSet): API endpoint that allows users to be viewed or edited. """ - http_method_names = ["get", "post", "patch"] + http_method_names = ["get", "post", "patch", "delete"] authentication_classes = [SessionAuthentication, BasicAuthentication, OAuth2Authentication] permission_classes = [ IsAuthenticated, @@ -204,6 +205,13 @@ def update(self, request, *args, **kwargs): serializer.save() return Response(serializer.data) + def perform_destroy(self, instance): + # self delete check + if self.request.user.pk == int(self.kwargs["pk"]): + raise PermissionDenied() + call_validators(instance) + instance.delete() + @extend_schema( methods=["get"], responses={200: ResourceBaseSerializer(many=True)},