Skip to content

Commit

Permalink
Merge pull request #3726 from esl/pubsub-wpool
Browse files Browse the repository at this point in the history
Use wpool for notify_loop in mod_pubsub
  • Loading branch information
chrzaszcz authored Aug 23, 2022
2 parents 86e3cef + 8a4f083 commit 6ac0a55
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 70 deletions.
128 changes: 60 additions & 68 deletions src/pubsub/mod_pubsub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@
%% packet handler export
-export([process_packet/5]).

-export([send_loop/1]).

-export([config_metrics/1]).

%% Private export for wpool worker callbacks
-export([handle_msg/1]).

-define(MOD_PUBSUB_DB_BACKEND, mod_pubsub_db_backend).
-ignore_xref([
{?MOD_PUBSUB_DB_BACKEND, transaction, 2},
Expand All @@ -126,9 +127,12 @@
on_user_offline/5, out_subscription/4, plugin/2, plugin/1, presence_probe/4,
publish_item/6, remove_user/3, send_items/7, serverhost/1, start_link/2,
string_to_affiliation/1, string_to_subscription/1, subscribe_node/5,
subscription_to_string/1, tree_action/3, unsubscribe_node/5
subscription_to_string/1, tree_action/3, unsubscribe_node/5,
handle_msg/1
]).

-type plugin_name() :: binary().

-export_type([
host/0,
hostPubsub/0,
Expand Down Expand Up @@ -214,7 +218,7 @@
max_subscriptions_node = undefined,
default_node_config = [],
nodetree = ?STDTREE,
plugins = [?STDNODE]
plugins = [?STDNODE] :: [plugin_name()]
}).

-type(state() ::
Expand Down Expand Up @@ -252,11 +256,16 @@ start(Host, Opts) ->
ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]},
transient, 1000, worker, [?MODULE]},
ensure_metrics(Host),
start_pool(Host, Opts),
ejabberd_sup:start_child(ChildSpec).

start_pool(HostType, #{wpool := WpoolOpts}) ->
{ok, _} = mongoose_wpool:start(generic, HostType, pubsub_notify, maps:to_list(WpoolOpts)).

stop(Host) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
gen_server:call(Proc, stop),
mongoose_wpool:stop(generic, Host, pubsub_notify),
ejabberd_sup:stop_child(Proc).

-spec config_spec() -> mongoose_config_spec:config_section().
Expand Down Expand Up @@ -286,7 +295,8 @@ config_spec() ->
format_items = map},
<<"default_node_config">> => default_node_config_spec(),
<<"item_publisher">> => #option{type = boolean},
<<"sync_broadcast">> => #option{type = boolean}
<<"sync_broadcast">> => #option{type = boolean},
<<"wpool">> => wpool_spec()
},
defaults = #{<<"iqdisc">> => one_queue,
<<"host">> => default_host(),
Expand All @@ -303,6 +313,10 @@ config_spec() ->
<<"sync_broadcast">> => false}
}.

wpool_spec() ->
Wpool = mongoose_config_spec:wpool(#{}),
Wpool#section{include = always}.

pep_mapping_config_spec() ->
#section{
items = #{<<"namespace">> => #option{type = binary,
Expand Down Expand Up @@ -392,7 +406,7 @@ init([ServerHost, Opts = #{host := SubdomainPattern}]) ->
false ->
ok
end,
{_, State} = init_send_loop(ServerHost, Opts, Plugins),
State = init_state(ServerHost, Opts, Plugins),

%% Pass State as extra into ?MODULE:process_packet/5 function
PacketHandler = mongoose_packet_handler:new(?MODULE, #{state => State}),
Expand Down Expand Up @@ -452,33 +466,20 @@ delete_pep_iq_handlers(ServerHost) ->
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB),
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB_OWNER).

init_send_loop(ServerHost) ->
Plugins = gen_mod:get_module_opt(ServerHost, ?MODULE, plugins),
init_send_loop(ServerHost, gen_mod:get_module_opts(ServerHost, ?MODULE), Plugins).

init_send_loop(ServerHost, #{last_item_cache := LastItemCache, max_items_node := MaxItemsNode,
pep_mapping := PepMapping, ignore_pep_from_offline := PepOffline,
access_createnode := Access},
Plugins) ->
%% Plugins is a subset of configured plugins which are able to start
%% TODO Evaluate if we should just use plugins from the config instead
init_state(ServerHost, #{last_item_cache := LastItemCache, max_items_node := MaxItemsNode,
pep_mapping := PepMapping, ignore_pep_from_offline := PepOffline,
access_createnode := Access}, Plugins) ->
HostType = host_to_host_type(ServerHost),
NodeTree = tree(HostType),
Host = host(HostType, ServerHost),
State = #state{host = Host, server_host = ServerHost,
access = Access, pep_mapping = PepMapping,
ignore_pep_from_offline = PepOffline,
last_item_cache = LastItemCache,
max_items_node = MaxItemsNode, nodetree = NodeTree,
plugins = Plugins },
Proc = gen_mod:get_module_proc(ServerHost, ?LOOPNAME),
Pid = case whereis(Proc) of
undefined ->
SendLoop = spawn(?MODULE, send_loop, [State]),
register(Proc, SendLoop),
SendLoop;
Loop ->
Loop
end,
{Pid, State}.
#state{host = Host, server_host = ServerHost,
access = Access, pep_mapping = PepMapping,
ignore_pep_from_offline = PepOffline,
last_item_cache = LastItemCache,
max_items_node = MaxItemsNode, nodetree = NodeTree,
plugins = Plugins }.

%% @doc Call the init/1 function for each plugin declared in the config file.
%% The default plugin module is implicit.
Expand Down Expand Up @@ -517,28 +518,20 @@ terminate_plugins(Host, ServerHost, Plugins, TreePlugin) ->
gen_pubsub_nodetree:terminate(TreePlugin, Host, ServerHost),
ok.

send_loop(State) ->
receive
{send_last_pubsub_items, Recipient} ->
send_last_pubsub_items(Recipient, State),
send_loop(State);
{send_last_pep_items, Recipient, Pid} ->
send_last_pep_items(Recipient, Pid, State),
send_loop(State);
{send_last_items_from_owner, NodeOwner, Recipient} ->
send_last_items_from_owner(State#state.host, NodeOwner, Recipient),
send_loop(State);
stop ->
ok
end.
notify_worker(HostType, HashKey, Request) ->
mongoose_wpool:cast(generic, HostType, pubsub_notify, HashKey,
{?MODULE, handle_msg, [Request]}).

send_last_pubsub_items(Recipient, #state{host = Host, plugins = Plugins})
when is_list(Plugins) ->
lists:foreach(
fun(PluginType) ->
send_last_pubsub_items_for_plugin(Host, PluginType, Recipient)
end,
Plugins).
handle_msg({send_last_pubsub_items, Host, Recipient, Plugins}) ->
send_last_pubsub_items(Host, Recipient, Plugins);
handle_msg({send_last_pep_items, Host, IgnorePepFromOffline, RecipientJID, RecipientPid}) ->
send_last_pep_items(Host, IgnorePepFromOffline, RecipientJID, RecipientPid);
handle_msg({send_last_items_from_owner, Host, NodeOwner, RecipientInfo}) ->
send_last_items_from_owner(Host, NodeOwner, RecipientInfo).

send_last_pubsub_items(Host, Recipient, Plugins) ->
F = fun(PluginType) -> send_last_pubsub_items_for_plugin(Host, PluginType, Recipient) end,
lists:foreach(F, Plugins).

send_last_pubsub_items_for_plugin(Host, PluginType, Recipient) ->
JIDs = [Recipient, jid:to_lower(Recipient), jid:to_bare(Recipient)],
Expand All @@ -549,8 +542,7 @@ send_last_pubsub_items_for_plugin(Host, PluginType, Recipient) ->
end,
lists:usort(Subs)).

send_last_pep_items(RecipientJID, RecipientPid,
#state{host = Host, ignore_pep_from_offline = IgnorePepFromOffline}) ->
send_last_pep_items(Host, IgnorePepFromOffline, RecipientJID, RecipientPid) ->
RecipientLJID = jid:to_lower(RecipientJID),
[send_last_item_to_jid(NodeOwnerJID, Node, RecipientLJID) ||
NodeOwnerJID <- get_contacts_for_sending_last_item(RecipientPid, IgnorePepFromOffline),
Expand Down Expand Up @@ -785,23 +777,21 @@ handle_remote_hook(HandlerState, _, _, _) ->
%% presence hooks handling functions
%%

caps_recognised(Acc, #jid{ lserver = S } = JID, Pid, _Features) ->
notify_send_loop(S, {send_last_pep_items, JID, Pid}),
caps_recognised(Acc, #jid{ luser = U, lserver = S } = JID, Pid, _Features) ->
Host = host(S, S),
IgnorePepFromOffline = gen_mod:get_module_opt(S, ?MODULE, ignore_pep_from_offline),
notify_worker(S, U, {send_last_pep_items, Host, IgnorePepFromOffline, JID, Pid}),
Acc.

presence_probe(Acc, #jid{luser = _U, lserver = S, lresource = _R} = JID, JID, _Pid) ->
notify_send_loop(S, {send_last_pubsub_items, _Recipient = JID}),
presence_probe(Acc, #jid{luser = U, lserver = S, lresource = _R} = JID, JID, _Pid) ->
%% Get subdomain
Host = host(S, S),
Plugins = plugins(S),
notify_worker(S, U, {send_last_pubsub_items, Host, _Recipient = JID, Plugins}),
Acc;
presence_probe(Acc, _Host, _JID, _Pid) ->
Acc.

notify_send_loop(ServerHost, Action) ->
{SendLoop, _} = case whereis(gen_mod:get_module_proc(ServerHost, ?LOOPNAME)) of
undefined -> init_send_loop(ServerHost);
Pid -> {Pid, undefined}
end,
SendLoop ! Action.

%% -------
%% subscription hooks handling functions
%%
Expand All @@ -811,13 +801,14 @@ notify_send_loop(ServerHost, Action) ->
ToJID :: jid:jid(),
Type :: mod_roster:sub_presence()) ->
mongoose_acc:t().
out_subscription(Acc, #jid{lserver = LServer} = FromJID, ToJID, subscribed) ->
out_subscription(Acc, #jid{lserver = LServer, luser = LUser} = FromJID, ToJID, subscribed) ->
{PUser, PServer, PResource} = jid:to_lower(ToJID),
PResources = case PResource of
<<>> -> user_resources(PUser, PServer);
_ -> [PResource]
end,
notify_send_loop(LServer, {send_last_items_from_owner, FromJID, {PUser, PServer, PResources}}),
Host = host(LServer, LServer),
notify_worker(LServer, LUser, {send_last_items_from_owner, Host, FromJID, {PUser, PServer, PResources}}),
Acc;
out_subscription(Acc, _, _, _) ->
Acc.
Expand Down Expand Up @@ -4123,13 +4114,14 @@ tree_mod(<<"virtual">>) ->
tree_mod(Name) ->
binary_to_atom(<<"nodetree_", Name/binary>>, utf8).

-spec plugin(Name :: binary()) -> module().
-spec plugin(Name :: plugin_name()) -> module().
plugin(Name) ->
binary_to_atom(<<"node_", Name/binary>>, utf8).

-spec plugins(ServerHost :: mongooseim:domain_name()) -> list().
-spec plugins(ServerHost :: mongooseim:domain_name()) -> [plugin_name()].
plugins(ServerHost) ->
Proc = gen_mod:get_module_proc(ServerHost, ?PROCNAME),
%% TODO This call could be replaced with persistent terms
gen_server:call(Proc, plugins).

config(ServerHost, Key) ->
Expand Down
8 changes: 6 additions & 2 deletions test/common/config_parser_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,8 @@ all_modules() ->
last_item_cache => mnesia,
max_items_node => 1000,
pep_mapping => #{<<"urn:xmpp:microblog:0">> => <<"mb">>},
plugins => [<<"flat">>, <<"pep">>]}),
plugins => [<<"flat">>, <<"pep">>],
wpool => default_config([modules, mod_pubsub, wpool])}),
mod_version => mod_config(mod_version, #{os_info => true}),
mod_auth_token => #{backend => rdbms,
validity_period => #{access => #{unit => minutes, value => 13},
Expand Down Expand Up @@ -986,7 +987,8 @@ default_mod_config(mod_pubsub) ->
#{iqdisc => one_queue, host => {prefix, <<"pubsub.">>}, backend => mnesia, access_createnode => all,
max_items_node => 10, nodetree => nodetree_tree, ignore_pep_from_offline => true,
last_item_cache => false, plugins => [<<"flat">>], pep_mapping => #{},
default_node_config => [], item_publisher => false, sync_broadcast => false};
default_node_config => [], item_publisher => false, sync_broadcast => false,
wpool => default_config([modules, mod_pubsub, wpool])};
default_mod_config(mod_push_service_mongoosepush) ->
#{pool_name => undefined, api_version => <<"v3">>, max_http_connections => 100};
default_mod_config(mod_register) ->
Expand Down Expand Up @@ -1133,6 +1135,8 @@ default_config([modules, mod_event_pusher, push]) ->
virtual_pubsub_hosts => []};
default_config([modules, mod_event_pusher, push, wpool]) ->
(default_wpool_opts())#{strategy := available_worker};
default_config([modules, mod_pubsub, wpool]) ->
default_wpool_opts();
default_config([modules, mod_event_pusher, rabbit] = P) ->
#{presence_exchange => default_config(P ++ [presence_exchange]),
chat_msg_exchange => default_config(P ++ [chat_msg_exchange]),
Expand Down
1 change: 1 addition & 0 deletions test/config_parser_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2469,6 +2469,7 @@ mod_pubsub(_Config) ->
T(#{<<"item_publisher">> => true})),
?cfgh(P ++ [sync_broadcast], false,
T(#{<<"sync_broadcast">> => false})),
test_wpool(P ++ [wpool], fun(Opts) -> T(#{<<"wpool">> => Opts}) end),
?errh(T(#{<<"host">> => <<"">>})),
?errh(T(#{<<"host">> => <<"is this a host? no.">>})),
?errh(T(#{<<"host">> => [<<"invalid.sub@HOST@">>]})),
Expand Down

0 comments on commit 6ac0a55

Please sign in to comment.