diff --git a/big_tests/dynamic_domains.spec b/big_tests/dynamic_domains.spec index 2d2ae514c7..87f7c4b111 100644 --- a/big_tests/dynamic_domains.spec +++ b/big_tests/dynamic_domains.spec @@ -77,6 +77,23 @@ disco_rooms_rsm], "at the moment mod_disco doesn't support dynamic domains"}. +{suites, "tests", sm_SUITE}. +{skip_cases, "tests", sm_SUITE, + [basic_ack, + h_ok_after_session_enabled_after_session, + subscription_requests_are_buffered_properly], + "at the moment mod_roster doesn't support dynamic domains"}. +{skip_cases, "tests", sm_SUITE, + [resend_unacked_on_reconnection, + session_established, + wait_for_resumption, + resume_session_kills_old_C2S_gracefully, + resend_unacked_after_resume_timeout, + resend_more_offline_messages_than_buffer_size, + resume_expired_session_returns_correct_h, + unacknowledged_message_hook_offline], + "at the moment mod_offline doesn't support dynamic domains"}. + {config, ["dynamic_domains.config", "test.config"]}. {logdir, "ct_report"}. diff --git a/big_tests/tests/ct_helper.erl b/big_tests/tests/ct_helper.erl index 49d03e1931..106e3469af 100644 --- a/big_tests/tests/ct_helper.erl +++ b/big_tests/tests/ct_helper.erl @@ -1,8 +1,9 @@ -module(ct_helper). -export([is_ct_running/0, - repeat_all_until_all_ok/1, - repeat_all_until_all_ok/2 - ]). + repeat_all_until_all_ok/1, + repeat_all_until_all_ok/2, + repeat_all_until_any_fail/1, + repeat_all_until_any_fail/2]). -type group_name() :: atom(). @@ -27,6 +28,11 @@ is_ct_running() -> repeat_all_until_all_ok(GroupDefs) -> repeat_all_until_all_ok(GroupDefs, 3). +-spec repeat_all_until_any_fail([group_def() | group_def_dirty() | group_def_incomplete()]) -> + [group_def()]. +repeat_all_until_any_fail(GroupDefs) -> + repeat_all_until_any_fail(GroupDefs, 100). + %% @doc repeat_all_until_all_ok/2 will rewrite your group definitions so that %% the `{repeat_until_all_ok, Retries}` property is added to all of them. %% For example, for the following definitions: @@ -51,6 +57,13 @@ repeat_all_until_all_ok(GroupDefs, Retries) -> [ {Name, maybe_add_repeat_type(repeat_until_all_ok, Retries, Properties), Tests} || {Name, Properties, Tests} <- prepare_group_defs(GroupDefs) ]. +-spec repeat_all_until_any_fail([group_def() | group_def_dirty() | group_def_incomplete()], + repeat_num()) -> + [group_def()]. +repeat_all_until_any_fail(GroupDefs, Retries) -> + [ {Name, maybe_add_repeat_type(repeat_until_any_fail, Retries, Properties), Tests} + || {Name, Properties, Tests} <- prepare_group_defs(GroupDefs) ]. + -spec maybe_add_repeat_type(repeat_type(), repeat_num(), group_props()) -> group_props(). maybe_add_repeat_type(RepeatType, Retries, Properties) -> case lists:any(fun(El) -> proplists:is_defined(El, Properties) end, all_repeat_modes()) of diff --git a/big_tests/tests/sm_SUITE.erl b/big_tests/tests/sm_SUITE.erl index 0eeb9031c6..bfd41a7f21 100644 --- a/big_tests/tests/sm_SUITE.erl +++ b/big_tests/tests/sm_SUITE.erl @@ -18,6 +18,8 @@ -import(escalus_stanza, [setattr/3]). +-import(domain_helper, [host_type/0]). + -define(BIG_BIG_BIG_TIMEOUT, 3600). -define(SHORT_RESUME_TIMEOUT, 3). -define(SMALL_SM_BUFFER, 3). @@ -109,9 +111,6 @@ unacknowledged_message_hook() -> suite() -> require_rpc_nodes([mim]) ++ escalus:suite(). -domain() -> - ct:get_config({hosts, mim, domain}). - stream_management_with_stale_h(RepeatAfter, Geriatric) -> [{mod_stream_management, [ @@ -141,45 +140,45 @@ end_per_suite(Config) -> init_per_group(G, Config) when G =:= unacknowledged_message_hook; G =:= manual_ack_freq_long_session_timeout -> - true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), 1]), + true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), 1]), escalus_users:update_userspec(Config, alice, manual_ack, true); init_per_group(parallel_manual_ack_freq_1, Config) -> - true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), 1]), - rpc(mim(), ?MOD_SM, set_resume_timeout, [domain(), ?SHORT_RESUME_TIMEOUT]), + true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), 1]), + rpc(mim(), ?MOD_SM, set_resume_timeout, [host_type(), ?SHORT_RESUME_TIMEOUT]), escalus_users:update_userspec(Config, alice, manual_ack, true); init_per_group(stale_h, Config) -> escalus_users:update_userspec(Config, alice, manual_ack, true); init_per_group(stream_mgmt_disabled, Config) -> - Config2 = dynamic_modules:save_modules(domain(), Config), - dynamic_modules:stop(domain(), ?MOD_SM), + Config2 = dynamic_modules:save_modules(host_type(), Config), + dynamic_modules:stop(host_type(), ?MOD_SM), rpc(mim(), mnesia, delete_table, [sm_session]), escalus_users:update_userspec(Config2, alice, manual_ack, true); init_per_group(_GroupName, Config) -> Config. end_per_group(stream_mgmt_disabled, Config) -> - dynamic_modules:restore_modules(domain(), Config); + dynamic_modules:restore_modules(host_type(), Config); end_per_group(G, Config) when G =:= unacknowledged_message_hook; G =:= manual_ack_freq_long_session_timeout -> - true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), never]), + true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), never]), Config; end_per_group(parallel_manual_ack_freq_1, Config) -> - true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), never]), - rpc(mim(), ?MOD_SM, set_resume_timeout, [domain(), 600]), + true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), never]), + rpc(mim(), ?MOD_SM, set_resume_timeout, [host_type(), 600]), Config; end_per_group(_GroupName, Config) -> Config. set_gc_parameters(RepeatAfter, Geriatric, Config) -> - Config2 = dynamic_modules:save_modules(domain(), Config), + Config2 = dynamic_modules:save_modules(host_type(), Config), dynamic_modules:ensure_modules( - domain(), stream_management_with_stale_h(RepeatAfter, Geriatric)), + host_type(), stream_management_with_stale_h(RepeatAfter, Geriatric)), Config2. register_smid(IntSmidId) -> S = {SMID = make_smid(), IntSmidId}, - ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [domain(), SMID, IntSmidId]), + ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [host_type(), SMID, IntSmidId]), S. register_some_smid_h(Config) -> @@ -188,8 +187,8 @@ register_some_smid_h(Config) -> init_per_testcase(resume_expired_session_returns_correct_h = CN, Config) -> Config2 = set_gc_parameters(?BIG_BIG_BIG_TIMEOUT, ?BIG_BIG_BIG_TIMEOUT, Config), - rpc(mim(), ?MOD_SM, set_resume_timeout, [domain(), ?SHORT_RESUME_TIMEOUT]), - true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), 1]), + rpc(mim(), ?MOD_SM, set_resume_timeout, [host_type(), ?SHORT_RESUME_TIMEOUT]), + true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), 1]), escalus:init_per_testcase(CN, Config2); init_per_testcase(gc_repeat_after_never_means_no_cleaning = CN, Config) -> Config2 = set_gc_parameters(?BIG_BIG_BIG_TIMEOUT, ?SHORT_RESUME_TIMEOUT, Config), @@ -200,10 +199,10 @@ init_per_testcase(gc_repeat_after_timeout_does_clean = CN, Config) -> Config3 = register_some_smid_h(Config2), escalus:init_per_testcase(CN, Config3); init_per_testcase(server_requests_ack_freq_2 = CN, Config) -> - true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), 2]), + true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), 2]), escalus:init_per_testcase(CN, Config); init_per_testcase(replies_are_processed_by_resumed_session = CN, Config) -> - register_handler(<<"localhost">>), + register_handler(), escalus:init_per_testcase(CN, Config); init_per_testcase(CaseName, Config) -> escalus:init_per_testcase(CaseName, Config). @@ -212,15 +211,15 @@ end_per_testcase(CN, Config) when CN =:= resume_expired_session_returns_correct_ CN =:= gc_repeat_after_never_means_no_cleaning; CN =:= gc_repeat_after_timeout_does_clean -> - dynamic_modules:stop(domain(), ?MOD_SM), + dynamic_modules:stop(host_type(), ?MOD_SM), rpc(mim(), ejabberd_sup, stop_child, [stream_management_stale_h]), - dynamic_modules:restore_modules(domain(), Config), + dynamic_modules:restore_modules(host_type(), Config), escalus:end_per_testcase(CN, Config); end_per_testcase(server_requests_ack_freq_2 = CN, Config) -> - true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), never]), + true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), never]), escalus:end_per_testcase(CN, Config); end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) -> - unregister_handler(<<"localhost">>), + unregister_handler(), escalus:end_per_testcase(CN, Config); end_per_testcase(CaseName, Config) -> escalus:end_per_testcase(CaseName, Config). @@ -676,15 +675,15 @@ resume_expired_session_returns_correct_h(Config) -> escalus_connection:stop(NewAlice). gc_repeat_after_never_means_no_cleaning(Config) -> - true = rpc(mim(), ?MOD_SM, set_stale_h_repeat_after, [domain(), ?BIG_BIG_BIG_TIMEOUT]), + true = rpc(mim(), ?MOD_SM, set_stale_h_repeat_after, [host_type(), ?BIG_BIG_BIG_TIMEOUT]), [{SMID1, _}, {SMID2, _}, {SMID3, _}] = ?config(smid_test, Config), - {stale_h, 1} = rpc(mim(), ?MOD_SM, get_session_from_smid, [domain(), SMID1]), - {stale_h, 2} = rpc(mim(), ?MOD_SM, get_session_from_smid, [domain(), SMID2]), - {stale_h, 3} = rpc(mim(), ?MOD_SM, get_session_from_smid, [domain(), SMID3]). + {stale_h, 1} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID1]), + {stale_h, 2} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID2]), + {stale_h, 3} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID3]). gc_repeat_after_timeout_does_clean(Config) -> [{SMID1, _} | _ ] = ?config(smid_test, Config), mongoose_helper:wait_until(fun() -> - rpc(mim(), ?MOD_SM, get_stale_h, [domain(), SMID1]) + rpc(mim(), ?MOD_SM, get_stale_h, [host_type(), SMID1]) end, {error, smid_not_found}, #{name => smid_garbage_collected}). @@ -718,17 +717,22 @@ resume_session_state_send_message(Config) -> %% send some messages and check if c2s can handle it escalus_connection:send(Bob, escalus_stanza:chat_to(common_helper:get_bjid(AliceSpec), <<"msg-2">>)), escalus_connection:send(Bob, escalus_stanza:chat_to(common_helper:get_bjid(AliceSpec), <<"msg-3">>)), + %% suspend the process to ensure that Alice has enough time to reconnect, + %% before resumption timeout occurs. + ok = rpc(mim(), sys, suspend, [C2SPid]), %% alice comes back and receives unacked message {ok, NewAlice, _} = escalus_connection:start(AliceSpec, ConnSteps), escalus_connection:send(NewAlice, escalus_stanza:presence(<<"available">>)), - - Stanzas = [escalus_connection:get_stanza(NewAlice, msg) || _ <- lists:seq(1,4) ], + escalus:assert(is_presence, escalus_connection:get_stanza(NewAlice, presence)), + %% now we can resume c2s process of the old connection + %% and let it process session resumption timeout + ok = rpc(mim(), sys, resume, [C2SPid]), + Stanzas = escalus:wait_for_stanzas(NewAlice, 3), % what about order ? % alice receive presence from herself and 3 unacked messages from bob - escalus_new_assert:mix_match([is_presence, - is_chat(<<"msg-1">>), + escalus_new_assert:mix_match([is_chat(<<"msg-1">>), is_chat(<<"msg-2">>), is_chat(<<"msg-3">>)], Stanzas), @@ -768,14 +772,19 @@ resume_session_state_stop_c2s(Config) -> 1 = length(get_user_alive_resources(AliceSpec)), rpc(mim(), ejabberd_c2s, stop, [C2SPid] ), wait_for_c2s_state_change(C2SPid, resume_session), + %% suspend the process to ensure that Alice has enough time to reconnect, + %% before resumption timeout occurs. + ok = rpc(mim(), sys, suspend, [C2SPid]), %% alice comes back and receives unacked message {ok, NewAlice, _} = escalus_connection:start(AliceSpec, ConnSteps), escalus_connection:send(NewAlice, escalus_stanza:presence(<<"available">>)), + escalus:assert(is_presence, escalus_connection:get_stanza(NewAlice, presence)), + %% now we can resume c2s process of the old connection + %% and let it process session resumption timeout + ok = rpc(mim(), sys, resume, [C2SPid]), - Stanzas = [escalus_connection:get_stanza(NewAlice, msg), - escalus_connection:get_stanza(NewAlice, msg)], - escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], Stanzas), + escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(Alice, msg)), escalus_connection:stop(NewAlice), escalus_connection:stop(Bob). @@ -1312,9 +1321,9 @@ no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption %%-------------------------------------------------------------------- start_hook_listener(Resource) -> TestCasePid = self(), - rpc(mim(), ?MODULE, rpc_start_hook_handler, [TestCasePid, Resource]). + rpc(mim(), ?MODULE, rpc_start_hook_handler, [TestCasePid, Resource, host_type()]). -rpc_start_hook_handler(TestCasePid, User) -> +rpc_start_hook_handler(TestCasePid, User, HostType) -> LUser=jid:nodeprep(User), Handler = fun(Acc, Jid) -> {U, _S, R} = jid:to_lower(Jid), @@ -1327,7 +1336,7 @@ rpc_start_hook_handler(TestCasePid, User) -> _ -> Acc end end, - ejabberd_hooks:add(unacknowledged_message, <<"localhost">>, Handler, 50). + ejabberd_hooks:add(unacknowledged_message, HostType, Handler, 50). wait_for_unacked_msg_hook(Counter, Res, Timeout) -> receive @@ -1373,26 +1382,26 @@ discard_offline_messages(Config, User, H) -> buffer_max(BufferMax) -> {buffer_max, fun () -> - rpc(mim(), ?MOD_SM, get_buffer_max, [domain(), unset]) + rpc(mim(), ?MOD_SM, get_buffer_max, [host_type(), unset]) end, fun (unset) -> ct:pal("buffer_max was not set - setting to 'undefined'"), - rpc(mim(), ?MOD_SM, set_buffer_max, [domain(), undefined]); + rpc(mim(), ?MOD_SM, set_buffer_max, [host_type(), undefined]); (V) -> - rpc(mim(), ?MOD_SM, set_buffer_max, [domain(), V]) + rpc(mim(), ?MOD_SM, set_buffer_max, [host_type(), V]) end, BufferMax}. ack_freq(AckFreq) -> {ack_freq, fun () -> - rpc(mim(), ?MOD_SM, get_ack_freq, [domain(), unset]) + rpc(mim(), ?MOD_SM, get_ack_freq, [host_type(), unset]) end, fun (unset) -> ct:pal("ack_freq was not set - setting to 'undefined'"), - rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), undefined]); + rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), undefined]); (V) -> - rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), V]) + rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), V]) end, AckFreq}. @@ -1512,14 +1521,18 @@ wait_for_queue_length(Pid, Length) -> regression_ns() -> <<"regression">>. -register_handler(Host) -> - rpc(mim(), gen_iq_handler, add_iq_handler, - [ejabberd_sm, Host, regression_ns(), ?MODULE, regression_handler, one_queue]). +register_handler() -> + HostType = host_type(), + rpc(mim(), gen_iq_handler, add_iq_handler_for_domain, + [HostType, regression_ns(), ejabberd_sm, + fun ?MODULE:regression_handler/5, #{}, one_queue]). -unregister_handler(Host) -> - rpc(mim(), gen_iq_handler, remove_iq_handler, [ejabberd_sm, Host, regression_ns()]). +unregister_handler() -> + HostType = host_type(), + rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain, + [HostType, regression_ns(), ejabberd_sm]). -regression_handler(_From, _To, Acc, IQ) -> +regression_handler(Acc, _From, _To, IQ, _Extra) -> %% A bit of a hack - will no longer work when the SID format changes {_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc), erlang:monitor(process, Pid), diff --git a/rel/mim1.vars-toml.config b/rel/mim1.vars-toml.config index 01bd3eb235..8dc74dbaa4 100644 --- a/rel/mim1.vars-toml.config +++ b/rel/mim1.vars-toml.config @@ -22,7 +22,8 @@ auth.methods = [\"dummy\"] auth.dummy.base_time = 1 auth.dummy.variance = 5 - [host_config.modules.mod_carboncopy]"}. + [host_config.modules.mod_carboncopy] + [host_config.modules.mod_stream_management]"}. {password_format, "password.format = \"scram\" password.hash = [\"sha256\"]"}. {scram_iterations, 64}. diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 3afdaa5031..87970d002e 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -280,22 +280,23 @@ wait_for_stream(_UnexpectedItem, #state{server = Server} = StateData) -> handle_stream_start({xmlstreamstart, _Name, Attrs}, #state{} = S0) -> Server = jid:nameprep(xml:get_attr_s(<<"to">>, Attrs)), - StreamMgmtConfig = case gen_mod:is_loaded(Server, mod_stream_management) of - true -> false; - _ -> disabled - end, Lang = get_xml_lang(Attrs), - S = S0#state{server = Server, lang = Lang, stream_mgmt = StreamMgmtConfig}, + S1 = S0#state{server = Server, lang = Lang}, case {xml:get_attr_s(<<"xmlns:stream">>, Attrs), mongoose_domain_api:get_domain_host_type(Server)} of {?NS_STREAM, {ok, HostType}} -> + StreamMgmtConfig = case gen_mod:is_loaded(HostType, mod_stream_management) of + true -> false; + _ -> disabled + end, + S = S1#state{host_type = HostType, stream_mgmt = StreamMgmtConfig}, change_shaper(S, jid:make_noprep(<<>>, Server, <<>>)), Version = xml:get_attr_s(<<"version">>, Attrs), - stream_start_by_protocol_version(Version, S#state{host_type = HostType}); + stream_start_by_protocol_version(Version, S); {?NS_STREAM, {error, not_found}} -> - stream_start_error(mongoose_xmpp_errors:host_unknown(), S); + stream_start_error(mongoose_xmpp_errors:host_unknown(), S1); {_InvalidNS, _} -> - stream_start_error(mongoose_xmpp_errors:invalid_namespace(), S) + stream_start_error(mongoose_xmpp_errors:invalid_namespace(), S1) end. stream_start_error(Error, StateData) -> @@ -2730,7 +2731,7 @@ maybe_csi_inactive_optimisation(Acc, {From,To,El}, #state{csi_buffer = Buffer} = {ok, Acc, NewState}. flush_or_buffer_packets(State) -> - MaxBuffSize = gen_mod:get_module_opt(State#state.server, mod_csi, + MaxBuffSize = gen_mod:get_module_opt(State#state.host_type, mod_csi, buffer_max, 20), case length(State#state.csi_buffer) > MaxBuffSize of true -> @@ -2757,7 +2758,7 @@ bounce_csi_buffer(#state{csi_buffer = Buffer}) -> %%%---------------------------------------------------------------------- %%% XEP-0198: Stream Management %%%---------------------------------------------------------------------- -maybe_enable_stream_mgmt(NextState, El, StateData = #state{server = LServer}) -> +maybe_enable_stream_mgmt(NextState, El, StateData = #state{host_type = HostType}) -> case {xml:get_tag_attr_s(<<"xmlns">>, El), StateData#state.stream_mgmt, xml:get_tag_attr_s(<<"resume">>, El)} @@ -2771,9 +2772,9 @@ maybe_enable_stream_mgmt(NextState, El, StateData = #state{server = LServer}) -> enable_stream_resumption(StateData) end, send_element_from_server_jid(NewSD, EnabledEl), - BufferMax = get_buffer_max(LServer), - AckFreq = get_ack_freq(LServer), - ResumeTimeout = get_resume_timeout(LServer), + BufferMax = get_buffer_max(HostType), + AckFreq = get_ack_freq(HostType), + ResumeTimeout = get_resume_timeout(HostType), fsm_next_state(NextState, NewSD#state{stream_mgmt = true, stream_mgmt_buffer_max = BufferMax, @@ -2958,17 +2959,17 @@ drop_last(N, List) -> end, {N, []}, List), {N - ToDrop, List2}. --spec get_buffer_max(jid:lserver()) -> pos_integer() | infinity. -get_buffer_max(LServer) -> - mod_stream_management:get_buffer_max(LServer, ?STREAM_MGMT_CACHE_MAX). +-spec get_buffer_max(mongooseim:host_type()) -> pos_integer() | infinity. +get_buffer_max(HostType) -> + mod_stream_management:get_buffer_max(HostType, ?STREAM_MGMT_CACHE_MAX). --spec get_ack_freq(jid:lserver()) -> pos_integer(). -get_ack_freq(LServer) -> - mod_stream_management:get_ack_freq(LServer, ?STREAM_MGMT_ACK_FREQ). +-spec get_ack_freq(mongooseim:host_type()) -> pos_integer(). +get_ack_freq(HostType) -> + mod_stream_management:get_ack_freq(HostType, ?STREAM_MGMT_ACK_FREQ). --spec get_resume_timeout(jid:lserver()) -> pos_integer(). -get_resume_timeout(LServer) -> - mod_stream_management:get_resume_timeout(LServer, ?STREAM_MGMT_RESUME_TIMEOUT). +-spec get_resume_timeout(mongooseim:host_type()) -> pos_integer(). +get_resume_timeout(HostType) -> + mod_stream_management:get_resume_timeout(HostType, ?STREAM_MGMT_RESUME_TIMEOUT). maybe_send_ack_request(Acc, #state{stream_mgmt = StreamMgmt}) when StreamMgmt =:= false; StreamMgmt =:= disabled -> @@ -3043,11 +3044,11 @@ maybe_enter_resume_session(_SMID, #state{} = SD) -> end, {next_state, resume_session, NSD, hibernate()}. -maybe_resume_session(NextState, El, StateData = #state{server = LServer}) -> +maybe_resume_session(NextState, El, StateData = #state{host_type = HostType}) -> case {xml:get_tag_attr_s(<<"xmlns">>, El), xml:get_tag_attr_s(<<"previd">>, El)} of {?NS_STREAM_MGNT_3, SMID} -> - FromSMID = mod_stream_management:get_session_from_smid(LServer, SMID), + FromSMID = mod_stream_management:get_session_from_smid(HostType, SMID), do_resume_session(SMID, El, FromSMID, StateData); {InvalidNS, _} -> ?LOG_INFO(#{what => c2s_ignores_resume, diff --git a/src/mod_stream_management.erl b/src/mod_stream_management.erl index 3d4e65c767..7d26a96e37 100644 --- a/src/mod_stream_management.erl +++ b/src/mod_stream_management.erl @@ -7,6 +7,7 @@ -export([start/2, stop/1, config_spec/0, + supported_features/0, process_buffer_and_ack/1]). %% hooks handlers @@ -49,15 +50,15 @@ sid :: ejabberd_sm:sid() }). +-type buffer_max() :: pos_integer() | infinity | no_buffer. +-type ack_freq() :: pos_integer() | never. %% %% `gen_mod' callbacks %% -start(Host, Opts) -> +start(HostType, Opts) -> ?LOG_INFO(#{what => stream_management_starting}), - ejabberd_hooks:add(c2s_stream_features, Host, ?MODULE, add_sm_feature, 50), - ejabberd_hooks:add(sm_remove_connection_hook, Host, ?MODULE, remove_smid, 50), - ejabberd_hooks:add(session_cleanup, Host, ?MODULE, session_cleanup, 50), + ejabberd_hooks:add(hooks(HostType)), mnesia:create_table(sm_session, [{ram_copies, [node()]}, {attributes, record_info(fields, sm_session)}]), mnesia:add_table_index(sm_session, sid), @@ -65,13 +66,16 @@ start(Host, Opts) -> stream_management_stale_h:maybe_start(Opts), ok. -stop(Host) -> +stop(HostType) -> ?LOG_INFO(#{what => stream_management_stopping}), - ejabberd_hooks:delete(sm_remove_connection_hook, Host, ?MODULE, remove_smid, 50), - ejabberd_hooks:delete(c2s_stream_features, Host, ?MODULE, add_sm_feature, 50), - ejabberd_hooks:delete(session_cleanup, Host, ?MODULE, session_cleanup, 50), + ejabberd_hooks:delete(hooks(HostType)), ok. +hooks(HostType) -> + [{sm_remove_connection_hook, HostType, ?MODULE, remove_smid, 50}, + {c2s_stream_features, HostType, ?MODULE, add_sm_feature, 50}, + {session_cleanup, HostType, ?MODULE, session_cleanup, 50}]. + -spec config_spec() -> mongoose_config_spec:config_section(). config_spec() -> #section{ @@ -88,6 +92,8 @@ config_spec() -> process = fun ?MODULE:process_buffer_and_ack/1 }. +supported_features() -> [dynamic_domains]. + process_buffer_and_ack(KVs) -> {[Buffer, Ack], Opts} = proplists:split(KVs, [buffer, ack]), OptsWithBuffer = check_buffer(Buffer, Opts), @@ -133,16 +139,19 @@ sm() -> Info :: undefined | [any()], Reason :: undefined | ejabberd_sm:close_reason(), Acc1 :: mongoose_acc:t(). -remove_smid(Acc, SID, #jid{lserver = LServer}, _Info, _Reason) -> - do_remove_smid(Acc, LServer, SID). +remove_smid(Acc, SID, _JID, _Info, _Reason) -> + HostType = mongoose_acc:host_type(Acc), + do_remove_smid(Acc, HostType, SID). -spec session_cleanup(Acc :: map(), LUser :: jid:luser(), LServer :: jid:lserver(), LResource :: jid:lresource(), SID :: ejabberd_sm:sid()) -> any(). -session_cleanup(Acc, _LUser, LServer, _LResource, SID) -> - do_remove_smid(Acc, LServer, SID). +session_cleanup(Acc, _LUser, _LServer, _LResource, SID) -> + HostType = mongoose_acc:host_type(Acc), + do_remove_smid(Acc, HostType, SID). --spec do_remove_smid(mongoose_acc:t(), jid:lserver(), ejabberd_sm:sid()) -> mongoose_acc:t(). -do_remove_smid(Acc, LServer, SID) -> +-spec do_remove_smid(mongoose_acc:t(), mongooseim:host_type(), ejabberd_sm:sid()) -> + mongoose_acc:t(). +do_remove_smid(Acc, HostType, SID) -> H = mongoose_acc:get(stream_mgmt, h, undefined, Acc), MaybeSMID = case mnesia:dirty_index_read(sm_session, SID, #sm_session.sid) of [] -> {error, smid_not_found}; @@ -150,7 +159,7 @@ do_remove_smid(Acc, LServer, SID) -> mnesia:dirty_delete(sm_session, SMID), case H of undefined -> ok; - _ -> register_stale_smid_h(LServer, SMID, H) + _ -> register_stale_smid_h(HostType, SMID, H) end, {ok, SMID} end, @@ -160,75 +169,73 @@ do_remove_smid(Acc, LServer, SID) -> %% `mongooseim.toml' options (don't use outside of tests) %% --spec get_buffer_max(jid:lserver(), pos_integer() | infinity | no_buffer) - -> pos_integer() | infinity | no_buffer. -get_buffer_max(LServer, Default) -> - gen_mod:get_module_opt(LServer, ?MODULE, buffer_max, Default). +-spec get_buffer_max(mongooseim:host_type(), buffer_max()) -> buffer_max(). +get_buffer_max(HostType, Default) -> + gen_mod:get_module_opt(HostType, ?MODULE, buffer_max, Default). %% Return true if succeeded, false otherwise. --spec set_buffer_max(jid:lserver(), pos_integer() | infinity | no_buffer | undefined) - -> boolean(). -set_buffer_max(LServer, undefined) -> - del_module_opt(LServer, ?MODULE, buffer_max); -set_buffer_max(LServer, infinity) -> - set_module_opt(LServer, ?MODULE, buffer_max, infinity); -set_buffer_max(LServer, no_buffer) -> - set_module_opt(LServer, ?MODULE, buffer_max, no_buffer); -set_buffer_max(LServer, Seconds) when is_integer(Seconds), Seconds > 0 -> - set_module_opt(LServer, ?MODULE, buffer_max, Seconds). - --spec get_ack_freq(jid:lserver(), pos_integer() | never) -> pos_integer() | never. -get_ack_freq(LServer, Default) -> - gen_mod:get_module_opt(LServer, ?MODULE, ack_freq, Default). +-spec set_buffer_max(mongooseim:host_type(), buffer_max() | undefined) -> boolean(). +set_buffer_max(HostType, undefined) -> + del_module_opt(HostType, ?MODULE, buffer_max); +set_buffer_max(HostType, infinity) -> + set_module_opt(HostType, ?MODULE, buffer_max, infinity); +set_buffer_max(HostType, no_buffer) -> + set_module_opt(HostType, ?MODULE, buffer_max, no_buffer); +set_buffer_max(HostType, Seconds) when is_integer(Seconds), Seconds > 0 -> + set_module_opt(HostType, ?MODULE, buffer_max, Seconds). + +-spec get_ack_freq(mongooseim:host_type(), ack_freq()) -> ack_freq(). +get_ack_freq(HostType, Default) -> + gen_mod:get_module_opt(HostType, ?MODULE, ack_freq, Default). %% Return true if succeeded, false otherwise. --spec set_ack_freq(jid:lserver(), pos_integer() | never | undefined) -> boolean(). -set_ack_freq(LServer, undefined) -> - del_module_opt(LServer, ?MODULE, ack_freq); -set_ack_freq(LServer, never) -> - set_module_opt(LServer, ?MODULE, ack_freq, never); -set_ack_freq(LServer, Freq) when is_integer(Freq), Freq > 0 -> - set_module_opt(LServer, ?MODULE, ack_freq, Freq). - --spec get_resume_timeout(jid:lserver(), pos_integer()) -> pos_integer(). -get_resume_timeout(LServer, Default) -> - gen_mod:get_module_opt(LServer, ?MODULE, resume_timeout, Default). - --spec set_resume_timeout(jid:lserver(), pos_integer()) -> boolean(). -set_resume_timeout(LServer, ResumeTimeout) -> - set_module_opt(LServer, ?MODULE, resume_timeout, ResumeTimeout). - - --spec get_stale_h_opt(LServer :: jid:lserver(), Opt :: atom(), Def :: pos_integer()) -> pos_integer(). -get_stale_h_opt(LServer, Option, Default) -> - MaybeModOpts = gen_mod:get_module_opt(LServer, ?MODULE, stale_h, []), +-spec set_ack_freq(mongooseim:host_type(), ack_freq() | undefined) -> boolean(). +set_ack_freq(HostType, undefined) -> + del_module_opt(HostType, ?MODULE, ack_freq); +set_ack_freq(HostType, never) -> + set_module_opt(HostType, ?MODULE, ack_freq, never); +set_ack_freq(HostType, Freq) when is_integer(Freq), Freq > 0 -> + set_module_opt(HostType, ?MODULE, ack_freq, Freq). + +-spec get_resume_timeout(mongooseim:host_type(), pos_integer()) -> pos_integer(). +get_resume_timeout(HostType, Default) -> + gen_mod:get_module_opt(HostType, ?MODULE, resume_timeout, Default). + +-spec set_resume_timeout(mongooseim:host_type(), pos_integer()) -> boolean(). +set_resume_timeout(HostType, ResumeTimeout) -> + set_module_opt(HostType, ?MODULE, resume_timeout, ResumeTimeout). + + +-spec get_stale_h_opt(mongooseim:host_type(), atom(), pos_integer()) -> pos_integer(). +get_stale_h_opt(HostType, Option, Default) -> + MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []), proplists:get_value(Option, MaybeModOpts, Default). --spec get_stale_h_repeat_after(jid:lserver(), pos_integer()) -> pos_integer(). -get_stale_h_repeat_after(LServer, Default) -> - get_stale_h_opt(LServer, stale_h_repeat_after, Default). +-spec get_stale_h_repeat_after(mongooseim:host_type(), pos_integer()) -> pos_integer(). +get_stale_h_repeat_after(HostType, Default) -> + get_stale_h_opt(HostType, stale_h_repeat_after, Default). --spec get_stale_h_geriatric(jid:lserver(), pos_integer()) -> pos_integer(). -get_stale_h_geriatric(LServer, Default) -> - get_stale_h_opt(LServer, stale_h_geriatric, Default). +-spec get_stale_h_geriatric(mongooseim:host_type(), pos_integer()) -> pos_integer(). +get_stale_h_geriatric(HostType, Default) -> + get_stale_h_opt(HostType, stale_h_geriatric, Default). --spec set_stale_h_opt(LServer :: jid:lserver(), Option :: atom(), Value :: pos_integer()) -> boolean(). -set_stale_h_opt(LServer, Option, Value) -> - MaybeModOpts = gen_mod:get_module_opt(LServer, ?MODULE, stale_h, []), +-spec set_stale_h_opt(mongooseim:host_type(), atom(), pos_integer()) -> boolean(). +set_stale_h_opt(HostType, Option, Value) -> + MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []), case MaybeModOpts of [] -> false; GCOpts -> NewGCOpts = lists:keystore(Option, 1, GCOpts, {Option, Value}), - set_module_opt(LServer, ?MODULE, stale_h, NewGCOpts) + set_module_opt(HostType, ?MODULE, stale_h, NewGCOpts) end. --spec set_stale_h_repeat_after(jid:lserver(), pos_integer()) -> boolean(). -set_stale_h_repeat_after(LServer, ResumeTimeout) -> - set_stale_h_opt(LServer, stale_h_repeat_after, ResumeTimeout). +-spec set_stale_h_repeat_after(mongooseim:host_type(), pos_integer()) -> boolean(). +set_stale_h_repeat_after(HostType, ResumeTimeout) -> + set_stale_h_opt(HostType, stale_h_repeat_after, ResumeTimeout). --spec set_stale_h_geriatric(jid:lserver(), pos_integer()) -> boolean(). -set_stale_h_geriatric(LServer, GeriatricAge) -> - set_stale_h_opt(LServer, stale_h_geriatric, GeriatricAge). +-spec set_stale_h_geriatric(mongooseim:host_type(), pos_integer()) -> boolean(). +set_stale_h_geriatric(HostType, GeriatricAge) -> + set_stale_h_opt(HostType, stale_h_geriatric, GeriatricAge). %% %% API for `ejabberd_c2s' @@ -239,12 +246,12 @@ make_smid() -> base64:encode(crypto:strong_rand_bytes(21)). %% Getters --spec get_session_from_smid(jid:lserver(), smid()) -> +-spec get_session_from_smid(mongooseim:host_type(), smid()) -> {sid, ejabberd_sm:sid()} | {stale_h, non_neg_integer()} | {error, smid_not_found}. -get_session_from_smid(LServer, SMID) -> +get_session_from_smid(HostType, SMID) -> case get_sid(SMID) of {sid, SID} -> {sid, SID}; - {error, smid_not_found} -> get_stale_h(LServer, SMID) + {error, smid_not_found} -> get_stale_h(HostType, SMID) end. -spec get_sid(smid()) -> @@ -255,10 +262,10 @@ get_sid(SMID) -> [] -> {error, smid_not_found} end. --spec get_stale_h(LServer :: jid:lserver(), SMID :: smid()) -> +-spec get_stale_h(mongooseim:host_type(), SMID :: smid()) -> {stale_h, non_neg_integer()} | {error, smid_not_found}. -get_stale_h(LServer, SMID) -> - MaybeModOpts = gen_mod:get_module_opt(LServer, ?MODULE, stale_h, []), +get_stale_h(HostType, SMID) -> + MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []), case proplists:get_value(enabled, MaybeModOpts, false) of false -> {error, smid_not_found}; true -> stream_management_stale_h:read_stale_h(SMID) @@ -274,15 +281,15 @@ register_smid(SMID, SID) -> {error, Reason} end. -register_stale_smid_h(LServer, SMID, H) -> - MaybeModOpts = gen_mod:get_module_opt(LServer, ?MODULE, stale_h, []), +register_stale_smid_h(HostType, SMID, H) -> + MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []), case proplists:get_value(enabled, MaybeModOpts, false) of false -> ok; true -> stream_management_stale_h:write_stale_h(SMID, H) end. -remove_stale_smid_h(LServer, SMID) -> - MaybeModOpts = gen_mod:get_module_opt(LServer, ?MODULE, stale_h, []), +remove_stale_smid_h(HostType, SMID) -> + MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []), case proplists:get_value(enabled, MaybeModOpts, false) of false -> ok; true -> stream_management_stale_h:delete_stale_h(SMID) @@ -294,12 +301,11 @@ remove_stale_smid_h(LServer, SMID) -> set_module_opt(Host, Module, Opt, Value) -> mod_module_opt(Host, Module, Opt, Value, fun set_opt/3). -del_module_opt(Host, Module, Opt) -> - mod_module_opt(Host, Module, Opt, undefined, fun del_opt/3). +del_module_opt(HostType, Module, Opt) -> + mod_module_opt(HostType, Module, Opt, undefined, fun del_opt/3). --spec mod_module_opt(_Host, _Module, _Opt, _Value, _Modify) -> boolean(). -mod_module_opt(Host, Module, Opt, Value, Modify) -> - Key = {Module, Host}, +mod_module_opt(HostType, Module, Opt, Value, Modify) -> + Key = {Module, HostType}, OptsList = ets:lookup(ejabberd_modules, Key), case OptsList of [] ->