Skip to content

Commit

Permalink
Merge pull request #3060 from esl/prepared-queries-event-pusher
Browse files Browse the repository at this point in the history
Use prepared queries in mod_event_pusher_push:
* Introduced prepared queries
* Updated the backend API to remove an unused case that would require a separate prepared query
  • Loading branch information
DenysGonchar authored Mar 26, 2021
2 parents ac4aa5c + a031068 commit 8a4e707
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 96 deletions.
37 changes: 27 additions & 10 deletions big_tests/tests/push_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ groups() ->
pm_no_msg_notifications_if_user_online,
pm_msg_notify_if_user_offline,
pm_msg_notify_if_user_offline_with_publish_options,
pm_msg_notify_stops_after_disabling
pm_msg_notify_stops_after_disabling,
pm_msg_notify_stops_after_removal
]},
{muclight_msg_notifications, [parallel], [
muclight_no_msg_notifications_if_not_enabled,
Expand Down Expand Up @@ -580,6 +581,31 @@ pm_msg_notify_stops_after_disabling(Config) ->
ok
end).

pm_msg_notify_stops_after_removal(Config) ->
PubsubJID = pubsub_jid(Config),
escalus:story(
Config, [{bob, 1}],
fun(Bob) ->
%% Enable
escalus:send(Bob, enable_stanza(PubsubJID, <<"NodeId">>, [])),
escalus:assert(is_iq_result, escalus:wait_for_stanza(Bob)),

%% Remove account - this should disable the notifications
Pid = mongoose_helper:get_session_pid(Bob, distributed_helper:mim()),
escalus_connection:send(Bob, escalus_stanza:remove_account()),
escalus:assert(is_iq_result, escalus:wait_for_stanza(Bob)),
mongoose_helper:wait_for_pid_to_die(Pid)
end),
BobUser = lists:keyfind(bob, 1, escalus_config:get_config(escalus_users, Config)),
escalus_users:create_user(Config, BobUser),
escalus:story(
Config, [{bob, 1}, {alice, 1}],
fun(Bob, Alice) ->
become_unavailable(Bob),
escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)),
?assert(not truly(received_push(Config)))
end).

%%--------------------------------------------------------------------
%% GROUP muclight_msg_notifications
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -823,12 +849,3 @@ pubsub_jid(Config) ->
room_name(Config) ->
CaseName = proplists:get_value(case_name, Config),
<<"room_", (atom_to_binary(CaseName, utf8))/binary>>.

is_offline(LUser, LServer) ->
case catch lists:max(rpc(ejabberd_sm, get_user_present_pids, [LUser, LServer])) of
{Priority, _} when is_integer(Priority), Priority >= 0 ->
false;
_ ->
true
end.

15 changes: 9 additions & 6 deletions src/event_pusher/mod_event_pusher_push.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@
Node :: pubsub_node(), Form :: form()) ->
ok | {error, Reason :: term()}.

-callback disable(UserJID :: jid:jid()) ->
ok | {error, Reason :: term()}.

-callback disable(UserJID :: jid:jid(), PubsubJID :: jid:jid(),
Node :: pubsub_node()) ->
Node :: pubsub_node() | undefined) ->
ok | {error, Reason :: term()}.

-callback get_publish_services(User :: jid:jid()) ->
Expand Down Expand Up @@ -144,8 +147,7 @@ push_event(Acc, _, _) ->
-spec remove_user(Acc :: mongoose_acc:t(), LUser :: binary(), LServer :: binary()) ->
mongoose_acc:t().
remove_user(Acc, LUser, LServer) ->
R = mod_event_pusher_push_backend:disable(jid:make_noprep(LUser, LServer, <<>>),
undefined, undefined),
R = mod_event_pusher_push_backend:disable(jid:make_noprep(LUser, LServer, <<>>)),
mongoose_lib:log_if_backend_error(R, ?MODULE, ?LINE, {Acc, LUser, LServer}),
Acc.

Expand Down Expand Up @@ -288,13 +290,14 @@ maybe_enable_node(#jid{lserver = Host} = From, BarePubSubJID, Node, FormFields,
store_session_info(Jid, Service) ->
ejabberd_sm:store_info(Jid, {?SESSION_KEY, Service}).

-spec maybe_remove_push_node_from_sessions_info(jid:jid(), jid:jid(), pubsub_node()) -> ok.
-spec maybe_remove_push_node_from_sessions_info(jid:jid(), jid:jid(), pubsub_node() | undefined) ->
ok.
maybe_remove_push_node_from_sessions_info(From, PubSubJid, Node) ->
AllSessions = ejabberd_sm:get_raw_sessions(From),
find_and_remove_push_node(From, AllSessions, PubSubJid, Node).

-spec find_and_remove_push_node(jid:jid(), [ejabberd_sm:session()],
jid:jid(), pubsub_node()) -> ok.
jid:jid(), pubsub_node() | undefined) -> ok.
find_and_remove_push_node(_From, [], _,_) ->
ok;
find_and_remove_push_node(From, [RawSession | Rest], PubSubJid, Node) ->
Expand All @@ -308,7 +311,7 @@ find_and_remove_push_node(From, [RawSession | Rest], PubSubJid, Node) ->
find_and_remove_push_node(From, Rest, PubSubJid, Node)
end.

-spec my_push_node(ejabberd_sm:session(), jid:jid(), pubsub_node()) -> boolean().
-spec my_push_node(ejabberd_sm:session(), jid:jid(), pubsub_node() | undfined) -> boolean().
my_push_node(RawSession, PubSubJid, Node) ->
case mongoose_session:get_info(RawSession, ?SESSION_KEY, undefined) of
{?SESSION_KEY, {PubSubJid, Node, _}} ->
Expand Down
27 changes: 15 additions & 12 deletions src/event_pusher/mod_event_pusher_push_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
%%--------------------------------------------------------------------

-export([init/2]).
-export([enable/4, disable/3, get_publish_services/1]).
-export([enable/4,
disable/1,
disable/3,
get_publish_services/1]).

%%--------------------------------------------------------------------
%% Definitions
Expand Down Expand Up @@ -56,25 +59,25 @@ enable(User, PubSub, Node, Forms) ->
write(make_record(User, PubSub, Node, Forms)).


-spec disable(UserJID :: jid:jid()) -> ok | {error, Reason :: term()}.
disable(User) ->
delete(key(User)).

-spec disable(UserJID :: jid:jid(), PubsubJID :: jid:jid(),
Node :: mod_event_pusher_push:pubsub_node()) -> ok | {error, Reason :: term()}.
disable(User, undefined, undefined) ->
delete(key(User));
Node :: mod_event_pusher_push:pubsub_node() | undefined) ->
ok | {error, Reason :: term()}.
disable(User, PubsubJID, Node) ->
Result =
exec(
fun() ->
PubsubFiltered =
[Record ||
#push_subscription{pubsub_jid = RecPubsubJID} = Record <- read(key(User)),
PubsubJID == undefined orelse RecPubsubJID == PubsubJID],

NodeFiltered =
Filtered =
[Record ||
#push_subscription{pubsub_node = RecNode} = Record <- PubsubFiltered,
#push_subscription{pubsub_jid = RecPubsubJID,
pubsub_node = RecNode} = Record <- read(key(User)),
RecPubsubJID == PubsubJID,
Node == undefined orelse RecNode == Node],

[mnesia:delete_object(Record) || Record <- NodeFiltered]
[mnesia:delete_object(Record) || Record <- Filtered]
end),

case Result of
Expand Down
149 changes: 81 additions & 68 deletions src/event_pusher/mod_event_pusher_push_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@

-export([init/2]).
-export([enable/4,
disable/1,
disable/3,
get_publish_services/1]).


%%--------------------------------------------------------------------
%% Backend callbacks
%%--------------------------------------------------------------------

-spec init(Host :: jid:server(), Opts :: list()) -> ok.
init(_Host, _Opts) ->
ok.
prepare_queries().

-spec enable(User, PubSub, Node, Form) -> Result when
User :: jid:jid(),
Expand All @@ -36,57 +36,43 @@ init(_Host, _Opts) ->
Form :: mod_event_pusher_push:form(),
Result :: ok | {error, term()}.
enable(#jid{lserver = LServer} = User, PubSub, Node, Forms) ->
UserEscaped = mongoose_rdbms:escape_string(jid:to_binary(jid:to_lus(User))),
PubSubEscaped = mongoose_rdbms:escape_string(jid:to_binary(PubSub)),
FormsJSONEscaped = mongoose_rdbms:escape_string(encode_form(Forms)),
NodeEscaped = mongoose_rdbms:escape_string(Node),

DeleteQuery = delete_query(UserEscaped, PubSubEscaped, NodeEscaped),
mongoose_rdbms:sql_query(LServer, DeleteQuery),

CreatedAtEscaped = mongoose_rdbms:escape_integer(os:system_time(microsecond)),
InsertQuery = insert_query(UserEscaped, PubSubEscaped, NodeEscaped,
FormsJSONEscaped, CreatedAtEscaped),
case mongoose_rdbms:sql_query(LServer, InsertQuery) of
ExtUser = jid:to_binary(jid:to_lus(User)),
ExtPubSub = jid:to_binary(PubSub),
ExtForms = encode_form(Forms),
execute_delete(LServer, ExtUser, Node, ExtPubSub),
CreatedAt = os:system_time(microsecond),
case execute_insert(LServer, ExtUser, Node, ExtPubSub, ExtForms, CreatedAt) of
{updated, 1} -> ok;
Other -> Other
end.

-spec disable(User :: jid:jid(), PubSub :: jid:jid() | undefined,
Node :: mod_event_pusher_push:pubsub_node() | undefined) ->
ok | {error, Reason :: term()}.
disable(#jid{lserver = LServer} = User, PubSub, Node) ->
UserEscaped = mongoose_rdbms:escape_string(jid:to_binary(jid:to_lus(User))),
PubSubBin = maybe_to_binary(PubSub),
PubSubEscaped = escape_if_defined(PubSubBin),
NodeEscaped = escape_if_defined(Node),
DeleteQuery = delete_query(UserEscaped, PubSubEscaped, NodeEscaped),
case mongoose_rdbms:sql_query(LServer, DeleteQuery) of
{updated, _} -> ok;
Other -> Other
end.

maybe_to_binary(undefined) -> undefined;
maybe_to_binary(#jid{} = JID) -> jid:to_binary(JID).
-spec disable(User :: jid:jid()) -> ok.
disable(#jid{lserver = LServer} = User) ->
ExtUser = jid:to_binary(jid:to_lus(User)),
execute_delete(LServer, ExtUser),
ok.

escape_if_defined(undefined) -> undefined;
escape_if_defined(Value) -> mongoose_rdbms:escape_string(Value).
-spec disable(User :: jid:jid(), PubSub :: jid:jid(),
Node :: mod_event_pusher_push:pubsub_node() | undefined) -> ok.
disable(#jid{lserver = LServer} = User, PubSub, undefined) ->
ExtUser = jid:to_binary(jid:to_lus(User)),
ExtPubSub = jid:to_binary(PubSub),
execute_delete(LServer, ExtUser, ExtPubSub),
ok;
disable(#jid{lserver = LServer} = User, PubSub, Node) ->
ExtUser = jid:to_binary(jid:to_lus(User)),
ExtPubSub = jid:to_binary(PubSub),
execute_delete(LServer, ExtUser, Node, ExtPubSub),
ok.

-spec get_publish_services(User :: jid:jid()) ->
{ok, [{PubSub :: jid:jid(),
Node :: mod_event_pusher_push:pubsub_node(),
Form :: mod_event_pusher_push:form()}]} |
{error, Reason :: term()}.
Form :: mod_event_pusher_push:form()}]}.
get_publish_services(#jid{lserver = LServer} = User) ->
UserEscaped = mongoose_rdbms:escape_string(jid:to_binary(jid:to_lus(User))),
SelectQuery = select_query(UserEscaped),
Result = mongoose_rdbms:sql_query(LServer, SelectQuery),
case Result of
{selected, Rows} ->
{ok, decode_rows(Rows)};
Other ->
Other
end.
ExtUser = jid:to_binary(jid:to_lus(User)),
{selected, Rows} = execute_select(LServer, ExtUser),
{ok, decode_rows(Rows)}.

decode_rows(Rows) ->
[decode_row(Row) || Row <- Rows].
Expand All @@ -96,35 +82,62 @@ decode_row({NodeID, PubSubBin, FormJSON}) ->
NodeID,
decode_form(FormJSON)}.

insert_query(User, PubSub, Node, FormJSON, CreatedAt) ->
[<<"INSERT INTO ">>, table_name(), <<" (owner_jid, node, pubsub_jid, form, created_at) VALUES (">>,
mongoose_rdbms:use_escaped_string(User), "," ,
mongoose_rdbms:use_escaped_string(Node), ",",
mongoose_rdbms:use_escaped_string(PubSub), "," ,
mongoose_rdbms:use_escaped_string(FormJSON), "," ,
mongoose_rdbms:use_escaped_integer(CreatedAt),
")"].

delete_query(User, PubSub, Node) ->
[<<"DELETE FROM ">>, table_name(),
<<" WHERE owner_jid = ">>, mongoose_rdbms:use_escaped_string(User),
maybe_add_col_filter("node", Node),
maybe_add_col_filter("pubsub_jid", PubSub)].

maybe_add_col_filter(_, undefined) -> [];
maybe_add_col_filter(ColName, Value) ->
[" AND ", ColName, " = ", mongoose_rdbms:use_escaped_string(Value)].

select_query(User) ->
[<<"SELECT node, pubsub_jid, form FROM ">>, table_name(),
<<" WHERE owner_jid = ">>, mongoose_rdbms:use_escaped_string(User)].

encode_form(Forms) ->
jiffy:encode({Forms}).

decode_form(FormJSON) ->
{Items} = jiffy:decode(FormJSON),
Items.

table_name() ->
<<"event_pusher_push_subscription">>.
%% Prepared queries

-spec prepare_queries() -> ok.
prepare_queries() ->
mongoose_rdbms:prepare(event_pusher_push_insert, event_pusher_push_subscription,
[owner_jid, node, pubsub_jid, form, created_at],
<<"INSERT INTO event_pusher_push_subscription VALUES (?, ?, ?, ?, ?)">>),
mongoose_rdbms:prepare(event_pusher_push_select, event_pusher_push_subscription,
[owner_jid],
<<"SELECT node, pubsub_jid, form FROM event_pusher_push_subscription "
"WHERE owner_jid = ?">>),
mongoose_rdbms:prepare(event_pusher_push_delete, event_pusher_push_subscription,
[owner_jid],
<<"DELETE FROM event_pusher_push_subscription "
"WHERE owner_jid = ?">>),
mongoose_rdbms:prepare(event_pusher_push_delete_pubsub_jid, event_pusher_push_subscription,
[owner_jid, pubsub_jid],
<<"DELETE FROM event_pusher_push_subscription "
"WHERE owner_jid = ? AND pubsub_jid = ?">>),
mongoose_rdbms:prepare(event_pusher_push_delete_node, event_pusher_push_subscription,
[owner_jid, node, pubsub_jid],
<<"DELETE FROM event_pusher_push_subscription "
"WHERE owner_jid = ? AND node = ? AND pubsub_jid = ?">>),
ok.

-spec execute_insert(jid:lserver(), jid:literal_jid(), mod_event_pusher_push:pubsub_node(),
jid:literal_jid(), iodata(), non_neg_integer()) ->
mongoose_rdbms:query_result().
execute_insert(LServer, OwnerJid, Node, PubSubJid, Form, CreatedAt) ->
mongoose_rdbms:execute_successfully(LServer, event_pusher_push_insert,
[OwnerJid, Node, PubSubJid, Form, CreatedAt]).

-spec execute_select(jid:lserver(), jid:literal_jid()) -> mongoose_rdbms:query_result().
execute_select(LServer, OwnerJid) ->
mongoose_rdbms:execute_successfully(LServer, event_pusher_push_select, [OwnerJid]).

-spec execute_delete(jid:lserver(), jid:literal_jid()) -> mongoose_rdbms:query_result().
execute_delete(LServer, OwnerJid) ->
mongoose_rdbms:execute_successfully(LServer, event_pusher_push_delete, [OwnerJid]).

-spec execute_delete(jid:lserver(), jid:literal_jid(), jid:literal_jid()) ->
mongoose_rdbms:query_result().
execute_delete(LServer, OwnerJid, PubSubJid) ->
mongoose_rdbms:execute_successfully(LServer, event_pusher_push_delete_pubsub_jid,
[OwnerJid, PubSubJid]).

-spec execute_delete(jid:lserver(), jid:literal_jid(), mod_event_pusher_push:pubsub_node(),
jid:literal_jid()) ->
mongoose_rdbms:query_result().
execute_delete(LServer, OwnerJid, Node, PubSubJid) ->
mongoose_rdbms:execute_successfully(LServer, event_pusher_push_delete_node,
[OwnerJid, Node, PubSubJid]).

0 comments on commit 8a4e707

Please sign in to comment.