From db9c3f017f612527ee85cfd9a8673ca3b2b799e9 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Tue, 28 Sep 2021 10:31:21 +0100
Subject: [PATCH 01/12] Add option to always rebalance

Never attempt a simple transfer, always rebalance when a cluster leave
operation is requested
---
 priv/riak_core.schema    | 10 ++++++++++
 src/riak_core_gossip.erl |  9 ++++++++-
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/priv/riak_core.schema b/priv/riak_core.schema
index 3c2bac226..c62ef5cc1 100644
--- a/priv/riak_core.schema
+++ b/priv/riak_core.schema
@@ -202,6 +202,16 @@ hidden
 ]}.
 
+%% @doc On cluster leave - rebalance partitions
+%% By default on a cluster leave there will be an attempt to simply hand off
+%% vnodes in a random (and potentially unbalanced) way, which will also ignore
+%% location preferences
+{mapping, "always_rebalance_onleave", "riak_core.always_rebalance_onleave", [
+  {datatype, flag},
+  {default, off}
+]}.
+
+
 %% Async Job Management
 %%
 %% This is a translation for mappings that appear in other schema files.
diff --git a/src/riak_core_gossip.erl b/src/riak_core_gossip.erl
index 944a502f1..5c54004f1 100644
--- a/src/riak_core_gossip.erl
+++ b/src/riak_core_gossip.erl
@@ -375,7 +375,7 @@ remove_from_cluster(Ring, ExitingNode, Seed) ->
         case attempt_simple_transfer(Seed, Ring, AllOwners, ExitingNode) of
             {ok, NR} ->
                 NR;
-            target_n_fail ->
+            Err when Err == target_n_fail; Err == always_rebalance_onleave ->
                 %% re-diagonalize
                 %% first hand off all claims to *any* one else,
                 %% just so rebalance doesn't include exiting node
@@ -393,12 +393,19 @@ remove_from_cluster(Ring, ExitingNode, Seed) ->
     ExitRing.
 
 attempt_simple_transfer(Seed, Ring, Owners, ExitingNode) ->
+    attempt_simple_transfer(Seed, Ring, Owners, ExitingNode,
+        app_helper:get_env(riak_core, always_rebalance_onleave, false)).
+
+attempt_simple_transfer(_Seed, _Ring, _Owners, _ExitingNode, true) ->
+    always_rebalance_onleave;
+attempt_simple_transfer(Seed, Ring, Owners, ExitingNode, false) ->
     TargetN = app_helper:get_env(riak_core, target_n_val),
     attempt_simple_transfer(Seed, Ring, Owners,
                             TargetN,
                             ExitingNode, 0,
                             [{O,-TargetN} || O <- riak_core_ring:claiming_members(Ring),
                                              O /= ExitingNode]).
 
+
 attempt_simple_transfer(Seed, Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) ->
     %% handoff
     case [ N || {N, I} <- Last, Idx-I >= TargetN ] of

From 256d12d38d1f234b86ca7414c8f39048b253c561 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Tue, 28 Sep 2021 14:56:46 +0100
Subject: [PATCH 02/12] Use sequential_claim not claim_rebalance_n

sequential_claim is the refactored version, not the v1 version (and it
should handle tail violations).
---
 src/riak_core_claim.erl  | 7 ++++++-
 src/riak_core_gossip.erl | 2 +-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/riak_core_claim.erl b/src/riak_core_claim.erl
index f205698db..59bc30998 100644
--- a/src/riak_core_claim.erl
+++ b/src/riak_core_claim.erl
@@ -64,7 +64,8 @@
          choose_claim_v2/1, choose_claim_v2/2, choose_claim_v2/3,
          choose_claim_v3/1, choose_claim_v3/2, choose_claim_v3/3,
          claim_rebalance_n/2, claim_diversify/3, claim_diagonal/3,
-         wants/1, wants_owns_diff/2, meets_target_n/2, diagonal_stripe/2]).
+         wants/1, wants_owns_diff/2, meets_target_n/2, diagonal_stripe/2,
+         sequential_claim/2]).
 
 -ifdef(TEST).
 -compile(export_all).
@@ -620,6 +621,10 @@ claim_diagonal(Wants, Owners, Params) ->
     end,
   {lists:flatten([lists:duplicate(Reps, Claiming), Last]), [diagonalized]}.
 
+sequential_claim(Ring, Node) ->
+    TN = app_helper:get_env(riak_core, target_n_val, ?DEF_TARGET_N),
+    sequential_claim(Ring, Node, TN).
+
 %% @private fall back to diagonal striping vnodes across nodes in a
 %% sequential round robin (eg n1 | n2 | n3 | n4 | n5 | n1 | n2 | n3
 %% etc) However, different to `claim_rebalance_n', this function
diff --git a/src/riak_core_gossip.erl b/src/riak_core_gossip.erl
index 5c54004f1..988934e0d 100644
--- a/src/riak_core_gossip.erl
+++ b/src/riak_core_gossip.erl
@@ -388,7 +388,7 @@ remove_from_cluster(Ring, ExitingNode, Seed) ->
                             end,
                             Ring,
                             AllOwners),
-                riak_core_claim:claim_rebalance_n(TempRing, Other)
+                riak_core_claim:sequential_claim(TempRing, Other)
         end,
     ExitRing.

From 7a3d7424f2e973c5eafbda2d5c83756a3a8f8de8 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Tue, 28 Sep 2021 19:48:30 +0100
Subject: [PATCH 03/12] Simple transfer to track counts

Prefer to transfer to a node with a lower current count of vnodes - so
that by default simple_transfer will tend to give more balanced results
---
 src/riak_core_claim.erl  |  2 +-
 src/riak_core_gossip.erl | 69 ++++++++++++++++++++++++++--------------
 2 files changed, 47 insertions(+), 24 deletions(-)

diff --git a/src/riak_core_claim.erl b/src/riak_core_claim.erl
index 59bc30998..d337a2f31 100644
--- a/src/riak_core_claim.erl
+++ b/src/riak_core_claim.erl
@@ -65,7 +65,7 @@
          choose_claim_v3/1, choose_claim_v3/2, choose_claim_v3/3,
          claim_rebalance_n/2, claim_diversify/3, claim_diagonal/3,
          wants/1, wants_owns_diff/2, meets_target_n/2, diagonal_stripe/2,
-         sequential_claim/2]).
+         sequential_claim/2, get_counts/2]).
 
 -ifdef(TEST).
 -compile(export_all).
diff --git a/src/riak_core_gossip.erl b/src/riak_core_gossip.erl
index 988934e0d..cded48034 100644
--- a/src/riak_core_gossip.erl
+++ b/src/riak_core_gossip.erl
@@ -400,47 +400,70 @@ attempt_simple_transfer(_Seed, _Ring, _Owners, _ExitingNode, true) ->
     always_rebalance_onleave;
 attempt_simple_transfer(Seed, Ring, Owners, ExitingNode, false) ->
     TargetN = app_helper:get_env(riak_core, target_n_val),
+    Counts = riak_core_claim:get_counts(Owners, Owners),
     attempt_simple_transfer(Seed, Ring, Owners,
                             TargetN,
                             ExitingNode, 0,
                             [{O,-TargetN} || O <- riak_core_ring:claiming_members(Ring),
-                                             O /= ExitingNode]).
+                                             O /= ExitingNode],
+                            Counts).
 
-attempt_simple_transfer(Seed, Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) ->
-    %% handoff
+attempt_simple_transfer(Seed, Ring, [{P, Exit}|Rest],
+                                TargetN, Exit, Idx, Last, Counts) ->
     case [ N || {N, I} <- Last, Idx-I >= TargetN ] of
         [] ->
             target_n_fail;
         Candidates ->
-            %% these nodes don't violate target_n in the reverse direction
-            StepsToNext = fun(Node) ->
-                                  length(lists:takewhile(
-                                           fun({_, Owner}) -> Node /= Owner end,
-                                           Rest))
-                          end,
-            case lists:filter(fun(N) ->
-                                      Next = StepsToNext(N),
-                                      (Next+1 >= TargetN)
-                                          orelse (Next == length(Rest))
-                              end,
-                              Candidates) of
+            CheckNextFun =
+                %% these nodes don't violate target_n in the reverse direction
+                fun(Node) ->
+                    Next =
+                        length(
+                            lists:takewhile(
+                                fun({_, Owner}) -> Node /= Owner end,
+                                Rest)),
+                    (Next + 1 >= TargetN) orelse (Next == length(Rest))
+                end,
+            Qualifiers = lists:filter(CheckNextFun, Candidates),
+            ScoreFun =
+                fun(Node, {LowCount, LowNodes}) ->
+                    {Node, CurrentCount} =
+                        lists:keyfind(Node, 1, Counts),
+                    case CurrentCount of
+                        LowCount ->
+                            {LowCount, [Node|LowNodes]};
+                        NewCount when NewCount < LowCount ->
+                            {NewCount, [Node]};
+                        _ ->
+                            {LowCount, LowNodes}
+                    end
+                end,
+            {PreferredCount, PreferredCandidates} =
+                lists:foldl(ScoreFun, {infinity, []}, Qualifiers),
+            case PreferredCandidates of
                 [] ->
                     target_n_fail;
-                Qualifiers ->
+                PreferredCandidates ->
                     %% these nodes don't violate target_n forward
-                    {Rand, Seed2} = rand:uniform_s(length(Qualifiers), Seed),
-                    Chosen = lists:nth(Rand, Qualifiers),
+                    {Rand, Seed2} =
+                        rand:uniform_s(length(PreferredCandidates), Seed),
+                    Chosen = lists:nth(Rand, PreferredCandidates),
                     %% choose one, and do the rest of the ring
                     attempt_simple_transfer(
                         Seed2,
                         riak_core_ring:transfer_node(P, Chosen, Ring),
                         Rest, TargetN, Exit, Idx+1,
-                        lists:keyreplace(Chosen, 1, Last, {Chosen, Idx}))
+                        lists:keyreplace(Chosen, 1, Last,
+                                         {Chosen, Idx}),
+                        lists:keyreplace(Chosen, 1, Counts,
+                                         {Chosen, PreferredCount + 1}))
             end
     end;
-attempt_simple_transfer(Seed, Ring, [{_, N}|Rest], TargetN, Exit, Idx, Last) ->
+attempt_simple_transfer(Seed, Ring, [{_, N}|Rest],
+                                TargetN, Exit, Idx, Last, Counts) ->
     %% just keep track of seeing this node
-    attempt_simple_transfer(Seed, Ring, Rest, TargetN, Exit, Idx+1,
-                            lists:keyreplace(N, 1, Last, {N, Idx}));
-attempt_simple_transfer(_, Ring, [], _, _, _, _) ->
+    attempt_simple_transfer(Seed, Ring, Rest, TargetN, Exit, Idx + 1,
+                            lists:keyreplace(N, 1, Last, {N, Idx}),
+                            Counts);
+attempt_simple_transfer(_, Ring, [], _, _, _, _, _) ->
     {ok, Ring}.
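A note on the count-tracking introduced in [PATCH 03/12] above: the ScoreFun
fold implements a lowest-count preference - it keeps the lowest vnode count
seen so far and collects every qualifying node tied on that count, so the
random selection that follows only ever picks between least-loaded nodes.
Below is a minimal standalone sketch of that fold; it is not part of the
patch series, and the module name (lowest_count_sketch) and the example
inputs are invented for illustration.

%% Illustrative sketch (not part of the patches): the lowest-count
%% selection that patch 03 folds over the Qualifiers list.
-module(lowest_count_sketch).
-export([preferred/2]).

%% Qualifiers is a list of candidate nodes; Counts pairs each node with
%% its current vnode count. Returns {LowestCount, NodesTiedOnThatCount}.
%% The initial accumulator uses the atom infinity, which compares
%% greater than any number in Erlang's term order, so the first node's
%% count always replaces it.
preferred(Qualifiers, Counts) ->
    ScoreFun =
        fun(Node, {LowCount, LowNodes}) ->
            {Node, CurrentCount} = lists:keyfind(Node, 1, Counts),
            case CurrentCount of
                LowCount ->
                    {LowCount, [Node|LowNodes]};
                NewCount when NewCount < LowCount ->
                    {NewCount, [Node]};
                _ ->
                    {LowCount, LowNodes}
            end
        end,
    lists:foldl(ScoreFun, {infinity, []}, Qualifiers).

%% For example, preferred([n1, n2, n3], [{n1, 2}, {n2, 3}, {n3, 1}])
%% returns {1, [n3]}: only n3 would be offered to the random pick.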
From 2c0d1488f4841bd9a02ee19a5a080a876737d1ca Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Tue, 28 Sep 2021 23:44:04 +0100
Subject: [PATCH 04/12] Correct inputs to get_counts/2
---
 src/riak_core_gossip.erl | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/riak_core_gossip.erl b/src/riak_core_gossip.erl
index cded48034..751840cb5 100644
--- a/src/riak_core_gossip.erl
+++ b/src/riak_core_gossip.erl
@@ -400,7 +400,9 @@ attempt_simple_transfer(_Seed, _Ring, _Owners, _ExitingNode, true) ->
     always_rebalance_onleave;
 attempt_simple_transfer(Seed, Ring, Owners, ExitingNode, false) ->
     TargetN = app_helper:get_env(riak_core, target_n_val),
-    Counts = riak_core_claim:get_counts(Owners, Owners),
+    Counts =
+        riak_core_claim:get_counts(riak_core_ring:claiming_members(Ring),
+                                   Owners),
     attempt_simple_transfer(Seed, Ring, Owners,
                             TargetN,
                             ExitingNode, 0,

From 7f3a1a1d1d2b64ddfa5e060eacc0c0570ad0f275 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Wed, 29 Sep 2021 09:08:17 +0100
Subject: [PATCH 05/12] Rename and re-explain

Also include warning in the plan output
---
 priv/riak_core.schema     | 16 +++++++++++-----
 src/riak_core_console.erl | 10 ++++++++++
 src/riak_core_gossip.erl  |  6 +++---
 3 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/priv/riak_core.schema b/priv/riak_core.schema
index c62ef5cc1..8ca4dfb13 100644
--- a/priv/riak_core.schema
+++ b/priv/riak_core.schema
@@ -202,11 +202,17 @@ hidden
 ]}.
 
-%% @doc On cluster leave - rebalance partitions
-%% By default on a cluster leave there will be an attempt to simply hand off
-%% vnodes in a random (and potentially unbalanced) way, which will also ignore
-%% location preferences
-{mapping, "always_rebalance_onleave", "riak_core.always_rebalance_onleave", [
+%% @doc On cluster leave - force full rebalance of partitions
+%% By default on a cluster leave there will first be an attempt to hand off
+%% vnodes to safe (in terms of target_n_val) locations. In small clusters,
+%% there may be insufficient safe locations, and a temporary state can be
+%% created where a single node has a large number of vnodes.
+%% To mitigate this, a full rebalance (a re-assignment that does not optimise
+%% based on the starting position) can be forced by setting this option on
+%% all nodes.
+%% Please carefully consider any cluster plan created with this option before
+%% committing
+{mapping, "full_rebalance_onleave", "riak_core.full_rebalance_onleave", [
   {datatype, flag},
   {default, off}
 ]}.
diff --git a/src/riak_core_console.erl b/src/riak_core_console.erl
index fcb085799..50ff1a328 100644
--- a/src/riak_core_console.erl
+++ b/src/riak_core_console.erl
@@ -685,6 +685,16 @@ print_plan(Changes, Ring, NextRings) ->
                       "cluster transitions~n~n", [Transitions])
     end,
 
+    Leaves = length(lists:filter(fun({_N, A}) -> A == leave end, Changes)) > 0,
+
+    case Leaves and app_helper:get_env(riak_core, full_rebalance_onleave) of
+        true ->
+            io:format("WARNING: full rebalance forced through non-default "
+                        "option riak_core.full_rebalance_onleave = true~n~n");
+        _ ->
+            ok
+    end,
+
     _ = lists:foldl(fun({Ring1, Ring2}, I) ->
                         io:format("~79..#s~n", [""]),
                         io:format("~24.. s After cluster transition ~b/~b~n",
diff --git a/src/riak_core_gossip.erl b/src/riak_core_gossip.erl
index 751840cb5..3b97b5205 100644
--- a/src/riak_core_gossip.erl
+++ b/src/riak_core_gossip.erl
@@ -375,7 +375,7 @@ remove_from_cluster(Ring, ExitingNode, Seed) ->
         case attempt_simple_transfer(Seed, Ring, AllOwners, ExitingNode) of
             {ok, NR} ->
                 NR;
-            Err when Err == target_n_fail; Err == always_rebalance_onleave ->
+            Err when Err == target_n_fail; Err == full_rebalance_onleave ->
                 %% re-diagonalize
                 %% first hand off all claims to *any* one else,
                 %% just so rebalance doesn't include exiting node
@@ -394,10 +394,10 @@ remove_from_cluster(Ring, ExitingNode, Seed) ->
 
 attempt_simple_transfer(Seed, Ring, Owners, ExitingNode) ->
     attempt_simple_transfer(Seed, Ring, Owners, ExitingNode,
-        app_helper:get_env(riak_core, always_rebalance_onleave, false)).
+        app_helper:get_env(riak_core, full_rebalance_onleave, false)).
 
 attempt_simple_transfer(_Seed, _Ring, _Owners, _ExitingNode, true) ->
-    always_rebalance_onleave;
+    full_rebalance_onleave;
 attempt_simple_transfer(Seed, Ring, Owners, ExitingNode, false) ->
     TargetN = app_helper:get_env(riak_core, target_n_val),
     Counts =

From 29c8e50098038fbe0387227c2bf404268cf6e20a Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Wed, 29 Sep 2021 10:29:33 +0100
Subject: [PATCH 06/12] Tidy-up console warning
---
 src/riak_core_console.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/riak_core_console.erl b/src/riak_core_console.erl
index 50ff1a328..bdd75b2ca 100644
--- a/src/riak_core_console.erl
+++ b/src/riak_core_console.erl
@@ -689,7 +689,7 @@ print_plan(Changes, Ring, NextRings) ->
 
     case Leaves and app_helper:get_env(riak_core, full_rebalance_onleave) of
         true ->
-            io:format("WARNING: full rebalance forced through non-default "
+            io:format("WARNING: Full rebalance forced by non-default "
                         "option riak_core.full_rebalance_onleave = true~n~n");
         _ ->
             ok

From 07045482683066d769a4e6b30fe4d632f847185e Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Wed, 29 Sep 2021 20:46:27 +0100
Subject: [PATCH 07/12] Add comments

Reviewed previous code to ensure this is being handled correctly.
However, this does appear to be incorrect - as the current
simple_transfer does not handle tail violations. It only checks forward
to the end of the list for a TargetN violation - it does not wrap
around to the start of the list.
---
 src/riak_core_gossip.erl | 28 ++++++++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/src/riak_core_gossip.erl b/src/riak_core_gossip.erl
index 3b97b5205..def63d776 100644
--- a/src/riak_core_gossip.erl
+++ b/src/riak_core_gossip.erl
@@ -412,12 +412,26 @@ attempt_simple_transfer(Seed, Ring, Owners, ExitingNode, false) ->
 
 attempt_simple_transfer(Seed, Ring, [{P, Exit}|Rest],
                                 TargetN, Exit, Idx, Last, Counts) ->
-    case [ N || {N, I} <- Last, Idx-I >= TargetN ] of
+    %% If a node has been allocated a partition, or been seen as the existing
+    %% owner of the partition within the last TargetN loops, then it cannot be
+    %% a candidate for this partition.
+    %%
+    %% If TargetN is 4, then this is the list of Nodes which have not been
+    %% allocated any of the last 3 partitions
+    NotLastTargetN = [ N || {N, I} <- Last, Idx-I >= TargetN ],
+
+    case NotLastTargetN of
         [] ->
             target_n_fail;
         Candidates ->
+            %% Look forward to see if this Node has already been allocated any
+            %% of the next (TargetN - 1) partitions - in which case, it is not
+            %% a safe home for this partition and will be filtered from the
+            %% Candidate list.
+            %%
+            %% If TargetN is 4, then this is the list of Nodes which have not
+            %% been allocated any of the next 3 partitions
             CheckNextFun =
-                %% these nodes don't violate target_n in the reverse direction
                 fun(Node) ->
                     Next =
                         length(
@@ -427,6 +441,10 @@ attempt_simple_transfer(Seed, Ring, [{P, Exit}|Rest],
                     (Next + 1 >= TargetN) orelse (Next == length(Rest))
                 end,
             Qualifiers = lists:filter(CheckNextFun, Candidates),
+
+            %% Look at the current allocated vnode counts for each qualifying
+            %% node, and choose one which has the lowest of these counts
+            %% (choosing at random between ties on count)
             ScoreFun =
                 fun(Node, {LowCount, LowNodes}) ->
                     {Node, CurrentCount} =
@@ -442,15 +460,17 @@ attempt_simple_transfer(Seed, Ring, [{P, Exit}|Rest],
                 end,
             {PreferredCount, PreferredCandidates} =
                 lists:foldl(ScoreFun, {infinity, []}, Qualifiers),
+
+            %% Final selection of a node as a destination for this partition.
+            %% The node Counts must be updated to reflect any allocation, as
+            %% well as the "Last" list of already allocated partitions.
             case PreferredCandidates of
                 [] ->
                     target_n_fail;
                 PreferredCandidates ->
-                    %% these nodes don't violate target_n forward
                     {Rand, Seed2} =
                         rand:uniform_s(length(PreferredCandidates), Seed),
                     Chosen = lists:nth(Rand, PreferredCandidates),
-                    %% choose one, and do the rest of the ring
                     attempt_simple_transfer(

From 1695743bb2f9e6cde186f76186b7128e8ef2cd94 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Thu, 30 Sep 2021 15:37:27 +0100
Subject: [PATCH 08/12] Refactored to handle tail violations

Initial unit test added
---
 src/riak_core_gossip.erl | 286 +++++++++++++++++++++++++--------------
 1 file changed, 183 insertions(+), 103 deletions(-)

diff --git a/src/riak_core_gossip.erl b/src/riak_core_gossip.erl
index def63d776..946f9793d 100644
--- a/src/riak_core_gossip.erl
+++ b/src/riak_core_gossip.erl
@@ -43,6 +43,10 @@
 
 -include("riak_core_ring.hrl").
 
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
 %% Default gossip rate: allow at most 45 gossip messages every 10 seconds
 -define(DEFAULT_LIMIT, {45, 10000}).
 
@@ -367,125 +371,201 @@ remove_from_cluster(Ring, ExitingNode) ->
     remove_from_cluster(Ring, ExitingNode, rand:seed(exrop, os:timestamp())).
 
 remove_from_cluster(Ring, ExitingNode, Seed) ->
-    % Get a list of indices owned by the ExitingNode...
-    AllOwners = riak_core_ring:all_owners(Ring),
     % Transfer indexes to other nodes...
     ExitRing =
-        case attempt_simple_transfer(Seed, Ring, AllOwners, ExitingNode) of
+        case attempt_simple_transfer(Seed, Ring, ExitingNode) of
             {ok, NR} ->
                 NR;
-            Err when Err == target_n_fail; Err == full_rebalance_onleave ->
+            _ ->
                 %% re-diagonalize
                 %% first hand off all claims to *any* one else,
                 %% just so rebalance doesn't include exiting node
+                Owners = riak_core_ring:all_owners(Ring),
                 Members = riak_core_ring:claiming_members(Ring),
-                Other = hd(lists:delete(ExitingNode, Members)),
-                TempRing = lists:foldl(
-                             fun({I,N}, R) when N == ExitingNode ->
-                                     riak_core_ring:transfer_node(I, Other, R);
-                                (_, R) -> R
-                             end,
-                             Ring,
-                             AllOwners),
-                riak_core_claim:sequential_claim(TempRing, Other)
+                HN = hd(lists:delete(ExitingNode, Members)),
+                TempRing =
+                    lists:foldl(fun({I,N}, R) when N == ExitingNode ->
+                                        riak_core_ring:transfer_node(I, HN, R);
+                                    (_, R) ->
+                                        R
+                                end,
+                                Ring,
+                                Owners),
+                riak_core_claim:sequential_claim(TempRing, HN)
         end,
     ExitRing.
 
-attempt_simple_transfer(Seed, Ring, Owners, ExitingNode) ->
-    attempt_simple_transfer(Seed, Ring, Owners, ExitingNode,
-        app_helper:get_env(riak_core, full_rebalance_onleave, false)).
-
-attempt_simple_transfer(_Seed, _Ring, _Owners, _ExitingNode, true) ->
-    full_rebalance_onleave;
-attempt_simple_transfer(Seed, Ring, Owners, ExitingNode, false) ->
-    TargetN = app_helper:get_env(riak_core, target_n_val),
-    Counts =
-        riak_core_claim:get_counts(riak_core_ring:claiming_members(Ring),
-                                   Owners),
-    attempt_simple_transfer(Seed, Ring, Owners,
-                            TargetN,
-                            ExitingNode, 0,
-                            [{O,-TargetN} || O <- riak_core_ring:claiming_members(Ring),
-                                             O /= ExitingNode],
-                            Counts).
+-ifdef(TEST).
+-type transfer_ring() :: [{integer(), term()}].
+-else.
+-type transfer_ring() :: riak_core_ring:riak_core_ring().
+-endif.
+
+%% @doc Simple transfer of leaving node's vnodes to safe place
+%% Where safe place is any node that satisfies target_n_val for that vnode -
+%% but with a preference to transfer to a node that has a lower number of
+%% vnodes currently allocated.
+%% If safe places cannot be found for all vnodes, returns `target_n_fail`.
+%% Simple transfer is not location aware, but generally this will be an
+%% initial phase of a plan, and hence a temporary home - so location
+%% awareness is not necessary.
+%% `riak_core.full_rebalance_onleave = true` may be used to avoid this step,
+%% although this may result in a large number of transfers
+-spec attempt_simple_transfer(random:ran(),
+                                transfer_ring(),
+                                term()) ->
+                                    {ok, transfer_ring()}|
+                                    target_n_fail|
+                                    force_rebalance.
+attempt_simple_transfer(Seed, Ring, ExitingNode) ->
+    ForceRebalance =
+        app_helper:get_env(riak_core, full_rebalance_onleave, false),
+    case ForceRebalance of
+        true ->
+            force_rebalance;
+        false ->
+            TargetN = app_helper:get_env(riak_core, target_n_val),
+            Owners = riak_core_ring:all_owners(Ring),
+            Counts =
+                riak_core_claim:get_counts(
+                    riak_core_ring:claiming_members(Ring),
+                    Owners),
+            RingFun =
+                fun(Partition, Node, R) ->
+                    riak_core_ring:transfer_node(Partition, Node, R)
+                end,
+            simple_transfer(Owners,
+                            {RingFun, TargetN, ExitingNode},
+                            Ring,
+                            {Seed, [], Counts})
+    end.
+
+%% @doc Simple transfer of leaving node's vnodes to safe place
+-spec simple_transfer([{integer(), term()}],
+                        {fun((integer(),
+                                term(),
+                                transfer_ring()) -> transfer_ring()),
+                            pos_integer(),
+                            term()},
+                        transfer_ring(),
+                        {random:ran(),
+                            [{integer(), term()}],
+                            [{term(), non_neg_integer()}]}) ->
+                                {ok, transfer_ring()}|target_n_fail.
+simple_transfer([{P, ExitingNode}|Rest],
+                {RingFun, TargetN, ExitingNode},
+                Ring,
+                {Seed, Prev, Counts}) ->
+    %% The ring is split into two parts:
+    %% Rest - this is forward looking from the current partition, in partition
+    %% order
+    %% Prev - this is the part of the ring that has already been processed,
+    %% which is also in partition order
+    %%
+    %% With a ring size of 8, having looped to partition 3:
+    %% Rest = [{4, N4}, {5, N5}, {6, N6}, {7, N7}]
+    %% Prev = [{2, N2}, {1, N1}, {0, N0}]
+    %%
+    %% If we have a partition that is on the Exiting Node it is necessary to
+    %% look forward (TargetN - 1) allocations in Rest.  It is also necessary
+    %% to look backward (TargetN - 1) allocations in Prev (from the rear of
+    %% the Prev list).
+    %%
+    %% This must be treated as a Ring though - as we reach an end of the list
+    %% the search must wrap around to the other end of the alternate list
+    %% (i.e. from 0 -> 7 and from 7 -> 0).
+    CheckRingFun =
+        fun(ForwardL, BackL) ->
+            Steps = TargetN - 1,
+            UnsafeNodeTuples =
+                case length(ForwardL) of
+                    L when L < Steps ->
+                        ForwardL ++
+                            lists:sublist(lists:reverse(BackL), Steps - L);
+                    _ ->
+                        lists:sublist(ForwardL, Steps)
+                end,
+            fun({Node, _Count}) ->
+                not lists:keymember(Node, 2, UnsafeNodeTuples)
+            end
+        end,
+    %% Filter candidate Nodes looking back in the ring at previous allocations
+    CandidatesB = lists:filter(CheckRingFun(Prev, Rest), Counts),
+    %% Filter candidate Nodes looking forward in the ring at existing
+    %% allocations
+    CandidatesF = lists:filter(CheckRingFun(Rest, Prev), CandidatesB),
+
+    %% Qualifying candidates will be tuples of {Node, Count} where the Count
+    %% is that node's current count of allocated vnodes
+    case CandidatesF of
         [] ->
             target_n_fail;
+        Qualifiers ->
             %% Look at the current allocated vnode counts for each qualifying
-            %% node, and choose one which has the lowest of these counts
-            %% (choosing at random between ties on count)
+            %% node, and find all qualifying nodes with the lowest of these
+            %% counts
+            [{Q0, BestCnt}|Others] = lists:keysort(2, Qualifiers),
+            PreferredCandidates =
+                [{Q0, BestCnt}|
+                    lists:takewhile(fun({_, C}) -> C == BestCnt end, Others)],
 
             %% Final selection of a node as a destination for this partition.
-            %% The node Counts must be updated to reflect any allocation, as
-            %% well as the "Last" list of already allocated partitions.
-            case PreferredCandidates of
-                [] ->
-                    target_n_fail;
-                PreferredCandidates ->
-                    {Rand, Seed2} =
-                        rand:uniform_s(length(PreferredCandidates), Seed),
-                    Chosen = lists:nth(Rand, PreferredCandidates),
-                    attempt_simple_transfer(
-                        Seed2,
-                        riak_core_ring:transfer_node(P, Chosen, Ring),
-                        Rest, TargetN, Exit, Idx+1,
-                        lists:keyreplace(Chosen, 1, Last,
-                                         {Chosen, Idx}),
-                        lists:keyreplace(Chosen, 1, Counts,
-                                         {Chosen, PreferredCount + 1}))
-            end
-    end;
+            %% The node Counts must be updated to reflect this allocation, and
+            %% the RingFun applied to actually queue the transfer
+            {Rand, Seed2} = rand:uniform_s(length(PreferredCandidates), Seed),
+            {Chosen, BestCnt} = lists:nth(Rand, PreferredCandidates),
+            UpdRing = RingFun(P, Chosen, Ring),
+            UpdCounts =
+                lists:keyreplace(Chosen, 1, Counts, {Chosen, BestCnt + 1}),
+            simple_transfer(Rest,
+                            {RingFun, TargetN, ExitingNode},
+                            UpdRing,
+                            {Seed2, [{P, Chosen}|Prev], UpdCounts})
+    end;
-attempt_simple_transfer(Seed, Ring, [{_, N}|Rest],
-                                TargetN, Exit, Idx, Last, Counts) ->
-    %% just keep track of seeing this node
-    attempt_simple_transfer(Seed, Ring, Rest, TargetN, Exit, Idx + 1,
-                            lists:keyreplace(N, 1, Last, {N, Idx}),
-                            Counts);
-attempt_simple_transfer(_, Ring, [], _, _, _, _, _) ->
+simple_transfer([{P, N}|Rest], Statics, Ring, {Seed, Prev, Counts}) ->
+    %% This is already allocated to a node other than the exiting node, so
+    %% simply transition to the Previous ring accumulator
+    simple_transfer(Rest, Statics, Ring, {Seed, [{P, N}|Prev], Counts});
+simple_transfer([], _Statics, Ring, _LoopAccs) ->
     {ok, Ring}.
+
+
+%% ===================================================================
+%% Unit tests
+%% ===================================================================
+
+-ifdef(TEST).
+
+test_ring_fun(P, N, R) ->
+    io:format("~p ~p ~p", [P, N, R]),
+    lists:keyreplace(P, 1, R, {P, N}).
+
+count_nodes(TestRing) ->
+    CountFun =
+        fun({_P, N}, Acc) ->
+            case lists:keyfind(N, 1, Acc) of
+                false ->
+                    lists:ukeysort(1, [{N, 1}|Acc]);
+                {N, C} ->
+                    lists:ukeysort(1, [{N, C + 1}|Acc])
+            end
+        end,
+    lists:foldl(CountFun, [], TestRing).
+
+simple_transfer_simple_test() ->
+    R0 = [{0, n5}, {1, n1}, {2, n2}, {3, n3},
+            {4, n4}, {5, n5}, {6, n3}, {7, n2}],
+    SomeTime = {1632,989499,279637},
+    FixedSeed = rand:seed(exrop, SomeTime),
+    Counts = lists:keydelete(n4, 1, count_nodes(R0)),
+    {ok, R1} =
+        simple_transfer(R0,
+                        {fun test_ring_fun/3, 2, n4},
+                        R0,
+                        {FixedSeed, [], Counts}),
+    ?assertMatch({4, n1}, lists:keyfind(4, 1, R1)).
+
+
+-endif.
+

From accc8591dba264cb962997e74d30e24ff0b61f2e Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Thu, 30 Sep 2021 19:34:36 +0100
Subject: [PATCH 09/12] Extend simple_transfer eunit tests
---
 src/riak_core_gossip.erl | 96 +++++++++++++++++++++++++++++++++++++---
 1 file changed, 91 insertions(+), 5 deletions(-)

diff --git a/src/riak_core_gossip.erl b/src/riak_core_gossip.erl
index 946f9793d..581e2f664 100644
--- a/src/riak_core_gossip.erl
+++ b/src/riak_core_gossip.erl
@@ -538,7 +538,6 @@
 -ifdef(TEST).
 
 test_ring_fun(P, N, R) ->
-    io:format("~p ~p ~p", [P, N, R]),
     lists:keyreplace(P, 1, R, {P, N}).
 
 count_nodes(TestRing) ->
@@ -558,14 +557,101 @@ simple_transfer_simple_test() ->
         {4, n4}, {5, n5}, {6, n3}, {7, n2}],
     SomeTime = {1632,989499,279637},
     FixedSeed = rand:seed(exrop, SomeTime),
-    Counts = lists:keydelete(n4, 1, count_nodes(R0)),
     {ok, R1} =
         simple_transfer(R0,
-                        {fun test_ring_fun/3, 2, n4},
+                        {fun test_ring_fun/3, 3, n4},
                         R0,
-                        {FixedSeed, [], Counts}),
-    ?assertMatch({4, n1}, lists:keyfind(4, 1, R1)).
+                        {FixedSeed,
+                         [],
+                         lists:keydelete(n4, 1, count_nodes(R0))}),
+    ?assertMatch({4, n1}, lists:keyfind(4, 1, R1)),
+
+    {ok, R2} =
+        simple_transfer(R0,
+                        {fun test_ring_fun/3, 3, n5},
+                        R0,
+                        {FixedSeed,
+                         [],
+                         lists:keydelete(n5, 1, count_nodes(R0))}),
+    ?assertMatch({0, n4}, lists:keyfind(0, 1, R2)),
+    ?assertMatch({5, n1}, lists:keyfind(5, 1, R2)),
+
+    {ok, R3} =
+        simple_transfer(R0,
+                        {fun test_ring_fun/3, 3, n1},
+                        R0,
+                        {FixedSeed,
+                         [],
+                         lists:keydelete(n1, 1, count_nodes(R0))}),
+    ?assertMatch({1, n4}, lists:keyfind(1, 1, R3)),
+
+    target_n_fail =
+        simple_transfer(R0,
+                        {fun test_ring_fun/3, 3, n3},
+                        R0,
+                        {FixedSeed,
+                         [],
+                         lists:keydelete(n3, 1, count_nodes(R0))}),
+
+    target_n_fail =
+        simple_transfer(R0,
+                        {fun test_ring_fun/3, 3, n2},
+                        R0,
+                        {FixedSeed,
+                         [],
+                         lists:keydelete(n2, 1, count_nodes(R0))}),
+
+    %% Target n failures due to wrap-around tail violations
+    R4 = [{0, n5}, {1, n1}, {2, n2}, {3, n3},
+            {4, n4}, {5, n2}, {6, n3}, {7, n4}],
+
+    target_n_fail =
+        simple_transfer(R4,
+                        {fun test_ring_fun/3, 3, n5},
+                        R4,
+                        {FixedSeed,
+                         [],
+                         lists:keydelete(n5, 1, count_nodes(R4))}),
+
+    target_n_fail =
+        simple_transfer(R4,
+                        {fun test_ring_fun/3, 3, n4},
+                        R4,
+                        {FixedSeed,
+                         [],
+                         lists:keydelete(n4, 1, count_nodes(R4))}).
+
+simple_transfer_evendistribution_test() ->
+    R0 = [{0, n1}, {1, n2}, {2, n3}, {3, n4}, {4, n5},
+            {5, n6}, {6, n7}, {7, n8}, {8, n9}, {9, n10},
+            {10, n1}, {11, n2}, {12, n3}, {13, n4}, {14, n5},
+            {15, n6}, {16, n7}, {17, n8}, {18, n9}, {19, n10},
+            {20, n1}, {21, n2}, {22, n3}, {23, n4}, {24, n5},
+            {25, n6}, {26, n7}, {27, n8}, {28, n9}, {29, n10},
+            {30, n1}, {31, n2}, {32, n3}, {33, n4}, {34, n5},
+            {35, n6}, {36, n7}, {37, n8}, {38, n9}, {39, n10},
+            {40, n1}, {41, n2}, {42, n3}, {43, n4}, {44, n5},
+            {45, n6}, {46, n7}, {47, n8}, {48, n9}, {49, n10},
+            {50, n1}, {51, n2}, {52, n3}, {53, n4}, {54, n5},
+            {55, n6}, {56, n1}, {57, n2}, {58, n3}, {59, n10},
+            {60, n5}, {61, n6}, {62, n7}, {63, n8}],
+
+    SomeTime = {1632,989499,279637},
+    FixedSeed = rand:seed(exrop, SomeTime),
+    {ok, R1} =
+        simple_transfer(R0,
+                        {fun test_ring_fun/3, 3, n1},
+                        R0,
+                        {FixedSeed,
+                         [],
+                         lists:keydelete(n1, 1, count_nodes(R0))}),
+
+    NodeCounts = lists:keysort(2, count_nodes(R1)),
+    io:format("NodeCounts ~w~n", [NodeCounts]),
+    [{_LN, LC}|Rest] = NodeCounts,
+    [{_HN, HC}|_] = lists:reverse(Rest),
+    true = HC - LC == 2.
 
 
 -endif.

From 72343fd2c4d96d13f58521fb195b772ab654e7ff Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Mon, 4 Oct 2021 09:42:34 +0100
Subject: [PATCH 10/12] Code review
---
 src/riak_core_gossip.erl | 28 ++++++++++++++--------------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/src/riak_core_gossip.erl b/src/riak_core_gossip.erl
index 581e2f664..b68294fb7 100644
--- a/src/riak_core_gossip.erl
+++ b/src/riak_core_gossip.erl
@@ -372,16 +372,17 @@ remove_from_cluster(Ring, ExitingNode) ->
     remove_from_cluster(Ring, ExitingNode, rand:seed(exrop, os:timestamp())).
 
 remove_from_cluster(Ring, ExitingNode, Seed) ->
     % Transfer indexes to other nodes...
+    Owners = riak_core_ring:all_owners(Ring),
+    Members = riak_core_ring:claiming_members(Ring),
     ExitRing =
-        case attempt_simple_transfer(Seed, Ring, ExitingNode) of
+        case attempt_simple_transfer(Ring, ExitingNode, Seed,
+                                        Owners, Members) of
             {ok, NR} ->
                 NR;
             _ ->
                 %% re-diagonalize
                 %% first hand off all claims to *any* one else,
                 %% just so rebalance doesn't include exiting node
-                Owners = riak_core_ring:all_owners(Ring),
-                Members = riak_core_ring:claiming_members(Ring),
                 HN = hd(lists:delete(ExitingNode, Members)),
                 TempRing =
                     lists:foldl(fun({I,N}, R) when N == ExitingNode ->
@@ -411,13 +412,15 @@ remove_from_cluster(Ring, ExitingNode, Seed) ->
 %% awareness is not necessary.
 %% `riak_core.full_rebalance_onleave = true` may be used to avoid this step,
 %% although this may result in a large number of transfers
--spec attempt_simple_transfer(random:ran(),
-                                transfer_ring(),
-                                term()) ->
+-spec attempt_simple_transfer(transfer_ring(),
+                                term(),
+                                random:ran(),
+                                [{integer(), term()}],
+                                [term()]) ->
                                     {ok, transfer_ring()}|
                                     target_n_fail|
                                     force_rebalance.
-attempt_simple_transfer(Seed, Ring, ExitingNode) ->
+attempt_simple_transfer(Ring, ExitingNode, Seed, Owners, Members) ->
     ForceRebalance =
         app_helper:get_env(riak_core, full_rebalance_onleave, false),
     case ForceRebalance of
         true ->
             force_rebalance;
         false ->
             TargetN = app_helper:get_env(riak_core, target_n_val),
-            Owners = riak_core_ring:all_owners(Ring),
             Counts =
-                riak_core_claim:get_counts(
-                    riak_core_ring:claiming_members(Ring),
-                    Owners),
+                riak_core_claim:get_counts(Members, Owners),
             RingFun =
                 fun(Partition, Node, R) ->
                     riak_core_ring:transfer_node(Partition, Node, R)
                 end,
             simple_transfer(Owners,
-                            {RingFun, TargetN, ExitingNode},
-                            Ring,
-                            {Seed, [], Counts})
+                {RingFun, TargetN, ExitingNode},
+                Ring,
+                {Seed, [], Counts})
     end.

From d92e163b573f4a97e7938c9db03ae541d7105bfc Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Mon, 4 Oct 2021 14:07:15 +0100
Subject: [PATCH 11/12] Clarify comments
---
 src/riak_core_gossip.erl | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/src/riak_core_gossip.erl b/src/riak_core_gossip.erl
index b68294fb7..21f5d1055 100644
--- a/src/riak_core_gossip.erl
+++ b/src/riak_core_gossip.erl
@@ -459,9 +459,9 @@ simple_transfer([{P, ExitingNode}|Rest],
                 {Seed, Prev, Counts}) ->
     %% The ring is split into two parts:
     %% Rest - this is forward looking from the current partition, in partition
-    %% order
+    %% order (ascending by partition number)
     %% Prev - this is the part of the ring that has already been processed,
-    %% which is also in partition order
+    %% which is also in partition order (but descending by index number)
     %%
     %% With a ring size of 8, having looped to partition 3:
@@ -487,10 +487,14 @@ simple_transfer([{P, ExitingNode}|Rest],
                         lists:sublist(ForwardL, Steps)
                 end,
             fun({Node, _Count}) ->
+                %% Nodes will remain as candidates if they are not in the list
+                %% of unsafe nodes
                 not lists:keymember(Node, 2, UnsafeNodeTuples)
             end
         end,
-    %% Filter candidate Nodes looking back in the ring at previous allocations
+    %% Filter candidate Nodes looking back in the ring at previous allocations.
+    %% The starting list of candidates is the list of claiming members in
+    %% Counts.
     CandidatesB = lists:filter(CheckRingFun(Prev, Rest), Counts),
     %% Filter candidate Nodes looking forward in the ring at existing
     %% allocations

From cc9535396717c0a8de8793885e88c8e56c73a7be Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Thu, 7 Oct 2021 11:57:18 +0100
Subject: [PATCH 12/12] Additional test following review

Just to explicitly state the requirement that the input to the function
(Owners) must be sorted. This is a common assumption in riak_core, and
was an assumption of the replaced function.
---
 src/riak_core_gossip.erl | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/src/riak_core_gossip.erl b/src/riak_core_gossip.erl
index 21f5d1055..4d40477ce 100644
--- a/src/riak_core_gossip.erl
+++ b/src/riak_core_gossip.erl
@@ -442,6 +442,9 @@ attempt_simple_transfer(Ring, ExitingNode, Seed, Owners, Members) ->
     end.
 
 %% @doc Simple transfer of leaving node's vnodes to safe place
+%% Iterates over Owners, which must be sorted by Index (from 0...), and
+%% attempts to safely re-allocate each ownership which is currently set to
+%% the exiting node
 -spec simple_transfer([{integer(), term()}],
                         {fun((integer(),
                                 term(),
@@ -624,7 +627,23 @@ simple_transfer_simple_test() ->
                         {FixedSeed,
                          [],
                          lists:keydelete(n4, 1, count_nodes(R4))}).
-
+
+simple_transfer_needstobesorted_test() ->
+    lists:foreach(fun transfer_needstobesorted_tester/1, lists:seq(1, 100)).
+
+transfer_needstobesorted_tester(I) ->
+    R0 = [{6,n3}, {13,n3}, {12,n6}, {11,n5}, {10,n4}, {9,n3}, {8,n2},
+            {7,n1}, {5,n6}, {4,n5}, {3,n4}, {2,n3}, {1,n2}, {0,n1}],
+    VariableSeed = rand:seed(exrop, {1632, 989499, I * 13}),
+    {ok, R1} =
+        simple_transfer(lists:keysort(1, R0),
+                        {fun test_ring_fun/3, 3, n3},
+                        R0,
+                        {VariableSeed,
+                         [],
+                         lists:keydelete(n3, 1, count_nodes(R0))}),
+    ?assertMatch({13, n4}, lists:keyfind(13, 1, R1)).
+
 simple_transfer_evendistribution_test() ->
     R0 = [{0, n1}, {1, n2}, {2, n3}, {3, n4}, {4, n5},
             {5, n6}, {6, n7}, {7, n8}, {8, n9}, {9, n10},