diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java index b785448cdacaf..7f80aa29f53d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java @@ -420,18 +420,22 @@ public void close() { try { if (fnWorkerService != null) { fnWorkerService.stop(); + fnWorkerService = null; } if (broker != null) { broker.close(); + broker = null; } if (bkCluster != null) { bkCluster.close(); + bkCluster = null; } if (bkEnsemble != null) { bkEnsemble.stop(); + bkEnsemble = null; } } catch (Exception e) { log.error("Shutdown failed: {}", e.getMessage(), e); @@ -496,5 +500,11 @@ private static void processTerminator(int exitCode) { ShutdownUtil.triggerImmediateForcefulShutdown(exitCode); } + public String getBrokerServiceUrl() { + return broker.getBrokerServiceUrl(); + } + public String getWebServiceUrl() { + return broker.getWebServiceAddress(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java index 0ab731591da14..29feac8cb46eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java @@ -19,9 +19,12 @@ package org.apache.pulsar; import static org.apache.commons.lang3.StringUtils.isBlank; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import java.io.FileInputStream; import java.util.Arrays; +import lombok.AccessLevel; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -38,6 +41,9 @@ public class PulsarStandaloneStarter extends PulsarStandalone { @Option(names = {"-g", "--generate-docs"}, description = "Generate docs") private boolean generateDocs = false; + private Thread shutdownThread; + @Setter(AccessLevel.PACKAGE) + private boolean testMode; public PulsarStandaloneStarter(String[] args) throws Exception { @@ -108,30 +114,54 @@ public PulsarStandaloneStarter(String[] args) throws Exception { } } } + } + @Override + public synchronized void start() throws Exception { registerShutdownHook(); + super.start(); } protected void registerShutdownHook() { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + if (shutdownThread != null) { + throw new IllegalStateException("Shutdown hook already registered"); + } + shutdownThread = new Thread(() -> { try { - if (fnWorkerService != null) { - fnWorkerService.stop(); - } - - if (broker != null) { - broker.close(); - } - - if (bkEnsemble != null) { - bkEnsemble.stop(); - } + doClose(false); } catch (Exception e) { log.error("Shutdown failed: {}", e.getMessage(), e); } finally { - LogManager.shutdown(); + if (!testMode) { + LogManager.shutdown(); + } } - })); + }); + Runtime.getRuntime().addShutdownHook(shutdownThread); + } + + // simulate running the shutdown hook, for testing + @VisibleForTesting + void runShutdownHook() { + if (!testMode) { + throw new IllegalStateException("Not in test mode"); + } + Runtime.getRuntime().removeShutdownHook(shutdownThread); + shutdownThread.run(); + shutdownThread = null; + } + + @Override + public void close() { + doClose(true); + } + + private synchronized void doClose(boolean removeShutdownHook) { + super.close(); + if (shutdownThread != null && removeShutdownHook) { + Runtime.getRuntime().removeShutdownHook(shutdownThread); + shutdownThread = null; + } } protected void exit(int status) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java index 6ed93a75a3fb5..3d22feb822e32 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.metadata.bookkeeper.BKCluster; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -46,12 +47,15 @@ public Object[][] enableBrokerClientAuth() { @Test public void testStandaloneWithRocksDB() throws Exception { String[] args = new String[]{"--config", - "./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf"}; + "./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf", + "-nss", + "-nfw"}; final int bookieNum = 3; final File tempDir = IOUtils.createTempDir("standalone", "test"); PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args); standalone.setBkDir(tempDir.getAbsolutePath()); + standalone.setBkPort(0); standalone.setNumOfBk(bookieNum); standalone.startBookieWithMetadataStore(); @@ -90,11 +94,12 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex } final File bkDir = IOUtils.createTempDir("standalone", "bk"); standalone.setNumOfBk(1); + standalone.setBkPort(0); standalone.setBkDir(bkDir.getAbsolutePath()); standalone.start(); @Cleanup PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl("http://localhost:8080") + .serviceHttpUrl(standalone.getWebServiceUrl()) .authentication(new MockTokenAuthenticationProvider.MockAuthentication()) .build(); if (enableBrokerClientAuth) { @@ -104,8 +109,8 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex } else { assertTrue(admin.clusters().getClusters().isEmpty()); admin.clusters().createCluster("test_cluster", ClusterData.builder() - .serviceUrl("http://localhost:8080/") - .brokerServiceUrl("pulsar://localhost:6650/") + .serviceUrl(standalone.getWebServiceUrl()) + .brokerServiceUrl(standalone.getBrokerServiceUrl()) .build()); assertTrue(admin.tenants().getTenants().isEmpty()); admin.tenants().createTenant("public", TenantInfo.builder() @@ -125,4 +130,39 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex cleanDirectory(bkDir); cleanDirectory(metadataDir); } + + + @Test + public void testShutdownHookClosesBkCluster() throws Exception { + File dataDir = IOUtils.createTempDir("data", ""); + File metadataDir = new File(dataDir, "metadata"); + File bkDir = new File(dataDir, "bookkeeper"); + @Cleanup + PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(new String[] { + "--config", + "./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf", + "-nss", + "-nfw", + "--metadata-dir", + metadataDir.getAbsolutePath(), + "--bookkeeper-dir", + bkDir.getAbsolutePath() + }); + standalone.setTestMode(true); + standalone.setBkPort(0); + standalone.start(); + BKCluster bkCluster = standalone.bkCluster; + standalone.runShutdownHook(); + assertTrue(bkCluster.isClosed()); + } + + @Test + public void testWipeData() throws Exception { + PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(new String[] { + "--config", + "./src/test/resources/configurations/standalone_no_client_auth.conf", + "--wipe-data" + }); + assertTrue(standalone.isWipeData()); + } } diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index f2316111f8017..ddda30d0a4bd9 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -17,17 +17,17 @@ # under the License. # -applicationName="pulsar_broker" -zookeeperServers="localhost" -configurationStoreServers="localhost" +applicationName=pulsar_broker +zookeeperServers=localhost +configurationStoreServers=localhost brokerServicePort=6650 -brokerServicePortTls=6651 +brokerServicePortTls= webServicePort=8080 -webServicePortTls=4443 +webServicePortTls= httpMaxRequestHeaderSize=1234 bindAddress=0.0.0.0 advertisedAddress= -clusterName="test_cluster" +clusterName=test_cluster brokerShutdownTimeoutMs=3000 backlogQuotaCheckEnabled=true backlogQuotaCheckIntervalInSeconds=60 @@ -42,17 +42,17 @@ clientLibraryVersionCheckEnabled=false clientLibraryVersionCheckAllowUnversioned=true statusFilePath=/tmp/status.html tlsEnabled=false -tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt -tlsKeyFilePath=/home/local/conf/pulsar/server.key +tlsCertificateFilePath= +tlsKeyFilePath= tlsTrustCertsFilePath= tlsAllowInsecureConnection=false authenticationEnabled=false authorizationEnabled=false -superUserRoles="test_user" -brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled" +superUserRoles=test_user +brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled brokerClientAuthenticationParameters= -bookkeeperClientAuthenticationPlugin="test_auth_plugin" -bookkeeperClientAuthenticationAppId="test_auth_id" +bookkeeperClientAuthenticationPlugin= +bookkeeperClientAuthenticationAppId=test_auth_id bookkeeperClientTimeoutInSeconds=30 bookkeeperClientSpeculativeReadTimeoutInMillis=0 bookkeeperClientHealthCheckEnabled=true @@ -64,7 +64,7 @@ bookkeeperClientRegionawarePolicyEnabled=false bookkeeperClientMinNumRacksPerWriteQuorum=2 bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false bookkeeperClientReorderReadSequenceEnabled=false -bookkeeperClientIsolationGroups="test_group" +bookkeeperClientIsolationGroups=test_group managedLedgerDefaultEnsembleSize=3 managedLedgerDefaultWriteQuorum=2 managedLedgerDefaultAckQuorum=2 diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf index 4a40d9f0c6565..812c8dc9748f9 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf @@ -17,18 +17,18 @@ # under the License. # -applicationName="pulsar_broker" -metadataStoreUrl="zk:localhost:2181/ledger" -configurationMetadataStoreUrl="zk:localhost:2181" -brokerServicePort=6650 -brokerServicePortTls=6651 -webServicePort=8080 -webServicePortTls=4443 +applicationName=pulsar_broker +metadataStoreUrl=zk:localhost:2181/ledger +configurationMetadataStoreUrl=zk:localhost:2181 +brokerServicePort=0 +brokerServicePortTls= +webServicePort=0 +webServicePortTls= bindAddress=0.0.0.0 advertisedAddress= advertisedListeners=internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651 internalListenerName=internal -clusterName="test_cluster" +clusterName=test_cluster brokerShutdownTimeoutMs=3000 backlogQuotaCheckEnabled=true backlogQuotaCheckIntervalInSeconds=60 @@ -49,11 +49,11 @@ tlsTrustCertsFilePath= tlsAllowInsecureConnection=false authenticationEnabled=false authorizationEnabled=false -superUserRoles="test_user" -brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled" +superUserRoles=test_user +brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled brokerClientAuthenticationParameters= -bookkeeperClientAuthenticationPlugin="test_auth_plugin" -bookkeeperClientAuthenticationAppId="test_auth_id" +bookkeeperClientAuthenticationPlugin= +bookkeeperClientAuthenticationAppId= bookkeeperClientTimeoutInSeconds=30 bookkeeperClientSpeculativeReadTimeoutInMillis=0 bookkeeperClientHealthCheckEnabled=true @@ -65,7 +65,7 @@ bookkeeperClientRegionawarePolicyEnabled=false bookkeeperClientMinNumRacksPerWriteQuorum=2 bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false bookkeeperClientReorderReadSequenceEnabled=false -bookkeeperClientIsolationGroups="test_group" +bookkeeperClientIsolationGroups= managedLedgerDefaultEnsembleSize=3 managedLedgerDefaultWriteQuorum=2 managedLedgerDefaultAckQuorum=2 diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf index d8b26bbbfa99d..46c876686b05b 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf @@ -17,19 +17,19 @@ # under the License. # -applicationName="pulsar_broker" +applicationName=pulsar_broker metadataStoreUrl= configurationMetadataStoreUrl= -brokerServicePort=6650 -brokerServicePortTls=6651 -webServicePort=8080 +brokerServicePort=0 +brokerServicePortTls= +webServicePort=0 allowLoopback=true -webServicePortTls=4443 +webServicePortTls= bindAddress=0.0.0.0 advertisedAddress= advertisedListeners= internalListenerName=internal -clusterName="test_cluster" +clusterName=test_cluster brokerShutdownTimeoutMs=3000 backlogQuotaCheckEnabled=true backlogQuotaCheckIntervalInSeconds=60 @@ -44,17 +44,17 @@ clientLibraryVersionCheckEnabled=false clientLibraryVersionCheckAllowUnversioned=true statusFilePath=/tmp/status.html tlsEnabled=false -tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt -tlsKeyFilePath=/home/local/conf/pulsar/server.key +tlsCertificateFilePath= +tlsKeyFilePath= tlsTrustCertsFilePath= tlsAllowInsecureConnection=false authenticationEnabled=false authorizationEnabled=false -superUserRoles="test_user" -brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled" +superUserRoles=test_user +brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled brokerClientAuthenticationParameters= -bookkeeperClientAuthenticationPlugin="test_auth_plugin" -bookkeeperClientAuthenticationAppId="test_auth_id" +bookkeeperClientAuthenticationPlugin= +bookkeeperClientAuthenticationAppId=test_auth_id bookkeeperClientTimeoutInSeconds=30 bookkeeperClientSpeculativeReadTimeoutInMillis=0 bookkeeperClientHealthCheckEnabled=true @@ -66,7 +66,7 @@ bookkeeperClientRegionawarePolicyEnabled=false bookkeeperClientMinNumRacksPerWriteQuorum=2 bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false bookkeeperClientReorderReadSequenceEnabled=false -bookkeeperClientIsolationGroups="test_group" +bookkeeperClientIsolationGroups=test_group managedLedgerDefaultEnsembleSize=3 managedLedgerDefaultWriteQuorum=2 managedLedgerDefaultAckQuorum=2 diff --git a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf index 4e2fd40298354..6f0d82cef17bc 100644 --- a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf +++ b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf @@ -17,8 +17,8 @@ # under the License. # -brokerServicePort=6650 -webServicePort=8080 +brokerServicePort=0 +webServicePort=0 allowLoopback=true clusterName=test_cluster superUserRoles=admin diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java index c2f3f72ec21c0..8d3a90239efd3 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java @@ -30,6 +30,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; import lombok.Getter; @@ -49,8 +50,8 @@ import org.apache.bookkeeper.replication.AutoRecoveryMain; import org.apache.bookkeeper.server.conf.BookieConfiguration; import org.apache.bookkeeper.util.IOUtils; -import org.apache.bookkeeper.util.PortManager; import org.apache.commons.io.FileUtils; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -74,6 +75,9 @@ public class BKCluster implements AutoCloseable { protected final ServerConfiguration baseConf; protected final ClientConfiguration baseClientConf; + private final List lockedPorts = new ArrayList<>(); + private final AtomicBoolean closed = new AtomicBoolean(false); + public static class BKClusterConf { private ServerConfiguration baseServerConfiguration; @@ -148,20 +152,24 @@ private BKCluster(BKClusterConf bkClusterConf) throws Exception { @Override public void close() throws Exception { - // stop bookkeeper service - try { - stopBKCluster(); - } catch (Exception e) { - log.error("Got Exception while trying to stop BKCluster", e); - } - // cleanup temp dirs - try { - cleanupTempDirs(); - } catch (Exception e) { - log.error("Got Exception while trying to cleanupTempDirs", e); - } + if (closed.compareAndSet(false, true)) { + // stop bookkeeper service + try { + stopBKCluster(); + } catch (Exception e) { + log.error("Got Exception while trying to stop BKCluster", e); + } + lockedPorts.forEach(PortManager::releaseLockedPort); + lockedPorts.clear(); + // cleanup temp dirs + try { + cleanupTempDirs(); + } catch (Exception e) { + log.error("Got Exception while trying to cleanupTempDirs", e); + } - this.store.close(); + this.store.close(); + } } private File createTempDir(String prefix, String suffix) throws IOException { @@ -229,7 +237,8 @@ private ServerConfiguration newServerConfiguration(int index) throws Exception { int port; if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts() || clusterConf.bkPort == 0) { - port = PortManager.nextFreePort(); + port = PortManager.nextLockedFreePort(); + lockedPorts.add(port); } else { // bk 4.15 cookie validation finds the same ip:port in case of port 0 // and 2nd bookie's cookie validation fails @@ -399,4 +408,8 @@ private static ServerConfiguration setLoopbackInterfaceAndAllowLoopback(ServerCo serverConf.setAllowLoopback(true); return serverConf; } + + public boolean isClosed() { + return closed.get(); + } }