-
Notifications
You must be signed in to change notification settings - Fork 62
/
pypi.py
135 lines (110 loc) · 4.93 KB
/
pypi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
Functionality for using the [PyPI](https://warehouse.pypa.io/api-reference/json.html)
API as a `VulnerabilityService`.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import cast
import requests
from packaging.version import InvalidVersion, Version
from pip_audit._cache import caching_session
from pip_audit._service.interface import (
ConnectionError,
Dependency,
ResolvedDependency,
ServiceError,
SkippedDependency,
VulnerabilityResult,
VulnerabilityService,
)
# Module-level logger named after this module; used by `PyPIService` for debug-level diagnostics.
logger = logging.getLogger(__name__)
class PyPIService(VulnerabilityService):
    """
    An implementation of `VulnerabilityService` that uses PyPI to provide Python
    package vulnerability information.
    """

    def __init__(self, cache_dir: Path | None = None, timeout: int | None = None) -> None:
        """
        Create a new `PyPIService`.

        `cache_dir` is an optional cache directory to use, for caching and reusing PyPI API
        requests. If `None`, `pip-audit` will attempt to use `pip`'s cache directory before falling
        back on its own default cache directory.

        `timeout` is an optional argument to control how many seconds the component should wait for
        responses to network requests.
        """
        self.session = caching_session(cache_dir)
        self.timeout = timeout

    def query(self, spec: Dependency) -> tuple[Dependency, list[VulnerabilityResult]]:
        """
        Queries PyPI for the given `Dependency` specification.

        See `VulnerabilityService.query`.

        Returns a `(dependency, results)` pair:

        * A dependency that is already skipped is returned unchanged with an empty list.
        * A dependency that PyPI answers with HTTP 404 is returned as a new
          `SkippedDependency` with an empty list.
        * Otherwise the resolved dependency is returned together with one
          `VulnerabilityResult` per non-withdrawn entry of the response's
          `vulnerabilities` list (an absent key yields an empty list).

        Raises `ConnectionError` if the request hits too many redirects or the
        connection times out, and `ServiceError` for any other HTTP error status
        or if a `fixed_in` entry is not a valid version.
        """
        if spec.is_skipped():
            return spec, []
        # Anything that is not skipped is treated as resolved from here on; the
        # cast only informs the type checker and performs no runtime check.
        spec = cast(ResolvedDependency, spec)

        url = f"https://pypi.org/pypi/{spec.canonical_name}/{spec.version}/json"

        try:
            response: requests.Response = self.session.get(url=url, timeout=self.timeout)
            response.raise_for_status()
        except requests.TooManyRedirects as exc:
            # This should never happen with a healthy PyPI instance, but might
            # happen during an outage or network event.
            # Ref 2022-06-10: https://status.python.org/incidents/lgpr13fy71bk
            raise ConnectionError("PyPI is not redirecting properly") from exc
        except requests.ConnectTimeout as exc:
            # Apart from a normal network outage, this can happen for two main
            # reasons:
            # 1. PyPI's APIs are offline
            # 2. The user is behind a firewall or corporate network that blocks
            #    PyPI (and they're probably using custom indices)
            raise ConnectionError("Could not connect to PyPI's vulnerability feed") from exc
        except requests.HTTPError as http_error:
            # `HTTPError` is raised by `raise_for_status()`, so `response` is
            # bound by the time this handler runs.
            if response.status_code == 404:
                skip_reason = (
                    "Dependency not found on PyPI and could not be audited: "
                    f"{spec.canonical_name} ({spec.version})"
                )
                logger.debug(skip_reason)
                return SkippedDependency(name=spec.name, skip_reason=skip_reason), []
            # Carry a message so that callers who only print `str(error)` still
            # learn which package failed and why.
            raise ServiceError(
                f"PyPI returned an unexpected HTTP error for "
                f"{spec.canonical_name} ({spec.version}): {http_error}"
            ) from http_error

        response_json = response.json()
        results: list[VulnerabilityResult] = []
        vulns = response_json.get("vulnerabilities")

        # No `vulnerabilities` key means that there are no vulnerabilities for any version
        if vulns is None:
            return spec, results

        for v in vulns:
            vuln_id = v["id"]

            # If the vulnerability has been withdrawn, we skip it entirely.
            withdrawn_at = v.get("withdrawn")
            if withdrawn_at is not None:
                logger.debug(
                    "PyPI vuln entry '%s' marked as withdrawn at %s", vuln_id, withdrawn_at
                )
                continue

            # Put together the fix versions list
            try:
                fix_versions = [Version(fixed_in) for fixed_in in v["fixed_in"]]
            except InvalidVersion as iv:
                raise ServiceError(f'Received malformed version from PyPI: {v["fixed_in"]}') from iv

            # The ranges aren't guaranteed to come in chronological order
            fix_versions.sort()

            # Prefer the short "summary", fall back to "details", and use a
            # placeholder when neither key is present.
            description = v.get("summary")
            if description is None:
                description = v.get("details")
            if description is None:
                description = "N/A"

            # The "summary" field should be a single line, but "details" might
            # be multiple (Markdown-formatted) lines. So, we normalize our
            # description into a single line (and potentially break the Markdown
            # formatting in the process).
            description = description.replace("\n", " ")

            results.append(
                VulnerabilityResult(
                    id=vuln_id,
                    description=description,
                    fix_versions=fix_versions,
                    aliases=set(v["aliases"]),
                    published=self._parse_rfc3339(v.get("published")),
                )
            )

        return spec, results