Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C2s/Do not route broadcast tuples #3946

Merged
merged 7 commits into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 8 additions & 16 deletions src/admin_extra/service_admin_extra_roster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
_Group}.
-type subs() :: atom() | binary().
-type push_action() :: remove
| none
| {add, Nick :: binary(), Subs :: subs(),
Group :: binary() | string()}.

Expand Down Expand Up @@ -359,8 +358,8 @@ build_list_users(Group, [{User, Server}|Users], Res) ->
push_roster_item(JID, #jid{luser = U, lserver = S} = RemJID, Action) ->
lists:foreach(fun(R) ->
RJID = jid:replace_resource(JID, R),
BroadcastEl = build_broadcast(U, S, Action),
ejabberd_sm:route(RJID, RJID, BroadcastEl),
SubsAtom = action_to_subscription(Action),
mod_roster:broadcast_item(RJID, {U, S, <<>>}, SubsAtom),
Item = build_roster_item(RemJID, Action),
ResIQ = build_iq_roster_push(Item),
ejabberd_router:route(RJID, RJID, ResIQ)
Expand All @@ -377,10 +376,7 @@ build_roster_item(#jid{lresource = <<>>} = JID, {add, Nick, Subs, Group}) ->
build_roster_item(#jid{lresource = <<>>} = JID, remove) ->
#xmlel{ name = <<"item">>,
attrs = [{<<"jid">>, jid:to_binary(JID)},
{<<"subscription">>, <<"remove">>}]};
build_roster_item(#jid{} = JID, Action) ->
build_roster_item(jid:replace_resource(JID, <<>>), Action).

{<<"subscription">>, <<"remove">>}]}.

-spec build_iq_roster_push(jlib:xmlcdata() | exml:element()) -> exml:element().
build_iq_roster_push(Item) ->
Expand All @@ -390,15 +386,11 @@ build_iq_roster_push(Item) ->
attrs = [{<<"xmlns">>, ?NS_ROSTER}],
children = [Item]}] }.

-spec build_broadcast(U :: jid:user(), S :: jid:server(),
push_action()) -> ejabberd_c2s:broadcast().
build_broadcast(U, S, {add, _Nick, Subs, _Group}) ->
build_broadcast(U, S, list_to_existing_atom(binary_to_list(Subs)));
build_broadcast(U, S, remove) ->
build_broadcast(U, S, none);
%% Subs = both | from | to | none
build_broadcast(U, S, SubsAtom) when is_atom(SubsAtom) ->
{broadcast, {item, {U, S, <<"">>}, SubsAtom}}.
-spec action_to_subscription(push_action()) -> atom().
action_to_subscription({add, _Nick, Subs, _Group}) ->
list_to_existing_atom(binary_to_list(Subs));
action_to_subscription(remove) ->
none.

%%-----------------------------
%% Purge roster items
Expand Down
15 changes: 14 additions & 1 deletion src/c2s/mongoose_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
-export([callback_mode/0, init/1, handle_event/4, terminate/3]).

%% utils
-export([start_link/2, start/2, stop/2, exit/2, async/3]).
-export([start_link/2, start/2, stop/2, exit/2, async/3, call/3, cast/3]).
-export([create_data/1, get_host_type/1, get_lserver/1, get_sid/1, get_jid/1,
get_mod_state/2, merge_mod_state/2, remove_mod_state/2,
get_ip/1, get_socket/1, get_lang/1, get_stream_id/1, hook_arg/5]).
Expand Down Expand Up @@ -959,6 +959,11 @@ generate_random_resource() ->

-spec hook_arg(data(), state(), terminate | gen_statem:event_type(), term(), term()) ->
mongoose_c2s_hooks:hook_params().
hook_arg(StateData, C2SState, EventType, #{event_tag := EventTag,
event_content := EventContent}, Reason) ->
#{c2s_data => StateData, c2s_state => C2SState,
event_type => EventType, event_tag => EventTag, event_content => EventContent,
reason => Reason};
hook_arg(StateData, C2SState, EventType, EventContent, Reason) ->
#{c2s_data => StateData, c2s_state => C2SState,
event_type => EventType, event_content => EventContent,
Expand Down Expand Up @@ -990,6 +995,14 @@ exit(Pid, Reason) ->
async(Pid, Fun, Args) ->
gen_statem:cast(Pid, {async, Fun, Args}).

-spec call(pid(), atom(), term()) -> term().
call(Pid, EventTag, EventContent) ->
gen_statem:call(Pid, #{event_tag => EventTag, event_content => EventContent}, 5000).

-spec cast(pid(), atom(), term()) -> ok.
cast(Pid, EventTag, EventContent) ->
gen_statem:cast(Pid, #{event_tag => EventTag, event_content => EventContent}).

-spec create_data(#{host_type := mongooseim:host_type(), jid := jid:jid()}) -> data().
create_data(#{host_type := HostType, jid := Jid}) ->
#c2s_data{host_type = HostType, jid = Jid}.
Expand Down
1 change: 1 addition & 0 deletions src/c2s/mongoose_c2s_hooks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
-type params() :: #{c2s_data := mongoose_c2s:data(),
c2s_state := mongoose_c2s:state(),
event_type := undefined | gen_statem:event_type(),
event_tag => atom(),
event_content := undefined | term(),
reason := undefined | term()}.
-type result() :: gen_hook:hook_fn_ret(mongoose_acc:t()).
Expand Down
60 changes: 3 additions & 57 deletions src/ejabberd_sm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
get_full_session_list/0,
register_iq_handler/3,
unregister_iq_handler/2,
force_update_presence/2,
user_resources/2,
get_session_pid/1,
get_session/1,
Expand Down Expand Up @@ -81,7 +80,7 @@
-export([do_filter/3]).
-export([do_route/4]).

-ignore_xref([do_filter/3, do_route/4, force_update_presence/2, get_unique_sessions_number/0,
-ignore_xref([do_filter/3, do_route/4, get_unique_sessions_number/0,
get_user_present_pids/2, start_link/0, user_resources/2, sm_backend/0]).

-include("mongoose.hrl").
Expand Down Expand Up @@ -146,17 +145,11 @@ start_link() ->
-spec route(From, To, Packet) -> Acc when
From :: jid:jid(),
To :: jid:jid(),
Packet :: exml:element() | mongoose_acc:t() | ejabberd_c2s:broadcast(),
Packet :: exml:element() | mongoose_acc:t(),
Acc :: mongoose_acc:t().
route(From, To, #xmlel{} = Packet) ->
Acc = new_acc(From, To, Packet),
route(From, To, Acc);
route(From, To, {broadcast, #xmlel{} = Payload}) ->
Acc = new_acc(From, To, Payload),
route(From, To, Acc, {broadcast, Payload});
route(From, To, {broadcast, Payload}) ->
Acc = new_acc(To),
route(From, To, Acc, {broadcast, Payload});
route(From, To, Acc) ->
route(From, To, Acc, mongoose_acc:element(Acc)).

Expand All @@ -170,24 +163,6 @@ new_acc(From, To = #jid{lserver = LServer}, Packet) ->
from_jid => From,
to_jid => To}).

-spec new_acc(jid:jid()) -> mongoose_acc:t().
new_acc(To = #jid{lserver = LServer}) ->
{ok, HostType} = mongoose_domain_api:get_domain_host_type(To#jid.lserver),
mongoose_acc:new(#{location => ?LOCATION,
host_type => HostType,
lserver => LServer,
element => undefined}).

route(From, To, Acc, {broadcast, Payload}) ->
try
do_route(Acc, From, To, {broadcast, Payload})
catch Class:Reason:Stacktrace ->
?LOG_ERROR(#{what => sm_route_failed,
text => <<"Failed to route broadcast in ejabberd_sm">>,
class => Class, reason => Reason, stacktrace => Stacktrace,
payload => Payload, acc => Acc}),
Acc
end;
route(From, To, Acc, El) ->
try
do_route(Acc, From, To, El)
Expand Down Expand Up @@ -632,28 +607,7 @@ do_filter(From, To, Packet) ->
Acc :: mongoose_acc:t(),
From :: jid:jid(),
To :: jid:jid(),
Payload :: exml:element() | ejabberd_c2s:broadcast().
do_route(Acc, From, To, {broadcast, Payload} = Broadcast) ->
?LOG_DEBUG(#{what => sm_route_broadcast, acc => Acc, payload => Payload}),
#jid{ luser = LUser, lserver = LServer, lresource = LResource} = To,
case LResource of
<<>> ->
CurrentPids = get_user_present_pids(LUser, LServer),
Acc1 = mongoose_hooks:sm_broadcast(Acc, From, To, Broadcast, length(CurrentPids)),
?LOG_DEBUG(#{what => sm_broadcast, session_pids => CurrentPids}),
BCast = {broadcast, Payload},
lists:foreach(fun({_, Pid}) -> Pid ! BCast end, CurrentPids),
Acc1;
_ ->
case get_session_pid(To) of
none ->
Acc; % do nothing
Pid when is_pid(Pid) ->
?LOG_DEBUG(#{what => sm_broadcast, session_pid => Pid}),
Pid ! Broadcast,
Acc
end
end;
Payload :: exml:element().
do_route(Acc, From, To, El) ->
?LOG_DEBUG(#{what => sm_route, acc => Acc}),
#jid{lresource = LResource} = To,
Expand Down Expand Up @@ -996,14 +950,6 @@ process_iq(_, From, To, Acc, Packet) ->
{Acc1, Err} = jlib:make_error_reply(Acc, Packet, mongoose_xmpp_errors:bad_request()),
ejabberd_router:route(To, From, Acc1, Err).


-spec force_update_presence(mongooseim:host_type(), {jid:luser(), jid:lserver()}) -> 'ok'.
force_update_presence(_HostType, {LUser, LServer}) ->
Ss = ejabberd_sm_backend:get_sessions(LUser, LServer),
lists:foreach(fun(#session{sid = {_, Pid}}) ->
Pid ! {force_update_presence, LUser}
end, Ss).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% ejabberd commands

Expand Down
4 changes: 2 additions & 2 deletions src/metrics/mongoose_metrics_hooks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ get_hooks(HostType) ->
{privacy_iq_get, HostType, fun ?MODULE:privacy_iq_get/3, #{}, 1},
{privacy_iq_set, HostType, fun ?MODULE:privacy_iq_set/3, #{}, 1},
{privacy_check_packet, HostType, fun ?MODULE:privacy_check_packet/3, #{}, 55},
{sm_broadcast, HostType, fun ?MODULE:privacy_list_push/3, #{}, 1}
{privacy_list_push, HostType, fun ?MODULE:privacy_list_push/3, #{}, 1}
| c2s_hooks(HostType)].

-spec c2s_hooks(mongooseim:host_type()) -> gen_hook:hook_list(mongoose_c2s_hooks:fn()).
Expand Down Expand Up @@ -265,7 +265,7 @@ privacy_iq_set(Acc, #{iq := #iq{sub_el = SubEl}}, #{host_type := HostType}) ->
{ok, Acc}.

-spec privacy_list_push(Acc, Params, Extra) -> {ok, Acc} when
Acc :: mongoose_acc:t(),
Acc :: any(),
Params :: #{session_count := non_neg_integer()},
Extra :: #{host_type := mongooseim:host_type()}.
privacy_list_push(Acc, #{session_count := SessionCount}, #{host_type := HostType}) ->
Expand Down
15 changes: 9 additions & 6 deletions src/mod_presence.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,15 @@ supported_features() ->

-spec get_presence(pid()) -> {jid:luser(), jid:lresource(), binary(), binary()}.
get_presence(Pid) ->
gen_statem:call(Pid, get_presence, 5000).
mongoose_c2s:call(Pid, ?MODULE, get_presence).

-spec get_subscribed(pid()) -> [jid:jid()].
get_subscribed(Pid) ->
gen_statem:call(Pid, get_subscribed, 5000).
mongoose_c2s:call(Pid, ?MODULE, get_subscribed).

-spec set_presence(pid(), exml:element()) -> ok.

set_presence(Pid, Message) ->
gen_statem:cast(Pid, {set_presence, Message}).
mongoose_c2s:cast(Pid, ?MODULE, {set_presence, Message}).

-spec c2s_hooks(mongooseim:host_type()) -> gen_hook:hook_list(mongoose_c2s_hooks:fn()).
c2s_hooks(HostType) ->
Expand Down Expand Up @@ -136,8 +135,9 @@ handle_user_terminate(Acc, StateData, Presences, Reason) ->
Extra :: gen_hook:extra(),
Result :: mongoose_c2s_hooks:result().
foreign_event(Acc, #{c2s_data := StateData,
event_type := info,
event_content := {broadcast, {item, IJID, ISubscription}}}, _Extra) ->
event_type := cast,
event_tag := mod_roster,
event_content := {item, IJID, ISubscription}}, _Extra) ->
case get_mod_state(StateData) of
{error, not_found} ->
{ok, Acc};
Expand All @@ -146,6 +146,7 @@ foreign_event(Acc, #{c2s_data := StateData,
end;
foreign_event(Acc, #{c2s_data := StateData,
event_type := {call, From},
event_tag := ?MODULE,
event_content := get_presence}, _Extra) ->
PresLast = case get_mod_state(StateData) of
{error, not_found} ->
Expand All @@ -159,6 +160,7 @@ foreign_event(Acc, #{c2s_data := StateData,
{stop, Acc1};
foreign_event(Acc, #{c2s_data := StateData,
event_type := {call, From},
event_tag := ?MODULE,
event_content := get_subscribed}, _Extra) ->
Subscribed = case get_mod_state(StateData) of
{error, not_found} ->
Expand All @@ -170,6 +172,7 @@ foreign_event(Acc, #{c2s_data := StateData,
{stop, Acc1};
foreign_event(Acc, #{c2s_data := StateData,
event_type := cast,
event_tag := ?MODULE,
event_content := {set_presence, Message}}, _Extra) ->
Acc1 = mongoose_acc:update_stanza(#{element => Message}, Acc),
{FromJid, ToJid, Packet} = mongoose_acc:packet(Acc1),
Expand Down
12 changes: 9 additions & 3 deletions src/mod_roster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
set_items/3,
set_roster_entry/5,
remove_from_roster/3,
item_to_xml/1
item_to_xml/1,
broadcast_item/3
]).

% Hook handlers
Expand Down Expand Up @@ -507,8 +508,7 @@ process_item_els(Item, [{xmlcdata, _} | Els]) ->
process_item_els(Item, []) -> Item.

push_item(HostType, JID, From, Item) ->
ejabberd_sm:route(jid:make_noprep(<<>>, <<>>, <<>>), JID,
{broadcast, {item, Item#roster.jid, Item#roster.subscription}}),
broadcast_item(JID, Item#roster.jid, Item#roster.subscription),
case roster_versioning_enabled(HostType) of
true ->
push_item_version(JID, From, Item, roster_version(HostType, JID));
Expand All @@ -519,6 +519,12 @@ push_item(HostType, JID, From, Item) ->
ejabberd_sm:get_user_resources(JID))
end.

-spec broadcast_item(jid:jid(), jid:simple_jid(), subscription_state()) -> ok.
broadcast_item(#jid{luser = LUser, lserver = LServer}, ContactJid, Subscription) ->
Item = {item, ContactJid, Subscription},
UserPids = ejabberd_sm:get_user_present_pids(LUser, LServer),
lists:foreach(fun({_, Pid}) -> mongoose_c2s:cast(Pid, ?MODULE, Item) end, UserPids).

push_item_without_version(HostType, JID, Resource, From, Item) ->
mongoose_hooks:roster_push(HostType, From, Item),
push_item_final(jid:replace_resource(JID, Resource), From, Item, not_found).
Expand Down
27 changes: 13 additions & 14 deletions src/mongoose_hooks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@
privacy_get_user_list/2,
privacy_iq_get/6,
privacy_iq_set/5,
privacy_updated_list/3]).
privacy_updated_list/3,
privacy_list_push/5]).

-export([offline_groupchat_message_hook/4,
offline_message_hook/4,
set_presence_hook/3,
sm_broadcast/5,
sm_filter_offline_message/4,
sm_register_connection_hook/4,
sm_remove_connection_hook/5,
Expand Down Expand Up @@ -564,6 +564,17 @@ privacy_updated_list(HostType, OldList, NewList) ->
Params = #{old_list => OldList, new_list => NewList},
run_hook_for_host_type(privacy_updated_list, HostType, false, Params).

-spec privacy_list_push(HostType, LUser, LServer, Item, SessionCount) -> Result when
HostType :: mongooseim:host_type(),
LUser :: jid:luser(),
LServer :: jid:lserver(),
Item :: term(),
SessionCount :: non_neg_integer(),
Result :: any().
privacy_list_push(HostType, LUser, LServer, Item, SessionCount) ->
Params = #{luse => LUser, lserver => LServer, item => Item, session_count => SessionCount},
run_hook_for_host_type(privacy_list_push, HostType, ok, Params).

%% Session management related hooks

-spec offline_groupchat_message_hook(Acc, From, To, Packet) -> Result when
Expand Down Expand Up @@ -598,18 +609,6 @@ set_presence_hook(Acc, JID, Presence) ->
HostType = mongoose_acc:host_type(Acc),
run_hook_for_host_type(set_presence_hook, HostType, Acc, Params).

-spec sm_broadcast(Acc, From, To, Broadcast, SessionCount) -> Result when
Acc :: mongoose_acc:t(),
From :: jid:jid(),
To :: jid:jid(),
Broadcast :: ejabberd_c2s:broadcast(),
SessionCount :: non_neg_integer(),
Result :: mongoose_acc:t().
sm_broadcast(Acc, From, To, Broadcast, SessionCount) ->
Params = #{from => From, to => To, broadcast => Broadcast, session_count => SessionCount},
HostType = mongoose_acc:host_type(Acc),
run_hook_for_host_type(sm_broadcast, HostType, Acc, Params).

-spec sm_filter_offline_message(HostType, From, To, Packet) -> Result when
HostType :: mongooseim:host_type(),
From :: jid:jid(),
Expand Down
17 changes: 10 additions & 7 deletions src/privacy/mod_blocking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ user_send_iq(Acc, #{c2s_data := StateData}, #{host_type := HostType}) ->
-spec foreign_event(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
mongoose_c2s_hooks:result().
foreign_event(Acc, #{c2s_data := StateData,
event_type := info,
event_content := {broadcast, {blocking, UserList, Action, Changed}}}, _Extra) ->
event_type := cast,
event_tag := ?MODULE,
event_content := {blocking, UserList, Action, Changed}}, _Extra) ->
{stop, handle_new_blocking_command(Acc, StateData, UserList, Action, Changed)};
foreign_event(Acc, _Params, _Extra) ->
{ok, Acc}.
Expand Down Expand Up @@ -221,9 +222,9 @@ complete_iq_set(blocking_command, Acc, _, _, {error, Reason}) ->
complete_iq_set(blocking_command, Acc, LUser, LServer, {ok, Changed, List, Type}) ->
UserList = #userlist{name = <<"blocking">>, list = List, needdb = false},
% send the list to all users c2s processes (resources) to make it effective immediately
Acc1 = broadcast_blocking_command(Acc, LUser, LServer, UserList, Changed, Type),
broadcast_blocking_command(Acc, LUser, LServer, UserList, Changed, Type),
% return a response here so that c2s sets the list in its state
{Acc1, {result, [], UserList}}.
{Acc, {result, [], UserList}}.
%%complete_iq_set(blocking_command, _, _, _) ->
%% {result, []}.

Expand Down Expand Up @@ -297,9 +298,11 @@ make_blocking_list_entry(J) ->
broadcast_blocking_command(Acc, LUser, LServer, UserList, _Changed, unblock_all) ->
broadcast_blocking_command(Acc, LUser, LServer, UserList, [], unblock);
broadcast_blocking_command(Acc, LUser, LServer, UserList, Changed, Type) ->
UserJID = jid:make_noprep(LUser, LServer, <<>>),
Bcast = {blocking, UserList, Type, Changed},
ejabberd_sm:route(UserJID, UserJID, Acc, {broadcast, Bcast}).
Item = {blocking, UserList, Type, Changed},
UserPids = ejabberd_sm:get_user_present_pids(LUser, LServer),
HostType = mongoose_acc:host_type(Acc),
mongoose_hooks:privacy_list_push(HostType, LUser, LServer, Item, length(UserPids)),
lists:foreach(fun({_, Pid}) -> mongoose_c2s:cast(Pid, ?MODULE, Item) end, UserPids).

-spec blocking_query_response([mod_privacy:list_name()]) -> exml:element().
blocking_query_response(Lst) ->
Expand Down
Loading