Skip to content

Commit

Permalink
Rework WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
chrzaszcz committed Mar 22, 2022
1 parent 17f4538 commit 1a6b934
Show file tree
Hide file tree
Showing 13 changed files with 492 additions and 582 deletions.
12 changes: 5 additions & 7 deletions src/ejabberd_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1966,9 +1966,7 @@ presence_update_to_available(Acc, From, Packet, StateData) ->
Packet :: exml:element(),
StateData :: state()) -> {mongoose_acc:t(), state()}.
presence_update_to_available(true, Acc, _, NewPriority, From, Packet, StateData) ->
Acc2 = mongoose_hooks:user_available_hook(StateData#state.host_type,
Acc,
StateData#state.jid),
Acc2 = mongoose_hooks:user_available_hook(Acc, StateData#state.jid),
Res = case NewPriority >= 0 of
true ->
Acc3 = mongoose_hooks:roster_get_subscription_lists(
Expand Down Expand Up @@ -3027,14 +3025,14 @@ notify_unacknowledged_msg_if_in_resume_state(Acc,
notify_unacknowledged_msg_if_in_resume_state(Acc, _) ->
Acc.

maybe_notify_unacknowledged_msg(Acc, #state{jid = Jid, host_type = HostType}) ->
maybe_notify_unacknowledged_msg(Acc, #state{jid = Jid}) ->
case mongoose_acc:stanza_name(Acc) of
<<"message">> -> notify_unacknowledged_msg(HostType, Acc, Jid);
<<"message">> -> notify_unacknowledged_msg(Acc, Jid);
_ -> Acc
end.

notify_unacknowledged_msg(HostType, Acc, Jid) ->
NewAcc = mongoose_hooks:unacknowledged_message(HostType, Acc, Jid),
notify_unacknowledged_msg(Acc, Jid) ->
NewAcc = mongoose_hooks:unacknowledged_message(Acc, Jid),
mongoose_acc:strip(NewAcc).

finish_state(ok, StateName, StateData) ->
Expand Down
126 changes: 31 additions & 95 deletions src/event_pusher/mod_event_pusher.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
-behaviour(gen_mod).
-behaviour(mongoose_module_metrics).

-include("jlib.hrl").
-include("mod_event_pusher_events.hrl").
-include("mongoose_config_spec.hrl").

-type backend() :: atom().
-type event() :: #user_status_event{} | #chat_event{} | #unack_msg_event{}.
-export_type([event/0]).

-export([deps/2, start/2, stop/1, config_spec/0, push_event/3]).
-export([deps/2, start/2, stop/1, config_spec/0, push_event/2]).

-export([config_metrics/1]).

Expand All @@ -37,127 +37,63 @@
%% Callbacks
%%--------------------------------------------------------------------

-callback push_event(Acc :: mongoose_acc:t(), Host :: jid:lserver(), Event :: event()) -> mongoose_acc:t().
-callback push_event(mongoose_acc:t(), event()) -> mongoose_acc:t().

%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------

%% @doc Pushes the event to each backend registered with the event_pusher.
-spec push_event(mongoose_acc:t(), Host :: jid:server(), Event :: event()) -> mongoose_acc:t().
push_event(Acc, Host, Event) ->
lists:foldl(fun(B, Acc0) ->
B:push_event(Acc0, Host, Event) end,
Acc,
ets:lookup_element(ets_name(Host), backends, 2)).
-spec push_event(mongoose_acc:t(), event()) -> mongoose_acc:t().
push_event(Acc, Event) ->
HostType = mongoose_acc:host_type(Acc),
Backends = maps:keys(gen_mod:get_loaded_module_opts(HostType, ?MODULE)),
lists:foldl(fun(B, Acc0) -> B:push_event(Acc0, Event) end, Acc, Backends).

%%--------------------------------------------------------------------
%% gen_mod API
%%--------------------------------------------------------------------

-spec deps(Host :: jid:server(), Opts :: proplists:proplist()) -> gen_mod_deps:deps().
deps(_Host, Opts) ->
Backends = get_backends(Opts),
[{B, DepOpts, hard} || {B, DepOpts} <- Backends].
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps().
deps(_HostType, Opts) ->
[{translate_backend(Backend), BackendOpts, hard} ||
{Backend, BackendOpts} <- maps:to_list(Opts)].

-spec start(Host :: jid:server(), Opts :: proplists:proplist()) -> any().
start(Host, Opts) ->
create_ets(Host),
Backends = get_backends(Opts),
ets:insert(ets_name(Host), {backends, [B || {B, _} <- Backends]}),
mod_event_pusher_hook_translator:add_hooks(Host).
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, _Opts) ->
mod_event_pusher_hook_translator:add_hooks(HostType).

-spec stop(Host :: jid:server()) -> any().
stop(Host) ->
mod_event_pusher_hook_translator:delete_hooks(Host),
ets:delete(ets_name(Host)).
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
mod_event_pusher_hook_translator:delete_hooks(HostType).

-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
BackendItems = [{atom_to_binary(B, utf8),
(translate_backend(B)):config_spec()} || B <- all_backends()],
#section{
items = #{<<"backend">> => #section{items = maps:from_list(BackendItems),
wrap = {kv, backends}}
}
}.
#section{items = maps:from_list(BackendItems),
format_items = map}.

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------

-spec get_backends(Opts :: proplists:proplist()) -> [{module(), proplists:proplist()}].
get_backends(Opts) ->
{backends, Backends0} = lists:keyfind(backends, 1, Opts),
lists:foldr(fun add_backend/2, [], Backends0).

add_backend({http, Opts}, BackendList) ->
% http backend is treated somewhat differently - we allow configuration settings as if there
% were many modules, while here we put together a single list of settings for the http event
% pusher module. Thus, you can configure event pusher like:
%{mod_event_pusher,
% [{backends,
% [{http,
% [{path, "/push_here"},
% {callback_module, mod_event_pusher_http_one},
% {pool_name, http_pool}]
% },
% {http,
% [{path, "/push_there"},
% {callback_module, mod_event_pusher_http_two},
% {pool_name, http_pool}]
% }
% ]
% }]
HttpModName = translate_backend(http),
case lists:keyfind(HttpModName, 1, BackendList) of
false ->
[{HttpModName, [{configs, [Opts]}]} | BackendList];
{HttpModName, [{configs, O}]} ->
lists:keyreplace(HttpModName, 1, BackendList,
{HttpModName, [{configs, [Opts | O]}]})
end;
add_backend({Mod, Opts}, BackendList) ->
[{translate_backend(Mod), Opts} | BackendList].

-spec translate_backend(Backend :: atom()) -> module().
-spec translate_backend(backend()) -> module().
translate_backend(Backend) ->
list_to_atom(?MODULE_STRING ++ "_" ++ atom_to_list(Backend)).

-spec ets_name(Host :: jid:server()) -> atom().
ets_name(Host) ->
gen_mod:get_module_proc(Host, ?MODULE).

-spec create_ets(Host :: jid:server()) -> any().
create_ets(Host) ->
Self = self(),
Heir = case whereis(ejabberd_sup) of
undefined -> none;
Self -> none;
Pid -> Pid
end,
ets:new(ets_name(Host), [public, named_table, {read_concurrency, true}, {heir, Heir, testing}]).

config_metrics(Host) ->
try
Opts = gen_mod:get_module_opts(Host, ?MODULE),
BackendsWithOpts = proplists:get_value(backends, Opts, none),
Backends = proplists:get_keys(BackendsWithOpts),
ReturnList = lists:map(pa:bind(fun get_backend/2, BackendsWithOpts), Backends),
lists:flatten(ReturnList)
catch
_:_ -> [{none, none}]
config_metrics(HostType) ->
case gen_mod:get_loaded_module_opts(HostType, ?MODULE) of
Empty when Empty =:= #{} ->
[{none, none}];
Opts ->
lists:flatmap(fun get_backend/1, maps:to_list(Opts))
end.

get_backend(BackendsWithOpts, Backend) ->
case Backend of
push ->
PushOptions = proplists:get_value(push, BackendsWithOpts),
PushBackend = atom_to_list(proplists:get_value(backend, PushOptions, mnesia)),
[{backend, push}, {backend, list_to_atom("push_" ++ PushBackend)}];
Backend ->
{backend, Backend}
end.
get_backend({push, #{backend := PushBackend}}) ->
[{backend, push}, {backend, list_to_atom("push_" ++ PushBackend)}];
get_backend({Backend, _}) ->
[{backend, Backend}].

all_backends() ->
[sns, push, http, rabbit].
18 changes: 9 additions & 9 deletions src/event_pusher/mod_event_pusher_hook_translator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ delete_hooks(HostType) ->
(routing_data()) -> routing_data().
filter_local_packet(drop) ->
drop;
filter_local_packet({From, To = #jid{lserver = Host}, Acc0, Packet}) ->
filter_local_packet({From, To, Acc0, Packet}) ->
Acc = case chat_type(Acc0) of
false -> Acc0;
Type ->
Event = #chat_event{type = Type, direction = out,
from = From, to = To, packet = Packet},
NewAcc = mod_event_pusher:push_event(Acc0, Host, Event),
NewAcc = mod_event_pusher:push_event(Acc0, Event),
merge_acc(Acc0, NewAcc)
end,
{From, To, Acc, Packet}.
Expand All @@ -70,7 +70,7 @@ user_send_packet(Acc, From, To, Packet = #xmlel{name = <<"message">>}) ->
Type ->
Event = #chat_event{type = Type, direction = in,
from = From, to = To, packet = Packet},
NewAcc = mod_event_pusher:push_event(Acc, From#jid.lserver, Event),
NewAcc = mod_event_pusher:push_event(Acc, Event),
merge_acc(Acc, NewAcc)
end;
user_send_packet(Acc, _From, _To, _Packet) ->
Expand All @@ -79,20 +79,20 @@ user_send_packet(Acc, _From, _To, _Packet) ->
-spec user_present(mongoose_acc:t(), UserJID :: jid:jid()) -> mongoose_acc:t().
user_present(Acc, #jid{} = UserJID) ->
Event = #user_status_event{jid = UserJID, status = online},
NewAcc = mod_event_pusher:push_event(Acc, UserJID#jid.lserver, Event),
NewAcc = mod_event_pusher:push_event(Acc, Event),
merge_acc(Acc, NewAcc).

-spec user_not_present(mongoose_acc:t(), User :: jid:luser(), Server :: jid:lserver(),
Resource :: jid:lresource(), Status :: any()) -> mongoose_acc:t().
user_not_present(Acc, LUser, LHost, LResource, _Status) ->
UserJID = jid:make_noprep(LUser, LHost, LResource),
user_not_present(Acc, LUser, LServer, LResource, _Status) ->
UserJID = jid:make_noprep(LUser, LServer, LResource),
Event = #user_status_event{jid = UserJID, status = offline},
NewAcc = mod_event_pusher:push_event(Acc, LHost, Event),
NewAcc = mod_event_pusher:push_event(Acc, Event),
merge_acc(Acc, NewAcc).

unacknowledged_message(Acc, #jid{lserver = Server} = Jid) ->
unacknowledged_message(Acc, Jid) ->
Event = #unack_msg_event{to = Jid},
NewAcc = mod_event_pusher:push_event(Acc, Server, Event),
NewAcc = mod_event_pusher:push_event(Acc, Event),
merge_acc(Acc, NewAcc).

%%--------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 1a6b934

Please sign in to comment.