diff --git a/ChangeLog.txt b/ChangeLog.txt index a2484507e..1f19a095b 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -74,7 +74,9 @@ Broker: choose TLS v1.1, but this is not recommended and will be removed in a future version. - Add -q option to allow logging to be disabled at the command line. -- Add suport for PROXY protocol v2. +- Add support for PROXY protocol v2. +- Add `bridge_fatal_sub_errors` option to control what happens when a bridge + fails to subscribe to a topic. Plugins / plugin interface: - Add persist-sqlite plugin. diff --git a/fuzzing/corpora/broker_conf.dict b/fuzzing/corpora/broker_conf.dict index 2d3511e9e..8c9b07be3 100644 --- a/fuzzing/corpora/broker_conf.dict +++ b/fuzzing/corpora/broker_conf.dict @@ -34,6 +34,7 @@ "bridge_max_packet_size" "bridge_max_topic_alias" "bridge_outgoing_retain" +"bridge_fatal_sub_errors" "bridge_protocol_version" "bridge_psk" "bridge_receive_maximum" diff --git a/lib/handle_suback.c b/lib/handle_suback.c index a16bdccca..8e3df9894 100644 --- a/lib/handle_suback.c +++ b/lib/handle_suback.c @@ -85,6 +85,14 @@ int handle__suback(struct mosquitto *mosq) mosquitto_property_free_all(&properties); return rc; } + +#ifdef WITH_BRIDGE + if(mosq->bridge){ + rc = bridge__on_suback(mosq, qos); + if(rc) return rc; + } +#endif + granted_qos[i] = (int)qos; i++; } diff --git a/man/mosquitto.conf.5.xml b/man/mosquitto.conf.5.xml index 2170e64a4..8569783a3 100644 --- a/man/mosquitto.conf.5.xml +++ b/man/mosquitto.conf.5.xml @@ -1974,7 +1974,16 @@ openssl dhparam -out dhparam.pem 2048 other setting. Defaults to true. - + + [ true | false ] + + A failure to subscribe to a topic will cause an immediate + disconnection. The hope is that a temporary failure will disappear + after reconnecting. If you desire to silently ignore subscription + errors (not advised), you can set the bridge_fatal_sub_errors + option to false. Defaults to true. + + version diff --git a/mosquitto.conf b/mosquitto.conf index 083e9b592..6a4affe40 100644 --- a/mosquitto.conf +++ b/mosquitto.conf @@ -940,6 +940,12 @@ # all outgoing messages to that bridge, regardless of any other setting. #bridge_outgoing_retain true +# A failure to subscribe to a topic will cause an immediate disconnection. The +# hope is that a temporary failure will disappear after reconnecting. If you +# desire to silently ignore subscription errors (not advised), you can set the +# bridge_fatal_sub_errors option to false. +#bridge_fatal_sub_errors true + # If you wish to restrict the size of messages sent to a remote bridge, use the # bridge_max_packet_size option. This sets the maximum number of bytes for # the total message, including headers and payload. diff --git a/src/bridge.c b/src/bridge.c index 149c4cf87..87c6c3c5d 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -713,6 +713,16 @@ int bridge__on_connect(struct mosquitto *context) return MOSQ_ERR_SUCCESS; } +int bridge__on_suback(struct mosquitto *context, int qos) { + if(qos>2){ + log__printf(NULL, MOSQ_LOG_ERR, "Error on bridge subscription: %s", mosquitto_reason_string(qos)); + if(context->bridge->fatal_sub_errors){ + do_disconnect(context, MOSQ_ERR_CONN_LOST); + } + } + + return MOSQ_ERR_SUCCESS; +} int bridge__register_local_connections(void) { diff --git a/src/conf.c b/src/conf.c index edc1b6dd9..47347b5be 100644 --- a/src/conf.c +++ b/src/conf.c @@ -1294,6 +1294,13 @@ static int config__read_file_core(struct mosquitto__config *config, bool reload, if(conf__parse_bool(&token, "bridge_outgoing_retain", &cur_bridge->outgoing_retain, &saveptr)) return MOSQ_ERR_INVAL; #else log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available."); +#endif + }else if(!strcmp(token, "bridge_fatal_sub_errors")){ +#if defined(WITH_BRIDGE) + REQUIRE_BRIDGE(token); + if(conf__parse_bool(&token, "bridge_fatal_sub_errors", &cur_bridge->fatal_sub_errors, &saveptr)) return MOSQ_ERR_INVAL; +#else + log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available."); #endif }else if(!strcmp(token, "bridge_keyfile")){ #if defined(WITH_BRIDGE) && defined(WITH_TLS) @@ -1566,6 +1573,7 @@ static int config__read_file_core(struct mosquitto__config *config, bool reload, cur_bridge->protocol_version = mosq_p_mqtt311; cur_bridge->primary_retry_sock = INVALID_SOCKET; cur_bridge->outgoing_retain = true; + cur_bridge->fatal_sub_errors = true; cur_bridge->clean_start_local = -1; cur_bridge->reload_type = brt_lazy; cur_bridge->max_topic_alias = 10; diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index ef1a2e661..33ef5ab3e 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -596,6 +596,7 @@ struct mosquitto__bridge{ bool attempt_unsubscribe; bool initial_notification_done; bool outgoing_retain; + bool fatal_sub_errors; enum mosquitto_bridge_reload_type reload_type; uint16_t max_topic_alias; #ifdef WITH_TLS @@ -801,6 +802,7 @@ int bridge__connect(struct mosquitto *context); int bridge__connect_step3(struct mosquitto *context); #endif int bridge__on_connect(struct mosquitto *context); +int bridge__on_suback(struct mosquitto *context, int qos); void bridge_check(void); int bridge__register_local_connections(void); int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, uint8_t qos, const char *local_prefix, const char *remote_prefix); diff --git a/test/broker/06-bridge-fatal-sub-errors.py b/test/broker/06-bridge-fatal-sub-errors.py new file mode 100755 index 000000000..73d5a0f88 --- /dev/null +++ b/test/broker/06-bridge-fatal-sub-errors.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 + +# Does a bridge with bridge_fatal_sub_errors enabled +# disconnect on subscription errors? Does it remain connected otherwise? + +from mosq_test_helper import * + +def write_config(filename, port1, port2, fatal_sub_errors): + with open(filename, 'w') as f: + f.write("listener %d\n" % (port2)) + f.write("allow_anonymous true\n") + f.write("\n") + f.write("connection bridge_sample\n") + f.write("address 127.0.0.1:%d\n" % (port1)) + f.write("topic in_topic in\n") + f.write("notifications false\n") + f.write("restart_timeout 5\n") + f.write("cleansession true\n") + f.write("bridge_fatal_sub_errors %s\n" % str(fatal_sub_errors).lower()) + +def is_connected(sock): + try: + sock.recv(1) # if still connected, the recv will timeout + return False + except TimeoutError as e: + return True + +def do_test(fatal_sub_errors): + (port1, port2) = mosq_test.get_port(2) + conf_file = os.path.basename(__file__).replace('.py', '.conf') + write_config(conf_file, port1, port2, fatal_sub_errors) + + rc = 1 + client_id = socket.gethostname()+".bridge_sample" + connect_packet = mosq_test.gen_connect(client_id, proto_ver=132) + connack_packet = mosq_test.gen_connack() + + mid = 1 + subscribe_packet = mosq_test.gen_subscribe(mid, "in_topic", 0) + suback_packet = mosq_test.gen_suback(mid, 0x80) + + ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + ssock.settimeout(40) + ssock.bind(('', port1)) + ssock.listen(1) + + broker = None + + try: + broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port2, use_conf=True) + + (bridge, address) = ssock.accept() + bridge.settimeout(5) + + mosq_test.expect_packet(bridge, "connect", connect_packet) + bridge.send(connack_packet) + + mosq_test.expect_packet(bridge, "subscribe", subscribe_packet) + bridge.send(suback_packet) + + time.sleep(0.25) # give the broker some time to react + + # if (connected and not fatal) or (disconnected and fatal): success, else: failure + rc = 0 if is_connected(bridge) != fatal_sub_errors else 1 + except mosq_test.TestError: + pass + finally: + os.remove(conf_file) + try: + bridge.close() + except NameError: + pass + + broker.terminate() + if mosq_test.wait_for_subprocess(broker): + print("broker not terminated") + rc = 1 + (stdo, stde) = broker.communicate() + ssock.close() + if rc: + print(stde.decode('utf-8')) + exit(rc) + +do_test(True) +do_test(False) + +exit(0) diff --git a/test/broker/16-config-huge.py b/test/broker/16-config-huge.py index 5acf13693..dbdf4e791 100755 --- a/test/broker/16-config-huge.py +++ b/test/broker/16-config-huge.py @@ -75,6 +75,7 @@ def write_config(filename, ports, per_listener_settings, plugver, acl_file): f.write("bridge_max_packet_size 10000\n") f.write("bridge_max_topic_alias 1000\n") f.write("bridge_outgoing_retain false\n") + f.write("bridge_fatal_sub_errors false\n") f.write("bridge_protocol_version mqttv50\n") #f.write("bridge_psk\n") f.write("bridge_receive_maximum 100\n") diff --git a/test/broker/16-config-parse-errors-without-tls.py b/test/broker/16-config-parse-errors-without-tls.py index 3b444bc3b..209d409fd 100755 --- a/test/broker/16-config-parse-errors-without-tls.py +++ b/test/broker/16-config-parse-errors-without-tls.py @@ -38,6 +38,7 @@ do_test_broker_failure(conf_file, ["bridge_max_packet_size 1000"], port, 3, "Error: The 'bridge_max_packet_size' option requires a bridge to be defined first.") # Missing bridge config do_test_broker_failure(conf_file, ["bridge_max_topic_alias 1000"], port, 3, "Error: The 'bridge_max_topic_alias' option requires a bridge to be defined first.") # Missing bridge config do_test_broker_failure(conf_file, ["bridge_outgoing_retain false"], port, 3, "Error: The 'bridge_outgoing_retain' option requires a bridge to be defined first.") # Missing bridge config +do_test_broker_failure(conf_file, ["bridge_fatal_sub_errors false"], 3, "Error: The 'bridge_fatal_sub_errors' option requires a bridge to be defined first.") # Missing bridge config do_test_broker_failure(conf_file, ["bridge_protocol_version string"], port, 3, "Error: The 'bridge_protocol_version' option requires a bridge to be defined first.") # Missing bridge config do_test_broker_failure(conf_file, ["bridge_receive_maximum 10"], port, 3, "Error: The 'bridge_receive_maximum' option requires a bridge to be defined first.") # Missing bridge config do_test_broker_failure(conf_file, ["bridge_reload_type string"], port, 3, "Error: The 'bridge_reload_type' option requires a bridge to be defined first.") # Missing bridge config diff --git a/test/broker/test.py b/test/broker/test.py index 8615da284..36a22af7e 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -113,6 +113,7 @@ (2, './06-bridge-reconnect-local-out.py'), (2, './06-bridge-remote-shutdown.py'), (2, './06-bridge-config-reload.py'), + (2, './06-bridge-fatal-sub-errors.py'), (1, './07-will-control.py'), (1, './07-will-delay-invalid-573191.py'),