Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added logic and test cases for generating and clearing tokens programmatically #181

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 66 additions & 55 deletions django_rest_passwordreset/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,60 @@ def _unicode_ci_compare(s1, s2):
return normalized1.casefold() == normalized2.casefold()


def clear_expired_tokens():
"""
Delete all existing expired tokens
"""
password_reset_token_validation_time = get_password_reset_token_expiry_time()

# datetime.now minus expiry hours
now_minus_expiry_time = timezone.now() - timedelta(hours=password_reset_token_validation_time)

# delete all tokens where created_at < now - 24 hours
clear_expired(now_minus_expiry_time)


def generate_token_for_email(email, user_agent='', ip_address=''):
# find a user by email address (case-insensitive search)
users = User.objects.filter(**{'{}__iexact'.format(get_password_reset_lookup_field()): email})

active_user_found = False

# iterate over all users and check if there is any user that is active
# also check whether the password can be changed (is usable), as there could be users that are not allowed
# to change their password (e.g., LDAP user)
for user in users:
if user.eligible_for_reset():
active_user_found = True
break

# No active user found, raise a ValidationError
# but not if DJANGO_REST_PASSWORDRESET_NO_INFORMATION_LEAKAGE == True
if not active_user_found and not getattr(settings, 'DJANGO_REST_PASSWORDRESET_NO_INFORMATION_LEAKAGE', False):
raise exceptions.ValidationError({
'email': [_(
"We couldn't find an account associated with that email. Please try a different e-mail address.")],
})

# last but not least: iterate over all users that are active and can change their password
# and create a Reset Password Token and send a signal with the created token
for user in users:
if user.eligible_for_reset() and _unicode_ci_compare(email, getattr(user, get_password_reset_lookup_field())):
password_reset_tokens = user.password_reset_tokens.all()

# check if the user already has a token
if password_reset_tokens.count():
# yes, already has a token, re-use this token
return password_reset_tokens.first()

# no token exists, generate a new token
return ResetPasswordToken.objects.create(
user=user,
user_agent=user_agent,
ip_address=ip_address,
)


class ResetPasswordValidateToken(GenericAPIView):
"""
An Api View which provides a method to verify that a token is valid
Expand Down Expand Up @@ -128,61 +182,18 @@ class ResetPasswordRequestToken(GenericAPIView):
def post(self, request, *args, **kwargs):
serializer = self.serializer_class(data=request.data)
serializer.is_valid(raise_exception=True)
email = serializer.validated_data['email']

# before we continue, delete all existing expired tokens
password_reset_token_validation_time = get_password_reset_token_expiry_time()

# datetime.now minus expiry hours
now_minus_expiry_time = timezone.now() - timedelta(hours=password_reset_token_validation_time)

# delete all tokens where created_at < now - 24 hours
clear_expired(now_minus_expiry_time)

# find a user by email address (case insensitive search)
users = User.objects.filter(**{'{}__iexact'.format(get_password_reset_lookup_field()): email})

active_user_found = False

# iterate over all users and check if there is any user that is active
# also check whether the password can be changed (is useable), as there could be users that are not allowed
# to change their password (e.g., LDAP user)
for user in users:
if user.eligible_for_reset():
active_user_found = True
break

# No active user found, raise a validation error
# but not if DJANGO_REST_PASSWORDRESET_NO_INFORMATION_LEAKAGE == True
if not active_user_found and not getattr(settings, 'DJANGO_REST_PASSWORDRESET_NO_INFORMATION_LEAKAGE', False):
raise exceptions.ValidationError({
'email': [_(
"We couldn't find an account associated with that email. Please try a different e-mail address.")],
})

# last but not least: iterate over all users that are active and can change their password
# and create a Reset Password Token and send a signal with the created token
for user in users:
if user.eligible_for_reset() and \
_unicode_ci_compare(email, getattr(user, get_password_reset_lookup_field())):
# define the token as none for now
token = None

# check if the user already has a token
if user.password_reset_tokens.all().count() > 0:
# yes, already has a token, re-use this token
token = user.password_reset_tokens.all()[0]
else:
# no token exists, generate a new token
token = ResetPasswordToken.objects.create(
user=user,
user_agent=request.META.get(HTTP_USER_AGENT_HEADER, ''),
ip_address=request.META.get(HTTP_IP_ADDRESS_HEADER, ''),
)
# send a signal that the password token was created
# let whoever receives this signal handle sending the email for the password reset
reset_password_token_created.send(sender=self.__class__, instance=self, reset_password_token=token)
# done

clear_expired_tokens()
token = generate_token_for_email(
email=serializer.validated_data['email'],
user_agent=request.META.get(HTTP_USER_AGENT_HEADER, ''),
ip_address=request.META.get(HTTP_IP_ADDRESS_HEADER, ''),
)

# send a signal that the password token was created
# let whoever receives this signal handle sending the email for the password reset
reset_password_token_created.send(sender=self.__class__, instance=self, reset_password_token=token)

return Response({'status': 'OK'})


Expand Down
45 changes: 43 additions & 2 deletions tests/test/test_auth_test_case.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import json
from datetime import timedelta

from django.contrib.auth.models import User
from django.test import override_settings
from django.utils import timezone
from rest_framework import status
from rest_framework.test import APITestCase

from django_rest_passwordreset.models import ResetPasswordToken
from django_rest_passwordreset.models import ResetPasswordToken, get_password_reset_token_expiry_time
from django_rest_passwordreset.views import clear_expired_tokens, generate_token_for_email
from tests.test.helpers import HelperMixin, patch


Expand Down Expand Up @@ -67,7 +70,7 @@ def test_validate_token(self, mock_reset_password_token_created):
# there should be one token
self.assertEqual(ResetPasswordToken.objects.all().count(), 1)

# try to login with the old username/password (should work)
# try to log in with the old username/password (should work)
self.assertTrue(
self.django_check_login("user1", "secret1"),
msg="User 1 should still be able to login with the old credentials"
Expand Down Expand Up @@ -386,3 +389,41 @@ def test_user_without_password_where_not_required(self, mock_reset_password_toke
self.django_check_login("user4", "new_secret"),
msg="User 4 should be able to login with the modified credentials"
)

def test_clear_expired_tokens(self):
""" Tests clearance of expired tokens """

password_reset_token_validation_time = get_password_reset_token_expiry_time()

# there should be zero tokens
self.assertEqual(ResetPasswordToken.objects.all().count(), 0)

# request a new token
response = self.rest_do_request_reset_token(email="[email protected]")
self.assertEqual(response.status_code, status.HTTP_200_OK)

# there should be one token
self.assertEqual(ResetPasswordToken.objects.all().count(), 1)

# let the token expire
token = ResetPasswordToken.objects.all().first()
token.created_at = timezone.now() - timedelta(hours=password_reset_token_validation_time)
token.save()

# clear expired tokens
clear_expired_tokens()

# there should be zero tokens
self.assertEqual(ResetPasswordToken.objects.all().count(), 0)

def test_generate_token_for_email(self):
""" Tests generating tokens for a specific email address programmatically """

# there should be zero tokens
self.assertEqual(ResetPasswordToken.objects.all().count(), 0)

# request a new token
generate_token_for_email(email="[email protected]")

# there should be one token
self.assertEqual(ResetPasswordToken.objects.all().count(), 1)
Loading