Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AzureApplicationCredential #19403

Merged
merged 4 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/identity/azure-identity/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
variable `AZURE_REGIONAL_AUTHORITY_NAME`. See `azure.identity.RegionalAuthority`
for possible values.
([#19301](https://github.com/Azure/azure-sdk-for-python/issues/19301))
- `AzureApplicationCredential`, a default credential chain for applications
deployed to Azure
([#19309](https://github.com/Azure/azure-sdk-for-python/issues/19309))

## 1.7.0b1 (2021-06-08)
Beginning with this release, this library requires Python 2.7 or 3.6+.
Expand Down
4 changes: 3 additions & 1 deletion sdk/identity/azure-identity/azure/identity/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from ._exceptions import AuthenticationRequiredError, CredentialUnavailableError
from ._constants import AzureAuthorityHosts, KnownAuthorities
from ._credentials import (
AzureCliCredential,
AuthorizationCodeCredential,
AzureApplicationCredential,
AzureCliCredential,
AzurePowerShellCredential,
CertificateCredential,
ChainedTokenCredential,
Expand All @@ -31,6 +32,7 @@
"AuthenticationRecord",
"AuthenticationRequiredError",
"AuthorizationCodeCredential",
"AzureApplicationCredential",
"AzureAuthorityHosts",
"AzureCliCredential",
"AzurePowerShellCredential",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from .application import AzureApplicationCredential
from .authorization_code import AuthorizationCodeCredential
from .azure_powershell import AzurePowerShellCredential
from .browser import InteractiveBrowserCredential
Expand All @@ -21,6 +22,7 @@

__all__ = [
"AuthorizationCodeCredential",
"AzureApplicationCredential",
"AzureArcCredential",
"AzureCliCredential",
"AzurePowerShellCredential",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import logging
import os
from typing import TYPE_CHECKING

from .chained import ChainedTokenCredential
from .environment import EnvironmentCredential
from .managed_identity import ManagedIdentityCredential
from .._constants import EnvironmentVariables
from .._internal import get_default_authority, normalize_authority

if TYPE_CHECKING:
# pylint:disable=unused-import,ungrouped-imports
from typing import Any
from azure.core.credentials import AccessToken

_LOGGER = logging.getLogger(__name__)


class AzureApplicationCredential(ChainedTokenCredential):
"""A credential for Azure Active Directory applications.

This credential is designed for applications deployed to Azure (:class:`~azure.identity.DefaultAzureCredential` is
better suited to local development). It authenticates service principals and managed identities.

For service principal authentication, set these environment variables to identify a principal:

- **AZURE_TENANT_ID**: ID of the service principal's tenant. Also called its "directory" ID.
- **AZURE_CLIENT_ID**: the service principal's client ID

And one of these to authenticate that principal:

- **AZURE_CLIENT_SECRET**: one of the service principal's client secrets

**or**

- **AZURE_CLIENT_CERTIFICATE_PATH**: path to a PEM-encoded certificate file including the private key. The
certificate must not be password-protected.

See `Azure CLI documentation <https://docs.microsoft.com/cli/azure/create-an-azure-service-principal-azure-cli>`_
for more information about creating and managing service principals.

When this environment configuration is incomplete, the credential will attempt to authenticate a managed identity.
See `Azure Active Directory documentation
<https://docs.microsoft.com/azure/active-directory/managed-identities-azure-resources/overview>`_ for an overview of
managed identities.

:keyword str authority: Authority of an Azure Active Directory endpoint, for example "login.microsoftonline.com",
the authority for Azure Public Cloud, which is the default when no value is given for this keyword argument or
environment variable AZURE_AUTHORITY_HOST. :class:`~azure.identity.AzureAuthorityHosts` defines authorities for
other clouds. Authority configuration applies only to service principal authentication.
:keyword str managed_identity_client_id: The client ID of a user-assigned managed identity. Defaults to the value
of the environment variable AZURE_CLIENT_ID, if any. If not specified, a system-assigned identity will be used.
"""

def __init__(self, **kwargs):
# type: (**Any) -> None
authority = kwargs.pop("authority", None)
authority = normalize_authority(authority) if authority else get_default_authority()
super(AzureApplicationCredential, self).__init__(
EnvironmentCredential(authority=authority, **kwargs),
ManagedIdentityCredential(
client_id=kwargs.pop(
"managed_identity_client_id", os.environ.get(EnvironmentVariables.AZURE_CLIENT_ID)
),
**kwargs
),
)

def get_token(self, *scopes, **kwargs):
# type: (*str, **Any) -> AccessToken
"""Request an access token for `scopes`.

This method is called automatically by Azure SDK clients.

:param str scopes: desired scopes for the access token. This method requires at least one scope.
:raises ~azure.core.exceptions.ClientAuthenticationError: authentication failed. The exception has a
`message` attribute listing each authentication attempt and its error message.
"""
if self._successful_credential:
token = self._successful_credential.get_token(*scopes, **kwargs)
_LOGGER.info(
"%s acquired a token from %s", self.__class__.__name__, self._successful_credential.__class__.__name__
)
return token

return super(AzureApplicationCredential, self).get_token(*scopes, **kwargs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import logging
import os
from typing import TYPE_CHECKING

from .chained import ChainedTokenCredential
from .environment import EnvironmentCredential
from .managed_identity import ManagedIdentityCredential
from ..._constants import EnvironmentVariables
from ..._internal import get_default_authority, normalize_authority

if TYPE_CHECKING:
# pylint:disable=unused-import,ungrouped-imports
from typing import Any
from azure.core.credentials import AccessToken

_LOGGER = logging.getLogger(__name__)


class AzureApplicationCredential(ChainedTokenCredential):
"""A credential for Azure Active Directory applications.

This credential is designed for applications deployed to Azure (:class:`~azure.identity.aio.DefaultAzureCredential`
is better suited to local development). It authenticates service principals and managed identities.

For service principal authentication, set these environment variables to identify a principal:

- **AZURE_TENANT_ID**: ID of the service principal's tenant. Also called its "directory" ID.
- **AZURE_CLIENT_ID**: the service principal's client ID

And one of these to authenticate that principal:

- **AZURE_CLIENT_SECRET**: one of the service principal's client secrets

**or**

- **AZURE_CLIENT_CERTIFICATE_PATH**: path to a PEM-encoded certificate file including the private key. The
certificate must not be password-protected.

See `Azure CLI documentation <https://docs.microsoft.com/cli/azure/create-an-azure-service-principal-azure-cli>`_
for more information about creating and managing service principals.

When this environment configuration is incomplete, the credential will attempt to authenticate a managed identity.
See `Azure Active Directory documentation
<https://docs.microsoft.com/azure/active-directory/managed-identities-azure-resources/overview>`_ for an overview of
managed identities.

:keyword str authority: Authority of an Azure Active Directory endpoint, for example "login.microsoftonline.com",
the authority for Azure Public Cloud, which is the default when no value is given for this keyword argument or
environment variable AZURE_AUTHORITY_HOST. :class:`~azure.identity.AzureAuthorityHosts` defines authorities for
other clouds. Authority configuration applies only to service principal authentication.
:keyword str managed_identity_client_id: The client ID of a user-assigned managed identity. Defaults to the value
of the environment variable AZURE_CLIENT_ID, if any. If not specified, a system-assigned identity will be used.
"""

def __init__(self, **kwargs: "Any") -> None:
authority = kwargs.pop("authority", None)
authority = normalize_authority(authority) if authority else get_default_authority()
super().__init__(
EnvironmentCredential(authority=authority, **kwargs),
ManagedIdentityCredential(
client_id=kwargs.pop(
"managed_identity_client_id", os.environ.get(EnvironmentVariables.AZURE_CLIENT_ID)
),
**kwargs
),
)

async def get_token(self, *scopes: str, **kwargs: "Any") -> "AccessToken":
"""Asynchronously request an access token for `scopes`.

This method is called automatically by Azure SDK clients.

:param str scopes: desired scopes for the access token. This method requires at least one scope.
:raises ~azure.core.exceptions.ClientAuthenticationError: authentication failed. The exception has a
`message` attribute listing each authentication attempt and its error message.
"""
if self._successful_credential:
token = await self._successful_credential.get_token(*scopes, **kwargs)
_LOGGER.info(
"%s acquired a token from %s", self.__class__.__name__, self._successful_credential.__class__.__name__
)
return token

return await super().get_token(*scopes, **kwargs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import os

from azure.core.credentials import AccessToken
from azure.identity import AzureApplicationCredential, CredentialUnavailableError
from azure.identity._constants import EnvironmentVariables
import pytest
from six.moves.urllib_parse import urlparse

try:
from unittest.mock import Mock, patch
except ImportError: # python < 3.3
from mock import Mock, patch # type: ignore


def test_iterates_only_once():
"""When a credential succeeds, AzureApplicationCredential should use that credential thereafter"""

expected_token = AccessToken("***", 42)
unavailable_credential = Mock(get_token=Mock(side_effect=CredentialUnavailableError(message="...")))
successful_credential = Mock(get_token=Mock(return_value=expected_token))

credential = AzureApplicationCredential()
credential.credentials = [
unavailable_credential,
successful_credential,
Mock(get_token=Mock(side_effect=Exception("iteration didn't stop after a credential provided a token"))),
]

for n in range(3):
token = credential.get_token("scope")
assert token.token == expected_token.token
assert unavailable_credential.get_token.call_count == 1
assert successful_credential.get_token.call_count == n + 1


@pytest.mark.parametrize("authority", ("localhost", "https://localhost"))
def test_authority(authority):
"""the credential should accept authority configuration by keyword argument or environment"""

parsed_authority = urlparse(authority)
expected_netloc = parsed_authority.netloc or authority # "localhost" parses to netloc "", path "localhost"

def test_initialization(mock_credential, expect_argument):
AzureApplicationCredential(authority=authority)
assert mock_credential.call_count == 1

# N.B. if os.environ has been patched somewhere in the stack, that patch is in place here
environment = dict(os.environ, **{EnvironmentVariables.AZURE_AUTHORITY_HOST: authority})
with patch.dict(AzureApplicationCredential.__module__ + ".os.environ", environment, clear=True):
AzureApplicationCredential()
assert mock_credential.call_count == 2

for _, kwargs in mock_credential.call_args_list:
if expect_argument:
actual = urlparse(kwargs["authority"])
assert actual.scheme == "https"
assert actual.netloc == expected_netloc
else:
assert "authority" not in kwargs

# authority should be passed to EnvironmentCredential as a keyword argument
environment = {var: "foo" for var in EnvironmentVariables.CLIENT_SECRET_VARS}
with patch(AzureApplicationCredential.__module__ + ".EnvironmentCredential") as mock_credential:
with patch.dict("os.environ", environment, clear=True):
test_initialization(mock_credential, expect_argument=True)

# authority should not be passed to ManagedIdentityCredential
with patch(AzureApplicationCredential.__module__ + ".ManagedIdentityCredential") as mock_credential:
with patch.dict("os.environ", {EnvironmentVariables.MSI_ENDPOINT: "localhost"}, clear=True):
test_initialization(mock_credential, expect_argument=False)


def test_managed_identity_client_id():
"""the credential should accept a user-assigned managed identity's client ID by kwarg or environment variable"""

expected_args = {"client_id": "the-client"}

with patch(AzureApplicationCredential.__module__ + ".ManagedIdentityCredential") as mock_credential:
AzureApplicationCredential(managed_identity_client_id=expected_args["client_id"])
mock_credential.assert_called_once_with(**expected_args)

# client id can also be specified in $AZURE_CLIENT_ID
with patch.dict(os.environ, {EnvironmentVariables.AZURE_CLIENT_ID: expected_args["client_id"]}, clear=True):
with patch(AzureApplicationCredential.__module__ + ".ManagedIdentityCredential") as mock_credential:
AzureApplicationCredential()
mock_credential.assert_called_once_with(**expected_args)

# keyword argument should override environment variable
with patch.dict(
os.environ, {EnvironmentVariables.AZURE_CLIENT_ID: "not-" + expected_args["client_id"]}, clear=True
):
with patch(AzureApplicationCredential.__module__ + ".ManagedIdentityCredential") as mock_credential:
AzureApplicationCredential(managed_identity_client_id=expected_args["client_id"])
mock_credential.assert_called_once_with(**expected_args)
Loading