Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cert_info beacon to get cert information from local files #54902

Merged
merged 13 commits into from
Jan 6, 2020
Merged
163 changes: 163 additions & 0 deletions salt/beacons/cert_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
'''
Beacon to monitor certificate expiration dates from files on the filesystem.

.. versionadded:: Sodium

:maintainer: <[email protected]>
:maturity: new
:depends: OpenSSL
'''

# Import Python libs
from __future__ import absolute_import, unicode_literals
from datetime import datetime
import logging

# Import salt libs
# pylint: disable=import-error,no-name-in-module,redefined-builtin,3rd-party-module-not-gated
from salt.ext.six.moves import map as _map
from salt.ext.six.moves import range as _range
# pylint: enable=import-error,no-name-in-module,redefined-builtin,3rd-party-module-not-gated
import salt.utils.files


# Import Third Party Libs
try:
from OpenSSL import crypto
HAS_OPENSSL = True
except ImportError:
HAS_OPENSSL = False

log = logging.getLogger(__name__)

DEFAULT_NOTIFY_DAYS = 45

__virtualname__ = 'cert_info'


def __virtual__():
if HAS_OPENSSL is False:
return False

return __virtualname__


def validate(config):
'''
Validate the beacon configuration
'''
_config = {}
list(_map(_config.update, config))

# Configuration for cert_info beacon should be a list of dicts
if not isinstance(config, list):
return False, ('Configuration for cert_info beacon must be a list.')

if 'files' not in _config:
return False, ('Configuration for cert_info beacon '
'must contain files option.')
return True, 'Valid beacon configuration'


def beacon(config):
'''
Monitor the certificate files on the minion.

Specify a notification threshold in days and only emit a beacon if any certificates are
expiring within that timeframe or if `notify_days` equals `-1` (always report information).
The default notification threshold is 45 days and can be overridden at the beacon level and
at an individual certificate level.

.. code-block:: yaml

beacons:
cert_info:
- files:
- /etc/pki/tls/certs/mycert.pem
- /etc/pki/tls/certs/yourcert.pem:
notify_days: 15
- /etc/pki/tls/certs/ourcert.pem
- notify_days: 45
- interval: 86400

'''
ret = []
certificates = []
CryptoError = crypto.Error # pylint: disable=invalid-name

_config = {}
list(_map(_config.update, config))

global_notify_days = _config.get('notify_days', DEFAULT_NOTIFY_DAYS)

for cert_path in _config.get('files', []):
notify_days = global_notify_days

if isinstance(cert_path, dict):
try:
notify_days = cert_path[cert_path.keys()[0]].get('notify_days', global_notify_days)
cert_path = cert_path.keys()[0]
except IndexError as exc:
log.error('Unable to load certificate %s (%s)', cert_path, exc)
continue

try:
with salt.utils.files.fopen(cert_path) as fp_:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, fp_.read())
except (IOError, CryptoError) as exc:
log.error('Unable to load certificate %s (%s)', cert_path, exc)
continue

cert_date = datetime.strptime(cert.get_notAfter().decode(encoding='UTF-8'), "%Y%m%d%H%M%SZ")
date_diff = (cert_date - datetime.today()).days
log.debug('Certificate %s expires in %s days.', cert_path, date_diff)

if notify_days < 0 or date_diff <= notify_days:
log.debug('Certificate %s triggered beacon due to %s day notification threshold.', cert_path, notify_days)
extensions = []
for ext in _range(0, cert.get_extension_count()):
extensions.append(
{
'ext_name': cert.get_extension(ext).get_short_name().decode(encoding='UTF-8'),
'ext_data': str(cert.get_extension(ext))
}
)

certificates.append(
{
'cert_path': cert_path,
'issuer': ','.join(
['{0}="{1}"'.format(
t[0].decode(encoding='UTF-8'),
t[1].decode(encoding='UTF-8')
) for t in cert.get_issuer().get_components()]),
'issuer_dict': {
k.decode('UTF-8'): v.decode('UTF-8') for k, v in cert.get_issuer().get_components()
},
'notAfter_raw': cert.get_notAfter().decode(encoding='UTF-8'),
'notAfter': cert_date.strftime("%Y-%m-%d %H:%M:%SZ"),
'notBefore_raw': cert.get_notBefore().decode(encoding='UTF-8'),
'notBefore': datetime.strptime(
cert.get_notBefore().decode(encoding='UTF-8'), "%Y%m%d%H%M%SZ"
).strftime("%Y-%m-%d %H:%M:%SZ"),
'serial_number': cert.get_serial_number(),
'signature_algorithm': cert.get_signature_algorithm().decode(encoding='UTF-8'),
'subject': ','.join(
['{0}="{1}"'.format(
t[0].decode(encoding='UTF-8'),
t[1].decode(encoding='UTF-8')
) for t in cert.get_subject().get_components()]),
'subject_dict': {
k.decode('UTF-8'): v.decode('UTF-8') for k, v in cert.get_subject().get_components()
},
'version': cert.get_version(),
'extensions': extensions,
'has_expired': cert.has_expired()
}
)

if certificates:
ret.append({'certificates': certificates})

return ret
107 changes: 107 additions & 0 deletions tests/unit/beacons/test_cert_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# coding: utf-8

# Python libs
from __future__ import absolute_import
import logging

# Salt testing libs
from tests.support.unit import TestCase
from tests.support.mock import patch, mock_open
from tests.support.mixins import LoaderModuleMockMixin

# Salt libs
import salt.beacons.cert_info as cert_info

log = logging.getLogger(__name__)


_TEST_CERT = '''
-----BEGIN CERTIFICATE-----
MIIC/jCCAeagAwIBAgIJAIQMfu6ShHvfMA0GCSqGSIb3DQEBCwUAMCQxIjAgBgNV
BAMMGXNhbHR0ZXN0LTAxLmV4YW1wbGUubG9jYWwwHhcNMTkwNjAzMjA1OTIyWhcN
MjkwNTMxMjA1OTIyWjAkMSIwIAYDVQQDDBlzYWx0dGVzdC0wMS5leGFtcGxlLmxv
Y2FsMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAv5UxxKGsOO8n2hUk
KjL8r2Rjt0om4wwdXUu0R1fQUlaSO0g+vk0wHHaovoVcEU6uZlhDPw1qZ4C+cp9Z
rDzSfwI2Njg813I5jzTBgox+3pJ+82vgXZ14xpqZ+f0ACMo4uRPjBkyQpHqYiDJ3
VockZSxm5s7RT05xDnedDfPgu1WAvzQovWO6slCs+Hlp8sh6QAy/hIwOZ0hT8y3J
NV6PSPqK7BEypOPak36+ogtiuPxxat4da74SUVS8Ffupnr40BjqVqEXBvfIIHiQt
3r5gpjoBjrWX2ccgQlHQP8gFaToFxWLSSYVT6E8Oj5UEywpmvPDRjJsJ5epscblT
oFyVXQIDAQABozMwMTAJBgNVHRMEAjAAMCQGA1UdEQQdMBuCGXNhbHR0ZXN0LTAx
LmV4YW1wbGUubG9jYWwwDQYJKoZIhvcNAQELBQADggEBABPqQlkaZDV5dPwNO/s2
PBT/19LroOwQ+fBJgZpbGha5/ZaSr+jcYZf2jAicPajWGlY/rXAdBSuxpmUYCC12
23tI4stwGyB8Quuoyg2Z+5LQJSDA1LxNJ1kxQfDUnS3tVQa0wJVtq8W9wNryNONL
noaQaDcdbGx3V15W+Bx0as5NfIWqz1uVi4MGGxI6hMBuDD7E7M+k1db8EaS+tI4u
seZBENjwjJA6zZmTXvYyzV5OBP4JyOhYuG9aqr7e6/yjPBEtZv0TJ9KMMbcywvE9
9FF+l4Y+wgKR/icrpDEpPlC4wYn64sy5vk7EGVagnVyhkjLJ52rn4trzyPox8FmO
2Zw=
-----END CERTIFICATE-----
'''


class CertInfoBeaconTestCase(TestCase, LoaderModuleMockMixin):
'''
Test case for salt.beacons.cert_info
'''

def setup_loader_modules(self):
return {
cert_info: {
'__context__': {},
'__salt__': {},
}
}

def test_non_list_config(self):
config = {}

ret = cert_info.validate(config)

self.assertEqual(ret, (False, 'Configuration for cert_info beacon must'
' be a list.'))

def test_empty_config(self):
config = [{}]

ret = cert_info.validate(config)

self.assertEqual(ret, (False, 'Configuration for cert_info beacon '
'must contain files option.'))

def test_cert_information(self):
with patch('salt.utils.files.fopen',
mock_open(read_data=_TEST_CERT)):
config = [{'files': ['/etc/pki/tls/certs/mycert.pem'],
'notify_days': -1
}]

ret = cert_info.validate(config)

self.assertEqual(ret, (True, 'Valid beacon configuration'))

_expected_return = [
{
'certificates': [
{
'cert_path': '/etc/pki/tls/certs/mycert.pem',
'extensions': [{'ext_data': 'CA:FALSE',
'ext_name': 'basicConstraints'},
{'ext_data': 'DNS:salttest-01.example.local',
'ext_name': 'subjectAltName'}],
'has_expired': False,
'issuer': 'CN="salttest-01.example.local"',
'issuer_dict': {'CN': 'salttest-01.example.local'},
'notAfter': '2029-05-31 20:59:22Z',
'notAfter_raw': '20290531205922Z',
'notBefore': '2019-06-03 20:59:22Z',
'notBefore_raw': '20190603205922Z',
'serial_number': 9515119675852487647,
'signature_algorithm': 'sha256WithRSAEncryption',
'subject': 'CN="salttest-01.example.local"',
'subject_dict': {'CN': 'salttest-01.example.local'},
'version': 2
}
]
}
]
ret = cert_info.beacon(config)
self.assertEqual(ret, _expected_return)