Alternative backend to Mnesia - CETS #3629

Merged: 14 commits, Apr 6, 2023
11 changes: 10 additions & 1 deletion .circleci/template.yml
@@ -597,7 +597,7 @@ jobs:
preset:
type: enum
enum: [internal_mnesia, mysql_redis, odbc_mssql_mnesia, ldap_mnesia,
- elasticsearch_and_cassandra_mnesia, pgsql_mnesia]
+ elasticsearch_and_cassandra_mnesia, pgsql_mnesia, internal_cets]
description: Preset to run
default: internal_mnesia
db:
@@ -831,6 +831,15 @@ workflows:
requires:
- otp_25_docker
filters: *all_tags
- big_tests_in_docker:
name: internal_cets_25
executor: otp_25_redis
context: mongooseim-org
preset: internal_cets
Review comment (Member):
A preset like pgsql_cets might make more sense, but we could change it separately in the feature branch.

db: "mnesia cets"
requires:
- otp_25_docker
filters: *all_tags
- big_tests_in_docker:
name: mysql_redis_25
executor: otp_25_mysql_redis
7 changes: 7 additions & 0 deletions big_tests/test.config
@@ -236,6 +236,13 @@
[{dbs, [redis, minio]},
{outgoing_pools, "[outgoing_pools.redis.global_distrib]
scope = \"global\"
workers = 10"}]},
{internal_cets,
[{dbs, [redis]},
{sm_backend, "\"cets\""},
{stream_management_backend, cets},
chrzaszcz marked this conversation as resolved.
{outgoing_pools, "[outgoing_pools.redis.global_distrib]
scope = \"global\"
workers = 10"}]},
{pgsql_mnesia,
[{dbs, [redis, pgsql]},
12 changes: 11 additions & 1 deletion big_tests/tests/ct_helper.erl
@@ -4,7 +4,8 @@
repeat_all_until_all_ok/2,
repeat_all_until_any_fail/1,
repeat_all_until_any_fail/2,
- groups_to_all/1]).
+ groups_to_all/1,
+ get_preset_var/3]).

-type group_name() :: atom().

@@ -114,3 +115,12 @@ is_ct_started() ->

groups_to_all(Groups) ->
[{group, Name} || {Name, _Opts, _Cases} <- Groups].

get_preset_var(Config, Opt, Def) ->
case proplists:get_value(preset, Config, undefined) of
Preset when is_list(Preset) ->
PresetAtom = list_to_existing_atom(Preset),
ct:get_config({presets, toml, PresetAtom, Opt}, Def);
_ ->
Def
end.
4 changes: 2 additions & 2 deletions big_tests/tests/graphql_metric_SUITE.erl
@@ -287,13 +287,13 @@ get_cluster_metrics_by_nonexistent_name(Config) ->
Result = get_cluster_metrics_as_dicts_by_name([<<"nonexistent">>], Config),
ParsedResult = get_ok_value([data, metric, getClusterMetricsAsDicts], Result),
[#{<<"node">> := _, <<"result">> := []},
#{<<"node">> := _, <<"result">> := []}] = ParsedResult.
#{<<"node">> := _, <<"result">> := []}|_] = ParsedResult. %% two or three nodes.

get_cluster_metrics_with_nonexistent_key(Config) ->
Result = get_cluster_metrics_as_dicts_with_keys([<<"nonexistent">>], Config),
ParsedResult = get_ok_value([data, metric, getClusterMetricsAsDicts], Result),
[#{<<"node">> := _, <<"result">> := [_|_]},
#{<<"node">> := _, <<"result">> := [_|_]}] = ParsedResult.
#{<<"node">> := _, <<"result">> := [_|_]}|_] = ParsedResult.

get_cluster_metrics_empty_args(Config) ->
Node = atom_to_binary(maps:get(node, distributed_helper:mim2())),
9 changes: 2 additions & 7 deletions big_tests/tests/mongoose_helper.erl
@@ -499,13 +499,8 @@ restart_listener(Spec, Listener) ->
rpc(Spec, mongoose_listener, start_listener, [Listener]).

should_minio_be_running(Config) ->
-    case proplists:get_value(preset, Config, undefined) of
-        undefined -> false;
-        Preset ->
-            PresetAtom = list_to_existing_atom(Preset),
-            DBs = ct:get_config({presets, toml, PresetAtom, dbs}, []),
-            lists:member(minio, DBs)
-    end.
+    DBs = ct_helper:get_preset_var(Config, dbs, []),
+    lists:member(minio, DBs).

%% It is useful to debug dynamic IQ handler registration
print_debug_info_for_module(Module) ->
8 changes: 4 additions & 4 deletions big_tests/tests/rest_helper.erl
@@ -36,7 +36,7 @@ assert_inlist(Pattern, L) ->
Fl = lists:filter(fun(X) -> case X of Pattern -> true; _ -> false end end, L),
case Fl of
[] ->
-            ct:fail(io_lib:format("Fail: ~p not in [~p...]", [Pattern, H]));
+            ct:fail("Fail: ~p not in [~p...]", [Pattern, H]);
_ ->
Fl
end.
@@ -49,13 +49,13 @@ assert_notinlist(Pattern, L) ->
[] ->
ok;
_ ->
-            ct:fail(io_lib:format("Fail: ~p in ~p", [Pattern, L]))
+            ct:fail("Fail: ~p in ~p", [Pattern, L])
end.

assert_inmaplist([], Map, L, [H|_]) ->
case L of
[] ->
-            ct:fail(io_lib:format("Fail: ~p not in [~p...]", [Map, H]));
+            ct:fail("Fail: ~p not in [~p...]", [Map, H]);
_ ->
L
end;
@@ -70,7 +70,7 @@ assert_notinmaplist([], Map, L, [H|_]) ->
[] ->
ok;
_ ->
-            ct:fail(io_lib:format("Fail: ~p in [~p...]", [Map, H]))
+            ct:fail("Fail: ~p in [~p...]", [Map, H])
end;
assert_notinmaplist([K|Keys], Map, L, Orig) ->
V = maps:get(K, Map),
17 changes: 9 additions & 8 deletions big_tests/tests/sm_SUITE.erl
@@ -142,7 +142,7 @@ init_per_group(Group, Config) when Group =:= parallel_unacknowledged_message_hoo
Group =:= manual_ack_freq_long_session_timeout;
Group =:= parallel_manual_ack_freq_1;
Group =:= manual_ack_freq_2 ->
-    dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
+    dynamic_modules:ensure_modules(host_type(), required_modules(Config, group, Group)),
Config;
init_per_group(stale_h, Config) ->
Config;
@@ -151,18 +151,18 @@ init_per_group(stream_mgmt_disabled, Config) ->
rpc(mim(), mnesia, delete_table, [sm_session]),
Config;
init_per_group(Group, Config) ->
-    dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
+    dynamic_modules:ensure_modules(host_type(), required_modules(Config, group, Group)),
Config.

end_per_group(_Group, _Config) ->
ok.

init_per_testcase(resume_expired_session_returns_correct_h = CN, Config) ->
-    dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
+    dynamic_modules:ensure_modules(host_type(), required_modules(Config, testcase, CN)),
escalus:init_per_testcase(CN, Config);
init_per_testcase(CN, Config) when CN =:= gc_repeat_after_never_means_no_cleaning;
CN =:= gc_repeat_after_timeout_does_clean ->
-    dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
+    dynamic_modules:ensure_modules(host_type(), required_modules(Config, testcase, CN)),
Config2 = register_some_smid_h(Config),
escalus:init_per_testcase(CN, Config2);
init_per_testcase(server_requests_ack_freq_2 = CN, Config) ->
@@ -187,10 +187,10 @@ end_per_testcase(CaseName, Config) ->

%% Module configuration per group (in case of stale_h group it is per testcase)

-required_modules(Scope, Name) ->
+required_modules(Config, Scope, Name) ->
SMConfig = case required_sm_opts(Scope, Name) of
stopped -> stopped;
-                   ExtraOpts -> maps:merge(common_sm_opts(), ExtraOpts)
+                   ExtraOpts -> maps:merge(common_sm_opts(Config), ExtraOpts)
end,
[{mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)},
{mod_offline, config_parser_helper:mod_config(mod_offline, #{})}].
@@ -217,8 +217,9 @@ required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) ->
required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) ->
#{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)}.

-common_sm_opts() ->
-    #{buffer_max => ?SMALL_SM_BUFFER}.
+common_sm_opts(Config) ->
+    Backend = ct_helper:get_preset_var(Config, stream_management_backend, mnesia),
+    #{buffer_max => ?SMALL_SM_BUFFER, backend => Backend}.

stale_h(RepeatAfter, Geriatric) ->
#{enabled => true,
13 changes: 10 additions & 3 deletions rebar.config
@@ -80,6 +80,7 @@
{cache_tab, "1.0.30"},
{segmented_cache, "0.2.0"},
{worker_pool, "6.0.1"},
{cets, {git, "https://github.com/esl/cets.git", {branch, "main"}}},

%%% HTTP tools
{graphql, {git, "https://github.com/esl/graphql-erlang.git", {branch, "master"}}},
@@ -169,11 +170,17 @@
{erl_opts, [{d, 'PROD_NODE'}]} ]},
%% development nodes
{mim1, [{relx, [ {overlay_vars, "rel/mim1.vars-toml.config"},
-                 {overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
+                 {overlay, [
+                     {copy, "rel/files/cets_disco.txt", "etc/cets_disco.txt"},
+                     {template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
 {mim2, [{relx, [ {overlay_vars, "rel/mim2.vars-toml.config"},
-                 {overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
+                 {overlay, [
+                     {copy, "rel/files/cets_disco.txt", "etc/cets_disco.txt"},
+                     {template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
 {mim3, [{relx, [ {overlay_vars, "rel/mim3.vars-toml.config"},
-                 {overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
+                 {overlay, [
+                     {copy, "rel/files/cets_disco.txt", "etc/cets_disco.txt"},
+                     {template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
{fed1, [{relx, [ {overlay_vars, "rel/fed1.vars-toml.config"},
{overlay, [{template, "rel/files/mongooseim.toml", "etc/mongooseim.toml"}]} ]}]},
{reg1, [{relx, [ {overlay_vars, "rel/reg1.vars-toml.config"},
4 changes: 4 additions & 0 deletions rebar.lock
@@ -6,6 +6,10 @@
{<<"bear">>,{pkg,<<"bear">>,<<"1.0.0">>},1},
{<<"cache_tab">>,{pkg,<<"cache_tab">>,<<"1.0.30">>},0},
{<<"certifi">>,{pkg,<<"certifi">>,<<"2.9.0">>},1},
{<<"cets">>,
{git,"https://github.com/esl/cets.git",
{ref,"351221c7a2f2c64f7ebc163428f8d340b71705ac"}},
0},
{<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.9.0">>},0},
{<<"cowboy_swagger">>,{pkg,<<"cowboy_swagger">>,<<"2.5.1">>},0},
{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.11.0">>},1},
3 changes: 3 additions & 0 deletions rel/files/cets_disco.txt
@@ -0,0 +1,3 @@
mongooseim@localhost
ejabberd2@localhost
mongooseim3@localhost
2 changes: 1 addition & 1 deletion src/ejabberd_sm.erl
@@ -103,7 +103,7 @@
}.
-type info() :: #{info_key() => any()}.

--type backend() :: ejabberd_sm_mnesia | ejabberd_sm_redis.
+-type backend() :: ejabberd_sm_mnesia | ejabberd_sm_redis | ejabberd_sm_cets.
-type close_reason() :: resumed | normal | replaced.
-type info_key() :: atom().

136 changes: 136 additions & 0 deletions src/ejabberd_sm_cets.erl
@@ -0,0 +1,136 @@
-module(ejabberd_sm_cets).

-behavior(ejabberd_sm_backend).

-include("mongoose.hrl").
-include("session.hrl").

-export([init/1,
get_sessions/0,
get_sessions/1,
get_sessions/2,
get_sessions/3,
create_session/4,
update_session/4,
delete_session/4,
cleanup/1,
total_count/0,
unique_count/0]).

-define(TABLE, cets_session).

-spec init(map()) -> any().
init(_Opts) ->
cets:start(?TABLE, #{}),
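%% Registering the table in the discovery process lets it be joined
%% with the same table on the other discovered nodes.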
cets_discovery:add_table(mongoose_cets_discovery, ?TABLE).

-spec get_sessions() -> [ejabberd_sm:session()].
get_sessions() ->
tuples_to_sessions(ets:tab2list(?TABLE)).

-spec get_sessions(jid:lserver()) -> [ejabberd_sm:session()].
get_sessions(Server) ->
%% This is not a full table scan. From the ETS docs:
%% For ordered_set a partially bound key will limit the traversal to only
%% scan a subset of the table based on term order.
%% A partially bound key is either a list or a tuple with
%% a prefix that is fully bound.
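%% E.g. with Server = <<"xmpp.example">>, the pattern below only visits
%% keys whose first element is that server (an illustrative example).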
R = {{Server, '_', '_', '_'}, '_', '_'},
Xs = ets:select(?TABLE, [{R, [], ['$_']}]),
tuples_to_sessions(Xs).

-spec get_sessions(jid:luser(), jid:lserver()) -> [ejabberd_sm:session()].
get_sessions(User, Server) ->
R = {{Server, User, '_', '_'}, '_', '_'},
Xs = ets:select(?TABLE, [{R, [], ['$_']}]),
tuples_to_sessions(Xs).

-spec get_sessions(jid:luser(), jid:lserver(), jid:lresource()) ->
[ejabberd_sm:session()].
get_sessions(User, Server, Resource) ->
R = {{Server, User, Resource, '_'}, '_', '_'},
Xs = ets:select(?TABLE, [{R, [], ['$_']}]),
%% TODO these sessions should be deduplicated.
Review comment (Member):
Do we have this issue in Mnesia? Or would it just throw 'running inconsistent database' in this case? I think it would only occur when rejoining after netsplits, because a fresh new node shouldn't have any sessions.

Reply (Contributor, Author):
Very similar in Mnesia: if a user reconnects with the same resource name, we send a kick to the old session, but that is an async operation, so we could have two records in the table.
It could also occur with cets and netsplits, although that would be pretty rare.

Reply (Contributor, Author, arcusfelis, Mar 21, 2023):
Probably the best solution would be to write a property-based test for this module (just test via the module API, without escalus and the c2s logic).

We could find some interesting edge cases, or at least figure out the set of properties the session API has.

But to be honest, this PR just copies what ejabberd_sm_mnesia does with minimal changes, so we can compare the usage of the cets API with the mnesia API.
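A property like the following could be a starting point (a hypothetical sketch using PropEr, not code from this PR; it assumes the table was initialised via ejabberd_sm_cets:init/1 and that SIDs are {Timestamp, Pid} tuples):

-module(ejabberd_sm_cets_prop).
-include_lib("proper/include/proper.hrl").
-include("session.hrl").

%% Creating a session and reading it back by full JID
%% should yield exactly one session for that resource.
prop_create_then_get() ->
    ?FORALL({U, S, R}, {binary(), binary(), binary()},
            begin
                SID = {os:timestamp(), self()},
                Session = #session{sid = SID, usr = {U, S, R}, us = {U, S},
                                   priority = 0, info = #{}},
                ejabberd_sm_cets:create_session(U, S, R, Session),
                length(ejabberd_sm_cets:get_sessions(U, S, R)) =:= 1
            end).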

%% It is possible that after merging two cets tables we could end up
%% with sessions from two nodes for the same full jid.
%% One of the sessions must be killed.
%% We can detect duplicates on the merging step or on reading (or both).
tuples_to_sessions(Xs).
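%% An illustrative sketch of the read-side deduplication mentioned above
%% (not part of this PR): keep the newest session per full JID. Assumes
%% SIDs start with a creation timestamp, so sorting by SID is chronological;
%% maps:from_list/1 keeps the last (newest) entry for a duplicated key.
dedup_by_usr(Sessions) ->
    Sorted = lists:keysort(#session.sid, Sessions),
    maps:values(maps:from_list([{USR, S} || #session{usr = USR} = S <- Sorted])).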

-spec create_session(User :: jid:luser(),
Server :: jid:lserver(),
Resource :: jid:lresource(),
Session :: ejabberd_sm:session()) -> ok | {error, term()}.
create_session(User, Server, Resource, Session) ->
case get_sessions(User, Server, Resource) of
[] ->
cets:insert(?TABLE, session_to_tuple(Session));
Sessions when is_list(Sessions) ->
%% TODO: the merge_info function will be removed once MIM-1875 is done
MergedSession = mongoose_session:merge_info(Session, hd(lists:sort(Sessions))),
cets:insert(?TABLE, session_to_tuple(MergedSession))
end.

-spec update_session(User :: jid:luser(),
Server :: jid:lserver(),
Resource :: jid:lresource(),
Session :: ejabberd_sm:session()) -> ok | {error, term()}.
update_session(_User, _Server, _Resource, Session) ->
cets:insert(?TABLE, session_to_tuple(Session)).

-spec delete_session(ejabberd_sm:sid(),
User :: jid:luser(),
Server :: jid:lserver(),
Resource :: jid:lresource()) -> ok.
delete_session(SID, User, Server, Resource) ->
cets:delete(?TABLE, make_key(User, Server, Resource, SID)).

%% cleanup is called on each node in the cluster when Node goes down
-spec cleanup(atom()) -> any().
cleanup(Node) ->
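%% An SID is {Timestamp, Pid}: '$1' binds the Pid, and the guard below
%% keeps only sessions whose process ran on the node that went down.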
KeyPattern = {'_', '_', '_', {'_', '$1'}},
Guard = {'==', {node, '$1'}, Node},
R = {KeyPattern, '_', '_'},
cets:sync(?TABLE),
%% This is a full table scan, but cleanup is rare.
Tuples = ets:select(?TABLE, [{R, [Guard], ['$_']}]),
lists:foreach(fun({Key, _, _} = Tuple) ->
Session = tuple_to_session(Tuple),
ejabberd_sm:run_session_cleanup_hook(Session)
end, Tuples),
%% We don't need to replicate deletes; we remove the local content here.
ets:select_delete(?TABLE, [{R, [Guard], [true]}]).

-spec total_count() -> integer().
total_count() ->
ets:info(?TABLE, size).

%% Counts sessions merged by US: the ordered_set is walked in key order,
%% counting each distinct {Server, User} prefix once.
-spec unique_count() -> integer().
unique_count() ->
compute_unique(ets:first(?TABLE), 0).

compute_unique('$end_of_table', Sum) ->
Sum;
compute_unique({S, U, _, _} = Key, Sum) ->
Key2 = ets:next(?TABLE, Key),
case Key2 of
{S, U, _, _} ->
compute_unique(Key2, Sum);
_ ->
compute_unique(Key2, Sum + 1)
end.

session_to_tuple(#session{sid = SID, usr = {U, S, R}, priority = Prio, info = Info}) ->
{make_key(U, S, R, SID), Prio, Info}.

make_key(User, Server, Resource, SID) ->
{Server, User, Resource, SID}.

tuple_to_session({{S, U, R, SID}, Prio, Info}) ->
#session{sid = SID, usr = {U, S, R}, us = {U, S}, priority = Prio, info = Info}.

tuples_to_sessions(Xs) ->
[tuple_to_session(X) || X <- Xs].
10 changes: 9 additions & 1 deletion src/ejabberd_sup.erl
@@ -153,8 +153,16 @@ init([]) ->
{pg,
{pg, start_link, [mim_scope]},
permanent, infinity, supervisor, [pg]},
ConfigDir = filename:dirname(mongoose_config:get_config_path()),
DiscoFile = filename:join(ConfigDir, "cets_disco.txt"),
DiscoOpts = #{name => mongoose_cets_discovery, disco_file => DiscoFile},
CetsDisco =
{cets_discovery,
{cets_discovery, start_link, [DiscoOpts]},
permanent, infinity, supervisor, [cets_discovery]},
{ok, {{one_for_one, 10, 1},
-      [PG,
+      [CetsDisco,
+       PG,
Review comment (Member) on lines +156 to +165:
It's OK to have it temporarily in a feature branch, but I don't think it should be enabled by default. I would make it configurable with a new RDBMS discovery backend, but we could do that later, of course.
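One possible shape for making the child opt-in (a hypothetical sketch; the [services, cets_discovery] option path is illustrative, not an existing MongooseIM option):

%% Return a cets_discovery child spec only when discovery is configured.
cets_disco_specs() ->
    case mongoose_config:lookup_opt([services, cets_discovery]) of
        {ok, #{disco_file := File}} ->
            DiscoOpts = #{name => mongoose_cets_discovery, disco_file => File},
            [{cets_discovery,
              {cets_discovery, start_link, [DiscoOpts]},
              permanent, infinity, supervisor, [cets_discovery]}];
        {error, not_found} ->
            []
    end.

The resulting list could then be prepended to the child specs in init/1 instead of the unconditional CetsDisco entry.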

Hooks,
Cleaner,
SMBackendSupervisor,