Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Create partition topics in the remote clusters before set replication policies at the topic level #22203

Closed

Conversation

liangyepianzhou
Copy link
Contributor

Motivation

When the geo-replication is enabled at the namespace level, the partition topic could be created in the remote clusters when it is created at the local cluster.

if (!createLocalTopicOnly && topicName.isGlobal()) {
internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
}

However, if the geo-replication is enabled at the topic level, the partition topic cannot be created in the remote clusters when created at the local cluster. The non-partition topics will be auto-created in the remote clusters when the topic starts replicators and builds producers. It is confusing that the topic is partitioned in the local cluster and non-partition in the remote cluster.

Modifications

Create partition topics when updating topic policies.
For the deleting policies when disabling geo-replication at the topic level, there are two solutions:

  1. Delete all the topics in the remote clusters
  2. Use inactive topic policies to clear the useless topics in the remote clusters.
    I prefer to the solution 2, so this PR does not include the modification of deleting topics.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@nodece
Copy link
Member

nodece commented Mar 7, 2024

More context: #21679

@@ -626,17 +626,7 @@ private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int
.thenAccept(clusters -> {
for (String cluster : clusters) {
if (!cluster.equals(pulsar().getConfiguration().getClusterName())) {
// this call happens in the background without async composition. completion is logged.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think keeping this comment there is better.

.getClusterAsync(cluster)
.thenCompose(clusterDataOp -> ((TopicsImpl) pulsar().getBrokerService()
.getClusterPulsarAdmin(cluster, clusterDataOp).topics())
.createPartitionedTopicAsync(topicName.getPartitionedTopicName(), numPartitions, true, null));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should care about two scenarios:

  • the partitioned topic on the remote cluster already exists.
  • the partition count of the existing topic on the remote cluster is different from the current cluster.

BTW, we can handle these scenarios in a separate PR.

Comment on lines +70 to +74
TopicName dest1 = TopicName.get(
BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1"));
// TODO: the non-partition topic can not be auto-created in the remote clusters when the namespace is empty.
admin1.topics().createPartitionedTopic(dest1.toString(), 3);
admin2.topics().createPartitionedTopic(dest1.toString(), 3);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to create topics here?

*/
@Test(groups = "broker")
@Slf4j
public class MultipleZkReplicatorTest extends MultipleZKReplicatorTestBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can reuse OneWayReplicatorTest

@poorbarcode
Copy link
Contributor

Since #22537 has been merged, I close this PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants