-
Notifications
You must be signed in to change notification settings - Fork 39
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2948 from stveit/add-jwt-token-creation
Add module for generating JWTs
- Loading branch information
Showing
5 changed files
with
164 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add module for generating JWTs |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
from datetime import datetime, timedelta, timezone | ||
from typing import Any, Optional | ||
|
||
import jwt | ||
|
||
from nav.jwtconf import JWTConf, ACCESS_TOKEN_EXPIRE_DELTA, REFRESH_TOKEN_EXPIRE_DELTA | ||
|
||
|
||
def generate_access_token(token_data: Optional[dict[str, Any]] = None) -> str: | ||
"""Generates and returns an access token in JWT format. | ||
Will use `token_data` as a basis for claims in the the new token, | ||
but the following claims will be overridden: `exp`, `nbf`, `iat`, `aud`, `iss`, `token_type` | ||
""" | ||
return _generate_token(token_data, ACCESS_TOKEN_EXPIRE_DELTA, "access_token") | ||
|
||
|
||
def generate_refresh_token(token_data: Optional[dict[str, Any]] = None) -> str: | ||
"""Generates and returns a refresh token in JWT format. | ||
Will use `token_data` as a basis for claims in the the new token, | ||
but the following claims will be overridden: `exp`, `nbf`, `iat`, `aud`, `iss`, `token_type` | ||
""" | ||
return _generate_token(token_data, REFRESH_TOKEN_EXPIRE_DELTA, "refresh_token") | ||
|
||
|
||
def _generate_token( | ||
token_data: Optional[dict[str, Any]], expiry_delta: timedelta, token_type: str | ||
) -> str: | ||
"""Generates and returns a token in JWT format. | ||
Will use `token_data` as a basis for claims in the the new token, | ||
but the following claims will be overridden: `exp`, `nbf`, `iat`, `aud`, `iss`, `token_type` | ||
""" | ||
if token_data is None: | ||
new_token = dict() | ||
else: | ||
new_token = dict(token_data) | ||
|
||
now = datetime.now(timezone.utc) | ||
name = JWTConf().get_nav_name() | ||
updated_claims = { | ||
'exp': (now + expiry_delta).timestamp(), | ||
'nbf': now.timestamp(), | ||
'iat': now.timestamp(), | ||
'aud': name, | ||
'iss': name, | ||
'token_type': token_type, | ||
} | ||
new_token.update(updated_claims) | ||
encoded_token = jwt.encode( | ||
new_token, JWTConf().get_nav_private_key(), algorithm="RS256" | ||
) | ||
return encoded_token |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,3 +49,5 @@ PyOpenSSL==23.3.0 | |
service-identity==21.1.0 | ||
|
||
requests | ||
|
||
pyjwt>=2.6.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
import pytest | ||
from unittest.mock import Mock, patch | ||
from datetime import datetime | ||
|
||
import jwt | ||
|
||
from nav.web.jwtgen import generate_access_token, generate_refresh_token | ||
|
||
|
||
class TestTokenGeneration: | ||
"""Tests behaviour that should be identical for both access and refresh token generation""" | ||
|
||
@pytest.mark.parametrize("func", [generate_access_token, generate_refresh_token]) | ||
def test_nbf_should_be_in_the_past(self, func): | ||
encoded_token = func() | ||
data = jwt.decode(encoded_token, options={'verify_signature': False}) | ||
assert data['nbf'] < datetime.now().timestamp() | ||
|
||
@pytest.mark.parametrize("func", [generate_access_token, generate_refresh_token]) | ||
def test_exp_should_be_in_the_future(self, func): | ||
encoded_token = func() | ||
data = jwt.decode(encoded_token, options={'verify_signature': False}) | ||
assert data['exp'] > datetime.now().timestamp() | ||
|
||
@pytest.mark.parametrize("func", [generate_access_token, generate_refresh_token]) | ||
def test_iat_should_be_in_the_past(self, func): | ||
encoded_token = func() | ||
data = jwt.decode(encoded_token, options={'verify_signature': False}) | ||
assert data['iat'] < datetime.now().timestamp() | ||
|
||
@pytest.mark.parametrize("func", [generate_access_token, generate_refresh_token]) | ||
def test_aud_should_match_name_from_jwt_conf(self, func, nav_name): | ||
encoded_token = func() | ||
data = jwt.decode(encoded_token, options={'verify_signature': False}) | ||
assert data['aud'] == nav_name | ||
|
||
@pytest.mark.parametrize("func", [generate_access_token, generate_refresh_token]) | ||
def test_iss_should_match_name_from_jwt_conf(self, func, nav_name): | ||
encoded_token = func() | ||
data = jwt.decode(encoded_token, options={'verify_signature': False}) | ||
assert data['iss'] == nav_name | ||
|
||
|
||
class TestGenerateAccessToken: | ||
def test_token_type_should_be_access_token(self): | ||
encoded_token = generate_access_token() | ||
data = jwt.decode(encoded_token, options={'verify_signature': False}) | ||
assert data['token_type'] == "access_token" | ||
|
||
|
||
class TestGenerateRefreshToken: | ||
def test_token_type_should_be_refresh_token(self): | ||
encoded_token = generate_refresh_token() | ||
data = jwt.decode(encoded_token, options={'verify_signature': False}) | ||
assert data['token_type'] == "refresh_token" | ||
|
||
|
||
@pytest.fixture(scope="module", autouse=True) | ||
def jwtconf_mock(private_key, nav_name) -> str: | ||
"""Mocks the get_nav_name and get_nav_private_key functions for | ||
the JWTConf class | ||
""" | ||
with patch("nav.web.jwtgen.JWTConf") as _jwtconf_mock: | ||
instance = _jwtconf_mock.return_value | ||
instance.get_nav_name = Mock(return_value=nav_name) | ||
instance.get_nav_private_key = Mock(return_value=private_key) | ||
yield _jwtconf_mock | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def nav_name() -> str: | ||
yield "nav" | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def private_key() -> str: | ||
"""Yields a private key in PEM format""" | ||
key = """-----BEGIN PRIVATE KEY----- | ||
MIIEuwIBADANBgkqhkiG9w0BAQEFAASCBKUwggShAgEAAoIBAQCp+4AEZM4uYZKu | ||
/hrKzySMTFFx3/ncWo6XAFpADQHXLOwRB9Xh1/OwigHiqs/wHRAAmnrlkwCCQA8r | ||
xiHBAMjp5ApbkyggQz/DVijrpSba6Tiy1cyBTZC3cvOK2FpJzsakJLhIXD1HaULO | ||
ClyIJB/YrmHmQc8SL3Uzou5mMpdcBC2pzwmEW1cvQURpnvgrDF8V86GrQkjK6nIP | ||
IEeuW6kbD5lWFAPfLf1ohDWex3yxeSFyXNRApJhbF4HrKFemPkOi7acsky38UomQ | ||
jZgAMHPotJNkQvAHcnXHhg0FcWGdohv5bc/Ctt9GwZOzJxwyJLBBsSewbE310TZi | ||
3oLU1TmvAgMBAAECgf8zrhi95+gdMeKRpwV+TnxOK5CXjqvo0vTcnr7Runf/c9On | ||
WeUtRPr83E4LxuMcSGRqdTfoP0loUGb3EsYwZ+IDOnyWWvytfRoQdExSA2RM1PDo | ||
GRiUN4Dy8CrGNqvnb3agG99Ay3Ura6q5T20n9ykM4qKL3yDrO9fmWyMgRJbAOAYm | ||
xzf7H910mDZghXPpq8nzDky0JLNZcaqbxuPQ3+EI4p2dLNXbNqMPs8Y20JKLeOPs | ||
HikRM0zfhHEJSt5IPFQ54/CzscGHGeCleQINWTgvDLMcE5fJMvbLLZixV+YsBfAq | ||
e2JsSubS+9RI2ktMlSKaemr8yeoIpsXfAiJSHkECgYEA0NKU18xK+9w5IXfgNwI4 | ||
peu2tWgwyZSp5R2pdLT7O1dJoLYRoAmcXNePB0VXNARqGxTNypJ9zmMawNmf3YRS | ||
BqG8aKz7qpATlx9OwYlk09fsS6MeVmaur8bHGHP6O+gt7Xg+zhiFPvU9P5LB+C0Z | ||
0d4grEmIxNhJCtJRQOThD8ECgYEA0GKRO9SJdnhw1b6LPLd+o/AX7IEzQDHwdtfi | ||
0h7hKHHGBlUMbIBwwjKmyKm6cSe0PYe96LqrVg+cVf84wbLZPAixhOjyplLznBzF | ||
LqOrfFPfI5lQVhslE1H1CdLlk9eyT96jDgmLAg8EGSMV8aLGj++Gi2l/isujHlWF | ||
BI4YpW8CgYEAsyKyhJzABmbYq5lGQmopZkxapCwJDiP1ypIzd+Z5TmKGytLlM8CK | ||
3iocjEQzlm/jBfBGyWv5eD8UCDOoLEMCiqXcFn+uNJb79zvoN6ZBVGl6TzhTIhNb | ||
73Y5/QQguZtnKrtoRSxLwcJnFE41D0zBRYOjy6gZJ6PSpPHeuiid2QECgYACuZc+ | ||
mgvmIbMQCHrXo2qjiCs364SZDU4gr7gGmWLGXZ6CTLBp5tASqgjmTNnkSumfeFvy | ||
ZCaDbJbVxQ2f8s/GajKwEz/BDwqievnVH0zJxmr/kyyqw5Ybh5HVvA1GfqaVRssJ | ||
DvTjZQDft0a9Lyy7ix1OS2XgkcMjTWj840LNPwKBgDPXMBgL5h41jd7jCsXzPhyr | ||
V96RzQkPcKsoVvrCoNi8eoEYgRd9jwfiU12rlXv+fgVXrrfMoJBoYT6YtrxEJVdM | ||
RAjRpnE8PMqCUA8Rd7RFK9Vp5Uo8RxTNvk9yPvDv1+lHHV7lEltIk5PXuKPHIrc1 | ||
nNUyhzvJs2Qba2L/huNC | ||
-----END PRIVATE KEY-----""" | ||
yield key |