Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][cli] Fix Pulsar standalone shutdown - bkCluster wasn't closed #22868

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -420,18 +420,22 @@ public void close() {
try {
if (fnWorkerService != null) {
fnWorkerService.stop();
fnWorkerService = null;
}

if (broker != null) {
broker.close();
broker = null;
}

if (bkCluster != null) {
bkCluster.close();
bkCluster = null;
}

if (bkEnsemble != null) {
bkEnsemble.stop();
bkEnsemble = null;
}
} catch (Exception e) {
log.error("Shutdown failed: {}", e.getMessage(), e);
Expand Down Expand Up @@ -496,5 +500,11 @@ private static void processTerminator(int exitCode) {
ShutdownUtil.triggerImmediateForcefulShutdown(exitCode);
}

public String getBrokerServiceUrl() {
return broker.getBrokerServiceUrl();
}

public String getWebServiceUrl() {
return broker.getWebServiceAddress();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.pulsar;

import static org.apache.commons.lang3.StringUtils.isBlank;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.io.FileInputStream;
import java.util.Arrays;
import lombok.AccessLevel;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
Expand All @@ -38,6 +41,9 @@ public class PulsarStandaloneStarter extends PulsarStandalone {

@Option(names = {"-g", "--generate-docs"}, description = "Generate docs")
private boolean generateDocs = false;
private Thread shutdownThread;
@Setter(AccessLevel.PACKAGE)
private boolean testMode;

public PulsarStandaloneStarter(String[] args) throws Exception {

Expand Down Expand Up @@ -108,30 +114,54 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
}
}
}
}

@Override
public synchronized void start() throws Exception {
registerShutdownHook();
super.start();
}

protected void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (shutdownThread != null) {
throw new IllegalStateException("Shutdown hook already registered");
}
shutdownThread = new Thread(() -> {
try {
if (fnWorkerService != null) {
fnWorkerService.stop();
}

if (broker != null) {
broker.close();
}

if (bkEnsemble != null) {
bkEnsemble.stop();
}
doClose(false);
} catch (Exception e) {
log.error("Shutdown failed: {}", e.getMessage(), e);
} finally {
LogManager.shutdown();
if (!testMode) {
LogManager.shutdown();
}
}
}));
});
Runtime.getRuntime().addShutdownHook(shutdownThread);
}

// simulate running the shutdown hook, for testing
@VisibleForTesting
void runShutdownHook() {
if (!testMode) {
throw new IllegalStateException("Not in test mode");
}
Runtime.getRuntime().removeShutdownHook(shutdownThread);
shutdownThread.run();
shutdownThread = null;
}

@Override
public void close() {
doClose(true);
}

private synchronized void doClose(boolean removeShutdownHook) {
super.close();
if (shutdownThread != null && removeShutdownHook) {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
shutdownThread = null;
}
}

protected void exit(int status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.metadata.bookkeeper.BKCluster;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -46,12 +47,15 @@ public Object[][] enableBrokerClientAuth() {
@Test
public void testStandaloneWithRocksDB() throws Exception {
String[] args = new String[]{"--config",
"./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf"};
"./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf",
"-nss",
"-nfw"};
final int bookieNum = 3;
final File tempDir = IOUtils.createTempDir("standalone", "test");

PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args);
standalone.setBkDir(tempDir.getAbsolutePath());
standalone.setBkPort(0);
standalone.setNumOfBk(bookieNum);

standalone.startBookieWithMetadataStore();
Expand Down Expand Up @@ -90,11 +94,12 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex
}
final File bkDir = IOUtils.createTempDir("standalone", "bk");
standalone.setNumOfBk(1);
standalone.setBkPort(0);
standalone.setBkDir(bkDir.getAbsolutePath());
standalone.start();

@Cleanup PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl("http://localhost:8080")
.serviceHttpUrl(standalone.getWebServiceUrl())
.authentication(new MockTokenAuthenticationProvider.MockAuthentication())
.build();
if (enableBrokerClientAuth) {
Expand All @@ -104,8 +109,8 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex
} else {
assertTrue(admin.clusters().getClusters().isEmpty());
admin.clusters().createCluster("test_cluster", ClusterData.builder()
.serviceUrl("http://localhost:8080/")
.brokerServiceUrl("pulsar://localhost:6650/")
.serviceUrl(standalone.getWebServiceUrl())
.brokerServiceUrl(standalone.getBrokerServiceUrl())
.build());
assertTrue(admin.tenants().getTenants().isEmpty());
admin.tenants().createTenant("public", TenantInfo.builder()
Expand All @@ -125,4 +130,39 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex
cleanDirectory(bkDir);
cleanDirectory(metadataDir);
}


@Test
public void testShutdownHookClosesBkCluster() throws Exception {
File dataDir = IOUtils.createTempDir("data", "");
File metadataDir = new File(dataDir, "metadata");
File bkDir = new File(dataDir, "bookkeeper");
@Cleanup
PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(new String[] {
"--config",
"./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf",
"-nss",
"-nfw",
"--metadata-dir",
metadataDir.getAbsolutePath(),
"--bookkeeper-dir",
bkDir.getAbsolutePath()
});
standalone.setTestMode(true);
standalone.setBkPort(0);
standalone.start();
BKCluster bkCluster = standalone.bkCluster;
standalone.runShutdownHook();
assertTrue(bkCluster.isClosed());
}

@Test
public void testWipeData() throws Exception {
PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(new String[] {
"--config",
"./src/test/resources/configurations/standalone_no_client_auth.conf",
"--wipe-data"
});
assertTrue(standalone.isWipeData());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
# under the License.
#

applicationName="pulsar_broker"
zookeeperServers="localhost"
configurationStoreServers="localhost"
applicationName=pulsar_broker
zookeeperServers=localhost
configurationStoreServers=localhost
brokerServicePort=6650
brokerServicePortTls=6651
brokerServicePortTls=
webServicePort=8080
webServicePortTls=4443
webServicePortTls=
httpMaxRequestHeaderSize=1234
bindAddress=0.0.0.0
advertisedAddress=
clusterName="test_cluster"
clusterName=test_cluster
brokerShutdownTimeoutMs=3000
backlogQuotaCheckEnabled=true
backlogQuotaCheckIntervalInSeconds=60
Expand All @@ -42,17 +42,17 @@ clientLibraryVersionCheckEnabled=false
clientLibraryVersionCheckAllowUnversioned=true
statusFilePath=/tmp/status.html
tlsEnabled=false
tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt
tlsKeyFilePath=/home/local/conf/pulsar/server.key
tlsCertificateFilePath=
tlsKeyFilePath=
tlsTrustCertsFilePath=
tlsAllowInsecureConnection=false
authenticationEnabled=false
authorizationEnabled=false
superUserRoles="test_user"
brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
superUserRoles=test_user
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
brokerClientAuthenticationParameters=
bookkeeperClientAuthenticationPlugin="test_auth_plugin"
bookkeeperClientAuthenticationAppId="test_auth_id"
bookkeeperClientAuthenticationPlugin=
bookkeeperClientAuthenticationAppId=test_auth_id
bookkeeperClientTimeoutInSeconds=30
bookkeeperClientSpeculativeReadTimeoutInMillis=0
bookkeeperClientHealthCheckEnabled=true
Expand All @@ -64,7 +64,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
bookkeeperClientMinNumRacksPerWriteQuorum=2
bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
bookkeeperClientReorderReadSequenceEnabled=false
bookkeeperClientIsolationGroups="test_group"
bookkeeperClientIsolationGroups=test_group
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
# under the License.
#

applicationName="pulsar_broker"
metadataStoreUrl="zk:localhost:2181/ledger"
configurationMetadataStoreUrl="zk:localhost:2181"
brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
webServicePortTls=4443
applicationName=pulsar_broker
metadataStoreUrl=zk:localhost:2181/ledger
configurationMetadataStoreUrl=zk:localhost:2181
brokerServicePort=0
brokerServicePortTls=
webServicePort=0
webServicePortTls=
bindAddress=0.0.0.0
advertisedAddress=
advertisedListeners=internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651
internalListenerName=internal
clusterName="test_cluster"
clusterName=test_cluster
brokerShutdownTimeoutMs=3000
backlogQuotaCheckEnabled=true
backlogQuotaCheckIntervalInSeconds=60
Expand All @@ -49,11 +49,11 @@ tlsTrustCertsFilePath=
tlsAllowInsecureConnection=false
authenticationEnabled=false
authorizationEnabled=false
superUserRoles="test_user"
brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
superUserRoles=test_user
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
brokerClientAuthenticationParameters=
bookkeeperClientAuthenticationPlugin="test_auth_plugin"
bookkeeperClientAuthenticationAppId="test_auth_id"
bookkeeperClientAuthenticationPlugin=
bookkeeperClientAuthenticationAppId=
bookkeeperClientTimeoutInSeconds=30
bookkeeperClientSpeculativeReadTimeoutInMillis=0
bookkeeperClientHealthCheckEnabled=true
Expand All @@ -65,7 +65,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
bookkeeperClientMinNumRacksPerWriteQuorum=2
bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
bookkeeperClientReorderReadSequenceEnabled=false
bookkeeperClientIsolationGroups="test_group"
bookkeeperClientIsolationGroups=
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@
# under the License.
#

applicationName="pulsar_broker"
applicationName=pulsar_broker
metadataStoreUrl=
configurationMetadataStoreUrl=
brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
brokerServicePort=0
brokerServicePortTls=
webServicePort=0
allowLoopback=true
webServicePortTls=4443
webServicePortTls=
bindAddress=0.0.0.0
advertisedAddress=
advertisedListeners=
internalListenerName=internal
clusterName="test_cluster"
clusterName=test_cluster
brokerShutdownTimeoutMs=3000
backlogQuotaCheckEnabled=true
backlogQuotaCheckIntervalInSeconds=60
Expand All @@ -44,17 +44,17 @@ clientLibraryVersionCheckEnabled=false
clientLibraryVersionCheckAllowUnversioned=true
statusFilePath=/tmp/status.html
tlsEnabled=false
tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt
tlsKeyFilePath=/home/local/conf/pulsar/server.key
tlsCertificateFilePath=
tlsKeyFilePath=
tlsTrustCertsFilePath=
tlsAllowInsecureConnection=false
authenticationEnabled=false
authorizationEnabled=false
superUserRoles="test_user"
brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
superUserRoles=test_user
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
brokerClientAuthenticationParameters=
bookkeeperClientAuthenticationPlugin="test_auth_plugin"
bookkeeperClientAuthenticationAppId="test_auth_id"
bookkeeperClientAuthenticationPlugin=
bookkeeperClientAuthenticationAppId=test_auth_id
bookkeeperClientTimeoutInSeconds=30
bookkeeperClientSpeculativeReadTimeoutInMillis=0
bookkeeperClientHealthCheckEnabled=true
Expand All @@ -66,7 +66,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
bookkeeperClientMinNumRacksPerWriteQuorum=2
bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
bookkeeperClientReorderReadSequenceEnabled=false
bookkeeperClientIsolationGroups="test_group"
bookkeeperClientIsolationGroups=test_group
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
# under the License.
#

brokerServicePort=6650
webServicePort=8080
brokerServicePort=0
webServicePort=0
allowLoopback=true
clusterName=test_cluster
superUserRoles=admin
Expand Down
Loading
Loading