Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 11995 Implement POST and PATCH methods for the User API #12009

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions geonode/base/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def has_permission(self, request, view):
"""Always return False here.
The fine-grained permissions are handled in has_object_permission().
"""
if view.basename == "users": # CUTOM CASE FOR users
return True
return False

def has_object_permission(self, request, view, obj):
Expand Down
11 changes: 5 additions & 6 deletions geonode/base/api/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,7 @@ def test_register_users(self):
Ensure users are created with default groups.
"""
url = reverse("users-list")
user_data = {
"username": "new_user",
}
user_data = {"username": "new_user", "password": "@!2XJSL_S&V^0nt", "email": "[email protected]"}
self.assertTrue(self.client.login(username="admin", password="admin"))
response = self.client.post(url, data=user_data, format="json")
self.assertEqual(response.status_code, 201)
Expand Down Expand Up @@ -419,7 +417,7 @@ def test_update_user_profile(self):
username="user_test_delete", email="[email protected]", password="user"
)
url = reverse("users-detail", kwargs={"pk": user.pk})
data = {"first_name": "user"}
data = {"first_name": "user", "password": "@!2XJSL_S&V^0nt", "email": "[email protected]"}
# Anonymous
response = self.client.patch(url, data=data, format="json")
self.assertEqual(response.status_code, 403)
Expand All @@ -430,14 +428,15 @@ def test_update_user_profile(self):
# User self profile
self.assertTrue(self.client.login(username="user_test_delete", password="user"))
response = self.client.patch(url, data=data, format="json")
self.assertEqual(response.status_code, 403)
self.assertEqual(response.status_code, 200)
# Group manager
group = GroupProfile.objects.create(slug="test_group_manager", title="test_group_manager")
group.join(user)
group.join(get_user_model().objects.get(username="norman"), role="manager")
self.assertTrue(self.client.login(username="norman", password="norman"))
response = self.client.post(url, data=data, format="json")
self.assertEqual(response.status_code, 403)
# malformed url on post
self.assertEqual(response.status_code, 405)
# Admin can edit user
self.assertTrue(self.client.login(username="admin", password="admin"))
response = self.client.patch(url, data={"first_name": "user_admin"}, format="json")
Expand Down
247 changes: 247 additions & 0 deletions geonode/people/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import django
from django.test.utils import override_settings
from mock import MagicMock, PropertyMock, patch
from geonode.tests.base import GeoNodeBaseTestSupport
Expand Down Expand Up @@ -471,3 +472,249 @@ def test_new_user_is_no_assigned_automatically_to_contributors_if_disabled(self)
"""
new_user = get_user_model().objects.create(username="random_username")
self.assertFalse("contributors" in [x.name for x in new_user.groups.iterator()])

def test_users_api_valid_post(self):
data = {
"username": "usernam3e",
"first_name": "Registered",
"password": "@!2XJSL_S&V^0nt",
"last_name": "Test",
"avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240",
"perms": ["add_resource"],
"is_superuser": False,
"is_staff": False,
"email": "[email protected]",
}

self.client.login(username="admin", password="admin")
response = self.client.post(reverse("users-list"), data=data, content_type="application/json")
self.assertTrue(response.status_code, 201)

def test_users_api_post_not_admin(self):
data = {
"username": "usernam3e",
"first_name": "Registered",
"password": "@!2XJSL_S&V^0nt",
"last_name": "Test",
"avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240",
"perms": ["add_resource"],
"is_superuser": True,
"is_staff": True,
"email": "[email protected]",
}
bobby = get_user_model().objects.get(username="bobby")
self.client.login(username="bobby", password="bob")
# assert that bobby is not a super user or staff
self.assertFalse(bobby.is_superuser)
self.assertFalse(bobby.is_staff)
response = self.client.post(reverse("users-list"), data=data, content_type="application/json")
self.assertEqual(response.status_code, 403)

def test_users_api_patch_self(self):

bobby = get_user_model().objects.get(username="bobby")
self.assertTrue(self.client.login(username="bobby", password="bob"))
self.assertTrue(bobby.is_authenticated)
# bobby wants to edit his own data
data = {"first_name": "Robert"}
# before change
self.assertNotEqual(bobby.first_name, "Robert")

# and can acess even if he's not admin or staff
self.assertFalse(bobby.is_superuser)
self.assertFalse(bobby.is_staff)

url = f"{reverse('users-list')}/{bobby.pk}"
response = self.client.patch(url, data=data, content_type="application/json")
response_json = response.json()
self.assertEqual(response.status_code, 200)
self.assertEqual(response_json["user"]["first_name"], "Robert")

def test_users_api_patch_self_as_superuser(self):

bobby = get_user_model().objects.get(username="bobby")
self.assertTrue(self.client.login(username="bobby", password="bob"))
self.assertTrue(bobby.is_authenticated)
# bobby wants to edit his own data
data = {
"first_name": "Robert",
"is_superuser": True,
"is_staff": True,
}
# before change
self.assertNotEqual(bobby.first_name, "Robert")

# and can acess even if he's not admin or staff
self.assertFalse(bobby.is_superuser)
self.assertFalse(bobby.is_staff)

url = f"{reverse('users-list')}/{bobby.pk}"
response = self.client.patch(url, data=data, content_type="application/json")
response_json = response.json()

self.assertEqual(response.status_code, 200)
self.assertEqual(response_json["user"]["first_name"], "Robert")

self.assertFalse(response_json["user"]["is_superuser"])
self.assertFalse(response_json["user"]["is_staff"])
# check db side too
bobby = get_user_model().objects.get(username="bobby")
self.assertFalse(bobby.is_superuser)
self.assertFalse(bobby.is_staff)

def test_users_api_patch_others_from_non_admin(self):

bobby = get_user_model().objects.get(username="bobby")
profile = get_user_model().objects.get(username="user1")

self.assertTrue(self.client.login(username="bobby", password="bob"))
self.assertTrue(bobby.is_authenticated)
# bobby wants to edit his user's data
data = {"first_name": "Norman Sky", "password": "@!2XJSL_S&V^0nt", "email": "[email protected]"}

# Bobby is not superuser or staff
self.assertFalse(bobby.is_superuser)
self.assertFalse(bobby.is_staff)

url = f"{reverse('users-list')}/{profile.pk}"
response = self.client.patch(url, data=data, content_type="application/json")

# bobby is not permitted to update user data
self.assertEqual(response.status_code, 403)

def test_users_api_patch_others_from_admin(self):

bobby = get_user_model().objects.get(username="bobby")
admin = get_user_model().objects.get(username="admin")

self.assertTrue(self.client.login(username="admin", password="admin"))
self.assertTrue(admin.is_authenticated)
# admin wants to edit his bobby's data
data = {"first_name": "Robert Baratheon", "password": "@!2XJSL_S&V^0nt000", "email": "[email protected]"}

# Admin is superuser or staff
self.assertTrue(admin.is_superuser or admin.is_staff)

url = f"{reverse('users-list')}/{bobby.pk}"
response = self.client.patch(url, data=data, content_type="application/json")

# admin is permitted to update bobby's data
self.assertEqual(response.status_code, 200)

self.assertEqual(response.json()["user"]["first_name"], "Robert Baratheon")

@override_settings(ACCOUNT_EMAIL_REQUIRED=True)
def test_users_api_empty_email(self):
"""
If the environment variable ACCOUNT_EMAIL_REQUIRED is set to True,
the email will be mandatory in the payload.
"""
data = {
"username": "usernam3e",
"first_name": "Registered",
"password": "@!2XJSL_S&V^0nt",
"last_name": "Test",
"avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240",
"perms": ["add_resource"],
"is_superuser": False,
"is_staff": False,
}
# ensure there is no email in payload
data.pop("email", None)

self.client.login(username="admin", password="admin")
response = self.client.post(reverse("users-list"), data=data, content_type="application/json")

# endpoint throws Exception on missing email
self.assertTrue(response.status_code, 400)
self.assertTrue("email missing from payload" in response.json()["errors"])

@override_settings(
AUTH_PASSWORD_VALIDATORS=[
{
"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator",
},
{
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
"OPTIONS": {
"min_length": 14,
},
},
{
"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
},
{
"NAME": "geonode.people.password_validators.UppercaseValidator",
},
{
"NAME": "geonode.people.password_validators.NumberValidator",
"OPTIONS": {
"min_digits": 1,
},
},
{
"NAME": "geonode.people.password_validators.LowercaseValidator",
},
{
"NAME": "geonode.people.password_validators.SpecialCharsValidator",
},
]
)
def test_users_api_invalid_password(self):
"""
If a password validator is set via AUTH_PASSWORD_VALIDATORS,
the API will return an error if the validation fails
"""
error_codes = [
"This password is too short. It must contain at least 14 characters.",
"The password must contain at least1 digit(s), 0-9.",
]
data = {
"username": "usernam3e",
"first_name": "Registered",
"password": "whitetext",
"last_name": "Test",
"avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240",
"perms": ["add_resource"],
"is_superuser": False,
"is_staff": False,
"email": "[email protected]",
}

self.client.login(username="admin", password="admin")
response = self.client.post(reverse("users-list"), data=data, content_type="application/json")
self.assertTrue(response.status_code, 400)

for error in error_codes:
self.assertTrue(error in response.json()["errors"][0])

@override_settings(
ACCOUNT_EMAIL_VERIFICATION="mandatory",
EMAIL_HOST="localhost",
EMAIL_HOST_USER="",
EMAIL_HOST_PASSWORD="",
EMAIL_PORT="25",
)
def test_users_register_email_verification(self):
"""
If the email confirmation requirement is configured,
a verification email will be sent to the user before allowing them to log in.
"""
data = {
"username": "usernam3e",
"email": "[email protected]",
"password1": "@!2XJSL_S&V^0nt",
"password2": "@!2XJSL_S&V^0nt",
}

response = self.client.post(reverse("account_signup"), data=data, format="json")
# response should be a redirect to the confirmation email
self.assertEqual(response.status_code, 302)

# check that user was created
get_user_model().objects.get(email=data["email"])

email_box = django.core.mail.outbox
# assert that an email was sent to the email provided in the payload
self.assertEqual(len(email_box), 1)
self.assertTrue(data["email"] in email_box[0].to)
51 changes: 49 additions & 2 deletions geonode/people/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import login_required
from django.contrib import messages
from django.forms import ValidationError as ValidationErrorForm
from django.shortcuts import render, redirect, get_object_or_404
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
Expand All @@ -28,14 +29,14 @@
from django.http import HttpResponseForbidden
from django.db.models import Q
from django.views import View

from geonode.tasks.tasks import send_email
from geonode.people.forms import ProfileForm
from geonode.people.utils import get_available_users
from geonode.base.auth import get_or_create_token
from geonode.people.forms import ForgotUsernameForm
from geonode.base.views import user_and_group_permission

from django.contrib.auth.hashers import make_password
from django.contrib.auth.password_validation import validate_password
from dal import autocomplete

from dynamic_rest.filters import DynamicFilterBackend, DynamicSortingFilter
Expand All @@ -55,6 +56,8 @@
from rest_framework.authentication import SessionAuthentication, BasicAuthentication
from geonode.security.utils import get_visible_resources
from guardian.shortcuts import get_objects_for_user
from geonode.settings import ACCOUNT_EMAIL_REQUIRED
from rest_framework.exceptions import PermissionDenied, ValidationError


class SetUserLayerPermission(View):
Expand Down Expand Up @@ -189,6 +192,50 @@
queryset = self.get_serializer_class().setup_eager_loading(queryset)
return queryset.order_by("username")

def perform_destroy(self, instance):
# not implemented added to make tests pass
if any(
(
not self.request.user.is_superuser,
not self.request.user.is_staff,
self.request.user.pk == int(self.kwargs["pk"]),
)
):
raise PermissionDenied()
instance.delete()

def perform_create(self, serializer):
user = self.request.user
if not (user.is_superuser or user.is_staff):
raise PermissionDenied()

email_payload = self.request.data.get("email", "")
password_payload = self.request.data.get("password", "")

if ACCOUNT_EMAIL_REQUIRED and email_payload == "":
raise ValidationError(detail="email missing from payload")
try:
validate_password(password_payload, user=None, password_validators=None)
self.request.data["password"] = make_password(password_payload)
except ValidationErrorForm as err:
raise ValidationError(detail=",".join(err.messages))
instance = serializer.save()
return instance

def update(self, request, *args, **kwargs):
kwargs["partial"] = True
if not self.request.user.is_superuser:
request.data.pop("is_superuser", None)
request.data.pop("is_staff", None)
password_payload = self.request.data.get("password", "")
if password_payload:
try:
validate_password(password_payload, user=None, password_validators=None)
request.data["password"] = make_password(password_payload)
except ValidationErrorForm as err:
raise ValidationError(detail=",".join(err.messages))

Check warning on line 236 in geonode/people/views.py

View check run for this annotation

Codecov / codecov/patch

geonode/people/views.py#L235-L236

Added lines #L235 - L236 were not covered by tests
return super().update(request, *args, **kwargs)

@extend_schema(
methods=["get"],
responses={200: ResourceBaseSerializer(many=True)},
Expand Down
Loading