From 2114824ab48d4c93835302636b87786d36ca47ec Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Thu, 25 Nov 2021 21:42:34 +0100 Subject: [PATCH 1/6] Remove unused header file --- include/mongoose_wpool.hrl | 8 -------- src/wpool/mongoose_wpool.erl | 1 - 2 files changed, 9 deletions(-) delete mode 100644 include/mongoose_wpool.hrl diff --git a/include/mongoose_wpool.hrl b/include/mongoose_wpool.hrl deleted file mode 100644 index f78a55c300..0000000000 --- a/include/mongoose_wpool.hrl +++ /dev/null @@ -1,8 +0,0 @@ --author("bartlomiej.gorny@erlang-solutions.com"). - -%% for worker_pool --record(mongoose_worker_pool, {name :: atom(), - selection_strategy :: atom(), - extra :: term(), - request_timeout :: pos_integer(), - pool_timeout :: pos_integer() }). diff --git a/src/wpool/mongoose_wpool.erl b/src/wpool/mongoose_wpool.erl index eafb413fcd..c180374c91 100644 --- a/src/wpool/mongoose_wpool.erl +++ b/src/wpool/mongoose_wpool.erl @@ -8,7 +8,6 @@ -module(mongoose_wpool). -author("bartlomiej.gorny@erlang-solutions.com"). -include("mongoose.hrl"). --include("mongoose_wpool.hrl"). -type call_timeout() :: pos_integer() | undefined. 
-record(mongoose_wpool, { From d8a4a944541d683accf0547dd77b4dbc7aa57cc8 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Sat, 27 Nov 2021 19:19:20 +0100 Subject: [PATCH 2/6] Make keypos more mnemonic by using the record definition --- src/wpool/mongoose_wpool.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/wpool/mongoose_wpool.erl b/src/wpool/mongoose_wpool.erl index c180374c91..53a0da4ca2 100644 --- a/src/wpool/mongoose_wpool.erl +++ b/src/wpool/mongoose_wpool.erl @@ -112,7 +112,7 @@ ensure_started() -> % we set heir here because the whole thing may be started by an ephemeral process ets:new(?MODULE, [named_table, public, {read_concurrency, true}, - {keypos, 2}, + {keypos, #mongoose_wpool.name}, {heir, whereis(mongoose_wpool_sup), undefined}]); _ -> ok From cbdb5aa0789ae3878a49fb1ebec243ad3d96c47c Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Sun, 28 Nov 2021 00:00:25 +0100 Subject: [PATCH 3/6] Use stored name instead of regenerating it on every use I suspect this also has better concurrency characteristics: generating the name creates an atom, which requires grabbing locks on the atom table, and that table is not optimised for writes, even when the write is idempotent because the atom already exists. This part is mostly a guess. Sequential performance is surely much better. --- src/wpool/mongoose_wpool.erl | 12 +++++++++--- src/wpool/mongoose_wpool_mgr.erl | 1 - 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/wpool/mongoose_wpool.erl b/src/wpool/mongoose_wpool.erl index 53a0da4ca2..532eea2a9a 100644 --- a/src/wpool/mongoose_wpool.erl +++ b/src/wpool/mongoose_wpool.erl @@ -12,6 +12,7 @@ -type call_timeout() :: pos_integer() | undefined. -record(mongoose_wpool, { name :: pool_name(), + atom_name :: wpool:name(), strategy :: wpool:strategy() | undefined, call_timeout :: call_timeout() }). 
@@ -170,11 +171,14 @@ start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) -> case mongoose_wpool_mgr:start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts) of {ok, Pid} -> ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag}, + atom_name = make_pool_name(PoolType, HostType, Tag), strategy = Strategy, call_timeout = CallTimeout}), {ok, Pid}; {external, Pid} -> - ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag}}), + ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag}, + atom_name = make_pool_name(PoolType, HostType, Tag) + }), {ok, Pid}; Error -> Error @@ -316,8 +320,10 @@ make_pool_name(PoolType, HostType, Tag) when is_binary(HostType) -> binary_to_atom(<<"mongoose_wpool$", (atom_to_binary(PoolType, utf8))/binary, $$, HostType/binary, $$, (atom_to_binary(Tag, utf8))/binary>>, utf8). -make_pool_name(#mongoose_wpool{name = {PoolType, HostType, Tag}}) -> - make_pool_name(PoolType, HostType, Tag). +make_pool_name(#mongoose_wpool{atom_name = undefined, name = {PoolType, HostType, Tag}}) -> + make_pool_name(PoolType, HostType, Tag); +make_pool_name(#mongoose_wpool{atom_name = AtomName}) -> + AtomName. -spec call_start_callback(pool_type(), list()) -> term(). call_start_callback(PoolType, Args) -> diff --git a/src/wpool/mongoose_wpool_mgr.erl b/src/wpool/mongoose_wpool_mgr.erl index 6ee4072676..34d6405b67 100644 --- a/src/wpool/mongoose_wpool_mgr.erl +++ b/src/wpool/mongoose_wpool_mgr.erl @@ -225,4 +225,3 @@ maybe_stop_pool({Type, Host, Tag} = Key, #{monitor := Monitor}, Monitors) -> ?LOG_WARNING(#{what => pool_stop_failed, pool_key => Key, reason => Other}), {Other, NewMonitors} end. 
- From 970a17a0897477c112342585cd35334938d1b0b8 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Sun, 28 Nov 2021 20:14:56 +0100 Subject: [PATCH 4/6] Restore abandoned option with mam pool_size --- doc/modules/mod_mam.md | 7 +++++++ src/mam/mod_mam_meta.erl | 2 ++ test/config_parser_SUITE.erl | 3 +++ 3 files changed, 12 insertions(+) diff --git a/doc/modules/mod_mam.md b/doc/modules/mod_mam.md index 908e99ca23..0b80115454 100644 --- a/doc/modules/mod_mam.md +++ b/doc/modules/mod_mam.md @@ -259,6 +259,13 @@ How often (in milliseconds) the buffered messages are flushed to a DB. Max size of the batch insert query for an async writer. If the buffer is full, messages are flushed to a database immediately and the flush timer is reset. +#### `modules.mod_mam_meta.pool_size` +* **Syntax:** non-negative integer +* **Default:** `32` +* **Example:** `modules.mod_mam_meta.pool_size = 30` + +Number of workers in the pool. + ### Common backend options #### `modules.mod_mam_meta.user_prefs_store` diff --git a/src/mam/mod_mam_meta.erl b/src/mam/mod_mam_meta.erl index 8e25102753..b3d9e96004 100644 --- a/src/mam/mod_mam_meta.erl +++ b/src/mam/mod_mam_meta.erl @@ -79,6 +79,8 @@ config_items() -> validate = non_negative}, <<"max_batch_size">> => #option{type = integer, validate = non_negative}, + <<"pool_size">> => #option{type = integer, + validate = non_negative}, %% Low-level options <<"default_result_limit">> => #option{type = integer, diff --git a/test/config_parser_SUITE.erl b/test/config_parser_SUITE.erl index 30f1abf0f3..2bde5215a7 100644 --- a/test/config_parser_SUITE.erl +++ b/test/config_parser_SUITE.erl @@ -2033,6 +2033,8 @@ test_mod_mam_meta(T, M) -> T(#{<<"flush_interval">> => 1500})), ?cfgh(M([{max_batch_size, 50}]), T(#{<<"max_batch_size">> => 50})), + ?cfgh(M([{pool_size, 50}]), + T(#{<<"pool_size">> => 50})), ?cfgh(M([{user_prefs_store, rdbms}]), T(#{<<"user_prefs_store">> => <<"rdbms">>})), ?cfgh(M([{full_text_search, false}]), @@ -2073,6 +2075,7 @@ 
test_mod_mam_meta(T, M) -> ?errh(T(#{<<"async_writer">> => #{}})), ?errh(T(#{<<"flush_interval">> => -1})), ?errh(T(#{<<"max_batch_size">> => -1})), + ?errh(T(#{<<"pool_size">> => -1})), ?errh(T(#{<<"user_prefs_store">> => <<"textfile">>})), ?errh(T(#{<<"full_text_search">> => <<"disabled">>})), ?errh(T(#{<<"default_result_limit">> => -1})), From ec9d240fa38810f686ddd1611600f0aa7e228d35 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Sun, 28 Nov 2021 20:17:48 +0100 Subject: [PATCH 5/6] Remove long-irrelevant comment This is invalid since f3ada672c and 7df0bf83 --- src/mam/mod_mam_muc_rdbms_async_pool_writer.erl | 8 -------- src/mam/mod_mam_rdbms_async_pool_writer.erl | 7 ------- 2 files changed, 15 deletions(-) diff --git a/src/mam/mod_mam_muc_rdbms_async_pool_writer.erl b/src/mam/mod_mam_muc_rdbms_async_pool_writer.erl index cc46cbfa65..d85aa0b318 100644 --- a/src/mam/mod_mam_muc_rdbms_async_pool_writer.erl +++ b/src/mam/mod_mam_muc_rdbms_async_pool_writer.erl @@ -45,14 +45,6 @@ worker_prefix() -> "ejabberd_mod_mam_muc_writer". - -%% @doc Ensure, that: -%% `worker_count(_) = Int * mod_mam_muc_rdbms_arch:partition_count()' -%% -%% For example, -%% `worker_count(_) = 32, partition_count() = 16'. -%% or -%% `worker_count(_) = 16, partition_count() = 16'. worker_count(HostType) -> gen_mod:get_module_opt(HostType, ?MODULE, pool_size, ?DEFAULT_POOL_SIZE). diff --git a/src/mam/mod_mam_rdbms_async_pool_writer.erl b/src/mam/mod_mam_rdbms_async_pool_writer.erl index 7a2fcee3ed..464d03f0b0 100644 --- a/src/mam/mod_mam_rdbms_async_pool_writer.erl +++ b/src/mam/mod_mam_rdbms_async_pool_writer.erl @@ -59,13 +59,6 @@ worker_prefix() -> "ejabberd_mod_mam_writer". -%% Ensure, that: -%% `worker_count(_) = Int * mod_mam_rdbms_arch:partition_count()' -%% -%% For example, -%% `worker_count(_) = 32, partition_count() = 16'. -%% or -%% `worker_count(_) = 16, partition_count() = 16'. worker_count(HostType) -> gen_mod:get_module_opt(HostType, ?MODULE, pool_size, ?DEFAULT_POOL_SIZE). 
From eee8837c2ec5f1844b5f71b913687deff37d7943 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Sun, 28 Nov 2021 20:23:02 +0100 Subject: [PATCH 6/6] Bring a bit of symmetry between both async mam workers --- src/mam/mod_mam_rdbms_async_pool_writer.erl | 4 ++-- src/metrics/mongoose_metrics_mam_hooks.erl | 10 ++++++++-- src/mongoose_hooks.erl | 11 ++++++++++- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/mam/mod_mam_rdbms_async_pool_writer.erl b/src/mam/mod_mam_rdbms_async_pool_writer.erl index 464d03f0b0..2c61353ca0 100644 --- a/src/mam/mod_mam_rdbms_async_pool_writer.erl +++ b/src/mam/mod_mam_rdbms_async_pool_writer.erl @@ -239,7 +239,6 @@ do_run_flush(MessageCount, State = #state{host_type = HostType, max_batch_size = Error -> Error end end, - [mod_mam_rdbms_arch:retract_message(HostType, Params) || Params <- Acc], case InsertResult of {updated, _Count} -> ok; {error, Reason} -> @@ -249,7 +248,8 @@ do_run_flush(MessageCount, State = #state{host_type = HostType, max_batch_size = message_count => MessageCount, reason => Reason}), ok end, - mongoose_metrics:update(HostType, modMamFlushed, MessageCount), + [mod_mam_rdbms_arch:retract_message(HostType, Params) || Params <- Acc], + mongoose_hooks:mam_flush_messages(HostType, MessageCount), erlang:garbage_collect(), State#state{acc=[], flush_interval_tref=undefined}. diff --git a/src/metrics/mongoose_metrics_mam_hooks.erl b/src/metrics/mongoose_metrics_mam_hooks.erl index 6d8e357431..0a389f4da6 100644 --- a/src/metrics/mongoose_metrics_mam_hooks.erl +++ b/src/metrics/mongoose_metrics_mam_hooks.erl @@ -21,6 +21,7 @@ mam_remove_archive/4, mam_lookup_messages/3, mam_archive_message/3, + mam_flush_messages/3, mam_muc_get_prefs/4, mam_muc_set_prefs/7, mam_muc_remove_archive/4, @@ -28,7 +29,7 @@ mam_muc_archive_message/3, mam_muc_flush_messages/3]). 
--ignore_xref([mam_archive_message/3, mam_get_prefs/4, mam_lookup_messages/3, +-ignore_xref([mam_archive_message/3, mam_get_prefs/4, mam_lookup_messages/3, mam_flush_messages/3, mam_muc_archive_message/3, mam_muc_flush_messages/3, mam_muc_get_prefs/4, mam_muc_lookup_messages/3, mam_muc_remove_archive/4, mam_muc_set_prefs/7, mam_remove_archive/4, mam_set_prefs/7]). @@ -47,7 +48,8 @@ get_mam_hooks(Host) -> {mam_get_prefs, Host, ?MODULE, mam_get_prefs, 50}, {mam_archive_message, Host, ?MODULE, mam_archive_message, 50}, {mam_remove_archive, Host, ?MODULE, mam_remove_archive, 50}, - {mam_lookup_messages, Host, ?MODULE, mam_lookup_messages, 100} + {mam_lookup_messages, Host, ?MODULE, mam_lookup_messages, 100}, + {mam_flush_messages, Host, ?MODULE, mam_flush_messages, 50} ]. %% @doc Here will be declared which hooks should be registered when mod_mam_muc is enabled. @@ -106,6 +108,10 @@ mam_archive_message(Result, Host, _Params) -> mongoose_metrics:update(Host, modMamArchived, 1), Result. +mam_flush_messages(Acc, Host, MessageCount) -> + mongoose_metrics:update(Host, modMamFlushed, MessageCount), + Acc. + %% ---------------------------------------------------------------------------- %% mod_mam_muc diff --git a/src/mongoose_hooks.erl b/src/mongoose_hooks.erl index 54bfab0acb..7bdd2440e8 100644 --- a/src/mongoose_hooks.erl +++ b/src/mongoose_hooks.erl @@ -94,7 +94,8 @@ mam_get_prefs/4, mam_remove_archive/3, mam_lookup_messages/2, - mam_archive_message/2]). + mam_archive_message/2, + mam_flush_messages/2]). -export([mam_muc_archive_id/2, mam_muc_archive_size/3, @@ -1008,6 +1009,14 @@ mam_lookup_messages(HookServer, Params) -> mam_archive_message(HookServer, Params) -> run_hook_for_host_type(mam_archive_message, HookServer, ok, [HookServer, Params]). +%%% @doc The `mam_flush_messages' hook is run after the async bulk write +%%% happens for messages regardless of the result of the write. +-spec mam_flush_messages(HookServer :: jid:lserver(), + MessageCount :: integer()) -> ok. 
+mam_flush_messages(HookServer, MessageCount) -> + run_hook_for_host_type(mam_flush_messages, HookServer, ok, + [HookServer, MessageCount]). + %% MAM MUC related hooks