Skip to content

Commit

Permalink
Merge pull request #4042 from esl/feature-cets-node_id
Browse files Browse the repository at this point in the history
Read node_id from discovery_nodes table in RDBMS
  • Loading branch information
chrzaszcz committed Jun 29, 2023
2 parents 89bb2bd + b83dc98 commit 461ba2d
Show file tree
Hide file tree
Showing 23 changed files with 393 additions and 140 deletions.
1 change: 1 addition & 0 deletions big_tests/default.spec
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
{suites, "tests", local_iq_SUITE}.
{suites, "tests", tcp_listener_SUITE}.
{suites, "tests", cets_disco_SUITE}.
{suites, "tests", start_node_id_SUITE}.

{config, ["test.config"]}.
{logdir, "ct_report"}.
Expand Down
1 change: 1 addition & 0 deletions big_tests/dynamic_domains.spec
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
{suites, "tests", local_iq_SUITE}.
{suites, "tests", tcp_listener_SUITE}.
{suites, "tests", cets_disco_SUITE}.
{suites, "tests", start_node_id_SUITE}.

{config, ["dynamic_domains.config", "test.config"]}.

Expand Down
3 changes: 2 additions & 1 deletion big_tests/test.config
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@
{stream_management_backend, cets},
{auth_method, "rdbms"},
{internal_databases, "[internal_databases.cets]
cluster_name = \"{{cluster_name}}\""},
cluster_name = \"{{cluster_name}}\"
[internal_databases.mnesia]"}, %% We still using mnesia for modules that are not converted to use CETS
{outgoing_pools, "[outgoing_pools.redis.global_distrib]
scope = \"global\"
workers = 10
Expand Down
8 changes: 6 additions & 2 deletions big_tests/tests/cets_disco_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,9 @@ rdbms_backend(_Config) ->
State1 = rpc(mim(), mongoose_cets_discovery_rdbms, init, [Opts1]),
rpc(mim(), mongoose_cets_discovery_rdbms, get_nodes, [State1]),
State2 = rpc(mim(), mongoose_cets_discovery_rdbms, init, [Opts2]),
{{ok, Nodes}, _} = rpc(mim(), mongoose_cets_discovery_rdbms, get_nodes, [State2]),
[test1, test2] = lists:sort(Nodes).
{{ok, Nodes}, State2_2} = rpc(mim(), mongoose_cets_discovery_rdbms, get_nodes, [State2]),
%% "test2" node can see "test1"
true = lists:member(test1, Nodes),
{{ok, _}, State2_3} = rpc(mim(), mongoose_cets_discovery_rdbms, get_nodes, [State2_2]),
%% Check that we follow the right code branch
#{last_query_info := #{already_registered := true}} = State2_3.
55 changes: 55 additions & 0 deletions big_tests/tests/start_node_id_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
-module(start_node_id_SUITE).
-compile([export_all, nowarn_export_all]).

-import(distributed_helper, [mim/0, rpc/4]).
-include_lib("common_test/include/ct.hrl").

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

all() ->
[{group, all}].

groups() ->
[{all, [], cases()}].

cases() ->
[cleaning_works].

suite() ->
escalus:suite().

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------
init_per_suite(Config) ->
escalus:init_per_suite(Config).

end_per_suite(Config) ->
escalus:end_per_suite(Config).

init_per_group(_, Config) ->
Config.

end_per_group(_, Config) ->
Config.

init_per_testcase(CaseName, Config) ->
escalus:init_per_testcase(CaseName, Config).

end_per_testcase(CaseName, Config) ->
escalus:end_per_testcase(CaseName, Config).

%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------

cleaning_works(Config) ->
Id = <<"someid139455">>,
Pid = spawn_link(fun() -> receive stop -> ok end end),
ok = rpc(mim(), mongoose_start_node_id, register_on_remote_node_rpc, [node(), Id, Pid]),
GetF = fun() -> rpc(mim(), mongoose_start_node_id, node_id_to_name, [Id]) end,
mongoose_helper:wait_until(GetF, {ok, node()}),
Pid ! stop,
mongoose_helper:wait_until(GetF, {error, unknown_id}).
2 changes: 2 additions & 0 deletions priv/mssql2012.sql
Original file line number Diff line number Diff line change
Expand Up @@ -757,5 +757,7 @@ CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in microseconds
node_num INT NOT NULL,
PRIMARY KEY (cluster_name, node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes(cluster_name, node_num);
2 changes: 2 additions & 0 deletions priv/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -549,5 +549,7 @@ CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in microseconds
node_num INT UNSIGNED NOT NULL,
PRIMARY KEY (cluster_name, node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_node_num USING BTREE ON discovery_nodes(cluster_name, node_num);
2 changes: 2 additions & 0 deletions priv/pg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -509,5 +509,7 @@ CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in microseconds
node_num INT NOT NULL,
PRIMARY KEY (cluster_name, node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes USING BTREE(cluster_name, node_num);
40 changes: 32 additions & 8 deletions src/ejabberd_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,30 @@
%%%

start(normal, _Args) ->
try
do_start()
catch Class:Reason:StackTrace ->
%% Log a stacktrace because while proc_lib:crash_report/4 would report a crash reason,
%% it would not report the stacktrace
?LOG_CRITICAL(#{what => app_failed_to_start,
class => Class, reason => Reason, stacktrace => StackTrace}),
erlang:raise(Class, Reason, StackTrace)
end;
start(_, _) ->
{error, badarg}.

do_start() ->
mongoose_fips:notify(),
write_pid_file(),
update_status_file(starting),
db_init(),
application:start(cache_tab),

mongoose_graphql:init(),
translate:start(),
ejabberd_node_id:start(),
ejabberd_commands:init(),
mongoose_graphql_commands:start(),
mongoose_config:start(),
db_init(),
mongoose_router:start(),
mongoose_logs:set_global_loglevel(mongoose_config:get_opt(loglevel)),
mongoose_deprecations:start(),
Expand All @@ -72,9 +84,7 @@ start(normal, _Args) ->
ejabberd_admin:start(),
update_status_file(started),
?LOG_NOTICE(#{what => mongooseim_node_started, version => ?MONGOOSE_VERSION, node => node()}),
Sup;
start(_, _) ->
{error, badarg}.
Sup.

%% @doc Prepare the application for termination.
%% This function is called when an application is about to be stopped,
Expand All @@ -97,6 +107,9 @@ stop(_State) ->
?LOG_NOTICE(#{what => mongooseim_node_stopped, version => ?MONGOOSE_VERSION, node => node()}),
delete_pid_file(),
update_status_file(stopped),
%% We cannot stop other applications inside of the stop callback
%% (because we would deadlock the application controller process).
%% That is why we call mnesia:stop() inside of db_init_mnesia() instead.
%%ejabberd_debug:stop(),
ok.

Expand All @@ -105,14 +118,25 @@ stop(_State) ->
%%% Internal functions
%%%
db_init() ->
case mongoose_config:lookup_opt([internal_databases, mnesia]) of
{ok, _} ->
db_init_mnesia(),
mongoose_node_num_mnesia:init();
{error, _} ->
ok
end.

db_init_mnesia() ->
%% Mnesia should not be running at this point, unless it is started by tests.
%% Ensure Mnesia is stopped
mnesia:stop(),
case mnesia:system_info(extra_db_nodes) of
[] ->
application:stop(mnesia),
mnesia:create_schema([node()]),
application:start(mnesia, permanent);
mnesia:create_schema([node()]);
_ ->
ok
end,
application:start(mnesia, permanent),
mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).

-spec broadcast_c2s_shutdown_listeners() -> ok.
Expand Down
10 changes: 4 additions & 6 deletions src/ejabberd_local.erl
Original file line number Diff line number Diff line change
Expand Up @@ -423,23 +423,21 @@ do_unregister_host(Host) ->

make_iq_id() ->
%% Attach NodeId, so we know to which node to forward the response
{ok, NodeId} = ejabberd_node_id:node_id(),
BinNodeId = mongoose_start_node_id:node_id(),
Rand = mongoose_bin:gen_from_crypto(),
<<(integer_to_binary(NodeId))/binary, "_", Rand/binary>>.
<<BinNodeId/binary, "_", Rand/binary>>.

%% Parses ID, made by make_iq_id function
-spec parse_iq_id(ID :: binary()) ->
local_node | {remote_node, node()}
| {error, {unknown_node_id, term()} | bad_iq_format}.
parse_iq_id(ID) ->
{ok, NodeId} = ejabberd_node_id:node_id(),
BinNodeId = integer_to_binary(NodeId),
BinNodeId = mongoose_start_node_id:node_id(),
case binary:split(ID, <<"_">>) of
[BinNodeId, _Rest] ->
local_node;
[OtherBinNodeId, _Rest] ->
OtherNodeId = binary_to_integer(OtherBinNodeId),
case ejabberd_node_id:node_id_to_name(OtherNodeId) of
case mongoose_start_node_id:node_id_to_name(OtherBinNodeId) of
{ok, NodeName} ->
{remote_node, NodeName};
{error, Reason} ->
Expand Down
70 changes: 0 additions & 70 deletions src/ejabberd_node_id.erl

This file was deleted.

13 changes: 9 additions & 4 deletions src/ejabberd_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,22 +152,27 @@ init([]) ->
PG =
{pg,
{pg, start_link, [mim_scope]},
permanent, infinity, supervisor, [pg]},
permanent, infinity, worker, [pg]},
StartIdServer =
{mongoose_start_node_id,
{mongoose_start_node_id, start_link, []},
permanent, infinity, worker, [mongoose_start_node_id]},
{ok, {{one_for_one, 10, 1},
cets_specs() ++
[PG,
[StartIdServer,
PG,
Hooks,
Cleaner,
SMBackendSupervisor,
Router,
OutgoingPoolsSupervisor
] ++ cets_specs() ++ [
S2S,
Local,
ReceiverSupervisor,
C2SSupervisor,
S2SInSupervisor,
S2SOutSupervisor,
ServiceSupervisor,
OutgoingPoolsSupervisor,
IQSupervisor,
Listener,
MucIQ,
Expand Down
2 changes: 1 addition & 1 deletion src/mam/mod_mam_muc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
%%%
%%% <ul>
%%% <li>date (using `timestamp()');</li>
%%% <li>node number (using {@link ejabberd_node_id}).</li>
%%% <li>node number (using {@link mongoose_node_num}).</li>
%%% </ul>
%%% @end
%%%-------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/mam/mod_mam_pm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
%%%
%%% <ul>
%%% <li>date (using `timestamp()');</li>
%%% <li>node number (using {@link ejabberd_node_id}).</li>
%%% <li>node number (using {@link mongoose_node_num}).</li>
%%% </ul>
%%% @end
%%%-------------------------------------------------------------------
Expand Down
14 changes: 7 additions & 7 deletions src/mam/mod_mam_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -179,27 +179,27 @@ get_or_generate_mam_id(Acc) ->

-spec generate_message_id(integer()) -> integer().
generate_message_id(CandidateStamp) ->
{ok, NodeId} = ejabberd_node_id:node_id(),
NodeNum = mongoose_node_num:node_num(),
UniqueStamp = mongoose_mam_id:next_unique(CandidateStamp),
encode_compact_uuid(UniqueStamp, NodeId).
encode_compact_uuid(UniqueStamp, NodeNum).

%% @doc Create a message ID (UID).
%%
%% It removes a leading 0 from 64-bit binary representation.
%% It puts node id as a last byte.
%% The maximum date, that can be encoded is `{{4253, 5, 31}, {22, 20, 37}}'.
-spec encode_compact_uuid(integer(), integer()) -> integer().
encode_compact_uuid(Microseconds, NodeId)
when is_integer(Microseconds), is_integer(NodeId) ->
(Microseconds bsl 8) + NodeId.
encode_compact_uuid(Microseconds, NodeNum)
when is_integer(Microseconds), is_integer(NodeNum) ->
(Microseconds bsl 8) + NodeNum.


%% @doc Extract date and node id from a message id.
-spec decode_compact_uuid(integer()) -> {integer(), byte()}.
decode_compact_uuid(Id) ->
Microseconds = Id bsr 8,
NodeId = Id band 255,
{Microseconds, NodeId}.
NodeNum = Id band 255,
{Microseconds, NodeNum}.


%% @doc Encode a message ID to pass it to the user.
Expand Down
Loading

0 comments on commit 461ba2d

Please sign in to comment.