From 2ac99f83e95cddca7e03f82bd74aa9ae08420b47 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Wed, 23 Nov 2022 02:11:34 -0500 Subject: [PATCH] Enhanced token handling methods. Closes #835. --- src/falconpy/_auth_object/_falcon_auth.py | 59 +++++++++++--------- src/falconpy/_auth_object/_uber_interface.py | 12 ++-- src/falconpy/api_complete.py | 39 ++++++++++--- src/falconpy/oauth2.py | 10 ++-- 4 files changed, 73 insertions(+), 47 deletions(-) diff --git a/src/falconpy/_auth_object/_falcon_auth.py b/src/falconpy/_auth_object/_falcon_auth.py index c3a831cfe..6ec4a757a 100644 --- a/src/falconpy/_auth_object/_falcon_auth.py +++ b/src/falconpy/_auth_object/_falcon_auth.py @@ -139,9 +139,9 @@ def logout(self) -> dict or bool: # The default behavior for both the login and logout handlers is to return # the entire dictionary created by the token API response. - def _login_handler(self) -> dict: - operation_id = "oauth2AccessToken" - target_url = f"{self.base_url}{[ep[2] for ep in AuthEndpoints if operation_id in ep[0]][0]}" + def _login_handler(self, stateful: bool = True) -> dict: + op_id = "oauth2AccessToken" + target_url = f"{self.base_url}{[ep[2] for ep in AuthEndpoints if op_id in ep[0]][0]}" header_payload = {} if self.cred_format_valid: data_payload = { @@ -154,37 +154,41 @@ def _login_handler(self) -> dict: headers=header_payload, verify=self.ssl_verify, proxy=self.proxy, timeout=self.timeout, user_agent=self.user_agent) - if isinstance(returned, dict): # Issue #433 - self.token_status = returned["status_code"] - if self.token_status == 201: - self.token_expiration = returned["body"]["expires_in"] - self.token_time = time.time() - self.token_value = returned["body"]["access_token"] - self.token_fail_reason = None - self.base_url = autodiscover_region(self.base_url, returned) - else: - self.token_expiration = 0 # Aligning to Uber class functionality - if "errors" in returned["body"]: - if returned["body"]["errors"]: - self.token_fail_reason = returned["body"]["errors"][0]["message"] + + if isinstance(returned, dict): # Issue 433 + if stateful: + self.token_status = returned["status_code"] + if self.token_status == 201: + self.token_expiration = returned["body"]["expires_in"] + self.token_time = time.time() + self.token_value = returned["body"]["access_token"] + self.token_fail_reason = None + self.base_url = autodiscover_region(self.base_url, returned) + else: + self.token_expiration = 0 # Aligning to Uber class functionality + if "errors" in returned["body"]: + if returned["body"]["errors"]: + self.token_fail_reason = returned["body"]["errors"][0]["message"] else: returned = generate_error_result("Unexpected API response received", 403) - self.token_expiration = 0 - self.token_fail_reason = TokenFailReason["UNEXPECTED"].value - self.token_status = 403 + if stateful: + self.token_expiration = 0 + self.token_fail_reason = TokenFailReason["UNEXPECTED"].value + self.token_status = 403 else: returned = generate_error_result("Invalid credentials specified", 403) - self.token_expiration = 0 - self.token_fail_reason = TokenFailReason["INVALID"].value - self.token_status = 403 + if stateful: + self.token_expiration = 0 + self.token_fail_reason = TokenFailReason["INVALID"].value + self.token_status = 403 return returned - def _logout_handler(self, token_value: str = None) -> dict: + def _logout_handler(self, token_value: str = None, stateful: bool = True) -> dict: if not token_value: token_value = self.token_value - operation_id = "oauth2RevokeToken" - target_url = f"{self.base_url}{[ep[2] for ep in AuthEndpoints if operation_id in ep[0]][0]}" + op_id = "oauth2RevokeToken" + target_url = f"{self.base_url}{[ep[2] for ep in AuthEndpoints if op_id in ep[0]][0]}" if self.cred_format_valid: b64cred = generate_b64cred(self.creds["client_id"], self.creds["client_secret"]) header_payload = {"Authorization": f"basic {b64cred}"} @@ -193,8 +197,9 @@ def _logout_handler(self, token_value: str = None) -> dict: headers=header_payload, verify=self.ssl_verify, proxy=self.proxy, timeout=self.timeout, user_agent=self.user_agent) - self.token_expiration = 0 - self.token_value = False + if stateful: + self.token_expiration = 0 + self.token_value = False else: returned = generate_error_result("Invalid credentials specified", 403) diff --git a/src/falconpy/_auth_object/_uber_interface.py b/src/falconpy/_auth_object/_uber_interface.py index e010c67c8..686c71c21 100644 --- a/src/falconpy/_auth_object/_uber_interface.py +++ b/src/falconpy/_auth_object/_uber_interface.py @@ -93,15 +93,15 @@ def __init__(self, # # Override the default login and logout handlers to # provide Uber Class-specific functionality. - def _login_handler(self) -> bool: + def login(self) -> bool: """Generate an authorization token.""" - super()._login_handler() + super().login() return self.authenticated - def _logout_handler(self) -> bool: + def logout(self) -> bool: """Revoke the current authorization token.""" - result = super()._logout_handler() + result = super().logout() return bool(result["status_code"] == 200) @@ -118,7 +118,7 @@ def authenticate(self) -> bool: ---- Consider updating your code to leverage the login method. """ - return super().login() + return self.login() def deauthenticate(self) -> bool: """Legacy Uber Class functionality handler. @@ -127,7 +127,7 @@ def deauthenticate(self) -> bool: ---- Consider updating your code to leverage the logout method. """ - return super().logout() + return self.logout() def valid_cred_format(self) -> bool: """Legacy property to confirm credential dictionary format. diff --git a/src/falconpy/api_complete.py b/src/falconpy/api_complete.py index 67426ff76..1acc0eed2 100644 --- a/src/falconpy/api_complete.py +++ b/src/falconpy/api_complete.py @@ -53,7 +53,7 @@ class APIHarness(UberInterface): - """This one does it all. It's like the One Ring with significantly fewer orcs. + """The FalconPy Uber Class. The Uber Class inherits from the UberInterface class, which is a stand alone class that encapsulates the FalconAuth class. This allows the Uber Class to @@ -63,6 +63,8 @@ class that encapsulates the FalconAuth class. This allows the Uber Class to This means the Uber Class does not include an auth_object, as it is one. As of FalconPy v1.3.0, Object Authentication is still unssupported for Uber Class usage scenarios. + + This one does it all. It's like the One Ring with significantly fewer orcs. """ # pylint: disable=R0913 # `-. @@ -70,10 +72,10 @@ class that encapsulates the FalconAuth class. This allows the Uber Class to # _._ `-._`. .--. `. # .-' '-. `-|\/ \| `-. # .' '-._\ (o)O) `-. - # / / _.--.\ '. `-. `-. + # / / _.--.) '. `-. `-. # /| ( | / -. ( -._( -._ '. '. - # / \ \-.__\ \_.-'`.`.__'. `-, '. .' - # | /\ | / \ \ `--')/ .-'.'.' + # / \ \-.__\ \_.-'`.`.__' . `-, '. .' + # | /\ | / \ \ `--' / .-'.'.' # .._/ / / / / / \ \ .' . .' .' # / ___/ | / \ \ \ \__ '.'. . . # \ \___ \ ( \ \ `._ `. .' . ' .' @@ -217,14 +219,33 @@ def command(self, *args, **kwargs) -> dict or bytes: kwargs = handle_body_payload_ids(kwargs) # Only accept allowed HTTP methods if method in _ALLOWED_METHODS: - returned = perform_request( - **uber_request_keywords(self, method, operation, target, kwargs, container) - ) + if operation == "oauth2AccessToken": + # Calling the token generation operation directly from the + # Uber Class does not change the underlying auth_object state. + returned = self._login_handler(stateful=False) + elif operation == "oauth2RevokeToken": + # Calling the token revocation operation directly requires a + # token_value. Doing so in this manner from the Uber Class does + # not change the underlying auth_object state. + token_value = kwargs.get("token_value", None) + if not token_value: + returned = generate_error_result( + message="The token_value keyword is required to use this operation.", + code=400 + ) + else: + returned = self._logout_handler(token_value=token_value, + stateful=False + ) + else: + returned = perform_request( + **uber_request_keywords(self, method, operation, target, kwargs, container) + ) else: # Bad HTTP method returned = generate_error_result(message="Invalid HTTP method specified.", - code=405 - ) + code=405 + ) else: # That command doesn't exist, have a cup of tea instead returned = generate_error_result(message="Invalid API operation specified.", code=418) diff --git a/src/falconpy/oauth2.py b/src/falconpy/oauth2.py index 17936782a..2797a6d8a 100644 --- a/src/falconpy/oauth2.py +++ b/src/falconpy/oauth2.py @@ -182,7 +182,7 @@ def revoke(self, token: str) -> dict: """ return self._logout_handler(token) - def token(self) -> dict: + def token(self, generate_only: bool = False) -> dict: """Generate an authorization token. HTTP Method: POST @@ -193,23 +193,23 @@ def token(self) -> dict: Keyword arguments ---- - This method does not accept keyword arguments. + generate_only : bool + Flag indicating if this request should update the stored token value and expiration. Arguments ---- - This method does not accept arguments. + When not specified as a keyword, generate_only is assumed as the only accepted argument. Returns ---- dict Dictionary object containing API response. """ - return self._login_handler() + return self._login_handler(not generate_only) # These method names align to the operation IDs in the API but # do not conform to snake_case / PEP8 and are defined here for # backwards compatibility / ease of use purposes - #login = token oauth2AccessToken = token oAuth2AccessToken = token oauth2RevokeToken = revoke