diff --git a/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java b/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java index 1cc4d2ca42a..74b0005b344 100644 --- a/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java +++ b/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java @@ -28,6 +28,7 @@ import co.rsk.metrics.profilers.Profiler; import co.rsk.metrics.profilers.ProfilerFactory; import com.google.common.annotations.VisibleForTesting; +import org.ethereum.config.Constants; import org.ethereum.config.blockchain.upgrades.ActivationConfig; import org.ethereum.config.blockchain.upgrades.ConsensusRule; import org.ethereum.core.*; @@ -56,8 +57,6 @@ * Note that this class IS NOT guaranteed to be thread safe because its dependencies might hold state. */ public class BlockExecutor { - private static final int THREAD_COUNT = 4; - private static final Logger logger = LoggerFactory.getLogger("blockexecutor"); private static final Profiler profiler = ProfilerFactory.getInstance(); @@ -467,8 +466,8 @@ private BlockResult executeParallel( Set deletedAccounts = ConcurrentHashMap.newKeySet(); LongAccumulator remascFees = new LongAccumulator(Long::sum, 0); - ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT); - CompletionService completionService = new ExecutorCompletionService(executorService); + ExecutorService executorService = Executors.newFixedThreadPool(Constants.getTransactionExecutionThreads()); + ExecutorCompletionService completionService = new ExecutorCompletionService(executorService); int nTasks = 0; // execute parallel subsets of transactions @@ -610,10 +609,9 @@ private BlockResult executeForMiningAfterRSKIP144( Map receiptsByTx = new HashMap<>(); Set deletedAccounts = new HashSet<>(); LongAccumulator remascFees = new LongAccumulator(Long::sum, 0); - short buckets = 2; //TODO(Juli): Is there a better way to calculate the bucket gas limit? - ParallelizeTransactionHandler parallelizeTransactionHandler = new ParallelizeTransactionHandler(buckets, GasCost.toGas(block.getGasLimit())); + ParallelizeTransactionHandler parallelizeTransactionHandler = new ParallelizeTransactionHandler((short) Constants.getTransactionExecutionThreads(), GasCost.toGas(block.getGasLimit())); int txindex = 0; @@ -693,7 +691,7 @@ private BlockResult executeForMiningAfterRSKIP144( receipt.setCumulativeGas(bucketGasAccumulated.get()); } else { //This line is used for testing only when acceptInvalidTransactions is set. - receipt.setCumulativeGas(parallelizeTransactionHandler.getGasUsedIn(buckets)); + receipt.setCumulativeGas(parallelizeTransactionHandler.getGasUsedIn((short) Constants.getTransactionExecutionThreads())); } receipt.setTxStatus(txExecutor.getReceipt().isSuccessful()); diff --git a/rskj-core/src/main/java/co/rsk/validators/ValidTxExecutionListsEdgesRule.java b/rskj-core/src/main/java/co/rsk/validators/ValidTxExecutionListsEdgesRule.java index 9885dbdfeb1..0a89a03de2d 100644 --- a/rskj-core/src/main/java/co/rsk/validators/ValidTxExecutionListsEdgesRule.java +++ b/rskj-core/src/main/java/co/rsk/validators/ValidTxExecutionListsEdgesRule.java @@ -24,9 +24,9 @@ public boolean isValid(Block block) { if (edges == null) { return true; } - if (edges.length > Constants.getMaxTransactionExecutionThreads()) { + if (edges.length > Constants.getTransactionExecutionThreads()) { logger.warn("Invalid block: number of execution lists edges is greater than number of execution threads ({} vs {})", - edges.length, Constants.getMaxTransactionExecutionThreads()); + edges.length, Constants.getTransactionExecutionThreads()); return false; } short prev = 0; diff --git a/rskj-core/src/main/java/org/ethereum/config/Constants.java b/rskj-core/src/main/java/org/ethereum/config/Constants.java index 63ad2f55705..d8efab75255 100644 --- a/rskj-core/src/main/java/org/ethereum/config/Constants.java +++ b/rskj-core/src/main/java/org/ethereum/config/Constants.java @@ -48,6 +48,7 @@ public class Constants { private static final long DEFAULT_MAX_TIMESTAMPS_DIFF_IN_SECS = 5L * 60; // 5 mins private static final long TESTNET_MAX_TIMESTAMPS_DIFF_IN_SECS = 120L * 60; // 120 mins + public static final int TX_EXECUTION_THREADS = 4; private final byte chainId; private final boolean seedCowAccounts; @@ -225,7 +226,7 @@ public static int getMaxBitcoinMergedMiningMerkleProofLength() { return 960; } - public static int getMaxTransactionExecutionThreads() { return 4; } + public static int getTransactionExecutionThreads() { return TX_EXECUTION_THREADS; } public static Constants mainnet() { return new Constants( diff --git a/rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java b/rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java index 48f41d97a1a..8d7dfe5fbba 100644 --- a/rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java +++ b/rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java @@ -499,16 +499,17 @@ void executeSequentiallyTenIndependentTxsAndThemShouldGoInBothBuckets(boolean ac BlockExecutor executor = buildBlockExecutor(trieStore, activeRskip144, RSKIP_126_IS_ACTIVE); long expectedGasUsed = 0L; long expectedAccumulatedGas = 21000L; - short[] expectedEdges = new short[]{5, 10}; + int txNumber = 12; + short[] expectedEdges = new short[]{3, 6, 9, 12}; Block parent = blockchain.getBestBlock(); - Block block = getBlockWithNIndependentTransactions(10, BigInteger.valueOf(expectedAccumulatedGas), false); + Block block = getBlockWithNIndependentTransactions(txNumber, BigInteger.valueOf(expectedAccumulatedGas), false); List txs = block.getTransactionsList(); BlockResult blockResult = executor.executeAndFill(block, parent.getHeader()); Assertions.assertEquals(txs.size(), blockResult.getExecutedTransactions().size()); Assertions.assertTrue(txs.containsAll(blockResult.getExecutedTransactions())); Assertions.assertArrayEquals(expectedEdges, blockResult.getTxEdges()); - Assertions.assertEquals(expectedAccumulatedGas*10, blockResult.getGasUsed()); + Assertions.assertEquals(expectedAccumulatedGas*txNumber, blockResult.getGasUsed()); List transactionReceipts = blockResult.getTransactionReceipts(); long accumulatedGasUsed = 0L; @@ -539,11 +540,10 @@ void executeBigIndependentTxsSequentiallyTheLastOneShouldGoToSequential(boolean Block parent = blockchain.getBestBlock(); long blockGasLimit = GasCost.toGas(parent.getGasLimit()); int gasLimit = 21000; - int transactionNumber = (int) (blockGasLimit /gasLimit); - short[] expectedEdges = new short[]{(short) transactionNumber, (short) (transactionNumber*2)}; + int transactionNumber = (int) (blockGasLimit/gasLimit); + short[] expectedEdges = new short[]{(short) transactionNumber, (short) (transactionNumber*2), (short) (transactionNumber*3), (short) (transactionNumber*4)}; int transactionsInSequential = 1; - - Block block = getBlockWithNIndependentTransactions(transactionNumber*2+transactionsInSequential, BigInteger.valueOf(gasLimit), false); + Block block = getBlockWithNIndependentTransactions(transactionNumber * Constants.getTransactionExecutionThreads() + transactionsInSequential, BigInteger.valueOf(gasLimit), false); List transactionsList = block.getTransactionsList(); BlockResult blockResult = executor.executeAndFill(block, parent.getHeader()); @@ -582,7 +582,7 @@ void executeATxInSequentialAndBlockResultShouldTrackTheGasUsedInTheBlock(boolean int gasLimit = 21000; int transactionNumberToFillParallelBucket = (int) (blockGasLimit / gasLimit); int transactionsInSequential = 1; - int totalTxsNumber = transactionNumberToFillParallelBucket * 2 + transactionsInSequential; + int totalTxsNumber = transactionNumberToFillParallelBucket * Constants.getTransactionExecutionThreads() + transactionsInSequential; Block block = getBlockWithNIndependentTransactions(totalTxsNumber, BigInteger.valueOf(gasLimit), false); BlockResult blockResult = executor.executeAndFill(block, parent.getHeader()); @@ -601,7 +601,8 @@ void withTheBucketsFullTheLastTransactionShouldNotFit(boolean activeRskip144) { long blockGasLimit = GasCost.toGas(parent.getGasLimit()); int gasLimit = 21000; int transactionNumberToFillParallelBucket = (int) (blockGasLimit / gasLimit); - int totalTxs = (transactionNumberToFillParallelBucket) * 3 + 1; + int totalNumberOfBuckets = Constants.getTransactionExecutionThreads() + 1; + int totalTxs = (transactionNumberToFillParallelBucket) * totalNumberOfBuckets + 1; Block block = getBlockWithNIndependentTransactions(totalTxs, BigInteger.valueOf(gasLimit), false); BlockResult blockResult = executor.executeAndFill(block, parent.getHeader()); Assertions.assertEquals(totalTxs, blockResult.getExecutedTransactions().size() + 1); @@ -620,8 +621,9 @@ void withSequentialBucketFullRemascTxShouldFit(boolean activeRskip144) { long blockGasLimit = GasCost.toGas(parent.getGasLimit()); int gasLimit = 21000; int transactionNumberToFillABucket = (int) (blockGasLimit / gasLimit); - int expectedNumberOfTx = transactionNumberToFillABucket*3 + 1; - Block block = getBlockWithNIndependentTransactions(transactionNumberToFillABucket*3, BigInteger.valueOf(gasLimit), true); + int totalNumberOfBuckets = Constants.getTransactionExecutionThreads() + 1; + int expectedNumberOfTx = transactionNumberToFillABucket* totalNumberOfBuckets + 1; + Block block = getBlockWithNIndependentTransactions(transactionNumberToFillABucket * totalNumberOfBuckets, BigInteger.valueOf(gasLimit), true); BlockResult blockResult = executor.executeAndFill(block, parent.getHeader()); Assertions.assertEquals(expectedNumberOfTx, blockResult.getExecutedTransactions().size()); }