From 353855f5cda27d052c3355f09e495bdcc8c4b165 Mon Sep 17 00:00:00 2001 From: Gwendal Raoul Date: Fri, 21 Jun 2019 13:29:23 +0200 Subject: [PATCH] Transport: expose all mqtt standard parameters (#28) - Allign them with backend-client - Keep old ones with deprecated mention - Directly send global settings in different classes to avoid specifying all the different parameters everywhere --- README.md | 60 +++-- .../wirepas_gateway/dbus/dbus_client.py | 3 +- .../wirepas_gateway/protocol/mqtt_wrapper.py | 33 +-- .../wirepas_gateway/transport_service.py | 173 ++++++++------- .../wirepas_gateway/utils/argument_tools.py | 209 ++++++++++++++++-- 5 files changed, 330 insertions(+), 148 deletions(-) diff --git a/README.md b/README.md index 850029bc..64eae1e6 100644 --- a/README.md +++ b/README.md @@ -91,9 +91,11 @@ be built at installation time. pip3 install wirepas_gateway-*.tar.gz ``` -#### Configuration and starting services +## Configuration and starting services -- A sink service must be started for each connected sink on Gateway: +### Sink service + +A sink service must be started for each connected sink on Gateway: sink_service/build/sinkService -p -b -i @@ -103,17 +105,26 @@ Parameters are: - **bitrate:** bitrate of sink uart (default 125000) - **sink_id:** value between 0 and 9 (default 0). -If multiple sinks are present, they must have a different sink_id and a -transport service must be launched. +If multiple sinks are present, they must have a different sink_id + +### Transport service + +Parameters can be set from cmd line or from a setting file in YAML format. +To get an exhausted list of parameters, please run: + +```shell + wm-gw --help +``` -Parameters can be set from cmd line of from a setting file in YAML format: #### From cmd line +Here is an example to start the transport module from cmd line: + ```shell - wm-gw -s "" -p -u -pw \ - -i [-t ][-fp] [-ua][-iepf ] \ - [-wepf ] + wm-gw --mqtt_hostname "" --mqtt_port --mqtt_username --mqtt_password \ + --gateway_id [--ignored_endpoints_filter ] \ + [--whitened_endpoints_filter ] ``` where: @@ -130,25 +141,18 @@ where: > It must be unique for each gateway reporting to same broker. -- **tls_cert_file:** filepath to the root certificate to override -system one (**Cannot be used with -ua**) - -- **ua:** Disable TLS secure authentication - -- **fp:** Do not use the C extension (full python version) - -- **iepf:** Destination endpoints list to ignore (not published) +- **ignored endpoints list:** Destination endpoints list to ignore (not published) *Example:* - > -iepf "\[1,2, 10-12\]" to filter out destination ep 1, 2, 10, 11, 12 + > --ignored_endpoints_filter "\[1,2, 10-12\]" to filter out destination ep 1, 2, 10, 11, 12 -- **wepf:** Destination endpoints list to whiten +- **whitened endpoints list:** Destination endpoints list to whiten (no payload content, only size) *Example:* - > -wepf "\[1,2, 10-12\]" to whiten destination ep 1, 2, 10, 11, 12 + > --whitened_endpoints_filter "\[1,2, 10-12\]" to whiten destination ep 1, 2, 10, 11, 12 #### From configuration file @@ -164,24 +168,18 @@ file is given below: # # MQTT brocker Settings # - host: - port: - username: - password: - unsecure_authentication: + mqtt_hostname: + mqtt_port: + mqtt_username: + mqtt_password: # # Gateway settings # - gwid: + gateway_id: gateway_model: gateway_version: - # - # Implementation options - # - full_python: - # # Filtering Destination Endpoints # @@ -189,7 +187,7 @@ file is given below: whitened_endpoints_filter: ``` -#### Optional +### Optional Launch local gateway process to see messages received from sinks at Dbus level It can be launched from command line: diff --git a/python_transport/wirepas_gateway/dbus/dbus_client.py b/python_transport/wirepas_gateway/dbus/dbus_client.py index 2bedcf47..517ca31d 100644 --- a/python_transport/wirepas_gateway/dbus/dbus_client.py +++ b/python_transport/wirepas_gateway/dbus/dbus_client.py @@ -71,11 +71,12 @@ def __init__(self, logger=None, c_extension=True, ignored_ep_filter=None, **kwar # Register for packet on Dbus if c_extension: - self.logger.info("Starting c extension") + self.logger.info("Starting dbus client with c extension") self.c_extension_thread = DbusEventHandler( self._on_data_received_c, self.logger ) else: + self.logger.info("Starting dbus client without c extension") # Subscribe to all massages received from any sink (no need for # connected sink for that) self.bus.subscribe( diff --git a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py index 29b535f7..ce4c2a00 100644 --- a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py +++ b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py @@ -26,8 +26,11 @@ class MQTTWrapper(Thread): # Reconnect timeout in Seconds TIMEOUT_RECONNECT_S = 120 - def __init__(self, logger, username, password, host, port, secure_auth=True, ca_certs=None, - on_termination_cb=None, on_connect_cb=None): + def __init__(self, + settings, + logger, + on_termination_cb=None, + on_connect_cb=None): Thread.__init__(self) self.daemon = True self.running = False @@ -35,27 +38,29 @@ def __init__(self, logger, username, password, host, port, secure_auth=True, ca_ self.on_termination_cb = on_termination_cb self.on_connect_cb = on_connect_cb - self._client = mqtt.Client() - if secure_auth: + self._client = mqtt.Client(client_id=settings.gateway_id) + if not settings.mqtt_force_unsecure: try: self._client.tls_set( - ca_certs=ca_certs, - certfile=None, - keyfile=None, - cert_reqs=ssl.CERT_REQUIRED, - tls_version=ssl.PROTOCOL_TLSv1_2, - ciphers=None, + ca_certs=settings.mqtt_ca_certs, + certfile=settings.mqtt_certfile, + keyfile=settings.mqtt_keyfile, + cert_reqs=settings.mqtt_cert_reqs, + tls_version=settings.mqtt_tls_version, + ciphers=settings.mqtt_ciphers ) - except: + except Exception as e: self.logger.error( - "Cannot use secure authentication. attempting unsecure connection" + "Cannot use secure authentication. attempting unsecure connection {}" + .format(e) ) - self._client.username_pw_set(username, password) + self._client.username_pw_set(settings.mqtt_username, + settings.mqtt_password) self._client.on_connect = self._on_connect try: - self._client.connect(host, port, keepalive=MQTTWrapper.KEEP_ALIVE_S) + self._client.connect(settings.mqtt_hostname, settings.mqtt_port, keepalive=MQTTWrapper.KEEP_ALIVE_S) except (socket.gaierror, ValueError) as e: self.logger.error("Cannot connect to mqtt {}".format(e)) exit(-1) diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 413249fa..0eebad29 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -36,42 +36,29 @@ class TransportService(BusClient): def __init__( self, - host, - port, - username="", - password=None, - tlsfile=None, - gw_id=None, + settings, logger=None, - c_extension=False, - secure_auth=False, - gw_model=None, - gw_version=None, - ignored_endpoints_filter=None, - whitened_endpoints_filter=None, **kwargs ): super(TransportService, self).__init__( logger=logger, - c_extension=c_extension, - ignored_ep_filter=ignored_endpoints_filter, + c_extension=(settings.full_python == False), + ignored_ep_filter=settings.ignored_endpoints_filter, **kwargs ) - if gw_id is None: - self.gw_id = getnode() - else: - self.gw_id = gw_id - - self.gw_model = gw_model - self.gw_version = gw_version + self.gw_id = settings.gateway_id + self.gw_model = settings.gateway_model + self.gw_version = settings.gateway_version - self.whitened_ep_filter = whitened_endpoints_filter + self.whitened_ep_filter = settings.whitened_endpoints_filter self.mqtt_wrapper = MQTTWrapper( - self.logger, username, password, host, port, secure_auth, tlsfile, - self._on_mqtt_wrapper_termination_cb, self._on_connect + settings, + self.logger, + self._on_mqtt_wrapper_termination_cb, + self._on_connect ) self.mqtt_wrapper.start() @@ -520,88 +507,116 @@ def parse_setting_list(list_setting): return single_list +def _check_duplicate(args, old_param, new_param, default, logger): + old_param_val = getattr(args, old_param, default) + new_param_val = getattr(args, new_param, default) + if new_param_val == old_param_val: + # Nothing to update + return + + if old_param_val != default: + # Old param is set, check if new_param is also set + if new_param_val == default: + setattr(args, new_param, old_param_val) + logger.warning("Param {} is deprecated, please use {} instead" + .format(old_param, new_param)) + else: + logger.error("Param {} and {} cannot be set at the same time" + .format(old_param, new_param)) + exit() -def main(): - """ - Main service for transport module - - """ - ParserHelper() - parse = ParserHelper(description="Default arguments") - - parse.add_transport() - parse.add_file_settings() - - args = parse.settings(skip_undefined=False) - - try: - debug_level = os.environ["DEBUG_LEVEL"] - except KeyError: - debug_level = "debug" - - logger = setup_log("transport_service", level=debug_level) +def _update_parameters(settings, logger): + ''' + Function to handle the backward compatibility with old parameters name + Args: + settings: Full parameters - if args.unsecure_authentication and args.tlsfile: - # If tls cert file is provided, unsecure authentication cannot - # be set - logger.error("Cannot set tls file and disable secure authentication") - exit() + Returns: None + ''' - secure_authentication = not args.unsecure_authentication + _check_duplicate(settings, "host", "mqtt_hostname", None, logger) + _check_duplicate(settings, "port", "mqtt_port", 8883, logger) + _check_duplicate(settings, "username", "mqtt_username", None, logger) + _check_duplicate(settings, "password", "mqtt_password", None, logger) + _check_duplicate(settings, "tlsfile", "mqtt_certfile", None, logger) + _check_duplicate(settings, "unsecure_authentication", "mqtt_force_unsecure", False, logger) + _check_duplicate(settings, "gwid", "gateway_id", None, logger) - if args.full_python: - logger.info("Starting transport without C optimisation") - c_extension = False - else: - c_extension = True + if settings.gateway_id is None: + settings.gateway_id = getnode() # Parse EP list that should not be published - ignored_endpoints_filter = None - if args.ignored_endpoints_filter is not None: + if settings.ignored_endpoints_filter is not None: try: - ignored_endpoints_filter = parse_setting_list(args.ignored_endpoints_filter) - logger.debug("Ignored endpoints are: {}".format(ignored_endpoints_filter)) + settings.ignored_endpoints_filter = parse_setting_list(settings.ignored_endpoints_filter) + logger.debug("Ignored endpoints are: {}".format(settings.ignored_endpoints_filter)) except SyntaxError as e: logger.error( "Wrong format for ignored_endpoints_filter EP list ({})".format(e) ) exit() - # Parse EP list that should be published without payload - whitened_endpoints_filter = None - if args.whitened_endpoints_filter is not None: + if settings.whitened_endpoints_filter is not None: try: - whitened_endpoints_filter = parse_setting_list( - args.whitened_endpoints_filter + settings.whitened_endpoints_filter = parse_setting_list( + settings.whitened_endpoints_filter ) - logger.debug("Whitened endpoints are: {}".format(whitened_endpoints_filter)) + logger.debug("Whitened endpoints are: {}".format(settings.whitened_endpoints_filter)) except SyntaxError as e: logger.error( "Wrong format for whitened_endpoints_filter EP list ({})".format(e) ) exit() +def _check_parameters(settings, logger): + if settings.mqtt_force_unsecure and settings.mqtt_certfile: + # If tls cert file is provided, unsecure authentication cannot + # be set + logger.error("Cannot give certfile and disable secure authentication") + exit() + try: - if set(ignored_endpoints_filter) & set(whitened_endpoints_filter): - logger.warning("Some endpoints are both ignored and whitened") + if set(settings.ignored_endpoints_filter) &\ + set(settings.whitened_endpoints_filter): + logger.error("Some endpoints are both ignored and whitened") + exit() except TypeError: # One of the filter list is None pass +def main(): + """ + Main service for transport module + + """ + ParserHelper() + parse = ParserHelper(description="Default arguments") + + parse.add_file_settings() + parse.add_mqtt() + parse.add_gateway_config() + parse.add_filtering_config() + parse.add_deprecated_args() + + settings = parse.settings(skip_undefined=False) + + try: + debug_level = os.environ["DEBUG_LEVEL"] + except KeyError: + debug_level = "debug" + + logger = setup_log("transport_service", level=debug_level) + + _update_parameters(settings, logger) + # after this stage, mqtt deprecated argument cannot be used + + _check_parameters(settings, logger) + + secure_authentication = not settings.unsecure_authentication + TransportService( - args.host, - args.port, - args.username, - args.password, - args.tlsfile, - args.gwid, - logger, - c_extension, - secure_authentication, - args.gateway_model, - args.gateway_version, - ignored_endpoints_filter, - whitened_endpoints_filter, + settings, + logger ).run() diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index ae09ed60..448aed60 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -125,45 +125,201 @@ def add_file_settings(self): help="settings file.", ) - def add_transport(self): - """ Transport module arguments """ - self.transport.add_argument( - "-s", "--host", default=None, type=str, help="MQTT broker address" + def add_mqtt(self): + """ Commonly used MQTT arguments """ + self.mqtt.add_argument( + "--mqtt_hostname", + default=None, + action="store", + type=str, + help="MQTT broker hostname ", + ) + + self.mqtt.add_argument( + "--mqtt_username", + default=None, + action="store", + type=str, + help="MQTT broker username ", + ) + + self.mqtt.add_argument( + "--mqtt_password", + default=None, + action="store", + type=str, + help="MQTT broker password", + ) + + self.mqtt.add_argument( + "--mqtt_port", + default=8883, + action="store", + type=int, + help="MQTT broker port", ) - self.transport.add_argument( - "-p", "--port", default=8883, type=int, help="MQTT broker port" + self.mqtt.add_argument( + "--mqtt_ca_certs", + default=None, + action="store", + type=str, + help=( + "A string path to the Certificate " + "Authority certificate files that " + "are to be treated as trusted by " + "this client" + ), ) - self.transport.add_argument( - "-u", "--username", default=None, type=str, help="MQTT broker username" + self.mqtt.add_argument( + "--mqtt_certfile", + default=None, + action="store", + type=str, + help=("Strings pointing to the PEM encoded client certificate"), ) - self.transport.add_argument( - "-pw", "--password", default=None, type=str, help="MQTT broker password" + self.mqtt.add_argument( + "--mqtt_keyfile", + default=None, + action="store", + type=str, + help=( + "Strings pointing to the PEM " + "encoded client private keys " + "respectively" + ), + ) + + self.mqtt.add_argument( + "--mqtt_cert_reqs", + default=ssl.CERT_REQUIRED, + action="store", + type=str, + help=( + "Defines the certificate " + "requirements that the client " + "imposes on the broker" + ), + ) + + self.mqtt.add_argument( + "--mqtt_tls_version", + default=ssl.PROTOCOL_TLSv1_2, + action="store", + type=str, + help=( + "Specifies the version of the " + " SSL / TLS protocol to be used" + ), + ) + + self.mqtt.add_argument( + "--mqtt_ciphers", + default=None, + action="store", + type=str, + help=( + "A string specifying which " + "encryption ciphers are allowable " + "for this connection" + ), + ) + + self.mqtt.add_argument( + "--mqtt_persist_session", + default=True, + action="store_true", + help=( + "When False the broker will buffer" + "session packets between " + "reconnection" + ), ) - self.transport.add_argument( + self.mqtt.add_argument( + "--mqtt_force_unsecure", + default=False, + action="store_true", + help=("When True the broker will skip the TLS handshake"), + ) + + self.mqtt.add_argument( + "--mqtt_allow_untrusted", + default=False, + action="store_true", + help=("When true the client will skip the TLS check"), + ) + + def add_deprecated_args(self): + """ Deprecated mqtt arguments in order to keep backward compatibility """ + self.deprecated.add_argument( + "-s", + "--host", + default=None, + type=str, + help="MQTT broker address (Deprecated, pleas use -mqtt_hostname)" + ) + + self.deprecated.add_argument( + "-p", + "--port", + default=8883, + type=int, + help="MQTT broker port (Deprecated, please use -mqtt_port)" + ) + + self.deprecated.add_argument( + "-u", + "--username", + default=None, + type=str, + help="MQTT broker username (Deprecated, please use -mqtt_username)" + ) + + self.deprecated.add_argument( + "-pw", + "--password", + default=None, + type=str, + help="MQTT broker password (Deprecated, please use -mqtt_password)" + ) + + self.deprecated.add_argument( "-t", "--tlsfile", default=None, help="MQTT broker tls cert file. Optional in case system certificates" - " are not up to date", + " are not up to date (Deprecated, please use -mqtt_certfile)", ) - self.transport.add_argument( + self.deprecated.add_argument( "-ua", "--unsecure_authentication", default=False, action="store_true", - help="Disable TLS secure authentication to the server", + help="Disable TLS secure authentication to the server" + "Deprecated, please use -mqtt_force_unsecure", ) - self.transport.add_argument( - "-i", "--gwid", default=None, type=str, help="Id of the gateway" + self.deprecated.add_argument( + "-i", + "--gwid", + default=None, + type=str, + help="Id of the gateway. It must be unique on same broker" ) - self.transport.add_argument( + def add_gateway_config(self): + self.gateway.add_argument( + "--gateway_id", + default=None, + type=str, + help="Id of the gateway. It must be unique on same broker" + ) + + self.gateway.add_argument( "-fp", "--full_python", default=False, @@ -171,22 +327,29 @@ def add_transport(self): help="Do not use C extension for optimization", ) - self.transport.add_argument( - "-gm", "--gateway_model", default=None, help="Model name of the gateway" + self.gateway.add_argument( + "-gm", + "--gateway_model", + default=None, + help="Model name of the gateway" ) - self.transport.add_argument( - "-gv", "--gateway_version", default=None, help="Version of the gateway" + self.gateway.add_argument( + "-gv", + "--gateway_version", + default=None, + help="Version of the gateway" ) - self.transport.add_argument( + def add_filtering_config(self): + self.filtering.add_argument( "-iepf", "--ignored_endpoints_filter", default=None, help="Destination endpoints list to ignore (not published)", ) - self.transport.add_argument( + self.filtering.add_argument( "-wepf", "--whitened_endpoints_filter", default=None,