From 772f587f8bc194a036aed627033b649914dc9292 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 9 Aug 2022 15:50:57 +0200 Subject: [PATCH 1/3] Use wpool for notify_loop in mod_pubsub --- src/pubsub/mod_pubsub.erl | 121 +++++++++++++-------------- test/common/config_parser_helper.erl | 8 +- test/config_parser_SUITE.erl | 1 + 3 files changed, 63 insertions(+), 67 deletions(-) diff --git a/src/pubsub/mod_pubsub.erl b/src/pubsub/mod_pubsub.erl index 9839c4fc18..adf45d6ce6 100644 --- a/src/pubsub/mod_pubsub.erl +++ b/src/pubsub/mod_pubsub.erl @@ -105,10 +105,11 @@ %% packet handler export -export([process_packet/5]). --export([send_loop/1]). - -export([config_metrics/1]). +%% Private export for wpool worker callbacks +-export([handle_msg/3]). + -define(MOD_PUBSUB_DB_BACKEND, mod_pubsub_db_backend). -ignore_xref([ {?MOD_PUBSUB_DB_BACKEND, transaction, 2}, @@ -126,7 +127,8 @@ on_user_offline/5, out_subscription/4, plugin/2, plugin/1, presence_probe/4, publish_item/6, remove_user/3, send_items/7, serverhost/1, start_link/2, string_to_affiliation/1, string_to_subscription/1, subscribe_node/5, - subscription_to_string/1, tree_action/3, unsubscribe_node/5 + subscription_to_string/1, tree_action/3, unsubscribe_node/5, + handle_msg/3 ]). -export_type([ @@ -252,11 +254,16 @@ start(Host, Opts) -> ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]}, transient, 1000, worker, [?MODULE]}, ensure_metrics(Host), + start_pool(Host, Opts), ejabberd_sup:start_child(ChildSpec). +start_pool(HostType, #{wpool := WpoolOpts}) -> + {ok, _} = mongoose_wpool:start(generic, HostType, pubsub_notify, maps:to_list(WpoolOpts)). + stop(Host) -> Proc = gen_mod:get_module_proc(Host, ?PROCNAME), gen_server:call(Proc, stop), + mongoose_wpool:stop(generic, Host, pubsub_notify), ejabberd_sup:stop_child(Proc). -spec config_spec() -> mongoose_config_spec:config_section(). @@ -286,7 +293,8 @@ config_spec() -> format_items = map}, <<"default_node_config">> => default_node_config_spec(), <<"item_publisher">> => #option{type = boolean}, - <<"sync_broadcast">> => #option{type = boolean} + <<"sync_broadcast">> => #option{type = boolean}, + <<"wpool">> => wpool_spec() }, defaults = #{<<"iqdisc">> => one_queue, <<"host">> => default_host(), @@ -303,6 +311,10 @@ config_spec() -> <<"sync_broadcast">> => false} }. +wpool_spec() -> + Wpool = mongoose_config_spec:wpool(#{}), + Wpool#section{include = always}. + pep_mapping_config_spec() -> #section{ items = #{<<"namespace">> => #option{type = binary, @@ -392,7 +404,7 @@ init([ServerHost, Opts = #{host := SubdomainPattern}]) -> false -> ok end, - {_, State} = init_send_loop(ServerHost, Opts, Plugins), + State = init_state(ServerHost, Opts, Plugins), %% Pass State as extra into ?MODULE:process_packet/5 function PacketHandler = mongoose_packet_handler:new(?MODULE, #{state => State}), @@ -452,33 +464,18 @@ delete_pep_iq_handlers(ServerHost) -> gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB), gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB_OWNER). -init_send_loop(ServerHost) -> - Plugins = gen_mod:get_module_opt(ServerHost, ?MODULE, plugins), - init_send_loop(ServerHost, gen_mod:get_module_opts(ServerHost, ?MODULE), Plugins). - -init_send_loop(ServerHost, #{last_item_cache := LastItemCache, max_items_node := MaxItemsNode, - pep_mapping := PepMapping, ignore_pep_from_offline := PepOffline, - access_createnode := Access}, - Plugins) -> +init_state(ServerHost, #{last_item_cache := LastItemCache, max_items_node := MaxItemsNode, + pep_mapping := PepMapping, ignore_pep_from_offline := PepOffline, + access_createnode := Access, plugins := Plugins}, Plugins) -> HostType = host_to_host_type(ServerHost), NodeTree = tree(HostType), Host = host(HostType, ServerHost), - State = #state{host = Host, server_host = ServerHost, - access = Access, pep_mapping = PepMapping, - ignore_pep_from_offline = PepOffline, - last_item_cache = LastItemCache, - max_items_node = MaxItemsNode, nodetree = NodeTree, - plugins = Plugins }, - Proc = gen_mod:get_module_proc(ServerHost, ?LOOPNAME), - Pid = case whereis(Proc) of - undefined -> - SendLoop = spawn(?MODULE, send_loop, [State]), - register(Proc, SendLoop), - SendLoop; - Loop -> - Loop - end, - {Pid, State}. + #state{host = Host, server_host = ServerHost, + access = Access, pep_mapping = PepMapping, + ignore_pep_from_offline = PepOffline, + last_item_cache = LastItemCache, + max_items_node = MaxItemsNode, nodetree = NodeTree, + plugins = Plugins }. %% @doc Call the init/1 function for each plugin declared in the config file. %% The default plugin module is implicit. @@ -517,28 +514,30 @@ terminate_plugins(Host, ServerHost, Plugins, TreePlugin) -> gen_pubsub_nodetree:terminate(TreePlugin, Host, ServerHost), ok. -send_loop(State) -> - receive - {send_last_pubsub_items, Recipient} -> - send_last_pubsub_items(Recipient, State), - send_loop(State); - {send_last_pep_items, Recipient, Pid} -> - send_last_pep_items(Recipient, Pid, State), - send_loop(State); - {send_last_items_from_owner, NodeOwner, Recipient} -> - send_last_items_from_owner(State#state.host, NodeOwner, Recipient), - send_loop(State); - stop -> - ok - end. -send_last_pubsub_items(Recipient, #state{host = Host, plugins = Plugins}) - when is_list(Plugins) -> - lists:foreach( - fun(PluginType) -> - send_last_pubsub_items_for_plugin(Host, PluginType, Recipient) - end, - Plugins). +%% Currently HostType = ServerHost always (pubsub does not support dynamic domains yet) +notify_worker(HostType, ServerHost, LUser, Request) -> + %% Signature: mongoose_wpool:cast(PoolType, HostType, Tag, HashKey, Request). + mongoose_wpool:cast(generic, HostType, pubsub_notify, LUser, + {?MODULE, handle_msg, [HostType, ServerHost, Request]}). + +handle_msg(HostType, ServerHost, {send_last_pubsub_items, Recipient}) -> + %% Get subdomain + Host = host(HostType, ServerHost), + Plugins = gen_mod:get_module_opt(HostType, ?MODULE, plugins), + send_last_pubsub_items(Host, Recipient, Plugins); +handle_msg(HostType, ServerHost, {send_last_pep_items, RecipientJID, RecipientPid}) -> + Host = host(HostType, ServerHost), + IgnorePepFromOffline = gen_mod:get_module_opt(HostType, ?MODULE, ignore_pep_from_offline), + send_last_pep_items(Host, IgnorePepFromOffline, RecipientJID, RecipientPid); +handle_msg(HostType, ServerHost, {send_last_items_from_owner, NodeOwner, RecipientJID}) -> + Host = host(HostType, ServerHost), + send_last_items_from_owner(Host, NodeOwner, RecipientJID). + +send_last_pubsub_items(Host, Recipient, Plugins) -> + lists:foreach(fun(PluginType) -> + send_last_pubsub_items_for_plugin(Host, PluginType, Recipient) + end, Plugins). send_last_pubsub_items_for_plugin(Host, PluginType, Recipient) -> JIDs = [Recipient, jid:to_lower(Recipient), jid:to_bare(Recipient)], @@ -549,8 +548,7 @@ send_last_pubsub_items_for_plugin(Host, PluginType, Recipient) -> end, lists:usort(Subs)). -send_last_pep_items(RecipientJID, RecipientPid, - #state{host = Host, ignore_pep_from_offline = IgnorePepFromOffline}) -> +send_last_pep_items(Host, IgnorePepFromOffline, RecipientJID, RecipientPid) -> RecipientLJID = jid:to_lower(RecipientJID), [send_last_item_to_jid(NodeOwnerJID, Node, RecipientLJID) || NodeOwnerJID <- get_contacts_for_sending_last_item(RecipientPid, IgnorePepFromOffline), @@ -785,23 +783,16 @@ handle_remote_hook(HandlerState, _, _, _) -> %% presence hooks handling functions %% -caps_recognised(Acc, #jid{ lserver = S } = JID, Pid, _Features) -> - notify_send_loop(S, {send_last_pep_items, JID, Pid}), +caps_recognised(Acc, #jid{ luser = U, lserver = S } = JID, Pid, _Features) -> + notify_worker(S, S, U, {send_last_pep_items, JID, Pid}), Acc. -presence_probe(Acc, #jid{luser = _U, lserver = S, lresource = _R} = JID, JID, _Pid) -> - notify_send_loop(S, {send_last_pubsub_items, _Recipient = JID}), +presence_probe(Acc, #jid{luser = U, lserver = S, lresource = _R} = JID, JID, _Pid) -> + notify_worker(S, S, U, {send_last_pubsub_items, _Recipient = JID}), Acc; presence_probe(Acc, _Host, _JID, _Pid) -> Acc. -notify_send_loop(ServerHost, Action) -> - {SendLoop, _} = case whereis(gen_mod:get_module_proc(ServerHost, ?LOOPNAME)) of - undefined -> init_send_loop(ServerHost); - Pid -> {Pid, undefined} - end, - SendLoop ! Action. - %% ------- %% subscription hooks handling functions %% @@ -811,13 +802,13 @@ notify_send_loop(ServerHost, Action) -> ToJID :: jid:jid(), Type :: mod_roster:sub_presence()) -> mongoose_acc:t(). -out_subscription(Acc, #jid{lserver = LServer} = FromJID, ToJID, subscribed) -> +out_subscription(Acc, #jid{lserver = LServer, luser = LUser} = FromJID, ToJID, subscribed) -> {PUser, PServer, PResource} = jid:to_lower(ToJID), PResources = case PResource of <<>> -> user_resources(PUser, PServer); _ -> [PResource] end, - notify_send_loop(LServer, {send_last_items_from_owner, FromJID, {PUser, PServer, PResources}}), + notify_worker(LServer, LServer, LUser, {send_last_items_from_owner, FromJID, {PUser, PServer, PResources}}), Acc; out_subscription(Acc, _, _, _) -> Acc. diff --git a/test/common/config_parser_helper.erl b/test/common/config_parser_helper.erl index ab0e9cc688..5ee2864beb 100644 --- a/test/common/config_parser_helper.erl +++ b/test/common/config_parser_helper.erl @@ -469,7 +469,8 @@ all_modules() -> last_item_cache => mnesia, max_items_node => 1000, pep_mapping => #{<<"urn:xmpp:microblog:0">> => <<"mb">>}, - plugins => [<<"flat">>, <<"pep">>]}), + plugins => [<<"flat">>, <<"pep">>], + wpool => default_config([modules, mod_pubsub, wpool])}), mod_version => mod_config(mod_version, #{os_info => true}), mod_auth_token => #{backend => rdbms, validity_period => #{access => #{unit => minutes, value => 13}, @@ -986,7 +987,8 @@ default_mod_config(mod_pubsub) -> #{iqdisc => one_queue, host => {prefix, <<"pubsub.">>}, backend => mnesia, access_createnode => all, max_items_node => 10, nodetree => nodetree_tree, ignore_pep_from_offline => true, last_item_cache => false, plugins => [<<"flat">>], pep_mapping => #{}, - default_node_config => [], item_publisher => false, sync_broadcast => false}; + default_node_config => [], item_publisher => false, sync_broadcast => false, + wpool => default_config([modules, mod_pubsub, wpool])}; default_mod_config(mod_push_service_mongoosepush) -> #{pool_name => undefined, api_version => <<"v3">>, max_http_connections => 100}; default_mod_config(mod_register) -> @@ -1133,6 +1135,8 @@ default_config([modules, mod_event_pusher, push]) -> virtual_pubsub_hosts => []}; default_config([modules, mod_event_pusher, push, wpool]) -> (default_wpool_opts())#{strategy := available_worker}; +default_config([modules, mod_pubsub, wpool]) -> + default_wpool_opts(); default_config([modules, mod_event_pusher, rabbit] = P) -> #{presence_exchange => default_config(P ++ [presence_exchange]), chat_msg_exchange => default_config(P ++ [chat_msg_exchange]), diff --git a/test/config_parser_SUITE.erl b/test/config_parser_SUITE.erl index 7e374b0077..2e49c9b66f 100644 --- a/test/config_parser_SUITE.erl +++ b/test/config_parser_SUITE.erl @@ -2464,6 +2464,7 @@ mod_pubsub(_Config) -> T(#{<<"item_publisher">> => true})), ?cfgh(P ++ [sync_broadcast], false, T(#{<<"sync_broadcast">> => false})), + test_wpool(P ++ [wpool], fun(Opts) -> T(#{<<"wpool">> => Opts}) end), ?errh(T(#{<<"host">> => <<"">>})), ?errh(T(#{<<"host">> => <<"is this a host? no.">>})), ?errh(T(#{<<"host">> => [<<"invalid.sub@HOST@">>]})), From 3dbf5f827e27574d96df5c67ba4d21ba04d00ddf Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 17 Aug 2022 11:15:05 +0200 Subject: [PATCH 2/3] Get all options before calling the worker --- src/pubsub/mod_pubsub.erl | 49 +++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/src/pubsub/mod_pubsub.erl b/src/pubsub/mod_pubsub.erl index adf45d6ce6..aa6259e8fe 100644 --- a/src/pubsub/mod_pubsub.erl +++ b/src/pubsub/mod_pubsub.erl @@ -108,7 +108,7 @@ -export([config_metrics/1]). %% Private export for wpool worker callbacks --export([handle_msg/3]). +-export([handle_msg/1]). -define(MOD_PUBSUB_DB_BACKEND, mod_pubsub_db_backend). -ignore_xref([ @@ -128,9 +128,11 @@ publish_item/6, remove_user/3, send_items/7, serverhost/1, start_link/2, string_to_affiliation/1, string_to_subscription/1, subscribe_node/5, subscription_to_string/1, tree_action/3, unsubscribe_node/5, - handle_msg/3 + handle_msg/1 ]). +-type plugin_name() :: binary(). + -export_type([ host/0, hostPubsub/0, @@ -216,7 +218,7 @@ max_subscriptions_node = undefined, default_node_config = [], nodetree = ?STDTREE, - plugins = [?STDNODE] + plugins = [?STDNODE] :: [plugin_name()] }). -type(state() :: @@ -466,7 +468,7 @@ delete_pep_iq_handlers(ServerHost) -> init_state(ServerHost, #{last_item_cache := LastItemCache, max_items_node := MaxItemsNode, pep_mapping := PepMapping, ignore_pep_from_offline := PepOffline, - access_createnode := Access, plugins := Plugins}, Plugins) -> + access_createnode := Access}, Plugins) -> HostType = host_to_host_type(ServerHost), NodeTree = tree(HostType), Host = host(HostType, ServerHost), @@ -514,25 +516,16 @@ terminate_plugins(Host, ServerHost, Plugins, TreePlugin) -> gen_pubsub_nodetree:terminate(TreePlugin, Host, ServerHost), ok. +notify_worker(HostType, HashKey, Request) -> + mongoose_wpool:cast(generic, HostType, pubsub_notify, HashKey, + {?MODULE, handle_msg, [Request]}). -%% Currently HostType = ServerHost always (pubsub does not support dynamic domains yet) -notify_worker(HostType, ServerHost, LUser, Request) -> - %% Signature: mongoose_wpool:cast(PoolType, HostType, Tag, HashKey, Request). - mongoose_wpool:cast(generic, HostType, pubsub_notify, LUser, - {?MODULE, handle_msg, [HostType, ServerHost, Request]}). - -handle_msg(HostType, ServerHost, {send_last_pubsub_items, Recipient}) -> - %% Get subdomain - Host = host(HostType, ServerHost), - Plugins = gen_mod:get_module_opt(HostType, ?MODULE, plugins), +handle_msg({send_last_pubsub_items, Host, Recipient, Plugins}) -> send_last_pubsub_items(Host, Recipient, Plugins); -handle_msg(HostType, ServerHost, {send_last_pep_items, RecipientJID, RecipientPid}) -> - Host = host(HostType, ServerHost), - IgnorePepFromOffline = gen_mod:get_module_opt(HostType, ?MODULE, ignore_pep_from_offline), +handle_msg({send_last_pep_items, Host, IgnorePepFromOffline, RecipientJID, RecipientPid}) -> send_last_pep_items(Host, IgnorePepFromOffline, RecipientJID, RecipientPid); -handle_msg(HostType, ServerHost, {send_last_items_from_owner, NodeOwner, RecipientJID}) -> - Host = host(HostType, ServerHost), - send_last_items_from_owner(Host, NodeOwner, RecipientJID). +handle_msg({send_last_items_from_owner, Host, NodeOwner, RecipientInfo}) -> + send_last_items_from_owner(Host, NodeOwner, RecipientInfo). send_last_pubsub_items(Host, Recipient, Plugins) -> lists:foreach(fun(PluginType) -> @@ -784,11 +777,16 @@ handle_remote_hook(HandlerState, _, _, _) -> %% caps_recognised(Acc, #jid{ luser = U, lserver = S } = JID, Pid, _Features) -> - notify_worker(S, S, U, {send_last_pep_items, JID, Pid}), + Host = host(S, S), + IgnorePepFromOffline = gen_mod:get_module_opt(S, ?MODULE, ignore_pep_from_offline), + notify_worker(S, U, {send_last_pep_items, Host, IgnorePepFromOffline, JID, Pid}), Acc. presence_probe(Acc, #jid{luser = U, lserver = S, lresource = _R} = JID, JID, _Pid) -> - notify_worker(S, S, U, {send_last_pubsub_items, _Recipient = JID}), + %% Get subdomain + Host = host(S, S), + Plugins = plugins(S), + notify_worker(S, U, {send_last_pubsub_items, Host, _Recipient = JID, Plugins}), Acc; presence_probe(Acc, _Host, _JID, _Pid) -> Acc. @@ -808,7 +806,8 @@ out_subscription(Acc, #jid{lserver = LServer, luser = LUser} = FromJID, ToJID, s <<>> -> user_resources(PUser, PServer); _ -> [PResource] end, - notify_worker(LServer, LServer, LUser, {send_last_items_from_owner, FromJID, {PUser, PServer, PResources}}), + Host = host(LServer, LServer), + notify_worker(LServer, LUser, {send_last_items_from_owner, Host, FromJID, {PUser, PServer, PResources}}), Acc; out_subscription(Acc, _, _, _) -> Acc. @@ -4114,11 +4113,11 @@ tree_mod(<<"virtual">>) -> tree_mod(Name) -> binary_to_atom(<<"nodetree_", Name/binary>>, utf8). --spec plugin(Name :: binary()) -> module(). +-spec plugin(Name :: plugin_name()) -> module(). plugin(Name) -> binary_to_atom(<<"node_", Name/binary>>, utf8). --spec plugins(ServerHost :: mongooseim:domain_name()) -> list(). +-spec plugins(ServerHost :: mongooseim:domain_name()) -> [plugin_name()]. plugins(ServerHost) -> Proc = gen_mod:get_module_proc(ServerHost, ?PROCNAME), gen_server:call(Proc, plugins). From 8a4f083f381ec682c5a0cfd7c2eb427178c2b826 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 22 Aug 2022 10:49:23 +0200 Subject: [PATCH 3/3] Address the review comments --- src/pubsub/mod_pubsub.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/pubsub/mod_pubsub.erl b/src/pubsub/mod_pubsub.erl index aa6259e8fe..aada409c59 100644 --- a/src/pubsub/mod_pubsub.erl +++ b/src/pubsub/mod_pubsub.erl @@ -466,6 +466,8 @@ delete_pep_iq_handlers(ServerHost) -> gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB), gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB_OWNER). +%% Plugins is a subset of configured plugins which are able to start +%% TODO Evaluate if we should just use plugins from the config instead init_state(ServerHost, #{last_item_cache := LastItemCache, max_items_node := MaxItemsNode, pep_mapping := PepMapping, ignore_pep_from_offline := PepOffline, access_createnode := Access}, Plugins) -> @@ -528,9 +530,8 @@ handle_msg({send_last_items_from_owner, Host, NodeOwner, RecipientInfo}) -> send_last_items_from_owner(Host, NodeOwner, RecipientInfo). send_last_pubsub_items(Host, Recipient, Plugins) -> - lists:foreach(fun(PluginType) -> - send_last_pubsub_items_for_plugin(Host, PluginType, Recipient) - end, Plugins). + F = fun(PluginType) -> send_last_pubsub_items_for_plugin(Host, PluginType, Recipient) end, + lists:foreach(F, Plugins). send_last_pubsub_items_for_plugin(Host, PluginType, Recipient) -> JIDs = [Recipient, jid:to_lower(Recipient), jid:to_bare(Recipient)], @@ -4120,6 +4121,7 @@ plugin(Name) -> -spec plugins(ServerHost :: mongooseim:domain_name()) -> [plugin_name()]. plugins(ServerHost) -> Proc = gen_mod:get_module_proc(ServerHost, ?PROCNAME), + %% TODO This call could be replaced with persistent terms gen_server:call(Proc, plugins). config(ServerHost, Key) ->