Skip to content

Commit

Permalink
[improve][broker] part 1:make some methods async in Namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
HQebupt committed Jul 27, 2022
1 parent f582620 commit cbb1c2a
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1293,13 +1293,6 @@ protected CompletableFuture<Void> internalRemovePublishRateAsync() {
}));
}

protected PublishRate internalGetPublishRate() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
}

protected CompletableFuture<PublishRate> internalGetPublishRateAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
Expand Down Expand Up @@ -1365,14 +1358,6 @@ protected CompletableFuture<Void> internalDeleteTopicDispatchRateAsync() {
}));
}

@SuppressWarnings("deprecation")
protected DispatchRate internalGetTopicDispatchRate() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
}

@SuppressWarnings("deprecation")
protected CompletableFuture<DispatchRate> internalGetTopicDispatchRateAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
Expand Down Expand Up @@ -1825,11 +1810,6 @@ protected Boolean internalGetEncryptionRequired() {
return policies.encryption_required;
}

protected DelayedDeliveryPolicies internalGetDelayedDelivery() {
validateNamespacePolicyOperation(namespaceName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).delayed_delivery_policies;
}

protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
Expand Down Expand Up @@ -1957,13 +1937,6 @@ protected List<String> internalGetAntiAffinityNamespaces(String cluster, String
}
}

protected RetentionPolicies internalGetRetention() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RETENTION, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.retention_policies;
}

private boolean checkQuotas(Policies policies, RetentionPolicies retention) {
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap = policies.backlog_quota_map;
if (backlogQuotaMap.isEmpty()) {
Expand Down Expand Up @@ -2244,11 +2217,6 @@ protected void internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSub
}
}

protected Integer internalGetMaxUnackedMessagesPerConsumer() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_unacked_messages_per_consumer;
}

protected void internalSetMaxUnackedMessagesPerConsumer(Integer maxUnackedMessagesPerConsumer) {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Expand All @@ -2273,16 +2241,6 @@ protected void internalSetMaxUnackedMessagesPerConsumer(Integer maxUnackedMessag
}
}

protected Integer internalGetMaxUnackedMessagesPerSubscription() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_unacked_messages_per_subscription;
}

protected Integer internalGetMaxSubscriptionsPerTopic() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_subscriptions_per_topic;
}

protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
Expand Down Expand Up @@ -2399,14 +2357,6 @@ protected SchemaAutoUpdateCompatibilityStrategy internalGetSchemaAutoUpdateCompa
return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy;
}

protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);

return policies.schema_compatibility_strategy;
}

@Deprecated
protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdateCompatibilityStrategy strategy) {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
Expand Down Expand Up @@ -2615,12 +2565,6 @@ private void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
}
}

protected int internalGetMaxTopicsPerNamespace() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_topics_per_namespace != null
? getNamespacePolicies(namespaceName).max_topics_per_namespace : 0;
}

protected void internalRemoveMaxTopicsPerNamespace() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.WRITE);
internalSetMaxTopicsPerNamespace(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -1017,10 +1016,21 @@ public void setDispatchRate(@Suspended AsyncResponse asyncResponse,
+ "-1 means msg-dispatch-rate or byte-dispatch-rate not configured in dispatch-rate yet")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist")})
public DispatchRate getDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
public void getDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetTopicDispatchRate();
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(
policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName())))
.exceptionally(ex -> {
log.error("[{}] Failed to get dispatch-rate configured for the namespace {}", clientAppId(),
namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand Down Expand Up @@ -1177,10 +1187,20 @@ public void removeBacklogQuota(@PathParam("property") String property, @PathPara
@ApiOperation(hidden = true, value = "Get retention config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public RetentionPolicies getRetention(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
public void getRetention(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetRetention();
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RETENTION, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.retention_policies))
.exceptionally(ex -> {
log.error("[{}] Failed to get retention config on a namespace {}", clientAppId(), namespaceName,
ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand Down
Loading

0 comments on commit cbb1c2a

Please sign in to comment.