Skip to content

Commit

Permalink
Implement async pool for smart_markers
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Mar 14, 2022
1 parent b6a46e5 commit 29480e0
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 47 deletions.
103 changes: 62 additions & 41 deletions big_tests/tests/smart_markers_SUITE.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-module(smart_markers_SUITE).
-compile([export_all, nowarn_export_all]).

-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("escalus/include/escalus_xmlns.hrl").
-include_lib("exml/include/exml.hrl").
Expand All @@ -10,7 +11,7 @@

-import(distributed_helper, [mim/0, rpc/4, subhost_pattern/1]).
-import(domain_helper, [host_type/0]).
-import(config_parser_helper, [default_mod_config/1, mod_config/2]).
-import(config_parser_helper, [mod_config/2]).

%%% Suite configuration
all() ->
Expand All @@ -22,9 +23,8 @@ all() ->

all_cases() ->
[
{group, one2one},
{group, muclight},
{group, keep_private}
{group, regular},
{group, async_pools}
].

groups() ->
Expand Down Expand Up @@ -53,6 +53,18 @@ groups() ->
[
marker_is_not_routed_nor_fetchable,
fetching_room_answers_only_own_marker
]},
{regular, [],
[
{group, one2one},
{group, muclight},
{group, keep_private}
]},
{async_pools, [],
[
{group, one2one},
{group, muclight},
{group, keep_private}
]}
].

Expand All @@ -65,17 +77,23 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
escalus:end_per_suite(Config).

init_per_group(regular, Config) ->
[{merge_opts, #{backend => rdbms}} | Config];
init_per_group(async_pools, Config) ->
[{merge_opts, #{backend => rdbms_async,
async_writer => #{pool_size => 2}}} | Config];
init_per_group(GroupName, Config) ->
ok = dynamic_modules:ensure_modules(host_type(), group_to_module(GroupName)),
AsyncType = ?config(merge_opts, Config),
ok = dynamic_modules:ensure_modules(host_type(), group_to_module(GroupName, AsyncType)),
Config.

group_to_module(one2one) ->
[{mod_smart_markers, default_mod_config(mod_smart_markers)}];
group_to_module(keep_private) ->
[{mod_smart_markers, mod_config(mod_smart_markers, #{keep_private => true})},
group_to_module(one2one, MergeOpts) ->
[{mod_smart_markers, mod_config(mod_smart_markers, MergeOpts)}];
group_to_module(keep_private, MergeOpts) ->
[{mod_smart_markers, mod_config(mod_smart_markers, MergeOpts#{keep_private => true})},
{mod_muc_light, mod_config(mod_muc_light, #{backend => rdbms})}];
group_to_module(muclight) ->
[{mod_smart_markers, default_mod_config(mod_smart_markers)},
group_to_module(muclight, MergeOpts) ->
[{mod_smart_markers, mod_config(mod_smart_markers, MergeOpts)},
{mod_muc_light, mod_config(mod_muc_light, #{backend => rdbms})}].

end_per_group(muclight, Config) ->
Expand Down Expand Up @@ -169,8 +187,8 @@ fetching_room_answers_only_own_marker(Config) ->
<<"type">>, <<"groupchat">>),
[ begin
escalus:send(User, ChatMarker),
MUser = verify_marker_fetch(User, RoomBinJid),
?assertEqual(1, length(MUser))
{ok, MarkersThatUserHasInRoom} = verify_marker_fetch(User, RoomBinJid),
?assertEqual(1, length(MarkersThatUserHasInRoom))
end || User <- [Alice, Bob] ]
end).

Expand Down Expand Up @@ -273,7 +291,7 @@ markers_are_removed_when_room_is_removed(Config) ->
Jid = jid:from_binary(escalus_client:full_jid(User)),
mongoose_helper:wait_until(
fun() -> length(fetch_markers_for_users(Jid, RoomJid)) end, 0)
end || User <- Users]
end || User <- Users ]
end).

%%% helpers
Expand All @@ -300,23 +318,23 @@ delete_room(Owner, Users, RoomBinJid) ->
escalus:assert(is_iq_result, escalus:wait_for_stanza(Owner)).

one_marker_in_room(Users, RoomBinJid, Writer, Marker) ->
MsgId = escalus_stanza:id(),
StzId = send_msg_to_room(Users, RoomBinJid, Writer, MsgId),
mark_msg(Users, RoomBinJid, Marker, StzId).
MsgId = escalus_stanza:id(),
StanzaId = send_msg_to_room(Users, RoomBinJid, Writer, MsgId),
mark_msg(Users, RoomBinJid, Marker, StanzaId).

send_msg_to_room(Users, RoomBinJid, Writer, MsgId) ->
Msg = escalus_stanza:set_id(escalus_stanza:groupchat_to(RoomBinJid, <<"Hello">>), MsgId),
escalus:send(Writer, Msg),
Msgs = [ escalus:wait_for_stanza(User) || User <- Users],
get_id(hd(Msgs), MsgId).

mark_msg(Users, RoomBinJid, Marker, StzId) ->
ChatMarker = escalus_stanza:setattr(
escalus_stanza:chat_marker(RoomBinJid, <<"displayed">>, StzId),
<<"type">>, <<"groupchat">>),
escalus:send(Marker, ChatMarker),
[ escalus:wait_for_stanza(User) || User <- Users],
StzId.
Msg = escalus_stanza:set_id(escalus_stanza:groupchat_to(RoomBinJid, <<"Hello">>), MsgId),
escalus:send(Writer, Msg),
Msgs = [ escalus:wait_for_stanza(User) || User <- Users ],
get_id(hd(Msgs), MsgId).

mark_msg(Users, RoomBinJid, Marker, StanzaId) ->
ChatMarker = escalus_stanza:setattr(
escalus_stanza:chat_marker(RoomBinJid, <<"displayed">>, StanzaId),
<<"type">>, <<"groupchat">>),
escalus:send(Marker, ChatMarker),
[ escalus:wait_for_stanza(User) || User <- Users ],
StanzaId.

send_message_respond_marker(MsgWriter, MarkerAnswerer) ->
send_message_respond_marker(MsgWriter, MarkerAnswerer, undefined).
Expand Down Expand Up @@ -352,18 +370,21 @@ verify_marker_fetch(MarkingUser, MarkedUser, Thread, After) ->
_ -> [{<<"after">>, After}]
end,
Iq = iq_fetch_marker(MarkedUserBJid ++ MaybeThread ++ MaybeAfter),
escalus:send(MarkingUser, Iq),
Response = escalus:wait_for_stanza(MarkingUser),
escalus:assert(is_iq_result, [Iq], Response),
Markers = [Marker | _] = exml_query:paths(
Response, [{element_with_ns, <<"query">>, ?NS_ESL_SMART_MARKERS},
{element, <<"marker">>}]),
?assertNotEqual(undefined, Marker),
?assertNotEqual(undefined, exml_query:attr(Marker, <<"timestamp">>)),
?assertEqual(<<"displayed">>, exml_query:attr(Marker, <<"type">>)),
?assertEqual(Thread, exml_query:attr(Marker, <<"thread">>)),
?assertNotEqual(undefined, exml_query:attr(Marker, <<"id">>)),
lists:sort(Markers).
mongoose_helper:wait_until(
fun() ->
escalus:send(MarkingUser, Iq),
Response = escalus:wait_for_stanza(MarkingUser),
escalus:assert(is_iq_result, [Iq], Response),
Markers = [Marker | _] = exml_query:paths(
Response, [{element_with_ns, <<"query">>, ?NS_ESL_SMART_MARKERS},
{element, <<"marker">>}]),
?assertNotEqual(undefined, Marker),
?assertNotEqual(undefined, exml_query:attr(Marker, <<"timestamp">>)),
?assertEqual(<<"displayed">>, exml_query:attr(Marker, <<"type">>)),
?assertEqual(Thread, exml_query:attr(Marker, <<"thread">>)),
?assertNotEqual(undefined, exml_query:attr(Marker, <<"id">>)),
lists:sort(Markers)
end, ok, #{name => fetch_marker, validator => fun(_) -> true end}).

verify_marker_fetch_is_empty(MarkingUser, MarkedUser) ->
MarkedUserBJid = escalus_utils:jid_to_lower(escalus_client:short_jid(MarkedUser)),
Expand Down
2 changes: 1 addition & 1 deletion src/inbox/mod_inbox.erl
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ start(HostType, #{iqdisc := IQDisc, groupchat := MucTypes} = Opts) ->
-spec stop(HostType :: mongooseim:host_type()) -> ok.
stop(HostType) ->
mod_inbox_muc:stop(HostType),
case mongoose_config:get_opt([{modules, HostType}, mod_inbox, backend]) of
case mongoose_config:get_opt([{modules, HostType}, ?MODULE, backend]) of
rdbms_async -> mod_inbox_rdbms_async:stop(HostType);
_ -> ok
end,
Expand Down
19 changes: 16 additions & 3 deletions src/smart_markers/mod_smart_markers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,12 @@ start(HostType, #{iqdisc := IQDisc, keep_private := Private} = Opts) ->

-spec stop(mongooseim:host_type()) -> ok.
stop(HostType) ->
gen_iq_handler:remove_iq_handler_for_domain(HostType, ?NS_ESL_SMART_MARKERS, ejabberd_sm),
Opts = gen_mod:get_module_opts(HostType, ?MODULE),
case gen_mod:get_opt(backend, Opts) of
rdbms_async -> mod_smart_markers_rdbms_async:stop(HostType);
_ -> ok
end,
gen_iq_handler:remove_iq_handler_for_domain(HostType, ?NS_ESL_SMART_MARKERS, ejabberd_sm),
ejabberd_hooks:delete(hooks(HostType, Opts)).

-spec supported_features() -> [atom()].
Expand All @@ -111,14 +115,23 @@ supported_features() ->
config_spec() ->
#section{
items = #{<<"keep_private">> => #option{type = boolean},
<<"backend">> => #option{type = atom, validate = {enum, [rdbms]}},
<<"backend">> => #option{type = atom, validate = {enum, [rdbms, rdbms_async]}},
<<"async_writer">> => async_config_spec(),
<<"iqdisc">> => mongoose_config_spec:iqdisc()},
defaults = #{<<"keep_private">> => false,
<<"backend">> => rdbms,
<<"iqdisc">> => no_queue},
format_items = map
}.

async_config_spec() ->
#section{
items = #{<<"pool_size">> => #option{type = integer, validate = non_negative}},
defaults = #{<<"pool_size">> => 2 * erlang:system_info(schedulers_online)},
format_items = map,
include = always
}.

%% IQ handlers
-spec process_iq(mongoose_acc:t(), jid:jid(), jid:jid(), jlib:iq(), map()) ->
{mongoose_acc:t(), jlib:iq()}.
Expand Down Expand Up @@ -305,7 +318,7 @@ extract_chat_markers(Acc, From, To, Packet) ->
ChatMarkers ->
TS = mongoose_acc:timestamp(Acc),
CM = #{from => From,
to => To,
to => jid:to_bare(To),
thread => get_thread(Packet),
timestamp => TS,
type => undefined,
Expand Down
1 change: 1 addition & 0 deletions src/smart_markers/mod_smart_markers_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
-export([init/2, update_chat_marker/2, get_chat_markers/4]).
-export([get_conv_chat_marker/6]).
-export([remove_domain/2, remove_user/2, remove_to/2, remove_to_for_user/3]).
-export([encode_jid/1, encode_thread/1, encode_type/1, check_upsert_result/1]).

%%--------------------------------------------------------------------
%% API
Expand Down
116 changes: 116 additions & 0 deletions src/smart_markers/mod_smart_markers_rdbms_async.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
%%% @doc
%%% RDBMS backend for mod_smart_markers
%%% @end
%%% @copyright (C) 2022, Erlang Solutions Ltd.
-module(mod_smart_markers_rdbms_async).
-behavior(mod_smart_markers_backend).
-behaviour(mongoose_aggregator_worker).

-include("jlib.hrl").
-include("mongoose_logger.hrl").

-export([init/2, stop/1, update_chat_marker/2, get_chat_markers/4, get_conv_chat_marker/6]).
-export([remove_domain/2, remove_user/2, remove_to/2, remove_to_for_user/3]).

%% Worker callbacks
-export([request/2, aggregate/3, verify/3]).

%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec init(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
init(HostType, Opts) ->
AsyncOpts = prepare_pool_opts(Opts),
mod_smart_markers_rdbms:init(HostType, Opts),
start_pool(HostType, AsyncOpts),
ok.

stop(HostType) ->
mongoose_async_pools:stop_pool(HostType, smart_markers).

prepare_pool_opts(#{async_writer := AsyncOpts}) ->
AsyncOpts#{pool_type => aggregate,
request_callback => fun ?MODULE:request/2,
aggregate_callback => fun ?MODULE:aggregate/3,
verify_callback => fun ?MODULE:verify/3}.

-spec start_pool(mongooseim:host_type(), mongoose_async_pools:pool_opts()) -> term().
start_pool(HostType, Opts) ->
mongoose_async_pools:start_pool(HostType, smart_markers, Opts).

%%% @doc
%%% 'from', 'to', 'thread' and 'type' keys of the ChatMarker map serve
%%% as a composite database key. If key is not available in the database,
%%% then chat marker must be added. Otherwise this function must update
%%% chat marker record for that composite key.
%%% @end
-spec update_chat_marker(mongooseim:host_type(),
mod_smart_markers:chat_marker()) -> ok.
update_chat_marker(HostType, #{from := #jid{luser = LU, lserver = LS},
to := To, thread := Thread, type := Type} = Marker) ->
Key = {LU, LS, To, Thread, Type},
mongoose_async_pools:put_task(HostType, smart_markers, Key, Marker).

%% Synchronous calls
-spec get_conv_chat_marker(HostType :: mongooseim:host_type(),
From :: jid:jid(),
To :: jid:jid(),
Thread :: mod_smart_markers:maybe_thread(),
Timestamp :: integer(),
Private :: boolean()) -> [mod_smart_markers:chat_marker()].
get_conv_chat_marker(HostType, From, To, Thread, TS, Private) ->
mod_smart_markers_rdbms:get_conv_chat_marker(HostType, From, To, Thread, TS, Private).

-spec get_chat_markers(HostType :: mongooseim:host_type(),
To :: jid:jid(),
Thread :: mod_smart_markers:maybe_thread(),
Timestamp :: integer()) -> [mod_smart_markers:chat_marker()].
get_chat_markers(HostType, To, Thread, TS) ->
mod_smart_markers_rdbms:get_chat_markers(HostType, To, Thread, TS).

-spec remove_domain(mongooseim:host_type(), jid:lserver()) -> mongoose_rdbms:query_result().
remove_domain(HostType, Domain) ->
mod_smart_markers_rdbms:remove_domain(HostType, Domain).

-spec remove_user(mongooseim:host_type(), jid:jid()) -> mongoose_rdbms:query_result().
remove_user(HostType, User) ->
mod_smart_markers_rdbms:remove_user(HostType, User).

-spec remove_to(mongooseim:host_type(), jid:jid()) -> mongoose_rdbms:query_result().
remove_to(HostType, To) ->
mod_smart_markers_rdbms:remove_to(HostType, To).

-spec remove_to_for_user(mongooseim:host_type(), From :: jid:jid(), To :: jid:jid()) ->
mongoose_rdbms:query_result().
remove_to_for_user(HostType, From, To) ->
mod_smart_markers_rdbms:remove_to_for_user(HostType, From, To).

%% callbacks
-spec aggregate(mod_smart_markers:chat_marker(),
mod_smart_markers:chat_marker(),
mongoose_async_pools:pool_extra()) -> {ok, mod_smart_markers:chat_marker()}.
aggregate(_, NewTask, _Extra) ->
{ok, NewTask}.

-spec request(mod_smart_markers:chat_marker(),
mongoose_async_pools:pool_extra()) -> reference().
request(#{from := #jid{luser = LU, lserver = LS}, to := To, thread := Thread,
type := Type, timestamp := TS, id := Id}, #{host_type := HostType}) ->
ToEncoded = mod_smart_markers_rdbms:encode_jid(To),
ThreadEncoded = mod_smart_markers_rdbms:encode_thread(Thread),
TypeEncoded = mod_smart_markers_rdbms:encode_type(Type),
KeyValues = [LS, LU, ToEncoded, ThreadEncoded, TypeEncoded],
UpdateValues = [Id, TS],
InsertValues = KeyValues ++ UpdateValues,
rdbms_queries:request_upsert(HostType, smart_markers_upsert,
InsertValues, UpdateValues, KeyValues).

-spec verify(term(), mod_smart_markers:chat_marker(), mongoose_async_pools:pool_extra()) -> ok.
verify(Answer, Marker, _Extra) ->
case mod_smart_markers_rdbms:check_upsert_result(Answer) of
{error, Reason} ->
?LOG_WARNING(#{what => smart_marker_insert_failed, reason => Reason,
marker => Marker});
_ -> ok
end.

6 changes: 4 additions & 2 deletions test/common/config_parser_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,6 @@ default_pool_conn_opts(_Type) ->
mod_config(Module, ExtraOpts) ->
maps:merge(default_mod_config(Module), ExtraOpts).

default_mod_config(mod_smart_markers) ->
#{keep_private => false, backend => rdbms, iqdisc => no_queue};
default_mod_config(mod_adhoc) ->
#{iqdisc => one_queue, report_commands_node => false};
default_mod_config(mod_auth_token) ->
Expand Down Expand Up @@ -896,6 +894,10 @@ default_mod_config(mod_shared_roster_ldap) ->
group_cache_size => 1000, rfilter => <<"">>, gfilter => <<"">>, ufilter => <<"">>};
default_mod_config(mod_sic) ->
#{iqdisc => one_queue};
default_mod_config(mod_smart_markers) ->
#{keep_private => false,
async_writer => #{pool_size => 2 * erlang:system_info(schedulers_online)},
backend => rdbms, iqdisc => no_queue};
default_mod_config(mod_stream_management) ->
#{backend => mnesia,
buffer => true,
Expand Down
1 change: 1 addition & 0 deletions test/config_parser_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2883,6 +2883,7 @@ mod_smart_markers(_Config) ->
T = fun(Opts) -> #{<<"modules">> => #{<<"mod_smart_markers">> => Opts}} end,
?cfgh(P ++ [backend], rdbms, T(#{<<"backend">> => <<"rdbms">>})),
?cfgh(P ++ [keep_private], true, T(#{<<"keep_private">> => true})),
?cfgh(P ++ [async_writer], #{pool_size => 8}, T(#{<<"async_writer">> => #{<<"pool_size">> => 8}})),
?errh(T(#{<<"backend">> => <<"nodejs">>})),
?errh(T(#{<<"keep_private">> => 1})).

Expand Down

0 comments on commit 29480e0

Please sign in to comment.