[improve] [broker] Create partitioned topics automatically when enable topic level replication #22537

Merged: 7 commits, Apr 24, 2024
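In short, once a partitioned topic exists locally, enabling topic-level replication now also creates the partitioned topic with the same partition count on the remote clusters, instead of failing or leaving it to be created lazily. The following is a minimal usage sketch against the Pulsar admin API; the service URL, tenant/namespace, topic, and cluster names are illustrative only.

import java.util.Arrays;
import org.apache.pulsar.client.admin.PulsarAdmin;

public class TopicLevelReplicationExample {
    public static void main(String[] args) throws Exception {
        // Illustrative service URL and names; adjust them for a real deployment.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://cluster-1-broker:8080")
                .build()) {
            String topic = "persistent://my-tenant/my-ns/my-topic";
            // Create the partitioned topic in the local cluster only.
            admin.topics().createPartitionedTopic(topic, 2);
            // With this change, enabling topic-level replication also creates the
            // partitioned topic with the same partition count on "cluster-2".
            admin.topics().setReplicationClusters(topic, Arrays.asList("cluster-1", "cluster-2"));
        }
    }
}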
@@ -23,6 +23,7 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -43,9 +44,11 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
@@ -621,35 +624,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n

private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
getNamespaceReplicatedClustersAsync(namespaceName)
.thenAccept(clusters -> {
for (String cluster : clusters) {
if (!cluster.equals(pulsar().getConfiguration().getClusterName())) {
// this call happens in the background without async composition. completion is logged.
pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(cluster)
.thenCompose(clusterDataOp ->
((TopicsImpl) pulsar().getBrokerService()
.getClusterPulsarAdmin(cluster,
clusterDataOp).topics())
.createPartitionedTopicAsync(
topicName.getPartitionedTopicName(),
numPartitions,
true, null))
.whenComplete((__, ex) -> {
if (ex != null) {
log.error(
"[{}] Failed to create partitioned topic {} in cluster {}.",
clientAppId(), topicName, cluster, ex);
} else {
log.info(
"[{}] Successfully created partitioned topic {} in "
+ "cluster {}",
clientAppId(), topicName, cluster);
}
});
}
.thenAccept(clusters -> {
// This call happens in the background without async composition; completion is logged.
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
});
}

protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground(
Set<String> clusters, int numPartitions) {
final String shortTopicName = topicName.getPartitionedTopicName();
Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>();
for (String cluster : clusters) {
if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
continue;
}
ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources();
CompletableFuture<Void> createRemoteTopicFuture = new CompletableFuture<>();
tasksForAllClusters.put(cluster, createRemoteTopicFuture);
clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
if (ex1 != null) {
// Unexpected error, such as an NPE. Catch all errors to avoid leaving "createRemoteTopicFuture" stuck.
log.error("[{}] An unexpected error occurred while trying to create partitioned topic {} in cluster"
+ " {}.", clientAppId(), topicName, cluster, ex1);
createRemoteTopicFuture.completeExceptionally(new RestException(ex1));
return;
}
// Get cluster data success.
TopicsImpl topics =
(TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics();
topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null)
.whenComplete((ignore, ex2) -> {
if (ex2 == null) {
// Create success.
log.info("[{}] Successfully created partitioned topic {} in cluster {}",
clientAppId(), topicName, cluster);
createRemoteTopicFuture.complete(null);
return;
}
// Failed to create the topic on the remote cluster.
Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2);
// The topic already exists on the remote cluster; check that its partition count matches the expected one.
if (unwrapEx2 instanceof PulsarAdminException.ConflictException) {
topics.getPartitionedTopicMetadataAsync(shortTopicName).whenComplete((topicMeta, ex3) -> {
if (ex3 != null) {
// Unexpected error, such as an NPE. Catch all errors to avoid leaving
// "createRemoteTopicFuture" stuck.
log.error("[{}] Failed to check the remote cluster's topic metadata when creating"
+ " partitioned topic {} in cluster {}.",
clientAppId(), topicName, cluster, ex3);
createRemoteTopicFuture.completeExceptionally(new RestException(ex3));
return;
}
// The partitioned metadata was fetched from the remote cluster successfully.
if (topicMeta.partitions == numPartitions) {
log.info("[{}] Skip created partitioned topic {} in cluster {}, because that {}",
clientAppId(), topicName, cluster, unwrapEx2.getMessage());
createRemoteTopicFuture.complete(null);
} else {
String errorMsg = String.format("[%s] Topic %s already exists with different"
+ " partitions %s on the remote cluster %s;"
+ " expected partitions: %s",
clientAppId(), shortTopicName, topicMeta.partitions, cluster,
numPartitions);
log.error(errorMsg);
createRemoteTopicFuture.completeExceptionally(
new RestException(Status.PRECONDITION_FAILED, errorMsg));
}
});
} else {
// The remote cluster responded with an HTTP error.
log.error("[{}] Failed to create partitioned topic {} in cluster {}.",
clientAppId(), topicName, cluster, ex2);
createRemoteTopicFuture.completeExceptionally(new RestException(unwrapEx2));
}
});
});
}
return tasksForAllClusters;
}
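The method above returns a map keyed by remote cluster so callers can decide how to consume the results: the background path simply logs per-cluster completion, while internalSetReplicationClusters (further down in this PR) waits for all of them. The following is a minimal sketch of the aggregating style, assuming it lives in this same class where FutureUtil, log, clientAppId() and topicName are available; the helper name createOnAllRemoteClusters is hypothetical and not part of this PR.

private CompletableFuture<Void> createOnAllRemoteClusters(Set<String> clusters, int numPartitions) {
    // Aggregate the per-cluster futures and fail fast if creation failed on any remote cluster.
    Map<String, CompletableFuture<Void>> tasks =
            internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
    return FutureUtil.waitForAll(tasks.values()).whenComplete((ignore, ex) -> {
        if (ex != null) {
            log.error("[{}] Failed to create partitioned topic {} on at least one remote cluster",
                    clientAppId(), topicName, ex);
        }
    });
}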

/**
@@ -3253,12 +3253,13 @@ protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQu
}

protected CompletableFuture<Void> internalSetReplicationClusters(List<String> clusterIds) {
if (CollectionUtils.isEmpty(clusterIds)) {
return CompletableFuture.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"ClusterIds should not be null or empty"));
}
Set<String> replicationClusters = Sets.newHashSet(clusterIds);
return validatePoliciesReadOnlyAccessAsync()
.thenCompose(__ -> {
if (CollectionUtils.isEmpty(clusterIds)) {
throw new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty");
}
Set<String> replicationClusters = Sets.newHashSet(clusterIds);
if (replicationClusters.contains("global")) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot specify global in the list of replication clusters");
@@ -3273,6 +3274,20 @@ protected CompletableFuture<Void> internalSetReplicationClusters(List<String> cl
futures.add(validateClusterForTenantAsync(namespaceName.getTenant(), clusterId));
}
return FutureUtil.waitForAll(futures);
}).thenCompose(__ -> {
// Synchronously create the partitioned topic on the remote clusters if needed.
TopicName topicNameWithoutPartition = TopicName.get(topicName.getPartitionedTopicName());
return pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.getPartitionedTopicMetadataAsync(topicNameWithoutPartition).thenCompose(topicMetaOp -> {
// Skip creating the topic if it is non-partitioned, because the replicator will create
// it automatically.
if (topicMetaOp.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return FutureUtil.waitForAll(
internalCreatePartitionedTopicToReplicatedClustersInBackground(replicationClusters,
topicMetaOp.get().partitions).values());
});
}).thenCompose(__ ->
getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
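When the remote cluster already holds the topic with a different partition count, the new check above fails the request with PRECONDITION_FAILED before any policy is updated. The following is a hedged sketch of how that typically surfaces to an admin-API caller; the admin instance, topic name, and cluster names are illustrative only.

import java.util.Arrays;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;

class ConflictingRemoteTopicExample {
    static void enableReplication(PulsarAdmin admin) throws PulsarAdminException {
        String topic = "persistent://my-tenant/my-ns/my-topic"; // illustrative name
        try {
            admin.topics().setReplicationClusters(topic, Arrays.asList("cluster-1", "cluster-2"));
        } catch (PulsarAdminException.PreconditionFailedException e) {
            // The remote cluster already has this topic with a different partition count;
            // nothing is changed locally (the new test below asserts this behavior).
            System.err.println("Replication not enabled: " + e.getMessage());
        }
    }
}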
@@ -25,10 +25,12 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -59,6 +61,9 @@
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
@@ -92,6 +97,20 @@ private void waitReplicatorStarted(String topicName) {
});
}

private void waitReplicatorStopped(String topicName) {
Awaitility.await().untilAsserted(() -> {
Optional<Topic> topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get();
assertTrue(topicOptional2.isPresent());
PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get();
assertTrue(persistentTopic2.getProducers().isEmpty());
Optional<Topic> topicOptional1 = pulsar1.getBrokerService().getTopic(topicName, false).get();
assertTrue(topicOptional1.isPresent());
PersistentTopic persistentTopic1 = (PersistentTopic) topicOptional1.get();
assertTrue(persistentTopic1.getReplicators().isEmpty()
|| !persistentTopic1.getReplicators().get(cluster2).isConnected());
});
}

/**
* Override "AbstractReplicator.producer" by {@param producer} and return the original value.
*/
@@ -108,7 +127,7 @@ private ProducerImpl overrideProducerForReplicator(AbstractReplicator replicator

@Test(timeOut = 45 * 1000)
public void testReplicatorProducerStatInTopic() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
final String subscribeName = "subscribe_1";
final byte[] msgValue = "test".getBytes();

@@ -134,7 +153,7 @@ public void testReplicatorProducerStatInTopic() throws Exception {

@Test(timeOut = 45 * 1000)
public void testCreateRemoteConsumerFirst() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();

// The topic in cluster2 has a replicator-created producer (schema Auto_Produce), but does not have any schema.
@@ -154,7 +173,7 @@ public void testCreateRemoteConsumerFirst() throws Exception {

@Test(timeOut = 45 * 1000)
public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
admin1.topics().createNonPartitionedTopic(topicName);
// Wait for replicator started.
waitReplicatorStarted(topicName);
@@ -210,7 +229,7 @@ private void injectMockReplicatorProducerBuilder(
BrokerService brokerService = pulsar1.getBrokerService();
// Wait for the internal client created.
final String topicNameTriggerInternalClientCreate =
BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate);
waitReplicatorStarted(topicNameTriggerInternalClientCreate);
cleanupTopics(() -> {
@@ -338,7 +357,7 @@ void startCallback() {
*/
@Test(timeOut = 120 * 1000)
public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
// Inject an error for "replicator.producer" creation.
// The delay time of next retry to create producer is below:
// 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
@@ -409,4 +428,62 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
admin2.topics().delete(topicName);
});
}

@Test
public void testPartitionedTopicLevelReplication() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
final String partition0 = TopicName.get(topicName).getPartition(0).toString();
final String partition1 = TopicName.get(topicName).getPartition(1).toString();
admin1.topics().createPartitionedTopic(topicName, 2);
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
// Check the partitioned topic has been created at the remote cluster.
PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
assertEquals(topicMetadata2.partitions, 2);
// cleanup.
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
waitReplicatorStopped(partition0);
waitReplicatorStopped(partition1);
admin1.topics().deletePartitionedTopic(topicName);
admin2.topics().deletePartitionedTopic(topicName);
}

@Test
public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
final String partition0 = TopicName.get(topicName).getPartition(0).toString();
final String partition1 = TopicName.get(topicName).getPartition(1).toString();
admin1.topics().createPartitionedTopic(topicName, 2);
admin2.topics().createPartitionedTopic(topicName, 2);
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
// Check the partitioned topic has been created at the remote cluster.
PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
assertEquals(topicMetadata2.partitions, 2);
// cleanup.
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
waitReplicatorStopped(partition0);
waitReplicatorStopped(partition1);
admin1.topics().deletePartitionedTopic(topicName);
admin2.topics().deletePartitionedTopic(topicName);
}

@Test
public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
admin2.topics().createPartitionedTopic(topicName, 3);
admin1.topics().createPartitionedTopic(topicName, 2);
try {
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
fail("Expected error due to a conflict partitioned topic already exists.");
} catch (Exception ex) {
Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unWrapEx.getMessage().contains("with different partitions"));
}
// Check nothing changed.
PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
assertEquals(topicMetadata2.partitions, 3);
assertEquals(admin1.topics().getReplicationClusters(topicName, true).size(), 1);
// cleanup.
admin1.topics().deletePartitionedTopic(topicName);
admin2.topics().deletePartitionedTopic(topicName);
}
}