Skip to content

Commit

Permalink
[fix][broker] Terminate the async call chain when the condition isn't…
Browse files Browse the repository at this point in the history
… met for resetCursor (apache#19541)
  • Loading branch information
lhotari committed Feb 16, 2023
1 parent 2d90089 commit 66fda61
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2672,7 +2672,7 @@ protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String
if (topicMetadata.partitions > 0) {
log.warn("[{}] Not supported operation on partitioned-topic {} {}",
clientAppId(), topicName, subName);
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
throw new CompletionException(new RestException(Status.METHOD_NOT_ALLOWED,
"Reset-cursor at position is not allowed for partitioned-topic"));
}
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,22 @@ public void testResetCursorOnPosition(String namespaceName) throws Exception {
setup();
}

@Test
public void shouldNotSupportResetOnPartitionedTopic() throws PulsarAdminException, PulsarClientException {
final String partitionedTopicName = "persistent://prop-xyz/ns1/" + BrokerTestUtil.newUniqueName("parttopic");
admin.topics().createPartitionedTopic(partitionedTopicName, 4);
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Shared).subscribe();
try {
admin.topics().resetCursor(partitionedTopicName, "my-sub", MessageId.earliest);
fail();
} catch (PulsarAdminException.NotAllowedException e) {
assertTrue(e.getMessage().contains("Reset-cursor at position is not allowed for partitioned-topic"),
"Condition doesn't match. Actual message:" + e.getMessage());
}
}

private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
Expand Down

0 comments on commit 66fda61

Please sign in to comment.