Skip to content

Commit

Permalink
Support configure max subscriptions per topic on the namespace level …
Browse files Browse the repository at this point in the history
…policy (#8924)

Master Issue: #8866

### Motivation

Currently, #8289 introduced max subscriptions per topic at the broker level but does not support overwrite on the namespace level

### Modifications
Add api for namespace-level policy

### Verifying this change
AdminApiTest2.java

1) Verify that the basic API is correct
2) Verify that the restriction is in effect
3) Verify the priority of namespace level and broker level
  • Loading branch information
315157973 authored Dec 12, 2020
1 parent 35acfe1 commit b63e288
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2776,6 +2776,21 @@ protected int internalGetMaxUnackedMessagesPerSubscription() {
return getNamespacePolicies(namespaceName).max_unacked_messages_per_subscription;
}

protected Integer internalGetMaxSubscriptionsPerTopic() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_subscriptions_per_topic;
}

protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxSubscriptionsPerTopic must be 0 or more");
}
internalSetPolicies("max_subscriptions_per_topic", maxSubscriptionsPerTopic);
}

protected void internalSetMaxUnackedMessagesPerSubscription(int maxUnackedMessagesPerSubscription) {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,44 @@ public void setMaxUnackedMessagesPerSubscription(
internalSetMaxUnackedMessagesPerSubscription(maxUnackedMessagesPerSubscription);
}

@GET
@Path("/{tenant}/{namespace}/maxSubscriptionsPerTopic")
@ApiOperation(value = "Get maxSubscriptionsPerTopic config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public Integer getMaxSubscriptionsPerTopic(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetMaxSubscriptionsPerTopic();
}

@POST
@Path("/{tenant}/{namespace}/maxSubscriptionsPerTopic")
@ApiOperation(value = " Set maxSubscriptionsPerTopic configuration on a namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "maxUnackedMessagesPerSubscription value is not valid")})
public void setMaxSubscriptionsPerTopic(
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@ApiParam(value = "Number of maximum subscriptions per topic", required = true)
int maxSubscriptionsPerTopic) {
validateNamespaceName(tenant, namespace);
internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
}

@DELETE
@Path("/{tenant}/{namespace}/maxSubscriptionsPerTopic")
@ApiOperation(value = "Remove maxSubscriptionsPerTopic configuration on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void removeMaxSubscriptionsPerTopic(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetMaxSubscriptionsPerTopic(null);
}

@POST
@Path("/{tenant}/{namespace}/antiAffinity")
@ApiOperation(value = "Set anti-affinity group for a namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public abstract class AbstractTopic implements Topic {

protected volatile int maxUnackedMessagesOnConsumer = -1;

protected volatile Integer maxSubscriptionsPerTopic = null;

protected volatile PublishRateLimiter topicPublishRateLimiter;

protected boolean preciseTopicPublishRateLimitingEnable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2006,6 +2006,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {

maxUnackedMessagesOnConsumer = unackedMessagesExceededOnConsumer(data);
maxUnackedMessagesOnSubscription = unackedMessagesExceededOnSubscription(data);
maxSubscriptionsPerTopic = data.max_subscriptions_per_topic;

if (data.delayed_delivery_policies != null) {
delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
Expand Down Expand Up @@ -2700,9 +2701,14 @@ public MessageDeduplication getMessageDeduplication() {
}

private boolean checkMaxSubscriptionsPerTopicExceed() {
final int maxSubscriptionsPerTopic = brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic();
if (maxSubscriptionsPerTopic > 0) {
if (subscriptions != null && subscriptions.size() >= maxSubscriptionsPerTopic) {
Integer maxSubsPerTopic = maxSubscriptionsPerTopic;

if (maxSubsPerTopic == null) {
maxSubsPerTopic = brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic();
}

if (maxSubsPerTopic > 0) {
if (subscriptions != null && subscriptions.size() >= maxSubsPerTopic) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -83,6 +84,7 @@
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -1423,4 +1425,125 @@ public void testMaxSubscriptionsPerTopic() throws Exception {
consumer2.close();
admin.topics().deletePartitionedTopic(topic);
}

@Test(timeOut = 30000)
public void testMaxSubPerTopicApi() throws Exception {
final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));

assertNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace));

admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,100);
assertEquals(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace).intValue(),100);
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
assertNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace));

admin.namespaces().setMaxSubscriptionsPerTopicAsync(myNamespace,200).get();
assertEquals(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get().intValue(),200);
admin.namespaces().removeMaxSubscriptionsPerTopicAsync(myNamespace);
assertNull(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get());

try {
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,-100);
fail("should fail");
} catch (PulsarAdminException ignore) {
}
}

@Test(timeOut = 30000)
public void testMaxSubPerTopic() throws Exception {
final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic";
pulsarClient.newProducer().topic(topic).create().close();
final int maxSub = 2;
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSub);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
field.setAccessible(true);
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int) field.get(persistentTopic) == maxSub);

List<Consumer<?>> consumerList = new ArrayList<>(maxSub);
for (int i = 0; i < maxSub; i++) {
Consumer<?> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
consumerList.add(consumer);
}
//Create a client that can fail quickly
try (PulsarClient client = PulsarClient.builder().operationTimeout(2,TimeUnit.SECONDS)
.serviceUrl(brokerUrl.toString()).build()){
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
fail("should fail");
} catch (Exception ignore) {
}
//After removing the restriction, it should be able to create normally
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> field.get(persistentTopic) == null);
Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
.subscribe();
consumerList.add(consumer);

for (Consumer<?> c : consumerList) {
c.close();
}
}

@Test(timeOut = 30000)
public void testMaxSubPerTopicPriority() throws Exception {
final int brokerLevelMaxSub = 2;
super.internalCleanup();
mockPulsarSetup.cleanup();
conf.setMaxSubscriptionsPerTopic(brokerLevelMaxSub);
super.internalSetup();

admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("prop-xyz", tenantInfo);
final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic";
//Create a client that can fail quickly
PulsarClient client = PulsarClient.builder().operationTimeout(2,TimeUnit.SECONDS)
.serviceUrl(brokerUrl.toString()).build();
//We can only create 2 consumers
List<Consumer<?>> consumerList = new ArrayList<>(brokerLevelMaxSub);
for (int i = 0; i < brokerLevelMaxSub; i++) {
Consumer<?> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
consumerList.add(consumer);
}
try {
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
fail("should fail");
} catch (Exception ignore) {

}
//Set namespace-level policy,the limit should up to 4
final int nsLevelMaxSub = 4;
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, nsLevelMaxSub);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
field.setAccessible(true);
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int) field.get(persistentTopic) == nsLevelMaxSub);
Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
.subscribe();
consumerList.add(consumer);
assertEquals(consumerList.size(), 3);
//After removing the restriction, it should fail again
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> field.get(persistentTopic) == null);
try {
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
fail("should fail");
} catch (Exception ignore) {

}

for (Consumer<?> c : consumerList) {
c.close();
}
client.close();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -2452,6 +2452,56 @@ void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscription
*/
CompletableFuture<Void> removeDeduplicationSnapshotIntervalAsync(String namespace);

/**
* Get the maxSubscriptionsPerTopic for a namespace.
*
* @param namespace
* @return
* @throws PulsarAdminException
*/
Integer getMaxSubscriptionsPerTopic(String namespace) throws PulsarAdminException;

/**
* Get the maxSubscriptionsPerTopic for a namespace asynchronously.
*
* @param namespace
* @return
*/
CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String namespace);

/**
* Set the maxSubscriptionsPerTopic for a namespace.
*
* @param namespace
* @param maxSubscriptionsPerTopic
* @throws PulsarAdminException
*/
void setMaxSubscriptionsPerTopic(String namespace, int maxSubscriptionsPerTopic) throws PulsarAdminException;

/**
* Set the maxSubscriptionsPerTopic for a namespace asynchronously.
*
* @param namespace
* @param maxSubscriptionsPerTopic
* @return
*/
CompletableFuture<Void> setMaxSubscriptionsPerTopicAsync(String namespace, int maxSubscriptionsPerTopic);

/**
* Remove the maxSubscriptionsPerTopic for a namespace.
*
* @param namespace
* @throws PulsarAdminException
*/
void removeMaxSubscriptionsPerTopic(String namespace) throws PulsarAdminException;

/**
* Remove the maxSubscriptionsPerTopic for a namespace asynchronously.
* @param namespace
* @return
*/
CompletableFuture<Void> removeMaxSubscriptionsPerTopicAsync(String namespace);

/**
* Get the maxProducersPerTopic for a namespace.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2037,6 +2037,86 @@ public CompletableFuture<Void> removeDeduplicationSnapshotIntervalAsync(String n
return setDeduplicationSnapshotIntervalAsync(namespace, null);
}

@Override
public Integer getMaxSubscriptionsPerTopic(String namespace) throws PulsarAdminException {
try {
return getMaxSubscriptionsPerTopicAsync(namespace).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "maxSubscriptionsPerTopic");
final CompletableFuture<Integer> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Integer>() {
@Override
public void completed(Integer maxSubscriptionsPerTopic) {
future.complete(maxSubscriptionsPerTopic);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public void setMaxSubscriptionsPerTopic(String namespace, int maxSubscriptionsPerTopic)
throws PulsarAdminException {
try {
setMaxSubscriptionsPerTopicAsync(namespace, maxSubscriptionsPerTopic).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> setMaxSubscriptionsPerTopicAsync(String namespace, int maxSubscriptionsPerTopic) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "maxSubscriptionsPerTopic");
return asyncPostRequest(path, Entity.entity(maxSubscriptionsPerTopic, MediaType.APPLICATION_JSON));
}

@Override
public void removeMaxSubscriptionsPerTopic(String namespace) throws PulsarAdminException {
try {
removeMaxSubscriptionsPerTopicAsync(namespace)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> removeMaxSubscriptionsPerTopicAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "maxSubscriptionsPerTopic");
return asyncDeleteRequest(path);
}

@Override
public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
try {
Expand Down
Loading

0 comments on commit b63e288

Please sign in to comment.