Skip to content

Commit

Permalink
Merge pull request #3556 from esl/mu-stream-man-backend
Browse files Browse the repository at this point in the history
Add mod_stream_management_backend
  • Loading branch information
NelsonVides authored Feb 23, 2022
2 parents 028f9e4 + 6c21188 commit b4392ac
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 184 deletions.
8 changes: 5 additions & 3 deletions big_tests/tests/sm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
-define(MOD_SM, mod_stream_management).
-define(CONSTRAINT_CHECK_TIMEOUT, 5000).
-define(LONG_TIMEOUT, 3600).
-define(SHORT_TIMEOUT, 3).
-define(SHORT_TIMEOUT, 1).
-define(SMALL_SM_BUFFER, 3).

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -840,7 +840,8 @@ resume_session_with_wrong_h_does_not_leak_sessions(Config) ->
escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Resumed),

[] = sm_helper:get_user_present_resources(Alice),
{error, smid_not_found} = sm_helper:get_sid_by_stream_id(SMID),
HostType = host_type(),
{error, smid_not_found} = sm_helper:get_sid_by_stream_id(HostType, SMID),
escalus_connection:wait_for_close(Alice, timer:seconds(5))
end).

Expand All @@ -860,7 +861,8 @@ resume_session_with_wrong_namespace_is_a_noop(Config) ->
resume_dead_session_results_in_item_not_found(Config) ->
SMID = base64:encode(crypto:strong_rand_bytes(21)),
SID = {os:timestamp(), undefined},
rpc(mim(), ?MOD_SM, register_smid, [SMID, SID]),
HostType = host_type(),
rpc(mim(), ?MOD_SM, register_smid, [HostType, SMID, SID]),
session_resumption_expects_item_not_found(Config, SMID).

session_resumption_expects_item_not_found(Config, SMID) ->
Expand Down
6 changes: 3 additions & 3 deletions big_tests/tests/sm_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
-export([client_to_spec0/1,
client_to_spec/1,
client_to_smid/1,
get_sid_by_stream_id/1]).
get_sid_by_stream_id/2]).

%% Connection helpers
-export([connect_fresh/3,
Expand Down Expand Up @@ -55,8 +55,8 @@ client_to_spec(#client{props = Props}) ->
client_to_spec0(#client{props = Props}) ->
lists:foldl(fun proplists:delete/2, Props, [stream_id, resource]).

get_sid_by_stream_id(SMID) ->
rpc(mim(), ?MOD_SM, get_sid, [SMID]).
get_sid_by_stream_id(HostType, SMID) ->
rpc(mim(), ?MOD_SM, get_sid, [HostType, SMID]).


%% Connection helpers
Expand Down
5 changes: 5 additions & 0 deletions doc/modules/mod_stream_management.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ while the management of the session tables and configuration is implemented in

## Options

### `modules.mod_stream_management.backend`
* **Syntax:** string.
* **Default:** "mnesia"
* **Example:** `backend = "mnesia"`

### `modules.mod_stream_management.buffer`
* **Syntax:** boolean
* **Default:** true
Expand Down
6 changes: 3 additions & 3 deletions src/ejabberd_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2798,13 +2798,13 @@ maybe_enable_stream_mgmt(NextState, El, StateData = #state{host_type = HostType}
c2s_stream_error(mongoose_xmpp_errors:invalid_namespace(), StateData)
end.

enable_stream_resumption(SD) ->
enable_stream_resumption(SD = #state{host_type = HostType}) ->
SMID = mod_stream_management:make_smid(),
SID = case SD#state.sid of
undefined -> ejabberd_sm:make_new_sid();
RSID -> RSID
end,
ok = mod_stream_management:register_smid(SMID, SID),
ok = mod_stream_management:register_smid(HostType, SMID, SID),
{SD#state{stream_mgmt_id = SMID, sid = SID},
stream_mgmt_enabled([{<<"id">>, SMID}, {<<"resume">>, <<"true">>}])}.

Expand Down Expand Up @@ -3105,7 +3105,7 @@ do_resume_session(SMID, El, {sid, {_, Pid}}, StateData) ->
Info = #{ip => NSD#state.ip, conn => NSD#state.conn,
auth_module => NSD#state.auth_module },
ejabberd_sm:open_session(NSD#state.host_type, SID, NSD#state.jid, Priority, Info),
ok = mod_stream_management:register_smid(SMID, SID),
ok = mod_stream_management:register_smid(NSD#state.host_type, SMID, SID),
try
Resumed = stream_mgmt_resumed(NSD#state.stream_mgmt_id,
NSD#state.stream_mgmt_in),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,35 @@
get_buffer_max/2,
get_ack_freq/2,
get_resume_timeout/2,
register_smid/2]).
register_smid/3]).

%% API for inspection and tests
-export([get_sid/1,
-export([get_sid/2,
get_stale_h/2,
register_stale_smid_h/3,
remove_stale_smid_h/2]).

-ignore_xref([c2s_stream_features/3, get_sid/1, get_stale_h/2, remove_smid/5,
-ignore_xref([c2s_stream_features/3, get_sid/2, get_stale_h/2, remove_smid/5,
register_stale_smid_h/3, remove_stale_smid_h/2, session_cleanup/5]).

-type smid() :: base64:ascii_binary().

-export_type([smid/0]).

-include("mongoose.hrl").
-include("jlib.hrl").
-include("mongoose_config_spec.hrl").

-record(sm_session,
{smid :: smid(),
sid :: ejabberd_sm:sid()
}).

-type buffer_max() :: pos_integer() | infinity | no_buffer.
-type ack_freq() :: pos_integer() | never.
%%
%% `gen_mod' callbacks
%%

start(HostType, Opts) ->
mod_stream_management_backend:init(HostType, Opts),
?LOG_INFO(#{what => stream_management_starting}),
ejabberd_hooks:add(hooks(HostType)),
mnesia:create_table(sm_session, [{ram_copies, [node()]},
{attributes, record_info(fields, sm_session)}]),
mnesia:add_table_index(sm_session, sid),
mnesia:add_table_copy(sm_session, node(), ram_copies),
stream_management_stale_h:maybe_start(Opts),
ok.

stop(HostType) ->
Expand All @@ -72,7 +65,8 @@ hooks(HostType) ->
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
#section{
items = #{<<"buffer">> => #option{type = boolean},
items = #{<<"backend">> => #option{type = atom, validate = {module, ?MODULE}},
<<"buffer">> => #option{type = boolean},
<<"buffer_max">> => #option{type = int_or_infinity,
validate = positive},
<<"ack">> => #option{type = boolean},
Expand Down Expand Up @@ -148,15 +142,12 @@ session_cleanup(Acc, _LUser, _LServer, _LResource, SID) ->
mongoose_acc:t().
do_remove_smid(Acc, HostType, SID) ->
H = mongoose_acc:get(stream_mgmt, h, undefined, Acc),
MaybeSMID = case mnesia:dirty_index_read(sm_session, SID, #sm_session.sid) of
[] -> {error, smid_not_found};
[#sm_session{smid = SMID}] ->
mnesia:dirty_delete(sm_session, SMID),
case H of
undefined -> ok;
_ -> register_stale_smid_h(HostType, SMID, H)
end,
{ok, SMID}
MaybeSMID = unregister_smid(HostType, SID),
case MaybeSMID of
{ok, SMID} when H =/= undefined ->
register_stale_smid_h(HostType, SMID, H);
_ ->
ok
end,
mongoose_acc:set(stream_mgmt, smid, MaybeSMID, Acc).

Expand All @@ -172,26 +163,17 @@ make_smid() ->
-spec get_session_from_smid(mongooseim:host_type(), smid()) ->
{sid, ejabberd_sm:sid()} | {stale_h, non_neg_integer()} | {error, smid_not_found}.
get_session_from_smid(HostType, SMID) ->
case get_sid(SMID) of
case get_sid(HostType, SMID) of
{sid, SID} -> {sid, SID};
{error, smid_not_found} -> get_stale_h(HostType, SMID)
end.

-spec get_sid(smid()) ->
{sid, ejabberd_sm:sid()} | {error, smid_not_found}.
get_sid(SMID) ->
case mnesia:dirty_read(sm_session, SMID) of
[#sm_session{sid = SID}] -> {sid, SID};
[] -> {error, smid_not_found}
end.

-spec get_stale_h(mongooseim:host_type(), SMID :: smid()) ->
{stale_h, non_neg_integer()} | {error, smid_not_found}.
get_stale_h(HostType, SMID) ->
MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []),
case proplists:get_value(enabled, MaybeModOpts, false) of
case is_stale_h_enabled(HostType) of
false -> {error, smid_not_found};
true -> stream_management_stale_h:read_stale_h(SMID)
true -> read_stale_h(HostType, SMID)
end.

-spec get_buffer_max(mongooseim:host_type(), buffer_max()) -> buffer_max().
Expand All @@ -206,26 +188,61 @@ get_ack_freq(HostType, Default) ->
get_resume_timeout(HostType, Default) ->
gen_mod:get_module_opt(HostType, ?MODULE, resume_timeout, Default).

%% Setters
register_smid(SMID, SID) ->
try
mnesia:sync_dirty(fun mnesia:write/1,
[#sm_session{smid = SMID, sid = SID}]),
ok
catch exit:Reason ->
{error, Reason}
end.

register_stale_smid_h(HostType, SMID, H) ->
MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []),
case proplists:get_value(enabled, MaybeModOpts, false) of
case is_stale_h_enabled(HostType) of
false -> ok;
true -> stream_management_stale_h:write_stale_h(SMID, H)
true -> write_stale_h(HostType, SMID, H)
end.

remove_stale_smid_h(HostType, SMID) ->
MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []),
case proplists:get_value(enabled, MaybeModOpts, false) of
case is_stale_h_enabled(HostType) of
false -> ok;
true -> stream_management_stale_h:delete_stale_h(SMID)
true -> delete_stale_h(HostType, SMID)
end.

is_stale_h_enabled(HostType) ->
MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []),
proplists:get_value(enabled, MaybeModOpts, false).

%% Backend operations

-spec register_smid(HostType, SMID, SID) ->
ok | {error, term()} when
HostType :: mongooseim:host_type(),
SMID :: mod_stream_management:smid(),
SID :: ejabberd_sm:sid().
register_smid(HostType, SMID, SID) ->
mod_stream_management_backend:register_smid(HostType, SMID, SID).

-spec unregister_smid(mongooseim:host_type(), ejabberd_sm:sid()) ->
{ok, SMID :: mod_stream_management:smid()} | {error, smid_not_found}.
unregister_smid(HostType, SID) ->
mod_stream_management_backend:unregister_smid(HostType, SID).

-spec get_sid(mongooseim:host_type(), mod_stream_management:smid()) ->
{sid, ejabberd_sm:sid()} | {error, smid_not_found}.
get_sid(HostType, SMID) ->
mod_stream_management_backend:get_sid(HostType, SMID).

%% stale_h

-spec write_stale_h(HostType, SMID, H) -> ok | {error, any()} when
HostType :: mongooseim:host_type(),
SMID :: mod_stream_management:smid(),
H :: non_neg_integer().
write_stale_h(HostType, SMID, H) ->
mod_stream_management_backend:write_stale_h(HostType, SMID, H).

-spec delete_stale_h(HostType, SMID) -> ok | {error, any()} when
HostType :: mongooseim:host_type(),
SMID :: mod_stream_management:smid().
delete_stale_h(HostType, SMID) ->
mod_stream_management_backend:delete_stale_h(HostType, SMID).

-spec read_stale_h(HostType, SMID) ->
{stale_h, non_neg_integer()} | {error, smid_not_found} when
HostType :: mongooseim:host_type(),
SMID :: mod_stream_management:smid().
read_stale_h(HostType, SMID) ->
mod_stream_management_backend:read_stale_h(HostType, SMID).
105 changes: 105 additions & 0 deletions src/stream_management/mod_stream_management_backend.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
-module(mod_stream_management_backend).
-export([init/2,
register_smid/3,
unregister_smid/2,
get_sid/2]).

-export([read_stale_h/2,
write_stale_h/3,
delete_stale_h/2]).

-define(MAIN_MODULE, mod_stream_management).

%% ----------------------------------------------------------------------
%% Callbacks
%% (exactly the same as specs in this module)

-callback init(HostType, Opts) -> ok when
HostType :: mongooseim:host_type(),
Opts :: gen_mod:module_opts().

-callback register_smid(HostType, SMID, SID) ->
ok | {error, term()} when
HostType :: mongooseim:host_type(),
SMID :: mod_stream_management:smid(),
SID :: ejabberd_sm:sid().

-callback unregister_smid(mongooseim:host_type(), ejabberd_sm:sid()) ->
{ok, SMID :: mod_stream_management:smid()} | {error, smid_not_found}.

-callback get_sid(mongooseim:host_type(), mod_stream_management:smid()) ->
{sid, ejabberd_sm:sid()} | {error, smid_not_found}.

%% stale_h functions

-callback read_stale_h(HostType, SMID) ->
{stale_h, non_neg_integer()} | {error, smid_not_found} when
HostType :: mongooseim:host_type(),
SMID :: mod_stream_management:smid().

-callback write_stale_h(HostType, SMID, H) -> ok | {error, any()} when
HostType :: mongooseim:host_type(),
SMID :: mod_stream_management:smid(),
H :: non_neg_integer().

-callback delete_stale_h(HostType, SMID) -> ok | {error, any()} when
HostType :: mongooseim:host_type(),
SMID :: mod_stream_management:smid().

%% ----------------------------------------------------------------------
%% API Functions

-spec init(HostType, Opts) -> ok when
HostType :: mongooseim:host_type(),
Opts :: gen_mod:module_opts().
init(HostType, Opts) ->
TrackedFuns = [],
mongoose_backend:init(HostType, ?MAIN_MODULE, TrackedFuns, Opts),
Args = [HostType, Opts],
mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec register_smid(HostType, SMID, SID) ->
ok | {error, term()} when
HostType :: mongooseim:host_type(),
SMID :: mod_stream_management:smid(),
SID :: ejabberd_sm:sid().
register_smid(HostType, SMID, SID) ->
Args = [HostType, SMID, SID],
mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec unregister_smid(mongooseim:host_type(), ejabberd_sm:sid()) ->
{ok, SMID :: mod_stream_management:smid()} | {error, smid_not_found}.
unregister_smid(HostType, SID) ->
Args = [HostType, SID],
mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec get_sid(mongooseim:host_type(), mod_stream_management:smid()) ->
{sid, ejabberd_sm:sid()} | {error, smid_not_found}.
get_sid(HostType, SMID) ->
Args = [HostType, SMID],
mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

%% stale_h functions

-spec read_stale_h(HostType, SMID) ->
{stale_h, non_neg_integer()} | {error, smid_not_found} when
HostType :: mongooseim:host_type(),
SMID :: mod_stream_management:smid().
read_stale_h(HostType, SMID) ->
Args = [HostType, SMID],
mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec write_stale_h(HostType, SMID, H) -> ok | {error, any()} when
HostType :: mongooseim:host_type(),
SMID :: mod_stream_management:smid(),
H :: non_neg_integer().
write_stale_h(HostType, SMID, H) ->
Args = [HostType, SMID, H],
mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec delete_stale_h(HostType, SMID) -> ok | {error, any()} when
HostType :: mongooseim:host_type(),
SMID :: mod_stream_management:smid().
delete_stale_h(HostType, SMID) ->
Args = [HostType, SMID],
mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args).
Loading

0 comments on commit b4392ac

Please sign in to comment.