Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] PIP-356 Support Geo-Replication starts at earliest position #22856

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
doc = "Max number of snapshot to be cached per subscription.")
private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "The position that replication task start at, it can be set to earliest or latest (default).")
private String replicationStartAt = "latest";

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2041,7 +2041,14 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
final CompletableFuture<Void> future = new CompletableFuture<>();

String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
ledger.asyncOpenCursor(name, new OpenCursorCallback() {
final InitialPosition initialPosition;
if (MessageId.earliest.toString()
.equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt())) {
initialPosition = InitialPosition.Earliest;
} else {
initialPosition = InitialPosition.Latest;
}
ledger.asyncOpenCursor(name, initialPosition, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand All @@ -36,6 +37,7 @@
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -71,6 +73,7 @@
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -912,4 +915,100 @@ public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationL
});
}
}

protected void enableReplication(String topic) throws Exception {
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2));
}

protected void disableReplication(String topic) throws Exception {
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2));
}

@Test
public void testConfigReplicationStartAt() throws Exception {
// Initialize.
String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", "");
String subscription1 = "s1";
admin1.namespaces().createNamespace(ns1);
if (!usingGlobalZK) {
admin2.namespaces().createNamespace(ns1);
}

RetentionPolicies retentionPolicies = new RetentionPolicies(60 * 24, 1024);
admin1.namespaces().setRetention(ns1, retentionPolicies);
admin2.namespaces().setRetention(ns1, retentionPolicies);

// 1. default config.
// Enable replication for topic1.
final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
admin1.topics().createNonPartitionedTopicAsync(topic1);
admin1.topics().createSubscription(topic1, subscription1, MessageId.earliest);
Producer<String> p1 = client1.newProducer(Schema.STRING).topic(topic1).create();
p1.send("msg-1");
p1.close();
enableReplication(topic1);
// Verify: since the replication was started at latest, there is no message to consume.
Consumer<String> c1 = client2.newConsumer(Schema.STRING).topic(topic1).subscriptionName(subscription1)
.subscribe();
Message<String> msg1 = c1.receive(2, TimeUnit.SECONDS);
assertNull(msg1);
c1.close();
disableReplication(topic1);

// 2.Update config: start at "earliest".
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString());
Awaitility.await().untilAsserted(() -> {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
});

final String topic2 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
admin1.topics().createNonPartitionedTopicAsync(topic2);
admin1.topics().createSubscription(topic2, subscription1, MessageId.earliest);
Producer<String> p2 = client1.newProducer(Schema.STRING).topic(topic2).create();
p2.send("msg-1");
p2.close();
enableReplication(topic2);
// Verify: since the replication was started at earliest, there is one message to consume.
Consumer<String> c2 = client2.newConsumer(Schema.STRING).topic(topic2).subscriptionName(subscription1)
.subscribe();
Message<String> msg2 = c2.receive(2, TimeUnit.SECONDS);
assertNotNull(msg2);
assertEquals(msg2.getValue(), "msg-1");
c2.close();
disableReplication(topic2);

// 2.Update config: start at "latest".
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.latest.toString());
Awaitility.await().untilAsserted(() -> {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
});

final String topic3 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
admin1.topics().createNonPartitionedTopicAsync(topic3);
admin1.topics().createSubscription(topic3, subscription1, MessageId.earliest);
Producer<String> p3 = client1.newProducer(Schema.STRING).topic(topic3).create();
p3.send("msg-1");
p3.close();
enableReplication(topic3);
// Verify: since the replication was started at latest, there is no message to consume.
Consumer<String> c3 = client2.newConsumer(Schema.STRING).topic(topic3).subscriptionName(subscription1)
.subscribe();
Message<String> msg3 = c3.receive(2, TimeUnit.SECONDS);
assertNull(msg3);
c3.close();
disableReplication(topic3);

// cleanup.
// There is no good way to delete topics when using global ZK, skip cleanup.
admin1.namespaces().setNamespaceReplicationClusters(ns1, Collections.singleton(cluster1));
admin1.namespaces().unload(ns1);
admin2.namespaces().setNamespaceReplicationClusters(ns1, Collections.singleton(cluster2));
admin2.namespaces().unload(ns1);
admin1.topics().delete(topic1, false);
admin2.topics().delete(topic1, false);
admin1.topics().delete(topic2, false);
admin2.topics().delete(topic2, false);
admin1.topics().delete(topic3, false);
admin2.topics().delete(topic3, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,19 @@
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import java.util.Arrays;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -109,4 +121,44 @@ public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Except
public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
super.testReloadWithTopicLevelGeoReplication(replicationLevel);
}

@Test
@Override
public void testConfigReplicationStartAt() throws Exception {
// Initialize.
String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", "");
String subscription1 = "s1";
admin1.namespaces().createNamespace(ns1);
RetentionPolicies retentionPolicies = new RetentionPolicies(60 * 24, 1024);
admin1.namespaces().setRetention(ns1, retentionPolicies);
admin2.namespaces().setRetention(ns1, retentionPolicies);

// Update config: start at "earliest".
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString());
Awaitility.await().untilAsserted(() -> {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
});

// Verify: since the replication was started at earliest, there is one message to consume.
final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
admin1.topics().createNonPartitionedTopicAsync(topic1);
admin1.topics().createSubscription(topic1, subscription1, MessageId.earliest);
org.apache.pulsar.client.api.Producer<String> p1 = client1.newProducer(Schema.STRING).topic(topic1).create();
p1.send("msg-1");
p1.close();

admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2)));
org.apache.pulsar.client.api.Consumer<String> c1 = client2.newConsumer(Schema.STRING).topic(topic1)
.subscriptionName(subscription1).subscribe();
Message<String> msg2 = c1.receive(2, TimeUnit.SECONDS);
assertNotNull(msg2);
assertEquals(msg2.getValue(), "msg-1");
c1.close();

// cleanup.
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.latest.toString());
Awaitility.await().untilAsserted(() -> {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
});
}
}
Loading