Skip to content

Commit

Permalink
Add ejabberd_sm_cets
Browse files Browse the repository at this point in the history
Use cets for stream resumption backend
Print cets tables in mongooseimctl mnesia info
Enable cets on CI
Add test for mongooseimctl mnesia info
Test ejabberd_sm_cets in ejabberd_sm_SUITE
Rename to cets
  • Loading branch information
arcusfelis committed Apr 29, 2022
1 parent 72db059 commit 9d90b28
Show file tree
Hide file tree
Showing 18 changed files with 414 additions and 33 deletions.
12 changes: 11 additions & 1 deletion .circleci/template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ jobs:
preset:
type: enum
enum: [internal_mnesia, mysql_redis, odbc_mssql_mnesia, ldap_mnesia,
elasticsearch_and_cassandra_mnesia, pgsql_mnesia, riak_mnesia]
elasticsearch_and_cassandra_mnesia, pgsql_mnesia, riak_mnesia,
internal_cets]
description: Preset to run
default: internal_mnesia
db:
Expand Down Expand Up @@ -743,6 +744,15 @@ workflows:
requires:
- otp_24_docker
filters: *all_tags
- big_tests_in_docker:
name: internal_cets_24
executor: otp_24_redis
context: mongooseim-org
preset: internal_cets
db: "mnesia cets"
requires:
- otp_24_docker
filters: *all_tags
- big_tests_in_docker:
name: mysql_redis_24
executor: otp_24_mysql_redis
Expand Down
7 changes: 7 additions & 0 deletions big_tests/test.config
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@
[{dbs, [redis, minio]},
{outgoing_pools, "[outgoing_pools.redis.global_distrib]
scope = \"global\"
workers = 10"}]},
{internal_cets,
[{dbs, [redis]},
{sm_backend, "\"cets\""},
{stream_management_backend, cets},
{outgoing_pools, "[outgoing_pools.redis.global_distrib]
scope = \"global\"
workers = 10"}]},
{pgsql_mnesia,
[{dbs, [redis, pgsql]},
Expand Down
12 changes: 11 additions & 1 deletion big_tests/tests/ct_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
repeat_all_until_all_ok/2,
repeat_all_until_any_fail/1,
repeat_all_until_any_fail/2,
groups_to_all/1]).
groups_to_all/1,
get_preset_var/3]).

-type group_name() :: atom().

Expand Down Expand Up @@ -114,3 +115,12 @@ is_ct_started() ->

groups_to_all(Groups) ->
[{group, Name} || {Name, _Opts, _Cases} <- Groups].

%% Fetch the per-preset variable Opt from the Common Test config,
%% falling back to Def when no preset is active or when the preset
%% does not define the variable.
%%
%% NOTE: the original version matched the fresh variable `Preset' in the
%% first case clause, which captured `undefined' as well and then crashed
%% in list_to_existing_atom/1; the `_' clause was unreachable.
get_preset_var(Config, Opt, Def) ->
    case proplists:get_value(preset, Config, undefined) of
        undefined ->
            Def;
        Preset ->
            %% Preset names are declared as atoms in test.config, so the
            %% string coming from the test Config maps onto an existing atom.
            PresetAtom = list_to_existing_atom(Preset),
            ct:get_config({presets, toml, PresetAtom, Opt}, Def)
    end.
9 changes: 2 additions & 7 deletions big_tests/tests/mongoose_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,8 @@ restart_listener(Spec, Listener) ->
rpc(Spec, mongoose_listener, start_listener, [Listener]).

should_minio_be_running(Config) ->
case proplists:get_value(preset, Config, undefined) of
undefined -> false;
Preset ->
PresetAtom = list_to_existing_atom(Preset),
DBs = ct:get_config({presets, toml, PresetAtom, dbs}, []),
lists:member(minio, DBs)
end.
DBs = ct_helper:get_preset_var(Config, dbs, []),
lists:member(minio, DBs).

%% It is useful to debug dynamic IQ handler registration
print_debug_info_for_module(Module) ->
Expand Down
16 changes: 15 additions & 1 deletion big_tests/tests/mongooseimctl_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ basic() ->
dump_table,
get_loglevel,
remove_old_messages_test,
remove_expired_messages_test].
remove_expired_messages_test,
cets_tables_are_in_mnesia_info].

accounts() -> [change_password, check_password_hash, check_password,
check_account, ban_account, num_active_users, delete_old_users,
Expand Down Expand Up @@ -282,6 +283,13 @@ end_per_group(_GroupName, Config) ->
get_registered_users() ->
rpc(mim(), ejabberd_auth, get_vh_registered_users, [domain()]).

init_per_testcase(CaseName = cets_tables_are_in_mnesia_info, Config) ->
case rpc(mim(), ejabberd_sm, sm_backend, []) of
ejabberd_sm_cets ->
escalus:init_per_testcase(CaseName, Config);
_ ->
{skip, "Only for cets preset"}
end;
init_per_testcase(CaseName, Config)
when CaseName == delete_old_users_vhost
orelse CaseName == stats_global
Expand Down Expand Up @@ -1280,6 +1288,12 @@ remove_expired_messages_test(Config) ->
2 = length(SecondList)
end).

%% Check that `mongooseimctl mnesia info' output contains exactly one line
%% describing the cets_session table (printed by ejabberd_ctl:cets_info/0).
cets_tables_are_in_mnesia_info(Config) ->
    {Output, 0} = mongooseimctl("mnesia", ["info"], Config),
    Lines = binary:split(iolist_to_binary(Output), <<"\n">>, [global]),
    IsCetsLine = fun(<<"table=cets_session", _/binary>>) -> true;
                    (_) -> false
                 end,
    [_] = lists:filter(IsCetsLine, Lines),
    ok.

%%-----------------------------------------------------------------
%% Helpers
%%-----------------------------------------------------------------
Expand Down
17 changes: 9 additions & 8 deletions big_tests/tests/sm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ init_per_group(Group, Config) when Group =:= parallel_unacknowledged_message_hoo
Group =:= manual_ack_freq_long_session_timeout;
Group =:= parallel_manual_ack_freq_1;
Group =:= manual_ack_freq_2 ->
dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
dynamic_modules:ensure_modules(host_type(), required_modules(Config, group, Group)),
Config;
init_per_group(stale_h, Config) ->
Config;
Expand All @@ -145,18 +145,18 @@ init_per_group(stream_mgmt_disabled, Config) ->
rpc(mim(), mnesia, delete_table, [sm_session]),
Config;
init_per_group(Group, Config) ->
dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
dynamic_modules:ensure_modules(host_type(), required_modules(Config, group, Group)),
Config.

end_per_group(_Group, _Config) ->
ok.

init_per_testcase(resume_expired_session_returns_correct_h = CN, Config) ->
dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
dynamic_modules:ensure_modules(host_type(), required_modules(Config, testcase, CN)),
escalus:init_per_testcase(CN, Config);
init_per_testcase(CN, Config) when CN =:= gc_repeat_after_never_means_no_cleaning;
CN =:= gc_repeat_after_timeout_does_clean ->
dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
dynamic_modules:ensure_modules(host_type(), required_modules(Config, testcase, CN)),
Config2 = register_some_smid_h(Config),
escalus:init_per_testcase(CN, Config2);
init_per_testcase(server_requests_ack_freq_2 = CN, Config) ->
Expand All @@ -181,10 +181,10 @@ end_per_testcase(CaseName, Config) ->

%% Module configuration per group (in case of stale_h group it is per testcase)

required_modules(Scope, Name) ->
%% Build the module list for dynamic_modules:ensure_modules/2.
%% Scope is either `group' or `testcase'; Name selects the SM options.
%% When required_sm_opts/2 returns `stopped', mod_stream_management is
%% configured as stopped; otherwise the scope-specific options are merged
%% over the preset-aware common options (see common_sm_opts/1).
required_modules(Config, Scope, Name) ->
SMConfig = case required_sm_opts(Scope, Name) of
stopped -> stopped;
ExtraOpts -> maps:merge(common_sm_opts(Config), ExtraOpts)
end,
[{mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)},
{mod_offline, config_parser_helper:mod_config(mod_offline, #{})}].
Expand All @@ -211,8 +211,9 @@ required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) ->
required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) ->
#{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)}.

common_sm_opts() ->
#{buffer_max => ?SMALL_SM_BUFFER}.
%% Stream-management options shared by every test group. The backend is
%% taken from the active preset; mnesia is used when none is configured.
common_sm_opts(Config) ->
    #{buffer_max => ?SMALL_SM_BUFFER,
      backend => ct_helper:get_preset_var(Config, stream_management_backend, mnesia)}.

stale_h(RepeatAfter, Geriatric) ->
#{enabled => true,
Expand Down
14 changes: 11 additions & 3 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
{cache_tab, "1.0.29"},
{segmented_cache, "0.1.1"},
{worker_pool, "6.0.1"},
{worker_pool, "6.0.0"},
{cets, {git, "https://github.com/arcusfelis/cets.git", {branch, "main"}}},

%%% HTTP tools
{cowboy, "2.9.0"},
Expand Down Expand Up @@ -167,11 +169,17 @@
{erl_opts, [{d, 'PROD_NODE'}]} ]},
%% development nodes
{mim1, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/mim1.vars-toml.config"]},
{overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
{overlay, [
{copy, "rel/files/cets_disco.txt", "etc/cets_disco.txt"},
{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
{mim2, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/mim2.vars-toml.config"]},
{overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
{overlay, [
{copy, "rel/files/cets_disco.txt", "etc/cets_disco.txt"},
{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
{mim3, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/mim3.vars-toml.config"]},
{overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
{overlay, [
{copy, "rel/files/cets_disco.txt", "etc/cets_disco.txt"},
{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
{fed1, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/fed1.vars-toml.config"]},
{overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
{reg1, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/reg1.vars-toml.config"]},
Expand Down
4 changes: 4 additions & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
{<<"bear">>,{pkg,<<"bear">>,<<"1.0.0">>},1},
{<<"cache_tab">>,{pkg,<<"cache_tab">>,<<"1.0.29">>},0},
{<<"certifi">>,{pkg,<<"certifi">>,<<"2.3.1">>},2},
{<<"cets">>,
{git,"https://github.com/arcusfelis/cets.git",
{ref,"6f8b79889844bf2f3778104bdadfba5ee1efc5fc"}},
0},
{<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.9.0">>},0},
{<<"cowboy_swagger">>,{pkg,<<"cowboy_swagger">>,<<"2.5.0">>},0},
{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.11.0">>},1},
Expand Down
3 changes: 3 additions & 0 deletions rel/files/cets_disco.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mongooseim@localhost
ejabberd2@localhost
mongooseim3@localhost
13 changes: 13 additions & 0 deletions src/ejabberd_ctl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ process(["mnesia"]) ->
?STATUS_SUCCESS;
process(["mnesia", "info"]) ->
mnesia:info(),
cets_info(),
?STATUS_SUCCESS;
process(["mnesia", Arg]) when is_list(Arg) ->
case catch mnesia:system_info(list_to_atom(Arg)) of
Expand Down Expand Up @@ -928,3 +929,15 @@ get_dist_proto() ->
_ -> "inet_tcp"
end.

%% Print a summary of CETS tables as part of "mongooseimctl mnesia info".
%%
%% Guarded with whereis/1: on setups where CETS is not enabled the
%% mongoose_cets_discovery process is not registered, and an unconditional
%% cets_discovery:info/1 call would exit with noproc, breaking the whole
%% "mnesia info" command for every non-CETS preset.
cets_info() ->
    case whereis(mongoose_cets_discovery) of
        undefined ->
            ok;
        _Pid ->
            cets_info(cets_discovery:info(mongoose_cets_discovery))
    end.

%% Print nothing when there are no CETS tables; otherwise print a header
%% followed by one line per table.
cets_info([]) ->
    ok;
cets_info(Tables) ->
    ?PRINT("CETS tables:~n", []),
    [cets_table_info(Table) || Table <- Tables].

%% One line per table: name, row count, memory in words, and the nodes
%% the table is clustered with.
cets_table_info(#{memory := Memory, size := Size, nodes := Nodes, table := Tab}) ->
    ?PRINT("table=~0p size=~p memory_words=~0p nodes=~0p~n", [Tab, Size, Memory, Nodes]).
2 changes: 1 addition & 1 deletion src/ejabberd_sm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
}.
-type info() :: #{info_key() => any()}.

-type backend() :: ejabberd_sm_mnesia | ejabberd_sm_redis.
-type backend() :: ejabberd_sm_mnesia | ejabberd_sm_redis | ejabberd_sm_cets.
-type close_reason() :: resumed | normal | replaced.
-type info_key() :: atom().

Expand Down
139 changes: 139 additions & 0 deletions src/ejabberd_sm_cets.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
-module(ejabberd_sm_cets).

-behavior(ejabberd_sm_backend).

-include("mongoose.hrl").
-include("session.hrl").

-export([init/1,
get_sessions/0,
get_sessions/1,
get_sessions/2,
get_sessions/3,
create_session/4,
update_session/4,
delete_session/4,
cleanup/1,
total_count/0,
unique_count/0]).

-define(TABLE, cets_session).

%% Start the session table and register it with the CETS discovery process.
%% NOTE(review): cets is expected to replicate this table across the nodes
%% returned by discovery — confirm against the cets library documentation.
-spec init(list()) -> any().
init(_Opts) ->
    cets:start(?TABLE, #{}),
    cets_discovery:add_table(mongoose_cets_discovery, ?TABLE).

%% Every session stored in the local replica of the table.
-spec get_sessions() -> [ejabberd_sm:session()].
get_sessions() ->
    [tuple_to_session(Tuple) || Tuple <- ets:tab2list(?TABLE)].

%% All sessions on a given server. The key layout is
%% {Server, User, Resource, SID}, so we match on the leading element only.
-spec get_sessions(jid:lserver()) -> [ejabberd_sm:session()].
get_sessions(Server) ->
    MatchSpec = [{{{Server, '_', '_', '_'}, '_', '_'}, [], ['$_']}],
    tuples_to_sessions(ets:select(?TABLE, MatchSpec)).

%% All sessions of one user on one server — any resource, any SID.
-spec get_sessions(jid:luser(), jid:lserver()) -> [ejabberd_sm:session()].
get_sessions(User, Server) ->
    MatchSpec = [{{{Server, User, '_', '_'}, '_', '_'}, [], ['$_']}],
    tuples_to_sessions(ets:select(?TABLE, MatchSpec)).

%% All sessions for a full JID (user + server + resource), any SID.
-spec get_sessions(jid:luser(), jid:lserver(), jid:lresource()) ->
    [ejabberd_sm:session()].
get_sessions(User, Server, Resource) ->
    %% TODO these sessions should be deduplicated.
    %% After merging two cets tables we could end up with sessions from two
    %% nodes for the same full JID, and one of those sessions must be killed.
    %% Duplicates could be detected on the merge step or on reading (or both).
    MatchSpec = [{{{Server, User, Resource, '_'}, '_', '_'}, [], ['$_']}],
    tuples_to_sessions(ets:select(?TABLE, MatchSpec)).

%% Store a freshly opened session. If entries for this resource already
%% exist, merge their info into the new session before inserting.
-spec create_session(User :: jid:luser(),
                     Server :: jid:lserver(),
                     Resource :: jid:lresource(),
                     Session :: ejabberd_sm:session()) -> ok | {error, term()}.
create_session(User, Server, Resource, Session) ->
    case get_sessions(User, Server, Resource) of
        [] ->
            cets:insert(?TABLE, session_to_tuple(Session));
        Sessions when is_list(Sessions) ->
            %% Guards against a race during XMPP bind, where more than one
            %% call to ejabberd_sm:open_session produced a session for this
            %% resource.
            %% XXX Why do we need that exactly?
            %% Sessions are opened from c2s and that specific process updates
            %% its own session info; taking info from other processes could
            %% cause strange bugs. On the other hand the info field has very
            %% limited usage, so merging is harmless and keeps
            %% ejabberd_sm_SUITE happy. The merge source is the first session
            %% in term order (presumably the oldest by SID — confirm).
            Oldest = hd(lists:sort(Sessions)),
            Merged = mongoose_session:merge_info(Session, Oldest),
            cets:insert(?TABLE, session_to_tuple(Merged))
    end.

%% Overwrite the stored tuple for this session. The table key is derived
%% from the session record itself (see session_to_tuple/1), so the
%% User/Server/Resource arguments are unused here.
-spec update_session(User :: jid:luser(),
                     Server :: jid:lserver(),
                     Resource :: jid:lresource(),
                     Session :: ejabberd_sm:session()) -> ok | {error, term()}.
update_session(_User, _Server, _Resource, Session) ->
    cets:insert(?TABLE, session_to_tuple(Session)).

%% Remove exactly one session entry, identified by the full table key
%% {Server, User, Resource, SID} built by make_key/4.
-spec delete_session(ejabberd_sm:sid(),
                     User :: jid:luser(),
                     Server :: jid:lserver(),
                     Resource :: jid:lresource()) -> ok.
delete_session(SID, User, Server, Resource) ->
    cets:delete(?TABLE, make_key(User, Server, Resource, SID)).

%% Remove all sessions that lived on Node (called when a node goes down),
%% running the session cleanup hook for each removed session.
-spec cleanup(atom()) -> any().
cleanup(Node) ->
    %% TODO this could be optimized, we don't need to replicate deletes,
    %% we could just call cleanup on each node (but calling the hook only
    %% on one of the nodes)
    %% Match keys whose SID's second element is a pid on Node.
    %% NOTE(review): assumes a SID has the shape {_, Pid} — confirm against
    %% ejabberd_sm:sid(). '$1' binds that pid; the guard applies node/1 to it.
    KeyPattern = {'_', '_', '_', {'_', '$1'}},
    Guard = {'==', {node, '$1'}, Node},
    R = {KeyPattern, '_', '_'},
    %% Flush pending table operations before selecting, so the select sees
    %% a settled view of the table.
    cets:sync(?TABLE),
    Tuples = ets:select(?TABLE, [{R, [Guard], ['$_']}]),
    %% Run the cleanup hook per session while collecting keys, then delete
    %% all matched entries in one batch.
    Keys = lists:map(fun({Key, _, _} = Tuple) ->
                         Session = tuple_to_session(Tuple),
                         ejabberd_sm:run_session_cleanup_hook(Session),
                         Key
                     end, Tuples),
    cets:delete_many(?TABLE, Keys).

%% Total number of session entries in the local ETS replica.
-spec total_count() -> integer().
total_count() ->
    ets:info(?TABLE, size).

%% Counts merged by US
%% Count distinct {User, Server} pairs by walking the table keys in
%% ets:first/ets:next order and adding 1 at the end of each run of keys
%% sharing the same {Server, User} prefix.
%% NOTE(review): this is only correct if the traversal visits keys with
%% equal {Server, User} adjacently, i.e. the table is an ordered_set —
%% confirm the table type that cets:start/2 creates.
-spec unique_count() -> integer().
unique_count() ->
    compute_unique(ets:first(?TABLE), 0).

%% Tail-recursive walk; Sum accumulates one per {Server, User} run.
compute_unique('$end_of_table', Sum) ->
    Sum;
compute_unique({S, U, _, _} = Key, Sum) ->
    Key2 = ets:next(?TABLE, Key),
    case Key2 of
        {S, U, _, _} ->
            %% Next key has the same Server and User: still inside the run.
            compute_unique(Key2, Sum);
        _ ->
            %% Run ended (different user, or end of table): count it.
            compute_unique(Key2, Sum + 1)
    end.

%% Flatten a #session{} record into the stored {Key, Priority, Info} tuple.
%% The us field is not stored; tuple_to_session/1 rebuilds it from the key.
session_to_tuple(#session{sid = SID, usr = {U, S, R}, priority = Prio, info = Info}) ->
    {make_key(U, S, R, SID), Prio, Info}.

%% Build the table key. Server comes first so that per-server and per-user
%% lookups (get_sessions/1,2) can match on the leading key elements.
make_key(User, Server, Resource, SID) ->
    {Server, User, Resource, SID}.

%% Convert a stored {Key, Priority, Info} tuple back into a #session{}
%% record, restoring the us field dropped by session_to_tuple/1.
tuple_to_session({{S, U, R, SID}, Prio, Info}) ->
    #session{sid = SID, usr = {U, S, R}, us = {U, S}, priority = Prio, info = Info}.

%% Convert a list of stored tuples into #session{} records.
tuples_to_sessions(Tuples) ->
    lists:map(fun tuple_to_session/1, Tuples).
Loading

0 comments on commit 9d90b28

Please sign in to comment.