Skip to content

Commit

Permalink
Merge pull request #3433 from esl/async_pools
Browse files Browse the repository at this point in the history
Async pools
  • Loading branch information
arcusfelis committed Dec 17, 2021
2 parents 3078826 + 6479445 commit 1e94f3f
Show file tree
Hide file tree
Showing 16 changed files with 539 additions and 742 deletions.
21 changes: 12 additions & 9 deletions big_tests/tests/mam_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ init_modules(rdbms_simple, C, Config) when C =:= muc_all;
init_modules(rdbms_async_pool, C, Config) when C =:= muc_all;
C =:= muc_disabled_retraction ->
init_module(host_type(), mod_mam_muc_rdbms_arch, [no_writer]),
init_module(host_type(), mod_mam_muc_rdbms_async_pool_writer, [{flush_interval, 1}]), %% 1ms
init_module(host_type(), mod_mam_rdbms_arch_async, [{muc, [{flush_interval, 1}]}]), %% 1ms
init_module(host_type(), mod_mam_rdbms_prefs, [muc]),
init_module(host_type(), mod_mam_rdbms_user, [muc, pm]),
init_module(host_type(), mod_mam_muc, [{host, subhost_pattern(muc_domain(Config))}] ++
Expand All @@ -719,7 +719,7 @@ init_modules(rdbms_cache, C, Config) when C =:= muc_all;
init_modules(rdbms_async_cache, C, Config) when C =:= muc_all;
C =:= muc_disabled_retraction ->
init_module(host_type(), mod_mam_muc_rdbms_arch, [no_writer]),
init_module(host_type(), mod_mam_muc_rdbms_async_pool_writer, [{flush_interval, 1}]), %% 1ms
init_module(host_type(), mod_mam_rdbms_arch_async, [{muc, [{flush_interval, 1}]}]), %% 1ms
init_module(host_type(), mod_mam_rdbms_prefs, [muc]),
init_module(host_type(), mod_mam_rdbms_user, [muc, pm]),
init_module(host_type(), mod_mam_cache_user, [muc]),
Expand Down Expand Up @@ -787,7 +787,7 @@ init_modules(elasticsearch, C, Config) ->
init_modules(rdbms_async_pool, C, Config) ->
init_module(host_type(), mod_mam, addin_mam_options(C, Config)),
init_module(host_type(), mod_mam_rdbms_arch, [no_writer]),
init_module(host_type(), mod_mam_rdbms_async_pool_writer, [pm, {flush_interval, 1}]), %% 1ms
init_module(host_type(), mod_mam_rdbms_arch_async, [{pm, [{flush_interval, 1}]}]), %% 1ms
init_module(host_type(), mod_mam_rdbms_prefs, [pm]),
init_module(host_type(), mod_mam_rdbms_user, [pm]),
Config;
Expand All @@ -807,7 +807,7 @@ init_modules(rdbms_cache, C, Config) ->
init_modules(rdbms_async_cache, C, Config) ->
init_module(host_type(), mod_mam, addin_mam_options(C, Config)),
init_module(host_type(), mod_mam_rdbms_arch, [no_writer]),
init_module(host_type(), mod_mam_rdbms_async_pool_writer, [pm, {flush_interval, 1}]), %% 1ms
init_module(host_type(), mod_mam_rdbms_arch_async, [{pm, [{flush_interval, 1}]}]), %% 1ms
init_module(host_type(), mod_mam_rdbms_prefs, [pm]),
init_module(host_type(), mod_mam_rdbms_user, [pm]),
init_module(host_type(), mod_mam_cache_user, [pm]),
Expand Down Expand Up @@ -862,8 +862,7 @@ mam_modules() ->
mod_mam_muc_elasticsearch_arch,
mod_mam_rdbms_arch,
mod_mam_muc_rdbms_arch,
mod_mam_rdbms_async_pool_writer,
mod_mam_muc_rdbms_async_pool_writer,
mod_mam_rdbms_arch_async,
mod_mam_rdbms_prefs,
mod_mam_mnesia_prefs,
mod_mam_rdbms_user,
Expand Down Expand Up @@ -1162,10 +1161,14 @@ required_modules(retract_message_on_stanza_id, _Config) ->
required_modules(_, _) ->
[].

init_module(Host, mod_mam_rdbms_arch_async, Args) ->
OldOpts = case stop_module(Host, mod_mam_rdbms_arch_async) of
{ok, O} -> O;
ok -> []
end,
{ok, _} = start_module(Host, mod_mam_rdbms_arch_async, lists:ukeymerge(1, OldOpts, Args));
init_module(Host, Mod, Args) ->
lists:member(Mod, mam_modules())
orelse
ct:fail("Unknown module ~p", [Mod]),
lists:member(Mod, mam_modules()) orelse ct:fail("Unknown module ~p", [Mod]),
stop_module(Host, Mod),
{ok, _} = start_module(Host, Mod, Args).

Expand Down
18 changes: 18 additions & 0 deletions doc/migrations/5.0.0_5.1.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,21 @@ Each authentication method needs a TOML section, e.g. if you have the `rdbms` me
### Section `host_config`

The rules for overriding global options in the `host_config` section have been simplified. The `auth` section and the `s2s.address`, `s2s.domain_certfile` and `s2s.shared` options now completely override the corresponding general settings instead of being merged with them.

## Async workers

The `async_writer` flag of MAM is now a section on its own, that absorbs previous flags related to it: `flush_interval`, `max_batch_size` and `pool_size` now become subelements of the `async_writer` section, with one more parameter, `enabled`. Below an example:

```toml
[modules.mod_mam_meta]
flush_interval = 1000
max_batch_size = 100
muc.async_writer = false
```
now becomes
```toml
[modules.mod_mam_meta]
async_writer.flush_interval = 1000
async_writer.batch_size = 100
muc.async_writer.enabled = false
```
32 changes: 16 additions & 16 deletions doc/modules/mod_mam.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ Please note that you can override all common options in similar way.
```toml
[modules.mod_mam_meta]
backend = "rdbms"
async_writer = true # this option enables async writer for RDBMS backend
muc.async_writer = false # disable async writer for MUC archive only
async_writer.enabled = true # this option enables async writer for RDBMS backend
muc.async_writer.enabled = false # disable async writer for MUC archive only
```

### RDBMS backend options
Expand Down Expand Up @@ -236,35 +236,35 @@ Configures which cache to use, either start an internal instance, or reuse the c
When set to `simple`, stores messages in XML and full JIDs.
When set to `internal`, stores messages and JIDs in internal format.

#### `modules.mod_mam_meta.async_writer`
#### `modules.mod_mam_meta.async_writer.enabled`
* **Syntax:** boolean
* **Default:** `true`
* **Example:** `modules.mod_mam_meta.async_writer = false`
* **Example:** `modules.mod_mam_meta.async_writer.enabled = false`

Enables an asynchronous writer that is faster than the synchronous one but harder to debug.
The async writers store batches of messages with a certain delay (see **flush_interval**), so the results of the lookup operations executed right after message routing may be incomplete until the configured time passes.
The async writers store batches of messages that will be flush on a timeout (see **flush_interval**) or when the batch reaches a size (see **batch_size**), so the results of the lookup operations executed right after message routing may be incomplete until the configured time passes or the queue is full.

#### `modules.mod_mam_meta.flush_interval`
#### `modules.mod_mam_meta.async_writer.flush_interval`
* **Syntax:** non-negative integer
* **Default:** `2000`
* **Example:** `modules.mod_mam_meta.flush_interval = 2000`
* **Example:** `modules.mod_mam_meta.async_writer.flush_interval = 2000`

How often (in milliseconds) the buffered messages are flushed to a DB.
How often (in milliseconds) the buffered messages are flushed to DB.

#### `modules.mod_mam_meta.max_batch_size`
#### `modules.mod_mam_meta.async_writer.batch_size`
* **Syntax:** non-negative integer
* **Default:** `30`
* **Example:** `modules.mod_mam_meta.max_batch_size = 30`
* **Example:** `modules.mod_mam_meta.async_writer.batch_size = 30`

Max size of the batch insert query for an async writer.
Max size of the batch for an async writer before the queue is considered full and flushed.
If the buffer is full, messages are flushed to a database immediately and the flush timer is reset.

#### `modules.mod_mam_meta.pool_size`
#### `modules.mod_mam_meta.async_writer.pool_size`
* **Syntax:** non-negative integer
* **Default:** `32`
* **Example:** `modules.mod_mam_meta.pool_size = 30`
* **Default:** `4 * erlang:system_info(schedulers_online)`
* **Example:** `modules.mod_mam_meta.async_writer.pool_size = 32`

Number of workers in the pool.
Number of workers in the pool. More than the number of available schedulers is recommended, to minimise lock contention on the message queues, and more than the number of DB workers, to fully utilise the DB capacity. How much more than these two parameters is then a good fine-tuning for specific deployments.

### Common backend options

Expand Down Expand Up @@ -421,7 +421,7 @@ This module can be used to add extra lookup parameters to MAM lookup queries.

muc.host = "muc.example.com"
muc.rdbms_message_format = "simple"
muc.async_writer = false
muc.async_writer.enabled = false
muc.user_prefs_store = "mnesia"

```
Expand Down
5 changes: 0 additions & 5 deletions src/ejabberd_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,6 @@ init([]) ->
brutal_kill,
worker,
[mod_muc_iq]},
MAM =
{mod_mam_sup,
{mod_mam_sup, start_link, []},
permanent, infinity, supervisor, [mod_mam_sup]},
ShaperSup =
{ejabberd_shaper_sup,
{ejabberd_shaper_sup, start_link, []},
Expand All @@ -174,7 +170,6 @@ init([]) ->
IQSupervisor,
Listener,
MucIQ,
MAM,
ShaperSup]}}.

start_child(ChildSpec) ->
Expand Down
46 changes: 20 additions & 26 deletions src/mam/mod_mam_meta.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,13 @@ config_items() ->
<<"cache">> => mongoose_user_cache:config_spec(),
<<"rdbms_message_format">> => #option{type = atom,
validate = {enum, [simple, internal]}},
<<"async_writer">> => #option{type = boolean},
<<"flush_interval">> => #option{type = integer,
validate = non_negative},
<<"max_batch_size">> => #option{type = integer,
validate = non_negative},
<<"pool_size">> => #option{type = integer,
validate = non_negative},
<<"async_writer">> => mod_mam_rdbms_arch_async:config_spec(),

%% Low-level options
<<"default_result_limit">> => #option{type = integer,
validate = non_negative},
<<"max_result_limit">> => #option{type = integer,
validate = non_negative},
<<"async_writer_rdbms_pool">> => #option{type = atom,
validate = pool_name},
<<"db_jid_format">> => #option{type = atom,
validate = module},
<<"db_message_format">> => #option{type = atom,
Expand Down Expand Up @@ -175,7 +167,8 @@ valid_core_mod_opts(mod_mam_muc) ->
[host] ++ common_opts().

common_opts() ->
[is_archivable_message,
[async_writer,
is_archivable_message,
send_message,
archive_chat_markers,
extra_fin_element,
Expand Down Expand Up @@ -213,13 +206,13 @@ parse_backend_opts(riak, Type, Opts, Deps0) ->
end;

parse_backend_opts(rdbms, Type, Opts0, Deps0) ->
Opts1 = add_default_rdbms_opts(Opts0),
Opts1 = add_rdbms_async_opts(Opts0),
Opts = add_rdbms_cache_opts(Opts1),

{ModRDBMSArch, ModAsyncWriter} =
case Type of
pm -> {mod_mam_rdbms_arch, mod_mam_rdbms_async_pool_writer};
muc -> {mod_mam_muc_rdbms_arch, mod_mam_muc_rdbms_async_pool_writer}
pm -> {mod_mam_rdbms_arch, mod_mam_rdbms_arch_async};
muc -> {mod_mam_muc_rdbms_arch, mod_mam_rdbms_arch_async}
end,

Deps1 = add_dep(ModRDBMSArch, [Type], Deps0),
Expand Down Expand Up @@ -265,15 +258,18 @@ add_dep(Dep, Args, Deps) ->
maps:put(Dep, NewArgs, Deps).


-spec add_default_rdbms_opts(Opts :: proplists:proplist()) -> proplists:proplist().
add_default_rdbms_opts(Opts) ->
lists:foldl(
fun({Key, _} = DefaultOpt, Acc) ->
case proplists:lookup(Key, Opts) of
none -> [DefaultOpt | Acc];
_ -> Acc
end
end, Opts, [{async_writer, true}]).
-spec add_rdbms_async_opts(proplists:proplist()) -> proplists:proplist().
add_rdbms_async_opts(Opts) ->
case lists:keyfind(async_writer, 1, Opts) of
{async_writer, AsyncOpts} ->
case lists:keyfind(enabled, 1, AsyncOpts) of
{enabled, false} -> lists:keydelete(async_writer, 1, Opts);
_ -> Opts
end;
false ->
[{async_writer, []} | Opts]
end.


add_rdbms_cache_opts(Opts) ->
case {lists:keyfind(cache_users, 1, Opts), lists:keyfind(cache, 1, Opts)} of
Expand Down Expand Up @@ -303,11 +299,9 @@ parse_rdbms_opt(Type, ModRDBMSArch, ModAsyncWriter, Option, Deps) ->
add_dep(mod_mam_mnesia_prefs, [Type], Deps);
{rdbms_message_format, simple} ->
add_dep(ModRDBMSArch, rdbms_simple_opts(), Deps);
{async_writer, true} ->
{async_writer, Opts} ->
DepsWithNoWriter = add_dep(ModRDBMSArch, [no_writer], Deps),
add_dep(ModAsyncWriter, [Type], DepsWithNoWriter);
{async_writer_rdbms_pool, PoolName} ->
add_dep(ModAsyncWriter, [{rdbms_pool, PoolName}], Deps);
add_dep(ModAsyncWriter, [{Type, Opts}], DepsWithNoWriter);
_ -> Deps
end.

Expand Down
1 change: 1 addition & 0 deletions src/mam/mod_mam_muc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -666,4 +666,5 @@ spirals() ->
modMucMamForwarded,
modMucMamArchived,
modMucMamFlushed,
modMucMamDropped,
modMucMamDroppedIQ].
Loading

0 comments on commit 1e94f3f

Please sign in to comment.