Skip to content

Commit

Permalink
[fix][broker] Fix hash collision when using a consumer name that ends…
Browse files Browse the repository at this point in the history
… with a number (apache#22053)

(cherry picked from commit ae2299c)
  • Loading branch information
lhotari authored and mukesh-ctds committed Mar 1, 2024
1 parent c60900f commit b957d59
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
* number of keys assigned to each consumer.
*/
public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyConsumerSelector {

// use NUL character as field separator for hash key calculation
private static final String KEY_SEPARATOR = "\0";
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();

// Consistent-Hash ring
Expand All @@ -59,8 +60,7 @@ public CompletableFuture<Void> addConsumer(Consumer consumer) {
// Insert multiple points on the hash ring for every consumer
// The points are deterministically added based on the hash of the consumer name
for (int i = 0; i < numberOfPoints; i++) {
String key = consumer.consumerName() + i;
int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
int hash = calculateHashForConsumerAndIndex(consumer, i);
hashRing.compute(hash, (k, v) -> {
if (v == null) {
return Lists.newArrayList(consumer);
Expand All @@ -79,14 +79,18 @@ public CompletableFuture<Void> addConsumer(Consumer consumer) {
}
}

private static int calculateHashForConsumerAndIndex(Consumer consumer, int index) {
String key = consumer.consumerName() + KEY_SEPARATOR + index;
return Murmur3_32Hash.getInstance().makeHash(key.getBytes());
}

@Override
public void removeConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
// Remove all the points that were added for this consumer
for (int i = 0; i < numberOfPoints; i++) {
String key = consumer.consumerName() + i;
int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
int hash = calculateHashForConsumerAndIndex(consumer, i);
hashRing.compute(hash, (k, v) -> {
if (v == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class ConsistentHashingStickyKeyConsumerSelectorTest {
Expand Down Expand Up @@ -154,22 +154,66 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume
}
Map<Consumer, List<Range>> expectedResult = new HashMap<>();
expectedResult.put(consumers.get(0), Arrays.asList(
Range.of(0, 330121749),
Range.of(330121750, 618146114),
Range.of(1797637922, 1976098885)));
Range.of(119056335, 242013991),
Range.of(722195657, 1656011842),
Range.of(1707482098, 1914695766)));
expectedResult.put(consumers.get(1), Arrays.asList(
Range.of(938427576, 1094135919),
Range.of(1138613629, 1342907082),
Range.of(1342907083, 1797637921)));
Range.of(0, 90164503),
Range.of(90164504, 119056334),
Range.of(382436668, 722195656)));
expectedResult.put(consumers.get(2), Arrays.asList(
Range.of(618146115, 772640562),
Range.of(772640563, 938427575),
Range.of(1094135920, 1138613628)));
Range.of(242013992, 242377547),
Range.of(242377548, 382436667),
Range.of(1656011843, 1707482097)));
for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
System.out.println(entry.getValue());
Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
expectedResult.remove(entry.getKey());
}
Assert.assertEquals(expectedResult.size(), 0);
}

// reproduces https://github.com/apache/pulsar/issues/22050
@Test
public void shouldNotCollideWithConsumerNameEndsWithNumber() {
ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(12);
List<String> consumerName = Arrays.asList("consumer1", "consumer11");
List<Consumer> consumers = new ArrayList<>();
for (String s : consumerName) {
Consumer consumer = mock(Consumer.class);
when(consumer.consumerName()).thenReturn(s);
selector.addConsumer(consumer);
consumers.add(consumer);
}
Map<Range, Consumer> rangeToConsumer = new HashMap<>();
for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
for (Range range : entry.getValue()) {
Consumer previous = rangeToConsumer.put(range, entry.getKey());
if (previous != null) {
Assert.fail("Ranges are colliding between " + previous.consumerName() + " and " + entry.getKey()
.consumerName());
}
}
}
}

@Test
public void shouldRemoveConsumersFromConsumerKeyHashRanges() {
ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(12);
List<Consumer> consumers = IntStream.range(1, 100).mapToObj(i -> "consumer" + i)
.map(consumerName -> {
Consumer consumer = mock(Consumer.class);
when(consumer.consumerName()).thenReturn(consumerName);
return consumer;
}).collect(Collectors.toList());

// when consumers are added
consumers.forEach(selector::addConsumer);
// then each consumer should have a range
Assert.assertEquals(selector.getConsumerKeyHashRanges().size(), consumers.size());
// when consumers are removed
consumers.forEach(selector::removeConsumer);
// then there should be no mapping remaining
Assert.assertEquals(selector.getConsumerKeyHashRanges().size(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public void testSkipRedeliverTemporally() {
redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key1")));
final List<Entry> readEntries = new ArrayList<>();
readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1")));
readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key2")));
readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key22")));

try {
Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits");
Expand Down Expand Up @@ -346,7 +346,7 @@ public void testMessageRedelivery() throws Exception {

// Messages with key1 are routed to consumer1 and messages with key2 are routed to consumer2
final List<Entry> allEntries = new ArrayList<>();
allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key2")));
allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key22")));
allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1")));
allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key1")));
allEntries.forEach(entry -> ((EntryImpl) entry).retain());
Expand Down

0 comments on commit b957d59

Please sign in to comment.