Skip to content

Commit

Permalink
[improve] [broker] PIP-356 Support Geo-Replication starts at earliest…
Browse files Browse the repository at this point in the history
… position (apache#22856)
  • Loading branch information
poorbarcode authored Jun 19, 2024
1 parent e0f545a commit 5fc0eaf
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
doc = "Max number of snapshot to be cached per subscription.")
private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "The position that replication task start at, it can be set to earliest or latest (default).")
private String replicationStartAt = "latest";

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2041,7 +2041,14 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
final CompletableFuture<Void> future = new CompletableFuture<>();

String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
ledger.asyncOpenCursor(name, new OpenCursorCallback() {
final InitialPosition initialPosition;
if (MessageId.earliest.toString()
.equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt())) {
initialPosition = InitialPosition.Earliest;
} else {
initialPosition = InitialPosition.Latest;
}
ledger.asyncOpenCursor(name, initialPosition, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand All @@ -36,6 +37,7 @@
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -71,6 +73,7 @@
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -912,4 +915,100 @@ public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationL
});
}
}

protected void enableReplication(String topic) throws Exception {
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2));
}

protected void disableReplication(String topic) throws Exception {
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2));
}

@Test
public void testConfigReplicationStartAt() throws Exception {
// Initialize.
String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", "");
String subscription1 = "s1";
admin1.namespaces().createNamespace(ns1);
if (!usingGlobalZK) {
admin2.namespaces().createNamespace(ns1);
}

RetentionPolicies retentionPolicies = new RetentionPolicies(60 * 24, 1024);
admin1.namespaces().setRetention(ns1, retentionPolicies);
admin2.namespaces().setRetention(ns1, retentionPolicies);

// 1. default config.
// Enable replication for topic1.
final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
admin1.topics().createNonPartitionedTopicAsync(topic1);
admin1.topics().createSubscription(topic1, subscription1, MessageId.earliest);
Producer<String> p1 = client1.newProducer(Schema.STRING).topic(topic1).create();
p1.send("msg-1");
p1.close();
enableReplication(topic1);
// Verify: since the replication was started at latest, there is no message to consume.
Consumer<String> c1 = client2.newConsumer(Schema.STRING).topic(topic1).subscriptionName(subscription1)
.subscribe();
Message<String> msg1 = c1.receive(2, TimeUnit.SECONDS);
assertNull(msg1);
c1.close();
disableReplication(topic1);

// 2.Update config: start at "earliest".
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString());
Awaitility.await().untilAsserted(() -> {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
});

final String topic2 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
admin1.topics().createNonPartitionedTopicAsync(topic2);
admin1.topics().createSubscription(topic2, subscription1, MessageId.earliest);
Producer<String> p2 = client1.newProducer(Schema.STRING).topic(topic2).create();
p2.send("msg-1");
p2.close();
enableReplication(topic2);
// Verify: since the replication was started at earliest, there is one message to consume.
Consumer<String> c2 = client2.newConsumer(Schema.STRING).topic(topic2).subscriptionName(subscription1)
.subscribe();
Message<String> msg2 = c2.receive(2, TimeUnit.SECONDS);
assertNotNull(msg2);
assertEquals(msg2.getValue(), "msg-1");
c2.close();
disableReplication(topic2);

// 2.Update config: start at "latest".
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.latest.toString());
Awaitility.await().untilAsserted(() -> {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
});

final String topic3 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
admin1.topics().createNonPartitionedTopicAsync(topic3);
admin1.topics().createSubscription(topic3, subscription1, MessageId.earliest);
Producer<String> p3 = client1.newProducer(Schema.STRING).topic(topic3).create();
p3.send("msg-1");
p3.close();
enableReplication(topic3);
// Verify: since the replication was started at latest, there is no message to consume.
Consumer<String> c3 = client2.newConsumer(Schema.STRING).topic(topic3).subscriptionName(subscription1)
.subscribe();
Message<String> msg3 = c3.receive(2, TimeUnit.SECONDS);
assertNull(msg3);
c3.close();
disableReplication(topic3);

// cleanup.
// There is no good way to delete topics when using global ZK, skip cleanup.
admin1.namespaces().setNamespaceReplicationClusters(ns1, Collections.singleton(cluster1));
admin1.namespaces().unload(ns1);
admin2.namespaces().setNamespaceReplicationClusters(ns1, Collections.singleton(cluster2));
admin2.namespaces().unload(ns1);
admin1.topics().delete(topic1, false);
admin2.topics().delete(topic1, false);
admin1.topics().delete(topic2, false);
admin2.topics().delete(topic2, false);
admin1.topics().delete(topic3, false);
admin2.topics().delete(topic3, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,19 @@
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import java.util.Arrays;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -109,4 +121,44 @@ public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Except
public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
super.testReloadWithTopicLevelGeoReplication(replicationLevel);
}

@Test
@Override
public void testConfigReplicationStartAt() throws Exception {
// Initialize.
String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", "");
String subscription1 = "s1";
admin1.namespaces().createNamespace(ns1);
RetentionPolicies retentionPolicies = new RetentionPolicies(60 * 24, 1024);
admin1.namespaces().setRetention(ns1, retentionPolicies);
admin2.namespaces().setRetention(ns1, retentionPolicies);

// Update config: start at "earliest".
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString());
Awaitility.await().untilAsserted(() -> {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
});

// Verify: since the replication was started at earliest, there is one message to consume.
final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
admin1.topics().createNonPartitionedTopicAsync(topic1);
admin1.topics().createSubscription(topic1, subscription1, MessageId.earliest);
org.apache.pulsar.client.api.Producer<String> p1 = client1.newProducer(Schema.STRING).topic(topic1).create();
p1.send("msg-1");
p1.close();

admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2)));
org.apache.pulsar.client.api.Consumer<String> c1 = client2.newConsumer(Schema.STRING).topic(topic1)
.subscriptionName(subscription1).subscribe();
Message<String> msg2 = c1.receive(2, TimeUnit.SECONDS);
assertNotNull(msg2);
assertEquals(msg2.getValue(), "msg-1");
c1.close();

// cleanup.
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.latest.toString());
Awaitility.await().untilAsserted(() -> {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
});
}
}

0 comments on commit 5fc0eaf

Please sign in to comment.