diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index b1a7190b823ac..d2cbaa5428a74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -77,6 +77,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; +import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl; import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; @@ -358,7 +359,8 @@ public void getOffloadPolicies(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { @@ -381,7 +383,8 @@ public void setOffloadPolicies(@Suspended final AsyncResponse asyncResponse, @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Offload policies for the specified topic") OffloadPoliciesImpl offloadPolicies) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenAccept(__ -> validateOffloadPolicies(offloadPolicies)) .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) @@ -404,7 +407,8 @@ public void removeOffloadPolicies(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -428,7 +432,8 @@ public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse async @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal)) .thenApply(asyncResponse::resume).exceptionally(ex -> { handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", ex, asyncResponse); @@ -452,7 +457,8 @@ public void setMaxUnackedMessagesOnConsumer( @ApiParam(value = "Max unacked messages on consumer policies for the specified topic") Integer maxUnackedNum) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -474,7 +480,8 @@ public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse as @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -497,7 +504,8 @@ public void getDeduplicationSnapshotInterval(@Suspended final AsyncResponse asyn @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)) .thenAccept(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); @@ -525,7 +533,8 @@ public void setDeduplicationSnapshotInterval( @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(interval, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -547,7 +556,8 @@ public void deleteDeduplicationSnapshotInterval(@Suspended final AsyncResponse a @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(null, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -571,7 +581,8 @@ public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncRespons @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetInactiveTopicPolicies(applied, isGlobal)) .thenApply(asyncResponse::resume).exceptionally(ex -> { handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse); @@ -594,7 +605,8 @@ public void setInactiveTopicPolicies(@Suspended final AsyncResponse asyncRespons @ApiParam(value = "inactive topic policies for the specified topic") InactiveTopicPolicies inactiveTopicPolicies) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -616,7 +628,8 @@ public void deleteInactiveTopicPolicies(@Suspended final AsyncResponse asyncResp @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetInactiveTopicPolicies(null, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -640,7 +653,8 @@ public void getMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse a @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetMaxUnackedMessagesOnSubscription(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { @@ -715,7 +729,8 @@ public void getDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncRespo @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetDelayedDeliveryPolicies(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { @@ -1931,16 +1946,17 @@ public void examineMessage( @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - internalExamineMessageAsync(initialPosition, messagePosition, authoritative) - .thenAccept(asyncResponse::resume) - .exceptionally(ex -> { - if (isNot307And404Exception(ex)) { - log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName, - ex); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES) + .thenCompose(__ -> internalExamineMessageAsync(initialPosition, messagePosition, authoritative)) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + if (isNot307And404Exception(ex)) { + log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName, + ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -2047,7 +2063,8 @@ public void getBacklog( @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - internalGetBacklogAsync(authoritative) + validateTopicOperationAsync(topicName, TopicOperation.GET_BACKLOG_SIZE) + .thenCompose(__ -> internalGetBacklogAsync(authoritative)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { Throwable t = FutureUtil.unwrapCompletionException(ex); @@ -2103,7 +2120,8 @@ public void getBacklogQuotaMap( @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetBacklogQuota(applied, isGlobal)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { @@ -2186,7 +2204,8 @@ public void getReplicationClusters(@Suspended final AsyncResponse asyncResponse, + "For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName)) .thenAccept(op -> { asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(() -> { @@ -2267,7 +2286,8 @@ public void getMessageTTL(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)) .thenAccept(op -> asyncResponse.resume(op .map(TopicPolicies::getMessageTTLInSeconds) @@ -2304,7 +2324,8 @@ public void setMessageTTL(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetMessageTTL(messageTTL, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -2331,7 +2352,8 @@ public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse, @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetMessageTTL(null, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -2357,7 +2379,8 @@ public void getDeduplication(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetDeduplication(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { @@ -2384,7 +2407,8 @@ public void setDeduplication( @ApiParam(value = "DeduplicationEnabled policies for the specified topic") Boolean enabled) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetDeduplication(enabled, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -2409,7 +2433,8 @@ public void removeDeduplication(@Suspended final AsyncResponse asyncResponse, @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetDeduplication(null, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -2532,7 +2557,9 @@ public void setDispatcherPauseOnAckStatePersistent(@Suspended final AsyncRespons @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, + PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetDispatcherPauseOnAckStatePersistent(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully enabled dispatcherPauseOnAckStatePersistent: namespace={}, topic={}", @@ -2561,7 +2588,9 @@ public void removeDispatcherPauseOnAckStatePersistent(@Suspended final AsyncResp @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, + PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveDispatcherPauseOnAckStatePersistent(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove dispatcherPauseOnAckStatePersistent: namespace={}, topic={}", @@ -2589,7 +2618,9 @@ public void getDispatcherPauseOnAckStatePersistent(@Suspended final AsyncRespons @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, + PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetDispatcherPauseOnAckStatePersistent(applied, isGlobal)) .thenApply(asyncResponse::resume).exceptionally(ex -> { handleTopicPolicyException("getDispatcherPauseOnAckStatePersistent", ex, asyncResponse); @@ -2617,7 +2648,8 @@ public void getPersistence(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetPersistence(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { @@ -2645,7 +2677,8 @@ public void setPersistence(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Bookkeeper persistence policies for specified topic") PersistencePolicies persistencePolicies) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetPersistence(persistencePolicies, isGlobal)) .thenRun(() -> { try { @@ -2681,7 +2714,8 @@ public void removePersistence(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemovePersistence(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove persistence policies: namespace={}, topic={}", @@ -2712,7 +2746,8 @@ public void getMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResp @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetMaxSubscriptionsPerTopic(isGlobal)) .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get() : Response.noContent().build())) @@ -2740,7 +2775,8 @@ public void setMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResp @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic, isGlobal)) .thenRun(() -> { log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}" @@ -2770,7 +2806,8 @@ public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncR @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null, isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove maxSubscriptionsPerTopic: namespace={}, topic={}", @@ -2800,7 +2837,8 @@ public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetReplicatorDispatchRate(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { @@ -2827,7 +2865,8 @@ public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate, isGlobal)) .thenRun(() -> { log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}" @@ -2857,7 +2896,8 @@ public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncRes @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetReplicatorDispatchRate(null, isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}", @@ -2887,7 +2927,8 @@ public void getMaxProducers(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetMaxProducers(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { @@ -2914,7 +2955,8 @@ public void setMaxProducers(@Suspended final AsyncResponse asyncResponse, @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "The max producers of the topic") int maxProducers) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetMaxProducers(maxProducers, isGlobal)) .thenRun(() -> { log.info("[{}] Successfully updated max producers: namespace={}, topic={}, maxProducers={}", @@ -2946,7 +2988,8 @@ public void removeMaxProducers(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveMaxProducers(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove max producers: namespace={}, topic={}", @@ -2978,7 +3021,8 @@ public void getMaxConsumers(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetMaxConsumers(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { @@ -3005,7 +3049,8 @@ public void setMaxConsumers(@Suspended final AsyncResponse asyncResponse, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "The max consumers of the topic") int maxConsumers) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetMaxConsumers(maxConsumers, isGlobal)) .thenRun(() -> { log.info("[{}] Successfully updated max consumers: namespace={}, topic={}, maxConsumers={}", @@ -3037,7 +3082,8 @@ public void removeMaxConsumers(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveMaxConsumers(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove max consumers: namespace={}, topic={}", @@ -3068,7 +3114,8 @@ public void getMaxMessageSize(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateAdminAccessForTenantAsync(topicName.getTenant()) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetMaxMessageSize(isGlobal)) .thenAccept(policies -> { asyncResponse.resume(policies.isPresent() ? policies.get() : Response.noContent().build()); @@ -3097,7 +3144,8 @@ public void setMaxMessageSize(@Suspended final AsyncResponse asyncResponse, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "The max message size of the topic") int maxMessageSize) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateAdminAccessForTenantAsync(topicName.getTenant()) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetMaxMessageSize(maxMessageSize, isGlobal)) .thenRun(() -> { log.info( @@ -3131,7 +3179,8 @@ public void removeMaxMessageSize(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateAdminAccessForTenantAsync(topicName.getTenant()) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetMaxMessageSize(null, isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove max message size: namespace={}, topic={}", @@ -3439,7 +3488,8 @@ public void getDispatchRate(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetDispatchRate(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { @@ -3465,7 +3515,8 @@ public void setDispatchRate(@Suspended final AsyncResponse asyncResponse, @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Dispatch rate for the specified topic") DispatchRateImpl dispatchRate) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetDispatchRate(dispatchRate, isGlobal)) .thenRun(() -> { try { @@ -3501,7 +3552,8 @@ public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveDispatchRate(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove topic dispatch rate: tenant={}, namespace={}, topic={}", @@ -3537,7 +3589,8 @@ public void getSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResp @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetSubscriptionDispatchRate(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { @@ -3565,7 +3618,8 @@ public void setSubscriptionDispatchRate( @ApiParam(value = "Subscription message dispatch rate for the specified topic") DispatchRateImpl dispatchRate) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetSubscriptionDispatchRate(dispatchRate, isGlobal)) .thenRun(() -> { try { @@ -3601,7 +3655,8 @@ public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse asyncR @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveSubscriptionDispatchRate(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}", @@ -3635,7 +3690,8 @@ public void getSubscriptionLevelDispatchRate(@Suspended final AsyncResponse asyn @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetSubscriptionLevelDispatchRate( Codec.decode(encodedSubscriptionName), applied, isGlobal)) .thenApply(asyncResponse::resume) @@ -3665,7 +3721,8 @@ public void setSubscriptionLevelDispatchRate( @ApiParam(value = "Subscription message dispatch rate for the specified topic") DispatchRateImpl dispatchRate) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetSubscriptionLevelDispatchRate( Codec.decode(encodedSubscriptionName), dispatchRate, isGlobal)) .thenRun(() -> { @@ -3703,7 +3760,8 @@ public void removeSubscriptionLevelDispatchRate( @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveSubscriptionLevelDispatchRate( Codec.decode(encodedSubscriptionName), isGlobal)) .thenRun(() -> { @@ -3735,7 +3793,8 @@ public void getCompactionThreshold(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetCompactionThreshold(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { @@ -3761,7 +3820,8 @@ public void setCompactionThreshold(@Suspended final AsyncResponse asyncResponse, @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetCompactionThreshold(compactionThreshold, isGlobal)) .thenRun(() -> { try { @@ -3797,7 +3857,8 @@ public void removeCompactionThreshold(@Suspended final AsyncResponse asyncRespon @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveCompactionThreshold(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}", @@ -3832,7 +3893,8 @@ public void getMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncR @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetMaxConsumersPerSubscription(isGlobal)) .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get() : Response.noContent().build())) @@ -3860,7 +3922,8 @@ public void setMaxConsumersPerSubscription( @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Dispatch rate for the specified topic") int maxConsumersPerSubscription) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription, isGlobal)) .thenRun(() -> { try { @@ -3896,7 +3959,8 @@ public void removeMaxConsumersPerSubscription(@Suspended final AsyncResponse asy @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveMaxConsumersPerSubscription(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove topic max consumers per subscription:" @@ -3929,7 +3993,8 @@ public void getPublishRate(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetPublishRate(isGlobal)) .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get() : Response.noContent().build())) @@ -3956,7 +4021,8 @@ public void setPublishRate(@Suspended final AsyncResponse asyncResponse, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetPublishRate(publishRate, isGlobal)) .thenRun(() -> { try { @@ -3993,7 +4059,8 @@ public void removePublishRate(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemovePublishRate(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}, isGlobal={}", @@ -4030,7 +4097,8 @@ public void getSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResp @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetSubscriptionTypesEnabled(isGlobal)) .thenAccept(op -> { asyncResponse.resume(op.isPresent() ? op.get() @@ -4060,7 +4128,8 @@ public void setSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResp @ApiParam(value = "Enable sub types for the specified topic") Set subscriptionTypesEnabled) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled, isGlobal)) .thenRun(() -> { try { @@ -4096,7 +4165,8 @@ public void removeSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncR @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveSubscriptionTypesEnabled(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove subscription types enabled: namespace={}, topic={}", @@ -4128,7 +4198,8 @@ public void getSubscribeRate(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetSubscribeRate(applied, isGlobal)) .thenApply(asyncResponse::resume).exceptionally(ex -> { handleTopicPolicyException("getSubscribeRate", ex, asyncResponse); @@ -4154,7 +4225,8 @@ public void setSubscribeRate( @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetSubscribeRate(subscribeRate, isGlobal)) .thenRun(() -> { try { @@ -4192,7 +4264,8 @@ public void removeSubscribeRate(@Suspended final AsyncResponse asyncResponse, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveSubscribeRate(isGlobal)) .thenRun(() -> { log.info( @@ -4440,7 +4513,8 @@ public void getSchemaValidationEnforced(@Suspended AsyncResponse asyncResponse, + "broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetSchemaValidationEnforced(applied)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { @@ -4467,7 +4541,8 @@ public void setSchemaValidationEnforced(@Suspended AsyncResponse asyncResponse, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(required = true) boolean schemaValidationEnforced) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetSchemaValidationEnforced(schemaValidationEnforced)) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -4663,7 +4738,8 @@ public void setAutoSubscriptionCreation( @ApiParam(value = "Settings for automatic subscription creation") AutoSubscriptionCreationOverrideImpl autoSubscriptionCreationOverride) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetAutoSubscriptionCreation(autoSubscriptionCreationOverride, isGlobal)) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -4689,7 +4765,8 @@ public void getAutoSubscriptionCreation( @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetAutoSubscriptionCreation(applied, isGlobal)) .thenApply(asyncResponse::resume).exceptionally(ex -> { handleTopicPolicyException("getAutoSubscriptionCreation", ex, asyncResponse); @@ -4714,7 +4791,8 @@ public void removeAutoSubscriptionCreation( @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetAutoSubscriptionCreation(null, isGlobal)) .thenRun(() -> { log.info( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java index f07b9a6c2aabf..bcb8e3233a093 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.awaitility.Awaitility.await; import io.jsonwebtoken.Jwts; import java.util.Set; import java.util.UUID; @@ -27,6 +28,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.security.MockedPulsarStandalone; @@ -35,8 +38,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.awaitility.Awaitility.await; - public final class TopicPoliciesAuthZTest extends MockedPulsarStandalone { @@ -172,4 +173,311 @@ public void testRetention() { } } + @SneakyThrows + @Test + public void testOffloadPolicy() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + // mocked data + final OffloadPoliciesImpl definedOffloadPolicies = new OffloadPoliciesImpl(); + definedOffloadPolicies.setManagedLedgerOffloadThresholdInBytes(100L); + definedOffloadPolicies.setManagedLedgerOffloadThresholdInSeconds(100L); + definedOffloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(200L); + definedOffloadPolicies.setManagedLedgerOffloadDriver("s3"); + definedOffloadPolicies.setManagedLedgerOffloadBucket("buck"); + + // test superuser + superUserAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies); + + // because the topic policies is eventual consistency, we should wait here + await().untilAsserted(() -> { + final OffloadPolicies offloadPolicy = superUserAdmin.topicPolicies().getOffloadPolicies(topic); + Assert.assertEquals(offloadPolicy, definedOffloadPolicies); + }); + superUserAdmin.topicPolicies().removeOffloadPolicies(topic); + + await().untilAsserted(() -> { + final OffloadPolicies offloadPolicy = superUserAdmin.topicPolicies().getOffloadPolicies(topic); + Assert.assertNull(offloadPolicy); + }); + + // test tenant manager + + tenantManagerAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies); + await().untilAsserted(() -> { + final OffloadPolicies offloadPolicy = tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic); + Assert.assertEquals(offloadPolicy, definedOffloadPolicies); + }); + tenantManagerAdmin.topicPolicies().removeOffloadPolicies(topic); + await().untilAsserted(() -> { + final OffloadPolicies offloadPolicy = tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic); + Assert.assertNull(offloadPolicy); + }); + + // test nobody + + try { + subAdmin.topicPolicies().getOffloadPolicies(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + + subAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.topicPolicies().removeOffloadPolicies(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + // test sub user with permissions + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace("public/default", + subject, Set.of(action)); + try { + subAdmin.topicPolicies().getOffloadPolicies(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + + subAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.topicPolicies().removeOffloadPolicies(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject); + } + } + + @SneakyThrows + @Test + public void testMaxUnackedMessagesOnConsumer() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + // mocked data + int definedUnackedMessagesOnConsumer = 100; + + // test superuser + superUserAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer); + + // because the topic policies is eventual consistency, we should wait here + await().untilAsserted(() -> { + final int unackedMessagesOnConsumer = superUserAdmin.topicPolicies() + .getMaxUnackedMessagesOnConsumer(topic); + Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer); + }); + superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic); + + await().untilAsserted(() -> { + final Integer unackedMessagesOnConsumer = superUserAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic); + Assert.assertNull(unackedMessagesOnConsumer); + }); + + // test tenant manager + + tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer); + await().untilAsserted(() -> { + final int unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic); + Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer); + }); + tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic); + await().untilAsserted(() -> { + final Integer unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic); + Assert.assertNull(unackedMessagesOnConsumer); + }); + + // test nobody + + try { + subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + + subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + // test sub user with permissions + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace("public/default", + subject, Set.of(action)); + try { + subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + + subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject); + } + } + + @SneakyThrows + @Test + public void testMaxUnackedMessagesOnSubscription() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + // mocked data + int definedUnackedMessagesOnConsumer = 100; + + // test superuser + superUserAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer); + + // because the topic policies is eventual consistency, we should wait here + await().untilAsserted(() -> { + final int unackedMessagesOnConsumer = superUserAdmin.topicPolicies() + .getMaxUnackedMessagesOnSubscription(topic); + Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer); + }); + superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic); + + await().untilAsserted(() -> { + final Integer unackedMessagesOnConsumer = superUserAdmin.topicPolicies() + .getMaxUnackedMessagesOnSubscription(topic); + Assert.assertNull(unackedMessagesOnConsumer); + }); + + // test tenant manager + + tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer); + await().untilAsserted(() -> { + final int unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic); + Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer); + }); + tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic); + await().untilAsserted(() -> { + final Integer unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies() + .getMaxUnackedMessagesOnSubscription(topic); + Assert.assertNull(unackedMessagesOnConsumer); + }); + + // test nobody + + try { + subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + + subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + // test sub user with permissions + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace("public/default", + subject, Set.of(action)); + try { + subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + + subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject); + } + + } }