From df20c36f99fcaa5b1eee71c72a10fa4b5e6ba522 Mon Sep 17 00:00:00 2001 From: Travis Hathaway Date: Fri, 6 Oct 2023 14:45:31 +0200 Subject: [PATCH 01/11] adding support for token authentication --- conda_auth/cli.py | 72 +++++++++++++--- conda_auth/constants.py | 7 -- conda_auth/handlers/__init__.py | 13 +-- conda_auth/handlers/base.py | 10 ++- conda_auth/handlers/basic_auth.py | 13 ++- conda_auth/handlers/oauth2.py | 17 ++-- conda_auth/handlers/token.py | 135 ++++++++++++++++++++++++++++++ conda_auth/hooks.py | 21 ++++- tests/cli/test_login.py | 2 +- tests/cli/test_logout.py | 3 +- tests/handlers/test_oauth2.py | 4 +- tests/handlers/test_token.py | 62 ++++++++++++++ 12 files changed, 318 insertions(+), 41 deletions(-) create mode 100644 conda_auth/handlers/token.py create mode 100644 tests/handlers/test_token.py diff --git a/conda_auth/cli.py b/conda_auth/cli.py index 08ac3de..b17cea4 100644 --- a/conda_auth/cli.py +++ b/conda_auth/cli.py @@ -7,19 +7,38 @@ from conda.models.channel import Channel from .condarc import CondaRC, CondaRCError -from .constants import OAUTH2_NAME, HTTP_BASIC_AUTH_NAME from .exceptions import CondaAuthError, InvalidCredentialsError -from .handlers import AuthManager, oauth2_manager, basic_auth_manager +from .handlers import ( + AuthManager, + oauth2_manager, + basic_auth_manager, + token_auth_manager, + OAUTH2_NAME, + HTTP_BASIC_AUTH_NAME, + TOKEN_NAME, +) +# Constants AUTH_MANAGER_MAPPING = { OAUTH2_NAME: oauth2_manager, HTTP_BASIC_AUTH_NAME: basic_auth_manager, + TOKEN_NAME: token_auth_manager, } + SUCCESSFUL_LOGIN_MESSAGE = "Successfully logged in" + SUCCESSFUL_LOGOUT_MESSAGE = "Successfully logged out" + SUCCESSFUL_COLOR = "green" + +INVALID_CREDENTIALS_MESSAGE = "Invalid credentials" + +FAILURE_COLOR = "red" + MAX_LOGIN_ATTEMPTS = 3 +VALID_AUTH_CHOICES = (HTTP_BASIC_AUTH_NAME, TOKEN_NAME) + def parse_channel(ctx, param, value): """ @@ -32,20 +51,25 @@ def get_auth_manager(options) -> tuple[str, AuthManager]: """ Based on CLI options provided, return the correct auth manager to use. """ - auth_type = options.get("type") or options.get("auth") + auth_type = options.get("auth") if auth_type is not None: auth_manager = AUTH_MANAGER_MAPPING.get(auth_type) if auth_manager is None: raise CondaAuthError( - f'Invalid authentication type. Valid types are: "{HTTP_BASIC_AUTH_NAME}"' + f'Invalid authentication type. Valid types are: "{", ".join(VALID_AUTH_CHOICES)}"' ) - # we use http basic auth when username or password are present + # we use http basic auth when "username" or "password" are present elif options.get("username") is not None or options.get("password") is not None: auth_manager = basic_auth_manager auth_type = HTTP_BASIC_AUTH_NAME + # we use token auth when "token" is present + elif options.get("token") is not None: + auth_manager = token_auth_manager + auth_type = TOKEN_NAME + # default authentication handler else: auth_manager = basic_auth_manager @@ -76,17 +100,31 @@ def auth_wrapper(args): @group.command("login") -@click.option("-u", "--username", help="Username to use for HTTP Basic Authentication") -@click.option("-p", "--password", help="Password to use for HTTP Basic Authentication") +@click.option( + "-u", + "--username", + help="Username to use for private channels using HTTP Basic Authentication", +) +@click.option( + "-p", + "--password", + help="Password to use for private channels using HTTP Basic Authentication", +) @click.option( "-t", - "--type", - help='Manually specify the type of authentication to use. Choices are: "http-basic"', + "--token", + help="Token to use for private channels using an API token", +) +@click.option( + "-a", + "--auth", + help="Specify the authentication type you would like to use", + type=click.Choice(VALID_AUTH_CHOICES), ) @click.argument("channel", callback=parse_channel) def login(channel: Channel, **kwargs): """ - Login to a channel + Log in to a channel by storing the credentials or tokens associated with it """ kwargs = {key: val for key, val in kwargs.items() if val is not None} settings = get_channel_settings(channel.canonical_name) or {} @@ -100,10 +138,20 @@ def login(channel: Channel, **kwargs): username = auth_manager.authenticate(channel, settings) break except InvalidCredentialsError as exc: + if settings.get("username") is not None: + settings = {"username": settings["username"]} + else: + settings = {} + auth_manager.remove_channel_cache(channel.canonical_name) attempts += 1 + if attempts >= MAX_LOGIN_ATTEMPTS: - raise CondaAuthError(f"Max attempts reached; {exc}") + raise CondaAuthError( + click.style(f"Max attempts reached; {exc}", fg=FAILURE_COLOR) + ) + + click.echo(click.style(INVALID_CREDENTIALS_MESSAGE, fg=FAILURE_COLOR)) click.echo(click.style(SUCCESSFUL_LOGIN_MESSAGE, fg=SUCCESSFUL_COLOR)) @@ -119,7 +167,7 @@ def login(channel: Channel, **kwargs): @click.argument("channel", callback=parse_channel) def logout(channel: Channel): """ - Logout of a channel + Log out of a by removing any credentials or tokens associated with it. """ settings = get_channel_settings(channel.canonical_name) diff --git a/conda_auth/constants.py b/conda_auth/constants.py index 48291a9..f583085 100644 --- a/conda_auth/constants.py +++ b/conda_auth/constants.py @@ -4,11 +4,4 @@ PLUGIN_NAME = "conda-auth" -# move to the handlers module -OAUTH2_NAME = "oauth2" - -# move to the handlers module -HTTP_BASIC_AUTH_NAME = "http-basic" - -# Error messages LOGOUT_ERROR_MESSAGE = "Unable to logout." diff --git a/conda_auth/handlers/__init__.py b/conda_auth/handlers/__init__.py index fcdb561..d70ddec 100644 --- a/conda_auth/handlers/__init__.py +++ b/conda_auth/handlers/__init__.py @@ -1,12 +1,15 @@ # flake8: noqa: F401 from .base import AuthManager -from .oauth2 import ( - OAuth2Manager, - OAuth2Handler, - manager as oauth2_manager, -) +from .oauth2 import OAuth2Manager, OAuth2Handler, manager as oauth2_manager, OAUTH2_NAME from .basic_auth import ( BasicAuthManager, BasicAuthHandler, manager as basic_auth_manager, + HTTP_BASIC_AUTH_NAME, +) +from .token import ( + TokenAuthManager, + TokenAuthHandler, + manager as token_auth_manager, + TOKEN_NAME, ) diff --git a/conda_auth/handlers/base.py b/conda_auth/handlers/base.py index dfc9651..5cd0736 100644 --- a/conda_auth/handlers/base.py +++ b/conda_auth/handlers/base.py @@ -9,7 +9,7 @@ from conda.gateways.connection.session import CondaSession from conda.models.channel import Channel -from ..exceptions import InvalidCredentialsError +from ..exceptions import InvalidCredentialsError, CondaAuthError INVALID_CREDENTIALS_ERROR_MESSAGE = "Provided credentials are not correct." @@ -52,7 +52,7 @@ def authenticate(self, channel: Channel, settings: Mapping[str, str]) -> str: } username, secret = self.fetch_secret(channel, extra_params) - verify_credentials(channel, self.get_auth_class()) + # verify_credentials(channel, self.get_auth_class()) self.save_credentials(channel, username, secret) return username @@ -153,9 +153,9 @@ def verify_credentials(channel: Channel, auth_cls: type) -> None: """ for url in channel.base_urls: session = CondaSession(auth=auth_cls(channel.canonical_name)) - resp = session.head(url, allow_redirects=False) try: + resp = session.head(url, allow_redirects=False) resp.raise_for_status() except requests.exceptions.HTTPError as exc: if exc.response.status_code == requests.codes["unauthorized"]: @@ -164,3 +164,7 @@ def verify_credentials(channel: Channel, auth_cls: type) -> None: error_message = str(exc) raise InvalidCredentialsError(error_message) + + # Catch-all for all other requests exceptions + except requests.exceptions.RequestException as exc: + raise CondaAuthError(str(exc)) diff --git a/conda_auth/handlers/basic_auth.py b/conda_auth/handlers/basic_auth.py index 5f528c7..6f3dc49 100644 --- a/conda_auth/handlers/basic_auth.py +++ b/conda_auth/handlers/basic_auth.py @@ -14,13 +14,24 @@ from conda.models.channel import Channel from conda.plugins.types import ChannelAuthBase -from ..constants import HTTP_BASIC_AUTH_NAME, LOGOUT_ERROR_MESSAGE, PLUGIN_NAME +from ..constants import LOGOUT_ERROR_MESSAGE, PLUGIN_NAME from ..exceptions import CondaAuthError from .base import AuthManager USERNAME_PARAM_NAME = "username" +""" +Name of the configuration parameter where username information is stored +""" PASSWORD_PARAM_NAME = "password" +""" +Name of the configuration parameter where password information is stored +""" + +HTTP_BASIC_AUTH_NAME = "http-basic" +""" +Name used to refer to this authentication handler in configuration +""" class BasicAuthManager(AuthManager): diff --git a/conda_auth/handlers/oauth2.py b/conda_auth/handlers/oauth2.py index 65c3141..d63b7d2 100644 --- a/conda_auth/handlers/oauth2.py +++ b/conda_auth/handlers/oauth2.py @@ -12,17 +12,24 @@ from conda.models.channel import Channel from conda.plugins.types import ChannelAuthBase -from ..constants import OAUTH2_NAME, LOGOUT_ERROR_MESSAGE, PLUGIN_NAME +from ..constants import LOGOUT_ERROR_MESSAGE, PLUGIN_NAME from ..exceptions import CondaAuthError from .base import AuthManager LOGIN_URL_PARAM_NAME = "login_url" """ -Setting name that appears in ``context.channel_settings``; used to direct user -to correct login screen. +Setting name that appears in configuration; used to direct user to correct login screen. """ USERNAME = "token" +""" +Placeholder value for username; This is written to the secret storage backend +""" + +OAUTH2_NAME = "oauth2" +""" +Name used to refer to this authentication handler in configuration +""" class OAuth2Manager(AuthManager): @@ -87,8 +94,8 @@ def get_auth_class(self) -> type: class OAuth2Handler(ChannelAuthBase): """ - Implementation of HTTPBasicAuth that relies on a cache location for - retrieving login credentials on object instantiation. + Implementation of OAuth2 that relies on a cache location for retrieving bearer token on + object instantiation. """ def __init__(self, channel_name: str): diff --git a/conda_auth/handlers/token.py b/conda_auth/handlers/token.py new file mode 100644 index 0000000..6e2776d --- /dev/null +++ b/conda_auth/handlers/token.py @@ -0,0 +1,135 @@ +""" +Token implementation for the conda auth handler plugin hook +""" +from __future__ import annotations + +from collections.abc import Mapping + +import keyring +from keyring.errors import PasswordDeleteError +from conda.base.context import context +from conda.exceptions import CondaError +from conda.models.channel import Channel +from conda.plugins.types import ChannelAuthBase + +from ..constants import LOGOUT_ERROR_MESSAGE, PLUGIN_NAME +from ..exceptions import CondaAuthError +from .base import AuthManager + +TOKEN_PARAM_NAME = "token" +""" +Name of the configuration parameter where token information is stored +""" + +USERNAME = "token" +""" +Placeholder value for username; This is written to the secret storage backend +""" + +TOKEN_NAME = "token" +""" +Name used to refer to this authentication handler in configuration +""" + + +class TokenAuthManager(AuthManager): + def get_keyring_id(self, channel_name: str) -> str: + return f"{PLUGIN_NAME}::{TOKEN_NAME}::{channel_name}" + + def _fetch_secret( + self, channel: Channel, settings: Mapping[str, str | None] + ) -> tuple[str, str]: + """ + Gets the secrets by checking the keyring and then falling back to interrupting + the program and asking the user for secret. + """ + keyring_id = self.get_keyring_id(channel.canonical_name) + token = keyring.get_password(keyring_id, USERNAME) + + if token is None: + token = self.get_token(settings) + + return USERNAME, token + + def remove_secret( + self, channel: Channel, settings: Mapping[str, str | None] + ) -> None: + keyring_id = self.get_keyring_id(channel.canonical_name) + + try: + keyring.delete_password(keyring_id, USERNAME) + except PasswordDeleteError as exc: + raise CondaAuthError(f"{LOGOUT_ERROR_MESSAGE} {exc}") + + def get_auth_type(self) -> str: + return TOKEN_NAME + + def get_config_parameters(self) -> tuple[str, ...]: + return (TOKEN_PARAM_NAME,) + + def get_token(self, settings: Mapping[str, str | None]): + """ + Attempt to first retrieve token from settings and then prompt the user for it. + """ + token = settings.get(TOKEN_PARAM_NAME) + + if token is None: + token = self.prompt_token() + + return token + + def prompt_token(self) -> str: + """ + This can be overriden for classes that do not want to use the built-in function ``input``. + """ + return input("Token: ") + + def get_auth_class(self) -> type: + return TokenAuthHandler + + +manager = TokenAuthManager(context) + + +def is_anaconda_dot_org(channel_name: str) -> bool: + """ + Determines whether the ``channel_name`` is a https://anaconda.org channel + """ + channel = Channel(channel_name) + domain_name = "anaconda.org" + + return any(domain_name in url for url in channel.base_urls) + + +class TokenAuthHandler(ChannelAuthBase): + """ + Implements token auth that inserts a token as a header for all network request + in conda for the channel specified on object instantiation. + + We make a special exception for anaconda.org and set the Authentication header as: + + Authentication: token + + In all other cases, we use the "bearer" format: + + Authentication: Bearer + """ + + def __init__(self, channel_name: str): + _, self.token = manager.get_secret(channel_name) + self.is_anaconda_dot_org = is_anaconda_dot_org(channel_name) + + if self.token is None: + raise CondaError( + f"Unable to find authorization token for requests with channel {channel_name}" + ) + + super().__init__(channel_name) + + def __call__(self, request): + if self.is_anaconda_dot_org: + request.headers["Authorization"] = f"token {self.token}" + else: + request.headers["Authorization"] = f"Bearer {self.token}" + + return request diff --git a/conda_auth/hooks.py b/conda_auth/hooks.py index df2bfd4..5e74454 100644 --- a/conda_auth/hooks.py +++ b/conda_auth/hooks.py @@ -3,10 +3,17 @@ """ from conda.plugins import CondaAuthHandler, CondaPreCommand, CondaSubcommand, hookimpl -from .handlers import OAuth2Handler, BasicAuthHandler -from .handlers.oauth2 import manager as oauth2_manager -from .handlers.basic_auth import manager as basic_auth_manager -from .constants import OAUTH2_NAME, HTTP_BASIC_AUTH_NAME +from .handlers import ( + oauth2_manager, + basic_auth_manager, + token_auth_manager, + OAuth2Handler, + BasicAuthHandler, + TokenAuthHandler, + OAUTH2_NAME, + HTTP_BASIC_AUTH_NAME, + TOKEN_NAME, +) from .cli import auth_wrapper @@ -35,6 +42,11 @@ def conda_pre_commands(): action=oauth2_manager.hook_action, run_for={"search", "install", "update", "notices", "create", "search"}, ) + yield CondaPreCommand( + name=f"{TOKEN_NAME}-collect_token", + action=token_auth_manager.hook_action, + run_for={"search", "install", "update", "notices", "create", "search"}, + ) @hookimpl @@ -44,3 +56,4 @@ def conda_auth_handlers(): """ yield CondaAuthHandler(name=HTTP_BASIC_AUTH_NAME, handler=BasicAuthHandler) yield CondaAuthHandler(name=OAUTH2_NAME, handler=OAuth2Handler) + yield CondaAuthHandler(name=TOKEN_NAME, handler=TokenAuthHandler) diff --git a/tests/cli/test_login.py b/tests/cli/test_login.py index a624f81..1182007 100644 --- a/tests/cli/test_login.py +++ b/tests/cli/test_login.py @@ -1,6 +1,6 @@ from conda_auth.cli import group, SUCCESSFUL_LOGIN_MESSAGE from conda_auth.condarc import CondaRCError -from conda_auth.constants import HTTP_BASIC_AUTH_NAME +from conda_auth.handlers.basic_auth import HTTP_BASIC_AUTH_NAME from conda_auth.exceptions import CondaAuthError, InvalidCredentialsError from conda_auth.handlers.base import INVALID_CREDENTIALS_ERROR_MESSAGE diff --git a/tests/cli/test_logout.py b/tests/cli/test_logout.py index 7ab35b0..0ff252e 100644 --- a/tests/cli/test_logout.py +++ b/tests/cli/test_logout.py @@ -1,5 +1,6 @@ from conda_auth.cli import group, SUCCESSFUL_LOGOUT_MESSAGE -from conda_auth.constants import HTTP_BASIC_AUTH_NAME, PLUGIN_NAME +from conda_auth.constants import PLUGIN_NAME +from conda_auth.handlers.basic_auth import HTTP_BASIC_AUTH_NAME from conda_auth.exceptions import CondaAuthError diff --git a/tests/handlers/test_oauth2.py b/tests/handlers/test_oauth2.py index c0486c1..e879156 100644 --- a/tests/handlers/test_oauth2.py +++ b/tests/handlers/test_oauth2.py @@ -2,9 +2,9 @@ from keyring.errors import PasswordDeleteError from conda.models.channel import Channel -from conda_auth.handlers.oauth2 import USERNAME, manager +from conda_auth.handlers.oauth2 import USERNAME, manager, OAUTH2_NAME from conda_auth.exceptions import CondaAuthError -from conda_auth.constants import LOGOUT_ERROR_MESSAGE, OAUTH2_NAME +from conda_auth.constants import LOGOUT_ERROR_MESSAGE @pytest.fixture(autouse=True) diff --git a/tests/handlers/test_token.py b/tests/handlers/test_token.py new file mode 100644 index 0000000..4b990d2 --- /dev/null +++ b/tests/handlers/test_token.py @@ -0,0 +1,62 @@ +import pytest +from conda.models.channel import Channel + +from conda_auth.handlers.token import is_anaconda_dot_org, manager, USERNAME, TOKEN_NAME + + +@pytest.fixture(autouse=True) +def clean_up_manager_cache(): + """Makes sure the manager cache gets emptied after each test run""" + manager._cache = {} + + +@pytest.mark.parametrize( + "channel_name,expected", (("conda-forge", True), ("http://localhost", False)) +) +def test_is_anaconda_dot_org(channel_name, expected): + """ + Tests the ``is_anaconda_dot_org`` function + """ + assert is_anaconda_dot_org(channel_name) == expected + + +def test_token_auth_manager_no_token(mocker, session, keyring): + """ + Test to make sure when there is no token set, we are able to set a new token via the ``input`` + function. + """ + token = "token" + settings = { + "auth": TOKEN_NAME, + } + channel = Channel("tester") + + # setup mocks + input_mock = mocker.patch("conda_auth.handlers.token.input") + input_mock.return_value = token + keyring(None) + + # run code under test + manager.authenticate(channel, settings) + + # make assertions + assert manager._cache == {channel.canonical_name: (USERNAME, token)} + + +def test_token_auth_manager_with_token(session, keyring): + """ + Test to make sure when there is a token set, we are able to set a new token via the ``input`` + function. + """ + token = "token" + settings = {"auth": TOKEN_NAME, "token": token} + channel = Channel("tester") + + # setup mocks + keyring(None) + + # run code under test + manager.authenticate(channel, settings) + + # make assertions + assert manager._cache == {channel.canonical_name: (USERNAME, token)} From f065fffd14a48ef5b9ce2594a2bd14d9e235ffe0 Mon Sep 17 00:00:00 2001 From: Travis Hathaway Date: Fri, 6 Oct 2023 15:07:06 +0200 Subject: [PATCH 02/11] fixing tests --- tests/conftest.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 091d4bc..5ed8589 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,6 +8,7 @@ class KeyringMocks(NamedTuple): oauth2: MagicMock basic: MagicMock + token: MagicMock base: MagicMock @@ -18,14 +19,16 @@ def keyring(mocker): """ def _keyring(secret): + token = mocker.patch("conda_auth.handlers.token.keyring") oauth2 = mocker.patch("conda_auth.handlers.oauth2.keyring") basic = mocker.patch("conda_auth.handlers.basic_auth.keyring") base = mocker.patch("conda_auth.handlers.base.keyring") oauth2.get_password.return_value = secret basic.get_password.return_value = secret + token.get_password.return_value = secret - return KeyringMocks(oauth2, basic, base) + return KeyringMocks(oauth2, basic, token, base) return _keyring From d1cbb3e89c9ae263db78cfcf6d7917a821cb8dfd Mon Sep 17 00:00:00 2001 From: Travis Hathaway Date: Fri, 6 Oct 2023 15:10:56 +0200 Subject: [PATCH 03/11] adding comment about verify credentials --- conda_auth/handlers/base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/conda_auth/handlers/base.py b/conda_auth/handlers/base.py index 5cd0736..0b1fc6d 100644 --- a/conda_auth/handlers/base.py +++ b/conda_auth/handlers/base.py @@ -52,6 +52,8 @@ def authenticate(self, channel: Channel, settings: Mapping[str, str]) -> str: } username, secret = self.fetch_secret(channel, extra_params) + # TODO: Having a hard time figuring out how to actually verify credentials + # It might be better to just remove this check for now. # verify_credentials(channel, self.get_auth_class()) self.save_credentials(channel, username, secret) From 5a1dace98dbb43a80aac98e640b1e84fb44aba8b Mon Sep 17 00:00:00 2001 From: Travis Hathaway Date: Sun, 8 Oct 2023 09:32:28 +0200 Subject: [PATCH 04/11] adds new tests for token plus substantial refactor --- conda_auth/cli.py | 40 +------ conda_auth/handlers/__init__.py | 1 - conda_auth/handlers/base.py | 52 +-------- conda_auth/handlers/oauth2.py | 114 ------------------ conda_auth/hooks.py | 20 ++-- tests/cli/test_group.py | 14 +++ tests/cli/test_login.py | 36 +++--- tests/conftest.py | 15 +-- tests/handlers/test_base.py | 0 tests/handlers/test_basic_auth.py | 186 ++++++++++++++++++++++-------- tests/handlers/test_oauth2.py | 160 ------------------------- tests/handlers/test_token.py | 173 +++++++++++++++++++++++++-- tests/test_hooks.py | 46 ++++++++ 13 files changed, 389 insertions(+), 468 deletions(-) delete mode 100644 conda_auth/handlers/oauth2.py create mode 100644 tests/cli/test_group.py create mode 100644 tests/handlers/test_base.py delete mode 100644 tests/handlers/test_oauth2.py create mode 100644 tests/test_hooks.py diff --git a/conda_auth/cli.py b/conda_auth/cli.py index b17cea4..6e3a9b1 100644 --- a/conda_auth/cli.py +++ b/conda_auth/cli.py @@ -7,37 +7,28 @@ from conda.models.channel import Channel from .condarc import CondaRC, CondaRCError -from .exceptions import CondaAuthError, InvalidCredentialsError +from .exceptions import CondaAuthError from .handlers import ( AuthManager, - oauth2_manager, basic_auth_manager, token_auth_manager, - OAUTH2_NAME, HTTP_BASIC_AUTH_NAME, TOKEN_NAME, ) # Constants AUTH_MANAGER_MAPPING = { - OAUTH2_NAME: oauth2_manager, HTTP_BASIC_AUTH_NAME: basic_auth_manager, TOKEN_NAME: token_auth_manager, } -SUCCESSFUL_LOGIN_MESSAGE = "Successfully logged in" +SUCCESSFUL_LOGIN_MESSAGE = "Successfully stored credentials" -SUCCESSFUL_LOGOUT_MESSAGE = "Successfully logged out" +SUCCESSFUL_LOGOUT_MESSAGE = "Successfully removed credentials" SUCCESSFUL_COLOR = "green" -INVALID_CREDENTIALS_MESSAGE = "Invalid credentials" - -FAILURE_COLOR = "red" - -MAX_LOGIN_ATTEMPTS = 3 - -VALID_AUTH_CHOICES = (HTTP_BASIC_AUTH_NAME, TOKEN_NAME) +VALID_AUTH_CHOICES = tuple(AUTH_MANAGER_MAPPING.keys()) def parse_channel(ctx, param, value): @@ -131,27 +122,8 @@ def login(channel: Channel, **kwargs): settings.update(kwargs) auth_type, auth_manager = get_auth_manager(settings) - attempts = 0 - - while True: - try: - username = auth_manager.authenticate(channel, settings) - break - except InvalidCredentialsError as exc: - if settings.get("username") is not None: - settings = {"username": settings["username"]} - else: - settings = {} - - auth_manager.remove_channel_cache(channel.canonical_name) - attempts += 1 - - if attempts >= MAX_LOGIN_ATTEMPTS: - raise CondaAuthError( - click.style(f"Max attempts reached; {exc}", fg=FAILURE_COLOR) - ) - - click.echo(click.style(INVALID_CREDENTIALS_MESSAGE, fg=FAILURE_COLOR)) + + username = auth_manager.store(channel, settings) click.echo(click.style(SUCCESSFUL_LOGIN_MESSAGE, fg=SUCCESSFUL_COLOR)) diff --git a/conda_auth/handlers/__init__.py b/conda_auth/handlers/__init__.py index d70ddec..784f9dd 100644 --- a/conda_auth/handlers/__init__.py +++ b/conda_auth/handlers/__init__.py @@ -1,6 +1,5 @@ # flake8: noqa: F401 from .base import AuthManager -from .oauth2 import OAuth2Manager, OAuth2Handler, manager as oauth2_manager, OAUTH2_NAME from .basic_auth import ( BasicAuthManager, BasicAuthHandler, diff --git a/conda_auth/handlers/base.py b/conda_auth/handlers/base.py index 0b1fc6d..34f6661 100644 --- a/conda_auth/handlers/base.py +++ b/conda_auth/handlers/base.py @@ -5,14 +5,8 @@ import conda.base.context import keyring -import requests -from conda.gateways.connection.session import CondaSession from conda.models.channel import Channel -from ..exceptions import InvalidCredentialsError, CondaAuthError - -INVALID_CREDENTIALS_ERROR_MESSAGE = "Provided credentials are not correct." - class AuthManager(ABC): """ @@ -38,9 +32,9 @@ def hook_action(self, command: str) -> None: channel.canonical_name in self._context.channels and settings.get("auth") == self.get_auth_type() ): - self.authenticate(channel, settings) + self.store(channel, settings) - def authenticate(self, channel: Channel, settings: Mapping[str, str]) -> str: + def store(self, channel: Channel, settings: Mapping[str, str]) -> str: """ Used to retrieve credentials and store them on the ``cache`` property @@ -52,9 +46,6 @@ def authenticate(self, channel: Channel, settings: Mapping[str, str]) -> str: } username, secret = self.fetch_secret(channel, extra_params) - # TODO: Having a hard time figuring out how to actually verify credentials - # It might be better to just remove this check for now. - # verify_credentials(channel, self.get_auth_class()) self.save_credentials(channel, username, secret) return username @@ -95,15 +86,6 @@ def get_secret(self, channel_name: str) -> tuple[str | None, str | None]: return secrets - def remove_channel_cache(self, channel_name: str) -> None: - """ - Removes the cached secret for the given channel name - """ - try: - del self._cache[channel_name] - except KeyError: - pass - @abstractmethod def _fetch_secret( self, channel: Channel, settings: Mapping[str, str | None] @@ -140,33 +122,3 @@ def get_auth_class(self) -> type: Returns the authentication class to use (requests.auth.AuthBase subclass) for the given authentication manager """ - - -def verify_credentials(channel: Channel, auth_cls: type) -> None: - """ - Verify the credentials that have been currently set for the channel. - - Raises exception if unable to make a successful request. - - TODO: - We need a better way to tell if the credentials work. We might need - to fetch (or perform a HEAD request) on something specific like - repodata.json. - """ - for url in channel.base_urls: - session = CondaSession(auth=auth_cls(channel.canonical_name)) - - try: - resp = session.head(url, allow_redirects=False) - resp.raise_for_status() - except requests.exceptions.HTTPError as exc: - if exc.response.status_code == requests.codes["unauthorized"]: - error_message = INVALID_CREDENTIALS_ERROR_MESSAGE - else: - error_message = str(exc) - - raise InvalidCredentialsError(error_message) - - # Catch-all for all other requests exceptions - except requests.exceptions.RequestException as exc: - raise CondaAuthError(str(exc)) diff --git a/conda_auth/handlers/oauth2.py b/conda_auth/handlers/oauth2.py deleted file mode 100644 index d63b7d2..0000000 --- a/conda_auth/handlers/oauth2.py +++ /dev/null @@ -1,114 +0,0 @@ -""" -OAuth2 implementation for the conda auth handler plugin hook -""" -from __future__ import annotations - -from collections.abc import Mapping - -import keyring -from keyring.errors import PasswordDeleteError -from conda.base.context import context -from conda.exceptions import CondaError -from conda.models.channel import Channel -from conda.plugins.types import ChannelAuthBase - -from ..constants import LOGOUT_ERROR_MESSAGE, PLUGIN_NAME -from ..exceptions import CondaAuthError -from .base import AuthManager - -LOGIN_URL_PARAM_NAME = "login_url" -""" -Setting name that appears in configuration; used to direct user to correct login screen. -""" - -USERNAME = "token" -""" -Placeholder value for username; This is written to the secret storage backend -""" - -OAUTH2_NAME = "oauth2" -""" -Name used to refer to this authentication handler in configuration -""" - - -class OAuth2Manager(AuthManager): - def get_keyring_id(self, channel_name: str) -> str: - return f"{PLUGIN_NAME}::{OAUTH2_NAME}::{channel_name}" - - def _fetch_secret( - self, channel: Channel, settings: Mapping[str, str | None] - ) -> tuple[str, str]: - """ - Gets the secrets by checking the keyring and then falling back to interrupting - the program and asking the user for secret. - """ - login_url = settings.get(LOGIN_URL_PARAM_NAME) - - if login_url is None: - raise CondaAuthError( - f'`login_url` is not set for channel "{channel.canonical_name}"; ' - "please set this value in `channel_settings` before attempting to use this " - "channel with the " - f"{self.get_auth_type()} auth handler." - ) - - keyring_id = self.get_keyring_id(channel.canonical_name) - - token = keyring.get_password(keyring_id, USERNAME) - - if token is None: - token = self.prompt_token(login_url) - - return USERNAME, token - - def remove_secret( - self, channel: Channel, settings: Mapping[str, str | None] - ) -> None: - keyring_id = self.get_keyring_id(channel.canonical_name) - - try: - keyring.delete_password(keyring_id, USERNAME) - except PasswordDeleteError as exc: - raise CondaAuthError(f"{LOGOUT_ERROR_MESSAGE} {exc}") - - def get_auth_type(self) -> str: - return OAUTH2_NAME - - def get_config_parameters(self) -> tuple[str, ...]: - return (LOGIN_URL_PARAM_NAME,) - - def prompt_token(self, login_url: str) -> str: - """ - This can be overriden for classes that do not want to use the built-in function ``input``. - """ - print(f"Follow link to login: {login_url}") - return input("Copy and paste login token here: ") - - def get_auth_class(self) -> type: - return OAuth2Handler - - -manager = OAuth2Manager(context) - - -class OAuth2Handler(ChannelAuthBase): - """ - Implementation of OAuth2 that relies on a cache location for retrieving bearer token on - object instantiation. - """ - - def __init__(self, channel_name: str): - _, self.token = manager.get_secret(channel_name) - - if self.token is None: - raise CondaError( - f"Unable to find authorization token for requests with channel {channel_name}" - ) - - super().__init__(channel_name) - - def __call__(self, request): - request.headers["Authorization"] = f"Bearer {self.token}" - - return request diff --git a/conda_auth/hooks.py b/conda_auth/hooks.py index 5e74454..c3b05dd 100644 --- a/conda_auth/hooks.py +++ b/conda_auth/hooks.py @@ -4,17 +4,15 @@ from conda.plugins import CondaAuthHandler, CondaPreCommand, CondaSubcommand, hookimpl from .handlers import ( - oauth2_manager, basic_auth_manager, token_auth_manager, - OAuth2Handler, BasicAuthHandler, TokenAuthHandler, - OAUTH2_NAME, HTTP_BASIC_AUTH_NAME, TOKEN_NAME, ) from .cli import auth_wrapper +from .constants import PLUGIN_NAME @hookimpl @@ -33,17 +31,12 @@ def conda_pre_commands(): Registers pre-command hooks """ yield CondaPreCommand( - name=f"{HTTP_BASIC_AUTH_NAME}-collect_credentials", + name=f"{PLUGIN_NAME}-{HTTP_BASIC_AUTH_NAME}", action=basic_auth_manager.hook_action, run_for={"search", "install", "update", "notices", "create", "search"}, ) yield CondaPreCommand( - name=f"{OAUTH2_NAME}-collect_token", - action=oauth2_manager.hook_action, - run_for={"search", "install", "update", "notices", "create", "search"}, - ) - yield CondaPreCommand( - name=f"{TOKEN_NAME}-collect_token", + name=f"{PLUGIN_NAME}-{TOKEN_NAME}", action=token_auth_manager.hook_action, run_for={"search", "install", "update", "notices", "create", "search"}, ) @@ -54,6 +47,7 @@ def conda_auth_handlers(): """ Registers auth handlers """ - yield CondaAuthHandler(name=HTTP_BASIC_AUTH_NAME, handler=BasicAuthHandler) - yield CondaAuthHandler(name=OAUTH2_NAME, handler=OAuth2Handler) - yield CondaAuthHandler(name=TOKEN_NAME, handler=TokenAuthHandler) + yield CondaAuthHandler( + name=f"{PLUGIN_NAME}-{HTTP_BASIC_AUTH_NAME}", handler=BasicAuthHandler + ) + yield CondaAuthHandler(name=f"{PLUGIN_NAME}-{TOKEN_NAME}", handler=TokenAuthHandler) diff --git a/tests/cli/test_group.py b/tests/cli/test_group.py new file mode 100644 index 0000000..93e9c43 --- /dev/null +++ b/tests/cli/test_group.py @@ -0,0 +1,14 @@ +import pytest + +from conda_auth.cli import auth_wrapper + + +def test_auth_wrapper(): + """ + Test to make sure the ``auth_wrapper`` function works. + + It is run with no arguments which will print the help message and raise a ``SystemExit`` + exception. + """ + with pytest.raises(SystemExit): + auth_wrapper([]) diff --git a/tests/cli/test_login.py b/tests/cli/test_login.py index 1182007..7a885fb 100644 --- a/tests/cli/test_login.py +++ b/tests/cli/test_login.py @@ -1,11 +1,10 @@ from conda_auth.cli import group, SUCCESSFUL_LOGIN_MESSAGE from conda_auth.condarc import CondaRCError from conda_auth.handlers.basic_auth import HTTP_BASIC_AUTH_NAME -from conda_auth.exceptions import CondaAuthError, InvalidCredentialsError -from conda_auth.handlers.base import INVALID_CREDENTIALS_ERROR_MESSAGE +from conda_auth.exceptions import CondaAuthError -def test_login_no_options_basic_auth(mocker, runner, session, keyring, condarc): +def test_login_no_options_basic_auth(mocker, runner, keyring, condarc): """ Runs the login command with no additional CLI options defined (e.g. --username) """ @@ -28,7 +27,7 @@ def test_login_no_options_basic_auth(mocker, runner, session, keyring, condarc): assert SUCCESSFUL_LOGIN_MESSAGE in result.output -def test_login_with_options_basic_auth(mocker, runner, session, keyring, condarc): +def test_login_with_options_basic_auth(mocker, runner, keyring, condarc): """ Runs the login command with CLI options defined (e.g. --username) """ @@ -48,7 +47,7 @@ def test_login_with_options_basic_auth(mocker, runner, session, keyring, condarc assert SUCCESSFUL_LOGIN_MESSAGE in result.output -def test_login_with_invalid_auth_type(mocker, runner, session, keyring, condarc): +def test_login_with_invalid_auth_type(mocker, runner, keyring, condarc): """ Runs the login command when there is an invalid auth type configured in settings """ @@ -70,7 +69,7 @@ def test_login_with_invalid_auth_type(mocker, runner, session, keyring, condarc) assert "Invalid authentication type." in exception.message -def test_login_with_non_existent_channel(mocker, runner, session, keyring, condarc): +def test_login_with_non_existent_channel(mocker, runner, keyring, condarc): """ Runs the login command for a channel that is not present in the settings file """ @@ -92,7 +91,7 @@ def test_login_with_non_existent_channel(mocker, runner, session, keyring, conda def test_login_succeeds_error_returned_when_updating_condarc( - mocker, runner, session, keyring, condarc + mocker, runner, keyring, condarc ): """ Test the case where the login runs successfully but an error is returned when trying to update @@ -117,26 +116,17 @@ def test_login_succeeds_error_returned_when_updating_condarc( assert "Could not save file" == exception.message -def test_login_exceed_max_login_retries(mocker, runner, session, keyring, condarc): +def test_login_with_token(mocker, runner, keyring, condarc): """ - Test the case where the login runs successfully but an error is returned when trying to update - the condarc file. + Test successful login with token """ channel_name = "tester" # setup mocks - mocker.patch("conda_auth.cli.context") - mock_manager = mocker.patch("conda_auth.cli.get_auth_manager") - mock_type = "http-basic" - mock_auth_manager = mocker.MagicMock() - mock_auth_manager.authenticate.side_effect = InvalidCredentialsError( - INVALID_CREDENTIALS_ERROR_MESSAGE - ) - mock_manager.return_value = (mock_type, mock_auth_manager) + mock_context = mocker.patch("conda_auth.cli.context") + mock_context.channel_settings = [] + keyring(None) - # run command - result = runner.invoke(group, ["login", channel_name], input="user") - exc_type, exception, _ = result.exc_info + result = runner.invoke(group, ["login", channel_name, "--token", "token"]) - assert exc_type == CondaAuthError - assert "Max attempts reached" in exception.message + assert result.exit_code == 0 diff --git a/tests/conftest.py b/tests/conftest.py index 5ed8589..cabd251 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,7 +6,6 @@ class KeyringMocks(NamedTuple): - oauth2: MagicMock basic: MagicMock token: MagicMock base: MagicMock @@ -20,29 +19,17 @@ def keyring(mocker): def _keyring(secret): token = mocker.patch("conda_auth.handlers.token.keyring") - oauth2 = mocker.patch("conda_auth.handlers.oauth2.keyring") basic = mocker.patch("conda_auth.handlers.basic_auth.keyring") base = mocker.patch("conda_auth.handlers.base.keyring") - oauth2.get_password.return_value = secret basic.get_password.return_value = secret token.get_password.return_value = secret - return KeyringMocks(oauth2, basic, token, base) + return KeyringMocks(basic, token, base) return _keyring -@pytest.fixture -def session(mocker): - """ - Used to mock the get_session function from conda to mock network requests - """ - session_mock = mocker.patch("conda_auth.handlers.base.CondaSession") - - return session_mock - - @pytest.fixture def getpass(mocker): """ diff --git a/tests/handlers/test_base.py b/tests/handlers/test_base.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/handlers/test_basic_auth.py b/tests/handlers/test_basic_auth.py index 297ec64..e677372 100644 --- a/tests/handlers/test_basic_auth.py +++ b/tests/handlers/test_basic_auth.py @@ -1,52 +1,58 @@ from unittest.mock import MagicMock import pytest -from keyring.errors import PasswordDeleteError +from conda.exceptions import CondaError from conda.models.channel import Channel - -from conda_auth.handlers import BasicAuthManager +from keyring.errors import PasswordDeleteError +from requests.auth import _basic_auth_str + +from conda_auth.handlers.basic_auth import ( + manager, + BasicAuthHandler, + HTTP_BASIC_AUTH_NAME, + BasicAuthManager, +) from conda_auth.exceptions import CondaAuthError from conda_auth.constants import LOGOUT_ERROR_MESSAGE -def test_basic_auth_manager_no_previous_secret(session, keyring, getpass): +@pytest.fixture(autouse=True) +def clean_up_manager_cache(): + """Makes sure the manager cache gets emptied after each test run""" + manager._cache = {} + + +def test_basic_auth_manager_no_previous_secret(keyring, getpass): """ Test to make sure when there is no password set, we are able to set a new password via the ``getpass`` function. """ secret = "secret" settings = { - "auth": "conda-auth-basic-auth", "username": "admin", } - cache = {} channel = Channel("tester") # setup mocks getpass_mock = getpass(secret) keyring(None) - context_mock = MagicMock() # run code under test - basic_auth = BasicAuthManager(context_mock, cache) - basic_auth.authenticate(channel, settings) + manager.store(channel, settings) # make assertions - assert basic_auth._cache == {channel.canonical_name: ("admin", secret)} + assert manager._cache == {channel.canonical_name: ("admin", secret)} getpass_mock.assert_called_once() -def test_basic_auth_manager_no_secret_or_username(mocker, session, keyring, getpass): +def test_basic_auth_manager_no_secret_or_username(mocker, keyring, getpass): """ Test to make sure when there is no password or username set, we are able to provide a password via the ``getpass`` function and a username via the ``input`` function. """ username = "admin" secret = "secret" - settings = { - "auth": "conda-auth-basic-auth", - } - cache = {} + settings = {} channel = Channel("tester") # setup mocks @@ -54,45 +60,39 @@ def test_basic_auth_manager_no_secret_or_username(mocker, session, keyring, getp input_mock.return_value = username getpass_mock = getpass(secret) keyring(None) - context_mock = MagicMock() # run code under test - basic_auth = BasicAuthManager(context_mock, cache) - basic_auth.authenticate(channel, settings) + manager.store(channel, settings) # make assertions - assert basic_auth._cache == {channel.canonical_name: (username, secret)} + assert manager._cache == {channel.canonical_name: (username, secret)} getpass_mock.assert_called_once() -def test_basic_auth_manager_with_previous_secret(session, keyring, getpass): +def test_basic_auth_manager_with_previous_secret(keyring, getpass): """ Test to make sure when there is a password set, we retrieve it and set the cache object appropriately. """ secret = "secret" settings = { - "auth": "conda-auth-basic-auth", "username": "admin", } - cache = {} channel = Channel("tester") # setup mocks getpass_mock = getpass(secret) keyring(secret) - context_mock = MagicMock() # run code under test - basic_auth = BasicAuthManager(context_mock, cache) - basic_auth.authenticate(channel, settings) + manager.store(channel, settings) # make assertions - assert basic_auth._cache == {channel.canonical_name: ("admin", secret)} + assert manager._cache == {channel.canonical_name: ("admin", secret)} getpass_mock.assert_not_called() -def test_basic_auth_manager_cache_exists(session, keyring, getpass): +def test_basic_auth_manager_cache_exists(keyring, getpass): """ Test to make sure that everything works as expected when a cache entry already exists for a credential set. @@ -100,23 +100,20 @@ def test_basic_auth_manager_cache_exists(session, keyring, getpass): secret = "secret" username = "admin" settings = { - "auth": "conda-auth-basic-auth", "username": username, } channel = Channel("tester") - cache = {channel.canonical_name: (username, secret)} + manager._cache = {channel.canonical_name: (username, secret)} # setup mocks getpass_mock = getpass(secret) keyring_mock = keyring(secret) - context_mock = MagicMock() # run code under test - basic_auth = BasicAuthManager(context_mock, cache) - basic_auth.authenticate(channel, settings) + manager.store(channel, settings) # make assertions - assert basic_auth._cache == {channel.canonical_name: (username, secret)} + assert manager._cache == {channel.canonical_name: (username, secret)} getpass_mock.assert_not_called() keyring_mock.basic.get_password.assert_not_called() @@ -127,19 +124,15 @@ def test_basic_auth_manager_remove_existing_secret(keyring): """ secret = "secret" settings = { - "auth": "conda-auth-basic-auth", "username": "username", } - cache = {} channel = Channel("tester") # setup mocks keyring_mocks = keyring(secret) - context = MagicMock() # run code under test - basic_auth = BasicAuthManager(context, cache) - basic_auth.remove_secret(channel, settings) + manager.remove_secret(channel, settings) # make assertions keyring_mocks.basic.delete_password.assert_called_once() @@ -152,21 +145,16 @@ def test_basic_auth_manager_remove_existing_secret_no_username(mocker, keyring): """ secret = "secret" username = "username" - settings = { - "auth": "conda-auth-basic-auth", - } - cache = {} + settings = {} channel = Channel("tester") # setup mocks keyring_mocks = keyring(secret) input_mock = mocker.patch("conda_auth.handlers.basic_auth.input") input_mock.return_value = username - context = MagicMock() # run code under test - basic_auth = BasicAuthManager(context, cache) - basic_auth.remove_secret(channel, settings) + manager.remove_secret(channel, settings) # make assertions input_mock.assert_called_once() @@ -180,21 +168,119 @@ def test_basic_auth_manager_remove_non_existing_secret(keyring): """ secret = "secret" settings = { - "auth": "conda-auth-basic-auth", "username": "username", } - cache = {} channel = Channel("tester") # setup mocks keyring_mocks = keyring(secret) message = "Secret not found." keyring_mocks.basic.delete_password.side_effect = PasswordDeleteError(message) - context = MagicMock() # run code under test - basic_auth = BasicAuthManager(context, cache) # make assertions with pytest.raises(CondaAuthError, match=f"{LOGOUT_ERROR_MESSAGE} {message}"): - basic_auth.remove_secret(channel, settings) + manager.remove_secret(channel, settings) + + +def test_basic_auth_handler(keyring): + """ + Test to make sure that we can successfully instantiate and call the ``BasicAuthHandler`` + """ + channel_name = "channel" + password = "password" + username = "username" + channel = Channel(channel_name) + + # setup mocks + keyring(None) + + manager.store(channel, {"username": username, "password": password}) + + auth_handler = BasicAuthHandler(channel_name) + + request = MagicMock() + request.headers = {} + + request = auth_handler(request) + + assert request.headers == {"Authorization": _basic_auth_str(username, password)} + + +def test_basic_auth_handler_equals_methods(keyring): + """ + Test to make sure that we can instantiate multiple ``BasicAuthHandler`` objects and then + compare the two objects + """ + channel_name_one = "channel_two" + channel_name_two = "channel_two" + password = "password" + username = "username" + channel_one = Channel(channel_name_one) + channel_two = Channel(channel_name_two) + + # setup mocks + keyring(None) + + manager.store(channel_one, {"username": username, "password": password}) + manager.store(channel_two, {"username": username, "password": password}) + + auth_handler_one = BasicAuthHandler(channel_name_one) + auth_handler_two = BasicAuthHandler(channel_name_two) + + assert (auth_handler_one == auth_handler_two) is True + assert (auth_handler_one != auth_handler_two) is False + + +def test_basic_auth_handler_no_credentials_available_error(): + """ + Test to make sure that we raise an error when no credentials can be found in the application's + cache + """ + channel_name = "http://localhost" + + with pytest.raises( + CondaError, + match=f"Unable to find user credentials for requests with channel {channel_name}", + ): + BasicAuthHandler(channel_name) + + +def test_token_auth_manager_get_auth_class(): + """ + Simple test to make sure we get the expected type back from the ``get_auth_class`` + method + """ + assert manager.get_auth_class() is BasicAuthHandler + + +def test_token_auth_manager_get_auth_type(): + """ + Simple test to make sure we get the expected value back from the ``get_auth_type`` + method + """ + assert manager.get_auth_type() == HTTP_BASIC_AUTH_NAME + + +def test_basic_auth_manager_hook_action(keyring): + """ + Test to make sure we can successfully call the ``hook_action`` method for the + ``BasicAuthManager``. + """ + channel = "channel" + username = "username" + password = "password" + + # setup mocks + context = MagicMock() + context.channels = (channel,) + context.channel_settings = [ + {"channel": channel, "auth": HTTP_BASIC_AUTH_NAME, "username": username} + ] + keyring(password) + + token_manager = BasicAuthManager(context) + token_manager.hook_action("create") + + assert token_manager._cache == {channel: (username, password)} diff --git a/tests/handlers/test_oauth2.py b/tests/handlers/test_oauth2.py deleted file mode 100644 index e879156..0000000 --- a/tests/handlers/test_oauth2.py +++ /dev/null @@ -1,160 +0,0 @@ -import pytest -from keyring.errors import PasswordDeleteError -from conda.models.channel import Channel - -from conda_auth.handlers.oauth2 import USERNAME, manager, OAUTH2_NAME -from conda_auth.exceptions import CondaAuthError -from conda_auth.constants import LOGOUT_ERROR_MESSAGE - - -@pytest.fixture(autouse=True) -def clean_up_manager_cache(): - """Makes sure the manager cache gets emptied after each test run""" - manager._cache = {} - - -def test_oauth2_manager_no_previous_secret(mocker, session, keyring): - """ - Test to make sure when there is no password set, we are able to set a new - password via the ``getpass`` function. - """ - secret = "secret" - settings = { - "auth": OAUTH2_NAME, - "login_url": "http://localhost", - } - channel = Channel("tester") - - # setup mocks - input_mock = mocker.patch("conda_auth.handlers.oauth2.input") - input_mock.return_value = secret - keyring(None) - mocker.patch("conda_auth.handlers.oauth2.context") - - # run code under test - manager.authenticate(channel, settings) - - # make assertions - assert manager._cache == {channel.canonical_name: (USERNAME, secret)} - - -def test_oauth2_manager_no_login_url_present(mocker, session, keyring): - """ - Test to make sure we raise the appropriate exception when the ``login_url`` setting - is not present in our configuration. - """ - secret = "secret" - settings = { - "auth": OAUTH2_NAME, - } - channel = Channel("tester") - - # setup mocks - input_mock = mocker.patch("conda_auth.handlers.oauth2.input") - input_mock.return_value = secret - keyring(None) - mocker.patch("conda_auth.handlers.oauth2.context") - - with pytest.raises( - CondaAuthError, match='`login_url` is not set for channel "tester"' - ): - manager.authenticate(channel, settings) - - -def test_oauth2_manager_with_previous_secret(mocker, session, keyring): - """ - Test to make sure when there is a secret set, we retrieve it and set the - cache object appropriately. - """ - secret = "secret" - settings = { - "auth": OAUTH2_NAME, - "login_url": "http://localhost", - } - channel = Channel("tester") - manager._cache = {channel.canonical_name: (USERNAME, secret)} - - # setup mocks - input_mock = mocker.patch("conda_auth.handlers.oauth2.input") - keyring(secret) - mocker.patch("conda_auth.handlers.oauth2.context") - - # run code under test - manager.authenticate(channel, settings) - - # make assertions - assert manager._cache == {channel.canonical_name: (USERNAME, secret)} - input_mock.assert_not_called() - - -def test_oauth2_manager_cache_exists(session, keyring, getpass, mocker): - """ - Test to make sure that everything works as expected when a cache entry - already exists for a credential set. - """ - secret = "secret" - username = "admin" - settings = { - "auth": OAUTH2_NAME, - "login_url": "http://localhost", - } - channel = Channel("tester") - manager._cache = {channel.canonical_name: (username, secret)} - - # setup mocks - getpass_mock = getpass(secret) - keyring_mock = keyring(secret) - mocker.patch("conda_auth.handlers.oauth2.context") - - # run code under test - manager.authenticate(channel, settings) - - # make assertions - assert manager._cache == {channel.canonical_name: (username, secret)} - getpass_mock.assert_not_called() - keyring_mock.basic.get_password.assert_not_called() - - -def test_oauth2_manager_remove_existing_secret(keyring, mocker): - """ - Test to make sure that removing a password that exist works. - """ - secret = "secret" - settings = { - "auth": OAUTH2_NAME, - "login_url": "http://localhost", - } - channel = Channel("tester") - - # setup mocks - keyring_mocks = keyring(secret) - mocker.patch("conda_auth.handlers.oauth2.context") - - # run code under test - manager.remove_secret(channel, settings) - - # make assertions - keyring_mocks.oauth2.delete_password.assert_called_once() - - -def test_oauth2_manager_remove_non_existing_secret(keyring, mocker): - """ - Test make sure that when removing a secret that does not exist, the appropriate - exception and message is raised and shown. - """ - secret = "secret" - settings = { - "auth": OAUTH2_NAME, - "login_url": "http://localhost", - } - channel = Channel("tester") - - # setup mocks - keyring_mocks = keyring(secret) - message = "Secret not found." - keyring_mocks.oauth2.delete_password.side_effect = PasswordDeleteError(message) - mocker.patch("conda_auth.handlers.oauth2.context") - - # make assertions - with pytest.raises(CondaAuthError, match=f"{LOGOUT_ERROR_MESSAGE} {message}"): - manager.remove_secret(channel, settings) diff --git a/tests/handlers/test_token.py b/tests/handlers/test_token.py index 4b990d2..0a4bcbc 100644 --- a/tests/handlers/test_token.py +++ b/tests/handlers/test_token.py @@ -1,7 +1,20 @@ +from unittest.mock import MagicMock + import pytest +from conda.exceptions import CondaError from conda.models.channel import Channel +from keyring.errors import PasswordDeleteError -from conda_auth.handlers.token import is_anaconda_dot_org, manager, USERNAME, TOKEN_NAME +from conda_auth.constants import LOGOUT_ERROR_MESSAGE +from conda_auth.exceptions import CondaAuthError +from conda_auth.handlers.token import ( + is_anaconda_dot_org, + manager, + USERNAME, + TOKEN_NAME, + TokenAuthHandler, + TokenAuthManager, +) @pytest.fixture(autouse=True) @@ -20,15 +33,13 @@ def test_is_anaconda_dot_org(channel_name, expected): assert is_anaconda_dot_org(channel_name) == expected -def test_token_auth_manager_no_token(mocker, session, keyring): +def test_token_auth_manager_no_token(mocker, keyring): """ Test to make sure when there is no token set, we are able to set a new token via the ``input`` function. """ token = "token" - settings = { - "auth": TOKEN_NAME, - } + settings = {} channel = Channel("tester") # setup mocks @@ -37,26 +48,170 @@ def test_token_auth_manager_no_token(mocker, session, keyring): keyring(None) # run code under test - manager.authenticate(channel, settings) + manager.store(channel, settings) # make assertions assert manager._cache == {channel.canonical_name: (USERNAME, token)} -def test_token_auth_manager_with_token(session, keyring): +def test_token_auth_manager_with_token(keyring): """ Test to make sure when there is a token set, we are able to set a new token via the ``input`` function. """ token = "token" - settings = {"auth": TOKEN_NAME, "token": token} + settings = {"token": token} channel = Channel("tester") # setup mocks keyring(None) # run code under test - manager.authenticate(channel, settings) + manager.store(channel, settings) # make assertions assert manager._cache == {channel.canonical_name: (USERNAME, token)} + + +def test_basic_auth_manager_remove_existing_secret(keyring): + """ + Test to make sure that removing a password that exist works. + """ + secret = "secret" + settings = { + "username": USERNAME, + } + channel = Channel("tester") + + # setup mocks + keyring_mocks = keyring(secret) + + # run code under test + manager.remove_secret(channel, settings) + + # make assertions + keyring_mocks.token.delete_password.assert_called_once() + + +def test_basic_auth_manager_remove_non_existing_secret(keyring): + """ + Test make sure that when removing a secret that does not exist, the appropriate + exception and message is raised and shown. + """ + secret = "secret" + settings = { + "username": USERNAME, + } + channel = Channel("tester") + + # setup mocks + keyring_mocks = keyring(secret) + message = "Secret not found." + keyring_mocks.token.delete_password.side_effect = PasswordDeleteError(message) + + # make assertions + with pytest.raises(CondaAuthError, match=f"{LOGOUT_ERROR_MESSAGE} {message}"): + manager.remove_secret(channel, settings) + + +def test_token_auth_handler_with_anaconda_dot_org_token(keyring): + """ + Test to make sure that we can successfully instantiate and call the ``TokenAuthHandler`` + using the anaconda.org formatted token + """ + channel_name = "channel" + token = "token" + channel = Channel(channel_name) + + # setup mocks + keyring(None) + + manager.store(channel, {"token": token}) + + auth_handler = TokenAuthHandler(channel_name) + + request = MagicMock() + request.headers = {} + + request = auth_handler(request) + + assert request.headers == {"Authorization": f"token {token}"} + + +def test_token_auth_handler_with_bearer_token(keyring): + """ + Test to make sure that we can successfully instantiate and call the ``TokenAuthHandler`` + using a bearer token. + """ + channel_name = "http://localhost" + token = "token" + channel = Channel(channel_name) + + # setup mocks + keyring(None) + + manager.store(channel, {"token": token}) + + auth_handler = TokenAuthHandler(channel_name) + + request = MagicMock() + request.headers = {} + + request = auth_handler(request) + + assert request.headers == {"Authorization": f"Bearer {token}"} + + +def test_token_auth_handler_no_token_available_error(): + """ + Test to make sure that we raise an error when no token can be found in the application's + cache + """ + channel_name = "http://localhost" + + with pytest.raises( + CondaError, + match=f"Unable to find authorization token for requests with channel {channel_name}", + ): + TokenAuthHandler(channel_name) + + +def test_token_auth_manager_get_auth_class(): + """ + Simple test to make sure we get the expected type back from the ``get_auth_class`` + method + """ + assert manager.get_auth_class() is TokenAuthHandler + + +def test_token_auth_manager_get_auth_type(): + """ + Simple test to make sure we get the expected value back from the ``get_auth_type`` + method + """ + assert manager.get_auth_type() == TOKEN_NAME + + +def test_token_auth_manager_hook_action(keyring): + """ + Test to make sure we can successfully call the ``hook_action`` method for the + ``TokenAuthManager``. + """ + channel = "channel" + token = "token" + + # setup mocks + context = MagicMock() + context.channels = (channel,) + context.channel_settings = [ + { + "channel": channel, + "auth": TOKEN_NAME, + } + ] + keyring(token) + + token_manager = TokenAuthManager(context) + token_manager.hook_action("create") + + assert token_manager._cache == {channel: (USERNAME, token)} diff --git a/tests/test_hooks.py b/tests/test_hooks.py new file mode 100644 index 0000000..de135d7 --- /dev/null +++ b/tests/test_hooks.py @@ -0,0 +1,46 @@ +from conda_auth import hooks +from conda_auth.constants import PLUGIN_NAME +from conda_auth.handlers import ( + HTTP_BASIC_AUTH_NAME, + TOKEN_NAME, + BasicAuthHandler, + TokenAuthHandler, +) + + +def test_conda_subcommands_hook(): + """ + Test to make sure that this hook yields the correct objects. + """ + objs = list(hooks.conda_subcommands()) + + assert objs[0].name == "auth" + assert objs[0].summary == "Authentication commands for conda" + + +def test_conda_pre_commands_hook(): + """ + Test to make sure that this hook yields the correct objects. + """ + objs = list(hooks.conda_pre_commands()) + + run_for = {"search", "install", "update", "notices", "create", "search"} + + assert objs[0].name == f"{PLUGIN_NAME}-{HTTP_BASIC_AUTH_NAME}" + assert objs[0].run_for == run_for + + assert objs[1].name == f"{PLUGIN_NAME}-{TOKEN_NAME}" + assert objs[1].run_for == run_for + + +def test_conda_auth_handlers_hook(): + """ + Test to make sure that this hook yields the correct objects. + """ + objs = list(hooks.conda_auth_handlers()) + + assert objs[0].name == f"{PLUGIN_NAME}-{HTTP_BASIC_AUTH_NAME}" + assert objs[0].handler == BasicAuthHandler + + assert objs[1].name == f"{PLUGIN_NAME}-{TOKEN_NAME}" + assert objs[1].handler == TokenAuthHandler From 35c46054d1745a29bf1ac6d49687002d18ec38ba Mon Sep 17 00:00:00 2001 From: Travis Hathaway Date: Tue, 10 Oct 2023 16:44:19 +0200 Subject: [PATCH 05/11] undoing a change that broke configuration usage --- conda_auth/hooks.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/conda_auth/hooks.py b/conda_auth/hooks.py index c3b05dd..a6545af 100644 --- a/conda_auth/hooks.py +++ b/conda_auth/hooks.py @@ -47,7 +47,5 @@ def conda_auth_handlers(): """ Registers auth handlers """ - yield CondaAuthHandler( - name=f"{PLUGIN_NAME}-{HTTP_BASIC_AUTH_NAME}", handler=BasicAuthHandler - ) - yield CondaAuthHandler(name=f"{PLUGIN_NAME}-{TOKEN_NAME}", handler=TokenAuthHandler) + yield CondaAuthHandler(name=HTTP_BASIC_AUTH_NAME, handler=BasicAuthHandler) + yield CondaAuthHandler(name=TOKEN_NAME, handler=TokenAuthHandler) From 2e68953b246fe48023f27e37e5f8c8f6577da3c0 Mon Sep 17 00:00:00 2001 From: Travis Hathaway Date: Wed, 11 Oct 2023 07:03:18 +0200 Subject: [PATCH 06/11] updates based on pull request review --- conda_auth/handlers/base.py | 27 +++++++++++++++++++-------- conda_auth/handlers/basic_auth.py | 11 +++++------ conda_auth/handlers/token.py | 11 +++++------ tests/handlers/test_base.py | 0 tests/handlers/test_basic_auth.py | 2 +- tests/handlers/test_token.py | 2 +- tests/test_hooks.py | 4 ++-- 7 files changed, 33 insertions(+), 24 deletions(-) delete mode 100644 tests/handlers/test_base.py diff --git a/conda_auth/handlers/base.py b/conda_auth/handlers/base.py index 34f6661..fd8268d 100644 --- a/conda_auth/handlers/base.py +++ b/conda_auth/handlers/base.py @@ -6,6 +6,7 @@ import conda.base.context import keyring from conda.models.channel import Channel +from conda.base.context import context as global_context class AuthManager(ABC): @@ -13,12 +14,15 @@ class AuthManager(ABC): Defines an interface for auth handlers to use within plugin """ - def __init__(self, context: conda.base.context.Context, cache: dict | None = None): + def __init__( + self, + context: conda.base.context.Context | None = None, + cache: dict | None = None, + ): """ - Optionally set a cache to use and configuration parameters to retrieve from - ``conda.base.context.context.channel_settings``. + Optionally set a cache and context object to use """ - self._context = context + self._context = context or global_context self._cache = {} if cache is None else cache def hook_action(self, command: str) -> None: @@ -57,9 +61,7 @@ def save_credentials(self, channel: Channel, username: str, secret: str) -> None TODO: Method may be expanded in the future to allow the use of other storage mechanisms. """ - keyring.set_password( - self.get_keyring_id(channel.canonical_name), username, secret - ) + keyring.set_password(self.get_keyring_id(channel), username, secret) def fetch_secret( self, channel: Channel, settings: Mapping[str, str | None] @@ -86,6 +88,15 @@ def get_secret(self, channel_name: str) -> tuple[str | None, str | None]: return secrets + def cache_clear(self, channel_name: str | None = None) -> None: + """ + Remove the internal cache for the manager object + """ + if channel_name: + self._cache.pop(channel_name, None) + else: + self._cache.clear() + @abstractmethod def _fetch_secret( self, channel: Channel, settings: Mapping[str, str | None] @@ -111,7 +122,7 @@ def get_config_parameters(self) -> tuple[str, ...]: """ @abstractmethod - def get_keyring_id(self, channel_name: str) -> str: + def get_keyring_id(self, channel: Channel) -> str: """ Implementation should return the keyring id that will be used by the manager classes """ diff --git a/conda_auth/handlers/basic_auth.py b/conda_auth/handlers/basic_auth.py index 6f3dc49..ee1d853 100644 --- a/conda_auth/handlers/basic_auth.py +++ b/conda_auth/handlers/basic_auth.py @@ -9,7 +9,6 @@ import keyring from keyring.errors import PasswordDeleteError from requests.auth import _basic_auth_str # type: ignore -from conda.base.context import context from conda.exceptions import CondaError from conda.models.channel import Channel from conda.plugins.types import ChannelAuthBase @@ -35,8 +34,8 @@ class BasicAuthManager(AuthManager): - def get_keyring_id(self, channel_name: str): - return f"{PLUGIN_NAME}::{HTTP_BASIC_AUTH_NAME}::{channel_name}" + def get_keyring_id(self, channel: Channel): + return f"{PLUGIN_NAME}::{HTTP_BASIC_AUTH_NAME}::{channel.canonical_name}" def _fetch_secret( self, channel: Channel, settings: Mapping[str, str | None] @@ -53,7 +52,7 @@ def _fetch_secret( def remove_secret( self, channel: Channel, settings: Mapping[str, str | None] ) -> None: - keyring_id = self.get_keyring_id(channel.canonical_name) + keyring_id = self.get_keyring_id(channel) username = self.get_username(settings, channel) try: @@ -97,7 +96,7 @@ def get_password( """ Attempts to get password and falls back to prompting the user for it if not found. """ - keyring_id = self.get_keyring_id(channel.canonical_name) + keyring_id = self.get_keyring_id(channel) password = keyring.get_password(keyring_id, username) if password is None: @@ -111,7 +110,7 @@ def get_auth_class(self) -> type: return BasicAuthHandler -manager = BasicAuthManager(context) +manager = BasicAuthManager() class BasicAuthHandler(ChannelAuthBase): diff --git a/conda_auth/handlers/token.py b/conda_auth/handlers/token.py index 6e2776d..4e61118 100644 --- a/conda_auth/handlers/token.py +++ b/conda_auth/handlers/token.py @@ -7,7 +7,6 @@ import keyring from keyring.errors import PasswordDeleteError -from conda.base.context import context from conda.exceptions import CondaError from conda.models.channel import Channel from conda.plugins.types import ChannelAuthBase @@ -33,8 +32,8 @@ class TokenAuthManager(AuthManager): - def get_keyring_id(self, channel_name: str) -> str: - return f"{PLUGIN_NAME}::{TOKEN_NAME}::{channel_name}" + def get_keyring_id(self, channel: Channel) -> str: + return f"{PLUGIN_NAME}::{TOKEN_NAME}::{channel.canonical_name}" def _fetch_secret( self, channel: Channel, settings: Mapping[str, str | None] @@ -43,7 +42,7 @@ def _fetch_secret( Gets the secrets by checking the keyring and then falling back to interrupting the program and asking the user for secret. """ - keyring_id = self.get_keyring_id(channel.canonical_name) + keyring_id = self.get_keyring_id(channel) token = keyring.get_password(keyring_id, USERNAME) if token is None: @@ -54,7 +53,7 @@ def _fetch_secret( def remove_secret( self, channel: Channel, settings: Mapping[str, str | None] ) -> None: - keyring_id = self.get_keyring_id(channel.canonical_name) + keyring_id = self.get_keyring_id(channel) try: keyring.delete_password(keyring_id, USERNAME) @@ -88,7 +87,7 @@ def get_auth_class(self) -> type: return TokenAuthHandler -manager = TokenAuthManager(context) +manager = TokenAuthManager() def is_anaconda_dot_org(channel_name: str) -> bool: diff --git a/tests/handlers/test_base.py b/tests/handlers/test_base.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/handlers/test_basic_auth.py b/tests/handlers/test_basic_auth.py index e677372..a16193c 100644 --- a/tests/handlers/test_basic_auth.py +++ b/tests/handlers/test_basic_auth.py @@ -19,7 +19,7 @@ @pytest.fixture(autouse=True) def clean_up_manager_cache(): """Makes sure the manager cache gets emptied after each test run""" - manager._cache = {} + manager.cache_clear() def test_basic_auth_manager_no_previous_secret(keyring, getpass): diff --git a/tests/handlers/test_token.py b/tests/handlers/test_token.py index 0a4bcbc..3ccd7e5 100644 --- a/tests/handlers/test_token.py +++ b/tests/handlers/test_token.py @@ -20,7 +20,7 @@ @pytest.fixture(autouse=True) def clean_up_manager_cache(): """Makes sure the manager cache gets emptied after each test run""" - manager._cache = {} + manager.cache_clear() @pytest.mark.parametrize( diff --git a/tests/test_hooks.py b/tests/test_hooks.py index de135d7..5a9f04b 100644 --- a/tests/test_hooks.py +++ b/tests/test_hooks.py @@ -39,8 +39,8 @@ def test_conda_auth_handlers_hook(): """ objs = list(hooks.conda_auth_handlers()) - assert objs[0].name == f"{PLUGIN_NAME}-{HTTP_BASIC_AUTH_NAME}" + assert objs[0].name == HTTP_BASIC_AUTH_NAME assert objs[0].handler == BasicAuthHandler - assert objs[1].name == f"{PLUGIN_NAME}-{TOKEN_NAME}" + assert objs[1].name == TOKEN_NAME assert objs[1].handler == TokenAuthHandler From b09067fb3e06aa029ae24508f2136975a653713a Mon Sep 17 00:00:00 2001 From: Travis Hathaway Date: Wed, 11 Oct 2023 13:08:07 +0200 Subject: [PATCH 07/11] more changes requested from review --- conda_auth/cli.py | 70 +++++++++++++++++++++++++++++++++++-------- conda_auth/condarc.py | 13 ++++++-- conda_auth/options.py | 29 ++++++++++++++++++ tests/test_condarc.py | 4 +-- 4 files changed, 99 insertions(+), 17 deletions(-) create mode 100644 conda_auth/options.py diff --git a/conda_auth/cli.py b/conda_auth/cli.py index 6e3a9b1..f587834 100644 --- a/conda_auth/cli.py +++ b/conda_auth/cli.py @@ -1,5 +1,6 @@ from __future__ import annotations +import sys from collections.abc import MutableMapping import click @@ -15,6 +16,7 @@ HTTP_BASIC_AUTH_NAME, TOKEN_NAME, ) +from .options import MutuallyExclusiveOption # Constants AUTH_MANAGER_MAPPING = { @@ -30,6 +32,8 @@ VALID_AUTH_CHOICES = tuple(AUTH_MANAGER_MAPPING.keys()) +OPTION_DEFAULT = "CONDA_AUTH_DEFAULT" + def parse_channel(ctx, param, value): """ @@ -50,14 +54,18 @@ def get_auth_manager(options) -> tuple[str, AuthManager]: raise CondaAuthError( f'Invalid authentication type. Valid types are: "{", ".join(VALID_AUTH_CHOICES)}"' ) + return auth_type, auth_manager # we use http basic auth when "username" or "password" are present - elif options.get("username") is not None or options.get("password") is not None: + if ( + "username" in option_tracker.options_used + or "password" in option_tracker.options_used + ): auth_manager = basic_auth_manager auth_type = HTTP_BASIC_AUTH_NAME # we use token auth when "token" is present - elif options.get("token") is not None: + elif "token" in option_tracker.options_used: auth_manager = token_auth_manager auth_type = TOKEN_NAME @@ -90,30 +98,66 @@ def auth_wrapper(args): group(args=args, prog_name="conda auth", standalone_mode=True) +class OptionTracker: + """ + Used to track whether the option was actually provided when command + was issued + """ + + def __init__(self): + self.options_used = set() + + def track_callback(self, ctx, param, value): + """ + Callback used to see if the option was provided + + This is also converts the ``OPTION_DEFAULT`` value to ``None`` + """ + for opt in param.opts: + if opt in sys.argv: + self.options_used.add(param.name) + break + + value = value if value != OPTION_DEFAULT else None + + return value + + +option_tracker = OptionTracker() + + @group.command("login") @click.option( "-u", "--username", help="Username to use for private channels using HTTP Basic Authentication", + cls=MutuallyExclusiveOption, + is_flag=False, + flag_value=OPTION_DEFAULT, + mutually_exclusive=("token",), + callback=option_tracker.track_callback, ) @click.option( "-p", "--password", help="Password to use for private channels using HTTP Basic Authentication", + cls=MutuallyExclusiveOption, + mutually_exclusive=("token",), + callback=option_tracker.track_callback, ) @click.option( "-t", "--token", help="Token to use for private channels using an API token", -) -@click.option( - "-a", - "--auth", - help="Specify the authentication type you would like to use", - type=click.Choice(VALID_AUTH_CHOICES), + is_flag=False, + flag_value=OPTION_DEFAULT, + cls=MutuallyExclusiveOption, + mutually_exclusive=("username", "password"), + callback=option_tracker.track_callback, ) @click.argument("channel", callback=parse_channel) -def login(channel: Channel, **kwargs): +@click.pass_context +def login(ctx, channel: Channel, **kwargs): """ Log in to a channel by storing the credentials or tokens associated with it """ @@ -122,14 +166,15 @@ def login(channel: Channel, **kwargs): settings.update(kwargs) auth_type, auth_manager = get_auth_manager(settings) - - username = auth_manager.store(channel, settings) + username: str | None = auth_manager.store(channel, settings) click.echo(click.style(SUCCESSFUL_LOGIN_MESSAGE, fg=SUCCESSFUL_COLOR)) try: condarc = CondaRC() - condarc.update_channel_settings(channel.canonical_name, username, auth_type) + if auth_type == TOKEN_NAME: + username = None + condarc.update_channel_settings(channel.canonical_name, auth_type, username) condarc.save() except CondaRCError as exc: raise CondaAuthError(str(exc)) @@ -146,7 +191,6 @@ def logout(channel: Channel): if settings is None: raise CondaAuthError("Unable to find information about logged in session.") - settings["type"] = settings["auth"] auth_type, auth_manager = get_auth_manager(settings) auth_manager.remove_secret(channel, settings) diff --git a/conda_auth/condarc.py b/conda_auth/condarc.py index 800b7ad..259177f 100644 --- a/conda_auth/condarc.py +++ b/conda_auth/condarc.py @@ -32,11 +32,20 @@ def __init__(self, condarc_path: Path | None = None): except YAMLError as exc: raise CondaRCError(f"Could not parse condarc: {exc}") - def update_channel_settings(self, channel: str, username: str, auth_type: str): + def update_channel_settings( + self, channel: str, auth_type: str, username: str | None = None + ): """ Update the condarc file's "channel_settings" section """ - updated_settings = {"channel": channel, "auth": auth_type, "username": username} + if username is None: + updated_settings = {"channel": channel, "auth": auth_type} + else: + updated_settings = { + "channel": channel, + "auth": auth_type, + "username": username, + } channel_settings = self.loaded_yaml.get("channel_settings", []) or [] diff --git a/conda_auth/options.py b/conda_auth/options.py new file mode 100644 index 0000000..2e1860c --- /dev/null +++ b/conda_auth/options.py @@ -0,0 +1,29 @@ +""" +Module for custom click.Option classes +""" +import click + + +class MutuallyExclusiveOption(click.Option): + def __init__(self, *args, **kwargs): + self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", [])) + help_message = kwargs.get("help", "") + + if self.mutually_exclusive: + ex_str = ", ".join(f'"{option}"' for option in self.mutually_exclusive) + kwargs[ + "help" + ] = f"{help_message}; cannot be used with these options: {ex_str}" + + super().__init__(*args, **kwargs) + + def handle_parse_result(self, ctx, opts, args): + if self.mutually_exclusive.intersection(opts) and self.name in opts: + mutually_exclusive = ", ".join( + f'"{option}"' for option in self.mutually_exclusive + ) + raise click.UsageError( + f'Option "{self.name}" cannot be used with {mutually_exclusive}' + ) + + return super().handle_parse_result(ctx, opts, args) diff --git a/tests/test_condarc.py b/tests/test_condarc.py index dea7aa9..de89d5d 100644 --- a/tests/test_condarc.py +++ b/tests/test_condarc.py @@ -23,7 +23,7 @@ def test_update_non_existing_condarc_file(tmp_path): condarc_path = tmp_path / ".condarc" condarc = CondaRC(condarc_path) - condarc.update_channel_settings(channel, username, auth_type) + condarc.update_channel_settings(channel, auth_type, username) condarc.save() condarc_dict = yaml.load(condarc_path.read_text()) @@ -54,7 +54,7 @@ def test_update_existing_condarc_file(tmp_path): condarc_path.write_text(CONDARC_CONTENT) condarc = CondaRC(condarc_path) - condarc.update_channel_settings(channel, username, auth_type) + condarc.update_channel_settings(channel, auth_type, username) condarc.save() condarc_dict = yaml.load(condarc_path.read_text()) From 0210bdc31144e99679da736a6f384f0d9d9d5e13 Mon Sep 17 00:00:00 2001 From: Travis Hathaway Date: Thu, 12 Oct 2023 17:49:17 +0200 Subject: [PATCH 08/11] code refactor that removes prompting for the auth values from the manager classes and adds them to click instead --- conda_auth/cli.py | 83 ++++++++++++------------------- conda_auth/handlers/basic_auth.py | 24 ++------- conda_auth/handlers/token.py | 8 +-- conda_auth/options.py | 18 ++++++- tests/handlers/test_basic_auth.py | 40 ++++----------- tests/handlers/test_token.py | 9 ++-- 6 files changed, 69 insertions(+), 113 deletions(-) diff --git a/conda_auth/cli.py b/conda_auth/cli.py index f587834..dc5288e 100644 --- a/conda_auth/cli.py +++ b/conda_auth/cli.py @@ -1,6 +1,5 @@ from __future__ import annotations -import sys from collections.abc import MutableMapping import click @@ -16,7 +15,7 @@ HTTP_BASIC_AUTH_NAME, TOKEN_NAME, ) -from .options import MutuallyExclusiveOption +from .options import CustomOption # Constants AUTH_MANAGER_MAPPING = { @@ -42,7 +41,16 @@ def parse_channel(ctx, param, value): return Channel(value) -def get_auth_manager(options) -> tuple[str, AuthManager]: +class ExtraContext: + """ + Used to provide more information about the running environment + """ + + def __init__(self): + self.used_options = set() + + +def get_auth_manager(options, extra_context: ExtraContext) -> tuple[str, AuthManager]: """ Based on CLI options provided, return the correct auth manager to use. """ @@ -58,14 +66,14 @@ def get_auth_manager(options) -> tuple[str, AuthManager]: # we use http basic auth when "username" or "password" are present if ( - "username" in option_tracker.options_used - or "password" in option_tracker.options_used + "username" in extra_context.used_options + or "password" in extra_context.used_options ): auth_manager = basic_auth_manager auth_type = HTTP_BASIC_AUTH_NAME # we use token auth when "token" is present - elif "token" in option_tracker.options_used: + elif "token" in extra_context.used_options: auth_manager = token_auth_manager auth_type = TOKEN_NAME @@ -87,10 +95,12 @@ def get_channel_settings(channel: str) -> MutableMapping[str, str] | None: @click.group("auth") -def group(): +@click.pass_context +def group(ctx): """ Commands for handling authentication within conda """ + ctx.obj = ExtraContext() def auth_wrapper(args): @@ -98,66 +108,38 @@ def auth_wrapper(args): group(args=args, prog_name="conda auth", standalone_mode=True) -class OptionTracker: - """ - Used to track whether the option was actually provided when command - was issued - """ - - def __init__(self): - self.options_used = set() - - def track_callback(self, ctx, param, value): - """ - Callback used to see if the option was provided - - This is also converts the ``OPTION_DEFAULT`` value to ``None`` - """ - for opt in param.opts: - if opt in sys.argv: - self.options_used.add(param.name) - break - - value = value if value != OPTION_DEFAULT else None - - return value - - -option_tracker = OptionTracker() - - @group.command("login") @click.option( "-u", "--username", help="Username to use for private channels using HTTP Basic Authentication", - cls=MutuallyExclusiveOption, - is_flag=False, - flag_value=OPTION_DEFAULT, + cls=CustomOption, + prompt=True, + prompt_required=False, mutually_exclusive=("token",), - callback=option_tracker.track_callback, ) @click.option( "-p", "--password", help="Password to use for private channels using HTTP Basic Authentication", - cls=MutuallyExclusiveOption, + cls=CustomOption, + prompt=True, + hide_input=True, mutually_exclusive=("token",), - callback=option_tracker.track_callback, + prompt_when="username", ) @click.option( "-t", "--token", help="Token to use for private channels using an API token", - is_flag=False, - flag_value=OPTION_DEFAULT, - cls=MutuallyExclusiveOption, + prompt=True, + prompt_required=False, + cls=CustomOption, mutually_exclusive=("username", "password"), - callback=option_tracker.track_callback, ) @click.argument("channel", callback=parse_channel) -@click.pass_context -def login(ctx, channel: Channel, **kwargs): +@click.pass_obj +def login(extra_context: ExtraContext, channel: Channel, **kwargs): """ Log in to a channel by storing the credentials or tokens associated with it """ @@ -165,7 +147,7 @@ def login(ctx, channel: Channel, **kwargs): settings = get_channel_settings(channel.canonical_name) or {} settings.update(kwargs) - auth_type, auth_manager = get_auth_manager(settings) + auth_type, auth_manager = get_auth_manager(settings, extra_context) username: str | None = auth_manager.store(channel, settings) click.echo(click.style(SUCCESSFUL_LOGIN_MESSAGE, fg=SUCCESSFUL_COLOR)) @@ -182,7 +164,8 @@ def login(ctx, channel: Channel, **kwargs): @group.command("logout") @click.argument("channel", callback=parse_channel) -def logout(channel: Channel): +@click.pass_obj +def logout(extra_context: ExtraContext, channel: Channel): """ Log out of a by removing any credentials or tokens associated with it. """ @@ -191,7 +174,7 @@ def logout(channel: Channel): if settings is None: raise CondaAuthError("Unable to find information about logged in session.") - auth_type, auth_manager = get_auth_manager(settings) + auth_type, auth_manager = get_auth_manager(settings, extra_context) auth_manager.remove_secret(channel, settings) click.echo(click.style(SUCCESSFUL_LOGOUT_MESSAGE, fg=SUCCESSFUL_COLOR)) diff --git a/conda_auth/handlers/basic_auth.py b/conda_auth/handlers/basic_auth.py index ee1d853..b858b9c 100644 --- a/conda_auth/handlers/basic_auth.py +++ b/conda_auth/handlers/basic_auth.py @@ -3,7 +3,6 @@ """ from __future__ import annotations -from getpass import getpass from collections.abc import Mapping import keyring @@ -44,7 +43,7 @@ def _fetch_secret( Gets the secrets by checking the keyring and then falling back to interrupting the program and asking the user for the credentials. """ - username = self.get_username(settings, channel) + username = self.get_username(settings) password = self.get_password(username, settings, channel) return username, password @@ -53,7 +52,7 @@ def remove_secret( self, channel: Channel, settings: Mapping[str, str | None] ) -> None: keyring_id = self.get_keyring_id(channel) - username = self.get_username(settings, channel) + username = self.get_username(settings) try: keyring.delete_password(keyring_id, username) @@ -66,27 +65,14 @@ def get_auth_type(self) -> str: def get_config_parameters(self) -> tuple[str, ...]: return USERNAME_PARAM_NAME, PASSWORD_PARAM_NAME - def prompt_password(self) -> str: - """ - This can be overriden for classes that do not want to use the ``getpass`` module. - """ - return getpass() - - def prompt_username(self, channel: Channel) -> str: - """ - This can be overriden for classes that do not want to use the built-in function ``input``. - """ - print(f"Please provide credentials for channel: {channel.canonical_name}") - return input("Username: ") - - def get_username(self, settings: Mapping[str, str | None], channel: Channel): + def get_username(self, settings: Mapping[str, str | None]): """ Attempts to find username in settings and falls back to prompting user for it if not found. """ username = settings.get(USERNAME_PARAM_NAME) if username is None: - username = self.prompt_username(channel) + raise CondaAuthError("Username not found") return username @@ -102,7 +88,7 @@ def get_password( if password is None: password = settings.get(PASSWORD_PARAM_NAME) if password is None: - password = self.prompt_password() + raise CondaAuthError("Password not found") return password diff --git a/conda_auth/handlers/token.py b/conda_auth/handlers/token.py index 4e61118..ae2f663 100644 --- a/conda_auth/handlers/token.py +++ b/conda_auth/handlers/token.py @@ -73,16 +73,10 @@ def get_token(self, settings: Mapping[str, str | None]): token = settings.get(TOKEN_PARAM_NAME) if token is None: - token = self.prompt_token() + raise CondaAuthError("Token not found") return token - def prompt_token(self) -> str: - """ - This can be overriden for classes that do not want to use the built-in function ``input``. - """ - return input("Token: ") - def get_auth_class(self) -> type: return TokenAuthHandler diff --git a/conda_auth/options.py b/conda_auth/options.py index 2e1860c..62f9ed3 100644 --- a/conda_auth/options.py +++ b/conda_auth/options.py @@ -4,9 +4,19 @@ import click -class MutuallyExclusiveOption(click.Option): +class CustomOption(click.Option): + """ + Custom option that does the following things: + + - Allows you to define a "mutually_exclusive" tuple so certain options cannot be passed + together + - If ``prompt=True`` is set, can optionally control it to be prompted only in the presence of + other options via ``prompt_when`` + - Adds options which have been passed to ``ctx.obj.used_options`` + """ def __init__(self, *args, **kwargs): self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", [])) + self.prompt_when = kwargs.pop("prompt_when", None) help_message = kwargs.get("help", "") if self.mutually_exclusive: @@ -18,6 +28,12 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def handle_parse_result(self, ctx, opts, args): + if self.name in opts: + ctx.obj.used_options.add(self.name) + + if self.prompt_when is not None and self.prompt_when not in opts: + return None, args + if self.mutually_exclusive.intersection(opts) and self.name in opts: mutually_exclusive = ", ".join( f'"{option}"' for option in self.mutually_exclusive diff --git a/tests/handlers/test_basic_auth.py b/tests/handlers/test_basic_auth.py index a16193c..29ed576 100644 --- a/tests/handlers/test_basic_auth.py +++ b/tests/handlers/test_basic_auth.py @@ -27,46 +27,33 @@ def test_basic_auth_manager_no_previous_secret(keyring, getpass): Test to make sure when there is no password set, we are able to set a new password via the ``getpass`` function. """ - secret = "secret" settings = { "username": "admin", } channel = Channel("tester") # setup mocks - getpass_mock = getpass(secret) keyring(None) # run code under test - manager.store(channel, settings) - - # make assertions - assert manager._cache == {channel.canonical_name: ("admin", secret)} - getpass_mock.assert_called_once() + with pytest.raises(CondaAuthError, match="Password not found"): + manager.store(channel, settings) def test_basic_auth_manager_no_secret_or_username(mocker, keyring, getpass): """ - Test to make sure when there is no password or username set, we are able to provide a - password via the ``getpass`` function and a username via the ``input`` function. + Test to make sure when there is no password or username set, we raise the correct + exception. """ - username = "admin" - secret = "secret" settings = {} channel = Channel("tester") # setup mocks - input_mock = mocker.patch("conda_auth.handlers.basic_auth.input") - input_mock.return_value = username - getpass_mock = getpass(secret) keyring(None) # run code under test - manager.store(channel, settings) - - # make assertions - assert manager._cache == {channel.canonical_name: (username, secret)} - getpass_mock.assert_called_once() + with pytest.raises(CondaAuthError, match="Username not found"): + manager.store(channel, settings) def test_basic_auth_manager_with_previous_secret(keyring, getpass): @@ -140,25 +127,18 @@ def test_basic_auth_manager_remove_existing_secret(keyring): def test_basic_auth_manager_remove_existing_secret_no_username(mocker, keyring): """ - Test to make sure that removing a password that exist works when no username - is present in the settings file. + Test to make sure that when removing a password that exist it fails when no username is present """ secret = "secret" - username = "username" settings = {} channel = Channel("tester") # setup mocks - keyring_mocks = keyring(secret) - input_mock = mocker.patch("conda_auth.handlers.basic_auth.input") - input_mock.return_value = username + keyring(secret) # run code under test - manager.remove_secret(channel, settings) - - # make assertions - input_mock.assert_called_once() - keyring_mocks.basic.delete_password.assert_called_once() + with pytest.raises(CondaAuthError, match="Username not found"): + manager.remove_secret(channel, settings) def test_basic_auth_manager_remove_non_existing_secret(keyring): diff --git a/tests/handlers/test_token.py b/tests/handlers/test_token.py index 3ccd7e5..7bfa6f1 100644 --- a/tests/handlers/test_token.py +++ b/tests/handlers/test_token.py @@ -35,8 +35,7 @@ def test_is_anaconda_dot_org(channel_name, expected): def test_token_auth_manager_no_token(mocker, keyring): """ - Test to make sure when there is no token set, we are able to set a new token via the ``input`` - function. + Test to make sure when there is no token set, an exception is raised """ token = "token" settings = {} @@ -48,10 +47,8 @@ def test_token_auth_manager_no_token(mocker, keyring): keyring(None) # run code under test - manager.store(channel, settings) - - # make assertions - assert manager._cache == {channel.canonical_name: (USERNAME, token)} + with pytest.raises(CondaAuthError, match="Token not found"): + manager.store(channel, settings) def test_token_auth_manager_with_token(keyring): From 61734cd8d4bf58734b5abb2599bf5203ae14c404 Mon Sep 17 00:00:00 2001 From: Travis Hathaway Date: Thu, 12 Oct 2023 18:52:42 +0200 Subject: [PATCH 09/11] fixing tests --- tests/cli/test_login.py | 8 -------- tests/conftest.py | 14 -------------- tests/handlers/test_basic_auth.py | 14 +++++--------- 3 files changed, 5 insertions(+), 31 deletions(-) diff --git a/tests/cli/test_login.py b/tests/cli/test_login.py index 7a885fb..082d1d4 100644 --- a/tests/cli/test_login.py +++ b/tests/cli/test_login.py @@ -17,8 +17,6 @@ def test_login_no_options_basic_auth(mocker, runner, keyring, condarc): mock_context.channel_settings = [ {"channel": channel_name, "auth": HTTP_BASIC_AUTH_NAME, "username": "user"} ] - mock_getpass = mocker.patch("conda_auth.handlers.basic_auth.getpass") - mock_getpass.return_value = secret # run command result = runner.invoke(group, ["login", channel_name]) @@ -74,13 +72,10 @@ def test_login_with_non_existent_channel(mocker, runner, keyring, condarc): Runs the login command for a channel that is not present in the settings file """ channel_name = "tester" - secret = "password" # setup mocks mock_context = mocker.patch("conda_auth.cli.context") mock_context.channel_settings = [] - mock_getpass = mocker.patch("conda_auth.handlers.basic_auth.getpass") - mock_getpass.return_value = secret keyring(None) # run command @@ -98,13 +93,10 @@ def test_login_succeeds_error_returned_when_updating_condarc( the condarc file. """ channel_name = "tester" - secret = "password" # setup mocks mock_context = mocker.patch("conda_auth.cli.context") mock_context.channel_settings = [] - mock_getpass = mocker.patch("conda_auth.handlers.basic_auth.getpass") - mock_getpass.return_value = secret keyring(None) condarc().save.side_effect = CondaRCError("Could not save file") diff --git a/tests/conftest.py b/tests/conftest.py index cabd251..1375afe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -30,20 +30,6 @@ def _keyring(secret): return _keyring -@pytest.fixture -def getpass(mocker): - """ - Used to return a factor function to configure the value that getpass returns - """ - - def _getpass(secret): - getpass_mock = mocker.patch("conda_auth.handlers.basic_auth.getpass") - getpass_mock.return_value = secret - return getpass_mock - - return _getpass - - @pytest.fixture def runner(): """ diff --git a/tests/handlers/test_basic_auth.py b/tests/handlers/test_basic_auth.py index 29ed576..3963e0e 100644 --- a/tests/handlers/test_basic_auth.py +++ b/tests/handlers/test_basic_auth.py @@ -22,7 +22,7 @@ def clean_up_manager_cache(): manager.cache_clear() -def test_basic_auth_manager_no_previous_secret(keyring, getpass): +def test_basic_auth_manager_no_previous_secret(keyring): """ Test to make sure when there is no password set, we are able to set a new password via the ``getpass`` function. @@ -40,7 +40,7 @@ def test_basic_auth_manager_no_previous_secret(keyring, getpass): manager.store(channel, settings) -def test_basic_auth_manager_no_secret_or_username(mocker, keyring, getpass): +def test_basic_auth_manager_no_secret_or_username(keyring): """ Test to make sure when there is no password or username set, we raise the correct exception. @@ -56,7 +56,7 @@ def test_basic_auth_manager_no_secret_or_username(mocker, keyring, getpass): manager.store(channel, settings) -def test_basic_auth_manager_with_previous_secret(keyring, getpass): +def test_basic_auth_manager_with_previous_secret(keyring): """ Test to make sure when there is a password set, we retrieve it and set the cache object appropriately. @@ -68,7 +68,6 @@ def test_basic_auth_manager_with_previous_secret(keyring, getpass): channel = Channel("tester") # setup mocks - getpass_mock = getpass(secret) keyring(secret) # run code under test @@ -76,10 +75,9 @@ def test_basic_auth_manager_with_previous_secret(keyring, getpass): # make assertions assert manager._cache == {channel.canonical_name: ("admin", secret)} - getpass_mock.assert_not_called() -def test_basic_auth_manager_cache_exists(keyring, getpass): +def test_basic_auth_manager_cache_exists(keyring): """ Test to make sure that everything works as expected when a cache entry already exists for a credential set. @@ -93,7 +91,6 @@ def test_basic_auth_manager_cache_exists(keyring, getpass): manager._cache = {channel.canonical_name: (username, secret)} # setup mocks - getpass_mock = getpass(secret) keyring_mock = keyring(secret) # run code under test @@ -101,7 +98,6 @@ def test_basic_auth_manager_cache_exists(keyring, getpass): # make assertions assert manager._cache == {channel.canonical_name: (username, secret)} - getpass_mock.assert_not_called() keyring_mock.basic.get_password.assert_not_called() @@ -125,7 +121,7 @@ def test_basic_auth_manager_remove_existing_secret(keyring): keyring_mocks.basic.delete_password.assert_called_once() -def test_basic_auth_manager_remove_existing_secret_no_username(mocker, keyring): +def test_basic_auth_manager_remove_existing_secret_no_username(keyring): """ Test to make sure that when removing a password that exist it fails when no username is present """ From edee5a2d7a4b20586cabd9c7fc1715431e6e53b5 Mon Sep 17 00:00:00 2001 From: Travis Hathaway Date: Wed, 18 Oct 2023 12:01:11 +0200 Subject: [PATCH 10/11] hopefully the finishing touches for a work version of token based authentication --- conda_auth/cli.py | 35 ++++++++++------ conda_auth/handlers/basic_auth.py | 15 ++++--- conda_auth/handlers/token.py | 28 +++++-------- conda_auth/hooks.py | 14 ++++++- docs/index.md | 23 ++++++++-- docs/user/index.md | 29 +++++++++++-- tests/cli/test_login.py | 70 +++++++++++++------------------ tests/test_hooks.py | 4 +- 8 files changed, 131 insertions(+), 87 deletions(-) diff --git a/conda_auth/cli.py b/conda_auth/cli.py index dc5288e..9c68471 100644 --- a/conda_auth/cli.py +++ b/conda_auth/cli.py @@ -29,6 +29,8 @@ SUCCESSFUL_COLOR = "green" +FAILURE_COLOR = "red" + VALID_AUTH_CHOICES = tuple(AUTH_MANAGER_MAPPING.keys()) OPTION_DEFAULT = "CONDA_AUTH_DEFAULT" @@ -65,10 +67,7 @@ def get_auth_manager(options, extra_context: ExtraContext) -> tuple[str, AuthMan return auth_type, auth_manager # we use http basic auth when "username" or "password" are present - if ( - "username" in extra_context.used_options - or "password" in extra_context.used_options - ): + if "basic" in extra_context.used_options: auth_manager = basic_auth_manager auth_type = HTTP_BASIC_AUTH_NAME @@ -77,10 +76,15 @@ def get_auth_manager(options, extra_context: ExtraContext) -> tuple[str, AuthMan auth_manager = token_auth_manager auth_type = TOKEN_NAME - # default authentication handler + # raise error if authentication type not found else: - auth_manager = basic_auth_manager - auth_type = HTTP_BASIC_AUTH_NAME + raise CondaAuthError( + click.style( + "Please specify an authentication type to use" + " with either the `--basic` or `--token` options.", + fg=FAILURE_COLOR, + ) + ) return auth_type, auth_manager @@ -109,14 +113,15 @@ def auth_wrapper(args): @group.command("login") +@click.argument("channel", callback=parse_channel) @click.option( "-u", "--username", help="Username to use for private channels using HTTP Basic Authentication", cls=CustomOption, prompt=True, - prompt_required=False, mutually_exclusive=("token",), + prompt_when="basic", ) @click.option( "-p", @@ -126,7 +131,7 @@ def auth_wrapper(args): prompt=True, hide_input=True, mutually_exclusive=("token",), - prompt_when="username", + prompt_when="basic", ) @click.option( "-t", @@ -137,15 +142,19 @@ def auth_wrapper(args): cls=CustomOption, mutually_exclusive=("username", "password"), ) -@click.argument("channel", callback=parse_channel) +@click.option( + "-b", + "--basic", + is_flag=True, + cls=CustomOption, + help="Save login credentials as HTTP basic authentication", +) @click.pass_obj def login(extra_context: ExtraContext, channel: Channel, **kwargs): """ Log in to a channel by storing the credentials or tokens associated with it """ - kwargs = {key: val for key, val in kwargs.items() if val is not None} - settings = get_channel_settings(channel.canonical_name) or {} - settings.update(kwargs) + settings = {key: val for key, val in kwargs.items() if val is not None} auth_type, auth_manager = get_auth_manager(settings, extra_context) username: str | None = auth_manager.store(channel, settings) diff --git a/conda_auth/handlers/basic_auth.py b/conda_auth/handlers/basic_auth.py index b858b9c..24f86a5 100644 --- a/conda_auth/handlers/basic_auth.py +++ b/conda_auth/handlers/basic_auth.py @@ -16,17 +16,17 @@ from ..exceptions import CondaAuthError from .base import AuthManager -USERNAME_PARAM_NAME = "username" +USERNAME_PARAM_NAME: str = "username" """ Name of the configuration parameter where username information is stored """ -PASSWORD_PARAM_NAME = "password" +PASSWORD_PARAM_NAME: str = "password" """ Name of the configuration parameter where password information is stored """ -HTTP_BASIC_AUTH_NAME = "http-basic" +HTTP_BASIC_AUTH_NAME: str = "http-basic" """ Name used to refer to this authentication handler in configuration """ @@ -82,11 +82,14 @@ def get_password( """ Attempts to get password and falls back to prompting the user for it if not found. """ - keyring_id = self.get_keyring_id(channel) - password = keyring.get_password(keyring_id, username) + # First see if a value has been passed in + password = settings.get(PASSWORD_PARAM_NAME) + # Now try retrieving it from the password manager if password is None: - password = settings.get(PASSWORD_PARAM_NAME) + keyring_id = self.get_keyring_id(channel) + password = keyring.get_password(keyring_id, username) + if password is None: raise CondaAuthError("Password not found") diff --git a/conda_auth/handlers/token.py b/conda_auth/handlers/token.py index ae2f663..5831a03 100644 --- a/conda_auth/handlers/token.py +++ b/conda_auth/handlers/token.py @@ -15,17 +15,17 @@ from ..exceptions import CondaAuthError from .base import AuthManager -TOKEN_PARAM_NAME = "token" +TOKEN_PARAM_NAME: str = "token" """ Name of the configuration parameter where token information is stored """ -USERNAME = "token" +USERNAME: str = "token" """ Placeholder value for username; This is written to the secret storage backend """ -TOKEN_NAME = "token" +TOKEN_NAME: str = "token" """ Name used to refer to this authentication handler in configuration """ @@ -42,11 +42,16 @@ def _fetch_secret( Gets the secrets by checking the keyring and then falling back to interrupting the program and asking the user for secret. """ - keyring_id = self.get_keyring_id(channel) - token = keyring.get_password(keyring_id, USERNAME) + # First tried the value we passed in + token = settings.get(TOKEN_PARAM_NAME) if token is None: - token = self.get_token(settings) + # Try password manager if there was nothing there + keyring_id = self.get_keyring_id(channel) + token = keyring.get_password(keyring_id, USERNAME) + + if token is None: + raise CondaAuthError("Token not found") return USERNAME, token @@ -66,17 +71,6 @@ def get_auth_type(self) -> str: def get_config_parameters(self) -> tuple[str, ...]: return (TOKEN_PARAM_NAME,) - def get_token(self, settings: Mapping[str, str | None]): - """ - Attempt to first retrieve token from settings and then prompt the user for it. - """ - token = settings.get(TOKEN_PARAM_NAME) - - if token is None: - raise CondaAuthError("Token not found") - - return token - def get_auth_class(self) -> type: return TokenAuthHandler diff --git a/conda_auth/hooks.py b/conda_auth/hooks.py index a6545af..188fce2 100644 --- a/conda_auth/hooks.py +++ b/conda_auth/hooks.py @@ -1,6 +1,7 @@ """ A place to register plugin hooks """ +from conda.cli.conda_argparse import BUILTIN_COMMANDS from conda.plugins import CondaAuthHandler, CondaPreCommand, CondaSubcommand, hookimpl from .handlers import ( @@ -14,6 +15,15 @@ from .cli import auth_wrapper from .constants import PLUGIN_NAME +ENV_COMMANDS = { + "env_config", + "env_create", + "env_export", + "env_list", + "env_remove", + "env_update", +} + @hookimpl def conda_subcommands(): @@ -33,12 +43,12 @@ def conda_pre_commands(): yield CondaPreCommand( name=f"{PLUGIN_NAME}-{HTTP_BASIC_AUTH_NAME}", action=basic_auth_manager.hook_action, - run_for={"search", "install", "update", "notices", "create", "search"}, + run_for=BUILTIN_COMMANDS.union(ENV_COMMANDS), ) yield CondaPreCommand( name=f"{PLUGIN_NAME}-{TOKEN_NAME}", action=token_auth_manager.hook_action, - run_for={"search", "install", "update", "notices", "create", "search"}, + run_for=BUILTIN_COMMANDS.union(ENV_COMMANDS), ) diff --git a/docs/index.md b/docs/index.md index 9d1e0bf..6063328 100644 --- a/docs/index.md +++ b/docs/index.md @@ -10,7 +10,7 @@ Developer Guide Conda auth is a conda plugin which adds more secure authentication support to conda. -Once installed, it provides two new commands you can use to manage credentials and +Once installed, it provides two new commands you can use to manage credentials and access private conda channels: - `conda auth login` for logging into a private channel and storing your credentials @@ -27,13 +27,28 @@ conda install --name base --channel conda-forge conda-auth ## Usage -**Log in** to a channel protected by HTTP basic authentication: +
HTTP basic authentication:
``` -conda auth login https://example.com/my-protected-channel --username $USERNAME +conda auth login https://example.com/my-protected-channel --basic ``` -**Log out** of a channel to remove credentials from your computer: +
Token authentication:
+ + +``` +conda auth login example --token +``` + +*The above example by default would authenticate for a channel at [anaconda.org](https://anaconda.org).* + +
Token authentication using other channels:
+ +``` +conda auth login https://example.com/my-protected-channel --token +``` + +
Removing credentials from your computer:
``` conda auth logout https://example.com/my-protected-channel diff --git a/docs/user/index.md b/docs/user/index.md index fa6f3f4..fe04236 100644 --- a/docs/user/index.md +++ b/docs/user/index.md @@ -15,12 +15,12 @@ conda install -c conda-forge conda-auth Once installed the plugin makes two new commands available: `conda auth login` and `conda auth logout`. The plugin supports various types of authentication schemes. Read below to learn how to use each. -### Logging in to a channel using HTTP basic authentication +### HTTP basic authentication To log in to a channel using HTTP basic authentication, run the following command: ``` -conda auth login +conda auth login --basic ``` Once this has been run, you will be prompted for your username and password. @@ -28,7 +28,30 @@ Once this has been run, you will be prompted for your username and password. You also have the ability to specify username and password as command options: ``` -conda auth login --username $USERNAME --password $PASSWORD +conda auth login --basic --username $USERNAME --password $PASSWORD +``` + +### Token authentication + +The following examples are for authenticating with channels using token based authentication. + +For [anaconda.org](https://anaconda.org) channels: + +``` +conda auth login --token +``` + +You will then be prompted for your token. Optionally, you can specify the token value as +an option: + +``` +conda auth login --token +``` + +For other channels not hosted at anaconda.org, use the full URL of the channel: + +``` +conda auth login https://example.com/my-protected-channel --token ``` ### Logging out of a channel diff --git a/tests/cli/test_login.py b/tests/cli/test_login.py index 082d1d4..bc608f5 100644 --- a/tests/cli/test_login.py +++ b/tests/cli/test_login.py @@ -1,61 +1,54 @@ from conda_auth.cli import group, SUCCESSFUL_LOGIN_MESSAGE from conda_auth.condarc import CondaRCError -from conda_auth.handlers.basic_auth import HTTP_BASIC_AUTH_NAME from conda_auth.exceptions import CondaAuthError -def test_login_no_options_basic_auth(mocker, runner, keyring, condarc): +def test_login_basic_auth_no_options(runner, keyring, condarc): """ Runs the login command with no additional CLI options defined (e.g. --username) """ + username = "user" secret = "password" channel_name = "tester" # setup mocks - keyring(secret) - mock_context = mocker.patch("conda_auth.cli.context") - mock_context.channel_settings = [ - {"channel": channel_name, "auth": HTTP_BASIC_AUTH_NAME, "username": "user"} - ] + keyring(None) # run command - result = runner.invoke(group, ["login", channel_name]) + result = runner.invoke( + group, ["login", channel_name, "--basic"], input=f"{username}\n{secret}" + ) assert result.exit_code == 0 assert SUCCESSFUL_LOGIN_MESSAGE in result.output -def test_login_with_options_basic_auth(mocker, runner, keyring, condarc): +def test_login_with_options_basic_auth(runner, keyring, condarc): """ Runs the login command with CLI options defined (e.g. --username) """ channel_name = "tester" # setup mocks - mock_context = mocker.patch("conda_auth.cli.context") - mock_context.channel_settings = [] keyring(None) # run command result = runner.invoke( - group, ["login", channel_name, "--username", "test", "--password", "test"] + group, + ["login", channel_name, "--basic", "--username", "test", "--password", "test"], ) assert result.exit_code == 0 assert SUCCESSFUL_LOGIN_MESSAGE in result.output -def test_login_with_invalid_auth_type(mocker, runner, keyring, condarc): +def test_login_with_invalid_auth_type(runner, keyring, condarc): """ Runs the login command when there is an invalid auth type configured in settings """ channel_name = "tester" # setup mocks - mock_context = mocker.patch("conda_auth.cli.context") - mock_context.channel_settings = [ - {"channel": channel_name, "auth": "does-not-exist"} - ] keyring(None) # run command @@ -64,33 +57,33 @@ def test_login_with_invalid_auth_type(mocker, runner, keyring, condarc): assert result.exit_code == 1 assert exc_type == CondaAuthError - assert "Invalid authentication type." in exception.message + assert "Please specify an authentication type" in exception.message -def test_login_with_non_existent_channel(mocker, runner, keyring, condarc): +def test_login_succeeds_error_returned_when_updating_condarc(runner, keyring, condarc): """ - Runs the login command for a channel that is not present in the settings file + Test the case where the login runs successfully but an error is returned when trying to update + the condarc file. """ channel_name = "tester" # setup mocks - mock_context = mocker.patch("conda_auth.cli.context") - mock_context.channel_settings = [] keyring(None) + condarc().save.side_effect = CondaRCError("Could not save file") # run command - result = runner.invoke(group, ["login", channel_name], input="user") + result = runner.invoke( + group, ["login", channel_name, "--basic"], input="user\npassword" + ) + exc_type, exception, _ = result.exc_info - assert result.exit_code == 0 - assert SUCCESSFUL_LOGIN_MESSAGE in result.output + assert exc_type == CondaAuthError + assert "Could not save file" == exception.message -def test_login_succeeds_error_returned_when_updating_condarc( - mocker, runner, keyring, condarc -): +def test_login_token(mocker, runner, keyring, condarc): """ - Test the case where the login runs successfully but an error is returned when trying to update - the condarc file. + Test successful login with token """ channel_name = "tester" @@ -98,27 +91,22 @@ def test_login_succeeds_error_returned_when_updating_condarc( mock_context = mocker.patch("conda_auth.cli.context") mock_context.channel_settings = [] keyring(None) - condarc().save.side_effect = CondaRCError("Could not save file") - # run command - result = runner.invoke(group, ["login", channel_name], input="user") - exc_type, exception, _ = result.exc_info + result = runner.invoke(group, ["login", channel_name, "--token", "token"]) - assert exc_type == CondaAuthError - assert "Could not save file" == exception.message + assert result.exit_code == 0 -def test_login_with_token(mocker, runner, keyring, condarc): +def test_login_token_no_options(runner, keyring, condarc): """ - Test successful login with token + Test successful login with token without the value being supplied at the command line """ channel_name = "tester" # setup mocks - mock_context = mocker.patch("conda_auth.cli.context") - mock_context.channel_settings = [] keyring(None) - result = runner.invoke(group, ["login", channel_name, "--token", "token"]) + result = runner.invoke(group, ["login", channel_name, "--token"], input="token\n") assert result.exit_code == 0 + assert SUCCESSFUL_LOGIN_MESSAGE in result.output diff --git a/tests/test_hooks.py b/tests/test_hooks.py index 5a9f04b..015bd1c 100644 --- a/tests/test_hooks.py +++ b/tests/test_hooks.py @@ -1,3 +1,5 @@ +from conda.cli.conda_argparse import BUILTIN_COMMANDS + from conda_auth import hooks from conda_auth.constants import PLUGIN_NAME from conda_auth.handlers import ( @@ -24,7 +26,7 @@ def test_conda_pre_commands_hook(): """ objs = list(hooks.conda_pre_commands()) - run_for = {"search", "install", "update", "notices", "create", "search"} + run_for = BUILTIN_COMMANDS.union(hooks.ENV_COMMANDS) assert objs[0].name == f"{PLUGIN_NAME}-{HTTP_BASIC_AUTH_NAME}" assert objs[0].run_for == run_for From 44a3aef0cb05f30b29cd66ac89164002ce82a07b Mon Sep 17 00:00:00 2001 From: Travis Hathaway Date: Fri, 20 Oct 2023 08:07:22 +0200 Subject: [PATCH 11/11] Update conda_auth/cli.py Co-authored-by: Bianca Henderson --- conda_auth/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conda_auth/cli.py b/conda_auth/cli.py index 9c68471..e452b70 100644 --- a/conda_auth/cli.py +++ b/conda_auth/cli.py @@ -176,7 +176,7 @@ def login(extra_context: ExtraContext, channel: Channel, **kwargs): @click.pass_obj def logout(extra_context: ExtraContext, channel: Channel): """ - Log out of a by removing any credentials or tokens associated with it. + Log out of a channel by removing any credentials or tokens associated with it. """ settings = get_channel_settings(channel.canonical_name)