Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created a Constant with the number of threads #1815

Merged
merged 1 commit into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import co.rsk.metrics.profilers.Profiler;
import co.rsk.metrics.profilers.ProfilerFactory;
import com.google.common.annotations.VisibleForTesting;
import org.ethereum.config.Constants;
import org.ethereum.config.blockchain.upgrades.ActivationConfig;
import org.ethereum.config.blockchain.upgrades.ConsensusRule;
import org.ethereum.core.*;
Expand Down Expand Up @@ -56,8 +57,6 @@
* Note that this class IS NOT guaranteed to be thread safe because its dependencies might hold state.
*/
public class BlockExecutor {
private static final int THREAD_COUNT = 4;

private static final Logger logger = LoggerFactory.getLogger("blockexecutor");
private static final Profiler profiler = ProfilerFactory.getInstance();

Expand Down Expand Up @@ -470,8 +469,8 @@ private BlockResult executeParallel(
Set<DataWord> deletedAccounts = ConcurrentHashMap.newKeySet();
LongAccumulator remascFees = new LongAccumulator(Long::sum, 0);

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
CompletionService completionService = new ExecutorCompletionService(executorService);
ExecutorService executorService = Executors.newFixedThreadPool(Constants.getTransactionExecutionThreads());
ExecutorCompletionService completionService = new ExecutorCompletionService(executorService);
int nTasks = 0;

// execute parallel subsets of transactions
Expand Down Expand Up @@ -619,10 +618,9 @@ private BlockResult executeForMiningAfterRSKIP144(
Map<Transaction, TransactionReceipt> receiptsByTx = new HashMap<>();
Set<DataWord> deletedAccounts = new HashSet<>();
LongAccumulator remascFees = new LongAccumulator(Long::sum, 0);
short buckets = 2;

//TODO(Juli): Is there a better way to calculate the bucket gas limit?
ParallelizeTransactionHandler parallelizeTransactionHandler = new ParallelizeTransactionHandler(buckets, GasCost.toGas(block.getGasLimit()));
ParallelizeTransactionHandler parallelizeTransactionHandler = new ParallelizeTransactionHandler((short) Constants.getTransactionExecutionThreads(), GasCost.toGas(block.getGasLimit()));

int txindex = 0;

Expand Down Expand Up @@ -702,7 +700,7 @@ private BlockResult executeForMiningAfterRSKIP144(
receipt.setCumulativeGas(bucketGasAccumulated.get());
} else {
//This line is used for testing only when acceptInvalidTransactions is set.
receipt.setCumulativeGas(parallelizeTransactionHandler.getGasUsedIn(buckets));
receipt.setCumulativeGas(parallelizeTransactionHandler.getGasUsedIn((short) Constants.getTransactionExecutionThreads()));
}

receipt.setTxStatus(txExecutor.getReceipt().isSuccessful());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ public boolean isValid(Block block) {
if (edges == null) {
return true;
}
if (edges.length > Constants.getMaxTransactionExecutionThreads()) {
if (edges.length > Constants.getTransactionExecutionThreads()) {
logger.warn("Invalid block: number of execution lists edges is greater than number of execution threads ({} vs {})",
edges.length, Constants.getMaxTransactionExecutionThreads());
edges.length, Constants.getTransactionExecutionThreads());
return false;
}
short prev = 0;
Expand Down
3 changes: 2 additions & 1 deletion rskj-core/src/main/java/org/ethereum/config/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class Constants {

private static final long DEFAULT_MAX_TIMESTAMPS_DIFF_IN_SECS = 5L * 60; // 5 mins
private static final long TESTNET_MAX_TIMESTAMPS_DIFF_IN_SECS = 120L * 60; // 120 mins
public static final int TX_EXECUTION_THREADS = 4;

private final byte chainId;
private final boolean seedCowAccounts;
Expand Down Expand Up @@ -225,7 +226,7 @@ public static int getMaxBitcoinMergedMiningMerkleProofLength() {
return 960;
}

public static int getMaxTransactionExecutionThreads() { return 4; }
public static int getTransactionExecutionThreads() { return TX_EXECUTION_THREADS; }

public static Constants mainnet() {
return new Constants(
Expand Down
24 changes: 13 additions & 11 deletions rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -471,16 +471,17 @@ public void executeSequentiallyTenIndependentTxsAndThemShouldGoInBothBuckets() {

long expectedGasUsed = 0L;
long expectedAccumulatedGas = 21000L;
short[] expectedEdges = new short[]{5, 10};
int txNumber = 12;
short[] expectedEdges = new short[]{3, 6, 9, 12};
Block parent = blockchain.getBestBlock();
Block block = getBlockWithNIndependentTransactions(10, BigInteger.valueOf(expectedAccumulatedGas), false);
Block block = getBlockWithNIndependentTransactions(txNumber, BigInteger.valueOf(expectedAccumulatedGas), false);
List<Transaction> txs = block.getTransactionsList();
BlockResult blockResult = executor.executeAndFill(block, parent.getHeader());

Assert.assertEquals(txs.size(), blockResult.getExecutedTransactions().size());
Assert.assertTrue(txs.containsAll(blockResult.getExecutedTransactions()));
Assert.assertArrayEquals(expectedEdges, blockResult.getTxEdges());
Assert.assertEquals(expectedAccumulatedGas*10, blockResult.getGasUsed());
Assert.assertEquals(expectedAccumulatedGas*txNumber, blockResult.getGasUsed());

List<TransactionReceipt> transactionReceipts = blockResult.getTransactionReceipts();
long accumulatedGasUsed = 0L;
Expand Down Expand Up @@ -508,11 +509,10 @@ public void executeBigIndependentTxsSequentiallyTheLastOneShouldGoToSequential()
Block parent = blockchain.getBestBlock();
long blockGasLimit = GasCost.toGas(parent.getGasLimit());
int gasLimit = 21000;
int transactionNumber = (int) (blockGasLimit /gasLimit);
short[] expectedEdges = new short[]{(short) transactionNumber, (short) (transactionNumber*2)};
int transactionNumber = (int) (blockGasLimit/gasLimit);
short[] expectedEdges = new short[]{(short) transactionNumber, (short) (transactionNumber*2), (short) (transactionNumber*3), (short) (transactionNumber*4)};
int transactionsInSequential = 1;

Block block = getBlockWithNIndependentTransactions(transactionNumber*2+transactionsInSequential, BigInteger.valueOf(gasLimit), false);
Block block = getBlockWithNIndependentTransactions(transactionNumber * Constants.getTransactionExecutionThreads() + transactionsInSequential, BigInteger.valueOf(gasLimit), false);
List<Transaction> transactionsList = block.getTransactionsList();
BlockResult blockResult = executor.executeAndFill(block, parent.getHeader());

Expand Down Expand Up @@ -548,7 +548,7 @@ public void executeATxInSequentialAndBlockResultShouldTrackTheGasUsedInTheBlock(
int gasLimit = 21000;
int transactionNumberToFillParallelBucket = (int) (blockGasLimit / gasLimit);
int transactionsInSequential = 1;
int totalTxsNumber = transactionNumberToFillParallelBucket * 2 + transactionsInSequential;
int totalTxsNumber = transactionNumberToFillParallelBucket * Constants.getTransactionExecutionThreads() + transactionsInSequential;
Block block = getBlockWithNIndependentTransactions(totalTxsNumber, BigInteger.valueOf(gasLimit), false);
BlockResult blockResult = executor.executeAndFill(block, parent.getHeader());

Expand All @@ -564,7 +564,8 @@ public void withTheBucketsFullTheLastTransactionShouldNotFit() {
long blockGasLimit = GasCost.toGas(parent.getGasLimit());
int gasLimit = 21000;
int transactionNumberToFillParallelBucket = (int) (blockGasLimit / gasLimit);
int totalTxs = (transactionNumberToFillParallelBucket) * 3 + 1;
int totalNumberOfBuckets = Constants.getTransactionExecutionThreads() + 1;
int totalTxs = (transactionNumberToFillParallelBucket) * totalNumberOfBuckets + 1;
Block block = getBlockWithNIndependentTransactions(totalTxs, BigInteger.valueOf(gasLimit), false);
BlockResult blockResult = executor.executeAndFill(block, parent.getHeader());
Assert.assertEquals(totalTxs, blockResult.getExecutedTransactions().size() + 1);
Expand All @@ -579,8 +580,9 @@ public void withSequentialBucketFullRemascTxShouldFit() {
long blockGasLimit = GasCost.toGas(parent.getGasLimit());
int gasLimit = 21000;
int transactionNumberToFillABucket = (int) (blockGasLimit / gasLimit);
int expectedNumberOfTx = transactionNumberToFillABucket*3 + 1;
Block block = getBlockWithNIndependentTransactions(transactionNumberToFillABucket*3, BigInteger.valueOf(gasLimit), true);
int totalNumberOfBuckets = Constants.getTransactionExecutionThreads() + 1;
int expectedNumberOfTx = transactionNumberToFillABucket* totalNumberOfBuckets + 1;
Block block = getBlockWithNIndependentTransactions(transactionNumberToFillABucket * totalNumberOfBuckets, BigInteger.valueOf(gasLimit), true);
BlockResult blockResult = executor.executeAndFill(block, parent.getHeader());
Assert.assertEquals(expectedNumberOfTx, blockResult.getExecutedTransactions().size());
}
Expand Down