Sticky partition assignor #1416
Conversation
That's really nice, a lot going on here, give me some time to read and review, looks great on a cursory look 👍
…he active strategy name against the sticky balance strategy name when handling a sync group request to allow for easy extension of the sticky balance strategy
…d if the assignment counts are identical
balance_strategy.go
for partition := range partition2AllPotentialConsumers {
    sortedPartionIDs = append(sortedPartionIDs, partition)
}
sort.SliceStable(sortedPartionIDs, func(i, j int) bool {
The Java code here has three levels of comparison:
- number of consumers, which is what is done here
- topic name
- partition
public int compare(TopicPartition o1, TopicPartition o2) {
    int ret = map.get(o1).size() - map.get(o2).size();
    if (ret == 0) {
        ret = o1.topic().compareTo(o2.topic());
        if (ret == 0)
            ret = o1.partition() - o2.partition();
    }
    return ret;
}
This misses the latter two. SliceStable is unnecessary once they're added in.
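For illustration, a rough sketch of what the three-level comparison could look like on the Go side (assuming topicPartitionAssignment exposes Topic and Partition fields, as the rest of the diff suggests):

sort.Slice(sortedPartionIDs, func(i, j int) bool {
    pi, pj := sortedPartionIDs[i], sortedPartionIDs[j]
    ci, cj := len(partition2AllPotentialConsumers[pi]), len(partition2AllPotentialConsumers[pj])
    if ci != cj {
        return ci < cj // fewest potential consumers first, matching the Java TreeSet ordering described above
    }
    if pi.Topic != pj.Topic {
        return pi.Topic < pj.Topic
    }
    return pi.Partition < pj.Partition
})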
I think that this also sorts from smallest to largest, rather than what you do here of largest to smallest:
TreeSet<TopicPartition> sortedAllPartitions = new TreeSet<>(new PartitionComparator(partition2AllPotentialConsumers));
sortedAllPartitions.addAll(partition2AllPotentialConsumers.keySet());

while (!sortedAllPartitions.isEmpty())
    sortedPartitions.add(sortedAllPartitions.pollFirst());
The tree is drained first to last.
FWIW, I'm writing this code up myself and trying to translate the Java code to something that actually makes sense. That's taking a long time. I'm pretty suspicious of some aspects of the Java code. I'm not 100% sure everything the Java code is doing is necessary (for example, I can't think of a way that one map in areSubscriptionsIdentical would return true while the other returns false).
However, since this PR is looking to be a direct transliteration, I'll try to vet the accuracy of that transliteration the best I can while I go through the Java and ... try to make sense of it.
I am pretty confused about the sortPartitions function in general. It operates in two completely different ways: the normal way, which orders partitions by how many consumers can consume from them, and the has-a-lot-of-logic way, which orders by partitions that were on the most loaded consumer.
The two completely different behaviors make the future usage of the sorting hard to reason about :/
@twmb Thank you again! The Java implementation leaves a lot to be desired, and was obviously very challenging to port. Unfortunately, I absolutely must have a Golang implementation of sticky-partition-assignor, so I was left with no choice. I appreciate your feedback, and would gladly merge in any contributions you have to this PR.
if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
    fixedAssignments[memberID] = currentAssignment[memberID]
    delete(currentAssignment, memberID)
    sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
It'd be better for you to move this sort to after the loop, since inside the loop, it's basically throwing away work every iteration.
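A sketch of the hoisted sort, assuming the loop shape shown in the diff and taking the comment above at its word that nothing in the remaining iterations needs the re-sorted slice:

for memberID := range consumer2AllPotentialPartitions {
    if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
        fixedAssignments[memberID] = currentAssignment[memberID]
        delete(currentAssignment, memberID)
    }
}
// sort once, after all non-participating members have been removed
sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)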
// narrow down the reassignment scope to only those consumers that are subject to reassignment
fixedAssignments := make(map[string][]topicPartitionAssignment)
for memberID := range consumer2AllPotentialPartitions {
I know the Java code does it, but there's no reason not to range over currentAssignment here, which has all of the same keys as consumer2AllPotentialPartitions. Then you have access to the value and can pass it to canConsumerParticipateInReassignment, rather than looking it up from currentAssignment first thing in the code. I think?
And then it'd also afford just doing fixedAssignments[memberID] = partitions.
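A sketch of ranging over currentAssignment instead; the reworked canConsumerParticipateInReassignment signature (taking the member's partitions directly) is an assumption, not code in this PR:

for memberID, partitions := range currentAssignment {
    if !canConsumerParticipateInReassignment(memberID, partitions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
        fixedAssignments[memberID] = partitions
        delete(currentAssignment, memberID)
    }
}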
// create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
preBalanceAssignment := deepCopyAssignment(currentAssignment)
preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
I can't see a purpose for this preBalancePartitionConsumers map at all. It's populated here, and it might be used to reset currentPartitionConsumer below, but currentPartitionConsumer is unused after that reset and Balance does nothing with it once balance returns.
As far as I can tell, the Java code here is just wasting time and objects.
}

// create a mapping from partitions to the consumer assigned to them
allPartitions := make(map[topicPartitionAssignment]string)
I can't think of a reason to not pass currentPartitionConsumer to this function and avoid this exact recreation of it.
for memberID := range assignments {
    sortedMemberIDs = append(sortedMemberIDs, memberID)
}
sort.SliceStable(sortedMemberIDs, func(i, j int) bool {
No reason for SliceStable since member IDs are unique.
    if ret == 0 {
        return sortedMemberIDs[i] < sortedMemberIDs[j]
    }
    return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])
return ret < 0?
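Putting this together with the SliceStable note above, a sketch (the ret assignment line is an assumption, since only the tail of the comparator is quoted in the diff):

sort.Slice(sortedMemberIDs, func(i, j int) bool {
    ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])
    if ret == 0 {
        return sortedMemberIDs[i] < sortedMemberIDs[j] // member IDs are unique, so plain sort.Slice is fine
    }
    return ret < 0
})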
// otherwise make sure it cannot get any more
potentialTopicPartitions := allSubscriptions[memberID]
for _, partition := range potentialTopicPartitions {
    if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {
I think this should use allPartitions (or my suggested currentPartitionConsumer). Rather than an O(n) range over all of the member's current partitions, we already have a direct lookup map. I mean, if the conditional here returns false, that's what the next line literally does. That line could be above this if, and then the if could be if currentConsumer != memberID or something.
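A sketch of that shape; what happens inside the branch is the existing check against the partition's current owner, which is elided here:

for _, partition := range allSubscriptions[memberID] {
    currentConsumer := allPartitions[partition]
    if currentConsumer == memberID {
        continue
    }
    // ... existing "can this member still take more?" logic, unchanged ...
}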
// create a mapping of all current topic partitions and the consumers that can be assigned to them
partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
for topic, partitions := range topics {
I am fairly certain that this loop is unneeded and causes the code to change the sort order unnecessarily, but the tests work with or without this loop and this is faithful to the Java.
This code sets all partitions in the client as candidates for moving, even if no member is interested in some topics. This causes sortPartitions to unnecessarily fall into the second case, where it considers that some subscriptions are different. I deleted these five lines on your branch and ran the tests and everything passes. Even deleting the five lines, the second case is hit the same amount. Most of the tests revolve around consumer group topic subscription disparities.
    }
}

func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
It might be worthwhile to document the purpose of this function, which is undocumented in Java as well, but the behavior is pretty confusing.
If this is trying to move a partition from A to B, if we have already moved a partition from B to A, this returns that partition so that the original move is undone. Effectively, this function can counteract some there-and-back again movements. No test yet triggers this. I'd be curious to see something that did.
The partition has to be in the same topic because if it isn't, then maybe the other consumer is no longer interested in the topic.
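A possible doc comment capturing that behavior (wording is only a suggestion):

// getTheActualPartitionToBeMoved returns the partition that should actually be
// moved when `partition` is about to be reassigned from oldConsumer to
// newConsumer. If a partition of the same topic was previously moved in the
// opposite direction (newConsumer -> oldConsumer), that earlier partition is
// returned instead so the two moves cancel out and stickiness is preserved;
// otherwise the requested partition is returned unchanged. The same-topic
// restriction matters because the other consumer may not be subscribed to
// other topics.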
    return false
}

func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
After staring at performReassignments for a while, I'm pretty sure that the only benefit of this function is if all group subscriptions are identical. In that case, the return ordering is actually meaningful: performReassignments will start from partitions that are on overloaded members, and thus it may find balance sooner. Even the loss of that ordering can be largely mitigated by saving whether all subscriptions are identical and then using that in isBalanced: if everything is identical, the max imbalance should be one, and the remainder of the function can be skipped.
If there's any disparity of consuming, the return is pretty useless: it sorts by partitions with the least amount of potential consumers. That does not help reassigning.
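A rough sketch of that short-circuit at the top of isBalanced; allSubscriptionsIdentical is a hypothetical flag computed once before balancing, not code in this PR:

if allSubscriptionsIdentical {
    minCount, maxCount := -1, 0
    for _, partitions := range currentAssignment {
        if minCount == -1 || len(partitions) < minCount {
            minCount = len(partitions)
        }
        if len(partitions) > maxCount {
            maxCount = len(partitions)
        }
    }
    // when every member can consume everything, a spread of at most one
    // partition is the best achievable balance
    return maxCount-minCount <= 1
}
// otherwise fall through to the existing, more expensive checks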
For further context, even if all subscriptions are identical, the only aid is in maybe less looping. performReassignments always visits all partitions, left to right. If a partition is on an overloaded member, it gets moved. If a move happens, all "sorted" partitions are revisited left to right. After the first round of moves, sortedPartitions is no longer sorted, yet reassigning still happens in the original order.
I'm honestly not quite sure what sorting here buys at all.
When all can consume the same, it may help in the case of incoming joins with prior plans
A -> 1
B -> 2, 3, 4
C -> 5, 6, 7, 8, 9
Sorted would say 9, 8, 7, 4, 6, 3, 5, 2, 1
in order of move, which would result in
A -> 1, 9, 8
B -> 2, 3, 4
C -> 5, 6, 7
and be most sticky,
whereas no sorting would be 1, 2, 3, 4, 5, 6, 7, 8, 9
order of move, resulting in
A -> 1, 2, 5
B -> 3, 4, 6
C -> 7, 8, 9
being destructive to B when it shouldn't have. I'll see if there's a test case similar to this.
I'm still mixed on how beneficial sorting is for non-equal consuming and will try to think of a scenario there.
This test does that:
{
    name: "Three consumers with a disparity coming into harmony.",
    args: args{
        members: map[string]ConsumerGroupMemberMetadata{
            "c1": ConsumerGroupMemberMetadata{
                Topics:   []string{"t1"},
                UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"t1": []int32{0}}, 1),
            },
            "c2": ConsumerGroupMemberMetadata{
                Topics:   []string{"t1"},
                UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"t1": []int32{1, 2, 3}}, 1),
            },
            "c3": ConsumerGroupMemberMetadata{
                Topics:   []string{"t1"},
                UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"t1": []int32{4, 5, 6, 7, 8}}, 1),
            },
        },
        topics: map[string][]int32{
            "t1": []int32{0, 1, 2, 3, 4, 5, 6, 7, 8},
        },
    },
},
And printing the plan with and without sortPartitions shows the problem. Unfortunately, the test code doesn't catch the lack of stickiness without the sorting:
=== RUN Test_stickyBalanceStrategy_Plan/Three_consumers_with_a_disparity_coming_into_harmony.
c1 => map[t1:[0 2 6]]
c2 => map[t1:[1 3 8]]
c3 => map[t1:[7 5 4]]
--- PASS: Test_stickyBalanceStrategy_Plan (0.00s)
--- PASS: Test_stickyBalanceStrategy_Plan/Three_consumers_with_a_disparity_coming_into_harmony. (0.00s)
That should fail, since c2 should not have changed.
So, I think the main flaw with the tests right now is a lack of any golden tests, and it may be worth it to add some.
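As a sketch of what a golden assertion could look like for the case above (the expected plan is illustrative only: the exact partitions c1 takes over from c3 depend on the assignor, and the partition slices would likely need sorting before a deep-equal comparison; assumes "reflect" is imported):

plan, err := (&stickyBalanceStrategy{}).Plan(members, topics)
if err != nil {
    t.Fatal(err)
}
// c2 keeps 1-3, c3 keeps three of its five, c1 picks up the remaining two
expected := BalanceStrategyPlan{
    "c1": map[string][]int32{"t1": []int32{0, 4, 5}},
    "c2": map[string][]int32{"t1": []int32{1, 2, 3}},
    "c3": map[string][]int32{"t1": []int32{6, 7, 8}},
}
if !reflect.DeepEqual(plan, expected) {
    t.Errorf("got %v, want %v", plan, expected)
}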
As an example of why the second sorting is useless:
{
    // A -> 1, can take all
    // B -> 2, 3, 4
    // C -> 5, 6, 7, 8, 9
    //
    // Bad would be stealing a partition from B:
    // A -> 1, 2, 5
    // B -> 3, 4
    // C -> 6, 7, 8, 9
    //
    // Ideal:
    // A -> 1, 5, 6
    // B -> 2, 3, 4
    // C -> 7, 8, 9
    name: "Bigly disbalancy.",
    args: args{
        members: map[string]ConsumerGroupMemberMetadata{
            "A": ConsumerGroupMemberMetadata{
                Topics: []string{"1", "2", "3", "4", "5", "6", "7", "8", "9"},
                UserData: encodeSubscriberPlanWithGeneration(t,
                    map[string][]int32{
                        "1": []int32{0},
                    }, 1),
            },
            "B": ConsumerGroupMemberMetadata{
                Topics: []string{"2", "3", "4"},
                UserData: encodeSubscriberPlanWithGeneration(t,
                    map[string][]int32{
                        "2": []int32{0},
                        "3": []int32{0},
                        "4": []int32{0},
                    }, 1),
            },
            "C": ConsumerGroupMemberMetadata{
                Topics: []string{"5", "6", "7", "8", "9"},
                UserData: encodeSubscriberPlanWithGeneration(t,
                    map[string][]int32{
                        "5": []int32{0},
                        "6": []int32{0},
                        "7": []int32{0},
                        "8": []int32{0},
                        "9": []int32{0},
                    }, 1),
            },
        },
        topics: map[string][]int32{
            "1": []int32{0},
            "2": []int32{0},
            "3": []int32{0},
            "4": []int32{0},
            "5": []int32{0},
            "6": []int32{0},
            "7": []int32{0},
            "8": []int32{0},
            "9": []int32{0},
        },
    },
}
That causes a test failure. I think the heap sort is really the only beneficial sort: we should always try to move partitions off of loaded members. I'll try to think of another example.
Here's another test that fails (edit: always fails with the current sorting, sometimes fails when switching to only-overloaded-member sorting) with either heap strategy:
{
    // A -> 1, [in all]
    // B -> 2, [in 2, 3, 4]
    // D -> 3, 4, 5, 6, 7, 8, 9
    //
    // A -> 1, 9, 8, 7
    // B -> 2,
    // D -> 3, 4, 5, 6
    //
    // A -> 1, 9, 8, 7
    // B -> 2, 3
    // D -> 4, 5, 6
    //
    // Ideal:
    // A -> 1, 9, 8
    // B -> 2, 3, 4
    // C -> 5, 6, 7
    name: "Bigly disbalancy.",
    args: args{
        members: map[string]ConsumerGroupMemberMetadata{
            "A": ConsumerGroupMemberMetadata{
                Topics: []string{"1", "2", "3", "4", "5", "6", "7", "8", "9"},
                UserData: encodeSubscriberPlanWithGeneration(t,
                    map[string][]int32{
                        "1": []int32{0},
                    }, 1),
            },
            "B": ConsumerGroupMemberMetadata{
                Topics: []string{"2", "3", "4"},
                UserData: encodeSubscriberPlanWithGeneration(t,
                    map[string][]int32{
                        "2": []int32{0},
                    }, 1),
            },
            "C": ConsumerGroupMemberMetadata{
                Topics: []string{"3", "4", "5", "6", "7", "8", "9"},
                UserData: encodeSubscriberPlanWithGeneration(t,
                    map[string][]int32{
                        "3": []int32{0},
                        "4": []int32{0},
                        "5": []int32{0},
                        "6": []int32{0},
                        "7": []int32{0},
                        "8": []int32{0},
                        "9": []int32{0},
                    }, 1),
            },
        },
        topics: map[string][]int32{
            "1": []int32{0},
            "2": []int32{0},
            "3": []int32{0},
            "4": []int32{0},
            "5": []int32{0},
            "6": []int32{0},
            "7": []int32{0},
            "8": []int32{0},
            "9": []int32{0},
        },
    },
},
I think maybe a better strategy would be to always try reassigning from the consumer with the least partitions, and iterating over all partitions that could be assigned to it. I'll try that out.
A more complicated one:
{
    // Start:
    // A -> 1 2
    // B -> 3 4
    // C -> 5
    // D -> a b c d e
    // E ->
    //
    // A takes all,
    // B takes 1-5
    // C takes 3-5
    // D, E take a-e
    //
    // Ideal:
    // A -> 1 e
    // B -> 2 3
    // C -> 4 5
    // D -> a b
    // E -> c d
    name: "Complicated",
    args: args{
        members: map[string]ConsumerGroupMemberMetadata{
            "A": ConsumerGroupMemberMetadata{
                Topics: []string{"1", "2", "3", "4", "5", "a", "b", "c", "d", "e"},
                UserData: encodeSubscriberPlanWithGeneration(t,
                    map[string][]int32{
                        "1": []int32{0},
                        "2": []int32{0},
                    }, 1),
            },
            "B": ConsumerGroupMemberMetadata{
                Topics: []string{"1", "2", "3", "4", "5"},
                UserData: encodeSubscriberPlanWithGeneration(t,
                    map[string][]int32{
                        "3": []int32{0},
                        "4": []int32{0},
                    }, 1),
            },
            "C": ConsumerGroupMemberMetadata{
                Topics: []string{"3", "4", "5"},
                UserData: encodeSubscriberPlanWithGeneration(t,
                    map[string][]int32{
                        "4": []int32{0},
                    }, 1),
            },
            "D": ConsumerGroupMemberMetadata{
                Topics: []string{"a", "b", "c", "d", "e"},
                UserData: encodeSubscriberPlanWithGeneration(t,
                    map[string][]int32{
                        "a": []int32{0},
                        "b": []int32{0},
                        "c": []int32{0},
                        "d": []int32{0},
                        "e": []int32{0},
                    }, 1),
            },
            "E": ConsumerGroupMemberMetadata{
                Topics: []string{"a", "b", "c", "d", "e"},
                UserData: encodeSubscriberPlanWithGeneration(t,
                    map[string][]int32{}, 1),
            },
        },
        topics: map[string][]int32{
            "1": []int32{0},
            "2": []int32{0},
            "3": []int32{0},
            "4": []int32{0},
            "5": []int32{0},
            "6": []int32{0},
            "7": []int32{0},
            "8": []int32{0},
            "9": []int32{0},
            "a": []int32{0},
            "b": []int32{0},
            "c": []int32{0},
            "d": []int32{0},
            "e": []int32{0},
        },
    },
FWIW, I figured out a way to solve all of these edge cases and implemented it here: https://github.com/twmb/kgo/tree/master/internal/sticky
The key idea is representing the members and partitions they want as a graph with edges, and then using A* search to steal appropriately.
This implementation is faster by >2.5x, more accurate (for the balanced-metric, and I think optimally accurate), uses less garbage, and is cleaner than the Java code.
After optimizing, the implementation in the above link is >14x faster (edit: around 100x faster now).
}
var reversePairPartition topicPartitionAssignment
for otherPartition := range partitionMovementsForThisTopic[reversePair] {
    reversePairPartition = otherPartition
return otherPartition — no need to iterate through all. Can then panic outside of the loop, which should be unreachable.
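Something like this (the panic message is just an example):

for otherPartition := range partitionMovementsForThisTopic[reversePair] {
    return otherPartition
}
panic("unreachable: reverse pair movement set was expected to be non-empty")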
members := make(map[string]ConsumerGroupMemberMetadata, 3)
members["consumer1"] = ConsumerGroupMemberMetadata{
    Topics:   []string{"topic1"},
    UserData:
This can take testing.TB and then the duplicate function just below can be eliminated.
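A sketch of the testing.TB variant: testing.TB is satisfied by both *testing.T and *testing.B, so one helper can serve tests and benchmarks. The body is a hypothetical placeholder (the encode call and the StickyAssignorUserDataV1 fields are assumptions), and only the signature is the point:

func encodeSubscriberPlanWithGeneration(tb testing.TB, assignments map[string][]int32, generation int32) []byte {
    tb.Helper()
    data, err := encode(&StickyAssignorUserDataV1{Topics: assignments, Generation: generation}, nil)
    if err != nil {
        tb.Fatalf("encoding user data: %v", err)
    }
    return data
}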
}
tests := []struct {
    name string
    s    *stickyBalanceStrategy
This is unused.
AFAICT this LGTM. I've removed some of my comments about potential ideas that may be better; I've been toying around with those ideas and they're better in some cases but not perfect, and they are complicated. I'm 100% sure that the algorithm used in Java does not accomplish its original stated goal: I've devised multiple counterexamples, and while some ideas I'm toying with get some of those edge cases, I can't think of a perfect solution yet. I have a few remaining comments about potential cleanup, but nothing absolutely critical to address. I've also stared at the … But, as far as an exact recreation of the Java behavior, I think this does it.
This is great, thanks for contributing and also thanks for reviewing 👍
This PR adds support for the sticky partition assignment strategy defined in Kafka Improvement Proposal KIP-54 and later refined in KIP-341. This includes the use of consumer group generation numbers to reconcile conflicting information from consumer group members.
The sticky partition assignment strategy differs from the range and roundrobin strategies by attempting to keep topic partition assignments on the same consumer from the previous assignment. This is accomplished by having the consumer group leader use assignment information stored in the consumer userdata to learn about previous assignments. This assignor aims to be a 1:1 port of the StickyAssignor included in the official Java Kafka client, including all unit tests.
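For reference, opting in from a consumer group looks roughly like this, assuming the strategy is exposed as sarama.BalanceStrategySticky alongside the existing range and round-robin strategies (the broker address and group ID below are placeholders):

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_0_0_0
    // opt in to the sticky assignor instead of the default range strategy
    config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky

    group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "sticky-assignment-demo-group", config)
    if err != nil {
        log.Fatal(err)
    }
    defer group.Close()
    // group.Consume would be driven from here, as in examples/consumergroup/main.go
}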
Manual Testing
Manual testing can be performed using the consumer-group example (examples/consumergroup/main.go).
Setup
Create a topic (e.g. sticky-assignment-demo) with multiple partitions (e.g. 12).
Step 1: Start Consumer 1
Start the first consumer, which will be assigned all 12 topic partitions. Nothing unusual here.
Step 2: Start Consumer 2
Start a second consumer, which will obtain 6 of the 12 partitions. Again, nothing unusual yet.
Consumer 2 Startup Output
Consumer 1 output during partition reassignment:
Step 3: Start Consumer 3
Start a third consumer, which will obtain 4 of the 12 partitions. At this point we can see the sticky partition assignment take effect, as consumers 1 and 2 retain 4 of their 6 partition claims.
Consumer 3 Startup Output
Consumer 1 output during partition reassignment:
Consumer 2 output during partition reassignment: