Skip to content

Commit

Permalink
[fix][cli] Fix Pulsar standalone shutdown - bkCluster wasn't closed (#…
Browse files Browse the repository at this point in the history
…22868)

(cherry picked from commit c5cc25e)
  • Loading branch information
lhotari committed Jun 10, 2024
1 parent ce9f15f commit 140685c
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -417,18 +417,22 @@ public void close() {
try {
if (fnWorkerService != null) {
fnWorkerService.stop();
fnWorkerService = null;
}

if (broker != null) {
broker.close();
broker = null;
}

if (bkCluster != null) {
bkCluster.close();
bkCluster = null;
}

if (bkEnsemble != null) {
bkEnsemble.stop();
bkEnsemble = null;
}
} catch (Exception e) {
log.error("Shutdown failed: {}", e.getMessage(), e);
Expand Down Expand Up @@ -493,5 +497,11 @@ private static void processTerminator(int exitCode) {
ShutdownUtil.triggerImmediateForcefulShutdown(exitCode);
}

public String getBrokerServiceUrl() {
return broker.getBrokerServiceUrl();
}

public String getWebServiceUrl() {
return broker.getWebServiceAddress();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.io.FileInputStream;
import java.util.Arrays;
import lombok.AccessLevel;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
Expand All @@ -38,6 +41,9 @@ public class PulsarStandaloneStarter extends PulsarStandalone {

@Parameter(names = {"-g", "--generate-docs"}, description = "Generate docs")
private boolean generateDocs = false;
private Thread shutdownThread;
@Setter(AccessLevel.PACKAGE)
private boolean testMode;

public PulsarStandaloneStarter(String[] args) throws Exception {

Expand Down Expand Up @@ -108,30 +114,54 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
}
}
}
}

@Override
public synchronized void start() throws Exception {
registerShutdownHook();
super.start();
}

protected void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (shutdownThread != null) {
throw new IllegalStateException("Shutdown hook already registered");
}
shutdownThread = new Thread(() -> {
try {
if (fnWorkerService != null) {
fnWorkerService.stop();
}

if (broker != null) {
broker.close();
}

if (bkEnsemble != null) {
bkEnsemble.stop();
}
doClose(false);
} catch (Exception e) {
log.error("Shutdown failed: {}", e.getMessage(), e);
} finally {
LogManager.shutdown();
if (!testMode) {
LogManager.shutdown();
}
}
}));
});
Runtime.getRuntime().addShutdownHook(shutdownThread);
}

// simulate running the shutdown hook, for testing
@VisibleForTesting
void runShutdownHook() {
if (!testMode) {
throw new IllegalStateException("Not in test mode");
}
Runtime.getRuntime().removeShutdownHook(shutdownThread);
shutdownThread.run();
shutdownThread = null;
}

@Override
public void close() {
doClose(true);
}

private synchronized void doClose(boolean removeShutdownHook) {
super.close();
if (shutdownThread != null && removeShutdownHook) {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
shutdownThread = null;
}
}

protected void exit(int status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.metadata.bookkeeper.BKCluster;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -46,12 +47,15 @@ public Object[][] enableBrokerClientAuth() {
@Test
public void testStandaloneWithRocksDB() throws Exception {
String[] args = new String[]{"--config",
"./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf"};
"./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf",
"-nss",
"-nfw"};
final int bookieNum = 3;
final File tempDir = IOUtils.createTempDir("standalone", "test");

PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args);
standalone.setBkDir(tempDir.getAbsolutePath());
standalone.setBkPort(0);
standalone.setNumOfBk(bookieNum);

standalone.startBookieWithMetadataStore();
Expand Down Expand Up @@ -90,11 +94,12 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex
}
final File bkDir = IOUtils.createTempDir("standalone", "bk");
standalone.setNumOfBk(1);
standalone.setBkPort(0);
standalone.setBkDir(bkDir.getAbsolutePath());
standalone.start();

@Cleanup PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl("http://localhost:8080")
.serviceHttpUrl(standalone.getWebServiceUrl())
.authentication(new MockTokenAuthenticationProvider.MockAuthentication())
.build();
if (enableBrokerClientAuth) {
Expand All @@ -104,8 +109,8 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex
} else {
assertTrue(admin.clusters().getClusters().isEmpty());
admin.clusters().createCluster("test_cluster", ClusterData.builder()
.serviceUrl("http://localhost:8080/")
.brokerServiceUrl("pulsar://localhost:6650/")
.serviceUrl(standalone.getWebServiceUrl())
.brokerServiceUrl(standalone.getBrokerServiceUrl())
.build());
assertTrue(admin.tenants().getTenants().isEmpty());
admin.tenants().createTenant("public", TenantInfo.builder()
Expand All @@ -125,4 +130,39 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex
cleanDirectory(bkDir);
cleanDirectory(metadataDir);
}


@Test
public void testShutdownHookClosesBkCluster() throws Exception {
File dataDir = IOUtils.createTempDir("data", "");
File metadataDir = new File(dataDir, "metadata");
File bkDir = new File(dataDir, "bookkeeper");
@Cleanup
PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(new String[] {
"--config",
"./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf",
"-nss",
"-nfw",
"--metadata-dir",
metadataDir.getAbsolutePath(),
"--bookkeeper-dir",
bkDir.getAbsolutePath()
});
standalone.setTestMode(true);
standalone.setBkPort(0);
standalone.start();
BKCluster bkCluster = standalone.bkCluster;
standalone.runShutdownHook();
assertTrue(bkCluster.isClosed());
}

@Test
public void testWipeData() throws Exception {
PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(new String[] {
"--config",
"./src/test/resources/configurations/standalone_no_client_auth.conf",
"--wipe-data"
});
assertTrue(standalone.isWipeData());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
# under the License.
#

applicationName="pulsar_broker"
zookeeperServers="localhost"
configurationStoreServers="localhost"
applicationName=pulsar_broker
zookeeperServers=localhost
configurationStoreServers=localhost
brokerServicePort=6650
brokerServicePortTls=6651
brokerServicePortTls=
webServicePort=8080
webServicePortTls=4443
webServicePortTls=
httpMaxRequestHeaderSize=1234
bindAddress=0.0.0.0
advertisedAddress=
clusterName="test_cluster"
clusterName=test_cluster
brokerShutdownTimeoutMs=3000
backlogQuotaCheckEnabled=true
backlogQuotaCheckIntervalInSeconds=60
Expand All @@ -42,17 +42,17 @@ clientLibraryVersionCheckEnabled=false
clientLibraryVersionCheckAllowUnversioned=true
statusFilePath=/tmp/status.html
tlsEnabled=false
tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt
tlsKeyFilePath=/home/local/conf/pulsar/server.key
tlsCertificateFilePath=
tlsKeyFilePath=
tlsTrustCertsFilePath=
tlsAllowInsecureConnection=false
authenticationEnabled=false
authorizationEnabled=false
superUserRoles="test_user"
brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
superUserRoles=test_user
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
brokerClientAuthenticationParameters=
bookkeeperClientAuthenticationPlugin="test_auth_plugin"
bookkeeperClientAuthenticationAppId="test_auth_id"
bookkeeperClientAuthenticationPlugin=
bookkeeperClientAuthenticationAppId=test_auth_id
bookkeeperClientTimeoutInSeconds=30
bookkeeperClientSpeculativeReadTimeoutInMillis=0
bookkeeperClientHealthCheckEnabled=true
Expand All @@ -64,7 +64,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
bookkeeperClientMinNumRacksPerWriteQuorum=2
bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
bookkeeperClientReorderReadSequenceEnabled=false
bookkeeperClientIsolationGroups="test_group"
bookkeeperClientIsolationGroups=test_group
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
# under the License.
#

applicationName="pulsar_broker"
metadataStoreUrl="zk:localhost:2181/ledger"
configurationMetadataStoreUrl="zk:localhost:2181"
brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
webServicePortTls=4443
applicationName=pulsar_broker
metadataStoreUrl=zk:localhost:2181/ledger
configurationMetadataStoreUrl=zk:localhost:2181
brokerServicePort=0
brokerServicePortTls=
webServicePort=0
webServicePortTls=
bindAddress=0.0.0.0
advertisedAddress=
advertisedListeners=internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651
internalListenerName=internal
clusterName="test_cluster"
clusterName=test_cluster
brokerShutdownTimeoutMs=3000
backlogQuotaCheckEnabled=true
backlogQuotaCheckIntervalInSeconds=60
Expand All @@ -49,11 +49,11 @@ tlsTrustCertsFilePath=
tlsAllowInsecureConnection=false
authenticationEnabled=false
authorizationEnabled=false
superUserRoles="test_user"
brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
superUserRoles=test_user
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
brokerClientAuthenticationParameters=
bookkeeperClientAuthenticationPlugin="test_auth_plugin"
bookkeeperClientAuthenticationAppId="test_auth_id"
bookkeeperClientAuthenticationPlugin=
bookkeeperClientAuthenticationAppId=
bookkeeperClientTimeoutInSeconds=30
bookkeeperClientSpeculativeReadTimeoutInMillis=0
bookkeeperClientHealthCheckEnabled=true
Expand All @@ -65,7 +65,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
bookkeeperClientMinNumRacksPerWriteQuorum=2
bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
bookkeeperClientReorderReadSequenceEnabled=false
bookkeeperClientIsolationGroups="test_group"
bookkeeperClientIsolationGroups=
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@
# under the License.
#

applicationName="pulsar_broker"
applicationName=pulsar_broker
metadataStoreUrl=
configurationMetadataStoreUrl=
brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
brokerServicePort=0
brokerServicePortTls=
webServicePort=0
allowLoopback=true
webServicePortTls=4443
webServicePortTls=
bindAddress=0.0.0.0
advertisedAddress=
advertisedListeners=
internalListenerName=internal
clusterName="test_cluster"
clusterName=test_cluster
brokerShutdownTimeoutMs=3000
backlogQuotaCheckEnabled=true
backlogQuotaCheckIntervalInSeconds=60
Expand All @@ -44,17 +44,17 @@ clientLibraryVersionCheckEnabled=false
clientLibraryVersionCheckAllowUnversioned=true
statusFilePath=/tmp/status.html
tlsEnabled=false
tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt
tlsKeyFilePath=/home/local/conf/pulsar/server.key
tlsCertificateFilePath=
tlsKeyFilePath=
tlsTrustCertsFilePath=
tlsAllowInsecureConnection=false
authenticationEnabled=false
authorizationEnabled=false
superUserRoles="test_user"
brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
superUserRoles=test_user
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
brokerClientAuthenticationParameters=
bookkeeperClientAuthenticationPlugin="test_auth_plugin"
bookkeeperClientAuthenticationAppId="test_auth_id"
bookkeeperClientAuthenticationPlugin=
bookkeeperClientAuthenticationAppId=test_auth_id
bookkeeperClientTimeoutInSeconds=30
bookkeeperClientSpeculativeReadTimeoutInMillis=0
bookkeeperClientHealthCheckEnabled=true
Expand All @@ -66,7 +66,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
bookkeeperClientMinNumRacksPerWriteQuorum=2
bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
bookkeeperClientReorderReadSequenceEnabled=false
bookkeeperClientIsolationGroups="test_group"
bookkeeperClientIsolationGroups=test_group
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
# under the License.
#

brokerServicePort=6650
webServicePort=8080
brokerServicePort=0
webServicePort=0
allowLoopback=true
clusterName=test_cluster
superUserRoles=admin
Expand Down
Loading

0 comments on commit 140685c

Please sign in to comment.