Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added prometheus client #15

Merged
merged 20 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Added

- Expose internal business metrics for Prometheus scraping.

## [0.7.1] - 2023-12-14

### Changed
Expand Down
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ RUN pip3 install --prefix=/install -r /requirements.txt
FROM base
COPY --from=builder /install /usr/local
COPY src/notify* /app/
COPY src/metrics.py /app/
COPY src/run.sh /app/
COPY src/templates /app/templates
RUN mkdir /app/prom_data
ENV PROMETHEUS_MULTIPROC_DIR /app/prom_data

WORKDIR /app
EXPOSE 9140
ENTRYPOINT [ "./run.sh" ]
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ urllib3==1.26.2
Jinja2==2.11.2
# To fix https://github.com/aws/aws-sam-cli/issues/3661
markupsafe==2.0.1
prometheus_client==0.19.0
27 changes: 27 additions & 0 deletions src/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import os
import shutil
from prometheus_client import multiprocess, CollectorRegistry
from prometheus_client import Counter, Enum

# Registry passed to start_http_server() by the notifier entry points.
registry = CollectorRegistry()

# Directory in which prometheus_client keeps its per-process sample files.
# Multiprocess collection is only enabled when the variable is set and
# non-empty.
path = os.environ.get('PROMETHEUS_MULTIPROC_DIR')
if path:
    # Start from an empty directory so that sample files left behind by a
    # previous run are not picked up again.
    if os.path.exists(path):
        shutil.rmtree(path)
    # makedirs (rather than mkdir) also creates missing parent directories.
    os.makedirs(path)
    # Register the collector only once the directory exists:
    # MultiProcessCollector raises ValueError when `path` is not an existing
    # directory, so constructing it first breaks the import on a fresh host.
    multiprocess.MultiProcessCollector(registry, path=path)

# State of the notifier process; updated with PROCESS_STATES.state(<name>).
# NOTE(review): the prometheus_client documentation lists Enum metrics as
# unsupported in multiprocess mode -- confirm this one is exported as
# intended when PROMETHEUS_MULTIPROC_DIR is set.
PROCESS_STATES = Enum(
    name='process_states',
    documentation='State of the process',
    namespace='kafka_notify',
    states=['idle', 'processing', 'error - recoverable', 'error - need restart'],
    registry=registry,
)

# Successfully delivered notifications. Label values are passed positionally
# by the callers, so the label order below must not change.
NOTIFICATIONS_SENT = Counter(
    name='notifications_sent',
    documentation='Number of notifications sent',
    labelnames=['type', 'name', 'endpoint'],
    namespace='kafka_notify',
    registry=registry,
)

# Notifications that failed; same labels as above plus the failure cause.
NOTIFICATIONS_ERROR = Counter(
    name='notifications_error',
    documentation='Number of notifications that could not be sent due to error',
    labelnames=['type', 'name', 'endpoint', 'exception'],
    namespace='kafka_notify',
    registry=registry,
)
30 changes: 21 additions & 9 deletions src/notify-email.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
from datetime import datetime

from notify_deps import get_logger, timestamp_convert, main
from notify_deps import NUVLA_ENDPOINT

from notify_deps import NUVLA_ENDPOINT, prometheus_exporter_port
from prometheus_client import start_http_server
from metrics import PROCESS_STATES, NOTIFICATIONS_SENT, NOTIFICATIONS_ERROR, registry

log_local = get_logger('email')

Expand Down Expand Up @@ -142,26 +143,29 @@ def html_content(msg_params: dict):
return get_email_template(msg_params).render(**params)


def send(server: smtplib.SMTP, recipients, subject, html, attempts=SEND_EMAIL_ATTEMPTS):
def send(server: smtplib.SMTP, recipients, subject, html, attempts=SEND_EMAIL_ATTEMPTS,
sleep_interval=0.5):
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = f'Nuvla <{server.user}>'
msg['To'] = ', '.join(recipients)
msg.attach(MIMEText(html, 'html', 'utf-8'))
for i in range(attempts):
if i > 0:
log_local.warning(f'Failed sending email: retry {i}')
time.sleep(.5)
log_local.warning('Reconnecting to SMTP server...')
server = get_smtp_server()
log_local.warning('Reconnecting to SMTP server... done.')
try:
resp = server.sendmail(server.user, recipients, msg.as_string())
if resp:
log_local.error(f'SMTP failed to deliver email to: {resp}')
return
except smtplib.SMTPServerDisconnected:
if i < attempts - 1: # no need to sleep on the last iteration
time.sleep(sleep_interval)
log_local.warning('Reconnecting to SMTP server...')
server = get_smtp_server()
log_local.warning('Reconnecting to SMTP server... done.')
except smtplib.SMTPException as ex:
log_local.error(f'Failed sending email due to SMTP error: {ex}')
NOTIFICATIONS_ERROR.labels('email', subject, ','.join(recipients), type(ex)).inc()
PROCESS_STATES.state('error - recoverable')
raise SendFailedMaxAttempts(f'Failed sending email after {attempts} attempts.')


Expand All @@ -172,7 +176,9 @@ def get_recipients(v: dict):
def worker(workq: multiprocessing.Queue):
smtp_server = get_smtp_server()
while True:
PROCESS_STATES.state('idle')
msg = workq.get()
PROCESS_STATES.state('processing')
if msg:
recipients = get_recipients(msg.value)
if len(recipients) == 0:
Expand All @@ -185,13 +191,18 @@ def worker(workq: multiprocessing.Queue):
html = html_content(msg.value)
send(smtp_server, recipients, subject, html)
log_local.info(f'sent: {msg} to {recipients}')
NOTIFICATIONS_SENT.labels('email', f'{r_name or r_id}', ','.join(recipients)).inc()
except smtplib.SMTPException as ex:
log_local.error(f'Failed sending email due to SMTP error: {ex}')
log_local.warning('Reconnecting to SMTP server...')
smtp_server = get_smtp_server()
log_local.warning('Reconnecting to SMTP server... done.')
NOTIFICATIONS_ERROR.labels('email', r_name, ','.join(recipients), type(ex)).inc()
PROCESS_STATES.state('error - recoverable')
except Exception as ex:
log_local.error(f'Failed sending email: {ex}')
NOTIFICATIONS_ERROR.labels('email', r_name, ','.join(recipients), type(ex)).inc()
PROCESS_STATES.state('error - recoverable')


def email_template(template_file=EMAIL_TEMPLATE_DEFAULT_FILE):
Expand All @@ -207,4 +218,5 @@ def init_email_templates(default=EMAIL_TEMPLATE_DEFAULT_FILE,
if __name__ == "__main__":
init_email_templates()
set_smtp_params()
start_http_server(prometheus_exporter_port(), registry=registry)
main(worker, KAFKA_TOPIC, KAFKA_GROUP_ID)
39 changes: 28 additions & 11 deletions src/notify-slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import re

from notify_deps import get_logger, timestamp_convert, main
from notify_deps import NUVLA_ENDPOINT
from notify_deps import NUVLA_ENDPOINT, prometheus_exporter_port
from prometheus_client import start_http_server
from metrics import PROCESS_STATES, NOTIFICATIONS_SENT, NOTIFICATIONS_ERROR, registry

KAFKA_TOPIC = os.environ.get('KAFKA_TOPIC') or 'NOTIFICATIONS_SLACK_S'
KAFKA_GROUP_ID = 'nuvla-notification-slack'
Expand Down Expand Up @@ -101,15 +103,15 @@ def message_content(msg_params: dict):
)

attachments = [{
'color': color,
'author_name': 'Nuvla.io',
'author_link': 'https://nuvla.io',
'author_icon': 'https://sixsq.com/assets/img/logo-sixsq.svg',
'fields': fields,
'footer': 'https://sixsq.com',
'footer_icon': 'https://sixsq.com/assets/img/logo-sixsq.svg',
'ts': now_timestamp()
}
'color': color,
'author_name': 'Nuvla.io',
'author_link': 'https://nuvla.io',
'author_icon': 'https://sixsq.com/assets/img/logo-sixsq.svg',
'fields': fields,
'footer': 'https://sixsq.com',
'footer_icon': 'https://sixsq.com/assets/img/logo-sixsq.svg',
'ts': now_timestamp()
}
]

return {'attachments': attachments}
Expand All @@ -121,15 +123,30 @@ def send_message(dest, message):

def worker(workq: multiprocessing.Queue):
while True:
PROCESS_STATES.state('idle')
msg = workq.get()
PROCESS_STATES.state('processing')
if msg:
dest = msg.value['DESTINATION']
resp = send_message(dest, message_content(msg.value))
try:
resp = send_message(dest, message_content(msg.value))
except requests.exceptions.RequestException as ex:
log_local.error(f'Failed sending {msg} to {dest}: {ex}')
PROCESS_STATES.state('error - recoverable')
NOTIFICATIONS_ERROR.labels('slack', f'{msg.value.get("NAME") or msg.value["SUBS_NAME"]}',
dest, type(ex)).inc()
continue
if not resp.ok:
log_local.error(f'Failed sending {msg} to {dest}: {resp.text}')
PROCESS_STATES.state('error - recoverable')
NOTIFICATIONS_ERROR.labels('slack', f'{msg.value.get("NAME") or msg.value["SUBS_NAME"]}',
dest, resp.text).inc()
else:
NOTIFICATIONS_SENT.labels('slack', f'{msg.value.get("NAME") or msg.value["SUBS_NAME"]}',
dest).inc()
log_local.info(f'sent: {msg} to {dest}')


if __name__ == "__main__":
start_http_server(prometheus_exporter_port(), registry=registry)
main(worker, KAFKA_TOPIC, KAFKA_GROUP_ID)
8 changes: 6 additions & 2 deletions src/notify_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from datetime import datetime
from kafka import KafkaConsumer


log_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(process)d - %(levelname)s - %(message)s')
stdout_handler = logging.StreamHandler(sys.stdout)
Expand Down Expand Up @@ -37,6 +36,8 @@ def get_logger(who):

work_queue = multiprocessing.Queue()

DEFAULT_PROMETHEUS_EXPORTER_PORT = 9140


def kafka_consumer(topic, bootstrap_servers, group_id, auto_offset_reset='latest'):
consumer = KafkaConsumer(topic,
Expand All @@ -54,6 +55,10 @@ def timestamp_convert(ts):
strftime('%Y-%m-%d %H:%M:%S UTC')


def prometheus_exporter_port() -> int:
    """Return the port on which the Prometheus exporter should listen.

    The port is read from the ``PROMETHEUS_EXPORTER_PORT`` environment
    variable. When the variable is unset or set to an empty string,
    ``DEFAULT_PROMETHEUS_EXPORTER_PORT`` is used instead.

    Raises:
        ValueError: if the variable holds a value that is not an integer.
    """
    # `or` instead of a .get() default: a variable that is defined but empty
    # must fall back to the default rather than fail in int('').
    return int(os.environ.get('PROMETHEUS_EXPORTER_PORT') or DEFAULT_PROMETHEUS_EXPORTER_PORT)


def main(worker, kafka_topic, group_id):
pool = multiprocessing.Pool(5, worker, (work_queue,))
for msg in kafka_consumer(kafka_topic, KAFKA_BOOTSTRAP_SERVERS, group_id=group_id):
Expand All @@ -68,4 +73,3 @@ def main(worker, kafka_topic, group_id):
work_queue.join_thread()
pool.close()
pool.join()

1 change: 1 addition & 0 deletions tests/metrics.py
9 changes: 9 additions & 0 deletions tests/test_notify_email.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import os
import unittest
from unittest.mock import Mock
import shutil
from prometheus_client import multiprocess

os.environ['PROMETHEUS_MULTIPROC_DIR'] = ''
os.path.exists = Mock(return_value=True)
os.mkdir = Mock()
shutil.rmtree = Mock()

multiprocess.MultiProcessCollector = Mock()
import notify_email
from notify_email import get_recipients, html_content, email_template

Expand Down
11 changes: 11 additions & 0 deletions tests/test_notify_slack.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
import unittest
import os
from unittest.mock import Mock
import shutil
from prometheus_client import multiprocess

os.environ['PROMETHEUS_MULTIPROC_DIR'] = ''
os.path.exists = Mock(return_value=True)
os.mkdir = Mock()
shutil.rmtree = Mock()

multiprocess.MultiProcessCollector = Mock()

from notify_slack import now_timestamp, message_content

Expand Down
Loading