Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Messages lost on the remote cluster when using topic level replication #22890

Merged
merged 7 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,6 @@ public CompletableFuture<Void> initialize() {
this.createPersistentSubscriptions();
}));

for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
}
}
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved
return FutureUtil.waitForAll(futures).thenCompose(__ ->
brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
Expand Down Expand Up @@ -476,6 +469,7 @@ public CompletableFuture<Void> initialize() {
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
}, getOrderedExecutor())
.thenCompose(ignore -> initTopicPolicy())
.thenCompose(ignore -> removeOrphanReplicationCursors())
.exceptionally(ex -> {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false",
topic, ex.getMessage());
Expand Down Expand Up @@ -553,6 +547,21 @@ private void createPersistentSubscriptions() {
checkReplicatedSubscriptionControllerState();
}

private CompletableFuture<Void> removeOrphanReplicationCursors() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
List<String> replicationClusters = topicPolicies.getReplicationClusters().get();
for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
if (!replicationClusters.contains(remoteCluster)) {
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved
log.warn("Remove the orphan replicator because the cluster '{}' does not exist", remoteCluster);
futures.add(removeReplicator(remoteCluster));
}
}
}
return FutureUtil.waitForAll(futures);
}

/**
* Unload a subscriber.
* @throws SubscriptionNotFoundException If the subscription is not found.
Expand Down Expand Up @@ -2055,30 +2064,18 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
return future;
}

/**
 * Checks whether {@code remoteCluster} is listed as a replication cluster for this topic.
 *
 * <p>The namespace policies are consulted first; only if they do not list the cluster are the
 * replication clusters resolved from {@code topicPolicies} consulted.
 *
 * @param remoteCluster name of the remote cluster to look up
 * @return a future holding {@code true} if either source lists the cluster, otherwise {@code false}
 */
private CompletableFuture<Boolean> checkReplicationCluster(String remoteCluster) {
    return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
            .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
            .thenApply(optPolicies -> {
                // Absent policies (or an absent cluster set) are treated as "no clusters configured".
                final boolean listedInNamespace = optPolicies
                        .map(policies -> policies.replication_clusters)
                        .orElse(Collections.emptySet())
                        .contains(remoteCluster);
                if (listedInNamespace) {
                    return true;
                }
                return topicPolicies.getReplicationClusters().get().contains(remoteCluster);
            });
}

protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
String localCluster) {
return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
.thenCompose(__ -> checkReplicationCluster(remoteCluster))
.thenCompose(clusterExists -> {
if (!clusterExists) {
log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster);
return removeReplicator(remoteCluster).thenApply(__ -> null);
}
return brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData));
})
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData)))
.thenAccept(replicationClient -> {
if (replicationClient == null) {
log.error("[{}] Can not create replicator because the remote client can not be created."
+ " remote cluster: {}. State of transferring : {}",
topic, remoteCluster, transferring);
return;
}
lock.readLock().lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
Expand All @@ -58,7 +59,10 @@
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
Expand All @@ -78,6 +82,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -809,4 +814,102 @@ public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Except
admin2.topics().deletePartitionedTopic(topicName, false);
});
}

/**
 * Reads the topic from the earliest position through a throw-away subscription and returns the
 * value of the last message received.
 *
 * <p>Reading stops as soon as a {@code receive} call times out after 2 seconds without a message.
 * The temporary subscription is unsubscribed before returning, and the consumer is always closed,
 * even when reading fails part-way, so repeated calls (e.g. inside an Awaitility retry loop) do
 * not accumulate open consumers.
 *
 * @param topic  topic to read
 * @param client client used to create the consumer
 * @param admin  admin client used to create the temporary subscription
 * @return the value of the last message received, or {@code null} if none was received
 * @throws Exception if creating the subscription, consuming, or unsubscribing fails
 */
private String getTheLatestMessage(String topic, PulsarClient client, PulsarAdmin admin) throws Exception {
    String dummySubscription = "s_" + UUID.randomUUID().toString().replace("-", "");
    admin.topics().createSubscription(topic, dummySubscription, MessageId.earliest);
    // try-with-resources guarantees the consumer is closed on every exit path.
    try (Consumer<String> c = client.newConsumer(Schema.STRING).topic(topic)
            .subscriptionName(dummySubscription).subscribe()) {
        String lastMsgValue = null;
        while (true) {
            Message<String> msg = c.receive(2, TimeUnit.SECONDS);
            if (msg == null) {
                // Timed out without a message: treat the topic as drained.
                break;
            }
            lastMsgValue = msg.getValue();
        }
        c.unsubscribe();
        return lastMsgValue;
    }
}

/** Level at which the tests configure geo-replication. */
enum ReplicationLevel {
    /** Replication clusters are set on the topic itself. */
    TOPIC_LEVEL,
    /** The topic lives in a namespace that is already replicated. */
    NAMESPACE_LEVEL
}

/**
 * Data provider yielding one single-argument row per replication level, in the order
 * {@code TOPIC_LEVEL}, {@code NAMESPACE_LEVEL}.
 */
@DataProvider(name = "replicationLevels")
public Object[][] replicationLevels() {
    final ReplicationLevel[] levels = {ReplicationLevel.TOPIC_LEVEL, ReplicationLevel.NAMESPACE_LEVEL};
    final Object[][] rows = new Object[levels.length][];
    for (int i = 0; i < levels.length; i++) {
        rows[i] = new Object[]{levels[i]};
    }
    return rows;
}

/**
 * Verifies that a message left un-replicated by a terminated replicator is delivered to the remote
 * cluster once the topic has been unloaded and loaded again.
 *
 * <p>Runs once per {@link ReplicationLevel}: with replication clusters set on the topic, and with a
 * topic in the replicated namespace while topic-level policies are disabled on {@code pulsar1}.
 *
 * @param replicationLevel the level at which replication is configured for this run
 * @throws Exception if any admin or client operation fails
 */
@Test(dataProvider = "replicationLevels")
public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
    final boolean topicLevel = replicationLevel.equals(ReplicationLevel.TOPIC_LEVEL);
    final String topicName = BrokerTestUtil.newUniqueName(
            "persistent://" + (topicLevel ? nonReplicatedNamespace : replicatedNamespace) + "/tp_");
    admin1.topics().createNonPartitionedTopic(topicName);
    admin2.topics().createNonPartitionedTopic(topicName);
    admin2.topics().createSubscription(topicName, "s1", MessageId.earliest);
    if (topicLevel) {
        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
    } else {
        pulsar1.getConfig().setTopicLevelPoliciesEnabled(false);
    }
    try {
        verifyReplicationWorks(topicName);

        // Verify:
        // 1. Inject an error so that the replicator is no longer able to work.
        // 2. Send one message; since the replicator does not work anymore, it will not be replicated.
        // 3. Unload the topic; the replicator will be re-created.
        // 4. Verify: the message can be replicated to the remote cluster.

        // Step 1: Inject an error so that the replicator is no longer able to work.
        Replicator replicator = broker1.getTopic(topicName, false).join().get().getReplicators().get(cluster2);
        replicator.terminate();

        // Step 2: Send one message; since the replicator does not work anymore, it will not be replicated.
        String msg = UUID.randomUUID().toString();
        try (Producer<String> p1 = client1.newProducer(Schema.STRING).topic(topicName).create()) {
            p1.send(msg);
        }
        // Give a working replicator enough time to deliver; the latest message on the remote
        // cluster must still be an earlier one, not the message just sent.
        Thread.sleep(3000);
        assertNotEquals(getTheLatestMessage(topicName, client2, admin2), msg);
        assertEquals(admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog(), 1);

        // Step 3: Unload the topic; the replicator will be re-created.
        admin1.topics().unload(topicName);

        // Step 4. Verify: the message can be replicated to the remote cluster.
        Awaitility.await().atMost(Duration.ofSeconds(300)).untilAsserted(() -> {
            log.info("replication backlog: {}",
                    admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog());
            assertEquals(admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog(), 0);
            assertEquals(getTheLatestMessage(topicName, client2, admin2), msg);
        });

        // Cleanup.
        if (topicLevel) {
            admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
            Awaitility.await().untilAsserted(() -> {
                assertEquals(broker1.getTopic(topicName, false).join().get().getReplicators().size(), 0);
            });
            admin1.topics().delete(topicName, false);
            admin2.topics().delete(topicName, false);
        } else {
            pulsar1.getConfig().setTopicLevelPoliciesEnabled(true);
            cleanupTopics(() -> {
                admin1.topics().delete(topicName);
                admin2.topics().delete(topicName);
            });
        }
    } finally {
        // Restore the shared broker config even when an assertion above fails, so the disabled
        // topic-level policies do not leak into other tests. Restores to `true`, the same value
        // the regular cleanup path sets.
        if (!topicLevel) {
            pulsar1.getConfig().setTopicLevelPoliciesEnabled(true);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,28 @@ protected PulsarClient initClient(ClientBuilder clientBuilder) throws Exception
}

protected void verifyReplicationWorks(String topic) throws Exception {
// Wait for the replicator to start.
Awaitility.await().until(() -> {
try {
PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService()
.getTopic(topic, false).join().get();
if (persistentTopic.getReplicators().size() > 0) {
return true;
}
} catch (Exception ex) {}

try {
String partition0 = TopicName.get(topic).getPartition(0).toString();
PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService()
.getTopic(partition0, false).join().get();
if (persistentTopic.getReplicators().size() > 0) {
return true;
}
} catch (Exception ex) {}

return false;
});
// Verify: pub & sub.
final String subscription = "__subscribe_1";
final String msgValue = "__msg1";
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topic).create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,9 @@ public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws
public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
// Delegates unchanged to the parent class's implementation of this test.
super.testExpandTopicPartitionsOnNamespaceLevelReplication();
}

/**
 * Override of the parent test that is switched off in this subclass via {@code enabled = false};
 * if invoked directly it simply delegates to the parent implementation.
 * NOTE(review): the reason for disabling it here is not visible in this file section; confirm.
 *
 * @param replicationLevel the level at which replication is configured for the run
 * @throws Exception propagated from the parent test
 */
@Override
@Test(enabled = false)
public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
    super.testReloadWithTopicLevelGeoReplication(replicationLevel);
}
}
Loading