Skip to content

Commit

Permalink
Run pre-commit on x509 module
Browse files Browse the repository at this point in the history
  • Loading branch information
alxwr authored and dwoz committed Aug 24, 2020
1 parent a04d19e commit 0a1be82
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 52 deletions.
44 changes: 21 additions & 23 deletions salt/modules/x509.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Manage X509 certificates
Expand All @@ -7,7 +6,6 @@
:depends: M2Crypto
"""
from __future__ import absolute_import, print_function, unicode_literals

import ast
import ctypes
Expand Down Expand Up @@ -151,7 +149,7 @@ def _new_extension(name, value, critical=0, issuer=None, _pyfree=1):

if x509_ext_ptr is None:
raise M2Crypto.X509.X509Error(
"Cannot create X509_Extension with name '{0}' and value '{1}'".format(
"Cannot create X509_Extension with name '{}' and value '{}'".format(
name, value
)
)
Expand All @@ -170,7 +168,7 @@ def _parse_openssl_req(csr_filename):
"""
if not salt.utils.path.which("openssl"):
raise salt.exceptions.SaltInvocationError("openssl binary not found in path")
cmd = "openssl req -text -noout -in {0}".format(csr_filename)
cmd = "openssl req -text -noout -in {}".format(csr_filename)

output = __salt__["cmd.run_stdout"](cmd)

Expand Down Expand Up @@ -213,7 +211,7 @@ def _parse_openssl_crl(crl_filename):
"""
if not salt.utils.path.which("openssl"):
raise salt.exceptions.SaltInvocationError("openssl binary not found in path")
cmd = "openssl crl -text -noout -in {0}".format(crl_filename)
cmd = "openssl crl -text -noout -in {}".format(crl_filename)

output = __salt__["cmd.run_stdout"](cmd)

Expand Down Expand Up @@ -298,7 +296,7 @@ def _dec2hex(decval):
"""
Converts decimal values to nicely formatted hex strings
"""
return _pretty_hex("{0:X}".format(decval))
return _pretty_hex("{:X}".format(decval))


def _isfile(path):
Expand Down Expand Up @@ -486,9 +484,9 @@ def get_pem_entry(text, pem_type=None):
pem_temp = pem_temp[pem_temp.index("-") :]
text = "\n".join(pem_fixed)

errmsg = "PEM text not valid:\n{0}".format(text)
errmsg = "PEM text not valid:\n{}".format(text)
if pem_type:
errmsg = "PEM does not contain a single entry of type {0}:\n" "{1}".format(
errmsg = "PEM does not contain a single entry of type {}:\n" "{}".format(
pem_type, text
)

Expand Down Expand Up @@ -675,7 +673,7 @@ def read_crl(crl):
text = get_pem_entry(text, pem_type="X509 CRL")

crltempfile = tempfile.NamedTemporaryFile(delete=True)
crltempfile.write(salt.utils.stringutils.to_bytes(text, encoding='ascii'))
crltempfile.write(salt.utils.stringutils.to_bytes(text, encoding="ascii"))
crltempfile.flush()
crlparsed = _parse_openssl_crl(crltempfile.name)
crltempfile.close()
Expand Down Expand Up @@ -805,7 +803,7 @@ def write_pem(text, path, overwrite=True, pem_type=None):
_fp.write(salt.utils.stringutils.to_str(text))
if pem_type and pem_type == "CERTIFICATE" and _dhparams:
_fp.write(salt.utils.stringutils.to_str(_dhparams))
return "PEM written to {0}".format(path)
return "PEM written to {}".format(path)


def create_private_key(
Expand Down Expand Up @@ -1074,7 +1072,7 @@ def sign_remote_certificate(argdic, **kwargs):
if "signing_policy" in argdic:
signing_policy = _get_signing_policy(argdic["signing_policy"])
if not signing_policy:
return "Signing policy {0} does not exist.".format(argdic["signing_policy"])
return "Signing policy {} does not exist.".format(argdic["signing_policy"])

if isinstance(signing_policy, list):
dict_ = {}
Expand All @@ -1086,7 +1084,7 @@ def sign_remote_certificate(argdic, **kwargs):
if "__pub_id" not in kwargs:
return "minion sending this request could not be identified"
if not _match_minions(signing_policy["minions"], kwargs["__pub_id"]):
return "{0} not permitted to use signing policy {1}".format(
return "{} not permitted to use signing policy {}".format(
kwargs["__pub_id"], argdic["signing_policy"]
)

Expand All @@ -1110,7 +1108,7 @@ def get_signing_policy(signing_policy_name):
"""
signing_policy = _get_signing_policy(signing_policy_name)
if not signing_policy:
return "Signing policy {0} does not exist.".format(signing_policy_name)
return "Signing policy {} does not exist.".format(signing_policy_name)
if isinstance(signing_policy, list):
dict_ = {}
for item in signing_policy:
Expand Down Expand Up @@ -1419,7 +1417,7 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
if "signing_policy" not in kwargs:
raise salt.exceptions.SaltInvocationError(
"signing_policy must be specified"
"if requesting remote certificate from ca_server {0}.".format(ca_server)
"if requesting remote certificate from ca_server {}.".format(ca_server)
)
if "csr" in kwargs:
kwargs["csr"] = get_pem_entry(
Expand Down Expand Up @@ -1517,7 +1515,7 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
time = datetime.datetime.strptime(kwargs["not_before"], fmt)
except:
raise salt.exceptions.SaltInvocationError(
"not_before: {0} is not in required format {1}".format(
"not_before: {} is not in required format {}".format(
kwargs["not_before"], fmt
)
)
Expand All @@ -1535,7 +1533,7 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
time = datetime.datetime.strptime(kwargs["not_after"], fmt)
except:
raise salt.exceptions.SaltInvocationError(
"not_after: {0} is not in required format {1}".format(
"not_after: {} is not in required format {}".format(
kwargs["not_after"], fmt
)
)
Expand Down Expand Up @@ -1628,7 +1626,7 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
name=extname, value=extval, critical=critical, issuer=issuer
)
if not ext.x509_ext:
log.info("Invalid X509v3 Extension. {0}: {1}".format(extname, extval))
log.info("Invalid X509v3 Extension. {}: {}".format(extname, extval))
continue

cert.add_ext(ext)
Expand All @@ -1649,8 +1647,8 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
public_key=signing_cert,
):
raise salt.exceptions.SaltInvocationError(
"signing_private_key: {0} "
"does no match signing_cert: {1}".format(
"signing_private_key: {} "
"does no match signing_cert: {}".format(
kwargs["signing_private_key"], kwargs.get("signing_cert", "")
)
)
Expand Down Expand Up @@ -1790,7 +1788,7 @@ def create_csr(path=None, text=False, **kwargs):
name=extname, value=extval, critical=critical, issuer=issuer
)
if not ext.x509_ext:
log.info("Invalid X509v3 Extension. {0}: {1}".format(extname, extval))
log.info("Invalid X509v3 Extension. {}: {}".format(extname, extval))
continue

extstack.push(ext)
Expand Down Expand Up @@ -1892,16 +1890,16 @@ def verify_crl(crl, cert):
crltext = _text_or_file(crl)
crltext = get_pem_entry(crltext, pem_type="X509 CRL")
crltempfile = tempfile.NamedTemporaryFile(delete=True)
crltempfile.write(salt.utils.stringutils.to_bytes(crltext, encoding='ascii'))
crltempfile.write(salt.utils.stringutils.to_bytes(crltext, encoding="ascii"))
crltempfile.flush()

certtext = _text_or_file(cert)
certtext = get_pem_entry(certtext, pem_type="CERTIFICATE")
certtempfile = tempfile.NamedTemporaryFile(delete=True)
certtempfile.write(salt.utils.stringutils.to_bytes(certtext, encoding='ascii'))
certtempfile.write(salt.utils.stringutils.to_bytes(certtext, encoding="ascii"))
certtempfile.flush()

cmd = "openssl crl -noout -in {0} -CAfile {1}".format(
cmd = "openssl crl -noout -in {} -CAfile {}".format(
crltempfile.name, certtempfile.name
)

Expand Down
63 changes: 34 additions & 29 deletions tests/integration/states/test_x509.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals

import datetime
import hashlib
import logging
Expand Down Expand Up @@ -103,7 +100,7 @@ def tearDown(self):
self.run_function("grains.delkey", ["x509_test_grain"], minion_tgt="minion")

def run_function(self, *args, **kwargs): # pylint: disable=arguments-differ
ret = super(x509Test, self).run_function(*args, **kwargs)
ret = super().run_function(*args, **kwargs)
return ret

@staticmethod
Expand Down Expand Up @@ -159,27 +156,32 @@ def test_crl_managed(self):
"state.apply", ["x509.crl_managed"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
)
key = "x509_|-{}/pki/ca.crl_|-{}/pki/ca.crl_|-crl_managed".format(
RUNTIME_VARS.TMP,
RUNTIME_VARS.TMP
RUNTIME_VARS.TMP, RUNTIME_VARS.TMP
)

# hints for easier debugging
#import json
#print(json.dumps(ret[key], indent=4, sort_keys=True))
#print(ret[key]['comment'])
# import json
# print(json.dumps(ret[key], indent=4, sort_keys=True))
# print(ret[key]['comment'])

assert key in ret
assert "changes" in ret[key]
self.assertEqual(ret[key]['result'], True)
self.assertEqual(ret[key]["result"], True)
assert "New" in ret[key]["changes"]
assert "Revoked Certificates" in ret[key]["changes"]["New"]
self.assertEqual(ret[key]['changes']['Old'], "{}/pki/ca.crl does not exist.".format(RUNTIME_VARS.TMP))
self.assertEqual(
ret[key]["changes"]["Old"],
"{}/pki/ca.crl does not exist.".format(RUNTIME_VARS.TMP),
)

@slowTest
def test_crl_managed_replacing_existing_crl(self):
os.mkdir(os.path.join(RUNTIME_VARS.TMP, 'pki'))
with salt.utils.files.fopen(os.path.join(RUNTIME_VARS.TMP, 'pki/ca.crl'), 'wb') as crl_file:
crl_file.write(b"""-----BEGIN RSA PRIVATE KEY-----
os.mkdir(os.path.join(RUNTIME_VARS.TMP, "pki"))
with salt.utils.files.fopen(
os.path.join(RUNTIME_VARS.TMP, "pki/ca.crl"), "wb"
) as crl_file:
crl_file.write(
b"""-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQCjdjbgL4kQ8Lu73xeRRM1q3C3K3ptfCLpyfw38LRnymxaoJ6ls
pNSx2dU1uJ89YKFlYLo1QcEk4rJ2fdIjarV0kuNCY3rC8jYUp9BpAU5Z6p9HKeT1
2rTPH81JyjbQDR5PyfCyzYOQtpwpB4zIUUK/Go7tTm409xGKbbUFugJNgQIDAQAB
Expand All @@ -194,27 +196,30 @@ def test_crl_managed_replacing_existing_crl(self):
tDFf52zWnCdVGgDwcQJALW/WcbSEK+JVV6KDJYpwCzWpKIKpBI0F6fdCr1G7Xcwj
c9bcgp7D7xD+TxWWNj4CSXEccJgGr91StV+gFg4ARQ==
-----END RSA PRIVATE KEY-----
""")
"""
)

ret = self.run_function(
"state.apply", ["x509.crl_managed"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
)
key = "x509_|-{}/pki/ca.crl_|-{}/pki/ca.crl_|-crl_managed".format(
RUNTIME_VARS.TMP,
RUNTIME_VARS.TMP
RUNTIME_VARS.TMP, RUNTIME_VARS.TMP
)

# hints for easier debugging
#import json
#print(json.dumps(ret[key], indent=4, sort_keys=True))
#print(ret[key]['comment'])
# import json
# print(json.dumps(ret[key], indent=4, sort_keys=True))
# print(ret[key]['comment'])

assert key in ret
assert "changes" in ret[key]
self.assertEqual(ret[key]['result'], True)
self.assertEqual(ret[key]["result"], True)
assert "New" in ret[key]["changes"]
assert "Revoked Certificates" in ret[key]["changes"]["New"]
self.assertEqual(ret[key]['changes']['Old'], "{}/pki/ca.crl is not a valid CRL.".format(RUNTIME_VARS.TMP))
self.assertEqual(
ret[key]["changes"]["Old"],
"{}/pki/ca.crl is not a valid CRL.".format(RUNTIME_VARS.TMP),
)

def test_cert_issue_not_before_not_after(self):
ret = self.run_function(
Expand Down Expand Up @@ -273,7 +278,7 @@ def test_cert_issue_not_after(self):
@with_tempfile(suffix=".crt", create=False)
@with_tempfile(suffix=".key", create=False)
def test_issue_41858(self, keyfile, crtfile):
ret_key = "x509_|-test_crt_|-{0}_|-certificate_managed".format(crtfile)
ret_key = "x509_|-test_crt_|-{}_|-certificate_managed".format(crtfile)
signing_policy = "no_such_policy"
ret = self.run_function(
"state.apply",
Expand Down Expand Up @@ -303,7 +308,7 @@ def test_issue_41858(self, keyfile, crtfile):
@with_tempfile(suffix=".crt", create=False)
@with_tempfile(suffix=".key", create=False)
def test_compound_match_minion_have_correct_grain_value(self, keyfile, crtfile):
ret_key = "x509_|-test_crt_|-{0}_|-certificate_managed".format(crtfile)
ret_key = "x509_|-test_crt_|-{}_|-certificate_managed".format(crtfile)
signing_policy = "compound_match"
ret = self.run_function(
"state.apply",
Expand Down Expand Up @@ -337,7 +342,7 @@ def test_compound_match_ca_have_correct_grain_value(self, keyfile, crtfile):
minion_tgt="sub_minion",
)

ret_key = "x509_|-test_crt_|-{0}_|-certificate_managed".format(crtfile)
ret_key = "x509_|-test_crt_|-{}_|-certificate_managed".format(crtfile)
signing_policy = "compound_match"
self.run_function(
"state.apply",
Expand Down Expand Up @@ -413,7 +418,7 @@ def test_old_self_signed_cert_is_recreated(self, keyfile, crtfile):
"days_remaining": 10,
},
)
key = "x509_|-self_signed_cert_|-{0}_|-certificate_managed".format(crtfile)
key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
self.assertEqual(
"Certificate is valid and up to date",
first_run[key]["changes"]["Status"]["New"],
Expand Down Expand Up @@ -473,7 +478,7 @@ def test_mismatched_self_signed_cert_is_recreated(self, keyfile, crtfile):
"subjectAltName": "DNS:alt.service.local",
},
)
key = "x509_|-self_signed_cert_|-{0}_|-certificate_managed".format(crtfile)
key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
self.assertEqual(
"Certificate is valid and up to date",
first_run[key]["changes"]["Status"]["New"],
Expand Down Expand Up @@ -563,7 +568,7 @@ def test_file_properties_are_updated(self, keyfile, crtfile):
["x509.self_signed_different_properties"],
pillar={"keyfile": keyfile, "crtfile": crtfile, "fileMode": "0755"},
)
key = "x509_|-self_signed_cert_|-{0}_|-certificate_managed".format(crtfile)
key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
self.assertEqual(
"Certificate is valid and up to date",
first_run[key]["changes"]["Status"]["New"],
Expand Down Expand Up @@ -608,7 +613,7 @@ def test_file_managed_failure(self, keyfile, crtfile):
pillar={"keyfile": keyfile, "crtfile": bad_crtfile},
)

key = "x509_|-self_signed_cert_|-{0}_|-certificate_managed".format(bad_crtfile)
key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(bad_crtfile)
self.assertFalse(ret[key]["result"], "State should have failed.")
self.assertEqual({}, ret[key]["changes"])
self.assertFalse(
Expand Down

0 comments on commit 0a1be82

Please sign in to comment.