From 871f4033b7e6c99de4c0b3e72a0a1c253c804317 Mon Sep 17 00:00:00 2001 From: emeroad Date: Thu, 22 Aug 2024 20:22:25 +0900 Subject: [PATCH] [#11388] Optimize Async call chain of maxColumnValue --- .../pinpoint/common/hbase/CasResult.java | 17 +++++ .../pinpoint/common/hbase/HbaseTemplate.java | 62 ++++++++----------- 2 files changed, 43 insertions(+), 36 deletions(-) create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/CasResult.java diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/CasResult.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/CasResult.java new file mode 100644 index 000000000000..b6f88aa9aeae --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/CasResult.java @@ -0,0 +1,17 @@ +package com.navercorp.pinpoint.common.hbase; + +import org.apache.hadoop.hbase.client.CheckAndMutateResult; + +public enum CasResult { + INITIAL_UPDATE, + CAS_NEW, + CAS_OLD; + + public static CasResult casResult(CheckAndMutateResult casResult) { + if (casResult.isSuccess()) { + return CAS_NEW; + } else { + return CAS_OLD; + } + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTemplate.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTemplate.java index 428250e8625b..47e4a6c0c7d0 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTemplate.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTemplate.java @@ -75,7 +75,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; +import java.util.function.Function; /** * @author emeroad @@ -319,59 +319,49 @@ public void maxColumnValue(TableName tableName, byte[] rowName, byte[] familyNam .ifNotExists(familyName, qualifier) .build(put); -// this.execute(tableName, new TableCallback() { +// CasResult result = this.execute(tableName, new TableCallback() { // @Override -// public Object doInTable(Table table) throws Throwable { +// public CasResult doInTable(Table table) throws Throwable { // CheckAndMutateResult result = table.checkAndMutate(checkAndPut); // if (result.isSuccess()) { // logger.debug("MaxUpdate success for null"); -// return null; +// return CasResult.INITIAL_UPDATE; // } -// CheckAndMutate checkAndMax = checkAndMax(rowName, familyName, qualifier, valBytes, put); +// CheckAndMutate checkAndMax = CheckAndMutates.max(rowName, familyName, qualifier, valBytes, put); // CheckAndMutateResult maxResult = table.checkAndMutate(checkAndMax); // if (maxResult.isSuccess()) { // logger.debug("MaxUpdate success for GREATER"); -// } else { -// logger.trace("MaxUpdate failure for ConcurrentUpdate"); +// return CasResult.CAS_NEW; // } -// return null; +// logger.trace("MaxUpdate failure for ConcurrentUpdate {}", maxResult.getResult()); +// return CasResult.CAS_OLD; // } // }); +// logger.debug("maxColumnValue result:{}", result); - this.asyncExecute(tableName, new AsyncTableCallback<>() { + CompletableFuture result = this.asyncExecute(tableName, new AsyncTableCallback<>() { @Override - public Void doInTable(AsyncTable table) throws Throwable { + public CompletableFuture doInTable(AsyncTable table) throws Throwable { CompletableFuture result = table.checkAndMutate(checkAndPut); - result.whenCompleteAsync(new BiConsumer() { + return result.thenCompose(new Function>() { @Override - public void accept(CheckAndMutateResult checkAndMutateResult, Throwable throwable) { - if (throwable != null) { - logger.warn("{} MaxUpdate(EQUALS) failure", tableName, throwable); - return; - } + public CompletableFuture apply(CheckAndMutateResult checkAndMutateResult) { if (checkAndMutateResult.isSuccess()) { - logger.debug("{} MaxUpdate(EQUALS) success", tableName); - } else { - CheckAndMutate checkAndMax = CheckAndMutates.max(rowName, familyName, qualifier, valBytes, put); - CompletableFuture maxFuture = table.checkAndMutate(checkAndMax); - maxFuture.whenComplete(new BiConsumer() { - @Override - public void accept(CheckAndMutateResult maxResult, Throwable throwable) { - if (throwable != null) { - logger.warn("{} MaxUpdate(GREATER) exceptionally", tableName, throwable); - return; - } - if (maxResult.isSuccess()) { - logger.debug("{} MaxUpdate(GREATER) success", tableName); - } else { - logger.trace("{} MaxUpdate(GREATER) failure", tableName); - } - } - }); + return CompletableFuture.completedFuture(CasResult.INITIAL_UPDATE); } + + CheckAndMutate checkAndMax = CheckAndMutates.max(rowName, familyName, qualifier, valBytes, put); + CompletableFuture maxFuture = table.checkAndMutate(checkAndMax); + return maxFuture.thenApply(CasResult::casResult); } }); - return null; + } + }); + result.whenComplete((r, e) -> { + if (e != null) { + logger.info("maxColumnValue failed", e); + } else { + logger.debug("maxColumnValue result:{}", r); } }); } @@ -455,7 +445,7 @@ public List findParallel_block(final TableName tableName, final List scanMetricsList = new ArrayBlockingQueue<>(copy.length);