From 9d90b2897c37e09ce42c8a4846d766ab79e07f13 Mon Sep 17 00:00:00 2001
From: Mikhail Uvarov
Date: Tue, 22 Feb 2022 22:08:54 +0100
Subject: [PATCH] Add ejabberd_sm_cets

Use cets for stream resumption backend
Print cets tables in mongooseimctl mnesia info
Enable cets on CI
Add test for mongooseimctl mnesia info
Test ejabberd_sm_cets in ejabberd_sm_SUITE
Rename to cets
---
 .circleci/template.yml                        |  12 +-
 big_tests/test.config                         |   7 +
 big_tests/tests/ct_helper.erl                 |  12 +-
 big_tests/tests/mongoose_helper.erl           |   9 +-
 big_tests/tests/mongooseimctl_SUITE.erl       |  16 +-
 big_tests/tests/sm_SUITE.erl                  |  17 ++-
 rebar.config                                  |  13 +-
 rebar.lock                                    |   4 +
 rel/files/cets_disco.txt                      |   3 +
 src/ejabberd_ctl.erl                          |  13 ++
 src/ejabberd_sm.erl                           |   2 +-
 src/ejabberd_sm_cets.erl                      | 139 +++++++++++++++++
 src/ejabberd_sup.erl                          |  10 +-
 src/mongooseim.app.src                        |   3 +-
 .../mod_stream_management_cets.erl            | 141 ++++++++++++++++++
 test/ejabberd_sm_SUITE.erl                    |  29 +++-
 tools/build-releases.sh                       |   1 +
 tools/test_runner/apply_templates.erl         |  15 +-
 18 files changed, 413 insertions(+), 33 deletions(-)
 create mode 100644 rel/files/cets_disco.txt
 create mode 100644 src/ejabberd_sm_cets.erl
 create mode 100644 src/stream_management/mod_stream_management_cets.erl

diff --git a/.circleci/template.yml b/.circleci/template.yml
index 0cfd93e77b..a63e20521d 100644
--- a/.circleci/template.yml
+++ b/.circleci/template.yml
@@ -543,7 +543,8 @@ jobs:
       preset:
         type: enum
         enum: [internal_mnesia, mysql_redis, odbc_mssql_mnesia, ldap_mnesia,
-               elasticsearch_and_cassandra_mnesia, pgsql_mnesia, riak_mnesia]
+               elasticsearch_and_cassandra_mnesia, pgsql_mnesia, riak_mnesia,
+               internal_cets]
         description: Preset to run
         default: internal_mnesia
       db:
@@ -743,6 +744,15 @@ workflows:
           requires:
             - otp_24_docker
           filters: *all_tags
+      - big_tests_in_docker:
+          name: internal_cets_24
+          executor: otp_24_redis
+          context: mongooseim-org
+          preset: internal_cets
+          db: "mnesia cets"
+          requires:
+            - otp_24_docker
+          filters: *all_tags
       - big_tests_in_docker:
           name: mysql_redis_24
           executor: otp_24_mysql_redis
diff --git a/big_tests/test.config b/big_tests/test.config
index b3442e4601..7b01753702 100644
--- a/big_tests/test.config
+++ b/big_tests/test.config
@@ -235,6 +235,13 @@
      [{dbs, [redis, minio]},
       {outgoing_pools, "[outgoing_pools.redis.global_distrib]
   scope = \"global\"
+  workers = 10"}]},
+    {internal_cets,
+     [{dbs, [redis]},
+      {sm_backend, "\"cets\""},
+      {stream_management_backend, cets},
+      {outgoing_pools, "[outgoing_pools.redis.global_distrib]
+  scope = \"global\"
   workers = 10"}]},
     {pgsql_mnesia,
      [{dbs, [redis, pgsql]},
diff --git a/big_tests/tests/ct_helper.erl b/big_tests/tests/ct_helper.erl
index dd5a0f0197..a7bd86833e 100644
--- a/big_tests/tests/ct_helper.erl
+++ b/big_tests/tests/ct_helper.erl
@@ -4,7 +4,8 @@
          repeat_all_until_all_ok/2,
          repeat_all_until_any_fail/1,
          repeat_all_until_any_fail/2,
-         groups_to_all/1]).
+         groups_to_all/1,
+         get_preset_var/3]).
 
 -type group_name() :: atom().
 
@@ -114,3 +115,12 @@ is_ct_started() ->
 
 groups_to_all(Groups) ->
    [{group, Name} || {Name, _Opts, _Cases} <- Groups].
+
+get_preset_var(Config, Opt, Def) ->
+    case proplists:get_value(preset, Config, undefined) of
+        undefined ->
+            Def;
+        Preset ->
+            PresetAtom = list_to_existing_atom(Preset),
+            ct:get_config({presets, toml, PresetAtom, Opt}, Def)
+    end.
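Note on get_preset_var/3 above: it returns the default both when the suite runs without a preset and when the preset does not define the requested option, so call sites never need their own fallback logic. A minimal usage sketch (the option name and default here are illustrative; sm_SUITE below uses exactly this shape):

    %% Inside a big_tests suite callback:
    Backend = ct_helper:get_preset_var(Config, stream_management_backend, mnesia),
    %% Backend is now either the preset's value or the mnesia default.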
diff --git a/big_tests/tests/mongoose_helper.erl b/big_tests/tests/mongoose_helper.erl
index fb3051799f..0fcac9ea22 100644
--- a/big_tests/tests/mongoose_helper.erl
+++ b/big_tests/tests/mongoose_helper.erl
@@ -497,13 +497,8 @@ restart_listener(Spec, Listener) ->
     rpc(Spec, mongoose_listener, start_listener, [Listener]).
 
 should_minio_be_running(Config) ->
-    case proplists:get_value(preset, Config, undefined) of
-        undefined -> false;
-        Preset ->
-            PresetAtom = list_to_existing_atom(Preset),
-            DBs = ct:get_config({presets, toml, PresetAtom, dbs}, []),
-            lists:member(minio, DBs)
-    end.
+    DBs = ct_helper:get_preset_var(Config, dbs, []),
+    lists:member(minio, DBs).
 
 %% It is useful to debug dynamic IQ handler registration
 print_debug_info_for_module(Module) ->
diff --git a/big_tests/tests/mongooseimctl_SUITE.erl b/big_tests/tests/mongooseimctl_SUITE.erl
index ab8146ea54..a40381962f 100644
--- a/big_tests/tests/mongooseimctl_SUITE.erl
+++ b/big_tests/tests/mongooseimctl_SUITE.erl
@@ -130,7 +130,8 @@ basic() ->
      dump_table,
      get_loglevel,
      remove_old_messages_test,
-     remove_expired_messages_test].
+     remove_expired_messages_test,
+     cets_tables_are_in_mnesia_info].
 
 accounts() -> [change_password, check_password_hash, check_password,
                check_account, ban_account, num_active_users, delete_old_users,
@@ -282,6 +283,13 @@ end_per_group(_GroupName, Config) ->
 get_registered_users() ->
     rpc(mim(), ejabberd_auth, get_vh_registered_users, [domain()]).
 
+init_per_testcase(CaseName = cets_tables_are_in_mnesia_info, Config) ->
+    case rpc(mim(), ejabberd_sm, sm_backend, []) of
+        ejabberd_sm_cets ->
+            escalus:init_per_testcase(CaseName, Config);
+        _ ->
+            {skip, "Only for cets preset"}
+    end;
 init_per_testcase(CaseName, Config)
     when CaseName == delete_old_users_vhost
          orelse CaseName == stats_global
@@ -1280,6 +1288,12 @@ remove_expired_messages_test(Config) ->
         2 = length(SecondList)
     end).
 
+cets_tables_are_in_mnesia_info(Config) ->
+    {Out, 0} = mongooseimctl("mnesia", ["info"], Config),
+    Lines = binary:split(iolist_to_binary(Out), <<"\n">>, [global]),
+    [_Line] = [L || <<"table=cets_session", _/binary>> = L <- Lines],
+    ok.
+
 %%-----------------------------------------------------------------
 %% Helpers
 %%-----------------------------------------------------------------
diff --git a/big_tests/tests/sm_SUITE.erl b/big_tests/tests/sm_SUITE.erl
index 14c28116c0..dbe6a037f6 100644
--- a/big_tests/tests/sm_SUITE.erl
+++ b/big_tests/tests/sm_SUITE.erl
@@ -136,7 +136,7 @@ init_per_group(Group, Config) when Group =:= parallel_unacknowledged_message_hoo
                                    Group =:= manual_ack_freq_long_session_timeout;
                                    Group =:= parallel_manual_ack_freq_1;
                                    Group =:= manual_ack_freq_2 ->
-    dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
+    dynamic_modules:ensure_modules(host_type(), required_modules(Config, group, Group)),
     Config;
 init_per_group(stale_h, Config) ->
     Config;
@@ -145,18 +145,18 @@ init_per_group(stream_mgmt_disabled, Config) ->
     rpc(mim(), mnesia, delete_table, [sm_session]),
     Config;
 init_per_group(Group, Config) ->
-    dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
+    dynamic_modules:ensure_modules(host_type(), required_modules(Config, group, Group)),
     Config.
 
 end_per_group(_Group, _Config) ->
     ok.
 init_per_testcase(resume_expired_session_returns_correct_h = CN, Config) ->
-    dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
+    dynamic_modules:ensure_modules(host_type(), required_modules(Config, testcase, CN)),
     escalus:init_per_testcase(CN, Config);
 init_per_testcase(CN, Config) when CN =:= gc_repeat_after_never_means_no_cleaning;
                                    CN =:= gc_repeat_after_timeout_does_clean ->
-    dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
+    dynamic_modules:ensure_modules(host_type(), required_modules(Config, testcase, CN)),
     Config2 = register_some_smid_h(Config),
     escalus:init_per_testcase(CN, Config2);
 init_per_testcase(server_requests_ack_freq_2 = CN, Config) ->
@@ -181,10 +181,10 @@ end_per_testcase(CaseName, Config) ->
 
 %% Module configuration per group (in case of stale_h group it is per testcase)
 
-required_modules(Scope, Name) ->
+required_modules(Config, Scope, Name) ->
     SMConfig = case required_sm_opts(Scope, Name) of
                    stopped -> stopped;
-                   ExtraOpts -> maps:merge(common_sm_opts(), ExtraOpts)
+                   ExtraOpts -> maps:merge(common_sm_opts(Config), ExtraOpts)
                end,
     [{mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)},
      {mod_offline, config_parser_helper:mod_config(mod_offline, #{})}].
@@ -211,8 +211,9 @@ required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) ->
 required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) ->
     #{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)}.
 
-common_sm_opts() ->
-    #{buffer_max => ?SMALL_SM_BUFFER}.
+common_sm_opts(Config) ->
+    Backend = ct_helper:get_preset_var(Config, stream_management_backend, mnesia),
+    #{buffer_max => ?SMALL_SM_BUFFER, backend => Backend}.
 
 stale_h(RepeatAfter, Geriatric) ->
     #{enabled => true,
diff --git a/rebar.config b/rebar.config
index d7d186eb28..9bcfc6cdfc 100644
--- a/rebar.config
+++ b/rebar.config
@@ -77,6 +77,7 @@
   {cache_tab, "1.0.29"},
   {segmented_cache, "0.1.1"},
   {worker_pool, "6.0.1"},
+  {cets, {git, "https://github.com/arcusfelis/cets.git", {branch, "main"}}},
 
   %%% HTTP tools
   {cowboy, "2.9.0"},
@@ -167,11 +168,17 @@
        {erl_opts, [{d, 'PROD_NODE'}]} ]},
     %% development nodes
     {mim1, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/mim1.vars-toml.config"]},
-                     {overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
+                     {overlay, [
+                         {copy, "rel/files/cets_disco.txt", "etc/cets_disco.txt"},
+                         {template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
     {mim2, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/mim2.vars-toml.config"]},
-                     {overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
+                     {overlay, [
+                         {copy, "rel/files/cets_disco.txt", "etc/cets_disco.txt"},
+                         {template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
     {mim3, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/mim3.vars-toml.config"]},
-                     {overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
+                     {overlay, [
+                         {copy, "rel/files/cets_disco.txt", "etc/cets_disco.txt"},
+                         {template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
     {fed1, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/fed1.vars-toml.config"]},
                      {overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
     {reg1, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/reg1.vars-toml.config"]},
diff --git a/rebar.lock b/rebar.lock
index 52a9a66b25..f39ecafae8 100644
--- a/rebar.lock
+++ b/rebar.lock
@@ -6,6 +6,10 @@
  {<<"bear">>,{pkg,<<"bear">>,<<"1.0.0">>},1},
  {<<"cache_tab">>,{pkg,<<"cache_tab">>,<<"1.0.29">>},0},
  {<<"certifi">>,{pkg,<<"certifi">>,<<"2.3.1">>},2},
+ {<<"cets">>,
+  {git,"https://github.com/arcusfelis/cets.git",
+       {ref,"6f8b79889844bf2f3778104bdadfba5ee1efc5fc"}},
+  0},
  {<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.9.0">>},0},
  {<<"cowboy_swagger">>,{pkg,<<"cowboy_swagger">>,<<"2.5.0">>},0},
  {<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.11.0">>},1},
diff --git a/rel/files/cets_disco.txt b/rel/files/cets_disco.txt
new file mode 100644
index 0000000000..428fc58a86
--- /dev/null
+++ b/rel/files/cets_disco.txt
@@ -0,0 +1,3 @@
+mongooseim@localhost
+ejabberd2@localhost
+mongooseim3@localhost
diff --git a/src/ejabberd_ctl.erl b/src/ejabberd_ctl.erl
index 6946163cf5..cafd869336 100644
--- a/src/ejabberd_ctl.erl
+++ b/src/ejabberd_ctl.erl
@@ -170,6 +170,7 @@ process(["mnesia"]) ->
     ?STATUS_SUCCESS;
 process(["mnesia", "info"]) ->
     mnesia:info(),
+    cets_info(),
     ?STATUS_SUCCESS;
 process(["mnesia", Arg]) when is_list(Arg) ->
     case catch mnesia:system_info(list_to_atom(Arg)) of
@@ -928,3 +929,15 @@ get_dist_proto() ->
         _ -> "inet_tcp"
     end.
 
+cets_info() ->
+    Tables = cets_discovery:info(mongoose_cets_discovery),
+    cets_info(Tables).
+
+cets_info([]) ->
+    ok;
+cets_info(Tables) ->
+    ?PRINT("CETS tables:~n", []),
+    [cets_table_info(Table) || Table <- Tables].
+
+cets_table_info(#{memory := Memory, size := Size, nodes := Nodes, table := Tab}) ->
+    ?PRINT("table=~0p size=~p memory_words=~0p nodes=~0p~n", [Tab, Size, Memory, Nodes]).
diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl
index a27087d8c3..e8de195047 100644
--- a/src/ejabberd_sm.erl
+++ b/src/ejabberd_sm.erl
@@ -105,7 +105,7 @@
       }.
 -type info() :: #{info_key() => any()}.
 
--type backend() :: ejabberd_sm_mnesia | ejabberd_sm_redis.
+-type backend() :: ejabberd_sm_mnesia | ejabberd_sm_redis | ejabberd_sm_cets.
 
 -type close_reason() :: resumed | normal | replaced.
 -type info_key() :: atom().
diff --git a/src/ejabberd_sm_cets.erl b/src/ejabberd_sm_cets.erl
new file mode 100644
index 0000000000..91faaa25b2
--- /dev/null
+++ b/src/ejabberd_sm_cets.erl
@@ -0,0 +1,139 @@
+-module(ejabberd_sm_cets).
+
+-behavior(ejabberd_sm_backend).
+
+-include("mongoose.hrl").
+-include("session.hrl").
+
+-export([init/1,
+         get_sessions/0,
+         get_sessions/1,
+         get_sessions/2,
+         get_sessions/3,
+         create_session/4,
+         update_session/4,
+         delete_session/4,
+         cleanup/1,
+         total_count/0,
+         unique_count/0]).
+
+-define(TABLE, cets_session).
+
+-spec init(list()) -> any().
+init(_Opts) ->
+    cets:start(?TABLE, #{}),
+    cets_discovery:add_table(mongoose_cets_discovery, ?TABLE).
+
+-spec get_sessions() -> [ejabberd_sm:session()].
+get_sessions() ->
+    tuples_to_sessions(ets:tab2list(?TABLE)).
+
+-spec get_sessions(jid:lserver()) -> [ejabberd_sm:session()].
+get_sessions(Server) ->
+    R = {{Server, '_', '_', '_'}, '_', '_'},
+    Xs = ets:select(?TABLE, [{R, [], ['$_']}]),
+    tuples_to_sessions(Xs).
+
+-spec get_sessions(jid:luser(), jid:lserver()) -> [ejabberd_sm:session()].
+get_sessions(User, Server) ->
+    R = {{Server, User, '_', '_'}, '_', '_'},
+    Xs = ets:select(?TABLE, [{R, [], ['$_']}]),
+    tuples_to_sessions(Xs).
+
+-spec get_sessions(jid:luser(), jid:lserver(), jid:lresource()) ->
+    [ejabberd_sm:session()].
+get_sessions(User, Server, Resource) ->
+    R = {{Server, User, Resource, '_'}, '_', '_'},
+    Xs = ets:select(?TABLE, [{R, [], ['$_']}]),
+    %% TODO these sessions should be deduplicated.
+    %% It is possible that, after merging two cets tables, we end up
+    %% with sessions from two nodes for the same full JID.
+    %% One of the sessions must be killed.
+    %% We can detect duplicates on the merging step or on reading (or both).
+    tuples_to_sessions(Xs).
+
+-spec create_session(User :: jid:luser(),
+                     Server :: jid:lserver(),
+                     Resource :: jid:lresource(),
+                     Session :: ejabberd_sm:session()) -> ok | {error, term()}.
+create_session(User, Server, Resource, Session) ->
+    case get_sessions(User, Server, Resource) of
+        [] ->
+            cets:insert(?TABLE, session_to_tuple(Session));
+        Sessions when is_list(Sessions) ->
+            %% Fix potential race condition during XMPP bind, where
+            %% multiple calls (> 2) to ejabberd_sm:open_session
+            %% have been made, resulting in >1 sessions for this resource.
+            %% XXX Why do we need that exactly?
+            %% Sessions are opened from c2s, and that specific process updates
+            %% its own session info. Adding info from other processes could cause
+            %% strange bugs. On the other hand, there is very limited usage
+            %% of the info field, so nothing would probably break if
+            %% we keep calling merge_info (and it makes ejabberd_sm_SUITE happy).
+            MergedSession = mongoose_session:merge_info(Session, hd(lists:sort(Sessions))),
+            cets:insert(?TABLE, session_to_tuple(MergedSession))
+    end.
+
+-spec update_session(User :: jid:luser(),
+                     Server :: jid:lserver(),
+                     Resource :: jid:lresource(),
+                     Session :: ejabberd_sm:session()) -> ok | {error, term()}.
+update_session(_User, _Server, _Resource, Session) ->
+    cets:insert(?TABLE, session_to_tuple(Session)).
+
+-spec delete_session(ejabberd_sm:sid(),
+                     User :: jid:luser(),
+                     Server :: jid:lserver(),
+                     Resource :: jid:lresource()) -> ok.
+delete_session(SID, User, Server, Resource) ->
+    cets:delete(?TABLE, make_key(User, Server, Resource, SID)).
+
+-spec cleanup(atom()) -> any().
+cleanup(Node) ->
+    %% TODO this could be optimized: we don't need to replicate deletes,
+    %% we could just call cleanup on each node (but run the hook only
+    %% on one of the nodes).
+    KeyPattern = {'_', '_', '_', {'_', '$1'}},
+    Guard = {'==', {node, '$1'}, Node},
+    R = {KeyPattern, '_', '_'},
+    cets:sync(?TABLE),
+    Tuples = ets:select(?TABLE, [{R, [Guard], ['$_']}]),
+    Keys = lists:map(fun({Key, _, _} = Tuple) ->
+                             Session = tuple_to_session(Tuple),
+                             ejabberd_sm:run_session_cleanup_hook(Session),
+                             Key
+                     end, Tuples),
+    cets:delete_many(?TABLE, Keys).
+
+-spec total_count() -> integer().
+total_count() ->
+    ets:info(?TABLE, size).
+
+%% Counts sessions merged by their US (user@server)
+-spec unique_count() -> integer().
+unique_count() ->
+    compute_unique(ets:first(?TABLE), 0).
+
+compute_unique('$end_of_table', Sum) ->
+    Sum;
+compute_unique({S, U, _, _} = Key, Sum) ->
+    Key2 = ets:next(?TABLE, Key),
+    case Key2 of
+        {S, U, _, _} ->
+            compute_unique(Key2, Sum);
+        _ ->
+            compute_unique(Key2, Sum + 1)
+    end.
+
+session_to_tuple(#session{sid = SID, usr = {U, S, R}, priority = Prio, info = Info}) ->
+    {make_key(U, S, R, SID), Prio, Info}.
+
+make_key(User, Server, Resource, SID) ->
+    {Server, User, Resource, SID}.
+
+tuple_to_session({{S, U, R, SID}, Prio, Info}) ->
+    #session{sid = SID, usr = {U, S, R}, us = {U, S}, priority = Prio, info = Info}.
+
+tuples_to_sessions(Xs) ->
+    [tuple_to_session(X) || X <- Xs].
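Note on the key layout in ejabberd_sm_cets above: assuming, as cets does, that tables are backed by ordered_set ETS tables, the {Server, User, Resource, SID} key sorts all sessions of one user@server next to each other. Both the prefix match specs in get_sessions/1,2,3 and the ets:first/ets:next walk in compute_unique/2 rely on that ordering. A plain-ETS rehearsal of the same select pattern (table name and data are illustrative):

    demo_key_layout() ->
        T = ets:new(demo_session, [ordered_set]),
        true = ets:insert(T, [{{<<"s">>, <<"alice">>, <<"r1">>, a}, 0, #{}},
                              {{<<"s">>, <<"alice">>, <<"r2">>, b}, 0, #{}},
                              {{<<"s">>, <<"bob">>, <<"r1">>, c}, 0, #{}}]),
        %% Same pattern shape as get_sessions/2 above: both alice rows match.
        R = {{<<"s">>, <<"alice">>, '_', '_'}, '_', '_'},
        2 = length(ets:select(T, [{R, [], ['$_']}])),
        true = ets:delete(T).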
diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl
index 302b410b01..59e52c262c 100644
--- a/src/ejabberd_sup.erl
+++ b/src/ejabberd_sup.erl
@@ -153,8 +153,16 @@ init([]) ->
         {pg,
          {pg, start_link, [mim_scope]},
          permanent, infinity, supervisor, [pg]},
+    ConfigDir = filename:dirname(mongoose_config:get_config_path()),
+    DiscoFile = filename:join(ConfigDir, "cets_disco.txt"),
+    DiscoOpts = #{name => mongoose_cets_discovery, disco_file => DiscoFile},
+    CetsDisco =
+        {cets_discovery,
+         {cets_discovery, start_link, [DiscoOpts]},
+         permanent, infinity, supervisor, [cets_discovery]},
     {ok, {{one_for_one, 10, 1},
-          [PG,
+          [CetsDisco,
+           PG,
            Hooks,
            Cleaner,
            SMBackendSupervisor,
diff --git a/src/mongooseim.app.src b/src/mongooseim.app.src
index 8af2a21ca3..e09c16ed19 100644
--- a/src/mongooseim.app.src
+++ b/src/mongooseim.app.src
@@ -50,7 +50,8 @@
                   cowboy_swagger,
                   tomerl,
                   flatlog,
-                  segmented_cache
+                  segmented_cache,
+                  cets
                  ]},
  {env, []},
  {mod, {ejabberd_app, []}}]}.
diff --git a/src/stream_management/mod_stream_management_cets.erl b/src/stream_management/mod_stream_management_cets.erl
new file mode 100644
index 0000000000..e7d105b2cb
--- /dev/null
+++ b/src/stream_management/mod_stream_management_cets.erl
@@ -0,0 +1,141 @@
+-module(mod_stream_management_cets).
+-behaviour(mod_stream_management_backend).
+-behaviour(gen_server).
+
+-include("mongoose.hrl").
+-include("jlib.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-export([init/2,
+         register_smid/3,
+         unregister_smid/2,
+         get_sid/2]).
+
+-export([read_stale_h/2,
+         write_stale_h/3,
+         delete_stale_h/2]).
+
+%% Internal exports
+-export([start_link/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
+
+-ignore_xref([start_link/1]).
+
+-record(smgc_state,
+        {gc_repeat_after :: non_neg_integer(),
+         gc_geriatric :: non_neg_integer()}).
+
+-define(TABLE, cets_strm_man).
+-define(TABLE_H, cets_strm_man_h).
+
+init(_HostType, #{stale_h := StaleOpts}) ->
+    cets:start(?TABLE, #{}),
+    cets_discovery:add_table(mongoose_cets_discovery, ?TABLE),
+    maybe_init_stale_h(StaleOpts),
+    ok.
+
+maybe_init_stale_h(StaleOpts = #{enabled := true}) ->
+    cets:start(?TABLE_H, #{}),
+    cets_discovery:add_table(mongoose_cets_discovery, ?TABLE_H),
+    start_cleaner(StaleOpts);
+maybe_init_stale_h(_) -> ok.
+
+-spec register_smid(HostType, SMID, SID) ->
+    ok | {error, term()} when
+      HostType :: mongooseim:host_type(),
+      SMID :: mod_stream_management:smid(),
+      SID :: ejabberd_sm:sid().
+register_smid(_HostType, SMID, SID) ->
+    cets:insert(?TABLE, [{{sid, SID}, SMID}, {{smid, SMID}, SID}]).
+
+-spec unregister_smid(mongooseim:host_type(), ejabberd_sm:sid()) ->
+    {ok, SMID :: mod_stream_management:smid()} | {error, smid_not_found}.
+unregister_smid(_HostType, SID) ->
+    case ets:lookup(?TABLE, {sid, SID}) of
+        [] ->
+            {error, smid_not_found};
+        [{_, SMID}] ->
+            cets:delete_many(?TABLE, [{sid, SID}, {smid, SMID}]),
+            {ok, SMID}
+    end.
+
+-spec get_sid(mongooseim:host_type(), mod_stream_management:smid()) ->
+    {sid, ejabberd_sm:sid()} | {error, smid_not_found}.
+get_sid(_HostType, SMID) ->
+    case ets:lookup(?TABLE, {smid, SMID}) of
+        [] ->
+            {error, smid_not_found};
+        [{_, SID}] ->
+            {sid, SID}
+    end.
+
+%% stale_h functions
+
+-spec read_stale_h(HostType, SMID) ->
+    {stale_h, non_neg_integer()} | {error, smid_not_found} when
+      HostType :: mongooseim:host_type(),
+      SMID :: mod_stream_management:smid().
+read_stale_h(_HostType, SMID) ->
+    case ets:lookup(?TABLE_H, SMID) of
+        [] ->
+            {error, smid_not_found};
+        [{_, H, _}] ->
+            {stale_h, H}
+    end.
+
+-spec write_stale_h(HostType, SMID, H) -> ok | {error, any()} when
+      HostType :: mongooseim:host_type(),
+      SMID :: mod_stream_management:smid(),
+      H :: non_neg_integer().
+write_stale_h(_HostType, SMID, H) ->
+    Stamp = erlang:monotonic_time(second),
+    cets:insert(?TABLE_H, {SMID, H, Stamp}).
+
+-spec delete_stale_h(HostType, SMID) -> ok | {error, any()} when
+      HostType :: mongooseim:host_type(),
+      SMID :: mod_stream_management:smid().
+delete_stale_h(_HostType, SMID) ->
+    cets:delete(?TABLE_H, SMID).
+
+%% stale_h cleaning logic
+
+start_cleaner(Opts) ->
+    MFA = {?MODULE, start_link, [Opts]},
+    ChildSpec = {stream_management_stale_h, MFA, permanent, 5000, worker, [?MODULE]},
+    %% TODO cleaner should be a service
+    ejabberd_sup:start_child(ChildSpec).
+
+start_link(Opts) ->
+    gen_server:start_link({local, stream_management_stale_h}, ?MODULE, Opts, []).
+
+init(#{repeat_after := RepeatAfter, geriatric := GeriatricAge}) ->
+    State = #smgc_state{gc_repeat_after = RepeatAfter,
+                        gc_geriatric = GeriatricAge},
+    schedule_check(State),
+    {ok, State}.
+
+handle_call(Msg, From, State) ->
+    ?UNEXPECTED_CALL(Msg, From),
+    {reply, ok, State}.
+
+handle_cast(Msg, State) ->
+    ?UNEXPECTED_CAST(Msg),
+    {noreply, State}.
+
+handle_info(check, #smgc_state{gc_geriatric = GeriatricAge} = State) ->
+    clear_table(GeriatricAge),
+    schedule_check(State),
+    {noreply, State};
+handle_info(Info, State) ->
+    ?UNEXPECTED_INFO(Info),
+    {noreply, State}.
+
+schedule_check(#smgc_state{gc_repeat_after = RepeatAfter}) ->
+    erlang:send_after(RepeatAfter * 1000, self(), check).
+
+clear_table(GeriatricAge) ->
+    TimeToDie = erlang:monotonic_time(second) - GeriatricAge,
+    MS = ets:fun2ms(fun({_, _, S}) when S < TimeToDie -> true end),
+    ets:select_delete(?TABLE_H, MS).
diff --git a/test/ejabberd_sm_SUITE.erl b/test/ejabberd_sm_SUITE.erl
index d357b309d0..46ee5897a9 100644
--- a/test/ejabberd_sm_SUITE.erl
+++ b/test/ejabberd_sm_SUITE.erl
@@ -17,7 +17,7 @@
 
 -define(MAX_USER_SESSIONS, 2).
 
-all() -> [{group, mnesia}, {group, redis}].
+all() -> [{group, mnesia}, {group, redis}, {group, cets}].
 
 init_per_suite(C) ->
     {ok, _} = application:ensure_all_started(jid),
@@ -37,7 +37,8 @@ end_per_suite(C) ->
 
 groups() ->
     [{mnesia, [], tests()},
-     {redis, [], tests()}].
+     {redis, [], tests()},
+     {cets, [], tests()}].
 
 tests() ->
     [open_session,
@@ -68,7 +69,11 @@ init_per_group(mnesia, Config) ->
     ok = mnesia:start(),
     [{backend, ejabberd_sm_mnesia} | Config];
 init_per_group(redis, Config) ->
-    init_redis_group(is_redis_running(), Config).
+    init_redis_group(is_redis_running(), Config);
+init_per_group(cets, Config) ->
+    DiscoOpts = #{name => mongoose_cets_discovery, disco_file => "does_not_exist.txt"},
+    {ok, Pid} = cets_discovery:start(DiscoOpts),
+    [{backend, ejabberd_sm_cets}, {cets_disco_pid, Pid} | Config].
 
 init_redis_group(true, Config) ->
     Self = self(),
@@ -91,7 +96,10 @@ end_per_group(mnesia, Config) ->
     mnesia:stop(),
     mnesia:delete_schema([node()]),
     Config;
-end_per_group(_, Config) ->
+end_per_group(cets, Config) ->
+    exit(proplists:get_value(cets_disco_pid, Config), kill),
+    Config;
+end_per_group(redis, Config) ->
     whereis(test_helper) ! stop,
     Config.
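Note on the SMID mapping in mod_stream_management_cets above: register_smid/3 writes two rows per stream, {{sid, SID}, SMID} and {{smid, SMID}, SID}, so both unregister_smid/2 and get_sid/2 are single key lookups rather than table scans. A plain-ETS rehearsal (table name and values are illustrative):

    demo_smid_mapping() ->
        T = ets:new(demo_strm_man, [set]),
        SID = {erlang:system_time(), self()},
        SMID = <<"some-smid">>,
        true = ets:insert(T, [{{sid, SID}, SMID}, {{smid, SMID}, SID}]),
        [{_, SMID}] = ets:lookup(T, {sid, SID}),   %% the unregister_smid/2 path
        [{_, SID}] = ets:lookup(T, {smid, SMID}),  %% the get_sid/2 path
        true = ets:delete(T).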
@@ -388,6 +396,8 @@ get_fun_for_unique_count(ejabberd_sm_mnesia) ->
     fun() ->
         mnesia:abort({badarg,[session,{{1442,941593,580189},list_to_pid("<0.23291.6>")}]})
     end;
+get_fun_for_unique_count(ejabberd_sm_cets) ->
+    fun() -> error(oops) end;
 get_fun_for_unique_count(ejabberd_sm_redis) ->
     fun() ->
         %% The code below is on purpose, it's to crash with badarg reason
@@ -423,6 +433,8 @@ verify_session_opened(C, Sid, USR) ->
 
 do_verify_session_opened(ejabberd_sm_mnesia, Sid, {U, S, R} = USR) ->
     general_session_check(ejabberd_sm_mnesia, Sid, USR, U, S, R);
+do_verify_session_opened(ejabberd_sm_cets, Sid, {U, S, R} = USR) ->
+    general_session_check(ejabberd_sm_cets, Sid, USR, U, S, R);
 do_verify_session_opened(ejabberd_sm_redis, Sid, {U, S, R} = USR) ->
     UHash = iolist_to_binary(hash(U, S, R, Sid)),
     Hashes = mongoose_redis:cmd(["SMEMBERS", n(node())]),
@@ -451,7 +463,9 @@ clean_sessions(C) ->
         ejabberd_sm_mnesia ->
             mnesia:clear_table(session);
         ejabberd_sm_redis ->
-            mongoose_redis:cmd(["FLUSHALL"])
+            mongoose_redis:cmd(["FLUSHALL"]);
+        ejabberd_sm_cets ->
+            ets:delete_all_objects(cets_session)
     end.
 
 generate_random_user(S) ->
@@ -590,6 +604,8 @@ setup_sm(Config) ->
         ejabberd_sm_redis ->
             mongoose_redis:cmd(["FLUSHALL"]);
         ejabberd_sm_mnesia ->
+            ok;
+        ejabberd_sm_cets ->
             ok
     end.
 
@@ -609,7 +625,8 @@ opts(Config) ->
      {sm_backend, sm_backend(?config(backend, Config))}].
 
 sm_backend(ejabberd_sm_redis) -> redis;
-sm_backend(ejabberd_sm_mnesia) -> mnesia.
+sm_backend(ejabberd_sm_mnesia) -> mnesia;
+sm_backend(ejabberd_sm_cets) -> cets.
 
 set_meck() ->
     meck:expect(gen_hook, add_handler, fun(_, _, _, _, _) -> ok end),
diff --git a/tools/build-releases.sh b/tools/build-releases.sh
index 27765e874c..2df2cec6e1 100755
--- a/tools/build-releases.sh
+++ b/tools/build-releases.sh
@@ -31,6 +31,7 @@ function try_copy_release
         --exclude rel/mongooseim/Mnesia.* \
         --exclude rel/mongooseim/var \
         --exclude rel/mongooseim/log \
+        --exclude rel/mongooseim/etc/cets_disco.txt \
         -al _build/$FIRST_NODE/ _build/$NODE/
     ./tools/test_runner/apply_templates.sh "$NODE" "$(pwd)/_build/$NODE/"
   fi
diff --git a/tools/test_runner/apply_templates.erl b/tools/test_runner/apply_templates.erl
index 382a21be71..cb12c6b3c4 100644
--- a/tools/test_runner/apply_templates.erl
+++ b/tools/test_runner/apply_templates.erl
@@ -10,7 +10,7 @@ main([NodeAtom, BuildDirAtom]) ->
     log("BuildDirAtom=~p~n", [BuildDirAtom]),
     BuildDir = atom_to_list(BuildDirAtom),
     RelDir = BuildDir ++ "/rel/mongooseim",
-    Templates = templates(RelDir),
+    Templates = templates(RelDir, NodeAtom),
     log("Templates:~n~p~n", [Templates]),
     Vars0 = overlay_vars(NodeAtom),
     Vars = Vars0#{output_dir => list_to_binary(RelDir)},
@@ -36,8 +36,9 @@ consult_map(File) ->
     maps:from_list(Vars).
 
 %% Based on rebar.config overlay section
-templates(RelDir) ->
-    simple_templates(RelDir) ++ erts_templates(RelDir).
+templates(RelDir, NodeAtom) ->
+    simple_templates(RelDir) ++ erts_templates(RelDir)
+        ++ disco_template(RelDir, NodeAtom).
 
 simple_templates(RelDir) ->
     [{In, RelDir ++ "/" ++ Out} || {In, Out} <- simple_templates()].
@@ -57,6 +58,14 @@ erts_templates(RelDir) ->
     ErtsDirs = filelib:wildcard(RelDir ++ "/erts-*"),
     [{"rel/files/nodetool", ErtsDir ++ "/bin/nodetool"} || ErtsDir <- ErtsDirs].
 
+disco_template(RelDir, NodeAtom) ->
+    case lists:member(NodeAtom, [mim1, mim2, mim3]) of
+        true ->
+            [{"rel/files/cets_disco.txt", RelDir ++ "/etc/cets_disco.txt"}];
+        false ->
+            []
+    end.
+
 render_template(In, Out, Vars) ->
     BinIn = bbmustache:parse_file(In),
     %% Do render twice to allow templates in variables
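A closing note on the stale_h cleaner introduced above: write_stale_h/3 stamps each row with erlang:monotonic_time(second), and clear_table/1 drops every row whose stamp is older than now minus the geriatric age, in a single ets:select_delete/2 pass. A standalone rehearsal of the same match spec (module name, table and timings are illustrative):

    -module(demo_stale_h_gc).
    -include_lib("stdlib/include/ms_transform.hrl").
    -export([run/0]).

    run() ->
        T = ets:new(demo_h, [set]),
        Now = erlang:monotonic_time(second),
        true = ets:insert(T, [{<<"old">>, 5, Now - 120}, {<<"new">>, 7, Now}]),
        TimeToDie = Now - 60,  %% as if the geriatric age were 60 seconds
        MS = ets:fun2ms(fun({_, _, S}) when S < TimeToDie -> true end),
        1 = ets:select_delete(T, MS),  %% only the stale row is removed
        [{<<"new">>, 7, _}] = ets:tab2list(T),
        true = ets:delete(T).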