Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix doubled messaged in MAM bug #4374

Merged
merged 1 commit into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 151 additions & 2 deletions big_tests/tests/mam_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,12 @@ is_skipped(_, _) ->
basic_groups() ->
[
{mam_all, [parallel],
[{mam04, [parallel], mam_cases() ++ [retrieve_form_fields] ++ text_search_cases()},
[{mam04, [parallel], mam_cases() ++ [retrieve_form_fields] ++ text_search_cases()
++ [{stream_management, [], stream_management_cases()}]},
{mam06, [parallel], mam_cases() ++ [retrieve_form_fields_extra_features]
++ stanzaid_cases() ++ retract_cases()
++ metadata_cases() ++ fetch_specific_msgs_cases()},
++ metadata_cases() ++ fetch_specific_msgs_cases()
++ [{stream_management, [], stream_management_cases()}]},
{nostore, [parallel], nostore_cases()},
{archived, [parallel], archived_cases()},
{configurable_archiveid, [], configurable_archiveid_cases()},
Expand Down Expand Up @@ -513,6 +515,11 @@ impl_specific() ->
pm_sql_query_failed,
async_pools_batch_flush].

stream_management_cases() ->
[reconnect_ack,
reconnect_no_ack,
reconnect_no_ack_different_resource].

suite() ->
require_rpc_nodes([mim]) ++ escalus:suite().

Expand Down Expand Up @@ -582,6 +589,13 @@ init_per_group(with_rsm04, Config) ->
[{props, mam04_props()}, {with_rsm, true}|Config];
init_per_group(nostore, Config) ->
Config;
init_per_group(stream_management, Config) ->
Config1 = dynamic_modules:save_modules(host_type(), Config),
DefaultSMConfig = config_parser_helper:default_mod_config(mod_stream_management),
MnesiaOrCets = ct_helper:get_internal_database(),
SMConfig = DefaultSMConfig#{backend => MnesiaOrCets},
dynamic_modules:ensure_modules(host_type(), [{mod_stream_management, SMConfig}]),
Config1;
init_per_group(archived, Config) ->
Config;
init_per_group(muc04, Config) ->
Expand Down Expand Up @@ -683,6 +697,8 @@ end_per_group(G, Config) when G == drop_msg;
G == muc_drop_msg ->
teardown_meck(),
Config;
end_per_group(stream_management, Config) ->
dynamic_modules:restore_modules(Config);
end_per_group(muc_configurable_archiveid, Config) ->
dynamic_modules:restore_modules(Config),
Config;
Expand Down Expand Up @@ -3854,6 +3870,139 @@ check_user_exist(Config) ->
%% cleanup
ok = rpc(mim(), ejabberd_auth, remove_user, [JID]).

reconnect_no_ack(Config) ->
%% Connect Bob and Alice
Bob = sm_helper:connect_fresh(Config, bob, presence),
Alice = sm_helper:connect_fresh(Config, alice, sr_presence, manual),
AliceJid = escalus_client:full_jid(Alice),
BobJid = escalus_client:full_jid(Bob),
sm_helper:ack_initial_presence(Alice),

% 1. Bob sends a msg to Alice
Body = <<"OH, HAI! Msg 1">>,
escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)),
mam_helper:wait_for_archive_size(Alice, 1),

% 2. Alice receives, and does not acknowledge
% She may get the ack request before the message for some reason
Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2),
escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end,
fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end],
Resp),

% 3. Alice disconnects abruptly
C2SPid = mongoose_helper:get_session_pid(Alice),
escalus_connection:kill(Alice),
sm_helper:wait_until_resume_session(C2SPid),
sm_helper:assert_alive_resources(Alice, 1),

% 4. Alice reconnects
NewAlice = sm_helper:connect_same(Alice, session),

% We have to send presence by hand, because the message may be received first
sm_helper:send_initial_presence(NewAlice),
% Current behaviour - unacked stanza is rerouted when a quick reconnection occurs
% there is no delay element, or any indication of retransmission
NewResp = [_, _] = escalus_client:wait_for_stanzas(NewAlice, 2),
escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end,
fun(Presence) -> escalus_pred:is_presence(Presence) end],
NewResp),

AliceUsername = escalus_client:username(NewAlice),
AliceServer = escalus_client:server(NewAlice),

% There is only one message in MAM, even though it was resent
?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)),

escalus_connection:stop(Bob),
escalus_connection:stop(Alice).

reconnect_ack(Config) ->
% Connect Bob and Alice
Bob = sm_helper:connect_fresh(Config, bob, presence),
Alice = sm_helper:connect_fresh(Config, alice, sr_presence, manual),
AliceJid = escalus_client:full_jid(Alice),
BobJid = escalus_client:full_jid(Bob),
sm_helper:ack_initial_presence(Alice),

% 1. Bob sends a msg to Alice
Body = <<"OH, HAI! Msg 1">>,
escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)),
mam_helper:wait_for_archive_size(Alice, 1),

% 2. Alice receives, and acknowledges
Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2),
escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end,
fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end],
Resp),
escalus_connection:send(Alice, escalus_stanza:sm_ack(2)),

% 3. Alice disconnects abruptly
C2SPid = mongoose_helper:get_session_pid(Alice),
escalus_connection:kill(Alice),
sm_helper:wait_until_resume_session(C2SPid),
sm_helper:assert_alive_resources(Alice, 1),

% 4. Alice reconnects
NewAlice = sm_helper:connect_same(Alice, presence),

% 5. Check no new messages received
timer:sleep(timer:seconds(1)),
escalus_assert:has_no_stanzas(NewAlice),

% No new messages in MAM as well
AliceUsername = escalus_client:username(NewAlice),
AliceServer = escalus_client:server(NewAlice),
?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)),

escalus_connection:stop(Bob),
escalus_connection:stop(Alice).

reconnect_no_ack_different_resource(Config) ->
%% Connect Bob and Alice
Bob = sm_helper:connect_fresh(Config, bob, presence),
Spec = escalus_fresh:create_fresh_user(Config, {alice, 2}),
Alice = sm_helper:connect_spec(Spec, sr_presence, manual),
AliceJid = escalus_client:full_jid(Alice),
BobJid = escalus_client:full_jid(Bob),
sm_helper:ack_initial_presence(Alice),

% 1. Bob sends a msg to Alice
Body = <<"OH, HAI! Msg 1">>,
escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)),
mam_helper:wait_for_archive_size(Alice, 1),

% 2. Alice receives, and does not acknowledge
Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2),
escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end,
fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end],
Resp),

% 3. Alice disconnects abruptly
C2SPid = mongoose_helper:get_session_pid(Alice),
escalus_connection:kill(Alice),
sm_helper:wait_until_resume_session(C2SPid),
sm_helper:assert_alive_resources(Alice, 1),

% 4. Alice reconnects a different resource
NewAlice = sm_helper:connect_spec([{resource, <<"mam_sm_test_2nd_resource">>} | Spec], presence, manual),

% 2nd resource doesn't get the stanza, only the delayed presence.
Presence = escalus:wait_for_stanza(NewAlice),
escalus:assert(is_presence, Presence),

% 5. Check no new messages received
timer:sleep(timer:seconds(1)),
escalus_assert:has_no_stanzas(NewAlice),

% No new messages in MAM as well
AliceUsername = escalus_client:username(NewAlice),
AliceServer = escalus_client:server(NewAlice),
?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)),

escalus_connection:stop(Bob),
escalus_connection:stop(Alice).

%% This function supports only one device, one user.
%% We don't send initial presence to avoid presence broadcasts between resources
%% of the same user from different stories.
Expand Down
20 changes: 18 additions & 2 deletions src/mam/mod_mam_pm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
filter_packet/3,
remove_user/3,
determine_amp_strategy/3,
sm_filter_offline_message/3]).
sm_filter_offline_message/3,
filter_unacknowledged_messages/3]).

%% ejabberd handlers
-export([process_mam_iq/5]).
Expand Down Expand Up @@ -273,6 +274,11 @@
Entries = mongoose_hooks:get_mam_pm_gdpr_data(HostType, ArcJID),
{ok, [{mam_pm, Schema, Entries} | Acc]}.

-spec filter_unacknowledged_messages(Buffer :: [mongoose_acc:t()], Params :: map(), Extra :: map()) ->
{ok, [mongoose_acc:t()]}.
filter_unacknowledged_messages(Buffer, _, _) ->
{ok, [acc_with_perm_mam_id(Acc) || Acc <- Buffer]}.

%% ----------------------------------------------------------------------
%% Internal functions

Expand Down Expand Up @@ -531,6 +537,15 @@
true -> mongoose_acc:set_permanent(mam, mam_id, ExtMessId, Acc)
end.

-spec acc_with_perm_mam_id(Acc :: mongoose_acc:t()) -> Acc :: mongoose_acc:t().
acc_with_perm_mam_id(Acc) ->
case mongoose_acc:get(mam, mam_id, undefined, Acc) of
undefined ->
Acc;

Check warning on line 544 in src/mam/mod_mam_pm.erl

View check run for this annotation

Codecov / codecov/patch

src/mam/mod_mam_pm.erl#L544

Added line #L544 was not covered by tests
MamID ->
mongoose_acc:set_permanent(mam, mam_id, MamID, Acc)
end.

is_interesting(LocJID, RemJID) ->
HostType = jid_to_host_type(LocJID),
ArcID = archive_id_int(HostType, LocJID),
Expand Down Expand Up @@ -732,7 +747,8 @@
{anonymous_purge, HostType, fun ?MODULE:remove_user/3, #{}, 50},
{amp_determine_strategy, HostType, fun ?MODULE:determine_amp_strategy/3, #{}, 20},
{sm_filter_offline_message, HostType, fun ?MODULE:sm_filter_offline_message/3, #{}, 50},
{get_personal_data, HostType, fun ?MODULE:get_personal_data/3, #{}, 50}
{get_personal_data, HostType, fun ?MODULE:get_personal_data/3, #{}, 50},
{filter_unacknowledged_messages, HostType, fun ?MODULE:filter_unacknowledged_messages/3, #{}, 50}
].

add_iq_handlers(HostType, Opts) ->
Expand Down
2 changes: 1 addition & 1 deletion src/mongoose_acc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
% Strip with or without stanza replacement
-export([strip/1, strip/2]).

-ignore_xref([delete/2, ref/1]).
-ignore_xref([delete/2, ref/1, strip/1]).

%% Note about 'undefined' to_jid and from_jid: these are the special cases when JID may be
%% truly unknown: before a client is authorized.
Expand Down
3 changes: 1 addition & 2 deletions src/stream_management/mod_stream_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,7 @@ maybe_notify_unacknowledged_msg(Acc, Jid) ->

-spec notify_unacknowledged_msg(mongoose_acc:t(), jid:jid()) -> mongoose_acc:t().
notify_unacknowledged_msg(Acc, Jid) ->
NewAcc = mongoose_hooks:unacknowledged_message(Acc, Jid),
mongoose_acc:strip(NewAcc).
mongoose_hooks:unacknowledged_message(Acc, Jid).

-spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
mongoose_c2s_hooks:result().
Expand Down