Skip to content

Commit

Permalink
[Role] az role assignment create: Do not invoke Graph API if `--ass…
Browse files Browse the repository at this point in the history
…ignee-principal-type` is provided (#19219)
  • Loading branch information
jiasli authored Aug 27, 2021
1 parent 8dbfa30 commit 3c34079
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 42 deletions.
66 changes: 29 additions & 37 deletions src/azure-cli/azure/cli/command_modules/role/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def create_role_assignment(cmd, role, assignee=None, assignee_object_id=None, re
raise CLIError('usage error: --assignee STRING | --assignee-object-id GUID')

if assignee_principal_type and not assignee_object_id:
raise CLIError('usage error: --assignee-object-id GUID [--assignee-principal-type]')
raise CLIError('usage error: --assignee-object-id GUID --assignee-principal-type TYPE')

# If condition is set and condition-version is empty, condition-version defaults to "2.0".
if condition and not condition_version:
Expand All @@ -157,8 +157,18 @@ def create_role_assignment(cmd, role, assignee=None, assignee_object_id=None, re
if condition_version and not condition:
raise CLIError('usage error: When --condition-version is set, --condition must be set as well.')

object_id, principal_type = _resolve_assignee_object(cmd.cli_ctx, assignee, assignee_object_id,
assignee_principal_type)
if assignee:
object_id, principal_type = _resolve_object_id_and_type(cmd.cli_ctx, assignee, fallback_to_object_id=True)
else:
object_id = assignee_object_id
if assignee_principal_type:
# If principal type is provided, nothing to resolve, do not call Graph
principal_type = assignee_principal_type
else:
# Try best to get principal type
logger.warning('RBAC service might reject creating role assignment without --assignee-principal-type '
'in the future. Better to specify --assignee-principal-type manually.')
principal_type = _get_principal_type_from_object_id(cmd.cli_ctx, assignee_object_id)

try:
return _create_role_assignment(cmd.cli_ctx, role, object_id, resource_group_name, scope, resolve_assignee=False,
Expand Down Expand Up @@ -1766,46 +1776,24 @@ def _encode_custom_key_description(key_description):
return key_description.encode('utf-16')


def _resolve_assignee_object(cli_ctx, assignee, assignee_object_id, assignee_principal_type):
def _get_principal_type_from_object_id(cli_ctx, assignee_object_id):
client = _graph_client_factory(cli_ctx)
result = None

# resolve assignee (same as _resolve_object_id)
if assignee:
if assignee.find('@') >= 0: # looks like a user principal name
result = list(client.users.list(filter="userPrincipalName eq '{}'".format(assignee)))
if not result:
result = list(client.service_principals.list(
filter="servicePrincipalNames/any(c:c eq '{}')".format(assignee)))
if not result and is_guid(assignee): # assume an object id, let us verify it
result = _get_object_stubs(client, [assignee])

# 2+ matches should never happen, so we only check 'no match' here
if not result:
raise CLIError("Cannot find user or service principal in graph database for '{assignee}'. "
"If the assignee is an appId, make sure the corresponding service principal is created "
"with 'az ad sp create --id {assignee}'.".format(assignee=assignee))

return result[0].object_id, result[0].object_type

# try to resolve assignee object id
try:
result = _get_object_stubs(client, [assignee_object_id])
if result:
return result[0].object_id, result[0].object_type
return result[0].object_type
except CloudError:
pass

# If failed to verify assignee object id, DO NOT raise exception
# since --assignee-object-id is exposed to bypass Graph API
if not assignee_principal_type:
logger.warning('Failed to query --assignee-principal-type for %s by invoking Graph API.\n'
'RBAC server might reject creating role assignment without --assignee-principal-type '
'in the future. Better to specify --assignee-principal-type manually.', assignee_object_id)
return assignee_object_id, assignee_principal_type
logger.warning('Failed to query --assignee-principal-type for --assignee-object-id %s by invoking Graph API.',
assignee_object_id)
return None


def _resolve_object_id(cli_ctx, assignee, fallback_to_object_id=False):
object_id, _ = _resolve_object_id_and_type(cli_ctx, assignee, fallback_to_object_id=fallback_to_object_id)
return object_id


def _resolve_object_id_and_type(cli_ctx, assignee, fallback_to_object_id=False):
client = _graph_client_factory(cli_ctx)
result = None
try:
Expand All @@ -1823,10 +1811,14 @@ def _resolve_object_id(cli_ctx, assignee, fallback_to_object_id=False):
"If the assignee is an appId, make sure the corresponding service principal is created "
"with 'az ad sp create --id {assignee}'.".format(assignee=assignee))

return result[0].object_id
return result[0].object_id, result[0].object_type
except (CloudError, GraphErrorException):
logger.warning('Failed to query %s by invoking Graph API. '
'If you don\'t have permission to query Graph API, please '
'specify --assignee-object-id and --assignee-principal-type.', assignee)
if fallback_to_object_id and is_guid(assignee):
return assignee
logger.warning('Assuming %s as an object ID.', assignee)
return assignee, None
raise


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,20 @@ def _test_role_assignment(assignee_object_id, assignee_principal_type=None):
self.kwargs['object_id'] = assignee_object_id
self.kwargs['principal_type'] = assignee_principal_type
# test role assignment on subscription level
if assignee_principal_type:
self.cmd(
'role assignment create --assignee-object-id {object_id} --assignee-principal-type {principal_type} --role Reader -g {rg}')
else:
self.cmd('role assignment create --assignee-object-id {object_id} --role Reader -g {rg}')

with mock.patch('azure.graphrbac.operations.ObjectsOperations.get_objects_by_object_ids') \
as get_objects_by_object_ids_mock:
if assignee_principal_type:
self.cmd(
'role assignment create --assignee-object-id {object_id} '
'--assignee-principal-type {principal_type} --role Reader -g {rg}')
# Verify no graph call
get_objects_by_object_ids_mock.assert_not_called()
else:
self.cmd('role assignment create --assignee-object-id {object_id} --role Reader -g {rg}')
# Verify 1 graph call to resolve principal type
get_objects_by_object_ids_mock.assert_called_once()

self.cmd('role assignment list -g {rg}', checks=self.check("length([])", 1))
self.cmd('role assignment delete -g {rg}')
self.cmd('role assignment list -g {rg}', checks=self.check("length([])", 0))
Expand Down

0 comments on commit 3c34079

Please sign in to comment.