-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Add support for data export rules endpoint (#132)
* feat: add missing API endpoint Signed-off-by: Timothy MacDonald <[email protected]> * feat: Add support for data export rules endpoint Signed-off-by: Timothy MacDonald <[email protected]> --------- Signed-off-by: Timothy MacDonald <[email protected]>
- Loading branch information
Showing
7 changed files
with
196 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Lacework DataExportRules API wrapper. | ||
""" | ||
|
||
from laceworksdk.api.crud_endpoint import CrudEndpoint | ||
|
||
|
||
class DataExportRulesAPI(CrudEndpoint): | ||
|
||
def __init__(self, session): | ||
""" | ||
Initializes the DataExportRulesAPI object. | ||
:param session: An instance of the HttpSession class | ||
:return DataExportRulesAPI object. | ||
""" | ||
|
||
super().__init__(session, "DataExportRules") | ||
|
||
def create(self, | ||
type, | ||
filters, | ||
intg_guid_list, | ||
**request_params): | ||
""" | ||
A method to create a new DataExportRules object. | ||
Args: | ||
type(str): A string representing the type of rule to be added. | ||
filters(dict): A dictionary containing the name(string), description(string), enabled(bool), and | ||
profile_version(list[string]) fields. | ||
intg_guid_list(str): A list of strings representing the guids of the alert channels to use (s3 only). | ||
request_params(any): Additional request parameters. | ||
(provides support for parameters that may be added in the future) | ||
Return: | ||
response(json) | ||
""" | ||
|
||
return super().create( | ||
filters=self._format_filters(filters), | ||
type=type, | ||
intg_guid_list=intg_guid_list, | ||
**request_params | ||
) | ||
|
||
def get(self, | ||
guid=None): | ||
""" | ||
A method to get DataExportRules objects. | ||
Args: | ||
guid(str): A string representing the object GUID. | ||
Return: | ||
response(json) | ||
""" | ||
|
||
return super().get(id=guid) | ||
|
||
def get_by_guid(self, | ||
guid): | ||
""" | ||
A method to get an DataExportRules object by GUID. | ||
Args: | ||
guid(str): A string representing the object GUID. | ||
Return: | ||
response(json) | ||
""" | ||
|
||
return self.get(guid=guid) | ||
|
||
def update(self, | ||
guid, | ||
filters=None, | ||
intg_guid_list=None, | ||
type=None, | ||
**request_params): | ||
""" | ||
A method to update an existing DataExportRules object. | ||
Args: | ||
guid(str): A string representing the object GUID. | ||
type(str): A string representing the type of rule. | ||
filters(dict): A dictionary containing the name(string), description(string), enabled(bool), and | ||
profile_version(list[string]) fields. | ||
intg_guid_list(str): A list of strings representing the guids of the alert channels to use (s3 only). | ||
request_params(any): Additional request parameters. | ||
(provides support for parameters that may be added in the future) | ||
Return: | ||
response(json) | ||
""" | ||
|
||
return super().update( | ||
id=guid, | ||
filters=self._format_filters(filters), | ||
type=type, | ||
intg_guid_list=intg_guid_list, | ||
**request_params | ||
) | ||
|
||
def delete(self, | ||
guid): | ||
""" | ||
A method to delete a DataExportRules object. | ||
Args: | ||
guid(str): A string representing the object GUID. | ||
Return: | ||
response(json) | ||
""" | ||
|
||
return super().delete(id=guid) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Test suite for the community-developed Python SDK for interacting with Lacework APIs. | ||
""" | ||
|
||
import pytest | ||
|
||
from laceworksdk.api.v2.data_export_rules import DataExportRulesAPI | ||
from tests.api.test_crud_endpoint import CrudEndpoint | ||
|
||
|
||
# Tests | ||
|
||
@pytest.fixture(scope="module") | ||
def api_object(api): | ||
return api.data_export_rules | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def api_object_create_body(random_text, s3_alert_channel_guid): | ||
return { | ||
"type": "Dataexport", | ||
"filters": { | ||
"name": f"Test Data Export Rule {random_text}", | ||
"description": f"Test Data Export Rule Description {random_text}", | ||
"enabled": 1 | ||
}, | ||
"intg_guid_list": [s3_alert_channel_guid] | ||
} | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def api_object_update_body(random_text): | ||
return { | ||
"filters": { | ||
"name": f"Test Data Export Rule {random_text} (Updated)", | ||
"enabled": False | ||
} | ||
} | ||
|
||
|
||
class TestDataExportRules(CrudEndpoint): | ||
|
||
OBJECT_ID_NAME = "mcGuid" | ||
OBJECT_TYPE = DataExportRulesAPI | ||
|
||
def test_api_get_by_guid(self, api_object): | ||
self._get_object_classifier_test(api_object, "guid", self.OBJECT_ID_NAME) |