Merge pull request #289 from rabbitmq/add-wait_for_leader-function
khepri_cluster: Add `wait_for_leader/{0,1,2}`
dumbbell committed Aug 26, 2024
2 parents be71afe + ba3f345 commit 0db32fd
Showing 2 changed files with 225 additions and 38 deletions.
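For reference, the new API comes in three arities; a quick sketch of the call forms (the store ID `my_store` and the 30000 ms timeout are made-up examples), each returning `ok` once a leader is elected, or an `{error, Reason}` tuple otherwise:

khepri_cluster:wait_for_leader(),                 %% default store ID, default timeout
khepri_cluster:wait_for_leader(my_store),         %% explicit store ID, default timeout
khepri_cluster:wait_for_leader(my_store, 30000).  %% explicit timeout in milliseconds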
118 changes: 80 additions & 38 deletions src/khepri_cluster.erl
@@ -115,15 +115,15 @@
locally_known_nodes/0,
locally_known_nodes/1,
locally_known_nodes/2,
wait_for_leader/0, wait_for_leader/1, wait_for_leader/2,
get_default_ra_system_or_data_dir/0,
get_default_store_id/0,
get_store_ids/0,
is_store_running/1]).

%% Internal.
-export([node_to_member/2,
this_member/1,
wait_for_cluster_readiness/2]).
this_member/1]).

-ifdef(TEST).
-export([wait_for_ra_server_exit/1,
@@ -133,8 +133,7 @@
-dialyzer({no_underspecs, [start/1,
stop/0, stop/1,
stop_locked/1,
join/2,
wait_for_remote_cluster_readiness/3]}).
join/2]}).

-define(IS_RA_SYSTEM(RaSystem), is_atom(RaSystem)).
-define(IS_RA_SERVER(RaServer), (is_tuple(RaServer) andalso
@@ -756,7 +755,7 @@ reset_remotely_and_join_locked(
?LOG_DEBUG(
"Remote cluster (reached through node ~0p) is not ready "
"for a membership change yet; waiting...", [RemoteNode]),
Ret2 = wait_for_cluster_readiness(StoreId, Timeout1),
Ret2 = wait_for_leader(StoreId, Timeout1),
Timeout2 = khepri_utils:end_timeout_window(Timeout1, T2),
case Ret2 of
ok ->
@@ -848,8 +847,7 @@ do_join_locked(StoreId, ThisMember, RemoteNode, Timeout) ->
?LOG_DEBUG(
"Remote cluster (reached through node ~0p) is not ready "
"for a membership change yet; waiting...", [RemoteNode]),
Ret2 = wait_for_remote_cluster_readiness(
StoreId, RemoteNode, Timeout1),
Ret2 = wait_for_leader(RemoteMember, Timeout1),
Timeout2 = khepri_utils:end_timeout_window(Timeout1, T2),
case Ret2 of
ok ->
@@ -873,36 +871,6 @@
end
end.

-spec wait_for_cluster_readiness(StoreId, Timeout) ->
Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error(?khepri_error(
timeout_waiting_for_cluster_readiness,
#{store_id := StoreId})).
%% @private

wait_for_cluster_readiness(StoreId, Timeout) ->
%% If querying the cluster members succeeds, we must have a quorum, right?
case members(StoreId, Timeout) of
{ok, _} -> ok;
Error -> Error
end.

-spec wait_for_remote_cluster_readiness(StoreId, RemoteNode, Timeout) ->
Ret when
StoreId :: khepri:store_id(),
RemoteNode :: node(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @private

wait_for_remote_cluster_readiness(StoreId, RemoteNode, Timeout) ->
erpc:call(
RemoteNode,
khepri_cluster, wait_for_cluster_readiness, [StoreId, Timeout],
Timeout).

-spec reset() -> Ret when
Ret :: ok | khepri:error().
%% @doc Resets the store on this Erlang node.
@@ -978,7 +946,7 @@ do_reset(RaSystem, StoreId, ThisMember, Timeout) ->
"Cluster is not ready for a membership change yet; waiting",
[]),
try
Ret2 = wait_for_cluster_readiness(StoreId, Timeout1),
Ret2 = wait_for_leader(StoreId, Timeout1),
Timeout2 = khepri_utils:end_timeout_window(Timeout1, T2),
case Ret2 of
ok -> do_reset(RaSystem, StoreId, ThisMember, Timeout2);
@@ -1367,6 +1335,80 @@ locally_known_nodes(StoreId, Timeout) ->
Error -> Error
end.

-spec wait_for_leader() -> Ret when
Ret :: ok | khepri:error().
%% @doc Waits for a leader to be elected.
%%
%% Calling this function is the same as calling `wait_for_leader(StoreId)'
%% with the default store ID (see {@link
%% khepri_cluster:get_default_store_id/0}).
%%
%% @see wait_for_leader/1.
%% @see wait_for_leader/2.

wait_for_leader() ->
StoreId = get_default_store_id(),
wait_for_leader(StoreId).

-spec wait_for_leader(StoreIdOrRaServer) -> Ret when
StoreIdOrRaServer :: StoreId | RaServer,
StoreId :: khepri:store_id(),
RaServer :: ra:server_id(),
Ret :: ok | khepri:error().
%% @doc Waits for a leader to be elected.
%%
%% Calling this function is the same as calling `wait_for_leader(StoreId,
%% DefaultTimeout)' where `DefaultTimeout' is returned by {@link
%% khepri_app:get_default_timeout/0}.
%%
%% @see wait_for_leader/2.

wait_for_leader(StoreIdOrRaServer) ->
Timeout = khepri_app:get_default_timeout(),
wait_for_leader(StoreIdOrRaServer, Timeout).

-spec wait_for_leader(StoreIdOrRaServer, Timeout) -> Ret when
StoreIdOrRaServer :: StoreId | RaServer,
StoreId :: khepri:store_id(),
RaServer :: ra:server_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Waits for a leader to be elected.
%%
%% This is useful if you want to be sure the clustered store is ready before
%% issuing writes and queries. Note that there is obviously no guarantee
%% that the Raft quorum will not be lost just after this call returns.
%%
%% @param StoreIdOrRaServer the ID of the store, or the Ra server, that should
%% elect a leader before this call can return successfully.
%% @param Timeout the timeout.
%%
%% @returns `ok' if a leader was elected or an `{error, Reason}' tuple.

wait_for_leader(StoreId, Timeout) when is_atom(StoreId) ->
ThisMember = this_member(StoreId),
wait_for_leader(ThisMember, Timeout);
wait_for_leader(RaServer, Timeout) ->
T0 = khepri_utils:start_timeout_window(Timeout),
case ra:members(RaServer, Timeout) of
{ok, _Members, _LeaderId} ->
ok;
{error, Reason}
when ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse
Reason == noconnection orelse
Reason == nodedown orelse
Reason == shutdown) ->
NewTimeout0 = khepri_utils:end_timeout_window(Timeout, T0),
NewTimeout = khepri_utils:sleep(
?TRANSIENT_ERROR_RETRY_INTERVAL, NewTimeout0),
wait_for_leader(RaServer, NewTimeout);
{timeout, _} ->
{error, timeout};
{error, _} = Error ->
Error
end.
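
For illustration, a minimal usage sketch of the new function (hypothetical code, not part of this commit; the 30-second timeout, path and value are arbitrary): start the default store, block until a leader is elected, then issue a write.

%% Hypothetical usage sketch, not part of this change.
example() ->
    {ok, StoreId} = khepri:start(),
    ok = khepri_cluster:wait_for_leader(StoreId, 30000),
    %% A leader exists at this point, so writes should succeed unless the
    %% quorum is lost again afterwards.
    khepri:put(StoreId, [stock, wood, oak], 150).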

-spec node_to_member(StoreId, Node) -> Member when
StoreId :: khepri:store_id(),
Node :: node(),
145 changes: 145 additions & 0 deletions test/cluster_SUITE.erl
@@ -30,6 +30,7 @@
can_start_a_single_node/1,
can_restart_a_single_node_with_ra_server_config/1,
can_query_members_with_a_single_node/1,
can_wait_for_leader_with_a_single_node/1,
fail_to_start_with_bad_ra_server_config/1,
initial_members_are_ignored/1,
can_start_a_three_node_cluster/1,
@@ -38,6 +39,7 @@
can_restart_nodes_in_a_three_node_cluster/1,
can_reset_a_cluster_member/1,
can_query_members_with_a_three_node_cluster/1,
can_wait_for_leader_with_a_three_node_cluster/1,
fail_to_join_if_not_started/1,
fail_to_join_non_existing_node/1,
fail_to_join_non_existing_store/1,
@@ -55,6 +57,7 @@ all() ->
[can_start_a_single_node,
can_restart_a_single_node_with_ra_server_config,
can_query_members_with_a_single_node,
can_wait_for_leader_with_a_single_node,
fail_to_start_with_bad_ra_server_config,
initial_members_are_ignored,
can_start_a_three_node_cluster,
@@ -63,6 +66,7 @@ all() ->
can_restart_nodes_in_a_three_node_cluster,
can_reset_a_cluster_member,
can_query_members_with_a_three_node_cluster,
can_wait_for_leader_with_a_three_node_cluster,
fail_to_join_if_not_started,
fail_to_join_non_existing_node,
fail_to_join_non_existing_store,
@@ -108,6 +112,7 @@ init_per_testcase(Testcase, Config)
when Testcase =:= can_start_a_single_node orelse
Testcase =:= can_restart_a_single_node_with_ra_server_config orelse
Testcase =:= can_query_members_with_a_single_node orelse
Testcase =:= can_wait_for_leader_with_a_single_node orelse
Testcase =:= fail_to_start_with_bad_ra_server_config orelse
Testcase =:= initial_members_are_ignored orelse
Testcase =:= fail_to_join_non_existing_node orelse
@@ -123,6 +128,7 @@ init_per_testcase(Testcase, Config)
Testcase =:= can_restart_nodes_in_a_three_node_cluster orelse
Testcase =:= can_reset_a_cluster_member orelse
Testcase =:= can_query_members_with_a_three_node_cluster orelse
Testcase =:= can_wait_for_leader_with_a_three_node_cluster orelse
Testcase =:= fail_to_join_if_not_started orelse
Testcase =:= fail_to_join_non_existing_store orelse
Testcase =:= handle_leader_down_on_three_node_cluster_command orelse
@@ -361,6 +367,47 @@ can_query_members_with_a_single_node(Config) ->

ok.

can_wait_for_leader_with_a_single_node(Config) ->
Node = node(),
#{Node := #{ra_system := RaSystem}} = ?config(ra_system_props, Config),
StoreId = RaSystem,

ct:pal("Wait for leader before starting database"),
?assertEqual(
{error, noproc},
khepri_cluster:wait_for_leader(StoreId)),
?assertEqual(
{error, noproc},
khepri_cluster:wait_for_leader(StoreId, 2000)),

ct:pal("Start database and wait for it in parallel"),
Parent = self(),
_ = spawn_link(fun() ->
timer:sleep(2000),
?assertEqual(
{ok, StoreId},
khepri:start(RaSystem, StoreId)),
erlang:unlink(Parent)
end),
?assertEqual(
ok,
khepri_cluster:wait_for_leader(StoreId, 40000)),

ct:pal("Stop database"),
?assertEqual(
ok,
khepri:stop(StoreId)),

ct:pal("Wait for leader after stopping database"),
?assertEqual(
{error, noproc},
khepri_cluster:wait_for_leader(StoreId)),
?assertEqual(
{error, noproc},
khepri_cluster:wait_for_leader(StoreId, 2000)),

ok.

fail_to_start_with_bad_ra_server_config(Config) ->
Node = node(),
#{Node := #{ra_system := RaSystem}} = ?config(ra_system_props, Config),
@@ -1298,6 +1345,100 @@ can_query_members_with_a_three_node_cluster(Config) ->

ok.

can_wait_for_leader_with_a_three_node_cluster(Config) ->
PropsPerNode = ?config(ra_system_props, Config),
PeerPerNode = ?config(peer_nodes, Config),
[Node1, Node2, Node3] = Nodes = lists:sort(maps:keys(PropsPerNode)),

%% We assume all nodes are using the same Ra system name & store ID.
#{ra_system := RaSystem} = maps:get(Node1, PropsPerNode),
StoreId = RaSystem,

ct:pal("Wait for leader before starting database"),
lists:foreach(
fun(Node) ->
?assertEqual(
{error, noproc},
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId])),
?assertEqual(
{error, noproc},
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId, 2000]))
end, Nodes),

ct:pal("Start database + cluster nodes"),
lists:foreach(
fun(Node) ->
ct:pal("- khepri:start() from node ~s", [Node]),
?assertEqual(
{ok, StoreId},
rpc:call(Node, khepri, start, [RaSystem, StoreId]))
end, Nodes),
lists:foreach(
fun(Node) ->
ct:pal("- khepri_cluster:join() from node ~s", [Node]),
?assertEqual(
ok,
rpc:call(Node, khepri_cluster, join, [StoreId, Node3]))
end, [Node1, Node2]),

ct:pal("Wait for leader after starting database"),
lists:foreach(
fun(Node) ->
?assertEqual(
ok,
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId])),
?assertEqual(
ok,
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId, 2000]))
end, Nodes),

LeaderId1 = get_leader_in_store(StoreId, Nodes),
{StoreId, LeaderNode1} = LeaderId1,
ct:pal("Stop node ~s", [LeaderNode1]),
LeaderPeer1 = proplists:get_value(LeaderNode1, PeerPerNode),
?assertEqual(ok, stop_erlang_node(LeaderNode1, LeaderPeer1)),

ct:pal("Wait for leader after stopping leader"),
LeftNodes1 = Nodes -- [LeaderNode1],
lists:foreach(
fun(Node) ->
?assertEqual(
ok,
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId])),
?assertEqual(
ok,
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId, 2000]))
end, LeftNodes1),

lists:foreach(
fun(Node) ->
ct:pal("Stop node ~s", [Node]),
?assertEqual(
ok,
rpc:call(Node, khepri, stop, [StoreId]))
end, LeftNodes1),

ct:pal("Wait for leader after stopping database"),
lists:foreach(
fun(Node) ->
?assertEqual(
{error, noproc},
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId])),
?assertEqual(
{error, noproc},
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId, 2000]))
end, LeftNodes1),

ok.

fail_to_join_if_not_started(Config) ->
PropsPerNode = ?config(ra_system_props, Config),
[Node1, Node2, _Node3] = maps:keys(PropsPerNode),
@@ -1396,6 +1537,8 @@ can_use_default_store_on_single_node(_Config) ->
?assertEqual({error, noproc}, khepri_cluster:nodes()),
?assertEqual({error, noproc}, khepri_cluster:locally_known_nodes()),

?assertEqual({error, noproc}, khepri_cluster:wait_for_leader()),

{ok, StoreId} = khepri:start(),
?assert(filelib:is_dir(DataDir)),

@@ -1493,6 +1636,8 @@ can_use_default_store_on_single_node(_Config) ->
?assertEqual({ok, [Node]}, khepri_cluster:nodes()),
?assertEqual({ok, [Node]}, khepri_cluster:locally_known_nodes()),

?assertEqual(ok, khepri_cluster:wait_for_leader()),

?assertEqual(ok, khepri:stop()),
?assertEqual({error, noproc}, khepri:get([foo])),
?assertEqual({error, noproc}, khepri:exists([foo])),
