diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PulsarClientImplMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PulsarClientImplMultiBrokersTest.java new file mode 100644 index 0000000000000..29604d0440b05 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PulsarClientImplMultiBrokersTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.testng.Assert.fail; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.MultiBrokerBaseTest; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.annotations.Test; + +/** + * Test multi-broker admin api. + */ +@Slf4j +@Test(groups = "broker-admin") +public class PulsarClientImplMultiBrokersTest extends MultiBrokerBaseTest { + @Override + protected int numberOfAdditionalBrokers() { + return 3; + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + this.conf.setManagedLedgerMaxEntriesPerLedger(10); + } + + @Override + protected void onCleanup() { + super.onCleanup(); + } + + @Test(timeOut = 30 * 1000) + public void testReleaseUrlLookupServices() throws Exception { + PulsarClientImpl pulsarClient = (PulsarClientImpl) additionalBrokerClients.get(0); + Map urlLookupMap = WhiteboxImpl.getInternalState(pulsarClient, "urlLookupMap"); + assertEquals(urlLookupMap.size(), 0); + for (PulsarService pulsar : additionalBrokers) { + pulsarClient.getLookup(pulsar.getBrokerServiceUrl()); + pulsarClient.getLookup(pulsar.getWebServiceAddress()); + } + assertEquals(urlLookupMap.size(), additionalBrokers.size() * 2); + // Verify: lookup services will be release. + pulsarClient.close(); + assertEquals(urlLookupMap.size(), 0); + try { + for (PulsarService pulsar : additionalBrokers) { + pulsarClient.getLookup(pulsar.getBrokerServiceUrl()); + pulsarClient.getLookup(pulsar.getWebServiceAddress()); + } + fail("Expected a error when calling pulsarClient.getLookup if getLookup was closed"); + } catch (IllegalStateException illegalArgumentException) { + assertTrue(illegalArgumentException.getMessage().contains("has been closed")); + } + assertEquals(urlLookupMap.size(), 0); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index e8107efe98ec0..f4afb2931cc9e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -33,6 +33,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -744,6 +745,21 @@ public void close() throws PulsarClientException { } } + private void closeUrlLookupMap() { + Map closedUrlLookupServices = new HashMap(urlLookupMap.size()); + urlLookupMap.entrySet().forEach(e -> { + try { + e.getValue().close(); + } catch (Exception ex) { + log.error("Error closing lookup service {}", e.getKey(), ex); + } + closedUrlLookupServices.put(e.getKey(), e.getValue()); + }); + closedUrlLookupServices.entrySet().forEach(e -> { + urlLookupMap.remove(e.getKey(), e.getValue()); + }); + } + @Override public CompletableFuture closeAsync() { log.info("Client closing. URL: {}", lookup.getServiceUrl()); @@ -754,6 +770,8 @@ public CompletableFuture closeAsync() { final CompletableFuture closeFuture = new CompletableFuture<>(); List> futures = new ArrayList<>(); + closeUrlLookupMap(); + producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> { if (t != null) { log.error("Error closing producer {}", p, t); @@ -982,6 +1000,10 @@ public CompletableFuture getConnection(final String topic, final Stri public LookupService getLookup(String serviceUrl) { return urlLookupMap.computeIfAbsent(serviceUrl, url -> { + if (isClosed()) { + throw new IllegalStateException("Pulsar client has been closed, can not build LookupService when" + + " calling get lookup with an url"); + } try { return createLookup(serviceUrl); } catch (PulsarClientException e) {