Skip to content

Commit

Permalink
CCA can be tested by: python -m msal
Browse files Browse the repository at this point in the history
  • Loading branch information
rayluo committed Feb 21, 2024
1 parent bf87155 commit 4f0e03d
Showing 1 changed file with 63 additions and 11 deletions.
74 changes: 63 additions & 11 deletions msal/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ def _select_options(
if raw_data and accept_nonempty_string:
return raw_data

enable_debug_log = _input_boolean("Enable MSAL Python's DEBUG log?")
logging.basicConfig(level=logging.DEBUG if enable_debug_log else logging.INFO)
try:
from dotenv import load_dotenv
load_dotenv()
logging.info("Loaded environment variables from .env file")
except ImportError:
logging.warning(
"python-dotenv is not installed. "
"You may need to set environment variables manually.")

def _input_scopes():
scopes = _select_options([
"https://graph.microsoft.com/.default",
Expand Down Expand Up @@ -100,6 +111,7 @@ def _acquire_token_silent(app):

def _acquire_token_interactive(app, scopes=None, data=None):
"""acquire_token_interactive() - User will be prompted if app opts to do select_account."""
assert isinstance(app, msal.PublicClientApplication)
scopes = scopes or _input_scopes() # Let user input scope param before less important prompt and login_hint
prompt = _select_options([
{"value": None, "description": "Unspecified. Proceed silently with a default account (if any), fallback to prompt."},
Expand Down Expand Up @@ -143,6 +155,7 @@ def _acquire_token_by_username_password(app):

def _acquire_token_by_device_flow(app):
"""acquire_token_by_device_flow() - Note that this one does not go through broker"""
assert isinstance(app, msal.PublicClientApplication)
flow = app.initiate_device_flow(scopes=_input_scopes())
print(flow["message"])
sys.stdout.flush() # Some terminal needs this to ensure the message is shown
Expand All @@ -156,6 +169,7 @@ def _acquire_token_by_device_flow(app):

def _acquire_ssh_cert_silently(app):
"""Acquire an SSH Cert silently- This typically only works with Azure CLI"""
assert isinstance(app, msal.PublicClientApplication)
account = _select_account(app)
if account:
result = app.acquire_token_silent(
Expand All @@ -170,6 +184,7 @@ def _acquire_ssh_cert_silently(app):

def _acquire_ssh_cert_interactive(app):
"""Acquire an SSH Cert interactively - This typically only works with Azure CLI"""
assert isinstance(app, msal.PublicClientApplication)
result = _acquire_token_interactive(app, scopes=_SSH_CERT_SCOPE, data=_SSH_CERT_DATA)
if result.get("token_type") != "ssh-cert":
logging.error("Unable to acquire an ssh-cert")
Expand All @@ -185,6 +200,7 @@ def _acquire_ssh_cert_interactive(app):

def _acquire_pop_token_interactive(app):
"""Acquire a POP token interactively - This typically only works with Azure CLI"""
assert isinstance(app, msal.PublicClientApplication)
POP_SCOPE = ['6256c85f-0aad-4d50-b960-e6e9b21efe35/.default'] # KAP 1P Server App Scope, obtained from https://github.com/Azure/azure-cli-extensions/pull/4468/files#diff-a47efa3186c7eb4f1176e07d0b858ead0bf4a58bfd51e448ee3607a5b4ef47f6R116
result = _acquire_token_interactive(app, scopes=POP_SCOPE, data=_POP_DATA)
print_json(result)
Expand All @@ -198,6 +214,16 @@ def _remove_account(app):
app.remove_account(account)
print('Account "{}" and/or its token(s) are signed out from MSAL Python'.format(account["username"]))

def _acquire_token_for_client(app):
"""CCA.acquire_token_for_client() - Rerun this will get same token from cache."""
assert isinstance(app, msal.ConfidentialClientApplication)
print_json(app.acquire_token_for_client(scopes=_input_scopes()))

def _remove_tokens_for_client(app):
"""CCA.remove_tokens_for_client() - Run this to evict tokens from cache."""
assert isinstance(app, msal.ConfidentialClientApplication)
app.remove_tokens_for_client()

def _exit(app):
"""Exit"""
bug_link = (
Expand Down Expand Up @@ -235,12 +261,23 @@ def _main():
{"client_id": _AZURE_CLI, "name": "Azure CLI (Correctly configured for MSA-PT)"},
{"client_id": _VISUAL_STUDIO, "name": "Visual Studio (Correctly configured for MSA-PT)"},
{"client_id": "95de633a-083e-42f5-b444-a4295d8e9314", "name": "Whiteboard Services (Non MSA-PT app. Accepts AAD & MSA accounts.)"},
{
"client_id": os.getenv("CLIENT_ID"),
"client_secret": os.getenv("CLIENT_SECRET"),
"name": "A confidential client app (CCA) whose settings are defined "
"in environment variables CLIENT_ID and CLIENT_SECRET",
},
],
option_renderer=lambda a: a["name"],
header="Impersonate this app (or you can type in the client_id of your own app)",
header="Impersonate this app "
"(or you can type in the client_id of your own public client app)",
accept_nonempty_string=True)
enable_broker = _input_boolean("Enable broker? It will error out later if your app has not registered some redirect URI")
enable_debug_log = _input_boolean("Enable MSAL Python's DEBUG log?")
is_cca = isinstance(chosen_app, dict) and "client_secret" in chosen_app
if is_cca and not (chosen_app["client_id"] and chosen_app["client_secret"]):
raise ValueError("You need to set environment variables CLIENT_ID and CLIENT_SECRET")
enable_broker = (not is_cca) and _input_boolean("Enable broker? "
"(It will error out later if your app has not registered some redirect URI)"
)
enable_pii_log = _input_boolean("Enable PII in broker's log?") if enable_broker and enable_debug_log else False
authority = _select_options([
"https://login.microsoftonline.com/common",
Expand All @@ -255,29 +292,44 @@ def _main():
instance_discovery = _input_boolean(
"You input an unusual authority which might fail the Instance Discovery. "
"Now, do you want to perform Instance Discovery on your input authority?"
) if not authority.startswith("https://login.microsoftonline.com") else None
) if authority and not authority.startswith(
"https://login.microsoftonline.com") else None
app = msal.PublicClientApplication(
chosen_app["client_id"] if isinstance(chosen_app, dict) else chosen_app,
authority=authority,
instance_discovery=instance_discovery,
enable_broker_on_windows=enable_broker,
enable_pii_log=enable_pii_log,
token_cache=global_cache,
) if not is_cca else msal.ConfidentialClientApplication(
chosen_app["client_id"],
client_credential=chosen_app["client_secret"],
authority=authority,
instance_discovery=instance_discovery,
enable_pii_log=enable_pii_log,
token_cache=global_cache,
)
if enable_debug_log:
logging.basicConfig(level=logging.DEBUG)
while True:
func = _select_options([
methods_to_be_tested = [
_acquire_token_silent,
] + ([
_acquire_token_interactive,
_acquire_token_by_username_password,
_acquire_token_by_device_flow,
_acquire_ssh_cert_silently,
_acquire_ssh_cert_interactive,
_acquire_pop_token_interactive,
] if isinstance(app, msal.PublicClientApplication) else []
) + [
_acquire_token_by_username_password,
_remove_account,
_exit,
], option_renderer=lambda f: f.__doc__, header="MSAL Python APIs:")
] + ([
_acquire_token_for_client,
_remove_tokens_for_client,
] if isinstance(app, msal.ConfidentialClientApplication) else []
)
while True:
func = _select_options(
methods_to_be_tested + [_exit],
option_renderer=lambda f: f.__doc__, header="MSAL Python APIs:")
try:
func(app)
except ValueError as e:
Expand Down

0 comments on commit 4f0e03d

Please sign in to comment.