From d7580eb6934adb68a7f77ab8ab88baa41b0686aa Mon Sep 17 00:00:00 2001 From: Gustaw Lippa Date: Wed, 11 Sep 2024 16:14:13 +0200 Subject: [PATCH] Fix doubled messages in MAM bug The issue came from the fact that MAM looks for a `mam_id` field in the accumulator when trying to save a message. This should prevent double messages in scenarios such as retransmitting a message. However, for some reason, in mod_stream_management, the accumulator was stripped of its fields. This didn't make any sense, as the metadata could be useful (as is the case here). I'm not certain whether the retransmission itself is correct behaviour, or whether the stanza should contain a "delay" element. --- big_tests/tests/mam_SUITE.erl | 154 +++++++++++++++++- .../mod_stream_management.erl | 3 +- 2 files changed, 154 insertions(+), 3 deletions(-) diff --git a/big_tests/tests/mam_SUITE.erl b/big_tests/tests/mam_SUITE.erl index 78e073d83e..115ecfd342 100644 --- a/big_tests/tests/mam_SUITE.erl +++ b/big_tests/tests/mam_SUITE.erl @@ -240,9 +240,12 @@ basic_groups() -> [ {mam_all, [parallel], [{mam04, [parallel], mam_cases() ++ [retrieve_form_fields] ++ text_search_cases()}, + {mam04, [parallel], mam_cases() ++ [retrieve_form_fields] ++ text_search_cases() + ++ [{sm, [], sm_cases()}]}, {mam06, [parallel], mam_cases() ++ [retrieve_form_fields_extra_features] ++ stanzaid_cases() ++ retract_cases() - ++ metadata_cases() ++ fetch_specific_msgs_cases()}, + ++ metadata_cases() ++ fetch_specific_msgs_cases() + ++ [{sm, [], sm_cases()}]}, {nostore, [parallel], nostore_cases()}, {archived, [parallel], archived_cases()}, {configurable_archiveid, [], configurable_archiveid_cases()}, @@ -513,6 +516,11 @@ impl_specific() -> pm_sql_query_failed, async_pools_batch_flush]. +sm_cases() -> + [reconnect_ack, + reconnect_no_ack, + reconnect_no_ack_different_resource]. + suite() -> require_rpc_nodes([mim]) ++ escalus:suite(). 
@@ -582,6 +590,11 @@ init_per_group(with_rsm04, Config) -> [{props, mam04_props()}, {with_rsm, true}|Config]; init_per_group(nostore, Config) -> Config; +init_per_group(sm, Config) -> + Config1 = dynamic_modules:save_modules(host_type(), Config), + DefaultSMConfig = config_parser_helper:default_mod_config(mod_stream_management), + dynamic_modules:ensure_modules(host_type(), [{mod_stream_management, DefaultSMConfig}]), + Config1; init_per_group(archived, Config) -> Config; init_per_group(muc04, Config) -> @@ -683,6 +696,9 @@ end_per_group(G, Config) when G == drop_msg; G == muc_drop_msg -> teardown_meck(), Config; +end_per_group(sm, Config) -> + dynamic_modules:restore_modules(Config), + mim_loglevel:restore_log_level(Config); end_per_group(muc_configurable_archiveid, Config) -> dynamic_modules:restore_modules(Config), Config; @@ -3854,6 +3870,142 @@ check_user_exist(Config) -> %% cleanup ok = rpc(mim(), ejabberd_auth, remove_user, [JID]). +reconnect_no_ack(Config) -> + %% connect Bob and Alice + Bob = sm_helper:connect_fresh(Config, bob, presence), + Alice = sm_helper:connect_fresh(Config, alice, sr_presence, manual), + AliceJid = escalus_client:full_jid(Alice), + BobJid = escalus_client:full_jid(Bob), + + sm_helper:ack_initial_presence(Alice), + + % 1. Bob sends a msg to Alice + Body = <<"OH, HAI! Msg 1">>, + escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)), + mam_helper:wait_for_archive_size(Alice, 1), + + % 2. Alice receives, and does not acknowledge + % she may get the ack request before the message for some reason + Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2), + escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end, + fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end], + Resp), + + % 3. 
Alice disconnects abruptly + C2SPid = mongoose_helper:get_session_pid(Alice), + escalus_connection:kill(Alice), + sm_helper:wait_until_resume_session(C2SPid), + sm_helper:assert_alive_resources(Alice, 1), + + % 4. Alice reconnects + NewAlice = sm_helper:connect_same(Alice, session), + + + % we have to send presence by hand, because the message may be received first + sm_helper:send_initial_presence(NewAlice), + % Current behaviour - unacked stanza is rerouted when a quick reconnection occurs + % there is no delay element, or any indication of retransmission + NewResp = [_, _] = escalus_client:wait_for_stanzas(NewAlice, 2), + escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end, + fun(Presence) -> escalus_pred:is_presence(Presence) end], + NewResp), + + AliceUsername = escalus_client:username(NewAlice), + AliceServer = escalus_client:server(NewAlice), + + % There is only one message in MAM, even though it was sent twice + ?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)), + + escalus_connection:stop(Bob), + escalus_connection:stop(Alice). + +reconnect_ack(Config) -> + % connect Bob and Alice + Bob = sm_helper:connect_fresh(Config, bob, presence), + Alice = sm_helper:connect_fresh(Config, alice, sr_presence, manual), + AliceJid = escalus_client:full_jid(Alice), + BobJid = escalus_client:full_jid(Bob), + sm_helper:ack_initial_presence(Alice), + + % 1. Bob sends a msg to Alice + Body = <<"OH, HAI! Msg 1">>, + escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)), + mam_helper:wait_for_archive_size(Alice, 1), + + % 2. Alice receives, and acknowledges + Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2), + escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end, + fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end], + Resp), + escalus_connection:send(Alice, escalus_stanza:sm_ack(2)), + + % 3. 
Alice disconnects abruptly + C2SPid = mongoose_helper:get_session_pid(Alice), + escalus_connection:kill(Alice), + sm_helper:wait_until_resume_session(C2SPid), + sm_helper:assert_alive_resources(Alice, 1), + + % 4. Alice reconnects + NewAlice = sm_helper:connect_same(Alice, presence), + + % 5. check no new messages received + timer:sleep(timer:seconds(1)), + escalus_assert:has_no_stanzas(NewAlice), + + % and no new messages in MAM + AliceUsername = escalus_client:username(NewAlice), + AliceServer = escalus_client:server(NewAlice), + ?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)), + + escalus_connection:stop(Bob), + escalus_connection:stop(Alice). + +reconnect_no_ack_different_resource(Config) -> + %% connect Bob and Alice + Bob = sm_helper:connect_fresh(Config, bob, presence), + Spec = escalus_fresh:create_fresh_user(Config, {alice, 2}), + Alice = sm_helper:connect_spec(Spec, sr_presence, manual), + AliceJid = escalus_client:full_jid(Alice), + BobJid = escalus_client:full_jid(Bob), + + sm_helper:ack_initial_presence(Alice), + + % 1. Bob sends a msg to Alice + Body = <<"OH, HAI! Msg 1">>, + escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)), + mam_helper:wait_for_archive_size(Alice, 1), + + % 2. Alice receives, and does not acknowledge + Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2), + escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end, + fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end], + Resp), + + % 3. Alice disconnects abruptly + C2SPid = mongoose_helper:get_session_pid(Alice), + escalus_connection:kill(Alice), + sm_helper:wait_until_resume_session(C2SPid), + sm_helper:assert_alive_resources(Alice, 1), + + % 4. Alice reconnects a different resource + NewAlice = sm_helper:connect_spec([{resource, <<"mam_sm_test_2nd_resource">>} | Spec], presence, manual), + + % 2nd resource doesn't get the stanza, only the delayed presence. 
+ Presence = escalus:wait_for_stanza(NewAlice), + escalus:assert(is_presence, Presence), + + % 5. check no new messages received + timer:sleep(timer:seconds(1)), + escalus_assert:has_no_stanzas(NewAlice), + + % and no new messages in MAM + AliceUsername = escalus_client:username(NewAlice), + AliceServer = escalus_client:server(NewAlice), + ?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)), + + escalus_connection:stop(Bob), + escalus_connection:stop(Alice). + %% This function supports only one device, one user. %% We don't send initial presence to avoid presence broadcasts between resources %% of the same user from different stories. diff --git a/src/stream_management/mod_stream_management.erl b/src/stream_management/mod_stream_management.erl index 28ccba4cf5..d7ac5c5a1b 100644 --- a/src/stream_management/mod_stream_management.erl +++ b/src/stream_management/mod_stream_management.erl @@ -352,8 +352,7 @@ maybe_notify_unacknowledged_msg(Acc, Jid) -> -spec notify_unacknowledged_msg(mongoose_acc:t(), jid:jid()) -> mongoose_acc:t(). notify_unacknowledged_msg(Acc, Jid) -> - NewAcc = mongoose_hooks:unacknowledged_message(Acc, Jid), - mongoose_acc:strip(NewAcc). + mongoose_hooks:unacknowledged_message(Acc, Jid). -spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) -> mongoose_c2s_hooks:result().