diff --git a/laceworksdk/api/__init__.py b/laceworksdk/api/__init__.py index f0b5e5f..ae6e733 100644 --- a/laceworksdk/api/__init__.py +++ b/laceworksdk/api/__init__.py @@ -41,6 +41,7 @@ from .v2.policies import PoliciesAPI from .v2.queries import QueriesAPI from .v2.report_rules import ReportRulesAPI +from .v2.reports import ReportsAPI from .v2.resource_groups import ResourceGroupsAPI from .v2.schemas import SchemasAPI from .v2.team_members import TeamMembersAPI @@ -161,6 +162,7 @@ def __init__(self, self.queries = QueriesAPI(self._session) self.recommendations = RecommendationsAPI(self._session) self.report_rules = ReportRulesAPI(self._session) + self.reports = ReportsAPI(self._session) self.resource_groups = ResourceGroupsAPI(self._session) self.run_reports = RunReportsAPI(self._session) self.schemas = SchemasAPI(self._session) diff --git a/laceworksdk/api/v2/reports.py b/laceworksdk/api/v2/reports.py new file mode 100644 index 0000000..2a1f954 --- /dev/null +++ b/laceworksdk/api/v2/reports.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +""" +Lacework Reports API wrapper. +""" + +from laceworksdk.api.base_endpoint import BaseEndpoint + + +class ReportsAPI(BaseEndpoint): + + def __init__(self, session): + """ + Initializes the ReportsAPI object. + + :param session: An instance of the HttpSession class + + :return ReportsAPI object. + """ + + super().__init__(session, "Reports") + + def get(self, + primary_query_id=None, + secondary_query_id=None, + format=None, + type=None, + report_name=None, + report_type=None, + template_name=None, + latest=None, + **request_params): + """ + A method to get Reports objects. + + :param primary_query_id: The primary ID that is used to fetch the report. + (AWS Account ID or Azure Tenant ID) + :param secondary_query_id: The secondary ID that is used to fetch the report. + (GCP Project ID or Azure Subscription ID) + :param format: The format of the report. + ("csv", "html", "json", "pdf") + :param type: The type of the report. + :param report_name: The name of the report definition to use when generating the report. + :param report_type: The type of the report definition to use when generating the report. + :param template_name: The name of the template to be used for the report. + :param latest: A boolean representing whether to retreive the latest report. + :param request_params: Additional request parameters. + (provides support for parameters that may be added in the future) + + :return response json + """ + + params = self.build_dict_from_items( + primary_query_id=primary_query_id, + secondary_query_id=secondary_query_id, + format=format, + type=type, + report_name=report_name, + report_type=report_type, + template_name=template_name, + latest=latest, + **request_params + ) + + response = self._session.get(self.build_url(), params=params) + + if format == "json": + return response.json() + else: + return response.content diff --git a/tests/api/v2/test_reports.py b/tests/api/v2/test_reports.py new file mode 100644 index 0000000..5560843 --- /dev/null +++ b/tests/api/v2/test_reports.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +""" +Test suite for the community-developed Python SDK for interacting with Lacework APIs. +""" + +import random + +import pytest + +from laceworksdk.api.v2.reports import ReportsAPI +from tests.api.test_base_endpoint import BaseEndpoint + + +# Tests + +@pytest.fixture(scope="module") +def api_object(api): + return api.reports + + +@pytest.fixture(scope="module") +def aws_account(api): + cloud_accounts = api.cloud_accounts.get_by_type("AwsCfg") + + if len(cloud_accounts["data"]): + aws_role = random.choice(cloud_accounts["data"])["data"]["crossAccountCredentials"]["roleArn"] + aws_account = aws_role.split(":")[4] + return aws_account + + +class TestReports(BaseEndpoint): + + OBJECT_TYPE = ReportsAPI + + def test_api_get_aws_soc2_json(self, api_object, aws_account): + if aws_account: + response = api_object.get( + primary_query_id=aws_account, + format="json", + type="COMPLIANCE", + report_type="AWS_SOC_Rev2", + template_name="DEFAULT", + latest=True + ) + assert "data" in response.keys() + + @pytest.mark.flaky_test + def test_api_get_aws_soc2_html(self, api_object, aws_account): + if aws_account: + response = api_object.get( + primary_query_id=aws_account, + format="html", + type="COMPLIANCE", + report_type="AWS_SOC_Rev2", + template_name="DEFAULT", + latest=True + ) + assert "".encode("utf-8") in response