From b88fe8cc4825f88050e16ea656adfaf72865b0a2 Mon Sep 17 00:00:00 2001 From: zifengmo <38554710+zifengmo@users.noreply.github.com> Date: Mon, 26 Feb 2024 18:06:28 +0800 Subject: [PATCH] [fix] [client] Do no retrying for error subscription not found when disabled allowAutoSubscriptionCreation (#22078) --- .../service/BrokerServiceException.java | 2 ++ .../client/api/MultiTopicsConsumerTest.java | 32 +++++++++++++++++++ .../client/api/PulsarClientException.java | 17 ++++++++++ .../apache/pulsar/client/impl/ClientCnx.java | 2 ++ 4 files changed, 53 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index 3e77588b2459f..831d6068e2097 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -258,6 +258,8 @@ private static ServerError getClientErrorCode(Throwable t, boolean checkCauseIfU return ServerError.ServiceNotReady; } else if (t instanceof TopicNotFoundException) { return ServerError.TopicNotFound; + } else if (t instanceof SubscriptionNotFoundException) { + return ServerError.SubscriptionNotFound; } else if (t instanceof IncompatibleSchemaException || t instanceof InvalidSchemaDataException) { // for backward compatible with old clients, invalid schema data diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index 315ce378d6953..bb8bab29ad9ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -371,4 +371,36 @@ public void testMultipleIOThreads() throws PulsarAdminException, PulsarClientExc assertTrue(consumer instanceof MultiTopicsConsumerImpl); assertTrue(consumer.isConnected()); } + + @Test(timeOut = 30000) + public void testSubscriptionNotFound() throws PulsarAdminException, PulsarClientException { + final var topic1 = newTopicName(); + final var topic2 = newTopicName(); + + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); + + try { + final var singleTopicConsumer = pulsarClient.newConsumer() + .topic(topic1) + .subscriptionName("sub-1") + .isAckReceiptEnabled(true) + .subscribe(); + assertTrue(singleTopicConsumer instanceof ConsumerImpl); + } catch (Throwable t) { + assertTrue(t.getCause().getCause() instanceof PulsarClientException.SubscriptionNotFoundException); + } + + try { + final var multiTopicsConsumer = pulsarClient.newConsumer() + .topics(List.of(topic1, topic2)) + .subscriptionName("sub-2") + .isAckReceiptEnabled(true) + .subscribe(); + assertTrue(multiTopicsConsumer instanceof MultiTopicsConsumerImpl); + } catch (Throwable t) { + assertTrue(t.getCause().getCause() instanceof PulsarClientException.SubscriptionNotFoundException); + } + + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 007308ec7ab46..c460fee11d0e6 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -344,6 +344,22 @@ public TopicDoesNotExistException(String msg) { } } + /** + * Not found subscription that cannot be created. + */ + public static class SubscriptionNotFoundException extends PulsarClientException { + /** + * Constructs an {@code SubscriptionNotFoundException} with the specified detail message. + * + * @param msg + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public SubscriptionNotFoundException(String msg) { + super(msg); + } + } + /** * Lookup exception thrown by Pulsar client. */ @@ -1163,6 +1179,7 @@ public static boolean isRetriableError(Throwable t) { || t instanceof NotFoundException || t instanceof IncompatibleSchemaException || t instanceof TopicDoesNotExistException + || t instanceof SubscriptionNotFoundException || t instanceof UnsupportedAuthenticationException || t instanceof InvalidMessageException || t instanceof InvalidTopicNameException diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 75e84eeca3e6a..b3444ae393ef0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1336,6 +1336,8 @@ public static PulsarClientException getPulsarClientException(ServerError error, return new PulsarClientException.IncompatibleSchemaException(errorMsg); case TopicNotFound: return new PulsarClientException.TopicDoesNotExistException(errorMsg); + case SubscriptionNotFound: + return new PulsarClientException.SubscriptionNotFoundException(errorMsg); case ConsumerAssignError: return new PulsarClientException.ConsumerAssignException(errorMsg); case NotAllowedError: