Skip to content

Commit

Permalink
Research/parallel-tx/block validation (#1804)
Browse files Browse the repository at this point in the history
* Added thread logic in the Tracker so it controls whether there is a key collision between threads

* Clear tracker between parallel and sequential execution within the parallel block execution

* Tracker is now atomic

* Added tests to test for race conditions

* Tracker returns a copy of the maps.
  • Loading branch information
julianlen committed Nov 28, 2022
1 parent 74b8aba commit cd278a1
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package co.rsk.core;

import co.rsk.core.bc.IReadWrittenKeysTracker;
import co.rsk.crypto.Keccak256;
import org.ethereum.core.*;
import org.ethereum.vm.DataWord;
Expand All @@ -21,6 +22,7 @@ public class TransactionListExecutor implements Callable {

private final TransactionExecutorFactory transactionExecutorFactory;
private final List<Transaction> transactions;
private IReadWrittenKeysTracker readWrittenKeysTracker;
private final Block block;
private final Repository track;
private final boolean vmTrace;
Expand All @@ -41,6 +43,7 @@ public class TransactionListExecutor implements Callable {

public TransactionListExecutor(
List<Transaction> transactions,
IReadWrittenKeysTracker readWrittenKeysTracker,
Block block,
TransactionExecutorFactory transactionExecutorFactory,
Repository track,
Expand All @@ -58,6 +61,7 @@ public TransactionListExecutor(
LongAccumulator accumulatedFees,
LongAccumulator accumulatedGas,
int firstTxIndex) {
this.readWrittenKeysTracker = readWrittenKeysTracker;
this.block = block;
this.transactionExecutorFactory = transactionExecutorFactory;
this.track = track;
Expand Down Expand Up @@ -98,6 +102,10 @@ public Boolean call() {
);
boolean transactionExecuted = txExecutor.executeTransaction();

if (readWrittenKeysTracker.hasCollided()) {
return false;
}

if (!acceptInvalidTransactions && !transactionExecuted) {
if (discardInvalidTxs) {
logger.warn("block: [{}] discarded tx: [{}]", block.getNumber(), tx.getHash());
Expand Down
7 changes: 5 additions & 2 deletions rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ private BlockResult executeParallel(
// of the repository to the state post execution, so it's necessary to get it to
// the state prior execution again.
Metric metric = profiler.start(Profiler.PROFILING_TYPE.BLOCK_EXECUTE);

Repository track = repositoryLocator.startTrackingAt(parent);
IReadWrittenKeysTracker readWrittenKeysTracker = new ReadWrittenKeysTracker();
Repository track = repositoryLocator.startTrackingAt(parent, readWrittenKeysTracker);

maintainPrecompiledContractStorageRoots(track, activationConfig.forBlock(block.getNumber()));

Expand All @@ -477,6 +477,7 @@ private BlockResult executeParallel(
List<Transaction> sublist = block.getTransactionsList().subList(start, end);
TransactionListExecutor txListExecutor = new TransactionListExecutor(
sublist,
readWrittenKeysTracker,
block,
transactionExecutorFactory,
track,
Expand Down Expand Up @@ -522,10 +523,12 @@ private BlockResult executeParallel(
}
}

readWrittenKeysTracker.clear();
// execute remaining transactions after the parallel subsets
List<Transaction> sublist = block.getTransactionsList().subList(start, block.getTransactionsList().size());
TransactionListExecutor txListExecutor = new TransactionListExecutor(
sublist,
readWrittenKeysTracker,
block,
transactionExecutorFactory,
track,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public interface IReadWrittenKeysTracker {

Set<ByteArrayWrapper> getTemporalWrittenKeys();

boolean hasCollided();

void addNewReadKey(ByteArrayWrapper key);

void addNewWrittenKey(ByteArrayWrapper key);
Expand Down
54 changes: 40 additions & 14 deletions rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,71 @@

import org.ethereum.db.ByteArrayWrapper;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

//TODO(JULI):
// * Next step should be to check whether a key is written in the cache but also deleted in the same transaction. This key shouldn't be considered as a written key.

public class ReadWrittenKeysTracker implements IReadWrittenKeysTracker {
private Set<ByteArrayWrapper> temporalReadKeys;
private Set<ByteArrayWrapper> temporalWrittenKeys;
private Map<ByteArrayWrapper, Set<Long>> threadByReadKey;
private Map<ByteArrayWrapper, Long> threadByWrittenKey;
private boolean collision;

public ReadWrittenKeysTracker() {
this.temporalReadKeys = new HashSet<>();
this.temporalWrittenKeys = new HashSet<>();
this.threadByReadKey = new HashMap<>();
this.threadByWrittenKey = new HashMap<>();
this.collision = false;
}

@Override
public Set<ByteArrayWrapper> getTemporalReadKeys(){
return this.temporalReadKeys;
return new HashSet<>(this.threadByReadKey.keySet());
}

@Override
public Set<ByteArrayWrapper> getTemporalWrittenKeys(){
return this.temporalWrittenKeys;
return new HashSet<>(this.threadByWrittenKey.keySet());
}

public boolean hasCollided() { return this.collision;}

@Override
public void addNewReadKey(ByteArrayWrapper key) {
temporalReadKeys.add(key);
public synchronized void addNewReadKey(ByteArrayWrapper key) {
long threadId = Thread.currentThread().getId();
if (threadByWrittenKey.containsKey(key)) {
collision = collision || (threadId != threadByWrittenKey.get(key));
}
Set<Long> threadSet;
if (threadByReadKey.containsKey(key)) {
threadSet = threadByReadKey.get(key);
} else {
threadSet = new HashSet<>();
}
threadSet.add(threadId);
threadByReadKey.put(key, threadSet);
}

@Override
public void addNewWrittenKey(ByteArrayWrapper key) {
temporalWrittenKeys.add(key);
public synchronized void addNewWrittenKey(ByteArrayWrapper key) {
long threadId = Thread.currentThread().getId();
if (threadByWrittenKey.containsKey(key)) {
collision = collision || (threadId != threadByWrittenKey.get(key));
}

if (threadByReadKey.containsKey(key)) {
Set<Long> threadSet = threadByReadKey.get(key);
collision = collision || !(threadSet.contains(threadId)) || (threadSet.size() > 1);
}

threadByWrittenKey.put(key, threadId);
}

@Override
public void clear() {
this.temporalReadKeys = new HashSet<>();
this.temporalWrittenKeys = new HashSet<>();

public synchronized void clear() {
this.threadByReadKey = new HashMap<>();
this.threadByWrittenKey = new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public Set<ByteArrayWrapper> getTemporalWrittenKeys() {
return temporalWrittenKeys;
}

@Override
public boolean hasCollided() {
return false;
}

@Override
public void addNewReadKey(ByteArrayWrapper key) {
//Dummy tracker does not store added keys
Expand Down
90 changes: 81 additions & 9 deletions rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ void executeBlockWithTwoTransactions(boolean activeRskip144) {
Assertions.assertEquals(BigInteger.valueOf(60000 - 42000 - 20), accountState.getBalance().asBigInteger());
}

@Test
@ParameterizedTest
@ValueSource(booleans = {true, false})
void executeAndFillBlockWithNoSavingToStore(boolean activeRskip144) {
TestObjects objects = generateBlockWithOneTransaction(activeRskip144, RSKIP_126_IS_ACTIVE);
Block parent = objects.getParent();
Expand All @@ -294,7 +295,8 @@ void executeAndFillBlockWithNoSavingToStore(boolean activeRskip144) {
Assertions.assertEquals(Optional.empty(), trieStore.retrieve(block.getStateRoot()));
}

@Test
@ParameterizedTest
@ValueSource(booleans = {true, false})
void executeBlockWithSavingToStore(boolean activeRskip144) {
TestObjects objects = generateBlockWithOneTransaction(activeRskip144, RSKIP_126_IS_ACTIVE);
Block parent = objects.getParent();
Expand Down Expand Up @@ -666,6 +668,35 @@ void executeParallelBlockAgainstSequentialBlock(boolean activeRskip144) {
Assertions.assertArrayEquals(seqResult.getFinalState().getHash().getBytes(), parallelResult.getFinalState().getHash().getBytes());
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void executeInvalidParallelBlock(boolean activeRskip144) {
if (!activeRskip144) {
return;
}
doReturn(activeRskip144).when(activationConfig).isActive(eq(ConsensusRule.RSKIP144), anyLong());
BlockExecutor executor = buildBlockExecutor(trieStore, activeRskip144, RSKIP_126_IS_ACTIVE);
Block parent = blockchain.getBestBlock();
Block pBlock = getBlockWithTwoDependentTransactions(new short[]{1, 2});
BlockResult result = executor.execute(null, 0, pBlock, parent.getHeader(), true, false, true);
Assertions.assertEquals(BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT, result);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void ifThereIsACollisionBetweenParallelAndSequentialBucketItShouldNotBeConsidered(boolean activeRskip144) {
if (!activeRskip144) {
return;
}
doReturn(activeRskip144).when(activationConfig).isActive(eq(ConsensusRule.RSKIP144), anyLong());
BlockExecutor executor = buildBlockExecutor(trieStore, activeRskip144, RSKIP_126_IS_ACTIVE);
Block parent = blockchain.getBestBlock();
Block pBlock = getBlockWithTwoDependentTransactions(new short[]{1});
BlockResult result = executor.execute(null, 0, pBlock, parent.getHeader(), true, false, true);
Assertions.assertTrue(pBlock.getTransactionsList().containsAll(result.getExecutedTransactions()));
Assertions.assertEquals(pBlock.getTransactionsList().size(), result.getExecutedTransactions().size());
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void executeParallelBlockTwice(boolean activeRskip144) {
Expand All @@ -677,11 +708,11 @@ void executeParallelBlockTwice(boolean activeRskip144) {
BlockExecutor executor = buildBlockExecutor(trieStore, activeRskip144, RSKIP_126_IS_ACTIVE);
Block parent = blockchain.getBestBlock();
Block block1 = getBlockWithTenTransactions(new short[]{2, 4, 6, 8});
BlockResult result1 = executor.executeAndFill(block1, parent.getHeader());
BlockResult result1 = executor.execute(null, 0, block1, parent.getHeader(), true, false, true);


Block block2 = getBlockWithTenTransactions(new short[]{2, 4, 6, 8});
BlockResult result2 = executor.executeAndFill(block2, parent.getHeader());
BlockResult result2 = executor.execute(null, 0, block2, parent.getHeader(), true, false, true);

Assertions.assertArrayEquals(result2.getFinalState().getHash().getBytes(), result1.getFinalState().getHash().getBytes());
Assertions.assertArrayEquals(block1.getHash().getBytes(), block2.getHash().getBytes());
Expand Down Expand Up @@ -963,6 +994,48 @@ private Block getBlockWithTwoTransactions() {
return new BlockGenerator(Constants.regtest(), activationConfig).createChildBlock(bestBlock, txs, uncles, 1, null);
}

private Block getBlockWithTwoDependentTransactions(short[] edges) {
int nTxs = 2;

Repository track = repository.startTracking();
List<Account> accounts = new LinkedList<>();

for (int i = 0; i < nTxs; i++) {
accounts.add(createAccount("accounttest" + i, track, Coin.valueOf(60000)));
}
track.commit();
Block bestBlock = blockchain.getBestBlock();
bestBlock.setStateRoot(repository.getRoot());

List<Transaction> txs = new LinkedList<>();

for (int i = 0; i < nTxs; i++) {
Transaction tx = Transaction.builder()
.nonce(BigInteger.ZERO)
.gasPrice(BigInteger.ONE)
.gasLimit(BigInteger.valueOf(21000))
.destination(accounts.get((i + 1) % 2).getAddress())
.chainId(CONFIG.getNetworkConstants().getChainId())
.value(BigInteger.TEN)
.build();
tx.sign(accounts.get(i).getEcKey().getPrivKeyBytes());
txs.add(tx);
}
List<BlockHeader> uncles = new ArrayList<>();

return new BlockGenerator(Constants.regtest(), activationConfig)
.createChildBlock(
bestBlock,
txs,
uncles,
1,
null,
bestBlock.getGasLimit(),
bestBlock.getCoinbase(),
edges
);
}

private Block getBlockWithTenTransactions(short[] edges) {
int nTxs = 10;
int nAccounts = nTxs * 2;
Expand Down Expand Up @@ -1005,9 +1078,8 @@ private Block getBlockWithTenTransactions(short[] edges) {
);
}

private Block getBlockWithNIndependentTransactions(int number, BigInteger txGasLimit, boolean withRemasc) {
int nTxs = number;
int nAccounts = nTxs * 2;
private Block getBlockWithNIndependentTransactions(int txNumber, BigInteger txGasLimit, boolean withRemasc) {
int nAccounts = txNumber * 2;
Repository track = repository.startTracking();
List<Account> accounts = new LinkedList<>();

Expand All @@ -1020,12 +1092,12 @@ private Block getBlockWithNIndependentTransactions(int number, BigInteger txGasL

List<Transaction> txs = new LinkedList<>();

for (int i = 0; i < nTxs; i++) {
for (int i = 0; i < txNumber; i++) {
Transaction tx = Transaction.builder()
.nonce(BigInteger.ZERO)
.gasPrice(BigInteger.ONE)
.gasLimit(txGasLimit)
.destination(accounts.get(i + nTxs).getAddress())
.destination(accounts.get(i + txNumber).getAddress())
.chainId(CONFIG.getNetworkConstants().getChainId())
.value(BigInteger.TEN)
.build();
Expand Down
Loading

0 comments on commit cd278a1

Please sign in to comment.