diff --git a/CHANGELOG.md b/CHANGELOG.md index 58295bf..dda8f5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +### Add + +- Expose internal business metrics for Prometheus scraping. + ## [0.7.1] - 2023-12-14 ### Changed diff --git a/Dockerfile b/Dockerfile index 99b6010..58734c5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,8 +9,12 @@ RUN pip3 install --prefix=/install -r /requirements.txt FROM base COPY --from=builder /install /usr/local COPY src/notify* /app/ +COPY src/metrics.py /app/ COPY src/run.sh /app/ COPY src/templates /app/templates +RUN mkdir /app/prom_data +ENV PROMETHEUS_MULTIPROC_DIR /app/prom_data WORKDIR /app +EXPOSE 9140 ENTRYPOINT [ "./run.sh" ] diff --git a/requirements.txt b/requirements.txt index 00c8c38..f6f1d6f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ urllib3==1.26.2 Jinja2==2.11.2 # To fix https://github.com/aws/aws-sam-cli/issues/3661 markupsafe==2.0.1 +prometheus_client==0.19.0 \ No newline at end of file diff --git a/src/metrics.py b/src/metrics.py new file mode 100644 index 0000000..d63b29b --- /dev/null +++ b/src/metrics.py @@ -0,0 +1,27 @@ +import os +import shutil +from prometheus_client import multiprocess, CollectorRegistry +from prometheus_client import Counter, Enum + +registry = CollectorRegistry() + +path = os.environ.get('PROMETHEUS_MULTIPROC_DIR') +if path: + multiprocess.MultiProcessCollector(registry, path=path) + if os.path.exists(path): + shutil.rmtree(path) + os.mkdir(path) + +PROCESS_STATES = Enum('process_states', + 'State of the process', + states=['idle', 'processing', 'error - recoverable', 'error - need restart'], + namespace='kafka_notify', registry=registry) + +NOTIFICATIONS_SENT = Counter('notifications_sent', + 'Number of notifications sent', + ['type', 'name', 'endpoint'], + namespace='kafka_notify', registry=registry) +NOTIFICATIONS_ERROR = Counter('notifications_error', + 'Number of notifications that could not be sent due to error', + ['type', 'name', 'endpoint', 'exception'], + namespace='kafka_notify', registry=registry) diff --git a/src/notify-email.py b/src/notify-email.py index c1e2076..0672ff7 100755 --- a/src/notify-email.py +++ b/src/notify-email.py @@ -11,8 +11,9 @@ from datetime import datetime from notify_deps import get_logger, timestamp_convert, main -from notify_deps import NUVLA_ENDPOINT - +from notify_deps import NUVLA_ENDPOINT, prometheus_exporter_port +from prometheus_client import start_http_server +from metrics import PROCESS_STATES, NOTIFICATIONS_SENT, NOTIFICATIONS_ERROR, registry log_local = get_logger('email') @@ -142,26 +143,29 @@ def html_content(msg_params: dict): return get_email_template(msg_params).render(**params) -def send(server: smtplib.SMTP, recipients, subject, html, attempts=SEND_EMAIL_ATTEMPTS): +def send(server: smtplib.SMTP, recipients, subject, html, attempts=SEND_EMAIL_ATTEMPTS, + sleep_interval=0.5): msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = f'Nuvla <{server.user}>' msg['To'] = ', '.join(recipients) msg.attach(MIMEText(html, 'html', 'utf-8')) for i in range(attempts): - if i > 0: - log_local.warning(f'Failed sending email: retry {i}') - time.sleep(.5) - log_local.warning('Reconnecting to SMTP server...') - server = get_smtp_server() - log_local.warning('Reconnecting to SMTP server... done.') try: resp = server.sendmail(server.user, recipients, msg.as_string()) if resp: log_local.error(f'SMTP failed to deliver email to: {resp}') return + except smtplib.SMTPServerDisconnected: + if i < attempts - 1: # no need to sleep on the last iteration + time.sleep(sleep_interval) + log_local.warning('Reconnecting to SMTP server...') + server = get_smtp_server() + log_local.warning('Reconnecting to SMTP server... done.') except smtplib.SMTPException as ex: log_local.error(f'Failed sending email due to SMTP error: {ex}') + NOTIFICATIONS_ERROR.labels('email', subject, ','.join(recipients), type(ex)).inc() + PROCESS_STATES.state('error - recoverable') raise SendFailedMaxAttempts(f'Failed sending email after {attempts} attempts.') @@ -172,7 +176,9 @@ def get_recipients(v: dict): def worker(workq: multiprocessing.Queue): smtp_server = get_smtp_server() while True: + PROCESS_STATES.state('idle') msg = workq.get() + PROCESS_STATES.state('processing') if msg: recipients = get_recipients(msg.value) if len(recipients) == 0: @@ -185,13 +191,18 @@ def worker(workq: multiprocessing.Queue): html = html_content(msg.value) send(smtp_server, recipients, subject, html) log_local.info(f'sent: {msg} to {recipients}') + NOTIFICATIONS_SENT.labels('email', f'{r_name or r_id}', ','.join(recipients)).inc() except smtplib.SMTPException as ex: log_local.error(f'Failed sending email due to SMTP error: {ex}') log_local.warning('Reconnecting to SMTP server...') smtp_server = get_smtp_server() log_local.warning('Reconnecting to SMTP server... done.') + NOTIFICATIONS_ERROR.labels('email', r_name, ','.join(recipients), type(ex)).inc() + PROCESS_STATES.state('error - recoverable') except Exception as ex: log_local.error(f'Failed sending email: {ex}') + NOTIFICATIONS_ERROR.labels('email', r_name, ','.join(recipients), type(ex)).inc() + PROCESS_STATES.state('error - recoverable') def email_template(template_file=EMAIL_TEMPLATE_DEFAULT_FILE): @@ -207,4 +218,5 @@ def init_email_templates(default=EMAIL_TEMPLATE_DEFAULT_FILE, if __name__ == "__main__": init_email_templates() set_smtp_params() + start_http_server(prometheus_exporter_port(), registry=registry) main(worker, KAFKA_TOPIC, KAFKA_GROUP_ID) diff --git a/src/notify-slack.py b/src/notify-slack.py index 0591585..fdf6ea8 100755 --- a/src/notify-slack.py +++ b/src/notify-slack.py @@ -8,7 +8,9 @@ import re from notify_deps import get_logger, timestamp_convert, main -from notify_deps import NUVLA_ENDPOINT +from notify_deps import NUVLA_ENDPOINT, prometheus_exporter_port +from prometheus_client import start_http_server +from metrics import PROCESS_STATES, NOTIFICATIONS_SENT, NOTIFICATIONS_ERROR, registry KAFKA_TOPIC = os.environ.get('KAFKA_TOPIC') or 'NOTIFICATIONS_SLACK_S' KAFKA_GROUP_ID = 'nuvla-notification-slack' @@ -101,15 +103,15 @@ def message_content(msg_params: dict): ) attachments = [{ - 'color': color, - 'author_name': 'Nuvla.io', - 'author_link': 'https://nuvla.io', - 'author_icon': 'https://sixsq.com/assets/img/logo-sixsq.svg', - 'fields': fields, - 'footer': 'https://sixsq.com', - 'footer_icon': 'https://sixsq.com/assets/img/logo-sixsq.svg', - 'ts': now_timestamp() - } + 'color': color, + 'author_name': 'Nuvla.io', + 'author_link': 'https://nuvla.io', + 'author_icon': 'https://sixsq.com/assets/img/logo-sixsq.svg', + 'fields': fields, + 'footer': 'https://sixsq.com', + 'footer_icon': 'https://sixsq.com/assets/img/logo-sixsq.svg', + 'ts': now_timestamp() + } ] return {'attachments': attachments} @@ -121,15 +123,30 @@ def send_message(dest, message): def worker(workq: multiprocessing.Queue): while True: + PROCESS_STATES.state('idle') msg = workq.get() + PROCESS_STATES.state('processing') if msg: dest = msg.value['DESTINATION'] - resp = send_message(dest, message_content(msg.value)) + try: + resp = send_message(dest, message_content(msg.value)) + except requests.exceptions.RequestException as ex: + log_local.error(f'Failed sending {msg} to {dest}: {ex}') + PROCESS_STATES.state('error - recoverable') + NOTIFICATIONS_ERROR.labels('slack', f'{msg.value.get("NAME") or msg.value["SUBS_NAME"]}', + dest, type(ex)).inc() + continue if not resp.ok: log_local.error(f'Failed sending {msg} to {dest}: {resp.text}') + PROCESS_STATES.state('error - recoverable') + NOTIFICATIONS_ERROR.labels('slack', f'{msg.value.get("NAME") or msg.value["SUBS_NAME"]}', + dest, resp.text).inc() else: + NOTIFICATIONS_SENT.labels('slack', f'{msg.value.get("NAME") or msg.value["SUBS_NAME"]}', + dest).inc() log_local.info(f'sent: {msg} to {dest}') if __name__ == "__main__": + start_http_server(prometheus_exporter_port(), registry=registry) main(worker, KAFKA_TOPIC, KAFKA_GROUP_ID) diff --git a/src/notify_deps.py b/src/notify_deps.py index e6d9802..3340419 100644 --- a/src/notify_deps.py +++ b/src/notify_deps.py @@ -8,7 +8,6 @@ from datetime import datetime from kafka import KafkaConsumer - log_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(process)d - %(levelname)s - %(message)s') stdout_handler = logging.StreamHandler(sys.stdout) @@ -37,6 +36,8 @@ def get_logger(who): work_queue = multiprocessing.Queue() +DEFAULT_PROMETHEUS_EXPORTER_PORT = 9140 + def kafka_consumer(topic, bootstrap_servers, group_id, auto_offset_reset='latest'): consumer = KafkaConsumer(topic, @@ -54,6 +55,10 @@ def timestamp_convert(ts): strftime('%Y-%m-%d %H:%M:%S UTC') +def prometheus_exporter_port(): + return int(os.environ.get('PROMETHEUS_EXPORTER_PORT', DEFAULT_PROMETHEUS_EXPORTER_PORT)) + + def main(worker, kafka_topic, group_id): pool = multiprocessing.Pool(5, worker, (work_queue,)) for msg in kafka_consumer(kafka_topic, KAFKA_BOOTSTRAP_SERVERS, group_id=group_id): @@ -68,4 +73,3 @@ def main(worker, kafka_topic, group_id): work_queue.join_thread() pool.close() pool.join() - diff --git a/tests/metrics.py b/tests/metrics.py new file mode 120000 index 0000000..bcc1ec8 --- /dev/null +++ b/tests/metrics.py @@ -0,0 +1 @@ +../src/metrics.py \ No newline at end of file diff --git a/tests/test_notify_email.py b/tests/test_notify_email.py index 027e568..7dba962 100644 --- a/tests/test_notify_email.py +++ b/tests/test_notify_email.py @@ -1,6 +1,15 @@ import os import unittest +from unittest.mock import Mock +import shutil +from prometheus_client import multiprocess +os.environ['PROMETHEUS_MULTIPROC_DIR'] = '' +os.path.exists = Mock(return_value=True) +os.mkdir = Mock() +shutil.rmtree = Mock() + +multiprocess.MultiProcessCollector = Mock() import notify_email from notify_email import get_recipients, html_content, email_template diff --git a/tests/test_notify_slack.py b/tests/test_notify_slack.py index a9ee83b..c37cb01 100644 --- a/tests/test_notify_slack.py +++ b/tests/test_notify_slack.py @@ -1,4 +1,15 @@ import unittest +import os +from unittest.mock import Mock +import shutil +from prometheus_client import multiprocess + +os.environ['PROMETHEUS_MULTIPROC_DIR'] = '' +os.path.exists = Mock(return_value=True) +os.mkdir = Mock() +shutil.rmtree = Mock() + +multiprocess.MultiProcessCollector = Mock() from notify_slack import now_timestamp, message_content