Skip to content

Commit

Permalink
update building of outgoing message and fix deserialisation
Browse files Browse the repository at this point in the history
  • Loading branch information
konstan committed Jun 17, 2024
1 parent 4c46c1a commit 059ffe8
Showing 1 changed file with 9 additions and 19 deletions.
28 changes: 9 additions & 19 deletions src/notify-mqtt.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python3

import json
import multiprocessing
import os
import threading
Expand All @@ -21,10 +21,6 @@
def message_content(msg_params: dict) -> dict:
log_local.debug('Building message content for: %s', msg_params)

r_uri = msg_params.get('RESOURCE_URI')
link_text = msg_params.get('RESOURCE_NAME') or r_uri
component_link = f'<{NUVLA_ENDPOINT}/ui/{r_uri}|{link_text}>'

for key, value in msg_params.items():
if value is None:
msg_params[key] = ''
Expand All @@ -48,18 +44,11 @@ def message_content(msg_params: dict) -> dict:

result = {}
for attr in attrs:
if msg_params.get(attr):
result[attr] = msg_params.get(attr)

# Order of the fields defines the layout of the message
if msg_params.get('TRIGGER_RESOURCE_PATH'):
resource_path = msg_params.get('TRIGGER_RESOURCE_PATH')
resource_name = msg_params.get('TRIGGER_RESOURCE_NAME')
trigger_link = \
f'<{NUVLA_ENDPOINT}/ui/{resource_path}|{resource_name}>'
result['TRIGGER_LINK'] = trigger_link
if attr in msg_params:
result[attr.lower()] = msg_params.get(attr)

result['COMPONENT_LINK'] = component_link
result['component_link'] = \
f'{NUVLA_ENDPOINT}/{msg_params.get("RESOURCE_URI")}'

return result

Expand All @@ -75,13 +64,13 @@ def extract_destination(dest: str) -> tuple:
return host, port, uri


def send_mqtt_notification(payload, mqtt_server: str):
def send_mqtt_notification(payload: str, mqtt_server: str):
host, port, topic = extract_destination(mqtt_server)
log_local.info(f'Sending message to {host}:{port}/{topic}')
return mqtt_publish.single(topic, payload, hostname=host, port=int(port))


def send_message(payload, mqtt_server):
def send_message(payload: str, mqtt_server):
def thread_function():
try:
send_mqtt_notification(payload, mqtt_server)
Expand All @@ -106,7 +95,8 @@ def worker(workq: multiprocessing.Queue):
subs_name = msg.value.get('NAME') or msg.value['SUBS_NAME']
dest = msg.value['DESTINATION']
try:
send_message(message_content(msg.value), dest)
msg_dict = message_content(msg.value)
send_message(json.dumps(msg_dict), dest)
except Exception as ex:
err_msg = f'Failed sending message: {subs_name} to {dest}'
log_local.error(err_msg)
Expand Down

0 comments on commit 059ffe8

Please sign in to comment.