Experimenting with alternative SM mechanisms #3574

Status: Closed · wants to merge 10 commits

This experimental PR adds an alternative session-management (SM) backend, ejabberd_sm_kiss, built on the kiss library, together with an internal_kiss big-tests preset, CI wiring, and an extension to `mongooseimctl mnesia info` that also reports kiss tables.
12 changes: 11 additions & 1 deletion .circleci/template.yml
@@ -543,7 +543,8 @@ jobs:
preset:
type: enum
enum: [internal_mnesia, mysql_redis, odbc_mssql_mnesia, ldap_mnesia,
-               elasticsearch_and_cassandra_mnesia, pgsql_mnesia, riak_mnesia]
+               elasticsearch_and_cassandra_mnesia, pgsql_mnesia, riak_mnesia,
+               internal_kiss]
description: Preset to run
default: internal_mnesia
db:
@@ -743,6 +744,15 @@ workflows:
requires:
- otp_24_docker
filters: *all_tags
+      - big_tests_in_docker:
+          name: internal_kiss_24
+          executor: otp_24_redis
+          context: mongooseim-org
+          preset: internal_kiss
+          db: "mnesia kiss"
+          requires:
+            - otp_24_docker
+          filters: *all_tags
- big_tests_in_docker:
name: mysql_redis_24
executor: otp_24_mysql_redis
7 changes: 7 additions & 0 deletions big_tests/test.config
@@ -235,6 +235,13 @@
[{dbs, [redis, minio]},
{outgoing_pools, "[outgoing_pools.redis.global_distrib]
scope = \"global\"
workers = 10"}]},
+ {internal_kiss,
+  [{dbs, [redis]},
+   {sm_backend, "\"kiss\""},
+   {stream_management_backend, kiss},
+   {outgoing_pools, "[outgoing_pools.redis.global_distrib]
+ scope = \"global\"
+ workers = 10"}]},
{pgsql_mnesia,
[{dbs, [redis, pgsql]},
12 changes: 11 additions & 1 deletion big_tests/tests/ct_helper.erl
@@ -4,7 +4,8 @@
repeat_all_until_all_ok/2,
repeat_all_until_any_fail/1,
repeat_all_until_any_fail/2,
-         groups_to_all/1]).
+         groups_to_all/1,
+         get_preset_var/3]).

-type group_name() :: atom().

@@ -114,3 +115,12 @@ is_ct_started() ->

groups_to_all(Groups) ->
[{group, Name} || {Name, _Opts, _Cases} <- Groups].

+get_preset_var(Config, Opt, Def) ->
+    case proplists:get_value(preset, Config, undefined) of
+        %% The guard is needed: without it this clause would also match
+        %% 'undefined' and crash in list_to_existing_atom/1.
+        Preset when is_list(Preset) ->
+            PresetAtom = list_to_existing_atom(Preset),
+            ct:get_config({presets, toml, PresetAtom, Opt}, Def);
+        _ ->
+            Def
+    end.
9 changes: 2 additions & 7 deletions big_tests/tests/mongoose_helper.erl
@@ -497,13 +497,8 @@ restart_listener(Spec, Listener) ->
rpc(Spec, ejabberd_listener, start_listener, [Listener]).

should_minio_be_running(Config) ->
-    case proplists:get_value(preset, Config, undefined) of
-        undefined -> false;
-        Preset ->
-            PresetAtom = list_to_existing_atom(Preset),
-            DBs = ct:get_config({presets, toml, PresetAtom, dbs}, []),
-            lists:member(minio, DBs)
-    end.
+    DBs = ct_helper:get_preset_var(Config, dbs, []),
+    lists:member(minio, DBs).

%% It is useful to debug dynamic IQ handler registration
print_debug_info_for_module(Module) ->
16 changes: 15 additions & 1 deletion big_tests/tests/mongooseimctl_SUITE.erl
@@ -141,7 +141,8 @@ basic() ->
dump_table,
get_loglevel,
remove_old_messages_test,
-           remove_expired_messages_test].
+           remove_expired_messages_test,
+           kiss_tables_are_in_mnesia_info].

accounts() -> [change_password, check_password_hash, check_password,
check_account, ban_account, num_active_users, delete_old_users,
@@ -281,6 +282,13 @@ end_per_group(_GroupName, Config) ->
get_registered_users() ->
rpc(mim(), ejabberd_auth, get_vh_registered_users, [domain()]).

+init_per_testcase(CaseName = kiss_tables_are_in_mnesia_info, Config) ->
+    case rpc(mim(), ejabberd_sm, sm_backend, []) of
+        ejabberd_sm_kiss ->
+            escalus:init_per_testcase(CaseName, Config);
+        _ ->
+            {skip, "Only for kiss preset"}
+    end;
init_per_testcase(CaseName, Config)
when CaseName == delete_old_users_vhost
orelse CaseName == stats_global
@@ -1269,6 +1277,12 @@ remove_expired_messages_test(Config) ->
2 = length(SecondList)
end).

+kiss_tables_are_in_mnesia_info(Config) ->
+    {Out, 0} = mongooseimctl("mnesia", ["info"], Config),
+    Lines = binary:split(iolist_to_binary(Out), <<"\n">>, [global]),
+    [_Line] = [L || <<"table=kiss_session", _/binary>> = L <- Lines],
+    ok.

%%-----------------------------------------------------------------
%% Helpers
%%-----------------------------------------------------------------
17 changes: 9 additions & 8 deletions big_tests/tests/sm_SUITE.erl
@@ -136,7 +136,7 @@ init_per_group(Group, Config) when Group =:= parallel_unacknowledged_message_hoo
Group =:= manual_ack_freq_long_session_timeout;
Group =:= parallel_manual_ack_freq_1;
Group =:= manual_ack_freq_2 ->
-    dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
+    dynamic_modules:ensure_modules(host_type(), required_modules(Config, group, Group)),
Config;
init_per_group(stale_h, Config) ->
Config;
@@ -145,18 +145,18 @@ init_per_group(stream_mgmt_disabled, Config) ->
rpc(mim(), mnesia, delete_table, [sm_session]),
Config;
init_per_group(Group, Config) ->
-    dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
+    dynamic_modules:ensure_modules(host_type(), required_modules(Config, group, Group)),
Config.

end_per_group(_Group, _Config) ->
ok.

init_per_testcase(resume_expired_session_returns_correct_h = CN, Config) ->
-    dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
+    dynamic_modules:ensure_modules(host_type(), required_modules(Config, testcase, CN)),
escalus:init_per_testcase(CN, Config);
init_per_testcase(CN, Config) when CN =:= gc_repeat_after_never_means_no_cleaning;
CN =:= gc_repeat_after_timeout_does_clean ->
-    dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
+    dynamic_modules:ensure_modules(host_type(), required_modules(Config, testcase, CN)),
Config2 = register_some_smid_h(Config),
escalus:init_per_testcase(CN, Config2);
init_per_testcase(server_requests_ack_freq_2 = CN, Config) ->
@@ -180,10 +180,10 @@ end_per_testcase(CaseName, Config) ->

%% Module configuration per group (in case of stale_h group it is per testcase)

-required_modules(Scope, Name) ->
+required_modules(Config, Scope, Name) ->
SMConfig = case required_sm_opts(Scope, Name) of
stopped -> stopped;
-                   ExtraOpts -> maps:merge(common_sm_opts(), ExtraOpts)
+                   ExtraOpts -> maps:merge(common_sm_opts(Config), ExtraOpts)
end,
[{mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)},
{mod_offline, []}].
@@ -210,8 +210,9 @@ required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) ->
required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) ->
#{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)}.

-common_sm_opts() ->
-    #{buffer_max => ?SMALL_SM_BUFFER}.
+common_sm_opts(Config) ->
+    Backend = ct_helper:get_preset_var(Config, stream_management_backend, mnesia),
+    #{buffer_max => ?SMALL_SM_BUFFER, backend => Backend}.

stale_h(RepeatAfter, Geriatric) ->
#{enabled => true,
13 changes: 10 additions & 3 deletions rebar.config
@@ -77,6 +77,7 @@
{cache_tab, "1.0.29"},
{segmented_cache, "0.1.1"},
{worker_pool, "6.0.0"},
+        {kiss, {git, "https://github.com/arcusfelis/kiss.git", {branch, "main"}}},

%%% HTTP tools
{cowboy, "2.9.0"},
@@ -167,11 +168,17 @@
{erl_opts, [{d, 'PROD_NODE'}]} ]},
%% development nodes
{mim1, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/mim1.vars-toml.config"]},
-                 {overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
+                 {overlay, [
+                    {copy, "rel/files/kiss_disco.txt", "etc/kiss_disco.txt"},
+                    {template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
 {mim2, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/mim2.vars-toml.config"]},
-                 {overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
+                 {overlay, [
+                    {copy, "rel/files/kiss_disco.txt", "etc/kiss_disco.txt"},
+                    {template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
 {mim3, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/mim3.vars-toml.config"]},
-                 {overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
+                 {overlay, [
+                    {copy, "rel/files/kiss_disco.txt", "etc/kiss_disco.txt"},
+                    {template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
{fed1, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/fed1.vars-toml.config"]},
{overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
{reg1, [{relx, [ {overlay_vars, ["rel/vars-toml.config", "rel/reg1.vars-toml.config"]},
4 changes: 4 additions & 0 deletions rebar.lock
@@ -61,6 +61,10 @@
{git,"https://gitlab.com/vkatsuba/jwerl.git",
{ref,"d03607fd14a6a7556f01014af84903a3df60ff5d"}},
0},
{<<"kiss">>,
{git,"https://github.com/arcusfelis/kiss.git",
{ref,"4e7e23fcc0ffe58cf018a0f97437739133f859ed"}},
0},
{<<"lager">>,{pkg,<<"lager">>,<<"3.9.2">>},0},
{<<"lasse">>,{pkg,<<"lasse">>,<<"1.2.0">>},0},
{<<"lhttpc">>,{pkg,<<"lhttpc">>,<<"1.6.2">>},1},
3 changes: 3 additions & 0 deletions rel/files/kiss_disco.txt
@@ -0,0 +1,3 @@
mongooseim@localhost
ejabberd2@localhost
mongooseim3@localhost
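Presumably (given that the mim1-mim3 release overlays above copy this file into etc/), kiss_discovery reads this file to discover cluster peers: one Erlang node name per line, matching the three development nodes.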
13 changes: 13 additions & 0 deletions src/ejabberd_ctl.erl
@@ -170,6 +170,7 @@ process(["mnesia"]) ->
?STATUS_SUCCESS;
process(["mnesia", "info"]) ->
mnesia:info(),
+    kiss_info(),
?STATUS_SUCCESS;
process(["mnesia", Arg]) when is_list(Arg) ->
case catch mnesia:system_info(list_to_atom(Arg)) of
@@ -928,3 +929,15 @@ get_dist_proto() ->
_ -> "inet_tcp"
end.

+kiss_info() ->
+    Tables = kiss_discovery:info(mongoose_kiss_discovery),
+    kiss_info(Tables).
+
+kiss_info([]) ->
+    ok;
+kiss_info(Tables) ->
+    ?PRINT("Kiss tables:~n", []),
+    [kiss_table_info(Table) || Table <- Tables].
+
+kiss_table_info(#{memory := Memory, size := Size, nodes := Nodes, table := Tab}) ->
+    ?PRINT("table=~0p size=~p memory_words=~0p nodes=~0p~n", [Tab, Size, Memory, Nodes]).
2 changes: 1 addition & 1 deletion src/ejabberd_sm.erl
@@ -105,7 +105,7 @@
}.
-type info() :: #{info_key() => any()}.

--type backend() :: ejabberd_sm_mnesia | ejabberd_sm_redis.
+-type backend() :: ejabberd_sm_mnesia | ejabberd_sm_redis | ejabberd_sm_kiss.
-type close_reason() :: resumed | normal | replaced.
-type info_key() :: atom().

139 changes: 139 additions & 0 deletions src/ejabberd_sm_kiss.erl
@@ -0,0 +1,139 @@
-module(ejabberd_sm_kiss).

-behavior(ejabberd_sm_backend).

-include("mongoose.hrl").
-include("session.hrl").

-export([init/1,
get_sessions/0,
get_sessions/1,
get_sessions/2,
get_sessions/3,
create_session/4,
update_session/4,
delete_session/4,
cleanup/1,
total_count/0,
unique_count/0]).

-define(TABLE, kiss_session).

-spec init(list()) -> any().
init(_Opts) ->
kiss:start(?TABLE, #{}),
kiss_discovery:add_table(mongoose_kiss_discovery, ?TABLE).

-spec get_sessions() -> [ejabberd_sm:session()].
get_sessions() ->
tuples_to_sessions(ets:tab2list(?TABLE)).

-spec get_sessions(jid:lserver()) -> [ejabberd_sm:session()].
get_sessions(Server) ->
R = {{Server, '_', '_', '_'}, '_', '_'},
Xs = ets:select(?TABLE, [{R, [], ['$_']}]),
tuples_to_sessions(Xs).

-spec get_sessions(jid:luser(), jid:lserver()) -> [ejabberd_sm:session()].
get_sessions(User, Server) ->
R = {{Server, User, '_', '_'}, '_', '_'},
Xs = ets:select(?TABLE, [{R, [], ['$_']}]),
tuples_to_sessions(Xs).

-spec get_sessions(jid:luser(), jid:lserver(), jid:lresource()) ->
[ejabberd_sm:session()].
get_sessions(User, Server, Resource) ->
R = {{Server, User, Resource, '_'}, '_', '_'},
Xs = ets:select(?TABLE, [{R, [], ['$_']}]),
    %% TODO: these sessions should be deduplicated.
    %% It is possible that, after merging two kiss tables, we end up with
    %% sessions from two nodes for the same full JID.
    %% One of the sessions must be killed.
    %% We can detect duplicates at the merge step, on read, or both.
tuples_to_sessions(Xs).

-spec create_session(User :: jid:luser(),
Server :: jid:lserver(),
Resource :: jid:lresource(),
Session :: ejabberd_sm:session()) -> ok | {error, term()}.
create_session(User, Server, Resource, Session) ->
case get_sessions(User, Server, Resource) of
[] ->
kiss:insert(?TABLE, session_to_tuple(Session));
        Sessions when is_list(Sessions) ->
            %% Fix a potential race condition during XMPP bind, where
            %% multiple calls (> 2) to ejabberd_sm:open_session
            %% have been made, resulting in more than one session for this
            %% resource.
            %% XXX Why do we need that exactly?
            %% Sessions are opened from c2s, and that specific process updates
            %% its own session info. Adding info from other processes would
            %% cause strange bugs. On the other hand, the info field is used
            %% so rarely that nothing would likely break if we keep calling
            %% merge_info (and it keeps ejabberd_sm_SUITE happy).
            MergedSession = mongoose_session:merge_info(Session, hd(lists:sort(Sessions))),
            kiss:insert(?TABLE, session_to_tuple(MergedSession))
end.

-spec update_session(User :: jid:luser(),
Server :: jid:lserver(),
Resource :: jid:lresource(),
Session :: ejabberd_sm:session()) -> ok | {error, term()}.
update_session(_User, _Server, _Resource, Session) ->
kiss:insert(?TABLE, session_to_tuple(Session)).

-spec delete_session(ejabberd_sm:sid(),
User :: jid:luser(),
Server :: jid:lserver(),
Resource :: jid:lresource()) -> ok.
delete_session(SID, User, Server, Resource) ->
kiss:delete(?TABLE, make_key(User, Server, Resource, SID)).

-spec cleanup(atom()) -> any().
cleanup(Node) ->
    %% TODO: this could be optimized. We don't need to replicate deletes;
    %% we could just call cleanup on each node (while running the hook on
    %% only one of the nodes).
    %% Match sessions whose SID pid lives on the node being cleaned up:
    %% the key is {Server, User, Resource, SID} and SID = {Timestamp, Pid},
    %% so '$1' binds the pid.
    KeyPattern = {'_', '_', '_', {'_', '$1'}},
    Guard = {'==', {node, '$1'}, Node},
    R = {KeyPattern, '_', '_'},
kiss:sync(?TABLE),
Tuples = ets:select(?TABLE, [{R, [Guard], ['$_']}]),
Keys = lists:map(fun({Key, _, _} = Tuple) ->
Session = tuple_to_session(Tuple),
ejabberd_sm:run_session_cleanup_hook(Session),
Key
end, Tuples),
kiss:delete_many(?TABLE, Keys).

-spec total_count() -> integer().
total_count() ->
ets:info(?TABLE, size).

%% Counts sessions merged by US, i.e. the number of unique {User, Server} pairs.
%% Relies on the table being ordered by key ({Server, User, Resource, SID}),
%% so all sessions sharing an {S, U} prefix are adjacent in the
%% ets:first/ets:next walk below.
-spec unique_count() -> integer().
unique_count() ->
    compute_unique(ets:first(?TABLE), 0).

compute_unique('$end_of_table', Sum) ->
Sum;
compute_unique({S, U, _, _} = Key, Sum) ->
Key2 = ets:next(?TABLE, Key),
case Key2 of
{S, U, _, _} ->
compute_unique(Key2, Sum);
_ ->
compute_unique(Key2, Sum + 1)
end.

session_to_tuple(#session{sid = SID, usr = {U, S, R}, priority = Prio, info = Info}) ->
{make_key(U, S, R, SID), Prio, Info}.

make_key(User, Server, Resource, SID) ->
{Server, User, Resource, SID}.

tuple_to_session({{S, U, R, SID}, Prio, Info}) ->
#session{sid = SID, usr = {U, S, R}, us = {U, S}, priority = Prio, info = Info}.

tuples_to_sessions(Xs) ->
[tuple_to_session(X) || X <- Xs].
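As one possible direction for the deduplication TODO in get_sessions/3, duplicates could be resolved on the read path. A minimal editor's sketch, not part of the PR, assuming SID = {Timestamp, Pid} so that sorting by SID puts older sessions first:

    %% Keep one session per full JID, preferring the newest SID.
    dedup_sessions(Sessions) ->
        Sorted = lists:keysort(#session.sid, Sessions),
        %% maps:from_list/1 keeps the last value for a duplicate key,
        %% i.e. the newest session for each USR.
        maps:values(maps:from_list([{USR, S} || #session{usr = USR} = S <- Sorted])).

The losing duplicate's owner process would still have to be killed, as the TODO notes.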