From e74b95d0728d6847c71a8ce47ec823f638a3389e Mon Sep 17 00:00:00 2001 From: Abilio Marques Date: Tue, 24 Oct 2023 19:22:51 -0700 Subject: [PATCH 1/4] notify bridge of suback value, disconnect on error Signed-off-by: Abilio Marques --- lib/handle_suback.c | 8 ++++++++ src/bridge.c | 8 ++++++++ src/mosquitto_broker_internal.h | 1 + 3 files changed, 17 insertions(+) diff --git a/lib/handle_suback.c b/lib/handle_suback.c index a16bdccca2..8e3df98941 100644 --- a/lib/handle_suback.c +++ b/lib/handle_suback.c @@ -85,6 +85,14 @@ int handle__suback(struct mosquitto *mosq) mosquitto_property_free_all(&properties); return rc; } + +#ifdef WITH_BRIDGE + if(mosq->bridge){ + rc = bridge__on_suback(mosq, qos); + if(rc) return rc; + } +#endif + granted_qos[i] = (int)qos; i++; } diff --git a/src/bridge.c b/src/bridge.c index 149c4cf87b..edd3f39a69 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -713,6 +713,14 @@ int bridge__on_connect(struct mosquitto *context) return MOSQ_ERR_SUCCESS; } +int bridge__on_suback(struct mosquitto *context, int qos) { + if (qos > 2) { + log__printf(NULL, MOSQ_LOG_ERR, "Error on bridge subscription: %s", mosquitto_reason_string(qos)); + do_disconnect(context, MOSQ_ERR_CONN_LOST); + } + + return MOSQ_ERR_SUCCESS; +} int bridge__register_local_connections(void) { diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index ef1a2e6613..8adf4b290d 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -801,6 +801,7 @@ int bridge__connect(struct mosquitto *context); int bridge__connect_step3(struct mosquitto *context); #endif int bridge__on_connect(struct mosquitto *context); +int bridge__on_suback(struct mosquitto *context, int qos); void bridge_check(void); int bridge__register_local_connections(void); int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, uint8_t qos, const char *local_prefix, const char *remote_prefix); From a04ce051213b9a80486c75f5b5ef11896a894382 Mon Sep 17 00:00:00 2001 From: Abilio Marques Date: Wed, 25 Oct 2023 22:12:30 -0700 Subject: [PATCH 2/4] test that bridge disconnect on subscription errors Signed-off-by: Abilio Marques --- .../06-bridge-disconnect_on_sub_errors.py | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100755 test/broker/06-bridge-disconnect_on_sub_errors.py diff --git a/test/broker/06-bridge-disconnect_on_sub_errors.py b/test/broker/06-bridge-disconnect_on_sub_errors.py new file mode 100755 index 0000000000..817116620c --- /dev/null +++ b/test/broker/06-bridge-disconnect_on_sub_errors.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 + +# Does a bridge disconnect on subscription errors? + +from mosq_test_helper import * + +def write_config(filename, port1, port2): + with open(filename, 'w') as f: + f.write("listener %d\n" % (port2)) + f.write("allow_anonymous true\n") + f.write("\n") + f.write("connection bridge_sample\n") + f.write("address 127.0.0.1:%d\n" % (port1)) + f.write("topic in_topic in\n") + f.write("notifications false\n") + f.write("restart_timeout 5\n") + f.write("cleansession true\n") + +def is_connected(sock): + try: + sock.recv(1) # if still connected, the recv will timeout + return False + except TimeoutError as e: + return True + +def do_test(): + (port1, port2) = mosq_test.get_port(2) + conf_file = os.path.basename(__file__).replace('.py', '.conf') + write_config(conf_file, port1, port2) + + rc = 1 + client_id = socket.gethostname()+".bridge_sample" + connect_packet = mosq_test.gen_connect(client_id, proto_ver=132) + connack_packet = mosq_test.gen_connack() + + mid = 1 + subscribe_packet = mosq_test.gen_subscribe(mid, "in_topic", 0) + suback_packet = mosq_test.gen_suback(mid, 0x80) + + ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + ssock.settimeout(40) + ssock.bind(('', port1)) + ssock.listen(1) + + broker = None + + try: + broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port2, use_conf=True) + + (bridge, address) = ssock.accept() + bridge.settimeout(5) + + mosq_test.expect_packet(bridge, "connect", connect_packet) + bridge.send(connack_packet) + + mosq_test.expect_packet(bridge, "subscribe", subscribe_packet) + bridge.send(suback_packet) + + time.sleep(0.25) # give the broker some time to react + + # should be disconnected by now + rc = bool(is_connected(bridge)) + except mosq_test.TestError: + pass + finally: + os.remove(conf_file) + try: + bridge.close() + except NameError: + pass + + broker.terminate() + if mosq_test.wait_for_subprocess(broker): + print("broker not terminated") + rc = 1 + (stdo, stde) = broker.communicate() + ssock.close() + if rc: + print(stde.decode('utf-8')) + exit(rc) + +do_test() + +exit(0) From 91fc7ad70e9af74495c2e137214780256564e70d Mon Sep 17 00:00:00 2001 From: Abilio Marques Date: Tue, 24 Oct 2023 20:05:20 -0700 Subject: [PATCH 3/4] add option to control whether sub errors are fatal for bridge connections Signed-off-by: Abilio Marques --- ChangeLog.txt | 4 +++- fuzzing/corpora/broker_conf.dict | 1 + man/mosquitto.conf.5.xml | 11 ++++++++++- mosquitto.conf | 6 ++++++ src/bridge.c | 6 ++++-- src/conf.c | 8 ++++++++ src/mosquitto_broker_internal.h | 1 + 7 files changed, 33 insertions(+), 4 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index a2484507ee..1f19a095b7 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -74,7 +74,9 @@ Broker: choose TLS v1.1, but this is not recommended and will be removed in a future version. - Add -q option to allow logging to be disabled at the command line. -- Add suport for PROXY protocol v2. +- Add support for PROXY protocol v2. +- Add `bridge_fatal_sub_errors` option to control what happens when a bridge + fails to subscribe to a topic. Plugins / plugin interface: - Add persist-sqlite plugin. diff --git a/fuzzing/corpora/broker_conf.dict b/fuzzing/corpora/broker_conf.dict index 2d3511e9e8..8c9b07be31 100644 --- a/fuzzing/corpora/broker_conf.dict +++ b/fuzzing/corpora/broker_conf.dict @@ -34,6 +34,7 @@ "bridge_max_packet_size" "bridge_max_topic_alias" "bridge_outgoing_retain" +"bridge_fatal_sub_errors" "bridge_protocol_version" "bridge_psk" "bridge_receive_maximum" diff --git a/man/mosquitto.conf.5.xml b/man/mosquitto.conf.5.xml index 2170e64a41..8569783a30 100644 --- a/man/mosquitto.conf.5.xml +++ b/man/mosquitto.conf.5.xml @@ -1974,7 +1974,16 @@ openssl dhparam -out dhparam.pem 2048 other setting. Defaults to true. - + + [ true | false ] + + A failure to subscribe to a topic will cause an immediate + disconnection. The hope is that a temporary failure will disappear + after reconnecting. If you desire to silently ignore subscription + errors (not advised), you can set the bridge_fatal_sub_errors + option to false. Defaults to true. + + version diff --git a/mosquitto.conf b/mosquitto.conf index 083e9b592f..6a4affe40d 100644 --- a/mosquitto.conf +++ b/mosquitto.conf @@ -940,6 +940,12 @@ # all outgoing messages to that bridge, regardless of any other setting. #bridge_outgoing_retain true +# A failure to subscribe to a topic will cause an immediate disconnection. The +# hope is that a temporary failure will disappear after reconnecting. If you +# desire to silently ignore subscription errors (not advised), you can set the +# bridge_fatal_sub_errors option to false. +#bridge_fatal_sub_errors true + # If you wish to restrict the size of messages sent to a remote bridge, use the # bridge_max_packet_size option. This sets the maximum number of bytes for # the total message, including headers and payload. diff --git a/src/bridge.c b/src/bridge.c index edd3f39a69..87c6c3c5d7 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -714,9 +714,11 @@ int bridge__on_connect(struct mosquitto *context) } int bridge__on_suback(struct mosquitto *context, int qos) { - if (qos > 2) { + if(qos>2){ log__printf(NULL, MOSQ_LOG_ERR, "Error on bridge subscription: %s", mosquitto_reason_string(qos)); - do_disconnect(context, MOSQ_ERR_CONN_LOST); + if(context->bridge->fatal_sub_errors){ + do_disconnect(context, MOSQ_ERR_CONN_LOST); + } } return MOSQ_ERR_SUCCESS; diff --git a/src/conf.c b/src/conf.c index edc1b6dd96..47347b5bee 100644 --- a/src/conf.c +++ b/src/conf.c @@ -1294,6 +1294,13 @@ static int config__read_file_core(struct mosquitto__config *config, bool reload, if(conf__parse_bool(&token, "bridge_outgoing_retain", &cur_bridge->outgoing_retain, &saveptr)) return MOSQ_ERR_INVAL; #else log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available."); +#endif + }else if(!strcmp(token, "bridge_fatal_sub_errors")){ +#if defined(WITH_BRIDGE) + REQUIRE_BRIDGE(token); + if(conf__parse_bool(&token, "bridge_fatal_sub_errors", &cur_bridge->fatal_sub_errors, &saveptr)) return MOSQ_ERR_INVAL; +#else + log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available."); #endif }else if(!strcmp(token, "bridge_keyfile")){ #if defined(WITH_BRIDGE) && defined(WITH_TLS) @@ -1566,6 +1573,7 @@ static int config__read_file_core(struct mosquitto__config *config, bool reload, cur_bridge->protocol_version = mosq_p_mqtt311; cur_bridge->primary_retry_sock = INVALID_SOCKET; cur_bridge->outgoing_retain = true; + cur_bridge->fatal_sub_errors = true; cur_bridge->clean_start_local = -1; cur_bridge->reload_type = brt_lazy; cur_bridge->max_topic_alias = 10; diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 8adf4b290d..33ef5ab3ef 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -596,6 +596,7 @@ struct mosquitto__bridge{ bool attempt_unsubscribe; bool initial_notification_done; bool outgoing_retain; + bool fatal_sub_errors; enum mosquitto_bridge_reload_type reload_type; uint16_t max_topic_alias; #ifdef WITH_TLS From 88a6adf3e87e29549430a64bcb5a199b7a1dd2ea Mon Sep 17 00:00:00 2001 From: Abilio Marques Date: Wed, 25 Oct 2023 22:18:32 -0700 Subject: [PATCH 4/4] add test for bridge_fatal_sub_errors Signed-off-by: Abilio Marques --- ..._errors.py => 06-bridge-fatal-sub-errors.py} | 17 ++++++++++------- test/broker/16-config-huge.py | 1 + .../16-config-parse-errors-without-tls.py | 1 + test/broker/test.py | 1 + 4 files changed, 13 insertions(+), 7 deletions(-) rename test/broker/{06-bridge-disconnect_on_sub_errors.py => 06-bridge-fatal-sub-errors.py} (80%) diff --git a/test/broker/06-bridge-disconnect_on_sub_errors.py b/test/broker/06-bridge-fatal-sub-errors.py similarity index 80% rename from test/broker/06-bridge-disconnect_on_sub_errors.py rename to test/broker/06-bridge-fatal-sub-errors.py index 817116620c..73d5a0f884 100755 --- a/test/broker/06-bridge-disconnect_on_sub_errors.py +++ b/test/broker/06-bridge-fatal-sub-errors.py @@ -1,10 +1,11 @@ #!/usr/bin/env python3 -# Does a bridge disconnect on subscription errors? +# Does a bridge with bridge_fatal_sub_errors enabled +# disconnect on subscription errors? Does it remain connected otherwise? from mosq_test_helper import * -def write_config(filename, port1, port2): +def write_config(filename, port1, port2, fatal_sub_errors): with open(filename, 'w') as f: f.write("listener %d\n" % (port2)) f.write("allow_anonymous true\n") @@ -15,6 +16,7 @@ def write_config(filename, port1, port2): f.write("notifications false\n") f.write("restart_timeout 5\n") f.write("cleansession true\n") + f.write("bridge_fatal_sub_errors %s\n" % str(fatal_sub_errors).lower()) def is_connected(sock): try: @@ -23,10 +25,10 @@ def is_connected(sock): except TimeoutError as e: return True -def do_test(): +def do_test(fatal_sub_errors): (port1, port2) = mosq_test.get_port(2) conf_file = os.path.basename(__file__).replace('.py', '.conf') - write_config(conf_file, port1, port2) + write_config(conf_file, port1, port2, fatal_sub_errors) rc = 1 client_id = socket.gethostname()+".bridge_sample" @@ -59,8 +61,8 @@ def do_test(): time.sleep(0.25) # give the broker some time to react - # should be disconnected by now - rc = bool(is_connected(bridge)) + # if (connected and not fatal) or (disconnected and fatal): success, else: failure + rc = 0 if is_connected(bridge) != fatal_sub_errors else 1 except mosq_test.TestError: pass finally: @@ -80,6 +82,7 @@ def do_test(): print(stde.decode('utf-8')) exit(rc) -do_test() +do_test(True) +do_test(False) exit(0) diff --git a/test/broker/16-config-huge.py b/test/broker/16-config-huge.py index 5acf136939..dbdf4e7918 100755 --- a/test/broker/16-config-huge.py +++ b/test/broker/16-config-huge.py @@ -75,6 +75,7 @@ def write_config(filename, ports, per_listener_settings, plugver, acl_file): f.write("bridge_max_packet_size 10000\n") f.write("bridge_max_topic_alias 1000\n") f.write("bridge_outgoing_retain false\n") + f.write("bridge_fatal_sub_errors false\n") f.write("bridge_protocol_version mqttv50\n") #f.write("bridge_psk\n") f.write("bridge_receive_maximum 100\n") diff --git a/test/broker/16-config-parse-errors-without-tls.py b/test/broker/16-config-parse-errors-without-tls.py index 3b444bc3bd..209d409fd2 100755 --- a/test/broker/16-config-parse-errors-without-tls.py +++ b/test/broker/16-config-parse-errors-without-tls.py @@ -38,6 +38,7 @@ do_test_broker_failure(conf_file, ["bridge_max_packet_size 1000"], port, 3, "Error: The 'bridge_max_packet_size' option requires a bridge to be defined first.") # Missing bridge config do_test_broker_failure(conf_file, ["bridge_max_topic_alias 1000"], port, 3, "Error: The 'bridge_max_topic_alias' option requires a bridge to be defined first.") # Missing bridge config do_test_broker_failure(conf_file, ["bridge_outgoing_retain false"], port, 3, "Error: The 'bridge_outgoing_retain' option requires a bridge to be defined first.") # Missing bridge config +do_test_broker_failure(conf_file, ["bridge_fatal_sub_errors false"], 3, "Error: The 'bridge_fatal_sub_errors' option requires a bridge to be defined first.") # Missing bridge config do_test_broker_failure(conf_file, ["bridge_protocol_version string"], port, 3, "Error: The 'bridge_protocol_version' option requires a bridge to be defined first.") # Missing bridge config do_test_broker_failure(conf_file, ["bridge_receive_maximum 10"], port, 3, "Error: The 'bridge_receive_maximum' option requires a bridge to be defined first.") # Missing bridge config do_test_broker_failure(conf_file, ["bridge_reload_type string"], port, 3, "Error: The 'bridge_reload_type' option requires a bridge to be defined first.") # Missing bridge config diff --git a/test/broker/test.py b/test/broker/test.py index 8615da2842..36a22af7e0 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -113,6 +113,7 @@ (2, './06-bridge-reconnect-local-out.py'), (2, './06-bridge-remote-shutdown.py'), (2, './06-bridge-config-reload.py'), + (2, './06-bridge-fatal-sub-errors.py'), (1, './07-will-control.py'), (1, './07-will-delay-invalid-573191.py'),