Skip to content

Commit

Permalink
[pinpoint-apm#11388] Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Aug 26, 2024
1 parent 1925030 commit 230680d
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.navercorp.pinpoint.common.hbase;

import com.navercorp.pinpoint.common.hbase.async.AsyncTableCallback;
import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTemplate;
import com.navercorp.pinpoint.common.hbase.parallel.ParallelResultScanner;
import com.navercorp.pinpoint.common.hbase.parallel.ScanTaskException;
Expand All @@ -36,7 +35,6 @@
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
Expand All @@ -46,7 +44,6 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -91,7 +88,6 @@ public class HbaseTemplate extends HbaseAccessor implements HbaseOperations, Ini
private boolean enableParallelScan = false;
private int maxThreads = DEFAULT_MAX_THREADS_FOR_PARALLEL_SCANNER;
private int maxThreadsPerParallelScan = DEFAULT_MAX_THREADS_PER_PARALLEL_SCAN;
private int maxConcurrentAsyncScanner = 256;


private boolean nativeAsync = false;
Expand All @@ -100,7 +96,7 @@ public class HbaseTemplate extends HbaseAccessor implements HbaseOperations, Ini

private ScanMetricReporter scanMetric = new EmptyScanMetricReporter();

private ResultScannerFactory scannerFactory;
private ResultScannerFactory resultScannerFactory;

private HbaseAsyncTemplate asyncTemplate;

Expand All @@ -123,8 +119,8 @@ public void setMaxThreadsPerParallelScan(int maxThreadsPerParallelScan) {
this.maxThreadsPerParallelScan = maxThreadsPerParallelScan;
}

public void setMaxConcurrentAsyncScanner(int maxConcurrentAsyncScanner) {
this.maxConcurrentAsyncScanner = maxConcurrentAsyncScanner;
public void setResultScannerFactory(ResultScannerFactory resultScannerFactory) {
this.resultScannerFactory = resultScannerFactory;
}

public void setNativeAsync(boolean nativeAsync) {
Expand All @@ -150,9 +146,9 @@ public void afterPropertiesSet() {
Objects.requireNonNull(configuration, "configuration is required");
Objects.requireNonNull(getTableFactory(), "tableFactory is required");
Objects.requireNonNull(asyncTemplate, "asyncTemplate is required");
Objects.requireNonNull(resultScannerFactory, "resultScannerFactory");

this.executor = newExecutor();
this.scannerFactory = new ResultScannerFactory(maxConcurrentAsyncScanner);
}

private ExecutorService newExecutor() {
Expand Down Expand Up @@ -349,7 +345,7 @@ public <T> List<T> find(TableName tableName, final List<Scan> scanList, final Re
public List<T> doInTable(Table table) throws Throwable {
Scan[] copy = scanList.toArray(new Scan[0]);
final ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "find", copy);
Scanner<T> scanner = scannerFactory.newScanner(table, copy);
Scanner<T> scanner = resultScannerFactory.newScanner(table, copy);

List<T> result = scanner.extractData(action);

Expand All @@ -365,39 +361,19 @@ public <T> List<List<T>> find(TableName tableName, List<Scan> scanList, RowMappe
}

public <T> List<T> findParallel(final TableName tableName, final List<Scan> scans, final ResultsExtractor<T> action) {
if (nativeAsync) {
return findParallel_async(tableName, scans, action);
}
return findParallel_block(tableName, scans, action);
}

public <T> List<T> findParallel_async(final TableName tableName, final List<Scan> scans, final ResultsExtractor<T> action) {
assertAccessAvailable();
if (isSimpleScan(scans.size())) {
return find(tableName, scans, action);
}
return asyncTemplate.execute(tableName, new AsyncTableCallback<>() {
@Override
public List<T> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
final Scan[] copy = scans.toArray(new Scan[0]);

final ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "async-multi", copy);

Scanner<T> scanner = scannerFactory.newScanner(table, copy);
List<T> results = scanner.extractData(action);
reporter.report(scanner::getScanMetrics);
return results;
}
});
if (nativeAsync) {
return asyncTemplate.findParallel(tableName, scans, action);
}
return findParallel0(tableName, scans, action);
}


public <T> List<T> findParallel_block(final TableName tableName, final List<Scan> scans, final ResultsExtractor<T> action) {
assertAccessAvailable();
public <T> List<T> findParallel0(final TableName tableName, final List<Scan> scans, final ResultsExtractor<T> action) {
final Scan[] copy = scans.toArray(new Scan[0]);
if (isSimpleScan(copy.length)) {
return find(tableName, scans, action);
}

final ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "block-multi", copy);
final Collection<ScanMetrics> scanMetricsList = new ArrayBlockingQueue<>(copy.length);
Expand Down Expand Up @@ -484,14 +460,14 @@ public <T> T find(TableName tableName, final Scan scan, final AbstractRowKeyDist
}

protected final <T> T executeDistributedScan(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor<T> action) {
assertAccessAvailable();
if (nativeAsync) {
return executeDistributedScan_async(tableName, scan, rowKeyDistributor, action);
return this.asyncTemplate.executeDistributedScan(tableName, scan, rowKeyDistributor, action);
}
return executeDistributedScan_block(tableName, scan, rowKeyDistributor, action);
return executeDistributedScan0(tableName, scan, rowKeyDistributor, action);
}

protected final <T> T executeDistributedScan_block(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor<T> action) {
assertAccessAvailable();
protected final <T> T executeDistributedScan0(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor<T> action) {
return execute(tableName, new TableCallback<>() {
@Override
public T doInTable(Table table) throws Throwable {
Expand All @@ -517,35 +493,6 @@ public T doInTable(Table table) throws Throwable {
});
}

protected final <T> T executeDistributedScan_async(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor<T> action) {
assertAccessAvailable();

final T result = asyncTemplate.execute(tableName, new AsyncTableCallback<>() {
@Override
public T doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
final StopWatch watch = StopWatch.createStarted();
final boolean debugEnabled = logger.isDebugEnabled();

Scan[] scans = ScanUtils.splitScans(scan, rowKeyDistributor);
final ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "async-multi", scans);
final ResultScanner[] splitScanners = ScanUtils.newScanners(table, scans);
try (ResultScanner scanner = new DistributedScanner(rowKeyDistributor, splitScanners)) {
if (debugEnabled) {
logger.debug("DistributedScanner createTime: {}ms", watch.stop());
watch.start();
}
return action.extractData(scanner);
} finally {
if (debugEnabled) {
logger.debug("DistributedScanner scanTime: {}ms", watch.stop());
}
reporter.report(splitScanners);
}
}
});
return result;
}


@Override
public <T> List<T> findParallel(TableName tableName, Scan scan, AbstractRowKeyDistributor rowKeyDistributor, RowMapper<T> action, int numParallelThreads) {
Expand Down Expand Up @@ -599,45 +546,15 @@ public <T> T findParallel(TableName tableName, Scan scan, AbstractRowKeyDistribu
}

protected final <T> T executeParallelDistributedScan(TableName tableName, Scan scan, AbstractRowKeyDistributor rowKeyDistributor, ResultsExtractor<T> action, int numParallelThreads) {
if (nativeAsync) {
return executeParallelDistributedScan_async(tableName, scan, rowKeyDistributor, action, numParallelThreads);
}
return executeParallelDistributedScan_block(tableName, scan, rowKeyDistributor, action, numParallelThreads);
}

protected final <T> T executeParallelDistributedScan_async(TableName tableName, Scan scan, AbstractRowKeyDistributor rowKeyDistributor, ResultsExtractor<T> action, int numParallelThreads) {
assertAccessAvailable();
try {
StopWatch watch = StopWatch.createStarted();

final Scan[] scans = ScanUtils.splitScans(scan, rowKeyDistributor);
T result = asyncTemplate.execute(tableName, new AsyncTableCallback<T>() {
@Override
public T doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "async-multi", scans);
ResultScanner[] resultScanners = ScanUtils.newScanners(table, scans);

ResultScanner scanner = new DistributedScanner(rowKeyDistributor, resultScanners);
try (scanner) {
return action.extractData(scanner);
} finally {
reporter.report(resultScanners);
}
}
});

if (logger.isDebugEnabled()) {
logger.debug("executeParallelDistributedScan createTime: {}ms", watch.stop());
}
return result;
} catch (Exception e) {
throw new HbaseSystemException(e);
if (nativeAsync) {
return asyncTemplate.executeParallelDistributedScan(tableName, scan, rowKeyDistributor, action, numParallelThreads);
}
return executeParallelDistributedScan0(tableName, scan, rowKeyDistributor, action, numParallelThreads);
}


protected final <T> T executeParallelDistributedScan_block(TableName tableName, Scan scan, AbstractRowKeyDistributor rowKeyDistributor, ResultsExtractor<T> action, int numParallelThreads) {
assertAccessAvailable();
protected <T> T executeParallelDistributedScan0(TableName tableName, Scan scan, AbstractRowKeyDistributor rowKeyDistributor, ResultsExtractor<T> action, int numParallelThreads) {
try {
StopWatch watch = StopWatch.createStarted();
final boolean debugEnabled = logger.isDebugEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
Expand All @@ -21,28 +22,39 @@ public interface AsyncHbaseOperations {

<T> T execute(TableName tableName, AsyncTableCallback<T> action);


CompletableFuture<Void> put(TableName tableName, Put put);

List<CompletableFuture<Void>> put(TableName tableName, List<Put> puts);


<T> CompletableFuture<T> get(TableName tableName, Get get, RowMapper<T> mapper);

<T> List<CompletableFuture<T>> get(TableName tableName, final List<Get> gets, final RowMapper<T> mapper);


CompletableFuture<Void> delete(TableName tableName, final Delete delete);


default CompletableFuture<Long> increment(TableName tableName, byte[] row, byte[] family, byte[] qualifier, long amount) {
return increment(tableName, row, family, qualifier, amount, Durability.SKIP_WAL);
}

default CompletableFuture<Long> increment(TableName tableName, byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) {
return increment(tableName, new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)).thenApply((r) -> {
Increment increment = new Increment(row).
addColumn(family, qualifier, amount).
setDurability(durability);

return increment(tableName, increment).thenApply((r) -> {
return Bytes.toLong(r.getValue(family, qualifier));
});
}

List<CompletableFuture<Result>> increment(TableName tableName, List<Increment> incrementList);

CompletableFuture<Result> increment(TableName tableName, Increment increment);

List<CompletableFuture<Result>> increment(TableName tableName, List<Increment> incrementList);


CompletableFuture<CasResult> maxColumnValue(TableName tableName, CheckAndMax max);

Expand Down

This file was deleted.

Loading

0 comments on commit 230680d

Please sign in to comment.