From 66fda61d3f6fed2a8057e0a7a259307f7bc4aa06 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 16 Feb 2023 21:47:57 +0200 Subject: [PATCH] [fix][broker] Terminate the async call chain when the condition isn't met for resetCursor (#19541) --- .../broker/admin/impl/PersistentTopicsBase.java | 2 +- .../pulsar/broker/admin/AdminApi2Test.java | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 0214079335bb3..633c4747ee068 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2672,7 +2672,7 @@ protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String if (topicMetadata.partitions > 0) { log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), topicName, subName); - asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, + throw new CompletionException(new RestException(Status.METHOD_NOT_ALLOWED, "Reset-cursor at position is not allowed for partitioned-topic")); } return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index f7e7bcb4ea129..66b2b1d1470ea 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -661,6 +661,22 @@ public void testResetCursorOnPosition(String namespaceName) throws Exception { setup(); } + @Test + public void shouldNotSupportResetOnPartitionedTopic() throws PulsarAdminException, PulsarClientException { + final String partitionedTopicName = "persistent://prop-xyz/ns1/" + BrokerTestUtil.newUniqueName("parttopic"); + admin.topics().createPartitionedTopic(partitionedTopicName, 4); + @Cleanup + Consumer consumer = pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub") + .subscriptionType(SubscriptionType.Shared).subscribe(); + try { + admin.topics().resetCursor(partitionedTopicName, "my-sub", MessageId.earliest); + fail(); + } catch (PulsarAdminException.NotAllowedException e) { + assertTrue(e.getMessage().contains("Reset-cursor at position is not allowed for partitioned-topic"), + "Condition doesn't match. Actual message:" + e.getMessage()); + } + } + private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception { Producer producer = pulsarClient.newProducer() .topic(topicName)