Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom epmd module - simple solution #4179

Merged
merged 15 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 152 additions & 16 deletions big_tests/tests/cets_disco_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@ rdbms_cases() ->
[rdbms_backend,
rdbms_backend_supports_auto_cleaning,
rdbms_backend_node_doesnt_remove_itself,
rdbms_backend_db_queries].
rdbms_backend_db_queries,
rdbms_backend_publishes_node_ip,
no_record_for_node,
no_ip_in_db,
cannot_connect_to_epmd,
address_please,
address_please_returns_ip,
address_please_returns_ip_fallbacks_to_resolve].

suite() ->
distributed_helper:require_rpc_nodes([mim, mim2]).
Expand All @@ -34,6 +41,8 @@ suite() ->
%%--------------------------------------------------------------------

init_per_group(rdbms, Config) ->
start_node_address_server(mim()),
start_node_address_server(mim2()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to remove?

case not ct_helper:is_ct_running()
orelse mongoose_helper:is_rdbms_enabled(domain_helper:host_type()) of
false -> {skip, rdbms_or_ct_not_running};
Expand All @@ -42,11 +51,26 @@ init_per_group(rdbms, Config) ->
init_per_group(_, Config) ->
Config.

end_per_group(rdbms, Config) ->
stop_node_address_server(mim()),
stop_node_address_server(mim2()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to remove?

Config;
end_per_group(_, Config) ->
Config.

init_per_testcase(address_please_returns_ip, Config) ->
start_cets_discovery(Config, true);
init_per_testcase(address_please_returns_ip_fallbacks_to_resolve, Config) ->
start_cets_discovery(Config, false);
init_per_testcase(_CaseName, Config) -> Config.

end_per_testcase(address_please_returns_ip, Config) ->
stop_cets_discovery(),
unmock_epmd(mim()),
Config;
end_per_testcase(address_please_returns_ip_fallbacks_to_resolve, Config) ->
stop_cets_discovery(),
Config;
end_per_testcase(_CaseName, Config) ->
unmock(mim()),
unmock(mim2()).
Expand Down Expand Up @@ -144,24 +168,77 @@ rdbms_backend_db_queries(_Config) ->
TS2 = TS + 100,

%% insertion fails if node name or node num is already added for the cluster
?assertEqual({updated, 1}, insert_new(CN, <<"test1">>, TS, 1)),
?assertMatch({error, _}, insert_new(CN, <<"test1">>, TS, 1)),
?assertMatch({error, _}, insert_new(CN, <<"test1">>, TS, 2)),
?assertMatch({error, _}, insert_new(CN, <<"test2">>, TS, 1)),
?assertEqual({updated, 1}, insert_new(CN, <<"test2">>, TS, 2)),
?assertEqual({updated, 1}, insert_new(CN, <<"test1">>, 1, <<>>, TS)),
?assertMatch({error, _}, insert_new(CN, <<"test1">>, 1, <<>>, TS)),
?assertMatch({error, _}, insert_new(CN, <<"test1">>, 2, <<>>, TS)),
?assertMatch({error, _}, insert_new(CN, <<"test2">>, 1, <<>>, TS)),
?assertEqual({updated, 1}, insert_new(CN, <<"test2">>, 2, <<>>, TS)),

%% update of the timestamp works correctly
{selected, SelectedNodes1} = select(CN),
?assertEqual(lists:sort([{<<"test1">>, 1, TS}, {<<"test2">>, 2, TS}]),
?assertEqual(lists:sort([{<<"test1">>, 1, <<>>, TS}, {<<"test2">>, 2, <<>>, TS}]),
lists:sort(SelectedNodes1)),
?assertEqual({updated, 1}, update_existing(CN, <<"test1">>, TS2)),
?assertEqual({updated, 1}, update_existing(CN, <<"test1">>, <<>>, TS2)),
{selected, SelectedNodes2} = select(CN),
?assertEqual(lists:sort([{<<"test1">>, 1, TS2}, {<<"test2">>, 2, TS}]),
?assertEqual(lists:sort([{<<"test1">>, 1, <<>>, TS2}, {<<"test2">>, 2, <<>>, TS}]),
lists:sort(SelectedNodes2)),

%% node removal work correctly
?assertEqual({updated, 1}, delete_node_from_db(CN, <<"test1">>)),
?assertEqual({selected, [{<<"test2">>, 2, TS}]}, select(CN)).
?assertEqual({selected, [{<<"test2">>, 2, <<>>, TS}]}, select(CN)).

rdbms_backend_publishes_node_ip(_Config) ->
%% get_pairs would return only real available nodes, so use the real node names
Node1b = atom_to_binary(maps:get(node, mim())),
Node2b = atom_to_binary(maps:get(node, mim2())),
CN = random_cluster_name(?FUNCTION_NAME),
Opts1 = #{cluster_name => CN, node_name_to_insert => Node1b,
node_ip_binary => <<"127.0.0.1">>},
Opts2 = #{cluster_name => CN, node_name_to_insert => Node2b,
node_ip_binary => <<"127.0.0.1">>},
State1 = disco_init(mim(), Opts1),
State2 = disco_init(mim2(), Opts2),
{{ok, _Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1),
{{ok, _Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should already see the entry from mim() at this point, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, I guess

{{ok, _Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2),
{{ok, _Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2_2),
{ok, {127, 0, 0, 1}} = match_node_name(mim2(), State2_3, Node1b),
{ok, {127, 0, 0, 1}} = match_node_name(mim(), State1_3, Node2b).

no_record_for_node(_Config) ->
Node = <<"mongoose@badhost">>,
BackState = #{address_pairs => #{}},
{error, {no_record_for_node, Node}} = match_node_name(mim(), BackState, Node),
ok.

no_ip_in_db(_Config) ->
Node = <<"mongoose@noiphost">>,
BackState = #{address_pairs => #{Node => <<>>}},
{error, {no_ip_in_db, Node}} = match_node_name(mim(), BackState, Node),
ok.

cannot_connect_to_epmd(_Config) ->
Node = <<"mongoose@noepmdhost">>,
%% IP from a test range
BackState = #{address_pairs => #{Node => <<"192.0.2.1">>}},
{error, {cannot_connect_to_epmd, Node, {192, 0, 2, 1}}} = match_node_name(mim(), BackState, Node),
ok.

address_please(_Config) ->
{error, nxdomain} =
rpc(mim(), mongoose_epmd, address_please, ["mongooseim", "badbadhost", inet]).

address_please_returns_ip(Config) ->
Res = rpc(mim(), mongoose_epmd, address_please, ["testmim2", "localhost", inet]),
Info = rpc(mim(), cets_discovery, system_info, [mongoose_cets_discovery]),
ct:log("system_info ~p", [Info]),
{ok, {192, 168, 115, 112}} = Res.

address_please_returns_ip_fallbacks_to_resolve(Config) ->
Res = rpc(mim(), mongoose_epmd, address_please, ["testmim2", "localhost", inet]),
Info = rpc(mim(), cets_discovery, system_info, [mongoose_cets_discovery]),
ct:log("system_info ~p", [Info]),
{ok, {127, 0, 0, 1}} = Res.

%%--------------------------------------------------------------------
%% Helpers
Expand All @@ -177,6 +254,9 @@ disco_get_nodes(Node, State) ->
log_disco_request(?FUNCTION_NAME, Node, State, NewState),
NewState.

match_node_name(Node, SysInfo, NodeToLookup) ->
rpc(Node, mongoose_epmd, match_node_name, [SysInfo, NodeToLookup]).

log_disco_request(disco_init, Node, #{cluster_name := CN} = Opts, State) ->
ct:log("[0] disco_init(~p,~n" ++
" ~p) =~n" ++
Expand Down Expand Up @@ -209,29 +289,85 @@ mock_timestamp(Node, Timestamp) ->
unmock_timestamp(Node) ->
ok = rpc(Node, meck, unload, [mongoose_rdbms_timestamp]).

mock_epmd(Node) ->
ok = rpc(Node, meck, new, [mongoose_epmd, [passthrough, no_link]]),
ok = rpc(Node, meck, expect, [mongoose_epmd, can_connect, 1, true]),
%% Ensure that we mock
true = rpc(Node, mongoose_epmd, can_connect, [{192, 168, 0, 100}]).

unmock_epmd(Node) ->
ok = rpc(Node, meck, unload, [mongoose_epmd]).

unmock(Node) ->
rpc(Node, meck, unload, []).

random_cluster_name(CaseName) ->
Rand = rpc(mim(), mongoose_bin, gen_from_crypto, []),
<<"big_test_", (atom_to_binary(CaseName))/binary, "_", Rand/binary>>.

insert_new(CN, BinNode, TS, NodeNum) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, TS, NodeNum]),
ct:log("insert_new(~p, ~p, ~p, ~p) = ~p", [CN, BinNode, TS, NodeNum, Ret]),
insert_new(CN, BinNode, NodeNum, Address, TS) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, NodeNum, Address, TS]),
ct:log("insert_new(~p, ~p, ~p, ~p, ~p) = ~p", [CN, BinNode, NodeNum, Address, TS, Ret]),
Ret.

select(CN) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, select, [CN]),
ct:log("select(~p) = ~p", [CN, Ret]),
Ret.

update_existing(CN, BinNode, TS) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [CN, BinNode, TS]),
ct:log("select(~p, ~p, ~p) = ~p", [CN, BinNode, TS, Ret]),
update_existing(CN, BinNode, Address, TS) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [CN, BinNode, Address, TS]),
ct:log("select(~p, ~p, ~p, ~p) = ~p", [CN, BinNode, Address, TS, Ret]),
Ret.

delete_node_from_db(CN, BinNode) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [CN, BinNode]),
ct:log("delete_node_from_db(~p, ~p) = ~p", [CN, BinNode, Ret]),
Ret.

start_node_address_server(Node) ->
MFA = {mongoose_node_address, start_link, []},
ChildSpec = #{id => mongoose_node_address, start => MFA, restart => temporary},
rpc(Node, supervisor, start_child, [ejabberd_sup, ChildSpec]).

stop_node_address_server(Node) ->
rpc(Node, supervisor, terminate_child, [ejabberd_sup, mongoose_node_address]).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to remove?


start_cets_discovery(Config, MockEpmd) ->
case rpc(mim(), erlang, whereis, [mongoose_cets_discovery]) of
undefined ->
case MockEpmd of
true ->
mock_epmd(mim());
false ->
ok
end,
{ok, _} = rpc(mim(), supervisor, start_child, [ejabberd_sup, cets_disco_spec(<<"testmim1@localhost">>, <<"192.168.115.111">>)]),
{ok, _} = rpc(mim2(), supervisor, start_child, [ejabberd_sup, cets_disco_spec(<<"testmim2@localhost">>, <<"192.168.115.112">>)]),
%% Force nodes to see each other
rpc(mim2(), erlang, send, [mongoose_cets_discovery, check]),
ok = rpc(mim2(), cets_discovery, wait_for_get_nodes, [mongoose_cets_discovery, 5000]),
rpc(mim(), erlang, send, [mongoose_cets_discovery, check]),
Config;
_ ->
{skip, cets_disco_already_running}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is strange, because we will skip cets-specific tests in the cets preset, but run them in other presets. It is unexpected, because the cets preset should be the one for which all CETS functionality is tested. I'd rather have it skipped in other presets than in the CETS one.

end.

stop_cets_discovery() ->
ok = rpc(mim(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]),
ok = rpc(mim2(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]).

cets_disco_spec(Node, IP) ->
DiscoOpts = #{
backend_module => mongoose_cets_discovery_rdbms,
cluster_name => <<"mim">>,
node_name_to_insert => Node,
node_ip_binary => IP,
name => mongoose_cets_discovery},
#{
id => cets_discovery,
start => {mongoose_cets_discovery, start_link, [DiscoOpts]},
restart => temporary,
type => worker,
shutdown => infinity,
modules => [cets_discovery]}.
7 changes: 4 additions & 3 deletions big_tests/tests/graphql_cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,17 @@ register_bad_node() ->
ClusterName = <<"mim">>,
Node = <<"badnode@localhost">>,
Num = 100,
Address = <<>>,
Timestamp = rpc(mim(), mongoose_rdbms_timestamp, select, []),
InsertArgs = [ClusterName, Node, Num, Timestamp],
{updated, 1} = rpc(mim(), mongoose_rdbms, execute, [global, cets_disco_insert_new, InsertArgs]).
InsertArgs = [ClusterName, Node, Num, Address, Timestamp],
{updated, 1} = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, InsertArgs).

ensure_bad_node_unregistered() ->
ClusterName = <<"mim">>,
Node = <<"badnode@localhost">>,
DeleteArgs = [ClusterName, Node],
%% Ensure the node is removed
{updated, _} = rpc(mim(), mongoose_rdbms, execute, [global, cets_delete_node_from_db, DeleteArgs]).
{updated, _} = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, DeleteArgs).

force_check() ->
Pid = rpc(mim(), erlang, whereis, [mongoose_cets_discovery]),
Expand Down
7 changes: 4 additions & 3 deletions priv/mssql2012.sql
Original file line number Diff line number Diff line change
Expand Up @@ -754,10 +754,11 @@ CREATE TABLE domain_events (
CREATE INDEX i_domain_events_domain ON domain_events(domain);

CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in seconds
cluster_name varchar(250) NOT NULL,
node_name varchar(250) NOT NULL,
node_num INT NOT NULL,
address varchar(250) NOT NULL DEFAULT '', -- empty means we should ask DNS
updated_timestamp BIGINT NOT NULL, -- in seconds
PRIMARY KEY (cluster_name, node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes(cluster_name, node_num);
7 changes: 4 additions & 3 deletions priv/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,11 @@ CREATE TABLE domain_events (
CREATE INDEX i_domain_events_domain ON domain_events(domain);

CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in seconds
cluster_name varchar(250) NOT NULL,
node_name varchar(250) NOT NULL,
node_num INT UNSIGNED NOT NULL,
address varchar(250) NOT NULL DEFAULT '', -- empty means we should ask DNS
updated_timestamp BIGINT NOT NULL, -- in seconds
PRIMARY KEY (cluster_name, node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_node_num USING BTREE ON discovery_nodes(cluster_name, node_num);
7 changes: 4 additions & 3 deletions priv/pg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -506,10 +506,11 @@ CREATE TABLE domain_events (
CREATE INDEX i_domain_events_domain ON domain_events(domain);

CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in seconds
cluster_name varchar(250) NOT NULL,
node_name varchar(250) NOT NULL,
node_num INT NOT NULL,
address varchar(250) NOT NULL DEFAULT '', -- empty means we should ask DNS
updated_timestamp BIGINT NOT NULL, -- in seconds
PRIMARY KEY (cluster_name, node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes USING BTREE(cluster_name, node_num);
2 changes: 1 addition & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{<<"certifi">>,{pkg,<<"certifi">>,<<"2.9.0">>},1},
{<<"cets">>,
{git,"https://github.com/esl/cets.git",
{ref,"2ca31058bf616392ed91e4eb8642cc5af872902e"}},
{ref,"ce92a1bf821b5cf7d4241f2d49f656171875cafe"}},
0},
{<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.9.0">>},0},
{<<"cowboy_swagger">>,{pkg,<<"cowboy_swagger">>,<<"2.5.1">>},0},
Expand Down
2 changes: 2 additions & 0 deletions rel/files/vm.args
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@

## Tweak GC to run more often
-env ERL_FULLSWEEP_AFTER 2

{{epmd_vm_args}}
Copy link
Member

@chrzaszcz chrzaszcz Nov 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to document this option. Maybe it should be enabled by default (but then we need to make sure it doesn't break without CETS).

1 change: 1 addition & 0 deletions rel/mim3.vars-toml.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
%% vm.args
{node_name, "mongooseim3@localhost"}.
{epmd_vm_args, "-epmd_module mongoose_epmd"}.

%% mongooseim.toml
{c2s_port, 5262}.
Expand Down
15 changes: 11 additions & 4 deletions src/mongoose_cets_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,19 @@ supervisor_specs(#{backend := DiscoBackend, cluster_name := ClusterName} = Opts)
backend_module => disco_backend_to_module(DiscoBackend),
cluster_name => atom_to_binary(ClusterName),
node_name_to_insert => atom_to_binary(node(), latin1),
node_ip_binary => get_node_ip_binary(),
name => mongoose_cets_discovery, disco_file => DiscoFile},
CetsDisco =
{cets_discovery,
{?MODULE, start_link, [DiscoOpts]},
permanent, infinity, supervisor, [cets_discovery]},
CetsDisco = #{
id => cets_discovery,
start => {?MODULE, start_link, [DiscoOpts]},
restart => permanent,
type => worker,
shutdown => infinity,
modules => [cets_discovery]},
[CetsDisco].

disco_backend_to_module(rdbms) -> mongoose_cets_discovery_rdbms;
disco_backend_to_module(file) -> cets_discovery_file.

get_node_ip_binary() ->
list_to_binary(os:getenv("MIM_NODE_IP", "")).
Loading