# Copied from: https://github.com/strongdm/accessbot/blob/main/plugins/sdm/lib/service/sdm_service.py
import strongdm
import os
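def create_sdm_service(api_access_key, api_secret_key, log, *, host=None, insecure=True):
    """
    Build a ``SdmService`` around a freshly constructed ``strongdm.Client``.

    This copy differs from the upstream accessbot version in two ways that
    matter operationally: the endpoint comes from ``host`` (falling back to
    the ``SERVER_HOST`` environment variable) instead of the SDK default, and
    the client is created with ``insecure=True``.

    Security note: the SDK's ``insecure`` flag disables TLS on the gRPC
    channel (confirm against the pinned SDK version), so with the default the
    API key pair and every response travel in cleartext.  The default stays
    ``True`` only so existing callers — presumably pointing at a local
    mock/test server — keep working; anything talking to a real endpoint must
    pass ``insecure=False``.  A WARNING line is logged whenever TLS is off so
    the choice is visible in production logs.

    Args:
        api_access_key / api_secret_key: strongDM API credentials, passed
            through to ``strongdm.Client`` unchanged.
        log: logger exposing ``.debug``/``.warning``; also handed to the
            returned ``SdmService``.
        host: gRPC endpoint; when ``None`` the ``SERVER_HOST`` env var is used.
        insecure: passed through to ``strongdm.Client``; see the note above.

    Raises:
        ValueError: when no host is given and ``SERVER_HOST`` is unset or
            empty.  Previously ``None`` was handed to the SDK and the failure
            surfaced deep inside gRPC with no hint about the cause.
    """
    target = host if host is not None else os.getenv("SERVER_HOST")
    if not target:
        raise ValueError(
            "create_sdm_service: no host given and SERVER_HOST is not set"
        )
    if insecure:
        # Loud on purpose: credentials are about to cross the wire unencrypted.
        log.warning(
            "##SDM## create_sdm_service: connecting to %s WITHOUT TLS (insecure=True)",
            target,
        )
    client = strongdm.Client(
        api_access_key,
        api_secret_key,
        host=target,
        insecure=insecure,
    )
    return SdmService(client, log)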
class NotFoundException(Exception):
    """Raised by ``SdmService`` lookups that match nothing in strongDM."""
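class SdmService:
    """
    Thin wrapper over a ``strongdm.Client`` used by the access bot.

    Conventions shared by every method:

    - Arguments are logged at DEBUG level under the ``##SDM##`` prefix.
    - SDK failures are re-raised as a plain ``Exception`` whose message starts
      with an operation-specific prefix (``"List resources failed: "``, ...),
      with the original error chained via ``from ex``.
    - ``NotFoundException`` is raised, outside those wrappers, when a lookup
      matches nothing.

    Security notes for maintainers:

    - Lookup filters are built by interpolating caller-supplied strings
      (resource/role names, e-mail addresses, ids) straight into the strongDM
      filter expression.  In the bot these values presumably originate from
      chat users, so a value containing filter syntax (a ``"`` inside a
      quoted name, commas, extra ``key:`` tokens) changes the query.  Left
      unchanged here to preserve behavior; if the pinned SDK version supports
      parameterised filters (``list('name:?', name)``) prefer that form.
    - DEBUG lines include e-mail addresses and account/resource ids.  Keep
      DEBUG off in production or make sure the log sink is access-controlled.
    """

    def __init__(self, client, log):
        # strongdm.Client, or any stand-in exposing the same .list/.get/.create API.
        self.__client = client
        # Logger with .debug(fmt, *args); nothing above DEBUG is emitted here.
        self.__log = log

    def get_resource_by_name(self, name):
        """
        Return the first strongDM resource matching the filter ``name:"<name>"``.

        If several resources match, the first one the SDK returns wins.

        Raises:
            NotFoundException: the filter matched no resource.
            Exception: "List resources failed: ..." when the SDK call fails.
        """
        try:
            self.__log.debug("##SDM## SdmService.get_resource_by_name name: %s", name)
            # NOTE(review): `name` is interpolated unescaped; a `"` in it alters the filter.
            sdm_resources = list(self.__client.resources.list('name:"{}"'.format(name)))
        except Exception as ex:
            raise Exception("List resources failed: " + str(ex)) from ex
        if not sdm_resources:
            raise NotFoundException("Sorry, cannot find that resource!")
        return sdm_resources[0]

    def get_account_by_email(self, email):
        """
        Return the first strongDM account matching the filter ``email:<email>``.

        Raises:
            NotFoundException: the filter matched no account.  This used to be
                a bare ``Exception`` with the same message; ``NotFoundException``
                subclasses ``Exception``, so existing ``except Exception``
                callers are unaffected and callers can now tell "no such
                account" from an SDK failure, as they already can for
                resources and roles.
            Exception: "List accounts failed: ..." when the SDK call fails.
        """
        try:
            self.__log.debug("##SDM## SdmService.get_account_by_email email: %s", email)
            # NOTE(review): `email` is interpolated unquoted and unescaped into the filter.
            sdm_accounts = list(self.__client.accounts.list('email:{}'.format(email)))
        except Exception as ex:
            raise Exception("List accounts failed: " + str(ex)) from ex
        if not sdm_accounts:
            raise NotFoundException("Sorry, cannot find your account!")
        return sdm_accounts[0]

    def account_grant_exists(self, resource_id, account_id):
        """
        Return ``True`` when at least one account grant links ``resource_id``
        directly to ``account_id`` (a role-based grant does not count; see
        ``role_grant_exists``).

        Raises:
            Exception: "Account grant exists failed: ..." when the SDK call fails.
        """
        try:
            self.__log.debug(
                "##SDM## SdmService.account_grant_exists resource_id: %s account_id: %s",
                resource_id, account_id,
            )
            account_grants = list(
                self.__client.account_grants.list(f"resource_id:{resource_id},account_id:{account_id}")
            )
            return bool(account_grants)
        except Exception as ex:
            raise Exception("Account grant exists failed: " + str(ex)) from ex

    def role_grant_exists(self, resource_id, account_id):
        """
        Return ``True`` when ``account_id`` reaches ``resource_id`` through a role:
        account -> account_attachment -> role -> role_grant -> resource.

        Makes one ``roles.get`` plus one ``role_grants.list`` call per account
        attachment, so cost grows with the number of roles the account holds.

        Raises:
            Exception: "Role grant exists failed: ..." when any SDK call fails.
        """
        try:
            self.__log.debug(
                "##SDM## SdmService.role_grant_exists resource_id: %s account_id: %s",
                resource_id, account_id,
            )
            for attachment in self.__client.account_attachments.list(f"account_id:{account_id}"):
                role = self.__client.roles.get(attachment.role_id).role
                for role_grant in self.__client.role_grants.list(f"role_id:{role.id}"):
                    if role_grant.resource_id == resource_id:
                        return True
            return False
        except Exception as ex:
            raise Exception("Role grant exists failed: " + str(ex)) from ex

    def grant_temporary_access(self, resource_id, account_id, start_from, valid_until):
        """
        Create an account grant for ``resource_id`` / ``account_id`` bounded by
        ``start_from`` and ``valid_until``.

        Both timestamps are forwarded to ``strongdm.AccountGrant`` unchanged;
        no check is made here for an already existing grant or for
        ``valid_until`` preceding ``start_from`` — callers are responsible for
        that (the bot does it before calling).  Returns ``None``.

        Raises:
            Exception: "Grant failed: ..." when the SDK call fails.
        """
        try:
            self.__log.debug(
                "##SDM## SdmService.grant_temporary_access resource_id: %s account_id: %s start_from: %s valid_until: %s",
                resource_id, account_id, str(start_from), str(valid_until)
            )
            sdm_grant = strongdm.AccountGrant(
                resource_id = resource_id,
                account_id = account_id,
                start_from = start_from,
                valid_until = valid_until
            )
            self.__client.account_grants.create(sdm_grant)
        except Exception as ex:
            raise Exception("Grant failed: " + str(ex)) from ex

    def get_all_resources(self, filter = ''):
        """
        Return every resource matching ``filter`` (a strongDM filter string;
        the default ``''`` means no filter, i.e. all resources), with ``None``
        entries dropped.

        ``filter`` shadows the builtin but is kept because callers pass it by
        keyword.

        Raises:
            Exception: "List resources failed: ..." when the SDK call fails.
        """
        self.__log.debug("##SDM## SdmService.get_all_resources")
        try:
            return self.remove_none_values(self.__client.resources.list(filter))
        except Exception as ex:
            raise Exception("List resources failed: " + str(ex)) from ex

    def get_all_resources_by_role(self, role_name, filter = ''):
        """
        Return the resources granted to the role named ``role_name``, with
        ``None`` entries dropped.  ``filter`` (a strongDM filter fragment) is
        appended to the id list with a comma and narrows the result further.

        A role with no role grants returns ``[]`` without another SDK call.
        Previously an empty grant list produced the filter ``''`` (or
        ``',<filter>'``), and ``''`` is the *unfiltered* listing — compare
        ``get_all_resources`` — so such a role silently listed every resource
        in the organisation.

        Raises:
            Exception: "List resources by role failed: ..." on any failure,
                including the ``NotFoundException`` raised by
                ``get_role_by_name`` (deliberately kept wrapped, as before).
        """
        # The log line says "..._by_role_name"; kept verbatim so existing log searches still match.
        self.__log.debug("##SDM## SdmService.get_all_resources_by_role_name role_name: %s", role_name)
        try:
            sdm_role = self.get_role_by_name(role_name)
            sdm_role_grants = list(self.__client.role_grants.list(f"role_id:{sdm_role.id}"))
            if not sdm_role_grants:
                # No grants means no resources; never fall through to an empty filter.
                return []
            resources_filter = ",".join(f"id:{rg.resource_id}" for rg in sdm_role_grants)
            if filter:
                resources_filter += f",{filter}"
            return self.remove_none_values(self.__client.resources.list(resources_filter))
        except Exception as ex:
            raise Exception("List resources by role failed: " + str(ex)) from ex

    def get_role_by_name(self, name):
        """
        Return the first strongDM role matching the filter ``name:"<name>"``.

        Raises:
            NotFoundException: the filter matched no role.
            Exception: "List roles failed: ..." when the SDK call fails.
        """
        try:
            self.__log.debug("##SDM## SdmService.get_role_by_name name: %s", name)
            # NOTE(review): `name` is interpolated unescaped; a `"` in it alters the filter.
            sdm_roles = list(self.__client.roles.list('name:"{}"'.format(name)))
        except Exception as ex:
            raise Exception("List roles failed: " + str(ex)) from ex
        if not sdm_roles:
            raise NotFoundException("Sorry, cannot find that role!")
        return sdm_roles[0]

    def get_all_roles(self):
        """
        Return every role as a list (empty filter ``''`` = no filter).

        Raises:
            Exception: "List roles failed: ..." when the SDK call fails.
        """
        self.__log.debug("##SDM## SdmService.get_all_roles")
        try:
            return list(self.__client.roles.list(''))
        except Exception as ex:
            raise Exception("List roles failed: " + str(ex)) from ex

    @staticmethod
    def remove_none_values(elements):
        """Return ``elements`` as a list with every ``None`` entry removed."""
        return [e for e in elements if e is not None]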