diff --git a/src/khepri_cluster.erl b/src/khepri_cluster.erl index 830d4ca9..892f51d3 100644 --- a/src/khepri_cluster.erl +++ b/src/khepri_cluster.erl @@ -115,6 +115,7 @@ locally_known_nodes/0, locally_known_nodes/1, locally_known_nodes/2, + wait_for_leader/0, wait_for_leader/1, wait_for_leader/2, get_default_ra_system_or_data_dir/0, get_default_store_id/0, get_store_ids/0, @@ -122,8 +123,7 @@ %% Internal. -export([node_to_member/2, - this_member/1, - wait_for_cluster_readiness/2]). + this_member/1]). -ifdef(TEST). -export([wait_for_ra_server_exit/1, @@ -133,8 +133,7 @@ -dialyzer({no_underspecs, [start/1, stop/0, stop/1, stop_locked/1, - join/2, - wait_for_remote_cluster_readiness/3]}). + join/2]}). -define(IS_RA_SYSTEM(RaSystem), is_atom(RaSystem)). -define(IS_RA_SERVER(RaServer), (is_tuple(RaServer) andalso @@ -756,7 +755,7 @@ reset_remotely_and_join_locked( ?LOG_DEBUG( "Remote cluster (reached through node ~0p) is not ready " "for a membership change yet; waiting...", [RemoteNode]), - Ret2 = wait_for_cluster_readiness(StoreId, Timeout1), + Ret2 = wait_for_leader(StoreId, Timeout1), Timeout2 = khepri_utils:end_timeout_window(Timeout1, T2), case Ret2 of ok -> @@ -848,8 +847,7 @@ do_join_locked(StoreId, ThisMember, RemoteNode, Timeout) -> ?LOG_DEBUG( "Remote cluster (reached through node ~0p) is not ready " "for a membership change yet; waiting...", [RemoteNode]), - Ret2 = wait_for_remote_cluster_readiness( - StoreId, RemoteNode, Timeout1), + Ret2 = wait_for_leader(RemoteMember, Timeout1), Timeout2 = khepri_utils:end_timeout_window(Timeout1, T2), case Ret2 of ok -> @@ -873,36 +871,6 @@ do_join_locked(StoreId, ThisMember, RemoteNode, Timeout) -> end end. --spec wait_for_cluster_readiness(StoreId, Timeout) -> - Ret when - StoreId :: khepri:store_id(), - Timeout :: timeout(), - Ret :: ok | khepri:error(?khepri_error( - timeout_waiting_for_cluster_readiness, - #{store_id := StoreId})). 
-%% @private - -wait_for_cluster_readiness(StoreId, Timeout) -> - %% If querying the cluster members succeeds, we must have a quorum, right? - case members(StoreId, Timeout) of - {ok, _} -> ok; - Error -> Error - end. - --spec wait_for_remote_cluster_readiness(StoreId, RemoteNode, Timeout) -> - Ret when - StoreId :: khepri:store_id(), - RemoteNode :: node(), - Timeout :: timeout(), - Ret :: ok | khepri:error(). -%% @private - -wait_for_remote_cluster_readiness(StoreId, RemoteNode, Timeout) -> - erpc:call( - RemoteNode, - khepri_cluster, wait_for_cluster_readiness, [StoreId, Timeout], - Timeout). - -spec reset() -> Ret when Ret :: ok | khepri:error(). %% @doc Resets the store on this Erlang node. @@ -978,7 +946,7 @@ do_reset(RaSystem, StoreId, ThisMember, Timeout) -> "Cluster is not ready for a membership change yet; waiting", []), try - Ret2 = wait_for_cluster_readiness(StoreId, Timeout1), + Ret2 = wait_for_leader(StoreId, Timeout1), Timeout2 = khepri_utils:end_timeout_window(Timeout1, T2), case Ret2 of ok -> do_reset(RaSystem, StoreId, ThisMember, Timeout2); @@ -1367,6 +1335,80 @@ locally_known_nodes(StoreId, Timeout) -> Error -> Error end. +-spec wait_for_leader() -> Ret when + Ret :: ok | khepri:error(). +%% @doc Waits for a leader to be elected. +%% +%% Calling this function is the same as calling `wait_for_leader(StoreId)' +%% with the default store ID (see {@link +%% khepri_cluster:get_default_store_id/0}). +%% +%% @see wait_for_leader/1. +%% @see wait_for_leader/2. + +wait_for_leader() -> + StoreId = get_default_store_id(), + wait_for_leader(StoreId). + +-spec wait_for_leader(StoreIdOrRaServer) -> Ret when + StoreIdOrRaServer :: StoreId | RaServer, + StoreId :: khepri:store_id(), + RaServer :: ra:server_id(), + Ret :: ok | khepri:error(). +%% @doc Waits for a leader to be elected. 
+%% +%% Calling this function is the same as calling `wait_for_leader(StoreId, +%% DefaultTimeout)' where `DefaultTimeout' is returned by {@link +%% khepri_app:get_default_timeout/0}. +%% +%% @see wait_for_leader/2. + +wait_for_leader(StoreIdOrRaServer) -> + Timeout = khepri_app:get_default_timeout(), + wait_for_leader(StoreIdOrRaServer, Timeout). + +-spec wait_for_leader(StoreIdOrRaServer, Timeout) -> Ret when + StoreIdOrRaServer :: StoreId | RaServer, + StoreId :: khepri:store_id(), + RaServer :: ra:server_id(), + Timeout :: timeout(), + Ret :: ok | khepri:error(). +%% @doc Waits for a leader to be elected. +%% +%% This is useful if you want to be sure the clustered store is ready before +%% issuing writes and queries. Note that there are obviously no guarantees +%% that the Raft quorum won't be lost just after this call. +%% +%% @param StoreId the ID of the store that should elect a leader before this +%% call can return successfully. +%% @param Timeout the timeout. +%% +%% @returns `ok' if a leader was elected or an `{error, Reason}' tuple. + +wait_for_leader(StoreId, Timeout) when is_atom(StoreId) -> + ThisMember = this_member(StoreId), + wait_for_leader(ThisMember, Timeout); +wait_for_leader(RaServer, Timeout) -> + T0 = khepri_utils:start_timeout_window(Timeout), + case ra:members(RaServer, Timeout) of + {ok, _Members, _LeaderId} -> + ok; + {error, Reason} + when ?HAS_TIME_LEFT(Timeout) andalso + (Reason == noproc orelse + Reason == noconnection orelse + Reason == nodedown orelse + Reason == shutdown) -> + NewTimeout0 = khepri_utils:end_timeout_window(Timeout, T0), + NewTimeout = khepri_utils:sleep( + ?TRANSIENT_ERROR_RETRY_INTERVAL, NewTimeout0), + wait_for_leader(RaServer, NewTimeout); + {timeout, _} -> + {error, timeout}; + {error, _} = Error -> + Error + end. 
+ -spec node_to_member(StoreId, Node) -> Member when StoreId :: khepri:store_id(), Node :: node(), diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl index a6913f17..69ceb7d9 100644 --- a/test/cluster_SUITE.erl +++ b/test/cluster_SUITE.erl @@ -30,6 +30,7 @@ can_start_a_single_node/1, can_restart_a_single_node_with_ra_server_config/1, can_query_members_with_a_single_node/1, + can_wait_for_leader_with_a_single_node/1, fail_to_start_with_bad_ra_server_config/1, initial_members_are_ignored/1, can_start_a_three_node_cluster/1, @@ -38,6 +39,7 @@ can_restart_nodes_in_a_three_node_cluster/1, can_reset_a_cluster_member/1, can_query_members_with_a_three_node_cluster/1, + can_wait_for_leader_with_a_three_node_cluster/1, fail_to_join_if_not_started/1, fail_to_join_non_existing_node/1, fail_to_join_non_existing_store/1, @@ -55,6 +57,7 @@ all() -> [can_start_a_single_node, can_restart_a_single_node_with_ra_server_config, can_query_members_with_a_single_node, + can_wait_for_leader_with_a_single_node, fail_to_start_with_bad_ra_server_config, initial_members_are_ignored, can_start_a_three_node_cluster, @@ -63,6 +66,7 @@ all() -> can_restart_nodes_in_a_three_node_cluster, can_reset_a_cluster_member, can_query_members_with_a_three_node_cluster, + can_wait_for_leader_with_a_three_node_cluster, fail_to_join_if_not_started, fail_to_join_non_existing_node, fail_to_join_non_existing_store, @@ -108,6 +112,7 @@ init_per_testcase(Testcase, Config) when Testcase =:= can_start_a_single_node orelse Testcase =:= can_restart_a_single_node_with_ra_server_config orelse Testcase =:= can_query_members_with_a_single_node orelse + Testcase =:= can_wait_for_leader_with_a_single_node orelse Testcase =:= fail_to_start_with_bad_ra_server_config orelse Testcase =:= initial_members_are_ignored orelse Testcase =:= fail_to_join_non_existing_node orelse @@ -123,6 +128,7 @@ init_per_testcase(Testcase, Config) Testcase =:= can_restart_nodes_in_a_three_node_cluster orelse Testcase =:= 
can_reset_a_cluster_member orelse Testcase =:= can_query_members_with_a_three_node_cluster orelse + Testcase =:= can_wait_for_leader_with_a_three_node_cluster orelse Testcase =:= fail_to_join_if_not_started orelse Testcase =:= fail_to_join_non_existing_store orelse Testcase =:= handle_leader_down_on_three_node_cluster_command orelse @@ -361,6 +367,47 @@ can_query_members_with_a_single_node(Config) -> ok. +can_wait_for_leader_with_a_single_node(Config) -> + Node = node(), + #{Node := #{ra_system := RaSystem}} = ?config(ra_system_props, Config), + StoreId = RaSystem, + + ct:pal("Wait for leader before starting database"), + ?assertEqual( + {error, noproc}, + khepri_cluster:wait_for_leader(StoreId)), + ?assertEqual( + {error, noproc}, + khepri_cluster:wait_for_leader(StoreId, 2000)), + + ct:pal("Start database and wait for it in parallel"), + Parent = self(), + _ = spawn_link(fun() -> + timer:sleep(2000), + ?assertEqual( + {ok, StoreId}, + khepri:start(RaSystem, StoreId)), + erlang:unlink(Parent) + end), + ?assertEqual( + ok, + khepri_cluster:wait_for_leader(StoreId, 40000)), + + ct:pal("Stop database"), + ?assertEqual( + ok, + khepri:stop(StoreId)), + + ct:pal("Wait for leader after stopping database"), + ?assertEqual( + {error, noproc}, + khepri_cluster:wait_for_leader(StoreId)), + ?assertEqual( + {error, noproc}, + khepri_cluster:wait_for_leader(StoreId, 2000)), + + ok. + fail_to_start_with_bad_ra_server_config(Config) -> Node = node(), #{Node := #{ra_system := RaSystem}} = ?config(ra_system_props, Config), @@ -1298,6 +1345,100 @@ can_query_members_with_a_three_node_cluster(Config) -> ok. +can_wait_for_leader_with_a_three_node_cluster(Config) -> + PropsPerNode = ?config(ra_system_props, Config), + PeerPerNode = ?config(peer_nodes, Config), + [Node1, Node2, Node3] = Nodes = lists:sort(maps:keys(PropsPerNode)), + + %% We assume all nodes are using the same Ra system name & store ID. 
+ #{ra_system := RaSystem} = maps:get(Node1, PropsPerNode), + StoreId = RaSystem, + + ct:pal("Wait for leader before starting database"), + lists:foreach( + fun(Node) -> + ?assertEqual( + {error, noproc}, + erpc:call( + Node, khepri_cluster, wait_for_leader, [StoreId])), + ?assertEqual( + {error, noproc}, + erpc:call( + Node, khepri_cluster, wait_for_leader, [StoreId, 2000])) + end, Nodes), + + ct:pal("Start database + cluster nodes"), + lists:foreach( + fun(Node) -> + ct:pal("- khepri:start() from node ~s", [Node]), + ?assertEqual( + {ok, StoreId}, + rpc:call(Node, khepri, start, [RaSystem, StoreId])) + end, Nodes), + lists:foreach( + fun(Node) -> + ct:pal("- khepri_cluster:join() from node ~s", [Node]), + ?assertEqual( + ok, + rpc:call(Node, khepri_cluster, join, [StoreId, Node3])) + end, [Node1, Node2]), + + ct:pal("Wait for leader after starting database"), + lists:foreach( + fun(Node) -> + ?assertEqual( + ok, + erpc:call( + Node, khepri_cluster, wait_for_leader, [StoreId])), + ?assertEqual( + ok, + erpc:call( + Node, khepri_cluster, wait_for_leader, [StoreId, 2000])) + end, Nodes), + + LeaderId1 = get_leader_in_store(StoreId, Nodes), + {StoreId, LeaderNode1} = LeaderId1, + ct:pal("Stop node ~s", [LeaderNode1]), + LeaderPeer1 = proplists:get_value(LeaderNode1, PeerPerNode), + ?assertEqual(ok, stop_erlang_node(LeaderNode1, LeaderPeer1)), + + ct:pal("Wait for leader after stopping leader"), + LeftNodes1 = Nodes -- [LeaderNode1], + lists:foreach( + fun(Node) -> + ?assertEqual( + ok, + erpc:call( + Node, khepri_cluster, wait_for_leader, [StoreId])), + ?assertEqual( + ok, + erpc:call( + Node, khepri_cluster, wait_for_leader, [StoreId, 2000])) + end, LeftNodes1), + + lists:foreach( + fun(Node) -> + ct:pal("Stop node ~s", [Node]), + ?assertEqual( + ok, + rpc:call(Node, khepri, stop, [StoreId])) + end, LeftNodes1), + + ct:pal("Wait for leader after stopping database"), + lists:foreach( + fun(Node) -> + ?assertEqual( + {error, noproc}, + erpc:call( + Node, 
khepri_cluster, wait_for_leader, [StoreId])), + ?assertEqual( + {error, noproc}, + erpc:call( + Node, khepri_cluster, wait_for_leader, [StoreId, 2000])) + end, LeftNodes1), + + ok. + fail_to_join_if_not_started(Config) -> PropsPerNode = ?config(ra_system_props, Config), [Node1, Node2, _Node3] = maps:keys(PropsPerNode), @@ -1396,6 +1537,8 @@ can_use_default_store_on_single_node(_Config) -> ?assertEqual({error, noproc}, khepri_cluster:nodes()), ?assertEqual({error, noproc}, khepri_cluster:locally_known_nodes()), + ?assertEqual({error, noproc}, khepri_cluster:wait_for_leader()), + {ok, StoreId} = khepri:start(), ?assert(filelib:is_dir(DataDir)), @@ -1493,6 +1636,8 @@ can_use_default_store_on_single_node(_Config) -> ?assertEqual({ok, [Node]}, khepri_cluster:nodes()), ?assertEqual({ok, [Node]}, khepri_cluster:locally_known_nodes()), + ?assertEqual(ok, khepri_cluster:wait_for_leader()), + ?assertEqual(ok, khepri:stop()), ?assertEqual({error, noproc}, khepri:get([foo])), ?assertEqual({error, noproc}, khepri:exists([foo])),