Skip to content

Commit

Permalink
[pinpoint-apm#11388] Optimize Async call chain of maxColumnValue
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Aug 23, 2024
1 parent 4d8fc3d commit c41aea2
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.navercorp.pinpoint.common.hbase;

import org.apache.hadoop.hbase.client.CheckAndMutateResult;

public enum CasResult {
INITIAL_UPDATE,
CAS_NEW,
CAS_OLD;

public static CasResult casResult(CheckAndMutateResult casResult) {
if (casResult.isSuccess()) {
return CAS_NEW;
} else {
return CAS_OLD;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
* @author emeroad
Expand Down Expand Up @@ -319,59 +319,49 @@ public void maxColumnValue(TableName tableName, byte[] rowName, byte[] familyNam
.ifNotExists(familyName, qualifier)
.build(put);

// this.execute(tableName, new TableCallback<Object>() {
// CasResult result = this.execute(tableName, new TableCallback<CasResult>() {
// @Override
// public Object doInTable(Table table) throws Throwable {
// public CasResult doInTable(Table table) throws Throwable {
// CheckAndMutateResult result = table.checkAndMutate(checkAndPut);
// if (result.isSuccess()) {
// logger.debug("MaxUpdate success for null");
// return null;
// return CasResult.INITIAL_UPDATE;
// }
// CheckAndMutate checkAndMax = checkAndMax(rowName, familyName, qualifier, valBytes, put);
// CheckAndMutate checkAndMax = CheckAndMutates.max(rowName, familyName, qualifier, valBytes, put);
// CheckAndMutateResult maxResult = table.checkAndMutate(checkAndMax);
// if (maxResult.isSuccess()) {
// logger.debug("MaxUpdate success for GREATER");
// } else {
// logger.trace("MaxUpdate failure for ConcurrentUpdate");
// return CasResult.CAS_NEW;
// }
// return null;
// logger.trace("MaxUpdate failure for ConcurrentUpdate {}", maxResult.getResult());
// return CasResult.CAS_OLD;
// }
// });
// logger.debug("maxColumnValue result:{}", result);

this.asyncExecute(tableName, new AsyncTableCallback<>() {
CompletableFuture<CasResult> result = this.asyncExecute(tableName, new AsyncTableCallback<>() {
@Override
public Void doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
public CompletableFuture<CasResult> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
CompletableFuture<CheckAndMutateResult> result = table.checkAndMutate(checkAndPut);
result.whenCompleteAsync(new BiConsumer<CheckAndMutateResult, Throwable>() {
return result.thenCompose(new Function<CheckAndMutateResult, CompletableFuture<CasResult>>() {
@Override
public void accept(CheckAndMutateResult checkAndMutateResult, Throwable throwable) {
if (throwable != null) {
logger.warn("{} MaxUpdate(EQUALS) failure", tableName, throwable);
return;
}
public CompletableFuture<CasResult> apply(CheckAndMutateResult checkAndMutateResult) {
if (checkAndMutateResult.isSuccess()) {
logger.debug("{} MaxUpdate(EQUALS) success", tableName);
} else {
CheckAndMutate checkAndMax = CheckAndMutates.max(rowName, familyName, qualifier, valBytes, put);
CompletableFuture<CheckAndMutateResult> maxFuture = table.checkAndMutate(checkAndMax);
maxFuture.whenComplete(new BiConsumer<CheckAndMutateResult, Throwable>() {
@Override
public void accept(CheckAndMutateResult maxResult, Throwable throwable) {
if (throwable != null) {
logger.warn("{} MaxUpdate(GREATER) exceptionally", tableName, throwable);
return;
}
if (maxResult.isSuccess()) {
logger.debug("{} MaxUpdate(GREATER) success", tableName);
} else {
logger.trace("{} MaxUpdate(GREATER) failure", tableName);
}
}
});
return CompletableFuture.completedFuture(CasResult.INITIAL_UPDATE);
}

CheckAndMutate checkAndMax = CheckAndMutates.max(rowName, familyName, qualifier, valBytes, put);
CompletableFuture<CheckAndMutateResult> maxFuture = table.checkAndMutate(checkAndMax);
return maxFuture.thenApply(CasResult::casResult);
}
});
return null;
}
});
result.whenComplete((r, e) -> {
if (e != null) {
logger.info("maxColumnValue failed", e);
} else {
logger.debug("maxColumnValue result:{}", r);
}
});
}
Expand Down Expand Up @@ -455,7 +445,7 @@ public <T> List<T> findParallel_block(final TableName tableName, final List<Scan
if (isSimpleScan(copy.length)) {
return find(tableName, scans, action);
}

final ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "block-multi", copy);
final Collection<ScanMetrics> scanMetricsList = new ArrayBlockingQueue<>(copy.length);

Expand Down

0 comments on commit c41aea2

Please sign in to comment.