diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTemplate.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTemplate.java index 2ffe483e2359..675845ba3e56 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTemplate.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTemplate.java @@ -16,7 +16,6 @@ package com.navercorp.pinpoint.common.hbase; -import com.navercorp.pinpoint.common.hbase.async.AsyncTableCallback; import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTemplate; import com.navercorp.pinpoint.common.hbase.parallel.ParallelResultScanner; import com.navercorp.pinpoint.common.hbase.parallel.ScanTaskException; @@ -36,7 +35,6 @@ import org.apache.commons.collections4.ListUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.CheckAndMutate; import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.Delete; @@ -46,7 +44,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ScanResultConsumer; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors; @@ -91,7 +88,6 @@ public class HbaseTemplate extends HbaseAccessor implements HbaseOperations, Ini private boolean enableParallelScan = false; private int maxThreads = DEFAULT_MAX_THREADS_FOR_PARALLEL_SCANNER; private int maxThreadsPerParallelScan = DEFAULT_MAX_THREADS_PER_PARALLEL_SCAN; - private int maxConcurrentAsyncScanner = 256; private boolean nativeAsync = false; @@ -100,7 +96,7 @@ public class HbaseTemplate extends HbaseAccessor implements HbaseOperations, Ini private ScanMetricReporter scanMetric = new EmptyScanMetricReporter(); - private ResultScannerFactory scannerFactory; + private ResultScannerFactory resultScannerFactory; private HbaseAsyncTemplate asyncTemplate; @@ -123,8 +119,8 @@ public void setMaxThreadsPerParallelScan(int maxThreadsPerParallelScan) { this.maxThreadsPerParallelScan = maxThreadsPerParallelScan; } - public void setMaxConcurrentAsyncScanner(int maxConcurrentAsyncScanner) { - this.maxConcurrentAsyncScanner = maxConcurrentAsyncScanner; + public void setResultScannerFactory(ResultScannerFactory resultScannerFactory) { + this.resultScannerFactory = resultScannerFactory; } public void setNativeAsync(boolean nativeAsync) { @@ -150,9 +146,9 @@ public void afterPropertiesSet() { Objects.requireNonNull(configuration, "configuration is required"); Objects.requireNonNull(getTableFactory(), "tableFactory is required"); Objects.requireNonNull(asyncTemplate, "asyncTemplate is required"); + Objects.requireNonNull(resultScannerFactory, "resultScannerFactory"); this.executor = newExecutor(); - this.scannerFactory = new ResultScannerFactory(maxConcurrentAsyncScanner); } private ExecutorService newExecutor() { @@ -349,7 +345,7 @@ public List find(TableName tableName, final List scanList, final Re public List doInTable(Table table) throws Throwable { Scan[] copy = scanList.toArray(new Scan[0]); final ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "find", copy); - Scanner scanner = scannerFactory.newScanner(table, copy); + Scanner scanner = resultScannerFactory.newScanner(table, copy); List result = scanner.extractData(action); @@ -365,39 +361,19 @@ public List> find(TableName tableName, List scanList, RowMappe } public List findParallel(final TableName tableName, final List scans, final ResultsExtractor action) { - if (nativeAsync) { - return findParallel_async(tableName, scans, action); - } - return findParallel_block(tableName, scans, action); - } - - public List findParallel_async(final TableName tableName, final List scans, final ResultsExtractor action) { assertAccessAvailable(); if (isSimpleScan(scans.size())) { return find(tableName, scans, action); } - return asyncTemplate.execute(tableName, new AsyncTableCallback<>() { - @Override - public List doInTable(AsyncTable table) throws Throwable { - final Scan[] copy = scans.toArray(new Scan[0]); - - final ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "async-multi", copy); - - Scanner scanner = scannerFactory.newScanner(table, copy); - List results = scanner.extractData(action); - reporter.report(scanner::getScanMetrics); - return results; - } - }); + if (nativeAsync) { + return asyncTemplate.findParallel(tableName, scans, action); + } + return findParallel0(tableName, scans, action); } - public List findParallel_block(final TableName tableName, final List scans, final ResultsExtractor action) { - assertAccessAvailable(); + public List findParallel0(final TableName tableName, final List scans, final ResultsExtractor action) { final Scan[] copy = scans.toArray(new Scan[0]); - if (isSimpleScan(copy.length)) { - return find(tableName, scans, action); - } final ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "block-multi", copy); final Collection scanMetricsList = new ArrayBlockingQueue<>(copy.length); @@ -484,14 +460,14 @@ public T find(TableName tableName, final Scan scan, final AbstractRowKeyDist } protected final T executeDistributedScan(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor action) { + assertAccessAvailable(); if (nativeAsync) { - return executeDistributedScan_async(tableName, scan, rowKeyDistributor, action); + return this.asyncTemplate.executeDistributedScan(tableName, scan, rowKeyDistributor, action); } - return executeDistributedScan_block(tableName, scan, rowKeyDistributor, action); + return executeDistributedScan0(tableName, scan, rowKeyDistributor, action); } - protected final T executeDistributedScan_block(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor action) { - assertAccessAvailable(); + protected final T executeDistributedScan0(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor action) { return execute(tableName, new TableCallback<>() { @Override public T doInTable(Table table) throws Throwable { @@ -517,35 +493,6 @@ public T doInTable(Table table) throws Throwable { }); } - protected final T executeDistributedScan_async(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor action) { - assertAccessAvailable(); - - final T result = asyncTemplate.execute(tableName, new AsyncTableCallback<>() { - @Override - public T doInTable(AsyncTable table) throws Throwable { - final StopWatch watch = StopWatch.createStarted(); - final boolean debugEnabled = logger.isDebugEnabled(); - - Scan[] scans = ScanUtils.splitScans(scan, rowKeyDistributor); - final ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "async-multi", scans); - final ResultScanner[] splitScanners = ScanUtils.newScanners(table, scans); - try (ResultScanner scanner = new DistributedScanner(rowKeyDistributor, splitScanners)) { - if (debugEnabled) { - logger.debug("DistributedScanner createTime: {}ms", watch.stop()); - watch.start(); - } - return action.extractData(scanner); - } finally { - if (debugEnabled) { - logger.debug("DistributedScanner scanTime: {}ms", watch.stop()); - } - reporter.report(splitScanners); - } - } - }); - return result; - } - @Override public List findParallel(TableName tableName, Scan scan, AbstractRowKeyDistributor rowKeyDistributor, RowMapper action, int numParallelThreads) { @@ -599,45 +546,15 @@ public T findParallel(TableName tableName, Scan scan, AbstractRowKeyDistribu } protected final T executeParallelDistributedScan(TableName tableName, Scan scan, AbstractRowKeyDistributor rowKeyDistributor, ResultsExtractor action, int numParallelThreads) { - if (nativeAsync) { - return executeParallelDistributedScan_async(tableName, scan, rowKeyDistributor, action, numParallelThreads); - } - return executeParallelDistributedScan_block(tableName, scan, rowKeyDistributor, action, numParallelThreads); - } - - protected final T executeParallelDistributedScan_async(TableName tableName, Scan scan, AbstractRowKeyDistributor rowKeyDistributor, ResultsExtractor action, int numParallelThreads) { assertAccessAvailable(); - try { - StopWatch watch = StopWatch.createStarted(); - - final Scan[] scans = ScanUtils.splitScans(scan, rowKeyDistributor); - T result = asyncTemplate.execute(tableName, new AsyncTableCallback() { - @Override - public T doInTable(AsyncTable table) throws Throwable { - ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "async-multi", scans); - ResultScanner[] resultScanners = ScanUtils.newScanners(table, scans); - - ResultScanner scanner = new DistributedScanner(rowKeyDistributor, resultScanners); - try (scanner) { - return action.extractData(scanner); - } finally { - reporter.report(resultScanners); - } - } - }); - - if (logger.isDebugEnabled()) { - logger.debug("executeParallelDistributedScan createTime: {}ms", watch.stop()); - } - return result; - } catch (Exception e) { - throw new HbaseSystemException(e); + if (nativeAsync) { + return asyncTemplate.executeParallelDistributedScan(tableName, scan, rowKeyDistributor, action, numParallelThreads); } + return executeParallelDistributedScan0(tableName, scan, rowKeyDistributor, action, numParallelThreads); } - protected final T executeParallelDistributedScan_block(TableName tableName, Scan scan, AbstractRowKeyDistributor rowKeyDistributor, ResultsExtractor action, int numParallelThreads) { - assertAccessAvailable(); + protected T executeParallelDistributedScan0(TableName tableName, Scan scan, AbstractRowKeyDistributor rowKeyDistributor, ResultsExtractor action, int numParallelThreads) { try { StopWatch watch = StopWatch.createStarted(); final boolean debugEnabled = logger.isDebugEnabled(); diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncHbaseOperations.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncHbaseOperations.java index b0fe4d0cb6c3..e11d5005f679 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncHbaseOperations.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncHbaseOperations.java @@ -6,6 +6,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.CheckAndMutate; import org.apache.hadoop.hbase.client.CheckAndMutateResult; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -21,6 +22,7 @@ public interface AsyncHbaseOperations { T execute(TableName tableName, AsyncTableCallback action); + CompletableFuture put(TableName tableName, Put put); List> put(TableName tableName, List puts); @@ -28,21 +30,31 @@ public interface AsyncHbaseOperations { CompletableFuture get(TableName tableName, Get get, RowMapper mapper); + List> get(TableName tableName, final List gets, final RowMapper mapper); + + + CompletableFuture delete(TableName tableName, final Delete delete); + default CompletableFuture increment(TableName tableName, byte[] row, byte[] family, byte[] qualifier, long amount) { return increment(tableName, row, family, qualifier, amount, Durability.SKIP_WAL); } default CompletableFuture increment(TableName tableName, byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) { - return increment(tableName, new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)).thenApply((r) -> { + Increment increment = new Increment(row). + addColumn(family, qualifier, amount). + setDurability(durability); + + return increment(tableName, increment).thenApply((r) -> { return Bytes.toLong(r.getValue(family, qualifier)); }); } - List> increment(TableName tableName, List incrementList); CompletableFuture increment(TableName tableName, Increment increment); + List> increment(TableName tableName, List incrementList); + CompletableFuture maxColumnValue(TableName tableName, CheckAndMax max); diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncOperations.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncOperations.java deleted file mode 100644 index 74ceb5f1736e..000000000000 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncOperations.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2023 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package com.navercorp.pinpoint.common.hbase.async; - -import com.navercorp.pinpoint.common.hbase.LimitEventHandler; -import com.navercorp.pinpoint.common.hbase.ResultsExtractor; -import com.navercorp.pinpoint.common.hbase.RowMapper; -import com.sematext.hbase.wd.AbstractRowKeyDistributor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Scan; - -import java.util.List; - -public interface HbaseAsyncOperations { - - - List findParallel(TableName tableName, final List scans, final ResultsExtractor action); - List> findParallel(TableName tableName, final List scans, final RowMapper action); - - // Parallel scanners for distributed scans - List findParallel(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final RowMapper action, int numParallelThreads); - List findParallel(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, int limit, final RowMapper action, int numParallelThreads); - List findParallel(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, int limit, final RowMapper action, final LimitEventHandler limitEventHandler, int numParallelThreads); - T findParallel(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor action, int numParallelThreads); - - T executeAsync(TableName tableName, AsyncTableCallback action); -} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncTemplate.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncTemplate.java index 2f3327660331..8a91383bf6ab 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncTemplate.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncTemplate.java @@ -18,20 +18,32 @@ import com.navercorp.pinpoint.common.hbase.CasResult; import com.navercorp.pinpoint.common.hbase.CheckAndMax; +import com.navercorp.pinpoint.common.hbase.HbaseSystemException; +import com.navercorp.pinpoint.common.hbase.ResultsExtractor; import com.navercorp.pinpoint.common.hbase.RowMapper; import com.navercorp.pinpoint.common.hbase.future.FutureDecorator; import com.navercorp.pinpoint.common.hbase.future.FutureLoggingDecorator; +import com.navercorp.pinpoint.common.hbase.scan.ResultScannerFactory; +import com.navercorp.pinpoint.common.hbase.scan.ScanUtils; +import com.navercorp.pinpoint.common.hbase.scan.Scanner; import com.navercorp.pinpoint.common.hbase.util.HBaseExceptionUtils; import com.navercorp.pinpoint.common.hbase.util.MutationType; +import com.navercorp.pinpoint.common.hbase.util.ScanMetricReporter; +import com.navercorp.pinpoint.common.util.StopWatch; +import com.sematext.hbase.wd.AbstractRowKeyDistributor; +import com.sematext.hbase.wd.DistributedScanner; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.CheckAndMutate; import org.apache.hadoop.hbase.client.CheckAndMutateResult; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ScanResultConsumer; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.logging.log4j.LogManager; @@ -58,11 +70,18 @@ public class HbaseAsyncTemplate implements DisposableBean, AsyncHbaseOperations private final AsyncTableFactory asyncTableFactory; private final ExecutorService executor; + private final ScanMetricReporter scanMetric; + private final ResultScannerFactory scannerFactory; + private final FutureDecorator futureDecorator = new FutureLoggingDecorator(logger); public HbaseAsyncTemplate(AsyncTableFactory asyncTableFactory, + ResultScannerFactory scannerFactory, + ScanMetricReporter scanMetric, ExecutorService executor) { this.asyncTableFactory = Objects.requireNonNull(asyncTableFactory, "asyncTableFactory"); + this.scannerFactory = Objects.requireNonNull(scannerFactory, "scannerFactory"); + this.scanMetric = Objects.requireNonNull(scanMetric, "scanMetric"); this.executor = Objects.requireNonNull(executor, "executor"); } @@ -124,9 +143,9 @@ public CompletableFuture doInTable(AsyncTable table) thro CompletableFuture result = table.get(get); return result.thenApply(new Function() { @Override - public T apply(Result result1) { + public T apply(Result result) { try { - return mapper.mapRow(result1, 0); + return mapper.mapRow(result, 0); } catch (Exception e) { return HBaseExceptionUtils.rethrowHbaseException(e); } @@ -138,6 +157,44 @@ public T apply(Result result1) { return futures; } + @Override + public List> get(TableName tableName, final List gets, final RowMapper mapper) { + List> futures = this.execute(tableName, new AsyncTableCallback<>() { + @Override + public List> doInTable(AsyncTable table) throws Throwable { + List> results = table.get(gets); + List> mapperResult = new ArrayList<>(results.size()); + for (CompletableFuture result : results) { + mapperResult.add(result.thenApply(new Function() { + @Override + public T apply(Result result) { + try { + return mapper.mapRow(result, 0); + } catch (Exception e) { + return HBaseExceptionUtils.rethrowHbaseException(e); + } + }; + })); + } + return mapperResult; + } + }); + futureDecorator.apply(futures, tableName, MutationType.GET); + return futures; + } + + @Override + public CompletableFuture delete(TableName tableName, final Delete delete) { + CompletableFuture futures = this.execute(tableName, new AsyncTableCallback<>() { + @Override + public CompletableFuture doInTable(AsyncTable table) throws Throwable { + return table.delete(delete); + } + }); + futureDecorator.apply(futures, tableName, MutationType.GET); + return futures; + } + @Override public List> increment(final TableName tableName, final List incrementList) { @@ -240,6 +297,78 @@ public List> doInTable(AsyncTable List findParallel(final TableName tableName, final List scans, final ResultsExtractor action) { + return execute(tableName, new AsyncTableCallback<>() { + @Override + public List doInTable(AsyncTable table) throws Throwable { + final Scan[] copy = scans.toArray(new Scan[0]); + + final ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "async-multi", copy); + + Scanner scanner = scannerFactory.newScanner(table, copy); + List results = scanner.extractData(action); + reporter.report(scanner::getScanMetrics); + return results; + } + }); + } + + public T executeDistributedScan(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor action) { + final T result = execute(tableName, new AsyncTableCallback<>() { + @Override + public T doInTable(AsyncTable table) throws Throwable { + final StopWatch watch = StopWatch.createStarted(); + final boolean debugEnabled = logger.isDebugEnabled(); + + Scan[] scans = ScanUtils.splitScans(scan, rowKeyDistributor); + final ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "async-multi", scans); + final ResultScanner[] splitScanners = ScanUtils.newScanners(table, scans); + try (ResultScanner scanner = new DistributedScanner(rowKeyDistributor, splitScanners)) { + if (debugEnabled) { + logger.debug("DistributedScanner createTime: {}ms", watch.stop()); + watch.start(); + } + return action.extractData(scanner); + } finally { + if (debugEnabled) { + logger.debug("DistributedScanner scanTime: {}ms", watch.stop()); + } + reporter.report(splitScanners); + } + } + }); + return result; + } + + public T executeParallelDistributedScan(TableName tableName, Scan scan, AbstractRowKeyDistributor rowKeyDistributor, ResultsExtractor action, int numParallelThreads) { + try { + StopWatch watch = StopWatch.createStarted(); + + final Scan[] scans = ScanUtils.splitScans(scan, rowKeyDistributor); + T result = execute(tableName, new AsyncTableCallback() { + @Override + public T doInTable(AsyncTable table) throws Throwable { + ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "async-multi", scans); + ResultScanner[] resultScanners = ScanUtils.newScanners(table, scans); + + ResultScanner scanner = new DistributedScanner(rowKeyDistributor, resultScanners); + try (scanner) { + return action.extractData(scanner); + } finally { + reporter.report(resultScanners); + } + } + }); + + if (logger.isDebugEnabled()) { + logger.debug("executeParallelDistributedScan createTime: {}ms", watch.stop()); + } + return result; + } catch (Exception e) { + throw new HbaseSystemException(e); + } + } + public AsyncTable getAsyncTable(TableName tableName) { return getAsyncTableFactory().getTable(tableName, executor); diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java index ebc8bdf62d3a..ee86d48fca15 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java @@ -44,6 +44,7 @@ import com.navercorp.pinpoint.common.hbase.async.HbasePutWriterDecorator; import com.navercorp.pinpoint.common.hbase.async.LoggingHbasePutWriter; import com.navercorp.pinpoint.common.hbase.async.TableWriterFactory; +import com.navercorp.pinpoint.common.hbase.scan.ResultScannerFactory; import com.navercorp.pinpoint.common.hbase.util.DefaultScanMetricReporter; import com.navercorp.pinpoint.common.hbase.util.EmptyScanMetricReporter; import com.navercorp.pinpoint.common.hbase.util.ScanMetricReporter; @@ -115,9 +116,23 @@ public ScanMetricReporter emptyScannerMetricReporter() { } @Bean - public HbaseAsyncTemplate asyncTemplate(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory) { + public ResultScannerFactory resultScannerFactory(Optional parallelScan) { + int partitionSize = getPartitionSize(parallelScan); + return new ResultScannerFactory(partitionSize); + } + + private int getPartitionSize(Optional parallelScan) { + return parallelScan + .map(ParallelScan::getMaxConcurrentAsyncScanner) + .orElse(ParallelScan.DEFAULT_MAX_CONCURRENT_ASYNC_SCANNER); + } + + @Bean + public HbaseAsyncTemplate asyncTemplate(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory, + ScanMetricReporter scanMetricReporter, + ResultScannerFactory resultScannerFactory) { ExecutorService executor = newAsyncTemplateExecutor(); - return new HbaseAsyncTemplate(asyncTableFactory, executor); + return new HbaseAsyncTemplate(asyncTableFactory, resultScannerFactory, scanMetricReporter, executor); } private ExecutorService newAsyncTemplateExecutor() { @@ -132,6 +147,7 @@ public HbaseTemplate hbaseTemplate(@Qualifier("hbaseConfiguration") Configuratio @Qualifier("asyncTemplate") HbaseAsyncTemplate asyncTemplate, Optional parallelScan, @Value("${hbase.client.nativeAsync:false}") boolean nativeAsync, + ResultScannerFactory resultScannerFactory, ScanMetricReporter scanMetricReporter) { HbaseTemplate template2 = new HbaseTemplate(); template2.setConfiguration(configurable); @@ -142,8 +158,8 @@ public HbaseTemplate hbaseTemplate(@Qualifier("hbaseConfiguration") Configuratio template2.setEnableParallelScan(true); template2.setMaxThreads(scan.getMaxThreads()); template2.setMaxThreadsPerParallelScan(scan.getMaxThreadsPerParallelScan()); - template2.setMaxConcurrentAsyncScanner(scan.getMaxConcurrentAsyncScanner()); } + template2.setResultScannerFactory(resultScannerFactory); template2.setAsyncTemplate(asyncTemplate); template2.setNativeAsync(nativeAsync); diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/ParallelScan.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/ParallelScan.java index 934ae588ca2b..c9deff731fe6 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/ParallelScan.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/ParallelScan.java @@ -20,9 +20,12 @@ import com.navercorp.pinpoint.common.util.CpuUtils; public class ParallelScan { + + public static final int DEFAULT_MAX_CONCURRENT_ASYNC_SCANNER = 256; + private int maxThreads = CpuUtils.workerCount() * 4; private int maxThreadsPerParallelScan = CpuUtils.workerCount(); - private int maxConcurrentAsyncScanner = 256; + private int maxConcurrentAsyncScanner = DEFAULT_MAX_CONCURRENT_ASYNC_SCANNER; public ParallelScan() { diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/scan/ResultScannerFactory.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/scan/ResultScannerFactory.java index 943ed93fd067..6f123c9364d2 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/scan/ResultScannerFactory.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/scan/ResultScannerFactory.java @@ -73,4 +73,12 @@ public static void closeScanner(List scannerList) { } } } + + + @Override + public String toString() { + return "ResultScannerFactory{" + + "partitionSize=" + partitionSize + + '}'; + } } diff --git a/hbase/hbase-schema-manager/src/main/java/com/navercorp/pinpoint/hbase/manager/config/HbaseConfig.java b/hbase/hbase-schema-manager/src/main/java/com/navercorp/pinpoint/hbase/manager/config/HbaseConfig.java index fcfc0ef8df5f..41874e615608 100644 --- a/hbase/hbase-schema-manager/src/main/java/com/navercorp/pinpoint/hbase/manager/config/HbaseConfig.java +++ b/hbase/hbase-schema-manager/src/main/java/com/navercorp/pinpoint/hbase/manager/config/HbaseConfig.java @@ -30,6 +30,9 @@ import com.navercorp.pinpoint.common.hbase.async.DefaultAsyncTableCustomizer; import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTableFactory; import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTemplate; +import com.navercorp.pinpoint.common.hbase.scan.ResultScannerFactory; +import com.navercorp.pinpoint.common.hbase.util.EmptyScanMetricReporter; +import com.navercorp.pinpoint.common.hbase.util.ScanMetricReporter; import com.navercorp.pinpoint.common.profiler.concurrent.ExecutorFactory; import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory; import com.navercorp.pinpoint.common.util.CpuUtils; @@ -220,10 +223,13 @@ public AsyncTableFactory hbaseAsyncTableFactory(AsyncConnection connection, Asyn public HbaseAdminFactory hbaseAdminFactory(Connection connection) { return new HbaseAdminFactory(connection); } + @Bean public HbaseAsyncTemplate hbaseAsyncTemplate(AsyncTableFactory asyncTableFactory) { ExecutorService executor = newAsyncTemplateExecutor(); - return new HbaseAsyncTemplate(asyncTableFactory, executor); + ScanMetricReporter scanMetricReporter = new EmptyScanMetricReporter(); + ResultScannerFactory scannerFactory = new ResultScannerFactory(4); + return new HbaseAsyncTemplate(asyncTableFactory, scannerFactory, scanMetricReporter, executor); } private ExecutorService newAsyncTemplateExecutor() { diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/config/MapHbaseConfiguration.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/config/MapHbaseConfiguration.java index f8a629d4e1a2..5bf33d428691 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/config/MapHbaseConfiguration.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/config/MapHbaseConfiguration.java @@ -28,6 +28,7 @@ import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTemplate; import com.navercorp.pinpoint.common.hbase.config.HbaseTemplateConfiguration; import com.navercorp.pinpoint.common.hbase.config.ParallelScan; +import com.navercorp.pinpoint.common.hbase.scan.ResultScannerFactory; import com.navercorp.pinpoint.common.hbase.util.ScanMetricReporter; import com.navercorp.pinpoint.common.server.executor.ExecutorCustomizer; import com.navercorp.pinpoint.common.server.executor.ExecutorProperties; @@ -105,8 +106,11 @@ public AsyncTableFactory mapHbaseAsyncTableFactory(@Qualifier("hbaseAsyncConnect } @Bean - public HbaseAsyncTemplate mapHbaseAsyncTemplate(@Qualifier("mapHbaseAsyncTableFactory") AsyncTableFactory tableFactory) { - return config.asyncTemplate(tableFactory); + public HbaseAsyncTemplate mapHbaseAsyncTemplate(@Qualifier("mapHbaseAsyncTableFactory") + AsyncTableFactory tableFactory, + ScanMetricReporter scanMetricReporter, + ResultScannerFactory resultScannerFactory) { + return config.asyncTemplate(tableFactory, scanMetricReporter, resultScannerFactory); } @Bean @@ -115,8 +119,9 @@ public HbaseTemplate mapHbaseTemplate(@Qualifier("hbaseConfiguration") Configura @Qualifier("mapHbaseAsyncTemplate") HbaseAsyncTemplate asyncTemplate, Optional parallelScan, @Value("${hbase.client.nativeAsync:false}") boolean nativeAsync, + ResultScannerFactory resultScannerFactory, ScanMetricReporter reporter) { - return config.hbaseTemplate(configurable, tableFactory, asyncTemplate, parallelScan, nativeAsync, reporter); + return config.hbaseTemplate(configurable, tableFactory, asyncTemplate, parallelScan, nativeAsync, resultScannerFactory, reporter); } @Bean