Skip to content

Commit

Permalink
Unify code for gen_servers that run cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Mar 31, 2022
1 parent 9e6512d commit 2dedbcb
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 82 deletions.
3 changes: 2 additions & 1 deletion big_tests/tests/sm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ init_per_testcase(CaseName, Config) ->
end_per_testcase(CN, Config) when CN =:= resume_expired_session_returns_correct_h;
CN =:= gc_repeat_after_never_means_no_cleaning;
CN =:= gc_repeat_after_timeout_does_clean ->
rpc(mim(), ejabberd_sup, stop_child, [stream_management_stale_h]),
Name = rpc(mim(), gen_mod, get_module_proc, [host_type(), stream_management_stale_h]),
rpc(mim(), ejabberd_sup, stop_child, [Name]),
escalus:end_per_testcase(CN, Config);
end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
unregister_handler(),
Expand Down
37 changes: 5 additions & 32 deletions src/inbox/mod_inbox_rdbms_async.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
-include("mod_inbox.hrl").
-include("mongoose_logger.hrl").

-behaviour(gen_server).
-behaviour(mod_inbox_backend).
-behaviour(mongoose_aggregator_worker).

Expand Down Expand Up @@ -35,8 +34,7 @@

%% Cleaner gen_server callbacks
-export([flush_user_bin/3, flush_global_bin/2]).
-export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2]).
-ignore_xref([flush_user_bin/3, flush_global_bin/2, start_link/2]).
-ignore_xref([flush_user_bin/3, flush_global_bin/2]).

%% Initialisation
-spec init(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
Expand Down Expand Up @@ -321,39 +319,14 @@ aggregate(_OldTask, NewTask) ->


%% Cleaner gen_server callbacks
start_cleaner(HostType, #{async_writer := Opts}) ->
MFA = {?MODULE, start_link, [HostType, Opts]},
start_cleaner(HostType, #{async_writer := #{bin_ttl := TTL, bin_clean_after := Interval}}) ->
Name = gen_mod:get_module_proc(HostType, ?MODULE),
WOpts = #{host_type => HostType, action => fun ?MODULE:flush_global_bin/2,
opts => TTL, interval => Interval},
MFA = {mongoose_collector, start_link, [Name, WOpts]},
ChildSpec = {Name, MFA, permanent, 5000, worker, [?MODULE]},
ejabberd_sup:start_child(ChildSpec).

stop_cleaner(HostType) ->
Name = gen_mod:get_module_proc(HostType, ?MODULE),
ejabberd_sup:stop_child(Name).

start_link(HostType, Opts) ->
gen_server:start_link(?MODULE, {HostType, Opts}, []).

init({HostType, #{bin_ttl := TTL, bin_clean_after := Timeout}}) ->
State = #{host_type => HostType, bin_ttl => TTL,
bin_clean_after => Timeout, timer_ref => undefined},
{ok, schedule_check(State)}.

handle_call(Msg, From, State) ->
?UNEXPECTED_CALL(Msg, From),
{reply, ok, State}.

handle_cast(Msg, State) ->
?UNEXPECTED_CAST(Msg),
{noreply, State}.

handle_info({timeout, Ref, empty_bin},
#{timer_ref := Ref, host_type := HostType, bin_ttl := TTL} = State) ->
flush_global_bin(HostType, TTL),
{noreply, schedule_check(State)};
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info),
{noreply, State}.

schedule_check(State = #{bin_clean_after := Timeout}) ->
State#{timer_ref := erlang:start_timer(Timeout, self(), empty_bin)}.
53 changes: 53 additions & 0 deletions src/mongoose_collector.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
-module(mongoose_collector).

-include("mongoose_logger.hrl").

%% gen_server callbacks
-behaviour(gen_server).
-export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2]).

-ignore_xref([start_link/2]).

-record(watchdog, {
host_type :: mongooseim:host_type(),
action :: fun((mongooseim:host_type(), map()) -> term()),
opts :: term(),
interval :: pos_integer(),
timer_ref :: undefined | reference()
}).

start_link(Name, Opts) ->
gen_server:start_link({local, Name}, ?MODULE, Opts, []).

init(#{host_type := HostType,
action := Fun,
opts := Opts,
interval := Interval}) when is_function(Fun, 2) ->
State = #watchdog{host_type = HostType,
action = Fun,
opts = Opts,
interval = Interval,
timer_ref = undefined},
{ok, schedule_check(State)}.

handle_call(Msg, From, State) ->
?UNEXPECTED_CALL(Msg, From),
{reply, ok, State}.

handle_cast(Msg, State) ->
?UNEXPECTED_CAST(Msg),
{noreply, State}.

handle_info({timeout, Ref, run_action},
#watchdog{timer_ref = Ref} = State) ->
run_action(State),
{noreply, schedule_check(State)};
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info),
{noreply, State}.

schedule_check(State = #watchdog{interval = Interval}) ->
State#watchdog{timer_ref = erlang:start_timer(Interval, self(), run_action)}.

run_action(#watchdog{host_type = HostType, action = Fun, opts = Opts}) ->
Fun(HostType, Opts).
62 changes: 13 additions & 49 deletions src/stream_management/mod_stream_management_mnesia.erl
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
-module(mod_stream_management_mnesia).
-behaviour(mod_stream_management_backend).
-behaviour(gen_server).

-include("mongoose.hrl").
-include("jlib.hrl").
-include_lib("stdlib/include/ms_transform.hrl").

-export([init/2,
Expand All @@ -16,16 +14,7 @@
delete_stale_h/2]).

%% Internal exports
-export([start_link/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-ignore_xref([start_link/1]).

-record(smgc_state,
{gc_repeat_after :: non_neg_integer(),
gc_geriatric :: non_neg_integer() }).
-export([clear_table/2]).

-record(stream_mgmt_stale_h,
{smid :: mod_stream_management:smid(),
Expand All @@ -36,22 +25,22 @@
{smid :: mod_stream_management:smid(),
sid :: ejabberd_sm:sid() }).

init(_HostType, #{stale_h := StaleOpts}) ->
init(HostType, #{stale_h := StaleOpts}) ->
mnesia:create_table(sm_session, [{ram_copies, [node()]},
{attributes, record_info(fields, sm_session)}]),
mnesia:add_table_index(sm_session, sid),
mnesia:add_table_copy(sm_session, node(), ram_copies),
maybe_init_stale_h(StaleOpts),
maybe_init_stale_h(HostType, StaleOpts),
ok.

maybe_init_stale_h(StaleOpts = #{enabled := true}) ->
maybe_init_stale_h(HostType, StaleOpts = #{enabled := true}) ->
?LOG_INFO(#{what => stream_mgmt_stale_h_start}),
mnesia:create_table(stream_mgmt_stale_h,
[{ram_copies, [node()]},
{attributes, record_info(fields, stream_mgmt_stale_h)}]),
mnesia:add_table_copy(stream_mgmt_stale_h, node(), ram_copies),
start_cleaner(StaleOpts);
maybe_init_stale_h(_) -> ok.
start_cleaner(HostType, StaleOpts);
maybe_init_stale_h(_, _) -> ok.

-spec register_smid(HostType, SMID, SID) ->
ok | {error, term()} when
Expand Down Expand Up @@ -126,41 +115,16 @@ delete_stale_h(_HostType, SMID) ->

%% stale_h cleaning logic

start_cleaner(Opts) ->
MFA = {?MODULE, start_link, [Opts]},
ChildSpec = {stream_management_stale_h, MFA, permanent, 5000, worker, [?MODULE]},
start_cleaner(HostType, #{repeat_after := Interval, geriatric := TTL}) ->
Name = gen_mod:get_module_proc(HostType, stream_management_stale_h),
WOpts = #{host_type => HostType, action => fun ?MODULE:clear_table/2,
opts => TTL, interval => Interval},
MFA = {mongoose_collector, start_link, [Name, WOpts]},
ChildSpec = {Name, MFA, permanent, 5000, worker, [?MODULE]},
%% TODO cleaner should be a service
ejabberd_sup:start_child(ChildSpec).

start_link(Opts) ->
gen_server:start_link({local, stream_management_stale_h}, ?MODULE, Opts, []).

init(#{repeat_after := RepeatAfter, geriatric := GeriatricAge}) ->
State = #smgc_state{gc_repeat_after = RepeatAfter,
gc_geriatric = GeriatricAge},
schedule_check(State),
{ok, State}.

handle_call(Msg, From, State) ->
?UNEXPECTED_CALL(Msg, From),
{reply, ok, State}.

handle_cast(Msg, State) ->
?UNEXPECTED_CAST(Msg),
{noreply, State}.

handle_info(check, #smgc_state{gc_geriatric = GeriatricAge} = State) ->
clear_table(GeriatricAge),
schedule_check(State),
{noreply, State};
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info),
{noreply, State}.

schedule_check(#smgc_state{gc_repeat_after = RepeatAfter}) ->
erlang:send_after(timer:seconds(RepeatAfter), self(), check).

clear_table(GeriatricAge) ->
clear_table(_HostType, GeriatricAge) ->
TimeToDie = erlang:monotonic_time(second) - GeriatricAge,
MS = ets:fun2ms(fun(#stream_mgmt_stale_h{stamp = S}) when S < TimeToDie -> true end),
ets:select_delete(stream_mgmt_stale_h, MS).

0 comments on commit 2dedbcb

Please sign in to comment.