From e1b7ad2e2e233a729c8b55149fa7644771296c95 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 21 Jun 2023 17:28:30 +0200 Subject: [PATCH] Add mongoose_node_num instead of ejabberd_node_id There are two backends: Mnesia or CETS disco. If none backends are useful, we fallback to node_id to be 0 (which is not that major issue for MAM ID collisions) --- priv/mssql2012.sql | 4 +- priv/mysql.sql | 4 +- priv/pg.sql | 4 +- src/ejabberd_app.erl | 2 +- src/mam/mod_mam_muc.erl | 2 +- src/mam/mod_mam_pm.erl | 2 +- src/mam/mod_mam_utils.erl | 14 ++-- src/mongoose_cets_discovery_rdbms.erl | 102 ++++++++++++++++---------- src/mongoose_node_num.erl | 30 ++++++++ src/mongoose_node_num_mnesia.erl | 36 +++++++++ 10 files changed, 147 insertions(+), 53 deletions(-) create mode 100644 src/mongoose_node_num.erl create mode 100644 src/mongoose_node_num_mnesia.erl diff --git a/priv/mssql2012.sql b/priv/mssql2012.sql index 6feb633d43..9f55767fd8 100644 --- a/priv/mssql2012.sql +++ b/priv/mssql2012.sql @@ -757,7 +757,7 @@ CREATE TABLE discovery_nodes ( node_name varchar(250), cluster_name varchar(250), updated_timestamp BIGINT NOT NULL, -- in microseconds - node_id INT UNSIGNED, + node_num INT UNSIGNED NOT NULL, PRIMARY KEY (cluster_name, node_name) ); -CREATE UNIQUE INDEX i_discovery_nodes_nodeid ON discovery_nodes(cluster_name, node_id); +CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes(cluster_name, node_num); diff --git a/priv/mysql.sql b/priv/mysql.sql index 70826dcc5d..392dd0df63 100644 --- a/priv/mysql.sql +++ b/priv/mysql.sql @@ -549,7 +549,7 @@ CREATE TABLE discovery_nodes ( node_name varchar(250), cluster_name varchar(250), updated_timestamp BIGINT NOT NULL, -- in microseconds - node_id INT UNSIGNED, + node_num INT UNSIGNED NOT NULL, PRIMARY KEY (cluster_name, node_name) ); -CREATE UNIQUE INDEX i_discovery_nodes_nodeid USING BTREE ON discovery_nodes(cluster_name, node_id); +CREATE UNIQUE INDEX i_discovery_nodes_node_num USING BTREE ON discovery_nodes(cluster_name, node_num); diff --git a/priv/pg.sql b/priv/pg.sql index 42ec43bf5e..58f64ec30a 100644 --- a/priv/pg.sql +++ b/priv/pg.sql @@ -509,7 +509,7 @@ CREATE TABLE discovery_nodes ( node_name varchar(250), cluster_name varchar(250), updated_timestamp BIGINT NOT NULL, -- in microseconds - node_id INT UNSIGNED, + node_num INT NOT NULL, PRIMARY KEY (cluster_name, node_name) ); -CREATE UNIQUE INDEX i_discovery_nodes_nodeid ON discovery_nodes USING BTREE(cluster_name, node_id); +CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes USING BTREE(cluster_name, node_num); diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index 7f92428628..2df8af3f59 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -120,7 +120,7 @@ db_init() -> ok; _ -> db_init_mnesia(), - mongoose_short_number_node_id_mnesia:init() + mongoose_node_num_mnesia:init() end. db_init_mnesia() -> diff --git a/src/mam/mod_mam_muc.erl b/src/mam/mod_mam_muc.erl index f12aa58714..42f4fe49fc 100644 --- a/src/mam/mod_mam_muc.erl +++ b/src/mam/mod_mam_muc.erl @@ -23,7 +23,7 @@ %%% %%% %%% @end %%%------------------------------------------------------------------- diff --git a/src/mam/mod_mam_pm.erl b/src/mam/mod_mam_pm.erl index 619f8763d1..db437b7ab9 100644 --- a/src/mam/mod_mam_pm.erl +++ b/src/mam/mod_mam_pm.erl @@ -23,7 +23,7 @@ %%% %%% %%% @end %%%------------------------------------------------------------------- diff --git a/src/mam/mod_mam_utils.erl b/src/mam/mod_mam_utils.erl index 31e7494f3d..2a8ff2c991 100644 --- a/src/mam/mod_mam_utils.erl +++ b/src/mam/mod_mam_utils.erl @@ -179,9 +179,9 @@ get_or_generate_mam_id(Acc) -> -spec generate_message_id(integer()) -> integer(). generate_message_id(CandidateStamp) -> - {ok, NodeId} = ejabberd_node_id:node_id(), + NodeNum = mongoose_node_num:node_num(), UniqueStamp = mongoose_mam_id:next_unique(CandidateStamp), - encode_compact_uuid(UniqueStamp, NodeId). + encode_compact_uuid(UniqueStamp, NodeNum). %% @doc Create a message ID (UID). %% @@ -189,17 +189,17 @@ generate_message_id(CandidateStamp) -> %% It puts node id as a last byte. %% The maximum date, that can be encoded is `{{4253, 5, 31}, {22, 20, 37}}'. -spec encode_compact_uuid(integer(), integer()) -> integer(). -encode_compact_uuid(Microseconds, NodeId) - when is_integer(Microseconds), is_integer(NodeId) -> - (Microseconds bsl 8) + NodeId. +encode_compact_uuid(Microseconds, NodeNum) + when is_integer(Microseconds), is_integer(NodeNum) -> + (Microseconds bsl 8) + NodeNum. %% @doc Extract date and node id from a message id. -spec decode_compact_uuid(integer()) -> {integer(), byte()}. decode_compact_uuid(Id) -> Microseconds = Id bsr 8, - NodeId = Id band 255, - {Microseconds, NodeId}. + NodeNum = Id band 255, + {Microseconds, NodeNum}. %% @doc Encode a message ID to pass it to the user. diff --git a/src/mongoose_cets_discovery_rdbms.erl b/src/mongoose_cets_discovery_rdbms.erl index 106c4e2d91..9d4083ac38 100644 --- a/src/mongoose_cets_discovery_rdbms.erl +++ b/src/mongoose_cets_discovery_rdbms.erl @@ -14,43 +14,71 @@ init(Opts = #{cluster_name := _, node_name_to_insert := _}) -> -spec get_nodes(state()) -> {cets_discovery:get_nodes_result(), state()}. get_nodes(State = #{cluster_name := ClusterName, node_name_to_insert := Node}) -> - prepare(), - insert(ClusterName, Node), - try mongoose_rdbms:execute_successfully(global, cets_disco_select, [ClusterName]) of - {selected, Rows} -> - Nodes = [binary_to_atom(X) || {X} <- Rows, X =/= <<>>], - {{ok, Nodes}, State} - catch Class:Reason:Stacktrace -> - ?LOG_ERROR(#{ - what => discovery_failed_select, - class => Class, - reason => Reason, - stacktrace => Stacktrace - }), - {{error, Reason}, State} + try + {Num, Nodes} = try_register(ClusterName, Node), + mongoose_node_num:set_node_num(Num), + {{ok, Nodes}, State} + catch Class:Reason:Stacktrace -> + ?LOG_ERROR(#{what => discovery_failed_select, class => Class, + reason => Reason, stacktrace => Stacktrace}), + {{error, Reason}, State} end. +try_register(ClusterName, Node) -> + prepare(), + {selected, Rows} = select(ClusterName), + Pairs = [{binary_to_atom(NodeBin), Num} || {NodeBin, Num} <- Rows], + {Nodes, Nums} = lists:unzip(Pairs), + Inserted = lists:member(Node, Nodes), + Timestamp = timestamp(), + NodeNum = + case Inserted of + true -> + update_existing(ClusterName, Node, Timestamp), + {value, {_, Num}} = lists:keysearch(Node, 1, Pairs), + Num; + false -> + Num = next_free_num(lists:usort(Nums)), + insert_new(ClusterName, Node, Timestamp, Num), + Num + end, + {NodeNum, Nodes}. + prepare() -> - Filter = [<<"node_name">>, <<"cluster_name">>], - Fields = [<<"updated_timestamp">>], - rdbms_queries:prepare_upsert(global, cets_disco_insert, discovery_nodes, - Filter ++ Fields, Fields, Filter), - mongoose_rdbms:prepare(cets_disco_select, discovery_nodes, [cluster_name], - <<"SELECT node_name FROM discovery_nodes WHERE cluster_name = ?">>). - -insert(ClusterName, Node) -> - Timestamp = os:system_time(microsecond), - Filter = [Node, ClusterName], - Fields = [Timestamp], - try - {updated, _} = rdbms_queries:execute_upsert(global, cets_disco_insert, - Filter ++ Fields, Fields, - Filter) - catch Class:Reason:Stacktrace -> - ?LOG_ERROR(#{ - what => discovery_failed_insert, - class => Class, - reason => Reason, - stacktrace => Stacktrace - }) - end. + T = discovery_nodes, + mongoose_rdbms:prepare(cets_disco_select, T, [cluster_name], select()), + mongoose_rdbms:prepare(cets_disco_insert_new, T, + [cluster_name, node_name, node_num, timestamp], insert_new()), + mongoose_rdbms:prepare(cets_disco_update_existing, T, + [timestamp, cluster_name, node_name], update_existing()). + +select() -> + <<"SELECT node_name, node_num FROM discovery_nodes WHERE cluster_name = ?">>. + +select(ClusterName) -> + mongoose_rdbms:execute_successfully(global, cets_disco_select, [ClusterName]). + +insert_new() -> + <<"INSERT INTO discovery_nodes (cluster_name, node_name, node_num, timestamp)" + " VALUES (?, ?, ?, ?)">>. + +insert_new(ClusterName, Node, Timestamp, Num) -> + mongoose_rdbms:execute(global, cets_disco_insert_new, [ClusterName, Node, Num, Timestamp]). + +update_existing() -> + <<"UPDATE discovery_nodes SET timestamp = ? WHERE cluster_name = ? AND node_name = ?">>. + +update_existing(ClusterName, Node, Timestamp) -> + mongoose_rdbms:execute(global, cets_disco_update_existing, [Timestamp, ClusterName, Node]). + +timestamp() -> + os:system_time(microsecond). + +%% Returns a next free node id based on the currently registered ids +next_free_num([]) -> + 0; +next_free_num([H | T = [E | _]]) when ((H + 1) =:= E) -> + %% Sequential, ignore H + next_free_num(T); +next_free_num([H | _]) -> + H + 1. diff --git a/src/mongoose_node_num.erl b/src/mongoose_node_num.erl new file mode 100644 index 0000000000..68c53c7d96 --- /dev/null +++ b/src/mongoose_node_num.erl @@ -0,0 +1,30 @@ +%% Returns a numeric id from 0 to 255 for the current node. +%% Used to generate MAM IDs. +-module(mongoose_node_num). +-export([set_node_num/1]). +-export([node_num/0]). + +-include("mongoose.hrl"). +-include("jlib.hrl"). +-include("mongoose_config_spec.hrl"). +-include("mongoose_logger.hrl"). + +-type node_num() :: 0..255. +-define(KEY, ?MODULE). +-export_type([node_num/0]). + +%% @doc Return an integer node ID. +-spec node_num() -> node_num(). +node_num() -> + %% We just return 0 if service is not running. + persistent_term:get(?KEY, 0). + +-spec set_node_num(node_num()) -> ignore | updated | same. +set_node_num(Num) -> + case node_num() =:= Num of + true -> + same; + false -> + persistent_term:put(?KEY, Num), + updated + end. diff --git a/src/mongoose_node_num_mnesia.erl b/src/mongoose_node_num_mnesia.erl new file mode 100644 index 0000000000..4a3f209f53 --- /dev/null +++ b/src/mongoose_node_num_mnesia.erl @@ -0,0 +1,36 @@ +-module(mongoose_node_num_mnesia). + +-export([init/0]). + +-record(node_num, {name :: atom(), + num :: mongoose_node_num:node_num() }). + +init() -> + mnesia:create_table(node_num, + [{ram_copies, [node()]}, {type, set}, + {attributes, record_info(fields, node_num)}]), + mnesia:add_table_index(node_num, num), + mnesia:add_table_copy(node_num, node(), ram_copies), + register_node(node()), + [#node_num{num = Num}] = mnesia:dirty_read(node_num, node()), + mongoose_node_num:set_node_num(Num), + ok. + +-spec register_node(atom()) -> ok. +register_node(NodeName) -> + {atomic, _} = mnesia:transaction(fun() -> + case mnesia:read(node_num, NodeName) of + [] -> + mnesia:write(#node_num{name = NodeName, num = next_node_num()}); + [_] -> ok + end + end), + ok. + +-spec next_node_num() -> mongoose_node_num:node_num(). +next_node_num() -> + max_node_num() + 1. + +-spec max_node_num() -> mongoose_node_num:node_num(). +max_node_num() -> + mnesia:foldl(fun(#node_num{num = Num}, Max) -> max(Num, Max) end, 0, node_num).