Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][meta] Bookie Info lost by notification race condition. (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored Jun 30, 2023
1 parent cb5676e commit 43dc123
Show file tree
Hide file tree
Showing 3 changed files with 874 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
*/
package org.apache.pulsar.metadata.bookkeeper;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
import static org.apache.pulsar.common.util.FutureUtil.Sequencer;
import static org.apache.pulsar.common.util.FutureUtil.waitForAll;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -42,10 +47,10 @@
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;

@Slf4j
public class PulsarRegistrationClient implements RegistrationClient {
Expand All @@ -56,28 +61,29 @@ public class PulsarRegistrationClient implements RegistrationClient {
private final String bookieRegistrationPath;
private final String bookieAllRegistrationPath;
private final String bookieReadonlyRegistrationPath;

private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> bookieServiceInfoCache =
new ConcurrentHashMap();
private final Set<RegistrationListener> writableBookiesWatchers = new CopyOnWriteArraySet<>();
private final Set<RegistrationListener> readOnlyBookiesWatchers = new CopyOnWriteArraySet<>();
private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
private final ScheduledExecutorService executor;
private final Map<BookieId, Versioned<BookieServiceInfo>> writableBookieInfo;
private final Map<BookieId, Versioned<BookieServiceInfo>> readOnlyBookieInfo;
private final FutureUtil.Sequencer<Void> sequencer;

public PulsarRegistrationClient(MetadataStore store,
String ledgersRootPath) {
this.store = store;
this.ledgersRootPath = ledgersRootPath;
this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);

this.sequencer = Sequencer.create();
this.writableBookieInfo = new ConcurrentHashMap<>();
this.readOnlyBookieInfo = new ConcurrentHashMap<>();
// Following Bookie Network Address Changes is an expensive operation
// as it requires additional ZooKeeper watches
// we can disable this feature, in case the BK cluster has only
// static addresses
this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE;
this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;

this.executor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));

Expand All @@ -91,38 +97,62 @@ public void close() {

@Override
public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
return getChildren(bookieRegistrationPath);
return getBookiesThenFreshCache(bookieRegistrationPath);
}

@Override
public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
// this method is meant to return all the known bookies, even the bookies
// that are not in a running state
return getChildren(bookieAllRegistrationPath);
return getBookiesThenFreshCache(bookieAllRegistrationPath);
}

@Override
public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
return getChildren(bookieReadonlyRegistrationPath);
return getBookiesThenFreshCache(bookieReadonlyRegistrationPath);
}

private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String path) {
/**
 * Returns a future that is completed exceptionally with an
 * {@link IllegalArgumentException} if parameter path is null or empty.
 */
private CompletableFuture<Versioned<Set<BookieId>>> getBookiesThenFreshCache(String path) {
if (path == null || path.isEmpty()) {
return failedFuture(
new IllegalArgumentException("parameter [path] can not be null or empty."));
}
return store.getChildren(path)
.thenComposeAsync(children -> {
Set<BookieId> bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children);
List<CompletableFuture<?>> bookieInfoUpdated =
new ArrayList<>(bookieIds.size());
final Set<BookieId> bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children);
final List<CompletableFuture<?>> bookieInfoUpdated = new ArrayList<>(bookieIds.size());
for (BookieId id : bookieIds) {
// update the cache for new bookies
if (!bookieServiceInfoCache.containsKey(id)) {
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
if (path.equals(bookieReadonlyRegistrationPath) && readOnlyBookieInfo.get(id) == null) {
bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id));
continue;
}
if (path.equals(bookieRegistrationPath) && writableBookieInfo.get(id) == null) {
bookieInfoUpdated.add(readBookieInfoAsWritableBookie(id));
continue;
}
if (path.equals(bookieAllRegistrationPath)) {
if (writableBookieInfo.get(id) != null || readOnlyBookieInfo.get(id) != null) {
// jump to next bookie id
continue;
}
// check writable first
final CompletableFuture<?> revalidateAllBookiesFuture = readBookieInfoAsWritableBookie(id)
.thenCompose(writableBookieInfo -> writableBookieInfo
.<CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>>>map(
bookieServiceInfo -> completedFuture(null))
// check read-only then
.orElseGet(() -> readBookieInfoAsReadonlyBookie(id)));
bookieInfoUpdated.add(revalidateAllBookiesFuture);
}
}
if (bookieInfoUpdated.isEmpty()) {
return CompletableFuture.completedFuture(bookieIds);
return completedFuture(bookieIds);
} else {
return FutureUtil
.waitForAll(bookieInfoUpdated)
return waitForAll(bookieInfoUpdated)
.thenApply(___ -> bookieIds);
}
})
Expand Down Expand Up @@ -153,42 +183,67 @@ public void unwatchReadOnlyBookies(RegistrationListener registrationListener) {
readOnlyBookiesWatchers.remove(registrationListener);
}

private void handleDeletedBookieNode(Notification n) {
if (n.getType() == NotificationType.Deleted) {
BookieId bookieId = stripBookieIdFromPath(n.getPath());
if (bookieId != null) {
log.info("Bookie {} disappeared", bookieId);
bookieServiceInfoCache.remove(bookieId);
}
/**
 * Receives metadata store notifications and updates the local cache
 * sequentially in the background.
 */
private void updatedBookies(Notification n) {
// make the notification callback run sequential in background.
final String path = n.getPath();
if (!path.startsWith(bookieReadonlyRegistrationPath) && !path.startsWith(bookieRegistrationPath)) {
// ignore unknown path
return;
}
}

private void handleUpdatedBookieNode(Notification n) {
BookieId bookieId = stripBookieIdFromPath(n.getPath());
if (bookieId != null) {
log.info("Bookie {} info updated", bookieId);
readBookieServiceInfoAsync(bookieId);
if (path.equals(bookieReadonlyRegistrationPath) || path.equals(bookieRegistrationPath)) {
// ignore root path
return;
}
}

private void updatedBookies(Notification n) {
if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) {
if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
getReadOnlyBookies().thenAccept(bookies -> {
readOnlyBookiesWatchers.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies)));
});
handleDeletedBookieNode(n);
} else if (n.getPath().startsWith(bookieRegistrationPath)) {
getWritableBookies().thenAccept(bookies ->
writableBookiesWatchers.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
handleDeletedBookieNode(n);
}
} else if (n.getType() == NotificationType.Modified) {
if (n.getPath().startsWith(bookieReadonlyRegistrationPath)
|| n.getPath().startsWith(bookieRegistrationPath)) {
handleUpdatedBookieNode(n);
final BookieId bookieId = stripBookieIdFromPath(n.getPath());
sequencer.sequential(() -> {
switch (n.getType()) {
case Created:
log.info("Bookie {} created. path: {}", bookieId, n.getPath());
if (path.startsWith(bookieReadonlyRegistrationPath)) {
return getReadOnlyBookies().thenAccept(bookies ->
readOnlyBookiesWatchers.forEach(w ->
executor.execute(() -> w.onBookiesChanged(bookies))));
}
return getWritableBookies().thenAccept(bookies ->
writableBookiesWatchers.forEach(w ->
executor.execute(() -> w.onBookiesChanged(bookies))));
case Modified:
if (bookieId == null) {
return completedFuture(null);
}
log.info("Bookie {} modified. path: {}", bookieId, n.getPath());
if (path.startsWith(bookieReadonlyRegistrationPath)) {
return readBookieInfoAsReadonlyBookie(bookieId).thenApply(__ -> null);
}
return readBookieInfoAsWritableBookie(bookieId).thenApply(__ -> null);
case Deleted:
if (bookieId == null) {
return completedFuture(null);
}
log.info("Bookie {} deleted. path: {}", bookieId, n.getPath());
if (path.startsWith(bookieReadonlyRegistrationPath)) {
readOnlyBookieInfo.remove(bookieId);
return getReadOnlyBookies().thenAccept(bookies -> {
readOnlyBookiesWatchers.forEach(w ->
executor.execute(() -> w.onBookiesChanged(bookies)));
});
}
if (path.startsWith(bookieRegistrationPath)) {
writableBookieInfo.remove(bookieId);
return getWritableBookies().thenAccept(bookies -> {
writableBookiesWatchers.forEach(w ->
executor.execute(() -> w.onBookiesChanged(bookies)));
});
}
return completedFuture(null);
default:
return completedFuture(null);
}
}
});
}

private static BookieId stripBookieIdFromPath(String path) {
Expand All @@ -200,7 +255,7 @@ private static BookieId stripBookieIdFromPath(String path) {
try {
return BookieId.parse(path.substring(slash + 1));
} catch (IllegalArgumentException e) {
log.warn("Cannot decode bookieId from {}", path, e);
log.warn("Cannot decode bookieId from {}, error: {}", path, e.getMessage());
}
}
return null;
Expand All @@ -227,46 +282,48 @@ public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(Book
// this is because there are a few cases in which some operations on the main thread
// wait for the result. This is due to the fact that resolving the address of a bookie
// is needed in many code paths.
Versioned<BookieServiceInfo> resultFromCache = bookieServiceInfoCache.get(bookieId);
Versioned<BookieServiceInfo> info;
if ((info = writableBookieInfo.get(bookieId)) == null) {
info = readOnlyBookieInfo.get(bookieId);
}
if (log.isDebugEnabled()) {
log.debug("getBookieServiceInfo {} -> {}", bookieId, resultFromCache);
log.debug("getBookieServiceInfo {} -> {}", bookieId, info);
}
if (resultFromCache != null) {
return CompletableFuture.completedFuture(resultFromCache);
if (info != null) {
return completedFuture(info);
} else {
return FutureUtils.exception(new BKException.BKBookieHandleNotAvailableException());
}
}

public CompletableFuture<Void> readBookieServiceInfoAsync(BookieId bookieId) {
String asWritable = bookieRegistrationPath + "/" + bookieId;
return bookieServiceInfoMetadataCache.get(asWritable)
.thenCompose((Optional<BookieServiceInfo> getResult) -> {
if (getResult.isPresent()) {
Versioned<BookieServiceInfo> res =
new Versioned<>(getResult.get(), new LongVersion(-1));
log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, getResult.get());
bookieServiceInfoCache.put(bookieId, res);
return CompletableFuture.completedFuture(null);
} else {
return readBookieInfoAsReadonlyBookie(bookieId);
}
}
);
/**
 * Reads the service info stored for {@code bookieId} under the writable
 * registration path and, when a value is present, records it in
 * {@code writableBookieInfo} together with the version taken from the
 * entry's stat.
 *
 * @param bookieId the bookie whose registration entry is looked up
 * @return a future holding the {@link Optional} returned by the metadata
 *         cache, passed through unchanged
 */
public CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>> readBookieInfoAsWritableBookie(
        BookieId bookieId) {
    final String writablePath = bookieRegistrationPath + "/" + bookieId;
    return bookieServiceInfoMetadataCache.getWithStats(writablePath)
            .thenApply(lookup -> {
                // Only touch the local map when the store actually has an entry for this bookie.
                lookup.ifPresent(found -> {
                    log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, found.getValue());
                    writableBookieInfo.put(bookieId,
                            new Versioned<>(found.getValue(), new LongVersion(found.getStat().getVersion())));
                });
                return lookup;
            });
}

final CompletableFuture<Void> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
return bookieServiceInfoMetadataCache.get(asReadonly)
.thenApply((Optional<BookieServiceInfo> getResultAsReadOnly) -> {
if (getResultAsReadOnly.isPresent()) {
Versioned<BookieServiceInfo> res =
new Versioned<>(getResultAsReadOnly.get(), new LongVersion(-1));
log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId,
getResultAsReadOnly.get());
bookieServiceInfoCache.put(bookieId, res);
final CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>> readBookieInfoAsReadonlyBookie(
BookieId bookieId) {
final String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
return bookieServiceInfoMetadataCache.getWithStats(asReadonly)
.thenApply((Optional<CacheGetResult<BookieServiceInfo>> bkInfoWithStats) -> {
if (bkInfoWithStats.isPresent()) {
final CacheGetResult<BookieServiceInfo> r = bkInfoWithStats.get();
log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId, r.getValue());
readOnlyBookieInfo.put(bookieId,
new Versioned<>(r.getValue(), new LongVersion(r.getStat().getVersion())));
}
return null;
return bkInfoWithStats;
});
}
}
Loading

0 comments on commit 43dc123

Please sign in to comment.