Skip to content

Commit

Permalink
[fix] [client] Fix resource leak in Pulsar Client since HttpLookupSer…
Browse files Browse the repository at this point in the history
…vice doesn't get closed (apache#22858)

(cherry picked from commit bc3dc77)
(cherry picked from commit e9264a9)
  • Loading branch information
poorbarcode authored and nikhil-ctds committed Jun 24, 2024
1 parent ab293a2 commit 0d08b5b
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;

import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.annotations.Test;

/**
* Test multi-broker admin api.
*/
@Slf4j
@Test(groups = "broker-admin")
public class PulsarClientImplMultiBrokersTest extends MultiBrokerBaseTest {
@Override
protected int numberOfAdditionalBrokers() {
return 3;
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setManagedLedgerMaxEntriesPerLedger(10);
}

@Override
protected void onCleanup() {
super.onCleanup();
}

@Test(timeOut = 30 * 1000)
public void testReleaseUrlLookupServices() throws Exception {
PulsarClientImpl pulsarClient = (PulsarClientImpl) additionalBrokerClients.get(0);
Map<String, LookupService> urlLookupMap = WhiteboxImpl.getInternalState(pulsarClient, "urlLookupMap");
assertEquals(urlLookupMap.size(), 0);
for (PulsarService pulsar : additionalBrokers) {
pulsarClient.getLookup(pulsar.getBrokerServiceUrl());
pulsarClient.getLookup(pulsar.getWebServiceAddress());
}
assertEquals(urlLookupMap.size(), additionalBrokers.size() * 2);
// Verify: lookup services will be release.
pulsarClient.close();
assertEquals(urlLookupMap.size(), 0);
try {
for (PulsarService pulsar : additionalBrokers) {
pulsarClient.getLookup(pulsar.getBrokerServiceUrl());
pulsarClient.getLookup(pulsar.getWebServiceAddress());
}
fail("Expected a error when calling pulsarClient.getLookup if getLookup was closed");
} catch (IllegalStateException illegalArgumentException) {
assertTrue(illegalArgumentException.getMessage().contains("has been closed"));
}
assertEquals(urlLookupMap.size(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -735,6 +736,21 @@ public void close() throws PulsarClientException {
}
}

private void closeUrlLookupMap() {
Map<String, LookupService> closedUrlLookupServices = new HashMap(urlLookupMap.size());
urlLookupMap.entrySet().forEach(e -> {
try {
e.getValue().close();
} catch (Exception ex) {
log.error("Error closing lookup service {}", e.getKey(), ex);
}
closedUrlLookupServices.put(e.getKey(), e.getValue());
});
closedUrlLookupServices.entrySet().forEach(e -> {
urlLookupMap.remove(e.getKey(), e.getValue());
});
}

@Override
public CompletableFuture<Void> closeAsync() {
log.info("Client closing. URL: {}", lookup.getServiceUrl());
Expand All @@ -745,6 +761,8 @@ public CompletableFuture<Void> closeAsync() {
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futures = new ArrayList<>();

closeUrlLookupMap();

producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
if (t != null) {
log.error("Error closing producer {}", p, t);
Expand Down Expand Up @@ -972,6 +990,10 @@ public CompletableFuture<ClientCnx> getConnection(final String topic, final Stri

public LookupService getLookup(String serviceUrl) {
return urlLookupMap.computeIfAbsent(serviceUrl, url -> {
if (isClosed()) {
throw new IllegalStateException("Pulsar client has been closed, can not build LookupService when"
+ " calling get lookup with an url");
}
try {
return createLookup(serviceUrl);
} catch (PulsarClientException e) {
Expand Down

0 comments on commit 0d08b5b

Please sign in to comment.