Rewrite ping #41

Merged: 41 commits, Nov 24, 2023
Commits (41)
162cb24
Improve net_adm:ping/1
arcusfelis Nov 9, 2023
18e119c
Rewrite ping logic - use connect_node there instead
arcusfelis Nov 10, 2023
96cf985
Do check_could_reach_each_other before check_fully_connected
arcusfelis Nov 10, 2023
d6e617e
Report transaction retry on info level, not error
arcusfelis Nov 14, 2023
3ad38f3
Log nodeup/nodedown events with warning level
arcusfelis Nov 14, 2023
7acbabc
Report downtime duration
arcusfelis Nov 14, 2023
bca3206
Use undefined instead of unknown for timestaps in logs
arcusfelis Nov 14, 2023
df69652
Skip undefined timestamps in logs
arcusfelis Nov 14, 2023
82b170b
Report alive nodes count on nodeup/nodedown
arcusfelis Nov 14, 2023
9686b0d
Report time_since_startup_in_milliseconds for nodeup/nodedown events
arcusfelis Nov 14, 2023
a02e328
Use epmd instead of resolver to test if node is pingable
arcusfelis Nov 16, 2023
966bf81
Do not report long task failing with reason stop
arcusfelis Nov 16, 2023
22afd47
Log warning if same node reconnects
arcusfelis Nov 16, 2023
ea5d848
Add node_down_history
arcusfelis Nov 17, 2023
8308ef6
Sort available nodes in the status API
arcusfelis Nov 17, 2023
bd2560b
Add +node_down_history_is_updated_when_netsplit_happens testcase
arcusfelis Nov 20, 2023
3bbb154
Add disco_logs_nodeup
arcusfelis Nov 20, 2023
f7d74a0
Add disco_logs_nodedown
arcusfelis Nov 20, 2023
bf0f174
Add disco_logs_nodeup_after_downtime testcase
arcusfelis Nov 20, 2023
9508a8d
Move logging into handle_nodedown/handle_nodeup
arcusfelis Nov 20, 2023
03cdbb3
Add disco_logs_node_reconnects_after_downtime testcase
arcusfelis Nov 20, 2023
75d9ee8
Add disco_logs_nodeup_no_log/disco_logs_nodedown_no_log testcases
arcusfelis Nov 20, 2023
21b4280
Tweak for code coverage and logging
arcusfelis Nov 20, 2023
52c04ea
Test that start_time is set in logs
arcusfelis Nov 20, 2023
5b403b7
Add disco_nodeup_timestamp_is_updated_after_node_reconnects testcase
arcusfelis Nov 20, 2023
1ead59a
Add disco_node_start_timestamp_is_updated_after_node_restarts testcase
arcusfelis Nov 20, 2023
ef05e4f
Add cets_discovery into a list of modules to disable logging during t…
arcusfelis Nov 20, 2023
bb1e849
Use proto_dist to just do one adress_please call per ping
arcusfelis Nov 20, 2023
b1dd16f
Add cets_ping_non_existing_node testcase
arcusfelis Nov 20, 2023
39dfdb9
Add tests for net_family code in cets_ping
arcusfelis Nov 20, 2023
e637138
Add stop_reason_is_not_logged_in_tracked testcase
arcusfelis Nov 20, 2023
87adc9f
Fail if cannot split nodename in cets_ping
arcusfelis Nov 20, 2023
7732bb8
Fix flaking tests
arcusfelis Nov 20, 2023
1f2817d
Rename special reason from stop to shutdown
arcusfelis Nov 21, 2023
1e65ec1
Add unexpected_nodedown_is_ignored_by_disco testcase
arcusfelis Nov 22, 2023
d04673f
Add ping_pairs tests
arcusfelis Nov 22, 2023
293ba6b
Simplify ipv6 matching in net_family
arcusfelis Nov 24, 2023
9e3e789
Fix typos in cets_ping
arcusfelis Nov 24, 2023
2a11c19
Deduplicate code in tests
arcusfelis Nov 24, 2023
49cf3f9
Trigger get_nodes on nodeup
arcusfelis Nov 24, 2023
4b1d86d
Remove outdated comment
arcusfelis Nov 24, 2023
16 changes: 12 additions & 4 deletions src/cets.erl
@@ -130,7 +130,8 @@
is_leader := boolean(),
opts := start_opts(),
backlog := [backlog_entry()],
pause_monitors := [pause_monitor()]
pause_monitors := [pause_monitor()],
node_down_history := [node()]
}.

-type long_msg() ::
@@ -154,7 +155,8 @@
memory := non_neg_integer(),
ack_pid := ack_pid(),
join_ref := join_ref(),
opts := start_opts()
opts := start_opts(),
node_down_history := [node()]
}.

-type handle_down_fun() :: fun((#{remote_pid := server_pid(), table := table_name()}) -> ok).
@@ -417,7 +419,8 @@ init({Tab, Opts}) ->
is_leader => true,
opts => Opts,
backlog => [],
pause_monitors => []
pause_monitors => [],
node_down_history => []
}}.

-spec handle_call(long_msg() | {op, op()}, from(), state()) ->
@@ -524,7 +527,7 @@ handle_down2(RemotePid, State = #{other_servers := Servers, ack_pid := AckPid})
cets_ack:send_remote_down(AckPid, RemotePid),
call_user_handle_down(RemotePid, State),
Servers2 = lists:delete(RemotePid, Servers),
set_other_servers(Servers2, State);
update_node_down_history(RemotePid, set_other_servers(Servers2, State));
false ->
%% This should not happen
?LOG_ERROR(#{
@@ -535,6 +538,9 @@ handle_down2(RemotePid, State = #{other_servers := Servers, ack_pid := AckPid})
State
end.

update_node_down_history(RemotePid, State = #{node_down_history := History}) ->
State#{node_down_history := [node(RemotePid) | History]}.

%% Merge two lists of pids, create the missing monitors.
-spec add_servers(Servers, Servers) -> Servers when Servers :: servers().
add_servers(Pids, Servers) ->
@@ -742,6 +748,7 @@ handle_get_info(
other_servers := Servers,
ack_pid := AckPid,
join_ref := JoinRef,
node_down_history := DownHistory,
opts := Opts
}
) ->
@@ -752,6 +759,7 @@ handle_get_info(
memory => ets:info(Tab, memory),
ack_pid => AckPid,
join_ref => JoinRef,
node_down_history => DownHistory,
opts => Opts
}.

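Once a remote server goes down, its node name is prepended to node_down_history and exposed through the info map built by handle_get_info/1 above. A minimal sketch of reading it; the cets:info/1 accessor name and argument are assumptions for illustration, since only the server-side handle_get_info/1 appears in this diff:

%% Assumed accessor; only handle_get_info/1 is shown in this diff.
#{node_down_history := DownNodes} = cets:info(my_table),
%% Most recent nodedown first, e.g. ['node3@host2', 'node2@host1'],
%% because update_node_down_history/2 prepends node(RemotePid).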
140 changes: 131 additions & 9 deletions src/cets_discovery.erl
@@ -90,8 +90,13 @@
join_status := not_running | running,
should_retry_join := boolean(),
timer_ref := reference() | undefined,
pending_wait_for_ready := [gen_server:from()]
pending_wait_for_ready := [gen_server:from()],
nodeup_timestamps := #{node() => milliseconds()},
nodedown_timestamps := #{node() => milliseconds()},
node_start_timestamps := #{node() => milliseconds()},
start_time := milliseconds()
}.
-type milliseconds() :: integer().

%% Backend could define its own options
-type opts() :: #{name := atom(), _ := _}.
@@ -151,6 +156,7 @@ wait_for_ready(Server, Timeout) ->

-spec init(term()) -> {ok, state()}.
init(Opts) ->
StartTime = erlang:system_time(millisecond),
%% Sends nodeup / nodedown
ok = net_kernel:monitor_nodes(true),
Mod = maps:get(backend_module, Opts, cets_discovery_file),
@@ -159,7 +165,7 @@ init(Opts) ->
BackendState = Mod:init(Opts),
%% Changes phase from initial to regular (affects the check interval)
erlang:send_after(timer:minutes(5), self(), enter_regular_phase),
{ok, #{
State = #{
phase => initial,
results => [],
nodes => [],
@@ -174,8 +180,16 @@
join_status => not_running,
should_retry_join => false,
timer_ref => undefined,
pending_wait_for_ready => []
}}.
pending_wait_for_ready => [],
nodeup_timestamps => #{},
node_start_timestamps => #{},
nodedown_timestamps => #{},
start_time => StartTime
},
%% Set initial timestamps because we would not receive nodeup events for
%% already connected nodes
State2 = lists:foldl(fun handle_nodeup/2, State, nodes()),
{ok, State2}.

-spec handle_call(term(), from(), state()) -> {reply, term(), state()} | {noreply, state()}.
handle_call(get_tables, _From, State = #{tables := Tables}) ->
@@ -216,12 +230,21 @@
handle_info({handle_check_result, Result, BackendState}, State) ->
{noreply, handle_get_nodes_result(Result, BackendState, State)};
handle_info({nodeup, Node}, State) ->
State2 = remove_node_from_unavailable_list(Node, State),
{noreply, try_joining(State2)};
handle_info({nodedown, _Node}, State) ->
%% nodeup triggers get_nodes call.
%% We are interested in up-to-date data
%% (in MongooseIM we want to know IPs of other nodes as soon as possible
%% after some node connects to us)
self() ! check,
State2 = handle_nodeup(Node, State),
State3 = remove_node_from_unavailable_list(Node, State2),
{noreply, try_joining(State3)};
handle_info({nodedown, Node}, State) ->
State2 = handle_nodedown(Node, State),
%% Do another check to update unavailable_nodes list
self() ! check,
{noreply, State};
{noreply, State2};
handle_info({start_time, Node, StartTime}, State) ->
{noreply, handle_receive_start_time(Node, StartTime, State)};
handle_info({joining_finished, Results}, State) ->
{noreply, handle_joining_finished(Results, State)};
handle_info({ping_result, Node, Result}, State) ->
@@ -315,7 +338,7 @@ ping_not_connected_nodes(Nodes) ->
Self = self(),
NotConNodes = Nodes -- [node() | nodes()],
[
spawn(fun() -> Self ! {ping_result, Node, net_adm:ping(Node)} end)
spawn(fun() -> Self ! {ping_result, Node, cets_ping:ping(Node)} end)
|| Node <- lists:sort(NotConNodes)
],
ok.
@@ -454,3 +477,102 @@ has_join_result_for(Node, Table, #{results := Results}) ->
-spec handle_system_info(state()) -> system_info().
handle_system_info(State) ->
State#{verify_ready => verify_ready(State)}.

-spec handle_nodedown(node(), state()) -> state().
handle_nodedown(Node, State) ->
State2 = remember_nodedown_timestamp(Node, State),
{NodeUpTime, State3} = remove_nodeup_timestamp(Node, State2),
?LOG_WARNING(
set_defined(connected_millisecond_duration, NodeUpTime, #{
what => nodedown,
remote_node => Node,
alive_nodes => length(nodes()) + 1,
time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
})
),
State3.

-spec handle_nodeup(node(), state()) -> state().
handle_nodeup(Node, State) ->
send_start_time_to(Node, State),
State2 = remember_nodeup_timestamp(Node, State),
NodeDownTime = get_downtime(Node, State2),
?LOG_WARNING(
set_defined(downtime_millisecond_duration, NodeDownTime, #{
what => nodeup,
remote_node => Node,
alive_nodes => length(nodes()) + 1,
%% We report that time so we could work on minimizing that time.
%% It says how long it took to discover nodes after startup.
time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
})
),
State2.

-spec remember_nodeup_timestamp(node(), state()) -> state().
remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
Time = erlang:system_time(millisecond),
Map2 = Map#{Node => Time},
State#{nodeup_timestamps := Map2}.

-spec remember_nodedown_timestamp(node(), state()) -> state().
remember_nodedown_timestamp(Node, State = #{nodedown_timestamps := Map}) ->
Time = erlang:system_time(millisecond),
Map2 = Map#{Node => Time},
State#{nodedown_timestamps := Map2}.

-spec remove_nodeup_timestamp(node(), state()) -> {integer(), state()}.
remove_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
StartTime = maps:get(Node, Map, undefined),
NodeUpTime = calculate_uptime(StartTime),
Map2 = maps:remove(Node, Map),
{NodeUpTime, State#{nodeup_timestamps := Map2}}.

calculate_uptime(undefined) ->
undefined;
calculate_uptime(StartTime) ->
time_since(StartTime).

get_downtime(Node, #{nodedown_timestamps := Map}) ->
case maps:get(Node, Map, undefined) of
undefined ->
undefined;
WentDown ->
time_since(WentDown)
end.

set_defined(_Key, undefined, Map) ->
Map;
set_defined(Key, Value, Map) ->
Map#{Key => Value}.

time_since_startup_in_milliseconds(#{start_time := StartTime}) ->
time_since(StartTime).

time_since(StartTime) ->
erlang:system_time(millisecond) - StartTime.

send_start_time_to(Node, #{start_time := StartTime}) ->
case erlang:process_info(self(), registered_name) of
{registered_name, Name} ->
erlang:send({Name, Node}, {start_time, node(), StartTime});
_ ->
ok
end.

handle_receive_start_time(Node, StartTime, State = #{node_start_timestamps := Map}) ->
case maps:get(Node, Map, undefined) of
undefined ->
ok;
StartTime ->
?LOG_WARNING(#{
what => node_reconnects,
remote_node => Node,
start_time => StartTime,
text => <<"Netsplit recovery. The remote node has been connected to us before.">>
});
_ ->
%% Restarted node reconnected, this is fine during the rolling updates
ok
end,
State#{node_start_timestamps := maps:put(Node, StartTime, Map)}.
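For reference, these handlers emit log maps shaped like the following; all values are illustrative, not taken from a real run:

%% nodedown for a node that had been connected for ~5 s, ~60 s after disco start:
#{what => nodedown,
  remote_node => 'mongooseim@host2',
  alive_nodes => 3,
  connected_millisecond_duration => 5000,
  time_since_startup_in_milliseconds => 60000}

%% nodeup where a previous downtime of ~2 s was recorded for that node:
#{what => nodeup,
  remote_node => 'mongooseim@host2',
  alive_nodes => 4,
  downtime_millisecond_duration => 2000,
  time_since_startup_in_milliseconds => 62000}

%% When no earlier timestamp is known, set_defined/3 simply omits the duration key.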
19 changes: 6 additions & 13 deletions src/cets_join.erl
@@ -84,10 +84,12 @@ join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts) ->
%% Just lock all nodes, no magic here :)
Nodes = [node() | nodes()],
Retries = 1,
%% global could abort the transaction when one of the nodes goes down.
%% It could usually abort it during startup or update.
case global:trans(LockRequest, F, Nodes, Retries) of
aborted ->
checkpoint(before_retry, JoinOpts),
?LOG_ERROR(Info#{what => join_retry, reason => lock_aborted}),
?LOG_INFO(Info#{what => join_retry, reason => lock_aborted}),
join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts);
Result ->
Result
@@ -195,9 +197,9 @@ get_pids(Pid) ->
check_pids(Info, LocPids, RemPids, JoinOpts) ->
check_do_not_overlap(Info, LocPids, RemPids),
checkpoint(before_check_fully_connected, JoinOpts),
check_could_reach_each_other(Info, LocPids, RemPids),
check_fully_connected(Info, LocPids),
check_fully_connected(Info, RemPids),
check_could_reach_each_other(Info, LocPids, RemPids).
check_fully_connected(Info, RemPids).

-spec check_could_reach_each_other(cets_long:log_info(), cets:servers(), cets:servers()) -> ok.
check_could_reach_each_other(Info, LocPids, RemPids) ->
@@ -207,16 +209,7 @@ check_could_reach_each_other(Info, LocPids, RemPids) ->
{min(LocNode, RemNode), max(LocNode, RemNode)}
|| LocNode <- LocNodes, RemNode <- RemNodes, LocNode =/= RemNode
]),
Results =
[
{Node1, Node2,
cets_long:run_tracked(
#{task => ping_node, node1 => Node1, node2 => Node2}, fun() ->
rpc:call(Node1, net_adm, ping, [Node2], 10000)
end
)}
|| {Node1, Node2} <- Pairs
],
Results = cets_ping:ping_pairs(Pairs),
NotConnected = [X || {_Node1, _Node2, Res} = X <- Results, Res =/= pong],
case NotConnected of
[] ->
3 changes: 3 additions & 0 deletions src/cets_long.erl
@@ -91,6 +91,9 @@ run_monitor(Info, Ref, Parent, Start) ->

monitor_loop(Mon, Info, Parent, Start, Interval) ->
receive
{'DOWN', _MonRef, process, _Pid, shutdown} ->
%% Special case, the long task is stopped using exit(Pid, shutdown)
ok;
{'DOWN', MonRef, process, _Pid, Reason} when Mon =:= MonRef ->
?LOG_ERROR(Info#{
what => task_failed,
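The scenario covered by the new clause, as a minimal sketch; the spawned task itself is illustrative, while run_tracked/2 matches the call form used elsewhere in this PR:

%% A process runs a tracked long task:
Pid = spawn(fun() ->
    cets_long:run_tracked(#{task => example_task}, fun() -> timer:sleep(60000) end)
end),
%% Deliberately stopping it with reason shutdown no longer logs task_failed:
exit(Pid, shutdown).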
79 changes: 79 additions & 0 deletions src/cets_ping.erl
@@ -0,0 +1,79 @@
-module(cets_ping).
-export([ping/1, ping_pairs/1]).

-ifdef(TEST).
-export([net_family/1]).
-endif.

ping(Node) when is_atom(Node) ->
%% It is important to understand, that initial setup for dist connections
%% is done by the single net_kernel process.
%% It calls net_kernel:setup, which calls inet_tcp_dist, which calls
%% erl_epmd:address_please/3, which does a DNS request.
%% If DNS is slow - net_kernel process would become busy.
%% But if we have a lot of offline nodes in the CETS discovery table,
%% we would try to call net_kernel for each node (even if we probably would receive
%% {error, nxdomain} from erl_epmd:address_please/3).
%% So, we first do nslookup here and only after that we try to connect.
case lists:member(Node, nodes()) of
true ->
pong;
false ->
{node, Name, Host} = dist_util:split_node(Node),
Epmd = net_kernel:epmd_module(),
case Epmd:address_please(Name, Host, net_family()) of
{error, _} ->
pang;
_ ->
connect_ping(Node)
end
end.

%% The user should use proto_dist flag to enable inet6.
-spec net_family() -> inet | inet6.
net_family() ->
net_family(init:get_argument(proto_dist)).

net_family({ok, [["inet6" ++ _]]}) ->
inet6;
net_family(_) ->
inet.

connect_ping(Node) ->
%% We could use net_adm:ping/1 but it:
%% - disconnects node on pang - we don't want that
%% (because it could disconnect an already connected node because of race conditions)
%% - calls net_kernel's gen_server of the remote server,
%% but it could be busy doing something, which means slower response time.
case net_kernel:connect_node(Node) of
true ->
pong;
_ ->
pang
end.

-spec ping_pairs([{node(), node()}]) -> [{node(), node(), pong | Reason :: term()}].
ping_pairs(Pairs) ->
%% We could use rpc:multicall(Nodes, cets_ping, ping, Args).
%% But it means more chance of nodes trying to contact each other.
ping_pairs_stop_on_pang(Pairs).

ping_pairs_stop_on_pang([{Node1, Node2} | Pairs]) ->
F = fun() -> rpc:call(Node1, cets_ping, ping, [Node2], 10000) end,
Info = #{task => ping_node, node1 => Node1, node2 => Node2},
Res = cets_long:run_tracked(Info, F),
case Res of
pong ->
[{Node1, Node2, pong} | ping_pairs_stop_on_pang(Pairs)];
Other ->
%% We do not need to ping the rest of nodes -
%% one node returning pang is enough to cancel join.
%% We could exit earlier and save some time
%% (connect_node to the dead node could be time consuming)
[{Node1, Node2, Other} | fail_pairs(Pairs, skipped)]
end;
ping_pairs_stop_on_pang([]) ->
[].

fail_pairs(Pairs, Reason) ->
[{Node1, Node2, Reason} || {Node1, Node2} <- Pairs].
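A short usage sketch of the new module; the node names are illustrative, and -proto_dist inet6_tcp is the standard flag for IPv6 distribution, which net_family/1 detects by its "inet6" prefix:

%% Node started with: erl -proto_dist inet6_tcp ...  =>  net_family() returns inet6.
pong = cets_ping:ping('node2@host1'),     %% resolvable via the epmd module, connect succeeds
pang = cets_ping:ping('missing@nohost'),  %% address_please fails, connect is skipped
%% ping_pairs/1 stops at the first non-pong result and marks the remaining pairs skipped:
[{'n1@h1', 'n2@h1', pong}, {'n1@h1', 'n3@h1', pang}, {'n1@h1', 'n4@h1', skipped}] =
    cets_ping:ping_pairs([{'n1@h1', 'n2@h1'}, {'n1@h1', 'n3@h1'}, {'n1@h1', 'n4@h1'}]).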
2 changes: 1 addition & 1 deletion src/cets_status.erl
@@ -64,7 +64,7 @@ gather_data(Disco) ->
ThisNode = node(),
Info = cets_discovery:system_info(Disco),
#{tables := Tables} = Info,
OnlineNodes = [ThisNode | nodes()],
OnlineNodes = lists:sort([ThisNode | nodes()]),
AvailNodes = available_nodes(Disco, OnlineNodes),
Expected = get_local_table_to_other_nodes_map(Tables),
OtherTabNodes = get_node_to_tab_nodes_map(AvailNodes, Disco),