Skip to content

Commit

Permalink
Merge fc1f77e into 57ab4c6
Browse files Browse the repository at this point in the history
  • Loading branch information
zofpolkowska authored Dec 12, 2018
2 parents 57ab4c6 + fc1f77e commit 619e139
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 93 deletions.
11 changes: 3 additions & 8 deletions priv/mssql2012.sql
Original file line number Diff line number Diff line change
Expand Up @@ -475,21 +475,16 @@ CREATE INDEX i_inbox_ts ON inbox(luser, lserver, timestamp);
GO

CREATE TABLE dbo.pubsub_nodes (
nidx BIGINT NOT NULL,
nidx BIGINT IDENTITY(1,1) PRIMARY KEY,
p_key NVARCHAR(250) NOT NULL,
name NVARCHAR(250) NOT NULL,
type NVARCHAR(250) NOT NULL,
owners NVARCHAR(max) NOT NULL,
options NVARCHAR(max) NOT NULL,
CONSTRAINT PK_pubsub_nodes PRIMARY KEY CLUSTERED(
p_key ASC,
name ASC,
nidx ASC
)
options NVARCHAR(max) NOT NULL
)
GO

CREATE INDEX i_pubsub_nodes_nidx ON pubsub_nodes(nidx);
CREATE UNIQUE INDEX i_pubsub_nodes_key_name ON pubsub_nodes(p_key, name);
GO

CREATE TABLE dbo.pubsub_node_collections (
Expand Down
7 changes: 3 additions & 4 deletions priv/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -354,17 +354,16 @@ CREATE TABLE inbox (
CREATE INDEX i_inbox USING BTREE ON inbox(luser, lserver, timestamp);

CREATE TABLE pubsub_nodes (
nidx BIGINT UNSIGNED NOT NULL,
nidx BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
p_key VARCHAR(250) NOT NULL,
name VARCHAR(250) NOT NULL,
type VARCHAR(250) NOT NULL,
owners JSON NOT NULL,
options JSON NOT NULL,
PRIMARY KEY(p_key, name, nidx)
options JSON NOT NULL
) CHARACTER SET utf8mb4
ROW_FORMAT=DYNAMIC;

CREATE INDEX i_pubsub_nodes_nidx USING BTREE ON pubsub_nodes(nidx);
CREATE UNIQUE INDEX i_pubsub_nodes USING BTREE ON pubsub_nodes(p_key, name);

CREATE TABLE pubsub_node_collections (
name VARCHAR(250) NOT NULL,
Expand Down
7 changes: 3 additions & 4 deletions priv/pg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,15 @@ CREATE INDEX i_inbox
USING BTREE(luser, lserver, timestamp);

CREATE TABLE pubsub_nodes (
nidx BIGINT NOT NULL,
nidx BIGSERIAL PRIMARY KEY,
p_key VARCHAR(250) NOT NULL,
name VARCHAR(250) NOT NULL,
type VARCHAR(250) NOT NULL,
owners JSON NOT NULL,
options JSON NOT NULL,
PRIMARY KEY(p_key, name, nidx)
options JSON NOT NULL
);

CREATE INDEX i_pubsub_nodes_nidx ON pubsub_nodes(nidx);
CREATE UNIQUE INDEX i_pubsub_nodes_key_name ON pubsub_nodes USING btree (p_key, name);

CREATE TABLE pubsub_node_collections (
name VARCHAR(250) NOT NULL,
Expand Down
15 changes: 9 additions & 6 deletions src/pubsub/mod_pubsub_db_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,19 @@ del_node(Nidx) ->
{ok, States}.

-spec set_node(Node :: mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}.
set_node(#pubsub_node{id = undefined} = Node) ->
CreateNode = Node#pubsub_node{id = pubsub_index:new(node)},
set_node(CreateNode);
set_node(#pubsub_node{nodeid = {Key, Name}, id = Nidx, type = Type,
set_node(#pubsub_node{nodeid = {Key, Name}, id = undefined, type = Type,
owners = Owners, options = Opts, parents = Parents}) ->
OwnersJid = [jid:to_binary(Owner) || Owner <- Owners],
SQL = mod_pubsub_db_rdbms_sql:upsert_pubsub_node(Nidx, encode_key(Key), Name, Type,
{ok, Nidx} = mod_pubsub_db_rdbms_sql:insert_pubsub_node(encode_key(Key), Name, Type,
jsx:encode(OwnersJid),
jsx:encode(Opts)),
{updated, _} = mongoose_rdbms:sql_query(global, SQL),
maybe_set_parents(Name, Parents),
{ok, Nidx};

set_node(#pubsub_node{nodeid = {_, Name}, id = Nidx, type = Type,
owners = Owners, options = Opts, parents = Parents}) ->
OwnersJid = [jid:to_binary(Owner) || Owner <- Owners],
mod_pubsub_db_rdbms_sql:update_pubsub_node(Nidx, Type, jsx:encode(OwnersJid), jsx:encode(Opts)),
maybe_set_parents(Name, Parents),
{ok, Nidx}.

Expand Down
127 changes: 56 additions & 71 deletions src/pubsub/mod_pubsub_db_rdbms_sql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@

% Nodes

-export([upsert_pubsub_node/6,
-export([insert_pubsub_node/5,
update_pubsub_node/4,
select_node_by_key_and_name/2,
select_node_by_id/1,
select_nodes_by_key/1,
Expand Down Expand Up @@ -517,82 +518,60 @@ delete_all_items(Nidx) ->
["DELETE FROM pubsub_items"
" WHERE nidx = ", esc_int(Nidx)].


-spec upsert_pubsub_node(Nidx :: mod_pubsub:nodeIdx(), Key :: binary(),
Name :: mod_pubsub:nodeId(), Type :: binary(),
Owners :: binary(), Options :: binary()) -> iolist().
upsert_pubsub_node(Nidx, Key, Name, Type, Owners, Options) ->
EscNidx = esc_int(Nidx),
insert_pubsub_node(Key, Name, Type, Owners, Options) ->
RDBMSType = {mongoose_rdbms:db_engine(global), mongoose_rdbms_type:get()},
EscKey = esc_string(Key),
EscName = esc_string(Name),
EscType = esc_string(Type),
EscOwners = esc_string(Owners),
EscOptions = esc_string(Options),
case {mongoose_rdbms:db_engine(global), mongoose_rdbms_type:get()} of
{mysql, _} ->
upsert_node_mysql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions);
{pgsql, _} ->
upsert_node_pgsql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions);
{odbc, mssql} ->
upsert_node_mssql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions);
NotSupported -> erlang:error({rdbms_not_supported, NotSupported})
end.

upsert_node_pgsql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions) ->
Insert = mysql_and_pgsql_node_insert(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions),
OnConflict = pgsql_node_conflict(EscType, EscOwners, EscOptions),
[Insert, OnConflict].

upsert_node_mysql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions) ->
Insert = mysql_and_pgsql_node_insert(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions),
OnConflict = mysql_node_conflict(EscType, EscOwners, EscOptions),
[Insert, OnConflict].

mysql_and_pgsql_node_insert(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions) ->
["INSERT INTO pubsub_nodes (nidx, p_key, name, type, owners, options) VALUES (",
EscNidx, ", ",
EscKey, ", ",
EscName, ", ",
EscType, ", ",
EscOwners, ", ",
EscOptions,
")"].
sql_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions, RDBMSType).

pgsql_node_conflict(EscType, EscOwners, EscOptions) ->
[" ON CONFLICT (nidx, p_key, name) DO",
" UPDATE SET type = ", EscType, ", "
" owners = ", EscOwners, ", "
" options = ", EscOptions].

mysql_node_conflict(EscType, EscOwners, EscOptions) ->
[" ON DUPLICATE KEY",
" UPDATE type = ", EscType, ", "
" owners = ", EscOwners, ", "
" options = ", EscOptions].

upsert_node_mssql(EscNidx, EscKey, EscName, EscType, EscOwners, EscOptions) ->
["MERGE INTO pubsub_nodes WITH (SERIALIZABLE) as target"
" USING (SELECT ", EscNidx, " AS nidx, ",
EscKey, " AS p_key, ",
EscName, " AS name)"
" AS source (nidx, p_key, name)"
" ON (target.nidx = source.nidx"
" AND target.p_key = source.p_key"
" AND target.name = source.name)"
" WHEN MATCHED THEN UPDATE"
" SET type = ", EscType, ", "
"owners = ", EscOwners, ", "
"options = ", EscOptions,
" WHEN NOT MATCHED THEN INSERT"
" (nidx, p_key, name, type, owners, options)"
" VALUES (",
EscNidx, ", ",
EscKey, ", ",
EscName, ", ",
EscType, ", ",
EscOwners, ", ",
EscOptions,
");"].
update_pubsub_node(Nidx, Type, Owners, Options) ->
EscNidx = esc_int(Nidx),
EscType = esc_string(Type),
EscOwners = esc_string(Owners),
EscOptions = esc_string(Options),
sql_node_update(EscNidx, EscType, EscOwners, EscOptions).

sql_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions, {odbc, mssql}) ->
Query = ["INSERT INTO pubsub_nodes (p_key, name, type, owners, options) "
"OUTPUT inserted.nidx "
"VALUES (",
EscKey, ", ",
EscName, ", ",
EscType, ", ",
EscOwners, ", ",
EscOptions, ");"],
Res = mongoose_rdbms:sql_query(global, Query),
convert_sql_nidx(Res);
sql_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions, {pgsql, _}) ->
Query = [common_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions),
[" RETURNING nidx;"]],
Res = mongoose_rdbms:sql_query(global, Query),
convert_sql_nidx(Res);
sql_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions, {mysql, _}) ->
Queries = [common_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions),
["; SELECT last_insert_id();"]],
%% When a list of qeries is passed, the firs encountered error will be returned.
%% Otherwise last statement result is returned.
Res = mongoose_rdbms:sql_query(global, Queries),
convert_sql_nidx(Res).

common_node_insert(EscKey, EscName, EscType, EscOwners, EscOptions) ->
["INSERT INTO pubsub_nodes (p_key, name, type, owners, options) VALUES (",
EscKey, ", ",
EscName, ", ",
EscType, ", ",
EscOwners, ", ",
EscOptions, ")"].

sql_node_update(EscNidx, EscType, EscOwners, EscOptions) ->
Query = [" UPDATE pubsub_nodes SET type = ", EscType, ", "
" owners = ", EscOwners, ", "
" options = ", EscOptions,
" WHERE nidx = ", EscNidx, ";"],
{updated, _} = mongoose_rdbms:sql_query(global, Query).

set_parents(Node, Parents) ->
EscNode = esc_string(Node),
Expand Down Expand Up @@ -690,3 +669,9 @@ esc_string(String) ->
esc_int(Int) ->
mongoose_rdbms:use_escaped_integer(mongoose_rdbms:escape_integer(Int)).

convert_sql_nidx({selected, [{Nidx}]}) ->
{ok, mongoose_rdbms:result_to_integer(Nidx)};
convert_sql_nidx({updated, _, [{Nidx}]}) ->
{ok, mongoose_rdbms:result_to_integer(Nidx)};
convert_sql_nidx(Res) ->
{error, {bad_rdbms_response, Res}}.
1 change: 1 addition & 0 deletions src/rdbms/mongoose_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
-type rdbms_msg() :: {sql_query, _} | {sql_transaction, fun()} | {sql_execute, atom(), iodata()}.
-type single_query_result() :: {selected, [tuple()]} |
{updated, non_neg_integer() | undefined} |
{updated, non_neg_integer(), [tuple()]} |
{aborted, Reason :: term()} |
{error, Reason :: string() | duplicate_key}.
-type query_result() :: single_query_result() | [single_query_result()].
Expand Down

0 comments on commit 619e139

Please sign in to comment.