Skip to content

Commit

Permalink
Created a Constant with the number of threads (#1815)
Browse files Browse the repository at this point in the history
  • Loading branch information
julianlen committed Nov 28, 2022
1 parent cd278a1 commit f7569b7
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 21 deletions.
12 changes: 5 additions & 7 deletions rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import co.rsk.metrics.profilers.Profiler;
import co.rsk.metrics.profilers.ProfilerFactory;
import com.google.common.annotations.VisibleForTesting;
import org.ethereum.config.Constants;
import org.ethereum.config.blockchain.upgrades.ActivationConfig;
import org.ethereum.config.blockchain.upgrades.ConsensusRule;
import org.ethereum.core.*;
Expand Down Expand Up @@ -56,8 +57,6 @@
* Note that this class IS NOT guaranteed to be thread safe because its dependencies might hold state.
*/
public class BlockExecutor {
private static final int THREAD_COUNT = 4;

private static final Logger logger = LoggerFactory.getLogger("blockexecutor");
private static final Profiler profiler = ProfilerFactory.getInstance();

Expand Down Expand Up @@ -467,8 +466,8 @@ private BlockResult executeParallel(
Set<DataWord> deletedAccounts = ConcurrentHashMap.newKeySet();
LongAccumulator remascFees = new LongAccumulator(Long::sum, 0);

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
CompletionService completionService = new ExecutorCompletionService(executorService);
ExecutorService executorService = Executors.newFixedThreadPool(Constants.getTransactionExecutionThreads());
ExecutorCompletionService completionService = new ExecutorCompletionService(executorService);
int nTasks = 0;

// execute parallel subsets of transactions
Expand Down Expand Up @@ -610,10 +609,9 @@ private BlockResult executeForMiningAfterRSKIP144(
Map<Transaction, TransactionReceipt> receiptsByTx = new HashMap<>();
Set<DataWord> deletedAccounts = new HashSet<>();
LongAccumulator remascFees = new LongAccumulator(Long::sum, 0);
short buckets = 2;

//TODO(Juli): Is there a better way to calculate the bucket gas limit?
ParallelizeTransactionHandler parallelizeTransactionHandler = new ParallelizeTransactionHandler(buckets, GasCost.toGas(block.getGasLimit()));
ParallelizeTransactionHandler parallelizeTransactionHandler = new ParallelizeTransactionHandler((short) Constants.getTransactionExecutionThreads(), GasCost.toGas(block.getGasLimit()));

int txindex = 0;

Expand Down Expand Up @@ -693,7 +691,7 @@ private BlockResult executeForMiningAfterRSKIP144(
receipt.setCumulativeGas(bucketGasAccumulated.get());
} else {
//This line is used for testing only when acceptInvalidTransactions is set.
receipt.setCumulativeGas(parallelizeTransactionHandler.getGasUsedIn(buckets));
receipt.setCumulativeGas(parallelizeTransactionHandler.getGasUsedIn((short) Constants.getTransactionExecutionThreads()));
}

receipt.setTxStatus(txExecutor.getReceipt().isSuccessful());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ public boolean isValid(Block block) {
if (edges == null) {
return true;
}
if (edges.length > Constants.getMaxTransactionExecutionThreads()) {
if (edges.length > Constants.getTransactionExecutionThreads()) {
logger.warn("Invalid block: number of execution lists edges is greater than number of execution threads ({} vs {})",
edges.length, Constants.getMaxTransactionExecutionThreads());
edges.length, Constants.getTransactionExecutionThreads());
return false;
}
short prev = 0;
Expand Down
3 changes: 2 additions & 1 deletion rskj-core/src/main/java/org/ethereum/config/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class Constants {

private static final long DEFAULT_MAX_TIMESTAMPS_DIFF_IN_SECS = 5L * 60; // 5 mins
private static final long TESTNET_MAX_TIMESTAMPS_DIFF_IN_SECS = 120L * 60; // 120 mins
public static final int TX_EXECUTION_THREADS = 4;

private final byte chainId;
private final boolean seedCowAccounts;
Expand Down Expand Up @@ -225,7 +226,7 @@ public static int getMaxBitcoinMergedMiningMerkleProofLength() {
return 960;
}

public static int getMaxTransactionExecutionThreads() { return 4; }
public static int getTransactionExecutionThreads() { return TX_EXECUTION_THREADS; }

public static Constants mainnet() {
return new Constants(
Expand Down
24 changes: 13 additions & 11 deletions rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -499,16 +499,17 @@ void executeSequentiallyTenIndependentTxsAndThemShouldGoInBothBuckets(boolean ac
BlockExecutor executor = buildBlockExecutor(trieStore, activeRskip144, RSKIP_126_IS_ACTIVE);
long expectedGasUsed = 0L;
long expectedAccumulatedGas = 21000L;
short[] expectedEdges = new short[]{5, 10};
int txNumber = 12;
short[] expectedEdges = new short[]{3, 6, 9, 12};
Block parent = blockchain.getBestBlock();
Block block = getBlockWithNIndependentTransactions(10, BigInteger.valueOf(expectedAccumulatedGas), false);
Block block = getBlockWithNIndependentTransactions(txNumber, BigInteger.valueOf(expectedAccumulatedGas), false);
List<Transaction> txs = block.getTransactionsList();
BlockResult blockResult = executor.executeAndFill(block, parent.getHeader());

Assertions.assertEquals(txs.size(), blockResult.getExecutedTransactions().size());
Assertions.assertTrue(txs.containsAll(blockResult.getExecutedTransactions()));
Assertions.assertArrayEquals(expectedEdges, blockResult.getTxEdges());
Assertions.assertEquals(expectedAccumulatedGas*10, blockResult.getGasUsed());
Assertions.assertEquals(expectedAccumulatedGas*txNumber, blockResult.getGasUsed());

List<TransactionReceipt> transactionReceipts = blockResult.getTransactionReceipts();
long accumulatedGasUsed = 0L;
Expand Down Expand Up @@ -539,11 +540,10 @@ void executeBigIndependentTxsSequentiallyTheLastOneShouldGoToSequential(boolean
Block parent = blockchain.getBestBlock();
long blockGasLimit = GasCost.toGas(parent.getGasLimit());
int gasLimit = 21000;
int transactionNumber = (int) (blockGasLimit /gasLimit);
short[] expectedEdges = new short[]{(short) transactionNumber, (short) (transactionNumber*2)};
int transactionNumber = (int) (blockGasLimit/gasLimit);
short[] expectedEdges = new short[]{(short) transactionNumber, (short) (transactionNumber*2), (short) (transactionNumber*3), (short) (transactionNumber*4)};
int transactionsInSequential = 1;

Block block = getBlockWithNIndependentTransactions(transactionNumber*2+transactionsInSequential, BigInteger.valueOf(gasLimit), false);
Block block = getBlockWithNIndependentTransactions(transactionNumber * Constants.getTransactionExecutionThreads() + transactionsInSequential, BigInteger.valueOf(gasLimit), false);
List<Transaction> transactionsList = block.getTransactionsList();
BlockResult blockResult = executor.executeAndFill(block, parent.getHeader());

Expand Down Expand Up @@ -582,7 +582,7 @@ void executeATxInSequentialAndBlockResultShouldTrackTheGasUsedInTheBlock(boolean
int gasLimit = 21000;
int transactionNumberToFillParallelBucket = (int) (blockGasLimit / gasLimit);
int transactionsInSequential = 1;
int totalTxsNumber = transactionNumberToFillParallelBucket * 2 + transactionsInSequential;
int totalTxsNumber = transactionNumberToFillParallelBucket * Constants.getTransactionExecutionThreads() + transactionsInSequential;
Block block = getBlockWithNIndependentTransactions(totalTxsNumber, BigInteger.valueOf(gasLimit), false);
BlockResult blockResult = executor.executeAndFill(block, parent.getHeader());

Expand All @@ -601,7 +601,8 @@ void withTheBucketsFullTheLastTransactionShouldNotFit(boolean activeRskip144) {
long blockGasLimit = GasCost.toGas(parent.getGasLimit());
int gasLimit = 21000;
int transactionNumberToFillParallelBucket = (int) (blockGasLimit / gasLimit);
int totalTxs = (transactionNumberToFillParallelBucket) * 3 + 1;
int totalNumberOfBuckets = Constants.getTransactionExecutionThreads() + 1;
int totalTxs = (transactionNumberToFillParallelBucket) * totalNumberOfBuckets + 1;
Block block = getBlockWithNIndependentTransactions(totalTxs, BigInteger.valueOf(gasLimit), false);
BlockResult blockResult = executor.executeAndFill(block, parent.getHeader());
Assertions.assertEquals(totalTxs, blockResult.getExecutedTransactions().size() + 1);
Expand All @@ -620,8 +621,9 @@ void withSequentialBucketFullRemascTxShouldFit(boolean activeRskip144) {
long blockGasLimit = GasCost.toGas(parent.getGasLimit());
int gasLimit = 21000;
int transactionNumberToFillABucket = (int) (blockGasLimit / gasLimit);
int expectedNumberOfTx = transactionNumberToFillABucket*3 + 1;
Block block = getBlockWithNIndependentTransactions(transactionNumberToFillABucket*3, BigInteger.valueOf(gasLimit), true);
int totalNumberOfBuckets = Constants.getTransactionExecutionThreads() + 1;
int expectedNumberOfTx = transactionNumberToFillABucket* totalNumberOfBuckets + 1;
Block block = getBlockWithNIndependentTransactions(transactionNumberToFillABucket * totalNumberOfBuckets, BigInteger.valueOf(gasLimit), true);
BlockResult blockResult = executor.executeAndFill(block, parent.getHeader());
Assertions.assertEquals(expectedNumberOfTx, blockResult.getExecutedTransactions().size());
}
Expand Down

0 comments on commit f7569b7

Please sign in to comment.