From e940ec3f5cc0277d6af3fb770ed482f84c2e1a9a Mon Sep 17 00:00:00 2001 From: emeroad Date: Thu, 22 Aug 2024 20:22:25 +0900 Subject: [PATCH] [#11388] Optimize Async call chain of maxColumnValue --- .../pinpoint/common/hbase/HbaseTemplate.java | 75 ++++++++++--------- 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTemplate.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTemplate.java index 428250e8625b0..3516c36d1b3a5 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTemplate.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTemplate.java @@ -75,7 +75,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; +import java.util.function.Function; /** * @author emeroad @@ -308,6 +308,11 @@ public List doInTable(Table table) throws Throwable { }); } + public enum CasResult { + INITIAL_UPDATE, + CAS_NEW, + CAS_OLD, + } @Override public void maxColumnValue(TableName tableName, byte[] rowName, byte[] familyName, byte[] qualifier, long value) { @@ -319,59 +324,57 @@ public void maxColumnValue(TableName tableName, byte[] rowName, byte[] familyNam .ifNotExists(familyName, qualifier) .build(put); -// this.execute(tableName, new TableCallback() { +// CasResult result = this.execute(tableName, new TableCallback() { // @Override -// public Object doInTable(Table table) throws Throwable { +// public CasResult doInTable(Table table) throws Throwable { // CheckAndMutateResult result = table.checkAndMutate(checkAndPut); // if (result.isSuccess()) { // logger.debug("MaxUpdate success for null"); -// return null; +// return CasResult.INITIAL_UPDATE; // } -// CheckAndMutate checkAndMax = checkAndMax(rowName, familyName, qualifier, valBytes, put); +// CheckAndMutate checkAndMax = CheckAndMutates.max(rowName, familyName, qualifier, valBytes, put); // CheckAndMutateResult maxResult = table.checkAndMutate(checkAndMax); // if (maxResult.isSuccess()) { // logger.debug("MaxUpdate success for GREATER"); -// } else { -// logger.trace("MaxUpdate failure for ConcurrentUpdate"); +// return CasResult.CAS_NEW; // } -// return null; +// logger.trace("MaxUpdate failure for ConcurrentUpdate {}", maxResult.getResult()); +// return CasResult.CAS_OLD; // } // }); +// logger.debug("maxColumnValue result:{}", result); - this.asyncExecute(tableName, new AsyncTableCallback<>() { + CompletableFuture result = this.asyncExecute(tableName, new AsyncTableCallback<>() { @Override - public Void doInTable(AsyncTable table) throws Throwable { + public CompletableFuture doInTable(AsyncTable table) throws Throwable { CompletableFuture result = table.checkAndMutate(checkAndPut); - result.whenCompleteAsync(new BiConsumer() { + return result.thenCompose(new Function>() { @Override - public void accept(CheckAndMutateResult checkAndMutateResult, Throwable throwable) { - if (throwable != null) { - logger.warn("{} MaxUpdate(EQUALS) failure", tableName, throwable); - return; - } + public CompletableFuture apply(CheckAndMutateResult checkAndMutateResult) { if (checkAndMutateResult.isSuccess()) { - logger.debug("{} MaxUpdate(EQUALS) success", tableName); - } else { - CheckAndMutate checkAndMax = CheckAndMutates.max(rowName, familyName, qualifier, valBytes, put); - CompletableFuture maxFuture = table.checkAndMutate(checkAndMax); - maxFuture.whenComplete(new BiConsumer() { - @Override - public void accept(CheckAndMutateResult maxResult, Throwable throwable) { - if (throwable != null) { - logger.warn("{} MaxUpdate(GREATER) exceptionally", tableName, throwable); - return; - } - if (maxResult.isSuccess()) { - logger.debug("{} MaxUpdate(GREATER) success", tableName); - } else { - logger.trace("{} MaxUpdate(GREATER) failure", tableName); - } - } - }); + return CompletableFuture.completedFuture(CasResult.INITIAL_UPDATE); } + + CheckAndMutate checkAndMax = CheckAndMutates.max(rowName, familyName, qualifier, valBytes, put); + CompletableFuture maxFuture = table.checkAndMutate(checkAndMax); + return maxFuture.thenCompose(new Function>() { + @Override + public CompletableFuture apply(CheckAndMutateResult maxResult) { + if (maxResult.isSuccess()) { + return CompletableFuture.completedFuture(CasResult.CAS_NEW); + } + return CompletableFuture.completedFuture(CasResult.CAS_OLD); + } + }); } }); - return null; + } + }); + result.whenComplete((r, e) -> { + if (e != null) { + logger.info("maxColumnValue failed", e); + } else { + logger.debug("maxColumnValue result:{}", r); } }); } @@ -455,7 +458,7 @@ public List findParallel_block(final TableName tableName, final List scanMetricsList = new ArrayBlockingQueue<>(copy.length);