From df58d1c2c64c5860e431d64690797a2ab1003844 Mon Sep 17 00:00:00 2001 From: Gwendal Raoul Date: Mon, 20 May 2019 16:21:48 +0200 Subject: [PATCH] Request thread: handle request on dedicated thread All requests for all sinks are handled on the reception thread that is the single MQTT thread (handling publish and subscribe cb). As some request may be blocking on IO (mainly accessing the UART of sink), it prevents any publish during that period. Concequences can be noticed only when UART is fully loaded or in case of multiple sinks on same gateway and loading a scratchpad or processing it on one of the gateway. It will delay all messages from the other sink. This patch handles most of the request on a dedicated thread. It will not impact the performances as creating a thread is negligible compare to the time a request can take. --- .../wirepas_gateway/protocol/mqtt_wrapper.py | 68 ++++++++++++------- .../wirepas_gateway/transport_service.py | 24 +++++++ 2 files changed, 67 insertions(+), 25 deletions(-) diff --git a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py index 01ba8120..79d3f372 100644 --- a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py +++ b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py @@ -19,7 +19,13 @@ class MQTTWrapper(Thread): to avoid any dead lock from mqtt client. """ - def __init__(self, logger, username, password, host, port, secure_auth=True, tlsfile=None, + # Keep alive time with broker + KEEP_ALIVE_S = 60 + + # Reconnect timeout in Seconds + TIMEOUT_RECONNECT_S = 120 + + def __init__(self, logger, username, password, host, port, secure_auth=True, ca_certs=None, on_termination_cb=None, on_connect_cb=None): Thread.__init__(self) self.daemon = True @@ -32,7 +38,7 @@ def __init__(self, logger, username, password, host, port, secure_auth=True, tls if secure_auth: try: self._client.tls_set( - tlsfile, + ca_certs=ca_certs, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, @@ -48,7 +54,7 @@ def __init__(self, logger, username, password, host, port, secure_auth=True, tls self._client.on_connect = self._on_connect try: - self._client.connect(host, port, keepalive=60) + self._client.connect(host, port, keepalive=MQTTWrapper.KEEP_ALIVE_S) except (socket.gaierror, ValueError) as e: self.logger.error("Cannot connect to mqtt {}".format(e)) exit(-1) @@ -58,6 +64,9 @@ def __init__(self, logger, username, password, host, port, secure_auth=True, tls self._publish_queue = SelectableQueue() + # Thread is not started yes + self.running = False + def _on_connect(self, client, userdata, flags, rc): if rc != 0: self.logger.error("MQTT cannot connect {}".format(rc)) @@ -95,12 +104,6 @@ def _do_select(self, sock): # not a big issue. Just keep going pass - except TimeoutError: - self.logger.debug("Timeout to send payload: {}".format(payload)) - # In theory, mqtt client shouldn't loose the last packet - # If it is not the case, following line could be uncommented - # self._publish_queue.put((topic, payload, qos, retain)) - raise except queue.Empty: # No more packet to publish pass @@ -121,19 +124,26 @@ def _get_socket(self): self.logger.error("MQTT, unexpected disconnection") # Socket is not opened anymore, try to reconnect - while True: + timeout = MQTTWrapper.TIMEOUT_RECONNECT_S + while timeout > 0: try: - self._client.reconnect() - break + ret = self._client.reconnect() + if ret == mqtt.MQTT_ERR_SUCCESS: + break except Exception: - # Retry to connect in 1 sec + # Retry to connect in 1 sec up to timeout sleep(1) + timeout -= 1 + self.logger.debug("Retrying to connect in 1 sec") + + if timeout <= 0: + self.logger.error("Unable to reconnect after {} seconds".format(MQTTWrapper.TIMEOUT_RECONNECT_S)) + return None - # Wait for socket to reopen - # Do it in polling as we are managing the thread ourself - # so no callback possible - while self._client.socket() is None: - sleep(1) + # Socket must be available once reconnect is successful + if self._client.socket() is None: + self.logger.error("Cannot get socket after reconnect") + return None # Set options to new reopened socket self._client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048) @@ -144,13 +154,19 @@ def set_last_will(self, topic, data): self._client.will_set(topic, data, qos=2, retain=True) def run(self): - while True: + self.running = True + while self.running: try: # Get client socket to select on it # This function manage the reconnect sock = self._get_socket() - - self._do_select(sock) + if sock is None: + # Cannot get the socket, probably an issue + # with connection. Exit the thread + self.logger.error("Cannot get MQTT socket, exit...") + self.running = False + else: + self._do_select(sock) except TimeoutError: self.logger.error("Timeout in connection, force a reconnect") self._client.reconnect() @@ -159,10 +175,12 @@ def run(self): # All the transport module must be stopped in order to be fully # restarted by the managing agent self.logger.exception("Unexpected exception in MQTT wrapper Thread") - if self.on_termination_cb is not None: - # As this thread is daemonized, inform the parent that this - # thread has exited - self.on_termination_cb() + self.running = False + + if self.on_termination_cb is not None: + # As this thread is daemonized, inform the parent that this + # thread has exited + self.on_termination_cb() def publish(self, topic, payload, qos=1, retain=False) -> None: """ diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 9de0ff69..413249fa 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -225,6 +225,20 @@ def _send_asynchronous_get_configs_response(self): self.mqtt_wrapper.publish(topic, response.payload, qos=2) + def deferred_thread(fn): + """ + Decorator to handle a request on its own Thread + to avoid blocking the calling Thread on I/O. + It creates a new Thread but it shouldn't impact the performances + as requests are not supposed to be really frequent (few per seconds) + """ + def wrapper(*args, **kwargs): + thread = Thread(target=fn, args=args, kwargs=kwargs) + thread.start() + return thread + + return wrapper + def on_sink_connected(self, name): self.logger.info("Sink connected, sending new configs") self._send_asynchronous_get_configs_response() @@ -233,6 +247,7 @@ def on_sink_disconnected(self, name): self.logger.info("Sink disconnected, sending new configs") self._send_asynchronous_get_configs_response() + @deferred_thread def _on_send_data_cmd_received(self, client, userdata, message): self.logger.info("Request to send data") try: @@ -276,6 +291,7 @@ def _on_send_data_cmd_received(self, client, userdata, message): self.mqtt_wrapper.publish(topic, response.payload, qos=2) + @deferred_thread def _on_get_configs_cmd_received(self, client, userdata, message): self.logger.info("Config request received") try: @@ -301,6 +317,10 @@ def _on_get_configs_cmd_received(self, client, userdata, message): self.mqtt_wrapper.publish(topic, response.payload, qos=2) def _on_get_gateway_info_cmd_received(self, client, userdata, message): + """ + This function doesn't need the decorator @deferred_thread as request is handled + without I/O + """ self.logger.info("Gateway info request received") try: request = wirepas_messaging.gateway.api.GetGatewayInfoRequest.from_payload( @@ -323,6 +343,7 @@ def _on_get_gateway_info_cmd_received(self, client, userdata, message): topic = TopicGenerator.make_get_gateway_info_response_topic(self.gw_id) self.mqtt_wrapper.publish(topic, response.payload, qos=2) + @deferred_thread def _on_set_config_cmd_received(self, client, userdata, message): self.logger.info("Set config request received") try: @@ -351,6 +372,7 @@ def _on_set_config_cmd_received(self, client, userdata, message): self.mqtt_wrapper.publish(topic, response.payload, qos=2) + @deferred_thread def _on_otap_status_request_received(self, client, userdata, message): self.logger.info("OTAP status request received") try: @@ -390,6 +412,7 @@ def _on_otap_status_request_received(self, client, userdata, message): self.mqtt_wrapper.publish(topic, response.payload, qos=2) + @deferred_thread def _on_otap_upload_scratchpad_request_received(self, client, userdata, message): self.logger.info("OTAP upload request received") try: @@ -418,6 +441,7 @@ def _on_otap_upload_scratchpad_request_received(self, client, userdata, message) self.mqtt_wrapper.publish(topic, response.payload, qos=2) + @deferred_thread def _on_otap_process_scratchpad_request_received(self, client, userdata, message): self.logger.info("OTAP process request received") try: