Skip to content

Commit

Permalink
Merge pull request #287 from rabbitmq/fixes-to-khepri_cluster-member-…
Browse files Browse the repository at this point in the history
…queries

Fixes to `khepri_cluster` member queries
  • Loading branch information
michaelklishin authored Aug 22, 2024
2 parents 1bc7edb + dd04e04 commit 2e7f1b8
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 19 deletions.
18 changes: 17 additions & 1 deletion src/khepri_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,8 @@ do_query_members(StoreId, RaServer, QueryType, Timeout) ->
case ra:members(Arg, Timeout) of
{ok, Members, _} ->
{ok, lists:sort(Members)};
{error, noproc} = Error ->
{error, noproc} = Error
when ?HAS_TIME_LEFT(Timeout) ->
case khepri_utils:is_ra_server_alive(RaServer) of
true ->
NewTimeout0 = khepri_utils:end_timeout_window(Timeout, T0),
Expand All @@ -1235,6 +1236,21 @@ do_query_members(StoreId, RaServer, QueryType, Timeout) ->
[StoreId]),
Error
end;
{error, Reason}
when ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noconnection orelse
Reason == nodedown orelse
Reason == shutdown) ->
NewTimeout0 = khepri_utils:end_timeout_window(Timeout, T0),
NewTimeout = khepri_utils:sleep(
?NOPROC_RETRY_INTERVAL, NewTimeout0),
do_query_members(
StoreId, RaServer, QueryType, NewTimeout);
{timeout, _} ->
?LOG_WARNING(
"Timeout while querying members in store \"~s\"",
[StoreId]),
{error, timeout};
Error ->
?LOG_WARNING(
"Failed to query members in store \"~s\": ~p",
Expand Down
2 changes: 2 additions & 0 deletions src/khepri_cluster.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@

-define(IS_TIMEOUT(Timeout), (Timeout =:= infinity orelse
(is_integer(Timeout) andalso Timeout >= 0))).

-define(HAS_TIME_LEFT(Timeout), (Timeout =:= infinity orelse Timeout > 0)).
2 changes: 0 additions & 2 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,6 @@
command/0,
old_command/0]).

-define(HAS_TIME_LEFT(Timeout), (Timeout =:= infinity orelse Timeout > 0)).

-define(PROJECTION_PROPS_TO_RETURN, [payload_version,
child_list_version,
child_list_length,
Expand Down
61 changes: 45 additions & 16 deletions test/cluster_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,7 @@ can_reset_a_cluster_member(Config) ->

can_query_members_with_a_three_node_cluster(Config) ->
PropsPerNode = ?config(ra_system_props, Config),
PeerPerNode = ?config(peer_nodes, Config),
[Node1, Node2, Node3] = Nodes = lists:sort(maps:keys(PropsPerNode)),

%% We assume all nodes are using the same Ra system name & store ID.
Expand Down Expand Up @@ -1218,54 +1219,82 @@ can_query_members_with_a_three_node_cluster(Config) ->
khepri_cluster, locally_known_nodes, [StoreId, 10000]))
end, Nodes),

ct:pal("Stop database"),
LeaderId1 = get_leader_in_store(StoreId, Nodes),
{StoreId, LeaderNode1} = LeaderId1,
ct:pal("Stop node ~s", [LeaderNode1]),
LeaderPeer1 = proplists:get_value(LeaderNode1, PeerPerNode),
?assertEqual(ok, stop_erlang_node(LeaderNode1, LeaderPeer1)),

ct:pal("Query members after stopping node ~s", [LeaderNode1]),
LeftNodes1 = Nodes -- [LeaderNode1],
lists:foreach(
fun(Node) ->
ct:pal("- khepri:stop() from node ~s", [Node]),
?assertEqual(ok, rpc:call(Node, khepri, stop, [StoreId]))
end, Nodes),
?assertEqual(
{ok, Nodes},
erpc:call(Node, khepri_cluster, nodes, [StoreId])),
?assertEqual(
{ok, Nodes},
erpc:call(Node, khepri_cluster, nodes, [StoreId, 10000])),
?assertEqual(
{ok, Nodes},
erpc:call(
Node,
khepri_cluster, locally_known_nodes, [StoreId])),
?assertEqual(
{ok, Nodes},
erpc:call(
Node,
khepri_cluster, locally_known_nodes, [StoreId, 10000]))
end, LeftNodes1),

ct:pal("Query members after stopping database"),
LeaderId2 = get_leader_in_store(StoreId, LeftNodes1),
{StoreId, LeaderNode2} = LeaderId2,
ct:pal("Stop node ~s", [LeaderNode2]),
LeaderPeer2 = proplists:get_value(LeaderNode2, PeerPerNode),
?assertEqual(ok, stop_erlang_node(LeaderNode2, LeaderPeer2)),

ct:pal("Query members after stopping node ~s", [LeaderNode2]),
LeftNodes2 = LeftNodes1 -- [LeaderNode2],
lists:foreach(
fun(Node) ->
?assertEqual(
{error, noproc},
{error, timeout},
erpc:call(Node, khepri_cluster, members, [StoreId])),
?assertEqual(
{error, noproc},
{error, timeout},
erpc:call(Node, khepri_cluster, members, [StoreId, 10000])),
?assertEqual(
{error, noproc},
{ok, [{StoreId, N} || N <- Nodes]},
erpc:call(
Node,
khepri_cluster, locally_known_members, [StoreId])),
?assertEqual(
{error, noproc},
{ok, [{StoreId, N} || N <- Nodes]},
erpc:call(
Node,
khepri_cluster, locally_known_members, [StoreId, 10000]))
end, Nodes),
end, LeftNodes2),

ct:pal("Query nodes after stopping database"),
ct:pal("Query nodes after stopping node ~s", [LeaderNode2]),
lists:foreach(
fun(Node) ->
?assertEqual(
{error, noproc},
{error, timeout},
erpc:call(Node, khepri_cluster, nodes, [StoreId])),
?assertEqual(
{error, noproc},
{error, timeout},
erpc:call(Node, khepri_cluster, nodes, [StoreId, 10000])),
?assertEqual(
{error, noproc},
{ok, Nodes},
erpc:call(
Node,
khepri_cluster, locally_known_nodes, [StoreId])),
?assertEqual(
{error, noproc},
{ok, Nodes},
erpc:call(
Node,
khepri_cluster, locally_known_nodes, [StoreId, 10000]))
end, Nodes),
end, LeftNodes2),

ok.

Expand Down

0 comments on commit 2e7f1b8

Please sign in to comment.