Skip to content

Commit

Permalink
[pinpoint-apm#9922] Enhanced resource reclamation in hbase2 client
Browse files Browse the repository at this point in the history
  • Loading branch information
smilu97 committed May 11, 2023
1 parent 89fe796 commit 5317b96
Show file tree
Hide file tree
Showing 12 changed files with 301 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class ConnectionFactoryBean implements FactoryBean<Connection>, Initializ
@Autowired(required = false)
private HbaseSecurityInterceptor hbaseSecurityInterceptor = new EmptyHbaseSecurityInterceptor();

@Autowired(required = false)
private HadoopResourceCleanerRegistry cleaner;

private final boolean warmUp;
private final HbaseTable[] warmUpExclusive = {HbaseTable.AGENT_URI_STAT};
private final Configuration configuration;
Expand All @@ -71,6 +74,10 @@ public ConnectionFactoryBean(Configuration configuration, ExecutorService execut

@Override
public void afterPropertiesSet() throws Exception {
if (this.cleaner != null) {
this.cleaner.register(configuration);
}

hbaseSecurityInterceptor.process(configuration);
try {
if (executorService == null) {
Expand Down Expand Up @@ -130,5 +137,9 @@ public void destroy() throws Exception {
logger.warn("Hbase Connection.close() error: " + e.getMessage(), e);
}
}

if (this.cleaner != null) {
this.cleaner.clean();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,21 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTableMultiplexer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;

import java.lang.reflect.Field;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

/**
* @author Taejin Koo
*/
public class HBaseAsyncOperationFactory implements FactoryBean<HBaseAsyncOperation> {
public class HBaseAsyncOperationFactory implements DisposableBean, FactoryBean<HBaseAsyncOperation> {

private static final Logger logger = LogManager.getLogger(HBaseAsyncOperationFactory.class);

public static final String ENABLE_ASYNC_METHOD = "hbase.client.async.enable";
public static final boolean DEFAULT_ENABLE_ASYNC_METHOD = false;
Expand All @@ -43,6 +50,8 @@ public class HBaseAsyncOperationFactory implements FactoryBean<HBaseAsyncOperati

private final Connection connection;
private final Configuration configuration;
private volatile HTableMultiplexer hTableMultiplexer;


public HBaseAsyncOperationFactory(Connection connection, Configuration configuration) {
this.connection = Objects.requireNonNull(connection, "connection");
Expand All @@ -56,19 +65,71 @@ public HBaseAsyncOperation getObject() throws Exception {
return DisabledHBaseAsyncOperation.INSTANCE;
}

int queueSize = configuration.getInt(ASYNC_IN_QUEUE_SIZE, DEFAULT_ASYNC_IN_QUEUE_SIZE);
return new TableMultiplexerAsyncOperation(this.getHTableMultiplexer());
}

private HTableMultiplexer getHTableMultiplexer() {
synchronized (HBaseAsyncOperationFactory.class) {
if (this.hTableMultiplexer != null) {
return this.hTableMultiplexer;
}

if (configuration.get(ASYNC_PERIODIC_FLUSH_TIME, null) == null) {
configuration.setInt(ASYNC_PERIODIC_FLUSH_TIME, DEFAULT_ASYNC_PERIODIC_FLUSH_TIME);
}

if (configuration.get(ASYNC_PERIODIC_FLUSH_TIME, null) == null) {
configuration.setInt(ASYNC_PERIODIC_FLUSH_TIME, DEFAULT_ASYNC_PERIODIC_FLUSH_TIME);
if (configuration.get(ASYNC_MAX_RETRIES_IN_QUEUE, null) == null) {
configuration.setInt(ASYNC_MAX_RETRIES_IN_QUEUE, DEFAULT_ASYNC_RETRY_COUNT);
}

int queueSize = configuration.getInt(ASYNC_IN_QUEUE_SIZE, DEFAULT_ASYNC_IN_QUEUE_SIZE);

this.hTableMultiplexer = new HTableMultiplexer(connection, configuration, queueSize);
}

if (configuration.get(ASYNC_MAX_RETRIES_IN_QUEUE, null) == null) {
configuration.setInt(ASYNC_MAX_RETRIES_IN_QUEUE, DEFAULT_ASYNC_RETRY_COUNT);
return this.hTableMultiplexer;
}

    @Override
    public void destroy() throws Exception {
        // Bean shutdown: close the multiplexer first, then stop its internal
        // flush-worker executor (reached via reflection in the helper).
        closeHTableMultiplexer();
        closeHTableFlushWorkers();
    }

private void closeHTableFlushWorkers() {
if (this.hTableMultiplexer == null) {
return;
}

return new TableMultiplexerAsyncOperation(connection, configuration, queueSize);
try {
logger.info("Closing hTableFlushWorkers");
final Field executorField = HTableMultiplexer.class.getDeclaredField("executor");
executorField.setAccessible(true);
final Object executorObj = executorField.get(this.hTableMultiplexer);
if (executorObj instanceof ExecutorService) {
((ExecutorService) executorObj).shutdown();
} else {
throw new RuntimeException("Invalid executorService");
}
logger.info("Closed hTableFlushWorkers");
} catch (Exception e) {
logger.warn("Failed to close hTableFlushWorkers", e);
}
}

private void closeHTableMultiplexer() {
if (this.hTableMultiplexer == null) {
return;
}

try {
logger.info("Closing hTableMultiplexer");
this.hTableMultiplexer.close();
logger.info("Closed hTableMultiplexer");
} catch (Exception e) {
logger.warn("Failed to close hTableMultiplexer", e);
}
}

@Override
public Class<HBaseAsyncOperation> getObjectType() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.common.hbase;

import org.apache.hadoop.conf.Configuration;

/**
* @author youngjin.kim2
*/
public interface HadoopResourceCleanerRegistry {

    /**
     * Registers the given Hadoop {@link Configuration} so that resources
     * associated with it can be released later via {@link #clean()}.
     *
     * @param configuration the Hadoop configuration to track
     */
    void register(Configuration configuration);

    /**
     * Releases the resources of every configuration previously passed to
     * {@link #register(Configuration)}.
     */
    void clean();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
package com.navercorp.pinpoint.common.hbase;

import com.navercorp.pinpoint.common.util.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTableMultiplexer;
import org.apache.hadoop.hbase.client.Put;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;

/**
Expand All @@ -37,8 +36,8 @@ public class TableMultiplexerAsyncOperation implements HBaseAsyncOperation {
private final LongAdder opsCount = new LongAdder();
private final LongAdder opsRejectCount = new LongAdder();

public TableMultiplexerAsyncOperation(Connection connection, Configuration conf, int perRegionServerBufferQueueSize) {
this.hTableMultiplexer = new HTableMultiplexer(connection, conf, perRegionServerBufferQueueSize);
public TableMultiplexerAsyncOperation(HTableMultiplexer hTableMultiplexer) {
this.hTableMultiplexer = Objects.requireNonNull(hTableMultiplexer, "hTableMultiplexer");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,17 @@ public interface MappingFunction<T> {
}

private <T extends JoinStatBo> void join(Function<JoinApplicationStatBo, List<T>> fieldSupplier,
MappingFunction<T> joinStatBoTrasform) {
MappingFunction<T> joinStatBoTransform) {
Map<Long, List<T>> joinMap = join(fieldSupplier);
map(joinMap, joinStatBoTrasform);
map(joinMap, joinStatBoTransform);
}

private <T extends JoinStatBo> void map(Map<Long, List<T>> joinMap, MappingFunction<T> joinStatBoTrasform) {
private <T extends JoinStatBo> void map(Map<Long, List<T>> joinMap, MappingFunction<T> joinStatBoTransform) {
for (Map.Entry<Long, List<T>> entry : joinMap.entrySet()) {
final Long key = entry.getKey();
final List<T> statData = entry.getValue();

joinStatBoTrasform.apply(builder, statData, key);
joinStatBoTransform.apply(builder, statData, key);
}
}

Expand Down
20 changes: 20 additions & 0 deletions flink/src/main/java/com/navercorp/pinpoint/flink/Bootstrap.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.ArrayList;
Expand Down Expand Up @@ -111,6 +112,25 @@ public static Bootstrap getInstance(Map<String, String> jobParameters) {
return instance;
}

public static void close() {
synchronized(Bootstrap.class) {
if (instance == null) {
logger.warn("Invalid attempt of closing bootstrap: bootstrap is not initialized yet");
return;
}
logger.info("Closing bootstrap: {}", instance);
final ApplicationContext applicationContext = instance.getApplicationContext();
if (applicationContext != null) {
logger.info("Closing applicationContext: {}", applicationContext);
((ConfigurableApplicationContext) applicationContext).close();
} else {
logger.warn("ApplicationContext was not found while closing bootstrap: {}", instance);
}
logger.info("Closed bootstrap: {}", instance);
instance = null;
}
}

public ApplicationContext getApplicationContext() {
return applicationContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.net.InetAddress;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -97,9 +97,12 @@ private String getRepresentationLocalV4Ip() {
}

// local ip addresses with all LOOPBACK addresses removed
List<String> ipList = NetUtils.getLocalV4IpList();
if (!ipList.isEmpty()) {
return ipList.get(0);
for (final String candidate: NetUtils.getLocalV4IpList()) {
try {
if (InetAddress.getByName(candidate).isReachable(5)) {
return candidate;
}
} catch (Exception ignored) {}
}

return NetUtils.LOOPBACK_ADDRESS_V4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.apache.logging.log4j.Logger;

/**
* @author minwoo.jung
Expand Down Expand Up @@ -52,10 +50,6 @@ public void run(SourceContext<RawData> ctx) throws Exception {
@Override
public void cancel() {
logger.info("cancel TcpSourceFunction.");

ApplicationContext applicationContext = Bootstrap.getInstance(globalJobParameters.toMap()).getApplicationContext();
if (applicationContext != null) {
((ConfigurableApplicationContext) applicationContext).close();
}
Bootstrap.close();
}
}
2 changes: 1 addition & 1 deletion hbase2-module/hbase2-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<artifactId>hbase-shaded-client</artifactId>
</dependency>
</dependencies>

Expand Down
Loading

0 comments on commit 5317b96

Please sign in to comment.