Skip to content

Commit

Permalink
Research/parallel-tx/header generation from master (#1792)
Browse files Browse the repository at this point in the history
* Added the execution plan for the transaction to the block header as a transactionEdgeList.

* Added ParallelizeTransactionHandler that builds the different buckets that are going to be executed by different threads. If the sequential bucket doesn't have enough gas for the new transaction, the transaction isn't added to the block. In addition, once all the transactions are processed, the order of the transactions in the buckets is added to the receipt. The REMASC transaction is always added to the SequentialBucket. When a transaction is properly added to a bucket, it returns the gas used in that bucket.

* Added a tracker to the MutableRepository (MR). If the MR is used for any reason but builds the block, the tracker used is a DummyTracker one.

* Review processed

* Handler's been refactorized (#1760)

* Added tests.

* Research/parallel-tx/splitting executions (#1799)

* changed method names, deleted executeInternal from blockExecutor, and changed execute for executeParallel in some places. fixed BlockExecutorTest since from now on when the block is executed with executeAndFill and its blockNumber is over the rskip144 then the transactionEdgeList is built by the miner

* Three changes so test pass (i) Changed the block hash in Web3ImplLogsTest due to the new field in the header for txEdges, (ii) Many tests used a really high gas, then we decided not to split the gas limit by two for parallel and sequential bucket, and (iii) addRemascFee wasn't present in the oldSequentialExecute and now is added.
  • Loading branch information
julianlen committed Nov 28, 2022
1 parent 3a3b3ba commit 74b8aba
Show file tree
Hide file tree
Showing 16 changed files with 1,847 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private void executeBlocks(String[] args, BlockExecutor blockExecutor, BlockStor
Block block = blockStore.getChainBlockByNumber(n);
Block parent = blockStore.getBlockByHash(block.getParentHash().getBytes());

BlockResult blockResult = blockExecutor.execute(block, parent.getHeader(), false, false, true);
BlockResult blockResult = blockExecutor.execute(null, 0, block, parent.getHeader(), false, false, true);

Keccak256 stateRootHash = stateRootHandler.translate(block.getHeader());
if (!Arrays.equals(blockResult.getFinalState().getHash().getBytes(), stateRootHash.getBytes())) {
Expand Down
2 changes: 1 addition & 1 deletion rskj-core/src/main/java/co/rsk/core/bc/BlockChainImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ private ImportResult internalTryToConnect(Block block) {
long saveTime = System.nanoTime();
logger.trace("execute start");

result = blockExecutor.execute(block, parent.getHeader(), false, noValidation, true);
result = blockExecutor.execute(null, 0, block, parent.getHeader(), false, noValidation, true);

logger.trace("execute done");

Expand Down
254 changes: 211 additions & 43 deletions rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion rskj-core/src/main/java/co/rsk/core/bc/BlockResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class BlockResult {
null,
Collections.emptyList(),
Collections.emptyList(),
0,
new short[0], 0,
Coin.ZERO,
null
);
Expand All @@ -49,11 +49,13 @@ public class BlockResult {
// It is for optimizing switching between states. Instead of using the "stateRoot" field,
// which requires regenerating the trie, using the finalState field does not.
private final Trie finalState;
private final short[] txEdges;

public BlockResult(
Block block,
List<Transaction> executedTransactions,
List<TransactionReceipt> transactionReceipts,
short[] txEdges,
long gasUsed,
Coin paidFees,
Trie finalState) {
Expand All @@ -63,12 +65,15 @@ public BlockResult(
this.gasUsed = gasUsed;
this.paidFees = paidFees;
this.finalState = finalState;
this.txEdges = txEdges;
}

public Block getBlock() {
return block;
}

public short[] getTxEdges() { return txEdges; }

public List<Transaction> getExecutedTransactions() { return executedTransactions; }

public List<TransactionReceipt> getTransactionReceipts() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* This file is part of RskJ
* Copyright (C) 2019 RSK Labs Ltd.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package co.rsk.core.bc;

import org.ethereum.db.ByteArrayWrapper;
import java.util.Set;

public interface IReadWrittenKeysTracker {
Set<ByteArrayWrapper> getTemporalReadKeys();

Set<ByteArrayWrapper> getTemporalWrittenKeys();

void addNewReadKey(ByteArrayWrapper key);

void addNewWrittenKey(ByteArrayWrapper key);

void clear();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* This file is part of RskJ
* Copyright (C) 2017 RSK Labs Ltd.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package co.rsk.core.bc;

import co.rsk.core.RskAddress;
import org.ethereum.core.Transaction;
import org.ethereum.db.ByteArrayWrapper;
import org.ethereum.vm.GasCost;

import java.util.*;

public class ParallelizeTransactionHandler {
private final HashMap<ByteArrayWrapper, TransactionBucket> bucketByWrittenKey;
private final HashMap<ByteArrayWrapper, Set<TransactionBucket>> bucketByReadKey;
private final Map<RskAddress, TransactionBucket> bucketBySender;
private final ArrayList<TransactionBucket> buckets;

public ParallelizeTransactionHandler(short buckets, long bucketGasLimit) {
this.bucketBySender = new HashMap<>();
this.bucketByWrittenKey = new HashMap<>();
this.bucketByReadKey = new HashMap<>();
this.buckets = new ArrayList<>();
for (short i = 0; i < buckets; i++){
this.buckets.add(new TransactionBucket(i, bucketGasLimit, false));
}
this.buckets.add(new TransactionBucket(buckets, bucketGasLimit, true));
}

public Optional<Long> addTransaction(Transaction tx, Set<ByteArrayWrapper> newReadKeys, Set<ByteArrayWrapper> newWrittenKeys, long gasUsedByTx) {
TransactionBucket bucketCandidate = getBucketCandidates(tx, newReadKeys, newWrittenKeys);

if (!bucketHasAvailableGas(tx, bucketCandidate)) {
if (bucketCandidate.isSequential()) {
return Optional.empty();
}
bucketCandidate = getSequentialBucket();

if (!bucketHasAvailableGas(tx, bucketCandidate)) {
return Optional.empty();
}
}

bucketCandidate.addTransaction(tx, gasUsedByTx);
addNewKeysToMaps(tx.getSender(), bucketCandidate, newReadKeys, newWrittenKeys);
return Optional.of(bucketCandidate.getGasUsed());
}

private boolean bucketHasAvailableGas(Transaction tx, TransactionBucket bucketCandidate) {
return bucketCandidate.hasGasAvailable(GasCost.toGas(tx.getGasLimit()));
}

public Optional<Long> addRemascTransaction(Transaction tx, long gasUsedByTx) {
TransactionBucket sequentialBucket = getSequentialBucket();
sequentialBucket.addTransaction(tx, gasUsedByTx);
return Optional.of(sequentialBucket.getGasUsed());
}

public long getGasUsedIn(Short bucketId) {

if (bucketId < 0 || bucketId >= buckets.size()) {
throw new NoSuchElementException();
}

return this.buckets.get(bucketId).getGasUsed();
}

public List<Transaction> getTransactionsInOrder() {
List<Transaction> txs = new ArrayList<>();
for (TransactionBucket bucket: this.buckets) {
txs.addAll(bucket.getTransactions());
}
return txs;
}

public short[] getTransactionsPerBucketInOrder() {
List<Short> bucketSizes = new ArrayList<>();
short bucketEdges = 0;

for (TransactionBucket bucket: this.buckets) {
if (bucket.getTransactions().isEmpty() || bucket.isSequential()) {
continue;
}
bucketEdges += bucket.getTransactions().size();
bucketSizes.add(bucketEdges);
}

short[] bucketOrder = new short[bucketSizes.size()];
int i = 0;
for (Short size: bucketSizes) {
bucketOrder[i] = size;
i++;
}

return bucketOrder;
}

private void addNewKeysToMaps(RskAddress sender, TransactionBucket bucket, Set<ByteArrayWrapper> newReadKeys, Set<ByteArrayWrapper> newWrittenKeys) {
for (ByteArrayWrapper key : newReadKeys) {
Set<TransactionBucket> bucketsAlreadyRead = bucketByReadKey.getOrDefault(key, new HashSet<>());
bucketsAlreadyRead.add(bucket);
bucketByReadKey.put(key, bucketsAlreadyRead);
}

if (bucket.isSequential()) {
bucketBySender.put(sender, bucket);
return;
} else {
bucketBySender.putIfAbsent(sender, bucket);
}

for (ByteArrayWrapper key: newWrittenKeys) {
bucketByWrittenKey.putIfAbsent(key, bucket);
}
}

private Optional<TransactionBucket> getBucketBySender(Transaction tx) {
return Optional.ofNullable(bucketBySender.get(tx.getSender()));
}

private Optional<TransactionBucket> getAvailableBucketWithLessUsedGas(long txGasLimit) {
long gasUsed = Long.MAX_VALUE;
Optional<TransactionBucket> bucketCandidate = Optional.empty();

for (TransactionBucket bucket : buckets) {
if (!bucket.isSequential() && bucket.hasGasAvailable(txGasLimit) && bucket.getGasUsed() < gasUsed) {
bucketCandidate = Optional.of(bucket);
gasUsed = bucket.getGasUsed();
}
}

return bucketCandidate;
}


private TransactionBucket getBucketCandidates(Transaction tx, Set<ByteArrayWrapper> newReadKeys, Set<ByteArrayWrapper> newWrittenKeys) {
Optional<TransactionBucket> bucketCandidate = getBucketBySender(tx);

if (bucketCandidate.isPresent() && bucketCandidate.get().isSequential()) {
return getSequentialBucket();
}

// read - written
for (ByteArrayWrapper newReadKey : newReadKeys) {
if (bucketByWrittenKey.containsKey(newReadKey)) {
TransactionBucket bucket = bucketByWrittenKey.get(newReadKey);

if (bucketCandidate.isPresent() && !bucketCandidate.get().equals(bucket)) {
return getSequentialBucket();
} else if (!bucketCandidate.isPresent()) {
bucketCandidate = Optional.of(bucket);
}
}
}

for (ByteArrayWrapper newWrittenKey : newWrittenKeys) {
// written - written,
if (bucketByWrittenKey.containsKey(newWrittenKey)) {
TransactionBucket bucket = bucketByWrittenKey.get(newWrittenKey);

if (bucketCandidate.isPresent() && !bucketCandidate.get().equals(bucket)) {
return getSequentialBucket();
} else {
bucketCandidate = Optional.of(bucket);
}
}
// read - written
if (bucketByReadKey.containsKey(newWrittenKey)) {
Set<TransactionBucket> readBuckets = bucketByReadKey.get(newWrittenKey);

if (readBuckets.size() > 1) {
return getSequentialBucket();
}

if (bucketCandidate.isPresent() && !readBuckets.contains(bucketCandidate.get())) {
return getSequentialBucket();
} else {
bucketCandidate = Optional.of(readBuckets.iterator().next());
}
}
}

return bucketCandidate.orElseGet(() -> getAvailableBucketWithLessUsedGas(GasCost.toGas(tx.getGasLimit())).orElseGet(this::getSequentialBucket));
}

private TransactionBucket getSequentialBucket() {
return this.buckets.get(this.buckets.size()-1);
}

public long getGasUsedInSequential() {
return getSequentialBucket().getGasUsed();
}

private static class TransactionBucket {

final Short id;
final long gasLimit;
final boolean isSequential;
final List<Transaction> transactions;
long gasUsedInBucket;

public TransactionBucket(Short id, long bucketGasLimit, boolean isSequential) {
this.id = id;
this.gasLimit = bucketGasLimit;
this.isSequential = isSequential;
this.transactions = new ArrayList<>();
this.gasUsedInBucket = 0;
}

private void addTransaction(Transaction tx, long gasUsedByTx) {
transactions.add(tx);
gasUsedInBucket = gasUsedInBucket + gasUsedByTx;
}

private boolean hasGasAvailable(long txGasLimit) {
//TODO(JULI): Re-check a thousand of times this line.
long cumulativeGas = GasCost.add(gasUsedInBucket, txGasLimit);
return cumulativeGas <= gasLimit;
}

public long getGasUsed() {
return gasUsedInBucket;
}

public List<Transaction> getTransactions() {
return this.transactions;
}

public boolean isSequential() {
return isSequential;
}
}
}
Loading

0 comments on commit 74b8aba

Please sign in to comment.