Skip to content

Commit

Permalink
[fix][broker] Tombsotne Deleted and Free state bundles
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed May 18, 2024
1 parent 4593cc3 commit d7ffdfc
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -834,19 +834,17 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
if (isTargetBroker(data.sourceBroker())) {
ServiceUnitStateData next;
CompletableFuture<Integer> unloadFuture;
if (isTransferCommand(data)) {
next = new ServiceUnitStateData(
Assigning, data.dstBroker(), data.sourceBroker(), getNextVersionId(data));
// If the optimized bundle unload is disabled, disconnect the clients at time of RELEASE.
var disconnectClients = !pulsar.getConfig().isLoadBalancerMultiPhaseBundleUnload();
unloadFuture = closeServiceUnit(serviceUnit, disconnectClients);

} else {
next = new ServiceUnitStateData(
Free, null, data.sourceBroker(), getNextVersionId(data));
unloadFuture = closeServiceUnit(serviceUnit, true);
}
stateChangeListeners.notifyOnCompletion(unloadFuture
var disconnectClients = !pulsar.getConfig().isLoadBalancerMultiPhaseBundleUnload();
// If the optimized bundle unload is disabled, disconnect the clients at time of RELEASE.
stateChangeListeners.notifyOnCompletion(closeServiceUnit(serviceUnit, disconnectClients)
.thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
}
Expand All @@ -866,9 +864,13 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
}

if (isTargetBroker(data.sourceBroker())) {
stateChangeListeners.notifyOnCompletion(
data.force() ? closeServiceUnit(serviceUnit, true)
: CompletableFuture.completedFuture(0), serviceUnit, data)
CompletableFuture<Integer> unloadFuture = closeServiceUnit(serviceUnit, true);
// If data.force() is true, it means that this Free state is from the orphan cleanup job, where
// the source broker is likely unavailable. In this case, we don't tombstone it immediately.
CompletableFuture<Void> future =
(data.force() ? unloadFuture
: unloadFuture.thenCompose(__ -> tombstoneAsync(serviceUnit))).thenApply(__ -> null);
stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
Expand All @@ -880,9 +882,13 @@ private void handleDeleteEvent(String serviceUnit, ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.completeExceptionally(new IllegalStateException(serviceUnit + "has been deleted."));
}
stateChangeListeners.notify(serviceUnit, data, null);

if (isTargetBroker(data.sourceBroker())) {
log(null, serviceUnit, data, null);
stateChangeListeners.notifyOnCompletion(
tombstoneAsync(serviceUnit), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void beforeEvent(String serviceUnit, ServiceUnitStateData data) {
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
ServiceUnitState state = ServiceUnitStateData.state(data);

if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) {
if ((state == Owned || state == Assigning) && StringUtils.isBlank(data.sourceBroker())) {
if (log.isDebugEnabled()) {
log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit);
}
Expand All @@ -195,7 +195,7 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable
}

switch (state) {
case Free, Owned -> complete(serviceUnit, t);
case Init, Owned -> complete(serviceUnit, t);
case Releasing -> LatencyMetric.RELEASE.endMeasurement(serviceUnit);
case Assigning -> LatencyMetric.ASSIGN.endMeasurement(serviceUnit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public boolean test(NamespaceBundle namespaceBundle) {
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
Awaitility.await().untilAsserted(() -> {
assertEquals(onloadCount.get(), 1);
assertEquals(unloadCount.get(), 1);
assertEquals(unloadCount.get(), 2); //one from releasing and one from Free
});

broker = admin.lookups().lookupTopic(topicName.toString());
Expand All @@ -385,7 +385,7 @@ public boolean test(NamespaceBundle namespaceBundle) {
Awaitility.await().untilAsserted(() -> {
checkOwnershipState(finalBroker, bundle);
assertEquals(onloadCount.get(), 2);
assertEquals(unloadCount.get(), 1);
assertEquals(unloadCount.get(), 2);
});


Expand All @@ -402,7 +402,7 @@ public boolean test(NamespaceBundle namespaceBundle) {
admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(), dstBrokerUrl);
Awaitility.await().untilAsserted(() -> {
assertEquals(onloadCount.get(), 3);
assertEquals(unloadCount.get(), 3); //one from releasing and one from owned
assertEquals(unloadCount.get(), 4); //one from releasing and one from owned
});

assertEquals(admin.lookups().lookupTopic(topicName.toString()), dstBrokerServiceUrl);
Expand Down Expand Up @@ -739,7 +739,8 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle)

@Test(timeOut = 30 * 1000)
public void testSplitBundleAdminAPI() throws Exception {
String namespace = defaultTestNamespace;
final String namespace = "public/testSplitBundleAdminAPI";
admin.namespaces().createNamespace(namespace, 1);
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-split");
TopicName topicName = topicAndBundle.getLeft();
admin.topics().createPartitionedTopic(topicName.toString(), 10);
Expand Down Expand Up @@ -793,6 +794,30 @@ public boolean test(NamespaceBundle namespaceBundle) {
} catch (PulsarAdminException ex) {
assertTrue(ex.getMessage().contains("Invalid bundle range"));
}


// delete and retry
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
admin.namespaces().deleteNamespace(namespace);
});
admin.namespaces().createNamespace(namespace, 1);
admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null);

Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
BundlesData bundlesData = admin.namespaces().getBundles(namespace);
assertEquals(bundlesData.getNumBundles(), numBundles + 1);
String lowBundle = String.format("0x%08x", bundleRanges.get(0));
String midBundle = String.format("0x%08x", mid);
String highBundle = String.format("0x%08x", bundleRanges.get(1));
assertTrue(bundlesData.getBoundaries().contains(lowBundle));
assertTrue(bundlesData.getBoundaries().contains(midBundle));
assertTrue(bundlesData.getBoundaries().contains(highBundle));
assertEquals(splitCount.get(), 2);
});
}

@Test(timeOut = 30 * 1000)
Expand Down Expand Up @@ -1656,7 +1681,11 @@ public void testTryAcquiringOwnership()
NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get();
assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl())
.contains(namespaceEphemeralData.getNativeUrl()));
admin.namespaces().deleteNamespace(namespace);
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
admin.namespaces().deleteNamespace(namespace, true);
});
}

@Test(timeOut = 30 * 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,11 +576,11 @@ public void splitAndRetryTest() throws Exception {
childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
channel1.publishSplitEventAsync(split);

waitUntilState(channel1, bundle, Deleted);
waitUntilState(channel2, bundle, Deleted);
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);

validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0);
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
// Verify the retry count
Expand Down Expand Up @@ -620,7 +620,7 @@ public void splitAndRetryTest() throws Exception {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
validateMonitorCounters(leader,
0,
1,
0,
0,
0,
0,
Expand Down Expand Up @@ -1032,8 +1032,8 @@ public void unloadTest()

channel1.publishUnloadEventAsync(unload);

waitUntilState(channel1, bundle, Free);
waitUntilState(channel2, bundle, Free);
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);

Expand All @@ -1054,10 +1054,10 @@ public void unloadTest()

channel2.publishUnloadEventAsync(unload2);

waitUntilState(channel1, bundle, Free);
waitUntilState(channel2, bundle, Free);
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);

// test monitor if Free -> Init
// test monitor if Init -> Init
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 1 , true);
FieldUtils.writeDeclaredField(channel1,
Expand All @@ -1078,7 +1078,7 @@ public void unloadTest()
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
validateMonitorCounters(leader,
0,
1,
0,
0,
0,
0,
Expand All @@ -1105,8 +1105,8 @@ public void assignTestWhenDestBrokerProducerFails()

channel1.publishUnloadEventAsync(unload);

waitUntilState(channel1, bundle, Free);
waitUntilState(channel2, bundle, Free);
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);

assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get());
assertEquals(Optional.empty(), channel2.getOwnerAsync(bundle).get());
Expand Down Expand Up @@ -1188,8 +1188,8 @@ public void splitTestWhenProducerFails()

channel1.publishUnloadEventAsync(unload);

waitUntilState(channel1, bundle, Free);
waitUntilState(channel2, bundle, Free);
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);

channel1.publishAssignEventAsync(bundle, brokerId1);

Expand Down Expand Up @@ -1236,15 +1236,15 @@ public void splitTestWhenProducerFails()

var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;

waitUntilStateWithMonitor(leader, bundle, Deleted);
waitUntilStateWithMonitor(channel1, bundle, Deleted);
waitUntilStateWithMonitor(channel2, bundle, Deleted);
waitUntilStateWithMonitor(leader, bundle, Init);
waitUntilStateWithMonitor(channel1, bundle, Init);
waitUntilStateWithMonitor(channel2, bundle, Init);

var ownerAddr1 = channel1.getOwnerAsync(bundle);
var ownerAddr2 = channel2.getOwnerAsync(bundle);

assertTrue(ownerAddr1.isCompletedExceptionally());
assertTrue(ownerAddr2.isCompletedExceptionally());
assertTrue(ownerAddr1.get().isEmpty());
assertTrue(ownerAddr2.get().isEmpty());


FieldUtils.writeDeclaredField(channel1,
Expand Down Expand Up @@ -1428,13 +1428,15 @@ public void splitAndRetryFailureTest() throws Exception {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
((ServiceUnitStateChannelImpl) leader)
.monitorOwnerships(List.of(brokerId1, brokerId2));
waitUntilState(leader, bundle3, Deleted);
waitUntilState(channel1, bundle3, Deleted);
waitUntilState(channel2, bundle3, Deleted);

waitUntilState(leader, bundle3, Init);
waitUntilState(channel1, bundle3, Init);
waitUntilState(channel2, bundle3, Init);



validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 1, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 1, 0, 1, 0);
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);

Expand Down Expand Up @@ -1464,7 +1466,7 @@ public void splitAndRetryFailureTest() throws Exception {

validateMonitorCounters(leader,
0,
1,
0,
1,
0,
0,
Expand Down Expand Up @@ -1542,7 +1544,7 @@ public void testOverrideInactiveBrokerStateData()
waitUntilNewOwner(channel2, ownedBundle, brokerId2);
assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());

// clean-up
FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true);
Expand Down Expand Up @@ -1605,7 +1607,7 @@ public void testOverrideOrphanStateData()
waitUntilNewOwner(channel2, ownedBundle, broker);
assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());

// clean-up
FieldUtils.writeDeclaredField(channel1,
Expand Down Expand Up @@ -1664,7 +1666,6 @@ public void testActiveGetOwner() throws Exception {
start = System.currentTimeMillis();
assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
assertTrue(System.currentTimeMillis() - start < 20_000);

// simulate ownership cleanup(brokerId1 selected owner) by the leader channel
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null);
new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null);
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);

future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);

Expand Down

0 comments on commit d7ffdfc

Please sign in to comment.