Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker pool cleanups #3419

Merged
merged 6 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions doc/modules/mod_mam.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,13 @@ How often (in milliseconds) the buffered messages are flushed to a DB.
Max size of the batch insert query for an async writer.
If the buffer is full, messages are flushed to a database immediately and the flush timer is reset.

#### `modules.mod_mam_meta.pool_size`
* **Syntax:** non-negative integer
* **Default:** `32`
* **Example:** `modules.mod_mam_meta.pool_size = 30`

Number of workers in the pool.

### Common backend options

#### `modules.mod_mam_meta.user_prefs_store`
Expand Down
8 changes: 0 additions & 8 deletions include/mongoose_wpool.hrl

This file was deleted.

2 changes: 2 additions & 0 deletions src/mam/mod_mam_meta.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ config_items() ->
validate = non_negative},
<<"max_batch_size">> => #option{type = integer,
validate = non_negative},
<<"pool_size">> => #option{type = integer,
validate = non_negative},

%% Low-level options
<<"default_result_limit">> => #option{type = integer,
Expand Down
8 changes: 0 additions & 8 deletions src/mam/mod_mam_muc_rdbms_async_pool_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@
worker_prefix() ->
"ejabberd_mod_mam_muc_writer".


%% @doc Ensure, that:
%% `worker_count(_) = Int * mod_mam_muc_rdbms_arch:partition_count()'
%%
%% For example,
%% `worker_count(_) = 32, partition_count() = 16'.
%% or
%% `worker_count(_) = 16, partition_count() = 16'.
worker_count(HostType) ->
gen_mod:get_module_opt(HostType, ?MODULE, pool_size, ?DEFAULT_POOL_SIZE).

Expand Down
11 changes: 2 additions & 9 deletions src/mam/mod_mam_rdbms_async_pool_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,6 @@
worker_prefix() ->
"ejabberd_mod_mam_writer".

%% Ensure, that:
%% `worker_count(_) = Int * mod_mam_rdbms_arch:partition_count()'
%%
%% For example,
%% `worker_count(_) = 32, partition_count() = 16'.
%% or
%% `worker_count(_) = 16, partition_count() = 16'.
worker_count(HostType) ->
gen_mod:get_module_opt(HostType, ?MODULE, pool_size, ?DEFAULT_POOL_SIZE).

Expand Down Expand Up @@ -246,7 +239,6 @@ do_run_flush(MessageCount, State = #state{host_type = HostType, max_batch_size =
Error -> Error
end
end,
[mod_mam_rdbms_arch:retract_message(HostType, Params) || Params <- Acc],
case InsertResult of
{updated, _Count} -> ok;
{error, Reason} ->
Expand All @@ -256,7 +248,8 @@ do_run_flush(MessageCount, State = #state{host_type = HostType, max_batch_size =
message_count => MessageCount, reason => Reason}),
ok
end,
mongoose_metrics:update(HostType, modMamFlushed, MessageCount),
[mod_mam_rdbms_arch:retract_message(HostType, Params) || Params <- Acc],
mongoose_hooks:mam_flush_messages(HostType, MessageCount),
erlang:garbage_collect(),
State#state{acc=[], flush_interval_tref=undefined}.

Expand Down
10 changes: 8 additions & 2 deletions src/metrics/mongoose_metrics_mam_hooks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
mam_remove_archive/4,
mam_lookup_messages/3,
mam_archive_message/3,
mam_flush_messages/3,
mam_muc_get_prefs/4,
mam_muc_set_prefs/7,
mam_muc_remove_archive/4,
mam_muc_lookup_messages/3,
mam_muc_archive_message/3,
mam_muc_flush_messages/3]).

-ignore_xref([mam_archive_message/3, mam_get_prefs/4, mam_lookup_messages/3,
-ignore_xref([mam_archive_message/3, mam_get_prefs/4, mam_lookup_messages/3, mam_flush_messages/3,
mam_muc_archive_message/3, mam_muc_flush_messages/3, mam_muc_get_prefs/4,
mam_muc_lookup_messages/3, mam_muc_remove_archive/4, mam_muc_set_prefs/7,
mam_remove_archive/4, mam_set_prefs/7]).
Expand All @@ -47,7 +48,8 @@ get_mam_hooks(Host) ->
{mam_get_prefs, Host, ?MODULE, mam_get_prefs, 50},
{mam_archive_message, Host, ?MODULE, mam_archive_message, 50},
{mam_remove_archive, Host, ?MODULE, mam_remove_archive, 50},
{mam_lookup_messages, Host, ?MODULE, mam_lookup_messages, 100}
{mam_lookup_messages, Host, ?MODULE, mam_lookup_messages, 100},
{mam_flush_messages, Host, ?MODULE, mam_flush_messages, 50}
].

%% @doc Here will be declared which hooks should be registered when mod_mam_muc is enabled.
Expand Down Expand Up @@ -106,6 +108,10 @@ mam_archive_message(Result, Host, _Params) ->
mongoose_metrics:update(Host, modMamArchived, 1),
Result.

mam_flush_messages(Acc, Host, MessageCount) ->
mongoose_metrics:update(Host, modMamFlushed, MessageCount),
Acc.

%% ----------------------------------------------------------------------------
%% mod_mam_muc

Expand Down
11 changes: 10 additions & 1 deletion src/mongoose_hooks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@
mam_get_prefs/4,
mam_remove_archive/3,
mam_lookup_messages/2,
mam_archive_message/2]).
mam_archive_message/2,
mam_flush_messages/2]).

-export([mam_muc_archive_id/2,
mam_muc_archive_size/3,
Expand Down Expand Up @@ -1008,6 +1009,14 @@ mam_lookup_messages(HookServer, Params) ->
mam_archive_message(HookServer, Params) ->
run_hook_for_host_type(mam_archive_message, HookServer, ok, [HookServer, Params]).

%%% @doc The `mam_flush_messages' hook is run after the async bulk write
%%% happens for messages despite the result of the write.
-spec mam_flush_messages(HookServer :: jid:lserver(),
MessageCount :: integer()) -> ok.
mam_flush_messages(HookServer, MessageCount) ->
run_hook_for_host_type(mam_flush_messages, HookServer, ok,
[HookServer, MessageCount]).


%% MAM MUC related hooks

Expand Down
15 changes: 10 additions & 5 deletions src/wpool/mongoose_wpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
-module(mongoose_wpool).
-author("[email protected]").
-include("mongoose.hrl").
-include("mongoose_wpool.hrl").

-type call_timeout() :: pos_integer() | undefined.
-record(mongoose_wpool, {
name :: pool_name(),
atom_name :: wpool:name(),
strategy :: wpool:strategy() | undefined,
call_timeout :: call_timeout()
}).
Expand Down Expand Up @@ -113,7 +113,7 @@ ensure_started() ->
% we set heir here because the whole thing may be started by an ephemeral process
ets:new(?MODULE, [named_table, public,
{read_concurrency, true},
{keypos, 2},
{keypos, #mongoose_wpool.name},
{heir, whereis(mongoose_wpool_sup), undefined}]);
_ ->
ok
Expand Down Expand Up @@ -171,11 +171,14 @@ start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) ->
case mongoose_wpool_mgr:start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts) of
{ok, Pid} ->
ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag},
atom_name = make_pool_name(PoolType, HostType, Tag),
strategy = Strategy,
call_timeout = CallTimeout}),
{ok, Pid};
{external, Pid} ->
ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag}}),
ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag},
atom_name = make_pool_name(PoolType, HostType, Tag)
}),
{ok, Pid};
Error ->
Error
Expand Down Expand Up @@ -317,8 +320,10 @@ make_pool_name(PoolType, HostType, Tag) when is_binary(HostType) ->
binary_to_atom(<<"mongoose_wpool$", (atom_to_binary(PoolType, utf8))/binary, $$,
HostType/binary, $$, (atom_to_binary(Tag, utf8))/binary>>, utf8).

make_pool_name(#mongoose_wpool{name = {PoolType, HostType, Tag}}) ->
make_pool_name(PoolType, HostType, Tag).
make_pool_name(#mongoose_wpool{atom_name = undefined, name = {PoolType, HostType, Tag}}) ->
make_pool_name(PoolType, HostType, Tag);
make_pool_name(#mongoose_wpool{atom_name = AtomName}) ->
AtomName.

-spec call_start_callback(pool_type(), list()) -> term().
call_start_callback(PoolType, Args) ->
Expand Down
1 change: 0 additions & 1 deletion src/wpool/mongoose_wpool_mgr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,3 @@ maybe_stop_pool({Type, Host, Tag} = Key, #{monitor := Monitor}, Monitors) ->
?LOG_WARNING(#{what => pool_stop_failed, pool_key => Key, reason => Other}),
{Other, NewMonitors}
end.

3 changes: 3 additions & 0 deletions test/config_parser_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2033,6 +2033,8 @@ test_mod_mam_meta(T, M) ->
T(#{<<"flush_interval">> => 1500})),
?cfgh(M([{max_batch_size, 50}]),
T(#{<<"max_batch_size">> => 50})),
?cfgh(M([{pool_size, 50}]),
T(#{<<"pool_size">> => 50})),
?cfgh(M([{user_prefs_store, rdbms}]),
T(#{<<"user_prefs_store">> => <<"rdbms">>})),
?cfgh(M([{full_text_search, false}]),
Expand Down Expand Up @@ -2073,6 +2075,7 @@ test_mod_mam_meta(T, M) ->
?errh(T(#{<<"async_writer">> => #{}})),
?errh(T(#{<<"flush_interval">> => -1})),
?errh(T(#{<<"max_batch_size">> => -1})),
?errh(T(#{<<"pool_size">> => -1})),
?errh(T(#{<<"user_prefs_store">> => <<"textfile">>})),
?errh(T(#{<<"full_text_search">> => <<"disabled">>})),
?errh(T(#{<<"default_result_limit">> => -1})),
Expand Down