Skip to content

Commit

Permalink
Merge pull request #4179 from esl/custom-epmd-simple
Browse files Browse the repository at this point in the history
Custom epmd module - simple solution
  • Loading branch information
chrzaszcz authored Nov 30, 2023
2 parents 3619f12 + 1627994 commit 44b57e6
Show file tree
Hide file tree
Showing 13 changed files with 413 additions and 63 deletions.
207 changes: 190 additions & 17 deletions big_tests/tests/cets_disco_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@ rdbms_cases() ->
[rdbms_backend,
rdbms_backend_supports_auto_cleaning,
rdbms_backend_node_doesnt_remove_itself,
rdbms_backend_db_queries].
rdbms_backend_db_queries,
rdbms_backend_publishes_node_ip,
no_record_for_node,
no_ip_in_db,
epmd_just_returns_ip_from_db,
address_please,
address_please_returns_ip,
address_please_returns_ip_fallbacks_to_resolve_with_file_backend,
address_please_returns_ip_127_0_0_1_from_db].

suite() ->
distributed_helper:require_rpc_nodes([mim, mim2]).
Expand All @@ -37,16 +45,32 @@ init_per_group(rdbms, Config) ->
case not ct_helper:is_ct_running()
orelse mongoose_helper:is_rdbms_enabled(domain_helper:host_type()) of
false -> {skip, rdbms_or_ct_not_running};
true -> Config
true ->
stop_and_delete_cets_discovery_if_running(),
Config
end;
init_per_group(_, Config) ->
Config.

end_per_group(rdbms, Config) ->
restore_default_cets_discovery(),
Config;
end_per_group(_, Config) ->
Config.

init_per_testcase(address_please_returns_ip, Config) ->
start_cets_discovery(Config);
init_per_testcase(address_please_returns_ip_fallbacks_to_resolve_with_file_backend, Config) ->
start_cets_discovery_with_file_backnend(Config);
init_per_testcase(address_please_returns_ip_127_0_0_1_from_db, Config) ->
start_cets_discovery_with_real_ips(Config);
init_per_testcase(_CaseName, Config) -> Config.

end_per_testcase(Name, Config) when Name == address_please_returns_ip;
Name == address_please_returns_ip_fallbacks_to_resolve_with_file_backend;
Name == address_please_returns_ip_127_0_0_1_from_db ->
stop_cets_discovery(),
Config;
end_per_testcase(_CaseName, Config) ->
unmock(mim()),
unmock(mim2()).
Expand Down Expand Up @@ -144,24 +168,82 @@ rdbms_backend_db_queries(_Config) ->
TS2 = TS + 100,

%% insertion fails if node name or node num is already added for the cluster
?assertEqual({updated, 1}, insert_new(CN, <<"test1">>, TS, 1)),
?assertMatch({error, _}, insert_new(CN, <<"test1">>, TS, 1)),
?assertMatch({error, _}, insert_new(CN, <<"test1">>, TS, 2)),
?assertMatch({error, _}, insert_new(CN, <<"test2">>, TS, 1)),
?assertEqual({updated, 1}, insert_new(CN, <<"test2">>, TS, 2)),
?assertEqual({updated, 1}, insert_new(CN, <<"test1">>, 1, <<>>, TS)),
?assertMatch({error, _}, insert_new(CN, <<"test1">>, 1, <<>>, TS)),
?assertMatch({error, _}, insert_new(CN, <<"test1">>, 2, <<>>, TS)),
?assertMatch({error, _}, insert_new(CN, <<"test2">>, 1, <<>>, TS)),
?assertEqual({updated, 1}, insert_new(CN, <<"test2">>, 2, <<>>, TS)),

%% update of the timestamp works correctly
{selected, SelectedNodes1} = select(CN),
?assertEqual(lists:sort([{<<"test1">>, 1, TS}, {<<"test2">>, 2, TS}]),
?assertEqual(lists:sort([{<<"test1">>, 1, <<>>, TS}, {<<"test2">>, 2, <<>>, TS}]),
lists:sort(SelectedNodes1)),
?assertEqual({updated, 1}, update_existing(CN, <<"test1">>, TS2)),
?assertEqual({updated, 1}, update_existing(CN, <<"test1">>, <<>>, TS2)),
{selected, SelectedNodes2} = select(CN),
?assertEqual(lists:sort([{<<"test1">>, 1, TS2}, {<<"test2">>, 2, TS}]),
?assertEqual(lists:sort([{<<"test1">>, 1, <<>>, TS2}, {<<"test2">>, 2, <<>>, TS}]),
lists:sort(SelectedNodes2)),

%% node removal work correctly
?assertEqual({updated, 1}, delete_node_from_db(CN, <<"test1">>)),
?assertEqual({selected, [{<<"test2">>, 2, TS}]}, select(CN)).
?assertEqual({selected, [{<<"test2">>, 2, <<>>, TS}]}, select(CN)).

rdbms_backend_publishes_node_ip(_Config) ->
%% get_pairs would return only real available nodes, so use the real node names
Node1b = atom_to_binary(maps:get(node, mim())),
Node2b = atom_to_binary(maps:get(node, mim2())),
CN = random_cluster_name(?FUNCTION_NAME),
Opts1 = #{cluster_name => CN, node_name_to_insert => Node1b,
node_ip_binary => <<"127.0.0.1">>},
Opts2 = #{cluster_name => CN, node_name_to_insert => Node2b,
node_ip_binary => <<"127.0.0.1">>},
State1 = disco_init(mim(), Opts1),
State2 = disco_init(mim2(), Opts2),
{{ok, _Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1),
{{ok, _Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2),
{{ok, _Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2),
{{ok, _Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2_2),
{ok, {127, 0, 0, 1}} = match_node_name(mim2(), State2_3, Node1b),
{ok, {127, 0, 0, 1}} = match_node_name(mim(), State1_3, Node2b).

no_record_for_node(_Config) ->
Node = <<"mongoose@badhost">>,
BackState = #{address_pairs => #{}},
{error, {no_record_for_node, Node}} = match_node_name(mim(), BackState, Node),
ok.

no_ip_in_db(_Config) ->
Node = <<"mongoose@noiphost">>,
BackState = #{address_pairs => #{Node => <<>>}},
{error, {no_ip_in_db, Node}} = match_node_name(mim(), BackState, Node),
ok.

epmd_just_returns_ip_from_db(_Config) ->
Node = <<"mongoose@noepmdhost">>,
%% IP from a test range
BackState = #{address_pairs => #{Node => <<"192.0.2.1">>}},
{ok, {192, 0, 2, 1}} = match_node_name(mim(), BackState, Node).

address_please(_Config) ->
{error, nxdomain} =
rpc(mim(), mongoose_epmd, address_please, ["mongooseim", "badbadhost", inet]).

address_please_returns_ip(Config) ->
Res = rpc(mim(), mongoose_epmd, address_please, ["testmim2", "localhost", inet]),
Info = rpc(mim(), cets_discovery, system_info, [mongoose_cets_discovery]),
ct:log("system_info ~p", [Info]),
{ok, {192, 168, 115, 112}} = Res.

address_please_returns_ip_fallbacks_to_resolve_with_file_backend(Config) ->
Res = rpc(mim2(), mongoose_epmd, address_please, ["testmim1", "localhost", inet]),
Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
ct:log("system_info ~p", [Info]),
{ok, {127, 0, 0, 1}} = Res.

address_please_returns_ip_127_0_0_1_from_db(Config) ->
Res = rpc(mim2(), mongoose_epmd, address_please, ["node1", "localhost", inet]),
Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
ct:log("system_info ~p", [Info]),
{ok, {127, 0, 0, 1}} = Res.

%%--------------------------------------------------------------------
%% Helpers
Expand All @@ -177,6 +259,9 @@ disco_get_nodes(Node, State) ->
log_disco_request(?FUNCTION_NAME, Node, State, NewState),
NewState.

match_node_name(Node, SysInfo, NodeToLookup) ->
rpc(Node, mongoose_epmd, match_node_name, [SysInfo, NodeToLookup]).

log_disco_request(disco_init, Node, #{cluster_name := CN} = Opts, State) ->
ct:log("[0] disco_init(~p,~n" ++
" ~p) =~n" ++
Expand Down Expand Up @@ -216,22 +301,110 @@ random_cluster_name(CaseName) ->
Rand = rpc(mim(), mongoose_bin, gen_from_crypto, []),
<<"big_test_", (atom_to_binary(CaseName))/binary, "_", Rand/binary>>.

insert_new(CN, BinNode, TS, NodeNum) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, TS, NodeNum]),
ct:log("insert_new(~p, ~p, ~p, ~p) = ~p", [CN, BinNode, TS, NodeNum, Ret]),
insert_new(CN, BinNode, NodeNum, Address, TS) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, NodeNum, Address, TS]),
ct:log("insert_new(~p, ~p, ~p, ~p, ~p) = ~p", [CN, BinNode, NodeNum, Address, TS, Ret]),
Ret.

select(CN) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, select, [CN]),
ct:log("select(~p) = ~p", [CN, Ret]),
Ret.

update_existing(CN, BinNode, TS) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [CN, BinNode, TS]),
ct:log("select(~p, ~p, ~p) = ~p", [CN, BinNode, TS, Ret]),
update_existing(CN, BinNode, Address, TS) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [CN, BinNode, Address, TS]),
ct:log("select(~p, ~p, ~p, ~p) = ~p", [CN, BinNode, Address, TS, Ret]),
Ret.

delete_node_from_db(CN, BinNode) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [CN, BinNode]),
ct:log("delete_node_from_db(~p, ~p) = ~p", [CN, BinNode, Ret]),
Ret.

start_cets_discovery(Config) ->
start_disco(mim(), cets_disco_spec(<<"testmim1@localhost">>, <<"192.168.115.111">>)),
start_disco(mim2(), cets_disco_spec(<<"testmim2@localhost">>, <<"192.168.115.112">>)),
force_nodes_to_see_each_other(mim(), mim2()),
Config.

start_cets_discovery_with_real_ips(Config) ->
start_disco(mim(), cets_disco_spec(<<"node1@localhost">>, <<"127.0.0.1">>)),
start_disco(mim2(), cets_disco_spec(<<"node2@localhost">>, <<"127.0.0.1">>)),
force_nodes_to_see_each_other(mim(), mim2()),
Config.

start_cets_discovery_with_file_backnend(Config) ->
start_disco(mim(), cets_disco_spec_for_file_backend()),
start_disco(mim2(), cets_disco_spec_for_file_backend()),
Config.

stop_cets_discovery() ->
ok = rpc(mim(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]),
ok = rpc(mim2(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]).

stop_and_delete_cets_discovery() ->
stop_cets_discovery(),
ok = rpc(mim(), supervisor, delete_child, [ejabberd_sup, cets_discovery]),
ok = rpc(mim2(), supervisor, delete_child, [ejabberd_sup, cets_discovery]).

stop_and_delete_cets_discovery_if_running() ->
case rpc(mim(), erlang, whereis, [mongoose_cets_discovery]) of
undefined ->
ok;
_ ->
stop_and_delete_cets_discovery()
end.

restore_default_cets_discovery() ->
restore_default_cets_discovery(mim()),
restore_default_cets_discovery(mim2()).

restore_default_cets_discovery(Node) ->
case rpc(Node, mongoose_cets_discovery, supervisor_specs, []) of
[] ->
ok;
[Spec] ->
start_disco(Node, Spec)
end.

cets_disco_spec(Node, IP) ->
DiscoOpts = #{
backend_module => mongoose_cets_discovery_rdbms,
cluster_name => <<"mim">>,
node_name_to_insert => Node,
node_ip_binary => IP,
name => mongoose_cets_discovery},
cets_disco_spec(DiscoOpts).

cets_disco_spec_for_file_backend() ->
DiscoOpts = #{
backend_module => cets_discovery_file,
disco_file => "/tmp/does_not_exist",
name => mongoose_cets_discovery},
cets_disco_spec(DiscoOpts).

cets_disco_spec(DiscoOpts) ->
#{
id => cets_discovery,
start => {mongoose_cets_discovery, start_link, [DiscoOpts]},
restart => temporary,
type => worker,
shutdown => infinity,
modules => [cets_discovery]}.

send_check(Node) ->
rpc(Node, erlang, send, [mongoose_cets_discovery, check]).

wait_for_get_nodes(Node) ->
ok = rpc(Node, cets_discovery, wait_for_get_nodes, [mongoose_cets_discovery, 5000]).

start_disco(Node, Spec) ->
{ok, _} = rpc(Node, supervisor, start_child, [ejabberd_sup, Spec]).

force_nodes_to_see_each_other(Node1, Node2) ->
send_check(Node2),
wait_for_get_nodes(Node2),
send_check(Node1),
wait_for_get_nodes(Node1),
send_check(Node2),
wait_for_get_nodes(Node2).
7 changes: 4 additions & 3 deletions big_tests/tests/graphql_cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,17 @@ register_bad_node() ->
ClusterName = <<"mim">>,
Node = <<"badnode@localhost">>,
Num = 100,
Address = <<>>,
Timestamp = rpc(mim(), mongoose_rdbms_timestamp, select, []),
InsertArgs = [ClusterName, Node, Num, Timestamp],
{updated, 1} = rpc(mim(), mongoose_rdbms, execute, [global, cets_disco_insert_new, InsertArgs]).
InsertArgs = [ClusterName, Node, Num, Address, Timestamp],
{updated, 1} = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, InsertArgs).

ensure_bad_node_unregistered() ->
ClusterName = <<"mim">>,
Node = <<"badnode@localhost">>,
DeleteArgs = [ClusterName, Node],
%% Ensure the node is removed
{updated, _} = rpc(mim(), mongoose_rdbms, execute, [global, cets_delete_node_from_db, DeleteArgs]).
{updated, _} = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, DeleteArgs).

force_check() ->
Pid = rpc(mim(), erlang, whereis, [mongoose_cets_discovery]),
Expand Down
11 changes: 11 additions & 0 deletions doc/configuration/release-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ These options are inserted into the `rel/files/vm.args` template.
* **Syntax:** command-line arguments
* **Example:** `{highload_vm_args, "+P 10000000 -env ERL_MAX_PORTS 250000"}.`

### epmd_module

Allows to set EPMD module to `mongoose_epmd` in case CETS is used with RDBMS backend
to enable getting IP addresses of the remote nodes using RDBMS instead of the default
resolver.

* **Type:** parameter
* **Option:** value of `-epmd_module` in [vm.args](configuration-files.md#vmargs)
* **Syntax:** Erlang module name: `mongoose_epmd`
* **Example:** `{epmd_module, "mongoose_epmd"}.`

## TOML Options

These options are inserted into the `rel/files/mongooseim.toml` template.
Expand Down
7 changes: 4 additions & 3 deletions priv/mssql2012.sql
Original file line number Diff line number Diff line change
Expand Up @@ -754,10 +754,11 @@ CREATE TABLE domain_events (
CREATE INDEX i_domain_events_domain ON domain_events(domain);

CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in seconds
cluster_name varchar(250) NOT NULL,
node_name varchar(250) NOT NULL,
node_num INT NOT NULL,
address varchar(250) NOT NULL DEFAULT '', -- empty means we should ask DNS
updated_timestamp BIGINT NOT NULL, -- in seconds
PRIMARY KEY (cluster_name, node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes(cluster_name, node_num);
7 changes: 4 additions & 3 deletions priv/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,11 @@ CREATE TABLE domain_events (
CREATE INDEX i_domain_events_domain ON domain_events(domain);

CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in seconds
cluster_name varchar(250) NOT NULL,
node_name varchar(250) NOT NULL,
node_num INT UNSIGNED NOT NULL,
address varchar(250) NOT NULL DEFAULT '', -- empty means we should ask DNS
updated_timestamp BIGINT NOT NULL, -- in seconds
PRIMARY KEY (cluster_name, node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_node_num USING BTREE ON discovery_nodes(cluster_name, node_num);
7 changes: 4 additions & 3 deletions priv/pg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -506,10 +506,11 @@ CREATE TABLE domain_events (
CREATE INDEX i_domain_events_domain ON domain_events(domain);

CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in seconds
cluster_name varchar(250) NOT NULL,
node_name varchar(250) NOT NULL,
node_num INT NOT NULL,
address varchar(250) NOT NULL DEFAULT '', -- empty means we should ask DNS
updated_timestamp BIGINT NOT NULL, -- in seconds
PRIMARY KEY (cluster_name, node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes USING BTREE(cluster_name, node_num);
2 changes: 1 addition & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{<<"certifi">>,{pkg,<<"certifi">>,<<"2.9.0">>},1},
{<<"cets">>,
{git,"https://github.com/esl/cets.git",
{ref,"2ca31058bf616392ed91e4eb8642cc5af872902e"}},
{ref,"9c5b2c9269daeb4ee6830f7cd1538e0a4218e281"}},
0},
{<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.9.0">>},0},
{<<"cowboy_swagger">>,{pkg,<<"cowboy_swagger">>,<<"2.5.1">>},0},
Expand Down
4 changes: 4 additions & 0 deletions rel/files/vm.args
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@

## Tweak GC to run more often
-env ERL_FULLSWEEP_AFTER 2

{{#epmd_module}}
-epmd_module {{{epmd_module}}}
{{/epmd_module}}
1 change: 1 addition & 0 deletions rel/mim3.vars-toml.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
%% vm.args
{node_name, "mongooseim3@localhost"}.
{epmd_module, "mongoose_epmd"}.

%% mongooseim.toml
{c2s_port, 5262}.
Expand Down
Loading

0 comments on commit 44b57e6

Please sign in to comment.