Skip to content

Commit

Permalink
[Fixes #11995] Implement POST and PATCH methods for the User API
Browse files Browse the repository at this point in the history
  • Loading branch information
RegisSinjari committed Feb 29, 2024
1 parent 023761d commit 4bad53a
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 6 deletions.
4 changes: 2 additions & 2 deletions geonode/base/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ class IsSelf(permissions.BasePermission):
"""

def has_permission(self, request, view):
"""Always return False here.
"""Always return True here.
The fine-grained permissions are handled in has_object_permission().
"""
return False
return True

def has_object_permission(self, request, view, obj):
user = request.user
Expand Down
247 changes: 247 additions & 0 deletions geonode/people/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import django
from django.test.utils import override_settings
from mock import MagicMock, PropertyMock, patch
from geonode.tests.base import GeoNodeBaseTestSupport
Expand Down Expand Up @@ -471,3 +472,249 @@ def test_new_user_is_no_assigned_automatically_to_contributors_if_disabled(self)
"""
new_user = get_user_model().objects.create(username="random_username")
self.assertFalse("contributors" in [x.name for x in new_user.groups.iterator()])

def test_users_api_valid_post(self):
data = {
"username": "usernam3e",
"first_name": "Registered",
"password": "@!2XJSL_S&V^0nt",
"last_name": "Test",
"avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240",
"perms": ["add_resource"],
"is_superuser": False,
"is_staff": False,
"email": "[email protected]",
}

self.client.login(username="admin", password="admin")
response = self.client.post(reverse("users-list"), data=data, content_type="application/json")
self.assertTrue(response.status_code, 201)

def test_users_api_post_not_admin(self):
data = {
"username": "usernam3e",
"first_name": "Registered",
"password": "@!2XJSL_S&V^0nt",
"last_name": "Test",
"avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240",
"perms": ["add_resource"],
"is_superuser": True,
"is_staff": True,
"email": "[email protected]",
}
bobby = get_user_model().objects.get(username="bobby")
self.client.login(username="bobby", password="bob")
# assert that bobby is not a super user or staff
self.assertFalse(bobby.is_superuser)
self.assertFalse(bobby.is_staff)
response = self.client.post(reverse("users-list"), data=data, content_type="application/json")
self.assertEqual(response.status_code, 403)

def test_users_api_patch_self(self):

bobby = get_user_model().objects.get(username="bobby")
self.assertTrue(self.client.login(username="bobby", password="bob"))
self.assertTrue(bobby.is_authenticated)
# bobby wants to edit his own data
data = {"first_name": "Robert"}
# before change
self.assertNotEqual(bobby.first_name, "Robert")

# and can acess even if he's not admin or staff
self.assertFalse(bobby.is_superuser)
self.assertFalse(bobby.is_staff)

url = f"{reverse('users-list')}/{bobby.pk}"
response = self.client.patch(url, data=data, content_type="application/json")
response_json = response.json()
self.assertEqual(response.status_code, 200)
self.assertEqual(response_json["user"]["first_name"], "Robert")

def test_users_api_patch_self_as_superuser(self):

bobby = get_user_model().objects.get(username="bobby")
self.assertTrue(self.client.login(username="bobby", password="bob"))
self.assertTrue(bobby.is_authenticated)
# bobby wants to edit his own data
data = {
"first_name": "Robert",
"is_superuser": True,
"is_staff": True,
}
# before change
self.assertNotEqual(bobby.first_name, "Robert")

# and can acess even if he's not admin or staff
self.assertFalse(bobby.is_superuser)
self.assertFalse(bobby.is_staff)

url = f"{reverse('users-list')}/{bobby.pk}"
response = self.client.patch(url, data=data, content_type="application/json")
response_json = response.json()

self.assertEqual(response.status_code, 200)
self.assertEqual(response_json["user"]["first_name"], "Robert")

self.assertFalse(response_json["user"]["is_superuser"])
self.assertFalse(response_json["user"]["is_staff"])
# check db side too
bobby = get_user_model().objects.get(username="bobby")
self.assertFalse(bobby.is_superuser)
self.assertFalse(bobby.is_staff)

def test_users_api_patch_others_from_non_admin(self):

bobby = get_user_model().objects.get(username="bobby")
profile = get_user_model().objects.get(username="user1")

self.assertTrue(self.client.login(username="bobby", password="bob"))
self.assertTrue(bobby.is_authenticated)
# bobby wants to edit his user's data
data = {"first_name": "Norman Sky", "password": "@!2XJSL_S&V^0nt", "email": "[email protected]"}

# Bobby is not superuser or staff
self.assertFalse(bobby.is_superuser)
self.assertFalse(bobby.is_staff)

url = f"{reverse('users-list')}/{profile.pk}"
response = self.client.patch(url, data=data, content_type="application/json")

# bobby is not permitted to update user data
self.assertEqual(response.status_code, 403)

def test_users_api_patch_others_from_admin(self):

bobby = get_user_model().objects.get(username="bobby")
admin = get_user_model().objects.get(username="admin")

self.assertTrue(self.client.login(username="admin", password="admin"))
self.assertTrue(admin.is_authenticated)
# admin wants to edit his bobby's data
data = {"first_name": "Robert Baratheon", "password": "@!2XJSL_S&V^0nt000", "email": "[email protected]"}

# Admin is superuser or staff
self.assertTrue(admin.is_superuser or admin.is_staff)

url = f"{reverse('users-list')}/{bobby.pk}"
response = self.client.patch(url, data=data, content_type="application/json")

# admin is permitted to update bobby's data
self.assertEqual(response.status_code, 200)

self.assertEqual(response.json()["user"]["first_name"], "Robert Baratheon")

@override_settings(ACCOUNT_EMAIL_REQUIRED=True)
def test_users_api_empty_email(self):
"""
If the environment variable ACCOUNT_EMAIL_REQUIRED is set to True,
the email will be mandatory in the payload.
"""
data = {
"username": "usernam3e",
"first_name": "Registered",
"password": "@!2XJSL_S&V^0nt",
"last_name": "Test",
"avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240",
"perms": ["add_resource"],
"is_superuser": False,
"is_staff": False,
}
# ensure there is no email in payload
data.pop("email", None)

self.client.login(username="admin", password="admin")
response = self.client.post(reverse("users-list"), data=data, content_type="application/json")

# endpoint throws Exception on missing email
self.assertTrue(response.status_code, 400)
self.assertTrue("email missing from payload" in response.json()["errors"])

@override_settings(
AUTH_PASSWORD_VALIDATORS=[
{
"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator",
},
{
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
"OPTIONS": {
"min_length": 14,
},
},
{
"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
},
{
"NAME": "geonode.people.password_validators.UppercaseValidator",
},
{
"NAME": "geonode.people.password_validators.NumberValidator",
"OPTIONS": {
"min_digits": 1,
},
},
{
"NAME": "geonode.people.password_validators.LowercaseValidator",
},
{
"NAME": "geonode.people.password_validators.SpecialCharsValidator",
},
]
)
def test_users_api_invalid_password(self):
"""
If a password validator is set via AUTH_PASSWORD_VALIDATORS,
the API will return an error if the validation fails
"""
error_codes = [
"This password is too short. It must contain at least 14 characters.",
"The password must contain at least1 digit(s), 0-9.",
]
data = {
"username": "usernam3e",
"first_name": "Registered",
"password": "whitetext",
"last_name": "Test",
"avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240",
"perms": ["add_resource"],
"is_superuser": False,
"is_staff": False,
"email": "[email protected]",
}

self.client.login(username="admin", password="admin")
response = self.client.post(reverse("users-list"), data=data, content_type="application/json")
self.assertTrue(response.status_code, 400)

for error in error_codes:
self.assertTrue(error in response.json()["errors"][0])

@override_settings(
ACCOUNT_EMAIL_VERIFICATION="mandatory",
EMAIL_HOST="localhost",
EMAIL_HOST_USER="",
EMAIL_HOST_PASSWORD="",
EMAIL_PORT="25",
)
def test_users_register_email_verification(self):
"""
If the email confirmation requirement is configured,
a verification email will be sent to the user before allowing them to log in.
"""
data = {
"username": "usernam3e",
"email": "[email protected]",
"password1": "@!2XJSL_S&V^0nt",
"password2": "@!2XJSL_S&V^0nt",
}

response = self.client.post(reverse("account_signup"), data=data, format="json")
# response should be a redirect to the confirmation email
self.assertEqual(response.status_code, 302)

# check that user was created
get_user_model().objects.get(email=data["email"])

email_box = django.core.mail.outbox
# assert that an email was sent to the email provided in the payload
self.assertEqual(len(email_box), 1)
self.assertTrue(data["email"] in email_box[0].to)
43 changes: 39 additions & 4 deletions geonode/people/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import login_required
from django.contrib import messages
from django.forms import ValidationError as ValidationErrorForm
from django.shortcuts import render, redirect, get_object_or_404
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
Expand All @@ -28,14 +29,14 @@
from django.http import HttpResponseForbidden
from django.db.models import Q
from django.views import View

from geonode.tasks.tasks import send_email
from geonode.people.forms import ProfileForm
from geonode.people.utils import get_available_users
from geonode.base.auth import get_or_create_token
from geonode.people.forms import ForgotUsernameForm
from geonode.base.views import user_and_group_permission

from django.contrib.auth.hashers import make_password
from django.contrib.auth.password_validation import validate_password
from dal import autocomplete

from dynamic_rest.filters import DynamicFilterBackend, DynamicSortingFilter
Expand All @@ -48,13 +49,15 @@
from geonode.base.models import ResourceBase
from geonode.base.api.filters import DynamicSearchFilter
from geonode.groups.models import GroupProfile, GroupMember
from geonode.base.api.permissions import IsSelfOrAdminOrReadOnly
from geonode.base.api.permissions import IsSelfOrAdmin
from geonode.base.api.serializers import UserSerializer, GroupProfileSerializer, ResourceBaseSerializer
from geonode.base.api.pagination import GeoNodeApiPagination

from rest_framework.authentication import SessionAuthentication, BasicAuthentication
from geonode.security.utils import get_visible_resources
from guardian.shortcuts import get_objects_for_user
from geonode.settings import ACCOUNT_EMAIL_REQUIRED
from rest_framework.exceptions import PermissionDenied, ValidationError


class SetUserLayerPermission(View):
Expand Down Expand Up @@ -170,7 +173,7 @@ class UserViewSet(DynamicModelViewSet):
authentication_classes = [SessionAuthentication, BasicAuthentication, OAuth2Authentication]
permission_classes = [
IsAuthenticated,
IsSelfOrAdminOrReadOnly,
IsSelfOrAdmin,
]
filter_backends = [DynamicFilterBackend, DynamicSortingFilter, DynamicSearchFilter]
serializer_class = UserSerializer
Expand All @@ -189,6 +192,38 @@ def get_queryset(self):
queryset = self.get_serializer_class().setup_eager_loading(queryset)
return queryset.order_by("username")

def perform_create(self, serializer):
user = self.request.user
if not (user.is_superuser or user.is_staff):
raise PermissionDenied()

email_payload = self.request.data.get("email", "")
password_payload = self.request.data.get("password", "")

if ACCOUNT_EMAIL_REQUIRED and email_payload == "":
raise ValidationError(detail="email missing from payload")
try:
validate_password(password_payload, user=None, password_validators=None)
self.request.data["password"] = make_password(password_payload)
except ValidationErrorForm as err:
raise ValidationError(detail=",".join(err.messages))
instance = serializer.save()
return instance

def update(self, request, *args, **kwargs):
kwargs["partial"] = True
if not self.request.user.is_superuser:
request.data.pop("is_superuser", None)
request.data.pop("is_staff", None)
password_payload = self.request.data.get("password", "")
if password_payload:
try:
validate_password(password_payload, user=None, password_validators=None)
request.data["password"] = make_password(password_payload)
except ValidationErrorForm as err:
raise ValidationError(detail=",".join(err.messages))
return super().update(request, *args, **kwargs)

@extend_schema(
methods=["get"],
responses={200: ResourceBaseSerializer(many=True)},
Expand Down

0 comments on commit 4bad53a

Please sign in to comment.