From b63e288e09b12b293446ce46573ac6bcb8249e87 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Sat, 12 Dec 2020 13:10:49 +0800 Subject: [PATCH] Support configure max subscriptions per topic on the namespace level policy (#8924) Master Issue: #8866 ### Motivation Currently, #8289 introduced max subscriptions per topic at the broker level but does not support overwrite on the namespace level ### Modifications Add api for namespace-level policy ### Verifying this change AdminApiTest2.java 1) Verify that the basic API is correct 2) Verify that the restriction is in effect 3) Verify the priority of namespace level and broker level --- .../broker/admin/impl/NamespacesBase.java | 15 +++ .../pulsar/broker/admin/v2/Namespaces.java | 38 ++++++ .../pulsar/broker/service/AbstractTopic.java | 2 + .../service/persistent/PersistentTopic.java | 12 +- .../pulsar/broker/admin/AdminApiTest2.java | 123 ++++++++++++++++++ .../pulsar/client/admin/Namespaces.java | 50 +++++++ .../client/admin/internal/NamespacesImpl.java | 80 ++++++++++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 7 + .../pulsar/admin/cli/CmdNamespaces.java | 44 +++++++ .../pulsar/common/policies/data/Policies.java | 2 + .../common/policies/data/PolicyName.java | 1 + 11 files changed, 371 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 7b668ea9310da..ec9a923cffd5e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2776,6 +2776,21 @@ protected int internalGetMaxUnackedMessagesPerSubscription() { return getNamespacePolicies(namespaceName).max_unacked_messages_per_subscription; } + protected Integer internalGetMaxSubscriptionsPerTopic() { + validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ); + return getNamespacePolicies(namespaceName).max_subscriptions_per_topic; + } + + protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){ + validateSuperUserAccess(); + validatePoliciesReadOnlyAccess(); + if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) { + throw new RestException(Status.PRECONDITION_FAILED, + "maxSubscriptionsPerTopic must be 0 or more"); + } + internalSetPolicies("max_subscriptions_per_topic", maxSubscriptionsPerTopic); + } + protected void internalSetMaxUnackedMessagesPerSubscription(int maxUnackedMessagesPerSubscription) { validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index f221262bd3bf2..72cb4a40b7d9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1093,6 +1093,44 @@ public void setMaxUnackedMessagesPerSubscription( internalSetMaxUnackedMessagesPerSubscription(maxUnackedMessagesPerSubscription); } + @GET + @Path("/{tenant}/{namespace}/maxSubscriptionsPerTopic") + @ApiOperation(value = "Get maxSubscriptionsPerTopic config on a namespace.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist") }) + public Integer getMaxSubscriptionsPerTopic(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + return internalGetMaxSubscriptionsPerTopic(); + } + + @POST + @Path("/{tenant}/{namespace}/maxSubscriptionsPerTopic") + @ApiOperation(value = " Set maxSubscriptionsPerTopic configuration on a namespace.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "maxUnackedMessagesPerSubscription value is not valid")}) + public void setMaxSubscriptionsPerTopic( + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + @ApiParam(value = "Number of maximum subscriptions per topic", required = true) + int maxSubscriptionsPerTopic) { + validateNamespaceName(tenant, namespace); + internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic); + } + + @DELETE + @Path("/{tenant}/{namespace}/maxSubscriptionsPerTopic") + @ApiOperation(value = "Remove maxSubscriptionsPerTopic configuration on a namespace.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist"), + @ApiResponse(code = 409, message = "Concurrent modification") }) + public void removeMaxSubscriptionsPerTopic(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + internalSetMaxSubscriptionsPerTopic(null); + } + @POST @Path("/{tenant}/{namespace}/antiAffinity") @ApiOperation(value = "Set anti-affinity group for a namespace") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 1b4a068fccc17..bbbde6ca872d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -97,6 +97,8 @@ public abstract class AbstractTopic implements Topic { protected volatile int maxUnackedMessagesOnConsumer = -1; + protected volatile Integer maxSubscriptionsPerTopic = null; + protected volatile PublishRateLimiter topicPublishRateLimiter; protected boolean preciseTopicPublishRateLimitingEnable; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 70922bb98ae8b..d60f30e17733a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2006,6 +2006,7 @@ public CompletableFuture onPoliciesUpdate(Policies data) { maxUnackedMessagesOnConsumer = unackedMessagesExceededOnConsumer(data); maxUnackedMessagesOnSubscription = unackedMessagesExceededOnSubscription(data); + maxSubscriptionsPerTopic = data.max_subscriptions_per_topic; if (data.delayed_delivery_policies != null) { delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime(); @@ -2700,9 +2701,14 @@ public MessageDeduplication getMessageDeduplication() { } private boolean checkMaxSubscriptionsPerTopicExceed() { - final int maxSubscriptionsPerTopic = brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic(); - if (maxSubscriptionsPerTopic > 0) { - if (subscriptions != null && subscriptions.size() >= maxSubscriptionsPerTopic) { + Integer maxSubsPerTopic = maxSubscriptionsPerTopic; + + if (maxSubsPerTopic == null) { + maxSubsPerTopic = brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic(); + } + + if (maxSubsPerTopic > 0) { + if (subscriptions != null && subscriptions.size() >= maxSubsPerTopic) { return true; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 332bd92b9696b..209a996c9272e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -29,6 +29,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.lang.reflect.Field; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -83,6 +84,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -1423,4 +1425,125 @@ public void testMaxSubscriptionsPerTopic() throws Exception { consumer2.close(); admin.topics().deletePartitionedTopic(topic); } + + @Test(timeOut = 30000) + public void testMaxSubPerTopicApi() throws Exception { + final String myNamespace = "prop-xyz/ns" + UUID.randomUUID(); + admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); + + assertNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace)); + + admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,100); + assertEquals(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace).intValue(),100); + admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace); + assertNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace)); + + admin.namespaces().setMaxSubscriptionsPerTopicAsync(myNamespace,200).get(); + assertEquals(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get().intValue(),200); + admin.namespaces().removeMaxSubscriptionsPerTopicAsync(myNamespace); + assertNull(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get()); + + try { + admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,-100); + fail("should fail"); + } catch (PulsarAdminException ignore) { + } + } + + @Test(timeOut = 30000) + public void testMaxSubPerTopic() throws Exception { + final String myNamespace = "prop-xyz/ns" + UUID.randomUUID(); + admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); + final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic"; + pulsarClient.newProducer().topic(topic).create().close(); + final int maxSub = 2; + admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSub); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); + Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic"); + field.setAccessible(true); + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int) field.get(persistentTopic) == maxSub); + + List> consumerList = new ArrayList<>(maxSub); + for (int i = 0; i < maxSub; i++) { + Consumer consumer = + pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe(); + consumerList.add(consumer); + } + //Create a client that can fail quickly + try (PulsarClient client = PulsarClient.builder().operationTimeout(2,TimeUnit.SECONDS) + .serviceUrl(brokerUrl.toString()).build()){ + client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe(); + fail("should fail"); + } catch (Exception ignore) { + } + //After removing the restriction, it should be able to create normally + admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace); + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> field.get(persistentTopic) == null); + Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()) + .subscribe(); + consumerList.add(consumer); + + for (Consumer c : consumerList) { + c.close(); + } + } + + @Test(timeOut = 30000) + public void testMaxSubPerTopicPriority() throws Exception { + final int brokerLevelMaxSub = 2; + super.internalCleanup(); + mockPulsarSetup.cleanup(); + conf.setMaxSubscriptionsPerTopic(brokerLevelMaxSub); + super.internalSetup(); + + admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress())); + TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); + admin.tenants().createTenant("prop-xyz", tenantInfo); + final String myNamespace = "prop-xyz/ns" + UUID.randomUUID(); + admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); + final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic"; + //Create a client that can fail quickly + PulsarClient client = PulsarClient.builder().operationTimeout(2,TimeUnit.SECONDS) + .serviceUrl(brokerUrl.toString()).build(); + //We can only create 2 consumers + List> consumerList = new ArrayList<>(brokerLevelMaxSub); + for (int i = 0; i < brokerLevelMaxSub; i++) { + Consumer consumer = + pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe(); + consumerList.add(consumer); + } + try { + client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe(); + fail("should fail"); + } catch (Exception ignore) { + + } + //Set namespace-level policy,the limit should up to 4 + final int nsLevelMaxSub = 4; + admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, nsLevelMaxSub); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); + Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic"); + field.setAccessible(true); + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int) field.get(persistentTopic) == nsLevelMaxSub); + Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()) + .subscribe(); + consumerList.add(consumer); + assertEquals(consumerList.size(), 3); + //After removing the restriction, it should fail again + admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace); + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> field.get(persistentTopic) == null); + try { + client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe(); + fail("should fail"); + } catch (Exception ignore) { + + } + + for (Consumer c : consumerList) { + c.close(); + } + client.close(); + } + + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 8bbc8f33d1cba..bf2ced1695f29 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -2452,6 +2452,56 @@ void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscription */ CompletableFuture removeDeduplicationSnapshotIntervalAsync(String namespace); + /** + * Get the maxSubscriptionsPerTopic for a namespace. + * + * @param namespace + * @return + * @throws PulsarAdminException + */ + Integer getMaxSubscriptionsPerTopic(String namespace) throws PulsarAdminException; + + /** + * Get the maxSubscriptionsPerTopic for a namespace asynchronously. + * + * @param namespace + * @return + */ + CompletableFuture getMaxSubscriptionsPerTopicAsync(String namespace); + + /** + * Set the maxSubscriptionsPerTopic for a namespace. + * + * @param namespace + * @param maxSubscriptionsPerTopic + * @throws PulsarAdminException + */ + void setMaxSubscriptionsPerTopic(String namespace, int maxSubscriptionsPerTopic) throws PulsarAdminException; + + /** + * Set the maxSubscriptionsPerTopic for a namespace asynchronously. + * + * @param namespace + * @param maxSubscriptionsPerTopic + * @return + */ + CompletableFuture setMaxSubscriptionsPerTopicAsync(String namespace, int maxSubscriptionsPerTopic); + + /** + * Remove the maxSubscriptionsPerTopic for a namespace. + * + * @param namespace + * @throws PulsarAdminException + */ + void removeMaxSubscriptionsPerTopic(String namespace) throws PulsarAdminException; + + /** + * Remove the maxSubscriptionsPerTopic for a namespace asynchronously. + * @param namespace + * @return + */ + CompletableFuture removeMaxSubscriptionsPerTopicAsync(String namespace); + /** * Get the maxProducersPerTopic for a namespace. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 36fabef458361..4cf55aea7288d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -2037,6 +2037,86 @@ public CompletableFuture removeDeduplicationSnapshotIntervalAsync(String n return setDeduplicationSnapshotIntervalAsync(namespace, null); } + @Override + public Integer getMaxSubscriptionsPerTopic(String namespace) throws PulsarAdminException { + try { + return getMaxSubscriptionsPerTopicAsync(namespace). + get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getMaxSubscriptionsPerTopicAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "maxSubscriptionsPerTopic"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(Integer maxSubscriptionsPerTopic) { + future.complete(maxSubscriptionsPerTopic); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public void setMaxSubscriptionsPerTopic(String namespace, int maxSubscriptionsPerTopic) + throws PulsarAdminException { + try { + setMaxSubscriptionsPerTopicAsync(namespace, maxSubscriptionsPerTopic). + get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture setMaxSubscriptionsPerTopicAsync(String namespace, int maxSubscriptionsPerTopic) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "maxSubscriptionsPerTopic"); + return asyncPostRequest(path, Entity.entity(maxSubscriptionsPerTopic, MediaType.APPLICATION_JSON)); + } + + @Override + public void removeMaxSubscriptionsPerTopic(String namespace) throws PulsarAdminException { + try { + removeMaxSubscriptionsPerTopicAsync(namespace) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture removeMaxSubscriptionsPerTopicAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "maxSubscriptionsPerTopic"); + return asyncDeleteRequest(path); + } + @Override public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException { try { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index f39916e4f7847..5c10268b59f72 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -352,6 +352,13 @@ public void namespaces() throws Exception { namespaces.run(split("get-persistence myprop/clust/ns1")); verify(mockNamespaces).getPersistence("myprop/clust/ns1"); + namespaces.run(split("get-max-subscriptions-per-topic myprop/clust/ns1")); + verify(mockNamespaces).getMaxSubscriptionsPerTopic("myprop/clust/ns1"); + namespaces.run(split("set-max-subscriptions-per-topic myprop/clust/ns1 -m 300")); + verify(mockNamespaces).setMaxSubscriptionsPerTopic("myprop/clust/ns1", 300); + namespaces.run(split("remove-max-subscriptions-per-topic myprop/clust/ns1")); + verify(mockNamespaces).removeMaxSubscriptionsPerTopic("myprop/clust/ns1"); + namespaces.run(split("set-message-ttl myprop/clust/ns1 -ttl 300")); verify(mockNamespaces).setNamespaceMessageTTL("myprop/clust/ns1", 300); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 7247f415268a5..d2052cd9cacab 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -330,6 +330,46 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get max subscriptions per topic for a namespace") + private class GetMaxSubscriptionsPerTopic extends CliCommand { + @Parameter(description = "tenant/namespace\n", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + print(admin.namespaces().getMaxSubscriptionsPerTopic(namespace)); + } + } + + @Parameters(commandDescription = "Set max subscriptions per topic for a namespace") + private class SetMaxSubscriptionsPerTopic extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Parameter(names = { "--maxSubscriptionsPerTopic", "-m" }, description = "Max subscriptions per topic", + required = true) + private int maxSubscriptionsPerTopic; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + admin.namespaces().setMaxSubscriptionsPerTopic(namespace, maxSubscriptionsPerTopic); + } + } + + @Parameters(commandDescription = "Remove max subscriptions per topic for a namespace") + private class RemoveMaxSubscriptionsPerTopic extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + admin.namespaces().removeMaxSubscriptionsPerTopic(namespace); + } + } + @Parameters(commandDescription = "Set subscription expiration time for a namespace") private class SetSubscriptionExpirationTime extends CliCommand { @Parameter(description = "tenant/namespace", required = true) @@ -1815,6 +1855,10 @@ public CmdNamespaces(PulsarAdmin admin) { jcommander.addCommand("set-message-ttl", new SetMessageTTL()); jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL()); + jcommander.addCommand("get-max-subscriptions-per-topic", new GetMaxSubscriptionsPerTopic()); + jcommander.addCommand("set-max-subscriptions-per-topic", new SetMaxSubscriptionsPerTopic()); + jcommander.addCommand("remove-max-subscriptions-per-topic", new RemoveMaxSubscriptionsPerTopic()); + jcommander.addCommand("get-subscription-expiration-time", new GetSubscriptionExpirationTime()); jcommander.addCommand("set-subscription-expiration-time", new SetSubscriptionExpirationTime()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 969c0c0215100..a7ee4f764bccb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -86,6 +86,8 @@ public class Policies { public int max_unacked_messages_per_consumer = -1; @SuppressWarnings("checkstyle:MemberName") public int max_unacked_messages_per_subscription = -1; + @SuppressWarnings("checkstyle:MemberName") + public Integer max_subscriptions_per_topic = null; @SuppressWarnings("checkstyle:MemberName") public long compaction_threshold = 0; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java index b7f8f6a76cf42..c8fa3d77274b0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java @@ -34,6 +34,7 @@ public enum PolicyName { MAX_PRODUCERS, DEDUPLICATION_SNAPSHOT, MAX_UNACKED, + MAX_SUBSCRIPTIONS, OFFLOAD, PERSISTENCE, RATE,