Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
Cherry-Pick Merge: Add validation for duplicated access requests (#252)
Browse files Browse the repository at this point in the history
Co-authored-by: vassalo <[email protected]>
  • Loading branch information
wallrony and vassalo committed Aug 19, 2022
1 parent bebcaeb commit 38268e5
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 0 deletions.
8 changes: 8 additions & 0 deletions e2e/slack/test_accessbot_slack_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ def test_access_command_grant_with_strange_casing(self, mocked_testbot):
assert "access request" in mocked_testbot.pop_message()
assert "Granting" in mocked_testbot.pop_message()

def test_access_command_fails_when_duplicating_request(self, mocked_testbot):
mocked_testbot.push_message(f"access to {resource_name}")
mocked_testbot.push_message(f"access to {resource_name}")
assert "valid request" in mocked_testbot.pop_message()
assert "access request" in mocked_testbot.pop_message()
assert "already have a pending grant request" in mocked_testbot.pop_message()


class Test_access_flow_from_access_form(ErrBotExtraTestSettings):
@pytest.fixture
def mocked_testbot(self, testbot):
Expand Down
6 changes: 6 additions & 0 deletions plugins/sdm/accessbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def access_resource(self, message, match):
try:
self.get_arguments_helper().check_required_flags(flags_validators.keys(), self.config['REQUIRED_FLAGS'], flags)
self.check_requester_flag(message, flags.get('requester'))
self.__grant_requests_helper.check_request_already_exists(resource_name, GrantRequestType.ACCESS_RESOURCE, message.frm.person)
except Exception as e:
yield str(e)
return
Expand All @@ -198,6 +199,11 @@ def assign_role(self, message, match):
return
self.__metrics_helper.increment_access_requests()
role_name = re.sub(ASSIGN_ROLE_REGEX, "\\1", match.string.replace("*", ""), flags=re.IGNORECASE)
try:
self.__grant_requests_helper.check_request_already_exists(role_name, GrantRequestType.ASSIGN_ROLE, message.frm.person)
except Exception as e:
yield str(e)
return
yield from self.get_role_grant_helper().request_access(message, role_name)
self.__metrics_helper.reset_consecutive_errors()

Expand Down
8 changes: 8 additions & 0 deletions plugins/sdm/lib/helper/grant_request_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ def remove(self, request_id: str):
self.__grant_requests.pop(request_id, None)
self.save_state()

def check_request_already_exists(self, sdm_object_name: str, grant_request_type: GrantRequestType, user: str):
for grant_request in self.__grant_requests.values():
if grant_request["type"] == grant_request_type.value and grant_request["message"].frm.person == user \
and grant_request["sdm_object"].name.lower() == sdm_object_name.lower():
obj_type_name = "resource" if grant_request_type == GrantRequestType.ACCESS_RESOURCE else "role"
raise Exception(
f"You already have a pending grant request for that {obj_type_name}. Please, wait for an admin evaluation.")

def __sdm_model_to_dict(self, object):
return object if type(object) is dict else object.to_dict()

Expand Down
17 changes: 17 additions & 0 deletions plugins/sdm/lib/helper/test_grant_request_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,23 @@ def test_restore_state_when_has_stored_requests(self):
assert helper.get(request_id) is None
assert not helper.exists(request_id)

def test_raise_exception_when_add_duplicated_request(self):
bot = get_mocked_bot()
with patch("builtins.open", mock_open(read_data=mocked_file_data)) as handle:
helper = GrantRequestHelper(bot)
assert helper.get(request_id) is not None
assert helper.exists(request_id)
assert len(helper.get_request_ids()) == 1
file = handle()
file.read.assert_called_once()
with pytest.raises(Exception) as e:
helper.add(request_id, get_mocked_message(), get_mock_sdm_object(), get_mock_sdm_account(),
GrantRequestType.ACCESS_RESOURCE)
assert "already have a pending grant request" in e.value
helper.remove(request_id)
assert helper.get(request_id) is None
assert not helper.exists(request_id)

def test_dont_restore_state_when_has_stored_requests_and_is_disabled(self):
with patch("os.path.isfile") as mock_isfile:
mock_isfile.side_effect = [True]
Expand Down

0 comments on commit 38268e5

Please sign in to comment.