From 8b626bdef6ef08bb8c91833c31b31aad2e487f07 Mon Sep 17 00:00:00 2001 From: Heesung Sohn <103456639+heesung-sn@users.noreply.github.com> Date: Thu, 13 Jun 2024 12:26:40 -0700 Subject: [PATCH] [fix][broker] Asynchronously return brokerRegistry.lookupAsync when checking if broker is active(ExtensibleLoadManagerImpl only) (#22899) (cherry picked from commit c2702e9bc46c444cbc99f4b64cb453c622b56c26) --- .../channel/ServiceUnitStateChannelImpl.java | 84 +++++++++++-------- .../channel/ServiceUnitStateChannelTest.java | 51 +++++++++-- 2 files changed, 89 insertions(+), 46 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 9d4bdecc41c53..a15f2ae938b26 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -484,7 +484,7 @@ private CompletableFuture> getActiveOwnerAsync( String serviceUnit, ServiceUnitState state, Optional owner) { - return deferGetOwnerRequest(serviceUnit) + return dedupeGetOwnerRequest(serviceUnit) .thenCompose(newOwner -> { if (newOwner == null) { return CompletableFuture.completedFuture(null); @@ -622,7 +622,7 @@ public CompletableFuture publishAssignEventAsync(String serviceUnit, Str } EventType eventType = Assign; eventCounters.get(eventType).getTotal().incrementAndGet(); - CompletableFuture getOwnerRequest = deferGetOwnerRequest(serviceUnit); + CompletableFuture getOwnerRequest = dedupeGetOwnerRequest(serviceUnit); pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker, getNextVersionId(serviceUnit))) .whenComplete((__, ex) -> { @@ -932,44 +932,54 @@ private boolean isTargetBroker(String broker) { return broker.equals(brokerId); } - private CompletableFuture deferGetOwnerRequest(String serviceUnit) { + private CompletableFuture deferGetOwner(String serviceUnit) { + var future = new CompletableFuture().orTimeout(inFlightStateWaitingTimeInMillis, + TimeUnit.MILLISECONDS) + .exceptionally(e -> { + var ownerAfter = getOwner(serviceUnit); + log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to " + + "return the current owner:{}", + brokerId, serviceUnit, ownerAfter, e); + if (ownerAfter == null) { + throw new IllegalStateException(e); + } + return ownerAfter.orElse(null); + }); + if (debug()) { + log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit); + } + return future; + } + + private CompletableFuture dedupeGetOwnerRequest(String serviceUnit) { var requested = new MutableObject>(); try { - return getOwnerRequests - .computeIfAbsent(serviceUnit, k -> { - var ownerBefore = getOwner(serviceUnit); - if (ownerBefore != null && ownerBefore.isPresent()) { - // Here, we do a quick active check first with the computeIfAbsent lock - brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty()) - .ifPresent(__ -> requested.setValue( - CompletableFuture.completedFuture(ownerBefore.get()))); - - if (requested.getValue() != null) { - return requested.getValue(); - } - } - - - CompletableFuture future = - new CompletableFuture().orTimeout(inFlightStateWaitingTimeInMillis, - TimeUnit.MILLISECONDS) - .exceptionally(e -> { - var ownerAfter = getOwner(serviceUnit); - log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to " - + "return the current owner:{}", - brokerId, serviceUnit, ownerAfter, e); - if (ownerAfter == null) { - throw new IllegalStateException(e); - } - return ownerAfter.orElse(null); - }); - if (debug()) { - log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit); - } - requested.setValue(future); - return future; - }); + return getOwnerRequests.computeIfAbsent(serviceUnit, k -> { + var ownerBefore = getOwner(serviceUnit); + if (ownerBefore != null && ownerBefore.isPresent()) { + // Here, we do the broker active check first with the computeIfAbsent lock + requested.setValue(brokerRegistry.lookupAsync(ownerBefore.get()) + .thenCompose(brokerLookupData -> { + if (brokerLookupData.isPresent()) { + // The owner broker is active. + // Immediately return the request. + return CompletableFuture.completedFuture(ownerBefore.get()); + } else { + // The owner broker is inactive. + // The leader broker should be cleaning up the orphan service units. + // Defer this request til the leader notifies the new ownerships. + return deferGetOwner(serviceUnit); + } + })); + } else { + // The owner broker has not been declared yet. + // The ownership should be in the middle of transferring or assigning. + // Defer this request til the inflight ownership change is complete. + requested.setValue(deferGetOwner(serviceUnit)); + } + return requested.getValue(); + }); } finally { var future = requested.getValue(); if (future != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index c0fdd95a6a3db..837aceca1416f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -1620,32 +1620,63 @@ public void testOverrideOrphanStateData() @Test(priority = 19) public void testActiveGetOwner() throws Exception { - - // set the bundle owner is the broker + // case 1: the bundle owner is empty String broker = brokerId2; String bundle = "public/owned/0xfffffff0_0xffffffff"; + overrideTableViews(bundle, null); + assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get()); + + // case 2: the bundle ownership is transferring, and the dst broker is not the channel owner + overrideTableViews(bundle, + new ServiceUnitStateData(Releasing, broker, brokerId1, 1)); + assertEquals(Optional.of(broker), channel1.getOwnerAsync(bundle).get()); + + + // case 3: the bundle ownership is transferring, and the dst broker is the channel owner + overrideTableViews(bundle, + new ServiceUnitStateData(Assigning, brokerId1, brokerId2, 1)); + assertTrue(!channel1.getOwnerAsync(bundle).isDone()); + + // case 4: the bundle ownership is found overrideTableViews(bundle, new ServiceUnitStateData(Owned, broker, null, 1)); var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get(); assertEquals(owner, broker); - // simulate the owner is inactive + // case 5: the owner lookup gets delayed var spyRegistry = spy(new BrokerRegistryImpl(pulsar)); - doReturn(CompletableFuture.completedFuture(Optional.empty())) - .when(spyRegistry).lookupAsync(eq(broker)); FieldUtils.writeDeclaredField(channel1, "brokerRegistry", spyRegistry , true); FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1000, true); + var delayedFuture = new CompletableFuture(); + doReturn(delayedFuture).when(spyRegistry).lookupAsync(eq(broker)); + CompletableFuture.runAsync(() -> { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt();; + } + delayedFuture.complete(Optional.of(broker)); + }); - - // verify getOwnerAsync times out because the owner is inactive now. + // verify the owner eventually returns in inFlightStateWaitingTimeInMillis. long start = System.currentTimeMillis(); + assertEquals(broker, channel1.getOwnerAsync(bundle).get().get()); + long elapsed = System.currentTimeMillis() - start; + assertTrue(elapsed < 1000); + + // case 6: the owner is inactive + doReturn(CompletableFuture.completedFuture(Optional.empty())) + .when(spyRegistry).lookupAsync(eq(broker)); + + // verify getOwnerAsync times out + start = System.currentTimeMillis(); var ex = expectThrows(ExecutionException.class, () -> channel1.getOwnerAsync(bundle).get()); assertTrue(ex.getCause() instanceof IllegalStateException); assertTrue(System.currentTimeMillis() - start >= 1000); - // simulate ownership cleanup(no selected owner) by the leader channel + // case 7: the ownership cleanup(no new owner) by the leader channel doReturn(CompletableFuture.completedFuture(Optional.empty())) .when(loadManager).selectAsync(any(), any(), any()); var leaderChannel = channel1; @@ -1669,7 +1700,8 @@ public void testActiveGetOwner() throws Exception { waitUntilState(channel2, bundle, Init); assertTrue(System.currentTimeMillis() - start < 20_000); - // simulate ownership cleanup(brokerId1 selected owner) by the leader channel + + // case 8: simulate ownership cleanup(brokerId1 as the new owner) by the leader channel overrideTableViews(bundle, new ServiceUnitStateData(Owned, broker, null, 1)); doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1))) @@ -1694,6 +1726,7 @@ public void testActiveGetOwner() throws Exception { } + private static ConcurrentHashMap>> getOwnerRequests( ServiceUnitStateChannel channel) throws IllegalAccessException { return (ConcurrentHashMap>>)