Skip to content

Commit

Permalink
Request thread: handle request on dedicated thread
Browse files Browse the repository at this point in the history
All requests for all sinks are handled on the reception thread
that is the single MQTT thread (handling publish and subscribe cb).

As some request may be blocking on IO (mainly accessing the UART of sink),
it prevents any publish during that period.
Concequences can be noticed only when UART is fully loaded or in case of
multiple sinks on same gateway and loading a scratchpad or processing it
on one of the gateway. It will delay all messages from the other sink.

This patch handles most of the request on a dedicated thread.
It will not impact the performances as creating a thread
is negligible compare to the time a request can take.
  • Loading branch information
GwendalRaoul committed Jun 3, 2019
1 parent f9bec2a commit df58d1c
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 25 deletions.
68 changes: 43 additions & 25 deletions python_transport/wirepas_gateway/protocol/mqtt_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ class MQTTWrapper(Thread):
to avoid any dead lock from mqtt client.
"""

def __init__(self, logger, username, password, host, port, secure_auth=True, tlsfile=None,
# Keep alive time with broker
KEEP_ALIVE_S = 60

# Reconnect timeout in Seconds
TIMEOUT_RECONNECT_S = 120

def __init__(self, logger, username, password, host, port, secure_auth=True, ca_certs=None,
on_termination_cb=None, on_connect_cb=None):
Thread.__init__(self)
self.daemon = True
Expand All @@ -32,7 +38,7 @@ def __init__(self, logger, username, password, host, port, secure_auth=True, tls
if secure_auth:
try:
self._client.tls_set(
tlsfile,
ca_certs=ca_certs,
certfile=None,
keyfile=None,
cert_reqs=ssl.CERT_REQUIRED,
Expand All @@ -48,7 +54,7 @@ def __init__(self, logger, username, password, host, port, secure_auth=True, tls
self._client.on_connect = self._on_connect

try:
self._client.connect(host, port, keepalive=60)
self._client.connect(host, port, keepalive=MQTTWrapper.KEEP_ALIVE_S)
except (socket.gaierror, ValueError) as e:
self.logger.error("Cannot connect to mqtt {}".format(e))
exit(-1)
Expand All @@ -58,6 +64,9 @@ def __init__(self, logger, username, password, host, port, secure_auth=True, tls

self._publish_queue = SelectableQueue()

# Thread is not started yes
self.running = False

def _on_connect(self, client, userdata, flags, rc):
if rc != 0:
self.logger.error("MQTT cannot connect {}".format(rc))
Expand Down Expand Up @@ -95,12 +104,6 @@ def _do_select(self, sock):
# not a big issue. Just keep going
pass

except TimeoutError:
self.logger.debug("Timeout to send payload: {}".format(payload))
# In theory, mqtt client shouldn't loose the last packet
# If it is not the case, following line could be uncommented
# self._publish_queue.put((topic, payload, qos, retain))
raise
except queue.Empty:
# No more packet to publish
pass
Expand All @@ -121,19 +124,26 @@ def _get_socket(self):
self.logger.error("MQTT, unexpected disconnection")

# Socket is not opened anymore, try to reconnect
while True:
timeout = MQTTWrapper.TIMEOUT_RECONNECT_S
while timeout > 0:
try:
self._client.reconnect()
break
ret = self._client.reconnect()
if ret == mqtt.MQTT_ERR_SUCCESS:
break
except Exception:
# Retry to connect in 1 sec
# Retry to connect in 1 sec up to timeout
sleep(1)
timeout -= 1
self.logger.debug("Retrying to connect in 1 sec")

if timeout <= 0:
self.logger.error("Unable to reconnect after {} seconds".format(MQTTWrapper.TIMEOUT_RECONNECT_S))
return None

# Wait for socket to reopen
# Do it in polling as we are managing the thread ourself
# so no callback possible
while self._client.socket() is None:
sleep(1)
# Socket must be available once reconnect is successful
if self._client.socket() is None:
self.logger.error("Cannot get socket after reconnect")
return None

# Set options to new reopened socket
self._client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)
Expand All @@ -144,13 +154,19 @@ def set_last_will(self, topic, data):
self._client.will_set(topic, data, qos=2, retain=True)

def run(self):
while True:
self.running = True
while self.running:
try:
# Get client socket to select on it
# This function manage the reconnect
sock = self._get_socket()

self._do_select(sock)
if sock is None:
# Cannot get the socket, probably an issue
# with connection. Exit the thread
self.logger.error("Cannot get MQTT socket, exit...")
self.running = False
else:
self._do_select(sock)
except TimeoutError:
self.logger.error("Timeout in connection, force a reconnect")
self._client.reconnect()
Expand All @@ -159,10 +175,12 @@ def run(self):
# All the transport module must be stopped in order to be fully
# restarted by the managing agent
self.logger.exception("Unexpected exception in MQTT wrapper Thread")
if self.on_termination_cb is not None:
# As this thread is daemonized, inform the parent that this
# thread has exited
self.on_termination_cb()
self.running = False

if self.on_termination_cb is not None:
# As this thread is daemonized, inform the parent that this
# thread has exited
self.on_termination_cb()

def publish(self, topic, payload, qos=1, retain=False) -> None:
"""
Expand Down
24 changes: 24 additions & 0 deletions python_transport/wirepas_gateway/transport_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,20 @@ def _send_asynchronous_get_configs_response(self):

self.mqtt_wrapper.publish(topic, response.payload, qos=2)

def deferred_thread(fn):
"""
Decorator to handle a request on its own Thread
to avoid blocking the calling Thread on I/O.
It creates a new Thread but it shouldn't impact the performances
as requests are not supposed to be really frequent (few per seconds)
"""
def wrapper(*args, **kwargs):
thread = Thread(target=fn, args=args, kwargs=kwargs)
thread.start()
return thread

return wrapper

def on_sink_connected(self, name):
self.logger.info("Sink connected, sending new configs")
self._send_asynchronous_get_configs_response()
Expand All @@ -233,6 +247,7 @@ def on_sink_disconnected(self, name):
self.logger.info("Sink disconnected, sending new configs")
self._send_asynchronous_get_configs_response()

@deferred_thread
def _on_send_data_cmd_received(self, client, userdata, message):
self.logger.info("Request to send data")
try:
Expand Down Expand Up @@ -276,6 +291,7 @@ def _on_send_data_cmd_received(self, client, userdata, message):

self.mqtt_wrapper.publish(topic, response.payload, qos=2)

@deferred_thread
def _on_get_configs_cmd_received(self, client, userdata, message):
self.logger.info("Config request received")
try:
Expand All @@ -301,6 +317,10 @@ def _on_get_configs_cmd_received(self, client, userdata, message):
self.mqtt_wrapper.publish(topic, response.payload, qos=2)

def _on_get_gateway_info_cmd_received(self, client, userdata, message):
"""
This function doesn't need the decorator @deferred_thread as request is handled
without I/O
"""
self.logger.info("Gateway info request received")
try:
request = wirepas_messaging.gateway.api.GetGatewayInfoRequest.from_payload(
Expand All @@ -323,6 +343,7 @@ def _on_get_gateway_info_cmd_received(self, client, userdata, message):
topic = TopicGenerator.make_get_gateway_info_response_topic(self.gw_id)
self.mqtt_wrapper.publish(topic, response.payload, qos=2)

@deferred_thread
def _on_set_config_cmd_received(self, client, userdata, message):
self.logger.info("Set config request received")
try:
Expand Down Expand Up @@ -351,6 +372,7 @@ def _on_set_config_cmd_received(self, client, userdata, message):

self.mqtt_wrapper.publish(topic, response.payload, qos=2)

@deferred_thread
def _on_otap_status_request_received(self, client, userdata, message):
self.logger.info("OTAP status request received")
try:
Expand Down Expand Up @@ -390,6 +412,7 @@ def _on_otap_status_request_received(self, client, userdata, message):

self.mqtt_wrapper.publish(topic, response.payload, qos=2)

@deferred_thread
def _on_otap_upload_scratchpad_request_received(self, client, userdata, message):
self.logger.info("OTAP upload request received")
try:
Expand Down Expand Up @@ -418,6 +441,7 @@ def _on_otap_upload_scratchpad_request_received(self, client, userdata, message)

self.mqtt_wrapper.publish(topic, response.payload, qos=2)

@deferred_thread
def _on_otap_process_scratchpad_request_received(self, client, userdata, message):
self.logger.info("OTAP process request received")
try:
Expand Down

0 comments on commit df58d1c

Please sign in to comment.