[#11388] Add HbaseAsyncTemplate
emeroad committed Aug 23, 2024
1 parent fbe7e5a commit c7d1fb1
Showing 16 changed files with 516 additions and 1,145 deletions.
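
Reading aid: this commit moves the asynchronous HBase operations off the blocking HbaseOperations interface into a dedicated HbaseAsyncTemplate, and introduces a CheckAndMax value object for the compare-and-swap "max" update. The sketch below is inferred purely from the call sites in the hunks that follow; it is not the committed source, and the return types are guesses modeled on the removed HbaseOperations.asyncIncrement signatures:

    // Inferred sketch of the new async surface; not the real class.
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.client.Result;
    import com.navercorp.pinpoint.common.hbase.CasResult;
    import com.navercorp.pinpoint.common.hbase.CheckAndMax;

    public interface HbaseAsyncTemplateSketch {
        // used by DefaultBulkWriter.flushLink(); the actual return type is not visible in this diff
        List<CompletableFuture<Result>> increment(TableName tableName, List<Increment> increments);
        // used by DefaultBulkWriter.checkAndMax(); CasResult and CheckAndMax are added in this commit
        CompletableFuture<CasResult> maxColumnValue(TableName tableName, CheckAndMax checkAndMax);
    }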
@@ -1,12 +1,13 @@
package com.navercorp.pinpoint.collector.dao.hbase.statistics;

-import com.navercorp.pinpoint.collector.monitor.dao.hbase.BulkOperationReporter;
import com.navercorp.pinpoint.collector.dao.hbase.HbaseMapResponseTimeDao;
import com.navercorp.pinpoint.collector.dao.hbase.HbaseMapStatisticsCalleeDao;
import com.navercorp.pinpoint.collector.dao.hbase.HbaseMapStatisticsCallerDao;
+import com.navercorp.pinpoint.collector.monitor.dao.hbase.BulkOperationReporter;
import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
import com.navercorp.pinpoint.common.hbase.HbaseOperations;
import com.navercorp.pinpoint.common.hbase.TableNameProvider;
+import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTemplate;
import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
@@ -50,13 +51,14 @@ private BulkUpdater getBulkUpdater(String reporterName) {

private BulkWriter newBulkWriter(String loggerName,
HbaseOperations hbaseTemplate,
+HbaseAsyncTemplate asyncTemplate,
HbaseColumnFamily descriptor,
TableNameProvider tableNameProvider,
RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix,
BulkIncrementer bulkIncrementer,
BulkUpdater bulkUpdater) {
if (bulkConfiguration.enableBulk()) {
-return new DefaultBulkWriter(loggerName, hbaseTemplate, rowKeyDistributorByHashPrefix,
+return new DefaultBulkWriter(loggerName, asyncTemplate, rowKeyDistributorByHashPrefix,
bulkIncrementer, bulkUpdater, descriptor, tableNameProvider);
} else {
return new SyncWriter(loggerName, hbaseTemplate, rowKeyDistributorByHashPrefix, descriptor, tableNameProvider);
@@ -81,12 +83,13 @@ public BulkUpdater callerBulkUpdater() {

@Bean
public BulkWriter callerBulkWriter(HbaseOperations hbaseTemplate,
+HbaseAsyncTemplate asyncTemplate,
TableNameProvider tableNameProvider,
@Qualifier("statisticsCallerRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix,
@Qualifier("callerBulkIncrementer") BulkIncrementer bulkIncrementer,
@Qualifier("callerBulkUpdater") BulkUpdater bulkUpdater) {
String loggerName = newBulkWriterName(HbaseMapStatisticsCallerDao.class.getName());
-return newBulkWriter(loggerName, hbaseTemplate, HbaseColumnFamily.MAP_STATISTICS_CALLEE_VER2_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater);
+return newBulkWriter(loggerName, hbaseTemplate, asyncTemplate, HbaseColumnFamily.MAP_STATISTICS_CALLEE_VER2_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater);
}


@@ -108,12 +111,13 @@ public BulkUpdater calleeBulkUpdater() {

@Bean
public BulkWriter calleeBulkWriter(HbaseOperations hbaseTemplate,
+HbaseAsyncTemplate asyncTemplate,
TableNameProvider tableNameProvider,
@Qualifier("statisticsCalleeRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix,
@Qualifier("calleeBulkIncrementer") BulkIncrementer bulkIncrementer,
@Qualifier("calleeBulkUpdater") BulkUpdater bulkUpdater) {
String loggerName = newBulkWriterName(HbaseMapStatisticsCalleeDao.class.getName());
-return newBulkWriter(loggerName, hbaseTemplate, HbaseColumnFamily.MAP_STATISTICS_CALLER_VER2_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater);
+return newBulkWriter(loggerName, hbaseTemplate, asyncTemplate, HbaseColumnFamily.MAP_STATISTICS_CALLER_VER2_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater);
}

@Bean
@@ -133,12 +137,13 @@ public BulkUpdater selfBulkUpdater() {

@Bean
public BulkWriter selfBulkWriter(HbaseOperations hbaseTemplate,
+HbaseAsyncTemplate asyncTemplate,
TableNameProvider tableNameProvider,
@Qualifier("statisticsSelfRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix,
@Qualifier("selfBulkIncrementer") BulkIncrementer bulkIncrementer,
@Qualifier("selfBulkUpdater") BulkUpdater bulkUpdater) {
String loggerName = newBulkWriterName(HbaseMapResponseTimeDao.class.getName());
-return newBulkWriter(loggerName, hbaseTemplate, HbaseColumnFamily.MAP_STATISTICS_SELF_VER2_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater);
+return newBulkWriter(loggerName, hbaseTemplate, asyncTemplate, HbaseColumnFamily.MAP_STATISTICS_SELF_VER2_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater);
}

private String newBulkWriterName(String className) {
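Wiring note: each BulkWriter bean above now receives both templates, and newBulkWriter picks one based on the bulk flag, so only the bulk path goes async. Condensed from the hunk above with argument names shortened; not the verbatim source:

    BulkWriter writer = bulkConfiguration.enableBulk()
            ? new DefaultBulkWriter(loggerName, asyncTemplate, distributor, incrementer, updater, descriptor, tableNameProvider)
            : new SyncWriter(loggerName, hbaseTemplate, distributor, descriptor, tableNameProvider);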
@@ -1,9 +1,12 @@
package com.navercorp.pinpoint.collector.dao.hbase.statistics;

+import com.navercorp.pinpoint.common.hbase.CasResult;
+import com.navercorp.pinpoint.common.hbase.CheckAndMax;
import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
-import com.navercorp.pinpoint.common.hbase.HbaseOperations;
import com.navercorp.pinpoint.common.hbase.TableNameProvider;
+import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTemplate;
import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
+import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.logging.log4j.LogManager;
@@ -12,6 +15,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;

/**
* @author emeroad
@@ -20,7 +24,6 @@ public class DefaultBulkWriter implements BulkWriter {

private final Logger logger;

-private final HbaseOperations hbaseTemplate;
private final RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;

private final BulkIncrementer bulkIncrementer;
@@ -29,24 +32,29 @@ public class DefaultBulkWriter implements BulkWriter {

private final HbaseColumnFamily tableDescriptor;
private final TableNameProvider tableNameProvider;

+private final HbaseAsyncTemplate asyncTemplate;
+private int batchSize = 200;

public DefaultBulkWriter(String loggerName,
-HbaseOperations hbaseTemplate,
+HbaseAsyncTemplate asyncTemplate,
RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix,
BulkIncrementer bulkIncrementer,
BulkUpdater bulkUpdater,
HbaseColumnFamily tableDescriptor,
TableNameProvider tableNameProvider) {
this.logger = LogManager.getLogger(loggerName);
-this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate");
+this.asyncTemplate = Objects.requireNonNull(asyncTemplate, "asyncTemplate");
this.rowKeyDistributorByHashPrefix = Objects.requireNonNull(rowKeyDistributorByHashPrefix, "rowKeyDistributorByHashPrefix");
this.bulkIncrementer = Objects.requireNonNull(bulkIncrementer, "bulkIncrementer");
this.bulkUpdater = Objects.requireNonNull(bulkUpdater, "bulkUpdater");
this.tableDescriptor = Objects.requireNonNull(tableDescriptor, "tableDescriptor");
this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
}

+public void setBatchSize(int batchSize) {
+this.batchSize = batchSize;
+}

@Override
public void increment(RowKey rowKey, ColumnName columnName) {
TableName tableName = tableNameProvider.getTableName(tableDescriptor.getTable());
@@ -77,16 +85,18 @@ public void flushLink() {
if (logger.isDebugEnabled()) {
logger.debug("flush {} to [{}] Increment:{}", this.getClass().getSimpleName(), tableName, increments.size());
}
-hbaseTemplate.asyncIncrement(tableName, increments);
+List<List<Increment>> partition = ListUtils.partition(increments, batchSize);
+for (List<Increment> incrementList : partition) {
+asyncTemplate.increment(tableName, incrementList);
+}
}

}

-private void checkAndMax(TableName tableName, byte[] rowKey, byte[] columnName, long val) {
-Objects.requireNonNull(rowKey, "rowKey");
-Objects.requireNonNull(columnName, "columnName");
-
-hbaseTemplate.maxColumnValue(tableName, rowKey, getColumnFamilyName(), columnName, val);
+private CompletableFuture<CasResult> checkAndMax(TableName tableName, CheckAndMax checkAndMax) {
+Objects.requireNonNull(tableName, "tableName");
+Objects.requireNonNull(checkAndMax, "checkAndMax");
+return this.asyncTemplate.maxColumnValue(tableName, checkAndMax);
}

@Override
@@ -104,7 +114,9 @@ public void flushAvgMax() {
final RowInfo rowInfo = entry.getKey();
final Long val = entry.getValue();
final byte[] rowKey = getDistributedKey(rowInfo.getRowKey().getRowKey());
-checkAndMax(rowInfo.getTableName(), rowKey, rowInfo.getColumnName().getColumnName(), val);
+byte[] columnName = rowInfo.getColumnName().getColumnName();
+CheckAndMax checkAndMax = new CheckAndMax(rowKey, getColumnFamilyName(), columnName, val);
+checkAndMax(rowInfo.getTableName(), checkAndMax);
}
}

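Behavior change worth noting in flushLink(): instead of handing the whole accumulated list to a single asyncIncrement call, the writer now slices it into batches of batchSize (default 200, adjustable via setBatchSize) and submits each slice to the async template. ListUtils.partition from commons-collections4 produces consecutive sublists of the original list, so a hypothetical 450-element flush yields slices of 200, 200 and 50. A standalone demo of the batching arithmetic; the Integer list stands in for the flushed Increment batch:

    import org.apache.commons.collections4.ListUtils;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class PartitionDemo {
        public static void main(String[] args) {
            List<Integer> increments = IntStream.range(0, 450).boxed().collect(Collectors.toList());
            // with batchSize = 200: three slices of 200, 200 and 50 elements
            List<List<Integer>> batches = ListUtils.partition(increments, 200);
            batches.forEach(batch -> System.out.println(batch.size()));
        }
    }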
@@ -1,5 +1,6 @@
package com.navercorp.pinpoint.collector.dao.hbase.statistics;

+import com.navercorp.pinpoint.common.hbase.CheckAndMax;
import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
import com.navercorp.pinpoint.common.hbase.HbaseOperations;
import com.navercorp.pinpoint.common.hbase.TableNameProvider;
@@ -50,7 +51,7 @@ public void increment(RowKey rowKey, ColumnName columnName, long addition) {
TableName tableName = tableNameProvider.getTableName(this.tableDescriptor.getTable());
final byte[] rowKeyBytes = getDistributedKey(rowKey.getRowKey());
Increment increment = Increments.increment(rowKeyBytes, getColumnFamilyName(), columnName.getColumnName(), 1);
-this.hbaseTemplate.asyncIncrement(tableName, increment);
+this.hbaseTemplate.increment(tableName, increment);
}

@Override
@@ -61,7 +62,8 @@ public void updateMax(RowKey rowKey, ColumnName columnName, long value) {

TableName tableName = tableNameProvider.getTableName(this.tableDescriptor.getTable());
final byte[] rowKeyBytes = getDistributedKey(rowKey.getRowKey());
-this.hbaseTemplate.maxColumnValue(tableName, rowKeyBytes, getColumnFamilyName(), columnName.getColumnName(), value);
+CheckAndMax checkAndMax = new CheckAndMax(rowKeyBytes, getColumnFamilyName(), columnName.getColumnName(), value);
+this.hbaseTemplate.maxColumnValue(tableName, checkAndMax);
}

@Override
@@ -0,0 +1,54 @@
package com.navercorp.pinpoint.common.hbase;


import com.navercorp.pinpoint.common.hbase.util.CheckAndMutates;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Objects;

public final class CheckAndMax {
private final byte[] row;
private final byte[] family;
private final byte[] qualifier;
private final long value;

public CheckAndMax(byte[] row, byte[] family, byte[] qualifier, long value) {
this.row = Objects.requireNonNull(row, "row");
this.family = Objects.requireNonNull(family, "family");
this.qualifier = Objects.requireNonNull(qualifier, "qualifier");
this.value = value;
}

public byte[] row() {
return row;
}

public byte[] family() {
return family;
}

public byte[] qualifier() {
return qualifier;
}

public long value() {
return value;
}

public static CheckAndMutate initialMax(CheckAndMax max) {
Put put = new Put(max.row(), true);
put.addColumn(max.family(), max.qualifier(), Bytes.toBytes(max.value()));

return CheckAndMutate.newBuilder(max.row())
.ifNotExists(max.family(), max.qualifier())
.build(put);
}

public static CheckAndMutate casMax(CheckAndMutate mutate) {
Objects.requireNonNull(mutate, "mutate");
return CheckAndMutates.max(mutate.getRow(), mutate.getFamily(), mutate.getQualifier(), mutate.getValue(), (Put) mutate.getAction());
}

}
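
Usage of the new value object, as far as this diff shows it: CheckAndMax bundles the row, family and qualifier plus the proposed value; initialMax builds a CheckAndMutate that writes the value only if the cell does not exist yet, and casMax re-derives the compare-and-swap-max mutation via the CheckAndMutates.max helper (which exists in the project but is not part of this diff), presumably for the retry path once the cell exists. A hedged sketch of the if-absent seed step; the row key and column names here are made up for illustration:

    import org.apache.hadoop.hbase.client.CheckAndMutate;
    import org.apache.hadoop.hbase.util.Bytes;
    import com.navercorp.pinpoint.common.hbase.CheckAndMax;

    public class CheckAndMaxDemo {
        public static void main(String[] args) {
            byte[] row = Bytes.toBytes("agent:web-01"); // hypothetical row key
            CheckAndMax max = new CheckAndMax(row, Bytes.toBytes("S"), Bytes.toBytes("max"), 42L);
            // mutation that writes 42 only when the cell is still absent;
            // a table's checkAndMutate(...) would execute it
            CheckAndMutate seedIfAbsent = CheckAndMax.initialMax(max);
            System.out.println(seedIfAbsent.getRow().length + "-byte row, if-not-exists seed");
        }
    }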
@@ -15,15 +15,14 @@
*/
package com.navercorp.pinpoint.common.hbase;

-import com.navercorp.pinpoint.common.hbase.async.AsyncTableFactory;
import org.apache.hadoop.conf.Configuration;
import org.springframework.util.StringUtils;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
-* Base class for {@link HbaseTemplate} , defining commons properties such as {@link TableInterfaceFactory} and {@link Configuration}.
+* Base class for {@link HbaseTemplate} , defining commons properties such as {@link org.apache.hadoop.hbase.client.Connection} and {@link Configuration}.
*
* Not intended to be used directly.
*
@@ -36,8 +35,6 @@ public abstract class HbaseAccessor {

private TableFactory tableFactory;

-private AsyncTableFactory asyncTableFactory;
-
private Configuration configuration;

/**
@@ -79,12 +76,4 @@ public Charset getCharset() {
return (StringUtils.hasText(encoding) ? Charset.forName(encoding) : CHARSET);
}


-public AsyncTableFactory getAsyncTableFactory() {
-return asyncTableFactory;
-}
-
-public void setAsyncTableFactory(AsyncTableFactory asyncTableFactory) {
-this.asyncTableFactory = asyncTableFactory;
-}
}
@@ -16,22 +16,18 @@

package com.navercorp.pinpoint.common.hbase;

-import com.navercorp.pinpoint.common.hbase.async.AdvancedAsyncTableCallback;
-import com.navercorp.pinpoint.common.hbase.async.AsyncTableCallback;
import com.sematext.hbase.wd.AbstractRowKeyDistributor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;

import java.util.List;
-import java.util.concurrent.CompletableFuture;

/**
* @author emeroad
@@ -69,12 +65,9 @@ public interface HbaseOperations {
/**
*
* @param tableName target table
-* @param rowName to check
-* @param familyName column family to check
-* @param qualifier column qualifier to check
-* @param value if the value provided is greater than the saved, update the saved
+* @param checkAndMax checkAndMax
*/
-void maxColumnValue(TableName tableName, final byte[] rowName, final byte[] familyName, final byte[] qualifier, final long value);
+CasResult maxColumnValue(TableName tableName, final CheckAndMax checkAndMax);

void delete(TableName tableName, final Delete delete);
void delete(TableName tableName, final List<Delete> deletes);
@@ -110,14 +103,6 @@ public interface HbaseOperations {
*/
List<Result> increment(TableName tableName, final List<Increment> incrementList);

-long incrementColumnValue(TableName tableName, final byte[] rowName, final byte[] familyName, final byte[] qualifier, final long amount);
-long incrementColumnValue(TableName tableName, final byte[] rowName, final byte[] familyName, final byte[] qualifier, final long amount, final Durability durability);
-
-CompletableFuture<Result> asyncIncrement(final TableName tableName, final Increment incrementList);
-List<CompletableFuture<Result>> asyncIncrement(final TableName tableName, final List<Increment> incrementList);
-CompletableFuture<Long> asyncIncrement(TableName tableName, byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability);
-
-
/**
* Executes the given action against the specified table handling resource management.
* <p>
@@ -158,7 +143,4 @@ public interface HbaseOperations {
<T> List<T> find(TableName tableName, final Scan scan, final RowMapper<T> action);


-<T> T asyncExecute(TableName tableName, AdvancedAsyncTableCallback<T> action);
-
-<T> T asyncExecute(TableName tableName, AsyncTableCallback<T> action);
}
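
Net effect on HbaseOperations: the interface is now purely synchronous. The incrementColumnValue overloads, the three asyncIncrement overloads, both asyncExecute variants, and the byte[]-based maxColumnValue are gone; async callers go through HbaseAsyncTemplate instead, and the blocking maxColumnValue takes a CheckAndMax and reports the CAS outcome as a CasResult. A migration sketch for a typical call site; the asyncTemplate method names are taken from the DefaultBulkWriter hunk above, but this class is illustrative, not project code:

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Increment;
    import com.navercorp.pinpoint.common.hbase.CasResult;
    import com.navercorp.pinpoint.common.hbase.CheckAndMax;
    import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTemplate;

    public class MigrationSketch {
        void migrate(HbaseAsyncTemplate asyncTemplate, TableName tableName,
                     List<Increment> increments, byte[] row, byte[] family, byte[] qualifier, long value) {
            // before (removed from HbaseOperations in this commit):
            //   hbaseTemplate.asyncIncrement(tableName, increments);
            //   hbaseTemplate.maxColumnValue(tableName, row, family, qualifier, value);
            asyncTemplate.increment(tableName, increments);
            CompletableFuture<CasResult> cas =
                    asyncTemplate.maxColumnValue(tableName, new CheckAndMax(row, family, qualifier, value));
            cas.thenAccept(result -> { /* inspect the CAS outcome if needed */ });
        }
    }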