Skip to content

Commit

Permalink
updated the tombstone logic
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed May 20, 2024
1 parent 03f513b commit 4dcd4ac
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataA
}

public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
Optional<String> destinationBroker) {
Optional<String> destinationBroker,
boolean force) {
if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
Expand All @@ -686,7 +687,7 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
log.warn(msg);
throw new IllegalArgumentException(msg);
}
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, true);
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, force);
UnloadDecision unloadDecision =
new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
return unloadAsync(unloadDecision,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,11 +867,11 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
}

if (isTargetBroker(data.sourceBroker())) {
// If data.force() is true, it means that this Free state is from the orphan cleanup job, where
// the source broker is likely unavailable. In this case, we don't tombstone it immediately.
// If data.force(), try closeServiceUnit and tombstone the bundle.
CompletableFuture<Void> future =
(data.force() ? closeServiceUnit(serviceUnit, true)
: tombstoneAsync(serviceUnit)).thenApply(__ -> null);
.thenCompose(__ -> tombstoneAsync(serviceUnit))
: CompletableFuture.completedFuture(0)).thenApply(__ -> null);
stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,17 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable
}

switch (state) {
case Init, Owned -> complete(serviceUnit, t);
case Free -> {
if (!data.force()) {
complete(serviceUnit, t);
}
}
case Init -> {
if (data.force()) {
complete(serviceUnit, t);
}
}
case Owned -> complete(serviceUnit, t);
case Releasing -> LatencyMetric.RELEASE.endMeasurement(serviceUnit);
case Assigning -> LatencyMetric.ASSIGN.endMeasurement(serviceUnit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
boolean closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
.unloadNamespaceBundleAsync(bundle, destinationBroker, false);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
Expand Down Expand Up @@ -1286,7 +1286,8 @@ public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBun
CompletableFuture<Void> future;
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
future = extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty());
future = extensibleLoadManager.unloadNamespaceBundleAsync(
nsBundle, Optional.empty(), true);
} else {
future = ownershipCache.removeOwnership(nsBundle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1032,8 +1032,8 @@ public void unloadTest()

channel1.publishUnloadEventAsync(unload);

waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);
waitUntilState(channel1, bundle, Free);
waitUntilState(channel2, bundle, Free);
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);

Expand All @@ -1054,10 +1054,10 @@ public void unloadTest()

channel2.publishUnloadEventAsync(unload2);

waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);
waitUntilState(channel1, bundle, Free);
waitUntilState(channel2, bundle, Free);

// test monitor if Init -> Init
// test monitor if Free -> Init
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 1 , true);
FieldUtils.writeDeclaredField(channel1,
Expand All @@ -1078,7 +1078,7 @@ public void unloadTest()
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
validateMonitorCounters(leader,
0,
0,
1,
0,
0,
0,
Expand All @@ -1105,8 +1105,8 @@ public void assignTestWhenDestBrokerProducerFails()

channel1.publishUnloadEventAsync(unload);

waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);
waitUntilState(channel1, bundle, Free);
waitUntilState(channel2, bundle, Free);

assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get());
assertEquals(Optional.empty(), channel2.getOwnerAsync(bundle).get());
Expand Down Expand Up @@ -1188,8 +1188,8 @@ public void splitTestWhenProducerFails()

channel1.publishUnloadEventAsync(unload);

waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);
waitUntilState(channel1, bundle, Free);
waitUntilState(channel2, bundle, Free);

channel1.publishAssignEventAsync(bundle, brokerId1);

Expand Down Expand Up @@ -1665,6 +1665,9 @@ public void testActiveGetOwner() throws Exception {
"inFlightStateWaitingTimeInMillis", 20 * 1000, true);
start = System.currentTimeMillis();
assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);

assertTrue(System.currentTimeMillis() - start < 20_000);
// simulate ownership cleanup(brokerId1 selected owner) by the leader channel
overrideTableViews(bundle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,16 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null);
new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, true, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

// Success with Init state.
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null);
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, false, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, true, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);

future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);

Expand All @@ -137,17 +140,30 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
bundle, unloadDecision, 5, TimeUnit.SECONDS);
inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);

future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2);

// Success with Free state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
bundle, unloadDecision, 5, TimeUnit.SECONDS);
inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
assertEquals(inFlightUnloadRequestMap.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, true, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, false, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 3);


}

@Test
Expand Down

0 comments on commit 4dcd4ac

Please sign in to comment.