Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Research/parallel-tx/block validation #1804

Merged
merged 2 commits into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package co.rsk.core;

import co.rsk.core.bc.IReadWrittenKeysTracker;
import co.rsk.crypto.Keccak256;
import org.ethereum.core.*;
import org.ethereum.vm.DataWord;
Expand All @@ -21,6 +22,7 @@ public class TransactionListExecutor implements Callable {

private final TransactionExecutorFactory transactionExecutorFactory;
private final List<Transaction> transactions;
private IReadWrittenKeysTracker readWrittenKeysTracker;
private final Block block;
private final Repository track;
private final boolean vmTrace;
Expand All @@ -41,6 +43,7 @@ public class TransactionListExecutor implements Callable {

public TransactionListExecutor(
List<Transaction> transactions,
IReadWrittenKeysTracker readWrittenKeysTracker,
Block block,
TransactionExecutorFactory transactionExecutorFactory,
Repository track,
Expand All @@ -58,6 +61,7 @@ public TransactionListExecutor(
LongAccumulator accumulatedFees,
LongAccumulator accumulatedGas,
int firstTxIndex) {
this.readWrittenKeysTracker = readWrittenKeysTracker;
this.block = block;
this.transactionExecutorFactory = transactionExecutorFactory;
this.track = track;
Expand Down Expand Up @@ -98,6 +102,10 @@ public Boolean call() {
);
boolean transactionExecuted = txExecutor.executeTransaction();

if (readWrittenKeysTracker.hasCollided()) {
return false;
}

if (!acceptInvalidTransactions && !transactionExecuted) {
if (discardInvalidTxs) {
logger.warn("block: [{}] discarded tx: [{}]", block.getNumber(), tx.getHash());
Expand Down
7 changes: 5 additions & 2 deletions rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,8 @@ private BlockResult executeParallel(
// of the repository to the state post execution, so it's necessary to get it to
// the state prior execution again.
Metric metric = profiler.start(Profiler.PROFILING_TYPE.BLOCK_EXECUTE);

Repository track = repositoryLocator.startTrackingAt(parent);
IReadWrittenKeysTracker readWrittenKeysTracker = new ReadWrittenKeysTracker();
Repository track = repositoryLocator.startTrackingAt(parent, readWrittenKeysTracker);

maintainPrecompiledContractStorageRoots(track, activationConfig.forBlock(block.getNumber()));

Expand All @@ -480,6 +480,7 @@ private BlockResult executeParallel(
List<Transaction> sublist = block.getTransactionsList().subList(start, end);
TransactionListExecutor txListExecutor = new TransactionListExecutor(
sublist,
readWrittenKeysTracker,
block,
transactionExecutorFactory,
track,
Expand Down Expand Up @@ -525,10 +526,12 @@ private BlockResult executeParallel(
}
}

readWrittenKeysTracker.clear();
// execute remaining transactions after the parallel subsets
List<Transaction> sublist = block.getTransactionsList().subList(start, block.getTransactionsList().size());
TransactionListExecutor txListExecutor = new TransactionListExecutor(
sublist,
readWrittenKeysTracker,
block,
transactionExecutorFactory,
track,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public interface IReadWrittenKeysTracker {

Set<ByteArrayWrapper> getTemporalWrittenKeys();

boolean hasCollided();

void addNewReadKey(ByteArrayWrapper key);

void addNewWrittenKey(ByteArrayWrapper key);
Expand Down
54 changes: 40 additions & 14 deletions rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,71 @@

import org.ethereum.db.ByteArrayWrapper;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

//TODO(JULI):
// * Next step should be to check whether a key is written in the cache but also deleted in the same transaction. This key shouldn't be considered as a written key.

public class ReadWrittenKeysTracker implements IReadWrittenKeysTracker {
private Set<ByteArrayWrapper> temporalReadKeys;
private Set<ByteArrayWrapper> temporalWrittenKeys;
private Map<ByteArrayWrapper, Set<Long>> threadByReadKey;
private Map<ByteArrayWrapper, Long> threadByWrittenKey;
private boolean collision;

public ReadWrittenKeysTracker() {
this.temporalReadKeys = new HashSet<>();
this.temporalWrittenKeys = new HashSet<>();
this.threadByReadKey = new HashMap<>();
this.threadByWrittenKey = new HashMap<>();
this.collision = false;
}

@Override
public Set<ByteArrayWrapper> getTemporalReadKeys(){
return this.temporalReadKeys;
return new HashSet<>(this.threadByReadKey.keySet());
}

@Override
public Set<ByteArrayWrapper> getTemporalWrittenKeys(){
return this.temporalWrittenKeys;
return new HashSet<>(this.threadByWrittenKey.keySet());
}

public boolean hasCollided() { return this.collision;}

@Override
public void addNewReadKey(ByteArrayWrapper key) {
temporalReadKeys.add(key);
public synchronized void addNewReadKey(ByteArrayWrapper key) {
long threadId = Thread.currentThread().getId();
if (threadByWrittenKey.containsKey(key)) {
collision = collision || (threadId != threadByWrittenKey.get(key));
}
Set<Long> threadSet;
if (threadByReadKey.containsKey(key)) {
threadSet = threadByReadKey.get(key);
} else {
threadSet = new HashSet<>();
}
threadSet.add(threadId);
threadByReadKey.put(key, threadSet);
}

@Override
public void addNewWrittenKey(ByteArrayWrapper key) {
temporalWrittenKeys.add(key);
public synchronized void addNewWrittenKey(ByteArrayWrapper key) {
long threadId = Thread.currentThread().getId();
if (threadByWrittenKey.containsKey(key)) {
collision = collision || (threadId != threadByWrittenKey.get(key));
}

if (threadByReadKey.containsKey(key)) {
Set<Long> threadSet = threadByReadKey.get(key);
collision = collision || !(threadSet.contains(threadId)) || (threadSet.size() > 1);
}

threadByWrittenKey.put(key, threadId);
}

@Override
public void clear() {
this.temporalReadKeys = new HashSet<>();
this.temporalWrittenKeys = new HashSet<>();

public synchronized void clear() {
this.threadByReadKey = new HashMap<>();
this.threadByWrittenKey = new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public Set<ByteArrayWrapper> getTemporalWrittenKeys() {
return temporalWrittenKeys;
}

@Override
public boolean hasCollided() {
return false;
}

@Override
public void addNewReadKey(ByteArrayWrapper key) {
//Dummy tracker does not store added keys
Expand Down
78 changes: 71 additions & 7 deletions rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -618,18 +618,41 @@ public void executeParallelBlockAgainstSequentialBlock() {
Assert.assertArrayEquals(seqResult.getFinalState().getHash().getBytes(), parallelResult.getFinalState().getHash().getBytes());
}

@Test
public void executeInvalidParallelBlock() {
if (!activeRskip144) {
return;
}
Block parent = blockchain.getBestBlock();
Block pBlock = getBlockWithTwoDependentTransactions(new short[]{1, 2});
BlockResult result = executor.execute(null, 0, pBlock, parent.getHeader(), true, false);
Assert.assertEquals(BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT, result);
}

@Test
public void ifThereIsACollisionBetweenParallelAndSequentialBucketItShouldNotBeConsidered() {
if (!activeRskip144) {
return;
}
Block parent = blockchain.getBestBlock();
Block pBlock = getBlockWithTwoDependentTransactions(new short[]{1});
BlockResult result = executor.execute(null, 0, pBlock, parent.getHeader(), true, false);
Assert.assertTrue(pBlock.getTransactionsList().containsAll(result.getExecutedTransactions()));
Assert.assertEquals(pBlock.getTransactionsList().size(), result.getExecutedTransactions().size());
}

@Test
public void executeParallelBlockTwice() {
if (!activeRskip144) {
return;
}
Block parent = blockchain.getBestBlock();
Block block1 = getBlockWithTenTransactions(new short[]{2, 4, 6, 8});
BlockResult result1 = executor.executeAndFill(block1, parent.getHeader());
BlockResult result1 = executor.execute(null, 0, block1, parent.getHeader(), true, false);


Block block2 = getBlockWithTenTransactions(new short[]{2, 4, 6, 8});
BlockResult result2 = executor.executeAndFill(block2, parent.getHeader());
BlockResult result2 = executor.execute(null, 0, block2, parent.getHeader(), true, false);

Assert.assertArrayEquals(result2.getFinalState().getHash().getBytes(), result1.getFinalState().getHash().getBytes());
Assert.assertArrayEquals(block1.getHash().getBytes(), block2.getHash().getBytes());
Expand Down Expand Up @@ -895,6 +918,48 @@ private Block getBlockWithTwoTransactions() {
return new BlockGenerator(Constants.regtest(), activationConfig).createChildBlock(bestBlock, txs, uncles, 1, null);
}

private Block getBlockWithTwoDependentTransactions(short[] edges) {
int nTxs = 2;

Repository track = repository.startTracking();
List<Account> accounts = new LinkedList<>();

for (int i = 0; i < nTxs; i++) {
accounts.add(createAccount("accounttest" + i, track, Coin.valueOf(60000)));
}
track.commit();
Block bestBlock = blockchain.getBestBlock();
bestBlock.setStateRoot(repository.getRoot());

List<Transaction> txs = new LinkedList<>();

for (int i = 0; i < nTxs; i++) {
Transaction tx = Transaction.builder()
.nonce(BigInteger.ZERO)
.gasPrice(BigInteger.ONE)
.gasLimit(BigInteger.valueOf(21000))
.destination(accounts.get((i + 1) % 2).getAddress())
.chainId(CONFIG.getNetworkConstants().getChainId())
.value(BigInteger.TEN)
.build();
tx.sign(accounts.get(i).getEcKey().getPrivKeyBytes());
txs.add(tx);
}
List<BlockHeader> uncles = new ArrayList<>();

return new BlockGenerator(Constants.regtest(), activationConfig)
.createChildBlock(
bestBlock,
txs,
uncles,
1,
null,
bestBlock.getGasLimit(),
bestBlock.getCoinbase(),
edges
);
}

private Block getBlockWithTenTransactions(short[] edges) {
int nTxs = 10;
int nAccounts = nTxs * 2;
Expand Down Expand Up @@ -937,9 +1002,8 @@ private Block getBlockWithTenTransactions(short[] edges) {
);
}

private Block getBlockWithNIndependentTransactions(int number, BigInteger txGasLimit, boolean withRemasc) {
int nTxs = number;
int nAccounts = nTxs * 2;
private Block getBlockWithNIndependentTransactions(int txNumber, BigInteger txGasLimit, boolean withRemasc) {
int nAccounts = txNumber * 2;
Repository track = repository.startTracking();
List<Account> accounts = new LinkedList<>();

Expand All @@ -952,12 +1016,12 @@ private Block getBlockWithNIndependentTransactions(int number, BigInteger txGasL

List<Transaction> txs = new LinkedList<>();

for (int i = 0; i < nTxs; i++) {
for (int i = 0; i < txNumber; i++) {
Transaction tx = Transaction.builder()
.nonce(BigInteger.ZERO)
.gasPrice(BigInteger.ONE)
.gasLimit(txGasLimit)
.destination(accounts.get(i + nTxs).getAddress())
.destination(accounts.get(i + txNumber).getAddress())
.chainId(CONFIG.getNetworkConstants().getChainId())
.value(BigInteger.TEN)
.build();
Expand Down
Loading