Skip to content

Commit

Permalink
instrument failed login attempts for valid users (#10119)
Browse files Browse the repository at this point in the history
* instrument failed login attempts for valid users

* Complete testing of instrumented login failures

* display failed logins in account view
  • Loading branch information
ewdurbin authored Oct 4, 2021
1 parent 1a25e37 commit 50c30e4
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 121 deletions.
99 changes: 83 additions & 16 deletions tests/unit/accounts/test_forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,14 @@ def test_validate_password_ok(self):
]

def test_validate_password_notok(self, db_session):
request = pretend.stub()
request = pretend.stub(remote_addr="127.0.0.1")
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda userid: 1),
check_password=pretend.call_recorder(
lambda userid, password, tags=None: False
),
is_disabled=pretend.call_recorder(lambda userid: (False, None)),
record_event=pretend.call_recorder(lambda *a, **kw: None),
)
breach_service = pretend.stub()
form = forms.LoginForm(
Expand All @@ -170,6 +171,14 @@ def test_validate_password_notok(self, db_session):
]
assert user_service.is_disabled.calls == [pretend.call(1)]
assert user_service.check_password.calls == [pretend.call(1, "pw", tags=None)]
assert user_service.record_event.calls == [
pretend.call(
1,
tag="account:login:failure",
ip_address="127.0.0.1",
additional={"reason": "invalid_password"},
)
]

def test_validate_password_too_many_failed(self):
@pretend.call_recorder
Expand Down Expand Up @@ -577,18 +586,24 @@ class TestTOTPAuthenticationForm:
def test_creation(self):
user_id = pretend.stub()
user_service = pretend.stub()
form = forms.TOTPAuthenticationForm(user_id=user_id, user_service=user_service)
form = forms.TOTPAuthenticationForm(
request=pretend.stub(), user_id=user_id, user_service=user_service
)

assert form.user_service is user_service

def test_totp_secret_exists(self, pyramid_config):
form = forms.TOTPAuthenticationForm(
data={"totp_value": ""}, user_id=pretend.stub(), user_service=pretend.stub()
request=pretend.stub(),
data={"totp_value": ""},
user_id=pretend.stub(),
user_service=pretend.stub(),
)
assert not form.validate()
assert form.totp_value.errors.pop() == "This field is required."

form = forms.TOTPAuthenticationForm(
request=pretend.stub(),
data={"totp_value": "not_a_real_value"},
user_id=pretend.stub(),
user_service=pretend.stub(check_totp_value=lambda *a: True),
Expand All @@ -597,36 +612,53 @@ def test_totp_secret_exists(self, pyramid_config):
assert str(form.totp_value.errors.pop()) == "TOTP code must be 6 digits."

form = forms.TOTPAuthenticationForm(
request=pretend.stub(),
data={"totp_value": "1 2 3 4 5 6 7"},
user_id=pretend.stub(),
user_service=pretend.stub(check_totp_value=lambda *a: True),
)
assert not form.validate()
assert str(form.totp_value.errors.pop()) == "TOTP code must be 6 digits."

user_service = pretend.stub(
record_event=pretend.call_recorder(lambda *a, **kw: None),
check_totp_value=lambda *a: False,
)
form = forms.TOTPAuthenticationForm(
request=pretend.stub(remote_addr="127.0.0.1"),
data={"totp_value": "123456"},
user_id=pretend.stub(),
user_service=pretend.stub(check_totp_value=lambda *a: False),
user_id=1,
user_service=user_service,
)
assert not form.validate()
assert str(form.totp_value.errors.pop()) == "Invalid TOTP code."
assert user_service.record_event.calls == [
pretend.call(
1,
tag="account:login:failure",
ip_address="127.0.0.1",
additional={"reason": "invalid_totp"},
)
]

form = forms.TOTPAuthenticationForm(
request=pretend.stub(),
data={"totp_value": "123456"},
user_id=pretend.stub(),
user_service=pretend.stub(check_totp_value=lambda *a: True),
)
assert form.validate()

form = forms.TOTPAuthenticationForm(
request=pretend.stub(),
data={"totp_value": " 1 2 3 4 5 6 "},
user_id=pretend.stub(),
user_service=pretend.stub(check_totp_value=lambda *a: True),
)
assert form.validate()

form = forms.TOTPAuthenticationForm(
request=pretend.stub(),
data={"totp_value": "123 456"},
user_id=pretend.stub(),
user_service=pretend.stub(check_totp_value=lambda *a: True),
Expand All @@ -636,13 +668,15 @@ def test_totp_secret_exists(self, pyramid_config):

class TestWebAuthnAuthenticationForm:
def test_creation(self):
request = pretend.stub()
user_id = pretend.stub()
user_service = pretend.stub()
challenge = pretend.stub()
origin = pretend.stub()
rp_id = pretend.stub()

form = forms.WebAuthnAuthenticationForm(
request=request,
user_id=user_id,
user_service=user_service,
challenge=challenge,
Expand All @@ -653,7 +687,9 @@ def test_creation(self):
assert form.challenge is challenge

def test_credential_bad_payload(self, pyramid_config):
request = pretend.stub()
form = forms.WebAuthnAuthenticationForm(
request=request,
credential="not valid json",
user_id=pretend.stub(),
user_service=pretend.stub(),
Expand All @@ -668,23 +704,37 @@ def test_credential_bad_payload(self, pyramid_config):
)

def test_credential_invalid(self):
request = pretend.stub(remote_addr="127.0.0.1")
user_service = pretend.stub(
record_event=pretend.call_recorder(lambda *a, **kw: None),
verify_webauthn_assertion=pretend.raiser(
AuthenticationRejectedException("foo")
),
)
form = forms.WebAuthnAuthenticationForm(
request=request,
credential=json.dumps({}),
user_id=pretend.stub(),
user_service=pretend.stub(
verify_webauthn_assertion=pretend.raiser(
AuthenticationRejectedException("foo")
)
),
user_id=1,
user_service=user_service,
challenge=pretend.stub(),
origin=pretend.stub(),
rp_id=pretend.stub(),
)
assert not form.validate()
assert form.credential.errors.pop() == "foo"
assert user_service.record_event.calls == [
pretend.call(
1,
tag="account:login:failure",
ip_address="127.0.0.1",
additional={"reason": "invalid_webauthn"},
)
]

def test_credential_valid(self):
request = pretend.stub()
form = forms.WebAuthnAuthenticationForm(
request=request,
credential=json.dumps({}),
user_id=pretend.stub(),
user_service=pretend.stub(
Expand Down Expand Up @@ -720,17 +770,20 @@ def test_creation(self):

class TestRecoveryCodeForm:
def test_creation(self):
request = pretend.stub()
user_id = pretend.stub()
user_service = pretend.stub()
form = forms.RecoveryCodeAuthenticationForm(
user_id=user_id, user_service=user_service
request=request, user_id=user_id, user_service=user_service
)

assert form.user_id is user_id
assert form.user_service is user_service

def test_missing_value(self):
request = pretend.stub()
form = forms.RecoveryCodeAuthenticationForm(
request=request,
data={"recovery_code_value": ""},
user_id=pretend.stub(),
user_service=pretend.stub(),
Expand All @@ -739,19 +792,33 @@ def test_missing_value(self):
assert form.recovery_code_value.errors.pop() == "This field is required."

def test_invalid_recovery_code(self, pyramid_config):
request = pretend.stub(remote_addr="127.0.0.1")
user_service = pretend.stub(
check_recovery_code=pretend.call_recorder(lambda *a, **kw: False),
record_event=pretend.call_recorder(lambda *a, **kw: None),
)
form = forms.RecoveryCodeAuthenticationForm(
request=request,
data={"recovery_code_value": "invalid"},
user_id=pretend.stub(),
user_service=pretend.stub(
check_recovery_code=pretend.call_recorder(lambda *a, **kw: False)
),
user_id=1,
user_service=user_service,
)

assert not form.validate()
assert str(form.recovery_code_value.errors.pop()) == "Invalid recovery code."
assert user_service.record_event.calls == [
pretend.call(
1,
tag="account:login:failure",
ip_address="127.0.0.1",
additional={"reason": "invalid_recovery_code"},
)
]

def test_valid_recovery_code(self):
request = pretend.stub()
form = forms.RecoveryCodeAuthenticationForm(
request=request,
data={"recovery_code_value": "valid"},
user_id=pretend.stub(),
user_service=pretend.stub(
Expand Down
8 changes: 7 additions & 1 deletion tests/unit/accounts/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ def test_get_returns_totp_form(self, pyramid_request, redirect_url):
assert form_class.calls == [
pretend.call(
pyramid_request.POST,
request=pyramid_request,
user_id=1,
user_service=user_service,
check_password_metrics_tags=["method:auth", "auth_method:login_form"],
Expand Down Expand Up @@ -945,7 +946,12 @@ def test_get_returns_form(self, pyramid_request):
]
assert result == {"form": form_obj}
assert form_class.calls == [
pretend.call(pyramid_request.POST, user_id=1, user_service=user_service)
pretend.call(
pyramid_request.POST,
request=pyramid_request,
user_id=1,
user_service=user_service,
)
]

@pytest.mark.parametrize("redirect_url", ["test_redirect_url", None])
Expand Down
27 changes: 26 additions & 1 deletion warehouse/accounts/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ def validate_password(self, field):
if not self.user_service.check_password(
userid, field.data, tags=self._check_password_metrics_tags
):
self.user_service.record_event(
userid,
tag="account:login:failure",
ip_address=self.request.remote_addr,
additional={"reason": "invalid_password"},
)
raise wtforms.validators.ValidationError(
_("The password is invalid. Try again.")
)
Expand Down Expand Up @@ -290,8 +296,9 @@ def validate_password(self, field):


class _TwoFactorAuthenticationForm(forms.Form):
def __init__(self, *args, user_id, user_service, **kwargs):
def __init__(self, *args, request, user_id, user_service, **kwargs):
super().__init__(*args, **kwargs)
self.request = request
self.user_id = user_id
self.user_service = user_service

Expand All @@ -301,6 +308,12 @@ def validate_totp_value(self, field):
totp_value = field.data.replace(" ", "").encode("utf8")

if not self.user_service.check_totp_value(self.user_id, totp_value):
self.user_service.record_event(
self.user_id,
tag="account:login:failure",
ip_address=self.request.remote_addr,
additional={"reason": "invalid_totp"},
)
raise wtforms.validators.ValidationError(_("Invalid TOTP code."))


Expand Down Expand Up @@ -331,6 +344,12 @@ def validate_credential(self, field):
)

except webauthn.AuthenticationRejectedException as e:
self.user_service.record_event(
self.user_id,
tag="account:login:failure",
ip_address=self.request.remote_addr,
additional={"reason": "invalid_webauthn"},
)
raise wtforms.validators.ValidationError(str(e))

self.validated_credential = validated_credential
Expand Down Expand Up @@ -361,6 +380,12 @@ def validate_recovery_code_value(self, field):
recovery_code_value = field.data.encode("utf-8")

if not self.user_service.check_recovery_code(self.user_id, recovery_code_value):
self.user_service.record_event(
self.user_id,
tag="account:login:failure",
ip_address=self.request.remote_addr,
additional={"reason": "invalid_recovery_code"},
)
raise wtforms.validators.ValidationError(_("Invalid recovery code."))


Expand Down
6 changes: 5 additions & 1 deletion warehouse/accounts/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ def two_factor_and_totp_validate(request, _form_class=TOTPAuthenticationForm):
if user_service.has_totp(userid):
two_factor_state["totp_form"] = _form_class(
request.POST,
request=request,
user_id=userid,
user_service=user_service,
check_password_metrics_tags=["method:auth", "auth_method:login_form"],
Expand Down Expand Up @@ -327,6 +328,7 @@ def webauthn_authentication_validate(request):
user_service = request.find_service(IUserService, context=None)
form = WebAuthnAuthenticationForm(
**request.POST,
request=request,
user_id=userid,
user_service=user_service,
challenge=request.session.get_webauthn_challenge(),
Expand Down Expand Up @@ -387,7 +389,9 @@ def recovery_code(request, _form_class=RecoveryCodeAuthenticationForm):

user_service = request.find_service(IUserService, context=None)

form = _form_class(request.POST, user_id=userid, user_service=user_service)
form = _form_class(
request.POST, request=request, user_id=userid, user_service=user_service
)

if request.method == "POST":
if form.validate():
Expand Down
Loading

0 comments on commit 50c30e4

Please sign in to comment.