Skip to content

Commit

Permalink
Merge pull request #4231 from esl/pools/rdbms_pool_tags
Browse files Browse the repository at this point in the history
Introduce pool names to mongoose_rdbms
  • Loading branch information
chrzaszcz authored Feb 28, 2024
2 parents 0478cfb + 03a4175 commit 786f3a2
Show file tree
Hide file tree
Showing 6 changed files with 312 additions and 169 deletions.
134 changes: 108 additions & 26 deletions big_tests/tests/rdbms_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
-module(rdbms_SUITE).
-compile([export_all, nowarn_export_all]).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

%% We need assert from it
Expand All @@ -31,10 +32,16 @@
%%--------------------------------------------------------------------

all() ->
[{group, rdbms_queries}].
[
{group, global_rdbms_queries},
{group, tagged_rdbms_queries}
].

groups() ->
[{rdbms_queries, [], rdbms_queries_cases()}].
[
{global_rdbms_queries, [], rdbms_queries_cases()},
{tagged_rdbms_queries, [], rdbms_queries_cases()}
].

rdbms_queries_cases() ->
[select_one_works_case,
Expand Down Expand Up @@ -99,6 +106,18 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
escalus:end_per_suite(Config).

init_per_group(tagged_rdbms_queries, Config) ->
ExtraConfig = stop_global_default_pool(),
start_local_host_type_pool(ExtraConfig),
ExtraConfig ++ Config;
init_per_group(global_rdbms_queries, Config) ->
[{tag, global} | Config].

end_per_group(tagged_rdbms_queries, Config) ->
restart_global_default_pool(Config);
end_per_group(global_rdbms_queries, Config) ->
Config.

init_per_testcase(test_incremental_upsert, Config) ->
erase_inbox(Config),
escalus:init_per_testcase(test_incremental_upsert, Config);
Expand Down Expand Up @@ -325,7 +344,13 @@ read_prep_boolean_case(Config) ->

select_current_timestamp_case(Config) ->
ok = rpc(mim(), mongoose_rdbms_timestamp, prepare, []),
assert_is_integer(rpc(mim(), mongoose_rdbms_timestamp, select, [])).
Res = case ?config(tag, Config) of
global ->
rpc(mim(), mongoose_rdbms_timestamp, select, []);
Tag ->
rpc(mim(), mongoose_rdbms_timestamp, select, [host_type(), Tag])
end,
assert_is_integer(Res).

assert_is_integer(X) when is_integer(X) ->
X.
Expand Down Expand Up @@ -522,7 +547,8 @@ test_failed_wrapper_transaction(Config) ->
end,

% when
F = fun() -> sql_execute_wrapped_request(Config, insert_one, [<<"check1">>], WrapperFun) end,
ScopeAndTag = scope_and_tag(Config),
F = fun() -> sql_execute_wrapped_request(ScopeAndTag, insert_one, [<<"check1">>], WrapperFun) end,
sql_transaction(Config, F),

% then
Expand Down Expand Up @@ -590,49 +616,71 @@ select_like_prep_case(Config) ->
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
tag() ->
extra_tag.

scope_and_tag(Config) ->
case ?config(tag, Config) of
global -> [host_type()];
Tag -> [host_type(), Tag]
end.

sql_query(_Config, Query) ->
slow_rpc(mongoose_rdbms, sql_query, [host_type(), Query]).
sql_query(Config, Query) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(mongoose_rdbms, sql_query, ScopeAndTag ++ [Query]).

sql_prepare(_Config, Name, Table, Fields, Query) ->
escalus_ejabberd:rpc(mongoose_rdbms, prepare, [Name, Table, Fields, Query]).

sql_prepare_upsert(_Config, Name, Table, Insert, Update, Unique, Incr) ->
escalus_ejabberd:rpc(rdbms_queries, prepare_upsert, [host_type(), Name, Table, Insert, Update, Unique, Incr]).

sql_execute(_Config, Name, Parameters) ->
slow_rpc(mongoose_rdbms, execute, [host_type(), Name, Parameters]).
sql_execute(Config, Name, Parameters) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(mongoose_rdbms, execute, ScopeAndTag ++ [Name, Parameters]).

sql_execute_cast(_Config, Name, Parameters) ->
slow_rpc(mongoose_rdbms, execute_cast, [host_type(), Name, Parameters]).
sql_execute_cast(Config, Name, Parameters) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(mongoose_rdbms, execute_cast, ScopeAndTag ++ [Name, Parameters]).

sql_query_cast(_Config, Query) ->
slow_rpc(mongoose_rdbms, sql_query_cast, [host_type(), Query]).
sql_query_cast(Config, Query) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(mongoose_rdbms, sql_query_cast, ScopeAndTag ++ [Query]).

sql_execute_request(_Config, Name, Parameters) ->
slow_rpc(mongoose_rdbms, execute_request, [host_type(), Name, Parameters]).
sql_execute_request(Config, Name, Parameters) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(mongoose_rdbms, execute_request, ScopeAndTag ++ [Name, Parameters]).

sql_execute_wrapped_request(_Config, Name, Parameters, WrapperFun) ->
slow_rpc(mongoose_rdbms, execute_wrapped_request, [host_type(), Name, Parameters, WrapperFun]).
sql_execute_wrapped_request(ScopeAndTag, Name, Parameters, WrapperFun) ->
slow_rpc(mongoose_rdbms, execute_wrapped_request, ScopeAndTag ++ [Name, Parameters, WrapperFun]).

sql_execute_wrapped_request_and_wait_response(_Config, Name, Parameters, WrapperFun) ->
slow_rpc(?MODULE, execute_wrapped_request_and_wait_response, [host_type(), Name, Parameters, WrapperFun]).
sql_execute_wrapped_request_and_wait_response(Config, Name, Parameters, WrapperFun) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(?MODULE, execute_wrapped_request_and_wait_response, ScopeAndTag ++ [Name, Parameters, WrapperFun]).

execute_wrapped_request_and_wait_response(HostType, Tag, Name, Parameters, WrapperFun) ->
RequestId = mongoose_rdbms:execute_wrapped_request(HostType, Tag, Name, Parameters, WrapperFun),
gen_server:wait_response(RequestId, 100).

execute_wrapped_request_and_wait_response(HostType, Name, Parameters, WrapperFun) ->
RequestId = mongoose_rdbms:execute_wrapped_request(HostType, Name, Parameters, WrapperFun),
gen_server:wait_response(RequestId, 100).

sql_execute_upsert(_Config, Name, Insert, Update, Unique) ->
slow_rpc(rdbms_queries, execute_upsert, [host_type(), Name, Insert, Update, Unique]).
sql_execute_upsert(Config, Name, Insert, Update, Unique) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(rdbms_queries, execute_upsert, ScopeAndTag ++ [Name, Insert, Update, Unique]).

sql_query_request(_Config, Query) ->
slow_rpc(mongoose_rdbms, sql_query_request, [host_type(), Query]).
sql_query_request(Config, Query) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(mongoose_rdbms, sql_query_request, ScopeAndTag ++ [Query]).

sql_transaction_request(_Config, Query) ->
slow_rpc(mongoose_rdbms, sql_transaction_request, [host_type(), Query]).
sql_transaction_request(Config, Query) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(mongoose_rdbms, sql_transaction_request, ScopeAndTag ++ [Query]).

sql_transaction(_Config, F) ->
slow_rpc(mongoose_rdbms, sql_transaction, [host_type(), F]).
sql_transaction(Config, F) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(mongoose_rdbms, sql_transaction, ScopeAndTag ++ [F]).

escape_null(_Config) ->
escalus_ejabberd:rpc(mongoose_rdbms, escape_null, []).
Expand Down Expand Up @@ -1118,6 +1166,40 @@ is_pgsql() ->
is_mysql() ->
db_engine() == mysql.

stop_global_default_pool() ->
Pools = rpc(mim(), mongoose_config, get_opt, [outgoing_pools]),
[GlobalRdbmsPool] = [Pool || Pool = #{type := rdbms, scope := global, tag := default} <- Pools],
ok = rpc(mim(), mongoose_wpool, stop, [rdbms, global, default]),
Extra = maybe_stop_service_domain_db(),
[{tag, tag()}, {global_default_rdbms_pool, GlobalRdbmsPool} | Extra].

restart_global_default_pool(Config) ->
GlobalRdbmsPool = ?config(global_default_rdbms_pool, Config),
rpc(mim(), mongoose_wpool, start_configured_pools, [[GlobalRdbmsPool]]),
maybe_restart_service_domain_db(Config).

maybe_stop_service_domain_db() ->
case rpc(mim(), erlang, whereis, [service_domain_db]) of
undefined ->
[];
ServiceDomainDB when is_pid(ServiceDomainDB) ->
ok = rpc(mim(), sys, suspend, [ServiceDomainDB]),
[{service_domain_db, ServiceDomainDB}]
end.

maybe_restart_service_domain_db(Config) ->
case ?config(service_domain_db, Config) of
undefined ->
ok;
ServiceDomainDB ->
ok = rpc(mim(), sys, resume, [ServiceDomainDB])
end.

start_local_host_type_pool(Config) ->
GlobalRdbmsPool = ?config(global_default_rdbms_pool, Config),
LocalHostTypePool = GlobalRdbmsPool#{scope := host_type(), tag := tag()},
rpc(mim(), mongoose_wpool, start_configured_pools, [[LocalHostTypePool], [host_type()]]).

escape_column(Name) ->
case is_mysql() of
true ->
Expand Down
4 changes: 4 additions & 0 deletions doc/configuration/outgoing-connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ This allows you to create multiple dedicated pools of the same type.
* `host` - the pool will be started for each XMPP host or host type served by MongooseIM
* `single_host` - the pool will be started for the selected host or host type only (you must provide the name).

!!! Note
A pool with scope `global` and tag `default` is used by services that are not configured by host_type, like `service_domain_db` or `service_mongoose_system_metrics`, or by modules that don't support dynamic domains, like `mod_pubsub`.
If a global default pool is not configured, these services will fail.

## Worker pool options

All pools are managed by the [inaka/worker_pool](https://github.com/inaka/worker_pool) library.
Expand Down
Loading

0 comments on commit 786f3a2

Please sign in to comment.