Skip to content

Commit

Permalink
Merge branch '2.x' into dev-appdata-size-limit
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue authored Nov 9, 2023
2 parents 2700d05 + 8fed3d8 commit 367893b
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 45 deletions.
62 changes: 42 additions & 20 deletions test/src/test/java/io/seata/common/LockAndCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,53 +20,75 @@
import io.seata.saga.statelang.domain.ExecutionStatus;
import io.seata.saga.statelang.domain.StateMachineInstance;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author wang.liang
*/
public class LockAndCallback {
private final Object lock;
private final Lock lock;
private final Condition notFinished;
private final AsyncCallback callback;

private final static long DEFAULT_TIMEOUT = 60000;
private String result;

public LockAndCallback() {
lock = new Object();
lock = new ReentrantLock();
notFinished = lock.newCondition();
callback = new AsyncCallback() {
@Override
public void onFinished(ProcessContext context, StateMachineInstance stateMachineInstance) {
result = "onFinished";
synchronized (lock) {
lock.notifyAll();
try {
lock.lock();
notFinished.signal();
} finally {
lock.unlock();
}
}

@Override
public void onError(ProcessContext context, StateMachineInstance stateMachineInstance, Exception exp) {
result = "onError";
synchronized (lock) {
lock.notifyAll();
try {
lock.lock();
notFinished.signal();
} finally {
lock.unlock();
}
}
};
}
public void waitingForFinish(StateMachineInstance inst) {
waitingForFinish(inst, DEFAULT_TIMEOUT);
}

public void waittingForFinish(StateMachineInstance inst) {
synchronized (lock) {
if (ExecutionStatus.RU.equals(inst.getStatus())) {
long start = System.nanoTime();
try {
lock.wait(30000);
public void waitingForFinish(StateMachineInstance inst, long timeout) {
if (ExecutionStatus.RU.equals(inst.getStatus())) {
long start = System.nanoTime();
try {
lock.lock();
boolean finished = notFinished.await(timeout, TimeUnit.MILLISECONDS);
if (finished) {
System.out.printf("finish wait ====== XID: %s, status: %s, compensationStatus: %s, cost: %d ms, result: %s\r\n",
inst.getId(), inst.getStatus(), inst.getCompensationStatus(), (System.nanoTime() - start) / 1000_000, result);
} catch (Exception e) {
System.out.printf("error wait ====== XID: %s, status: %s, compensationStatus: %s, cost: %d ms, result: %s, error: %s\r\n",
inst.getId(), inst.getStatus(), inst.getCompensationStatus(), (System.nanoTime() - start) / 1000_000, result, e.getMessage());
throw new RuntimeException("waittingForFinish failed", e);
} else {
System.out.printf("timeout wait ====== XID: %s, status: %s, compensationStatus: %s, cost: %d ms, result: %s\r\n",
inst.getId(), inst.getStatus(), inst.getCompensationStatus(), (System.nanoTime() - start) / 1000_000, result);
}
} else {
System.out.printf("do not wait ====== XID: %s, status: %s, compensationStatus: %s, result: %s\r\n",
inst.getId(), inst.getStatus(), inst.getCompensationStatus(), result);
} catch (Exception e) {
System.out.printf("error wait ====== XID: %s, status: %s, compensationStatus: %s, cost: %d ms, result: %s, error: %s\r\n",
inst.getId(), inst.getStatus(), inst.getCompensationStatus(), (System.nanoTime() - start) / 1000_000, result, e.getMessage());
throw new RuntimeException("waitingForFinish failed", e);
} finally {
lock.unlock();
}
} else {
System.out.printf("do not wait ====== XID: %s, status: %s, compensationStatus: %s, result: %s\r\n",
inst.getId(), inst.getStatus(), inst.getCompensationStatus(), result);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testSimpleCatchesStateMachine() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertNotNull(inst.getException());
Assertions.assertEquals(ExecutionStatus.FA, inst.getStatus());
Expand Down Expand Up @@ -107,7 +107,7 @@ public void testSimpleRetryStateMachine() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertNotNull(inst.getException());
Assertions.assertEquals(ExecutionStatus.FA, inst.getStatus());
Expand All @@ -125,7 +125,7 @@ public void testStatusMatchingStateMachine() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertNotNull(inst.getException());
Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus());
Expand All @@ -144,7 +144,7 @@ public void testCompensationStateMachine() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus());
// FIXME: some times, the compensationStatus is RU
Expand All @@ -163,7 +163,7 @@ public void testCompensationAndSubStateMachine() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus());
});
Expand All @@ -180,7 +180,7 @@ public void testCompensationAndSubStateMachineWithLayout() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus());
});
Expand All @@ -200,7 +200,7 @@ public void testStateMachineWithComplexParams() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

People peopleResult = (People)inst.getEndParams().get("complexParameterMethodResult");
Assertions.assertNotNull(peopleResult);
Expand All @@ -219,7 +219,7 @@ public void testSimpleStateMachineWithAsyncState() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertEquals(ExecutionStatus.SU, inst.getStatus());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public void testSimpleCatchesStateMachineAsync() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertNotNull(inst.getException());
Assertions.assertEquals(ExecutionStatus.FA, inst.getStatus());
Expand All @@ -401,7 +401,7 @@ public void testSimpleRetryStateMachineAsync() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertNotNull(inst.getException());
Assertions.assertEquals(ExecutionStatus.FA, inst.getStatus());
Expand All @@ -419,7 +419,7 @@ public void testStatusMatchingStateMachineAsync() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertNotNull(inst.getException());
Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus());
Expand All @@ -441,7 +441,7 @@ public void testCompensationStateMachineAsync() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus());
Assertions.assertEquals(ExecutionStatus.SU, inst.getCompensationStatus());
Expand Down Expand Up @@ -522,7 +522,7 @@ public void testCompensationAndSubStateMachineAsync() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus());

Expand All @@ -544,7 +544,7 @@ public void testCompensationAndSubStateMachineAsyncWithLayout() throws Exception

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus());

Expand All @@ -564,7 +564,7 @@ public void testAsyncStartSimpleStateMachineWithAsyncState() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertEquals(ExecutionStatus.SU, inst.getStatus());
});
Expand Down Expand Up @@ -957,7 +957,7 @@ private void doTestStateMachineTransTimeoutAsync(Map<String, Object> paramMap, i
SagaCostPrint.executeAndPrint("3-37-" + i, () -> {
LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

GlobalTransaction globalTransaction = getGlobalTransaction(inst);
Assertions.assertNotNull(globalTransaction);
Expand Down Expand Up @@ -1119,7 +1119,7 @@ private void doTestStateMachineCustomRecoverStrategyOnTimeoutAsync(Map<String, O
SagaCostPrint.executeAndPrint("3-39-" + i, () -> {
LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

GlobalTransaction globalTransaction = getGlobalTransaction(inst);
Assertions.assertNotNull(globalTransaction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testSimpleCatchesStateMachine() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertNotNull(inst.getException());
Assertions.assertEquals(ExecutionStatus.FA, inst.getStatus());
Expand All @@ -75,7 +75,7 @@ public void testSimpleRetryStateMachine() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertNotNull(inst.getException());
Assertions.assertEquals(ExecutionStatus.FA, inst.getStatus());
Expand All @@ -93,7 +93,7 @@ public void testStatusMatchingStateMachine() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertNotNull(inst.getException());
Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus());
Expand All @@ -111,7 +111,7 @@ public void testCompensationStateMachine() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus());
Assertions.assertEquals(ExecutionStatus.SU, inst.getCompensationStatus());
Expand All @@ -129,7 +129,7 @@ public void testCompensationAndSubStateMachine() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus());
});
Expand All @@ -146,7 +146,7 @@ public void testCompensationAndSubStateMachineWithLayout() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus());
});
Expand All @@ -166,7 +166,7 @@ public void testStateMachineWithComplexParams() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

People peopleResult = (People)inst.getEndParams().get("complexParameterMethodResult");
Assertions.assertNotNull(peopleResult);
Expand All @@ -186,7 +186,7 @@ public void testSimpleStateMachineWithAsyncState() throws Exception {

LockAndCallback lockAndCallback = new LockAndCallback();
StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback());
lockAndCallback.waittingForFinish(inst);
lockAndCallback.waitingForFinish(inst);

Assertions.assertEquals(ExecutionStatus.SU, inst.getStatus());
});
Expand Down

0 comments on commit 367893b

Please sign in to comment.