From 03c0975345aed55e344ca422a9f3e7ce1491ec99 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Jun 2024 16:09:38 +0800 Subject: [PATCH] [improve] [client] PIP-344 support feature flag supportsGetPartitionedMetadataWithoutAutoCreation (#22773) (cherry picked from commit 6236116754472c61b2166da6d4797fc63c83f364) --- .../pulsar/client/impl/ClientCnxTest.java | 44 +++++++++++++++++++ .../client/impl/BinaryProtoLookupService.java | 6 +++ .../apache/pulsar/client/impl/ClientCnx.java | 5 +++ .../pulsar/common/protocol/Commands.java | 1 + 4 files changed, 56 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index d2f610ae53f65..1a9b4bbcb0d21 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -20,13 +20,17 @@ import com.google.common.collect.Sets; import io.netty.channel.ChannelHandlerContext; +import java.lang.reflect.Field; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -123,4 +127,44 @@ public void testClientVersion() throws Exception { producer.close(); consumer.close(); } + + @Test + public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Exception { + final String topic = BrokerTestUtil.newUniqueName( "persistent://" + NAMESPACE + "/tp"); + admin.topics().createNonPartitionedTopic(topic); + PulsarClientImpl clientWitBinaryLookup = (PulsarClientImpl) PulsarClient.builder() + .maxNumberOfRejectedRequestPerConnection(1) + .connectionMaxIdleSeconds(Integer.MAX_VALUE) + .serviceUrl(pulsar.getBrokerServiceUrl()) + .build(); + ProducerImpl producer = (ProducerImpl) clientWitBinaryLookup.newProducer().topic(topic).create(); + + // Verify: the variable "isSupportsGetPartitionedMetadataWithoutAutoCreation" responded from the broker is true. + Awaitility.await().untilAsserted(() -> { + ClientCnx clientCnx = producer.getClientCnx(); + Assert.assertNotNull(clientCnx); + Assert.assertTrue(clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()); + }); + Assert.assertEquals( + clientWitBinaryLookup.getPartitionsForTopic(topic, true).get().size(), 1); + + // Inject a "false" value for the variable "isSupportsGetPartitionedMetadataWithoutAutoCreation". + // Verify: client will get a not support error. + Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation"); + field.setAccessible(true); + for (CompletableFuture clientCnxFuture : clientWitBinaryLookup.getCnxPool().getConnections()) { + field.set(clientCnxFuture.get(), false); + } + try { + clientWitBinaryLookup.getPartitionsForTopic(topic, false).join(); + Assert.fail("Expected an error that the broker version is too old."); + } catch (Exception ex) { + Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported from the broker")); + } + + // cleanup. + producer.close(); + clientWitBinaryLookup.close(); + admin.topics().delete(topic, false); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index d6f4dd2dcac14..980e8a0c78627 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -229,6 +229,12 @@ private CompletableFuture getPartitionedTopicMetadata( CompletableFuture partitionFuture = new CompletableFuture<>(); client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) { + partitionFuture.completeExceptionally(new PulsarClientException.NotSupportedException("The feature of" + + " getting partitions without auto-creation is not supported from the broker," + + " please upgrade the broker to the latest version.")); + return; + } long requestId = client.newRequestId(); ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId, metadataAutoCreationEnabled); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 8694dad7a2d84..b8caa0f438361 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -186,6 +186,8 @@ public class ClientCnx extends PulsarHandler { protected AuthenticationDataProvider authenticationDataProvider; private TransactionBufferHandler transactionBufferHandler; private boolean supportsTopicWatchers; + @Getter + private boolean supportsGetPartitionedMetadataWithoutAutoCreation; /** Idle stat. **/ @Getter @@ -382,6 +384,9 @@ protected void handleConnected(CommandConnected connected) { supportsTopicWatchers = connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsTopicWatchers(); + supportsGetPartitionedMetadataWithoutAutoCreation = + connected.hasFeatureFlags() + && connected.getFeatureFlags().isSupportsGetPartitionedMetadataWithoutAutoCreation(); // set remote protocol version to the correct version before we complete the connection future setRemoteEndpointProtocolVersion(connected.getProtocolVersion()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 01a1bd69e07b8..c352da0c871ed 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -299,6 +299,7 @@ public static BaseCommand newConnectedCommand(int clientProtocolVersion, int max connected.setProtocolVersion(versionToAdvertise); connected.setFeatureFlags().setSupportsTopicWatchers(supportsTopicWatchers); + connected.setFeatureFlags().setSupportsGetPartitionedMetadataWithoutAutoCreation(true); return cmd; }