Skip to content

Commit

Permalink
[improve][meta] Improve fault tolerance of blocking calls by supporting timeout (apache#21028)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored Aug 21, 2023
1 parent deeb8a2 commit 976a580
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import lombok.SneakyThrows;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient;
Expand All @@ -40,6 +41,7 @@ public abstract class AbstractMetadataDriver implements Closeable {
public static final String METADATA_STORE_SCHEME = "metadata-store";

public static final String METADATA_STORE_INSTANCE = "metadata-store-instance";
public static final long BLOCKING_CALL_TIMEOUT = TimeUnit.SECONDS.toMillis(30);

protected MetadataStoreExtended store;
private boolean storeInstanceIsOwned;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@
*/
package org.apache.pulsar.metadata.bookkeeper;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.metadata.api.MetadataStore;


/**
* Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes.
*
Expand Down Expand Up @@ -67,7 +71,7 @@ public LegacyHierarchicalLedgerRangeIterator(MetadataStore store, String ledgers
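* @return false if all level-1 nodes have already been visited
* @throws ExecutionException if reading the children from the metadata store fails
* @throws InterruptedException if the calling thread is interrupted while waiting for the result
* @throws TimeoutException if the read does not complete within BLOCKING_CALL_TIMEOUT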
*/
private boolean nextL1Node() throws ExecutionException, InterruptedException {
private boolean nextL1Node() throws ExecutionException, InterruptedException, TimeoutException {
l2NodesIter = null;
while (l2NodesIter == null) {
if (l1NodesIter.hasNext()) {
Expand All @@ -79,7 +83,8 @@ private boolean nextL1Node() throws ExecutionException, InterruptedException {
if (!isLedgerParentNode(curL1Nodes)) {
continue;
}
List<String> l2Nodes = store.getChildren(ledgersRoot + "/" + curL1Nodes).get();
List<String> l2Nodes = store.getChildren(ledgersRoot + "/" + curL1Nodes)
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
l2NodesIter = l2Nodes.iterator();
if (!l2NodesIter.hasNext()) {
l2NodesIter = null;
Expand All @@ -94,15 +99,16 @@ private synchronized void preload() throws IOException {
boolean hasMoreElements = false;
try {
if (l1NodesIter == null) {
List<String> l1Nodes = store.getChildren(ledgersRoot).get();
List<String> l1Nodes = store.getChildren(ledgersRoot)
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
l1NodesIter = l1Nodes.iterator();
hasMoreElements = nextL1Node();
} else if (l2NodesIter == null || !l2NodesIter.hasNext()) {
hasMoreElements = nextL1Node();
} else {
hasMoreElements = true;
}
} catch (ExecutionException ke) {
} catch (ExecutionException | TimeoutException ke) {
throw new IOException("Error preloading next range", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -156,8 +162,8 @@ LedgerManager.LedgerRange getLedgerRangeByLevel(final String level1, final Strin
String nodePath = nodeBuilder.toString();
List<String> ledgerNodes = null;
try {
ledgerNodes = store.getChildren(nodePath).get();
} catch (ExecutionException e) {
ledgerNodes = store.getChildren(nodePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
throw new IOException("Error when get child nodes from zk", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.util.StringUtils;
Expand Down Expand Up @@ -57,8 +59,9 @@ class LongHierarchicalLedgerRangeIterator implements LedgerManager.LedgerRangeIt
*/
List<String> getChildrenAt(String path) throws IOException {
try {
return store.getChildren(path).get();
} catch (ExecutionException e) {
return store.getChildren(path)
.get(AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
if (log.isDebugEnabled()) {
log.debug("Failed to get children at {}", path);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
*/
package org.apache.pulsar.metadata.bookkeeper;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.bookkeeper.bookie.BookieException;
Expand All @@ -30,6 +33,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;


class PulsarLayoutManager implements LayoutManager {

@Getter(AccessLevel.PACKAGE)
Expand All @@ -49,14 +53,14 @@ class PulsarLayoutManager implements LayoutManager {
@Override
public LedgerLayout readLedgerLayout() throws IOException {
try {
byte[] layoutData = store.get(layoutPath).get()
byte[] layoutData = store.get(layoutPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)
.orElseThrow(() -> new BookieException.MetadataStoreException("Layout node not found"))
.getValue();
return LedgerLayout.parseLayout(layoutData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
} catch (BookieException | ExecutionException e) {
} catch (BookieException | ExecutionException | TimeoutException e) {
throw new IOException(e);
}
}
Expand All @@ -66,10 +70,13 @@ public void storeLedgerLayout(LedgerLayout ledgerLayout) throws IOException {
try {
byte[] layoutData = ledgerLayout.serialize();

store.put(layoutPath, layoutData, Optional.of(-1L)).get();
store.put(layoutPath, layoutData, Optional.of(-1L))
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
} catch (TimeoutException e) {
throw new IOException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof MetadataStoreException.BadVersionException) {
throw new LedgerLayoutExistsException(e);
Expand All @@ -82,11 +89,12 @@ public void storeLedgerLayout(LedgerLayout ledgerLayout) throws IOException {
@Override
public void deleteLedgerLayout() throws IOException {
try {
store.delete(layoutPath, Optional.empty()).get();
store.delete(layoutPath, Optional.empty())
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
} catch (ExecutionException e) {
} catch (ExecutionException | TimeoutException e) {
throw new IOException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
package org.apache.pulsar.metadata.bookkeeper;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.conf.AbstractConfiguration;
Expand Down Expand Up @@ -110,7 +114,13 @@ public boolean validateAndNukeExistingCluster(AbstractConfiguration<?> conf,
* before proceeding with nuking existing cluster, make sure there
* are no unexpected nodes under ledgersRootPath
*/
List<String> ledgersRootPathChildrenList = store.getChildren(ledgerRootPath).join();
final List<String> ledgersRootPathChildrenList;
try {
ledgersRootPathChildrenList = store.getChildren(ledgerRootPath)
.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
throw new IOException(e);
}
for (String ledgersRootPathChildren : ledgersRootPathChildrenList) {
if ((!AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren))
&& (!ledgerManager.isLedgerParentNode(ledgersRootPathChildren))) {
Expand All @@ -124,18 +134,34 @@ public boolean validateAndNukeExistingCluster(AbstractConfiguration<?> conf,
format(conf, layoutManager);

// now delete all the special nodes recursively
for (String ledgersRootPathChildren : store.getChildren(ledgerRootPath).join()) {
if (AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren)) {
store.deleteRecursive(ledgerRootPath + "/" + ledgersRootPathChildren).join();
final List<String> ledgersRootPathChildren;
try {
ledgersRootPathChildren = store.getChildren(ledgerRootPath)
.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
throw new IOException(e);
}
for (String ledgersRootPathChild :ledgersRootPathChildren) {
if (AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChild)) {
try {
store.deleteRecursive(ledgerRootPath + "/" + ledgersRootPathChild)
.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
throw new IOException(e);
}
} else {
log.error("Found unexpected node : {} under ledgersRootPath : {} so exiting nuke operation",
ledgersRootPathChildren, ledgerRootPath);
ledgersRootPathChild, ledgerRootPath);
return false;
}
}

// finally deleting the ledgers rootpath
store.deleteRecursive(ledgerRootPath).join();
try {
store.deleteRecursive(ledgerRootPath).get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
throw new IOException(e);
}

log.info("Successfully nuked existing cluster");
return true;
Expand Down
Loading

0 comments on commit 976a580

Please sign in to comment.