Skip to content

Commit

Permalink
[fix][cli] Fix Pulsar standalone shutdown - bkCluster.close() wasn't …
Browse files Browse the repository at this point in the history
…called
  • Loading branch information
lhotari committed Jun 7, 2024
1 parent 9326a08 commit f75bce2
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -420,18 +420,22 @@ public void close() {
try {
if (fnWorkerService != null) {
fnWorkerService.stop();
fnWorkerService = null;
}

if (broker != null) {
broker.close();
broker = null;
}

if (bkCluster != null) {
bkCluster.close();
bkCluster = null;
}

if (bkEnsemble != null) {
bkEnsemble.stop();
bkEnsemble = null;
}
} catch (Exception e) {
log.error("Shutdown failed: {}", e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.pulsar;

import static org.apache.commons.lang3.StringUtils.isBlank;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.io.FileInputStream;
import java.util.Arrays;
import lombok.AccessLevel;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
Expand All @@ -38,6 +41,9 @@ public class PulsarStandaloneStarter extends PulsarStandalone {

@Option(names = {"-g", "--generate-docs"}, description = "Generate docs")
private boolean generateDocs = false;
private Thread shutdownThread;
@Setter(AccessLevel.PACKAGE)
private boolean testMode;

public PulsarStandaloneStarter(String[] args) throws Exception {

Expand Down Expand Up @@ -108,30 +114,54 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
}
}
}
}

@Override
public synchronized void start() throws Exception {
registerShutdownHook();
super.start();
}

protected void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (shutdownThread != null) {
throw new IllegalStateException("Shutdown hook already registered");
}
shutdownThread = new Thread(() -> {
try {
if (fnWorkerService != null) {
fnWorkerService.stop();
}

if (broker != null) {
broker.close();
}

if (bkEnsemble != null) {
bkEnsemble.stop();
}
doClose(false);
} catch (Exception e) {
log.error("Shutdown failed: {}", e.getMessage(), e);
} finally {
LogManager.shutdown();
if (!testMode) {
LogManager.shutdown();
}
}
}));
});
Runtime.getRuntime().addShutdownHook(shutdownThread);
}

// simulate running the shutdown hook, for testing
@VisibleForTesting
void runShutdownHook() {
if (!testMode) {
throw new IllegalStateException("Not in test mode");
}
Runtime.getRuntime().removeShutdownHook(shutdownThread);
shutdownThread.run();
shutdownThread = null;
}

@Override
public void close() {
doClose(true);
}

private synchronized void doClose(boolean removeShutdownHook) {
super.close();
if (shutdownThread != null && removeShutdownHook) {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
shutdownThread = null;
}
}

protected void exit(int status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.metadata.bookkeeper.BKCluster;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -125,4 +126,38 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex
cleanDirectory(bkDir);
cleanDirectory(metadataDir);
}


@Test
public void testShutdownHookClosesBkCluster() throws Exception {
File dataDir = IOUtils.createTempDir("data", "");
File metadataDir = new File(dataDir, "metadata");
File bkDir = new File(dataDir, "bookkeeper");
@Cleanup
PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(new String[] {
"--config",
"./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf",
"-nss",
"-nfw",
"--metadata-dir",
metadataDir.getAbsolutePath(),
"--bookkeeper-dir",
bkDir.getAbsolutePath()
});
standalone.setTestMode(true);
standalone.start();
BKCluster bkCluster = standalone.bkCluster;
standalone.runShutdownHook();
assertTrue(bkCluster.isClosed());
}

@Test
public void testWipeData() throws Exception {
PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(new String[] {
"--config",
"./src/test/resources/configurations/standalone_no_client_auth.conf",
"--wipe-data"
});
assertTrue(standalone.isWipeData());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ authorizationEnabled=false
superUserRoles="test_user"
brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
brokerClientAuthenticationParameters=
bookkeeperClientAuthenticationPlugin="test_auth_plugin"
bookkeeperClientAuthenticationPlugin=
bookkeeperClientAuthenticationAppId="test_auth_id"
bookkeeperClientTimeoutInSeconds=30
bookkeeperClientSpeculativeReadTimeoutInMillis=0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@
# under the License.
#

applicationName="pulsar_broker"
applicationName=pulsar_broker
metadataStoreUrl=
configurationMetadataStoreUrl=
brokerServicePort=6650
brokerServicePortTls=6651
brokerServicePortTls=
webServicePort=8080
allowLoopback=true
webServicePortTls=4443
webServicePortTls=
bindAddress=0.0.0.0
advertisedAddress=
advertisedListeners=
internalListenerName=internal
clusterName="test_cluster"
clusterName=test_cluster
brokerShutdownTimeoutMs=3000
backlogQuotaCheckEnabled=true
backlogQuotaCheckIntervalInSeconds=60
Expand All @@ -44,17 +44,17 @@ clientLibraryVersionCheckEnabled=false
clientLibraryVersionCheckAllowUnversioned=true
statusFilePath=/tmp/status.html
tlsEnabled=false
tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt
tlsKeyFilePath=/home/local/conf/pulsar/server.key
tlsCertificateFilePath=
tlsKeyFilePath=
tlsTrustCertsFilePath=
tlsAllowInsecureConnection=false
authenticationEnabled=false
authorizationEnabled=false
superUserRoles="test_user"
brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
superUserRoles=test_user
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
brokerClientAuthenticationParameters=
bookkeeperClientAuthenticationPlugin="test_auth_plugin"
bookkeeperClientAuthenticationAppId="test_auth_id"
bookkeeperClientAuthenticationPlugin=
bookkeeperClientAuthenticationAppId=test_auth_id
bookkeeperClientTimeoutInSeconds=30
bookkeeperClientSpeculativeReadTimeoutInMillis=0
bookkeeperClientHealthCheckEnabled=true
Expand All @@ -66,7 +66,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
bookkeeperClientMinNumRacksPerWriteQuorum=2
bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
bookkeeperClientReorderReadSequenceEnabled=false
bookkeeperClientIsolationGroups="test_group"
bookkeeperClientIsolationGroups=test_group
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.Getter;
Expand Down Expand Up @@ -74,6 +75,8 @@ public class BKCluster implements AutoCloseable {
protected final ServerConfiguration baseConf;
protected final ClientConfiguration baseClientConf;

private final AtomicBoolean closed = new AtomicBoolean(false);

public static class BKClusterConf {

private ServerConfiguration baseServerConfiguration;
Expand Down Expand Up @@ -148,20 +151,22 @@ private BKCluster(BKClusterConf bkClusterConf) throws Exception {

@Override
public void close() throws Exception {
// stop bookkeeper service
try {
stopBKCluster();
} catch (Exception e) {
log.error("Got Exception while trying to stop BKCluster", e);
}
// cleanup temp dirs
try {
cleanupTempDirs();
} catch (Exception e) {
log.error("Got Exception while trying to cleanupTempDirs", e);
}
if (!closed.compareAndSet(false, true)) {
// stop bookkeeper service
try {
stopBKCluster();
} catch (Exception e) {
log.error("Got Exception while trying to stop BKCluster", e);
}
// cleanup temp dirs
try {
cleanupTempDirs();
} catch (Exception e) {
log.error("Got Exception while trying to cleanupTempDirs", e);
}

this.store.close();
this.store.close();
}
}

private File createTempDir(String prefix, String suffix) throws IOException {
Expand Down Expand Up @@ -399,4 +404,8 @@ private static ServerConfiguration setLoopbackInterfaceAndAllowLoopback(ServerCo
serverConf.setAllowLoopback(true);
return serverConf;
}

public boolean isClosed() {
return closed.get();
}
}

0 comments on commit f75bce2

Please sign in to comment.