diff --git a/CHANGELOG.md b/CHANGELOG.md index cfb2c0d962f..2031a7e6dfe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,10 +3,14 @@ ## 22.1.3 ### Breaking Changes -- Remove the experimental flag for bonsai tries CLI options '--data-storage-format' and '--bonsai-maximum-back-layers-to-load' [#3578](https://github.com/hyperledger/besu/pull/3578) +- Remove the experimental flag for bonsai tries CLI options `--data-storage-format` and `--bonsai-maximum-back-layers-to-load` [#3578](https://github.com/hyperledger/besu/pull/3578) + +### Deprecations +- `--tx-pool-hashes-max-size` is now deprecated and has no more effect and it will be removed in a future release. ### Additions and Improvements - Tune transaction synchronization parameter to adapt to mainnet traffic [#3610](https://github.com/hyperledger/besu/pull/3610) +- Improve eth/66 support [#3616](https://github.com/hyperledger/besu/pull/3616) ## 22.1.2 diff --git a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java index f70b0a0f1e8..cd92db57c3b 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java @@ -21,6 +21,7 @@ import static org.hyperledger.besu.cli.DefaultCommandValues.getDefaultBesuDataPath; import static org.hyperledger.besu.cli.config.NetworkName.MAINNET; import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPENDENCY_WARNING_MSG; +import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATED_AND_USELESS_WARNING_MSG; import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATION_WARNING_MSG; import static org.hyperledger.besu.config.experimental.MergeConfigOptions.isMergeEnabled; import static org.hyperledger.besu.controller.BesuController.DATABASE_PATH; @@ -1117,10 +1118,10 @@ static class JsonRPCWebsocketOptionGroup { names = {"--tx-pool-hashes-max-size"}, paramLabel = MANDATORY_INTEGER_FORMAT_HELP, description = - "Maximum number of pending transaction hashes that will be kept in the transaction pool (default: ${DEFAULT-VALUE})", + "Deprecated, has not effect. Maximum number of pending transaction hashes that will be kept in the transaction pool", arity = "1") - private final Integer pooledTransactionHashesSize = - TransactionPoolConfiguration.MAX_PENDING_TRANSACTIONS_HASHES; + @SuppressWarnings("unused") + private final Integer pooledTransactionHashesSize = null; // NOSONAR @Option( names = {"--tx-pool-retention-hours"}, @@ -1826,6 +1827,10 @@ private void issueOptionWarnings() { "--privacy-onchain-groups-enabled", "--privacy-flexible-groups-enabled"); } + + if (pooledTransactionHashesSize != null) { // NOSONAR + logger.warn(DEPRECATED_AND_USELESS_WARNING_MSG, "--tx-pool-hashes-max-size"); + } } private void configure() throws Exception { @@ -2651,7 +2656,6 @@ private TransactionPoolConfiguration buildTransactionPoolConfiguration() { return unstableTransactionPoolOptions .toDomainObject() .txPoolMaxSize(txPoolMaxSize) - .pooledTransactionHashesSize(pooledTransactionHashesSize) .pendingTxRetentionPeriod(pendingTxRetentionPeriod) .priceBump(Percentage.fromInt(priceBump)) .txFeeCap(txFeeCap) diff --git a/besu/src/main/java/org/hyperledger/besu/cli/util/CommandLineUtils.java b/besu/src/main/java/org/hyperledger/besu/cli/util/CommandLineUtils.java index d688e4307ac..475ce752c85 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/util/CommandLineUtils.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/util/CommandLineUtils.java @@ -29,6 +29,8 @@ public class CommandLineUtils { public static final String MULTI_DEPENDENCY_WARNING_MSG = "{} ignored because none of {} was defined."; public static final String DEPRECATION_WARNING_MSG = "{} has been deprecated, use {} instead."; + public static final String DEPRECATED_AND_USELESS_WARNING_MSG = + "{} has been deprecated and is now useless, remove it."; /** * Check if options are passed that require an option to be true to have any effect and log a diff --git a/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java b/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java index ecb0996f6c3..dce113a447e 100644 --- a/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java +++ b/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java @@ -26,6 +26,7 @@ import static org.hyperledger.besu.cli.config.NetworkName.RINKEBY; import static org.hyperledger.besu.cli.config.NetworkName.ROPSTEN; import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPENDENCY_WARNING_MSG; +import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATED_AND_USELESS_WARNING_MSG; import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATION_WARNING_MSG; import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis.ENGINE; import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis.ETH; @@ -3884,6 +3885,13 @@ public void onchainPrivacyGroupEnabledOptionIsDeprecated() { "--privacy-flexible-groups-enabled"); } + @Test + public void txPoolHashesMaxSizeOptionIsDeprecated() { + parseCommand("--tx-pool-hashes-max-size", "1024"); + + verify(mockLogger).warn(DEPRECATED_AND_USELESS_WARNING_MSG, "--tx-pool-hashes-max-size"); + } + @Test public void flexiblePrivacyGroupEnabledFlagValueIsSet() { parseCommand( diff --git a/consensus/clique/src/test/java/org/hyperledger/besu/consensus/clique/blockcreation/CliqueBlockCreatorTest.java b/consensus/clique/src/test/java/org/hyperledger/besu/consensus/clique/blockcreation/CliqueBlockCreatorTest.java index d0c85e7b629..d8572fe7add 100644 --- a/consensus/clique/src/test/java/org/hyperledger/besu/consensus/clique/blockcreation/CliqueBlockCreatorTest.java +++ b/consensus/clique/src/test/java/org/hyperledger/besu/consensus/clique/blockcreation/CliqueBlockCreatorTest.java @@ -133,7 +133,6 @@ public void proposerAddressCanBeExtractFromAConstructedBlock() { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 5, - 5, TestClock.fixed(), metricsSystem, blockchain::getChainHeadHeader, @@ -169,7 +168,6 @@ public void insertsValidVoteIntoConstructedBlock() { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 5, - 5, TestClock.fixed(), metricsSystem, blockchain::getChainHeadHeader, @@ -207,7 +205,6 @@ public void insertsNoVoteWhenAtEpoch() { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 5, - 5, TestClock.fixed(), metricsSystem, blockchain::getChainHeadHeader, diff --git a/consensus/clique/src/test/java/org/hyperledger/besu/consensus/clique/blockcreation/CliqueMinerExecutorTest.java b/consensus/clique/src/test/java/org/hyperledger/besu/consensus/clique/blockcreation/CliqueMinerExecutorTest.java index ba0f88c78a0..c76a27d30b5 100644 --- a/consensus/clique/src/test/java/org/hyperledger/besu/consensus/clique/blockcreation/CliqueMinerExecutorTest.java +++ b/consensus/clique/src/test/java/org/hyperledger/besu/consensus/clique/blockcreation/CliqueMinerExecutorTest.java @@ -96,7 +96,6 @@ public void extraDataCreatedOnEpochBlocksContainsValidators() { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 5, TestClock.fixed(), metricsSystem, CliqueMinerExecutorTest::mockBlockHeader, @@ -141,7 +140,6 @@ public void extraDataForNonEpochBlocksDoesNotContainValidaors() { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 5, TestClock.fixed(), metricsSystem, CliqueMinerExecutorTest::mockBlockHeader, @@ -186,7 +184,6 @@ public void shouldUseLatestVanityData() { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 5, TestClock.fixed(), metricsSystem, CliqueMinerExecutorTest::mockBlockHeader, diff --git a/consensus/ibft/src/integration-test/java/org/hyperledger/besu/consensus/ibft/support/TestContextBuilder.java b/consensus/ibft/src/integration-test/java/org/hyperledger/besu/consensus/ibft/support/TestContextBuilder.java index 462f000e0d9..c905641bd17 100644 --- a/consensus/ibft/src/integration-test/java/org/hyperledger/besu/consensus/ibft/support/TestContextBuilder.java +++ b/consensus/ibft/src/integration-test/java/org/hyperledger/besu/consensus/ibft/support/TestContextBuilder.java @@ -334,7 +334,6 @@ private static ControllerAndState createControllerAndFinalState( new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 1, clock, metricsSystem, blockChain::getChainHeadHeader, diff --git a/consensus/ibft/src/test/java/org/hyperledger/besu/consensus/ibft/blockcreation/BftBlockCreatorTest.java b/consensus/ibft/src/test/java/org/hyperledger/besu/consensus/ibft/blockcreation/BftBlockCreatorTest.java index a99596236ca..764d6040205 100644 --- a/consensus/ibft/src/test/java/org/hyperledger/besu/consensus/ibft/blockcreation/BftBlockCreatorTest.java +++ b/consensus/ibft/src/test/java/org/hyperledger/besu/consensus/ibft/blockcreation/BftBlockCreatorTest.java @@ -118,7 +118,6 @@ public BlockHeaderValidator.Builder createBlockHeaderRuleset( new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 5, TestClock.fixed(), metricsSystem, blockchain::getChainHeadHeader, diff --git a/consensus/ibftlegacy/src/test/java/org/hyperledger/besu/consensus/ibftlegacy/blockcreation/BftBlockCreatorTest.java b/consensus/ibftlegacy/src/test/java/org/hyperledger/besu/consensus/ibftlegacy/blockcreation/BftBlockCreatorTest.java index fdc0d14711a..f051f8f99b4 100644 --- a/consensus/ibftlegacy/src/test/java/org/hyperledger/besu/consensus/ibftlegacy/blockcreation/BftBlockCreatorTest.java +++ b/consensus/ibftlegacy/src/test/java/org/hyperledger/besu/consensus/ibftlegacy/blockcreation/BftBlockCreatorTest.java @@ -107,7 +107,6 @@ public void headerProducedPassesValidationRules() { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 5, TestClock.fixed(), metricsSystem, blockchain::getChainHeadHeader, diff --git a/consensus/qbft/src/integration-test/java/org/hyperledger/besu/consensus/qbft/support/TestContextBuilder.java b/consensus/qbft/src/integration-test/java/org/hyperledger/besu/consensus/qbft/support/TestContextBuilder.java index cd723d8ed0f..6ddd1450ac5 100644 --- a/consensus/qbft/src/integration-test/java/org/hyperledger/besu/consensus/qbft/support/TestContextBuilder.java +++ b/consensus/qbft/src/integration-test/java/org/hyperledger/besu/consensus/qbft/support/TestContextBuilder.java @@ -439,7 +439,6 @@ private static ControllerAndState createControllerAndFinalState( new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 1, clock, metricsSystem, blockChain::getChainHeadHeader, diff --git a/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/frontier/EthGetFilterChangesIntegrationTest.java b/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/frontier/EthGetFilterChangesIntegrationTest.java index d3fb2b094c4..b4091dcbb52 100644 --- a/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/frontier/EthGetFilterChangesIntegrationTest.java +++ b/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/frontier/EthGetFilterChangesIntegrationTest.java @@ -49,10 +49,8 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; -import org.hyperledger.besu.ethereum.eth.transactions.PeerPendingTransactionTracker; -import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker; +import org.hyperledger.besu.ethereum.eth.transactions.TransactionBroadcaster; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; -import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; @@ -75,8 +73,7 @@ @RunWith(MockitoJUnitRunner.class) public class EthGetFilterChangesIntegrationTest { - @Mock private TransactionBatchAddedListener batchAddedListener; - @Mock private TransactionBatchAddedListener pendingBatchAddedListener; + @Mock private TransactionBroadcaster batchAddedListener; private MutableBlockchain blockchain; private final String ETH_METHOD = "eth_getFilterChanges"; private final String JSON_RPC_VERSION = "2.0"; @@ -86,7 +83,6 @@ public class EthGetFilterChangesIntegrationTest { private GasPricePendingTransactionsSorter transactions; private static final int MAX_TRANSACTIONS = 5; - private static final int MAX_HASHES = 5; private static final KeyPair keyPair = SignatureAlgorithmFactory.getInstance().generateKeyPair(); private final Transaction transaction = createTransaction(1); private FilterManager filterManager; @@ -101,16 +97,12 @@ public void setUp() { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, MAX_TRANSACTIONS, - MAX_HASHES, TestClock.fixed(), metricsSystem, blockchain::getChainHeadHeader, TransactionPoolConfiguration.DEFAULT_PRICE_BUMP); final ProtocolContext protocolContext = executionContext.getProtocolContext(); - PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class); - PeerPendingTransactionTracker peerPendingTransactionTracker = - mock(PeerPendingTransactionTracker.class); EthContext ethContext = mock(EthContext.class); EthPeers ethPeers = mock(EthPeers.class); when(ethContext.getEthPeers()).thenReturn(ethPeers); @@ -121,11 +113,8 @@ public void setUp() { executionContext.getProtocolSchedule(), protocolContext, batchAddedListener, - pendingBatchAddedListener, syncState, ethContext, - peerTransactionTracker, - peerPendingTransactionTracker, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), metricsSystem, TransactionPoolConfiguration.DEFAULT); diff --git a/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java b/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java index e97d3bb6312..5d6149a2938 100644 --- a/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java +++ b/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java @@ -49,10 +49,8 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; -import org.hyperledger.besu.ethereum.eth.transactions.PeerPendingTransactionTracker; -import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker; +import org.hyperledger.besu.ethereum.eth.transactions.TransactionBroadcaster; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; -import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.sorter.BaseFeePendingTransactionsSorter; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; @@ -75,8 +73,7 @@ @RunWith(MockitoJUnitRunner.class) public class EthGetFilterChangesIntegrationTest { - @Mock private TransactionBatchAddedListener batchAddedListener; - @Mock private TransactionBatchAddedListener pendingBatchAddedListener; + @Mock private TransactionBroadcaster batchAddedListener; private MutableBlockchain blockchain; private final String ETH_METHOD = "eth_getFilterChanges"; private final String JSON_RPC_VERSION = "2.0"; @@ -86,7 +83,6 @@ public class EthGetFilterChangesIntegrationTest { private BaseFeePendingTransactionsSorter transactions; private static final int MAX_TRANSACTIONS = 5; - private static final int MAX_HASHES = 5; private static final KeyPair keyPair = SignatureAlgorithmFactory.getInstance().generateKeyPair(); private final Transaction transaction = createTransaction(1); private FilterManager filterManager; @@ -101,16 +97,12 @@ public void setUp() { new BaseFeePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, MAX_TRANSACTIONS, - MAX_HASHES, TestClock.fixed(), metricsSystem, blockchain::getChainHeadHeader, TransactionPoolConfiguration.DEFAULT_PRICE_BUMP); final ProtocolContext protocolContext = executionContext.getProtocolContext(); - PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class); - PeerPendingTransactionTracker peerPendingTransactionTracker = - mock(PeerPendingTransactionTracker.class); EthContext ethContext = mock(EthContext.class); EthPeers ethPeers = mock(EthPeers.class); when(ethContext.getEthPeers()).thenReturn(ethPeers); @@ -121,11 +113,8 @@ public void setUp() { executionContext.getProtocolSchedule(), protocolContext, batchAddedListener, - pendingBatchAddedListener, syncState, ethContext, - peerTransactionTracker, - peerPendingTransactionTracker, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), metricsSystem, TransactionPoolConfiguration.DEFAULT); diff --git a/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/BlockTransactionSelectorTest.java b/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/BlockTransactionSelectorTest.java index 460c491fd02..b64edffeb23 100644 --- a/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/BlockTransactionSelectorTest.java +++ b/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/BlockTransactionSelectorTest.java @@ -84,7 +84,6 @@ public class BlockTransactionSelectorTest { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 5, - 5, TestClock.fixed(), metricsSystem, BlockTransactionSelectorTest::mockBlockHeader, @@ -337,7 +336,6 @@ public void useSingleGasSpaceForAllTransactions() { new BaseFeePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 5, - 5, TestClock.fixed(), metricsSystem, () -> { diff --git a/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/PoWBlockCreatorTest.java b/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/PoWBlockCreatorTest.java index c6575ad28a8..b44eb0e5cc4 100644 --- a/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/PoWBlockCreatorTest.java +++ b/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/PoWBlockCreatorTest.java @@ -96,7 +96,6 @@ public void createMainnetBlock1() throws IOException { new BaseFeePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 5, TestClock.fixed(), metricsSystem, executionContextTestFixture.getProtocolContext().getBlockchain()::getChainHeadHeader, @@ -158,7 +157,6 @@ public void createMainnetBlock1_fixedDifficulty1() { new BaseFeePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 5, TestClock.fixed(), metricsSystem, executionContextTestFixture.getProtocolContext().getBlockchain()::getChainHeadHeader, @@ -215,7 +213,6 @@ public void rewardBeneficiary_zeroReward_skipZeroRewardsFalse() { new BaseFeePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 5, TestClock.fixed(), metricsSystem, executionContextTestFixture.getProtocolContext().getBlockchain()::getChainHeadHeader, @@ -288,7 +285,6 @@ public void rewardBeneficiary_zeroReward_skipZeroRewardsTrue() { new BaseFeePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 5, TestClock.fixed(), metricsSystem, executionContextTestFixture.getProtocolContext().getBlockchain()::getChainHeadHeader, diff --git a/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/PoWMinerExecutorTest.java b/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/PoWMinerExecutorTest.java index 0c11a05e547..d922a788bd6 100644 --- a/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/PoWMinerExecutorTest.java +++ b/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/PoWMinerExecutorTest.java @@ -45,7 +45,6 @@ public void startingMiningWithoutCoinbaseThrowsException() { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 5, TestClock.fixed(), metricsSystem, PoWMinerExecutorTest::mockBlockHeader, @@ -75,7 +74,6 @@ public void settingCoinbaseToNullThrowsException() { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, 1, - 5, TestClock.fixed(), metricsSystem, PoWMinerExecutorTest::mockBlockHeader, diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Transaction.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Transaction.java index 10c9229e482..af566bce735 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Transaction.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Transaction.java @@ -38,9 +38,11 @@ import java.math.BigInteger; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; @@ -680,6 +682,17 @@ public boolean isGoQuorumPrivateTransaction(final boolean goQuorumCompatibilityM return GoQuorumPrivateTransactionDetector.isGoQuorumPrivateTransactionV(v.get()); } + /** + * Return the list of transaction hashes extracted from the collection of Transaction passed as + * argument + * + * @param transactions a collection of transactions + * @return the list of transaction hashes + */ + public static List toHashList(final Collection transactions) { + return transactions.stream().map(Transaction::getHash).collect(Collectors.toUnmodifiableList()); + } + private static Bytes32 computeSenderRecoveryHash( final TransactionType transactionType, final long nonce, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java index bbf4670fdd2..6fed1ff423f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java @@ -515,6 +515,11 @@ public Bytes nodeId() { return connection.getPeerInfo().getNodeId(); } + public boolean hasSupportForMessage(final int messageCode) { + return getAgreedCapabilities().stream() + .anyMatch(cap -> EthProtocol.get().isValidMessageCode(cap.getVersion(), messageCode)); + } + @Override public String toString() { return String.format("Peer %s...", nodeId().toString().substring(0, 20)); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java index 31329ed0404..74e4c7b3aca 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java @@ -18,7 +18,7 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionsMessageProcessor; +import org.hyperledger.besu.ethereum.eth.transactions.NewPooledTransactionHashesMessageProcessor; import java.util.ArrayList; import java.util.List; @@ -33,11 +33,11 @@ public class BufferedGetPooledTransactionsFromPeerFetcher { private static final int MAX_HASHES = 256; private final EthPeer peer; - private final PendingTransactionsMessageProcessor processor; + private final NewPooledTransactionHashesMessageProcessor processor; private final Queue txAnnounces; public BufferedGetPooledTransactionsFromPeerFetcher( - final EthPeer peer, final PendingTransactionsMessageProcessor processor) { + final EthPeer peer, final NewPooledTransactionHashesMessageProcessor processor) { this.peer = peer; this.processor = processor; this.txAnnounces = Queues.synchronizedQueue(EvictingQueue.create(MAX_PENDING_TRANSACTIONS)); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageHandler.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java similarity index 85% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageHandler.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java index db6270992c3..58b9a506d06 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageHandler.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java @@ -24,15 +24,15 @@ import java.time.Duration; import java.time.Instant; -class PendingTransactionsMessageHandler implements EthMessages.MessageCallback { +class NewPooledTransactionHashesMessageHandler implements EthMessages.MessageCallback { - private final PendingTransactionsMessageProcessor transactionsMessageProcessor; + private final NewPooledTransactionHashesMessageProcessor transactionsMessageProcessor; private final EthScheduler scheduler; private final Duration txMsgKeepAlive; - public PendingTransactionsMessageHandler( + public NewPooledTransactionHashesMessageHandler( final EthScheduler scheduler, - final PendingTransactionsMessageProcessor transactionsMessageProcessor, + final NewPooledTransactionHashesMessageProcessor transactionsMessageProcessor, final int txMsgKeepAliveSeconds) { this.scheduler = scheduler; this.transactionsMessageProcessor = transactionsMessageProcessor; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageProcessor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java similarity index 90% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageProcessor.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java index e9b904f70c6..a6d09728894 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageProcessor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java @@ -37,18 +37,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PendingTransactionsMessageProcessor { +public class NewPooledTransactionHashesMessageProcessor { private static final int SKIPPED_MESSAGES_LOGGING_THRESHOLD = 1000; private static final long SYNC_TOLERANCE = 100L; private static final Logger LOG = - LoggerFactory.getLogger(PendingTransactionsMessageProcessor.class); + LoggerFactory.getLogger(NewPooledTransactionHashesMessageProcessor.class); private final ConcurrentHashMap scheduledTasks; - private final PeerPendingTransactionTracker transactionTracker; + private final PeerTransactionTracker transactionTracker; private final Counter totalSkippedTransactionsMessageCounter; private final TransactionPool transactionPool; private final TransactionPoolConfiguration transactionPoolConfiguration; @@ -56,8 +56,8 @@ public class PendingTransactionsMessageProcessor { private final MetricsSystem metricsSystem; private final SyncState syncState; - public PendingTransactionsMessageProcessor( - final PeerPendingTransactionTracker transactionTracker, + public NewPooledTransactionHashesMessageProcessor( + final PeerTransactionTracker transactionTracker, final TransactionPool transactionPool, final TransactionPoolConfiguration transactionPoolConfiguration, final Counter metricsCounter, @@ -86,7 +86,7 @@ void processNewPooledTransactionHashesMessage( final NewPooledTransactionHashesMessage transactionsMessage, final Instant startedAt, final Duration keepAlive) { - // Check if message not expired. + // Check if message is not expired. if (startedAt.plus(keepAlive).isAfter(now())) { this.processNewPooledTransactionHashesMessage(peer, transactionsMessage); } else { @@ -99,6 +99,7 @@ private void processNewPooledTransactionHashesMessage( final EthPeer peer, final NewPooledTransactionHashesMessage transactionsMessage) { try { final List incomingTransactionHashes = transactionsMessage.pendingTransactions(); + transactionTracker.markTransactionHashesAsSeen(peer, incomingTransactionHashes); traceLambda( LOG, @@ -107,8 +108,6 @@ private void processNewPooledTransactionHashesMessage( incomingTransactionHashes::size, incomingTransactionHashes::toString); - transactionTracker.markTransactionsHashesAsSeen( - peer, transactionsMessage.pendingTransactions()); if (syncState.isInSync(SYNC_TOLERANCE)) { final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask = scheduledTasks.computeIfAbsent( @@ -122,9 +121,8 @@ private void processNewPooledTransactionHashesMessage( return new BufferedGetPooledTransactionsFromPeerFetcher(peer, this); }); - for (final Hash hash : transactionsMessage.pendingTransactions()) { - if (transactionPool.getTransactionByHash(hash).isEmpty() - && transactionPool.addTransactionHash(hash)) { + for (final Hash hash : incomingTransactionHashes) { + if (transactionPool.getTransactionByHash(hash).isEmpty()) { bufferedTask.addHash(hash); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageSender.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSender.java similarity index 63% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageSender.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSender.java index 6e540d64a7d..016d0a1eec6 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageSender.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSender.java @@ -14,50 +14,47 @@ */ package org.hyperledger.besu.ethereum.eth.transactions; +import static org.hyperledger.besu.ethereum.core.Transaction.toHashList; import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; import java.util.List; -import java.util.stream.StreamSupport; import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class PendingTransactionsMessageSender { - private static final Logger LOG = LoggerFactory.getLogger(PendingTransactionsMessageSender.class); +class NewPooledTransactionHashesMessageSender { + private static final Logger LOG = + LoggerFactory.getLogger(NewPooledTransactionHashesMessageSender.class); + private static final int MAX_TRANSACTIONS_HASHES = 4096; - private final PeerPendingTransactionTracker transactionTracker; + private final PeerTransactionTracker transactionTracker; - public PendingTransactionsMessageSender(final PeerPendingTransactionTracker transactionTracker) { + public NewPooledTransactionHashesMessageSender(final PeerTransactionTracker transactionTracker) { this.transactionTracker = transactionTracker; } - public void sendTransactionsToPeers() { - StreamSupport.stream(transactionTracker.getEthPeersWithUnsentTransactions().spliterator(), true) - .parallel() - .forEach(this::sendTransactionsToPeer); - } - - private void sendTransactionsToPeer(final EthPeer peer) { - for (final List hashes : + public void sendTransactionHashesToPeer(final EthPeer peer) { + for (final List txBatch : Iterables.partition( - transactionTracker.claimTransactionsToSendToPeer(peer), - TransactionPoolConfiguration.MAX_PENDING_TRANSACTIONS_HASHES)) { + transactionTracker.claimTransactionsToSendToPeer(peer), MAX_TRANSACTIONS_HASHES)) { try { + List txHashes = toHashList(txBatch); traceLambda( LOG, "Sending transaction hashes to peer {}, transaction hashes count {}, list {}", peer::toString, - hashes::size, - hashes::toString); + txHashes::size, + txHashes::toString); - peer.send(NewPooledTransactionHashesMessage.create(hashes)); - } catch (final PeerNotConnected __) { + peer.send(NewPooledTransactionHashesMessage.create(txHashes)); + } catch (final PeerNotConnected unused) { break; } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerPendingTransactionTracker.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerPendingTransactionTracker.java deleted file mode 100644 index b05ab410a56..00000000000 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerPendingTransactionTracker.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.transactions; - -import static java.util.Collections.emptySet; - -import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter; -import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; - -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - -public class PeerPendingTransactionTracker implements EthPeer.DisconnectCallback { - private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000; - private final Map> seenTransactions = new ConcurrentHashMap<>(); - private final Map> transactionsToSend = new ConcurrentHashMap<>(); - private final AbstractPendingTransactionsSorter pendingTransactions; - - public PeerPendingTransactionTracker( - final AbstractPendingTransactionsSorter pendingTransactions) { - this.pendingTransactions = pendingTransactions; - } - - public synchronized void markTransactionsHashesAsSeen( - final EthPeer peer, final Collection transactions) { - final Set seenTransactionsForPeer = getOrCreateSeenTransactionsForPeer(peer); - transactions.stream().forEach(seenTransactionsForPeer::add); - } - - public synchronized void addToPeerSendQueue(final EthPeer peer, final Hash hash) { - if (!hasPeerSeenTransaction(peer, hash)) { - transactionsToSend.computeIfAbsent(peer, key -> createTransactionsSet()).add(hash); - } - } - - public Iterable getEthPeersWithUnsentTransactions() { - return transactionsToSend.keySet(); - } - - public synchronized Set claimTransactionsToSendToPeer(final EthPeer peer) { - final Set transactionsToSend = this.transactionsToSend.remove(peer); - if (transactionsToSend != null) { - markTransactionsHashesAsSeen( - peer, - transactionsToSend.stream() - .filter(h -> pendingTransactions.getTransactionByHash(h).isPresent()) - .collect(Collectors.toSet())); - return transactionsToSend; - } else { - return emptySet(); - } - } - - public boolean isPeerSupported(final EthPeer peer, final Capability capability) { - return peer.getAgreedCapabilities().contains(capability); - } - - private Set getOrCreateSeenTransactionsForPeer(final EthPeer peer) { - return seenTransactions.computeIfAbsent(peer, key -> createTransactionsSet()); - } - - private boolean hasPeerSeenTransaction(final EthPeer peer, final Hash hash) { - final Set seenTransactionsForPeer = seenTransactions.get(peer); - return seenTransactionsForPeer != null && seenTransactionsForPeer.contains(hash); - } - - private Set createTransactionsSet() { - return Collections.newSetFromMap( - new LinkedHashMap(1 << 4, 0.75f, true) { - @Override - protected boolean removeEldestEntry(final Map.Entry eldest) { - return size() > MAX_TRACKED_SEEN_TRANSACTIONS; - } - }); - } - - @Override - public void onDisconnect(final EthPeer peer) { - seenTransactions.remove(peer); - transactionsToSend.remove(peer); - } -} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java index ca8e4d3b93c..c4d10754936 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.transactions; import static java.util.Collections.emptySet; +import static org.hyperledger.besu.ethereum.core.Transaction.toHashList; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.Transaction; @@ -34,8 +35,13 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback { public synchronized void markTransactionsAsSeen( final EthPeer peer, final Collection transactions) { + markTransactionHashesAsSeen(peer, toHashList(transactions)); + } + + public synchronized void markTransactionHashesAsSeen( + final EthPeer peer, final Collection txHashes) { final Set seenTransactionsForPeer = getOrCreateSeenTransactionsForPeer(peer); - transactions.stream().map(Transaction::getHash).forEach(seenTransactionsForPeer::add); + seenTransactionsForPeer.addAll(txHashes); } public synchronized void addToPeerSendQueue(final EthPeer peer, final Transaction transaction) { @@ -62,10 +68,13 @@ private Set getOrCreateSeenTransactionsForPeer(final EthPeer peer) { return seenTransactions.computeIfAbsent(peer, key -> createTransactionsSet()); } - private boolean hasPeerSeenTransaction(final EthPeer peer, final Transaction transaction) { + boolean hasPeerSeenTransaction(final EthPeer peer, final Transaction transaction) { + return hasPeerSeenTransaction(peer, transaction.getHash()); + } + + boolean hasPeerSeenTransaction(final EthPeer peer, final Hash txHash) { final Set seenTransactionsForPeer = seenTransactions.get(peer); - return seenTransactionsForPeer != null - && seenTransactionsForPeer.contains(transaction.getHash()); + return seenTransactionsForPeer != null && seenTransactionsForPeer.contains(txHash); } private Set createTransactionsSet() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionSender.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionSender.java deleted file mode 100644 index 46c7d8ad1b5..00000000000 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionSender.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.transactions; - -import org.hyperledger.besu.ethereum.core.Transaction; -import org.hyperledger.besu.ethereum.eth.EthProtocol; -import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener; - -class PendingTransactionSender implements TransactionBatchAddedListener { - - private final PeerPendingTransactionTracker transactionTracker; - private final PendingTransactionsMessageSender transactionsMessageSender; - private final EthContext ethContext; - - public PendingTransactionSender( - final PeerPendingTransactionTracker transactionTracker, - final PendingTransactionsMessageSender transactionsMessageSender, - final EthContext ethContext) { - this.transactionTracker = transactionTracker; - this.transactionsMessageSender = transactionsMessageSender; - this.ethContext = ethContext; - } - - @Override - public void onTransactionsAdded(final Iterable transactions) { - ethContext - .getEthPeers() - .streamAvailablePeers() - .filter(peer -> transactionTracker.isPeerSupported(peer, EthProtocol.ETH65)) - .forEach( - peer -> - transactions.forEach( - transaction -> - transactionTracker.addToPeerSendQueue(peer, transaction.getHash()))); - ethContext - .getScheduler() - .scheduleSyncWorkerTask(transactionsMessageSender::sendTransactionsToPeers); - } -} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionBroadcaster.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionBroadcaster.java new file mode 100644 index 00000000000..f233167dda8 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionBroadcaster.java @@ -0,0 +1,157 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.transactions; + +import static org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter.TransactionInfo.toTransactionList; +import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; + +import org.hyperledger.besu.ethereum.core.Transaction; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.messages.EthPV65; +import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener; +import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter; +import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter.TransactionInfo; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TransactionBroadcaster implements TransactionBatchAddedListener { + private static final Logger LOG = LoggerFactory.getLogger(TransactionBroadcaster.class); + + private final AbstractPendingTransactionsSorter pendingTransactions; + private final PeerTransactionTracker transactionTracker; + private final TransactionsMessageSender transactionsMessageSender; + private final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender; + private final EthContext ethContext; + private final int numPeersToSendFullTransactions; + + public TransactionBroadcaster( + final EthContext ethContext, + final AbstractPendingTransactionsSorter pendingTransactions, + final PeerTransactionTracker transactionTracker, + final TransactionsMessageSender transactionsMessageSender, + final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender) { + this.pendingTransactions = pendingTransactions; + this.transactionTracker = transactionTracker; + this.transactionsMessageSender = transactionsMessageSender; + this.newPooledTransactionHashesMessageSender = newPooledTransactionHashesMessageSender; + this.ethContext = ethContext; + this.numPeersToSendFullTransactions = + (int) Math.ceil(Math.sqrt(ethContext.getEthPeers().getMaxPeers())); + } + + public void relayTransactionPoolTo(final EthPeer peer) { + Set pendingTransactionInfo = pendingTransactions.getTransactionInfo(); + if (!pendingTransactionInfo.isEmpty()) { + if (peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) { + sendTransactionHashes(toTransactionList(pendingTransactionInfo), List.of(peer)); + } else { + sendFullTransactions(toTransactionList(pendingTransactionInfo), List.of(peer)); + } + } + } + + @Override + public void onTransactionsAdded(final Iterable transactions) { + final int currPeerCount = ethContext.getEthPeers().peerCount(); + if (currPeerCount == 0) { + return; + } + + List peersWithOnlyTransactionSupport = new ArrayList<>(currPeerCount); + List peersWithTransactionHashesSupport = new ArrayList<>(currPeerCount); + + ethContext + .getEthPeers() + .streamAvailablePeers() + .forEach( + peer -> { + if (peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) { + peersWithTransactionHashesSupport.add(peer); + } else { + peersWithOnlyTransactionSupport.add(peer); + } + }); + + if (peersWithOnlyTransactionSupport.size() < numPeersToSendFullTransactions) { + final int delta = + Math.min( + numPeersToSendFullTransactions - peersWithOnlyTransactionSupport.size(), + peersWithTransactionHashesSupport.size()); + + Collections.shuffle(peersWithTransactionHashesSupport); + + // move peers from the other list to reach the required size for full transaction peers + movePeersBetweenLists( + peersWithTransactionHashesSupport, peersWithOnlyTransactionSupport, delta); + } + + traceLambda( + LOG, + "Sending full transactions to {} peers and transaction hashes to {} peers." + + " Peers w/o eth/66 {}, peers with eth/66 {}", + peersWithOnlyTransactionSupport::size, + peersWithTransactionHashesSupport::size, + peersWithOnlyTransactionSupport::toString, + peersWithTransactionHashesSupport::toString); + + sendFullTransactions(transactions, peersWithOnlyTransactionSupport); + + sendTransactionHashes(transactions, peersWithTransactionHashesSupport); + } + + private void sendFullTransactions( + final Iterable transactions, final List fullTransactionPeers) { + fullTransactionPeers.forEach( + peer -> { + transactions.forEach( + transaction -> transactionTracker.addToPeerSendQueue(peer, transaction)); + ethContext + .getScheduler() + .scheduleSyncWorkerTask(() -> transactionsMessageSender.sendTransactionsToPeer(peer)); + }); + } + + private void sendTransactionHashes( + final Iterable transactions, final List transactionHashPeers) { + transactionHashPeers.stream() + .forEach( + peer -> { + transactions.forEach( + transaction -> transactionTracker.addToPeerSendQueue(peer, transaction)); + ethContext + .getScheduler() + .scheduleSyncWorkerTask( + () -> + newPooledTransactionHashesMessageSender.sendTransactionHashesToPeer( + peer)); + }); + } + + private void movePeersBetweenLists( + final List sourceList, final List destinationList, final int num) { + + final int stopIndex = sourceList.size() - num; + for (int i = sourceList.size() - 1; i >= stopIndex; i--) { + destinationList.add(sourceList.remove(i)); + } + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index 40491e1ef8c..7617527a3e7 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -28,7 +28,6 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.MiningParameters; import org.hyperledger.besu.ethereum.core.Transaction; -import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; @@ -72,36 +71,27 @@ public class TransactionPool implements BlockAddedObserver { private final AbstractPendingTransactionsSorter pendingTransactions; private final ProtocolSchedule protocolSchedule; private final ProtocolContext protocolContext; - private final TransactionBatchAddedListener transactionBatchAddedListener; - private final TransactionBatchAddedListener pendingTransactionBatchAddedListener; + private final TransactionBroadcaster transactionBroadcaster; private final SyncState syncState; private final MiningParameters miningParameters; private final LabelledMetric duplicateTransactionCounter; - private final PeerTransactionTracker peerTransactionTracker; - private final PeerPendingTransactionTracker peerPendingTransactionTracker; private final TransactionPoolConfiguration configuration; public TransactionPool( final AbstractPendingTransactionsSorter pendingTransactions, final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, - final TransactionBatchAddedListener transactionBatchAddedListener, - final TransactionBatchAddedListener pendingTransactionBatchAddedListener, + final TransactionBroadcaster transactionBroadcaster, final SyncState syncState, final EthContext ethContext, - final PeerTransactionTracker peerTransactionTracker, - final PeerPendingTransactionTracker peerPendingTransactionTracker, final MiningParameters miningParameters, final MetricsSystem metricsSystem, final TransactionPoolConfiguration configuration) { this.pendingTransactions = pendingTransactions; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; - this.transactionBatchAddedListener = transactionBatchAddedListener; - this.pendingTransactionBatchAddedListener = pendingTransactionBatchAddedListener; + this.transactionBroadcaster = transactionBroadcaster; this.syncState = syncState; - this.peerTransactionTracker = peerTransactionTracker; - this.peerPendingTransactionTracker = peerPendingTransactionTracker; this.miningParameters = miningParameters; this.configuration = configuration; @@ -116,19 +106,7 @@ public TransactionPool( } void handleConnect(final EthPeer peer) { - pendingTransactions - .getLocalTransactions() - .forEach(transaction -> peerTransactionTracker.addToPeerSendQueue(peer, transaction)); - - if (peerPendingTransactionTracker.isPeerSupported(peer, EthProtocol.ETH65)) { - pendingTransactions - .getNewPooledHashes() - .forEach(hash -> peerPendingTransactionTracker.addToPeerSendQueue(peer, hash)); - } - } - - public boolean addTransactionHash(final Hash transactionHash) { - return pendingTransactions.addTransactionHash(transactionHash); + transactionBroadcaster.relayTransactionPoolTo(peer); } public ValidationResult addLocalTransaction( @@ -147,8 +125,7 @@ && minTransactionGasPrice(transaction).compareTo(configuration.getTxFeeCap()) > return ValidationResult.invalid(transactionAddedStatus.getInvalidReason().orElseThrow()); } final Collection txs = singletonList(transaction); - transactionBatchAddedListener.onTransactionsAdded(txs); - pendingTransactionBatchAddedListener.onTransactionsAdded(txs); + transactionBroadcaster.onTransactionsAdded(txs); } return validationResult; @@ -167,9 +144,8 @@ public void addRemoteTransactions(final Collection transactions) { if (!syncState.isInSync(SYNC_TOLERANCE)) { return; } - final Set addedTransactions = new HashSet<>(); + final Set addedTransactions = new HashSet<>(transactions.size()); for (final Transaction transaction : transactions) { - pendingTransactions.tryEvictTransactionHash(transaction.getHash()); if (pendingTransactions.containsTransaction(transaction.getHash())) { // We already have this transaction, don't even validate it. duplicateTransactionCounter.labels(REMOTE).inc(); @@ -196,7 +172,7 @@ public void addRemoteTransactions(final Collection transactions) { } } if (!addedTransactions.isEmpty()) { - transactionBatchAddedListener.onTransactionsAdded(addedTransactions); + transactionBroadcaster.onTransactionsAdded(addedTransactions); } } @@ -328,11 +304,6 @@ private BlockHeader getChainHeadBlockHeader() { return blockchain.getBlockHeader(blockchain.getChainHeadHash()).get(); } - public interface TransactionBatchAddedListener { - - void onTransactionsAdded(Iterable transactions); - } - private Wei minTransactionGasPrice(final Transaction transaction) { final BlockHeader chainHeadBlockHeader = getChainHeadBlockHeader(); return protocolSchedule @@ -340,4 +311,9 @@ private Wei minTransactionGasPrice(final Transaction transaction) { .getFeeMarket() .minTransactionPriceInNextBlock(transaction, chainHeadBlockHeader::getBaseFee); } + + public interface TransactionBatchAddedListener { + + void onTransactionsAdded(Iterable transactions); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolConfiguration.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolConfiguration.java index 784e54e1ebd..d4e91d45c9d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolConfiguration.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolConfiguration.java @@ -40,11 +40,6 @@ default int getTxPoolMaxSize() { return MAX_PENDING_TRANSACTIONS; } - @Value.Default - default int getPooledTransactionHashesSize() { - return MAX_PENDING_TRANSACTIONS_HASHES; - } - @Value.Default default int getPendingTxRetentionPeriod() { return DEFAULT_TX_RETENTION_HOURS; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java index 7840759539a..962e3c927fc 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java @@ -51,10 +51,8 @@ public static TransactionPool createTransactionPool( final TransactionsMessageSender transactionsMessageSender = new TransactionsMessageSender(transactionTracker); - final PeerPendingTransactionTracker pendingTransactionTracker = - new PeerPendingTransactionTracker(pendingTransactions); - final PendingTransactionsMessageSender pendingTransactionsMessageSender = - new PendingTransactionsMessageSender(pendingTransactionTracker); + final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender = + new NewPooledTransactionHashesMessageSender(transactionTracker); return createTransactionPool( protocolSchedule, @@ -67,8 +65,7 @@ public static TransactionPool createTransactionPool( pendingTransactions, transactionTracker, transactionsMessageSender, - pendingTransactionTracker, - pendingTransactionsMessageSender); + newPooledTransactionHashesMessageSender); } static TransactionPool createTransactionPool( @@ -82,20 +79,20 @@ static TransactionPool createTransactionPool( final AbstractPendingTransactionsSorter pendingTransactions, final PeerTransactionTracker transactionTracker, final TransactionsMessageSender transactionsMessageSender, - final PeerPendingTransactionTracker pendingTransactionTracker, - final PendingTransactionsMessageSender pendingTransactionsMessageSender) { + final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender) { final TransactionPool transactionPool = new TransactionPool( pendingTransactions, protocolSchedule, protocolContext, - new TransactionSender(transactionTracker, transactionsMessageSender, ethContext), - new PendingTransactionSender( - pendingTransactionTracker, pendingTransactionsMessageSender, ethContext), + new TransactionBroadcaster( + ethContext, + pendingTransactions, + transactionTracker, + transactionsMessageSender, + newPooledTransactionHashesMessageSender), syncState, ethContext, - transactionTracker, - pendingTransactionTracker, miningParameters, metricsSystem, transactionPoolConfiguration); @@ -111,11 +108,11 @@ static TransactionPool createTransactionPool( "Total number of transactions messages skipped by the processor.")), transactionPoolConfiguration.getTxMessageKeepAliveSeconds()); ethContext.getEthMessages().subscribe(EthPV62.TRANSACTIONS, transactionsMessageHandler); - final PendingTransactionsMessageHandler pooledTransactionsMessageHandler = - new PendingTransactionsMessageHandler( + final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler = + new NewPooledTransactionHashesMessageHandler( ethContext.getScheduler(), - new PendingTransactionsMessageProcessor( - pendingTransactionTracker, + new NewPooledTransactionHashesMessageProcessor( + transactionTracker, transactionPool, transactionPoolConfiguration, metricsSystem.createCounter( @@ -129,7 +126,6 @@ static TransactionPool createTransactionPool( ethContext .getEthMessages() .subscribe(EthPV65.NEW_POOLED_TRANSACTION_HASHES, pooledTransactionsMessageHandler); - ethContext.getEthPeers().subscribeDisconnect(pendingTransactionTracker); protocolContext.getBlockchain().observeBlockAdded(transactionPool); ethContext.getEthPeers().subscribeDisconnect(transactionTracker); @@ -152,7 +148,6 @@ private static AbstractPendingTransactionsSorter createPendingTransactionsSorter return new BaseFeePendingTransactionsSorter( transactionPoolConfiguration.getPendingTxRetentionPeriod(), transactionPoolConfiguration.getTxPoolMaxSize(), - transactionPoolConfiguration.getPooledTransactionHashesSize(), clock, metricsSystem, protocolContext.getBlockchain()::getChainHeadHeader, @@ -161,7 +156,6 @@ private static AbstractPendingTransactionsSorter createPendingTransactionsSorter return new GasPricePendingTransactionsSorter( transactionPoolConfiguration.getPendingTxRetentionPeriod(), transactionPoolConfiguration.getTxPoolMaxSize(), - transactionPoolConfiguration.getPooledTransactionHashesSize(), clock, metricsSystem, protocolContext.getBlockchain()::getChainHeadHeader, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionSender.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionSender.java deleted file mode 100644 index 84be2ef231e..00000000000 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionSender.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.transactions; - -import org.hyperledger.besu.ethereum.core.Transaction; -import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener; - -class TransactionSender implements TransactionBatchAddedListener { - - private final PeerTransactionTracker transactionTracker; - private final TransactionsMessageSender transactionsMessageSender; - private final EthContext ethContext; - - public TransactionSender( - final PeerTransactionTracker transactionTracker, - final TransactionsMessageSender transactionsMessageSender, - final EthContext ethContext) { - this.transactionTracker = transactionTracker; - this.transactionsMessageSender = transactionsMessageSender; - this.ethContext = ethContext; - } - - @Override - public void onTransactionsAdded(final Iterable transactions) { - ethContext - .getEthPeers() - .streamAvailablePeers() - .forEach( - peer -> - transactions.forEach( - transaction -> transactionTracker.addToPeerSendQueue(peer, transaction))); - ethContext - .getScheduler() - .scheduleSyncWorkerTask(transactionsMessageSender::sendTransactionsToPeers); - } -} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessor.java index 416213493fd..451bdb44fb0 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessor.java @@ -15,9 +15,9 @@ package org.hyperledger.besu.ethereum.eth.transactions; import static java.time.Instant.now; +import static org.hyperledger.besu.ethereum.core.Transaction.toHashList; import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; -import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage; @@ -28,12 +28,8 @@ import java.time.Duration; import java.time.Instant; -import java.util.Collection; import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; -import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,18 +73,17 @@ void processTransactionsMessage( private void processTransactionsMessage( final EthPeer peer, final TransactionsMessage transactionsMessage) { try { - final List readTransactions = transactionsMessage.transactions(); + final List incomingTransactions = transactionsMessage.transactions(); + transactionTracker.markTransactionsAsSeen(peer, incomingTransactions); traceLambda( LOG, "Received transactions message from {}, incoming transactions {}, incoming list {}", peer::toString, - readTransactions::size, - () -> toHashList(readTransactions)); + incomingTransactions::size, + () -> toHashList(incomingTransactions)); - final Set transactions = Sets.newHashSet(readTransactions); - transactionTracker.markTransactionsAsSeen(peer, transactions); - transactionPool.addRemoteTransactions(transactions); + transactionPool.addRemoteTransactions(incomingTransactions); } catch (final RLPException ex) { if (peer != null) { LOG.debug("Malformed transaction message received, disconnecting: {}", peer, ex); @@ -96,8 +91,4 @@ private void processTransactionsMessage( } } } - - private List toHashList(final Collection txs) { - return txs.stream().map(Transaction::getHash).collect(Collectors.toList()); - } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageSender.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageSender.java index 0c62f53b650..6497c394e2a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageSender.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageSender.java @@ -14,18 +14,15 @@ */ package org.hyperledger.besu.ethereum.eth.transactions; +import static org.hyperledger.besu.ethereum.core.Transaction.toHashList; import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; -import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.messages.LimitedTransactionsMessages; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; -import java.util.Collection; -import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.slf4j.Logger; @@ -46,7 +43,7 @@ public void sendTransactionsToPeers() { .forEach(this::sendTransactionsToPeer); } - private void sendTransactionsToPeer(final EthPeer peer) { + void sendTransactionsToPeer(final EthPeer peer) { final Set allTxToSend = transactionTracker.claimTransactionsToSendToPeer(peer); while (!allTxToSend.isEmpty()) { final LimitedTransactionsMessages limitedTransactionsMessages = @@ -69,8 +66,4 @@ private void sendTransactionsToPeer(final EthPeer peer) { } } } - - private List toHashList(final Collection txs) { - return txs.stream().map(Transaction::getHash).collect(Collectors.toList()); - } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/AbstractPendingTransactionsSorter.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/AbstractPendingTransactionsSorter.java index 98443d20987..bacf1e6d2aa 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/AbstractPendingTransactionsSorter.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/AbstractPendingTransactionsSorter.java @@ -40,6 +40,7 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -54,7 +55,6 @@ import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.EvictingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +71,6 @@ public abstract class AbstractPendingTransactionsSorter { protected final int maxTransactionRetentionHours; protected final Clock clock; - protected final EvictingQueue newPooledHashes; protected final Object lock = new Object(); protected final Map pendingTransactions = new ConcurrentHashMap<>(); @@ -96,7 +95,6 @@ public abstract class AbstractPendingTransactionsSorter { public AbstractPendingTransactionsSorter( final int maxTransactionRetentionHours, final int maxPendingTransactions, - final int maxPooledTransactionHashes, final Clock clock, final MetricsSystem metricsSystem, final Supplier chainHeadHeaderSupplier, @@ -104,7 +102,6 @@ public AbstractPendingTransactionsSorter( this.maxTransactionRetentionHours = maxTransactionRetentionHours; this.maxPendingTransactions = maxPendingTransactions; this.clock = clock; - this.newPooledHashes = EvictingQueue.create(maxPooledTransactionHashes); this.chainHeadHeaderSupplier = chainHeadHeaderSupplier; this.transactionReplacementHandler = new TransactionPoolReplacementHandler(priceBump); final LabelledMetric transactionAddedCounter = @@ -163,17 +160,6 @@ public boolean addRemoteTransaction(final Transaction transaction) { return added; } - public boolean addTransactionHash(final Hash transactionHash) { - final boolean hashAdded; - synchronized (newPooledHashes) { - hashAdded = newPooledHashes.add(transactionHash); - } - if (hashAdded) { - localTransactionHashesAddedCounter.inc(); - } - return hashAdded; - } - @VisibleForTesting public TransactionAddedStatus addLocalTransaction(final Transaction transaction) { final TransactionAddedStatus transactionAdded = @@ -338,18 +324,6 @@ public OptionalLong getNextNonceForSender(final Address sender) { : transactionsForSenderInfo.maybeNextNonce(); } - public void tryEvictTransactionHash(final Hash hash) { - synchronized (newPooledHashes) { - newPooledHashes.remove(hash); - } - } - - public List getNewPooledHashes() { - synchronized (newPooledHashes) { - return List.copyOf(newPooledHashes); - } - } - public abstract void manageBlockAdded(final Block block); protected abstract void doRemoveTransaction( @@ -412,6 +386,13 @@ public Hash getHash() { public Instant getAddedToPoolAt() { return addedToPoolAt; } + + public static List toTransactionList( + final Collection transactionsInfo) { + return transactionsInfo.stream() + .map(TransactionInfo::getTransaction) + .collect(Collectors.toUnmodifiableList()); + } } public enum TransactionSelectionResult { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/BaseFeePendingTransactionsSorter.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/BaseFeePendingTransactionsSorter.java index 55256cb176e..5d8d2590cd7 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/BaseFeePendingTransactionsSorter.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/BaseFeePendingTransactionsSorter.java @@ -54,7 +54,6 @@ public class BaseFeePendingTransactionsSorter extends AbstractPendingTransaction public BaseFeePendingTransactionsSorter( final int maxTransactionRetentionHours, final int maxPendingTransactions, - final int maxPooledTransactionHashes, final Clock clock, final MetricsSystem metricsSystem, final Supplier chainHeadHeaderSupplier, @@ -62,7 +61,6 @@ public BaseFeePendingTransactionsSorter( super( maxTransactionRetentionHours, maxPendingTransactions, - maxPooledTransactionHashes, clock, metricsSystem, chainHeadHeaderSupplier, @@ -211,7 +209,6 @@ protected TransactionAddedStatus addTransaction(final TransactionInfo transactio } LOG.trace("Adding {} to pending transactions", transactionInfo); pendingTransactions.put(transactionInfo.getHash(), transactionInfo); - tryEvictTransactionHash(transactionInfo.getHash()); if (pendingTransactions.size() > maxPendingTransactions) { final Stream.Builder removalCandidates = Stream.builder(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/GasPricePendingTransactionsSorter.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/GasPricePendingTransactionsSorter.java index 9b75827a71b..db69811446a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/GasPricePendingTransactionsSorter.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/GasPricePendingTransactionsSorter.java @@ -47,7 +47,6 @@ public class GasPricePendingTransactionsSorter extends AbstractPendingTransactio public GasPricePendingTransactionsSorter( final int maxTransactionRetentionHours, final int maxPendingTransactions, - final int maxPooledTransactionHashes, final Clock clock, final MetricsSystem metricsSystem, final Supplier chainHeadHeaderSupplier, @@ -55,7 +54,6 @@ public GasPricePendingTransactionsSorter( super( maxTransactionRetentionHours, maxPendingTransactions, - maxPooledTransactionHashes, clock, metricsSystem, chainHeadHeaderSupplier, @@ -101,7 +99,6 @@ protected TransactionAddedStatus addTransaction(final TransactionInfo transactio } prioritizedTransactions.add(transactionInfo); pendingTransactions.put(transactionInfo.getHash(), transactionInfo); - tryEvictTransactionHash(transactionInfo.getHash()); if (pendingTransactions.size() > maxPendingTransactions) { final TransactionInfo toRemove = prioritizedTransactions.last(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java index 7171f0e72cb..e353f0f58ba 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java @@ -29,7 +29,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; -import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionsMessageProcessor; +import org.hyperledger.besu.ethereum.eth.transactions.NewPooledTransactionHashesMessageProcessor; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -51,7 +51,7 @@ public class BufferedGetPooledTransactionsFromPeerFetcherTest { @Mock EthPeer ethPeer; - @Mock PendingTransactionsMessageProcessor processor; + @Mock NewPooledTransactionHashesMessageProcessor processor; @Mock TransactionPool transactionPool; @Mock EthContext ethContext; @Mock EthScheduler ethScheduler; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/BaseFeePendingTransactionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/BaseFeePendingTransactionsTest.java index b117f54f207..55ba5fd97e7 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/BaseFeePendingTransactionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/BaseFeePendingTransactionsTest.java @@ -53,7 +53,6 @@ public class BaseFeePendingTransactionsTest { private static final int MAX_TRANSACTIONS = 5; - private static final int MAX_TRANSACTION_HASHES = 5; private static final Supplier SIGNATURE_ALGORITHM = Suppliers.memoize(SignatureAlgorithmFactory::getInstance); private static final KeyPair KEYS1 = SIGNATURE_ALGORITHM.get().generateKeyPair(); @@ -70,7 +69,6 @@ public class BaseFeePendingTransactionsTest { new BaseFeePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, MAX_TRANSACTIONS, - MAX_TRANSACTION_HASHES, TestClock.fixed(), metricsSystem, BaseFeePendingTransactionsTest::mockBlockHeader, @@ -594,7 +592,6 @@ public void shouldEvictMultipleOldTransactions() { new BaseFeePendingTransactionsSorter( maxTransactionRetentionHours, MAX_TRANSACTIONS, - MAX_TRANSACTION_HASHES, clock, metricsSystem, BaseFeePendingTransactionsTest::mockBlockHeader, @@ -618,7 +615,6 @@ public void shouldEvictSingleOldTransaction() { new BaseFeePendingTransactionsSorter( maxTransactionRetentionHours, MAX_TRANSACTIONS, - MAX_TRANSACTION_HASHES, clock, metricsSystem, BaseFeePendingTransactionsTest::mockBlockHeader, @@ -638,7 +634,6 @@ public void shouldEvictExclusivelyOldTransactions() { new BaseFeePendingTransactionsSorter( maxTransactionRetentionHours, MAX_TRANSACTIONS, - MAX_TRANSACTION_HASHES, clock, metricsSystem, BaseFeePendingTransactionsTest::mockBlockHeader, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/GasPricePendingTransactionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/GasPricePendingTransactionsTest.java index a43b925a04e..c4d9ab73d98 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/GasPricePendingTransactionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/GasPricePendingTransactionsTest.java @@ -53,7 +53,6 @@ public class GasPricePendingTransactionsTest { private static final int MAX_TRANSACTIONS = 5; - private static final int MAX_TRANSACTION_HASHES = 5; private static final Supplier SIGNATURE_ALGORITHM = Suppliers.memoize(SignatureAlgorithmFactory::getInstance); private static final KeyPair KEYS1 = SIGNATURE_ALGORITHM.get().generateKeyPair(); @@ -70,7 +69,6 @@ public class GasPricePendingTransactionsTest { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, MAX_TRANSACTIONS, - MAX_TRANSACTION_HASHES, TestClock.fixed(), metricsSystem, GasPricePendingTransactionsTest::mockBlockHeader, @@ -617,7 +615,6 @@ public void shouldEvictMultipleOldTransactions() { new GasPricePendingTransactionsSorter( maxTransactionRetentionHours, MAX_TRANSACTIONS, - MAX_TRANSACTION_HASHES, clock, metricsSystem, () -> null, @@ -641,7 +638,6 @@ public void shouldEvictSingleOldTransaction() { new GasPricePendingTransactionsSorter( maxTransactionRetentionHours, MAX_TRANSACTIONS, - MAX_TRANSACTION_HASHES, clock, metricsSystem, () -> null, @@ -661,7 +657,6 @@ public void shouldEvictExclusivelyOldTransactions() { new GasPricePendingTransactionsSorter( maxTransactionRetentionHours, MAX_TRANSACTIONS, - MAX_TRANSACTION_HASHES, clock, metricsSystem, () -> null, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageProcessorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java similarity index 89% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageProcessorTest.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java index afaf6755ba2..3c2ae1d4d2d 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageProcessorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java @@ -35,7 +35,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; -import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionsMessageProcessor.FetcherCreatorTask; +import org.hyperledger.besu.ethereum.eth.transactions.NewPooledTransactionHashesMessageProcessor.FetcherCreatorTask; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.metrics.Counter; @@ -51,11 +51,11 @@ import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) -public class PendingTransactionsMessageProcessorTest { +public class NewPooledTransactionHashesMessageProcessorTest { @Mock private TransactionPool transactionPool; @Mock private TransactionPoolConfiguration transactionPoolConfiguration; - @Mock private PeerPendingTransactionTracker transactionTracker; + @Mock private PeerTransactionTracker transactionTracker; @Mock private Counter totalSkippedTransactionsMessageCounter; @Mock private EthPeer peer1; @Mock private MetricsSystem metricsSystem; @@ -63,7 +63,7 @@ public class PendingTransactionsMessageProcessorTest { @Mock private EthContext ethContext; @Mock private EthScheduler ethScheduler; - private PendingTransactionsMessageProcessor messageHandler; + private NewPooledTransactionHashesMessageProcessor messageHandler; private final BlockDataGenerator generator = new BlockDataGenerator(); private final Hash hash1 = generator.transaction().getHash(); @@ -75,7 +75,7 @@ public void setup() { when(transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod()) .thenReturn(Duration.ofMillis(500)); messageHandler = - new PendingTransactionsMessageProcessor( + new NewPooledTransactionHashesMessageProcessor( transactionTracker, transactionPool, transactionPoolConfiguration, @@ -95,7 +95,7 @@ public void shouldMarkAllReceivedTransactionsAsSeen() { ofMinutes(1)); verify(transactionTracker) - .markTransactionsHashesAsSeen(peer1, Arrays.asList(hash1, hash2, hash3)); + .markTransactionHashesAsSeen(peer1, Arrays.asList(hash1, hash2, hash3)); verifyNoMoreInteractions(transactionTracker); } @@ -109,9 +109,6 @@ public void shouldAddInitiatedRequestingTransactions() { now(), ofMinutes(1)); - verify(transactionPool).addTransactionHash(hash1); - verify(transactionPool).addTransactionHash(hash2); - verify(transactionPool).addTransactionHash(hash3); verify(transactionPool).getTransactionByHash(hash1); verify(transactionPool).getTransactionByHash(hash2); verify(transactionPool).getTransactionByHash(hash3); @@ -133,7 +130,7 @@ public void shouldNotAddAlreadyPresentTransactions() { now(), ofMinutes(1)); - verify(transactionPool).addTransactionHash(hash3); + // verify(transactionPool).addTransactionHash(hash3); verify(transactionPool).getTransactionByHash(hash1); verify(transactionPool).getTransactionByHash(hash2); verify(transactionPool).getTransactionByHash(hash3); @@ -182,7 +179,6 @@ public void shouldScheduleGetPooledTransactionsTaskWhenNewTransactionAdded() { final EthScheduler ethScheduler = mock(EthScheduler.class); when(ethContext.getScheduler()).thenReturn(ethScheduler); - when(transactionPool.addTransactionHash(hash1)).thenReturn(true); messageHandler.processNewPooledTransactionHashesMessage( peer1, NewPooledTransactionHashesMessage.create(asList(hash1, hash2)), now(), ofMinutes(1)); @@ -195,9 +191,6 @@ public void shouldScheduleGetPooledTransactionsTaskWhenNewTransactionAdded() { public void shouldNotScheduleGetPooledTransactionsTaskTwice() { when(syncState.isInSync(anyLong())).thenReturn(true); - when(transactionPool.addTransactionHash(hash1)).thenReturn(true); - when(transactionPool.addTransactionHash(hash2)).thenReturn(true); - messageHandler.processNewPooledTransactionHashesMessage( peer1, NewPooledTransactionHashesMessage.create(Collections.singletonList(hash1)), diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageSenderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java similarity index 81% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageSenderTest.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java index cd940ed9f1c..85c8f304f36 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageSenderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java @@ -16,6 +16,7 @@ import static com.google.common.collect.Sets.newHashSet; import static org.assertj.core.api.Assertions.assertThat; +import static org.hyperledger.besu.ethereum.core.Transaction.toHashList; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; @@ -50,20 +51,20 @@ import org.mockito.ArgumentCaptor; @RunWith(Parameterized.class) -public class PendingTransactionsMessageSenderTest { +public class NewPooledTransactionHashesMessageSenderTest { private final EthPeer peer1 = mock(EthPeer.class); private final EthPeer peer2 = mock(EthPeer.class); private final BlockDataGenerator generator = new BlockDataGenerator(); - private final Hash transaction1 = generator.transaction().getHash(); - private final Hash transaction2 = generator.transaction().getHash(); - private final Hash transaction3 = generator.transaction().getHash(); + private final Transaction transaction1 = generator.transaction(); + private final Transaction transaction2 = generator.transaction(); + private final Transaction transaction3 = generator.transaction(); @Parameterized.Parameter public AbstractPendingTransactionsSorter pendingTransactions; - private PeerPendingTransactionTracker transactionTracker; - private PendingTransactionsMessageSender messageSender; + private PeerTransactionTracker transactionTracker; + private NewPooledTransactionHashesMessageSender messageSender; @Parameterized.Parameters public static Collection data() { @@ -76,8 +77,8 @@ public static Collection data() { @Before public void setUp() { - transactionTracker = new PeerPendingTransactionTracker(pendingTransactions); - messageSender = new PendingTransactionsMessageSender(transactionTracker); + transactionTracker = new PeerTransactionTracker(); + messageSender = new NewPooledTransactionHashesMessageSender(transactionTracker); Transaction tx = mock(Transaction.class); when(pendingTransactions.getTransactionByHash(any())).thenReturn(Optional.of(tx)); } @@ -89,7 +90,7 @@ public void shouldSendPendingTransactionsToEachPeer() throws Exception { transactionTracker.addToPeerSendQueue(peer1, transaction2); transactionTracker.addToPeerSendQueue(peer2, transaction3); - messageSender.sendTransactionsToPeers(); + List.of(peer1, peer2).forEach(messageSender::sendTransactionHashesToPeer); verify(peer1).send(transactionsMessageContaining(transaction1, transaction2)); verify(peer2).send(transactionsMessageContaining(transaction3)); @@ -98,20 +99,20 @@ public void shouldSendPendingTransactionsToEachPeer() throws Exception { @Test public void shouldSendTransactionsInBatchesWithLimit() throws Exception { - final Set transactions = - generator.transactions(6000).stream().map(Transaction::getHash).collect(Collectors.toSet()); + final Set transactions = + generator.transactions(6000).stream().collect(Collectors.toSet()); transactions.forEach(transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction)); - messageSender.sendTransactionsToPeers(); + messageSender.sendTransactionHashesToPeer(peer1); final ArgumentCaptor messageDataArgumentCaptor = ArgumentCaptor.forClass(MessageData.class); verify(peer1, times(2)).send(messageDataArgumentCaptor.capture()); final List sentMessages = messageDataArgumentCaptor.getAllValues(); - assertThat(sentMessages).hasSize(2); assertThat(sentMessages) + .hasSize(2) .allMatch(message -> message.getCode() == EthPV65.NEW_POOLED_TRANSACTION_HASHES); final Set firstBatch = getTransactionsFromMessage(sentMessages.get(0)); final Set secondBatch = getTransactionsFromMessage(sentMessages.get(1)); @@ -124,14 +125,16 @@ public void shouldSendTransactionsInBatchesWithLimit() throws Exception { .hasSizeBetween( expectedSecondBatchSize - toleranceDelta, expectedSecondBatchSize + toleranceDelta); - assertThat(Sets.union(firstBatch, secondBatch)).isEqualTo(transactions); + assertThat(Sets.union(firstBatch, secondBatch)) + .containsExactlyInAnyOrderElementsOf(toHashList(transactions)); } - private MessageData transactionsMessageContaining(final Hash... transactions) { + private MessageData transactionsMessageContaining(final Transaction... transactions) { return argThat( message -> { final Set actualSentTransactions = getTransactionsFromMessage(message); - final Set expectedTransactions = newHashSet(transactions); + final Set expectedTransactions = + newHashSet(toHashList(Arrays.asList(transactions))); return message.getCode() == EthPV65.NEW_POOLED_TRANSACTION_HASHES && actualSentTransactions.equals(expectedTransactions); }); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PeerPendingTransactionTrackerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PeerPendingTransactionTrackerTest.java deleted file mode 100644 index d6086683457..00000000000 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PeerPendingTransactionTrackerTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.transactions; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.ethereum.core.BlockDataGenerator; -import org.hyperledger.besu.ethereum.core.Transaction; -import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter; -import org.hyperledger.besu.ethereum.eth.transactions.sorter.BaseFeePendingTransactionsSorter; -import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Optional; - -import com.google.common.collect.ImmutableSet; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class PeerPendingTransactionTrackerTest { - - @Parameterized.Parameter public AbstractPendingTransactionsSorter pendingTransactions; - - private final EthPeer ethPeer1 = mock(EthPeer.class); - private final EthPeer ethPeer2 = mock(EthPeer.class); - private final BlockDataGenerator generator = new BlockDataGenerator(); - private PeerPendingTransactionTracker tracker; - private final Hash hash1 = generator.transaction().getHash(); - private final Hash hash2 = generator.transaction().getHash(); - private final Hash hash3 = generator.transaction().getHash(); - - @Parameterized.Parameters - public static Collection data() { - return Arrays.asList( - new Object[][] { - {mock(GasPricePendingTransactionsSorter.class)}, - {mock(BaseFeePendingTransactionsSorter.class)} - }); - } - - @Before - public void setUp() { - tracker = new PeerPendingTransactionTracker(pendingTransactions); - Transaction tx = mock(Transaction.class); - when(pendingTransactions.getTransactionByHash(any())).thenReturn(Optional.of(tx)); - } - - @Test - public void shouldTrackTransactionsToSendToPeer() { - tracker.addToPeerSendQueue(ethPeer1, hash1); - tracker.addToPeerSendQueue(ethPeer1, hash2); - tracker.addToPeerSendQueue(ethPeer2, hash3); - - assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer1, ethPeer2); - assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).containsOnly(hash1, hash2); - assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(hash3); - } - - @Test - public void shouldExcludeAlreadySeenTransactionsFromTransactionsToSend() { - tracker.markTransactionsHashesAsSeen(ethPeer1, ImmutableSet.of(hash2)); - - tracker.addToPeerSendQueue(ethPeer1, hash1); - tracker.addToPeerSendQueue(ethPeer1, hash2); - tracker.addToPeerSendQueue(ethPeer2, hash3); - - assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer1, ethPeer2); - assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).containsOnly(hash1); - assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(hash3); - } - - @Test - public void shouldExcludeAlreadySeenTransactionsAsACollectionFromTransactionsToSend() { - tracker.markTransactionsHashesAsSeen(ethPeer1, ImmutableSet.of(hash1, hash2)); - - tracker.addToPeerSendQueue(ethPeer1, hash1); - tracker.addToPeerSendQueue(ethPeer1, hash2); - tracker.addToPeerSendQueue(ethPeer2, hash3); - - assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer2); - assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).isEmpty(); - assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(hash3); - } - - @Test - public void shouldClearDataWhenPeerDisconnects() { - tracker.markTransactionsHashesAsSeen(ethPeer1, ImmutableSet.of(hash3)); - - tracker.addToPeerSendQueue(ethPeer1, hash2); - tracker.addToPeerSendQueue(ethPeer2, hash3); - - tracker.onDisconnect(ethPeer1); - - assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer2); - - // Should have cleared data that ethPeer1 has already seen transaction1 - tracker.addToPeerSendQueue(ethPeer1, hash1); - - assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer1, ethPeer2); - assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).containsOnly(hash1); - assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(hash3); - } -} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingMultiTypesTransactionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingMultiTypesTransactionsTest.java index 5f6848a85d6..29d49e87d22 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingMultiTypesTransactionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingMultiTypesTransactionsTest.java @@ -42,7 +42,6 @@ public class PendingMultiTypesTransactionsTest { private static final int MAX_TRANSACTIONS = 5; - private static final int MAX_TRANSACTION_HASHES = 5; private static final Supplier SIGNATURE_ALGORITHM = Suppliers.memoize(SignatureAlgorithmFactory::getInstance)::get; private static final KeyPair KEYS1 = SIGNATURE_ALGORITHM.get().generateKeyPair(); @@ -59,7 +58,6 @@ public class PendingMultiTypesTransactionsTest { new BaseFeePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, MAX_TRANSACTIONS, - MAX_TRANSACTION_HASHES, TestClock.fixed(), metricsSystem, () -> mockBlockHeader(Wei.of(7L)), diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsSenderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsSenderTest.java deleted file mode 100644 index 222e9c60392..00000000000 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsSenderTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.transactions; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.ethereum.core.Transaction; -import org.hyperledger.besu.ethereum.eth.EthProtocol; -import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.EthPeers; -import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; - -import java.util.Arrays; -import java.util.Collections; - -import org.apache.tuweni.bytes.Bytes32; -import org.junit.Test; - -public class PendingTransactionsSenderTest { - - @Test - public void testSendEth65PeersOnly() { - PeerPendingTransactionTracker peerPendingTransactionTracker = - mock(PeerPendingTransactionTracker.class); - - PendingTransactionsMessageSender pendingTransactionsMessageSender = - mock(PendingTransactionsMessageSender.class); - EthContext ethContext = mock(EthContext.class); - EthScheduler ethScheduler = mock(EthScheduler.class); - when(ethContext.getScheduler()).thenReturn(ethScheduler); - PendingTransactionSender sender = - new PendingTransactionSender( - peerPendingTransactionTracker, pendingTransactionsMessageSender, ethContext); - - EthPeer peer1 = mock(EthPeer.class); - EthPeer peer2 = mock(EthPeer.class); - Transaction tx = mock(Transaction.class); - Hash hash = Hash.wrap(Bytes32.random()); - when(tx.getHash()).thenReturn(hash); - EthPeers ethPeers = mock(EthPeers.class); - when(ethContext.getEthPeers()).thenReturn(ethPeers); - when(ethPeers.streamAvailablePeers()).thenReturn(Arrays.asList(peer1, peer2).stream()); - when(peerPendingTransactionTracker.isPeerSupported(peer1, EthProtocol.ETH65)).thenReturn(true); - when(peerPendingTransactionTracker.isPeerSupported(peer2, EthProtocol.ETH65)).thenReturn(false); - sender.onTransactionsAdded(Collections.singleton(tx)); - verify(peerPendingTransactionTracker, times(1)).addToPeerSendQueue(peer1, hash); - verify(peerPendingTransactionTracker, never()).addToPeerSendQueue(peer2, hash); - } -} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionBroadcasterTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionBroadcasterTest.java new file mode 100644 index 00000000000..00b97f2c178 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionBroadcasterTest.java @@ -0,0 +1,272 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.transactions; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter.TransactionInfo.toTransactionList; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import org.hyperledger.besu.ethereum.core.BlockDataGenerator; +import org.hyperledger.besu.ethereum.core.Transaction; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthPeers; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.messages.EthPV65; +import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter; +import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter.TransactionInfo; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TransactionBroadcasterTest { + + @Mock private EthContext ethContext; + @Mock private EthPeers ethPeers; + @Mock private EthScheduler ethScheduler; + @Mock private AbstractPendingTransactionsSorter pendingTransactions; + @Mock private PeerTransactionTracker transactionTracker; + @Mock private TransactionsMessageSender transactionsMessageSender; + @Mock private NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender; + + private final EthPeer ethPeerNoEth66 = mock(EthPeer.class); + private final EthPeer ethPeerWithEth66 = mock(EthPeer.class); + private final EthPeer ethPeerNoEth66_2 = mock(EthPeer.class); + private final EthPeer ethPeerWithEth66_2 = mock(EthPeer.class); + private final EthPeer ethPeerWithEth66_3 = mock(EthPeer.class); + private final BlockDataGenerator generator = new BlockDataGenerator(); + + private TransactionBroadcaster txBroadcaster; + private ArgumentCaptor sendTaskCapture; + + @Before + public void setUp() { + when(ethPeerNoEth66.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) + .thenReturn(Boolean.FALSE); + when(ethPeerNoEth66_2.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) + .thenReturn(Boolean.FALSE); + when(ethPeerWithEth66.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) + .thenReturn(Boolean.TRUE); + when(ethPeerWithEth66_2.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) + .thenReturn(Boolean.TRUE); + when(ethPeerWithEth66_3.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) + .thenReturn(Boolean.TRUE); + + sendTaskCapture = ArgumentCaptor.forClass(Runnable.class); + doNothing().when(ethScheduler).scheduleSyncWorkerTask(sendTaskCapture.capture()); + + when(ethPeers.getMaxPeers()).thenReturn(4); + + when(ethContext.getEthPeers()).thenReturn(ethPeers); + when(ethContext.getScheduler()).thenReturn(ethScheduler); + + txBroadcaster = + new TransactionBroadcaster( + ethContext, + pendingTransactions, + transactionTracker, + transactionsMessageSender, + newPooledTransactionHashesMessageSender); + } + + @Test + public void doNotRelayTransactionsWhenPoolIsEmpty() { + setupTransactionPool(0, 0); + + txBroadcaster.relayTransactionPoolTo(ethPeerNoEth66); + txBroadcaster.relayTransactionPoolTo(ethPeerWithEth66); + + verifyNothingSent(); + } + + @Test + public void relayFullTransactionsFromPoolWhenPeerDoesNotSupportEth66() { + List txs = toTransactionList(setupTransactionPool(1, 1)); + + txBroadcaster.relayTransactionPoolTo(ethPeerNoEth66); + + verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth66, txs); + + sendTaskCapture.getValue().run(); + + verify(transactionsMessageSender).sendTransactionsToPeer(ethPeerNoEth66); + verifyNoInteractions(newPooledTransactionHashesMessageSender); + } + + @Test + public void relayTransactionHashesFromPoolWhenPeerSupportEth66() { + List txs = toTransactionList(setupTransactionPool(1, 1)); + + txBroadcaster.relayTransactionPoolTo(ethPeerWithEth66); + + verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66, txs); + + sendTaskCapture.getValue().run(); + + verify(newPooledTransactionHashesMessageSender).sendTransactionHashesToPeer(ethPeerWithEth66); + verifyNoInteractions(transactionsMessageSender); + } + + @Test + public void onTransactionsAddedWithNoPeersDoesNothing() { + when(ethPeers.peerCount()).thenReturn(0); + + txBroadcaster.onTransactionsAdded(toTransactionList(setupTransactionPool(1, 1))); + + verifyNothingSent(); + } + + @Test + public void onTransactionsAddedWithOnlyNonEth66PeersSendFullTransactions() { + when(ethPeers.peerCount()).thenReturn(2); + when(ethPeers.streamAvailablePeers()).thenReturn(Stream.of(ethPeerNoEth66, ethPeerNoEth66_2)); + + List txs = toTransactionList(setupTransactionPool(1, 1)); + + txBroadcaster.onTransactionsAdded(txs); + + verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth66, txs); + verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth66_2, txs); + + sendTaskCapture.getAllValues().forEach(Runnable::run); + + verify(transactionsMessageSender).sendTransactionsToPeer(ethPeerNoEth66); + verify(transactionsMessageSender).sendTransactionsToPeer(ethPeerNoEth66_2); + verifyNoInteractions(newPooledTransactionHashesMessageSender); + } + + @Test + public void onTransactionsAddedWithOnlyFewEth66PeersSendFullTransactions() { + when(ethPeers.peerCount()).thenReturn(2); + when(ethPeers.streamAvailablePeers()) + .thenReturn(Stream.of(ethPeerWithEth66, ethPeerWithEth66_2)); + + List txs = toTransactionList(setupTransactionPool(1, 1)); + + txBroadcaster.onTransactionsAdded(txs); + + verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66, txs); + verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66_2, txs); + + sendTaskCapture.getAllValues().forEach(Runnable::run); + + verify(transactionsMessageSender, times(2)).sendTransactionsToPeer(any(EthPeer.class)); + verifyNoInteractions(newPooledTransactionHashesMessageSender); + } + + @Test + public void onTransactionsAddedWithOnlyEth66PeersSendFullTransactionsAndTransactionHashes() { + when(ethPeers.peerCount()).thenReturn(3); + when(ethPeers.streamAvailablePeers()) + .thenReturn(Stream.of(ethPeerWithEth66, ethPeerWithEth66_2, ethPeerWithEth66_3)); + + List txs = toTransactionList(setupTransactionPool(1, 1)); + + txBroadcaster.onTransactionsAdded(txs); + + verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66, txs); + verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66_2, txs); + verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66_3, txs); + + sendTaskCapture.getAllValues().forEach(Runnable::run); + + verify(transactionsMessageSender, times(2)).sendTransactionsToPeer(any(EthPeer.class)); + verify(newPooledTransactionHashesMessageSender).sendTransactionHashesToPeer(any(EthPeer.class)); + } + + @Test + public void onTransactionsAddedWithMixedPeersSendFullTransactionsAndTransactionHashes() { + List eth66Peers = List.of(ethPeerWithEth66, ethPeerWithEth66_2); + + when(ethPeers.peerCount()).thenReturn(3); + when(ethPeers.streamAvailablePeers()) + .thenReturn(Stream.concat(eth66Peers.stream(), Stream.of(ethPeerNoEth66))); + + List txs = toTransactionList(setupTransactionPool(1, 1)); + + txBroadcaster.onTransactionsAdded(txs); + + verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66, txs); + verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66_2, txs); + verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth66, txs); + + sendTaskCapture.getAllValues().forEach(Runnable::run); + + ArgumentCaptor capPeerFullTransactions = ArgumentCaptor.forClass(EthPeer.class); + verify(transactionsMessageSender, times(2)) + .sendTransactionsToPeer(capPeerFullTransactions.capture()); + List fullTransactionPeers = new ArrayList<>(capPeerFullTransactions.getAllValues()); + assertThat(fullTransactionPeers.remove(ethPeerNoEth66)).isTrue(); + assertThat(fullTransactionPeers).hasSize(1).first().isIn(eth66Peers); + + ArgumentCaptor capPeerTransactionHashes = ArgumentCaptor.forClass(EthPeer.class); + verify(newPooledTransactionHashesMessageSender) + .sendTransactionHashesToPeer(capPeerTransactionHashes.capture()); + assertThat(capPeerTransactionHashes.getValue()).isIn(eth66Peers); + } + + private void verifyNothingSent() { + verifyNoInteractions( + transactionTracker, transactionsMessageSender, newPooledTransactionHashesMessageSender); + } + + private Set setupTransactionPool( + final int numLocalTransactions, final int numRemoteTransactions) { + Set txInfo = createTransactionInfoList(numLocalTransactions, true); + txInfo.addAll(createTransactionInfoList(numRemoteTransactions, false)); + + when(pendingTransactions.getTransactionInfo()).thenReturn(txInfo); + + return txInfo; + } + + private Set createTransactionInfoList(final int num, final boolean local) { + return IntStream.range(0, num) + .mapToObj(unused -> generator.transaction()) + .map(tx -> new TransactionInfo(tx, local, Instant.now())) + .collect(Collectors.toSet()); + } + + private void verifyTransactionAddedToPeerSendingQueue( + final EthPeer peer, final Collection transactions) { + + ArgumentCaptor trackedTransactions = ArgumentCaptor.forClass(Transaction.class); + verify(transactionTracker, times(transactions.size())) + .addToPeerSendQueue(eq(peer), trackedTransactions.capture()); + assertThat(trackedTransactions.getAllValues()) + .containsExactlyInAnyOrderElementsOf(transactions); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java index e04538e1c3c..347659a87cd 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java @@ -15,7 +15,9 @@ package org.hyperledger.besu.ethereum.eth.transactions; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -30,6 +32,7 @@ import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthMessages; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; @@ -65,16 +68,17 @@ public void testDisconnect() { final EthContext ethContext = mock(EthContext.class); when(ethContext.getEthMessages()).thenReturn(mock(EthMessages.class)); when(ethContext.getEthPeers()).thenReturn(ethPeers); + final EthScheduler ethScheduler = mock(EthScheduler.class); + when(ethContext.getScheduler()).thenReturn(ethScheduler); final SyncState state = mock(SyncState.class); final GasPricePendingTransactionsSorter pendingTransactions = mock(GasPricePendingTransactionsSorter.class); final PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class); final TransactionsMessageSender transactionsMessageSender = mock(TransactionsMessageSender.class); - final PeerPendingTransactionTracker peerPendingTransactionTracker = - mock(PeerPendingTransactionTracker.class); - final PendingTransactionsMessageSender pendingTransactionsMessageSender = - mock(PendingTransactionsMessageSender.class); + doNothing().when(transactionsMessageSender).sendTransactionsToPeer(any(EthPeer.class)); + final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender = + mock(NewPooledTransactionHashesMessageSender.class); final TransactionPool pool = TransactionPoolFactory.createTransactionPool( schedule, @@ -87,7 +91,6 @@ public void testDisconnect() { 1, 1, 1, - 1, TransactionPoolConfiguration.DEFAULT_PRICE_BUMP, TransactionPoolConfiguration.ETH65_TRX_ANNOUNCED_BUFFERING_PERIOD, TransactionPoolConfiguration.DEFAULT_RPC_TX_FEE_CAP, @@ -95,8 +98,7 @@ public void testDisconnect() { pendingTransactions, peerTransactionTracker, transactionsMessageSender, - peerPendingTransactionTracker, - pendingTransactionsMessageSender); + newPooledTransactionHashesMessageSender); final EthProtocolManager ethProtocolManager = new EthProtocolManager( @@ -119,6 +121,5 @@ public void testDisconnect() { assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse(); ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING); verify(peerTransactionTracker, times(1)).onDisconnect(ethPeer.getEthPeer()); - verify(peerPendingTransactionTracker, times(1)).onDisconnect(ethPeer.getEthPeer()); } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolTest.java index c5682f4bf88..cadd22969bb 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolTest.java @@ -28,9 +28,10 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -51,13 +52,14 @@ import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.core.TransactionTestFixture; -import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.messages.EthPV65; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter; import org.hyperledger.besu.ethereum.mainnet.MainnetTransactionValidator; @@ -92,15 +94,14 @@ public class TransactionPoolTest { private static final int MAX_TRANSACTIONS = 5; - private static final int MAX_TRANSACTION_HASHES = 5; private static final KeyPair KEY_PAIR1 = SignatureAlgorithmFactory.getInstance().generateKeyPair(); @Mock private MainnetTransactionValidator transactionValidator; @Mock private PendingTransactionListener listener; - @Mock private TransactionPool.TransactionBatchAddedListener batchAddedListener; - @Mock private TransactionPool.TransactionBatchAddedListener pendingBatchAddedListener; @Mock private MiningParameters miningParameters; + @Mock private TransactionsMessageSender transactionsMessageSender; + @Mock private NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender; @SuppressWarnings("unchecked") @Mock @@ -112,6 +113,7 @@ public class TransactionPoolTest { private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); private MutableBlockchain blockchain; + private TransactionBroadcaster transactionBroadcaster; private GasPricePendingTransactionsSorter transactions; private final Transaction transaction1 = createTransaction(1); @@ -124,7 +126,7 @@ public class TransactionPoolTest { private EthContext ethContext; private EthPeers ethPeers; private PeerTransactionTracker peerTransactionTracker; - private PeerPendingTransactionTracker peerPendingTransactionTracker; + private ArgumentCaptor syncTaskCapture; @Before public void setUp() { @@ -133,7 +135,6 @@ public void setUp() { new GasPricePendingTransactionsSorter( TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS, MAX_TRANSACTIONS, - MAX_TRANSACTION_HASHES, TestClock.fixed(), metricsSystem, blockchain::getChainHeadHeader, @@ -145,10 +146,25 @@ public void setUp() { syncState = mock(SyncState.class); when(syncState.isInSync(anyLong())).thenReturn(true); ethContext = mock(EthContext.class); + + final EthScheduler ethScheduler = mock(EthScheduler.class); + syncTaskCapture = ArgumentCaptor.forClass(Runnable.class); + doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture()); + when(ethContext.getScheduler()).thenReturn(ethScheduler); + ethPeers = mock(EthPeers.class); when(ethContext.getEthPeers()).thenReturn(ethPeers); - peerTransactionTracker = mock(PeerTransactionTracker.class); - peerPendingTransactionTracker = mock(PeerPendingTransactionTracker.class); + + peerTransactionTracker = new PeerTransactionTracker(); + transactionBroadcaster = + spy( + new TransactionBroadcaster( + ethContext, + transactions, + peerTransactionTracker, + transactionsMessageSender, + newPooledTransactionHashesMessageSender)); + transactionPool = createTransactionPool(); blockchain.observeBlockAdded(transactionPool); when(miningParameters.getMinTransactionGasPrice()).thenReturn(Wei.of(2)); @@ -169,12 +185,9 @@ private TransactionPool createTransactionPool( transactions, protocolSchedule, protocolContext, - batchAddedListener, - pendingBatchAddedListener, + transactionBroadcaster, syncState, ethContext, - peerTransactionTracker, - peerPendingTransactionTracker, miningParameters, metricsSystem, config); @@ -440,7 +453,7 @@ public void shouldNotAddRemoteTransactionsThatAreInvalidAccordingToInvariantChec assertTransactionNotPending(transaction1); assertTransactionPending(transaction2); - verify(batchAddedListener).onTransactionsAdded(singleton(transaction2)); + verify(transactionBroadcaster).onTransactionsAdded(singleton(transaction2)); } @Test @@ -455,7 +468,7 @@ public void shouldNotAddRemoteTransactionsThatAreInvalidAccordingToStateDependen assertTransactionNotPending(transaction1); assertTransactionPending(transaction2); - verify(batchAddedListener).onTransactionsAdded(singleton(transaction2)); + verify(transactionBroadcaster).onTransactionsAdded(singleton(transaction2)); verify(transactionValidator).validate(eq(transaction1), any(Optional.class), any()); verify(transactionValidator) .validateForSender(eq(transaction1), eq(null), any(TransactionValidationParams.class)); @@ -529,12 +542,9 @@ public void shouldDiscardRemoteTransactionThatAlreadyExistsBeforeValidation() { pendingTransactions, protocolSchedule, protocolContext, - batchAddedListener, - pendingBatchAddedListener, + transactionBroadcaster, syncState, ethContext, - peerTransactionTracker, - peerPendingTransactionTracker, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), metricsSystem, TransactionPoolConfiguration.DEFAULT); @@ -544,7 +554,6 @@ public void shouldDiscardRemoteTransactionThatAlreadyExistsBeforeValidation() { transactionPool.addRemoteTransactions(singletonList(transaction1)); verify(pendingTransactions).containsTransaction(transaction1.getHash()); - verify(pendingTransactions).tryEvictTransactionHash(transaction1.getHash()); verifyNoInteractions(transactionValidator); verifyNoMoreInteractions(pendingTransactions); } @@ -570,8 +579,8 @@ public void shouldNotNotifyBatchListenerWhenRemoteTransactionDoesNotReplaceExist transactionPool.addRemoteTransactions(singletonList(transaction2)); assertTransactionPending(transaction1); - verify(batchAddedListener).onTransactionsAdded(singleton(transaction1)); - verify(batchAddedListener, never()).onTransactionsAdded(singleton(transaction2)); + verify(transactionBroadcaster).onTransactionsAdded(singleton(transaction1)); + verify(transactionBroadcaster, never()).onTransactionsAdded(singleton(transaction2)); } @Test @@ -595,8 +604,8 @@ public void shouldNotNotifyBatchListenerWhenLocalTransactionDoesNotReplaceExisti transactionPool.addLocalTransaction(transaction2); assertTransactionPending(transaction1); - verify(batchAddedListener).onTransactionsAdded(singletonList(transaction1)); - verify(batchAddedListener, never()).onTransactionsAdded(singletonList(transaction2)); + verify(transactionBroadcaster).onTransactionsAdded(singletonList(transaction1)); + verify(transactionBroadcaster, never()).onTransactionsAdded(singletonList(transaction2)); } @Test @@ -611,7 +620,7 @@ public void shouldRejectLocalTransactionsWhereGasLimitExceedBlockGasLimit() { .isEqualTo(ValidationResult.invalid(EXCEEDS_BLOCK_GAS_LIMIT)); assertTransactionNotPending(transaction1); - verifyNoInteractions(batchAddedListener); + verifyNoInteractions(transactionBroadcaster); } @Test @@ -625,29 +634,37 @@ public void shouldRejectRemoteTransactionsWhereGasLimitExceedBlockGasLimit() { transactionPool.addRemoteTransactions(singleton(transaction1)); assertTransactionNotPending(transaction1); - verifyNoInteractions(batchAddedListener); + verifyNoInteractions(transactionBroadcaster); } @Test public void shouldNotNotifyBatchListenerIfNoTransactionsAreAdded() { transactionPool.addRemoteTransactions(emptyList()); - verifyNoInteractions(batchAddedListener); + verifyNoInteractions(transactionBroadcaster); + } + + @Test + public void shouldSendPooledTransactionHashesIfPeerSupportsEth65() { + EthPeer peer = mock(EthPeer.class); + when(peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)).thenReturn(true); + + givenTransactionIsValid(transaction1); + transactionPool.addLocalTransaction(transaction1); + transactionPool.handleConnect(peer); + syncTaskCapture.getValue().run(); + verify(newPooledTransactionHashesMessageSender).sendTransactionHashesToPeer(peer); } @Test - public void shouldNotNotifyPeerForPendingTransactionsIfItDoesntSupportEth65() { + public void shouldSendFullTransactionsIfPeerDoesNotSupportEth65() { EthPeer peer = mock(EthPeer.class); - EthPeer validPeer = mock(EthPeer.class); - when(peerPendingTransactionTracker.isPeerSupported(peer, EthProtocol.ETH65)).thenReturn(false); - when(peerPendingTransactionTracker.isPeerSupported(validPeer, EthProtocol.ETH65)) - .thenReturn(true); + when(peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)).thenReturn(false); - transactionPool.addTransactionHash(transaction1.getHash()); + givenTransactionIsValid(transaction1); + transactionPool.addLocalTransaction(transaction1); transactionPool.handleConnect(peer); - verify(peerPendingTransactionTracker, never()).addToPeerSendQueue(peer, transaction1.getHash()); - transactionPool.handleConnect(validPeer); - verify(peerPendingTransactionTracker, times(1)) - .addToPeerSendQueue(validPeer, transaction1.getHash()); + syncTaskCapture.getValue().run(); + verify(transactionsMessageSender).sendTransactionsToPeer(peer); } @Test @@ -668,12 +685,9 @@ public void shouldRejectRemoteTransactionsWhenNotInSync() { transactions, protocolSchedule, protocolContext, - batchAddedListener, - pendingBatchAddedListener, + transactionBroadcaster, syncState, ethContext, - peerTransactionTracker, - peerPendingTransactionTracker, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), metricsSystem, TransactionPoolConfiguration.DEFAULT); @@ -688,7 +702,7 @@ public void shouldRejectRemoteTransactionsWhenNotInSync() { assertTransactionNotPending(transaction1); assertTransactionNotPending(transaction2); assertTransactionNotPending(transaction3); - verifyNoInteractions(batchAddedListener); + verifyNoInteractions(transactionBroadcaster); } @Test @@ -718,21 +732,17 @@ public void shouldAllowRemoteTransactionsWhenInSync() { } @Test - public void shouldSendOnlyLocalTransactionToNewlyConnectedPeer() { + public void shouldSendFullTransactionPoolToNewlyConnectedPeer() { EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); EthContext ethContext = ethProtocolManager.ethContext(); - PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker(); TransactionPool transactionPool = new TransactionPool( transactions, protocolSchedule, protocolContext, - batchAddedListener, - pendingBatchAddedListener, + transactionBroadcaster, syncState, ethContext, - peerTransactionTracker, - peerPendingTransactionTracker, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), metricsSystem, TransactionPoolConfiguration.DEFAULT); @@ -756,7 +766,7 @@ public void shouldSendOnlyLocalTransactionToNewlyConnectedPeer() { Set transactionsToSendToPeer = peerTransactionTracker.claimTransactionsToSendToPeer(peer.getEthPeer()); - assertThat(transactionsToSendToPeer).containsExactly(transactionLocal); + assertThat(transactionsToSendToPeer).contains(transactionLocal, transactionRemote); } @Test @@ -782,19 +792,15 @@ public void shouldCallValidatorWithExpectedValidationParameters() { public void shouldIgnoreFeeCapIfSetZero() { final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); final EthContext ethContext = ethProtocolManager.ethContext(); - final PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker(); final Wei twoEthers = Wei.fromEth(2); final TransactionPool transactionPool = new TransactionPool( transactions, protocolSchedule, protocolContext, - batchAddedListener, - pendingBatchAddedListener, + transactionBroadcaster, syncState, ethContext, - peerTransactionTracker, - peerPendingTransactionTracker, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), metricsSystem, ImmutableTransactionPoolConfiguration.builder().txFeeCap(Wei.ZERO).build()); @@ -820,18 +826,14 @@ public void shouldIgnoreFeeCapIfSetZero() { public void shouldIgnoreEIP1559TransactionWhenNotAllowed() { final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); final EthContext ethContext = ethProtocolManager.ethContext(); - final PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker(); final TransactionPool transactionPool = new TransactionPool( transactions, protocolSchedule, protocolContext, - batchAddedListener, - pendingBatchAddedListener, + transactionBroadcaster, syncState, ethContext, - peerTransactionTracker, - peerPendingTransactionTracker, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), metricsSystem, ImmutableTransactionPoolConfiguration.builder().txFeeCap(Wei.ONE).build()); @@ -857,18 +859,14 @@ public void shouldIgnoreEIP1559TransactionWhenNotAllowed() { public void shouldIgnoreEIP1559TransactionBeforeTheFork() { final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); final EthContext ethContext = ethProtocolManager.ethContext(); - final PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker(); final TransactionPool transactionPool = new TransactionPool( transactions, protocolSchedule, protocolContext, - batchAddedListener, - pendingBatchAddedListener, + transactionBroadcaster, syncState, ethContext, - peerTransactionTracker, - peerPendingTransactionTracker, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), metricsSystem, ImmutableTransactionPoolConfiguration.builder().txFeeCap(Wei.ONE).build()); @@ -896,19 +894,15 @@ public void shouldIgnoreEIP1559TransactionBeforeTheFork() { public void shouldRejectLocalTransactionIfFeeCapExceeded() { final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); final EthContext ethContext = ethProtocolManager.ethContext(); - final PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker(); final Wei twoEthers = Wei.fromEth(2); TransactionPool transactionPool = new TransactionPool( transactions, protocolSchedule, protocolContext, - batchAddedListener, - pendingBatchAddedListener, + transactionBroadcaster, syncState, ethContext, - peerTransactionTracker, - peerPendingTransactionTracker, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), metricsSystem, ImmutableTransactionPoolConfiguration.builder().txFeeCap(twoEthers).build()); @@ -936,7 +930,6 @@ public void shouldRejectGoQuorumTransactionWithNonZeroValue() { final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); final EthContext ethContext = ethProtocolManager.ethContext(); - final PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker(); final Wei twoEthers = Wei.fromEth(2); final TransactionPool transactionPool = @@ -944,12 +937,9 @@ public void shouldRejectGoQuorumTransactionWithNonZeroValue() { transactions, protocolSchedule, protocolContext, - batchAddedListener, - pendingBatchAddedListener, + transactionBroadcaster, syncState, ethContext, - peerTransactionTracker, - peerPendingTransactionTracker, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), metricsSystem, ImmutableTransactionPoolConfiguration.builder().txFeeCap(twoEthers).build()); @@ -1103,7 +1093,7 @@ private void assertLocalTransactionValid(final Transaction tx) { private void assertRemoteTransactionValid(final Transaction tx) { transactionPool.addRemoteTransactions(List.of(tx)); - verify(batchAddedListener).onTransactionsAdded(singleton(tx)); + verify(transactionBroadcaster).onTransactionsAdded(singleton(tx)); assertTransactionPending(tx); } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessorTest.java index 045a887ca4f..1263181934b 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessorTest.java @@ -27,7 +27,6 @@ import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage; import org.hyperledger.besu.plugin.services.metrics.Counter; -import com.google.common.collect.ImmutableSet; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -57,7 +56,7 @@ public void shouldMarkAllReceivedTransactionsAsSeen() { ofMinutes(1)); verify(transactionTracker) - .markTransactionsAsSeen(peer1, ImmutableSet.of(transaction1, transaction2, transaction3)); + .markTransactionsAsSeen(peer1, asList(transaction1, transaction2, transaction3)); } @Test @@ -67,8 +66,7 @@ public void shouldAddReceivedTransactionsToTransactionPool() { TransactionsMessage.create(asList(transaction1, transaction2, transaction3)), now(), ofMinutes(1)); - verify(transactionPool) - .addRemoteTransactions(ImmutableSet.of(transaction1, transaction2, transaction3)); + verify(transactionPool).addRemoteTransactions(asList(transaction1, transaction2, transaction3)); } @Test