Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use maps for mongoose_wpool options #3645

Merged
merged 17 commits into from
May 13, 2022
43 changes: 31 additions & 12 deletions src/rdbms/mongoose_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
sql_query/0,
sql_query_part/0]).

-export([process_options/1]).

%% External exports
-export([prepare/4,
prepared/1,
Expand Down Expand Up @@ -166,10 +168,29 @@
-export_type([query_result/0,
server/0]).

-type options() :: #{driver := pgsql | mysql | odbc,
max_start_interval := pos_integer(),
atom() => any()}.

-export_type([options/0]).

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------

-spec process_options(map()) -> options().
process_options(Opts = #{driver := odbc, settings := _}) ->
Opts;
process_options(Opts = #{host := _Host, database := _DB, username := _User,
password := _Pass, driver := _Driver}) ->
ensure_db_port(Opts);
process_options(Opts) ->
error(#{what => invalid_rdbms_connection_options, options => Opts}).

ensure_db_port(Opts = #{port := _}) -> Opts;
Premwoik marked this conversation as resolved.
Show resolved Hide resolved
ensure_db_port(Opts = #{driver := pgsql}) -> Opts#{port => 5432};
ensure_db_port(Opts = #{driver := mysql}) -> Opts#{port => 3306}.

-spec prepare(Name, Table :: binary() | atom(), Fields :: [binary() | atom()],
Statement :: iodata()) ->
{ok, Name} | {error, already_exists}
Expand Down Expand Up @@ -565,15 +586,13 @@ to_bool(_) -> false.
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------
-spec init(term()) -> {ok, state()}.
init(Opts) ->
-spec init(options()) -> {ok, state()}.
init(Opts = #{max_start_interval := MaxStartInterval}) ->
process_flag(trap_exit, true),
{server, Settings} = lists:keyfind(server, 1, Opts),
KeepaliveInterval = proplists:get_value(keepalive_interval, Opts),
% retries are delayed exponentially, this param limits the delay
% so if we start with 2 and try 6 times, we have 2, 4, 8, 16, 30
MaxStartInterval = proplists:get_value(start_interval, Opts, 30),
case connect(Settings, ?CONNECT_RETRIES, 2, MaxStartInterval) of
KeepaliveInterval = maps:get(keepalive_interval, Opts, undefined),
% retries are delayed exponentially, max_start_interval limits the delay
% e.g. if the limit is 30, the delays are: 2, 4, 8, 16, 30, 30, ...
case connect(Opts, ?CONNECT_RETRIES, 2, MaxStartInterval) of
{ok, DbRef} ->
schedule_keepalive(KeepaliveInterval),
{ok, #state{db_ref = DbRef, keepalive_interval = KeepaliveInterval}};
Expand Down Expand Up @@ -809,10 +828,10 @@ db_type() ->
_ -> generic
end.

-spec connect(Settings :: tuple(), Retry :: non_neg_integer(), RetryAfter :: non_neg_integer(),
-spec connect(options(), Retry :: non_neg_integer(), RetryAfter :: non_neg_integer(),
MaxRetryDelay :: non_neg_integer()) -> {ok, term()} | {error, any()}.
connect(Settings, Retry, RetryAfter, MaxRetryDelay) ->
case mongoose_rdbms_backend:connect(Settings, ?QUERY_TIMEOUT) of
connect(Options, Retry, RetryAfter, MaxRetryDelay) ->
case mongoose_rdbms_backend:connect(Options, ?QUERY_TIMEOUT) of
{ok, _} = Ok ->
Ok;
Error when Retry =:= 0 ->
Expand All @@ -824,7 +843,7 @@ connect(Settings, Retry, RetryAfter, MaxRetryDelay) ->
error => Error, sleep_for => SleepFor}),
timer:sleep(timer:seconds(SleepFor)),
NextRetryDelay = RetryAfter * RetryAfter,
connect(Settings, Retry - 1, min(MaxRetryDelay, NextRetryDelay), MaxRetryDelay)
connect(Options, Retry - 1, min(MaxRetryDelay, NextRetryDelay), MaxRetryDelay)
end.


Expand Down
9 changes: 5 additions & 4 deletions src/rdbms/mongoose_rdbms_backend.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

-define(MAIN_MODULE, mongoose_rdbms).

-type options() :: mongoose_rdbms:options().

-callback escape_binary(binary()) -> mongoose_rdbms:sql_query_part().
-callback escape_string(binary()|list()) -> mongoose_rdbms:sql_query_part().

-callback unescape_binary(binary()) -> binary().
-callback connect(Args :: any(), QueryTimeout :: non_neg_integer()) ->
-callback connect(options(), QueryTimeout :: non_neg_integer()) ->
{ok, Connection :: term()} | {error, Reason :: any()}.
-callback disconnect(Connection :: term()) -> any().
-callback query(Connection :: term(), Query :: any(), Timeout :: infinity | non_neg_integer()) ->
Expand Down Expand Up @@ -51,10 +52,10 @@ unescape_binary(Binary) ->
Args = [Binary],
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec connect(Settings :: tuple(), QueryTimeout :: non_neg_integer()) ->
-spec connect(options(), QueryTimeout :: non_neg_integer()) ->
{ok, Connection :: term()} | {error, Reason :: any()}.
connect(Settings, QueryTimeout) ->
Args = [Settings, QueryTimeout],
connect(Options, QueryTimeout) ->
Args = [Options, QueryTimeout],
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec disconnect(Connection :: term()) -> any().
Expand Down
49 changes: 17 additions & 32 deletions src/rdbms/mongoose_rdbms_mysql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
-author('[email protected]').
-behaviour(mongoose_rdbms_backend).

-include("mongoose.hrl").

-define(MYSQL_PORT, 3306).
-type options() :: #{host := string(),
port := inet:port(),
database := string(),
username := string(),
password := string(),
atom() => any()}.

-export([escape_binary/1, unescape_binary/1, connect/2, disconnect/1,
query/3, prepare/5, execute/4]).
Expand All @@ -35,10 +38,10 @@ escape_binary(Bin) when is_binary(Bin) ->
unescape_binary(Bin) when is_binary(Bin) ->
Bin.

-spec connect(Args :: any(), QueryTimeout :: non_neg_integer()) ->
-spec connect(options(), QueryTimeout :: non_neg_integer()) ->
{ok, Connection :: term()} | {error, Reason :: any()}.
connect(Settings, QueryTimeout) ->
case mysql:start_link([{query_timeout, QueryTimeout} | db_opts(Settings)]) of
connect(Options, QueryTimeout) ->
case mysql:start_link([{query_timeout, QueryTimeout} | db_opts(Options)]) of
{ok, Ref} ->
mysql:query(Ref, <<"set names 'utf8mb4';">>),
mysql:query(Ref, <<"SET SESSION query_cache_type=1;">>),
Expand Down Expand Up @@ -70,32 +73,14 @@ execute(Connection, StatementRef, Params, _Timeout) ->

%% Helpers

-spec db_opts(Settings :: term()) -> list().
db_opts({mysql, Server, DB, User, Pass}) ->
db_opts({mysql, Server, ?MYSQL_PORT, DB, User, Pass});
db_opts({mysql, Server, Port, DB, User, Pass}) when is_integer(Port) ->
get_db_basic_opts({Server, Port, DB, User, Pass});
db_opts({mysql, Server, DB, User, Pass, SSLConnOpts}) ->
db_opts({mysql, Server, ?MYSQL_PORT, DB, User, Pass, SSLConnOpts});
db_opts({mysql, Server, Port, DB, User, Pass, SSLConnOpts})
when is_integer(Port) ->
DBBasicOpts = get_db_basic_opts({Server, Port, DB, User, Pass}),
extend_db_opts_with_ssl(DBBasicOpts, SSLConnOpts).

-spec get_db_basic_opts(Settings :: term()) -> [term()].
get_db_basic_opts({Server, Port, DB, User, Pass}) ->
[
{host, Server},
{port, Port},
{user, User},
{password, Pass},
{database, DB},
{found_rows, true}
].

-spec extend_db_opts_with_ssl(Opts :: [term()], SSLConnOpts :: [term()]) -> [term()].
extend_db_opts_with_ssl(Opts, SSLConnOpts) ->
Opts ++ [{ssl, SSLConnOpts}].
-spec db_opts(options()) -> [mysql:option()].
db_opts(Options) ->
FilteredOpts = maps:with([host, port, database, username, password, tls], Options),
[{found_rows, true} | lists:map(fun process_opt/1, maps:to_list(FilteredOpts))].

process_opt({tls, TLSOpts}) -> {ssl, TLSOpts};
process_opt({username, UserName}) -> {user, UserName};
process_opt(Opt) -> Opt.

%% @doc Convert MySQL query result to Erlang RDBMS result formalism
-spec mysql_to_rdbms(mysql:query_result(), Conn :: term()) -> mongoose_rdbms:query_result().
Expand Down
6 changes: 4 additions & 2 deletions src/rdbms/mongoose_rdbms_odbc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

-type tabcol() :: {binary(), binary()}.

-type options() :: #{settings := string(), atom() => any()}.

%% API

-spec escape_binary(binary()) -> iodata().
Expand All @@ -39,9 +41,9 @@ escape_string(Iolist) ->
unescape_binary(Bin) when is_binary(Bin) ->
base16:decode(Bin).

-spec connect(Args :: any(), QueryTimeout :: non_neg_integer()) ->
-spec connect(options(), QueryTimeout :: non_neg_integer()) ->
{ok, Connection :: term()} | {error, Reason :: any()}.
connect(Settings, _QueryTimeout) when is_list(Settings) ->
connect(#{settings := Settings}, _QueryTimeout) when is_list(Settings) ->
%% We need binary_strings=off to distinguish between:
%% - UTF-16 encoded NVARCHARs - encoded as binaries.
%% - Binaries/regular strings - encoded as list of small integers.
Expand Down
56 changes: 26 additions & 30 deletions src/rdbms/mongoose_rdbms_pgsql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@

-include_lib("epgsql/include/epgsql.hrl").

-define(PGSQL_PORT, 5432).
-type options() :: #{host := string(),
port := inet:port(),
database := string(),
username := string(),
password := string(),
atom() => any()}.

-export([escape_binary/1, unescape_binary/1, connect/2, disconnect/1,
query/3, prepare/5, execute/4]).
Expand All @@ -37,10 +42,10 @@ unescape_binary(<<"\\x", Bin/binary>>) ->
unescape_binary(Bin) when is_binary(Bin) ->
Bin.

-spec connect(Args :: any(), QueryTimeout :: non_neg_integer()) ->
-spec connect(options(), QueryTimeout :: non_neg_integer()) ->
{ok, Connection :: term()} | {error, Reason :: any()}.
connect(Settings, QueryTimeout) ->
case epgsql:connect(db_opts(Settings)) of
connect(Options, QueryTimeout) ->
case epgsql:connect(db_opts(Options)) of
{ok, Pid} ->
epgsql:squery(Pid, [<<"SET statement_timeout=">>, integer_to_binary(QueryTimeout)]),
epgsql:squery(Pid, <<"SET standard_conforming_strings=off">>),
Expand Down Expand Up @@ -76,32 +81,23 @@ execute(Connection, StatementRef, Params, _Timeout) ->

%% Helpers

-spec db_opts(Settings :: term()) -> [term()].
db_opts({pgsql, Server, DB, User, Pass}) ->
db_opts({pgsql, Server, ?PGSQL_PORT, DB, User, Pass});
db_opts({pgsql, Server, Port, DB, User, Pass}) when is_integer(Port) ->
get_db_basic_opts({Server, Port, DB, User, Pass});
db_opts({pgsql, Server, DB, User, Pass, SSLConnOpts}) ->
db_opts({pgsql, Server, ?PGSQL_PORT, DB, User, Pass, SSLConnOpts});
db_opts({pgsql, Server, Port, DB, User, Pass, SSLConnOpts}) when is_integer(Port) ->
DBBasicOpts = get_db_basic_opts({Server, Port, DB, User, Pass}),
extend_db_opts_with_ssl(DBBasicOpts, SSLConnOpts).

-spec get_db_basic_opts(Settings :: term()) -> [term()].
get_db_basic_opts({Server, Port, DB, User, Pass}) ->
[
{host, Server},
{port, Port},
{database, DB},
{username, User},
{password, Pass},
%% Encode 0 and 1 as booleans, as well as true and false
{codecs, [{mongoose_rdbms_pgsql_codec_boolean, []}]}
].

-spec extend_db_opts_with_ssl(Opts :: [term()], SSLConnOpts :: [term()]) -> [term()].
extend_db_opts_with_ssl(Opts, SSLConnOpts) ->
Opts ++ SSLConnOpts.
-spec db_opts(options()) -> epgsql:connect_opts().
db_opts(Options) ->
BasicOpts = maps:with([host, port, database, username, password], Options),
TLSOpts = tls_opts(Options),
maps:merge(BasicOpts#{codecs => [{mongoose_rdbms_pgsql_codec_boolean, []}]}, TLSOpts).

tls_opts(#{tls := KVs}) ->
{[ModeOpts], Opts} = proplists:split(KVs, [required]),
(ssl_opts(Opts))#{ssl => ssl_mode(ModeOpts)};
tls_opts(#{}) ->
#{}.

ssl_mode([{required, true}]) -> required;
ssl_mode(_) -> true.

ssl_opts([]) -> #{};
ssl_opts(Opts) -> #{ssl_opts => Opts}.

-spec pgsql_to_rdbms(epgsql:reply(term())) -> mongoose_rdbms:query_result().
pgsql_to_rdbms(Items) when is_list(Items) ->
Expand Down