diff --git a/rskj-core/src/main/java/co/rsk/core/TransactionListExecutor.java b/rskj-core/src/main/java/co/rsk/core/TransactionListExecutor.java index 33e7b4f348b..0b22499e60d 100644 --- a/rskj-core/src/main/java/co/rsk/core/TransactionListExecutor.java +++ b/rskj-core/src/main/java/co/rsk/core/TransactionListExecutor.java @@ -1,5 +1,6 @@ package co.rsk.core; +import co.rsk.core.bc.IReadWrittenKeysTracker; import co.rsk.crypto.Keccak256; import org.ethereum.core.*; import org.ethereum.vm.DataWord; @@ -21,6 +22,7 @@ public class TransactionListExecutor implements Callable { private final TransactionExecutorFactory transactionExecutorFactory; private final List transactions; + private IReadWrittenKeysTracker readWrittenKeysTracker; private final Block block; private final Repository track; private final boolean vmTrace; @@ -41,6 +43,7 @@ public class TransactionListExecutor implements Callable { public TransactionListExecutor( List transactions, + IReadWrittenKeysTracker readWrittenKeysTracker, Block block, TransactionExecutorFactory transactionExecutorFactory, Repository track, @@ -58,6 +61,7 @@ public TransactionListExecutor( LongAccumulator accumulatedFees, LongAccumulator accumulatedGas, int firstTxIndex) { + this.readWrittenKeysTracker = readWrittenKeysTracker; this.block = block; this.transactionExecutorFactory = transactionExecutorFactory; this.track = track; @@ -98,6 +102,10 @@ public Boolean call() { ); boolean transactionExecuted = txExecutor.executeTransaction(); + if (readWrittenKeysTracker.hasCollided()) { + return false; + } + if (!acceptInvalidTransactions && !transactionExecuted) { if (discardInvalidTxs) { logger.warn("block: [{}] discarded tx: [{}]", block.getNumber(), tx.getHash()); diff --git a/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java b/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java index 6fb07560714..1cc4d2ca42a 100644 --- a/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java +++ b/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java @@ -455,8 +455,8 @@ private BlockResult executeParallel( // of the repository to the state post execution, so it's necessary to get it to // the state prior execution again. Metric metric = profiler.start(Profiler.PROFILING_TYPE.BLOCK_EXECUTE); - - Repository track = repositoryLocator.startTrackingAt(parent); + IReadWrittenKeysTracker readWrittenKeysTracker = new ReadWrittenKeysTracker(); + Repository track = repositoryLocator.startTrackingAt(parent, readWrittenKeysTracker); maintainPrecompiledContractStorageRoots(track, activationConfig.forBlock(block.getNumber())); @@ -477,6 +477,7 @@ private BlockResult executeParallel( List sublist = block.getTransactionsList().subList(start, end); TransactionListExecutor txListExecutor = new TransactionListExecutor( sublist, + readWrittenKeysTracker, block, transactionExecutorFactory, track, @@ -522,10 +523,12 @@ private BlockResult executeParallel( } } + readWrittenKeysTracker.clear(); // execute remaining transactions after the parallel subsets List sublist = block.getTransactionsList().subList(start, block.getTransactionsList().size()); TransactionListExecutor txListExecutor = new TransactionListExecutor( sublist, + readWrittenKeysTracker, block, transactionExecutorFactory, track, diff --git a/rskj-core/src/main/java/co/rsk/core/bc/IReadWrittenKeysTracker.java b/rskj-core/src/main/java/co/rsk/core/bc/IReadWrittenKeysTracker.java index 3fb6f21d11d..c0aa9d11cc0 100644 --- a/rskj-core/src/main/java/co/rsk/core/bc/IReadWrittenKeysTracker.java +++ b/rskj-core/src/main/java/co/rsk/core/bc/IReadWrittenKeysTracker.java @@ -26,6 +26,8 @@ public interface IReadWrittenKeysTracker { Set getTemporalWrittenKeys(); + boolean hasCollided(); + void addNewReadKey(ByteArrayWrapper key); void addNewWrittenKey(ByteArrayWrapper key); diff --git a/rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java b/rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java index 46e1b877816..8d119d92a88 100644 --- a/rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java +++ b/rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java @@ -20,45 +20,71 @@ import org.ethereum.db.ByteArrayWrapper; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; //TODO(JULI): // * Next step should be to check whether a key is written in the cache but also deleted in the same transaction. This key shouldn't be considered as a written key. public class ReadWrittenKeysTracker implements IReadWrittenKeysTracker { - private Set temporalReadKeys; - private Set temporalWrittenKeys; + private Map> threadByReadKey; + private Map threadByWrittenKey; + private boolean collision; public ReadWrittenKeysTracker() { - this.temporalReadKeys = new HashSet<>(); - this.temporalWrittenKeys = new HashSet<>(); + this.threadByReadKey = new HashMap<>(); + this.threadByWrittenKey = new HashMap<>(); + this.collision = false; } @Override public Set getTemporalReadKeys(){ - return this.temporalReadKeys; + return new HashSet<>(this.threadByReadKey.keySet()); } @Override public Set getTemporalWrittenKeys(){ - return this.temporalWrittenKeys; + return new HashSet<>(this.threadByWrittenKey.keySet()); } + public boolean hasCollided() { return this.collision;} + @Override - public void addNewReadKey(ByteArrayWrapper key) { - temporalReadKeys.add(key); + public synchronized void addNewReadKey(ByteArrayWrapper key) { + long threadId = Thread.currentThread().getId(); + if (threadByWrittenKey.containsKey(key)) { + collision = collision || (threadId != threadByWrittenKey.get(key)); + } + Set threadSet; + if (threadByReadKey.containsKey(key)) { + threadSet = threadByReadKey.get(key); + } else { + threadSet = new HashSet<>(); + } + threadSet.add(threadId); + threadByReadKey.put(key, threadSet); } @Override - public void addNewWrittenKey(ByteArrayWrapper key) { - temporalWrittenKeys.add(key); + public synchronized void addNewWrittenKey(ByteArrayWrapper key) { + long threadId = Thread.currentThread().getId(); + if (threadByWrittenKey.containsKey(key)) { + collision = collision || (threadId != threadByWrittenKey.get(key)); + } + + if (threadByReadKey.containsKey(key)) { + Set threadSet = threadByReadKey.get(key); + collision = collision || !(threadSet.contains(threadId)) || (threadSet.size() > 1); + } + + threadByWrittenKey.put(key, threadId); } @Override - public void clear() { - this.temporalReadKeys = new HashSet<>(); - this.temporalWrittenKeys = new HashSet<>(); - + public synchronized void clear() { + this.threadByReadKey = new HashMap<>(); + this.threadByWrittenKey = new HashMap<>(); } } diff --git a/rskj-core/src/main/java/org/ethereum/db/DummyReadWrittenKeysTracker.java b/rskj-core/src/main/java/org/ethereum/db/DummyReadWrittenKeysTracker.java index e928147c90e..09cb33a4f33 100644 --- a/rskj-core/src/main/java/org/ethereum/db/DummyReadWrittenKeysTracker.java +++ b/rskj-core/src/main/java/org/ethereum/db/DummyReadWrittenKeysTracker.java @@ -25,6 +25,11 @@ public Set getTemporalWrittenKeys() { return temporalWrittenKeys; } + @Override + public boolean hasCollided() { + return false; + } + @Override public void addNewReadKey(ByteArrayWrapper key) { //Dummy tracker does not store added keys diff --git a/rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java b/rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java index caa127cf5e4..48f41d97a1a 100644 --- a/rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java +++ b/rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java @@ -281,7 +281,8 @@ void executeBlockWithTwoTransactions(boolean activeRskip144) { Assertions.assertEquals(BigInteger.valueOf(60000 - 42000 - 20), accountState.getBalance().asBigInteger()); } - @Test + @ParameterizedTest + @ValueSource(booleans = {true, false}) void executeAndFillBlockWithNoSavingToStore(boolean activeRskip144) { TestObjects objects = generateBlockWithOneTransaction(activeRskip144, RSKIP_126_IS_ACTIVE); Block parent = objects.getParent(); @@ -294,7 +295,8 @@ void executeAndFillBlockWithNoSavingToStore(boolean activeRskip144) { Assertions.assertEquals(Optional.empty(), trieStore.retrieve(block.getStateRoot())); } - @Test + @ParameterizedTest + @ValueSource(booleans = {true, false}) void executeBlockWithSavingToStore(boolean activeRskip144) { TestObjects objects = generateBlockWithOneTransaction(activeRskip144, RSKIP_126_IS_ACTIVE); Block parent = objects.getParent(); @@ -666,6 +668,35 @@ void executeParallelBlockAgainstSequentialBlock(boolean activeRskip144) { Assertions.assertArrayEquals(seqResult.getFinalState().getHash().getBytes(), parallelResult.getFinalState().getHash().getBytes()); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void executeInvalidParallelBlock(boolean activeRskip144) { + if (!activeRskip144) { + return; + } + doReturn(activeRskip144).when(activationConfig).isActive(eq(ConsensusRule.RSKIP144), anyLong()); + BlockExecutor executor = buildBlockExecutor(trieStore, activeRskip144, RSKIP_126_IS_ACTIVE); + Block parent = blockchain.getBestBlock(); + Block pBlock = getBlockWithTwoDependentTransactions(new short[]{1, 2}); + BlockResult result = executor.execute(null, 0, pBlock, parent.getHeader(), true, false, true); + Assertions.assertEquals(BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT, result); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void ifThereIsACollisionBetweenParallelAndSequentialBucketItShouldNotBeConsidered(boolean activeRskip144) { + if (!activeRskip144) { + return; + } + doReturn(activeRskip144).when(activationConfig).isActive(eq(ConsensusRule.RSKIP144), anyLong()); + BlockExecutor executor = buildBlockExecutor(trieStore, activeRskip144, RSKIP_126_IS_ACTIVE); + Block parent = blockchain.getBestBlock(); + Block pBlock = getBlockWithTwoDependentTransactions(new short[]{1}); + BlockResult result = executor.execute(null, 0, pBlock, parent.getHeader(), true, false, true); + Assertions.assertTrue(pBlock.getTransactionsList().containsAll(result.getExecutedTransactions())); + Assertions.assertEquals(pBlock.getTransactionsList().size(), result.getExecutedTransactions().size()); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void executeParallelBlockTwice(boolean activeRskip144) { @@ -677,11 +708,11 @@ void executeParallelBlockTwice(boolean activeRskip144) { BlockExecutor executor = buildBlockExecutor(trieStore, activeRskip144, RSKIP_126_IS_ACTIVE); Block parent = blockchain.getBestBlock(); Block block1 = getBlockWithTenTransactions(new short[]{2, 4, 6, 8}); - BlockResult result1 = executor.executeAndFill(block1, parent.getHeader()); + BlockResult result1 = executor.execute(null, 0, block1, parent.getHeader(), true, false, true); Block block2 = getBlockWithTenTransactions(new short[]{2, 4, 6, 8}); - BlockResult result2 = executor.executeAndFill(block2, parent.getHeader()); + BlockResult result2 = executor.execute(null, 0, block2, parent.getHeader(), true, false, true); Assertions.assertArrayEquals(result2.getFinalState().getHash().getBytes(), result1.getFinalState().getHash().getBytes()); Assertions.assertArrayEquals(block1.getHash().getBytes(), block2.getHash().getBytes()); @@ -963,6 +994,48 @@ private Block getBlockWithTwoTransactions() { return new BlockGenerator(Constants.regtest(), activationConfig).createChildBlock(bestBlock, txs, uncles, 1, null); } + private Block getBlockWithTwoDependentTransactions(short[] edges) { + int nTxs = 2; + + Repository track = repository.startTracking(); + List accounts = new LinkedList<>(); + + for (int i = 0; i < nTxs; i++) { + accounts.add(createAccount("accounttest" + i, track, Coin.valueOf(60000))); + } + track.commit(); + Block bestBlock = blockchain.getBestBlock(); + bestBlock.setStateRoot(repository.getRoot()); + + List txs = new LinkedList<>(); + + for (int i = 0; i < nTxs; i++) { + Transaction tx = Transaction.builder() + .nonce(BigInteger.ZERO) + .gasPrice(BigInteger.ONE) + .gasLimit(BigInteger.valueOf(21000)) + .destination(accounts.get((i + 1) % 2).getAddress()) + .chainId(CONFIG.getNetworkConstants().getChainId()) + .value(BigInteger.TEN) + .build(); + tx.sign(accounts.get(i).getEcKey().getPrivKeyBytes()); + txs.add(tx); + } + List uncles = new ArrayList<>(); + + return new BlockGenerator(Constants.regtest(), activationConfig) + .createChildBlock( + bestBlock, + txs, + uncles, + 1, + null, + bestBlock.getGasLimit(), + bestBlock.getCoinbase(), + edges + ); + } + private Block getBlockWithTenTransactions(short[] edges) { int nTxs = 10; int nAccounts = nTxs * 2; @@ -1005,9 +1078,8 @@ private Block getBlockWithTenTransactions(short[] edges) { ); } - private Block getBlockWithNIndependentTransactions(int number, BigInteger txGasLimit, boolean withRemasc) { - int nTxs = number; - int nAccounts = nTxs * 2; + private Block getBlockWithNIndependentTransactions(int txNumber, BigInteger txGasLimit, boolean withRemasc) { + int nAccounts = txNumber * 2; Repository track = repository.startTracking(); List accounts = new LinkedList<>(); @@ -1020,12 +1092,12 @@ private Block getBlockWithNIndependentTransactions(int number, BigInteger txGasL List txs = new LinkedList<>(); - for (int i = 0; i < nTxs; i++) { + for (int i = 0; i < txNumber; i++) { Transaction tx = Transaction.builder() .nonce(BigInteger.ZERO) .gasPrice(BigInteger.ONE) .gasLimit(txGasLimit) - .destination(accounts.get(i + nTxs).getAddress()) + .destination(accounts.get(i + txNumber).getAddress()) .chainId(CONFIG.getNetworkConstants().getChainId()) .value(BigInteger.TEN) .build(); diff --git a/rskj-core/src/test/java/co/rsk/core/bc/ReadWrittenKeysTrackerTest.java b/rskj-core/src/test/java/co/rsk/core/bc/ReadWrittenKeysTrackerTest.java index c9103ded52d..e438e0e8630 100644 --- a/rskj-core/src/test/java/co/rsk/core/bc/ReadWrittenKeysTrackerTest.java +++ b/rskj-core/src/test/java/co/rsk/core/bc/ReadWrittenKeysTrackerTest.java @@ -24,8 +24,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.Set; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; public class ReadWrittenKeysTrackerTest { @@ -131,8 +134,141 @@ void clearDummyTrackerShouldDoNothing() { Assertions.assertEquals(0, dummyTracker.getTemporalWrittenKeys().size()); } + @Test + public void ifTwoThreadsWriteTheSameKeyCollideShouldBeTrue() { + int nThreads = 2; + ExecutorService service = Executors.newFixedThreadPool(nThreads); + CompletionService completionService = new ExecutorCompletionService<>(service); + + for (int i = 0; i < nThreads; i++) { + ReadWrittenKeysHelper rwKeys = new ReadWrittenKeysHelper(this.tracker, Collections.singletonList(key1), Collections.emptyList()); + completionService.submit(rwKeys); + } + + assertThereWasACollision(nThreads, service, completionService); + } + + @Test + public void ifTwoThreadsReadAndWriteTheSameKeyShouldCollide() { + int nThreads = 2; + ExecutorService service = Executors.newFixedThreadPool(nThreads); + CompletionService completionService = new ExecutorCompletionService<>(service); + List writtenKeys; + List readKeys; + for (int i = 0; i < nThreads; i++) { + if (i == 0) { + writtenKeys = Collections.singletonList(key1); + readKeys = Collections.emptyList(); + } else { + writtenKeys = Collections.emptyList(); + readKeys = Collections.singletonList(key1); + } + + ReadWrittenKeysHelper rwKeys = new ReadWrittenKeysHelper(this.tracker, writtenKeys, readKeys); + completionService.submit(rwKeys); + } + + assertThereWasACollision(nThreads, service, completionService); + } + + @Test + public void ifTwoThreadsWriteDifferentKeyCollideShouldBeFalse() { + int nThreads = 2; + ExecutorService service = Executors.newFixedThreadPool(nThreads); + CompletionService completionService = new ExecutorCompletionService<>(service); + + for (int i = 0; i < nThreads; i++) { + ReadWrittenKeysHelper rwKeys = new ReadWrittenKeysHelper(this.tracker, Collections.singletonList(i == 0? key1 : key2), Collections.emptyList()); + completionService.submit(rwKeys); + } + assertThereWasNotACollision(nThreads, service, completionService); + } + + @Test + public void allThreadIdsShouldBeStoredInTheReadKeysMap() { + int nThreads = 2; + ExecutorService service = Executors.newFixedThreadPool(nThreads); + CompletionService completionService = new ExecutorCompletionService<>(service); + boolean hasCollided = false; + + ReadWrittenKeysHelper rwKeys = new ReadWrittenKeysHelper(this.tracker, Collections.emptyList(), Collections.singletonList(key1)); + completionService.submit(rwKeys); + + try { + Future hasCollidedFuture = completionService.take(); + hasCollided = hasCollidedFuture.get(); + } catch (Exception e) { + Assertions.fail(); + } + + Assertions.assertFalse(hasCollided); + ReadWrittenKeysHelper rwKeys2 = new ReadWrittenKeysHelper(this.tracker, Collections.singletonList(key1), Collections.singletonList(key1)); + completionService.submit(rwKeys2); + + try { + Future hasCollidedFuture = completionService.take(); + hasCollided = hasCollidedFuture.get(); + } catch (Exception e) { + Assertions.fail(); + } + + service.shutdown(); + Assertions.assertTrue(hasCollided); + } + + private void assertThereWasNotACollision(int nThreads, ExecutorService service, CompletionService completionService) { + boolean hasCollided = hasCollided(nThreads, completionService); + Assertions.assertFalse(hasCollided); + service.shutdown(); + } + + private void assertThereWasACollision(int nThreads, ExecutorService service, CompletionService completionService) { + boolean hasCollided = hasCollided(nThreads, completionService); + System.out.println(hasCollided); + Assertions.assertTrue(hasCollided); + service.shutdown(); + } + + private boolean hasCollided(int nThreads, CompletionService completionService) { + boolean hasCollided = false; + for (int i = 0; i < nThreads; i++) { + try { + Future hasCollidedFuture = completionService.take(); + hasCollided |= hasCollidedFuture.get(); + } catch (Exception e) { + Assertions.fail(); + } + } + return hasCollided; + } + private void assertKeyWasAddedInMap(Set map, ByteArrayWrapper key) { Assertions.assertEquals(1, map.size()); Assertions.assertTrue(map.contains(key)); } + private static class ReadWrittenKeysHelper implements Callable { + + private final List readKeys; + private final List writtenKeys; + private final IReadWrittenKeysTracker tracker; + + public ReadWrittenKeysHelper(IReadWrittenKeysTracker tracker, List writtenKeys, List readKeys) { + this.tracker = tracker; + this.readKeys = readKeys; + this.writtenKeys = writtenKeys; + } + //At first, it reads and then it writes. + public Boolean call() { + for (ByteArrayWrapper rk : readKeys) { + tracker.addNewReadKey(rk); + } + + for (ByteArrayWrapper wk : writtenKeys) { + tracker.addNewWrittenKey(wk); + } + + return tracker.hasCollided(); + } + } + }