Skip to content

Commit

Permalink
[pinpoint-apm#11388] Optimize Async call chain of maxColumnValue
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Aug 22, 2024
1 parent 4d8fc3d commit e940ec3
Showing 1 changed file with 39 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
* @author emeroad
Expand Down Expand Up @@ -308,6 +308,11 @@ public List<CheckAndMutateResult> doInTable(Table table) throws Throwable {
});
}

public enum CasResult {
INITIAL_UPDATE,
CAS_NEW,
CAS_OLD,
}

@Override
public void maxColumnValue(TableName tableName, byte[] rowName, byte[] familyName, byte[] qualifier, long value) {
Expand All @@ -319,59 +324,57 @@ public void maxColumnValue(TableName tableName, byte[] rowName, byte[] familyNam
.ifNotExists(familyName, qualifier)
.build(put);

// this.execute(tableName, new TableCallback<Object>() {
// CasResult result = this.execute(tableName, new TableCallback<CasResult>() {
// @Override
// public Object doInTable(Table table) throws Throwable {
// public CasResult doInTable(Table table) throws Throwable {
// CheckAndMutateResult result = table.checkAndMutate(checkAndPut);
// if (result.isSuccess()) {
// logger.debug("MaxUpdate success for null");
// return null;
// return CasResult.INITIAL_UPDATE;
// }
// CheckAndMutate checkAndMax = checkAndMax(rowName, familyName, qualifier, valBytes, put);
// CheckAndMutate checkAndMax = CheckAndMutates.max(rowName, familyName, qualifier, valBytes, put);
// CheckAndMutateResult maxResult = table.checkAndMutate(checkAndMax);
// if (maxResult.isSuccess()) {
// logger.debug("MaxUpdate success for GREATER");
// } else {
// logger.trace("MaxUpdate failure for ConcurrentUpdate");
// return CasResult.CAS_NEW;
// }
// return null;
// logger.trace("MaxUpdate failure for ConcurrentUpdate {}", maxResult.getResult());
// return CasResult.CAS_OLD;
// }
// });
// logger.debug("maxColumnValue result:{}", result);

this.asyncExecute(tableName, new AsyncTableCallback<>() {
CompletableFuture<CasResult> result = this.asyncExecute(tableName, new AsyncTableCallback<>() {
@Override
public Void doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
public CompletableFuture<CasResult> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
CompletableFuture<CheckAndMutateResult> result = table.checkAndMutate(checkAndPut);
result.whenCompleteAsync(new BiConsumer<CheckAndMutateResult, Throwable>() {
return result.thenCompose(new Function<CheckAndMutateResult, CompletableFuture<CasResult>>() {
@Override
public void accept(CheckAndMutateResult checkAndMutateResult, Throwable throwable) {
if (throwable != null) {
logger.warn("{} MaxUpdate(EQUALS) failure", tableName, throwable);
return;
}
public CompletableFuture<CasResult> apply(CheckAndMutateResult checkAndMutateResult) {
if (checkAndMutateResult.isSuccess()) {
logger.debug("{} MaxUpdate(EQUALS) success", tableName);
} else {
CheckAndMutate checkAndMax = CheckAndMutates.max(rowName, familyName, qualifier, valBytes, put);
CompletableFuture<CheckAndMutateResult> maxFuture = table.checkAndMutate(checkAndMax);
maxFuture.whenComplete(new BiConsumer<CheckAndMutateResult, Throwable>() {
@Override
public void accept(CheckAndMutateResult maxResult, Throwable throwable) {
if (throwable != null) {
logger.warn("{} MaxUpdate(GREATER) exceptionally", tableName, throwable);
return;
}
if (maxResult.isSuccess()) {
logger.debug("{} MaxUpdate(GREATER) success", tableName);
} else {
logger.trace("{} MaxUpdate(GREATER) failure", tableName);
}
}
});
return CompletableFuture.completedFuture(CasResult.INITIAL_UPDATE);
}

CheckAndMutate checkAndMax = CheckAndMutates.max(rowName, familyName, qualifier, valBytes, put);
CompletableFuture<CheckAndMutateResult> maxFuture = table.checkAndMutate(checkAndMax);
return maxFuture.thenCompose(new Function<CheckAndMutateResult, CompletableFuture<CasResult>>() {
@Override
public CompletableFuture<CasResult> apply(CheckAndMutateResult maxResult) {
if (maxResult.isSuccess()) {
return CompletableFuture.completedFuture(CasResult.CAS_NEW);
}
return CompletableFuture.completedFuture(CasResult.CAS_OLD);
}
});
}
});
return null;
}
});
result.whenComplete((r, e) -> {
if (e != null) {
logger.info("maxColumnValue failed", e);
} else {
logger.debug("maxColumnValue result:{}", r);
}
});
}
Expand Down Expand Up @@ -455,7 +458,7 @@ public <T> List<T> findParallel_block(final TableName tableName, final List<Scan
if (isSimpleScan(copy.length)) {
return find(tableName, scans, action);
}

final ScanMetricReporter.Reporter reporter = scanMetric.newReporter(tableName, "block-multi", copy);
final Collection<ScanMetrics> scanMetricsList = new ArrayBlockingQueue<>(copy.length);

Expand Down

0 comments on commit e940ec3

Please sign in to comment.