Skip to content

Commit

Permalink
Use maps for wpool opts and conn_opts
Browse files Browse the repository at this point in the history
  • Loading branch information
gustawlippa committed Feb 11, 2022
1 parent 0d48066 commit 28f84c0
Show file tree
Hide file tree
Showing 18 changed files with 330 additions and 345 deletions.
7 changes: 4 additions & 3 deletions big_tests/tests/mod_event_pusher_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ get_prefix(Config) ->
get_prefix(GroupName).

start_pool() ->
HTTPOpts = [{server, http_notifications_host()}],
PoolOpts = [{strategy, available_worker}, {workers, 5}],
HTTPOpts = #{server => http_notifications_host()},
PoolOpts = #{strategy => available_worker, workers => 5},
ejabberd_node_utils:call_fun(mongoose_wpool, start_configured_pools,
[[{http, global, http_pool, PoolOpts, HTTPOpts}]]).
[[#{type => http, scope => global, tag => http_pool,
opts => PoolOpts, conn_opts => HTTPOpts}]]).

stop_pool() ->
ejabberd_node_utils:call_fun(mongoose_wpool, stop, [http, global, http_pool]).
Expand Down
23 changes: 12 additions & 11 deletions big_tests/tests/mod_event_pusher_rabbit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@
]).
-define(MOD_EVENT_PUSHER_CFG, [{backends,
[{rabbit, ?MOD_EVENT_PUSHER_RABBIT_CFG}]}]).
-define(WPOOL_CFG, {rabbit, host, event_pusher,
[{workers, 20}],
[%% enables publisher one-to-one confirms
%% disabled by default
%% {confirms_enabled, true},
{amqp_host, "localhost"},
{amqp_port, 5672},
{amqp_username, "guest"},
{amqp_password, "guest"}
]}).
-define(WPOOL_CFG, #{type => rabbit, scope => host, tag => event_pusher,
opts => #{workers => 20},
conn_opts => #{%% enables publisher one-to-one confirms
%% disabled by default
%% confirms_enabled => true,
amqp_host => "localhost",
amqp_port => 5672,
amqp_username => "guest",
amqp_password => "guest"}
}).
-define(IF_EXCHANGE_EXISTS_RETRIES, 30).
-define(WAIT_FOR_EXCHANGE_INTERVAL, 100). % ms

Expand Down Expand Up @@ -187,7 +187,8 @@ end_per_testcase(CaseName, Config) ->
rabbit_pool_starts_with_default_config(_Config) ->
%% GIVEN
Domain = domain(),
DefaultWpoolConfig = {rabbit, host, rabbit_event_pusher_default, [], []},
DefaultWpoolConfig = #{type => rabbit, scope => host, tag => rabbit_event_pusher_default,
opts => #{}, conn_opts => #{}},
RabbitWpool = {rabbit, Domain, rabbit_event_pusher_default},
%% WHEN
start_rabbit_wpool(Domain, DefaultWpoolConfig),
Expand Down
8 changes: 4 additions & 4 deletions big_tests/tests/muc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -367,11 +367,11 @@ init_per_group(disco_rsm_with_offline, Config) ->

init_per_group(G, Config) when G =:= http_auth_no_server;
G =:= http_auth ->
PoolOpts = [{strategy, available_worker}, {workers, 5}],
HTTPOpts = [{server, "http://localhost:8080"},
{path_prefix, "/muc/auth/"}],
PoolOpts = #{strategy => available_worker, workers => 5},
HTTPOpts = #{server => "http://localhost:8080", path_prefix => "/muc/auth/"},
[{ok, _}] = ejabberd_node_utils:call_fun(mongoose_wpool, start_configured_pools,
[[{http, global, muc_http_auth_test, PoolOpts, HTTPOpts}]]),
[[#{type => http, scope => global, tag => muc_http_auth_test,
opts => PoolOpts, conn_opts => HTTPOpts}]]),
case G of
http_auth -> http_helper:start(8080, "/muc/auth/check_password", fun handle_http_auth/1);
_ -> ok
Expand Down
6 changes: 3 additions & 3 deletions big_tests/tests/push_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ check_default_format(From, To, Body, Msg) ->
ok.

start_pool() ->
PoolOpts = [{strategy, random_worker}, {call_timeout, 5000}, {workers, 10}],
HTTPOpts = [{path_prefix, "/"}, {http_opts, []}, {server, http_notifications_host()}],
Pool = {http, host, http_pool, PoolOpts, HTTPOpts},
PoolOpts = #{strategy => random_worker, call_timeout => 5000, workers => 10},
HTTPOpts = #{path_prefix => "/", http_opts => [], server => http_notifications_host()},
Pool = #{type => http, scope => host, tag => http_pool, opts => PoolOpts, conn_opts => HTTPOpts},
ejabberd_node_utils:call_fun(mongoose_wpool, start_configured_pools,
[[Pool], [<<"localhost">>]]).

Expand Down
7 changes: 4 additions & 3 deletions big_tests/tests/push_integration_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,11 @@ init_per_suite(Config) ->
mongoose_push_mock:start(Config),
Port = mongoose_push_mock:port(),

PoolOpts = [{strategy, available_worker}, {workers, 20}],
HTTPOpts = [{server, "https://localhost:" ++ integer_to_list(Port)}],
PoolOpts = #{strategy => available_worker, workers => 20},
HTTPOpts = #{server => "https://localhost:" ++ integer_to_list(Port)},
rpc(?RPC_SPEC, mongoose_wpool, start_configured_pools,
[[{http, global, mongoose_push_http, PoolOpts, HTTPOpts}]]),
[[#{type => http, scope => global, tag => mongoose_push_http, opts => PoolOpts,
conn_opts => HTTPOpts}]]),
ConfigWithModules = dynamic_modules:save_modules(domain(), Config),
escalus:init_per_suite(ConfigWithModules).

Expand Down
86 changes: 45 additions & 41 deletions src/config/mongoose_config_spec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ outgoing_pool(Type) ->
<<"connection">> => outgoing_pool_connection(Type)
},
process = fun ?MODULE:process_pool/2,
format_items = map,
wrap = item
}.

Expand All @@ -564,15 +565,17 @@ outgoing_pool_connection(<<"cassandra">>) ->
<<"tls">> => #section{items = tls_items(),
wrap = {kv, ssl},
process = fun ?MODULE:process_tls_sni/1}
}
},
format_items = map
};
outgoing_pool_connection(<<"elastic">>) ->
#section{
items = #{<<"host">> => #option{type = string,
validate = non_empty},
<<"port">> => #option{type = integer,
validate = port}
}
},
format_items = map
};
outgoing_pool_connection(<<"http">>) ->
#section{
Expand All @@ -586,7 +589,8 @@ outgoing_pool_connection(<<"http">>) ->
<<"tls">> => #section{items = tls_items(),
wrap = {kv, http_opts},
process = fun ?MODULE:process_tls_sni/1}
}
},
format_items = map
};
outgoing_pool_connection(<<"ldap">>) ->
#section{
Expand All @@ -604,22 +608,24 @@ outgoing_pool_connection(<<"ldap">>) ->
<<"tls">> => #section{items = tls_items(),
wrap = {kv, tls_options},
process = fun ?MODULE:process_tls_sni/1}
}
},
format_items = map
};
outgoing_pool_connection(<<"rabbit">>) ->
#section{
items = #{<<"amqp_host">> => #option{type = string,
validate = non_empty},
<<"amqp_port">> => #option{type = integer,
validate = port},
<<"amqp_username">> => #option{type = string,
<<"amqp_username">> => #option{type = binary,
validate = non_empty},
<<"amqp_password">> => #option{type = string,
<<"amqp_password">> => #option{type = binary,
validate = non_empty},
<<"confirms_enabled">> => #option{type = boolean},
<<"max_worker_queue_len">> => #option{type = int_or_infinity,
validate = non_negative}
}
},
format_items = map
};
outgoing_pool_connection(<<"rdbms">>) ->
#section{
Expand All @@ -644,7 +650,8 @@ outgoing_pool_connection(<<"rdbms">>) ->
validate = port},
<<"tls">> => sql_tls()
},
process = fun ?MODULE:process_rdbms_connection/1
process = fun ?MODULE:process_rdbms_connection/1,
format_items = map
};
outgoing_pool_connection(<<"redis">>) ->
#section{
Expand All @@ -655,7 +662,8 @@ outgoing_pool_connection(<<"redis">>) ->
<<"database">> => #option{type = integer,
validate = non_negative},
<<"password">> => #option{type = string}
}
},
format_items = map
};
outgoing_pool_connection(<<"riak">>) ->
#section{
Expand All @@ -666,7 +674,8 @@ outgoing_pool_connection(<<"riak">>) ->
<<"credentials">> => riak_credentials(),
<<"tls">> => #section{items = tls_items(),
process = fun ?MODULE:process_riak_tls/1,
wrap = none}}
wrap = none}},
format_items = map
}.

cassandra_server() ->
Expand Down Expand Up @@ -695,9 +704,8 @@ riak_credentials() ->
<<"password">> => #option{type = string,
validate = non_empty}},
required = all,
process = fun ?MODULE:process_riak_credentials/1,
wrap = prepend_key
}.
process = fun ?MODULE:process_riak_credentials/1
}.

%% path: outgoing_pools.rdbms.*.connection.tls
sql_tls() ->
Expand Down Expand Up @@ -1152,23 +1160,22 @@ check_auth_method(Method, Opts) ->
false -> error(#{what => missing_section_for_auth_method, auth_method => Method})
end.

process_pool([Tag, Type|_], KVs) ->
{[ScopeOpts, HostOpts, ConnOpts], Opts} = proplists:split(KVs, [scope, host, connection]),
Scope = pool_scope(ScopeOpts, HostOpts),
Connection = pool_connection(ConnOpts),
process_pool([Tag, Type|_], AllOpts) ->
Scope = pool_scope(maps:get(scope, AllOpts, none), maps:get(host, AllOpts, none)),
Connection = maps:get(connection, AllOpts, #{}),
Opts = maps:without([scope, host, connection], AllOpts),
#{type => b2a(Type),
scope => Scope,
tag => b2a(Tag),
opts => Opts,
conn_opts => Connection}.

pool_scope([{scope, single_host}], [{host, Host}]) -> Host;
pool_scope([{scope, host}], []) -> host;
pool_scope([{scope, global}], []) -> global;
pool_scope([], []) -> global.

pool_connection([{connection, Opts}]) -> Opts;
pool_connection([]) -> [].
pool_scope(single_host, none) -> error(#{what => pool_single_host_not_specified,
text => <<"\"host\" option is required if \"single_host\" is used.">>});
pool_scope(single_host, Host) -> Host;
pool_scope(host, none) -> host;
pool_scope(global, none) -> global;
pool_scope(none, none) -> global.

process_cassandra_server(KVs) ->
{[[{ip_address, IPAddr}]], Opts} = proplists:split(KVs, [ip_address]),
Expand All @@ -1181,28 +1188,25 @@ process_cassandra_auth([{plain, KVs}]) ->
{[[{username, User}], [{password, Pass}]], []} = proplists:split(KVs, [username, password]),
{cqerl_auth_plain_handler, [{User, Pass}]}.

process_rdbms_connection(KVs) ->
{[[{driver, Driver}], KeepaliveIntervalOpts], Opts} =
proplists:split(KVs, [driver, keepalive_interval]),
[{server, rdbms_server(Driver, Opts)} | KeepaliveIntervalOpts].

rdbms_server(odbc, Opts) ->
[{settings, Settings}] = Opts,
Settings;
rdbms_server(Driver, Opts) ->
{[[{host, Host}], [{database, DB}], [{username, User}], [{password, Pass}],
PortOpts, TLSOpts], []} =
proplists:split(Opts, [host, database, username, password, port, tls]),
process_rdbms_connection(Map) ->
KIMap = maps:with([keepalive_interval], Map),
maps:merge(KIMap, #{server => rdbms_server(Map)}).

rdbms_server(Opts = #{driver := odbc}) ->
maps:get(settings, Opts);
rdbms_server(Opts = #{host := Host, database := DB, username := User, password := Pass, driver := Driver}) ->
PortOpts = maps:get(port, Opts, none),
TLSOpts = maps:get(tls, Opts, none),
list_to_tuple([Driver, Host] ++ db_port(PortOpts) ++
[DB, User, Pass] ++ db_tls(Driver, TLSOpts)).

db_port([{port, Port}]) -> [Port];
db_port([]) -> [].
db_port(none) -> [];
db_port(Port) -> [Port].

db_tls(Driver, [{tls, KVs}]) ->
db_tls(_, none) -> [];
db_tls(Driver, KVs) ->
{[ModeOpts], Opts} = proplists:split(KVs, [required]),
[ssl_mode(Driver, ModeOpts) ++ ssl_opts(Driver, Opts)];
db_tls(_, []) -> [].
[ssl_mode(Driver, ModeOpts) ++ ssl_opts(Driver, Opts)].

ssl_mode(pgsql, [{required, true}]) -> [{ssl, required}];
ssl_mode(pgsql, [{required, false}]) -> [{ssl, true}];
Expand Down
22 changes: 14 additions & 8 deletions src/wpool/mongoose_wpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@
HostType :: host_type_or_global(),
Tag :: tag()}.

-type pool_opts_in() :: map().
-type pool_opts() :: [wpool:option()].
-type conn_opts() :: [{atom(), any()}].
-type conn_opts() :: map().

-type pool_map_in() :: #{type := pool_type(),
scope := scope(),
tag := tag(),
opts := pool_opts(),
opts := pool_opts_in(),
conn_opts := conn_opts()}.
%% Pool map with expanded HostType argument instead of scope
-type pool_map() :: #{type := pool_type(),
Expand Down Expand Up @@ -145,13 +146,13 @@ start(PoolType, HostType, PoolOpts) ->
-spec start(pool_type(), host_type_or_global(), tag(),
pool_opts()) -> start_result().
start(PoolType, HostType, Tag, PoolOpts) ->
start(PoolType, HostType, Tag, PoolOpts, []).
start(PoolType, HostType, Tag, PoolOpts, #{}).

-spec start(pool_type(), host_type_or_global(), tag(),
pool_opts(), conn_opts()) -> start_result().
start(PoolType, HostType, Tag, PoolOpts, ConnOpts) ->
{Opts0, WpoolOptsIn} = proplists:split(PoolOpts, [strategy, call_timeout]),
Opts = lists:append(Opts0) ++ default_opts(PoolType),
Opts = lists:append(Opts0) ++ maps:to_list(default_opts(PoolType)),
Strategy = proplists:get_value(strategy, Opts, best_worker),
CallTimeout = proplists:get_value(call_timeout, Opts, 5000),
%% If a callback doesn't explicitly blacklist a strategy, let's proceed.
Expand Down Expand Up @@ -233,7 +234,7 @@ stop(PoolType, HostType, Tag) ->
-spec is_configured(pool_type()) -> boolean().
is_configured(PoolType) ->
Pools = mongoose_config:get_opt(outgoing_pools, []),
lists:keymember(PoolType, 1, Pools).
lists:any(fun(M) -> maps:is_key(PoolType, M) end, Pools).

-spec get_worker(pool_type()) -> worker_result().
get_worker(PoolType) ->
Expand Down Expand Up @@ -350,7 +351,7 @@ default_opts(PoolType) ->
Mod = make_callback_module_name(PoolType),
case erlang:function_exported(Mod, default_opts, 0) of
true -> Mod:default_opts();
false -> []
false -> #{}
end.

-spec expand_pools([pool_map_in()], [mongooseim:host_type()]) -> [pool_map()].
Expand All @@ -365,8 +366,13 @@ expand_pools(Pools, HostTypes) ->
(Other) -> [Other]
end,
Pools1 = lists:flatmap(F, Pools),
%% Rename "scope" field to "host_type"
lists:map(fun(M = #{scope := HT}) -> M1 = maps:remove(scope, M), M1#{host_type => HT} end, Pools1).
lists:map(fun prepare_pool_map/1, Pools1).

-spec prepare_pool_map(pool_map_in()) -> pool_map().
prepare_pool_map(Pool = #{scope := HT, opts := Opts}) ->
%% Rename "scope" field to "host_type" and change wpool opts to a KV list
Pool1 = maps:remove(scope, Pool),
Pool1#{host_type => HT, opts => maps:to_list(Opts)}.

-spec get_unique_types([pool_map_in()]) -> [pool_type()].
get_unique_types(Pools) ->
Expand Down
2 changes: 1 addition & 1 deletion src/wpool/mongoose_wpool_cassandra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ extend_config(PoolConfig) ->
tcp_opts => [{keepalive, true}],
keyspace => mongooseim
},
ConfigMap = maps:merge(Defaults, maps:from_list(PoolConfig)),
ConfigMap = maps:merge(Defaults, PoolConfig),
maps:to_list(ConfigMap).

%% make the config survive the restart of 'cqerl_cluster' in case of a network failure
Expand Down
4 changes: 2 additions & 2 deletions src/wpool/mongoose_wpool_elastic.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ init() ->
ok.

start(HostType, Tag, WpoolOptsIn, ConnOpts) ->
ElasticHost = proplists:get_value(host, ConnOpts, "localhost"),
Port = proplists:get_value(port, ConnOpts, 9200),
ElasticHost = maps:get(host, ConnOpts, "localhost"),
Port = maps:get(port, ConnOpts, 9200),
ProcName = mongoose_wpool:make_pool_name(elastic, HostType, Tag),
Opts = [{host, list_to_binary(ElasticHost)}, {port, Port}],
WPoolOptions = [{overrun_warning, infinity},
Expand Down
8 changes: 4 additions & 4 deletions src/wpool/mongoose_wpool_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ init() ->
start(HostType, Tag, WpoolOptsIn, ConnOpts) ->
Name = mongoose_wpool:make_pool_name(http, HostType, Tag),
WpoolOpts = wpool_spec(WpoolOptsIn, ConnOpts),
PathPrefix = list_to_binary(gen_mod:get_opt(path_prefix, ConnOpts, "/")),
RequestTimeout = gen_mod:get_opt(request_timeout, ConnOpts, 2000),
PathPrefix = list_to_binary(maps:get(path_prefix, ConnOpts, "/")),
RequestTimeout = maps:get(request_timeout, ConnOpts, 2000),
case mongoose_wpool:start_sup_pool(http, Name, WpoolOpts) of
{ok, Pid} ->
ets:insert(?MODULE, {{HostType, Tag}, PathPrefix, RequestTimeout}),
Expand Down Expand Up @@ -66,8 +66,8 @@ get_params(HostType, Tag) ->
%% Internal functions

wpool_spec(WpoolOptsIn, ConnOpts) ->
TargetServer = gen_mod:get_opt(server, ConnOpts),
HttpOpts = gen_mod:get_opt(http_opts, ConnOpts, []),
TargetServer = maps:get(server, ConnOpts),
HttpOpts = maps:get(http_opts, ConnOpts, []),
Worker = {fusco, {TargetServer, [{connect_options, HttpOpts}]}},
[{worker, Worker} | WpoolOptsIn].

Loading

0 comments on commit 28f84c0

Please sign in to comment.