diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index a1bfeb2142ffc..45455f16d4dc1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -23,6 +23,7 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -43,9 +44,11 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
@@ -621,35 +624,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
     private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
         getNamespaceReplicatedClustersAsync(namespaceName)
-                .thenAccept(clusters -> {
-                    for (String cluster : clusters) {
-                        if (!cluster.equals(pulsar().getConfiguration().getClusterName())) {
-                            // this call happens in the background without async composition. completion is logged.
-                            pulsar().getPulsarResources().getClusterResources()
-                                    .getClusterAsync(cluster)
-                                    .thenCompose(clusterDataOp ->
-                                            ((TopicsImpl) pulsar().getBrokerService()
-                                                    .getClusterPulsarAdmin(cluster,
-                                                            clusterDataOp).topics())
-                                                    .createPartitionedTopicAsync(
-                                                            topicName.getPartitionedTopicName(),
-                                                            numPartitions,
-                                                            true, null))
-                                    .whenComplete((__, ex) -> {
-                                        if (ex != null) {
-                                            log.error(
-                                                    "[{}] Failed to create partitioned topic {} in cluster {}.",
-                                                    clientAppId(), topicName, cluster, ex);
-                                        } else {
-                                            log.info(
-                                                    "[{}] Successfully created partitioned topic {} in "
-                                                            + "cluster {}",
-                                                    clientAppId(), topicName, cluster);
-                                        }
-                                    });
-                        }
+                .thenAccept(clusters -> {
+                    // This call happens in the background without async composition; completion is logged.
+                    internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
+                });
+    }
+
+    protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground(
+            Set<String> clusters, int numPartitions) {
+        final String shortTopicName = topicName.getPartitionedTopicName();
+        Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>();
+        for (String cluster : clusters) {
+            if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
+                continue;
+            }
+            ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources();
+            CompletableFuture<Void> createRemoteTopicFuture = new CompletableFuture<>();
+            tasksForAllClusters.put(cluster, createRemoteTopicFuture);
+            clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
+                if (ex1 != null) {
+                    // Unexpected error, such as an NPE. Catch all errors to avoid leaving
+                    // "createRemoteTopicFuture" stuck.
+ log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster" + + " {}.", clientAppId(), topicName, cluster, ex1); + createRemoteTopicFuture.completeExceptionally(new RestException(ex1)); + return; + } + // Get cluster data success. + TopicsImpl topics = + (TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics(); + topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null) + .whenComplete((ignore, ex2) -> { + if (ex2 == null) { + // Create success. + log.info("[{}] Successfully created partitioned topic {} in cluster {}", + clientAppId(), topicName, cluster); + createRemoteTopicFuture.complete(null); + return; + } + // Create topic on the remote cluster error. + Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2); + // The topic has been created before, check the partitions count is expected. + if (unwrapEx2 instanceof PulsarAdminException.ConflictException) { + topics.getPartitionedTopicMetadataAsync(shortTopicName).whenComplete((topicMeta, ex3) -> { + if (ex3 != null) { + // Unexpected error, such as NPE. Catch all error to avoid the + // "createRemoteTopicFuture" stuck. + log.error("[{}] Failed to check remote-cluster's topic metadata when creating" + + " partitioned topic {} in cluster {}.", + clientAppId(), topicName, cluster, ex3); + createRemoteTopicFuture.completeExceptionally(new RestException(ex3)); + } + // Call get partitioned metadata of remote cluster success. + if (topicMeta.partitions == numPartitions) { + log.info("[{}] Skip created partitioned topic {} in cluster {}, because that {}", + clientAppId(), topicName, cluster, unwrapEx2.getMessage()); + createRemoteTopicFuture.complete(null); + } else { + String errorMsg = String.format("[%s] There is an exists topic %s with different" + + " partitions %s on the remote cluster %s, you want to create it" + + " with partitions %s", + clientAppId(), shortTopicName, topicMeta.partitions, cluster, + numPartitions); + log.error(errorMsg); + createRemoteTopicFuture.completeExceptionally( + new RestException(Status.PRECONDITION_FAILED, errorMsg)); + } + }); + } else { + // An HTTP error was responded from the remote cluster. 
+ log.error("[{}] Failed to create partitioned topic {} in cluster {}.", + clientAppId(), topicName, cluster, ex2); + createRemoteTopicFuture.completeExceptionally(new RestException(unwrapEx2)); } }); + }); + } + return tasksForAllClusters; } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 1f8d06571908e..63ea987bb07fe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -3253,12 +3253,13 @@ protected CompletableFuture internalSetBacklogQuota(BacklogQuota.BacklogQu } protected CompletableFuture internalSetReplicationClusters(List clusterIds) { + if (CollectionUtils.isEmpty(clusterIds)) { + return CompletableFuture.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "ClusterIds should not be null or empty")); + } + Set replicationClusters = Sets.newHashSet(clusterIds); return validatePoliciesReadOnlyAccessAsync() .thenCompose(__ -> { - if (CollectionUtils.isEmpty(clusterIds)) { - throw new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty"); - } - Set replicationClusters = Sets.newHashSet(clusterIds); if (replicationClusters.contains("global")) { throw new RestException(Status.PRECONDITION_FAILED, "Cannot specify global in the list of replication clusters"); @@ -3273,6 +3274,20 @@ protected CompletableFuture internalSetReplicationClusters(List cl futures.add(validateClusterForTenantAsync(namespaceName.getTenant(), clusterId)); } return FutureUtil.waitForAll(futures); + }).thenCompose(__ -> { + // Sync to create partitioned topic on the remote cluster if needed. + TopicName topicNameWithoutPartition = TopicName.get(topicName.getPartitionedTopicName()); + return pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(topicNameWithoutPartition).thenCompose(topicMetaOp -> { + // Skip to create topic if the topic is non-partitioned, because the replicator will create + // it automatically. 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1f8d06571908e..63ea987bb07fe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3253,12 +3253,13 @@ protected CompletableFuture internalSetBacklogQuota(BacklogQuota.BacklogQu
     }
 
     protected CompletableFuture<Void> internalSetReplicationClusters(List<String> clusterIds) {
+        if (CollectionUtils.isEmpty(clusterIds)) {
+            return CompletableFuture.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "ClusterIds should not be null or empty"));
+        }
+        Set<String> replicationClusters = Sets.newHashSet(clusterIds);
         return validatePoliciesReadOnlyAccessAsync()
                 .thenCompose(__ -> {
-                    if (CollectionUtils.isEmpty(clusterIds)) {
-                        throw new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty");
-                    }
-                    Set<String> replicationClusters = Sets.newHashSet(clusterIds);
                     if (replicationClusters.contains("global")) {
                         throw new RestException(Status.PRECONDITION_FAILED,
                                 "Cannot specify global in the list of replication clusters");
@@ -3273,6 +3274,20 @@ protected CompletableFuture internalSetReplicationClusters(List cl
                         futures.add(validateClusterForTenantAsync(namespaceName.getTenant(), clusterId));
                     }
                     return FutureUtil.waitForAll(futures);
+                }).thenCompose(__ -> {
+                    // Synchronously create the partitioned topic on the remote clusters if needed.
+                    TopicName topicNameWithoutPartition = TopicName.get(topicName.getPartitionedTopicName());
+                    return pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                            .getPartitionedTopicMetadataAsync(topicNameWithoutPartition).thenCompose(topicMetaOp -> {
+                                // Skip creating the topic if it is non-partitioned, because the replicator will create
+                                // it automatically.
+                                if (topicMetaOp.isEmpty()) {
+                                    return CompletableFuture.completedFuture(null);
+                                }
+                                return FutureUtil.waitForAll(
+                                        internalCreatePartitionedTopicToReplicatedClustersInBackground(replicationClusters,
+                                                topicMetaOp.get().partitions).values());
+                            });
                 }).thenCompose(__ ->
                         getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
                             TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index f9184f2288f52..35073575f34ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -25,10 +25,12 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -59,6 +61,9 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
@@ -92,6 +97,20 @@ private void waitReplicatorStarted(String topicName) {
         });
     }
 
+    private void waitReplicatorStopped(String topicName) {
+        Awaitility.await().untilAsserted(() -> {
+            Optional<Topic> topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get();
+            assertTrue(topicOptional2.isPresent());
+            PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get();
+            assertTrue(persistentTopic2.getProducers().isEmpty());
+            Optional<Topic> topicOptional1 = pulsar1.getBrokerService().getTopic(topicName, false).get();
+            assertTrue(topicOptional1.isPresent());
+            PersistentTopic persistentTopic1 = (PersistentTopic) topicOptional1.get();
+            assertTrue(persistentTopic1.getReplicators().isEmpty()
+                    || !persistentTopic1.getReplicators().get(cluster2).isConnected());
+        });
+    }
+
     /**
      * Override "AbstractReplicator.producer" by {@param producer} and return the original value.
      */
@@ -108,7 +127,7 @@ private ProducerImpl overrideProducerForReplicator(AbstractReplicator replicator
     @Test(timeOut = 45 * 1000)
     public void testReplicatorProducerStatInTopic() throws Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
         final String subscribeName = "subscribe_1";
         final byte[] msgValue = "test".getBytes();
 
@@ -134,7 +153,7 @@ public void testReplicatorProducerStatInTopic() throws Exception {
     @Test(timeOut = 45 * 1000)
     public void testCreateRemoteConsumerFirst() throws Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
         Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
 
         // The topic in cluster2 has a replicator created producer(schema Auto_Produce), but does not have any schema。
@@ -154,7 +173,7 @@ public void testCreateRemoteConsumerFirst() throws Exception {
     @Test(timeOut = 45 * 1000)
     public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
         admin1.topics().createNonPartitionedTopic(topicName);
         // Wait for replicator started.
         waitReplicatorStarted(topicName);
@@ -210,7 +229,7 @@ private void injectMockReplicatorProducerBuilder(
         BrokerService brokerService = pulsar1.getBrokerService();
         // Wait for the internal client created.
         final String topicNameTriggerInternalClientCreate =
-                BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
+                BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
         admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate);
         waitReplicatorStarted(topicNameTriggerInternalClientCreate);
         cleanupTopics(() -> {
@@ -338,7 +357,7 @@ void startCallback() {
      */
     @Test(timeOut = 120 * 1000)
     public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
         // Inject an error for "replicator.producer" creation.
         // The delay time of next retry to create producer is below:
         // 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
@@ -409,4 +428,62 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
             admin2.topics().delete(topicName);
         });
     }
+
+    @Test
+    public void testPartitionedTopicLevelReplication() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        final String partition0 = TopicName.get(topicName).getPartition(0).toString();
+        final String partition1 = TopicName.get(topicName).getPartition(1).toString();
+        admin1.topics().createPartitionedTopic(topicName, 2);
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+        // Check that the partitioned topic has been created on the remote cluster.
+        PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
+        assertEquals(topicMetadata2.partitions, 2);
+        // cleanup.
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        waitReplicatorStopped(partition0);
+        waitReplicatorStopped(partition1);
+        admin1.topics().deletePartitionedTopic(topicName);
+        admin2.topics().deletePartitionedTopic(topicName);
+    }
+
+    @Test
+    public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        final String partition0 = TopicName.get(topicName).getPartition(0).toString();
+        final String partition1 = TopicName.get(topicName).getPartition(1).toString();
+        admin1.topics().createPartitionedTopic(topicName, 2);
+        admin2.topics().createPartitionedTopic(topicName, 2);
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+        // Check that the partitioned topic has been created on the remote cluster.
+        PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
+        assertEquals(topicMetadata2.partitions, 2);
+        // cleanup.
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        waitReplicatorStopped(partition0);
+        waitReplicatorStopped(partition1);
+        admin1.topics().deletePartitionedTopic(topicName);
+        admin2.topics().deletePartitionedTopic(topicName);
+    }
+
+    @Test
+    public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        admin2.topics().createPartitionedTopic(topicName, 3);
+        admin1.topics().createPartitionedTopic(topicName, 2);
+        try {
+            admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+            fail("Expected an error because a conflicting partitioned topic already exists.");
+        } catch (Exception ex) {
+            Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
+            assertTrue(unWrapEx.getMessage().contains("with different partitions"));
+        }
+        // Check that nothing changed.
+        PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
+        assertEquals(topicMetadata2.partitions, 3);
+        assertEquals(admin1.topics().getReplicationClusters(topicName, true).size(), 1);
+        // cleanup.
+        admin1.topics().deletePartitionedTopic(topicName);
+        admin2.topics().deletePartitionedTopic(topicName);
+    }
 }
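
Outside the test suite, the flow exercised by testPartitionedTopicLevelReplication looks roughly as follows from an operator's point of view. The service URLs and topic name are placeholders, and the sketch only relies on the standard PulsarAdmin calls already used in the tests above.

import java.util.Arrays;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

public class TopicLevelReplicationExample {
    public static void main(String[] args) throws Exception {
        String topic = "persistent://public/ns1/tp";               // placeholder topic name
        try (PulsarAdmin admin1 = PulsarAdmin.builder().serviceHttpUrl("http://broker-r1:8080").build();
             PulsarAdmin admin2 = PulsarAdmin.builder().serviceHttpUrl("http://broker-r2:8080").build()) {
            // Create the partitioned topic on the local cluster only.
            admin1.topics().createPartitionedTopic(topic, 2);
            // With this change, enabling topic-level replication also creates the partitioned
            // topic on the remote cluster with the same partition count.
            admin1.topics().setReplicationClusters(topic, Arrays.asList("r1", "r2"));
            PartitionedTopicMetadata remoteMetadata = admin2.topics().getPartitionedTopicMetadata(topic);
            System.out.println("Partitions on remote cluster: " + remoteMetadata.partitions);
        }
    }
}
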
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 8e8b444f952c7..181721e34aa73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -45,7 +45,8 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
     protected final String defaultTenant = "public";
-    protected final String defaultNamespace = defaultTenant + "/default";
+    protected final String replicatedNamespace = defaultTenant + "/default";
+    protected final String nonReplicatedNamespace = defaultTenant + "/ns1";
 
     protected final String cluster1 = "r1";
     protected URL url1;
@@ -142,17 +143,19 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
         admin2.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
                 Sets.newHashSet(cluster1, cluster2)));
 
-        admin1.namespaces().createNamespace(defaultNamespace, Sets.newHashSet(cluster1, cluster2));
-        admin2.namespaces().createNamespace(defaultNamespace);
+        admin1.namespaces().createNamespace(replicatedNamespace, Sets.newHashSet(cluster1, cluster2));
+        admin2.namespaces().createNamespace(replicatedNamespace);
+        admin1.namespaces().createNamespace(nonReplicatedNamespace);
+        admin2.namespaces().createNamespace(nonReplicatedNamespace);
     }
 
     protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception {
-        waitChangeEventsInit(defaultNamespace);
-        admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Collections.singleton(cluster1));
-        admin1.namespaces().unload(defaultNamespace);
+        waitChangeEventsInit(replicatedNamespace);
+        admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Collections.singleton(cluster1));
+        admin1.namespaces().unload(replicatedNamespace);
         cleanupTopicAction.run();
-        admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(cluster1, cluster2));
-        waitChangeEventsInit(defaultNamespace);
+        admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1, cluster2));
+        waitChangeEventsInit(replicatedNamespace);
     }
 
     protected void waitChangeEventsInit(String namespace) {
@@ -220,11 +223,13 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
     @Override
     protected void cleanup() throws Exception {
         // delete namespaces.
-        waitChangeEventsInit(defaultNamespace);
-        admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(cluster1));
-        admin1.namespaces().deleteNamespace(defaultNamespace);
-        admin2.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(cluster2));
-        admin2.namespaces().deleteNamespace(defaultNamespace);
+        waitChangeEventsInit(replicatedNamespace);
+        admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1));
+        admin1.namespaces().deleteNamespace(replicatedNamespace);
+        admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster2));
+        admin2.namespaces().deleteNamespace(replicatedNamespace);
+        admin1.namespaces().deleteNamespace(nonReplicatedNamespace);
+        admin2.namespaces().deleteNamespace(nonReplicatedNamespace);
 
         // shutdown.
         markCurrentSetupNumberCleaned();
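
If the remote cluster already holds the topic with a different partition count, the new PRECONDITION_FAILED path covered by testPartitionedTopicLevelReplicationRemoteConflictTopicExist reaches the admin client roughly as sketched below. The broker URL is a placeholder and the exact PulsarAdminException subtype may vary by client version, so the sketch only relies on the status code and the "with different partitions" message produced by the broker code above.

import java.util.Arrays;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;

public class ReplicationConflictHandlingExample {
    public static void main(String[] args) throws Exception {
        String topic = "persistent://public/ns1/tp";                // placeholder topic name
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl("http://broker-r1:8080").build()) {
            try {
                admin.topics().setReplicationClusters(topic, Arrays.asList("r1", "r2"));
            } catch (PulsarAdminException e) {
                // Expected to carry HTTP 412 when the remote topic exists with a different partition count.
                System.err.println("setReplicationClusters rejected: "
                        + e.getStatusCode() + " " + e.getMessage());
            }
        }
    }
}
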