From 6623ca110a332fd7b5cfaa8a70f0b62732b24662 Mon Sep 17 00:00:00 2001 From: Gustaw Lippa Date: Wed, 11 Sep 2024 16:14:13 +0200 Subject: [PATCH] Fix doubled messaged in MAM bug The issue came from the fact that MAM looks for a `mam_id` field in the accumulator when trying to save a message. This should prevent double messages in scenarios such as retransmitting a message. However, when rerouting uses the normal routing procedure, which strips the nonpersistent accumulator fields. This is done, because in the normal case the perspective of the message processing changes - sender processing is done, and receiver processing starts. When retrasmitting messages, we would like to process it from the receiver perspective once more, but with accumulator fields saved from the last time the message was processed. I decided to use the filter_unacknowledged_messages hook in mod_mam_pm, which is called before retransmitting messages to save the mam_id as a permanent field in the accumulator. The "filter" name is analogous to the filter_local_packet hook, which is also used for processing in the broader sense than only filtering. --- big_tests/tests/mam_SUITE.erl | 155 +++++++++++++++++- src/mam/mod_mam_pm.erl | 20 ++- src/mongoose_acc.erl | 2 +- .../mod_stream_management.erl | 3 +- 4 files changed, 174 insertions(+), 6 deletions(-) diff --git a/big_tests/tests/mam_SUITE.erl b/big_tests/tests/mam_SUITE.erl index 78e073d83e..359d9639c2 100644 --- a/big_tests/tests/mam_SUITE.erl +++ b/big_tests/tests/mam_SUITE.erl @@ -240,9 +240,12 @@ basic_groups() -> [ {mam_all, [parallel], [{mam04, [parallel], mam_cases() ++ [retrieve_form_fields] ++ text_search_cases()}, + {mam04, [parallel], mam_cases() ++ [retrieve_form_fields] ++ text_search_cases() + ++ [{sm, [], sm_cases()}]}, {mam06, [parallel], mam_cases() ++ [retrieve_form_fields_extra_features] ++ stanzaid_cases() ++ retract_cases() - ++ metadata_cases() ++ fetch_specific_msgs_cases()}, + ++ metadata_cases() ++ fetch_specific_msgs_cases() + ++ [{sm, [], sm_cases()}]}, {nostore, [parallel], nostore_cases()}, {archived, [parallel], archived_cases()}, {configurable_archiveid, [], configurable_archiveid_cases()}, @@ -513,6 +516,11 @@ impl_specific() -> pm_sql_query_failed, async_pools_batch_flush]. +sm_cases() -> + [reconnect_ack, + reconnect_no_ack, + reconnect_no_ack_different_resource]. + suite() -> require_rpc_nodes([mim]) ++ escalus:suite(). @@ -582,6 +590,13 @@ init_per_group(with_rsm04, Config) -> [{props, mam04_props()}, {with_rsm, true}|Config]; init_per_group(nostore, Config) -> Config; +init_per_group(sm, Config) -> + Config1 = dynamic_modules:save_modules(host_type(), Config), + DefaultSMConfig = config_parser_helper:default_mod_config(mod_stream_management), + MnesiaOrCets = ct_helper:get_internal_database(), + SMConfig = DefaultSMConfig#{backend => MnesiaOrCets}, + dynamic_modules:ensure_modules(host_type(), [{mod_stream_management, SMConfig}]), + Config1; init_per_group(archived, Config) -> Config; init_per_group(muc04, Config) -> @@ -683,6 +698,8 @@ end_per_group(G, Config) when G == drop_msg; G == muc_drop_msg -> teardown_meck(), Config; +end_per_group(sm, Config) -> + dynamic_modules:restore_modules(Config); end_per_group(muc_configurable_archiveid, Config) -> dynamic_modules:restore_modules(Config), Config; @@ -3854,6 +3871,142 @@ check_user_exist(Config) -> %% cleanup ok = rpc(mim(), ejabberd_auth, remove_user, [JID]). +reconnect_no_ack(Config) -> + %% connect Bob and Alice + Bob = sm_helper:connect_fresh(Config, bob, presence), + Alice = sm_helper:connect_fresh(Config, alice, sr_presence, manual), + AliceJid = escalus_client:full_jid(Alice), + BobJid = escalus_client:full_jid(Bob), + + sm_helper:ack_initial_presence(Alice), + + % 1. Bob sends a msg to Alice + Body = <<"OH, HAI! Msg 1">>, + escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)), + mam_helper:wait_for_archive_size(Alice, 1), + + % 2. Alice receives, and does not acknowledge + % she may get the ack request before the message for some reason + Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2), + escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end, + fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end], + Resp), + + % 3. Alice disconnects abruptly + C2SPid = mongoose_helper:get_session_pid(Alice), + escalus_connection:kill(Alice), + sm_helper:wait_until_resume_session(C2SPid), + sm_helper:assert_alive_resources(Alice, 1), + + % 4. Alice reconnects + NewAlice = sm_helper:connect_same(Alice, session), + + + % we have to send presence by hand, because the message may be received first + sm_helper:send_initial_presence(NewAlice), + % Current behaviour - unacked stanza is rerouted when a quick reconnection occurs + % there is no delay element, or any indication of retransmission + NewResp = [_, _] = escalus_client:wait_for_stanzas(NewAlice, 2), + escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end, + fun(Presence) -> escalus_pred:is_presence(Presence) end], + NewResp), + + AliceUsername = escalus_client:username(NewAlice), + AliceServer = escalus_client:server(NewAlice), + + % There is only one message in MAM, even though it was sent twice + ?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)), + + escalus_connection:stop(Bob), + escalus_connection:stop(Alice). + +reconnect_ack(Config) -> + % connect Bob and Alice + Bob = sm_helper:connect_fresh(Config, bob, presence), + Alice = sm_helper:connect_fresh(Config, alice, sr_presence, manual), + AliceJid = escalus_client:full_jid(Alice), + BobJid = escalus_client:full_jid(Bob), + sm_helper:ack_initial_presence(Alice), + + % 1. Bob sends a msg to Alice + Body = <<"OH, HAI! Msg 1">>, + escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)), + mam_helper:wait_for_archive_size(Alice, 1), + + % 2. Alice receives, and acknowledges + Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2), + escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end, + fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end], + Resp), + escalus_connection:send(Alice, escalus_stanza:sm_ack(2)), + + % 3. Alice disconnects abruptly + C2SPid = mongoose_helper:get_session_pid(Alice), + escalus_connection:kill(Alice), + sm_helper:wait_until_resume_session(C2SPid), + sm_helper:assert_alive_resources(Alice, 1), + + % 4. Alice reconnects + NewAlice = sm_helper:connect_same(Alice, presence), + + % 5. check no new messages received + timer:sleep(timer:seconds(1)), + escalus_assert:has_no_stanzas(NewAlice), + + % and no new messages in MAM + AliceUsername = escalus_client:username(NewAlice), + AliceServer = escalus_client:server(NewAlice), + ?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)), + + escalus_connection:stop(Bob), + escalus_connection:stop(Alice). + +reconnect_no_ack_different_resource(Config) -> + %% connect Bob and Alice + Bob = sm_helper:connect_fresh(Config, bob, presence), + Spec = escalus_fresh:create_fresh_user(Config, {alice, 2}), + Alice = sm_helper:connect_spec(Spec, sr_presence, manual), + AliceJid = escalus_client:full_jid(Alice), + BobJid = escalus_client:full_jid(Bob), + + sm_helper:ack_initial_presence(Alice), + + % 1. Bob sends a msg to Alice + Body = <<"OH, HAI! Msg 1">>, + escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)), + mam_helper:wait_for_archive_size(Alice, 1), + + % 2. Alice receives, and does not acknowledge + Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2), + escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end, + fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end], + Resp), + + % 3. Alice disconnects abruptly + C2SPid = mongoose_helper:get_session_pid(Alice), + escalus_connection:kill(Alice), + sm_helper:wait_until_resume_session(C2SPid), + sm_helper:assert_alive_resources(Alice, 1), + + % 4. Alice reconnects a different resource + NewAlice = sm_helper:connect_spec([{resource, <<"mam_sm_test_2nd_resource">>} | Spec], presence, manual), + + % 2nd resource doesn't get the stanza, only the delayed presence. + Presence = escalus:wait_for_stanza(NewAlice), + escalus:assert(is_presence, Presence), + + % 5. check no new messages received + timer:sleep(timer:seconds(1)), + escalus_assert:has_no_stanzas(NewAlice), + + % and no new messages in MAM + AliceUsername = escalus_client:username(NewAlice), + AliceServer = escalus_client:server(NewAlice), + ?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)), + + escalus_connection:stop(Bob), + escalus_connection:stop(Alice). + %% This function supports only one device, one user. %% We don't send initial presence to avoid presence broadcasts between resources %% of the same user from different stories. diff --git a/src/mam/mod_mam_pm.erl b/src/mam/mod_mam_pm.erl index 3eab0fc8c6..1e8a664aa2 100644 --- a/src/mam/mod_mam_pm.erl +++ b/src/mam/mod_mam_pm.erl @@ -49,7 +49,8 @@ filter_packet/3, remove_user/3, determine_amp_strategy/3, - sm_filter_offline_message/3]). + sm_filter_offline_message/3, + filter_unacknowledged_messages/3]). %% ejabberd handlers -export([process_mam_iq/5]). @@ -273,6 +274,11 @@ get_personal_data(Acc, #{jid := ArcJID}, #{host_type := HostType}) -> Entries = mongoose_hooks:get_mam_pm_gdpr_data(HostType, ArcJID), {ok, [{mam_pm, Schema, Entries} | Acc]}. +-spec filter_unacknowledged_messages(Buffer :: [mongoose_acc:t()], Params :: map(), Extra :: map()) -> + {ok, [mongoose_acc:t()]}. +filter_unacknowledged_messages(Buffer, _, _) -> + {ok, [acc_with_perm_mam_id(Acc) || Acc <- Buffer]}. + %% ---------------------------------------------------------------------- %% Internal functions @@ -531,6 +537,15 @@ return_acc_with_mam_id_if_configured(ExtMessId, HostType, Acc) -> true -> mongoose_acc:set_permanent(mam, mam_id, ExtMessId, Acc) end. +-spec acc_with_perm_mam_id(Acc :: mongoose_acc:t()) -> Acc :: mongoose_acc:t(). +acc_with_perm_mam_id(Acc) -> + case mongoose_acc:get(mam, mam_id, undefined, Acc) of + undefined -> + Acc; + MamID -> + mongoose_acc:set_permanent(mam, mam_id, MamID, Acc) + end. + is_interesting(LocJID, RemJID) -> HostType = jid_to_host_type(LocJID), ArcID = archive_id_int(HostType, LocJID), @@ -732,7 +747,8 @@ hooks(HostType) -> {anonymous_purge, HostType, fun ?MODULE:remove_user/3, #{}, 50}, {amp_determine_strategy, HostType, fun ?MODULE:determine_amp_strategy/3, #{}, 20}, {sm_filter_offline_message, HostType, fun ?MODULE:sm_filter_offline_message/3, #{}, 50}, - {get_personal_data, HostType, fun ?MODULE:get_personal_data/3, #{}, 50} + {get_personal_data, HostType, fun ?MODULE:get_personal_data/3, #{}, 50}, + {filter_unacknowledged_messages, HostType, fun ?MODULE:filter_unacknowledged_messages/3, #{}, 50} ]. add_iq_handlers(HostType, Opts) -> diff --git a/src/mongoose_acc.erl b/src/mongoose_acc.erl index d1f9481639..141886c0d9 100644 --- a/src/mongoose_acc.erl +++ b/src/mongoose_acc.erl @@ -56,7 +56,7 @@ % Strip with or without stanza replacement -export([strip/1, strip/2]). --ignore_xref([delete/2, ref/1]). +-ignore_xref([delete/2, ref/1, strip/1]). %% Note about 'undefined' to_jid and from_jid: these are the special cases when JID may be %% truly unknown: before a client is authorized. diff --git a/src/stream_management/mod_stream_management.erl b/src/stream_management/mod_stream_management.erl index 28ccba4cf5..d7ac5c5a1b 100644 --- a/src/stream_management/mod_stream_management.erl +++ b/src/stream_management/mod_stream_management.erl @@ -352,8 +352,7 @@ maybe_notify_unacknowledged_msg(Acc, Jid) -> -spec notify_unacknowledged_msg(mongoose_acc:t(), jid:jid()) -> mongoose_acc:t(). notify_unacknowledged_msg(Acc, Jid) -> - NewAcc = mongoose_hooks:unacknowledged_message(Acc, Jid), - mongoose_acc:strip(NewAcc). + mongoose_hooks:unacknowledged_message(Acc, Jid). -spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) -> mongoose_c2s_hooks:result().