Skip to content

Commit

Permalink
[fix][broker] Update init and shutdown time and other minor logic (Ex…
Browse files Browse the repository at this point in the history
…tensibleLoadManagerImpl only) (apache#22930)
  • Loading branch information
heesung-sn committed Jun 18, 2024
1 parent bc3dc77 commit aa8f696
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ public static void createTenantIfAbsent(PulsarResources resources, String tenant
}
}

static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName,
String cluster, int bundleNumber) throws IOException {
public static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName,
String cluster, int bundleNumber) throws IOException {
NamespaceResources namespaceResources = resources.getNamespaceResources();

if (!namespaceResources.namespaceExists(namespaceName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,9 @@ public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataA

public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
Optional<String> destinationBroker,
boolean force) {
boolean force,
long timeout,
TimeUnit timeoutUnit) {
if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
Expand All @@ -691,7 +693,7 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
UnloadDecision unloadDecision =
new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
return unloadAsync(unloadDecision,
conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
timeout, timeoutUnit);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD;
private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100;
public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000;
public static final long VERSION_ID_INIT = 1; // initial versionId
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
Expand Down Expand Up @@ -298,7 +297,8 @@ public synchronized void start() throws PulsarServerException {
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE.getTenant(), config.getClusterName());

PulsarClusterMetadataSetup.createNamespaceIfAbsent
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName());
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName(),
config.getDefaultNumberOfNamespaceBundles());

ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);

Expand Down Expand Up @@ -1018,6 +1018,9 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean
if (ex != null) {
log.error("Failed to close topics under bundle:{} in {} ms",
bundle.toString(), unloadBundleTime, ex);
if (!disconnectClients) {
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
}
} else {
log.info("Unloading bundle:{} with {} topics completed in {} ms",
bundle, unloadedTopics, unloadBundleTime);
Expand Down Expand Up @@ -1342,11 +1345,6 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
}
}
if (cleaned) {
try {
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS);
} catch (InterruptedException e) {
log.warn("Interrupted while gracefully waiting for the cleanup convergence.");
}
break;
} else {
try {
Expand All @@ -1357,9 +1355,23 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
}
}
}
log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {} ms", brokerId,
System.currentTimeMillis() - started);
}

private synchronized void doCleanup(String broker) {
try {
if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS)
.isEmpty()) {
log.error("Found the channel owner is empty. Skip the inactive broker:{}'s orphan bundle cleanup",
broker);
return;
}
} catch (Exception e) {
log.error("Failed to find the channel owner. Skip the inactive broker:{}'s orphan bundle cleanup", broker);
return;
}

long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;

Expand All @@ -44,6 +43,7 @@
public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {

private static final long LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2;
private static final long INIT_TIMEOUT_IN_SECS = 5;

private volatile TableView<T> tableView;
private volatile long tableViewLastUpdateTimestamp;
Expand Down Expand Up @@ -123,10 +123,11 @@ public synchronized void start() throws LoadDataStoreException {
public synchronized void startTableView() throws LoadDataStoreException {
if (tableView == null) {
try {
tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).createAsync()
.get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
tableView.forEachAndListen((k, v) ->
tableViewLastUpdateTimestamp = System.currentTimeMillis());
} catch (PulsarClientException e) {
} catch (Exception e) {
tableView = null;
throw new LoadDataStoreException(e);
}
Expand All @@ -137,8 +138,9 @@ public synchronized void startTableView() throws LoadDataStoreException {
public synchronized void startProducer() throws LoadDataStoreException {
if (producer == null) {
try {
producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
} catch (PulsarClientException e) {
producer = client.newProducer(Schema.JSON(clazz)).topic(topic).createAsync()
.get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
} catch (Exception e) {
producer = null;
throw new LoadDataStoreException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
boolean closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker, false);
.unloadNamespaceBundleAsync(bundle, destinationBroker, false, timeout, timeoutUnit);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
Expand Down Expand Up @@ -1290,7 +1290,8 @@ public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBun
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
future = extensibleLoadManager.unloadNamespaceBundleAsync(
nsBundle, Optional.empty(), true);
nsBundle, Optional.empty(), true,
pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
} else {
future = ownershipCache.removeOwnership(nsBundle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -73,7 +72,7 @@ public void testStopBroker() throws PulsarServerException {
pulsar.close();
final var elapsedMs = System.currentTimeMillis() - beforeStop;
log.info("It spends {} ms to stop the broker ({} for protocol handler)", elapsedMs, handler.closeTimeMs);
Assert.assertTrue(elapsedMs < ServiceUnitStateChannelImpl.OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS
Assert.assertTrue(elapsedMs <
+ handler.closeTimeMs + shutdownTimeoutMs + 1000); // tolerate 1 more second for other processes
}

Expand Down

0 comments on commit aa8f696

Please sign in to comment.