Skip to content

Commit

Permalink
Adding support for Vulnerability Policies API endpoint (#66)
Browse files Browse the repository at this point in the history
* feat: adding vulnerability policies support

* fix: simple UI fix in debug messaging

* tests: marking entire Alerts class as flaky until release

* tests: adding simple tests for Evidence endpoint
  • Loading branch information
alannix-lw committed Apr 5, 2022
1 parent 27fec08 commit 05c0b7e
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 5 deletions.
2 changes: 2 additions & 0 deletions laceworksdk/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from .v2.user_profile import UserProfileAPI
from .v2.vulnerabilities import VulnerabilitiesAPI
from .v2.vulnerability_exceptions import VulnerabilityExceptionsAPI
from .v2.vulnerability_policies import VulnerabilityPoliciesAPI

from laceworksdk.config import (
LACEWORK_ACCOUNT_ENVIRONMENT_VARIABLE,
Expand Down Expand Up @@ -167,6 +168,7 @@ def __init__(self,
self.user_profile = UserProfileAPI(self._session)
self.vulnerabilities = VulnerabilitiesAPI(self._session)
self.vulnerability_exceptions = VulnerabilityExceptionsAPI(self._session)
self.vulnerability_policies = VulnerabilityPoliciesAPI(self._session)

def set_org_level_access(self, org_level_access):
"""
Expand Down
182 changes: 182 additions & 0 deletions laceworksdk/api/v2/vulnerability_policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# -*- coding: utf-8 -*-
"""
Lacework VulnerabilityPolicies API wrapper.
"""

from laceworksdk.api.crud_endpoint import CrudEndpoint


class VulnerabilityPoliciesAPI(CrudEndpoint):

def __init__(self, session):
"""
Initializes the VulnerabilityPoliciesAPI object.
:param session: An instance of the HttpSession class
:return VulnerabilityPoliciesAPI object.
"""

super().__init__(session, "VulnerabilityPolicies")

def create(self,
policy_type,
policy_name,
severity,
state,
filter,
props,
policy_eval_type=None,
fail_on_violation=False,
alert_on_violation=False,
**request_params):
"""
A method to create a new VulnerabilityPolicies object.
:param policy_type: A string representing the type of the policy.
:param policy_name: A string representing the name of the policy.
:param severity: A string representing the severity of the policy.
("Info", "Low", "Medium", "High", "Critical")
:param state: A boolean representing the state of the policy.
:param filter:
obj:
:param rule: An object representing a policy filter rule.
obj:
:param operator: A string representing the rule operator.
("include", "exclude", "equals", "notEquals")
:param values: An array of strings representing the rule values.
:param exception: An object representing a policy filter exception.
obj:
:param operator: A string representing the rule operator.
("include", "exclude", "equals", "notEquals")
:param values: An array of strings representing the exception values.
:param props: An object containing properties of the policy.
obj:
:param description: A string representing the property description.
:param createdBy: A string representing the creator of the property.
:param updatedBy: A string representing the updater of the property.
:param policy_eval_type: A string representing the policy evaluation type.
:param fail_on_violation: A boolean representing whether the policy should fail on violations.
:param alert_on_violation: A boolean representing whether the policy should alert on violations.
:param request_params: Additional request parameters.
(provides support for parameters that may be added in the future)
:return response json
"""

return super().create(
policy_type=policy_type,
policy_name=policy_name,
severity=severity,
state=int(bool(state)),
filter=filter,
props=props,
policy_eval_type=policy_eval_type,
fail_on_violation=int(bool(fail_on_violation)),
alert_on_violation=int(bool(alert_on_violation)),
**request_params
)

def get(self,
guid=None):
"""
A method to get VulnerabilityPolicies objects.
:param guid: A string representing the object GUID.
:return response json
"""

return super().get(id=guid)

def get_by_guid(self,
guid):
"""
A method to get a VulnerabilityPolicies object by GUID.
:param guid: A string representing the object GUID.
:return response json
"""

return self.get(guid=guid)

def update(self,
guid,
policy_type=None,
policy_name=None,
severity=None,
state=None,
filter=None,
props=None,
policy_eval_type=None,
fail_on_violation=None,
alert_on_violation=None,
**request_params):
"""
A method to update a VulnerabilityPolicies object.
:param guid: A string representing the object GUID.
:param policy_type: A string representing the type of the policy.
:param policy_name: A string representing the name of the policy.
:param severity: A string representing the severity of the policy.
("Info", "Low", "Medium", "High", "Critical")
:param state: A boolean representing the state of the policy.
:param filter:
obj:
:param rule: An object representing a policy filter rule.
obj:
:param operator: A string representing the rule operator.
("include", "exclude", "equals", "notEquals")
:param values: An array of strings representing the rule values.
:param exception: An object representing a policy filter exception.
obj:
:param operator: A string representing the rule operator.
("include", "exclude", "equals", "notEquals")
:param values: An array of strings representing the exception values.
:param props: An object containing properties of the policy.
obj:
:param description: A string representing the property description.
:param createdBy: A string representing the creator of the property.
:param updatedBy: A string representing the updater of the property.
:param policy_eval_type: A string representing the policy evaluation type.
:param fail_on_violation: A boolean representing whether the policy should fail on violations.
:param alert_on_violation: A boolean representing whether the policy should alert on violations.
:param request_params: Additional request parameters.
(provides support for parameters that may be added in the future)
:return response json
"""

if state is not None:
state = int(bool(state))
if fail_on_violation is not None:
fail_on_violation = int(bool(fail_on_violation))
if alert_on_violation is not None:
alert_on_violation = int(bool(alert_on_violation))

return super().update(
guid,
policy_type=policy_type,
policy_name=policy_name,
severity=severity,
state=state,
filter=filter,
props=props,
policy_eval_type=policy_eval_type,
fail_on_violation=fail_on_violation,
alert_on_violation=alert_on_violation,
**request_params
)

def delete(self,
guid):
"""
A method to delete a VulnerabilityPolicies object.
:param guid: A string representing the object GUID.
:return response json
"""

return super().delete(id=guid)
2 changes: 1 addition & 1 deletion laceworksdk/http_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def _request(self, method, uri, **kwargs):
data = kwargs.get("data", "")
json = kwargs.get("json", "")
if data or json:
logger.debug(f"{method} request data:\n{data}{json}")
logger.debug(f"{method} request data:\nData: {data}\nJSON: {json}")

# TODO: Remove this on v1.0 release - this is done for back compat
if data and not json:
Expand Down
5 changes: 1 addition & 4 deletions tests/api/v2/test_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,28 @@ def api_object(api):
return api.alerts


@pytest.mark.flaky_test
class TestAlerts(ReadEndpoint):

OBJECT_ID_NAME = "alertId"
OBJECT_TYPE = AlertsAPI

@pytest.mark.flaky_test
def test_get_by_date(self, api_object):
start_time, end_time = self._get_start_end_times()
response = api_object.get(start_time=start_time, end_time=end_time)
assert "data" in response.keys()

@pytest.mark.flaky_test
def test_get_by_date_camelcase(self, api_object):
start_time, end_time = self._get_start_end_times()
response = api_object.get(startTime=start_time, endTime=end_time)
assert "data" in response.keys()

@pytest.mark.flaky_test
def test_get_duplicate_key(self, api_object):
start_time, end_time = self._get_start_end_times()
tester = TestCase()
with tester.assertRaises(KeyError):
api_object.get(start_time=start_time, startTime=start_time, endTime=end_time)

@pytest.mark.flaky_test
def test_get_details(self, api_object):
guid = self._get_random_object(api_object, self.OBJECT_ID_NAME)
response = api_object.get_details(guid)
Expand Down
23 changes: 23 additions & 0 deletions tests/api/v2/test_evidence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
"""
Test suite for the community-developed Python SDK for interacting with Lacework APIs.
"""

import pytest

from laceworksdk.api.v2.evidence import (
EvidenceAPI
)
from tests.api.test_search_endpoint import SearchEndpoint

# Tests


@pytest.fixture(scope="module")
def api_object(api):
return api.evidence


class TestEvidenceEndpoint(SearchEndpoint):

OBJECT_TYPE = EvidenceAPI
58 changes: 58 additions & 0 deletions tests/api/v2/test_vulnerability_policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
"""
Test suite for the community-developed Python SDK for interacting with Lacework APIs.
"""

from ast import operator
import pytest

from laceworksdk.api.v2.vulnerability_policies import VulnerabilityPoliciesAPI
from tests.api.test_crud_endpoint import CrudEndpoint


# Tests

@pytest.fixture(scope="module")
def api_object(api):
return api.vulnerability_policies


@pytest.fixture(scope="module")
def api_object_create_body(random_text):
return {
"policy_type": "DockerFile",
"policy_name": f"Test Container Policy {random_text}",
"severity": "High",
"state": True,
"filter": {
"rule": {
"operator": "include",
"values": [
"test"
]
}
},
"props": {
"description": f"Test Container Policy Description {random_text}"
}
}


@pytest.fixture(scope="module")
def api_object_update_body(random_text):
return {
"policy_name": f"Test Container Exception {random_text} (Updated)",
"severity": "Medium",
"props": {
"description": f"Test Container Exception Description {random_text} (Updated)"
}
}


class TestVulnerabilityExceptions(CrudEndpoint):

OBJECT_ID_NAME = "policyGuid"
OBJECT_TYPE = VulnerabilityPoliciesAPI

def test_api_get_by_guid(self, api_object):
self._get_object_classifier_test(api_object, "guid", self.OBJECT_ID_NAME)

0 comments on commit 05c0b7e

Please sign in to comment.