Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub index rdbms #2160

Merged
merged 12 commits into from
Dec 13, 2018
2 changes: 2 additions & 0 deletions doc/modules/mod_pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ Node configuration still uses the default configuration defined by the node plug
Current RDBMS backend replaces `pubsub_node`, `pubsub_state` and `pubsub_item` Mnesia tables with RDBMS equivalents.
Due to a fact that some data is still maintained in Mnesia, there is a certain risk of data becoming inconsistent.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the data is still maintained in Mnesia, so there is a certain risk of it becoming inconsistent?

Please note the schema used by this backend is still prone to changes as it has not reached a stable status?

If mod_pubsub is configured to use RDBMS, management of nodes indexes is done by the database, so the pubsub_index table is not needed.

The schema used by this backend may change until it reaches stable status.
If `mod_pubsub` is configured to use RDBMS, management of nodes indexes is
done by the database, so the `pubsub_index` table is not needed.

#### Cache Backend

Expand Down
11 changes: 3 additions & 8 deletions priv/mssql2012.sql
Original file line number Diff line number Diff line change
Expand Up @@ -475,21 +475,16 @@ CREATE INDEX i_inbox_ts ON inbox(luser, lserver, timestamp);
GO

CREATE TABLE dbo.pubsub_nodes (
nidx BIGINT NOT NULL,
nidx BIGINT IDENTITY(1,1) PRIMARY KEY,
p_key NVARCHAR(250) NOT NULL,
name NVARCHAR(250) NOT NULL,
type NVARCHAR(250) NOT NULL,
owners NVARCHAR(max) NOT NULL,
options NVARCHAR(max) NOT NULL,
CONSTRAINT PK_pubsub_nodes PRIMARY KEY CLUSTERED(
p_key ASC,
name ASC,
nidx ASC
)
options NVARCHAR(max) NOT NULL
)
GO

CREATE INDEX i_pubsub_nodes_nidx ON pubsub_nodes(nidx);
CREATE UNIQUE INDEX i_pubsub_nodes_key_name ON pubsub_nodes(p_key, name);
GO

CREATE TABLE dbo.pubsub_node_collections (
Expand Down
7 changes: 3 additions & 4 deletions priv/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -354,17 +354,16 @@ CREATE TABLE inbox (
CREATE INDEX i_inbox USING BTREE ON inbox(luser, lserver, timestamp);

CREATE TABLE pubsub_nodes (
nidx BIGINT UNSIGNED NOT NULL,
nidx BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
p_key VARCHAR(250) NOT NULL,
name VARCHAR(250) NOT NULL,
type VARCHAR(250) NOT NULL,
owners JSON NOT NULL,
options JSON NOT NULL,
PRIMARY KEY(p_key, name, nidx)
options JSON NOT NULL
) CHARACTER SET utf8mb4
ROW_FORMAT=DYNAMIC;

CREATE INDEX i_pubsub_nodes_nidx USING BTREE ON pubsub_nodes(nidx);
CREATE UNIQUE INDEX i_pubsub_nodes USING BTREE ON pubsub_nodes(p_key, name);

CREATE TABLE pubsub_node_collections (
name VARCHAR(250) NOT NULL,
Expand Down
7 changes: 3 additions & 4 deletions priv/pg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,15 @@ CREATE INDEX i_inbox
USING BTREE(luser, lserver, timestamp);

CREATE TABLE pubsub_nodes (
nidx BIGINT NOT NULL,
nidx BIGSERIAL PRIMARY KEY,
p_key VARCHAR(250) NOT NULL,
name VARCHAR(250) NOT NULL,
type VARCHAR(250) NOT NULL,
owners JSON NOT NULL,
options JSON NOT NULL,
PRIMARY KEY(p_key, name, nidx)
options JSON NOT NULL
);

CREATE INDEX i_pubsub_nodes_nidx ON pubsub_nodes(nidx);
CREATE UNIQUE INDEX i_pubsub_nodes_key_name ON pubsub_nodes USING btree (p_key, name);

CREATE TABLE pubsub_node_collections (
name VARCHAR(250) NOT NULL,
Expand Down
15 changes: 9 additions & 6 deletions src/pubsub/mod_pubsub_db_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,19 @@ del_node(Nidx) ->
{ok, States}.

-spec set_node(Node :: mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}.
set_node(#pubsub_node{id = undefined} = Node) ->
CreateNode = Node#pubsub_node{id = pubsub_index:new(node)},
set_node(CreateNode);
set_node(#pubsub_node{nodeid = {Key, Name}, id = Nidx, type = Type,
set_node(#pubsub_node{nodeid = {Key, Name}, id = undefined, type = Type,
owners = Owners, options = Opts, parents = Parents}) ->
OwnersJid = [jid:to_binary(Owner) || Owner <- Owners],
SQL = mod_pubsub_db_rdbms_sql:upsert_pubsub_node(Nidx, encode_key(Key), Name, Type,
{ok, Nidx} = mod_pubsub_db_rdbms_sql:insert_pubsub_node(encode_key(Key), Name, Type,
jsx:encode(OwnersJid),
jsx:encode(Opts)),
{updated, _} = mongoose_rdbms:sql_query(global, SQL),
maybe_set_parents(Name, Parents),
{ok, Nidx};

set_node(#pubsub_node{nodeid = {_, Name}, id = Nidx, type = Type,
owners = Owners, options = Opts, parents = Parents}) ->
OwnersJid = [jid:to_binary(Owner) || Owner <- Owners],
mod_pubsub_db_rdbms_sql:update_pubsub_node(Nidx, Type, jsx:encode(OwnersJid), jsx:encode(Opts)),
maybe_set_parents(Name, Parents),
{ok, Nidx}.

Expand Down
129 changes: 58 additions & 71 deletions src/pubsub/mod_pubsub_db_rdbms_sql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@

% Nodes

-export([upsert_pubsub_node/6,
-export([insert_pubsub_node/5,
update_pubsub_node/4,
select_node_by_key_and_name/2,
select_node_by_id/1,
select_nodes_by_key/1,
Expand Down Expand Up @@ -517,82 +518,60 @@ delete_all_items(Nidx) ->
["DELETE FROM pubsub_items"
" WHERE nidx = ", esc_int(Nidx)].


-spec upsert_pubsub_node(Nidx :: mod_pubsub:nodeIdx(), Key :: binary(),
Name :: mod_pubsub:nodeId(), Type :: binary(),
Owners :: binary(), Options :: binary()) -> iolist().
upsert_pubsub_node(Nidx, Key, Name, Type, Owners, Options) ->
EscNidx = esc_int(Nidx),
insert_pubsub_node(Key, Name, Type, Owners, Options) ->
RDBMSType = {mongoose_rdbms:db_engine(global), mongoose_rdbms_type:get()},
EscKey = esc_string(Key),
EscName = esc_string(Name),
EscType = esc_string(Type),
EscOwners = esc_string(Owners),
EscOptions = esc_string(Options),
case {mongoose_rdbms:db_engine(global), mongoose_rdbms_type:get()} of
{mysql, _} ->
upsert_node_mysql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions);
{pgsql, _} ->
upsert_node_pgsql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions);
{odbc, mssql} ->
upsert_node_mssql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions);
NotSupported -> erlang:error({rdbms_not_supported, NotSupported})
end.

upsert_node_pgsql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions) ->
Insert = mysql_and_pgsql_node_insert(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions),
OnConflict = pgsql_node_conflict(EscType, EscOwners, EscOptions),
[Insert, OnConflict].

upsert_node_mysql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions) ->
Insert = mysql_and_pgsql_node_insert(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions),
OnConflict = mysql_node_conflict(EscType, EscOwners, EscOptions),
[Insert, OnConflict].

mysql_and_pgsql_node_insert(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions) ->
["INSERT INTO pubsub_nodes (nidx, p_key, name, type, owners, options) VALUES (",
EscNidx, ", ",
EscKey, ", ",
EscName, ", ",
EscType, ", ",
EscOwners, ", ",
EscOptions,
")"].
sql_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions, RDBMSType).

pgsql_node_conflict(EscType, EscOwners, EscOptions) ->
[" ON CONFLICT (nidx, p_key, name) DO",
" UPDATE SET type = ", EscType, ", "
" owners = ", EscOwners, ", "
" options = ", EscOptions].

mysql_node_conflict(EscType, EscOwners, EscOptions) ->
[" ON DUPLICATE KEY",
" UPDATE type = ", EscType, ", "
" owners = ", EscOwners, ", "
" options = ", EscOptions].

upsert_node_mssql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions) ->
["MERGE INTO pubsub_nodes WITH (SERIALIZABLE) as target"
" USING (SELECT ", EscNidx, " AS nidx, ",
EscKey, " AS p_key, ",
EscName, " AS name)"
" AS source (nidx, p_key, name)"
" ON (target.nidx = source.nidx"
" AND target.p_key = source.p_key"
" AND target.name = source.name)"
" WHEN MATCHED THEN UPDATE"
" SET type = ", EscType, ", "
"owners = ", EscOwners, ", "
"options = ", EscOptions,
" WHEN NOT MATCHED THEN INSERT"
" (nidx, p_key, name, type, owners, options)"
" VALUES (",
EscNidx, ", ",
EscKey, ", ",
EscName, ", ",
EscType, ", ",
EscOwners, ", ",
EscOptions,
");"].
update_pubsub_node(Nidx, Type, Owners, Options) ->
EscNidx = esc_int(Nidx),
EscType = esc_string(Type),
EscOwners = esc_string(Owners),
EscOptions = esc_string(Options),
sql_node_update(EscNidx, EscType, EscOwners, EscOptions).

sql_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions, {odbc, mssql}) ->
Query = ["INSERT INTO pubsub_nodes (p_key, name, type, owners, options) "
"OUTPUT inserted.nidx "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OUTPUT statement is cool :) 👍

"VALUES (",
EscKey, ", ",
EscName, ", ",
EscType, ", ",
EscOwners, ", ",
EscOptions, ");"],
Res = mongoose_rdbms:sql_query(global, Query),
convert_sql_nidx(Res);
sql_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions, {pgsql, _}) ->
Query = [common_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions),
[" RETURNING nidx;"]],
Res = mongoose_rdbms:sql_query(global, Query),
convert_sql_nidx(Res);
sql_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions, {mysql, _}) ->
Queries = [common_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions),
["; SELECT last_insert_id();"]],
%% When a list of qeries is passed, the firs encountered error will be returned.
%% Otherwise last statement result is returned.
Res = mongoose_rdbms:sql_query(global, Queries),
convert_sql_nidx(Res).

common_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions) ->
["INSERT INTO pubsub_nodes (p_key, name, type, owners, options) VALUES (",
EscKey, ", ",
EscName, ", ",
EscType, ", ",
EscOwners, ", ",
EscOptions, ")"].

sql_node_update(EscNidx, EscType, EscOwners, EscOptions) ->
Query = [" UPDATE pubsub_nodes SET type = ", EscType, ", "
" owners = ", EscOwners, ", "
" options = ", EscOptions,
" WHERE nidx = ", EscNidx, ";"],
{updated, _} = mongoose_rdbms:sql_query(global, Query).

set_parents(Node, Parents) ->
EscNode = esc_string(Node),
Expand Down Expand Up @@ -690,3 +669,11 @@ esc_string(String) ->
esc_int(Int) ->
mongoose_rdbms:use_escaped_integer(mongoose_rdbms:escape_integer(Int)).

%% MSSQL and MYSQL
convert_sql_nidx({selected, [{Nidx}]}) ->
Copy link
Contributor

@michalwski michalwski Dec 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that it may be helpful for someone new to this part of the code if there is a comment somewhere in the code which DB returns what. So that it's easier to understand the code. You and me understand it (at least today). I'm not sure how easy would it be to understand for anyone who didn't work on this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've marked which clause applies to which DB. I think that's enough to understand :)

{ok, mongoose_rdbms:result_to_integer(Nidx)};
%% PGSQL
convert_sql_nidx({updated, _, [{Nidx}]}) ->
{ok, mongoose_rdbms:result_to_integer(Nidx)};
convert_sql_nidx(Res) ->
{error, {bad_rdbms_response, Res}}.
1 change: 1 addition & 0 deletions src/rdbms/mongoose_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
-type rdbms_msg() :: {sql_query, _} | {sql_transaction, fun()} | {sql_execute, atom(), iodata()}.
-type single_query_result() :: {selected, [tuple()]} |
{updated, non_neg_integer() | undefined} |
{updated, non_neg_integer(), [tuple()]} |
{aborted, Reason :: term()} |
{error, Reason :: string() | duplicate_key}.
-type query_result() :: single_query_result() | [single_query_result()].
Expand Down