diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index 1e4994e768e2a..f14f9fab96cd5 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -83,7 +83,12 @@
-
+
+ ${project.groupId}
+ testmocks
+ ${project.version}
+ test
+
org.xerial.snappy
snappy-java
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
index 9750fb52d41a3..14cdf3e1fc29c 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
@@ -31,7 +31,6 @@
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.metadata.bookkeeper.PulsarLedgerAuditorManager;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
index 5c0a3f39325e5..6b58c72af0766 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
@@ -30,7 +30,6 @@
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index ffd71f9311f42..ec5f77f79464b 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -67,7 +67,6 @@
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
index 4acb207570a2d..9e8c5a54a5d91 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
@@ -30,7 +30,6 @@
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 8eb04ce7185d7..9c5805dc536d6 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -66,7 +66,6 @@
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
index 1bafb8589d91a..8b9c0b143028a 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
@@ -31,7 +31,6 @@
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
index 159a4e88a33bd..5637819a9275b 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
@@ -53,7 +53,6 @@
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
index 21dd2807b75d3..62162bd25f427 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
@@ -31,7 +31,6 @@
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
index a4d6d86deced2..2e9dbc158597d 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
@@ -57,7 +57,6 @@
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
index 2c458d635f528..3e5081ed0ef9d 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
@@ -29,7 +29,6 @@
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestCallbacks;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuthAutoRecoveryTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuthAutoRecoveryTest.java
index db338d1bb4b39..41e159b77714f 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuthAutoRecoveryTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuthAutoRecoveryTest.java
@@ -27,7 +27,6 @@
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.ClientConnectionPeer;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
index 62416968142b1..1d741c551ddb9 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -25,10 +25,8 @@
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
-import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.TestUtils;
import org.apache.pulsar.metadata.bookkeeper.PulsarLedgerManagerFactory;
import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
@@ -42,7 +40,6 @@
/**
* Test the AuditorPeer.
*/
-@Slf4j
public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
public AutoRecoveryMainTest() throws Exception {
@@ -68,7 +65,6 @@ public void tearDown() throws Exception {
*/
@Test
public void testStartup() throws Exception {
- log.info("testStartup()");
confByIndex(0).setMetadataServiceUri(
zkUtil.getMetadataServiceUri().replaceAll("zk://", "metadata-store:").replaceAll("/ledgers", ""));
AutoRecoveryMain main = new AutoRecoveryMain(confByIndex(0));
@@ -89,7 +85,6 @@ public void testStartup() throws Exception {
*/
@Test
public void testShutdown() throws Exception {
- log.info("testShutdown()");
confByIndex(0).setMetadataServiceUri(
zkUtil.getMetadataServiceUri().replaceAll("zk://", "metadata-store:").replaceAll("/ledgers", ""));
AutoRecoveryMain main = new AutoRecoveryMain(confByIndex(0));
@@ -113,7 +108,6 @@ public void testShutdown() throws Exception {
*/
@Test
public void testAutoRecoverySessionLoss() throws Exception {
- log.info("testAutoRecoverySessionLoss()");
confByIndex(0).setMetadataServiceUri(
zkUtil.getMetadataServiceUri().replaceAll("zk://", "metadata-store:").replaceAll("/ledgers", ""));
confByIndex(1).setMetadataServiceUri(
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
new file mode 100644
index 0000000000000..c681a1f0764ee
--- /dev/null
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
@@ -0,0 +1,851 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This file is derived from BookKeeperClusterTestCase from Apache BookKeeper
+ * http://bookkeeper.apache.org
+ */
+
+package org.apache.bookkeeper.replication;
+
+import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
+import static org.testng.Assert.assertFalse;
+import com.google.common.base.Stopwatch;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.client.TestStatsProvider;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.metastore.InMemoryMetaStore;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.test.ServerTester;
+import org.apache.bookkeeper.test.TmpDirs;
+import org.apache.bookkeeper.test.ZooKeeperCluster;
+import org.apache.bookkeeper.test.ZooKeeperClusterUtil;
+import org.apache.pulsar.common.util.PortManager;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.Awaitility;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeTest;
+
+/**
+ * A class runs several bookie servers for testing.
+ */
+public abstract class BookKeeperClusterTestCase {
+
+ static final Logger LOG = LoggerFactory.getLogger(BookKeeperClusterTestCase.class);
+
+ protected String testName;
+
+ @BeforeMethod
+ public void handleTestMethodName(Method method) {
+ testName = method.getName();
+ }
+
+ // Metadata service related variables
+ protected final ZooKeeperCluster zkUtil;
+ protected ZooKeeper zkc;
+ protected String metadataServiceUri;
+ protected FaultInjectionMetadataStore metadataStore;
+
+ // BookKeeper related variables
+ protected final TmpDirs tmpDirs = new TmpDirs();
+ protected final List servers = new LinkedList<>();
+
+ protected int numBookies;
+ protected BookKeeperTestClient bkc;
+ protected boolean useUUIDasBookieId = true;
+
+ /*
+ * Loopback interface is set as the listening interface and allowloopback is
+ * set to true in this server config. So bookies in this test process would
+ * bind to loopback address.
+ */
+ protected final ServerConfiguration baseConf = TestBKConfiguration.newServerConfiguration();
+ protected final ClientConfiguration baseClientConf = TestBKConfiguration.newClientConfiguration();
+
+ private boolean isAutoRecoveryEnabled;
+ protected ExecutorService executor;
+ private final List bookiePorts = new ArrayList<>();
+
+ SynchronousQueue asyncExceptions = new SynchronousQueue<>();
+ protected void captureThrowable(Runnable c) {
+ try {
+ c.run();
+ } catch (Throwable e) {
+ LOG.error("Captured error: ", e);
+ asyncExceptions.add(e);
+ }
+ }
+
+ public BookKeeperClusterTestCase(int numBookies) {
+ this(numBookies, 120);
+ }
+
+ public BookKeeperClusterTestCase(int numBookies, int testTimeoutSecs) {
+ this(numBookies, 1, testTimeoutSecs);
+ }
+
+ public BookKeeperClusterTestCase(int numBookies, int numOfZKNodes, int testTimeoutSecs) {
+ this.numBookies = numBookies;
+ if (numOfZKNodes == 1) {
+ zkUtil = new ZooKeeperUtil(getLedgersRootPath());
+ } else {
+ try {
+ zkUtil = new ZooKeeperClusterUtil(numOfZKNodes);
+ } catch (IOException | KeeperException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @BeforeTest
+ public void setUp() throws Exception {
+ setUp(getLedgersRootPath());
+ }
+
+ protected void setUp(String ledgersRootPath) throws Exception {
+ LOG.info("Setting up test {}", getClass());
+ InMemoryMetaStore.reset();
+ setMetastoreImplClass(baseConf);
+ setMetastoreImplClass(baseClientConf);
+ executor = Executors.newCachedThreadPool();
+
+ Stopwatch sw = Stopwatch.createStarted();
+ try {
+ // start zookeeper service
+ startZKCluster();
+ // start bookkeeper service
+ this.metadataServiceUri = getMetadataServiceUri(ledgersRootPath);
+ startBKCluster(metadataServiceUri);
+ LOG.info("Setup testcase {} @ metadata service {} in {} ms.",
+ testName, metadataServiceUri, sw.elapsed(TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ LOG.error("Error setting up", e);
+ throw e;
+ }
+ }
+
+ protected String getMetadataServiceUri(String ledgersRootPath) {
+ return zkUtil.getMetadataServiceUri(ledgersRootPath);
+ }
+
+ private String getLedgersRootPath() {
+ return changeLedgerPath() + "/ledgers";
+ }
+
+ protected String changeLedgerPath() {
+ return "";
+ }
+
+ @AfterTest(alwaysRun = true)
+ public void tearDown() throws Exception {
+ boolean failed = false;
+ for (Throwable e : asyncExceptions) {
+ LOG.error("Got async exception: ", e);
+ failed = true;
+ }
+ assertFalse(failed, "Async failure");
+ Stopwatch sw = Stopwatch.createStarted();
+ LOG.info("TearDown");
+ Exception tearDownException = null;
+ // stop bookkeeper service
+ try {
+ stopBKCluster();
+ } catch (Exception e) {
+ LOG.error("Got Exception while trying to stop BKCluster", e);
+ tearDownException = e;
+ }
+ // stop zookeeper service
+ try {
+ // cleanup for metrics.
+ metadataStore.close();
+ stopZKCluster();
+ } catch (Exception e) {
+ LOG.error("Got Exception while trying to stop ZKCluster", e);
+ tearDownException = e;
+ }
+ // cleanup temp dirs
+ try {
+ tmpDirs.cleanup();
+ } catch (Exception e) {
+ LOG.error("Got Exception while trying to cleanupTempDirs", e);
+ tearDownException = e;
+ }
+
+ executor.shutdownNow();
+
+ LOG.info("Tearing down test {} in {} ms.", testName, sw.elapsed(TimeUnit.MILLISECONDS));
+ if (tearDownException != null) {
+ throw tearDownException;
+ }
+ }
+
+ /**
+ * Start zookeeper cluster.
+ *
+ * @throws Exception
+ */
+ protected void startZKCluster() throws Exception {
+ zkUtil.startCluster();
+ zkc = zkUtil.getZooKeeperClient();
+ metadataStore = new FaultInjectionMetadataStore(
+ MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
+ MetadataStoreConfig.builder().build()));
+ }
+
+ /**
+ * Stop zookeeper cluster.
+ *
+ * @throws Exception
+ */
+ protected void stopZKCluster() throws Exception {
+ zkUtil.killCluster();
+ }
+
+ /**
+ * Start cluster. Also, starts the auto recovery process for each bookie, if
+ * isAutoRecoveryEnabled is true.
+ *
+ * @throws Exception
+ */
+ protected void startBKCluster(String metadataServiceUri) throws Exception {
+ baseConf.setMetadataServiceUri(metadataServiceUri);
+ baseClientConf.setMetadataServiceUri(metadataServiceUri);
+ baseClientConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
+
+ if (numBookies > 0) {
+ bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
+ }
+
+ // Create Bookie Servers (B1, B2, B3)
+ for (int i = 0; i < numBookies; i++) {
+ bookiePorts.add(startNewBookie());
+ }
+ }
+
+ /**
+ * Stop cluster. Also, stops all the auto recovery processes for the bookie
+ * cluster, if isAutoRecoveryEnabled is true.
+ *
+ * @throws Exception
+ */
+ protected void stopBKCluster() throws Exception {
+ if (bkc != null) {
+ bkc.close();
+ }
+
+ for (ServerTester t : servers) {
+ t.shutdown();
+ }
+ servers.clear();
+ bookiePorts.removeIf(PortManager::releaseLockedPort);
+ }
+
+ protected ServerConfiguration newServerConfiguration() throws Exception {
+ File f = tmpDirs.createNew("bookie", "test");
+
+ int port;
+ if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) {
+ port = nextLockedFreePort();
+ } else {
+ port = 0;
+ }
+ return newServerConfiguration(port, f, new File[] { f });
+ }
+
+ protected ClientConfiguration newClientConfiguration() {
+ return new ClientConfiguration(baseConf);
+ }
+
+ protected ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) {
+ ServerConfiguration conf = new ServerConfiguration(baseConf);
+ conf.setBookiePort(port);
+ conf.setJournalDirName(journalDir.getPath());
+ String[] ledgerDirNames = new String[ledgerDirs.length];
+ for (int i = 0; i < ledgerDirs.length; i++) {
+ ledgerDirNames[i] = ledgerDirs[i].getPath();
+ }
+ conf.setLedgerDirNames(ledgerDirNames);
+ conf.setEnableTaskExecutionStats(true);
+ conf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
+ return conf;
+ }
+
+ protected void stopAllBookies() throws Exception {
+ stopAllBookies(true);
+ }
+
+ protected void stopAllBookies(boolean shutdownClient) throws Exception {
+ for (ServerTester t : servers) {
+ t.shutdown();
+ }
+ servers.clear();
+ if (shutdownClient && bkc != null) {
+ bkc.close();
+ bkc = null;
+ }
+ }
+
+ protected String newMetadataServiceUri(String ledgersRootPath) {
+ return zkUtil.getMetadataServiceUri(ledgersRootPath);
+ }
+
+ protected String newMetadataServiceUri(String ledgersRootPath, String type) {
+ return zkUtil.getMetadataServiceUri(ledgersRootPath, type);
+ }
+
+ /**
+ * Get bookie address for bookie at index.
+ */
+ public BookieId getBookie(int index) throws Exception {
+ return servers.get(index).getServer().getBookieId();
+ }
+
+ protected List bookieAddresses() throws Exception {
+ List bookieIds = new ArrayList<>();
+ for (ServerTester a : servers) {
+ bookieIds.add(a.getServer().getBookieId());
+ }
+ return bookieIds;
+ }
+
+ protected List bookieLedgerDirs() throws Exception {
+ return servers.stream()
+ .flatMap(t -> Arrays.stream(t.getConfiguration().getLedgerDirs()))
+ .collect(Collectors.toList());
+ }
+
+ protected List bookieJournalDirs() throws Exception {
+ return servers.stream()
+ .flatMap(t -> Arrays.stream(t.getConfiguration().getJournalDirs()))
+ .collect(Collectors.toList());
+ }
+
+ protected BookieId addressByIndex(int index) throws Exception {
+ return servers.get(index).getServer().getBookieId();
+ }
+
+ protected BookieServer serverByIndex(int index) throws Exception {
+ return servers.get(index).getServer();
+ }
+
+ protected ServerConfiguration confByIndex(int index) throws Exception {
+ return servers.get(index).getConfiguration();
+ }
+
+ private Optional byAddress(BookieId addr) throws UnknownHostException {
+ for (ServerTester s : servers) {
+ if (s.getServer().getBookieId().equals(addr)) {
+ return Optional.of(s);
+ }
+ }
+ return Optional.empty();
+ }
+
+ protected int indexOfServer(BookieServer b) throws Exception {
+ for (int i = 0; i < servers.size(); i++) {
+ if (servers.get(i).getServer().equals(b)) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ protected int lastBookieIndex() {
+ return servers.size() - 1;
+ }
+
+ protected int bookieCount() {
+ return servers.size();
+ }
+
+ private OptionalInt indexByAddress(BookieId addr) throws UnknownHostException {
+ for (int i = 0; i < servers.size(); i++) {
+ if (addr.equals(servers.get(i).getServer().getBookieId())) {
+ return OptionalInt.of(i);
+ }
+ }
+ return OptionalInt.empty();
+ }
+
+ /**
+ * Get bookie configuration for bookie.
+ */
+ public ServerConfiguration getBkConf(BookieId addr) throws Exception {
+ return byAddress(addr).get().getConfiguration();
+ }
+
+ /**
+ * Kill a bookie by its socket address. Also, stops the autorecovery process
+ * for the corresponding bookie server, if isAutoRecoveryEnabled is true.
+ *
+ * @param addr
+ * Socket Address
+ * @return the configuration of killed bookie
+ * @throws InterruptedException
+ */
+ public ServerConfiguration killBookie(BookieId addr) throws Exception {
+ Optional tester = byAddress(addr);
+ if (tester.isPresent()) {
+ if (tester.get().autoRecovery != null
+ && tester.get().autoRecovery.getAuditor() != null
+ && tester.get().autoRecovery.getAuditor().isRunning()) {
+ LOG.warn("Killing bookie {} who is the current Auditor", addr);
+ }
+ servers.remove(tester.get());
+ tester.get().shutdown();
+ return tester.get().getConfiguration();
+ }
+ return null;
+ }
+
+ /**
+ * Set the bookie identified by its socket address to readonly.
+ *
+ * @param addr
+ * Socket Address
+ * @throws InterruptedException
+ */
+ public void setBookieToReadOnly(BookieId addr) throws Exception {
+ Optional tester = byAddress(addr);
+ if (tester.isPresent()) {
+ tester.get().getServer().getBookie().getStateManager().transitionToReadOnlyMode().get();
+ }
+ }
+
+ /**
+ * Kill a bookie by index. Also, stops the respective auto recovery process
+ * for this bookie, if isAutoRecoveryEnabled is true.
+ *
+ * @param index
+ * Bookie Index
+ * @return the configuration of killed bookie
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public ServerConfiguration killBookie(int index) throws Exception {
+ ServerTester tester = servers.remove(index);
+ tester.shutdown();
+ return tester.getConfiguration();
+ }
+
+ /**
+ * Kill bookie by index and verify that it's stopped.
+ *
+ * @param index index of bookie to kill
+ *
+ * @return configuration of killed bookie
+ */
+ public ServerConfiguration killBookieAndWaitForZK(int index) throws Exception {
+ ServerTester tester = servers.get(index); // IKTODO: this method is awful
+ ServerConfiguration ret = killBookie(index);
+ while (zkc.exists("/ledgers/" + AVAILABLE_NODE + "/"
+ + tester.getServer().getBookieId().toString(), false) != null) {
+ Thread.sleep(500);
+ }
+ return ret;
+ }
+
+ /**
+ * Sleep a bookie.
+ *
+ * @param addr
+ * Socket Address
+ * @param seconds
+ * Sleep seconds
+ * @return Count Down latch which will be counted down just after sleep begins
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public CountDownLatch sleepBookie(BookieId addr, final int seconds)
+ throws Exception {
+ Optional tester = byAddress(addr);
+ if (tester.isPresent()) {
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread sleeper = new Thread() {
+ @Override
+ public void run() {
+ try {
+ tester.get().getServer().suspendProcessing();
+ LOG.info("bookie {} is asleep", tester.get().getAddress());
+ latch.countDown();
+ Thread.sleep(seconds * 1000);
+ tester.get().getServer().resumeProcessing();
+ LOG.info("bookie {} is awake", tester.get().getAddress());
+ } catch (Exception e) {
+ LOG.error("Error suspending bookie", e);
+ }
+ }
+ };
+ sleeper.start();
+ return latch;
+ } else {
+ throw new IOException("Bookie not found");
+ }
+ }
+
+ /**
+ * Sleep a bookie until I count down the latch.
+ *
+ * @param addr
+ * Socket Address
+ * @param l
+ * Latch to wait on
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public void sleepBookie(BookieId addr, final CountDownLatch l)
+ throws InterruptedException, IOException {
+ final CountDownLatch suspendLatch = new CountDownLatch(1);
+ sleepBookie(addr, l, suspendLatch);
+ suspendLatch.await();
+ }
+
+ public void sleepBookie(BookieId addr, final CountDownLatch l, final CountDownLatch suspendLatch)
+ throws InterruptedException, IOException {
+ Optional tester = byAddress(addr);
+ if (tester.isPresent()) {
+ BookieServer bookie = tester.get().getServer();
+ LOG.info("Sleep bookie {}.", addr);
+ Thread sleeper = new Thread() {
+ @Override
+ public void run() {
+ try {
+ bookie.suspendProcessing();
+ if (null != suspendLatch) {
+ suspendLatch.countDown();
+ }
+ l.await();
+ bookie.resumeProcessing();
+ } catch (Exception e) {
+ LOG.error("Error suspending bookie", e);
+ }
+ }
+ };
+ sleeper.start();
+ } else {
+ throw new IOException("Bookie not found");
+ }
+ }
+
+ /**
+ * Restart bookie servers. Also restarts all the respective auto recovery
+ * process, if isAutoRecoveryEnabled is true.
+ *
+ * @throws InterruptedException
+ * @throws IOException
+ * @throws KeeperException
+ * @throws BookieException
+ */
+ public void restartBookies()
+ throws Exception {
+ restartBookies(c -> c);
+ }
+
+ /**
+ * Restart a bookie. Also restart the respective auto recovery process,
+ * if isAutoRecoveryEnabled is true.
+ *
+ * @param addr
+ * @throws InterruptedException
+ * @throws IOException
+ * @throws KeeperException
+ * @throws BookieException
+ */
+ public void restartBookie(BookieId addr) throws Exception {
+ OptionalInt toRemove = indexByAddress(addr);
+ if (toRemove.isPresent()) {
+ ServerConfiguration newConfig = killBookie(toRemove.getAsInt());
+ Thread.sleep(1000);
+ startAndAddBookie(newConfig);
+ } else {
+ throw new IOException("Bookie not found");
+ }
+ }
+
+ public void restartBookies(Function reconfFunction)
+ throws Exception {
+ // shut down bookie server
+ List confs = new ArrayList<>();
+ for (ServerTester server : servers) {
+ server.shutdown();
+ confs.add(server.getConfiguration());
+ }
+ servers.clear();
+ Thread.sleep(1000);
+ // restart them to ensure we can't
+ for (ServerConfiguration conf : confs) {
+ // ensure the bookie port is loaded correctly
+ startAndAddBookie(reconfFunction.apply(conf));
+ }
+ }
+
+ /**
+ * Helper method to startup a new bookie server with the indicated port
+ * number. Also, starts the auto recovery process, if the
+ * isAutoRecoveryEnabled is set true.
+ *
+ * @throws IOException
+ */
+ public int startNewBookie()
+ throws Exception {
+ return startNewBookieAndReturnAddress().getPort();
+ }
+
+ public BookieSocketAddress startNewBookieAndReturnAddress()
+ throws Exception {
+ ServerConfiguration conf = newServerConfiguration();
+ LOG.info("Starting new bookie on port: {}", conf.getBookiePort());
+ return startAndAddBookie(conf).getServer().getLocalAddress();
+ }
+
+ public BookieId startNewBookieAndReturnBookieId()
+ throws Exception {
+ ServerConfiguration conf = newServerConfiguration();
+ LOG.info("Starting new bookie on port: {}", conf.getBookiePort());
+ return startAndAddBookie(conf).getServer().getBookieId();
+ }
+
+ protected ServerTester startAndAddBookie(ServerConfiguration conf) throws Exception {
+ ServerTester server = startBookie(conf);
+ servers.add(server);
+ return server;
+ }
+
+ protected ServerTester startAndAddBookie(ServerConfiguration conf, Bookie b) throws Exception {
+ ServerTester server = startBookie(conf, b);
+ servers.add(server);
+ return server;
+ }
+ /**
+ * Helper method to startup a bookie server using a configuration object.
+ * Also, starts the auto recovery process if isAutoRecoveryEnabled is true.
+ *
+ * @param conf
+ * Server Configuration Object
+ *
+ */
+ protected ServerTester startBookie(ServerConfiguration conf)
+ throws Exception {
+ ServerTester tester = new ServerTester(conf);
+
+ if (bkc == null) {
+ bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
+ }
+
+ BookieId address = tester.getServer().getBookieId();
+ Future> waitForBookie = conf.isForceReadOnlyBookie()
+ ? bkc.waitForReadOnlyBookie(address)
+ : bkc.waitForWritableBookie(address);
+
+ tester.getServer().start();
+
+ waitForBookie.get(30, TimeUnit.SECONDS);
+ LOG.info("New bookie '{}' has been created.", address);
+
+ if (isAutoRecoveryEnabled()) {
+ tester.startAutoRecovery();
+ }
+
+ int port = conf.getBookiePort();
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+ while (zkc.exists("/ledgers/" + AVAILABLE_NODE + "/"
+ + tester.getServer().getBookieId().toString(), false) == null) {
+ Thread.sleep(100);
+ }
+ return true;
+ });
+ bkc.readBookiesBlocking();
+
+ LOG.info("New bookie on port " + port + " has been created.");
+
+ return tester;
+ }
+
+ /**
+ * Start a bookie with the given bookie instance. Also, starts the auto
+ * recovery for this bookie, if isAutoRecoveryEnabled is true.
+ */
+ protected ServerTester startBookie(ServerConfiguration conf, final Bookie b)
+ throws Exception {
+ ServerTester tester = new ServerTester(conf, b);
+ if (bkc == null) {
+ bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
+ }
+ BookieId address = tester.getServer().getBookieId();
+ Future> waitForBookie = conf.isForceReadOnlyBookie()
+ ? bkc.waitForReadOnlyBookie(address)
+ : bkc.waitForWritableBookie(address);
+
+ tester.getServer().start();
+
+ waitForBookie.get(30, TimeUnit.SECONDS);
+
+ if (isAutoRecoveryEnabled()) {
+ tester.startAutoRecovery();
+ }
+
+ int port = conf.getBookiePort();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
+ metadataStore.exists(
+ getLedgersRootPath() + "/available/" + address).join()
+ );
+ bkc.readBookiesBlocking();
+
+ LOG.info("New bookie '{}' has been created.", address);
+ return tester;
+ }
+
+ public void setMetastoreImplClass(AbstractConfiguration conf) {
+ conf.setMetastoreImplClass(InMemoryMetaStore.class.getName());
+ }
+
+ /**
+ * Flags used to enable/disable the auto recovery process. If it is enabled,
+ * starting the bookie server will starts the auto recovery process for that
+ * bookie. Also, stopping bookie will stops the respective auto recovery
+ * process.
+ *
+ * @param isAutoRecoveryEnabled
+ * Value true will enable the auto recovery process. Value false
+ * will disable the auto recovery process
+ */
+ public void setAutoRecoveryEnabled(boolean isAutoRecoveryEnabled) {
+ this.isAutoRecoveryEnabled = isAutoRecoveryEnabled;
+ }
+
+ /**
+ * Flag used to check whether auto recovery process is enabled/disabled. By
+ * default the flag is false.
+ *
+ * @return true, if the auto recovery is enabled. Otherwise return false.
+ */
+ public boolean isAutoRecoveryEnabled() {
+ return isAutoRecoveryEnabled;
+ }
+
+ /**
+ * Will starts the auto recovery process for the bookie servers. One auto
+ * recovery process per each bookie server, if isAutoRecoveryEnabled is
+ * enabled.
+ */
+ public void startReplicationService() throws Exception {
+ for (ServerTester t : servers) {
+ t.startAutoRecovery();
+ }
+ }
+
+ /**
+ * Will stops all the auto recovery processes for the bookie cluster, if
+ * isAutoRecoveryEnabled is true.
+ */
+ public void stopReplicationService() throws Exception{
+ for (ServerTester t : servers) {
+ t.stopAutoRecovery();
+ }
+ }
+
+ public Auditor getAuditor(int timeout, TimeUnit unit) throws Exception {
+ final long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, unit);
+ while (System.nanoTime() < timeoutAt) {
+ for (ServerTester t : servers) {
+ Auditor a = t.getAuditor();
+ ReplicationWorker replicationWorker = t.getReplicationWorker();
+
+ // found a candidate Auditor + ReplicationWorker
+ if (a != null && a.isRunning()
+ && replicationWorker != null && replicationWorker.isRunning()) {
+ int deathWatchInterval = t.getConfiguration().getDeathWatchInterval();
+ Thread.sleep(deathWatchInterval + 1000);
+ }
+
+ // double check, because in the meantime AutoRecoveryDeathWatcher may have killed the
+ // AutoRecovery daemon
+ if (a != null && a.isRunning()
+ && replicationWorker != null && replicationWorker.isRunning()) {
+ LOG.info("Found Auditor Bookie {}", t.getServer().getBookieId());
+ return a;
+ }
+ }
+ Thread.sleep(100);
+ }
+ throw new Exception("No auditor found");
+ }
+
+ /**
+ * Check whether the InetSocketAddress was created using a hostname or an IP
+ * address. Represent as 'hostname/IPaddress' if the InetSocketAddress was
+ * created using hostname. Represent as '/IPaddress' if the
+ * InetSocketAddress was created using an IPaddress
+ *
+ * @param bookieId id
+ * @return true if the address was created using an IP address, false if the
+ * address was created using a hostname
+ */
+ public boolean isCreatedFromIp(BookieId bookieId) {
+ BookieSocketAddress addr = bkc.getBookieAddressResolver().resolve(bookieId);
+ return addr.getSocketAddress().toString().startsWith("/");
+ }
+
+ public void resetBookieOpLoggers() {
+ servers.forEach(t -> t.getStatsProvider().clear());
+ }
+
+ public TestStatsProvider getStatsProvider(BookieId addr) throws UnknownHostException {
+ return byAddress(addr).get().getStatsProvider();
+ }
+
+ public TestStatsProvider getStatsProvider(int index) throws Exception {
+ return servers.get(index).getStatsProvider();
+ }
+
+}
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
index c8c76302b89e1..888303d3e665c 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
@@ -44,7 +44,6 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
index eb9f95ffdf7a5..1d5cf868cce65 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
@@ -35,7 +35,6 @@
import org.apache.bookkeeper.meta.LayoutManager;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.bookkeeper.PulsarLayoutManager;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
index 8a2e7f2747a22..11797c8373715 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
@@ -27,7 +27,6 @@
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index ca02f91d1de36..7938feaba19fe 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -78,7 +78,6 @@
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/ZooKeeperUtil.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/ZooKeeperUtil.java
new file mode 100644
index 0000000000000..5113edb72c49a
--- /dev/null
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/ZooKeeperUtil.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This file is derived from ZooKeeperUtil from Apache BookKeeper
+ * http://bookkeeper.apache.org
+ */
+
+package org.apache.bookkeeper.replication;
+
+import static org.testng.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.test.ZooKeeperCluster;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the zookeeper utilities.
+ */
+public class ZooKeeperUtil implements ZooKeeperCluster {
+
+ static {
+ // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words
+ // are disabled by default due to security reasons
+ System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+ }
+ static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtil.class);
+
+ // ZooKeeper related variables
+ protected Integer zooKeeperPort = 0;
+ private InetSocketAddress zkaddr;
+
+ protected ZooKeeperServer zks;
+ protected ZooKeeper zkc; // zookeeper client
+ protected NIOServerCnxnFactory serverFactory;
+ protected File zkTmpDir;
+ private String connectString;
+ private String ledgersRootPath;
+
+ public ZooKeeperUtil(String ledgersRootPath) {
+ this.ledgersRootPath = ledgersRootPath;
+ String loopbackIPAddr = InetAddress.getLoopbackAddress().getHostAddress();
+ zkaddr = new InetSocketAddress(loopbackIPAddr, 0);
+ connectString = loopbackIPAddr + ":" + zooKeeperPort;
+ }
+
+ @Override
+ public ZooKeeper getZooKeeperClient() {
+ return zkc;
+ }
+
+ @Override
+ public String getZooKeeperConnectString() {
+ return connectString;
+ }
+
+ @Override
+ public String getMetadataServiceUri() {
+ return getMetadataServiceUri("/ledgers");
+ }
+
+ @Override
+ public String getMetadataServiceUri(String zkLedgersRootPath) {
+ return "zk://" + connectString + zkLedgersRootPath;
+ }
+
+ @Override
+ public String getMetadataServiceUri(String zkLedgersRootPath, String type) {
+ return "zk+" + type + "://" + connectString + zkLedgersRootPath;
+ }
+
+ @Override
+ public void startCluster() throws Exception {
+ // create a ZooKeeper server(dataDir, dataLogDir, port)
+ LOG.debug("Running ZK server");
+ ClientBase.setupTestEnv();
+ zkTmpDir = IOUtils.createTempDir("zookeeper", "test");
+
+ // start the server and client.
+ restartCluster();
+
+ // create default bk ensemble
+ createBKEnsemble(ledgersRootPath);
+ }
+
+ @Override
+ public void createBKEnsemble(String ledgersPath) throws KeeperException, InterruptedException {
+ int last = ledgersPath.lastIndexOf('/');
+ if (last > 0) {
+ String pathToCreate = ledgersPath.substring(0, last);
+ CompletableFuture future = new CompletableFuture<>();
+ if (zkc.exists(pathToCreate, false) == null) {
+ ZkUtils.asyncCreateFullPathOptimistic(zkc,
+ pathToCreate,
+ new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT, (i, s, o, s1) -> {
+ future.complete(null);
+ }, null);
+ }
+ future.join();
+ }
+
+ ZooKeeperCluster.super.createBKEnsemble(ledgersPath);
+ }
+ @Override
+ public void restartCluster() throws Exception {
+ zks = new ZooKeeperServer(zkTmpDir, zkTmpDir,
+ ZooKeeperServer.DEFAULT_TICK_TIME);
+ serverFactory = new NIOServerCnxnFactory();
+ serverFactory.configure(zkaddr, 100);
+ serverFactory.startup(zks);
+
+ if (0 == zooKeeperPort) {
+ zooKeeperPort = serverFactory.getLocalPort();
+ zkaddr = new InetSocketAddress(zkaddr.getAddress().getHostAddress(), zooKeeperPort);
+ connectString = zkaddr.getAddress().getHostAddress() + ":" + zooKeeperPort;
+ }
+
+ boolean b = ClientBase.waitForServerUp(getZooKeeperConnectString(),
+ ClientBase.CONNECTION_TIMEOUT);
+ LOG.debug("Server up: " + b);
+
+ // create a zookeeper client
+ LOG.debug("Instantiate ZK Client");
+ zkc = ZooKeeperClient.newBuilder()
+ .connectString(getZooKeeperConnectString())
+ .sessionTimeoutMs(10000)
+ .build();
+ }
+
+ @Override
+ public void sleepCluster(final int time,
+ final TimeUnit timeUnit,
+ final CountDownLatch l)
+ throws InterruptedException, IOException {
+ Thread[] allthreads = new Thread[Thread.activeCount()];
+ Thread.enumerate(allthreads);
+ for (final Thread t : allthreads) {
+ if (t.getName().contains("SyncThread:0")) {
+ Thread sleeper = new Thread() {
+ @SuppressWarnings("deprecation")
+ public void run() {
+ try {
+ t.suspend();
+ l.countDown();
+ timeUnit.sleep(time);
+ t.resume();
+ } catch (Exception e) {
+ LOG.error("Error suspending thread", e);
+ }
+ }
+ };
+ sleeper.start();
+ return;
+ }
+ }
+ throw new IOException("ZooKeeper thread not found");
+ }
+
+ @Override
+ public void stopCluster() throws Exception {
+ if (zkc != null) {
+ zkc.close();
+ }
+
+ // shutdown ZK server
+ if (serverFactory != null) {
+ serverFactory.shutdown();
+ assertTrue(ClientBase.waitForServerDown(getZooKeeperConnectString(), ClientBase.CONNECTION_TIMEOUT),
+ "waiting for server down");
+ }
+ if (zks != null) {
+ zks.getTxnLogFactory().close();
+ }
+ }
+
+ @Override
+ public void killCluster() throws Exception {
+ stopCluster();
+ FileUtils.deleteDirectory(zkTmpDir);
+ }
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
index 40c2041d4e6c4..43db5ad4ba845 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -70,7 +70,6 @@
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.metastore.InMemoryMetaStore;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -486,7 +485,7 @@ public ServerConfiguration killBookie(int index) throws Exception {
public ServerConfiguration killBookieAndWaitForZK(int index) throws Exception {
ServerTester tester = servers.get(index); // IKTODO: this method is awful
ServerConfiguration ret = killBookie(index);
- while (zkc.exists(ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf) + "/" + AVAILABLE_NODE + "/"
+ while (zkc.exists("/ledgers/" + AVAILABLE_NODE + "/"
+ tester.getServer().getBookieId().toString(), false) != null) {
Thread.sleep(500);
}
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/testmocks/src/main/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index d023427e3be31..dd33c2c4532bf 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -52,7 +52,6 @@ public BookKeeperTestClient(ClientConfiguration conf, ZooKeeper zkc)
throws IOException, InterruptedException, BKException {
super(conf, zkc, null, new UnpooledByteBufAllocator(false),
NullStatsLogger.INSTANCE, null, null, null);
- this.statsProvider = statsProvider;
}
public BookKeeperTestClient(ClientConfiguration conf)