Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction pool improvements to avoid filling the pool with not executable transactions #4425

Merged
merged 17 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public PayloadIdentifier preparePayload(
Result result = validateBlock(emptyBlock);
if (result.blockProcessingOutputs.isPresent()) {
mergeContext.putPayloadById(payloadIdentifier, emptyBlock);
debugLambda(
LOG,
"Built empty block proposal {} for payload {}",
emptyBlock::toLogString,
payloadIdentifier::toShortHexString);
} else {
LOG.warn(
"failed to execute empty block proposal {}, reason {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void shouldRetryBlockCreationIfStillHaveTime() {
.thenReturn(CompletableFuture.failedFuture(new StorageException("lock")))
.thenAnswer(i -> CompletableFuture.completedFuture(i.getArgument(0, Supplier.class).get()));

transactions.addLocalTransaction(localTransaction0);
transactions.addLocalTransaction(localTransaction0, Optional.empty());

var payloadId =
coordinator.preparePayload(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.blockcreation;

import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.chain.Blockchain;
Expand All @@ -39,6 +41,7 @@
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import org.apache.tuweni.bytes.Bytes;
Expand Down Expand Up @@ -82,6 +85,12 @@ private void update(
transactions.add(transaction);
receipts.add(receipt);
cumulativeGasUsed += gasUsed;
traceLambda(
LOG,
"New selected transaction {}, total transactions {}, cumulative gas used {}",
transaction::toTraceLog,
transactions::size,
() -> cumulativeGasUsed);
}

public List<Transaction> getTransactions() {
Expand All @@ -95,6 +104,13 @@ public List<TransactionReceipt> getReceipts() {
public long getCumulativeGasUsed() {
return cumulativeGasUsed;
}

public String toTraceLog() {
return "cumulativeGasUsed="
+ cumulativeGasUsed
+ ", transactions="
+ transactions.stream().map(Transaction::toTraceLog).collect(Collectors.joining("; "));
}
}

private final Supplier<Boolean> isCancelled;
Expand Down Expand Up @@ -142,8 +158,13 @@ If running in a thread, it can be cancelled via the isCancelled supplier (which
in this throwing an CancellationException).
*/
public TransactionSelectionResults buildTransactionListForBlock() {
LOG.debug("Transaction pool size {}", pendingTransactions.size());
traceLambda(
LOG, "Transaction pool content {}", () -> pendingTransactions.toTraceLog(false, false));
pendingTransactions.selectTransactions(
pendingTransaction -> evaluateTransaction(pendingTransaction));
traceLambda(
LOG, "Transaction selection result result {}", transactionSelectionResult::toTraceLog);
return transactionSelectionResult;
}

Expand Down Expand Up @@ -173,8 +194,10 @@ private TransactionSelectionResult evaluateTransaction(final Transaction transac
}

if (transactionTooLargeForBlock(transaction)) {
LOG.trace("{} too large to select for block creation", transaction);
traceLambda(
LOG, "Transaction {} too large to select for block creation", transaction::toTraceLog);
if (blockOccupancyAboveThreshold()) {
traceLambda(LOG, "Block occupancy above threshold, completing operation");
return TransactionSelectionResult.COMPLETE_OPERATION;
} else {
return TransactionSelectionResult.CONTINUE;
Expand All @@ -183,6 +206,7 @@ private TransactionSelectionResult evaluateTransaction(final Transaction transac

// If the gas price specified by the transaction is less than this node is willing to accept,
// do not include it in the block.
// ToDo: why we accept this in the pool in the first place then?
final Wei actualMinTransactionGasPriceInBlock =
feeMarket
.getTransactionPriceCalculator()
Expand Down Expand Up @@ -212,7 +236,7 @@ private TransactionSelectionResult evaluateTransaction(final Transaction transac
validationResult.getErrorMessage(),
processableBlockHeader.getParentHash().toHexString(),
transaction.getHash().toHexString());
return transactionSelectionResultForInvalidResult(validationResult);
return transactionSelectionResultForInvalidResult(transaction, validationResult);
} else {
// valid GoQuorum private tx, we need to hand craft the receipt and increment the nonce
effectiveResult = publicResultForWhenWeHaveAPrivateTransaction(transaction);
Expand All @@ -233,23 +257,34 @@ private TransactionSelectionResult evaluateTransaction(final Transaction transac

if (!effectiveResult.isInvalid()) {
worldStateUpdater.commit();
LOG.trace("Selected {} for block creation", transaction);
traceLambda(LOG, "Selected {} for block creation", transaction::toTraceLog);
updateTransactionResultTracking(transaction, effectiveResult);
} else {
return transactionSelectionResultForInvalidResult(effectiveResult.getValidationResult());
return transactionSelectionResultForInvalidResult(
transaction, effectiveResult.getValidationResult());
}
return TransactionSelectionResult.CONTINUE;
}

private TransactionSelectionResult transactionSelectionResultForInvalidResult(
final Transaction transaction,
final ValidationResult<TransactionInvalidReason> invalidReasonValidationResult) {
// If the transaction has an incorrect nonce, leave it in the pool and continue
if (invalidReasonValidationResult
.getInvalidReason()
.equals(TransactionInvalidReason.INCORRECT_NONCE)) {
traceLambda(
LOG,
"Incorrect nonce for transaction {} keeping it in the pool",
transaction::toTraceLog);
return TransactionSelectionResult.CONTINUE;
}
// If the transaction was invalid for any other reason, delete it, and continue.
traceLambda(
LOG,
"Delete invalid transaction {}, reason {}",
transaction::toTraceLog,
invalidReasonValidationResult::getInvalidReason);
return TransactionSelectionResult.DELETE_TRANSACTION_AND_CONTINUE;
}

Expand Down Expand Up @@ -318,6 +353,13 @@ private boolean transactionTooLargeForBlock(final Transaction transaction) {
private boolean blockOccupancyAboveThreshold() {
final double gasAvailable = processableBlockHeader.getGasLimit();
final double gasUsed = transactionSelectionResult.getCumulativeGasUsed();
return (gasUsed / gasAvailable) >= minBlockOccupancyRatio;
final double occupancyRatio = gasUsed / gasAvailable;
LOG.trace(
"Min block occupancy ratio {}, gas used {}, available {}, used/available {}",
minBlockOccupancyRatio,
gasUsed,
gasAvailable,
occupancyRatio);
return occupancyRatio >= minBlockOccupancyRatio;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,20 @@ public class BlockTransactionSelectorTest {
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();

private final Blockchain blockchain = new ReferenceTestBlockchain();
private final GasPricePendingTransactionsSorter pendingTransactions =
new GasPricePendingTransactionsSorter(
ImmutableTransactionPoolConfiguration.builder().txPoolMaxSize(5).build(),
TestClock.system(ZoneId.systemDefault()),
metricsSystem,
BlockTransactionSelectorTest::mockBlockHeader);
private final MutableWorldState worldState =
InMemoryKeyValueStorageProvider.createInMemoryWorldState();
private GasPricePendingTransactionsSorter pendingTransactions;
private MutableWorldState worldState;
@Mock private MainnetTransactionProcessor transactionProcessor;
@Mock private MainnetTransactionValidator transactionValidator;

@Before
public void setup() {
worldState = InMemoryKeyValueStorageProvider.createInMemoryWorldState();
pendingTransactions =
new GasPricePendingTransactionsSorter(
ImmutableTransactionPoolConfiguration.builder().txPoolMaxSize(5).build(),
TestClock.system(ZoneId.systemDefault()),
metricsSystem,
BlockTransactionSelectorTest::mockBlockHeader);
when(transactionProcessor.getTransactionValidator()).thenReturn(transactionValidator);
when(transactionValidator.getGoQuorumCompatibilityMode()).thenReturn(true);
}
Expand Down Expand Up @@ -158,7 +159,7 @@ public void emptyPendingTransactionsResultsInEmptyVettingResult() {
@Test
public void failedTransactionsAreIncludedInTheBlock() {
final Transaction transaction = createTransaction(1);
pendingTransactions.addRemoteTransaction(transaction);
pendingTransactions.addRemoteTransaction(transaction, Optional.empty());

when(transactionProcessor.processTransaction(
any(), any(), any(), eq(transaction), any(), any(), anyBoolean(), any()))
Expand Down Expand Up @@ -199,7 +200,7 @@ public void invalidTransactionsTransactionProcessingAreSkippedButBlockStillFills
for (int i = 0; i < 5; i++) {
final Transaction tx = createTransaction(i);
transactionsToInject.add(tx);
pendingTransactions.addRemoteTransaction(tx);
pendingTransactions.addRemoteTransaction(tx, Optional.empty());
}

when(transactionProcessor.processTransaction(
Expand Down Expand Up @@ -255,7 +256,7 @@ public void subsetOfPendingTransactionsIncludedWhenBlockGasLimitHit() {
for (int i = 0; i < 5; i++) {
final Transaction tx = createTransaction(i);
transactionsToInject.add(tx);
pendingTransactions.addRemoteTransaction(tx);
pendingTransactions.addRemoteTransaction(tx, Optional.empty());
}

when(transactionProcessor.processTransaction(
Expand Down Expand Up @@ -317,7 +318,7 @@ public void transactionOfferingGasPriceLessThanMinimumIsIdentifiedAndRemovedFrom
FeeMarket.legacy());

final Transaction tx = createTransaction(1);
pendingTransactions.addRemoteTransaction(tx);
pendingTransactions.addRemoteTransaction(tx, Optional.empty());

final BlockTransactionSelector.TransactionSelectionResults results =
selector.buildTransactionListForBlock();
Expand Down Expand Up @@ -389,8 +390,8 @@ public void useSingleGasSpaceForAllTransactions() {
TransactionProcessingResult.successful(
new ArrayList<>(), 0, 0, Bytes.EMPTY, ValidationResult.valid()));

pendingTransactions1559.addRemoteTransaction(fillingLegacyTx);
pendingTransactions1559.addRemoteTransaction(extraEIP1559Tx);
pendingTransactions1559.addRemoteTransaction(fillingLegacyTx, Optional.empty());
pendingTransactions1559.addRemoteTransaction(extraEIP1559Tx, Optional.empty());
final BlockTransactionSelector.TransactionSelectionResults results =
selector.buildTransactionListForBlock();

Expand Down Expand Up @@ -441,7 +442,7 @@ public void transactionTooLargeForBlockDoesNotPreventMoreBeingAddedIfBlockOccupa
.createTransaction(keyPair));

for (final Transaction tx : transactionsToInject) {
pendingTransactions.addRemoteTransaction(tx);
pendingTransactions.addRemoteTransaction(tx, Optional.empty());
}

final BlockTransactionSelector.TransactionSelectionResults results =
Expand Down Expand Up @@ -504,10 +505,10 @@ public void transactionSelectionStopsWhenSufficientBlockOccupancyIsReached() {
.nonce(4)
.createTransaction(keyPair);

pendingTransactions.addRemoteTransaction(transaction1);
pendingTransactions.addRemoteTransaction(transaction2);
pendingTransactions.addRemoteTransaction(transaction3);
pendingTransactions.addRemoteTransaction(transaction4);
pendingTransactions.addRemoteTransaction(transaction1, Optional.empty());
pendingTransactions.addRemoteTransaction(transaction2, Optional.empty());
pendingTransactions.addRemoteTransaction(transaction3, Optional.empty());
pendingTransactions.addRemoteTransaction(transaction4, Optional.empty());

final BlockTransactionSelector.TransactionSelectionResults results =
selector.buildTransactionListForBlock();
Expand Down Expand Up @@ -544,8 +545,8 @@ public void shouldDiscardTransactionsThatFailValidation() {
final Transaction invalidTransaction =
txTestFixture.nonce(2).gasLimit(2).createTransaction(keyPair);

pendingTransactions.addRemoteTransaction(validTransaction);
pendingTransactions.addRemoteTransaction(invalidTransaction);
pendingTransactions.addRemoteTransaction(validTransaction, Optional.empty());
pendingTransactions.addRemoteTransaction(invalidTransaction, Optional.empty());

when(transactionProcessor.processTransaction(
eq(blockchain),
Expand Down Expand Up @@ -588,7 +589,7 @@ public void transactionWithIncorrectNonceRemainsInPoolAndNotSelected() {
final Transaction futureTransaction =
txTestFixture.nonce(5).gasLimit(1).createTransaction(keyPair);

pendingTransactions.addRemoteTransaction(futureTransaction);
pendingTransactions.addRemoteTransaction(futureTransaction, Optional.empty());

when(transactionProcessor.processTransaction(
eq(blockchain),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,23 @@ public String toString() {
return sb.append("}").toString();
}

public String toTraceLog() {
final StringBuilder sb = new StringBuilder();
sb.append(isContractCreation() ? "ContractCreation" : "MessageCall").append(", ");
sb.append(getSender()).append(", ");
sb.append(getType()).append(", ");
sb.append(getNonce()).append(", ");
getGasPrice().ifPresent(gasPrice -> sb.append(gasPrice.toBigInteger()).append(", "));
if (getMaxPriorityFeePerGas().isPresent() && getMaxFeePerGas().isPresent()) {
sb.append(getMaxPriorityFeePerGas().map(Wei::toBigInteger).get()).append(", ");
sb.append(getMaxFeePerGas().map(Wei::toBigInteger).get()).append(", ");
}
sb.append(getGasLimit()).append(", ");
sb.append(getValue().toBigInteger()).append(", ");
if (getTo().isPresent()) sb.append(getTo().get()).append(", ");
return sb.append("}").toString();
}

public Optional<Address> contractAddress() {
if (isContractCreation()) {
return Optional.of(Address.contractAddress(getSender(), getNonce()));
Expand Down
Loading