forked from searxng/searx-space
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ssl_info.py
102 lines (79 loc) · 3.4 KB
/
ssl_info.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import ssl
import httpx
import httpx.config
import httpx.backends.asyncio
from OpenSSL.crypto import load_certificate, FILETYPE_ASN1
def set_or_concat_value(obj, key, value):
    """Store ``value`` under ``key`` in ``obj``, appending to an existing entry.

    When ``key`` is not yet present the value is stored unchanged. Otherwise
    the new value is joined to the stored one with ``', '``. ``obj`` is
    modified in place and nothing is returned.
    """
    if key not in obj:
        obj[key] = value
        return
    obj[key] = obj[key] + ', ' + value
def cert_to_obj(cert):
    """Convert a decoded peer certificate dict into a flat, simplified dict.

    Args:
        cert: Mapping shaped like the result of
            ``getpeercert(binary_form=False)``, or ``None`` / an empty dict
            when no decoded certificate is available.

    Returns:
        A new dict that always has ``'issuer'`` and ``'subject'`` keys, each
        mapping attribute names to string values (repeated attributes are
        joined with ``', '``). It also carries any of ``version``,
        ``serialNumber``, ``notBefore``, ``notAfter``, ``OCSP``,
        ``caIssuers`` and ``crlDistributionPoints`` present in ``cert``.
    """
    # getpeercert() returns None when the peer sent no certificate; treat it
    # like an empty certificate instead of failing on ``None.get``.
    if cert is None:
        cert = {}

    obj = {}
    for field in ('issuer', 'subject'):
        names = obj[field] = {}
        # Each field is a sequence of RDNs, each RDN a sequence of
        # (name, value) pairs.
        for rdn in cert.get(field, ()):
            for name, value in rdn:
                set_or_concat_value(names, name, value)

    copied_fields = (
        'version',
        'serialNumber',
        'notBefore',
        'notAfter',
        'OCSP',
        'caIssuers',
        'crlDistributionPoints',
    )
    for field in copied_fields:
        if field in cert:
            obj[field] = cert[field]
    return obj
def update_obj_with_bin(cert_obj, cert_bin):
    """Fill ``cert_obj`` with details parsed from a binary certificate.

    ``cert_bin`` is loaded with ``load_certificate(FILETYPE_ASN1, ...)``.
    The ``sha256``, ``notAfter``, ``notBefore``, ``signatureAlgorithm``,
    ``subject`` and ``issuer`` entries of ``cert_obj`` are then set,
    replacing any previous values. ``subject['altName']`` is added when the
    certificate carries a ``subjectAltName`` extension. ``cert_obj`` is
    modified in place and nothing is returned.
    """
    x509 = load_certificate(FILETYPE_ASN1, cert_bin)

    def name_attributes(name):
        # Only these three name attributes are kept for subject and issuer.
        return {
            'commonName': name.commonName,
            'countryName': name.countryName,
            'organizationName': name.organizationName,
        }

    cert_obj['sha256'] = x509.digest('sha256').decode('utf-8')
    cert_obj['notAfter'] = x509.get_notAfter().decode('utf-8')
    cert_obj['notBefore'] = x509.get_notBefore().decode('utf-8')
    cert_obj['signatureAlgorithm'] = x509.get_signature_algorithm().decode('utf-8')
    cert_obj['subject'] = name_attributes(x509.get_subject())
    cert_obj['issuer'] = name_attributes(x509.get_issuer())

    # Scan every extension and keep the textual form of subjectAltName.
    for index in range(x509.get_extension_count()):
        extension = x509.get_extension(index)
        if extension.get_short_name() != b'subjectAltName':
            continue
        cert_obj['subject']['altName'] = str(extension)
class SslInfo:
    """Per-hostname store of TLS version and peer certificate details."""

    __slots__ = ['_ssl_info']

    def __init__(self):
        # Maps hostname -> {'version': ..., 'certificate': ...}.
        self._ssl_info = {}

    def parse_sslobject(self, hostname: str, sslobj: ssl.SSLObject):
        """Record the TLS details of ``sslobj`` under ``hostname``.

        Does nothing when ``sslobj`` is ``None`` or when ``hostname`` already
        has an entry, so only the first object seen for a hostname is kept.
        """
        if sslobj is None or hostname in self._ssl_info:
            return

        decoded_cert = sslobj.getpeercert(binary_form=False)
        raw_cert = sslobj.getpeercert(binary_form=True)

        # Start from the decoded dict, then complete it from the raw bytes.
        certificate = cert_to_obj(decoded_cert)
        if raw_cert is not None and 'sha256' not in certificate:
            update_obj_with_bin(certificate, raw_cert)

        entry = {
            'version': sslobj.version(),
            'certificate': certificate,
        }
        self._ssl_info[hostname] = entry

    def get(self, hostname: str):
        """Return the stored entry for ``hostname``, or ``{}`` if there is none."""
        return self._ssl_info.get(hostname, {})
class AsyncioBackendLogCert(httpx.backends.asyncio.AsyncioBackend):
    """Asyncio backend that passes each opened stream's SSL object to an ``SslInfo``."""

    __slots__ = ['_sslinfo']

    def __init__(self, sslinfo: SslInfo):
        super().__init__()
        self._sslinfo = sslinfo

    async def open_tcp_stream(self, hostname, port, ssl_context, timeout):
        """Open the stream via the parent backend and record its SSL object.

        The stream returned by the parent implementation is handed back
        unchanged.
        """
        stream = await super().open_tcp_stream(hostname, port, ssl_context, timeout)
        # The SSL object is only reachable through the reader's private transport.
        transport = stream.stream_reader._transport  # pylint: disable=protected-access
        ssl_object = transport.get_extra_info('ssl_object')
        self._sslinfo.parse_sslobject(hostname, ssl_object)
        return stream
# Module-level SslInfo instance used by get_httpx_backend() and get_ssl_info().
SSLINFO = SslInfo()
def get_httpx_backend():
    """Return a new ``AsyncioBackendLogCert`` bound to the module-level ``SSLINFO``."""
    # SSLINFO is only read here, never rebound, so no ``global`` is needed.
    return AsyncioBackendLogCert(SSLINFO)
def get_ssl_info(hostname):
    """Return the entry stored in ``SSLINFO`` for ``hostname``.

    The result is whatever ``SSLINFO.get(hostname)`` returns.
    """
    # SSLINFO is only read here, never rebound, so no ``global`` is needed.
    return SSLINFO.get(hostname)