Skip to content

Commit

Permalink
Added risk events support
Browse files Browse the repository at this point in the history
  • Loading branch information
BobDickinson authored and thehappydinoa committed Dec 15, 2023
1 parent eed29bc commit 7da93dd
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 0 deletions.
40 changes: 40 additions & 0 deletions censys/asm/risks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,49 @@ class Risks(CensysAsmAPI):
"""Risks API class."""

base_path = "/v2/risk"
risk_events_path = f"{base_path}-events"
risk_instances_path = f"{base_path}-instances"
risk_types_path = f"{base_path}-types"

def get_risk_events(
self,
start: Optional[str] = None,
end: Optional[str] = None,
after_id: Optional[int] = None,
limit: Optional[int] = None,
cursor: Optional[str] = None,
accept: Optional[str] = None,
) -> dict:
"""Retrieve risk events.
Args:
start (str): Optional; Starting event time, inclusive (in RFC3339 format).
end (str): Optional; Ending event time, inclusive (in RFC3339 format).
after_id (int): Optional; Risk event ID to query for events after.
limit (int): Optional; Max number of events to return.
cursor (str): Optional; Cursor value to continue collecting events started in a previous request.
accept (str): Optional; Accept header.
Returns:
dict: Risk events result.
"""
args = {}
if start:
args["start"] = start
if end:
args["end"] = end
if after_id:
args["afterID"] = after_id
if limit:
args["limit"] = limit
if cursor:
args["cursor"] = cursor
return self._get(
self.risk_events_path,
args=args,
headers={"Accept": accept} if accept else None,
)

def get_risk_instances(
self, include_events: Optional[bool] = None, accept: Optional[str] = None
) -> dict:
Expand Down
4 changes: 4 additions & 0 deletions docs/usage-asm.rst
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ Below we show an example of **getting risk instances**.
r = Risks()
# Get risk events
risk_events = r.get_risk_events()
print(risk_events)
# Get a dict that returns all risk instances
risk_instances = r.get_risk_instances()
print(risk_instances)
Expand Down
36 changes: 36 additions & 0 deletions tests/asm/test_risks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
"srcID": "string",
"ts": "2022-02-15T20:22:30.262Z",
}
TEST_RISK_EVENTS_JSON = {
"total": 1,
"next": "eyJhZnRlcklEIjo3NzQwLCJsaW1pdCI6MTAwfQ==",
"events": [TEST_EVENT_JSON],
"endOfEvents": True,
}
TEST_RISK_INSTANCE_JSON = {
"id": 0,
"events": [TEST_EVENT_JSON],
Expand Down Expand Up @@ -70,6 +76,36 @@ def setUp(self):
super().setUp()
self.setUpApi(Risks(self.api_key))

@parameterized.expand(
[
({}, ""),
(
{"start": "2023-12-14T03:27:00Z", "end": "2022-03-18T09:18:54Z"},
"?start=2023-12-14T03:27:00Z&end=2022-03-18T09:18:54Z"
),
(
{"after_id": 489, "limit": 100},
"?afterID=489&limit=100"
),
(
{"cursor": "eyJhZnRlcklEIjo3NzQwLCJsaW1pdCI6MTAwfQ=="},
"?cursor=eyJhZnRlcklEIjo3NzQwLCJsaW1pdCI6MTAwfQ=="
),
]
)
def test_get_risk_events(self, kwargs, params):
# Setup response
self.responses.add(
responses.GET,
V2_URL + f"/risk-events{params}",
status=200,
json=TEST_RISK_EVENTS_JSON,
)
# Actual call
res = self.api.get_risk_events(**kwargs)
# Assertions
assert res == TEST_RISK_EVENTS_JSON

@parameterized.expand(
[
({}, ""),
Expand Down

0 comments on commit 7da93dd

Please sign in to comment.