Skip to content

Commit

Permalink
Make tag/name and type declarations consistent
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Feb 27, 2024
1 parent 600c0d7 commit 070ad29
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 115 deletions.
4 changes: 2 additions & 2 deletions doc/configuration/outgoing-connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ This allows you to create multiple dedicated pools of the same type.
* `single_host` - the pool will be started for the selected host or host type only (you must provide the name).

!!! Note
A global pool with name `default` is used by services that are not configured by host_type, like `service_domain_db` or `service_mongoose_system_metrics`, or by modules that don't support dynamic domains, like `mod_pubsub`.
If a global pool is not configured, these services will fail.
A pool with scope `global` and tag `default` is used by services that are not configured by host_type, like `service_domain_db` or `service_mongoose_system_metrics`, or by modules that don't support dynamic domains, like `mod_pubsub`.
If a global default pool is not configured, these services will fail.

## Worker pool options

Expand Down
154 changes: 68 additions & 86 deletions src/rdbms/mongoose_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,58 +86,43 @@
use_escaped/1]).

%% Unicode escaping
-export([escape_string/1,
use_escaped_string/1]).
-export([escape_string/1, use_escaped_string/1]).

%% Integer escaping
-export([escape_integer/1,
use_escaped_integer/1]).
-export([escape_integer/1, use_escaped_integer/1]).

%% Boolean escaping
-export([escape_boolean/1,
use_escaped_boolean/1]).
-export([escape_boolean/1, use_escaped_boolean/1]).

%% LIKE escaping
-export([escape_like/1,
escape_prepared_like/1,
escape_like_prefix/1,
use_escaped_like/1]).
-export([escape_like/1, escape_prepared_like/1, escape_like_prefix/1, use_escaped_like/1]).

%% BLOB escaping
-export([escape_binary/2,
unescape_binary/2,
use_escaped_binary/1]).
-export([escape_binary/2, unescape_binary/2, use_escaped_binary/1]).

%% Null escaping
%% (to keep uniform pattern of passing values)
-export([escape_null/0,
use_escaped_null/1]).
-export([escape_null/0, use_escaped_null/1]).

%% count / integra types decoding
-export([result_to_integer/1,
selected_to_integer/1]).
-export([result_to_integer/1, selected_to_integer/1]).

-export([character_to_integer/1]).

%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

%% External exports
-ignore_xref([
sql_query_cast/2, sql_query_request/2,
execute/4, execute_wrapped_request/5,
sql_query/3, sql_query_cast/3, sql_query_request/3,
sql_dirty/3, sql_transaction/3,
execute_successfully/4, send_request/3,
execute_cast/3, execute_cast/4,
execute/4, execute_cast/3, execute_cast/4,
execute_request/3, execute_request/4,
execute_wrapped_request/4,
sql_transaction_request/2, sql_transaction_request/3,
sql_query_t/1, use_escaped/1,
execute_wrapped_request/4, execute_wrapped_request/5,
execute_successfully/4,
sql_query/3, sql_query_cast/2, sql_query_cast/3,
sql_query_request/2, sql_query_request/3,
sql_transaction/3, sql_transaction_request/2, sql_transaction_request/3,
sql_dirty/3, sql_query_t/1,
use_escaped/1,
escape_like/1, escape_like_prefix/1, use_escaped_like/1,
escape_binary/2, use_escaped_binary/1,
escape_integer/1, use_escaped_integer/1,
Expand Down Expand Up @@ -167,7 +152,8 @@
%% the retry counter runs out. We just attempt to reduce log pollution.
-define(CONNECT_RETRIES, 5).

-type server() :: mongooseim:host_type() | global.
-type query_name() :: atom().
-type query_params() :: [term()].
-type request_wrapper() :: fun((fun(() -> T)) -> T).
-type rdbms_msg() :: {sql_query, _}
| {sql_transaction, fun()}
Expand All @@ -182,9 +168,7 @@
-type query_result() :: single_query_result() | [single_query_result()].
-type transaction_result() :: {aborted, _} | {atomic, _} | {error, _}.
-type dirty_result() :: {ok, any()} | {error, any()}.
-export_type([query_result/0,
transaction_result/0,
server/0]).
-export_type([query_name/0, query_result/0, transaction_result/0]).

-type options() :: #{driver := pgsql | mysql | odbc,
max_start_interval := pos_integer(),
Expand Down Expand Up @@ -217,10 +201,9 @@ ensure_db_port(Opts = #{port := _}) -> Opts;
ensure_db_port(Opts = #{driver := pgsql}) -> Opts#{port => 5432};
ensure_db_port(Opts = #{driver := mysql}) -> Opts#{port => 3306}.

-spec prepare(Name, Table :: binary() | atom(), Fields :: [binary() | atom()],
Statement :: iodata()) ->
{ok, Name} | {error, already_exists}
when Name :: atom().
-spec prepare(
query_name(), Table :: binary() | atom(), Fields :: [binary() | atom()], Statement :: iodata()) ->
{ok, query_name()} | {error, already_exists}.
prepare(Name, Table, Fields, Statement) when is_atom(Table) ->
prepare(Name, atom_to_binary(Table, utf8), Fields, Statement);
prepare(Name, Table, [Field | _] = Fields, Statement) when is_atom(Field) ->
Expand All @@ -237,61 +220,52 @@ prepare(Name, Table, Fields, Statement) when is_atom(Name), is_binary(Table) ->
prepared(Name) ->
ets:member(prepared_statements, Name).

-spec execute(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
query_result().
-spec execute(mongooseim:host_type_or_global(), query_name(), query_params()) -> query_result().
execute(HostType, Name, Parameters) ->
execute(HostType, ?DEFAULT_POOL_NAME, Name, Parameters).

-spec execute(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
-spec execute(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) ->
query_result().
execute(HostType, PoolName, Name, Parameters) when is_atom(PoolName), is_atom(Name), is_list(Parameters) ->
sql_call(HostType, PoolName, {sql_execute, Name, Parameters}).

-spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
query_result().
-spec execute_cast(mongooseim:host_type_or_global(), query_name(), query_params()) -> query_result().
execute_cast(HostType, Name, Parameters) ->
execute_cast(HostType, ?DEFAULT_POOL_NAME, Name, Parameters).

-spec execute_cast(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
-spec execute_cast(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) ->
query_result().
execute_cast(HostType, PoolName, Name, Parameters) when is_atom(PoolName), is_atom(Name), is_list(Parameters) ->
sql_cast(HostType, PoolName, {sql_execute, Name, Parameters}).

-spec execute_request(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
request_id().
-spec execute_request(mongooseim:host_type_or_global(), query_name(), query_params()) -> request_id().
execute_request(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
execute_request(HostType, ?DEFAULT_POOL_NAME, Name, Parameters).

-spec execute_request(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
-spec execute_request(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) ->
request_id().
execute_request(HostType, PoolName, Name, Parameters) when is_atom(PoolName), is_atom(Name), is_list(Parameters) ->
sql_request(HostType, PoolName, {sql_execute, Name, Parameters}).

-spec execute_wrapped_request(
HostType :: server(),
Name :: atom(),
Parameters :: [term()],
Wrapper :: request_wrapper()) -> request_id().
-spec execute_wrapped_request(mongooseim:host_type_or_global(), query_name(), query_params(), request_wrapper()) ->
request_id().
execute_wrapped_request(HostType, Name, Parameters, Wrapper) ->
execute_wrapped_request(HostType, ?DEFAULT_POOL_NAME, Name, Parameters, Wrapper).

-spec execute_wrapped_request(
HostType :: server(),
PoolName :: atom(),
Name :: atom(),
Parameters :: [term()],
Wrapper :: request_wrapper()) -> request_id().
mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params(), request_wrapper()) ->
request_id().
execute_wrapped_request(HostType, PoolName, Name, Parameters, Wrapper)
when is_atom(PoolName), is_atom(Name), is_list(Parameters), is_function(Wrapper) ->
sql_request(HostType, PoolName, {sql_execute_wrapped, Name, Parameters, Wrapper}).

%% Same as execute/3, but would fail loudly on any error.
-spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
-spec execute_successfully(mongooseim:host_type_or_global(), query_name(), query_params()) ->
query_result().
execute_successfully(HostType, Name, Parameters) ->
execute_successfully(HostType, ?DEFAULT_POOL_NAME, Name, Parameters).

-spec execute_successfully(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
-spec execute_successfully(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) ->
query_result().
execute_successfully(HostType, PoolName, Name, Parameters) ->
try execute(HostType, PoolName, Name, Parameters) of
Expand Down Expand Up @@ -322,36 +296,41 @@ query_name_to_string(Name) ->
Statement
end.

-spec sql_query(HostType :: server(), Query :: any()) -> query_result().
-spec sql_query(mongooseim:host_type_or_global(), Query :: any()) -> query_result().
sql_query(HostType, Query) ->
sql_query(HostType, ?DEFAULT_POOL_NAME, Query).

-spec sql_query(HostType :: server(), PoolName :: atom(), Query :: any()) -> query_result().
-spec sql_query(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Query :: any()) ->
query_result().
sql_query(HostType, PoolName, Query) ->
sql_call(HostType, PoolName, {sql_query, Query}).

-spec sql_query_request(HostType :: server(), Query :: any()) -> request_id().
-spec sql_query_request(mongooseim:host_type_or_global(), Query :: any()) -> request_id().
sql_query_request(HostType, Query) ->
sql_query_request(HostType, ?DEFAULT_POOL_NAME, Query).

-spec sql_query_request(HostType :: server(), PoolName :: atom(), Query :: any()) -> request_id().
-spec sql_query_request(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Query :: any()) ->
request_id().
sql_query_request(HostType, PoolName, Query) ->
sql_request(HostType, PoolName, {sql_query, Query}).

-spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result().
-spec sql_query_cast(mongooseim:host_type_or_global(), Query :: any()) -> query_result().
sql_query_cast(HostType, Query) ->
sql_query_cast(HostType, ?DEFAULT_POOL_NAME, Query).

-spec sql_query_cast(HostType :: server(), PoolName :: atom(), Query :: any()) -> query_result().
-spec sql_query_cast(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Query :: any()) ->
query_result().
sql_query_cast(HostType, PoolName, Query) ->
sql_cast(HostType, PoolName, {sql_query, Query}).

%% @doc SQL transaction based on a list of queries
-spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result().
-spec sql_transaction(mongooseim:host_type_or_global(), fun() | maybe_improper_list()) ->
transaction_result().
sql_transaction(HostType, Msg) ->
sql_transaction(HostType, ?DEFAULT_POOL_NAME, Msg).

-spec sql_transaction(server(), atom(), fun() | maybe_improper_list()) -> transaction_result().
-spec sql_transaction(mongooseim:host_type_or_global(), atom(), fun() | maybe_improper_list()) ->
transaction_result().
sql_transaction(HostType, PoolName, Queries) when is_atom(PoolName), is_list(Queries) ->
F = fun() -> lists:map(fun sql_query_t/1, Queries) end,
sql_transaction(HostType, PoolName, F);

Check warning on line 336 in src/rdbms/mongoose_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/rdbms/mongoose_rdbms.erl#L336

Added line #L336 was not covered by tests
Expand All @@ -360,11 +339,13 @@ sql_transaction(HostType, PoolName, F) when is_atom(PoolName), is_function(F) ->
sql_call(HostType, PoolName, {sql_transaction, F}).

%% @doc SQL transaction based on a list of queries
-spec sql_transaction_request(server(), fun() | maybe_improper_list()) -> request_id().
-spec sql_transaction_request(mongooseim:host_type_or_global(), fun() | maybe_improper_list()) ->
request_id().
sql_transaction_request(HostType, Queries) ->
sql_transaction_request(HostType, ?DEFAULT_POOL_NAME, Queries).

-spec sql_transaction_request(server(), atom(), fun() | maybe_improper_list()) -> request_id().
-spec sql_transaction_request(mongooseim:host_type_or_global(), atom(), fun() | maybe_improper_list()) ->
request_id().
sql_transaction_request(HostType, PoolName, Queries) when is_atom(PoolName), is_list(Queries) ->
F = fun() -> lists:map(fun sql_query_t/1, Queries) end,
sql_transaction_request(HostType, PoolName, F);
Expand All @@ -373,7 +354,8 @@ sql_transaction_request(HostType, PoolName, F) when is_atom(PoolName), is_functi
sql_request(HostType, PoolName, {sql_transaction, F}).

%% This function allows to specify delay between retries.
-spec transaction_with_delayed_retry(server(), fun() | maybe_improper_list(), map()) -> transaction_result().
-spec transaction_with_delayed_retry(mongooseim:host_type_or_global(), fun() | maybe_improper_list(), map()) ->
transaction_result().
transaction_with_delayed_retry(HostType, F, Info) ->
Retries = maps:get(retries, Info),
Delay = maps:get(delay, Info),
Expand All @@ -398,19 +380,19 @@ do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info) ->
erlang:error(Err)
end.

-spec sql_dirty(server(), fun()) -> any() | no_return().
-spec sql_dirty(mongooseim:host_type_or_global(), fun()) -> any() | no_return().
sql_dirty(HostType, F) ->
sql_dirty(HostType, ?DEFAULT_POOL_NAME, F).

-spec sql_dirty(server(), atom(), fun()) -> any() | no_return().
-spec sql_dirty(mongooseim:host_type_or_global(), atom(), fun()) -> any() | no_return().
sql_dirty(HostType, PoolName, F) when is_function(F) ->
case sql_call(HostType, PoolName, {sql_dirty, F}) of
{ok, Result} -> Result;
{error, Error} -> throw(Error)
end.

%% TODO: Better spec for RPC calls
-spec sql_call(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
-spec sql_call(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_call(HostType, PoolName, Msg) ->
case get_state() of
undefined -> sql_call0(HostType, PoolName, Msg);
Expand All @@ -420,12 +402,12 @@ sql_call(HostType, PoolName, Msg) ->
Res
end.

-spec sql_call0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
-spec sql_call0(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_call0(HostType, PoolName, Msg) ->
Timestamp = erlang:monotonic_time(millisecond),
mongoose_wpool:call(rdbms, HostType, PoolName, {sql_cmd, Msg, Timestamp}).

-spec sql_request(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
-spec sql_request(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_request(HostType, PoolName, Msg) ->
case get_state() of
undefined -> sql_request0(HostType, PoolName, Msg);
Expand All @@ -435,12 +417,12 @@ sql_request(HostType, PoolName, Msg) ->
Res
end.

-spec sql_request0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
-spec sql_request0(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_request0(HostType, PoolName, Msg) ->
Timestamp = erlang:monotonic_time(millisecond),
mongoose_wpool:send_request(rdbms, HostType, PoolName, {sql_cmd, Msg, Timestamp}).

-spec sql_cast(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
-spec sql_cast(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_cast(HostType, PoolName, Msg) ->
case get_state() of
undefined -> sql_cast0(HostType, PoolName, Msg);
Expand All @@ -450,12 +432,12 @@ sql_cast(HostType, PoolName, Msg) ->
Res
end.

-spec sql_cast0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
-spec sql_cast0(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_cast0(HostType, PoolName, Msg) ->
Timestamp = erlang:monotonic_time(millisecond),
mongoose_wpool:cast(rdbms, HostType, PoolName, {sql_cmd, Msg, Timestamp}).

-spec get_db_info(Target :: server() | pid()) ->
-spec get_db_info(Target :: mongooseim:host_type_or_global() | pid()) ->
{ok, DbType :: atom(), DbRef :: term()} | {error, any()}.
get_db_info(Pid) when is_pid(Pid) ->
wpool_process:call(Pid, get_db_info, 5000);
Expand Down Expand Up @@ -502,7 +484,7 @@ escape_like(S) ->
escape_like_prefix(S) ->
{escaped_like, [$', escape_like_internal(S), $%, $']}.

-spec escape_binary(server(), binary()) -> escaped_binary().
-spec escape_binary(mongooseim:host_type_or_global(), binary()) -> escaped_binary().
escape_binary(_HostType, Bin) when is_binary(Bin) ->
{escaped_binary, mongoose_rdbms_backend:escape_binary(Bin)}.

Expand Down Expand Up @@ -642,7 +624,7 @@ escape_character($\\) -> "\\\\";
escape_character(C) -> C.


-spec unescape_binary(server(), binary()) -> binary().
-spec unescape_binary(mongooseim:host_type_or_global(), binary()) -> binary().
unescape_binary(_HostType, Bin) when is_binary(Bin) ->
mongoose_rdbms_backend:unescape_binary(Bin).

Expand Down Expand Up @@ -876,7 +858,7 @@ sql_dirty_internal(F, State) ->
end,
{Result, erase_state()}.

-spec sql_execute(Type :: atom(), Name :: atom(), Params :: [term()], state()) ->
-spec sql_execute(Type :: atom(), query_name(), query_params(), state()) ->
{query_result(), state()}.
sql_execute(Type, Name, Params, State = #state{db_ref = DBRef, query_timeout = QueryTimeout}) ->
%% Postgres allows to prepare statement only once, so we should take care that NewState is updated
Expand Down Expand Up @@ -904,7 +886,7 @@ check_execute_result(nested_op, Res, Name, Params) ->
ok
end.

-spec prepare_statement(Name :: atom(), state()) -> {Ref :: term(), state()}.
-spec prepare_statement(query_name(), state()) -> {Ref :: term(), state()}.
prepare_statement(Name, State = #state{db_ref = DBRef, prepared = Prepared}) ->
case maps:get(Name, Prepared, undefined) of
undefined ->
Expand All @@ -930,7 +912,7 @@ abort_on_driver_error({error, "Failed sending data on socket" ++ _}) -> %% mysql
abort_on_driver_error(_) ->
continue.

-spec db_engine(HostType :: server()) -> odbc | mysql | pgsql | undefined.
-spec db_engine(mongooseim:host_type_or_global()) -> odbc | mysql | pgsql | undefined.
db_engine(_HostType) ->
try mongoose_backend:get_backend_name(global, ?MODULE)
catch error:badarg -> undefined end.
Expand Down
Loading

0 comments on commit 070ad29

Please sign in to comment.