Skip to content

Commit

Permalink
Add mongoose_node_num instead of ejabberd_node_id
Browse files Browse the repository at this point in the history
There are two backends: Mnesia or CETS disco.

If none backends are useful, we fallback to node_id to be 0 (which is not that major issue for MAM ID collisions)
  • Loading branch information
arcusfelis committed Jun 21, 2023
1 parent f69dfda commit e1b7ad2
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 53 deletions.
4 changes: 2 additions & 2 deletions priv/mssql2012.sql
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in microseconds
node_id INT UNSIGNED,
node_num INT UNSIGNED NOT NULL,
PRIMARY KEY (cluster_name, node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_nodeid ON discovery_nodes(cluster_name, node_id);
CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes(cluster_name, node_num);
4 changes: 2 additions & 2 deletions priv/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in microseconds
node_id INT UNSIGNED,
node_num INT UNSIGNED NOT NULL,
PRIMARY KEY (cluster_name, node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_nodeid USING BTREE ON discovery_nodes(cluster_name, node_id);
CREATE UNIQUE INDEX i_discovery_nodes_node_num USING BTREE ON discovery_nodes(cluster_name, node_num);
4 changes: 2 additions & 2 deletions priv/pg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in microseconds
node_id INT UNSIGNED,
node_num INT NOT NULL,
PRIMARY KEY (cluster_name, node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_nodeid ON discovery_nodes USING BTREE(cluster_name, node_id);
CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes USING BTREE(cluster_name, node_num);
2 changes: 1 addition & 1 deletion src/ejabberd_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ db_init() ->
ok;

Check warning on line 120 in src/ejabberd_app.erl

View check run for this annotation

Codecov / codecov/patch

src/ejabberd_app.erl#L120

Added line #L120 was not covered by tests
_ ->
db_init_mnesia(),
mongoose_short_number_node_id_mnesia:init()
mongoose_node_num_mnesia:init()
end.

db_init_mnesia() ->
Expand Down
2 changes: 1 addition & 1 deletion src/mam/mod_mam_muc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
%%%
%%% <ul>
%%% <li>date (using `timestamp()');</li>
%%% <li>node number (using {@link ejabberd_node_id}).</li>
%%% <li>node number (using {@link mongoose_node_num}).</li>
%%% </ul>
%%% @end
%%%-------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/mam/mod_mam_pm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
%%%
%%% <ul>
%%% <li>date (using `timestamp()');</li>
%%% <li>node number (using {@link ejabberd_node_id}).</li>
%%% <li>node number (using {@link mongoose_node_num}).</li>
%%% </ul>
%%% @end
%%%-------------------------------------------------------------------
Expand Down
14 changes: 7 additions & 7 deletions src/mam/mod_mam_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -179,27 +179,27 @@ get_or_generate_mam_id(Acc) ->

-spec generate_message_id(integer()) -> integer().
generate_message_id(CandidateStamp) ->
{ok, NodeId} = ejabberd_node_id:node_id(),
NodeNum = mongoose_node_num:node_num(),

Check warning on line 182 in src/mam/mod_mam_utils.erl

View check run for this annotation

Codecov / codecov/patch

src/mam/mod_mam_utils.erl#L182

Added line #L182 was not covered by tests
UniqueStamp = mongoose_mam_id:next_unique(CandidateStamp),
encode_compact_uuid(UniqueStamp, NodeId).
encode_compact_uuid(UniqueStamp, NodeNum).

Check warning on line 184 in src/mam/mod_mam_utils.erl

View check run for this annotation

Codecov / codecov/patch

src/mam/mod_mam_utils.erl#L184

Added line #L184 was not covered by tests

%% @doc Create a message ID (UID).
%%
%% It removes a leading 0 from 64-bit binary representation.
%% It puts node id as a last byte.
%% The maximum date, that can be encoded is `{{4253, 5, 31}, {22, 20, 37}}'.
-spec encode_compact_uuid(integer(), integer()) -> integer().
encode_compact_uuid(Microseconds, NodeId)
when is_integer(Microseconds), is_integer(NodeId) ->
(Microseconds bsl 8) + NodeId.
encode_compact_uuid(Microseconds, NodeNum)
when is_integer(Microseconds), is_integer(NodeNum) ->
(Microseconds bsl 8) + NodeNum.

Check warning on line 194 in src/mam/mod_mam_utils.erl

View check run for this annotation

Codecov / codecov/patch

src/mam/mod_mam_utils.erl#L194

Added line #L194 was not covered by tests


%% @doc Extract date and node id from a message id.
-spec decode_compact_uuid(integer()) -> {integer(), byte()}.
decode_compact_uuid(Id) ->
Microseconds = Id bsr 8,
NodeId = Id band 255,
{Microseconds, NodeId}.
NodeNum = Id band 255,
{Microseconds, NodeNum}.

Check warning on line 202 in src/mam/mod_mam_utils.erl

View check run for this annotation

Codecov / codecov/patch

src/mam/mod_mam_utils.erl#L201-L202

Added lines #L201 - L202 were not covered by tests


%% @doc Encode a message ID to pass it to the user.
Expand Down
102 changes: 65 additions & 37 deletions src/mongoose_cets_discovery_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,71 @@ init(Opts = #{cluster_name := _, node_name_to_insert := _}) ->

-spec get_nodes(state()) -> {cets_discovery:get_nodes_result(), state()}.
get_nodes(State = #{cluster_name := ClusterName, node_name_to_insert := Node}) ->
prepare(),
insert(ClusterName, Node),
try mongoose_rdbms:execute_successfully(global, cets_disco_select, [ClusterName]) of
{selected, Rows} ->
Nodes = [binary_to_atom(X) || {X} <- Rows, X =/= <<>>],
{{ok, Nodes}, State}
catch Class:Reason:Stacktrace ->
?LOG_ERROR(#{
what => discovery_failed_select,
class => Class,
reason => Reason,
stacktrace => Stacktrace
}),
{{error, Reason}, State}
try
{Num, Nodes} = try_register(ClusterName, Node),
mongoose_node_num:set_node_num(Num),
{{ok, Nodes}, State}

Check warning on line 20 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L17-L20

Added lines #L17 - L20 were not covered by tests
catch Class:Reason:Stacktrace ->
?LOG_ERROR(#{what => discovery_failed_select, class => Class,
reason => Reason, stacktrace => Stacktrace}),
{{error, Reason}, State}

Check warning on line 24 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L22-L24

Added lines #L22 - L24 were not covered by tests
end.

try_register(ClusterName, Node) ->
prepare(),
{selected, Rows} = select(ClusterName),
Pairs = [{binary_to_atom(NodeBin), Num} || {NodeBin, Num} <- Rows],
{Nodes, Nums} = lists:unzip(Pairs),
Inserted = lists:member(Node, Nodes),
Timestamp = timestamp(),
NodeNum =

Check warning on line 34 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L28-L34

Added lines #L28 - L34 were not covered by tests
case Inserted of
true ->
update_existing(ClusterName, Node, Timestamp),
{value, {_, Num}} = lists:keysearch(Node, 1, Pairs),
Num;

Check warning on line 39 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L37-L39

Added lines #L37 - L39 were not covered by tests
false ->
Num = next_free_num(lists:usort(Nums)),
insert_new(ClusterName, Node, Timestamp, Num),
Num

Check warning on line 43 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L41-L43

Added lines #L41 - L43 were not covered by tests
end,
{NodeNum, Nodes}.

Check warning on line 45 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L45

Added line #L45 was not covered by tests

prepare() ->
Filter = [<<"node_name">>, <<"cluster_name">>],
Fields = [<<"updated_timestamp">>],
rdbms_queries:prepare_upsert(global, cets_disco_insert, discovery_nodes,
Filter ++ Fields, Fields, Filter),
mongoose_rdbms:prepare(cets_disco_select, discovery_nodes, [cluster_name],
<<"SELECT node_name FROM discovery_nodes WHERE cluster_name = ?">>).

insert(ClusterName, Node) ->
Timestamp = os:system_time(microsecond),
Filter = [Node, ClusterName],
Fields = [Timestamp],
try
{updated, _} = rdbms_queries:execute_upsert(global, cets_disco_insert,
Filter ++ Fields, Fields,
Filter)
catch Class:Reason:Stacktrace ->
?LOG_ERROR(#{
what => discovery_failed_insert,
class => Class,
reason => Reason,
stacktrace => Stacktrace
})
end.
T = discovery_nodes,
mongoose_rdbms:prepare(cets_disco_select, T, [cluster_name], select()),
mongoose_rdbms:prepare(cets_disco_insert_new, T,

Check warning on line 50 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L48-L50

Added lines #L48 - L50 were not covered by tests
[cluster_name, node_name, node_num, timestamp], insert_new()),
mongoose_rdbms:prepare(cets_disco_update_existing, T,

Check warning on line 52 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L52

Added line #L52 was not covered by tests
[timestamp, cluster_name, node_name], update_existing()).

select() ->
<<"SELECT node_name, node_num FROM discovery_nodes WHERE cluster_name = ?">>.

Check warning on line 56 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L56

Added line #L56 was not covered by tests

select(ClusterName) ->
mongoose_rdbms:execute_successfully(global, cets_disco_select, [ClusterName]).

Check warning on line 59 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L59

Added line #L59 was not covered by tests

insert_new() ->
<<"INSERT INTO discovery_nodes (cluster_name, node_name, node_num, timestamp)"

Check warning on line 62 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L62

Added line #L62 was not covered by tests
" VALUES (?, ?, ?, ?)">>.

insert_new(ClusterName, Node, Timestamp, Num) ->
mongoose_rdbms:execute(global, cets_disco_insert_new, [ClusterName, Node, Num, Timestamp]).

Check warning on line 66 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L66

Added line #L66 was not covered by tests

update_existing() ->
<<"UPDATE discovery_nodes SET timestamp = ? WHERE cluster_name = ? AND node_name = ?">>.

Check warning on line 69 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L69

Added line #L69 was not covered by tests

update_existing(ClusterName, Node, Timestamp) ->
mongoose_rdbms:execute(global, cets_disco_update_existing, [Timestamp, ClusterName, Node]).

Check warning on line 72 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L72

Added line #L72 was not covered by tests

timestamp() ->
os:system_time(microsecond).

Check warning on line 75 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L75

Added line #L75 was not covered by tests

%% Returns a next free node id based on the currently registered ids
next_free_num([]) ->
0;

Check warning on line 79 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L79

Added line #L79 was not covered by tests
next_free_num([H | T = [E | _]]) when ((H + 1) =:= E) ->
%% Sequential, ignore H
next_free_num(T);

Check warning on line 82 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L82

Added line #L82 was not covered by tests
next_free_num([H | _]) ->
H + 1.

Check warning on line 84 in src/mongoose_cets_discovery_rdbms.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_cets_discovery_rdbms.erl#L84

Added line #L84 was not covered by tests
30 changes: 30 additions & 0 deletions src/mongoose_node_num.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
%% Returns a numeric id from 0 to 255 for the current node.
%% Used to generate MAM IDs.
-module(mongoose_node_num).
-export([set_node_num/1]).
-export([node_num/0]).

-include("mongoose.hrl").
-include("jlib.hrl").
-include("mongoose_config_spec.hrl").
-include("mongoose_logger.hrl").

-type node_num() :: 0..255.
-define(KEY, ?MODULE).
-export_type([node_num/0]).

%% @doc Return an integer node ID.
-spec node_num() -> node_num().
node_num() ->
%% We just return 0 if service is not running.
persistent_term:get(?KEY, 0).

-spec set_node_num(node_num()) -> ignore | updated | same.
set_node_num(Num) ->
case node_num() =:= Num of
true ->
same;
false ->
persistent_term:put(?KEY, Num),
updated
end.
36 changes: 36 additions & 0 deletions src/mongoose_node_num_mnesia.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-module(mongoose_node_num_mnesia).

-export([init/0]).

-record(node_num, {name :: atom(),
num :: mongoose_node_num:node_num() }).

init() ->
mnesia:create_table(node_num,
[{ram_copies, [node()]}, {type, set},
{attributes, record_info(fields, node_num)}]),
mnesia:add_table_index(node_num, num),
mnesia:add_table_copy(node_num, node(), ram_copies),
register_node(node()),
[#node_num{num = Num}] = mnesia:dirty_read(node_num, node()),
mongoose_node_num:set_node_num(Num),
ok.

-spec register_node(atom()) -> ok.
register_node(NodeName) ->
{atomic, _} = mnesia:transaction(fun() ->
case mnesia:read(node_num, NodeName) of
[] ->
mnesia:write(#node_num{name = NodeName, num = next_node_num()});
[_] -> ok
end
end),
ok.

-spec next_node_num() -> mongoose_node_num:node_num().
next_node_num() ->
max_node_num() + 1.

-spec max_node_num() -> mongoose_node_num:node_num().
max_node_num() ->
mnesia:foldl(fun(#node_num{num = Num}, Max) -> max(Num, Max) end, 0, node_num).

0 comments on commit e1b7ad2

Please sign in to comment.