Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add token support #16

Merged
merged 12 commits into from
Oct 21, 2023
144 changes: 100 additions & 44 deletions conda_auth/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,33 @@
from conda.models.channel import Channel

from .condarc import CondaRC, CondaRCError
from .constants import OAUTH2_NAME, HTTP_BASIC_AUTH_NAME
from .exceptions import CondaAuthError, InvalidCredentialsError
from .handlers import AuthManager, oauth2_manager, basic_auth_manager
from .exceptions import CondaAuthError
from .handlers import (
AuthManager,
basic_auth_manager,
token_auth_manager,
HTTP_BASIC_AUTH_NAME,
TOKEN_NAME,
)
from .options import CustomOption

# Constants
AUTH_MANAGER_MAPPING = {
OAUTH2_NAME: oauth2_manager,
HTTP_BASIC_AUTH_NAME: basic_auth_manager,
TOKEN_NAME: token_auth_manager,
}
SUCCESSFUL_LOGIN_MESSAGE = "Successfully logged in"
SUCCESSFUL_LOGOUT_MESSAGE = "Successfully logged out"

SUCCESSFUL_LOGIN_MESSAGE = "Successfully stored credentials"

SUCCESSFUL_LOGOUT_MESSAGE = "Successfully removed credentials"

SUCCESSFUL_COLOR = "green"
MAX_LOGIN_ATTEMPTS = 3

FAILURE_COLOR = "red"

VALID_AUTH_CHOICES = tuple(AUTH_MANAGER_MAPPING.keys())

OPTION_DEFAULT = "CONDA_AUTH_DEFAULT"


def parse_channel(ctx, param, value):
Expand All @@ -28,28 +43,48 @@ def parse_channel(ctx, param, value):
return Channel(value)


def get_auth_manager(options) -> tuple[str, AuthManager]:
class ExtraContext:
"""
Used to provide more information about the running environment
"""

def __init__(self):
self.used_options = set()


def get_auth_manager(options, extra_context: ExtraContext) -> tuple[str, AuthManager]:
"""
Based on CLI options provided, return the correct auth manager to use.
"""
auth_type = options.get("type") or options.get("auth")
auth_type = options.get("auth")

if auth_type is not None:
auth_manager = AUTH_MANAGER_MAPPING.get(auth_type)
if auth_manager is None:
raise CondaAuthError(
f'Invalid authentication type. Valid types are: "{HTTP_BASIC_AUTH_NAME}"'
f'Invalid authentication type. Valid types are: "{", ".join(VALID_AUTH_CHOICES)}"'
)
return auth_type, auth_manager

# we use http basic auth when username or password are present
elif options.get("username") is not None or options.get("password") is not None:
# we use http basic auth when "username" or "password" are present
if "basic" in extra_context.used_options:
auth_manager = basic_auth_manager
auth_type = HTTP_BASIC_AUTH_NAME

# default authentication handler
# we use token auth when "token" is present
elif "token" in extra_context.used_options:
auth_manager = token_auth_manager
auth_type = TOKEN_NAME

# raise error if authentication type not found
else:
auth_manager = basic_auth_manager
auth_type = HTTP_BASIC_AUTH_NAME
raise CondaAuthError(
click.style(
"Please specify an authentication type to use"
" with either the `--basic` or `--token` options.",
fg=FAILURE_COLOR,
)
)

return auth_type, auth_manager

Expand All @@ -64,10 +99,12 @@ def get_channel_settings(channel: str) -> MutableMapping[str, str] | None:


@click.group("auth")
def group():
@click.pass_context
def group(ctx):
"""
Commands for handling authentication within conda
"""
ctx.obj = ExtraContext()


def auth_wrapper(args):
Expand All @@ -76,58 +113,77 @@ def auth_wrapper(args):


@group.command("login")
@click.option("-u", "--username", help="Username to use for HTTP Basic Authentication")
@click.option("-p", "--password", help="Password to use for HTTP Basic Authentication")
@click.argument("channel", callback=parse_channel)
@click.option(
"-u",
"--username",
help="Username to use for private channels using HTTP Basic Authentication",
cls=CustomOption,
prompt=True,
mutually_exclusive=("token",),
prompt_when="basic",
)
@click.option(
"-p",
"--password",
help="Password to use for private channels using HTTP Basic Authentication",
cls=CustomOption,
prompt=True,
hide_input=True,
mutually_exclusive=("token",),
prompt_when="basic",
)
@click.option(
"-t",
"--type",
help='Manually specify the type of authentication to use. Choices are: "http-basic"',
"--token",
help="Token to use for private channels using an API token",
prompt=True,
prompt_required=False,
cls=CustomOption,
mutually_exclusive=("username", "password"),
)
@click.argument("channel", callback=parse_channel)
def login(channel: Channel, **kwargs):
@click.option(
"-b",
"--basic",
is_flag=True,
cls=CustomOption,
help="Save login credentials as HTTP basic authentication",
)
@click.pass_obj
def login(extra_context: ExtraContext, channel: Channel, **kwargs):
"""
Login to a channel
Log in to a channel by storing the credentials or tokens associated with it
"""
kwargs = {key: val for key, val in kwargs.items() if val is not None}
settings = get_channel_settings(channel.canonical_name) or {}
settings.update(kwargs)

auth_type, auth_manager = get_auth_manager(settings)
attempts = 0

while True:
try:
username = auth_manager.authenticate(channel, settings)
break
except InvalidCredentialsError as exc:
auth_manager.remove_channel_cache(channel.canonical_name)
attempts += 1
if attempts >= MAX_LOGIN_ATTEMPTS:
raise CondaAuthError(f"Max attempts reached; {exc}")
settings = {key: val for key, val in kwargs.items() if val is not None}

auth_type, auth_manager = get_auth_manager(settings, extra_context)
username: str | None = auth_manager.store(channel, settings)

click.echo(click.style(SUCCESSFUL_LOGIN_MESSAGE, fg=SUCCESSFUL_COLOR))

try:
condarc = CondaRC()
condarc.update_channel_settings(channel.canonical_name, username, auth_type)
if auth_type == TOKEN_NAME:
username = None
condarc.update_channel_settings(channel.canonical_name, auth_type, username)
condarc.save()
except CondaRCError as exc:
raise CondaAuthError(str(exc))


@group.command("logout")
@click.argument("channel", callback=parse_channel)
def logout(channel: Channel):
@click.pass_obj
def logout(extra_context: ExtraContext, channel: Channel):
"""
Logout of a channel
Log out of a channel by removing any credentials or tokens associated with it.
"""
settings = get_channel_settings(channel.canonical_name)

if settings is None:
raise CondaAuthError("Unable to find information about logged in session.")

settings["type"] = settings["auth"]
auth_type, auth_manager = get_auth_manager(settings)
auth_type, auth_manager = get_auth_manager(settings, extra_context)
auth_manager.remove_secret(channel, settings)

click.echo(click.style(SUCCESSFUL_LOGOUT_MESSAGE, fg=SUCCESSFUL_COLOR))
13 changes: 11 additions & 2 deletions conda_auth/condarc.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,20 @@
except YAMLError as exc:
raise CondaRCError(f"Could not parse condarc: {exc}")

def update_channel_settings(self, channel: str, username: str, auth_type: str):
def update_channel_settings(
self, channel: str, auth_type: str, username: str | None = None
):
"""
Update the condarc file's "channel_settings" section
"""
updated_settings = {"channel": channel, "auth": auth_type, "username": username}
if username is None:
updated_settings = {"channel": channel, "auth": auth_type}

Check warning on line 42 in conda_auth/condarc.py

View check run for this annotation

Codecov / codecov/patch

conda_auth/condarc.py#L42

Added line #L42 was not covered by tests
else:
updated_settings = {
"channel": channel,
"auth": auth_type,
"username": username,
}

channel_settings = self.loaded_yaml.get("channel_settings", []) or []

Expand Down
7 changes: 0 additions & 7 deletions conda_auth/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,4 @@

PLUGIN_NAME = "conda-auth"

# move to the handlers module
OAUTH2_NAME = "oauth2"

# move to the handlers module
HTTP_BASIC_AUTH_NAME = "http-basic"

# Error messages
LOGOUT_ERROR_MESSAGE = "Unable to logout."
12 changes: 7 additions & 5 deletions conda_auth/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# flake8: noqa: F401
from .base import AuthManager
from .oauth2 import (
OAuth2Manager,
OAuth2Handler,
manager as oauth2_manager,
)
from .basic_auth import (
BasicAuthManager,
BasicAuthHandler,
manager as basic_auth_manager,
HTTP_BASIC_AUTH_NAME,
)
from .token import (
TokenAuthManager,
TokenAuthHandler,
manager as token_auth_manager,
TOKEN_NAME,
)
67 changes: 18 additions & 49 deletions conda_auth/handlers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,24 @@

import conda.base.context
import keyring
import requests
from conda.gateways.connection.session import CondaSession
from conda.models.channel import Channel

from ..exceptions import InvalidCredentialsError

INVALID_CREDENTIALS_ERROR_MESSAGE = "Provided credentials are not correct."
from conda.base.context import context as global_context


class AuthManager(ABC):
"""
Defines an interface for auth handlers to use within plugin
"""

def __init__(self, context: conda.base.context.Context, cache: dict | None = None):
def __init__(
self,
context: conda.base.context.Context | None = None,
cache: dict | None = None,
):
"""
Optionally set a cache to use and configuration parameters to retrieve from
``conda.base.context.context.channel_settings``.
Optionally set a cache and context object to use
"""
self._context = context
self._context = context or global_context
self._cache = {} if cache is None else cache

def hook_action(self, command: str) -> None:
Expand All @@ -38,9 +36,9 @@
channel.canonical_name in self._context.channels
and settings.get("auth") == self.get_auth_type()
):
self.authenticate(channel, settings)
self.store(channel, settings)

def authenticate(self, channel: Channel, settings: Mapping[str, str]) -> str:
def store(self, channel: Channel, settings: Mapping[str, str]) -> str:
"""
Used to retrieve credentials and store them on the ``cache`` property

Expand All @@ -52,7 +50,6 @@
}
username, secret = self.fetch_secret(channel, extra_params)

verify_credentials(channel, self.get_auth_class())
self.save_credentials(channel, username, secret)

return username
Expand All @@ -64,9 +61,7 @@
TODO: Method may be expanded in the future to allow the use of other storage
mechanisms.
"""
keyring.set_password(
self.get_keyring_id(channel.canonical_name), username, secret
)
keyring.set_password(self.get_keyring_id(channel), username, secret)

def fetch_secret(
self, channel: Channel, settings: Mapping[str, str | None]
Expand All @@ -93,14 +88,14 @@

return secrets

def remove_channel_cache(self, channel_name: str) -> None:
def cache_clear(self, channel_name: str | None = None) -> None:
"""
Removes the cached secret for the given channel name
Remove the internal cache for the manager object
"""
try:
del self._cache[channel_name]
except KeyError:
pass
if channel_name:
self._cache.pop(channel_name, None)

Check warning on line 96 in conda_auth/handlers/base.py

View check run for this annotation

Codecov / codecov/patch

conda_auth/handlers/base.py#L96

Added line #L96 was not covered by tests
else:
self._cache.clear()

@abstractmethod
def _fetch_secret(
Expand All @@ -127,7 +122,7 @@
"""

@abstractmethod
def get_keyring_id(self, channel_name: str) -> str:
def get_keyring_id(self, channel: Channel) -> str:
"""
Implementation should return the keyring id that will be used by the manager classes
"""
Expand All @@ -138,29 +133,3 @@
Returns the authentication class to use (requests.auth.AuthBase subclass) for the given
authentication manager
"""


def verify_credentials(channel: Channel, auth_cls: type) -> None:
"""
Verify the credentials that have been currently set for the channel.

Raises exception if unable to make a successful request.

TODO:
We need a better way to tell if the credentials work. We might need
to fetch (or perform a HEAD request) on something specific like
repodata.json.
"""
for url in channel.base_urls:
session = CondaSession(auth=auth_cls(channel.canonical_name))
resp = session.head(url, allow_redirects=False)

try:
resp.raise_for_status()
except requests.exceptions.HTTPError as exc:
if exc.response.status_code == requests.codes["unauthorized"]:
error_message = INVALID_CREDENTIALS_ERROR_MESSAGE
else:
error_message = str(exc)

raise InvalidCredentialsError(error_message)
Loading
Loading