Skip to content

Commit

Permalink
Fail if attestations have unsupported predicate type
Browse files Browse the repository at this point in the history
  • Loading branch information
facutuesca committed Jun 20, 2024
1 parent 4075073 commit 45819f3
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 29 deletions.
2 changes: 1 addition & 1 deletion requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ rfc3986
sentry-sdk
setuptools
sigstore~=3.0.0
pypi-attestation-models==0.0.4
pypi-attestation-models==0.0.5
sqlalchemy[asyncio]>=2.0,<3.0
stdlib-list
stripe
Expand Down
6 changes: 3 additions & 3 deletions requirements/main.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1736,9 +1736,9 @@ pyparsing==3.1.2 \
--hash=sha256:a1bac0ce561155ecc3ed78ca94d3c9378656ad4c94c1270de543f621420f94ad \
--hash=sha256:f9db75911801ed778fe61bb643079ff86601aca99fcae6345aa67292038fb742
# via linehaul
pypi-attestation-models==0.0.4 \
--hash=sha256:72693503fc636959f0cf15ca5f067f011e28829600cc3f2f1cd8340eeccc7af6 \
--hash=sha256:9ee0d7151c1a02b89b77332cd204ff1334d5808bcd4bad11685e9c70bea12740
pypi-attestation-models==0.0.5 \
--hash=sha256:cceb48aec1c9d93d880d2a6c8c9581bedb503b66203e37081e1ba2e863b6bac9 \
--hash=sha256:d105bc4cf167d4d1db180177bc464bcc4cea8437cdc583c6598424b712c8b068
# via -r requirements/main.in
pyqrcode==1.2.1 \
--hash=sha256:1b2812775fa6ff5c527977c4cd2ccb07051ca7d0bc0aecf937a43864abe5eff6 \
Expand Down
178 changes: 177 additions & 1 deletion tests/unit/forklift/test_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3446,7 +3446,12 @@ def test_upload_with_valid_attestation_succeeds(
)
monkeypatch.setattr(HasEvents, "record_event", record_event)

verify = pretend.call_recorder(lambda _self, _verifier, _policy, _dist: None)
verify = pretend.call_recorder(
lambda _self, _verifier, _policy, _dist: (
"https://docs.pypi.org/attestations/publish/v1",
None,
)
)
monkeypatch.setattr(Attestation, "verify", verify)
monkeypatch.setattr(Verifier, "production", lambda: pretend.stub())

Expand All @@ -3456,6 +3461,177 @@ def test_upload_with_valid_attestation_succeeds(

assert len(verify.calls) == 1

def test_upload_with_invalid_attestation_predicate_type_fails(
self,
monkeypatch,
pyramid_config,
db_request,
metrics,
):
from warehouse.events.models import HasEvents

project = ProjectFactory.create()
version = "1.0"
publisher = GitHubPublisherFactory.create(projects=[project])
claims = {
"sha": "somesha",
"repository": f"{publisher.repository_owner}/{publisher.repository_name}",
"workflow": "workflow_name",
}
identity = PublisherTokenContext(publisher, SignedClaims(claims))
db_request.oidc_publisher = identity.publisher
db_request.oidc_claims = identity.claims

db_request.db.add(Classifier(classifier="Environment :: Other Environment"))
db_request.db.add(Classifier(classifier="Programming Language :: Python"))

filename = "{}-{}.tar.gz".format(project.name, "1.0")
attestation = Attestation(
version=1,
verification_material=VerificationMaterial(
certificate="somebase64string", transparency_entries=[dict()]
),
envelope=Envelope(
statement="somebase64string",
signature="somebase64string",
),
)

pyramid_config.testing_securitypolicy(identity=identity)
db_request.user = None
db_request.user_agent = "warehouse-tests/6.6.6"
db_request.POST = MultiDict(
{
"metadata_version": "1.2",
"name": project.name,
"attestations": f"[{attestation.model_dump_json()}]",
"version": version,
"summary": "This is my summary!",
"filetype": "sdist",
"md5_digest": _TAR_GZ_PKG_MD5,
"content": pretend.stub(
filename=filename,
file=io.BytesIO(_TAR_GZ_PKG_TESTDATA),
type="application/tar",
),
}
)

storage_service = pretend.stub(store=lambda path, filepath, meta: None)
db_request.find_service = lambda svc, name=None, context=None: {
IFileStorage: storage_service,
IMetricsService: metrics,
}.get(svc)

record_event = pretend.call_recorder(
lambda self, *, tag, request=None, additional: None
)
monkeypatch.setattr(HasEvents, "record_event", record_event)

invalid_predicate_type = "Unsupported predicate type"
verify = pretend.call_recorder(
lambda _self, _verifier, _policy, _dist: (invalid_predicate_type, None)
)
monkeypatch.setattr(Attestation, "verify", verify)
monkeypatch.setattr(Verifier, "production", lambda: pretend.stub())

with pytest.raises(HTTPBadRequest) as excinfo:
legacy.file_upload(db_request)

resp = excinfo.value

assert resp.status_code == 400
assert resp.status.startswith(
f"400 Attestation with unsupported predicate type: {invalid_predicate_type}"
)

def test_upload_with_multiple_attestations_fails(
self,
monkeypatch,
pyramid_config,
db_request,
metrics,
):
from warehouse.events.models import HasEvents

project = ProjectFactory.create()
version = "1.0"
publisher = GitHubPublisherFactory.create(projects=[project])
claims = {
"sha": "somesha",
"repository": f"{publisher.repository_owner}/{publisher.repository_name}",
"workflow": "workflow_name",
}
identity = PublisherTokenContext(publisher, SignedClaims(claims))
db_request.oidc_publisher = identity.publisher
db_request.oidc_claims = identity.claims

db_request.db.add(Classifier(classifier="Environment :: Other Environment"))
db_request.db.add(Classifier(classifier="Programming Language :: Python"))

filename = "{}-{}.tar.gz".format(project.name, "1.0")
attestation = Attestation(
version=1,
verification_material=VerificationMaterial(
certificate="somebase64string", transparency_entries=[dict()]
),
envelope=Envelope(
statement="somebase64string",
signature="somebase64string",
),
)

pyramid_config.testing_securitypolicy(identity=identity)
db_request.user = None
db_request.user_agent = "warehouse-tests/6.6.6"
db_request.POST = MultiDict(
{
"metadata_version": "1.2",
"name": project.name,
"attestations": f"[{attestation.model_dump_json()},"
f" {attestation.model_dump_json()}]",
"version": version,
"summary": "This is my summary!",
"filetype": "sdist",
"md5_digest": _TAR_GZ_PKG_MD5,
"content": pretend.stub(
filename=filename,
file=io.BytesIO(_TAR_GZ_PKG_TESTDATA),
type="application/tar",
),
}
)

storage_service = pretend.stub(store=lambda path, filepath, meta: None)
db_request.find_service = lambda svc, name=None, context=None: {
IFileStorage: storage_service,
IMetricsService: metrics,
}.get(svc)

record_event = pretend.call_recorder(
lambda self, *, tag, request=None, additional: None
)
monkeypatch.setattr(HasEvents, "record_event", record_event)

verify = pretend.call_recorder(
lambda _self, _verifier, _policy, _dist: (
"https://docs.pypi.org/attestations/publish/v1",
None,
)
)
monkeypatch.setattr(Attestation, "verify", verify)
monkeypatch.setattr(Verifier, "production", lambda: pretend.stub())

with pytest.raises(HTTPBadRequest) as excinfo:
legacy.file_upload(db_request)

resp = excinfo.value

assert resp.status_code == 400
assert resp.status.startswith(
"400 Only a single attestation per-file is supported at the moment."
)

def test_upload_with_malformed_attestation_fails(
self,
monkeypatch,
Expand Down
72 changes: 48 additions & 24 deletions warehouse/forklift/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1083,42 +1083,66 @@ def file_upload(request):
attestations = TypeAdapter(list[Attestation]).validate_json(
request.POST["attestations"]
)
verification_policy = publisher.publisher_verification_policy(
request.oidc_claims
)
for attestation_model in attestations:
# For now, attestations are not stored, just verified
attestation_model.verify(
Verifier.production(),
verification_policy,
Path(temporary_filename),
)
# Log successful attestation upload
metrics.increment("warehouse.upload.attestations.ok")
except ValidationError as e:
# Log invalid (malformed) attestation upload
metrics.increment("warehouse.upload.attestations.malformed")
raise _exc_with_message(
HTTPBadRequest,
f"Error while decoding the included attestation: {e}",
)
except VerificationError as e:
# Log invalid (failed verification) attestation upload
metrics.increment("warehouse.upload.attestations.failed_verify")
raise _exc_with_message(
HTTPBadRequest,
f"Could not verify the uploaded artifact using the included "
f"attestation: {e}",
)
except Exception as e:
sentry_sdk.capture_message(
f"Unexpected error while verifying attestation: {e}"

if len(attestations) > 1:
metrics.increment(
"warehouse.upload.attestations." "failed_multiple_attestations"
)
raise _exc_with_message(
HTTPBadRequest,
f"Unknown error while trying to verify included attestations: {e}",
"Only a single attestation per-file is supported at the moment.",
)

verification_policy = publisher.publisher_verification_policy(
request.oidc_claims
)
for attestation_model in attestations:
try:
# For now, attestations are not stored, just verified
predicate_type, _ = attestation_model.verify(
Verifier.production(),
verification_policy,
Path(temporary_filename),
)
except VerificationError as e:
# Log invalid (failed verification) attestation upload
metrics.increment("warehouse.upload.attestations.failed_verify")
raise _exc_with_message(
HTTPBadRequest,
f"Could not verify the uploaded artifact using the included "
f"attestation: {e}",
)
except Exception as e:
sentry_sdk.capture_message(
f"Unexpected error while verifying attestation: {e}"
)
raise _exc_with_message(
HTTPBadRequest,
f"Unknown error while trying to verify included "
f"attestations: {e}",
)

if predicate_type != "https://docs.pypi.org/attestations/publish/v1":
metrics.increment(
"warehouse.upload.attestations."
"failed_unsupported_predicate_type"
)
raise _exc_with_message(
HTTPBadRequest,
f"Attestation with unsupported predicate type: "
f"{predicate_type}",
)

# Log successful attestation upload
metrics.increment("warehouse.upload.attestations.ok")

# TODO: This should be handled by some sort of database trigger or a
# SQLAlchemy hook or the like instead of doing it inline in this
# view.
Expand Down

0 comments on commit 45819f3

Please sign in to comment.