Skip to content

Commit

Permalink
Stop evaluating txs for sender after the first skipped one (hyperledg…
Browse files Browse the repository at this point in the history
…er#5498)


Signed-off-by: Fabio Di Fabio <[email protected]>
  • Loading branch information
fab-10 authored May 31, 2023
1 parent cb3e134 commit 5fe53cf
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,31 +321,24 @@ public synchronized void selectTransactions(
final PendingTransactions.TransactionSelector selector) {
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
final Set<Hash> alreadyChecked = new HashSet<>();
final Set<Address> skipSenders = new HashSet<>();
final AtomicBoolean completed = new AtomicBoolean(false);

prioritizedTransactions.stream()
.takeWhile(unused -> !completed.get())
.peek(
highPrioPendingTx ->
LOG.atTrace()
.setMessage("highPrioPendingTx {}, senderTxs {}")
.addArgument(highPrioPendingTx::toTraceLog)
.addArgument(
() ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(", ")))
.log())
.filter(highPrioPendingTx -> !skipSenders.contains(highPrioPendingTx.getSender()))
.peek(this::logSenderTxs)
.forEach(
highPrioPendingTx ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.takeWhile(unused -> !completed.get())
.filter(
.takeWhile(
candidatePendingTx ->
!alreadyChecked.contains(candidatePendingTx.getHash()))
!skipSenders.contains(candidatePendingTx.getSender())
&& !completed.get())
.filter(
candidatePendingTx ->
candidatePendingTx.getNonce() <= highPrioPendingTx.getNonce())
!alreadyChecked.contains(candidatePendingTx.getHash())
&& candidatePendingTx.getNonce() <= highPrioPendingTx.getNonce())
.forEach(
candidatePendingTx -> {
alreadyChecked.add(candidatePendingTx.getHash());
Expand All @@ -366,12 +359,31 @@ public synchronized void selectTransactions(
if (res.stop()) {
completed.set(true);
}

if (!res.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
skipSenders.add(candidatePendingTx.getSender());
LOG.trace("Skipping tx from sender {}", candidatePendingTx.getSender());
}
}));

invalidTransactions.forEach(
invalidTx -> prioritizedTransactions.remove(invalidTx, INVALIDATED));
}

private void logSenderTxs(final PendingTransaction highPrioPendingTx) {
LOG.atTrace()
.setMessage("highPrioPendingTx {}, senderTxs {}")
.addArgument(highPrioPendingTx::toTraceLog)
.addArgument(
() ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(", ")))
.log();
}

@Override
public long maxSize() {
return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult.BLOCK_FULL;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult.BLOCK_OCCUPANCY_ABOVE_THRESHOLD;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult.CURRENT_TX_PRICE_BELOW_MIN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult.DATA_PRICE_BELOW_CURRENT_MIN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult.SELECTED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult.TX_TOO_LARGE_FOR_REMAINING_GAS;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
Expand All @@ -43,17 +49,19 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolReplacementHandler;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason;
import org.hyperledger.besu.evm.account.Account;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.BiFunction;
import java.util.stream.Stream;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

public class LayeredPendingTransactionsTest extends BaseTransactionPoolTest {

Expand Down Expand Up @@ -284,22 +292,28 @@ public void notNotifyListenerAfterUnsubscribe() {
verifyNoMoreInteractions(listener);
}

@Test
public void selectTransactionsUntilSelectorRequestsNoMore() {
@ParameterizedTest
@MethodSource
public void selectTransactionsUntilSelectorRequestsNoMore(
final TransactionSelectionResult selectionResult) {
pendingTransactions.addRemoteTransaction(transaction0, Optional.empty());
pendingTransactions.addRemoteTransaction(transaction1, Optional.empty());

final List<Transaction> parsedTransactions = new ArrayList<>();
pendingTransactions.selectTransactions(
transaction -> {
parsedTransactions.add(transaction);
return BLOCK_OCCUPANCY_ABOVE_THRESHOLD;
return selectionResult;
});

assertThat(parsedTransactions.size()).isEqualTo(1);
assertThat(parsedTransactions.get(0)).isEqualTo(transaction0);
}

static Stream<TransactionSelectionResult> selectTransactionsUntilSelectorRequestsNoMore() {
return Stream.of(BLOCK_OCCUPANCY_ABOVE_THRESHOLD, BLOCK_FULL);
}

@Test
public void selectTransactionsUntilPendingIsEmpty() {
pendingTransactions.addRemoteTransaction(transaction0, Optional.empty());
Expand Down Expand Up @@ -356,6 +370,42 @@ public void selectTransactionsFromSameSenderInNonceOrder() {
assertThat(iterationOrder).containsExactly(transaction0, transaction1, transaction2);
}

@ParameterizedTest
@MethodSource
public void ignoreSenderTransactionsAfterASkippedOne(
final TransactionSelectionResult skipSelectionResult) {
final Transaction transaction0a = createTransaction(0, Wei.of(20), KEYS1);
final Transaction transaction1a = createTransaction(1, Wei.of(20), KEYS1);
final Transaction transaction2a = createTransaction(2, Wei.of(20), KEYS1);
final Transaction transaction0b = createTransaction(0, Wei.of(10), KEYS2);

pendingTransactions.addLocalTransaction(transaction0a, Optional.empty());
pendingTransactions.addLocalTransaction(transaction1a, Optional.empty());
pendingTransactions.addLocalTransaction(transaction2a, Optional.empty());
pendingTransactions.addLocalTransaction(transaction0b, Optional.empty());

final List<Transaction> iterationOrder = new ArrayList<>(3);
pendingTransactions.selectTransactions(
transaction -> {
iterationOrder.add(transaction);
// pretending that the 2nd tx of the 1st sender is not selected
return transaction.getNonce() == 1 ? skipSelectionResult : SELECTED;
});

// the 3rd tx of the 1st must not be processed, since the 2nd is skipped
// but the 2nd sender must not be affected
assertThat(iterationOrder).containsExactly(transaction0a, transaction1a, transaction0b);
}

static Stream<TransactionSelectionResult> ignoreSenderTransactionsAfterASkippedOne() {
return Stream.of(
CURRENT_TX_PRICE_BELOW_MIN,
DATA_PRICE_BELOW_CURRENT_MIN,
TX_TOO_LARGE_FOR_REMAINING_GAS,
TransactionSelectionResult.invalidTransient(GAS_PRICE_BELOW_CURRENT_BASE_FEE.name()),
TransactionSelectionResult.invalid(UPFRONT_COST_EXCEEDS_BALANCE.name()));
}

@Test
public void notForceNonceOrderWhenSendersDiffer() {
final Account sender2 = mock(Account.class);
Expand All @@ -382,19 +432,38 @@ public void invalidTransactionIsDeletedFromPendingTransactions() {
pendingTransactions.addRemoteTransaction(transaction0, Optional.empty());
pendingTransactions.addRemoteTransaction(transaction1, Optional.empty());

final List<Transaction> parsedTransactions = new ArrayList<>(2);
final List<Transaction> parsedTransactions = new ArrayList<>(1);
pendingTransactions.selectTransactions(
transaction -> {
parsedTransactions.add(transaction);
return TransactionSelectionResult.invalid(
TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE.name());
return TransactionSelectionResult.invalid(UPFRONT_COST_EXCEEDS_BALANCE.name());
});

assertThat(parsedTransactions.size()).isEqualTo(2);
assertThat(parsedTransactions.get(0)).isEqualTo(transaction0);
assertThat(parsedTransactions.get(1)).isEqualTo(transaction1);
// only the first is processed since not being selected will automatically skip the processing
// all the other txs from the same sender

assertThat(parsedTransactions).containsExactly(transaction0);
assertThat(pendingTransactions.getPendingTransactions())
.map(PendingTransaction::getTransaction)
.containsExactly(transaction1);
}

assertThat(pendingTransactions.size()).isZero();
@Test
public void temporarilyInvalidTransactionIsKeptInPendingTransactions() {
pendingTransactions.addRemoteTransaction(transaction0, Optional.empty());

final List<Transaction> parsedTransactions = new ArrayList<>(1);
pendingTransactions.selectTransactions(
transaction -> {
parsedTransactions.add(transaction);
return TransactionSelectionResult.invalidTransient(
GAS_PRICE_BELOW_CURRENT_BASE_FEE.name());
});

assertThat(parsedTransactions).containsExactly(transaction0);
assertThat(pendingTransactions.getPendingTransactions())
.map(PendingTransaction::getTransaction)
.containsExactly(transaction0);
}

@Test
Expand Down

0 comments on commit 5fe53cf

Please sign in to comment.