diff --git a/big_tests/tests/cets_disco_SUITE.erl b/big_tests/tests/cets_disco_SUITE.erl
index 8a7acf6907..6f690691b4 100644
--- a/big_tests/tests/cets_disco_SUITE.erl
+++ b/big_tests/tests/cets_disco_SUITE.erl
@@ -24,7 +24,15 @@ rdbms_cases() ->
     [rdbms_backend,
      rdbms_backend_supports_auto_cleaning,
      rdbms_backend_node_doesnt_remove_itself,
-     rdbms_backend_db_queries].
+     rdbms_backend_db_queries,
+     rdbms_backend_publishes_node_ip,
+     no_record_for_node,
+     no_ip_in_db,
+     epmd_just_returns_ip_from_db,
+     address_please,
+     address_please_returns_ip,
+     address_please_returns_ip_fallbacks_to_resolve_with_file_backend,
+     address_please_returns_ip_127_0_0_1_from_db].

 suite() ->
     distributed_helper:require_rpc_nodes([mim, mim2]).

@@ -37,16 +45,32 @@ init_per_group(rdbms, Config) ->
     case not ct_helper:is_ct_running()
          orelse mongoose_helper:is_rdbms_enabled(domain_helper:host_type()) of
         false -> {skip, rdbms_or_ct_not_running};
-        true -> Config
+        true ->
+            stop_and_delete_cets_discovery_if_running(),
+            Config
     end;
 init_per_group(_, Config) ->
     Config.

+end_per_group(rdbms, Config) ->
+    restore_default_cets_discovery(),
+    Config;
 end_per_group(_, Config) ->
     Config.

+init_per_testcase(address_please_returns_ip, Config) ->
+    start_cets_discovery(Config);
+init_per_testcase(address_please_returns_ip_fallbacks_to_resolve_with_file_backend, Config) ->
+    start_cets_discovery_with_file_backend(Config);
+init_per_testcase(address_please_returns_ip_127_0_0_1_from_db, Config) ->
+    start_cets_discovery_with_real_ips(Config);
 init_per_testcase(_CaseName, Config) ->
     Config.

+end_per_testcase(Name, Config) when Name == address_please_returns_ip;
+                                    Name == address_please_returns_ip_fallbacks_to_resolve_with_file_backend;
+                                    Name == address_please_returns_ip_127_0_0_1_from_db ->
+    stop_cets_discovery(),
+    Config;
 end_per_testcase(_CaseName, Config) ->
     unmock(mim()),
     unmock(mim2()).
@@ -144,24 +168,82 @@ rdbms_backend_db_queries(_Config) ->
     TS2 = TS + 100,

     %% insertion fails if node name or node num is already added for the cluster
-    ?assertEqual({updated, 1}, insert_new(CN, <<"test1">>, TS, 1)),
-    ?assertMatch({error, _}, insert_new(CN, <<"test1">>, TS, 1)),
-    ?assertMatch({error, _}, insert_new(CN, <<"test1">>, TS, 2)),
-    ?assertMatch({error, _}, insert_new(CN, <<"test2">>, TS, 1)),
-    ?assertEqual({updated, 1}, insert_new(CN, <<"test2">>, TS, 2)),
+    ?assertEqual({updated, 1}, insert_new(CN, <<"test1">>, 1, <<>>, TS)),
+    ?assertMatch({error, _}, insert_new(CN, <<"test1">>, 1, <<>>, TS)),
+    ?assertMatch({error, _}, insert_new(CN, <<"test1">>, 2, <<>>, TS)),
+    ?assertMatch({error, _}, insert_new(CN, <<"test2">>, 1, <<>>, TS)),
+    ?assertEqual({updated, 1}, insert_new(CN, <<"test2">>, 2, <<>>, TS)),

     %% update of the timestamp works correctly
     {selected, SelectedNodes1} = select(CN),
-    ?assertEqual(lists:sort([{<<"test1">>, 1, TS}, {<<"test2">>, 2, TS}]),
+    ?assertEqual(lists:sort([{<<"test1">>, 1, <<>>, TS}, {<<"test2">>, 2, <<>>, TS}]),
                  lists:sort(SelectedNodes1)),
-    ?assertEqual({updated, 1}, update_existing(CN, <<"test1">>, TS2)),
+    ?assertEqual({updated, 1}, update_existing(CN, <<"test1">>, <<>>, TS2)),
     {selected, SelectedNodes2} = select(CN),
-    ?assertEqual(lists:sort([{<<"test1">>, 1, TS2}, {<<"test2">>, 2, TS}]),
+    ?assertEqual(lists:sort([{<<"test1">>, 1, <<>>, TS2}, {<<"test2">>, 2, <<>>, TS}]),
                  lists:sort(SelectedNodes2)),

     %% node removal work correctly
     ?assertEqual({updated, 1}, delete_node_from_db(CN, <<"test1">>)),
-    ?assertEqual({selected, [{<<"test2">>, 2, TS}]}, select(CN)).
+    ?assertEqual({selected, [{<<"test2">>, 2, <<>>, TS}]}, select(CN)).
+
+rdbms_backend_publishes_node_ip(_Config) ->
+    %% get_pairs would only return nodes that are really available, so use the real node names
+    Node1b = atom_to_binary(maps:get(node, mim())),
+    Node2b = atom_to_binary(maps:get(node, mim2())),
+    CN = random_cluster_name(?FUNCTION_NAME),
+    Opts1 = #{cluster_name => CN, node_name_to_insert => Node1b,
+              node_ip_binary => <<"127.0.0.1">>},
+    Opts2 = #{cluster_name => CN, node_name_to_insert => Node2b,
+              node_ip_binary => <<"127.0.0.1">>},
+    State1 = disco_init(mim(), Opts1),
+    State2 = disco_init(mim2(), Opts2),
+    {{ok, _Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1),
+    {{ok, _Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2),
+    {{ok, _Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2),
+    {{ok, _Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2_2),
+    {ok, {127, 0, 0, 1}} = match_node_name(mim2(), State2_3, Node1b),
+    {ok, {127, 0, 0, 1}} = match_node_name(mim(), State1_3, Node2b).
+
+no_record_for_node(_Config) ->
+    Node = <<"mongoose@badhost">>,
+    BackState = #{address_pairs => #{}},
+    {error, {no_record_for_node, Node}} = match_node_name(mim(), BackState, Node),
+    ok.
+
+no_ip_in_db(_Config) ->
+    Node = <<"mongoose@noiphost">>,
+    BackState = #{address_pairs => #{Node => <<>>}},
+    {error, {no_ip_in_db, Node}} = match_node_name(mim(), BackState, Node),
+    ok.
+
+epmd_just_returns_ip_from_db(_Config) ->
+    Node = <<"mongoose@noepmdhost">>,
+    %% IP from a test range
+    BackState = #{address_pairs => #{Node => <<"192.0.2.1">>}},
+    {ok, {192, 0, 2, 1}} = match_node_name(mim(), BackState, Node).
+
+address_please(_Config) ->
+    {error, nxdomain} =
+        rpc(mim(), mongoose_epmd, address_please, ["mongooseim", "badbadhost", inet]).
+
+address_please_returns_ip(_Config) ->
+    Res = rpc(mim(), mongoose_epmd, address_please, ["testmim2", "localhost", inet]),
+    Info = rpc(mim(), cets_discovery, system_info, [mongoose_cets_discovery]),
+    ct:log("system_info ~p", [Info]),
+    {ok, {192, 168, 115, 112}} = Res.
+
+address_please_returns_ip_fallbacks_to_resolve_with_file_backend(_Config) ->
+    Res = rpc(mim2(), mongoose_epmd, address_please, ["testmim1", "localhost", inet]),
+    Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
+    ct:log("system_info ~p", [Info]),
+    {ok, {127, 0, 0, 1}} = Res.
+
+address_please_returns_ip_127_0_0_1_from_db(_Config) ->
+    Res = rpc(mim2(), mongoose_epmd, address_please, ["node1", "localhost", inet]),
+    Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
+    ct:log("system_info ~p", [Info]),
+    {ok, {127, 0, 0, 1}} = Res.

 %%--------------------------------------------------------------------
 %% Helpers
@@ -177,6 +259,9 @@ disco_get_nodes(Node, State) ->
     log_disco_request(?FUNCTION_NAME, Node, State, NewState),
     NewState.

+match_node_name(Node, SysInfo, NodeToLookup) ->
+    rpc(Node, mongoose_epmd, match_node_name, [SysInfo, NodeToLookup]).
+
 log_disco_request(disco_init, Node, #{cluster_name := CN} = Opts, State) ->
     ct:log("[0] disco_init(~p,~n" ++
            "               ~p) =~n" ++
@@ -216,9 +301,9 @@ random_cluster_name(CaseName) ->
     Rand = rpc(mim(), mongoose_bin, gen_from_crypto, []),
     <<"big_test_", (atom_to_binary(CaseName))/binary, "_", Rand/binary>>.

-insert_new(CN, BinNode, TS, NodeNum) ->
-    Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, TS, NodeNum]),
-    ct:log("insert_new(~p, ~p, ~p, ~p) = ~p", [CN, BinNode, TS, NodeNum, Ret]),
+insert_new(CN, BinNode, NodeNum, Address, TS) ->
+    Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, NodeNum, Address, TS]),
+    ct:log("insert_new(~p, ~p, ~p, ~p, ~p) = ~p", [CN, BinNode, NodeNum, Address, TS, Ret]),
     Ret.

 select(CN) ->
@@ -226,12 +311,100 @@ select(CN) ->
     ct:log("select(~p) = ~p", [CN, Ret]),
     Ret.

-update_existing(CN, BinNode, TS) ->
-    Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [CN, BinNode, TS]),
-    ct:log("select(~p, ~p, ~p) = ~p", [CN, BinNode, TS, Ret]),
+update_existing(CN, BinNode, Address, TS) ->
+    Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [CN, BinNode, Address, TS]),
+    ct:log("update_existing(~p, ~p, ~p, ~p) = ~p", [CN, BinNode, Address, TS, Ret]),
     Ret.

 delete_node_from_db(CN, BinNode) ->
     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [CN, BinNode]),
     ct:log("delete_node_from_db(~p, ~p) = ~p", [CN, BinNode, Ret]),
     Ret.
+
+start_cets_discovery(Config) ->
+    start_disco(mim(), cets_disco_spec(<<"testmim1@localhost">>, <<"192.168.115.111">>)),
+    start_disco(mim2(), cets_disco_spec(<<"testmim2@localhost">>, <<"192.168.115.112">>)),
+    force_nodes_to_see_each_other(mim(), mim2()),
+    Config.
+
+start_cets_discovery_with_real_ips(Config) ->
+    start_disco(mim(), cets_disco_spec(<<"node1@localhost">>, <<"127.0.0.1">>)),
+    start_disco(mim2(), cets_disco_spec(<<"node2@localhost">>, <<"127.0.0.1">>)),
+    force_nodes_to_see_each_other(mim(), mim2()),
+    Config.
+
+start_cets_discovery_with_file_backend(Config) ->
+    start_disco(mim(), cets_disco_spec_for_file_backend()),
+    start_disco(mim2(), cets_disco_spec_for_file_backend()),
+    Config.
+
+stop_cets_discovery() ->
+    ok = rpc(mim(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]),
+    ok = rpc(mim2(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]).
+
+stop_and_delete_cets_discovery() ->
+    stop_cets_discovery(),
+    ok = rpc(mim(), supervisor, delete_child, [ejabberd_sup, cets_discovery]),
+    ok = rpc(mim2(), supervisor, delete_child, [ejabberd_sup, cets_discovery]).
+
+stop_and_delete_cets_discovery_if_running() ->
+    case rpc(mim(), erlang, whereis, [mongoose_cets_discovery]) of
+        undefined ->
+            ok;
+        _ ->
+            stop_and_delete_cets_discovery()
+    end.
+
+restore_default_cets_discovery() ->
+    restore_default_cets_discovery(mim()),
+    restore_default_cets_discovery(mim2()).
+
+restore_default_cets_discovery(Node) ->
+    case rpc(Node, mongoose_cets_discovery, supervisor_specs, []) of
+        [] ->
+            ok;
+        [Spec] ->
+            start_disco(Node, Spec)
+    end.
+
+cets_disco_spec(Node, IP) ->
+    DiscoOpts = #{
+        backend_module => mongoose_cets_discovery_rdbms,
+        cluster_name => <<"mim">>,
+        node_name_to_insert => Node,
+        node_ip_binary => IP,
+        name => mongoose_cets_discovery},
+    cets_disco_spec(DiscoOpts).
+
+cets_disco_spec_for_file_backend() ->
+    DiscoOpts = #{
+        backend_module => cets_discovery_file,
+        disco_file => "/tmp/does_not_exist",
+        name => mongoose_cets_discovery},
+    cets_disco_spec(DiscoOpts).
+
+cets_disco_spec(DiscoOpts) ->
+    #{
+        id => cets_discovery,
+        start => {mongoose_cets_discovery, start_link, [DiscoOpts]},
+        restart => temporary,
+        type => worker,
+        shutdown => infinity,
+        modules => [cets_discovery]}.
+
+send_check(Node) ->
+    rpc(Node, erlang, send, [mongoose_cets_discovery, check]).
+
+wait_for_get_nodes(Node) ->
+    ok = rpc(Node, cets_discovery, wait_for_get_nodes, [mongoose_cets_discovery, 5000]).
+
+start_disco(Node, Spec) ->
+    {ok, _} = rpc(Node, supervisor, start_child, [ejabberd_sup, Spec]).
+
+force_nodes_to_see_each_other(Node1, Node2) ->
+    send_check(Node2),
+    wait_for_get_nodes(Node2),
+    send_check(Node1),
+    wait_for_get_nodes(Node1),
+    send_check(Node2),
+    wait_for_get_nodes(Node2).
diff --git a/big_tests/tests/graphql_cets_SUITE.erl b/big_tests/tests/graphql_cets_SUITE.erl
index b22831ec58..54a30bba69 100644
--- a/big_tests/tests/graphql_cets_SUITE.erl
+++ b/big_tests/tests/graphql_cets_SUITE.erl
@@ -206,16 +206,17 @@ register_bad_node() ->
     ClusterName = <<"mim">>,
     Node = <<"badnode@localhost">>,
     Num = 100,
+    Address = <<>>,
     Timestamp = rpc(mim(), mongoose_rdbms_timestamp, select, []),
-    InsertArgs = [ClusterName, Node, Num, Timestamp],
-    {updated, 1} = rpc(mim(), mongoose_rdbms, execute, [global, cets_disco_insert_new, InsertArgs]).
+    InsertArgs = [ClusterName, Node, Num, Address, Timestamp],
+    {updated, 1} = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, InsertArgs).

 ensure_bad_node_unregistered() ->
     ClusterName = <<"mim">>,
     Node = <<"badnode@localhost">>,
     DeleteArgs = [ClusterName, Node],
     %% Ensure the node is removed
-    {updated, _} = rpc(mim(), mongoose_rdbms, execute, [global, cets_delete_node_from_db, DeleteArgs]).
+    {updated, _} = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, DeleteArgs).

 force_check() ->
     Pid = rpc(mim(), erlang, whereis, [mongoose_cets_discovery]),
diff --git a/doc/configuration/release-options.md b/doc/configuration/release-options.md
index 22937368cd..281cfa7110 100644
--- a/doc/configuration/release-options.md
+++ b/doc/configuration/release-options.md
@@ -33,6 +33,17 @@ These options are inserted into the `rel/files/vm.args` template.
 * **Syntax:** command-line arguments
 * **Example:** `{highload_vm_args, "+P 10000000 -env ERL_MAX_PORTS 250000"}.`

+### epmd_module
+
+Sets the EPMD module to `mongoose_epmd`. Use this option when CETS runs with the
+RDBMS backend, so that the IP addresses of remote nodes are looked up in RDBMS
+instead of via the default resolver.
+
+* **Type:** parameter
+* **Option:** value of `-epmd_module` in [vm.args](configuration-files.md#vmargs)
+* **Syntax:** Erlang module name: `mongoose_epmd`
+* **Example:** `{epmd_module, "mongoose_epmd"}.`
+
 ## TOML Options

 These options are inserted into the `rel/files/mongooseim.toml` template.
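
[Review note, not part of the patch] The `epmd_module` release option only takes effect if the rendered `vm.args` actually carries the `-epmd_module` flag (see the `rel/files/vm.args` template change below). A minimal sanity check one could run in a remote shell of a started node; the module name `epmd_module_check` is hypothetical, the two stdlib calls are real:

```erlang
%% Hypothetical helper (illustration only, not part of this patch):
%% verify that the node was booted with -epmd_module mongoose_epmd.
-module(epmd_module_check).
-export([assert_mongoose_epmd/0]).

assert_mongoose_epmd() ->
    %% init:get_argument/1 returns {ok, [Values]} when the flag was given
    {ok, [["mongoose_epmd"]]} = init:get_argument(epmd_module),
    %% The module must also be loadable, otherwise distribution fails at boot
    {module, mongoose_epmd} = code:ensure_loaded(mongoose_epmd),
    ok.
```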
diff --git a/priv/mssql2012.sql b/priv/mssql2012.sql index e079ccaec8..7712a00c17 100644 --- a/priv/mssql2012.sql +++ b/priv/mssql2012.sql @@ -754,10 +754,11 @@ CREATE TABLE domain_events ( CREATE INDEX i_domain_events_domain ON domain_events(domain); CREATE TABLE discovery_nodes ( - node_name varchar(250), - cluster_name varchar(250), - updated_timestamp BIGINT NOT NULL, -- in seconds + cluster_name varchar(250) NOT NULL, + node_name varchar(250) NOT NULL, node_num INT NOT NULL, + address varchar(250) NOT NULL DEFAULT '', -- empty means we should ask DNS + updated_timestamp BIGINT NOT NULL, -- in seconds PRIMARY KEY (cluster_name, node_name) ); CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes(cluster_name, node_num); diff --git a/priv/mysql.sql b/priv/mysql.sql index 2f27ee21fa..8295302072 100644 --- a/priv/mysql.sql +++ b/priv/mysql.sql @@ -546,10 +546,11 @@ CREATE TABLE domain_events ( CREATE INDEX i_domain_events_domain ON domain_events(domain); CREATE TABLE discovery_nodes ( - node_name varchar(250), - cluster_name varchar(250), - updated_timestamp BIGINT NOT NULL, -- in seconds + cluster_name varchar(250) NOT NULL, + node_name varchar(250) NOT NULL, node_num INT UNSIGNED NOT NULL, + address varchar(250) NOT NULL DEFAULT '', -- empty means we should ask DNS + updated_timestamp BIGINT NOT NULL, -- in seconds PRIMARY KEY (cluster_name, node_name) ); CREATE UNIQUE INDEX i_discovery_nodes_node_num USING BTREE ON discovery_nodes(cluster_name, node_num); diff --git a/priv/pg.sql b/priv/pg.sql index 646ddd8f03..8b78cddffb 100644 --- a/priv/pg.sql +++ b/priv/pg.sql @@ -506,10 +506,11 @@ CREATE TABLE domain_events ( CREATE INDEX i_domain_events_domain ON domain_events(domain); CREATE TABLE discovery_nodes ( - node_name varchar(250), - cluster_name varchar(250), - updated_timestamp BIGINT NOT NULL, -- in seconds + cluster_name varchar(250) NOT NULL, + node_name varchar(250) NOT NULL, node_num INT NOT NULL, + address varchar(250) NOT NULL DEFAULT '', -- empty means we should ask DNS + updated_timestamp BIGINT NOT NULL, -- in seconds PRIMARY KEY (cluster_name, node_name) ); CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes USING BTREE(cluster_name, node_num); diff --git a/rebar.lock b/rebar.lock index 4108d96944..b9dc1d7132 100644 --- a/rebar.lock +++ b/rebar.lock @@ -8,7 +8,7 @@ {<<"certifi">>,{pkg,<<"certifi">>,<<"2.9.0">>},1}, {<<"cets">>, {git,"https://github.com/esl/cets.git", - {ref,"2ca31058bf616392ed91e4eb8642cc5af872902e"}}, + {ref,"9c5b2c9269daeb4ee6830f7cd1538e0a4218e281"}}, 0}, {<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.9.0">>},0}, {<<"cowboy_swagger">>,{pkg,<<"cowboy_swagger">>,<<"2.5.1">>},0}, diff --git a/rel/files/vm.args b/rel/files/vm.args index 0b24338fab..f93acc7264 100644 --- a/rel/files/vm.args +++ b/rel/files/vm.args @@ -20,3 +20,7 @@ ## Tweak GC to run more often -env ERL_FULLSWEEP_AFTER 2 + +{{#epmd_module}} +-epmd_module {{{epmd_module}}} +{{/epmd_module}} diff --git a/rel/mim3.vars-toml.config b/rel/mim3.vars-toml.config index 7583766b78..9d00c27336 100644 --- a/rel/mim3.vars-toml.config +++ b/rel/mim3.vars-toml.config @@ -1,5 +1,6 @@ %% vm.args {node_name, "mongooseim3@localhost"}. +{epmd_module, "mongoose_epmd"}. %% mongooseim.toml {c2s_port, 5262}. 
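
[Review note, not part of the patch] The new `address` column added to `discovery_nodes` above is what `mongoose_cets_discovery_rdbms` republishes on every `get_nodes` call (see `try_register/3` below). A sketch of how the selected rows, now shaped `{NodeName, NodeNum, Address, Timestamp}`, collapse into the `address_pairs` map that `mongoose_epmd` consults; the helper name `rows_to_address_pairs` is hypothetical but mirrors the comprehensions in `try_register/3`:

```erlang
%% Hypothetical sketch: build the node-to-address lookup
%% from the rows selected from discovery_nodes.
rows_to_address_pairs(Rows) ->
    Nodes = [element(1, Row) || Row <- Rows],
    Addresses = [element(3, Row) || Row <- Rows],
    %% An empty binary address means "ask DNS for this node instead"
    maps:from_list(lists:zip(Nodes, Addresses)).

%% rows_to_address_pairs([{<<"mim@h1">>, 1, <<"10.0.0.1">>, TS},
%%                        {<<"mim2@h2">>, 2, <<>>, TS}])
%% => #{<<"mim@h1">> => <<"10.0.0.1">>, <<"mim2@h2">> => <<>>}
```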
diff --git a/src/mongoose_cets_discovery.erl b/src/mongoose_cets_discovery.erl index f3278a4e96..99ae0c2c6e 100644 --- a/src/mongoose_cets_discovery.erl +++ b/src/mongoose_cets_discovery.erl @@ -33,12 +33,19 @@ supervisor_specs(#{backend := DiscoBackend, cluster_name := ClusterName} = Opts) backend_module => disco_backend_to_module(DiscoBackend), cluster_name => atom_to_binary(ClusterName), node_name_to_insert => atom_to_binary(node(), latin1), + node_ip_binary => get_node_ip_binary(), name => mongoose_cets_discovery, disco_file => DiscoFile}, - CetsDisco = - {cets_discovery, - {?MODULE, start_link, [DiscoOpts]}, - permanent, infinity, supervisor, [cets_discovery]}, + CetsDisco = #{ + id => cets_discovery, + start => {?MODULE, start_link, [DiscoOpts]}, + restart => permanent, + type => worker, + shutdown => infinity, + modules => [cets_discovery]}, [CetsDisco]. disco_backend_to_module(rdbms) -> mongoose_cets_discovery_rdbms; disco_backend_to_module(file) -> cets_discovery_file. + +get_node_ip_binary() -> + list_to_binary(os:getenv("MIM_NODE_IP", "")). diff --git a/src/mongoose_cets_discovery_rdbms.erl b/src/mongoose_cets_discovery_rdbms.erl index 6708f91277..d5487bb287 100644 --- a/src/mongoose_cets_discovery_rdbms.erl +++ b/src/mongoose_cets_discovery_rdbms.erl @@ -4,8 +4,8 @@ -export([init/1, get_nodes/1]). %% these functions are exported for testing purposes only. --export([select/1, insert_new/4, update_existing/3, delete_node_from_db/2]). --ignore_xref([select/1, insert_new/4, update_existing/3, delete_node_from_db/2]). +-export([select/1, insert_new/5, update_existing/4, delete_node_from_db/2]). +-ignore_xref([select/1, insert_new/5, update_existing/4, delete_node_from_db/2]). -include("mongoose_logger.hrl"). @@ -15,29 +15,34 @@ -type opts() :: #{cluster_name := binary(), node_name_to_insert := binary(), last_query_info => map(), expire_time => non_neg_integer(), + node_ip_binary => binary(), any() => any()}. -type state() :: #{cluster_name := binary(), node_name_to_insert := binary(), - last_query_info := map(), expire_time := non_neg_integer()}. + last_query_info := map(), expire_time := non_neg_integer(), + node_ip_binary := binary(), address_pairs := #{binary() => binary()}}. -spec init(opts()) -> state(). -init(Opts = #{cluster_name := _, node_name_to_insert := _}) -> - Keys = [cluster_name, node_name_to_insert, last_query_info, expire_time], +init(Opts = #{cluster_name := ClusterName, node_name_to_insert := Node}) + when is_binary(ClusterName), is_binary(Node) -> + Keys = [cluster_name, node_name_to_insert, last_query_info, expire_time, node_ip_binary], maps:with(Keys, maps:merge(defaults(), Opts)). defaults() -> #{expire_time => 60 * 60 * 1, %% 1 hour in seconds - last_query_info => #{}}. + last_query_info => #{}, + node_ip_binary => <<>>, + address_pairs => #{}}. -spec get_nodes(state()) -> {cets_discovery:get_nodes_result(), state()}. get_nodes(State = #{cluster_name := ClusterName, node_name_to_insert := Node}) -> case is_rdbms_running() of true -> try try_register(ClusterName, Node, State) of - {Num, Nodes, Info} -> + {Num, Nodes, Info, AddrPairs} -> mongoose_node_num:set_node_num(Num), {{ok, [binary_to_atom(N) || N <- Nodes]}, - State#{last_query_info => Info}} + State#{last_query_info => Info, address_pairs => AddrPairs}} catch Class:Reason:Stacktrace -> ?LOG_ERROR(#{what => discovery_failed_select, class => Class, reason => Reason, stacktrace => Stacktrace}), @@ -55,23 +60,27 @@ is_rdbms_running() -> false end. 
-try_register(ClusterName, Node, State) when is_binary(Node), is_binary(ClusterName) -> +try_register(ClusterName, Node, State = #{node_ip_binary := Address}) + when is_binary(Node), is_binary(ClusterName) -> prepare(), Timestamp = timestamp(), {selected, Rows} = select(ClusterName), - {Nodes, Nums, _Timestamps} = lists:unzip3(Rows), + Nodes = [element(1, Row) || Row <- Rows], + Nums = [element(2, Row) || Row <- Rows], + Addresses = [element(3, Row) || Row <- Rows], + AddrPairs = maps:from_list(lists:zip(Nodes, Addresses)), AlreadyRegistered = lists:member(Node, Nodes), NodeNum = case AlreadyRegistered of true -> - update_existing(ClusterName, Node, Timestamp), - {value, {_, Num, _TS}} = lists:keysearch(Node, 1, Rows), + update_existing(ClusterName, Node, Address, Timestamp), + {value, {_, Num, _Addr, _TS}} = lists:keysearch(Node, 1, Rows), Num; false -> Num = first_free_num(lists:usort(Nums)), %% Could fail with duplicate node_num reason. %% In this case just wait for the next get_nodes call. - case insert_new(ClusterName, Node, Timestamp, Num) of + case insert_new(ClusterName, Node, Num, Address, Timestamp) of {error, _} -> 0; %% return default node num {updated, 1} -> Num end @@ -79,15 +88,16 @@ try_register(ClusterName, Node, State) when is_binary(Node), is_binary(ClusterNa RunCleaningResult = run_cleaning(ClusterName, Timestamp, Rows, State), %% This could be used for debugging Info = #{already_registered => AlreadyRegistered, timestamp => Timestamp, + address => Address, node_num => Num, last_rows => Rows, run_cleaning_result => RunCleaningResult}, - {NodeNum, skip_expired_nodes(Nodes, RunCleaningResult), Info}. + {NodeNum, skip_expired_nodes(Nodes, RunCleaningResult), Info, AddrPairs}. skip_expired_nodes(Nodes, {removed, ExpiredNodes}) -> (Nodes -- ExpiredNodes). run_cleaning(ClusterName, Timestamp, Rows, State) -> #{expire_time := ExpireTime, node_name_to_insert := CurrentNode} = State, - ExpiredNodes = [DbNode || {DbNode, _Num, DbTS} <- Rows, + ExpiredNodes = [DbNode || {DbNode, _Num, _Addr, DbTS} <- Rows, is_expired(DbTS, Timestamp, ExpireTime), DbNode =/= CurrentNode], [delete_node_from_db(ClusterName, DbNode) || DbNode <- ExpiredNodes], @@ -110,30 +120,31 @@ prepare() -> mongoose_rdbms_timestamp:prepare(), mongoose_rdbms:prepare(cets_disco_select, T, [cluster_name], select()), mongoose_rdbms:prepare(cets_disco_insert_new, T, - [cluster_name, node_name, node_num, updated_timestamp], insert_new()), + [cluster_name, node_name, node_num, address, updated_timestamp], insert_new()), mongoose_rdbms:prepare(cets_disco_update_existing, T, - [updated_timestamp, cluster_name, node_name], update_existing()), + [updated_timestamp, address, cluster_name, node_name], update_existing()), mongoose_rdbms:prepare(cets_delete_node_from_db, T, [cluster_name, node_name], delete_node_from_db()). select() -> - <<"SELECT node_name, node_num, updated_timestamp FROM discovery_nodes WHERE cluster_name = ?">>. + <<"SELECT node_name, node_num, address, updated_timestamp FROM discovery_nodes WHERE cluster_name = ?">>. select(ClusterName) -> mongoose_rdbms:execute_successfully(global, cets_disco_select, [ClusterName]). insert_new() -> - <<"INSERT INTO discovery_nodes (cluster_name, node_name, node_num, updated_timestamp)" - " VALUES (?, ?, ?, ?)">>. + <<"INSERT INTO discovery_nodes (cluster_name, node_name, node_num, address, updated_timestamp)" + " VALUES (?, ?, ?, ?, ?)">>. 
-insert_new(ClusterName, Node, Timestamp, Num) ->
-    mongoose_rdbms:execute(global, cets_disco_insert_new, [ClusterName, Node, Num, Timestamp]).
+insert_new(ClusterName, NodeName, NodeNum, Address, UpdatedTimestamp) ->
+    mongoose_rdbms:execute(global, cets_disco_insert_new,
+                           [ClusterName, NodeName, NodeNum, Address, UpdatedTimestamp]).

 update_existing() ->
-    <<"UPDATE discovery_nodes SET updated_timestamp = ? WHERE cluster_name = ? AND node_name = ?">>.
+    <<"UPDATE discovery_nodes SET updated_timestamp = ?, address = ? WHERE cluster_name = ? AND node_name = ?">>.

-update_existing(ClusterName, Node, Timestamp) ->
-    mongoose_rdbms:execute(global, cets_disco_update_existing, [Timestamp, ClusterName, Node]).
+update_existing(ClusterName, NodeName, Address, UpdatedTimestamp) ->
+    mongoose_rdbms:execute(global, cets_disco_update_existing, [UpdatedTimestamp, Address, ClusterName, NodeName]).

 delete_node_from_db() ->
     <<"DELETE FROM discovery_nodes WHERE cluster_name = ? AND node_name = ?">>.
diff --git a/src/mongoose_cluster_id.erl b/src/mongoose_cluster_id.erl
index 1420314e6f..82d7399720 100644
--- a/src/mongoose_cluster_id.erl
+++ b/src/mongoose_cluster_id.erl
@@ -29,6 +29,9 @@ start() ->
     Backend = which_backend_available(),
     IntBackend = which_volatile_backend_available(),
     maybe_prepare_queries(Backend),
+    cets_long:run_tracked(#{task => wait_for_any_backend,
+                            backend => Backend, volatile_backend => IntBackend},
+                          fun() -> wait_for_any_backend(Backend, IntBackend) end),
     CachedRes = get_cached_cluster_id(IntBackend),
     BackendRes = get_backend_cluster_id(),
     case {CachedRes, BackendRes} of
@@ -47,6 +50,48 @@ start() ->
             {error, conflict}
     end.

+
+%% If RDBMS becomes available before CETS, that is enough for us to continue
+%% the start procedure
+wait_for_any_backend(Backend, IntBackend) ->
+    Alias = erlang:alias([reply]),
+    Pids = lists:append([wait_for_backend_promise(B, Alias) || B <- lists:sort([Backend, IntBackend])]),
+    wait_for_first_reply(Alias),
+    %% Interrupt other waiting calls to reduce the logging noise
+    [erlang:exit(Pid, shutdown) || Pid <- Pids],
+    ok.
+
+wait_for_first_reply(Alias) ->
+    receive
+        {ready, Alias} ->
+            ok
+    end.
+
+wait_for_backend_promise(mnesia, Alias) ->
+    Alias ! {ready, Alias},
+    [];
+wait_for_backend_promise(cets, Alias) ->
+    [spawn(fun() ->
+               %% We have to do it, because we want to read from across the cluster
+               %% in the start/0 function.
+               ok = cets_discovery:wait_for_ready(mongoose_cets_discovery, infinity),
+               Alias ! {ready, Alias}
+           end)];
+wait_for_backend_promise(rdbms, Alias) ->
+    [spawn(fun() ->
+               cets_long:run_tracked(#{task => wait_for_rdbms}, fun() -> wait_for_rdbms() end),
+               Alias ! {ready, Alias}
+           end)].
+
+wait_for_rdbms() ->
+    case get_backend_cluster_id(rdbms) of
+        {ok, _} ->
+            ok;
+        _ ->
+            timer:sleep(100),
+            wait_for_rdbms()
+    end.
+
 %% Get cached version
 -spec get_cached_cluster_id() -> maybe_cluster_id().
 get_cached_cluster_id() ->
@@ -102,10 +147,7 @@ init_cache(mnesia) ->
     ]);
 init_cache(cets) ->
     cets:start(cets_cluster_id, #{}),
-    cets_discovery:add_table(mongoose_cets_discovery, cets_cluster_id),
-    %% We have to do it, because we want to read from across the cluster
-    %% in the start/0 function.
-    ok = cets_discovery:wait_for_ready(mongoose_cets_discovery, infinity).
+    cets_discovery:add_table(mongoose_cets_discovery, cets_cluster_id).

 -spec maybe_prepare_queries(mongoose_backend()) -> ok.
 maybe_prepare_queries(mnesia) -> ok;
diff --git a/src/mongoose_epmd.erl b/src/mongoose_epmd.erl
new file mode 100644
index 0000000000..ccfd006251
--- /dev/null
+++ b/src/mongoose_epmd.erl
@@ -0,0 +1,99 @@
+%% EPMD implementation that redefines how node name lookups work.
+%% There is no dedicated behaviour for erl_epmd, so we mirror its API.
+-module(mongoose_epmd).
+-export([
+    start/0,
+    start_link/0,
+    stop/0,
+    port_please/2, port_please/3,
+    listen_port_please/2,
+    names/0, names/1,
+    register_node/2, register_node/3,
+    address_please/3,
+    open/0, open/1, open/2
+]).
+-include_lib("kernel/include/logger.hrl").
+
+%% For debugging
+-export([lookup_ip/1]).
+-export([match_node_name/2]).
+
+-type lookup_error() ::
+    {no_ip_in_db, node()}
+    | {cannot_connect_to_epmd, node(), inet:ip_address()}
+    | {no_record_for_node, node()}
+    | failed_to_get_disco_state
+    | no_address_pairs_set
+    | mongoose_node_address_ets_table_not_found.
+
+-ignore_xref([
+    start/0,
+    start_link/0,
+    stop/0,
+    port_please/2, port_please/3,
+    listen_port_please/2,
+    names/0, names/1,
+    register_node/2, register_node/3,
+    address_please/3,
+    open/0, open/1, open/2,
+    lookup_ip/1, match_node_name/2
+]).
+
+start() -> erl_epmd:start().
+start_link() -> erl_epmd:start_link().
+stop() -> erl_epmd:stop().
+port_please(A, B) -> erl_epmd:port_please(A, B).
+port_please(A, B, C) -> erl_epmd:port_please(A, B, C).
+listen_port_please(A, B) -> erl_epmd:listen_port_please(A, B).
+names() -> erl_epmd:names().
+names(A) -> erl_epmd:names(A).
+register_node(A, B) -> erl_epmd:register_node(A, B).
+register_node(A, B, C) -> erl_epmd:register_node(A, B, C).
+open() -> erl_epmd:open().
+open(A) -> erl_epmd:open(A).
+open(A, B) -> erl_epmd:open(A, B).
+
+address_please(Name, Host, AddressFamily) ->
+    Node = list_to_binary(Name ++ "@" ++ Host),
+    case lookup_ip(Node) of
+        {ok, _IP} = Res ->
+            Res;
+        _ ->
+            %% Fall back to the default behaviour
+            inet:getaddr(Host, AddressFamily)
+    end.
+
+%% This function waits for the IP address to appear in the backend state.
+%% It only works well if net_kernel does not contact a lot of
+%% dead nodes.
+%% Generally, we only expect calls for nodes that are alive:
+%% connects are issued by CETS (which first checks that the node is
+%% reachable) or by the connect_all feature in OTP's global module.
+-spec lookup_ip(binary()) -> {ok, inet:ip_address()} | {error, lookup_error()}.
+lookup_ip(Node) ->
+    try
+        cets_discovery:wait_for_get_nodes(mongoose_cets_discovery, 3000),
+        cets_discovery:system_info(mongoose_cets_discovery)
+    of
+        #{backend_state := BackendState} ->
+            match_node_name(BackendState, Node)
+    catch _:_ ->
+        {error, failed_to_get_disco_state}
+    end.
+
+match_node_name(#{address_pairs := Pairs}, Node) ->
+    case Pairs of
+        #{Node := <<>>} ->
+            %% The caller should try DNS.
+            {error, {no_ip_in_db, Node}};
+        #{Node := Bin} ->
+            inet:parse_address(binary_to_list(Bin));
+        #{} ->
+            {error, {no_record_for_node, Node}}
+    end;
+match_node_name(BackendState, Node) ->
+    %% Could happen if the CETS backend is not mongoose_cets_discovery_rdbms
+    ?LOG_ERROR(#{what => cets_no_address_pairs_set,
+                 backend_state => BackendState,
+                 remote_node => Node}),
+    {error, no_address_pairs_set}.
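
[Closing review note, not part of the patch] The three `match_node_name/2` outcomes above correspond one-to-one to the `epmd_just_returns_ip_from_db`, `no_ip_in_db` and `no_record_for_node` cases in `cets_disco_SUITE`. An illustrative shell session (node names are made up):

```erlang
1> State = #{address_pairs => #{<<"mim@h1">> => <<"10.0.0.1">>,
                                <<"mim2@h2">> => <<>>}}.
2> mongoose_epmd:match_node_name(State, <<"mim@h1">>).
{ok,{10,0,0,1}}
3> mongoose_epmd:match_node_name(State, <<"mim2@h2">>).
{error,{no_ip_in_db,<<"mim2@h2">>}}
4> mongoose_epmd:match_node_name(State, <<"other@h3">>).
{error,{no_record_for_node,<<"other@h3">>}}
```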