Skip to content

Commit

Permalink
Introduce pool names to mongoose_rdbms
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Feb 26, 2024
1 parent 880776f commit 486e295
Showing 1 changed file with 115 additions and 57 deletions.
172 changes: 115 additions & 57 deletions src/rdbms/mongoose_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,19 @@
%% External exports
-export([prepare/4,
prepared/1,
execute/3,
execute_cast/3,
execute_request/3,
execute_wrapped_request/4,
execute_successfully/3,
sql_query/2,
sql_query_cast/2,
sql_query_request/2,
execute/3, execute/4,
execute_cast/3, execute_cast/4,
execute_request/3, execute_request/4,
execute_wrapped_request/4, execute_wrapped_request/5,
execute_successfully/3, execute_successfully/4,
sql_query/2, sql_query/3,
sql_query_cast/2, sql_query_cast/3,
sql_query_request/2, sql_query_request/3,
sql_transaction/2, sql_transaction/3,
sql_transaction_request/2, sql_transaction_request/3,
sql_dirty/2, sql_dirty/3,
sql_query_t/1,
sql_transaction/2,
sql_transaction_request/2,
transaction_with_delayed_retry/3,
sql_dirty/2,
to_bool/1,
db_engine/1,
db_type/0,
Expand Down Expand Up @@ -127,10 +127,14 @@
terminate/2,
code_change/3]).

-ignore_xref([sql_query_cast/2, sql_query_request/2,
execute_cast/3, execute_request/3,
-ignore_xref([
sql_query_cast/2, sql_query_request/2,
execute/4, execute_wrapped_request/5,
sql_query_cast/3, sql_query_request/3, sql_dirty/3,
execute_cast/3, execute_cast/4,
execute_request/3, execute_request/4,
execute_wrapped_request/4,
sql_transaction_request/2,
sql_transaction_request/2, sql_transaction_request/3,
sql_query_t/1, use_escaped/1,
escape_like/1, escape_like_prefix/1, use_escaped_like/1,
escape_binary/2, use_escaped_binary/1,
Expand All @@ -152,6 +156,7 @@
}).
-type state() :: #state{}.

-define(DEFAULT_POOL, default).
-define(STATE_KEY, mongoose_rdbms_state).
-define(MAX_TRANSACTION_RESTARTS, 10).
-define(TRANSACTION_TIMEOUT, 60000). % milliseconds
Expand Down Expand Up @@ -232,33 +237,62 @@ prepared(Name) ->

-spec execute(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
query_result().
execute(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
sql_call(HostType, {sql_execute, Name, Parameters}).
execute(HostType, Name, Parameters) ->
execute(HostType, ?DEFAULT_POOL, Name, Parameters).

-spec execute(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
query_result().
execute(HostType, PoolName, Name, Parameters) when is_atom(PoolName), is_atom(Name), is_list(Parameters) ->
sql_call(HostType, PoolName, {sql_execute, Name, Parameters}).

-spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
query_result().
execute_cast(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
sql_cast(HostType, {sql_execute, Name, Parameters}).
query_result().
execute_cast(HostType, Name, Parameters) ->
execute_cast(HostType, ?DEFAULT_POOL, Name, Parameters).

-spec execute_cast(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
query_result().
execute_cast(HostType, PoolName, Name, Parameters) when is_atom(PoolName), is_atom(Name), is_list(Parameters) ->
sql_cast(HostType, PoolName, {sql_execute, Name, Parameters}).

-spec execute_request(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
request_id().
request_id().
execute_request(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
sql_request(HostType, {sql_execute, Name, Parameters}).
execute_request(HostType, ?DEFAULT_POOL, Name, Parameters).

-spec execute_request(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
request_id().
execute_request(HostType, PoolName, Name, Parameters) when is_atom(PoolName), is_atom(Name), is_list(Parameters) ->
sql_request(HostType, PoolName, {sql_execute, Name, Parameters}).

-spec execute_wrapped_request(
HostType :: server(),
Name :: atom(),
Parameters :: [term()],
Wrapper :: request_wrapper()) -> request_id().
execute_wrapped_request(HostType, Name, Parameters, Wrapper) ->
execute_wrapped_request(HostType, ?DEFAULT_POOL, Name, Parameters, Wrapper).

-spec execute_wrapped_request(
HostType :: server(),
PoolName :: atom(),
Name :: atom(),
Parameters :: [term()],
Wrapper :: request_wrapper()) -> request_id().
execute_wrapped_request(HostType, Name, Parameters, Wrapper)
when is_atom(Name), is_list(Parameters), is_function(Wrapper) ->
sql_request(HostType, {sql_execute_wrapped, Name, Parameters, Wrapper}).
execute_wrapped_request(HostType, PoolName, Name, Parameters, Wrapper)
when is_atom(PoolName), is_atom(Name), is_list(Parameters), is_function(Wrapper) ->
sql_request(HostType, PoolName, {sql_execute_wrapped, Name, Parameters, Wrapper}).

%% Same as execute/3, but would fail loudly on any error.
-spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
query_result().
query_result().
execute_successfully(HostType, Name, Parameters) ->
try execute(HostType, Name, Parameters) of
execute_successfully(HostType, ?DEFAULT_POOL, Name, Parameters).

-spec execute_successfully(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
query_result().
execute_successfully(HostType, PoolName, Name, Parameters) ->
try execute(HostType, PoolName, Name, Parameters) of
{selected, _} = Result ->
Result;
{updated, _} = Result ->
Expand Down Expand Up @@ -288,33 +322,53 @@ query_name_to_string(Name) ->

-spec sql_query(HostType :: server(), Query :: any()) -> query_result().
sql_query(HostType, Query) ->
sql_call(HostType, {sql_query, Query}).
sql_query(HostType, ?DEFAULT_POOL, Query).

-spec sql_query(HostType :: server(), PoolName :: atom(), Query :: any()) -> query_result().
sql_query(HostType, PoolName, Query) ->
sql_call(HostType, PoolName, {sql_query, Query}).

-spec sql_query_request(HostType :: server(), Query :: any()) -> request_id().
sql_query_request(HostType, Query) ->
sql_request(HostType, {sql_query, Query}).
sql_query_request(HostType, ?DEFAULT_POOL, Query).

-spec sql_query_request(HostType :: server(), PoolName :: atom(), Query :: any()) -> request_id().
sql_query_request(HostType, PoolName, Query) ->
sql_request(HostType, PoolName, {sql_query, Query}).

-spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result().
sql_query_cast(HostType, Query) ->
sql_cast(HostType, {sql_query, Query}).
sql_query_cast(HostType, ?DEFAULT_POOL, Query).

-spec sql_query_cast(HostType :: server(), PoolName :: atom(), Query :: any()) -> query_result().
sql_query_cast(HostType, PoolName, Query) ->
sql_cast(HostType, PoolName, {sql_query, Query}).

%% @doc SQL transaction based on a list of queries
-spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result().
sql_transaction(HostType, Queries) when is_list(Queries) ->
sql_transaction(HostType, Msg) ->
sql_transaction(HostType, ?DEFAULT_POOL, Msg).

-spec sql_transaction(server(), atom(), fun() | maybe_improper_list()) -> transaction_result().
sql_transaction(HostType, PoolName, Queries) when is_atom(PoolName), is_list(Queries) ->
F = fun() -> lists:map(fun sql_query_t/1, Queries) end,
sql_transaction(HostType, F);
sql_transaction(HostType, PoolName, F);

Check warning on line 355 in src/rdbms/mongoose_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/rdbms/mongoose_rdbms.erl#L355

Added line #L355 was not covered by tests
%% SQL transaction, based on a erlang anonymous function (F = fun)
sql_transaction(HostType, F) when is_function(F) ->
sql_call(HostType, {sql_transaction, F}).
sql_transaction(HostType, PoolName, F) when is_atom(PoolName), is_function(F) ->
sql_call(HostType, PoolName, {sql_transaction, F}).

%% @doc SQL transaction based on a list of queries
-spec sql_transaction_request(server(), fun() | maybe_improper_list()) -> request_id().
sql_transaction_request(HostType, Queries) when is_list(Queries) ->
sql_transaction_request(HostType, Queries) ->
sql_transaction_request(HostType, ?DEFAULT_POOL, Queries).

-spec sql_transaction_request(server(), atom(), fun() | maybe_improper_list()) -> request_id().
sql_transaction_request(HostType, PoolName, Queries) when is_atom(PoolName), is_list(Queries) ->
F = fun() -> lists:map(fun sql_query_t/1, Queries) end,
sql_transaction_request(HostType, F);
sql_transaction_request(HostType, PoolName, F);
%% SQL transaction, based on a erlang anonymous function (F = fun)
sql_transaction_request(HostType, F) when is_function(F) ->
sql_request(HostType, {sql_transaction, F}).
sql_transaction_request(HostType, PoolName, F) when is_atom(PoolName), is_function(F) ->
sql_request(HostType, PoolName, {sql_transaction, F}).

%% This function allows to specify delay between retries.
-spec transaction_with_delayed_retry(server(), fun() | maybe_improper_list(), map()) -> transaction_result().
Expand Down Expand Up @@ -343,57 +397,61 @@ do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info) ->
end.

-spec sql_dirty(server(), fun()) -> any() | no_return().
sql_dirty(HostType, F) when is_function(F) ->
case sql_call(HostType, {sql_dirty, F}) of
sql_dirty(HostType, F) ->
sql_dirty(HostType, ?DEFAULT_POOL, F).

-spec sql_dirty(server(), atom(), fun()) -> any() | no_return().
sql_dirty(HostType, PoolName, F) when is_function(F) ->
case sql_call(HostType, PoolName, {sql_dirty, F}) of
{ok, Result} -> Result;
{error, Error} -> throw(Error)
end.

%% TODO: Better spec for RPC calls
-spec sql_call(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_call(HostType, Msg) ->
-spec sql_call(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
sql_call(HostType, PoolName, Msg) ->
case get_state() of
undefined -> sql_call0(HostType, Msg);
undefined -> sql_call0(HostType, PoolName, Msg);
State ->
{Res, NewState} = nested_op(Msg, State),
put_state(NewState),
Res
end.

-spec sql_call0(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_call0(HostType, Msg) ->
-spec sql_call0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
sql_call0(HostType, PoolName, Msg) ->
Timestamp = erlang:monotonic_time(millisecond),
mongoose_wpool:call(rdbms, HostType, {sql_cmd, Msg, Timestamp}).
mongoose_wpool:call(rdbms, HostType, PoolName, {sql_cmd, Msg, Timestamp}).

-spec sql_request(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_request(HostType, Msg) ->
-spec sql_request(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
sql_request(HostType, PoolName, Msg) ->
case get_state() of
undefined -> sql_request0(HostType, Msg);
undefined -> sql_request0(HostType, PoolName, Msg);
State ->
{Res, NewState} = nested_op(Msg, State),
put_state(NewState),
Res
end.

-spec sql_request0(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_request0(HostType, Msg) ->
-spec sql_request0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
sql_request0(HostType, PoolName, Msg) ->
Timestamp = erlang:monotonic_time(millisecond),
mongoose_wpool:send_request(rdbms, HostType, {sql_cmd, Msg, Timestamp}).
mongoose_wpool:send_request(rdbms, HostType, PoolName, {sql_cmd, Msg, Timestamp}).

-spec sql_cast(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_cast(HostType, Msg) ->
-spec sql_cast(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
sql_cast(HostType, PoolName, Msg) ->
case get_state() of
undefined -> sql_cast0(HostType, Msg);
undefined -> sql_cast0(HostType, PoolName, Msg);
State ->
{Res, NewState} = nested_op(Msg, State),
put_state(NewState),
Res
end.

-spec sql_cast0(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_cast0(HostType, Msg) ->
-spec sql_cast0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
sql_cast0(HostType, PoolName, Msg) ->
Timestamp = erlang:monotonic_time(millisecond),
mongoose_wpool:cast(rdbms, HostType, {sql_cmd, Msg, Timestamp}).
mongoose_wpool:cast(rdbms, HostType, PoolName, {sql_cmd, Msg, Timestamp}).

-spec get_db_info(Target :: server() | pid()) ->
{ok, DbType :: atom(), DbRef :: term()} | {error, any()}.
Expand Down

0 comments on commit 486e295

Please sign in to comment.