From 44a4c2fadea3da4f14281bbfaa5fd06bb0fb5dc6 Mon Sep 17 00:00:00 2001 From: Alan Nix Date: Tue, 4 Jan 2022 00:30:25 -0500 Subject: [PATCH] reactor: finally settled on a structure for the API classes that seems repeatable --- laceworksdk/api/base_endpoint.py | 49 +++------- laceworksdk/api/v2/agent_access_tokens.py | 114 ++++++++-------------- laceworksdk/api/v2/alert_channels.py | 67 ++++++------- laceworksdk/api/v2/cloud_activities.py | 61 +++++++----- laceworksdk/http_session.py | 12 --- tests/api/v2/test_alert_channels.py | 12 ++- 6 files changed, 127 insertions(+), 188 deletions(-) diff --git a/laceworksdk/api/base_endpoint.py b/laceworksdk/api/base_endpoint.py index 4e42f41..2146438 100644 --- a/laceworksdk/api/base_endpoint.py +++ b/laceworksdk/api/base_endpoint.py @@ -12,61 +12,36 @@ class BaseEndpoint(object): def __init__(self, session, endpoint_path, - object_type, - allowed_params): + object_type): """ :param session: An instance of the HttpSession class. :param endpoint_path: The URL endpoint to use for the object type. :param object_type: The Lacework object type to use for validation. (TODO) - :param allowed_params: The allowed parameters when making requests for the object type. """ super().__init__() self._session = session self._endpoint_path = endpoint_path self._object_type = object_type - self._allowed_params = allowed_params - def _build_params(self, **kwargs): + def build_dict_from_items(self, *dicts, **items): """ - A method to build URL params based on the kwargs provided to the _build_requests method. - """ - - params = {} - keys_to_pop = [] - - for key, value in kwargs.items(): - # Convert the key into lowerCamelCase - lower_camel_case_key = self._convert_lower_camel_case(key) - - if lower_camel_case_key in self._allowed_params: - keys_to_pop.append(key) - params[lower_camel_case_key] = value - - logger.debug(f"kwargs keys to pop: {keys_to_pop}") - - # Clean up kwargs that will be passed as params - for key in keys_to_pop: - logger.debug(f"Popping {key} from kwargs...") - kwargs.pop(key) - - return params, kwargs - - def _build_data(self, **kwargs): - """ - A method to build a data payload based on the kwargs provided. + A method to build a dictionary based on inputs, pruning items that are None """ + dict_list = list(dicts) + dict_list.append(items) result = {} - for key, value in kwargs.items(): - # Convert the key into lowerCamelCase - lower_camel_case_key = self._convert_lower_camel_case(key) - if value is not None: - result[lower_camel_case_key] = value + for d in dict_list: + for key, value in d.items(): + camel_key = self._convert_lower_camel_case(key) + if value is not None: + result[camel_key] = value + return result - def _build_url(self, id=None, resource=None): + def build_url(self, id=None, resource=None): """ Builds the URL to use based on the endpoint path, resource, and ID. """ diff --git a/laceworksdk/api/v2/agent_access_tokens.py b/laceworksdk/api/v2/agent_access_tokens.py index a708b58..37d75bc 100644 --- a/laceworksdk/api/v2/agent_access_tokens.py +++ b/laceworksdk/api/v2/agent_access_tokens.py @@ -1,18 +1,15 @@ # -*- coding: utf-8 -*- """ -Lacework Agent Access Tokens API wrapper. +Lacework AgentAccessTokens API wrapper. """ -import logging +from laceworksdk.api.base_endpoint import BaseEndpoint -logger = logging.getLogger(__name__) +API_ENDPOINT = "/api/v2/AgentAccessTokens" +OBJECT_TYPE = "AgentAccessTokens" -class AgentAccessTokensAPI(object): - """ - Lacework Agent Access Tokens API. - """ - +class AgentAccessTokensAPI(BaseEndpoint): def __init__(self, session): """ Initializes the AgentAccessTokensAPI object. @@ -22,98 +19,79 @@ def __init__(self, session): :return AgentAccessTokensAPI object. """ - super(AgentAccessTokensAPI, self).__init__() - - self._session = session + super(AgentAccessTokensAPI, self).__init__(session, API_ENDPOINT, OBJECT_TYPE) def create(self, - alias, + alias=None, enabled=True, - org=False): + **request_params): """ - A method to create a new agent access token. + A method to create a new AgentAccessTokens object. :param alias: A string representing the alias of the agent access token. :param enabled: A boolean/integer representing whether the agent access token is enabled. (0 or 1) - :param org: A boolean representing whether the request should be performed - at the Organization level + :param request_params: Additional request parameters. + (provides support for parameters that may be added in the future) :return response json """ - logger.info("Creating agent access token in Lacework...") + if alias is None: + alias = "" - # Build the Agent Access Tokens request URI - api_uri = "/api/v2/AgentAccessTokens" - - data = { - "tokenAlias": alias, - "tokenEnabled": int(bool(enabled)) - } - - response = self._session.post(api_uri, org=org, data=data) + json = self.build_dict_from_items( + request_params, + token_alias=alias, + token_enabled=int(bool(enabled)) + ) + response = self._session.post(self.build_url(), json=json) return response.json() def get(self, - id=None, - org=False): + id=None): """ - A method to get agent access tokens. + A method to get AgentAccessTokens objects. :param id: A string representing the agent access token ID. - :param org: A boolean representing whether the request should be performed - at the Organization level :return response json """ - logger.info("Getting agent access token info from Lacework...") - - # Build the Agent Access Tokens request URI - if id: - api_uri = f"/api/v2/AgentAccessTokens/{id}" - else: - api_uri = "/api/v2/AgentAccessTokens" - - response = self._session.get(api_uri, org=org) + response = self._session.get(self.build_url(id=id)) return response.json() def get_by_id(self, - id, - org=False): + id): """ - A method to get an agent access token by ID. + A method to get an AgentAccessTokens object by ID. :param id: A string representing the agent access token ID. - :param org: A boolean representing whether the request should be performed - at the Organization level :return response json """ - return self.get(id=id, org=org) + return self.get(id=id) def search(self, - query_data=None, - org=False): + json=None, + query_data=None): """ - A method to search agent access tokens. + A method to search AgentAccessTokens objects. - :param query_data: A dictionary containing the desired search parameters. + :param json: A dictionary containing the desired search parameters. (filters, returns) :return response json """ - logger.info("Searching agent access tokens from Lacework...") - - # Build the Agent Access Tokens request URI - api_uri = "/api/v2/AgentAccessTokens/search" + # TODO: Remove this on v1.0 release - provided for back compat + if query_data: + json = query_data - response = self._session.post(api_uri, data=query_data, org=org) + response = self._session.post(self.build_url(resource='search'), json=json) return response.json() @@ -121,32 +99,26 @@ def update(self, id, alias=None, enabled=None, - org=False): + **request_params): """ - A method to update an agent access token. + A method to update an AgentAccessTokens object. :param id: A string representing the agent access token ID. :param alias: A string representing the alias of the agent access token. :param enabled: A boolean/integer representing whether the agent access token is enabled. (0 or 1) - :param org: A boolean representing whether the request should be performed - at the Organization level + :param request_params: Additional request parameters. + (provides support for parameters that may be added in the future) :return response json """ - logger.info("Updating agent access token in Lacework...") - - # Build the Agent Access Tokens request URI - api_uri = f"/api/v2/AgentAccessTokens/{id}" - - tmp_data = {} - - if alias: - tmp_data["tokenAlias"] = alias - if enabled is not None: - tmp_data["tokenEnabled"] = int(bool(enabled)) + json = self.build_dict_from_items( + request_params, + token_alias=alias, + token_enabled=int(bool(enabled)) + ) - response = self._session.patch(api_uri, org=org, data=tmp_data) + response = self._session.patch(self.build_url(id=id), json=json) return response.json() diff --git a/laceworksdk/api/v2/alert_channels.py b/laceworksdk/api/v2/alert_channels.py index bb76140..ee851a1 100644 --- a/laceworksdk/api/v2/alert_channels.py +++ b/laceworksdk/api/v2/alert_channels.py @@ -6,7 +6,6 @@ from laceworksdk.api.base_endpoint import BaseEndpoint API_ENDPOINT = "/api/v2/AlertChannels" -ALLOWED_URL_PARAMS = None OBJECT_TYPE = "AlertChannels" @@ -20,14 +19,14 @@ def __init__(self, session): :return AlertChannelsAPI object. """ - super(AlertChannelsAPI, self).__init__(session, API_ENDPOINT, OBJECT_TYPE, ALLOWED_URL_PARAMS) + super(AlertChannelsAPI, self).__init__(session, API_ENDPOINT, OBJECT_TYPE) def create(self, name, type, enabled, data, - **kwargs): + **request_params): """ A method to create a new AlertChannels object. @@ -36,25 +35,27 @@ def create(self, :param enabled: A boolean/integer representing whether the alert channel is enabled. (0 or 1) :param data: A JSON object matching the schema for the specified type. - :param kwargs: Arguments passed on to the HttpSession object. + :param request_params: Additional request parameters. + (provides support for parameters that may be added in the future) :return response json """ - json = self._build_data( + json = self.build_dict_from_items( + request_params, name=name, type=type, enabled=int(bool(enabled)), data=data ) - response = self._session.post(self._build_url(), json=json, **kwargs) + response = self._session.post(self.build_url(), json=json) + return response.json() def get(self, guid=None, - type=None, - **kwargs): + type=None): """ A method to get AlertChannels objects. @@ -65,57 +66,51 @@ def get(self, :return response json """ - if guid: - response = self._session.get(self._build_url(id=guid), **kwargs) - elif type: - response = self._session.get(self._build_url(resource=type), **kwargs) - else: - response = self._session.get(self._build_url(), **kwargs) + response = self._session.get(self.build_url(id=guid, resource=type)) return response.json() def get_by_guid(self, - guid, - **kwargs): + guid): """ A method to get AlertChannels objects by GUID. :param guid: A string representing the alert channel GUID. - :param kwargs: Arguments passed on to the HttpSession object. :return response json """ - return self.get(guid=guid, **kwargs) + return self.get(guid=guid) def get_by_type(self, - type, - **kwargs): + type): """ A method to get AlertChannels objects by type. :param type: A string representing the alert channel type. - :param kwargs: Arguments passed on to the HttpSession object. :return response json """ - return self.get(type=type, **kwargs) + return self.get(type=type) def search(self, json=None, - **kwargs): + query_data=None): """ A method to search AlertChannels objects. :param json: A dictionary containing the desired search parameters. (filters, returns) - :param kwargs: Arguments passed on to the HttpSession object. :return response json """ - response = self._session.post(self._build_url(resource='search'), json=json, **kwargs) + # TODO: Remove this on v1.0 release - provided for back compat + if query_data: + json = query_data + + response = self._session.post(self.build_url(resource='search'), json=json) return response.json() @@ -125,7 +120,7 @@ def update(self, type=None, enabled=None, data=None, - **kwargs): + **request_params): """ A method to update an AlertChannels object. @@ -135,50 +130,48 @@ def update(self, :param enabled: A boolean/integer representing whether the alert channel is enabled. (0 or 1) :param data: A JSON object matching the schema for the specified type. - :param kwargs: Arguments passed on to the HttpSession object. + :param request_params: Additional request parameters. + (provides support for parameters that may be added in the future) :return response json """ - json = self._build_data( + json = self.build_dict_from_items( + request_params, name=name, type=type, enabled=int(bool(enabled)), data=data ) - response = self._session.patch(self._build_url(id=guid), json=json, **kwargs) + response = self._session.patch(self.build_url(id=guid), json=json) return response.json() def delete(self, - guid, - **kwargs): + guid): """ A method to delete an AlertChannels object. :param guid: A string representing the alert channel GUID. - :param kwargs: Arguments passed on to the HttpSession object. :return response json """ - response = self._session.delete(self._build_url(id=guid), **kwargs) + response = self._session.delete(self.build_url(id=guid)) return response def test(self, - guid, - **kwargs): + guid): """ A method to test an AlertChannels object. :param guid: A string representing the alert channel GUID. - :param kwargs: Arguments passed on to the HttpSession object. :return response json """ - response = self._session.post(self._build_url(resource=f"{guid}/test"), **kwargs) + response = self._session.post(self.build_url(resource=f"{guid}/test")) return response diff --git a/laceworksdk/api/v2/cloud_activities.py b/laceworksdk/api/v2/cloud_activities.py index 54085f4..dcb11de 100644 --- a/laceworksdk/api/v2/cloud_activities.py +++ b/laceworksdk/api/v2/cloud_activities.py @@ -5,10 +5,6 @@ from laceworksdk.api.base_endpoint import BaseEndpoint -API_ENDPOINT = "/api/v2/CloudActivities" -ALLOWED_URL_PARAMS = ["startTime", "endTime"] -OBJECT_TYPE = "CloudActivities" - API_ENDPOINT = "/api/v2/CloudActivities" OBJECT_TYPE = "CloudActivities" @@ -23,69 +19,79 @@ def __init__(self, session): :return CloudActivitiesAPI object. """ - super(CloudActivitiesAPI, self).__init__(session, API_ENDPOINT, OBJECT_TYPE, ALLOWED_URL_PARAMS) + super(CloudActivitiesAPI, self).__init__(session, API_ENDPOINT, OBJECT_TYPE) def get(self, start_time=None, end_time=None, - **kwargs): + **request_params): """ A method to get CloudActivities objects. :param start_time: A "%Y-%m-%dT%H:%M:%SZ" structured timestamp to begin from. :param end_time: A "%Y-%m-%dT%H:%M:%S%Z" structured timestamp to end at. - :param kwargs: Arguments passed on to the HttpSession object. + :param request_params: Additional request parameters. + (provides support for parameters that may be added in the future) :return response json """ - params, kwargs = self._build_params(start_time=start_time, - end_time=end_time, - **kwargs) + params = self.build_dict_from_items( + request_params, + start_time=start_time, + end_time=end_time + ) + + response = self._session.get(self.build_url(), params=params) - response = self._session.get(self._build_url(), params=params, **kwargs) return response.json() def get_pages(self, start_time=None, end_time=None, - **kwargs): + **request_params): """ A method to get pages of CloudActivities objects. :param start_time: A "%Y-%m-%dT%H:%M:%SZ" structured timestamp to begin from. :param end_time: A "%Y-%m-%dT%H:%M:%S%Z" structured timestamp to end at. - :param kwargs: Arguments passed on to the HttpSession object. + :param request_params: Additional request parameters. + (provides support for parameters that may be added in the future) :return a generator which, when iterated, yields pages of CloudActivities objects returned by the Lacework API. """ - params, kwargs = self._build_params(start_time=start_time, - end_time=end_time, - **kwargs) + params = self.build_dict_from_items( + request_params, + start_time=start_time, + end_time=end_time + ) - for page in self._session.get_pages(self._build_url(), params=params, **kwargs): + for page in self._session.get_pages(self.build_url(), params=params): yield page.json() def get_items(self, start_time=None, end_time=None, - **kwargs): + **request_params): """ A method to get CloudActivities objects. :param start_time: A "%Y-%m-%dT%H:%M:%SZ" structured timestamp to begin from. :param end_time: A "%Y-%m-%dT%H:%M:%S%Z" structured timestamp to end at. - :param kwargs: Arguments passed on to the HttpSession object. + :param request_params: Additional request parameters. + (provides support for parameters that may be added in the future) :return a generator which, when iterated, yields CloudActivities objects returned by the Lacework API. """ - params, kwargs = self._build_params(start_time=start_time, - end_time=end_time, - **kwargs) + params = self.build_dict_from_items( + request_params, + start_time=start_time, + end_time=end_time + ) - for item in self._session.get_items(self._build_url(), params=params, **kwargs): + for item in self._session.get_items(self.build_url(), params=params): yield item def get_pages(self, @@ -146,17 +152,20 @@ def get_items(self, def search(self, json=None, - **kwargs): + query_data=None): """ A method to search CloudActivities objects. :param json: A dictionary containing the necessary search parameters. (timeFilter, filters, returns) - :param kwargs: Arguments passed on to the HttpSession object. :return response json """ - response = self._session.post(self._build_url(resource="search"), json=json, **kwargs) + # TODO: Remove this on v1.0 release - provided for back compat + if query_data: + json = query_data + + response = self._session.post(self.build_url(resource="search"), json=json) return response.json() diff --git a/laceworksdk/http_session.py b/laceworksdk/http_session.py index ef32802..c9cc557 100644 --- a/laceworksdk/http_session.py +++ b/laceworksdk/http_session.py @@ -233,18 +233,6 @@ def _request(self, method, uri, **kwargs): kwargs["json"] = data kwargs.pop("data") - # TODO: Remove this on v1.0 release - this is done for back compat - param = kwargs.pop("param", None) - if param: - logger.warning("The 'param' variable will be deprecated, please use 'params' moving forward.") - kwargs["params"] = param - - # TODO: Remove this on v1.0 release - this is done for back compat - query_data = kwargs.pop("query_data", None) - if query_data: - logger.warning("The 'query_data' variable will be deprecated, please use 'json' moving forward.") - kwargs["json"] = query_data - # Make the HTTP request to the API endpoint response = self._session.request(method, uri, headers=headers, **kwargs) diff --git a/tests/api/v2/test_alert_channels.py b/tests/api/v2/test_alert_channels.py index e9445de..d76cbaa 100644 --- a/tests/api/v2/test_alert_channels.py +++ b/tests/api/v2/test_alert_channels.py @@ -56,16 +56,16 @@ def test_alert_channels_api_create(api): def test_alert_channels_api_create_org(api): + api.set_org_level_access(True) response = api.alert_channels.create( name=f"Slack Test Org {RANDOM_TEXT}", type="SlackChannel", enabled=1, data={ "slackUrl": f"https://hooks.slack.com/services/TEST/WEBHOOK/{RANDOM_TEXT}" - }, - org=True + } ) - + api.set_org_level_access(False) assert "data" in response.keys() global INTEGRATION_GUID_ORG @@ -121,7 +121,7 @@ def test_alert_channels_api_update(api): assert INTEGRATION_GUID is not None if INTEGRATION_GUID: new_name = f"Slack Test {RANDOM_TEXT} Updated" - new_enabled = 0 + new_enabled = False response = api.alert_channels.update( INTEGRATION_GUID, @@ -144,5 +144,7 @@ def test_alert_channels_api_delete(api): def test_alert_channels_api_delete_org(api): assert INTEGRATION_GUID_ORG is not None if INTEGRATION_GUID_ORG: - response = api.alert_channels.delete(INTEGRATION_GUID_ORG, org=True) + api.set_org_level_access(True) + response = api.alert_channels.delete(INTEGRATION_GUID_ORG) + api.set_org_level_access(False) assert response.status_code == 204