Skip to content

Commit

Permalink
Enhanced token handling methods. Closes #835.
Browse files Browse the repository at this point in the history
  • Loading branch information
jshcodes committed Jan 12, 2023
1 parent 975d995 commit 2ac99f8
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 47 deletions.
59 changes: 32 additions & 27 deletions src/falconpy/_auth_object/_falcon_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ def logout(self) -> dict or bool:

# The default behavior for both the login and logout handlers is to return
# the entire dictionary created by the token API response.
def _login_handler(self) -> dict:
operation_id = "oauth2AccessToken"
target_url = f"{self.base_url}{[ep[2] for ep in AuthEndpoints if operation_id in ep[0]][0]}"
def _login_handler(self, stateful: bool = True) -> dict:
op_id = "oauth2AccessToken"
target_url = f"{self.base_url}{[ep[2] for ep in AuthEndpoints if op_id in ep[0]][0]}"
header_payload = {}
if self.cred_format_valid:
data_payload = {
Expand All @@ -154,37 +154,41 @@ def _login_handler(self) -> dict:
headers=header_payload, verify=self.ssl_verify,
proxy=self.proxy, timeout=self.timeout,
user_agent=self.user_agent)
if isinstance(returned, dict): # Issue #433
self.token_status = returned["status_code"]
if self.token_status == 201:
self.token_expiration = returned["body"]["expires_in"]
self.token_time = time.time()
self.token_value = returned["body"]["access_token"]
self.token_fail_reason = None
self.base_url = autodiscover_region(self.base_url, returned)
else:
self.token_expiration = 0 # Aligning to Uber class functionality
if "errors" in returned["body"]:
if returned["body"]["errors"]:
self.token_fail_reason = returned["body"]["errors"][0]["message"]

if isinstance(returned, dict): # Issue 433
if stateful:
self.token_status = returned["status_code"]
if self.token_status == 201:
self.token_expiration = returned["body"]["expires_in"]
self.token_time = time.time()
self.token_value = returned["body"]["access_token"]
self.token_fail_reason = None
self.base_url = autodiscover_region(self.base_url, returned)
else:
self.token_expiration = 0 # Aligning to Uber class functionality
if "errors" in returned["body"]:
if returned["body"]["errors"]:
self.token_fail_reason = returned["body"]["errors"][0]["message"]
else:
returned = generate_error_result("Unexpected API response received", 403)
self.token_expiration = 0
self.token_fail_reason = TokenFailReason["UNEXPECTED"].value
self.token_status = 403
if stateful:
self.token_expiration = 0
self.token_fail_reason = TokenFailReason["UNEXPECTED"].value
self.token_status = 403
else:
returned = generate_error_result("Invalid credentials specified", 403)
self.token_expiration = 0
self.token_fail_reason = TokenFailReason["INVALID"].value
self.token_status = 403
if stateful:
self.token_expiration = 0
self.token_fail_reason = TokenFailReason["INVALID"].value
self.token_status = 403

return returned

def _logout_handler(self, token_value: str = None) -> dict:
def _logout_handler(self, token_value: str = None, stateful: bool = True) -> dict:
if not token_value:
token_value = self.token_value
operation_id = "oauth2RevokeToken"
target_url = f"{self.base_url}{[ep[2] for ep in AuthEndpoints if operation_id in ep[0]][0]}"
op_id = "oauth2RevokeToken"
target_url = f"{self.base_url}{[ep[2] for ep in AuthEndpoints if op_id in ep[0]][0]}"
if self.cred_format_valid:
b64cred = generate_b64cred(self.creds["client_id"], self.creds["client_secret"])
header_payload = {"Authorization": f"basic {b64cred}"}
Expand All @@ -193,8 +197,9 @@ def _logout_handler(self, token_value: str = None) -> dict:
headers=header_payload, verify=self.ssl_verify,
proxy=self.proxy, timeout=self.timeout,
user_agent=self.user_agent)
self.token_expiration = 0
self.token_value = False
if stateful:
self.token_expiration = 0
self.token_value = False
else:
returned = generate_error_result("Invalid credentials specified", 403)

Expand Down
12 changes: 6 additions & 6 deletions src/falconpy/_auth_object/_uber_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ def __init__(self,
#
# Override the default login and logout handlers to
# provide Uber Class-specific functionality.
def _login_handler(self) -> bool:
def login(self) -> bool:
"""Generate an authorization token."""
super()._login_handler()
super().login()

return self.authenticated

def _logout_handler(self) -> bool:
def logout(self) -> bool:
"""Revoke the current authorization token."""
result = super()._logout_handler()
result = super().logout()

return bool(result["status_code"] == 200)

Expand All @@ -118,7 +118,7 @@ def authenticate(self) -> bool:
----
Consider updating your code to leverage the login method.
"""
return super().login()
return self.login()

def deauthenticate(self) -> bool:
"""Legacy Uber Class functionality handler.
Expand All @@ -127,7 +127,7 @@ def deauthenticate(self) -> bool:
----
Consider updating your code to leverage the logout method.
"""
return super().logout()
return self.logout()

def valid_cred_format(self) -> bool:
"""Legacy property to confirm credential dictionary format.
Expand Down
39 changes: 30 additions & 9 deletions src/falconpy/api_complete.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@


class APIHarness(UberInterface):
"""This one does it all. It's like the One Ring with significantly fewer orcs.
"""The FalconPy Uber Class.
The Uber Class inherits from the UberInterface class, which is a stand alone
class that encapsulates the FalconAuth class. This allows the Uber Class to
Expand All @@ -63,17 +63,19 @@ class that encapsulates the FalconAuth class. This allows the Uber Class to
This means the Uber Class does not include an auth_object, as it is one.
As of FalconPy v1.3.0, Object Authentication is still unssupported for
Uber Class usage scenarios.
This one does it all. It's like the One Ring with significantly fewer orcs.
"""
# pylint: disable=R0913
# `-.
# -._ `. `-.`-. `-.
# _._ `-._`. .--. `.
# .-' '-. `-|\/ \| `-.
# .' '-._\ (o)O) `-.
# / / _.--.\ '. `-. `-.
# / / _.--.) '. `-. `-.
# /| ( | / -. ( -._( -._ '. '.
# / \ \-.__\ \_.-'`.`.__'. `-, '. .'
# | /\ | / \ \ `--')/ .-'.'.'
# / \ \-.__\ \_.-'`.`.__' . `-, '. .'
# | /\ | / \ \ `--' / .-'.'.'
# .._/ / / / / / \ \ .' . .' .'
# / ___/ | / \ \ \ \__ '.'. . .
# \ \___ \ ( \ \ `._ `. .' . ' .'
Expand Down Expand Up @@ -217,14 +219,33 @@ def command(self, *args, **kwargs) -> dict or bytes:
kwargs = handle_body_payload_ids(kwargs)
# Only accept allowed HTTP methods
if method in _ALLOWED_METHODS:
returned = perform_request(
**uber_request_keywords(self, method, operation, target, kwargs, container)
)
if operation == "oauth2AccessToken":
# Calling the token generation operation directly from the
# Uber Class does not change the underlying auth_object state.
returned = self._login_handler(stateful=False)
elif operation == "oauth2RevokeToken":
# Calling the token revocation operation directly requires a
# token_value. Doing so in this manner from the Uber Class does
# not change the underlying auth_object state.
token_value = kwargs.get("token_value", None)
if not token_value:
returned = generate_error_result(
message="The token_value keyword is required to use this operation.",
code=400
)
else:
returned = self._logout_handler(token_value=token_value,
stateful=False
)
else:
returned = perform_request(
**uber_request_keywords(self, method, operation, target, kwargs, container)
)
else:
# Bad HTTP method
returned = generate_error_result(message="Invalid HTTP method specified.",
code=405
)
code=405
)
else:
# That command doesn't exist, have a cup of tea instead
returned = generate_error_result(message="Invalid API operation specified.", code=418)
Expand Down
10 changes: 5 additions & 5 deletions src/falconpy/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def revoke(self, token: str) -> dict:
"""
return self._logout_handler(token)

def token(self) -> dict:
def token(self, generate_only: bool = False) -> dict:
"""Generate an authorization token.
HTTP Method: POST
Expand All @@ -193,23 +193,23 @@ def token(self) -> dict:
Keyword arguments
----
This method does not accept keyword arguments.
generate_only : bool
Flag indicating if this request should update the stored token value and expiration.
Arguments
----
This method does not accept arguments.
When not specified as a keyword, generate_only is assumed as the only accepted argument.
Returns
----
dict
Dictionary object containing API response.
"""
return self._login_handler()
return self._login_handler(not generate_only)

# These method names align to the operation IDs in the API but
# do not conform to snake_case / PEP8 and are defined here for
# backwards compatibility / ease of use purposes
#login = token
oauth2AccessToken = token
oAuth2AccessToken = token
oauth2RevokeToken = revoke
Expand Down

0 comments on commit 2ac99f8

Please sign in to comment.