diff --git a/big_tests/tests/smart_markers_SUITE.erl b/big_tests/tests/smart_markers_SUITE.erl index 82c911326da..bceefd64b6c 100644 --- a/big_tests/tests/smart_markers_SUITE.erl +++ b/big_tests/tests/smart_markers_SUITE.erl @@ -1,6 +1,7 @@ -module(smart_markers_SUITE). -compile([export_all, nowarn_export_all]). +-include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). -include_lib("escalus/include/escalus_xmlns.hrl"). -include_lib("exml/include/exml.hrl"). @@ -10,7 +11,7 @@ -import(distributed_helper, [mim/0, rpc/4, subhost_pattern/1]). -import(domain_helper, [host_type/0]). --import(config_parser_helper, [default_mod_config/1, mod_config/2]). +-import(config_parser_helper, [mod_config/2]). %%% Suite configuration all() -> @@ -22,9 +23,8 @@ all() -> all_cases() -> [ - {group, one2one}, - {group, muclight}, - {group, keep_private} + {group, regular}, + {group, async_pools} ]. groups() -> @@ -53,6 +53,18 @@ groups() -> [ marker_is_not_routed_nor_fetchable, fetching_room_answers_only_own_marker + ]}, + {regular, [], + [ + {group, one2one}, + {group, muclight}, + {group, keep_private} + ]}, + {async_pools, [], + [ + {group, one2one}, + {group, muclight}, + {group, keep_private} ]} ]. @@ -65,17 +77,23 @@ init_per_suite(Config) -> end_per_suite(Config) -> escalus:end_per_suite(Config). +init_per_group(regular, Config) -> + [{merge_opts, #{backend => rdbms}} | Config]; +init_per_group(async_pools, Config) -> + [{merge_opts, #{backend => rdbms_async, + async_writer => #{pool_size => 2}}} | Config]; init_per_group(GroupName, Config) -> - ok = dynamic_modules:ensure_modules(host_type(), group_to_module(GroupName)), + AsyncType = ?config(merge_opts, Config), + ok = dynamic_modules:ensure_modules(host_type(), group_to_module(GroupName, AsyncType)), Config. -group_to_module(one2one) -> - [{mod_smart_markers, default_mod_config(mod_smart_markers)}]; -group_to_module(keep_private) -> - [{mod_smart_markers, mod_config(mod_smart_markers, #{keep_private => true})}, +group_to_module(one2one, MergeOpts) -> + [{mod_smart_markers, mod_config(mod_smart_markers, MergeOpts)}]; +group_to_module(keep_private, MergeOpts) -> + [{mod_smart_markers, mod_config(mod_smart_markers, MergeOpts#{keep_private => true})}, {mod_muc_light, mod_config(mod_muc_light, #{backend => rdbms})}]; -group_to_module(muclight) -> - [{mod_smart_markers, default_mod_config(mod_smart_markers)}, +group_to_module(muclight, MergeOpts) -> + [{mod_smart_markers, mod_config(mod_smart_markers, MergeOpts)}, {mod_muc_light, mod_config(mod_muc_light, #{backend => rdbms})}]. end_per_group(muclight, Config) -> @@ -169,8 +187,8 @@ fetching_room_answers_only_own_marker(Config) -> <<"type">>, <<"groupchat">>), [ begin escalus:send(User, ChatMarker), - MUser = verify_marker_fetch(User, RoomBinJid), - ?assertEqual(1, length(MUser)) + {ok, MarkersThatUserHasInRoom} = verify_marker_fetch(User, RoomBinJid), + ?assertEqual(1, length(MarkersThatUserHasInRoom)) end || User <- [Alice, Bob] ] end). @@ -273,7 +291,7 @@ markers_are_removed_when_room_is_removed(Config) -> Jid = jid:from_binary(escalus_client:full_jid(User)), mongoose_helper:wait_until( fun() -> length(fetch_markers_for_users(Jid, RoomJid)) end, 0) - end || User <- Users] + end || User <- Users ] end). %%% helpers @@ -300,23 +318,23 @@ delete_room(Owner, Users, RoomBinJid) -> escalus:assert(is_iq_result, escalus:wait_for_stanza(Owner)). one_marker_in_room(Users, RoomBinJid, Writer, Marker) -> - MsgId = escalus_stanza:id(), - StzId = send_msg_to_room(Users, RoomBinJid, Writer, MsgId), - mark_msg(Users, RoomBinJid, Marker, StzId). + MsgId = escalus_stanza:id(), + StanzaId = send_msg_to_room(Users, RoomBinJid, Writer, MsgId), + mark_msg(Users, RoomBinJid, Marker, StanzaId). send_msg_to_room(Users, RoomBinJid, Writer, MsgId) -> - Msg = escalus_stanza:set_id(escalus_stanza:groupchat_to(RoomBinJid, <<"Hello">>), MsgId), - escalus:send(Writer, Msg), - Msgs = [ escalus:wait_for_stanza(User) || User <- Users], - get_id(hd(Msgs), MsgId). - -mark_msg(Users, RoomBinJid, Marker, StzId) -> - ChatMarker = escalus_stanza:setattr( - escalus_stanza:chat_marker(RoomBinJid, <<"displayed">>, StzId), - <<"type">>, <<"groupchat">>), - escalus:send(Marker, ChatMarker), - [ escalus:wait_for_stanza(User) || User <- Users], - StzId. + Msg = escalus_stanza:set_id(escalus_stanza:groupchat_to(RoomBinJid, <<"Hello">>), MsgId), + escalus:send(Writer, Msg), + Msgs = [ escalus:wait_for_stanza(User) || User <- Users ], + get_id(hd(Msgs), MsgId). + +mark_msg(Users, RoomBinJid, Marker, StanzaId) -> + ChatMarker = escalus_stanza:setattr( + escalus_stanza:chat_marker(RoomBinJid, <<"displayed">>, StanzaId), + <<"type">>, <<"groupchat">>), + escalus:send(Marker, ChatMarker), + [ escalus:wait_for_stanza(User) || User <- Users ], + StanzaId. send_message_respond_marker(MsgWriter, MarkerAnswerer) -> send_message_respond_marker(MsgWriter, MarkerAnswerer, undefined). @@ -352,18 +370,21 @@ verify_marker_fetch(MarkingUser, MarkedUser, Thread, After) -> _ -> [{<<"after">>, After}] end, Iq = iq_fetch_marker(MarkedUserBJid ++ MaybeThread ++ MaybeAfter), - escalus:send(MarkingUser, Iq), - Response = escalus:wait_for_stanza(MarkingUser), - escalus:assert(is_iq_result, [Iq], Response), - Markers = [Marker | _] = exml_query:paths( - Response, [{element_with_ns, <<"query">>, ?NS_ESL_SMART_MARKERS}, - {element, <<"marker">>}]), - ?assertNotEqual(undefined, Marker), - ?assertNotEqual(undefined, exml_query:attr(Marker, <<"timestamp">>)), - ?assertEqual(<<"displayed">>, exml_query:attr(Marker, <<"type">>)), - ?assertEqual(Thread, exml_query:attr(Marker, <<"thread">>)), - ?assertNotEqual(undefined, exml_query:attr(Marker, <<"id">>)), - lists:sort(Markers). + mongoose_helper:wait_until( + fun() -> + escalus:send(MarkingUser, Iq), + Response = escalus:wait_for_stanza(MarkingUser), + escalus:assert(is_iq_result, [Iq], Response), + Markers = [Marker | _] = exml_query:paths( + Response, [{element_with_ns, <<"query">>, ?NS_ESL_SMART_MARKERS}, + {element, <<"marker">>}]), + ?assertNotEqual(undefined, Marker), + ?assertNotEqual(undefined, exml_query:attr(Marker, <<"timestamp">>)), + ?assertEqual(<<"displayed">>, exml_query:attr(Marker, <<"type">>)), + ?assertEqual(Thread, exml_query:attr(Marker, <<"thread">>)), + ?assertNotEqual(undefined, exml_query:attr(Marker, <<"id">>)), + lists:sort(Markers) + end, ok, #{name => fetch_marker, validator => fun(_) -> true end}). verify_marker_fetch_is_empty(MarkingUser, MarkedUser) -> MarkedUserBJid = escalus_utils:jid_to_lower(escalus_client:short_jid(MarkedUser)), diff --git a/src/inbox/mod_inbox.erl b/src/inbox/mod_inbox.erl index 7eddfc39fe9..dc765227e99 100644 --- a/src/inbox/mod_inbox.erl +++ b/src/inbox/mod_inbox.erl @@ -98,7 +98,7 @@ start(HostType, #{iqdisc := IQDisc, groupchat := MucTypes} = Opts) -> -spec stop(HostType :: mongooseim:host_type()) -> ok. stop(HostType) -> mod_inbox_muc:stop(HostType), - case mongoose_config:get_opt([{modules, HostType}, mod_inbox, backend]) of + case mongoose_config:get_opt([{modules, HostType}, ?MODULE, backend]) of rdbms_async -> mod_inbox_rdbms_async:stop(HostType); _ -> ok end, diff --git a/src/smart_markers/mod_smart_markers.erl b/src/smart_markers/mod_smart_markers.erl index aff4aa7b2c2..cb1ba54a431 100644 --- a/src/smart_markers/mod_smart_markers.erl +++ b/src/smart_markers/mod_smart_markers.erl @@ -99,8 +99,12 @@ start(HostType, #{iqdisc := IQDisc, keep_private := Private} = Opts) -> -spec stop(mongooseim:host_type()) -> ok. stop(HostType) -> - gen_iq_handler:remove_iq_handler_for_domain(HostType, ?NS_ESL_SMART_MARKERS, ejabberd_sm), Opts = gen_mod:get_module_opts(HostType, ?MODULE), + case gen_mod:get_opt(backend, Opts) of + rdbms_async -> mod_smart_markers_rdbms_async:stop(HostType); + _ -> ok + end, + gen_iq_handler:remove_iq_handler_for_domain(HostType, ?NS_ESL_SMART_MARKERS, ejabberd_sm), ejabberd_hooks:delete(hooks(HostType, Opts)). -spec supported_features() -> [atom()]. @@ -111,7 +115,8 @@ supported_features() -> config_spec() -> #section{ items = #{<<"keep_private">> => #option{type = boolean}, - <<"backend">> => #option{type = atom, validate = {enum, [rdbms]}}, + <<"backend">> => #option{type = atom, validate = {enum, [rdbms, rdbms_async]}}, + <<"async_writer">> => async_config_spec(), <<"iqdisc">> => mongoose_config_spec:iqdisc()}, defaults = #{<<"keep_private">> => false, <<"backend">> => rdbms, @@ -119,6 +124,14 @@ config_spec() -> format_items = map }. +async_config_spec() -> + #section{ + items = #{<<"pool_size">> => #option{type = integer, validate = non_negative}}, + defaults = #{<<"pool_size">> => 2 * erlang:system_info(schedulers_online)}, + format_items = map, + include = always + }. + %% IQ handlers -spec process_iq(mongoose_acc:t(), jid:jid(), jid:jid(), jlib:iq(), map()) -> {mongoose_acc:t(), jlib:iq()}. @@ -305,7 +318,7 @@ extract_chat_markers(Acc, From, To, Packet) -> ChatMarkers -> TS = mongoose_acc:timestamp(Acc), CM = #{from => From, - to => To, + to => jid:to_bare(To), thread => get_thread(Packet), timestamp => TS, type => undefined, diff --git a/src/smart_markers/mod_smart_markers_rdbms.erl b/src/smart_markers/mod_smart_markers_rdbms.erl index 8a7f32eaa40..36de987bf08 100644 --- a/src/smart_markers/mod_smart_markers_rdbms.erl +++ b/src/smart_markers/mod_smart_markers_rdbms.erl @@ -13,6 +13,7 @@ -export([init/2, update_chat_marker/2, get_chat_markers/4]). -export([get_conv_chat_marker/6]). -export([remove_domain/2, remove_user/2, remove_to/2, remove_to_for_user/3]). +-export([encode_jid/1, encode_thread/1, encode_type/1, check_upsert_result/1]). %%-------------------------------------------------------------------- %% API diff --git a/src/smart_markers/mod_smart_markers_rdbms_async.erl b/src/smart_markers/mod_smart_markers_rdbms_async.erl new file mode 100644 index 00000000000..b0303e0ba6f --- /dev/null +++ b/src/smart_markers/mod_smart_markers_rdbms_async.erl @@ -0,0 +1,116 @@ +%%% @doc +%%% RDBMS backend for mod_smart_markers +%%% @end +%%% @copyright (C) 2022, Erlang Solutions Ltd. +-module(mod_smart_markers_rdbms_async). +-behavior(mod_smart_markers_backend). +-behaviour(mongoose_aggregator_worker). + +-include("jlib.hrl"). +-include("mongoose_logger.hrl"). + +-export([init/2, stop/1, update_chat_marker/2, get_chat_markers/4, get_conv_chat_marker/6]). +-export([remove_domain/2, remove_user/2, remove_to/2, remove_to_for_user/3]). + +%% Worker callbacks +-export([request/2, aggregate/3, verify/3]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- +-spec init(mongooseim:host_type(), gen_mod:module_opts()) -> ok. +init(HostType, Opts) -> + AsyncOpts = prepare_pool_opts(Opts), + mod_smart_markers_rdbms:init(HostType, Opts), + start_pool(HostType, AsyncOpts), + ok. + +stop(HostType) -> + mongoose_async_pools:stop_pool(HostType, smart_markers). + +prepare_pool_opts(#{async_writer := AsyncOpts}) -> + AsyncOpts#{pool_type => aggregate, + request_callback => fun ?MODULE:request/2, + aggregate_callback => fun ?MODULE:aggregate/3, + verify_callback => fun ?MODULE:verify/3}. + +-spec start_pool(mongooseim:host_type(), mongoose_async_pools:pool_opts()) -> term(). +start_pool(HostType, Opts) -> + mongoose_async_pools:start_pool(HostType, smart_markers, Opts). + +%%% @doc +%%% 'from', 'to', 'thread' and 'type' keys of the ChatMarker map serve +%%% as a composite database key. If key is not available in the database, +%%% then chat marker must be added. Otherwise this function must update +%%% chat marker record for that composite key. +%%% @end +-spec update_chat_marker(mongooseim:host_type(), + mod_smart_markers:chat_marker()) -> ok. +update_chat_marker(HostType, #{from := #jid{luser = LU, lserver = LS}, + to := To, thread := Thread, type := Type} = Marker) -> + Key = {LU, LS, To, Thread, Type}, + mongoose_async_pools:put_task(HostType, smart_markers, Key, Marker). + +%% Synchronous calls +-spec get_conv_chat_marker(HostType :: mongooseim:host_type(), + From :: jid:jid(), + To :: jid:jid(), + Thread :: mod_smart_markers:maybe_thread(), + Timestamp :: integer(), + Private :: boolean()) -> [mod_smart_markers:chat_marker()]. +get_conv_chat_marker(HostType, From, To, Thread, TS, Private) -> + mod_smart_markers_rdbms:get_conv_chat_marker(HostType, From, To, Thread, TS, Private). + +-spec get_chat_markers(HostType :: mongooseim:host_type(), + To :: jid:jid(), + Thread :: mod_smart_markers:maybe_thread(), + Timestamp :: integer()) -> [mod_smart_markers:chat_marker()]. +get_chat_markers(HostType, To, Thread, TS) -> + mod_smart_markers_rdbms:get_chat_markers(HostType, To, Thread, TS). + +-spec remove_domain(mongooseim:host_type(), jid:lserver()) -> mongoose_rdbms:query_result(). +remove_domain(HostType, Domain) -> + mod_smart_markers_rdbms:remove_domain(HostType, Domain). + +-spec remove_user(mongooseim:host_type(), jid:jid()) -> mongoose_rdbms:query_result(). +remove_user(HostType, User) -> + mod_smart_markers_rdbms:remove_user(HostType, User). + +-spec remove_to(mongooseim:host_type(), jid:jid()) -> mongoose_rdbms:query_result(). +remove_to(HostType, To) -> + mod_smart_markers_rdbms:remove_to(HostType, To). + +-spec remove_to_for_user(mongooseim:host_type(), From :: jid:jid(), To :: jid:jid()) -> + mongoose_rdbms:query_result(). +remove_to_for_user(HostType, From, To) -> + mod_smart_markers_rdbms:remove_to_for_user(HostType, From, To). + +%% callbacks +-spec aggregate(mod_smart_markers:chat_marker(), + mod_smart_markers:chat_marker(), + mongoose_async_pools:pool_extra()) -> {ok, mod_smart_markers:chat_marker()}. +aggregate(_, NewTask, _Extra) -> + {ok, NewTask}. + +-spec request(mod_smart_markers:chat_marker(), + mongoose_async_pools:pool_extra()) -> reference(). +request(#{from := #jid{luser = LU, lserver = LS}, to := To, thread := Thread, + type := Type, timestamp := TS, id := Id}, #{host_type := HostType}) -> + ToEncoded = mod_smart_markers_rdbms:encode_jid(To), + ThreadEncoded = mod_smart_markers_rdbms:encode_thread(Thread), + TypeEncoded = mod_smart_markers_rdbms:encode_type(Type), + KeyValues = [LS, LU, ToEncoded, ThreadEncoded, TypeEncoded], + UpdateValues = [Id, TS], + InsertValues = KeyValues ++ UpdateValues, + rdbms_queries:request_upsert(HostType, smart_markers_upsert, + InsertValues, UpdateValues, KeyValues). + +-spec verify(term(), mod_smart_markers:chat_marker(), mongoose_async_pools:pool_extra()) -> ok. +verify(Answer, Marker, _Extra) -> + case mod_smart_markers_rdbms:check_upsert_result(Answer) of + {error, Reason} -> + ?LOG_WARNING(#{what => smart_marker_insert_failed, reason => Reason, + marker => Marker}); + _ -> ok + end. + diff --git a/test/common/config_parser_helper.erl b/test/common/config_parser_helper.erl index 10cbb58681e..5095dd7498a 100644 --- a/test/common/config_parser_helper.erl +++ b/test/common/config_parser_helper.erl @@ -832,8 +832,6 @@ default_pool_conn_opts(_Type) -> mod_config(Module, ExtraOpts) -> maps:merge(default_mod_config(Module), ExtraOpts). -default_mod_config(mod_smart_markers) -> - #{keep_private => false, backend => rdbms, iqdisc => no_queue}; default_mod_config(mod_adhoc) -> #{iqdisc => one_queue, report_commands_node => false}; default_mod_config(mod_auth_token) -> @@ -896,6 +894,10 @@ default_mod_config(mod_shared_roster_ldap) -> group_cache_size => 1000, rfilter => <<"">>, gfilter => <<"">>, ufilter => <<"">>}; default_mod_config(mod_sic) -> #{iqdisc => one_queue}; +default_mod_config(mod_smart_markers) -> + #{keep_private => false, + async_writer => #{pool_size => 2 * erlang:system_info(schedulers_online)}, + backend => rdbms, iqdisc => no_queue}; default_mod_config(mod_stream_management) -> #{backend => mnesia, buffer => true, diff --git a/test/config_parser_SUITE.erl b/test/config_parser_SUITE.erl index 99deba264d6..bbe7b738ed9 100644 --- a/test/config_parser_SUITE.erl +++ b/test/config_parser_SUITE.erl @@ -2883,6 +2883,7 @@ mod_smart_markers(_Config) -> T = fun(Opts) -> #{<<"modules">> => #{<<"mod_smart_markers">> => Opts}} end, ?cfgh(P ++ [backend], rdbms, T(#{<<"backend">> => <<"rdbms">>})), ?cfgh(P ++ [keep_private], true, T(#{<<"keep_private">> => true})), + ?cfgh(P ++ [async_writer], #{pool_size => 8}, T(#{<<"async_writer">> => #{<<"pool_size">> => 8}})), ?errh(T(#{<<"backend">> => <<"nodejs">>})), ?errh(T(#{<<"keep_private">> => 1})).