Skip to content

Commit

Permalink
[fix][broker] Fix PulsarRegistrationClient and ZkRegistrationClient n…
Browse files Browse the repository at this point in the history
…ot aware rack info problem. (apache#18672)

(cherry picked from commit 43335fb)
  • Loading branch information
horizonzy authored and liangyepianzhou committed Feb 9, 2023
1 parent f6da22b commit 003c186
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.bookie.rackawareness;

import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.METADATA_STORE_SCHEME;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -28,8 +29,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.DefaultBookieAddressResolver;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.meta.exceptions.Code;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
Expand Down Expand Up @@ -118,6 +121,7 @@ public synchronized void setConf(Configuration conf) {
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.orElseGet(BookiesRackConfiguration::new);
updateRacksWithHost(racksWithHost);
watchAvailableBookies();
for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) {
for (String address : bookieMapping.keySet()) {
bookieAddressListLastTime.add(BookieId.parse(address));
Expand All @@ -132,6 +136,28 @@ public synchronized void setConf(Configuration conf) {
}
}

private void watchAvailableBookies() {
BookieAddressResolver bookieAddressResolver = getBookieAddressResolver();
if (bookieAddressResolver instanceof DefaultBookieAddressResolver) {
try {
Field field = DefaultBookieAddressResolver.class.getDeclaredField("registrationClient");
field.setAccessible(true);
RegistrationClient registrationClient = (RegistrationClient) field.get(bookieAddressResolver);
registrationClient.watchWritableBookies(versioned -> {
try {
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.orElseGet(BookiesRackConfiguration::new);
updateRacksWithHost(racksWithHost);
} catch (InterruptedException | ExecutionException e) {
LOG.error("Failed to update rack info. ", e);
}
});
} catch (NoSuchFieldException | IllegalAccessException e) {
LOG.error("Failed watch available bookies.", e);
}
}
}

private synchronized void updateRacksWithHost(BookiesRackConfiguration racks) {
// In config z-node, the bookies are added in the `ip:port` notation, while BK will ask
// for just the IP/hostname when trying to get the rack for a bookie.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,46 @@
*/
package org.apache.pulsar.bookie.rackawareness;

import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.DefaultBookieAddressResolver;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.BookieServiceInfoUtils;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.bookkeeper.BookieServiceInfoSerde;
import org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -198,4 +223,113 @@ public void testBookieInfoChange() throws Exception {
assertNull(r.get(2));
});
}

@Test
public void testWithPulsarRegistrationClient() throws Exception {
String data = "{\"group1\": {\"" + BOOKIE1
+ "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}, \"" + BOOKIE2
+ "\": {\"rack\": \"/rack1\", \"hostname\": \"bookie2.example.com\"}}}";
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
data.getBytes(), Optional.empty()).join();

// Case1: ZKCache is given
BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
Field field = BookieRackAffinityMapping.class.getDeclaredField("racksWithHost");
field.setAccessible(true);

ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);

PulsarRegistrationClient pulsarRegistrationClient =
new PulsarRegistrationClient(store, "/ledgers");
DefaultBookieAddressResolver defaultBookieAddressResolver =
new DefaultBookieAddressResolver(pulsarRegistrationClient);

mapping.setBookieAddressResolver(defaultBookieAddressResolver);
mapping.setConf(bkClientConf);
List<String> racks = mapping
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
.stream().filter(Objects::nonNull).collect(Collectors.toList());
assertEquals(racks.size(), 0);

HashedWheelTimer timer = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
bkClientConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
bkClientConf.getTimeoutTimerNumTicks());

RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy();
Class<?> clazz1 = Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy");
Field field1 = clazz1.getDeclaredField("knownBookies");
field1.setAccessible(true);
Map<BookieId, BookieNode> knownBookies = (Map<BookieId, BookieNode>) field1.get(repp);
repp.initialize(bkClientConf, Optional.of(mapping), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, defaultBookieAddressResolver);

Class<?> clazz2 = Class.forName("org.apache.bookkeeper.client.BookieWatcherImpl");
Constructor<?> constructor =
clazz2.getDeclaredConstructor(ClientConfiguration.class, EnsemblePlacementPolicy.class,
RegistrationClient.class, BookieAddressResolver.class, StatsLogger.class);
constructor.setAccessible(true);
Object o = constructor.newInstance(bkClientConf, repp, pulsarRegistrationClient, defaultBookieAddressResolver,
NullStatsLogger.INSTANCE);
Method method = clazz2.getDeclaredMethod("initialBlockingBookieRead");
method.setAccessible(true);
method.invoke(o);

Set<BookieId> bookieIds = new HashSet<>();
bookieIds.add(BOOKIE1.toBookieId());

Field field2 = BookieServiceInfoSerde.class.getDeclaredField("INSTANCE");
field2.setAccessible(true);
BookieServiceInfoSerde serviceInfoSerde = (BookieServiceInfoSerde) field2.get(null);

BookieServiceInfo bookieServiceInfo = BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE1.toString());
store.put("/ledgers/available/" + BOOKIE1, serviceInfoSerde.serialize("", bookieServiceInfo),
Optional.of(-1L)).get();

Awaitility.await().atMost(30, TimeUnit.SECONDS)
.until(() -> ((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 1);
racks = mapping
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
.stream().filter(Objects::nonNull).collect(Collectors.toList());
assertEquals(racks.size(), 1);
assertEquals(racks.get(0), "/rack0");
assertEquals(knownBookies.size(), 1);
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0");

bookieServiceInfo = BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE2.toString());
store.put("/ledgers/available/" + BOOKIE2, serviceInfoSerde.serialize("", bookieServiceInfo),
Optional.of(-1L)).get();
Awaitility.await().atMost(30, TimeUnit.SECONDS)
.until(() -> ((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 2);

racks = mapping
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
.stream().filter(Objects::nonNull).collect(Collectors.toList());
assertEquals(racks.size(), 2);
assertEquals(racks.get(0), "/rack0");
assertEquals(racks.get(1), "/rack1");
assertEquals(knownBookies.size(), 2);
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0");
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/rack1");

bookieServiceInfo = BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE3.toString());
store.put("/ledgers/available/" + BOOKIE3, serviceInfoSerde.serialize("", bookieServiceInfo),
Optional.of(-1L)).get();
Awaitility.await().atMost(30, TimeUnit.SECONDS)
.until(() -> ((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 2);

racks = mapping
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
.stream().filter(Objects::nonNull).collect(Collectors.toList());
assertEquals(racks.size(), 2);
assertEquals(racks.get(0), "/rack0");
assertEquals(racks.get(1), "/rack1");
assertEquals(knownBookies.size(), 3);
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0");
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/rack1");
assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack");

timer.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -60,8 +60,8 @@ public class PulsarRegistrationClient implements RegistrationClient {

private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> bookieServiceInfoCache =
new ConcurrentHashMap();
private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
private final Set<RegistrationListener> writableBookiesWatchers = new CopyOnWriteArraySet<>();
private final Set<RegistrationListener> readOnlyBookiesWatchers = new CopyOnWriteArraySet<>();
private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
private final ScheduledExecutorService executor;

Expand Down Expand Up @@ -138,7 +138,7 @@ private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String path) {

@Override
public CompletableFuture<Void> watchWritableBookies(RegistrationListener registrationListener) {
writableBookiesWatchers.put(registrationListener, Boolean.TRUE);
writableBookiesWatchers.add(registrationListener);
return getWritableBookies()
.thenAcceptAsync(registrationListener::onBookiesChanged, executor);
}
Expand All @@ -150,7 +150,7 @@ public void unwatchWritableBookies(RegistrationListener registrationListener) {

@Override
public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener registrationListener) {
readOnlyBookiesWatchers.put(registrationListener, Boolean.TRUE);
readOnlyBookiesWatchers.add(registrationListener);
return getReadOnlyBookies()
.thenAcceptAsync(registrationListener::onBookiesChanged, executor);
}
Expand All @@ -173,13 +173,11 @@ private void updatedBookies(Notification n) {

if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
getReadOnlyBookies().thenAccept(bookies -> {
readOnlyBookiesWatchers.keySet()
.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies)));
readOnlyBookiesWatchers.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies)));
});
} else if (n.getPath().startsWith(bookieRegistrationPath)) {
getWritableBookies().thenAccept(bookies ->
writableBookiesWatchers.keySet()
.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
writableBookiesWatchers.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
}
}
}
Expand Down

0 comments on commit 003c186

Please sign in to comment.