Skip to content

Commit

Permalink
Added thread logic in the Tracker so it controls whether there is a k…
Browse files Browse the repository at this point in the history
…ey collision between threads
  • Loading branch information
julianlen committed Jun 13, 2022
1 parent a7ceb02 commit a07e648
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package co.rsk.core;

import co.rsk.core.bc.IReadWrittenKeysTracker;
import co.rsk.crypto.Keccak256;
import org.ethereum.core.*;
import org.ethereum.vm.DataWord;
Expand All @@ -21,6 +22,7 @@ public class TransactionListExecutor implements Callable {

private final TransactionExecutorFactory transactionExecutorFactory;
private final List<Transaction> transactions;
private IReadWrittenKeysTracker readWrittenKeysTracker;
private final Block block;
private final Repository track;
private final boolean vmTrace;
Expand All @@ -41,6 +43,7 @@ public class TransactionListExecutor implements Callable {

public TransactionListExecutor(
List<Transaction> transactions,
IReadWrittenKeysTracker readWrittenKeysTracker,
Block block,
TransactionExecutorFactory transactionExecutorFactory,
Repository track,
Expand All @@ -58,6 +61,7 @@ public TransactionListExecutor(
LongAccumulator accumulatedFees,
LongAccumulator accumulatedGas,
int firstTxIndex) {
this.readWrittenKeysTracker = readWrittenKeysTracker;
this.block = block;
this.transactionExecutorFactory = transactionExecutorFactory;
this.track = track;
Expand Down Expand Up @@ -98,6 +102,10 @@ public Boolean call() {
);
boolean transactionExecuted = txExecutor.executeTransaction();

if (readWrittenKeysTracker.hasCollided()) {
return false;
}

if (!acceptInvalidTransactions && !transactionExecuted) {
if (discardInvalidTxs) {
logger.warn("block: [{}] discarded tx: [{}]", block.getNumber(), tx.getHash());
Expand Down
6 changes: 4 additions & 2 deletions rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,8 @@ private BlockResult executeParallel(
// of the repository to the state post execution, so it's necessary to get it to
// the state prior execution again.
Metric metric = profiler.start(Profiler.PROFILING_TYPE.BLOCK_EXECUTE);

Repository track = repositoryLocator.startTrackingAt(parent);
IReadWrittenKeysTracker readWrittenKeysTracker = new ReadWrittenKeysTracker();
Repository track = repositoryLocator.startTrackingAt(parent, readWrittenKeysTracker);

maintainPrecompiledContractStorageRoots(track, activationConfig.forBlock(block.getNumber()));

Expand All @@ -480,6 +480,7 @@ private BlockResult executeParallel(
List<Transaction> sublist = block.getTransactionsList().subList(start, end);
TransactionListExecutor txListExecutor = new TransactionListExecutor(
sublist,
readWrittenKeysTracker,
block,
transactionExecutorFactory,
track,
Expand Down Expand Up @@ -529,6 +530,7 @@ private BlockResult executeParallel(
List<Transaction> sublist = block.getTransactionsList().subList(start, block.getTransactionsList().size());
TransactionListExecutor txListExecutor = new TransactionListExecutor(
sublist,
readWrittenKeysTracker,
block,
transactionExecutorFactory,
track,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public interface IReadWrittenKeysTracker {

Set<ByteArrayWrapper> getTemporalWrittenKeys();

boolean hasCollided();

void addNewReadKey(ByteArrayWrapper key);

void addNewWrittenKey(ByteArrayWrapper key);
Expand Down
44 changes: 31 additions & 13 deletions rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,64 @@
package co.rsk.core.bc;

import org.ethereum.db.ByteArrayWrapper;

import java.util.HashSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

//TODO(JULI):
// * Next step should be to check whether a key is written in the cache but also deleted in the same transaction. This key shouldn't be considered as a written key.

public class ReadWrittenKeysTracker implements IReadWrittenKeysTracker {
private Set<ByteArrayWrapper> temporalReadKeys;
private Set<ByteArrayWrapper> temporalWrittenKeys;
private Map<ByteArrayWrapper, Long> threadByReadKey;
private Map<ByteArrayWrapper, Long> threadByWrittenKey;
private boolean collision;

public ReadWrittenKeysTracker() {
this.temporalReadKeys = new HashSet<>();
this.temporalWrittenKeys = new HashSet<>();
this.threadByReadKey = new HashMap<>();
this.threadByWrittenKey = new HashMap<>();
this.collision = false;
}

@Override
public Set<ByteArrayWrapper> getTemporalReadKeys(){
return this.temporalReadKeys;
return this.threadByReadKey.keySet();
}

@Override
public Set<ByteArrayWrapper> getTemporalWrittenKeys(){
return this.temporalWrittenKeys;
return this.threadByWrittenKey.keySet();
}

public boolean hasCollided() { return this.collision;}

@Override
public void addNewReadKey(ByteArrayWrapper key) {
temporalReadKeys.add(key);
long threadId = Thread.currentThread().getId();

if (threadByWrittenKey.containsKey(key)) {
collision |= threadId != threadByWrittenKey.get(key);
}

threadByReadKey.put(key, threadId);
}

@Override
public void addNewWrittenKey(ByteArrayWrapper key) {
temporalWrittenKeys.add(key);
long threadId = Thread.currentThread().getId();
if (threadByWrittenKey.containsKey(key)) {
collision |= threadId != threadByWrittenKey.get(key);
}

if (threadByReadKey.containsKey(key)) {
collision |= threadId != threadByReadKey.get(key);
}

threadByWrittenKey.put(key, threadId);
}

@Override
public void clear() {
this.temporalReadKeys = new HashSet<>();
this.temporalWrittenKeys = new HashSet<>();

this.threadByReadKey = new HashMap<>();
this.threadByWrittenKey = new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public Set<ByteArrayWrapper> getTemporalWrittenKeys() {
return temporalWrittenKeys;
}

@Override
public boolean hasCollided() {
return false;
}

@Override
public void addNewReadKey(ByteArrayWrapper key) {
//Dummy tracker does not store added keys
Expand Down
62 changes: 57 additions & 5 deletions rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,17 @@ public void executeParallelBlockAgainstSequentialBlock() {
Assert.assertArrayEquals(seqResult.getFinalState().getHash().getBytes(), parallelResult.getFinalState().getHash().getBytes());
}

@Test
public void executeInvalidParallelBlock() {
if (!activeRskip144) {
return;
}
Block parent = blockchain.getBestBlock();
Block pBlock = getBlockWithTwoDependentTransactions(new short[]{1, 2});
BlockResult result = executor.execute(pBlock, parent.getHeader(), true);
Assert.assertEquals(BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT, result);
}

@Test
public void executeParallelBlockTwice() {
if (!activeRskip144) {
Expand Down Expand Up @@ -895,6 +906,48 @@ private Block getBlockWithTwoTransactions() {
return new BlockGenerator(Constants.regtest(), activationConfig).createChildBlock(bestBlock, txs, uncles, 1, null);
}

private Block getBlockWithTwoDependentTransactions(short[] edges) {
int nTxs = 2;

Repository track = repository.startTracking();
List<Account> accounts = new LinkedList<>();

for (int i = 0; i < nTxs; i++) {
accounts.add(createAccount("accounttest" + i, track, Coin.valueOf(60000)));
}
track.commit();
Block bestBlock = blockchain.getBestBlock();
bestBlock.setStateRoot(repository.getRoot());

List<Transaction> txs = new LinkedList<>();

for (int i = 0; i < nTxs; i++) {
Transaction tx = Transaction.builder()
.nonce(BigInteger.ZERO)
.gasPrice(BigInteger.ONE)
.gasLimit(BigInteger.valueOf(21000))
.destination(accounts.get((i + 1) % 2).getAddress())
.chainId(CONFIG.getNetworkConstants().getChainId())
.value(BigInteger.TEN)
.build();
tx.sign(accounts.get(i).getEcKey().getPrivKeyBytes());
txs.add(tx);
}
List<BlockHeader> uncles = new ArrayList<>();

return new BlockGenerator(Constants.regtest(), activationConfig)
.createChildBlock(
bestBlock,
txs,
uncles,
1,
null,
bestBlock.getGasLimit(),
bestBlock.getCoinbase(),
edges
);
}

private Block getBlockWithTenTransactions(short[] edges) {
int nTxs = 10;
int nAccounts = nTxs * 2;
Expand Down Expand Up @@ -937,9 +990,8 @@ private Block getBlockWithTenTransactions(short[] edges) {
);
}

private Block getBlockWithNIndependentTransactions(int number, BigInteger txGasLimit, boolean withRemasc) {
int nTxs = number;
int nAccounts = nTxs * 2;
private Block getBlockWithNIndependentTransactions(int txNumber, BigInteger txGasLimit, boolean withRemasc) {
int nAccounts = txNumber * 2;
Repository track = repository.startTracking();
List<Account> accounts = new LinkedList<>();

Expand All @@ -952,12 +1004,12 @@ private Block getBlockWithNIndependentTransactions(int number, BigInteger txGasL

List<Transaction> txs = new LinkedList<>();

for (int i = 0; i < nTxs; i++) {
for (int i = 0; i < txNumber; i++) {
Transaction tx = Transaction.builder()
.nonce(BigInteger.ZERO)
.gasPrice(BigInteger.ONE)
.gasLimit(txGasLimit)
.destination(accounts.get(i + nTxs).getAddress())
.destination(accounts.get(i + txNumber).getAddress())
.chainId(CONFIG.getNetworkConstants().getChainId())
.value(BigInteger.TEN)
.build();
Expand Down

0 comments on commit a07e648

Please sign in to comment.