diff --git a/doc/modules/mod_pubsub.md b/doc/modules/mod_pubsub.md index 7a246c22ee..0e80a5229b 100644 --- a/doc/modules/mod_pubsub.md +++ b/doc/modules/mod_pubsub.md @@ -38,6 +38,8 @@ Node configuration still uses the default configuration defined by the node plug Current RDBMS backend replaces `pubsub_node`, `pubsub_state` and `pubsub_item` Mnesia tables with RDBMS equivalents. Due to a fact that some data is still maintained in Mnesia, there is a certain risk of data becoming inconsistent. The schema used by this backend may change until it reaches stable status. +If `mod_pubsub` is configured to use RDBMS, management of nodes indexes is +done by the database, so the `pubsub_index` table is not needed. #### Cache Backend diff --git a/priv/mssql2012.sql b/priv/mssql2012.sql index 244b5820d5..d00ba440c3 100644 --- a/priv/mssql2012.sql +++ b/priv/mssql2012.sql @@ -475,21 +475,16 @@ CREATE INDEX i_inbox_ts ON inbox(luser, lserver, timestamp); GO CREATE TABLE dbo.pubsub_nodes ( - nidx BIGINT NOT NULL, + nidx BIGINT IDENTITY(1,1) PRIMARY KEY, p_key NVARCHAR(250) NOT NULL, name NVARCHAR(250) NOT NULL, type NVARCHAR(250) NOT NULL, owners NVARCHAR(max) NOT NULL, - options NVARCHAR(max) NOT NULL, - CONSTRAINT PK_pubsub_nodes PRIMARY KEY CLUSTERED( - p_key ASC, - name ASC, - nidx ASC - ) + options NVARCHAR(max) NOT NULL ) GO -CREATE INDEX i_pubsub_nodes_nidx ON pubsub_nodes(nidx); +CREATE UNIQUE INDEX i_pubsub_nodes_key_name ON pubsub_nodes(p_key, name); GO CREATE TABLE dbo.pubsub_node_collections ( diff --git a/priv/mysql.sql b/priv/mysql.sql index c15da40afd..c392319b68 100644 --- a/priv/mysql.sql +++ b/priv/mysql.sql @@ -354,17 +354,16 @@ CREATE TABLE inbox ( CREATE INDEX i_inbox USING BTREE ON inbox(luser, lserver, timestamp); CREATE TABLE pubsub_nodes ( - nidx BIGINT UNSIGNED NOT NULL, + nidx BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, p_key VARCHAR(250) NOT NULL, name VARCHAR(250) NOT NULL, type VARCHAR(250) NOT NULL, owners JSON NOT NULL, - options JSON NOT NULL, - PRIMARY KEY(p_key, name, nidx) + options JSON NOT NULL ) CHARACTER SET utf8mb4 ROW_FORMAT=DYNAMIC; -CREATE INDEX i_pubsub_nodes_nidx USING BTREE ON pubsub_nodes(nidx); +CREATE UNIQUE INDEX i_pubsub_nodes USING BTREE ON pubsub_nodes(p_key, name); CREATE TABLE pubsub_node_collections ( name VARCHAR(250) NOT NULL, diff --git a/priv/pg.sql b/priv/pg.sql index ec9a5a632c..66f74ef54e 100644 --- a/priv/pg.sql +++ b/priv/pg.sql @@ -317,16 +317,15 @@ CREATE INDEX i_inbox USING BTREE(luser, lserver, timestamp); CREATE TABLE pubsub_nodes ( - nidx BIGINT NOT NULL, + nidx BIGSERIAL PRIMARY KEY, p_key VARCHAR(250) NOT NULL, name VARCHAR(250) NOT NULL, type VARCHAR(250) NOT NULL, owners JSON NOT NULL, - options JSON NOT NULL, - PRIMARY KEY(p_key, name, nidx) + options JSON NOT NULL ); -CREATE INDEX i_pubsub_nodes_nidx ON pubsub_nodes(nidx); +CREATE UNIQUE INDEX i_pubsub_nodes_key_name ON pubsub_nodes USING btree (p_key, name); CREATE TABLE pubsub_node_collections ( name VARCHAR(250) NOT NULL, diff --git a/src/pubsub/mod_pubsub_db_rdbms.erl b/src/pubsub/mod_pubsub_db_rdbms.erl index 249e8b8dd0..7649dc38c0 100644 --- a/src/pubsub/mod_pubsub_db_rdbms.erl +++ b/src/pubsub/mod_pubsub_db_rdbms.erl @@ -231,16 +231,19 @@ del_node(Nidx) -> {ok, States}. -spec set_node(Node :: mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}. -set_node(#pubsub_node{id = undefined} = Node) -> - CreateNode = Node#pubsub_node{id = pubsub_index:new(node)}, - set_node(CreateNode); -set_node(#pubsub_node{nodeid = {Key, Name}, id = Nidx, type = Type, +set_node(#pubsub_node{nodeid = {Key, Name}, id = undefined, type = Type, owners = Owners, options = Opts, parents = Parents}) -> OwnersJid = [jid:to_binary(Owner) || Owner <- Owners], - SQL = mod_pubsub_db_rdbms_sql:upsert_pubsub_node(Nidx, encode_key(Key), Name, Type, + {ok, Nidx} = mod_pubsub_db_rdbms_sql:insert_pubsub_node(encode_key(Key), Name, Type, jsx:encode(OwnersJid), jsx:encode(Opts)), - {updated, _} = mongoose_rdbms:sql_query(global, SQL), + maybe_set_parents(Name, Parents), + {ok, Nidx}; + +set_node(#pubsub_node{nodeid = {_, Name}, id = Nidx, type = Type, + owners = Owners, options = Opts, parents = Parents}) -> + OwnersJid = [jid:to_binary(Owner) || Owner <- Owners], + mod_pubsub_db_rdbms_sql:update_pubsub_node(Nidx, Type, jsx:encode(OwnersJid), jsx:encode(Opts)), maybe_set_parents(Name, Parents), {ok, Nidx}. diff --git a/src/pubsub/mod_pubsub_db_rdbms_sql.erl b/src/pubsub/mod_pubsub_db_rdbms_sql.erl index b89380301f..001ede8a7c 100644 --- a/src/pubsub/mod_pubsub_db_rdbms_sql.erl +++ b/src/pubsub/mod_pubsub_db_rdbms_sql.erl @@ -54,7 +54,8 @@ % Nodes --export([upsert_pubsub_node/6, +-export([insert_pubsub_node/5, + update_pubsub_node/4, select_node_by_key_and_name/2, select_node_by_id/1, select_nodes_by_key/1, @@ -517,82 +518,60 @@ delete_all_items(Nidx) -> ["DELETE FROM pubsub_items" " WHERE nidx = ", esc_int(Nidx)]. - --spec upsert_pubsub_node(Nidx :: mod_pubsub:nodeIdx(), Key :: binary(), - Name :: mod_pubsub:nodeId(), Type :: binary(), - Owners :: binary(), Options :: binary()) -> iolist(). -upsert_pubsub_node(Nidx, Key, Name, Type, Owners, Options) -> - EscNidx = esc_int(Nidx), +insert_pubsub_node(Key, Name, Type, Owners, Options) -> + RDBMSType = {mongoose_rdbms:db_engine(global), mongoose_rdbms_type:get()}, EscKey = esc_string(Key), EscName = esc_string(Name), EscType = esc_string(Type), EscOwners = esc_string(Owners), EscOptions = esc_string(Options), - case {mongoose_rdbms:db_engine(global), mongoose_rdbms_type:get()} of - {mysql, _} -> - upsert_node_mysql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions); - {pgsql, _} -> - upsert_node_pgsql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions); - {odbc, mssql} -> - upsert_node_mssql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions); - NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) - end. - -upsert_node_pgsql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions) -> - Insert = mysql_and_pgsql_node_insert(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions), - OnConflict = pgsql_node_conflict(EscType, EscOwners, EscOptions), - [Insert, OnConflict]. - -upsert_node_mysql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions) -> - Insert = mysql_and_pgsql_node_insert(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions), - OnConflict = mysql_node_conflict(EscType, EscOwners, EscOptions), - [Insert, OnConflict]. - -mysql_and_pgsql_node_insert(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions) -> - ["INSERT INTO pubsub_nodes (nidx, p_key, name, type, owners, options) VALUES (", - EscNidx, ", ", - EscKey, ", ", - EscName, ", ", - EscType, ", ", - EscOwners, ", ", - EscOptions, - ")"]. + sql_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions, RDBMSType). -pgsql_node_conflict(EscType, EscOwners, EscOptions) -> - [" ON CONFLICT (nidx, p_key, name) DO", - " UPDATE SET type = ", EscType, ", " - " owners = ", EscOwners, ", " - " options = ", EscOptions]. - -mysql_node_conflict(EscType, EscOwners, EscOptions) -> - [" ON DUPLICATE KEY", - " UPDATE type = ", EscType, ", " - " owners = ", EscOwners, ", " - " options = ", EscOptions]. - -upsert_node_mssql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions) -> - ["MERGE INTO pubsub_nodes WITH (SERIALIZABLE) as target" - " USING (SELECT ", EscNidx, " AS nidx, ", - EscKey, " AS p_key, ", - EscName, " AS name)" - " AS source (nidx, p_key, name)" - " ON (target.nidx = source.nidx" - " AND target.p_key = source.p_key" - " AND target.name = source.name)" - " WHEN MATCHED THEN UPDATE" - " SET type = ", EscType, ", " - "owners = ", EscOwners, ", " - "options = ", EscOptions, - " WHEN NOT MATCHED THEN INSERT" - " (nidx, p_key, name, type, owners, options)" - " VALUES (", - EscNidx, ", ", - EscKey, ", ", - EscName, ", ", - EscType, ", ", - EscOwners, ", ", - EscOptions, - ");"]. +update_pubsub_node(Nidx, Type, Owners, Options) -> + EscNidx = esc_int(Nidx), + EscType = esc_string(Type), + EscOwners = esc_string(Owners), + EscOptions = esc_string(Options), + sql_node_update(EscNidx, EscType, EscOwners, EscOptions). + +sql_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions, {odbc, mssql}) -> + Query = ["INSERT INTO pubsub_nodes (p_key, name, type, owners, options) " + "OUTPUT inserted.nidx " + "VALUES (", + EscKey, ", ", + EscName, ", ", + EscType, ", ", + EscOwners, ", ", + EscOptions, ");"], + Res = mongoose_rdbms:sql_query(global, Query), + convert_sql_nidx(Res); +sql_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions, {pgsql, _}) -> + Query = [common_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions), + [" RETURNING nidx;"]], + Res = mongoose_rdbms:sql_query(global, Query), + convert_sql_nidx(Res); +sql_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions, {mysql, _}) -> + Queries = [common_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions), + ["; SELECT last_insert_id();"]], + %% When a list of qeries is passed, the firs encountered error will be returned. + %% Otherwise last statement result is returned. + Res = mongoose_rdbms:sql_query(global, Queries), + convert_sql_nidx(Res). + +common_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions) -> + ["INSERT INTO pubsub_nodes (p_key, name, type, owners, options) VALUES (", + EscKey, ", ", + EscName, ", ", + EscType, ", ", + EscOwners, ", ", + EscOptions, ")"]. + +sql_node_update(EscNidx, EscType, EscOwners, EscOptions) -> + Query = [" UPDATE pubsub_nodes SET type = ", EscType, ", " + " owners = ", EscOwners, ", " + " options = ", EscOptions, + " WHERE nidx = ", EscNidx, ";"], + {updated, _} = mongoose_rdbms:sql_query(global, Query). set_parents(Node, Parents) -> EscNode = esc_string(Node), @@ -690,3 +669,11 @@ esc_string(String) -> esc_int(Int) -> mongoose_rdbms:use_escaped_integer(mongoose_rdbms:escape_integer(Int)). +%% MSSQL and MYSQL +convert_sql_nidx({selected, [{Nidx}]}) -> + {ok, mongoose_rdbms:result_to_integer(Nidx)}; +%% PGSQL +convert_sql_nidx({updated, _, [{Nidx}]}) -> + {ok, mongoose_rdbms:result_to_integer(Nidx)}; +convert_sql_nidx(Res) -> + {error, {bad_rdbms_response, Res}}. diff --git a/src/rdbms/mongoose_rdbms.erl b/src/rdbms/mongoose_rdbms.erl index bef39eb391..c4c90b3d95 100644 --- a/src/rdbms/mongoose_rdbms.erl +++ b/src/rdbms/mongoose_rdbms.erl @@ -149,6 +149,7 @@ -type rdbms_msg() :: {sql_query, _} | {sql_transaction, fun()} | {sql_execute, atom(), iodata()}. -type single_query_result() :: {selected, [tuple()]} | {updated, non_neg_integer() | undefined} | + {updated, non_neg_integer(), [tuple()]} | {aborted, Reason :: term()} | {error, Reason :: string() | duplicate_key}. -type query_result() :: single_query_result() | [single_query_result()].