From a07e648e884468f4acbaae4621d0d16206a36c49 Mon Sep 17 00:00:00 2001 From: Julian Len Date: Thu, 26 May 2022 16:39:09 -0300 Subject: [PATCH] Added thread logic in the Tracker so it controls whether there is a key collision between threads --- .../co/rsk/core/TransactionListExecutor.java | 8 +++ .../java/co/rsk/core/bc/BlockExecutor.java | 6 +- .../rsk/core/bc/IReadWrittenKeysTracker.java | 2 + .../rsk/core/bc/ReadWrittenKeysTracker.java | 44 +++++++++---- .../db/DummyReadWrittenKeysTracker.java | 5 ++ .../co/rsk/core/bc/BlockExecutorTest.java | 62 +++++++++++++++++-- 6 files changed, 107 insertions(+), 20 deletions(-) diff --git a/rskj-core/src/main/java/co/rsk/core/TransactionListExecutor.java b/rskj-core/src/main/java/co/rsk/core/TransactionListExecutor.java index 33e7b4f348b..0b22499e60d 100644 --- a/rskj-core/src/main/java/co/rsk/core/TransactionListExecutor.java +++ b/rskj-core/src/main/java/co/rsk/core/TransactionListExecutor.java @@ -1,5 +1,6 @@ package co.rsk.core; +import co.rsk.core.bc.IReadWrittenKeysTracker; import co.rsk.crypto.Keccak256; import org.ethereum.core.*; import org.ethereum.vm.DataWord; @@ -21,6 +22,7 @@ public class TransactionListExecutor implements Callable { private final TransactionExecutorFactory transactionExecutorFactory; private final List transactions; + private IReadWrittenKeysTracker readWrittenKeysTracker; private final Block block; private final Repository track; private final boolean vmTrace; @@ -41,6 +43,7 @@ public class TransactionListExecutor implements Callable { public TransactionListExecutor( List transactions, + IReadWrittenKeysTracker readWrittenKeysTracker, Block block, TransactionExecutorFactory transactionExecutorFactory, Repository track, @@ -58,6 +61,7 @@ public TransactionListExecutor( LongAccumulator accumulatedFees, LongAccumulator accumulatedGas, int firstTxIndex) { + this.readWrittenKeysTracker = readWrittenKeysTracker; this.block = block; this.transactionExecutorFactory = transactionExecutorFactory; this.track = track; @@ -98,6 +102,10 @@ public Boolean call() { ); boolean transactionExecuted = txExecutor.executeTransaction(); + if (readWrittenKeysTracker.hasCollided()) { + return false; + } + if (!acceptInvalidTransactions && !transactionExecuted) { if (discardInvalidTxs) { logger.warn("block: [{}] discarded tx: [{}]", block.getNumber(), tx.getHash()); diff --git a/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java b/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java index 1f247c7ee09..4c96f0d4a2a 100644 --- a/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java +++ b/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java @@ -458,8 +458,8 @@ private BlockResult executeParallel( // of the repository to the state post execution, so it's necessary to get it to // the state prior execution again. Metric metric = profiler.start(Profiler.PROFILING_TYPE.BLOCK_EXECUTE); - - Repository track = repositoryLocator.startTrackingAt(parent); + IReadWrittenKeysTracker readWrittenKeysTracker = new ReadWrittenKeysTracker(); + Repository track = repositoryLocator.startTrackingAt(parent, readWrittenKeysTracker); maintainPrecompiledContractStorageRoots(track, activationConfig.forBlock(block.getNumber())); @@ -480,6 +480,7 @@ private BlockResult executeParallel( List sublist = block.getTransactionsList().subList(start, end); TransactionListExecutor txListExecutor = new TransactionListExecutor( sublist, + readWrittenKeysTracker, block, transactionExecutorFactory, track, @@ -529,6 +530,7 @@ private BlockResult executeParallel( List sublist = block.getTransactionsList().subList(start, block.getTransactionsList().size()); TransactionListExecutor txListExecutor = new TransactionListExecutor( sublist, + readWrittenKeysTracker, block, transactionExecutorFactory, track, diff --git a/rskj-core/src/main/java/co/rsk/core/bc/IReadWrittenKeysTracker.java b/rskj-core/src/main/java/co/rsk/core/bc/IReadWrittenKeysTracker.java index 3fb6f21d11d..c0aa9d11cc0 100644 --- a/rskj-core/src/main/java/co/rsk/core/bc/IReadWrittenKeysTracker.java +++ b/rskj-core/src/main/java/co/rsk/core/bc/IReadWrittenKeysTracker.java @@ -26,6 +26,8 @@ public interface IReadWrittenKeysTracker { Set getTemporalWrittenKeys(); + boolean hasCollided(); + void addNewReadKey(ByteArrayWrapper key); void addNewWrittenKey(ByteArrayWrapper key); diff --git a/rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java b/rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java index 46e1b877816..08b7ca73a2c 100644 --- a/rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java +++ b/rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java @@ -19,46 +19,64 @@ package co.rsk.core.bc; import org.ethereum.db.ByteArrayWrapper; - -import java.util.HashSet; +import java.util.HashMap; +import java.util.Map; import java.util.Set; //TODO(JULI): // * Next step should be to check whether a key is written in the cache but also deleted in the same transaction. This key shouldn't be considered as a written key. public class ReadWrittenKeysTracker implements IReadWrittenKeysTracker { - private Set temporalReadKeys; - private Set temporalWrittenKeys; + private Map threadByReadKey; + private Map threadByWrittenKey; + private boolean collision; public ReadWrittenKeysTracker() { - this.temporalReadKeys = new HashSet<>(); - this.temporalWrittenKeys = new HashSet<>(); + this.threadByReadKey = new HashMap<>(); + this.threadByWrittenKey = new HashMap<>(); + this.collision = false; } @Override public Set getTemporalReadKeys(){ - return this.temporalReadKeys; + return this.threadByReadKey.keySet(); } @Override public Set getTemporalWrittenKeys(){ - return this.temporalWrittenKeys; + return this.threadByWrittenKey.keySet(); } + public boolean hasCollided() { return this.collision;} + @Override public void addNewReadKey(ByteArrayWrapper key) { - temporalReadKeys.add(key); + long threadId = Thread.currentThread().getId(); + + if (threadByWrittenKey.containsKey(key)) { + collision |= threadId != threadByWrittenKey.get(key); + } + + threadByReadKey.put(key, threadId); } @Override public void addNewWrittenKey(ByteArrayWrapper key) { - temporalWrittenKeys.add(key); + long threadId = Thread.currentThread().getId(); + if (threadByWrittenKey.containsKey(key)) { + collision |= threadId != threadByWrittenKey.get(key); + } + + if (threadByReadKey.containsKey(key)) { + collision |= threadId != threadByReadKey.get(key); + } + + threadByWrittenKey.put(key, threadId); } @Override public void clear() { - this.temporalReadKeys = new HashSet<>(); - this.temporalWrittenKeys = new HashSet<>(); - + this.threadByReadKey = new HashMap<>(); + this.threadByWrittenKey = new HashMap<>(); } } diff --git a/rskj-core/src/main/java/org/ethereum/db/DummyReadWrittenKeysTracker.java b/rskj-core/src/main/java/org/ethereum/db/DummyReadWrittenKeysTracker.java index e928147c90e..09cb33a4f33 100644 --- a/rskj-core/src/main/java/org/ethereum/db/DummyReadWrittenKeysTracker.java +++ b/rskj-core/src/main/java/org/ethereum/db/DummyReadWrittenKeysTracker.java @@ -25,6 +25,11 @@ public Set getTemporalWrittenKeys() { return temporalWrittenKeys; } + @Override + public boolean hasCollided() { + return false; + } + @Override public void addNewReadKey(ByteArrayWrapper key) { //Dummy tracker does not store added keys diff --git a/rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java b/rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java index 7af4a34b3dc..502fcd3e8eb 100644 --- a/rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java +++ b/rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java @@ -618,6 +618,17 @@ public void executeParallelBlockAgainstSequentialBlock() { Assert.assertArrayEquals(seqResult.getFinalState().getHash().getBytes(), parallelResult.getFinalState().getHash().getBytes()); } + @Test + public void executeInvalidParallelBlock() { + if (!activeRskip144) { + return; + } + Block parent = blockchain.getBestBlock(); + Block pBlock = getBlockWithTwoDependentTransactions(new short[]{1, 2}); + BlockResult result = executor.execute(pBlock, parent.getHeader(), true); + Assert.assertEquals(BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT, result); + } + @Test public void executeParallelBlockTwice() { if (!activeRskip144) { @@ -895,6 +906,48 @@ private Block getBlockWithTwoTransactions() { return new BlockGenerator(Constants.regtest(), activationConfig).createChildBlock(bestBlock, txs, uncles, 1, null); } + private Block getBlockWithTwoDependentTransactions(short[] edges) { + int nTxs = 2; + + Repository track = repository.startTracking(); + List accounts = new LinkedList<>(); + + for (int i = 0; i < nTxs; i++) { + accounts.add(createAccount("accounttest" + i, track, Coin.valueOf(60000))); + } + track.commit(); + Block bestBlock = blockchain.getBestBlock(); + bestBlock.setStateRoot(repository.getRoot()); + + List txs = new LinkedList<>(); + + for (int i = 0; i < nTxs; i++) { + Transaction tx = Transaction.builder() + .nonce(BigInteger.ZERO) + .gasPrice(BigInteger.ONE) + .gasLimit(BigInteger.valueOf(21000)) + .destination(accounts.get((i + 1) % 2).getAddress()) + .chainId(CONFIG.getNetworkConstants().getChainId()) + .value(BigInteger.TEN) + .build(); + tx.sign(accounts.get(i).getEcKey().getPrivKeyBytes()); + txs.add(tx); + } + List uncles = new ArrayList<>(); + + return new BlockGenerator(Constants.regtest(), activationConfig) + .createChildBlock( + bestBlock, + txs, + uncles, + 1, + null, + bestBlock.getGasLimit(), + bestBlock.getCoinbase(), + edges + ); + } + private Block getBlockWithTenTransactions(short[] edges) { int nTxs = 10; int nAccounts = nTxs * 2; @@ -937,9 +990,8 @@ private Block getBlockWithTenTransactions(short[] edges) { ); } - private Block getBlockWithNIndependentTransactions(int number, BigInteger txGasLimit, boolean withRemasc) { - int nTxs = number; - int nAccounts = nTxs * 2; + private Block getBlockWithNIndependentTransactions(int txNumber, BigInteger txGasLimit, boolean withRemasc) { + int nAccounts = txNumber * 2; Repository track = repository.startTracking(); List accounts = new LinkedList<>(); @@ -952,12 +1004,12 @@ private Block getBlockWithNIndependentTransactions(int number, BigInteger txGasL List txs = new LinkedList<>(); - for (int i = 0; i < nTxs; i++) { + for (int i = 0; i < txNumber; i++) { Transaction tx = Transaction.builder() .nonce(BigInteger.ZERO) .gasPrice(BigInteger.ONE) .gasLimit(txGasLimit) - .destination(accounts.get(i + nTxs).getAddress()) + .destination(accounts.get(i + txNumber).getAddress()) .chainId(CONFIG.getNetworkConstants().getChainId()) .value(BigInteger.TEN) .build();