Skip to content

Commit

Permalink
reactor: finally settled on a structure for the API classes that seem…
Browse files Browse the repository at this point in the history
…s repeatable
  • Loading branch information
Alan Nix committed Jan 4, 2022
1 parent 727ea07 commit 44a4c2f
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 188 deletions.
49 changes: 12 additions & 37 deletions laceworksdk/api/base_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,61 +12,36 @@ class BaseEndpoint(object):
def __init__(self,
session,
endpoint_path,
object_type,
allowed_params):
object_type):
"""
:param session: An instance of the HttpSession class.
:param endpoint_path: The URL endpoint to use for the object type.
:param object_type: The Lacework object type to use for validation. (TODO)
:param allowed_params: The allowed parameters when making requests for the object type.
"""

super().__init__()
self._session = session
self._endpoint_path = endpoint_path
self._object_type = object_type
self._allowed_params = allowed_params

def _build_params(self, **kwargs):
def build_dict_from_items(self, *dicts, **items):
"""
A method to build URL params based on the kwargs provided to the _build_requests method.
"""

params = {}
keys_to_pop = []

for key, value in kwargs.items():
# Convert the key into lowerCamelCase
lower_camel_case_key = self._convert_lower_camel_case(key)

if lower_camel_case_key in self._allowed_params:
keys_to_pop.append(key)
params[lower_camel_case_key] = value

logger.debug(f"kwargs keys to pop: {keys_to_pop}")

# Clean up kwargs that will be passed as params
for key in keys_to_pop:
logger.debug(f"Popping {key} from kwargs...")
kwargs.pop(key)

return params, kwargs

def _build_data(self, **kwargs):
"""
A method to build a data payload based on the kwargs provided.
A method to build a dictionary based on inputs, pruning items that are None
"""

dict_list = list(dicts)
dict_list.append(items)
result = {}
for key, value in kwargs.items():
# Convert the key into lowerCamelCase
lower_camel_case_key = self._convert_lower_camel_case(key)

if value is not None:
result[lower_camel_case_key] = value
for d in dict_list:
for key, value in d.items():
camel_key = self._convert_lower_camel_case(key)
if value is not None:
result[camel_key] = value

return result

def _build_url(self, id=None, resource=None):
def build_url(self, id=None, resource=None):
"""
Builds the URL to use based on the endpoint path, resource, and ID.
"""
Expand Down
114 changes: 43 additions & 71 deletions laceworksdk/api/v2/agent_access_tokens.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
# -*- coding: utf-8 -*-
"""
Lacework Agent Access Tokens API wrapper.
Lacework AgentAccessTokens API wrapper.
"""

import logging
from laceworksdk.api.base_endpoint import BaseEndpoint

logger = logging.getLogger(__name__)
API_ENDPOINT = "/api/v2/AgentAccessTokens"
OBJECT_TYPE = "AgentAccessTokens"


class AgentAccessTokensAPI(object):
"""
Lacework Agent Access Tokens API.
"""

class AgentAccessTokensAPI(BaseEndpoint):
def __init__(self, session):
"""
Initializes the AgentAccessTokensAPI object.
Expand All @@ -22,131 +19,106 @@ def __init__(self, session):
:return AgentAccessTokensAPI object.
"""

super(AgentAccessTokensAPI, self).__init__()

self._session = session
super(AgentAccessTokensAPI, self).__init__(session, API_ENDPOINT, OBJECT_TYPE)

def create(self,
alias,
alias=None,
enabled=True,
org=False):
**request_params):
"""
A method to create a new agent access token.
A method to create a new AgentAccessTokens object.
:param alias: A string representing the alias of the agent access token.
:param enabled: A boolean/integer representing whether the agent access token is enabled.
(0 or 1)
:param org: A boolean representing whether the request should be performed
at the Organization level
:param request_params: Additional request parameters.
(provides support for parameters that may be added in the future)
:return response json
"""

logger.info("Creating agent access token in Lacework...")
if alias is None:
alias = ""

# Build the Agent Access Tokens request URI
api_uri = "/api/v2/AgentAccessTokens"

data = {
"tokenAlias": alias,
"tokenEnabled": int(bool(enabled))
}

response = self._session.post(api_uri, org=org, data=data)
json = self.build_dict_from_items(
request_params,
token_alias=alias,
token_enabled=int(bool(enabled))
)

response = self._session.post(self.build_url(), json=json)
return response.json()

def get(self,
id=None,
org=False):
id=None):
"""
A method to get agent access tokens.
A method to get AgentAccessTokens objects.
:param id: A string representing the agent access token ID.
:param org: A boolean representing whether the request should be performed
at the Organization level
:return response json
"""

logger.info("Getting agent access token info from Lacework...")

# Build the Agent Access Tokens request URI
if id:
api_uri = f"/api/v2/AgentAccessTokens/{id}"
else:
api_uri = "/api/v2/AgentAccessTokens"

response = self._session.get(api_uri, org=org)
response = self._session.get(self.build_url(id=id))

return response.json()

def get_by_id(self,
id,
org=False):
id):
"""
A method to get an agent access token by ID.
A method to get an AgentAccessTokens object by ID.
:param id: A string representing the agent access token ID.
:param org: A boolean representing whether the request should be performed
at the Organization level
:return response json
"""

return self.get(id=id, org=org)
return self.get(id=id)

def search(self,
query_data=None,
org=False):
json=None,
query_data=None):
"""
A method to search agent access tokens.
A method to search AgentAccessTokens objects.
:param query_data: A dictionary containing the desired search parameters.
:param json: A dictionary containing the desired search parameters.
(filters, returns)
:return response json
"""

logger.info("Searching agent access tokens from Lacework...")

# Build the Agent Access Tokens request URI
api_uri = "/api/v2/AgentAccessTokens/search"
# TODO: Remove this on v1.0 release - provided for back compat
if query_data:
json = query_data

response = self._session.post(api_uri, data=query_data, org=org)
response = self._session.post(self.build_url(resource='search'), json=json)

return response.json()

def update(self,
id,
alias=None,
enabled=None,
org=False):
**request_params):
"""
A method to update an agent access token.
A method to update an AgentAccessTokens object.
:param id: A string representing the agent access token ID.
:param alias: A string representing the alias of the agent access token.
:param enabled: A boolean/integer representing whether the agent access token is enabled.
(0 or 1)
:param org: A boolean representing whether the request should be performed
at the Organization level
:param request_params: Additional request parameters.
(provides support for parameters that may be added in the future)
:return response json
"""

logger.info("Updating agent access token in Lacework...")

# Build the Agent Access Tokens request URI
api_uri = f"/api/v2/AgentAccessTokens/{id}"

tmp_data = {}

if alias:
tmp_data["tokenAlias"] = alias
if enabled is not None:
tmp_data["tokenEnabled"] = int(bool(enabled))
json = self.build_dict_from_items(
request_params,
token_alias=alias,
token_enabled=int(bool(enabled))
)

response = self._session.patch(api_uri, org=org, data=tmp_data)
response = self._session.patch(self.build_url(id=id), json=json)

return response.json()
Loading

0 comments on commit 44a4c2f

Please sign in to comment.