diff --git a/Makefile b/Makefile index aa45ecb..f2476d0 100644 --- a/Makefile +++ b/Makefile @@ -63,8 +63,7 @@ run: $(VENV)/pyvenv.cfg .PHONY: lint lint: $(VENV)/pyvenv.cfg . $(VENV_BIN)/activate && \ - black --check $(ALL_PY_SRCS) && \ - isort --check $(ALL_PY_SRCS) && \ + ruff format --check $(ALL_PY_SRCS) && \ ruff $(ALL_PY_SRCS) && \ mypy $(PY_MODULE) && \ bandit -c pyproject.toml -r $(PY_MODULE) && \ @@ -74,8 +73,7 @@ lint: $(VENV)/pyvenv.cfg reformat: $(VENV)/pyvenv.cfg . $(VENV_BIN)/activate && \ ruff --fix $(ALL_PY_SRCS) && \ - black $(ALL_PY_SRCS) && \ - isort $(ALL_PY_SRCS) + ruff format $(ALL_PY_SRCS) .PHONY: test test: $(VENV)/pyvenv.cfg diff --git a/id/__init__.py b/id/__init__.py index 4439bc2..9baa0de 100644 --- a/id/__init__.py +++ b/id/__init__.py @@ -59,6 +59,7 @@ def detect_credential(audience: str) -> Optional[str]: """ from ._internal.oidc.ambient import ( detect_buildkite, + detect_circleci, detect_gcp, detect_github, detect_gitlab, @@ -69,6 +70,7 @@ def detect_credential(audience: str) -> Optional[str]: detect_gcp, detect_buildkite, detect_gitlab, + detect_circleci, ] for detector in detectors: credential = detector(audience) diff --git a/id/__main__.py b/id/__main__.py index c9fe5ab..891bee1 100644 --- a/id/__main__.py +++ b/id/__main__.py @@ -36,9 +36,7 @@ def _parser() -> argparse.ArgumentParser: description="a tool for generating OIDC identities", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) - parser.add_argument( - "-V", "--version", action="version", version=f"%(prog)s {__version__}" - ) + parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {__version__}") parser.add_argument( "-v", "--verbose", diff --git a/id/_internal/oidc/ambient.py b/id/_internal/oidc/ambient.py index eadfb81..e12d68f 100644 --- a/id/_internal/oidc/ambient.py +++ b/id/_internal/oidc/ambient.py @@ -16,6 +16,7 @@ Ambient OIDC credential detection. """ +import json import logging import os import re @@ -31,9 +32,15 @@ logger = logging.getLogger(__name__) _GCP_PRODUCT_NAME_FILE = "/sys/class/dmi/id/product_name" -_GCP_TOKEN_REQUEST_URL = "http://metadata/computeMetadata/v1/instance/service-accounts/default/token" # noqa # nosec B105 -_GCP_IDENTITY_REQUEST_URL = "http://metadata/computeMetadata/v1/instance/service-accounts/default/identity" # noqa -_GCP_GENERATEIDTOKEN_REQUEST_URL = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateIdToken" # noqa +_GCP_TOKEN_REQUEST_URL = ( + "http://metadata/computeMetadata/v1/instance/service-accounts/default/token" # noqa # nosec B105 +) +_GCP_IDENTITY_REQUEST_URL = ( + "http://metadata/computeMetadata/v1/instance/service-accounts/default/identity" # noqa +) +_GCP_GENERATEIDTOKEN_REQUEST_URL = ( + "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateIdToken" # noqa +) _env_var_regex = re.compile(r"[^A-Z0-9_]|^[^A-Z_]") @@ -178,15 +185,11 @@ def detect_gcp(audience: str) -> Optional[str]: with open(_GCP_PRODUCT_NAME_FILE) as f: name = f.read().strip() except OSError: - logger.debug( - "GCP: environment doesn't have GCP product name file; giving up" - ) + logger.debug("GCP: environment doesn't have GCP product name file; giving up") return None if name not in {"Google", "Google Compute Engine"}: - logger.debug( - f"GCP: product name file exists, but product name is {name!r}; giving up" - ) + logger.debug(f"GCP: product name file exists, but product name is {name!r}; giving up") return None logger.debug("GCP: requesting OIDC token") @@ -292,9 +295,43 @@ def detect_gitlab(audience: str) -> Optional[str]: var_name = f"{sanitized_audience}_ID_TOKEN" token = os.getenv(var_name) if not token: - raise AmbientCredentialError( - f"GitLab: Environment variable {var_name} not found" - ) + raise AmbientCredentialError(f"GitLab: Environment variable {var_name} not found") logger.debug(f"GitLab: Found token in environment variable {var_name}") return token + + +def detect_circleci(audience: str) -> Optional[str]: + """ + Detect and return a CircleCI ambient OIDC credential. + + Returns `None` if the context is not a CircleCI environment. + + Raises if the environment is GitHub Actions, but is incorrect or + insufficiently permissioned for an OIDC credential. + """ + logger.debug("CircleCI: looking for OIDC credentials") + + if not os.getenv("CIRCLECI"): + logger.debug("CircleCI: environment doesn't look like CircleCI; giving up") + return None + + # Check that the circleci executable exists in the `PATH`. + if shutil.which("circleci") is None: + raise AmbientCredentialError("CircleCI: could not find `circleci` in the environment") + + # See NOTE on `detect_buildkite` for why we silence these warnings. + payload = json.dumps({"aud": audience}) + process = subprocess.run( # nosec B603, B607 + ["circleci", "run", "oidc", "get", "--claims", payload], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + if process.returncode != 0: + raise AmbientCredentialError( + f"CircleCI: the `circleci` tool encountered an error: {process.stdout}" + ) + + return process.stdout.strip() diff --git a/pyproject.toml b/pyproject.toml index 95a9b76..b58b75a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,8 +34,6 @@ Source = "https://github.com/di/id" test = ["pytest", "pytest-cov", "pretend", "coverage[toml]"] lint = [ "bandit", - "black", - "isort", "interrogate", "mypy", # NOTE(ww): ruff is under active development, so we pin conservatively here @@ -45,11 +43,6 @@ lint = [ ] dev = ["build", "bump >= 1.3.2", "id[test,lint]"] -[tool.isort] -multi_line_output = 3 -known_first_party = "id" -include_trailing_comma = true - [tool.interrogate] # don't enforce documentation coverage for packaging, testing, the virtual # environment, or the CLI (which is documented separately). @@ -85,4 +78,4 @@ exclude_dirs = ["./test"] line-length = 100 # TODO: Enable "UP" here once Pydantic allows us to: # See: https://github.com/pydantic/pydantic/issues/4146 -select = ["E", "F", "W"] +select = ["I", "E", "F", "W"] diff --git a/test/unit/internal/oidc/test_ambient.py b/test/unit/internal/oidc/test_ambient.py index 7f7c22b..8f82e54 100644 --- a/test/unit/internal/oidc/test_ambient.py +++ b/test/unit/internal/oidc/test_ambient.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + import pretend import pytest from requests import HTTPError, Timeout @@ -95,9 +97,7 @@ def test_detect_github_request_fails(monkeypatch): status_code=999, content=b"something", ) - requests = pretend.stub( - get=pretend.call_recorder(lambda url, **kw: resp), HTTPError=HTTPError - ) + requests = pretend.stub(get=pretend.call_recorder(lambda url, **kw: resp), HTTPError=HTTPError) monkeypatch.setattr(ambient, "requests", requests) with pytest.raises( @@ -148,9 +148,7 @@ def test_detect_github_bad_payload(monkeypatch): monkeypatch.setenv("ACTIONS_ID_TOKEN_REQUEST_TOKEN", "faketoken") monkeypatch.setenv("ACTIONS_ID_TOKEN_REQUEST_URL", "fakeurl") - resp = pretend.stub( - raise_for_status=lambda: None, json=pretend.call_recorder(lambda: {}) - ) + resp = pretend.stub(raise_for_status=lambda: None, json=pretend.call_recorder(lambda: {})) requests = pretend.stub(get=pretend.call_recorder(lambda url, **kw: resp)) monkeypatch.setattr(ambient, "requests", requests) @@ -195,9 +193,7 @@ def test_detect_github(monkeypatch): def test_gcp_impersonation_access_token_request_fail(monkeypatch): - monkeypatch.setenv( - "GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com" - ) + monkeypatch.setenv("GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com") logger = pretend.stub(debug=pretend.call_recorder(lambda s: None)) monkeypatch.setattr(ambient, "logger", logger) @@ -207,9 +203,7 @@ def test_gcp_impersonation_access_token_request_fail(monkeypatch): status_code=999, content=b"something", ) - requests = pretend.stub( - get=pretend.call_recorder(lambda url, **kw: resp), HTTPError=HTTPError - ) + requests = pretend.stub(get=pretend.call_recorder(lambda url, **kw: resp), HTTPError=HTTPError) monkeypatch.setattr(ambient, "requests", requests) with pytest.raises( @@ -226,9 +220,7 @@ def test_gcp_impersonation_access_token_request_fail(monkeypatch): def test_gcp_impersonation_access_token_request_timeout(monkeypatch): - monkeypatch.setenv( - "GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com" - ) + monkeypatch.setenv("GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com") logger = pretend.stub(debug=pretend.call_recorder(lambda s: None)) monkeypatch.setattr(ambient, "logger", logger) @@ -255,9 +247,7 @@ def test_gcp_impersonation_access_token_request_timeout(monkeypatch): def test_gcp_impersonation_access_token_missing(monkeypatch): - monkeypatch.setenv( - "GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com" - ) + monkeypatch.setenv("GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com") logger = pretend.stub(debug=pretend.call_recorder(lambda s: None)) monkeypatch.setattr(ambient, "logger", logger) @@ -280,9 +270,7 @@ def test_gcp_impersonation_access_token_missing(monkeypatch): def test_gcp_impersonation_identity_token_request_fail(monkeypatch): - monkeypatch.setenv( - "GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com" - ) + monkeypatch.setenv("GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com") logger = pretend.stub(debug=pretend.call_recorder(lambda s: None)) monkeypatch.setattr(ambient, "logger", logger) @@ -318,9 +306,7 @@ def test_gcp_impersonation_identity_token_request_fail(monkeypatch): def test_gcp_impersonation_identity_token_request_timeout(monkeypatch): - monkeypatch.setenv( - "GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com" - ) + monkeypatch.setenv("GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com") logger = pretend.stub(debug=pretend.call_recorder(lambda s: None)) monkeypatch.setattr(ambient, "logger", logger) @@ -353,9 +339,7 @@ def test_gcp_impersonation_identity_token_request_timeout(monkeypatch): def test_gcp_impersonation_identity_token_missing(monkeypatch): - monkeypatch.setenv( - "GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com" - ) + monkeypatch.setenv("GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com") logger = pretend.stub(debug=pretend.call_recorder(lambda s: None)) monkeypatch.setattr(ambient, "logger", logger) @@ -387,9 +371,7 @@ def test_gcp_impersonation_identity_token_missing(monkeypatch): def test_gcp_impersonation_succeeds(monkeypatch): - monkeypatch.setenv( - "GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com" - ) + monkeypatch.setenv("GOOGLE_SERVICE_ACCOUNT_NAME", "identity@project.iam.gserviceaccount.com") logger = pretend.stub(debug=pretend.call_recorder(lambda s: None)) monkeypatch.setattr(ambient, "logger", logger) @@ -399,9 +381,7 @@ def test_gcp_impersonation_succeeds(monkeypatch): get_resp = pretend.stub( raise_for_status=lambda: None, json=lambda: {"access_token": access_token} ) - post_resp = pretend.stub( - raise_for_status=lambda: None, json=lambda: {"token": oidc_token} - ) + post_resp = pretend.stub(raise_for_status=lambda: None, json=lambda: {"token": oidc_token}) requests = pretend.stub( get=pretend.call_recorder(lambda url, **kw: get_resp), post=pretend.call_recorder(lambda url, **kw: post_resp), @@ -430,9 +410,7 @@ def test_gcp_bad_env(monkeypatch): assert ambient.detect_gcp("some-audience") is None assert logger.debug.calls == [ pretend.call("GCP: looking for OIDC credentials"), - pretend.call( - "GCP: GOOGLE_SERVICE_ACCOUNT_NAME not set; skipping impersonation" - ), + pretend.call("GCP: GOOGLE_SERVICE_ACCOUNT_NAME not set; skipping impersonation"), pretend.call("GCP: environment doesn't have GCP product name file; giving up"), ] @@ -451,9 +429,7 @@ def test_gcp_wrong_product(monkeypatch): assert logger.debug.calls == [ pretend.call("GCP: looking for OIDC credentials"), - pretend.call( - "GCP: GOOGLE_SERVICE_ACCOUNT_NAME not set; skipping impersonation" - ), + pretend.call("GCP: GOOGLE_SERVICE_ACCOUNT_NAME not set; skipping impersonation"), pretend.call( "GCP: product name file exists, but product name is 'Unsupported Product'; giving up" ), @@ -472,9 +448,7 @@ def test_detect_gcp_request_fails(monkeypatch): status_code=999, content=b"something", ) - requests = pretend.stub( - get=pretend.call_recorder(lambda url, **kw: resp), HTTPError=HTTPError - ) + requests = pretend.stub(get=pretend.call_recorder(lambda url, **kw: resp), HTTPError=HTTPError) monkeypatch.setattr(ambient, "requests", requests) with pytest.raises( @@ -551,9 +525,7 @@ def test_detect_gcp(monkeypatch, product_name): ] assert logger.debug.calls == [ pretend.call("GCP: looking for OIDC credentials"), - pretend.call( - "GCP: GOOGLE_SERVICE_ACCOUNT_NAME not set; skipping impersonation" - ), + pretend.call("GCP: GOOGLE_SERVICE_ACCOUNT_NAME not set; skipping impersonation"), pretend.call("GCP: requesting OIDC token"), pretend.call("GCP: successfully requested OIDC token"), ] @@ -580,9 +552,7 @@ def test_buildkite_agent_error(monkeypatch): monkeypatch.setenv("BUILDKITE", "true") # Mock out the `which` call to show that we have a `buildkite-agent` in our `PATH`. - shutil = pretend.stub( - which=pretend.call_recorder(lambda bin: "/usr/bin/buildkite-agent") - ) + shutil = pretend.stub(which=pretend.call_recorder(lambda bin: "/usr/bin/buildkite-agent")) monkeypatch.setattr(ambient, "shutil", shutil) # Mock out `run` call to emulate getting a non-zero return code from the `buildkite-agent`. @@ -590,9 +560,7 @@ def test_buildkite_agent_error(monkeypatch): returncode=-1, stdout="mock error message", ) - subprocess = pretend.stub( - run=pretend.call_recorder(lambda run_args, **kw: resp), PIPE=None - ) + subprocess = pretend.stub(run=pretend.call_recorder(lambda run_args, **kw: resp), PIPE=None) monkeypatch.setattr(ambient, "subprocess", subprocess) with pytest.raises( @@ -616,9 +584,7 @@ def test_buildkite(monkeypatch): monkeypatch.setenv("BUILDKITE", "true") # Mock out the `which` call to show that we have a `buildkite-agent` in our `PATH`. - shutil = pretend.stub( - which=pretend.call_recorder(lambda bin: "/usr/bin/buildkite-agent") - ) + shutil = pretend.stub(which=pretend.call_recorder(lambda bin: "/usr/bin/buildkite-agent")) monkeypatch.setattr(ambient, "shutil", shutil) # Mock out `run` call to emulate getting a successful return code from the `buildkite-agent`. @@ -626,9 +592,7 @@ def test_buildkite(monkeypatch): returncode=0, stdout="fakejwt", ) - subprocess = pretend.stub( - run=pretend.call_recorder(lambda run_args, **kw: resp), PIPE=None - ) + subprocess = pretend.stub(run=pretend.call_recorder(lambda run_args, **kw: resp), PIPE=None) monkeypatch.setattr(ambient, "subprocess", subprocess) assert ambient.detect_buildkite("some-audience") == "fakejwt" @@ -698,11 +662,98 @@ def test_gitlab(monkeypatch): assert ambient.detect_gitlab("11 other audience") == "fakejwt2" assert logger.debug.calls == [ pretend.call("GitLab: looking for OIDC credentials"), - pretend.call( - "GitLab: Found token in environment variable SOME_AUDIENCE_ID_TOKEN" - ), + pretend.call("GitLab: Found token in environment variable SOME_AUDIENCE_ID_TOKEN"), pretend.call("GitLab: looking for OIDC credentials"), + pretend.call("GitLab: Found token in environment variable _1_OTHER_AUDIENCE_ID_TOKEN"), + ] + + +def test_circleci_bad_env(monkeypatch): + monkeypatch.delenv("CIRCLECI", False) + + logger = pretend.stub(debug=pretend.call_recorder(lambda s: None)) + monkeypatch.setattr(ambient, "logger", logger) + + assert ambient.detect_circleci("some-audience") is None + assert logger.debug.calls == [ + pretend.call("CircleCI: looking for OIDC credentials"), + pretend.call("CircleCI: environment doesn't look like CircleCI; giving up"), + ] + + +def test_circleci_no_circleci_cli(monkeypatch): + monkeypatch.setenv("CIRCLECI", "true") + + # Mock out the `which` call. We don't expect this to exist in the `PATH` but + # just in case someone is running these tests on a Buildkite host... + shutil = pretend.stub(which=pretend.call_recorder(lambda bin: None)) + monkeypatch.setattr(ambient, "shutil", shutil) + + with pytest.raises( + ambient.AmbientCredentialError, + match=r"CircleCI: could not find `circleci` in the environment", + ): + ambient.detect_circleci("some-audience") + + assert shutil.which.calls == [pretend.call("circleci")] + + +def test_circleci_circlecli_error(monkeypatch): + monkeypatch.setenv("CIRCLECI", "true") + + # Mock out the `which` call to show that we have a `circleci` in our `PATH`. + shutil = pretend.stub(which=pretend.call_recorder(lambda bin: "/usr/bin/circleci")) + monkeypatch.setattr(ambient, "shutil", shutil) + + # Mock out `run` call to emulate getting a non-zero return code from the `circleci`. + resp = pretend.stub( + returncode=-1, + stdout="mock error message", + ) + subprocess = pretend.stub(run=pretend.call_recorder(lambda run_args, **kw: resp), PIPE=None) + monkeypatch.setattr(ambient, "subprocess", subprocess) + payload = json.dumps({"aud": "some-audience"}) + + with pytest.raises( + ambient.AmbientCredentialError, + match=r"CircleCI: the `circleci` tool encountered an error: mock error message", + ): + ambient.detect_circleci("some-audience") + + assert shutil.which.calls == [pretend.call("circleci")] + assert subprocess.run.calls == [ pretend.call( - "GitLab: Found token in environment variable _1_OTHER_AUDIENCE_ID_TOKEN" - ), + ["circleci", "run", "oidc", "get", "--claims", payload], + stdout=None, + stderr=None, + text=True, + ) + ] + + +def test_circleci(monkeypatch): + monkeypatch.setenv("CIRCLECI", "true") + + # Mock out the `which` call to show that we have a `circleci` in our `PATH`. + shutil = pretend.stub(which=pretend.call_recorder(lambda bin: "/usr/bin/circleci")) + monkeypatch.setattr(ambient, "shutil", shutil) + + # Mock out `run` call to emulate getting a successful return code from the `circleci`. + resp = pretend.stub( + returncode=0, + stdout="fakejwt", + ) + subprocess = pretend.stub(run=pretend.call_recorder(lambda run_args, **kw: resp), PIPE=None) + monkeypatch.setattr(ambient, "subprocess", subprocess) + payload = json.dumps({"aud": "some-audience"}) + + assert ambient.detect_circleci("some-audience") == "fakejwt" + assert shutil.which.calls == [pretend.call("circleci")] + assert subprocess.run.calls == [ + pretend.call( + ["circleci", "run", "oidc", "get", "--claims", payload], + stdout=None, + stderr=None, + text=True, + ) ]