Skip to content

Commit

Permalink
[fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadMana…
Browse files Browse the repository at this point in the history
…ger is enabled (apache#22496)

(cherry picked from commit 203f305)
(cherry picked from commit f467f37)
  • Loading branch information
heesung-sn committed Jun 27, 2024
1 parent 22bfe27 commit 2557db6
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -1338,8 +1337,8 @@ private synchronized void doCleanup(String broker) {
}

try {
producer.flush();
} catch (PulsarClientException e) {
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
} catch (Exception e) {
log.error("Failed to flush the in-flight non-system bundle override messages.", e);
}

Expand All @@ -1362,8 +1361,8 @@ private synchronized void doCleanup(String broker) {
}

try {
producer.flush();
} catch (PulsarClientException e) {
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
} catch (Exception e) {
log.error("Failed to flush the in-flight system bundle override messages.", e);
}

Expand Down Expand Up @@ -1541,8 +1540,8 @@ protected void monitorOwnerships(List<String> brokers) {
}

try {
producer.flush();
} catch (PulsarClientException e) {
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
} catch (Exception e) {
log.error("Failed to flush the in-flight messages.", e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,11 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
}

public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
.thenApply(Optional::isPresent);
}
return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
Expand Down Expand Up @@ -553,7 +554,8 @@ public CompletableFuture<Void> stopReplProducers() {
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
|| ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1736,7 +1736,8 @@ CompletableFuture<Void> checkPersistencePolicies() {
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
|| ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@

import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ConsumerImpl;
Expand All @@ -40,9 +42,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

/**
* The tests in this class should be denied in a production pulsar cluster. they are very dangerous, which leads to
* a lot of topic deletion and makes namespace policies being incorrect.
Expand Down

0 comments on commit 2557db6

Please sign in to comment.