diff --git a/setup.cfg b/setup.cfg index d915eb44..a3e04f90 100644 --- a/setup.cfg +++ b/setup.cfg @@ -14,7 +14,7 @@ install_requires = momento-wire-types==0.8.1 build setuptools - pyjwt + pyjwt[crypto] grpcio [options.packages.find] @@ -25,4 +25,4 @@ mypy = mypy==0.931 black = - black==22.3.0 \ No newline at end of file + black==22.3.0 diff --git a/src/momento/momento_signer.py b/src/momento/momento_signer.py new file mode 100644 index 00000000..84093755 --- /dev/null +++ b/src/momento/momento_signer.py @@ -0,0 +1,182 @@ +import json +import urllib +from enum import Enum +from typing import Optional, Dict +from jwt.api_jwk import PyJWK +from urllib.parse import quote +import jwt + +from .errors import InvalidArgumentError + + +class CacheOperation(Enum): + GET = 1 + SET = 2 + + +class SigningRequest: + def __init__( + self, + cache_name: str, + cache_key: str, + cache_operation: CacheOperation, + expiry_epoch_seconds: int, + ttl_seconds: Optional[int] = None, + ): + """Initializes a new SigningRequest. + + Args: + cache_name: The name of the cache. + cache_key: The key of the object. + cache_operation: The operation performed on the item in the cache. + expiry_epoch_seconds: The timestamp that the pre-signed URL is valid until. + ttl_seconds: Time to Live for the item in Cache. This is an optional property that will only be used for CacheOperation.SET. + """ + self._cache_name = cache_name + self._cache_key = cache_key + self._cache_operation = cache_operation + self._expiry_epoch_seconds = expiry_epoch_seconds + self._ttl_seconds = ttl_seconds + + def expiry_epoch_seconds(self) -> int: + return self._expiry_epoch_seconds + + def cache_name(self) -> str: + return self._cache_name + + def cache_key(self) -> str: + return self._cache_key + + def cache_operation(self) -> CacheOperation: + return self._cache_operation + + def ttl_seconds(self) -> Optional[int]: + return self._ttl_seconds + + +class MomentoSigner: + def __init__(self, jwk_json_string: str): + """Initializes MomentoSigner with the specified private key. + + Args: + jwk_json_string: the JSON string of the JWK key. This can be obtained from create_signing_key response. + + Raises: + InvalidArgumentError: If the supplied private key is not valid. + """ + try: + self._jwk_json: Dict[str, str] = json.loads(jwk_json_string) + except json.decoder.JSONDecodeError as e: + raise InvalidArgumentError( + f"Invalid JWK Json String: {jwk_json_string}" + ) from e + + try: + self._jwk: PyJWK = PyJWK.from_dict(self._jwk_json) # type: ignore[no-untyped-call, misc] + except jwt.exceptions.PyJWKError as e: + raise InvalidArgumentError(f"Invalid JWK: {jwk_json_string}") from e + except jwt.exceptions.InvalidKeyError as e: + raise InvalidArgumentError(f"Invalid JWK: {jwk_json_string}") from e + + if self._jwk.key_id is None: # type: ignore[misc] + raise InvalidArgumentError(f"JWK missing kid attribute: {jwk_json_string}") + + self._alg = self._jwk_json.get("alg", None) + if self._alg is None: + self._alg = self._alg_fallback_logic() + + def sign_access_token(self, signing_request: SigningRequest) -> str: + """Creates an access token to be used as a JWT token. + + Args: + signing_request: Contains parameters for the generated token. + + Returns: + str: the JWT token. + """ + claims: Dict[str, object] = { + "exp": signing_request.expiry_epoch_seconds(), + "cache": signing_request.cache_name(), + "key": signing_request.cache_key(), + } + + if signing_request.cache_operation() == CacheOperation.GET: + claims["method"] = ["get"] + elif signing_request.cache_operation() == CacheOperation.SET: + claims["method"] = ["set"] + if signing_request.ttl_seconds() is not None: + claims["ttl"] = signing_request.ttl_seconds() + else: + raise NotImplementedError( + f"Unrecognized Operation: {signing_request.cache_operation()}" + ) + + # jwt.encode will automatically insert "typ" and "alg" into the header for us. + # We still need to specify "kid" to be included in the header however. + return jwt.encode( + claims, self._jwk.key, algorithm=self._alg, headers={"kid": self._jwk.key_id} # type: ignore[misc] + ) + + def create_presigned_url( + self, hostname: str, signing_request: SigningRequest + ) -> str: + """Creates a pre-signed HTTPS URL. + + Args: + hostname: Hostname of SimpleCacheService. This value can be obtained from create_signing_key response. + signing_request: Contains parameters for the generated URL. + + Returns: + str: a pre-signed HTTPS URL with JWT token. + """ + token = self.sign_access_token(signing_request) + cache_name = quote(signing_request.cache_name(), safe="") + cache_key = quote(signing_request.cache_key(), safe="") + if signing_request.cache_operation() == CacheOperation.GET: + return ( + f"https://{hostname}/cache/get/{cache_name}/{cache_key}?token={token}" + ) + elif signing_request.cache_operation() == CacheOperation.SET: + url = f"https://{hostname}/cache/set/{cache_name}/{cache_key}?token={token}" + ttl_seconds = signing_request.ttl_seconds() + if ttl_seconds is not None: + url = url + f"&ttl_milliseconds={ttl_seconds * 1000}" + return url + else: + raise NotImplementedError( + f"Unrecognized Operation: {signing_request.cache_operation()}" + ) + + # Logic stolen from https://github.com/jpadilla/pyjwt/blob/master/jwt/api_jwk.py#L19 + # to handle the case when alg is missing from JWK. + def _alg_fallback_logic(self) -> str: + kty = self._jwk_json.get("kty", None) + if kty is None: + raise InvalidArgumentError("kty is not found: %s" % self._jwk_json) + crv = self._jwk_json.get("crv", None) + if kty == "EC": + if crv == "P-256" or not crv: + algorithm = "ES256" + elif crv == "P-384": + algorithm = "ES384" + elif crv == "P-521": + algorithm = "ES512" + elif crv == "secp256k1": + algorithm = "ES256K" + else: + raise InvalidArgumentError("Unsupported crv: %s" % crv) + elif kty == "RSA": + algorithm = "RS256" + elif kty == "oct": + algorithm = "HS256" + elif kty == "OKP": + if not crv: + raise InvalidArgumentError("crv is not found: %s" % self._jwk_json) + if crv == "Ed25519": + algorithm = "EdDSA" + else: + raise InvalidArgumentError("Unsupported crv: %s" % crv) + else: + raise InvalidArgumentError("Unsupported kty: %s" % kty) + + return algorithm diff --git a/tests/test_momento_signer.py b/tests/test_momento_signer.py new file mode 100644 index 00000000..c06ec0e9 --- /dev/null +++ b/tests/test_momento_signer.py @@ -0,0 +1,103 @@ +import unittest + +from momento.momento_signer import MomentoSigner, CacheOperation, SigningRequest +from momento.errors import InvalidArgumentError +import jwt +from jwt.api_jwk import PyJWK + +_RSA_NO_ALG_JWK = '{"p":"8qu5SjdYE-pnqp8KxgD5VRxXVhu5bqlufsQR4ki13-nOap8hXHP5-X6dtHDef77NbjeHlSsqdoiEQzVLkTNnY2S9owshBQ_E0nq1XSf_ma4IA9d50KCQF9jHkl2npziyanJgvu33RB7NVh5cuVWzFwZqVq08oMLHiyAs_TfIqu0","kty":"RSA","q":"uiDceVagmm9JMtr4lOLTVp4VnX0evpmqUpcq7a6fl696m8cvnHT9VXNURFPagp0QfWOYGB_j30Fw1dWrnLe5IXSuK_qNgSdEm64fspN9oCuRanK7a7WYw54msr7-gsowu5mIcGaM7H-Lu7wWsA78y6nkBEsuEGWE2y-a1TLJUms","d":"CeJjOaYzZADIoU0uqKYi0jmfwbmI-AOzKo1H_GPoEdo1NvlaHcqMWnXEie8LbN1IvyC6OvknJ3saJ5em0QzjNWyv_IethCMu_kBpZnKXqBuUZ5MtNlXuSsF4J37skMe_zWehpBXun4iWozrHtugdAiYKBLB-tazkiWZ0b03CIzSLYMz7gtj2OHMCiigLEl0dhX32rfRK3-c6ax_yK7XGnT46COQkqDuqSAJrVS7i-4P5MyLIGphoSC0guWS4Vqqo_2yxE4UFEnANtGB59fRylLFunDGo-9Jvdkatj3zpSTrySUhHI9JC2us_iB_LRp7tfyMUtnYeU4g-nJEixvoYZQ","e":"AQAB","kid":"testKeyId","qi":"RBkWNYGJZ0jcEMm7KdnEkc2KDe5LxH5SAaZPe3l_tOruu4RXbLxB_BtuZ2uQSIfsUmvK4zSjhp52BCMGn_cqvNA4ZrGJ_IGRHD6P5dt4HlCg3L0xeB6PZ5risJj45KTiUqqG24HfY56QN7SpRz4vVoTLJsmrlDJq0X9G-44D1Yo","dp":"O5yJiLytqz7CtnwZJmio1wp-Pc3TsGZ4mTVK-15HJzkFFtX-WPq4Zlx_Gws67QCO8Es9yBvxc2q3qtbVuFZ7SEQ__WRHeTnVbKruEHM566N_non5B5HZs7Hx3HebLo3T7igosd49BoPWhxgwSOrPcpGF38LwiMEwSXHe-1kPt0U","dq":"VtIBRbA81gzXDhvKHFj5z8uJtZ6peqrfIgtVgO0VkIHQJV3yPX7stLFJO14J7Scqi_Kq_YXSm09BPN2gYUfp2Us9-1GyM-6HOD8ulfPqg44PFKJT_lgE3CqnTnV87rE1rixd0mBjl-We3oFL6-_xx2aF7-LJp-hS4pMAHDbGZeU","n":"sG_rFa4BMxYupx7ru5WPjoRE-81Jomd8g4jx7WAmVSe2uvnEC57zfBYaZ8saULkGwd9A7dzvw1skRW8RTmL1qttjM2ZTlsMaa_bpS1a2PloNZfIKNzux0KOkKwbNdnO3bPpAMrsgAIC6BFom9ISdCrTP1cgbe-klAp04osEsK0jNNglJhZFIIBzwqbYvGGLaat3ribY-OB9KN3Vhh9Z7v7F-i9dobbZk68nVUd0sgGZ-ht3xF-mdnN-CtugZjO0_Ke7t0jIRu_qNvZsbi9MqhSB6FRhqkFcs4n5HNxu082OwraU7MMZWbDjRYeq01MlGKGFPd-8xJoZ93bDWFbRbDw"}' +_RSA_NO_ALG_JWK_PUB = '{"kty":"RSA","e":"AQAB","kid":"testKeyId","n":"sG_rFa4BMxYupx7ru5WPjoRE-81Jomd8g4jx7WAmVSe2uvnEC57zfBYaZ8saULkGwd9A7dzvw1skRW8RTmL1qttjM2ZTlsMaa_bpS1a2PloNZfIKNzux0KOkKwbNdnO3bPpAMrsgAIC6BFom9ISdCrTP1cgbe-klAp04osEsK0jNNglJhZFIIBzwqbYvGGLaat3ribY-OB9KN3Vhh9Z7v7F-i9dobbZk68nVUd0sgGZ-ht3xF-mdnN-CtugZjO0_Ke7t0jIRu_qNvZsbi9MqhSB6FRhqkFcs4n5HNxu082OwraU7MMZWbDjRYeq01MlGKGFPd-8xJoZ93bDWFbRbDw"}' +_RSA_256_JWK = '{"p":"4Xhu9y3DCDO6ttlDrb5JuSk9-F6oKIE7y3zDxObR1UbLBPua4X3qW82VGYjzn6yO7_qRECzN-K1LLd6yIwK5L7i6rIglP_z-Kyzp-UwacQrnvvf0ZM1xE2E6JUAkevXf1khCXAuKd04S9NxEKv0gDvwdspw-WwqiOMVU1gn1aF0","kty":"RSA","q":"0Pyj5MqoVGuUSGYL0-HLYBspD7vgi9G8HjNh8MQC1AF5w58k29dtAH2-dj3IO3b_CrNisKyMpYLT1OJBsYaJGi2TZRnBGbZb-LyZTz5vDBu8f3SgUxSHcqUsVOO7Pr555OJKdNRr3ag3m8RwIn3Agbvrs0HQPM-XkfUI7p7Bc28","d":"EqUgXXSG8gWx2wGJpjKZTRMd7lzuct96AyzWBsCTmYAVj6BsczKERFUFtoUs6kgF4aZiq8ym-mNQdVvane6aeK3p0F5RfvGwtGg7xkavNidtOf51fEPa8yCXLIwFclM_tav4Y_LlGdiXNsXVX6X8ZgTiS5vUCRzt_5e8Nks1T1kMPFdiW_9F-emXWrl5hmrIiG1XpBX4sG6YNGWty1u7Reu9a8ydzMmcFSuxE5bJodDPE1ZiZOQOtVxk3TVAKUEQ7SqOksN_w_FX_6Du5bC8cjz38dJhfL3R9B_FQA5oM_nO1XklgigF8lg8_b2sTb-OQc6dY6qE241ZknwxDbKIIQ","e":"AQAB","kid":"testKeyId","qi":"0NHMRRg_aQT8hKItg1HxnUqgWHHb4S92DqhBY5AERmOPUUQvK7qkpc0DN4OVQwZGpl0_lBqsadjoCROrJ6HrVMkx0fkoNPQIKRzT_ew6ORXPPrB5d2elS4c93mjYZjvQtxdpQKtyM4_9Va1v6PRa2IGUSPzJt6h8BQBM9NQhHuY","dp":"aWizgA2942TDwt46HM0cjFsypJ4kQaOBf_WZVMGQkgQhv_edBhSm7zpinWiAdULoJFthXE2GEd96iTxWzbVlPGFBrI2N1KeDcE30KN-icPznMUmc0U-WsLfAxk-BfpbaicSIeZ3Po0014ZHksLBcP4UwoSMYp9mF08K1kcdgGuU","alg":"RS256","dq":"DsvUTr6KbG-xb-7Jp5a073j8z0BeBYgz6W9537IBAUGZfWAnG-mEriQ49-Yn5w3lwLwyoI-W5aD9nnTmccs0qcXQSbgpE8j1egbgU9v3wMO19NAtCbTKYjOPj_MPrsGNn8blvp_Lg0YFqeGejtKYbpb_eRGPzL5l3M-cckiLKcE","n":"uBBdD0DnAeArN2cqaG9zUDmZU_hEen7glZN0ssVSf_ZML_HdEu7BR5G9gGToYQ7dkH010FzciZMbouXUDovd71snXqlvnwyvqpU3UUfVgudmN-CDa97170WnjpjIQLVErQCX-3_PIctJweDmJLzaQsmQCPoSzZkiQ17NsMwN-xw2L-eRfwmyOWKv0mRkDpZRYgn4rrm70hepOZp0-YEpEC_vTYykc2WVuvEVez_9nF-VwXGVX-a1_zChIAz5V_-VXIqtoL7ot945wAEyEhtV4Yqr5LoSL8mvSL3v7WlF56NqaltDWe2yrowlZiQ2EDwwUKxArcYd7BnNmdYqo2cHUw"}' +_RSA_256_JWK_PUB = '{"kty":"RSA","e":"AQAB","kid":"testKeyId","alg":"RS256","n":"uBBdD0DnAeArN2cqaG9zUDmZU_hEen7glZN0ssVSf_ZML_HdEu7BR5G9gGToYQ7dkH010FzciZMbouXUDovd71snXqlvnwyvqpU3UUfVgudmN-CDa97170WnjpjIQLVErQCX-3_PIctJweDmJLzaQsmQCPoSzZkiQ17NsMwN-xw2L-eRfwmyOWKv0mRkDpZRYgn4rrm70hepOZp0-YEpEC_vTYykc2WVuvEVez_9nF-VwXGVX-a1_zChIAz5V_-VXIqtoL7ot945wAEyEhtV4Yqr5LoSL8mvSL3v7WlF56NqaltDWe2yrowlZiQ2EDwwUKxArcYd7BnNmdYqo2cHUw"}' +_RSA_384_JWK = '{"p":"2UqxTsQeIH2wsAP7FsDltFNGkco526v1nYjTSguReJr11klvMjju5H2cm6vrJGURQ6-e_yvyAzM7avG-D2o7ZGrYQ-pJXwF-lS7Nzwm7XWLvNv78vyH95aIdWh1KaeaBswN-JbGyamcmGgtwO_0ICTOMmyGpku38b52CHVgq-Dc","kty":"RSA","q":"0fF9Fnv2CHso23K13AcZCf5eTFy2tk9zd3zhheq_IbnvEjPiMp_UXAozd3Lh8MZMXpM0HhB1Zpxs89eF8Pyy3GwWq08F2lMOxDGaYxeTU258Kq2KFPQ8VDsY6SGuAUAGkwYM_IXX0_ixt3FqrJIAuLI9pPAkymV9Hi_oq-FOopM","d":"SNx9spekPEb3VY9gOuD4HPE9BkLbG-dAgTVgKYuhG5xqHLeHBqB7Y9k7U1fDoLOszV59yQjM9mQXhjWa0bGFpYAl_U_i3eSw62Fh3b5NG1mkbJL2Vc0QhPtmH7X7rQOL01PxyJ83RDQ08xlC5vcMv_cAfIICqxo-qsOCnEN9_6sUrPQkv19oakwfMq9ACXDWPgkHNwWTBbyO7CtUCf6chYLPr13MsjLe_FSqY0rrNKr68qq0WtXFaldJR1SeE_lEd7pR1kAMoX0rm57yNqBljsgMweOLMz6swulIRZBWtmEfxvvYL0H5RnmwSbz00NSjErqwv518KPpNzS9O8qE-dw","e":"AQAB","kid":"testKeyId","qi":"nxcUfqj-jKFtGmGKIQo8b4M3eOnXdpm0QWyfgazzk_v1uhpbGg_VC7ZKxqY1DMfQHyjxFEz8a3mFsusVvwsCFuSzmqp36-f7tuXiAVfJjHs8cNIVZbO0GU1iRzd21yfBuj1JIr2FoU2g_1G--h48-tWXOmrPmBc9_xTSfjEeNGE","dp":"DiCqGIntv4UMiNUpbRhLlwbXDsGMM3khtgVgX28THTlOBImvvh8vgRGdrg1mc25SygjQGJ0d1hFtqo1fIxdwFx5PQ1MnRBMPzNlHLk_eq7qz_OplOnQWUujQabx_yxTel-oBOKguBncAZi8aM_xGmnqMiMWOhewNPqCKBihmWs8","alg":"RS384","dq":"zV2YqyHfbjRrpx7y3qTizW_R9ojLAlN98-hpA4K6LNehEQFHx5WpOc-QwMvUUJ7pnaoJVU9sSE_EFFNDZpUKsavaEQFgDE0rKKgNCdnJ99cgBu9zH0Q6r3qPx512hSqIQ9GramnS0jt4PKXpX54CrqlMu8dddc8JMTpUM65WKZk","n":"sjL0Psd5o5TdDbiHQrzfUkrVB0YaLJvSPkJJHyM5pKx09u9hFrpSYgPhYIPUChAN_wwmE17aDPR99L9KK90CQnYuakzqtTwm2FxWOK60A1Tu4KARWz6QZsGrVoqRRTF4oDqXtIl5PhZ9iSWnswR-AEZRZowOSJYo1OkjJl9f_X6lZyuR0UYs4GrG3sHAErndUtOSiC7CU1_-Vb43CHVXSBm3iM0UKHB98nDRp5dCIjrIqGw0SfTvsKTsLwYKo7aYF26B7_sIX7HDXtrnJ7CuvLG29RLuYrOwagVuiT_yueN1VTVQ-QJZs587EnhGI0tYI6WH_vMjeEUM9Y6hyYFVlQ"}' +_RSA_384_JWK_PUB = '{"kty":"RSA","e":"AQAB","kid":"testKeyId","alg":"RS384","n":"sjL0Psd5o5TdDbiHQrzfUkrVB0YaLJvSPkJJHyM5pKx09u9hFrpSYgPhYIPUChAN_wwmE17aDPR99L9KK90CQnYuakzqtTwm2FxWOK60A1Tu4KARWz6QZsGrVoqRRTF4oDqXtIl5PhZ9iSWnswR-AEZRZowOSJYo1OkjJl9f_X6lZyuR0UYs4GrG3sHAErndUtOSiC7CU1_-Vb43CHVXSBm3iM0UKHB98nDRp5dCIjrIqGw0SfTvsKTsLwYKo7aYF26B7_sIX7HDXtrnJ7CuvLG29RLuYrOwagVuiT_yueN1VTVQ-QJZs587EnhGI0tYI6WH_vMjeEUM9Y6hyYFVlQ"}' +_ES_256_JWK = '{"kty":"EC","d":"VmWWG6AU_TTajGJvrBWnG_NaUyH9rWJjUtzzCjrRPEU","crv":"P-256","kid":"testKeyId","x":"xtu5hUhexZV77FWXdeZ4rhgE9mT9i8UPwlEpbaBfiTk","y":"medk7WxeUgrA2T0oIybFfpAoTBlzZg5wKWEz4eR-Fbc","alg":"ES256"}' +_ES_256_JWK_PUB = '{"kty":"EC","crv":"P-256","kid":"testKeyId","x":"xtu5hUhexZV77FWXdeZ4rhgE9mT9i8UPwlEpbaBfiTk","y":"medk7WxeUgrA2T0oIybFfpAoTBlzZg5wKWEz4eR-Fbc","alg":"ES256"}' +_ES_NO_ALG_JWK = '{"kty":"EC","d":"ZhrhvO1Zk8ENkqlDXpHrEJ2TWgZhPSyjgX0j-8jUWig","crv":"P-256","kid":"testKeyId","x":"5BU5xuaUvasp9gUfSS3HGtqd1oHdGoHH3KtrzoQLd0Q","y":"WUjUeDikRXRHa-AWyNdH5Ye1Nyifd3P26F52Uv4eTVo"}' +_ES_NO_ALG_JWK_PUB = '{"kty":"EC","crv":"P-256","kid":"testKeyId","x":"5BU5xuaUvasp9gUfSS3HGtqd1oHdGoHH3KtrzoQLd0Q","y":"WUjUeDikRXRHa-AWyNdH5Ye1Nyifd3P26F52Uv4eTVo"}' + +class TestMomentoSigner(unittest.TestCase): + def test_rsa256(self): + token = MomentoSigner(_RSA_256_JWK).sign_access_token(SigningRequest( + cache_name="foo", + cache_key="bar", + cache_operation=CacheOperation.GET, + expiry_epoch_seconds=4079276098 + )) + verified_claims = jwt.decode(token, PyJWK.from_json(_RSA_256_JWK_PUB).key, algorithms=["RS256"]) + self.assertEqual(verified_claims, {'exp': 4079276098, 'cache': 'foo', 'key': 'bar', 'method': ['get']}) + + def test_rsa_no_alg(self): + token = MomentoSigner(_RSA_NO_ALG_JWK).sign_access_token(SigningRequest( + cache_name="foo", + cache_key="bar", + cache_operation=CacheOperation.GET, + expiry_epoch_seconds=4079276098 + )) + verified_claims = jwt.decode(token, PyJWK.from_json(_RSA_NO_ALG_JWK_PUB).key, algorithms=["RS256"]) + self.assertEqual(verified_claims, {'exp': 4079276098, 'cache': 'foo', 'key': 'bar', 'method': ['get']}) + + def test_rsa384(self): + token = MomentoSigner(_RSA_384_JWK).sign_access_token(SigningRequest( + cache_name="foo", + cache_key="bar", + cache_operation=CacheOperation.GET, + expiry_epoch_seconds=4079276098 + )) + verified_claims = jwt.decode(token, PyJWK.from_json(_RSA_384_JWK_PUB).key, algorithms=["RS384"]) + self.assertEqual(verified_claims, {'exp': 4079276098, 'cache': 'foo', 'key': 'bar', 'method': ['get']}) + + + def test_es256(self): + token = MomentoSigner(_ES_256_JWK).sign_access_token(SigningRequest( + cache_name="foo", + cache_key="bar", + cache_operation=CacheOperation.GET, + expiry_epoch_seconds=4079276098 + )) + verified_claims = jwt.decode(token, PyJWK.from_json(_ES_256_JWK_PUB).key, algorithms=["ES256"]) + self.assertEqual(verified_claims, {'exp': 4079276098, 'cache': 'foo', 'key': 'bar', 'method': ['get']}) + + def test_es_no_alg(self): + token = MomentoSigner(_ES_NO_ALG_JWK).sign_access_token(SigningRequest( + cache_name="foo", + cache_key="bar", + cache_operation=CacheOperation.GET, + expiry_epoch_seconds=4079276098 + )) + verified_claims = jwt.decode(token, PyJWK.from_json(_ES_NO_ALG_JWK_PUB).key, algorithms=["ES256"]) + self.assertEqual(verified_claims, {'exp': 4079276098, 'cache': 'foo', 'key': 'bar', 'method': ['get']}) + + def test_empty_jwk_json_string(self): + with self.assertRaises(InvalidArgumentError): + MomentoSigner('') + + def test_nothing_jwk_json_string(self): + with self.assertRaises(InvalidArgumentError): + MomentoSigner('{}') + + def test_incomplete_jwk_json_string(self): + with self.assertRaises(InvalidArgumentError): + MomentoSigner('{"alg":"foo","kid":"bar","kty":"RSA"}') + + + def test_create_presigned_url_for_get(self): + result = MomentoSigner(_RSA_256_JWK).create_presigned_url("example.com", SigningRequest( + cache_name='!#$&\'()*+,/:;=?@[]', + cache_key="!#$&\'()*+,/:;=?@[]", + cache_operation=CacheOperation.GET, + expiry_epoch_seconds=4079276098 + )) + + self.assertEqual(result, 'https://example.com/cache/get/%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D/%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsImtpZCI6InRlc3RLZXlJZCJ9.eyJleHAiOjQwNzkyNzYwOTgsImNhY2hlIjoiISMkJicoKSorLC86Oz0_QFtdIiwia2V5IjoiISMkJicoKSorLC86Oz0_QFtdIiwibWV0aG9kIjpbImdldCJdfQ.TpM6MLCnEWKJfwy0Mp5n9c9ygS5KwklpHqNTTCCncICTgENblbz3BGMUXUw4ljrTCt0uwxebX2iIROMGP32SevMlcygxsnXweHHVvkONAeok5ASvhiQtJcClb_4BMCkxPL7OlmaDlJVrcWdIqNa5E_HG6IWA6TXDJDzRrNPvknD7TVMRYDxaUMagdF3kPMBXZeO6CdlhLGb6Kfhmyc2mkdt8o-aCK3-n2vIXqiEwRNkSVr2iGBlP1l6nlVQ_dXfb0I56fLTncF1xWI1zDf2pbiQn9S3Z4n45_0C2yZy_FI8csWM2gYNqKK5VqsxkpbhlFJWyINxnyNA-FMBDhdJUZQ') + + def test_create_presigned_url_for_set(self): + result = MomentoSigner(_RSA_256_JWK).create_presigned_url("example.com", SigningRequest( + cache_name="!#$&\'()*+,/:;=?@[]", + cache_key="!#$&\'()*+,/:;=?@[]", + cache_operation=CacheOperation.SET, + expiry_epoch_seconds=4079276098, + ttl_seconds=5 + )) + + self.assertEqual(result, 'https://example.com/cache/set/%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D/%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsImtpZCI6InRlc3RLZXlJZCJ9.eyJleHAiOjQwNzkyNzYwOTgsImNhY2hlIjoiISMkJicoKSorLC86Oz0_QFtdIiwia2V5IjoiISMkJicoKSorLC86Oz0_QFtdIiwibWV0aG9kIjpbInNldCJdLCJ0dGwiOjV9.GlsGrxuoMMvwyr-SHBxfAK_CEk55P218jcsBTW9PiXoYgd85BNuDaHcQJaE_31CRdJ5emXj_qIQZjFLz3LDb3zHSAHCSYzg_pDZyVB-yLaW4nOCiztaxlr_FsihgghHUziO2lFyPgNpx2iZUQ5RnUvaCkhwN8R-FbKhBQ4Oh8hG4xBuILEIA5fJ8PAhbvmqzgmgbzplbhPMVvNPVXbdEn5YCdqIuoo6oQTB8ksgm788d7zRBgJmcyF07lDviGFaXt7OYshBWxKZ8f8Iv9PTaDtIFWPJDdaYCTcaYoaOqA2VXFEFmqcuDwcRIaNGkaYd8emqnlKc4ItdASLWV5k1Wjg&ttl_milliseconds=5000')