Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

instrument failed login attempts for valid users #10119

Merged
merged 3 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 83 additions & 16 deletions tests/unit/accounts/test_forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,14 @@ def test_validate_password_ok(self):
]

def test_validate_password_notok(self, db_session):
request = pretend.stub()
request = pretend.stub(remote_addr="127.0.0.1")
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda userid: 1),
check_password=pretend.call_recorder(
lambda userid, password, tags=None: False
),
is_disabled=pretend.call_recorder(lambda userid: (False, None)),
record_event=pretend.call_recorder(lambda *a, **kw: None),
)
breach_service = pretend.stub()
form = forms.LoginForm(
Expand All @@ -170,6 +171,14 @@ def test_validate_password_notok(self, db_session):
]
assert user_service.is_disabled.calls == [pretend.call(1)]
assert user_service.check_password.calls == [pretend.call(1, "pw", tags=None)]
assert user_service.record_event.calls == [
pretend.call(
1,
tag="account:login:failure",
ip_address="127.0.0.1",
additional={"reason": "invalid_password"},
)
]

def test_validate_password_too_many_failed(self):
@pretend.call_recorder
Expand Down Expand Up @@ -577,18 +586,24 @@ class TestTOTPAuthenticationForm:
def test_creation(self):
user_id = pretend.stub()
user_service = pretend.stub()
form = forms.TOTPAuthenticationForm(user_id=user_id, user_service=user_service)
form = forms.TOTPAuthenticationForm(
request=pretend.stub(), user_id=user_id, user_service=user_service
)

assert form.user_service is user_service

def test_totp_secret_exists(self, pyramid_config):
form = forms.TOTPAuthenticationForm(
data={"totp_value": ""}, user_id=pretend.stub(), user_service=pretend.stub()
request=pretend.stub(),
data={"totp_value": ""},
user_id=pretend.stub(),
user_service=pretend.stub(),
)
assert not form.validate()
assert form.totp_value.errors.pop() == "This field is required."

form = forms.TOTPAuthenticationForm(
request=pretend.stub(),
data={"totp_value": "not_a_real_value"},
user_id=pretend.stub(),
user_service=pretend.stub(check_totp_value=lambda *a: True),
Expand All @@ -597,36 +612,53 @@ def test_totp_secret_exists(self, pyramid_config):
assert str(form.totp_value.errors.pop()) == "TOTP code must be 6 digits."

form = forms.TOTPAuthenticationForm(
request=pretend.stub(),
data={"totp_value": "1 2 3 4 5 6 7"},
user_id=pretend.stub(),
user_service=pretend.stub(check_totp_value=lambda *a: True),
)
assert not form.validate()
assert str(form.totp_value.errors.pop()) == "TOTP code must be 6 digits."

user_service = pretend.stub(
record_event=pretend.call_recorder(lambda *a, **kw: None),
check_totp_value=lambda *a: False,
)
form = forms.TOTPAuthenticationForm(
request=pretend.stub(remote_addr="127.0.0.1"),
data={"totp_value": "123456"},
user_id=pretend.stub(),
user_service=pretend.stub(check_totp_value=lambda *a: False),
user_id=1,
user_service=user_service,
)
assert not form.validate()
assert str(form.totp_value.errors.pop()) == "Invalid TOTP code."
assert user_service.record_event.calls == [
pretend.call(
1,
tag="account:login:failure",
ip_address="127.0.0.1",
additional={"reason": "invalid_totp"},
)
]

form = forms.TOTPAuthenticationForm(
request=pretend.stub(),
data={"totp_value": "123456"},
user_id=pretend.stub(),
user_service=pretend.stub(check_totp_value=lambda *a: True),
)
assert form.validate()

form = forms.TOTPAuthenticationForm(
request=pretend.stub(),
data={"totp_value": " 1 2 3 4 5 6 "},
user_id=pretend.stub(),
user_service=pretend.stub(check_totp_value=lambda *a: True),
)
assert form.validate()

form = forms.TOTPAuthenticationForm(
request=pretend.stub(),
data={"totp_value": "123 456"},
user_id=pretend.stub(),
user_service=pretend.stub(check_totp_value=lambda *a: True),
Expand All @@ -636,13 +668,15 @@ def test_totp_secret_exists(self, pyramid_config):

class TestWebAuthnAuthenticationForm:
def test_creation(self):
request = pretend.stub()
user_id = pretend.stub()
user_service = pretend.stub()
challenge = pretend.stub()
origin = pretend.stub()
rp_id = pretend.stub()

form = forms.WebAuthnAuthenticationForm(
request=request,
user_id=user_id,
user_service=user_service,
challenge=challenge,
Expand All @@ -653,7 +687,9 @@ def test_creation(self):
assert form.challenge is challenge

def test_credential_bad_payload(self, pyramid_config):
request = pretend.stub()
form = forms.WebAuthnAuthenticationForm(
request=request,
credential="not valid json",
user_id=pretend.stub(),
user_service=pretend.stub(),
Expand All @@ -668,23 +704,37 @@ def test_credential_bad_payload(self, pyramid_config):
)

def test_credential_invalid(self):
request = pretend.stub(remote_addr="127.0.0.1")
user_service = pretend.stub(
record_event=pretend.call_recorder(lambda *a, **kw: None),
verify_webauthn_assertion=pretend.raiser(
AuthenticationRejectedException("foo")
),
)
form = forms.WebAuthnAuthenticationForm(
request=request,
credential=json.dumps({}),
user_id=pretend.stub(),
user_service=pretend.stub(
verify_webauthn_assertion=pretend.raiser(
AuthenticationRejectedException("foo")
)
),
user_id=1,
user_service=user_service,
challenge=pretend.stub(),
origin=pretend.stub(),
rp_id=pretend.stub(),
)
assert not form.validate()
assert form.credential.errors.pop() == "foo"
assert user_service.record_event.calls == [
pretend.call(
1,
tag="account:login:failure",
ip_address="127.0.0.1",
additional={"reason": "invalid_webauthn"},
)
]

def test_credential_valid(self):
request = pretend.stub()
form = forms.WebAuthnAuthenticationForm(
request=request,
credential=json.dumps({}),
user_id=pretend.stub(),
user_service=pretend.stub(
Expand Down Expand Up @@ -720,17 +770,20 @@ def test_creation(self):

class TestRecoveryCodeForm:
def test_creation(self):
request = pretend.stub()
user_id = pretend.stub()
user_service = pretend.stub()
form = forms.RecoveryCodeAuthenticationForm(
user_id=user_id, user_service=user_service
request=request, user_id=user_id, user_service=user_service
)

assert form.user_id is user_id
assert form.user_service is user_service

def test_missing_value(self):
request = pretend.stub()
form = forms.RecoveryCodeAuthenticationForm(
request=request,
data={"recovery_code_value": ""},
user_id=pretend.stub(),
user_service=pretend.stub(),
Expand All @@ -739,19 +792,33 @@ def test_missing_value(self):
assert form.recovery_code_value.errors.pop() == "This field is required."

def test_invalid_recovery_code(self, pyramid_config):
request = pretend.stub(remote_addr="127.0.0.1")
user_service = pretend.stub(
check_recovery_code=pretend.call_recorder(lambda *a, **kw: False),
record_event=pretend.call_recorder(lambda *a, **kw: None),
)
form = forms.RecoveryCodeAuthenticationForm(
request=request,
data={"recovery_code_value": "invalid"},
user_id=pretend.stub(),
user_service=pretend.stub(
check_recovery_code=pretend.call_recorder(lambda *a, **kw: False)
),
user_id=1,
user_service=user_service,
)

assert not form.validate()
assert str(form.recovery_code_value.errors.pop()) == "Invalid recovery code."
assert user_service.record_event.calls == [
pretend.call(
1,
tag="account:login:failure",
ip_address="127.0.0.1",
additional={"reason": "invalid_recovery_code"},
)
]

def test_valid_recovery_code(self):
request = pretend.stub()
form = forms.RecoveryCodeAuthenticationForm(
request=request,
data={"recovery_code_value": "valid"},
user_id=pretend.stub(),
user_service=pretend.stub(
Expand Down
8 changes: 7 additions & 1 deletion tests/unit/accounts/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ def test_get_returns_totp_form(self, pyramid_request, redirect_url):
assert form_class.calls == [
pretend.call(
pyramid_request.POST,
request=pyramid_request,
user_id=1,
user_service=user_service,
check_password_metrics_tags=["method:auth", "auth_method:login_form"],
Expand Down Expand Up @@ -945,7 +946,12 @@ def test_get_returns_form(self, pyramid_request):
]
assert result == {"form": form_obj}
assert form_class.calls == [
pretend.call(pyramid_request.POST, user_id=1, user_service=user_service)
pretend.call(
pyramid_request.POST,
request=pyramid_request,
user_id=1,
user_service=user_service,
)
]

@pytest.mark.parametrize("redirect_url", ["test_redirect_url", None])
Expand Down
27 changes: 26 additions & 1 deletion warehouse/accounts/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ def validate_password(self, field):
if not self.user_service.check_password(
userid, field.data, tags=self._check_password_metrics_tags
):
self.user_service.record_event(
userid,
tag="account:login:failure",
ip_address=self.request.remote_addr,
additional={"reason": "invalid_password"},
)
raise wtforms.validators.ValidationError(
_("The password is invalid. Try again.")
)
Expand Down Expand Up @@ -290,8 +296,9 @@ def validate_password(self, field):


class _TwoFactorAuthenticationForm(forms.Form):
def __init__(self, *args, user_id, user_service, **kwargs):
def __init__(self, *args, request, user_id, user_service, **kwargs):
super().__init__(*args, **kwargs)
self.request = request
self.user_id = user_id
self.user_service = user_service

Expand All @@ -301,6 +308,12 @@ def validate_totp_value(self, field):
totp_value = field.data.replace(" ", "").encode("utf8")

if not self.user_service.check_totp_value(self.user_id, totp_value):
self.user_service.record_event(
self.user_id,
tag="account:login:failure",
ip_address=self.request.remote_addr,
additional={"reason": "invalid_totp"},
)
raise wtforms.validators.ValidationError(_("Invalid TOTP code."))


Expand Down Expand Up @@ -331,6 +344,12 @@ def validate_credential(self, field):
)

except webauthn.AuthenticationRejectedException as e:
self.user_service.record_event(
self.user_id,
tag="account:login:failure",
ip_address=self.request.remote_addr,
additional={"reason": "invalid_webauthn"},
)
raise wtforms.validators.ValidationError(str(e))

self.validated_credential = validated_credential
Expand Down Expand Up @@ -361,6 +380,12 @@ def validate_recovery_code_value(self, field):
recovery_code_value = field.data.encode("utf-8")

if not self.user_service.check_recovery_code(self.user_id, recovery_code_value):
self.user_service.record_event(
self.user_id,
tag="account:login:failure",
ip_address=self.request.remote_addr,
additional={"reason": "invalid_recovery_code"},
)
raise wtforms.validators.ValidationError(_("Invalid recovery code."))


Expand Down
6 changes: 5 additions & 1 deletion warehouse/accounts/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ def two_factor_and_totp_validate(request, _form_class=TOTPAuthenticationForm):
if user_service.has_totp(userid):
two_factor_state["totp_form"] = _form_class(
request.POST,
request=request,
user_id=userid,
user_service=user_service,
check_password_metrics_tags=["method:auth", "auth_method:login_form"],
Expand Down Expand Up @@ -327,6 +328,7 @@ def webauthn_authentication_validate(request):
user_service = request.find_service(IUserService, context=None)
form = WebAuthnAuthenticationForm(
**request.POST,
request=request,
user_id=userid,
user_service=user_service,
challenge=request.session.get_webauthn_challenge(),
Expand Down Expand Up @@ -387,7 +389,9 @@ def recovery_code(request, _form_class=RecoveryCodeAuthenticationForm):

user_service = request.find_service(IUserService, context=None)

form = _form_class(request.POST, user_id=userid, user_service=user_service)
form = _form_class(
request.POST, request=request, user_id=userid, user_service=user_service
)

if request.method == "POST":
if form.validate():
Expand Down
Loading