Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Immediately tombstone Deleted and Free state bundles #22743

Merged
merged 4 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataA
}

public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
Optional<String> destinationBroker) {
Optional<String> destinationBroker,
boolean force) {
if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
Expand All @@ -686,7 +687,7 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
log.warn(msg);
throw new IllegalArgumentException(msg);
}
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, true);
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, force);
UnloadDecision unloadDecision =
new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
return unloadAsync(unloadDecision,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,7 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
Free, null, data.sourceBroker(), getNextVersionId(data));
unloadFuture = closeServiceUnit(serviceUnit, true);
}
// If the optimized bundle unload is disabled, disconnect the clients at time of RELEASE.
stateChangeListeners.notifyOnCompletion(unloadFuture
.thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
Expand All @@ -866,9 +867,12 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
}

if (isTargetBroker(data.sourceBroker())) {
stateChangeListeners.notifyOnCompletion(
data.force() ? closeServiceUnit(serviceUnit, true)
: CompletableFuture.completedFuture(0), serviceUnit, data)
// If data.force(), try closeServiceUnit and tombstone the bundle.
CompletableFuture<Void> future =
(data.force() ? closeServiceUnit(serviceUnit, true)
.thenCompose(__ -> tombstoneAsync(serviceUnit))
: CompletableFuture.completedFuture(0)).thenApply(__ -> null);
stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
Expand All @@ -880,9 +884,13 @@ private void handleDeleteEvent(String serviceUnit, ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.completeExceptionally(new IllegalStateException(serviceUnit + "has been deleted."));
}
stateChangeListeners.notify(serviceUnit, data, null);

if (isTargetBroker(data.sourceBroker())) {
log(null, serviceUnit, data, null);
stateChangeListeners.notifyOnCompletion(
tombstoneAsync(serviceUnit), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable
return;
}
switch (state) {
case Deleted, Owned, Init -> this.complete(serviceUnit, t);
case Init -> this.complete(serviceUnit, t);
default -> {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {}", data, serviceUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void beforeEvent(String serviceUnit, ServiceUnitStateData data) {
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
ServiceUnitState state = ServiceUnitStateData.state(data);

if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) {
if ((state == Owned || state == Assigning) && StringUtils.isBlank(data.sourceBroker())) {
if (log.isDebugEnabled()) {
log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit);
}
Expand All @@ -195,7 +195,17 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable
}

switch (state) {
case Free, Owned -> complete(serviceUnit, t);
case Free -> {
if (!data.force()) {
complete(serviceUnit, t);
}
}
case Init -> {
if (data.force()) {
complete(serviceUnit, t);
}
}
case Owned -> complete(serviceUnit, t);
case Releasing -> LatencyMetric.RELEASE.endMeasurement(serviceUnit);
case Assigning -> LatencyMetric.ASSIGN.endMeasurement(serviceUnit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
boolean closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
.unloadNamespaceBundleAsync(bundle, destinationBroker, false);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
Expand Down Expand Up @@ -1286,7 +1286,8 @@ public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBun
CompletableFuture<Void> future;
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
future = extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty());
future = extensibleLoadManager.unloadNamespaceBundleAsync(
nsBundle, Optional.empty(), true);
} else {
future = ownershipCache.removeOwnership(nsBundle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,8 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle)

@Test(timeOut = 30 * 1000)
public void testSplitBundleAdminAPI() throws Exception {
String namespace = defaultTestNamespace;
final String namespace = "public/testSplitBundleAdminAPI";
admin.namespaces().createNamespace(namespace, 1);
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-split");
TopicName topicName = topicAndBundle.getLeft();
admin.topics().createPartitionedTopic(topicName.toString(), 10);
Expand Down Expand Up @@ -793,6 +794,30 @@ public boolean test(NamespaceBundle namespaceBundle) {
} catch (PulsarAdminException ex) {
assertTrue(ex.getMessage().contains("Invalid bundle range"));
}


// delete and retry
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
admin.namespaces().deleteNamespace(namespace);
});
admin.namespaces().createNamespace(namespace, 1);
admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null);

Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
BundlesData bundlesData = admin.namespaces().getBundles(namespace);
assertEquals(bundlesData.getNumBundles(), numBundles + 1);
String lowBundle = String.format("0x%08x", bundleRanges.get(0));
String midBundle = String.format("0x%08x", mid);
String highBundle = String.format("0x%08x", bundleRanges.get(1));
assertTrue(bundlesData.getBoundaries().contains(lowBundle));
assertTrue(bundlesData.getBoundaries().contains(midBundle));
assertTrue(bundlesData.getBoundaries().contains(highBundle));
assertEquals(splitCount.get(), 2);
});
}

@Test(timeOut = 30 * 1000)
Expand Down Expand Up @@ -1656,7 +1681,11 @@ public void testTryAcquiringOwnership()
NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get();
assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl())
.contains(namespaceEphemeralData.getNativeUrl()));
admin.namespaces().deleteNamespace(namespace);
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
admin.namespaces().deleteNamespace(namespace, true);
});
}

@Test(timeOut = 30 * 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,11 +576,11 @@ public void splitAndRetryTest() throws Exception {
childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
channel1.publishSplitEventAsync(split);

waitUntilState(channel1, bundle, Deleted);
waitUntilState(channel2, bundle, Deleted);
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);

validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0);
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
// Verify the retry count
Expand Down Expand Up @@ -620,7 +620,7 @@ public void splitAndRetryTest() throws Exception {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
validateMonitorCounters(leader,
0,
1,
0,
0,
0,
0,
Expand Down Expand Up @@ -1236,15 +1236,15 @@ public void splitTestWhenProducerFails()

var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;

waitUntilStateWithMonitor(leader, bundle, Deleted);
waitUntilStateWithMonitor(channel1, bundle, Deleted);
waitUntilStateWithMonitor(channel2, bundle, Deleted);
waitUntilStateWithMonitor(leader, bundle, Init);
waitUntilStateWithMonitor(channel1, bundle, Init);
waitUntilStateWithMonitor(channel2, bundle, Init);

var ownerAddr1 = channel1.getOwnerAsync(bundle);
var ownerAddr2 = channel2.getOwnerAsync(bundle);

assertTrue(ownerAddr1.isCompletedExceptionally());
assertTrue(ownerAddr2.isCompletedExceptionally());
assertTrue(ownerAddr1.get().isEmpty());
assertTrue(ownerAddr2.get().isEmpty());


FieldUtils.writeDeclaredField(channel1,
Expand Down Expand Up @@ -1428,13 +1428,15 @@ public void splitAndRetryFailureTest() throws Exception {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
((ServiceUnitStateChannelImpl) leader)
.monitorOwnerships(List.of(brokerId1, brokerId2));
waitUntilState(leader, bundle3, Deleted);
waitUntilState(channel1, bundle3, Deleted);
waitUntilState(channel2, bundle3, Deleted);

waitUntilState(leader, bundle3, Init);
waitUntilState(channel1, bundle3, Init);
waitUntilState(channel2, bundle3, Init);


validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 1, 0);

validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 1, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 1, 0, 1, 0);
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);

Expand Down Expand Up @@ -1464,7 +1466,7 @@ public void splitAndRetryFailureTest() throws Exception {

validateMonitorCounters(leader,
0,
1,
0,
1,
0,
0,
Expand Down Expand Up @@ -1542,7 +1544,7 @@ public void testOverrideInactiveBrokerStateData()
waitUntilNewOwner(channel2, ownedBundle, brokerId2);
assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());

// clean-up
FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true);
Expand Down Expand Up @@ -1605,7 +1607,7 @@ public void testOverrideOrphanStateData()
waitUntilNewOwner(channel2, ownedBundle, broker);
assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());

// clean-up
FieldUtils.writeDeclaredField(channel1,
Expand Down Expand Up @@ -1663,8 +1665,10 @@ public void testActiveGetOwner() throws Exception {
"inFlightStateWaitingTimeInMillis", 20 * 1000, true);
start = System.currentTimeMillis();
assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
assertTrue(System.currentTimeMillis() - start < 20_000);
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);

assertTrue(System.currentTimeMillis() - start < 20_000);
// simulate ownership cleanup(brokerId1 selected owner) by the leader channel
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,40 +123,23 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 1);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, VERSION_ID_INIT), null);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(inFlightUnloadRequests.size(), 0);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
assertEquals(inFlightUnloadRequests.size(), 1);

// Success with Init state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
bundle, decision, 5, TimeUnit.SECONDS);
inFlightUnloadRequests = getinFlightUnloadRequests(manager);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 1);

// Success with Init state.
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 0);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
future.get();

// Success with Owned state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
bundle, decision, 5, TimeUnit.SECONDS);
inFlightUnloadRequests = getinFlightUnloadRequests(manager);
assertEquals(inFlightUnloadRequests.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 0);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
future.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,15 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null);
new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, true, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

// Success with Init state.
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null);
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, false, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, true, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);
Expand All @@ -136,17 +140,30 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
bundle, unloadDecision, 5, TimeUnit.SECONDS);
inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);

future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2);

// Success with Free state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
bundle, unloadDecision, 5, TimeUnit.SECONDS);
inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
assertEquals(inFlightUnloadRequestMap.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, true, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, false, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 3);


}

@Test
Expand Down
Loading