diff --git a/big_tests/tests/mam_SUITE.erl b/big_tests/tests/mam_SUITE.erl index 52b2b8ba5e9..59a4a31b998 100644 --- a/big_tests/tests/mam_SUITE.erl +++ b/big_tests/tests/mam_SUITE.erl @@ -693,7 +693,7 @@ init_modules(rdbms_simple, C, Config) when C =:= muc_all; init_modules(rdbms_async_pool, C, Config) when C =:= muc_all; C =:= muc_disabled_retraction -> init_module(host_type(), mod_mam_muc_rdbms_arch, [no_writer]), - init_module(host_type(), mod_mam_muc_rdbms_async_pool_writer, [{flush_interval, 1}]), %% 1ms + init_module(host_type(), mod_mam_rdbms_arch_async, [{muc, [{flush_interval, 1}]}]), %% 1ms init_module(host_type(), mod_mam_rdbms_prefs, [muc]), init_module(host_type(), mod_mam_rdbms_user, [muc, pm]), init_module(host_type(), mod_mam_muc, [{host, subhost_pattern(muc_domain(Config))}] ++ @@ -719,7 +719,7 @@ init_modules(rdbms_cache, C, Config) when C =:= muc_all; init_modules(rdbms_async_cache, C, Config) when C =:= muc_all; C =:= muc_disabled_retraction -> init_module(host_type(), mod_mam_muc_rdbms_arch, [no_writer]), - init_module(host_type(), mod_mam_muc_rdbms_async_pool_writer, [{flush_interval, 1}]), %% 1ms + init_module(host_type(), mod_mam_rdbms_arch_async, [{muc, [{flush_interval, 1}]}]), %% 1ms init_module(host_type(), mod_mam_rdbms_prefs, [muc]), init_module(host_type(), mod_mam_rdbms_user, [muc, pm]), init_module(host_type(), mod_mam_cache_user, [muc]), @@ -787,7 +787,7 @@ init_modules(elasticsearch, C, Config) -> init_modules(rdbms_async_pool, C, Config) -> init_module(host_type(), mod_mam, addin_mam_options(C, Config)), init_module(host_type(), mod_mam_rdbms_arch, [no_writer]), - init_module(host_type(), mod_mam_rdbms_async_pool_writer, [pm, {flush_interval, 1}]), %% 1ms + init_module(host_type(), mod_mam_rdbms_arch_async, [{pm, [{flush_interval, 1}]}]), %% 1ms init_module(host_type(), mod_mam_rdbms_prefs, [pm]), init_module(host_type(), mod_mam_rdbms_user, [pm]), Config; @@ -807,7 +807,7 @@ init_modules(rdbms_cache, C, Config) -> 
init_modules(rdbms_async_cache, C, Config) -> init_module(host_type(), mod_mam, addin_mam_options(C, Config)), init_module(host_type(), mod_mam_rdbms_arch, [no_writer]), - init_module(host_type(), mod_mam_rdbms_async_pool_writer, [pm, {flush_interval, 1}]), %% 1ms + init_module(host_type(), mod_mam_rdbms_arch_async, [{pm, [{flush_interval, 1}]}]), %% 1ms init_module(host_type(), mod_mam_rdbms_prefs, [pm]), init_module(host_type(), mod_mam_rdbms_user, [pm]), init_module(host_type(), mod_mam_cache_user, [pm]), @@ -862,8 +862,7 @@ mam_modules() -> mod_mam_muc_elasticsearch_arch, mod_mam_rdbms_arch, mod_mam_muc_rdbms_arch, - mod_mam_rdbms_async_pool_writer, - mod_mam_muc_rdbms_async_pool_writer, + mod_mam_rdbms_arch_async, mod_mam_rdbms_prefs, mod_mam_mnesia_prefs, mod_mam_rdbms_user, @@ -1162,10 +1161,14 @@ required_modules(retract_message_on_stanza_id, _Config) -> required_modules(_, _) -> []. +init_module(Host, mod_mam_rdbms_arch_async, Args) -> + OldOpts = case stop_module(Host, mod_mam_rdbms_arch_async) of + {ok, O} -> O; + ok -> [] + end, + {ok, _} = start_module(Host, mod_mam_rdbms_arch_async, lists:ukeymerge(1, OldOpts, Args)); init_module(Host, Mod, Args) -> - lists:member(Mod, mam_modules()) - orelse - ct:fail("Unknown module ~p", [Mod]), + lists:member(Mod, mam_modules()) orelse ct:fail("Unknown module ~p", [Mod]), stop_module(Host, Mod), {ok, _} = start_module(Host, Mod, Args). diff --git a/doc/migrations/5.0.0_5.1.0.md b/doc/migrations/5.0.0_5.1.0.md index 5e07beda823..2bfefbee7f8 100644 --- a/doc/migrations/5.0.0_5.1.0.md +++ b/doc/migrations/5.0.0_5.1.0.md @@ -9,3 +9,21 @@ Each authentication method needs a TOML section, e.g. if you have the `rdbms` me ### Section `host_config` The rules for overriding global options in the `host_config` section have been simplified. The `auth` section and the `s2s.address`, `s2s.domain_certfile` and `s2s.shared` options now completely override the corresponding general settings instead of being merged with them. 
+ +## Async workers + +The `async_writer` flag of MAM is now a section of its own, which absorbs the previous flags related to it: `flush_interval`, `max_batch_size` (renamed to `batch_size`) and `pool_size` now become subelements of the `async_writer` section, with one more parameter, `enabled`. Below is an example: + +```toml +[modules.mod_mam_meta] + flush_interval = 1000 + max_batch_size = 100 + muc.async_writer = false +``` +now becomes +```toml +[modules.mod_mam_meta] + async_writer.flush_interval = 1000 + async_writer.batch_size = 100 + muc.async_writer.enabled = false +``` diff --git a/doc/modules/mod_mam.md b/doc/modules/mod_mam.md index b83315c7ad5..316df625bbf 100644 --- a/doc/modules/mod_mam.md +++ b/doc/modules/mod_mam.md @@ -195,8 +195,8 @@ Please note that you can override all common options in similar way. ```toml [modules.mod_mam_meta] backend = "rdbms" - async_writer = true # this option enables async writer for RDBMS backend - muc.async_writer = false # disable async writer for MUC archive only + async_writer.enabled = true # this option enables async writer for RDBMS backend + muc.async_writer.enabled = false # disable async writer for MUC archive only ``` ### RDBMS backend options @@ -236,35 +236,35 @@ Configures which cache to use, either start an internal instance, or reuse the c When set to `simple`, stores messages in XML and full JIDs. When set to `internal`, stores messages and JIDs in internal format. -#### `modules.mod_mam_meta.async_writer` +#### `modules.mod_mam_meta.async_writer.enabled` * **Syntax:** boolean * **Default:** `true` -* **Example:** `modules.mod_mam_meta.async_writer = false` +* **Example:** `modules.mod_mam_meta.async_writer.enabled = false` Enables an asynchronous writer that is faster than the synchronous one but harder to debug. -The async writers store batches of messages with a certain delay (see **flush_interval**), so the results of the lookup operations executed right after message routing may be incomplete until the configured time passes. 
+The async writers store batches of messages that will be flushed on a timeout (see **flush_interval**) or when the batch reaches a given size (see **batch_size**), so the results of the lookup operations executed right after message routing may be incomplete until the configured time passes or the queue is full. -#### `modules.mod_mam_meta.flush_interval` +#### `modules.mod_mam_meta.async_writer.flush_interval` * **Syntax:** non-negative integer * **Default:** `2000` -* **Example:** `modules.mod_mam_meta.flush_interval = 2000` +* **Example:** `modules.mod_mam_meta.async_writer.flush_interval = 2000` -How often (in milliseconds) the buffered messages are flushed to a DB. +How often (in milliseconds) the buffered messages are flushed to the DB. -#### `modules.mod_mam_meta.max_batch_size` +#### `modules.mod_mam_meta.async_writer.batch_size` * **Syntax:** non-negative integer * **Default:** `30` -* **Example:** `modules.mod_mam_meta.max_batch_size = 30` +* **Example:** `modules.mod_mam_meta.async_writer.batch_size = 30` -Max size of the batch insert query for an async writer. +Max size of the batch for an async writer before the queue is considered full and flushed. If the buffer is full, messages are flushed to a database immediately and the flush timer is reset. -#### `modules.mod_mam_meta.pool_size` +#### `modules.mod_mam_meta.async_writer.pool_size` * **Syntax:** non-negative integer -* **Default:** `32` -* **Example:** `modules.mod_mam_meta.pool_size = 30` +* **Default:** `4 * erlang:system_info(schedulers_online)` +* **Example:** `modules.mod_mam_meta.async_writer.pool_size = 32` -Number of workers in the pool. +Number of workers in the pool. More than the number of available schedulers is recommended, to minimise lock contention on the message queues, and more than the number of DB workers, to fully utilise the DB capacity. How much more than these two values is a matter of fine-tuning for specific deployments. 
### Common backend options @@ -421,7 +421,7 @@ This module can be used to add extra lookup parameters to MAM lookup queries. muc.host = "muc.example.com" muc.rdbms_message_format = "simple" - muc.async_writer = false + muc.async_writer.enabled = false muc.user_prefs_store = "mnesia" ``` diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index 686cb4ae72e..49ccf11d2d9 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -145,10 +145,6 @@ init([]) -> brutal_kill, worker, [mod_muc_iq]}, - MAM = - {mod_mam_sup, - {mod_mam_sup, start_link, []}, - permanent, infinity, supervisor, [mod_mam_sup]}, ShaperSup = {ejabberd_shaper_sup, {ejabberd_shaper_sup, start_link, []}, @@ -174,7 +170,6 @@ init([]) -> IQSupervisor, Listener, MucIQ, - MAM, ShaperSup]}}. start_child(ChildSpec) -> diff --git a/src/mam/mod_mam_meta.erl b/src/mam/mod_mam_meta.erl index b2b8017deb1..46a05ea47f6 100644 --- a/src/mam/mod_mam_meta.erl +++ b/src/mam/mod_mam_meta.erl @@ -74,21 +74,13 @@ config_items() -> <<"cache">> => mongoose_user_cache:config_spec(), <<"rdbms_message_format">> => #option{type = atom, validate = {enum, [simple, internal]}}, - <<"async_writer">> => #option{type = boolean}, - <<"flush_interval">> => #option{type = integer, - validate = non_negative}, - <<"max_batch_size">> => #option{type = integer, - validate = non_negative}, - <<"pool_size">> => #option{type = integer, - validate = non_negative}, + <<"async_writer">> => mod_mam_rdbms_arch_async:config_spec(), %% Low-level options <<"default_result_limit">> => #option{type = integer, validate = non_negative}, <<"max_result_limit">> => #option{type = integer, validate = non_negative}, - <<"async_writer_rdbms_pool">> => #option{type = atom, - validate = pool_name}, <<"db_jid_format">> => #option{type = atom, validate = module}, <<"db_message_format">> => #option{type = atom, @@ -175,7 +167,8 @@ valid_core_mod_opts(mod_mam_muc) -> [host] ++ common_opts(). 
common_opts() -> - [is_archivable_message, + [async_writer, + is_archivable_message, send_message, archive_chat_markers, extra_fin_element, @@ -213,13 +206,13 @@ parse_backend_opts(riak, Type, Opts, Deps0) -> end; parse_backend_opts(rdbms, Type, Opts0, Deps0) -> - Opts1 = add_default_rdbms_opts(Opts0), + Opts1 = add_rdbms_async_opts(Opts0), Opts = add_rdbms_cache_opts(Opts1), {ModRDBMSArch, ModAsyncWriter} = case Type of - pm -> {mod_mam_rdbms_arch, mod_mam_rdbms_async_pool_writer}; - muc -> {mod_mam_muc_rdbms_arch, mod_mam_muc_rdbms_async_pool_writer} + pm -> {mod_mam_rdbms_arch, mod_mam_rdbms_arch_async}; + muc -> {mod_mam_muc_rdbms_arch, mod_mam_rdbms_arch_async} end, Deps1 = add_dep(ModRDBMSArch, [Type], Deps0), @@ -265,15 +258,18 @@ add_dep(Dep, Args, Deps) -> maps:put(Dep, NewArgs, Deps). --spec add_default_rdbms_opts(Opts :: proplists:proplist()) -> proplists:proplist(). -add_default_rdbms_opts(Opts) -> - lists:foldl( - fun({Key, _} = DefaultOpt, Acc) -> - case proplists:lookup(Key, Opts) of - none -> [DefaultOpt | Acc]; - _ -> Acc - end - end, Opts, [{async_writer, true}]). +-spec add_rdbms_async_opts(proplists:proplist()) -> proplists:proplist(). +add_rdbms_async_opts(Opts) -> + case lists:keyfind(async_writer, 1, Opts) of + {async_writer, AsyncOpts} -> + case lists:keyfind(enabled, 1, AsyncOpts) of + {enabled, false} -> lists:keydelete(async_writer, 1, Opts); + _ -> Opts + end; + false -> + [{async_writer, []} | Opts] + end. 
+ add_rdbms_cache_opts(Opts) -> case {lists:keyfind(cache_users, 1, Opts), lists:keyfind(cache, 1, Opts)} of @@ -303,11 +299,9 @@ parse_rdbms_opt(Type, ModRDBMSArch, ModAsyncWriter, Option, Deps) -> add_dep(mod_mam_mnesia_prefs, [Type], Deps); {rdbms_message_format, simple} -> add_dep(ModRDBMSArch, rdbms_simple_opts(), Deps); - {async_writer, true} -> + {async_writer, Opts} -> DepsWithNoWriter = add_dep(ModRDBMSArch, [no_writer], Deps), - add_dep(ModAsyncWriter, [Type], DepsWithNoWriter); - {async_writer_rdbms_pool, PoolName} -> - add_dep(ModAsyncWriter, [{rdbms_pool, PoolName}], Deps); + add_dep(ModAsyncWriter, [{Type, Opts}], DepsWithNoWriter); _ -> Deps end. diff --git a/src/mam/mod_mam_muc.erl b/src/mam/mod_mam_muc.erl index 5f66298b8f3..34865ebfea0 100644 --- a/src/mam/mod_mam_muc.erl +++ b/src/mam/mod_mam_muc.erl @@ -666,4 +666,5 @@ spirals() -> modMucMamForwarded, modMucMamArchived, modMucMamFlushed, + modMucMamDropped, modMucMamDroppedIQ]. diff --git a/src/mam/mod_mam_muc_rdbms_async_pool_writer.erl b/src/mam/mod_mam_muc_rdbms_async_pool_writer.erl deleted file mode 100644 index d85aa0b318d..00000000000 --- a/src/mam/mod_mam_muc_rdbms_async_pool_writer.erl +++ /dev/null @@ -1,276 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author Uvarov Michael -%%% @copyright (C) 2013, Uvarov Michael -%%% @doc Collect messages and flush them into the database. -%%% @end -%%%------------------------------------------------------------------- --module(mod_mam_muc_rdbms_async_pool_writer). - -%% ---------------------------------------------------------------------- -%% Exports - -%% gen_mod handlers --export([start/2, stop/1, supported_features/0]). - -%% MAM hook handlers --behaviour(ejabberd_gen_mam_archive). --export([archive_message/3]). - -%% Helpers for debugging --export([queue_length/1, - queue_lengths/1]). - -%% gen_server callbacks --export([init/1, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). 
- --ignore_xref([code_change/3, handle_cast/2, handle_info/2, init/1, queue_length/1, - queue_lengths/1, start/2, stop/1, supported_features/0, terminate/2]). - --define(PER_MESSAGE_FLUSH_TIME, [?MODULE, per_message_flush_time]). --define(FLUSH_TIME, [?MODULE, flush_time]). --define(DEFAULT_POOL_SIZE, 32). - --include("mongoose.hrl"). --include("jlib.hrl"). - --record(state, {flush_interval :: non_neg_integer(), %% milliseconds - max_batch_size :: non_neg_integer(), - host_type :: mongooseim:host_type(), - acc=[] :: list(), - flush_interval_tref :: reference() | undefined - }). --type state() :: #state{}. - -worker_prefix() -> - "ejabberd_mod_mam_muc_writer". - -worker_count(HostType) -> - gen_mod:get_module_opt(HostType, ?MODULE, pool_size, ?DEFAULT_POOL_SIZE). - - --spec worker_names(mongooseim:host_type()) -> [{integer(), atom()}]. -worker_names(HostType) -> - [{N, worker_name(HostType, N)} || N <- lists:seq(0, worker_count(HostType) - 1)]. - - --spec worker_name(mongooseim:host_type(), integer()) -> atom(). -worker_name(HostType, N) -> - list_to_atom(worker_prefix() ++ "_" ++ binary_to_list(HostType) ++ "_" ++ integer_to_list(N)). - - --spec select_worker(mongooseim:host_type(), integer()) -> atom(). -select_worker(HostType, ArcID) -> - N = worker_number(HostType, ArcID), - worker_name(HostType, N). - - --spec worker_number(mongooseim:host_type(), mod_mam:archive_id()) -> integer(). -worker_number(HostType, ArcID) -> - ArcID rem worker_count(HostType). - - -%% ---------------------------------------------------------------------- -%% gen_mod callbacks -%% Starting and stopping functions for users' archives - --spec start(mongooseim:host_type(), _) -> 'ok'. 
-start(HostType, Opts) -> - mongoose_metrics:ensure_metric(HostType, ?PER_MESSAGE_FLUSH_TIME, histogram), - mongoose_metrics:ensure_metric(HostType, ?FLUSH_TIME, histogram), - MaxSize = gen_mod:get_module_opt(HostType, ?MODULE, max_batch_size, 30), - mod_mam_muc_rdbms_arch:prepare_insert(insert_mam_muc_message, 1), - mod_mam_muc_rdbms_arch:prepare_insert(insert_mam_muc_messages, MaxSize), - start_workers(HostType, MaxSize), - start_muc(HostType, Opts). - - --spec stop(mongooseim:host_type()) -> any(). -stop(HostType) -> - stop_muc(HostType), - stop_workers(HostType). - --spec supported_features() -> [atom()]. -supported_features() -> - [dynamic_domains]. - -%% ---------------------------------------------------------------------- -%% Add hooks for mod_mam_muc - --spec start_muc(mongooseim:host_type(), _) -> 'ok'. -start_muc(HostType, _Opts) -> - ejabberd_hooks:add(mam_muc_archive_message, HostType, ?MODULE, archive_message, 50), - ok. - - --spec stop_muc(mongooseim:host_type()) -> 'ok'. -stop_muc(HostType) -> - ejabberd_hooks:delete(mam_muc_archive_message, HostType, ?MODULE, archive_message, 50), - ok. - -%%==================================================================== -%% API -%%==================================================================== - --spec start_workers(mongooseim:host_type(), MaxSize :: pos_integer()) -> [{'error', _} - | {'ok', 'undefined' | pid()} - | {'ok', 'undefined' | pid(), _}]. -start_workers(HostType, MaxSize) -> - [start_worker(WriterProc, N, HostType, MaxSize) - || {N, WriterProc} <- worker_names(HostType)]. - --spec stop_workers(mongooseim:host_type()) -> ok. -stop_workers(HostType) -> - [stop_worker(WriterProc) || {_, WriterProc} <- worker_names(HostType)], - ok. - --spec start_worker(atom(), integer(), mongooseim:host_type(), MaxSize :: pos_integer()) - -> {'error', _} - | {'ok', 'undefined' | pid()} - | {'ok', 'undefined' | pid(), _}. 
-start_worker(WriterProc, _N, HostType, MaxSize) -> - WriterChildSpec = worker_spec(WriterProc, HostType, MaxSize), - supervisor:start_child(mod_mam_sup, WriterChildSpec). - -worker_spec(WriterProc, HostType, MaxSize) -> - %% Store incoming messages of the heap - Opts = [{message_queue_data, off_heap}], - Args = [{local, WriterProc}, ?MODULE, [HostType, MaxSize], Opts], - {WriterProc, - {gen_server, start_link, Args}, - permanent, - 5000, - worker, - [mod_mam_muc_rdbms_async_pool_writer]}. - --spec stop_worker(atom()) -> ok. -stop_worker(Proc) -> - supervisor:terminate_child(mod_mam_sup, Proc), - supervisor:delete_child(mod_mam_sup, Proc), - ok. - --spec archive_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) -> - ok | {error, timeout}. -archive_message(_Result, HostType, Params0 = #{archive_id := RoomID}) -> - Params = mod_mam_muc_rdbms_arch:extend_params_with_sender_id(HostType, Params0), - Worker = select_worker(HostType, RoomID), - gen_server:cast(Worker, {archive_message, Params}). - -%% @doc For metrics. --spec queue_length(mongooseim:host_type()) -> {'ok', number()}. -queue_length(HostType) -> - Len = lists:sum(queue_lengths(HostType)), - {ok, Len}. - --spec queue_lengths(mongooseim:host_type()) -> [non_neg_integer()]. -queue_lengths(HostType) -> - [worker_queue_length(SrvName) || {_, SrvName} <- worker_names(HostType)]. - --spec worker_queue_length(atom()) -> non_neg_integer(). -worker_queue_length(SrvName) -> - case whereis(SrvName) of - undefined -> - 0; - Pid -> - {message_queue_len, Len} = erlang:process_info(Pid, message_queue_len), - Len - end. 
- -%%==================================================================== -%% Internal functions -%%==================================================================== - -handle_archive_message(Params, State=#state{acc=Acc, - flush_interval_tref=TRef, - flush_interval=Int, - max_batch_size=Max}) -> - TRef2 = case {Acc, TRef} of - {[], undefined} -> erlang:send_after(Int, self(), flush); - {_, _} -> TRef - end, - State2 = State#state{acc=[Params|Acc], flush_interval_tref=TRef2}, - case length(Acc) + 1 >= Max of - true -> run_flush(State2); - false -> State2 - end. - --spec run_flush(state()) -> state(). -run_flush(State = #state{acc=[]}) -> - State; -run_flush(State = #state{host_type = HostType, acc = Acc}) -> - MessageCount = length(Acc), - {FlushTime, NewState} = timer:tc(fun do_run_flush/2, [MessageCount, State]), - mongoose_metrics:update(HostType, ?PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)), - mongoose_metrics:update(HostType, ?FLUSH_TIME, FlushTime), - NewState. - -do_run_flush(MessageCount, State = #state{host_type = HostType, max_batch_size = MaxSize, - flush_interval_tref = TRef, acc = Acc}) -> - cancel_and_flush_timer(TRef), - ?LOG_DEBUG(#{what => mam_flush, message_count => MessageCount}), - Rows = [mod_mam_muc_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc], - InsertResult = - case MessageCount of - MaxSize -> - mongoose_rdbms:execute(HostType, insert_mam_muc_messages, lists:append(Rows)); - OtherSize -> - Results = [mongoose_rdbms:execute(HostType, insert_mam_muc_message, Row) || Row <- Rows], - case lists:keyfind(error, 1, Results) of - false -> {updated, OtherSize}; - Error -> Error - end - end, - case InsertResult of - {updated, _Count} -> ok; - {error, Reason} -> - mongoose_metrics:update(HostType, modMamDropped, MessageCount), - ?LOG_ERROR(#{what => archive_message_query_failed, - text => <<"archive_message query failed, modMamDropped metric updated">>, - message_count => MessageCount, reason => Reason}), - ok - end, 
- [mod_mam_muc_rdbms_arch:retract_message(HostType, Params) || Params <- Acc], - mongoose_hooks:mam_muc_flush_messages(HostType, MessageCount), - erlang:garbage_collect(), - State#state{acc=[], flush_interval_tref=undefined}. - --spec cancel_and_flush_timer('undefined' | reference()) -> 'ok'. -cancel_and_flush_timer(undefined) -> - ok; -cancel_and_flush_timer(TRef) -> - case erlang:cancel_timer(TRef) of - false -> - receive - flush -> ok - after 0 -> ok - end; - _ -> ok - end. - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -init([HostType, MaxSize]) -> - %% Use a private RDBMS-connection. - Int = gen_mod:get_module_opt(HostType, ?MODULE, flush_interval, 2000), - {ok, #state{host_type = HostType, flush_interval = Int, max_batch_size = MaxSize}}. - -handle_cast({archive_message, Params}, State) -> - {noreply, handle_archive_message(Params, State)}; -handle_cast(Msg, State) -> - ?UNEXPECTED_CAST(Msg), - {noreply, State}. - --spec handle_info('flush', state()) -> {'noreply', state()}. -handle_info(flush, State) -> - {noreply, run_flush(State#state{flush_interval_tref=undefined})}; -handle_info(Msg, State) -> - ?UNEXPECTED_INFO(Msg), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/src/mam/mod_mam_rdbms_arch_async.erl b/src/mam/mod_mam_rdbms_arch_async.erl new file mode 100644 index 00000000000..af78816d1ca --- /dev/null +++ b/src/mam/mod_mam_rdbms_arch_async.erl @@ -0,0 +1,179 @@ +-module(mod_mam_rdbms_arch_async). + +-include("mongoose_logger.hrl"). + +-define(PM_PER_MESSAGE_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, per_message_flush_time]). +-define(PM_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, flush_time]). +-define(MUC_PER_MESSAGE_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, per_message_flush_time]). 
+-define(MUC_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, flush_time]). + +-behaviour(gen_mod). +-export([start/2, stop/1, config_spec/0, supported_features/0]). + +-export([archive_pm_message/3, archive_muc_message/3]). +-export([flush_pm/2, flush_muc/2]). + +-type writer_type() :: pm | muc. + +-ignore_xref([archive_pm_message/3, archive_muc_message/3]). + +-spec archive_pm_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) -> + ok | {error, timeout}. +archive_pm_message(_Result, HostType, Params = #{archive_id := ArcID}) -> + PoolName = mongoose_async_pools:pool_name(HostType, pm_mam), + wpool:cast(PoolName, {task, Params}, {hash_worker, ArcID}). + +-spec archive_muc_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) -> + ok. +archive_muc_message(_Result, HostType, Params0 = #{archive_id := RoomID}) -> + Params = mod_mam_muc_rdbms_arch:extend_params_with_sender_id(HostType, Params0), + PoolName = mongoose_async_pools:pool_name(HostType, muc_mam), + wpool:cast(PoolName, {task, Params}, {hash_worker, RoomID}). + +%%% gen_mod callbacks +-spec start(mongooseim:host_type(), [{writer_type(), gen_mod:module_opts()}]) -> any(). +start(HostType, Opts) -> + [ start_pool(HostType, Mod) || Mod <- Opts ]. + +-spec stop(mongooseim:host_type()) -> any(). +stop(HostType) -> + Opts = gen_mod:get_module_opts(HostType, ?MODULE), + [ stop_pool(HostType, Mod) || Mod <- Opts ]. + +-spec config_spec() -> mongoose_config_spec:config_section(). +config_spec() -> + mongoose_async_pools:config_spec(). + +-spec supported_features() -> [atom()]. +supported_features() -> + [dynamic_domains]. + +%%% internal callbacks +-spec start_pool(mongooseim:host_type(), {writer_type(), gen_mod:module_opts()}) -> + supervisor:startchild_ret(). 
+start_pool(HostType, {Type, Opts}) -> + Opts1 = extend_opts(Type, Opts), + prepare_insert_queries(Type, Opts1), + ensure_metrics(Type, HostType), + register_hooks(Type, HostType), + start_pool(Type, HostType, Opts1). + +extend_opts(Type, Opts) -> + Merge = maps:merge(defaults(), maps:from_list(Opts)), + Opts1 = maps:to_list(Merge), + Opts2 = gen_mod:set_opt(flush_extra, Opts1, Merge), + add_callback(Type, Opts2). + +add_callback(pm, Opts) -> + gen_mod:set_opt(flush_callback, Opts, fun ?MODULE:flush_pm/2); +add_callback(muc, Opts) -> + gen_mod:set_opt(flush_callback, Opts, fun ?MODULE:flush_muc/2). + +defaults() -> + #{flush_interval => 2000, + batch_size => 30, + pool_size => 4 * erlang:system_info(schedulers_online)}. + +prepare_insert_queries(pm, Opts) -> + MaxSize = gen_mod:get_opt(batch_size, Opts, 30), + mod_mam_rdbms_arch:prepare_insert(insert_mam_message, 1), + mod_mam_rdbms_arch:prepare_insert(insert_mam_messages, MaxSize); +prepare_insert_queries(muc, Opts) -> + MaxSize = gen_mod:get_opt(batch_size, Opts, 30), + mod_mam_muc_rdbms_arch:prepare_insert(insert_mam_muc_message, 1), + mod_mam_muc_rdbms_arch:prepare_insert(insert_mam_muc_messages, MaxSize). + +ensure_metrics(pm, HostType) -> + mongoose_metrics:ensure_metric(HostType, ?PM_PER_MESSAGE_FLUSH_TIME, histogram), + mongoose_metrics:ensure_metric(HostType, ?PM_FLUSH_TIME, histogram); +ensure_metrics(muc, HostType) -> + mongoose_metrics:ensure_metric(HostType, ?MUC_PER_MESSAGE_FLUSH_TIME, histogram), + mongoose_metrics:ensure_metric(HostType, ?MUC_FLUSH_TIME, histogram). + +register_hooks(pm, HostType) -> + ejabberd_hooks:add(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50); +register_hooks(muc, HostType) -> + ejabberd_hooks:add(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, 50). + +start_pool(pm, HostType, Opts) -> + mongoose_async_pools:start_pool(HostType, pm_mam, Opts); +start_pool(muc, HostType, Opts) -> + mongoose_async_pools:start_pool(HostType, muc_mam, Opts). 
+ +-spec stop_pool(mongooseim:host_type(), {writer_type(), term()}) -> ok. +stop_pool(HostType, {pm, _}) -> + ejabberd_hooks:delete(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50), + mongoose_async_pools:stop_pool(HostType, pm_mam); +stop_pool(HostType, {muc, _}) -> + ejabberd_hooks:delete(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, 50), + mongoose_async_pools:stop_pool(HostType, muc_mam). + +%%% flush callbacks +flush_pm(Acc, Extra = #{host_type := HostType}) -> + MessageCount = length(Acc), + {FlushTime, Result} = timer:tc(fun do_flush_pm/2, [Acc, Extra]), + mongoose_metrics:update(HostType, ?PM_PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)), + mongoose_metrics:update(HostType, ?PM_FLUSH_TIME, FlushTime), + Result. + +flush_muc(Acc, Extra = #{host_type := HostType}) -> + MessageCount = length(Acc), + {FlushTime, Result} = timer:tc(fun do_flush_muc/2, [Acc, Extra]), + mongoose_metrics:update(HostType, ?MUC_PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)), + mongoose_metrics:update(HostType, ?MUC_FLUSH_TIME, FlushTime), + Result. 
+ +%% mam workers callbacks +do_flush_pm(Acc, #{host_type := HostType, batch_size := MaxSize}) -> + MessageCount = length(Acc), + Rows = [mod_mam_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc], + InsertResult = + case MessageCount of + MaxSize -> + mongoose_rdbms:execute(HostType, insert_mam_messages, lists:append(Rows)); + OtherSize -> + Results = [mongoose_rdbms:execute(HostType, insert_mam_message, Row) || Row <- Rows], + case lists:keyfind(error, 1, Results) of + false -> {updated, OtherSize}; + Error -> Error + end + end, + case InsertResult of + {updated, _Count} -> ok; + {error, Reason} -> + mongoose_metrics:update(HostType, modMamDropped, MessageCount), + ?LOG_ERROR(#{what => archive_message_failed, + text => <<"archive_message query failed">>, + message_count => MessageCount, reason => Reason}), + ok + end, + [mod_mam_rdbms_arch:retract_message(HostType, Params) || Params <- Acc], + mongoose_hooks:mam_flush_messages(HostType, MessageCount), + ok. + +do_flush_muc(Acc, #{host_type := HostType, batch_size := MaxSize}) -> + MessageCount = length(Acc), + Rows = [mod_mam_muc_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc], + InsertResult = + case MessageCount of + MaxSize -> + mongoose_rdbms:execute(HostType, insert_mam_muc_messages, lists:append(Rows)); + OtherSize -> + Results = [mongoose_rdbms:execute(HostType, insert_mam_muc_message, Row) || Row <- Rows], + case lists:keyfind(error, 1, Results) of + false -> {updated, OtherSize}; + Error -> Error + end + end, + case InsertResult of + {updated, _Count} -> ok; + {error, Reason} -> + mongoose_metrics:update(HostType, modMucMamDropped, MessageCount), + ?LOG_ERROR(#{what => archive_message_query_failed, + text => <<"archive_message query failed, modMucMamDropped metric updated">>, + message_count => MessageCount, reason => Reason}), + ok + end, + [mod_mam_muc_rdbms_arch:retract_message(HostType, Params) || Params <- Acc], + mongoose_hooks:mam_muc_flush_messages(HostType, 
MessageCount), + ok. diff --git a/src/mam/mod_mam_rdbms_async_pool_writer.erl b/src/mam/mod_mam_rdbms_async_pool_writer.erl deleted file mode 100644 index 2c61353ca05..00000000000 --- a/src/mam/mod_mam_rdbms_async_pool_writer.erl +++ /dev/null @@ -1,294 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author Uvarov Michael -%%% @copyright (C) 2013, Uvarov Michael -%%% @doc Collect messages and flush them into the database. -%%% -%%% Some backends allow ArcID can be not set (riak?). -%%% This means, that there are no "jid => integer" mapping. -%%% -%%% But this module requires ArcID to be an integer. -%%% ArcID is set by mod_mam_rdbms_user. -%%% If ArcID is undefined, it means that there can be issues -%%% in mod_mam_rdbms_user. -%%% -%%% We have `is_integer(ArcID)' check on each hook handler, so cases when -%%% `ArcID' is undefined would fail at the module entrance. -%%% @end -%%%------------------------------------------------------------------- --module(mod_mam_rdbms_async_pool_writer). - -%% ---------------------------------------------------------------------- -%% Exports - -%% gen_mod handlers --export([start/2, stop/1, supported_features/0]). - -%% MAM hook handlers --behaviour(ejabberd_gen_mam_archive). --behaviour(gen_mod). --behaviour(mongoose_module_metrics). - --export([archive_message/3]). - -%% Helpers for debugging --export([queue_length/1, - queue_lengths/1]). - -%% gen_server callbacks --export([init/1, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --ignore_xref([code_change/3, handle_cast/2, handle_info/2, init/1, queue_length/1, - queue_lengths/1, terminate/2]). - --define(PER_MESSAGE_FLUSH_TIME, [?MODULE, per_message_flush_time]). --define(FLUSH_TIME, [?MODULE, flush_time]). --define(DEFAULT_POOL_SIZE, 32). - --include("mongoose.hrl"). --include("jlib.hrl"). 
- --record(state, { - flush_interval, %% milliseconds - max_batch_size, - host_type :: mongooseim:host_type(), - number, - acc = [], - flush_interval_tref}). - -worker_prefix() -> - "ejabberd_mod_mam_writer". - -worker_count(HostType) -> - gen_mod:get_module_opt(HostType, ?MODULE, pool_size, ?DEFAULT_POOL_SIZE). - -worker_names(HostType) -> - [{N, worker_name(HostType, N)} || N <- lists:seq(0, worker_count(HostType) - 1)]. - -worker_name(HostType, N) -> - list_to_atom(worker_prefix() ++ "_" ++ binary_to_list(HostType) ++ "_" ++ integer_to_list(N)). - -select_worker(HostType, ArcID) -> - N = worker_number(HostType, ArcID), - worker_name(HostType, N). - -worker_number(HostType, ArcID) when is_integer(ArcID) -> - ArcID rem worker_count(HostType). - -%% ---------------------------------------------------------------------- -%% gen_mod callbacks -%% Starting and stopping functions for users' archives - -start(HostType, Opts) -> - mongoose_metrics:ensure_metric(HostType, ?PER_MESSAGE_FLUSH_TIME, histogram), - mongoose_metrics:ensure_metric(HostType, ?FLUSH_TIME, histogram), - MaxSize = gen_mod:get_module_opt(HostType, ?MODULE, max_batch_size, 30), - mod_mam_rdbms_arch:prepare_insert(insert_mam_message, 1), - mod_mam_rdbms_arch:prepare_insert(insert_mam_messages, MaxSize), - - start_workers(HostType, MaxSize), - case gen_mod:get_module_opt(HostType, ?MODULE, pm, false) of - true -> - start_pm(HostType, Opts); - false -> - ok - end, - case gen_mod:get_module_opt(HostType, ?MODULE, muc, false) of - true -> - start_muc(HostType, Opts); - false -> - ok - end. - -stop(HostType) -> - case gen_mod:get_module_opt(HostType, ?MODULE, pm, false) of - true -> - stop_pm(HostType); - false -> - ok - end, - case gen_mod:get_module_opt(HostType, ?MODULE, muc, false) of - true -> - stop_muc(HostType); - false -> - ok - end, - stop_workers(HostType). - --spec supported_features() -> [atom()]. -supported_features() -> - [dynamic_domains]. 
- -%% ---------------------------------------------------------------------- -%% Add hooks for mod_mam - -start_pm(HostType, _Opts) -> - ejabberd_hooks:add(mam_archive_message, HostType, ?MODULE, archive_message, 50), - ok. - -stop_pm(HostType) -> - ejabberd_hooks:delete(mam_archive_message, HostType, ?MODULE, archive_message, 50), - ok. - - -%% ---------------------------------------------------------------------- -%% Add hooks for mod_mam_muc - -start_muc(HostType, _Opts) -> - ejabberd_hooks:add(mam_muc_archive_message, HostType, ?MODULE, archive_message, 50), - ok. - -stop_muc(HostType) -> - ejabberd_hooks:delete(mam_muc_archive_message, HostType, ?MODULE, archive_message, 50), - ok. - - -%%==================================================================== -%% API -%%==================================================================== - -start_workers(HostType, MaxSize) -> - [start_worker(WriterProc, N, HostType, MaxSize) - || {N, WriterProc} <- worker_names(HostType)]. - -stop_workers(HostType) -> - [stop_worker(WriterProc) || {_, WriterProc} <- worker_names(HostType)]. - -start_worker(WriterProc, N, HostType, MaxSize) -> - WriterChildSpec = worker_spec(WriterProc, N, HostType, MaxSize), - supervisor:start_child(mod_mam_sup, WriterChildSpec). - -worker_spec(WriterProc, N, HostType, MaxSize) -> - %% Store incoming messages of the heap - Opts = [{message_queue_data, off_heap}], - Args = [{local, WriterProc}, ?MODULE, [HostType, N, MaxSize], Opts], - {WriterProc, - {gen_server, start_link, Args}, - permanent, - 5000, - worker, - [mod_mam_rdbms_async_pool_writer]}. - -stop_worker(Proc) -> - supervisor:terminate_child(mod_mam_sup, Proc), - supervisor:delete_child(mod_mam_sup, Proc). - - --spec archive_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) -> - ok | {error, timeout}. 
-archive_message(_Result, HostType, Params = #{archive_id := ArcID}) -> - Worker = select_worker(HostType, ArcID), - gen_server:cast(Worker, {archive_message, Params}). - -%% For metrics. -queue_length(HostType) -> - Len = lists:sum(queue_lengths(HostType)), - {ok, Len}. - -queue_lengths(HostType) -> - [worker_queue_length(SrvName) || {_, SrvName} <- worker_names(HostType)]. - -worker_queue_length(SrvName) -> - case whereis(SrvName) of - undefined -> - 0; - Pid -> - {message_queue_len, Len} = erlang:process_info(Pid, message_queue_len), - Len - end. - -%%==================================================================== -%% Internal functions -%%==================================================================== - -handle_archive_message(Params, State=#state{acc=Acc, - flush_interval_tref=TRef, flush_interval=Int, - max_batch_size=Max}) -> - TRef2 = case {Acc, TRef} of - {[], undefined} -> erlang:send_after(Int, self(), flush); - {_, _} -> TRef - end, - State2 = State#state{acc=[Params|Acc], flush_interval_tref=TRef2}, - case length(Acc) + 1 >= Max of - true -> run_flush(State2); - false -> State2 - end. - -run_flush(State = #state{acc = []}) -> - State; -run_flush(State = #state{host_type = HostType, acc = Acc}) -> - MessageCount = length(Acc), - {FlushTime, NewState} = timer:tc(fun do_run_flush/2, [MessageCount, State]), - mongoose_metrics:update(HostType, ?PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)), - mongoose_metrics:update(HostType, ?FLUSH_TIME, FlushTime), - NewState. 
- -do_run_flush(MessageCount, State = #state{host_type = HostType, max_batch_size = MaxSize, - flush_interval_tref = TRef, acc = Acc}) -> - cancel_and_flush_timer(TRef), - ?LOG_DEBUG(#{what => mam_flush, message_count => MessageCount}), - Rows = [mod_mam_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc], - InsertResult = - case MessageCount of - MaxSize -> - mongoose_rdbms:execute(HostType, insert_mam_messages, lists:append(Rows)); - OtherSize -> - Results = [mongoose_rdbms:execute(HostType, insert_mam_message, Row) || Row <- Rows], - case lists:keyfind(error, 1, Results) of - false -> {updated, OtherSize}; - Error -> Error - end - end, - case InsertResult of - {updated, _Count} -> ok; - {error, Reason} -> - mongoose_metrics:update(HostType, modMamDropped, MessageCount), - ?LOG_ERROR(#{what => archive_message_failed, - text => <<"archive_message query failed">>, - message_count => MessageCount, reason => Reason}), - ok - end, - [mod_mam_rdbms_arch:retract_message(HostType, Params) || Params <- Acc], - mongoose_hooks:mam_flush_messages(HostType, MessageCount), - erlang:garbage_collect(), - State#state{acc=[], flush_interval_tref=undefined}. - -cancel_and_flush_timer(undefined) -> - ok; -cancel_and_flush_timer(TRef) -> - case erlang:cancel_timer(TRef) of - false -> - receive - flush -> ok - after 0 -> ok - end; - _ -> ok - end. - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -init([HostType, N, MaxSize]) -> - %% Use a private RDBMS-connection. - Int = gen_mod:get_module_opt(HostType, ?MODULE, flush_interval, 2000), - {ok, #state{host_type = HostType, number = N, - flush_interval = Int, max_batch_size = MaxSize}}. - -handle_cast({archive_message, Params}, State) -> - {noreply, handle_archive_message(Params, State)}; -handle_cast(Msg, State) -> - ?UNEXPECTED_CAST(Msg), - {noreply, State}. 
- -handle_info(flush, State) -> - {noreply, run_flush(State#state{flush_interval_tref=undefined})}; -handle_info(Msg, State) -> - ?UNEXPECTED_INFO(Msg), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/src/mam/mod_mam_sup.erl b/src/mam/mod_mam_sup.erl deleted file mode 100644 index 693d6fbab56..00000000000 --- a/src/mam/mod_mam_sup.erl +++ /dev/null @@ -1,72 +0,0 @@ -%%============================================================================== -%% Copyright 2014 Erlang Solutions Ltd. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%============================================================================== --module(mod_mam_sup). - - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - --ignore_xref([start_link/0]). - --define(SERVER, ?MODULE). - -%%%=================================================================== -%%% API functions -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @doc -%% Starts the supervisor -%% -%% @end -%%-------------------------------------------------------------------- --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). 
- -%%%=================================================================== -%%% Supervisor callbacks -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Whenever a supervisor is started using supervisor:start_link/[2, 3], -%% this function is called by the new process to find out about -%% restart strategy, maximum restart frequency and child -%% specifications. -%% -%% @end -%%-------------------------------------------------------------------- --spec init(Args :: term()) -> {ok, {{one_for_one, pos_integer(), pos_integer()}, []}}. -init([]) -> - RestartStrategy = one_for_one, - MaxRestarts = 1000, - MaxSecondsBetweenRestarts = 3600, - - SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, - - {ok, {SupFlags, []}}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== diff --git a/src/mongoose_async_pools.erl b/src/mongoose_async_pools.erl new file mode 100644 index 00000000000..451c942b6ca --- /dev/null +++ b/src/mongoose_async_pools.erl @@ -0,0 +1,108 @@ +-module(mongoose_async_pools). + +-include("mongoose_config_spec.hrl"). +-include("mongoose_logger.hrl"). + +-behaviour(supervisor). +-export([start_link/3, init/1]). +-ignore_xref([start_link/3]). + +% API +-export([start_pool/3, stop_pool/2, pool_name/2, config_spec/0]). + +-type pool_id() :: atom(). +-type pool_name() :: atom(). +-type pool_opts() :: gen_mod:module_opts(). + +%%% API functions +-spec start_pool(mongooseim:host_type(), pool_id(), pool_opts()) -> + supervisor:startchild_ret(). 
+start_pool(HostType, PoolId, Opts) -> + ?LOG_INFO(#{what => async_pool_starting, host_type => HostType, pool_id => PoolId}), + Supervisor = sup_name(HostType, PoolId), + ChildSpec = #{id => Supervisor, + start => {?MODULE, start_link, [HostType, PoolId, Opts]}, + restart => transient, + type => supervisor}, + ejabberd_sup:start_child(ChildSpec). + +-spec stop_pool(mongooseim:host_type(), pool_id()) -> ok. +stop_pool(HostType, PoolId) -> + ?LOG_INFO(#{what => async_pool_stopping, host_type => HostType, pool_id => PoolId}), + ejabberd_sup:stop_child(sup_name(HostType, PoolId)). + +-spec config_spec() -> mongoose_config_spec:config_section(). +config_spec() -> + #section{ + items = #{<<"enabled">> => #option{type = boolean}, + <<"flush_interval">> => #option{type = integer, validate = non_negative}, + <<"batch_size">> => #option{type = integer, validate = non_negative}, + <<"pool_size">> => #option{type = integer, validate = non_negative}} + }. + +-spec pool_name(mongooseim:host_type(), pool_id()) -> pool_name(). +pool_name(HostType, PoolId) -> + persistent_term:get({?MODULE, HostType, PoolId}). + +%%% Supervisor callbacks +-spec start_link(mongooseim:host_type(), pool_id(), pool_opts()) -> + {ok, pid()} | ignore | {error, term()}. +start_link(HostType, PoolId, Opts) -> + Supervisor = sup_name(HostType, PoolId), + supervisor:start_link({local, Supervisor}, ?MODULE, [HostType, PoolId, Opts]). + +-spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. +init([HostType, PoolId, Opts]) -> + PoolName = gen_pool_name(HostType, PoolId), + store_pool_name(HostType, PoolId, PoolName), + WPoolOpts = make_wpool_opts(HostType, PoolId, Opts), + WorkerSpec = #{id => PoolName, + start => {wpool, start_pool, [PoolName, WPoolOpts]}, + restart => permanent, + type => supervisor}, + SupFlags = #{strategy => one_for_one, + intensity => 1, + period => 5}, + {ok, {SupFlags, [WorkerSpec]}}. 
+ +%%% internal callbacks +-spec sup_name(mongooseim:host_type(), pool_id()) -> atom(). +sup_name(HostType, PoolId) -> + list_to_atom( + atom_to_list(PoolId) ++ "_sup_async_pool_" ++ binary_to_list(HostType)). + +-spec store_pool_name(mongooseim:host_type(), pool_id(), pool_name()) -> ok. +store_pool_name(HostType, PoolId, PoolName) -> + persistent_term:put({?MODULE, HostType, PoolId}, PoolName). + +-spec gen_pool_name(mongooseim:host_type(), pool_id()) -> pool_name(). +gen_pool_name(HostType, PoolId) -> + list_to_atom( + atom_to_list(PoolId) ++ "_async_pool_" ++ binary_to_list(HostType)). + +-spec make_wpool_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> any(). +make_wpool_opts(HostType, PoolId, Opts) -> + Interval = gen_mod:get_opt(flush_interval, Opts, 1000), + MaxSize = gen_mod:get_opt(batch_size, Opts, 100), + NumWorkers = gen_mod:get_opt(pool_size, Opts, 4 * erlang:system_info(schedulers_online)), + FlushCallback = gen_mod:get_opt(flush_callback, Opts), + FlushExtra = make_extra(HostType, PoolId, Opts), + ProcessOpts = [{message_queue_data, off_heap}], + WorkerOpts = {HostType, Interval, MaxSize, FlushCallback, FlushExtra}, + Worker = {mongoose_batch_worker, WorkerOpts}, + [{worker, Worker}, + {workers, NumWorkers}, + {worker_opt, ProcessOpts}, + {worker_shutdown, 10000}]. + +-spec make_extra(mongooseim:host_type(), pool_id(), pool_opts()) -> any(). +make_extra(HostType, PoolId, Opts) -> + case {gen_mod:get_opt(init_callback, Opts, undefined), + gen_mod:get_opt(flush_extra, Opts, + fun(Val) -> Val#{host_type => HostType} end, + #{host_type => HostType})} of + {undefined, Extra} -> + Extra; + {InitFun, Extra} when is_function(InitFun, 3) -> + Extra#{init_data => InitFun(HostType, PoolId, Opts)} + end. diff --git a/src/mongoose_batch_worker.erl b/src/mongoose_batch_worker.erl new file mode 100644 index 00000000000..de04c363e16 --- /dev/null +++ b/src/mongoose_batch_worker.erl @@ -0,0 +1,138 @@ +-module(mongoose_batch_worker). + +-behaviour(gen_server). 
+ +-include("mongoose_logger.hrl"). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3, + format_status/2]). + +-record(state, { + host_type :: mongooseim:host_type(), + batch_size :: non_neg_integer(), + flush_interval :: non_neg_integer(), %% milliseconds + flush_interval_tref :: undefined | reference(), + flush_callback = fun(_, _) -> ok end :: flush_callback(), + flush_queue = [] :: list() | censored, % see format_status/2 for censored + flush_extra = #{} :: map() + }). +-type state() :: #state{}. +-type flush_callback() :: fun((list(), map()) -> ok | {error, term()}). + +-export_type([flush_callback/0]). + +%% gen_server callbacks +-spec init({mongooseim:host_type(), + pos_integer(), + pos_integer(), + flush_callback(), + map()}) -> {ok, state()}. +init({HostType, Interval, MaxSize, FlushCallback, FlushExtra}) -> + ?LOG_DEBUG(#{what => batch_worker_start, host_type => HostType, pool_id => FlushCallback}), + {ok, #state{host_type = HostType, + batch_size = MaxSize, + flush_interval = Interval, + flush_callback = FlushCallback, + flush_extra = FlushExtra}}. + +-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}. +handle_call(Msg, From, State) -> + ?UNEXPECTED_CALL(Msg, From), + {reply, unexpected_call, State}. + +-spec handle_cast({task, term()} | term(), state()) -> {noreply, state()}. +handle_cast({task, Task}, State) -> + {noreply, handle_task(Task, State)}; +handle_cast(Msg, State) -> + ?UNEXPECTED_CAST(Msg), + {noreply, State}. + +-spec handle_info(flush | term(), state()) -> {noreply, state()}. +handle_info(flush, State) -> + {noreply, run_flush(State)}; +handle_info({garbage_collect, asynchronous_gc_triggered, true}, State) -> + {noreply, State}; +handle_info(Msg, State) -> + ?UNEXPECTED_INFO(Msg), + {noreply, State}. + +-spec terminate(term(), state()) -> term(). 
+terminate(Reason, State) -> + ?LOG_INFO(log_fields(State, #{what => batch_worker_stopping, reason => Reason})), + case State#state.flush_queue of + [] -> ok; + _ -> + ?LOG_WARNING(log_fields(State, #{what => batch_worker_terminate_requires_flush})), + do_run_flush(State) + end. + +-spec code_change(term(), state(), term()) -> {ok, state()}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +% Don't leak the tasks to logs, can contain private information +format_status(_Opt, [_PDict, State | _]) -> + [{data, [{"State", State#state{flush_queue = censored}}, + {"Task Queue Length", length(State#state.flush_queue)}]}]. + +%% Batched tasks callbacks +handle_task(Task, State = #state{batch_size = MaxSize, + flush_interval = Interval, + flush_interval_tref = TRef0, + flush_queue = Acc0}) -> + Length = length(Acc0), + TRef1 = maybe_schedule_flush(TRef0, Length, Interval), + State1 = State#state{flush_interval_tref = TRef1, + flush_queue = [Task | Acc0]}, + case Length + 1 >= MaxSize of + false -> State1; + true -> run_flush(State1) + end. + +maybe_schedule_flush(undefined, 0, Interval) -> + erlang:send_after(Interval, self(), flush); +maybe_schedule_flush(TRef, _, _) -> + TRef. + +run_flush(State = #state{flush_interval_tref = TRef}) -> + cancel_and_flush_timer(TRef), + ?LOG_DEBUG(log_fields(State, #{what => batch_worker_flush})), + NewState = do_run_flush(State#state{flush_interval_tref = undefined}), + erlang:garbage_collect(self(), [{async, asynchronous_gc_triggered}, {type, major}]), + NewState. + +cancel_and_flush_timer(undefined) -> + ok; +cancel_and_flush_timer(TRef) -> + case erlang:cancel_timer(TRef) of + false -> + receive + flush -> ok + after 0 -> ok + end; + _ -> ok + end. 
+ +do_run_flush(State = #state{flush_callback = FlushCallback, + flush_queue = Queue, + flush_extra = Extra}) -> + case FlushCallback(lists:reverse(Queue), Extra) of + ok -> ok; + {error, Reason} -> + ?LOG_ERROR(log_fields(State, + #{what => batch_worker_flush_queue_failed, reason => Reason, + text => <<"flush_callback failed">>})), + ok + end, + State#state{flush_queue = []}. + +log_fields(State, LogMessage) -> + LogMessage#{host_type => State#state.host_type, + flush_queue_len => length(State#state.flush_queue), + flush_callback => State#state.flush_callback}. diff --git a/test/config_parser_SUITE.erl b/test/config_parser_SUITE.erl index 1d2b637cb29..31afe9981d6 100644 --- a/test/config_parser_SUITE.erl +++ b/test/config_parser_SUITE.erl @@ -2029,6 +2029,8 @@ mod_mam_meta_muc(_Config) -> ?errh(T(#{<<"same_mam_id_for_peers">> => true})). test_mod_mam_meta(T, M) -> + test_async_worker(T, M), + test_cache_config(T, M), ?cfgh(M([{backend, rdbms}]), T(#{<<"backend">> => <<"rdbms">>})), ?cfgh(M([{no_stanzaid_element, true}]), @@ -2039,18 +2041,8 @@ test_mod_mam_meta(T, M) -> T(#{<<"archive_chat_markers">> => false})), ?cfgh(M([{message_retraction, true}]), T(#{<<"message_retraction">> => true})), - ?cfgh(M([{cache_users, false}]), - T(#{<<"cache_users">> => false})), ?cfgh(M([{rdbms_message_format, simple}]), T(#{<<"rdbms_message_format">> => <<"simple">>})), - ?cfgh(M([{async_writer, true}]), - T(#{<<"async_writer">> => true})), - ?cfgh(M([{flush_interval, 1500}]), - T(#{<<"flush_interval">> => 1500})), - ?cfgh(M([{max_batch_size, 50}]), - T(#{<<"max_batch_size">> => 50})), - ?cfgh(M([{pool_size, 50}]), - T(#{<<"pool_size">> => 50})), ?cfgh(M([{user_prefs_store, rdbms}]), T(#{<<"user_prefs_store">> => <<"rdbms">>})), ?cfgh(M([{full_text_search, false}]), @@ -2059,8 +2051,6 @@ test_mod_mam_meta(T, M) -> T(#{<<"default_result_limit">> => 100})), ?cfgh(M([{max_result_limit, 1000}]), T(#{<<"max_result_limit">> => 1000})), - ?cfgh(M([{async_writer_rdbms_pool, 
async_pool}]), - T(#{<<"async_writer_rdbms_pool">> => <<"async_pool">>})), ?cfgh(M([{db_jid_format, mam_jid_rfc}]), T(#{<<"db_jid_format">> => <<"mam_jid_rfc">>})), ?cfgh(M([{db_message_format, mam_message_xml}]), @@ -2071,37 +2061,36 @@ test_mod_mam_meta(T, M) -> T(#{<<"extra_fin_element">> => <<"mod_mam_utils">>})), ?cfgh(M([{extra_lookup_params, mod_mam_utils}]), T(#{<<"extra_lookup_params">> => <<"mod_mam_utils">>})), - ?cfgh(M([{cache, [{module, internal}]}]), - T(#{<<"cache">> => #{<<"module">> => <<"internal">>}})), - ?cfgh(M([{cache, [{time_to_live, 8600}]}]), - T(#{<<"cache">> => #{<<"time_to_live">> => 8600}})), - ?cfgh(M([{cache, [{time_to_live, infinity}]}]), - T(#{<<"cache">> => #{<<"time_to_live">> => <<"infinity">>}})), - ?cfgh(M([{cache, [{number_of_segments, 10}]}]), - T(#{<<"cache">> => #{<<"number_of_segments">> => 10}})), - ?cfgh(M([{cache, [{strategy, fifo}]}]), - T(#{<<"cache">> => #{<<"strategy">> => <<"fifo">>}})), ?errh(T(#{<<"backend">> => <<"notepad">>})), ?errh(T(#{<<"no_stanzaid_element">> => <<"true">>})), ?errh(T(#{<<"is_archivable_message">> => <<"mod_mam_fake">>})), ?errh(T(#{<<"archive_chat_markers">> => <<"maybe">>})), ?errh(T(#{<<"message_retraction">> => 1})), - ?errh(T(#{<<"cache_users">> => []})), ?errh(T(#{<<"rdbms_message_format">> => <<"complex">>})), - ?errh(T(#{<<"async_writer">> => #{}})), - ?errh(T(#{<<"flush_interval">> => -1})), - ?errh(T(#{<<"max_batch_size">> => -1})), - ?errh(T(#{<<"pool_size">> => -1})), ?errh(T(#{<<"user_prefs_store">> => <<"textfile">>})), ?errh(T(#{<<"full_text_search">> => <<"disabled">>})), ?errh(T(#{<<"default_result_limit">> => -1})), ?errh(T(#{<<"max_result_limit">> => -2})), - ?errh(T(#{<<"async_writer_rdbms_pool">> => <<>>})), ?errh(T(#{<<"db_jid_format">> => <<"not_a_module">>})), ?errh(T(#{<<"db_message_format">> => <<"not_a_module">>})), ?errh(T(#{<<"simple">> => <<"yes">>})), ?errh(T(#{<<"extra_fin_element">> => <<"bad_module">>})), - ?errh(T(#{<<"extra_lookup_params">> => 
<<"bad_module">>})), + ?errh(T(#{<<"extra_lookup_params">> => <<"bad_module">>})). + +test_cache_config(T, M) -> + ?cfgh(M([{cache_users, false}]), + T(#{<<"cache_users">> => false})), + ?cfgh(M([{cache, [{module, internal}]}]), + T(#{<<"cache">> => #{<<"module">> => <<"internal">>}})), + ?cfgh(M([{cache, [{time_to_live, 8600}]}]), + T(#{<<"cache">> => #{<<"time_to_live">> => 8600}})), + ?cfgh(M([{cache, [{time_to_live, infinity}]}]), + T(#{<<"cache">> => #{<<"time_to_live">> => <<"infinity">>}})), + ?cfgh(M([{cache, [{number_of_segments, 10}]}]), + T(#{<<"cache">> => #{<<"number_of_segments">> => 10}})), + ?cfgh(M([{cache, [{strategy, fifo}]}]), + T(#{<<"cache">> => #{<<"strategy">> => <<"fifo">>}})), + ?errh(T(#{<<"cache_users">> => []})), ?errh(T(#{<<"cache">> => #{<<"module">> => <<"mod_wrong_cache">>}})), ?errh(T(#{<<"cache">> => #{<<"module">> => <<"mod_cache_users">>, <<"time_to_live">> => 8600}})), @@ -2110,6 +2099,20 @@ test_mod_mam_meta(T, M) -> ?errh(T(#{<<"cache">> => #{<<"number_of_segments">> => 0}})), ?errh(T(#{<<"cache">> => #{<<"number_of_segments">> => <<"infinity">>}})). +test_async_worker(T, M) -> + ?cfgh(M([{async_writer, [{flush_interval, 1500}]}]), + T(#{<<"async_writer">> => #{<<"flush_interval">> => 1500}})), + ?cfgh(M([{async_writer, [{batch_size, 1500}]}]), + T(#{<<"async_writer">> => #{<<"batch_size">> => 1500}})), + ?cfgh(M([{async_writer, [{pool_size, 1500}]}]), + T(#{<<"async_writer">> => #{<<"pool_size">> => 1500}})), + ?cfgh(M([{async_writer, [{enabled, false}]}]), + T(#{<<"async_writer">> => #{<<"enabled">> => false}})), + ?errh(T(#{<<"async_writer">> => #{<<"flush_interval">> => -1}})), + ?errh(T(#{<<"async_writer">> => #{<<"batch_size">> => -1}})), + ?errh(T(#{<<"async_writer">> => #{<<"pool_size">> => -1}})), + ?errh(T(#{<<"async_writer">> => #{<<"enabled">> => <<"wrong">>}})). 
+ mod_muc(_Config) -> T = fun(Opts) -> #{<<"modules">> => #{<<"mod_muc">> => Opts}} end, M = fun(Cfg) -> modopts(mod_muc, Cfg) end, diff --git a/test/config_parser_SUITE_data/modules.toml b/test/config_parser_SUITE_data/modules.toml index ca9224c5c68..4de0161d526 100644 --- a/test/config_parser_SUITE_data/modules.toml +++ b/test/config_parser_SUITE_data/modules.toml @@ -158,7 +158,7 @@ pm.full_text_search = false muc.host = "muc.example.com" muc.rdbms_message_format = "simple" - muc.async_writer = false + muc.async_writer.enabled = false muc.user_prefs_store = "mnesia" [modules.mod_muc] diff --git a/test/config_parser_helper.erl b/test/config_parser_helper.erl index f627854b427..56eb9997362 100644 --- a/test/config_parser_helper.erl +++ b/test/config_parser_helper.erl @@ -409,7 +409,7 @@ all_modules() -> {password_strength, 32}, {registration_watchers, [<<"JID1">>, <<"JID2">>]}, {welcome_message, {"Subject", "Body"}}]}, - {mod_mam_rdbms_async_pool_writer, [pm]}, + {mod_mam_rdbms_arch_async, [{pm, []}]}, {mod_adhoc, [{iqdisc, one_queue}, {report_commands_node, true}]}, {mod_event_pusher_sns, [{access_key_id, "AKIAIOSFODNN7EXAMPLE"}, @@ -462,6 +462,7 @@ all_modules() -> {mod_mam_mnesia_prefs, [muc]}, {mod_mam_muc, [{archive_chat_markers, true}, + {async_writer, [{enabled, false}]}, {full_text_search, true}, {host, {fqdn, <<"muc.example.com">>}}, {is_archivable_message, mod_mam_utils}]}, @@ -557,7 +558,7 @@ all_modules() -> {full_text_search, true}, {is_archivable_message, mod_mam_utils}, {muc, - [{async_writer, false}, + [{async_writer, [{enabled, false}]}, {host, {fqdn, <<"muc.example.com">>}}, {rdbms_message_format, simple}, {user_prefs_store, mnesia}]}, diff --git a/test/mod_mam_meta_SUITE.erl b/test/mod_mam_meta_SUITE.erl index 7f21ff6b892..d58bc94d839 100644 --- a/test/mod_mam_meta_SUITE.erl +++ b/test/mod_mam_meta_SUITE.erl @@ -50,13 +50,13 @@ handles_only_muc(_Config) -> disables_sync_writer_on_async_writer(_Config) -> - Deps = deps([{pm, [async_writer]}]), + 
Deps = deps([{pm, [{async_writer, []}]}]), {_, Args, _} = lists:keyfind(mod_mam_rdbms_arch, 1, Deps), ?assert(lists:member(no_writer, Args)). disables_sync_muc_writer_on_async_writer(_Config) -> - Deps = deps([{muc, [async_writer]}]), + Deps = deps([{muc, [{async_writer, []}]}]), {_, Args, _} = lists:keyfind(mod_mam_muc_rdbms_arch, 1, Deps), ?assert(lists:member(no_writer, Args)). @@ -66,7 +66,7 @@ produces_valid_configurations(_Config) -> {backend, rdbms}, cache_users, - {pm, [{user_prefs_store, rdbms}, archive_groupchats, {async_writer, false}]}, + {pm, [{user_prefs_store, rdbms}, archive_groupchats, {async_writer, [{enabled, false}]}]}, {muc, [ {host, <<"host">>}, {rdbms_message_format, simple}, @@ -84,12 +84,11 @@ produces_valid_configurations(_Config) -> check_has_args(mod_mam_cache_user, [pm, muc], Deps), check_has_args(mod_mam_mnesia_prefs, [muc], Deps), check_has_args(mod_mam_rdbms_prefs, [pm], Deps), - check_has_args(mod_mam_muc_rdbms_async_pool_writer, [], Deps), + check_has_args(mod_mam_rdbms_arch_async, [{muc, []}], Deps), check_has_no_args(mod_mam_rdbms_arch, [muc, no_writer | ExpandedSimpleOpts], Deps), check_has_no_args(mod_mam_mnesia_prefs, [pm], Deps), - check_has_no_args(mod_mam_rdbms_prefs, [muc], Deps), - ?assertNot(lists:keymember(mod_mam_rdbms_async_pool_writer, 1, Deps)). + check_has_no_args(mod_mam_rdbms_prefs, [muc], Deps). 
handles_riak_config(_Config) -> @@ -127,7 +126,7 @@ handles_cassandra_config(_Config) -> example_muc_only_no_pref_good_performance(_Config) -> Deps = deps([ cache_users, - async_writer, + {async_writer, []}, {muc, [{host, "muc.@HOST@"}]} ]), @@ -137,8 +136,8 @@ example_muc_only_no_pref_good_performance(_Config) -> %% 'muc' argument is ignored by the module {mod_mam_muc_rdbms_arch, [muc, no_writer]}, %% 'muc' argument is ignored by the module - {mod_mam_muc_rdbms_async_pool_writer, [muc]}, - {mod_mam_muc, [{host, "muc.@HOST@"}]} + {mod_mam_rdbms_arch_async, [{muc, []}]}, + {mod_mam_muc, [{async_writer, []}, {host, "muc.@HOST@"}]} ], Deps). @@ -146,7 +145,7 @@ example_pm_only_good_performance(_Config) -> Deps = deps([ {pm, []}, cache_users, - async_writer, + {async_writer, []}, {user_prefs_store, mnesia} ]), @@ -155,8 +154,8 @@ example_pm_only_good_performance(_Config) -> {mod_mam_cache_user, [pm]}, {mod_mam_mnesia_prefs, [pm]}, {mod_mam_rdbms_arch, [pm, no_writer]}, - {mod_mam_rdbms_async_pool_writer, [pm]}, - {mod_mam, []} + {mod_mam_rdbms_arch_async, [{pm, []}]}, + {mod_mam, [{async_writer, []}]} ], Deps). %% Helpers