Merge pull request #835 from tomking2/feature/search_sharinggroup
new: Search by sharing groups
Rafiot authored Jun 7, 2022
2 parents cd4b5d5 + 1ac66a9 commit bb9f053
Showing 2 changed files with 49 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pymisp/api.py
Expand Up @@ -2413,6 +2413,7 @@ def search(self, controller: str = 'events', return_format: str = 'json',
include_decay_score: Optional[bool] = None, includeDecayScore: Optional[bool] = None,
object_name: Optional[str] = None,
exclude_decayed: Optional[bool] = None,
sharinggroup: Optional[Union[int, List[int]]] = None,
pythonify: Optional[bool] = False,
**kwargs) -> Union[Dict, str, List[Union[MISPEvent, MISPAttribute, MISPObject]]]:
'''Search in the MISP instance
Expand Down Expand Up @@ -2453,6 +2454,7 @@ def search(self, controller: str = 'events', return_format: str = 'json',
:param include_correlations: [JSON Only - attribute] Include the correlations of the matching attributes.
:param object_name: [objects controller only] Search for objects with that name
:param exclude_decayed: [attributes controller only] Exclude the decayed attributes from the response
:param sharinggroup: Filter by sharing group ID(s)
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
Deprecated:
Expand Down Expand Up @@ -2553,6 +2555,8 @@ def search(self, controller: str = 'events', return_format: str = 'json',
query['includeCorrelations'] = self._make_misp_bool(include_correlations)
query['object_name'] = object_name
query['excludeDecayed'] = self._make_misp_bool(exclude_decayed)
query['sharinggroup'] = sharinggroup

url = urljoin(self.root_url, f'{controller}/restSearch')
if return_format == 'stix-xml':
response = self._prepare_request('POST', url, data=query, output_type='xml')
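Review bot: The `:param sharinggroup:` line added to the docstring of `search` states neither which controllers honour the filter nor that the server has to support it, unlike the neighbouring `[attributes controller only]` and `[objects controller only]` tags. The last hunk copies the value into `query['sharinggroup']` unchanged, so if a server does not recognise the key and simply ignores it (not visible from here, worth confirming), the call would return everything the user can see while the caller assumes the result is scoped to one sharing group. I would extend the line to something like `:param sharinggroup: Filter by sharing group ID(s); needs a server that supports this restSearch filter, otherwise the filter may be ignored` and say which controllers accept it. The body of `search` starts and ends outside the hunks shown.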
45 changes: 45 additions & 0 deletions tests/testlive_comprehensive.py
Expand Up @@ -2235,6 +2235,51 @@ def test_sharing_group(self):
finally:
self.admin_misp_connector.delete_sharing_group(sharing_group.id)

def test_sharing_group_search(self):
# Add sharing group
sg = MISPSharingGroup()
sg.name = 'Testcases SG'
sg.releasability = 'Testing'
sharing_group = self.admin_misp_connector.add_sharing_group(sg, pythonify=True)
# Add the org to the sharing group
self.admin_misp_connector.add_org_to_sharing_group(
sharing_group,
self.test_org, extend=True
)
# Add event
event = self.create_simple_event()
event.distribution = Distribution.sharing_group
event.sharing_group_id = sharing_group.id
# Create two attributes, one specifically for the sharing group,
# another which inherits the event's SG
event.add_attribute('ip-dst', '8.8.8.8', distribution=4, sharing_group_id=sharing_group.id)
event.add_attribute('ip-dst', '9.9.9.9')
event = self.user_misp_connector.add_event(event)
attribute_ids = {a.id for a in event.attributes}
try:
# Try to query for the event
events = self.user_misp_connector.search(sharinggroup=sharing_group.id, controller="events")
# There should be one event
self.assertTrue(len(events) == 1)
# This event should be the one we added
self.assertEqual(events[0].id, event.id)
# Make sure the search isn't just returning everything
events = self.user_misp_connector.search(sharinggroup=99999, controller="events")

self.assertTrue(len(events) == 0)

# Try to query for the attributes
attributes = self.user_misp_connector.search(sharinggroup=sharing_group.id, controller="attributes")
searched_attribute_ids = {a.id for a in attributes}
# There should be two attributes
# The extra 1 is the random UUID now created in the event
self.assertTrue(len(attributes) == 2 + 1)
# We should not be missing any of the attributes
self.assertFalse(attribute_ids.difference(searched_attribute_ids))
finally:
self.admin_misp_connector.delete_sharing_group(sharing_group.id)
self.user_misp_connector.delete_event(event.id)

def test_feeds(self):
# Add
feed = MISPFeed()
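Review bot: The `finally` block of `test_sharing_group_search` deletes the sharing group before the event that still references it, and the `try:` only opens after `add_event`, so a failure in `add_org_to_sharing_group` or `add_event` leaves 'Testcases SG' behind on the live instance. I would open the `try:` directly after `add_sharing_group`, guard the event deletion for the case where the event was never saved, and reverse the cleanup order to `self.user_misp_connector.delete_event(event.id)` followed by `self.admin_misp_connector.delete_sharing_group(sharing_group.id)`. The negative check relies on the hard-coded ID `99999` not existing and only covers a member of the group; a short comment saying so would keep a reader from taking it as a test of visibility for non-members. The length checks would also fail more readably as `self.assertEqual(len(events), 1)` instead of `self.assertTrue(len(events) == 1)`.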
