diff --git a/stackslib/src/chainstate/stacks/miner.rs b/stackslib/src/chainstate/stacks/miner.rs index bf81293a27..f2dfdf5dff 100644 --- a/stackslib/src/chainstate/stacks/miner.rs +++ b/stackslib/src/chainstate/stacks/miner.rs @@ -1204,7 +1204,6 @@ impl<'a> StacksMicroblockBuilder<'a> { intermediate_result = mem_pool.iterate_candidates( &mut clarity_tx, &mut tx_events, - self.anchor_block_height, mempool_settings.clone(), |clarity_tx, to_consider, estimator| { let mempool_tx = &to_consider.tx; @@ -2211,7 +2210,6 @@ impl StacksBlockBuilder { intermediate_result = mempool.iterate_candidates( epoch_tx, &mut tx_events, - tip_height, mempool_settings.clone(), |epoch_tx, to_consider, estimator| { // first, have we been preempted? diff --git a/stackslib/src/chainstate/stacks/tests/block_construction.rs b/stackslib/src/chainstate/stacks/tests/block_construction.rs index 56ae3d8d52..4194207840 100644 --- a/stackslib/src/chainstate/stacks/tests/block_construction.rs +++ b/stackslib/src/chainstate/stacks/tests/block_construction.rs @@ -3060,7 +3060,7 @@ fn test_build_microblock_stream_forks_with_descendants() { // erase any pending transactions -- this is a "worse" poison-microblock, // and we want to avoid mining the "better" one - mempool.clear_before_height(10).unwrap(); + mempool.clear_before_coinbase_height(10).unwrap(); let mut tx_bytes = vec![]; poison_microblock_tx @@ -4784,6 +4784,7 @@ fn paramaterized_mempool_walk_test( &mut chainstate, &b_1.0, &b_1.1, + true, txid, tx_bytes, tx_fee, @@ -4832,7 +4833,6 @@ fn paramaterized_mempool_walk_test( .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; diff --git a/stackslib/src/core/mempool.rs b/stackslib/src/core/mempool.rs index 8e8912f569..fe75d62bd2 100644 --- a/stackslib/src/core/mempool.rs +++ b/stackslib/src/core/mempool.rs @@ -301,7 +301,7 @@ pub struct MemPoolAdmitter { enum MemPoolWalkResult { Chainstate(ConsensusHash, BlockHeaderHash, u64, u64), - NoneAtHeight(ConsensusHash, BlockHeaderHash, u64), + NoneAtCoinbaseHeight(ConsensusHash, BlockHeaderHash, u64), Done, } @@ -432,9 +432,19 @@ pub struct MemPoolTxMetadata { pub txid: Txid, pub len: u64, pub tx_fee: u64, - pub consensus_hash: ConsensusHash, - pub block_header_hash: BlockHeaderHash, - pub block_height: u64, + /// The tenure ID in which this transaction was accepted. + /// In epoch 2.x, this is the consensus hash of the sortition that chose the Stacks block + /// In Nakamoto, this is the consensus hash of the ongoing tenure. + pub tenure_consensus_hash: ConsensusHash, + /// The tenure block in which this transaction was accepted. + /// In epoch 2.x, this is the hash of the Stacks block produced in the sortition. + /// In Nakamoto, this is the hash of the tenure-start block. + pub tenure_block_header_hash: BlockHeaderHash, + /// The number of coinbases that have transpired at the time of this transaction's acceptance. + /// In epoch 2.x, this is the same as the Stacks block height + /// In Nakamoto, this is the simply the number of coinbases produced in the history tipped at + /// `tenure_consensus_hash` and `tenure_block_header_hash` + pub coinbase_height: u64, pub origin_address: StacksAddress, pub origin_nonce: u64, pub sponsor_address: StacksAddress, @@ -564,10 +574,10 @@ impl FromRow for Txid { impl FromRow for MemPoolTxMetadata { fn from_row<'a>(row: &'a Row) -> Result { let txid = Txid::from_column(row, "txid")?; - let consensus_hash = ConsensusHash::from_column(row, "consensus_hash")?; - let block_header_hash = BlockHeaderHash::from_column(row, "block_header_hash")?; + let tenure_consensus_hash = ConsensusHash::from_column(row, "consensus_hash")?; + let tenure_block_header_hash = BlockHeaderHash::from_column(row, "block_header_hash")?; let tx_fee = u64::from_column(row, "tx_fee")?; - let block_height = u64::from_column(row, "height")?; + let coinbase_height = u64::from_column(row, "height")?; let len = u64::from_column(row, "length")?; let accept_time = u64::from_column(row, "accept_time")?; let origin_address = StacksAddress::from_column(row, "origin_address")?; @@ -581,9 +591,9 @@ impl FromRow for MemPoolTxMetadata { txid, len, tx_fee, - consensus_hash, - block_header_hash, - block_height, + tenure_consensus_hash, + tenure_block_header_hash, + coinbase_height, origin_address, origin_nonce, sponsor_address, @@ -657,8 +667,13 @@ const MEMPOOL_INITIAL_SCHEMA: &'static [&'static str] = &[r#" tx_fee INTEGER NOT NULL, length INTEGER NOT NULL, consensus_hash TEXT NOT NULL, + -- In epoch2x, this is the Stacks tip block hash at the time of this tx's arrival. + -- In Nakamoto, this is the tenure-start block hash of the ongoing tenure at the time of this tx's arrival. block_header_hash TEXT NOT NULL, - height INTEGER NOT NULL, -- stacks block height + -- This is the *coinbase height* of the chain tip above. + -- In epoch2x (when this schema was written), this also happened to be the block height; hence the name. + -- In Nakamoto, this is not a block height any longer. + height INTEGER NOT NULL, accept_time INTEGER NOT NULL, tx BLOB NOT NULL, PRIMARY KEY (txid), @@ -855,15 +870,18 @@ impl<'a> MemPoolTx<'a> { self.tx.commit().map_err(db_error::SqliteError) } - /// Remove all txids at the given height from the bloom counter. + /// Remove all txids at the given coinbase height from the bloom counter. /// Used to clear out txids that are now outside the bloom counter's depth. - fn prune_bloom_counter(&mut self, target_height: u64) -> Result<(), MemPoolRejection> { + fn prune_bloom_counter(&mut self, target_coinbase_height: u64) -> Result<(), MemPoolRejection> { let sql = "SELECT a.txid FROM mempool AS a LEFT OUTER JOIN removed_txids AS b ON a.txid = b.txid WHERE b.txid IS NULL AND a.height = ?1"; - let args = params![u64_to_sql(target_height)?]; + let args = params![u64_to_sql(target_coinbase_height)?]; let txids: Vec = query_rows(&self.tx, sql, args)?; let _num_txs = txids.len(); - test_debug!("Prune bloom counter from height {}", target_height); + test_debug!( + "Prune bloom counter from coinbase height {}", + target_coinbase_height + ); // keep borrow-checker happy MemPoolTx::with_bloom_state(self, |ref mut dbtx, ref mut bloom_counter| { @@ -880,8 +898,8 @@ impl<'a> MemPoolTx<'a> { })?; test_debug!( - "Pruned bloom filter at height {}: removed {} txs", - target_height, + "Pruned bloom filter at coinbase height {}: removed {} txs", + target_coinbase_height, _num_txs ); Ok(()) @@ -889,26 +907,26 @@ impl<'a> MemPoolTx<'a> { /// Add the txid to the bloom counter in the mempool DB, optionally replacing a prior /// transaction (identified by prior_txid) if the bloom counter is full. - /// If this is the first txid at this block height, then also garbage-collect the bloom counter to remove no-longer-recent transactions. + /// If this is the first txid at this coinbase height, then also garbage-collect the bloom counter to remove no-longer-recent transactions. /// If the bloom counter is saturated -- i.e. it represents more than MAX_BLOOM_COUNTER_TXS /// transactions -- then pick another transaction to evict from the bloom filter and return its txid. /// (Note that no transactions are ever removed from the mempool; we just don't prioritize them /// in the bloom filter). fn update_bloom_counter( &mut self, - height: u64, + coinbase_height: u64, txid: &Txid, prior_txid: Option, ) -> Result, MemPoolRejection> { - // is this the first-ever txid at this height? + // is this the first-ever txid at this coinbase height? let sql = "SELECT 1 FROM mempool WHERE height = ?1"; - let args = params![u64_to_sql(height)?]; + let args = params![u64_to_sql(coinbase_height)?]; let present: Option = query_row(&self.tx, sql, args)?; - if present.is_none() && height > (BLOOM_COUNTER_DEPTH as u64) { - // this is the first-ever tx at this height. + if present.is_none() && coinbase_height > (BLOOM_COUNTER_DEPTH as u64) { + // this is the first-ever tx at this coinbase height. // which means, the bloom filter window has advanced. // which means, we need to remove all the txs that are now out of the window. - self.prune_bloom_counter(height - (BLOOM_COUNTER_DEPTH as u64))?; + self.prune_bloom_counter(coinbase_height - (BLOOM_COUNTER_DEPTH as u64))?; } MemPoolTx::with_bloom_state(self, |ref mut dbtx, ref mut bloom_counter| { @@ -926,7 +944,7 @@ impl<'a> MemPoolTx<'a> { // deprioritized) let sql = "SELECT a.txid FROM mempool AS a LEFT OUTER JOIN removed_txids AS b ON a.txid = b.txid WHERE b.txid IS NULL AND a.height > ?1 ORDER BY a.tx_fee ASC LIMIT 1"; let args = params![u64_to_sql( - height.saturating_sub(BLOOM_COUNTER_DEPTH as u64), + coinbase_height.saturating_sub(BLOOM_COUNTER_DEPTH as u64), )?]; let evict_txid: Option = query_row(&dbtx, sql, args)?; if let Some(evict_txid) = evict_txid { @@ -971,46 +989,6 @@ impl<'a> MemPoolTx<'a> { } } -impl MemPoolTxInfo { - pub fn from_tx( - tx: StacksTransaction, - consensus_hash: ConsensusHash, - block_header_hash: BlockHeaderHash, - block_height: u64, - ) -> MemPoolTxInfo { - let txid = tx.txid(); - let mut tx_data = vec![]; - tx.consensus_serialize(&mut tx_data) - .expect("BUG: failed to serialize to vector"); - - let origin_address = tx.origin_address(); - let origin_nonce = tx.get_origin_nonce(); - let (sponsor_address, sponsor_nonce) = - if let (Some(addr), Some(nonce)) = (tx.sponsor_address(), tx.get_sponsor_nonce()) { - (addr, nonce) - } else { - (origin_address.clone(), origin_nonce) - }; - - let metadata = MemPoolTxMetadata { - txid, - len: tx_data.len() as u64, - tx_fee: tx.get_tx_fee(), - consensus_hash, - block_header_hash, - block_height, - origin_address, - origin_nonce, - sponsor_address, - sponsor_nonce, - accept_time: get_epoch_time_secs(), - last_known_origin_nonce: None, - last_known_sponsor_nonce: None, - }; - MemPoolTxInfo { tx, metadata } - } -} - /// Used to locally cache nonces to avoid repeatedly looking them up in the nonce. struct NonceCache { cache: HashMap, @@ -1496,8 +1474,8 @@ impl MemPoolDB { /// Find the origin addresses who have sent the highest-fee transactions fn find_origin_addresses_by_descending_fees( &self, - start_height: i64, - end_height: i64, + start_coinbase_height: i64, + end_coinbase_height: i64, min_fees: u64, offset: u32, count: u32, @@ -1505,8 +1483,8 @@ impl MemPoolDB { let sql = "SELECT DISTINCT origin_address FROM mempool WHERE height > ?1 AND height <= ?2 AND tx_fee >= ?3 ORDER BY tx_fee DESC LIMIT ?4 OFFSET ?5"; let args = params![ - start_height, - end_height, + start_coinbase_height, + end_coinbase_height, u64_to_sql(min_fees)?, count, offset, @@ -1612,7 +1590,6 @@ impl MemPoolDB { &mut self, clarity_tx: &mut C, output_events: &mut Vec, - _tip_height: u64, settings: MemPoolWalkSettings, mut todo: F, ) -> Result @@ -1977,28 +1954,8 @@ impl MemPoolDB { Ok(rows.len()) } - /// Get all transactions at a particular timestamp on a given chain tip. - /// Order them by origin nonce. - pub fn get_txs_at( - conn: &DBConn, - consensus_hash: &ConsensusHash, - block_header_hash: &BlockHeaderHash, - timestamp: u64, - ) -> Result, db_error> { - let sql = "SELECT * FROM mempool WHERE accept_time = ?1 AND consensus_hash = ?2 AND block_header_hash = ?3 ORDER BY origin_nonce ASC"; - let args = params![u64_to_sql(timestamp)?, consensus_hash, block_header_hash]; - let rows = query_rows::(conn, &sql, args)?; - Ok(rows) - } - - /// Given a chain tip, find the highest block-height from _before_ this tip - pub fn get_previous_block_height(conn: &DBConn, height: u64) -> Result, db_error> { - let sql = "SELECT height FROM mempool WHERE height < ?1 ORDER BY height DESC LIMIT 1"; - let args = params![u64_to_sql(height)?]; - query_row(conn, sql, args) - } - /// Get a number of transactions after a given timestamp on a given chain tip. + #[cfg(test)] pub fn get_txs_after( conn: &DBConn, consensus_hash: &ConsensusHash, @@ -2048,6 +2005,9 @@ impl MemPoolDB { query_row(conn, &sql, args) } + /// Are the given fully-qualified blocks, identified by their (consensus-hash, block-header-hash) pairs, in the same fork? + /// That is, is one block an ancestor of another? + /// TODO: Nakamoto-ize fn are_blocks_in_same_fork( chainstate: &mut StacksChainState, first_consensus_hash: &ConsensusHash, @@ -2080,17 +2040,33 @@ impl MemPoolDB { /// Add a transaction to the mempool. If it already exists, then replace it if the given fee /// is higher than the one that's already there. /// Carry out the mempool admission test before adding. + /// + /// `tip_consensus_hash`, `tip_block_header_hash`, and `coinbase_height` describe the fork that + /// was canonical when this transaction is added. While `coinbase_height` would be derived + /// from these first two fields, it is supplied independently to facilitate testing. + /// + /// If this is called in the Nakamoto epoch -- i.e. if `tip_consensus_hash` is in the Nakamoto + /// epoch -- then these tip hashes will be resolved to the tenure-start hashes first. This is + /// because in Nakamoto, we index transactions by tenure-start blocks since they directly + /// correspond to epoch 2.x Stacks blocks (meaning, the semantics of mempool sync are preserved + /// across epoch 2.x and Nakamoto as long as we treat transactions this way). In both epochs, + /// transactions arrive during a miner's tenure, not during a particular block's status as + /// the canonical chain tip. + /// + /// The tenure resolution behavior can be short-circuited with `resolve_tenure = false`. + /// However, this is only used in testing. + /// /// Don't call directly; use submit(). - /// This is `pub` only for testing. - pub fn try_add_tx( + pub(crate) fn try_add_tx( tx: &mut MemPoolTx, chainstate: &mut StacksChainState, - consensus_hash: &ConsensusHash, - block_header_hash: &BlockHeaderHash, + tip_consensus_hash: &ConsensusHash, + tip_block_header_hash: &BlockHeaderHash, + resolve_tenure: bool, txid: Txid, tx_bytes: Vec, tx_fee: u64, - height: u64, + coinbase_height: u64, origin_address: &StacksAddress, origin_nonce: u64, sponsor_address: &StacksAddress, @@ -2099,6 +2075,32 @@ impl MemPoolDB { ) -> Result<(), MemPoolRejection> { let length = tx_bytes.len() as u64; + // this transaction is said to arrive during this _tenure_, not during this _block_. + // In epoch 2.x, these are the same as `tip_consensus_hash` and `tip_block_header_hash`. + // In Nakamoto, they may be different. + // + // The only exception to this rule is if `tip_consensus_hash` and `tip_block_header_hash` + // are `FIRST_BURNCHAIN_CONSENSUS_HASH` and `FIRST_STACKS_BLOCK_HASH` -- in this case, + // there's no need to find the tenure-start header + let (consensus_hash, block_header_hash) = if resolve_tenure { + let tenure_start_header = NakamotoChainState::get_tenure_start_block_header( + &mut chainstate.index_conn(), + &StacksBlockId::new(tip_consensus_hash, tip_block_header_hash), + tip_consensus_hash, + ) + .map_err(|e| MemPoolRejection::FailedToValidate(e))? + .ok_or(MemPoolRejection::NoSuchChainTip( + tip_consensus_hash.clone(), + tip_block_header_hash.clone(), + ))?; + + let consensus_hash = tenure_start_header.consensus_hash; + let block_header_hash = tenure_start_header.anchored_header.block_hash(); + (consensus_hash, block_header_hash) + } else { + (tip_consensus_hash.clone(), tip_block_header_hash.clone()) + }; + // do we already have txs with either the same origin nonce or sponsor nonce ? let prior_tx = { match MemPoolDB::get_tx_metadata_by_address(tx, true, origin_address, origin_nonce)? { @@ -2126,10 +2128,10 @@ impl MemPoolDB { true } else if !MemPoolDB::are_blocks_in_same_fork( chainstate, - &prior_tx.consensus_hash, - &prior_tx.block_header_hash, - consensus_hash, - block_header_hash, + &prior_tx.tenure_consensus_hash, + &prior_tx.tenure_block_header_hash, + &consensus_hash, + &block_header_hash, )? { // is this a replace-across-fork ? debug!( @@ -2160,7 +2162,11 @@ impl MemPoolDB { return Err(MemPoolRejection::ConflictingNonceInMempool); } - tx.update_bloom_counter(height, &txid, prior_tx.as_ref().map(|tx| tx.txid.clone()))?; + tx.update_bloom_counter( + coinbase_height, + &txid, + prior_tx.as_ref().map(|tx| tx.txid.clone()), + )?; let sql = "INSERT OR REPLACE INTO mempool ( txid, @@ -2187,7 +2193,7 @@ impl MemPoolDB { u64_to_sql(length)?, consensus_hash, block_header_hash, - u64_to_sql(height)?, + u64_to_sql(coinbase_height)?, u64_to_sql(get_epoch_time_secs())?, tx_bytes, ]; @@ -2215,10 +2221,12 @@ impl MemPoolDB { let tx = self.tx_begin()?; match behavior { MempoolCollectionBehavior::ByStacksHeight => { + // NOTE: this is the epoch2x behavior, so `chain_height` is 1-to-1 with coinbase + // height. This will not be true in Nakamoto! let Some(min_height) = chain_height.checked_sub(MEMPOOL_MAX_TRANSACTION_AGE) else { return Ok(()); }; - Self::garbage_collect_by_height(&tx, min_height, event_observer)?; + Self::garbage_collect_by_coinbase_height(&tx, min_height, event_observer)?; } MempoolCollectionBehavior::ByReceiveTime => { Self::garbage_collect_by_time( @@ -2253,14 +2261,14 @@ impl MemPoolDB { Ok(()) } - /// Garbage-collect the mempool. Remove transactions that were received `min_height` + /// Garbage-collect the mempool. Remove transactions that were received `min_coinbase_height` /// blocks ago. - pub fn garbage_collect_by_height( + pub fn garbage_collect_by_coinbase_height( tx: &MemPoolTx, - min_height: u64, + min_coinbase_height: u64, event_observer: Option<&dyn MemPoolEventDispatcher>, ) -> Result<(), db_error> { - let args = params![u64_to_sql(min_height)?]; + let args = params![u64_to_sql(min_coinbase_height)?]; if let Some(event_observer) = event_observer { let sql = "SELECT txid FROM mempool WHERE height < ?1"; @@ -2276,40 +2284,15 @@ impl MemPoolDB { } #[cfg(test)] - pub fn clear_before_height(&mut self, min_height: u64) -> Result<(), db_error> { + pub fn clear_before_coinbase_height( + &mut self, + min_coinbase_height: u64, + ) -> Result<(), db_error> { let tx = self.tx_begin()?; - MemPoolDB::garbage_collect_by_height(&tx, min_height, None)?; + MemPoolDB::garbage_collect_by_coinbase_height(&tx, min_coinbase_height, None)?; tx.commit() } - /// Scan the chain tip for all available transactions (but do not remove them!) - pub fn poll( - &mut self, - consensus_hash: &ConsensusHash, - block_hash: &BlockHeaderHash, - ) -> Vec { - test_debug!("Mempool poll at {}/{}", consensus_hash, block_hash); - MemPoolDB::get_txs_after( - &self.db, - consensus_hash, - block_hash, - 0, - (i64::MAX - 1) as u64, - ) - .unwrap_or(vec![]) - .into_iter() - .map(|tx_info| { - test_debug!( - "Mempool poll {} at {}/{}", - &tx_info.tx.txid(), - consensus_hash, - block_hash - ); - tx_info.tx - }) - .collect() - } - /// Submit a transaction to the mempool at a particular chain tip. fn tx_submit( mempool_tx: &mut MemPoolTx, @@ -2330,7 +2313,8 @@ impl MemPoolDB { ); let block_id = StacksBlockId::new(consensus_hash, block_hash); - let height = match NakamotoChainState::get_block_header(chainstate.db(), &block_id) { + let coinbase_height = match NakamotoChainState::get_block_header(chainstate.db(), &block_id) + { Ok(Some(header)) => header.stacks_block_height, Ok(None) => { if *consensus_hash == FIRST_BURNCHAIN_CONSENSUS_HASH { @@ -2380,10 +2364,11 @@ impl MemPoolDB { chainstate, &consensus_hash, &block_hash, + true, txid.clone(), tx_data, tx_fee, - height, + coinbase_height, &origin_address, origin_nonce, &sponsor_address, @@ -2405,7 +2390,20 @@ impl MemPoolDB { Ok(()) } - /// One-shot submit + /// One-shot transaction submit. + /// + /// Transactions are indexed relative to a chain tip, identified by `consensus_hash` and + /// `block_hash`. These fields have slightly different interpretations depending on what epoch + /// we're in: + /// * In epoch 2.x, these are the Stacks chain tip. + /// * In Nakamoto, these will be resolved to the tenure-start block of the tenure in which this + /// Stacks block lies. The reason for this is because of how the mempool performs + /// garbage collection in its DB and bloom filter -- the latter of which is used for mempool + /// sync. + /// + /// No action is required by te caller to handle this discrepancy; the caller should just submit + /// the canonical Stacks tip. If the current epoch is a Nakamoto epoch, it will be resolved to + /// the tenure-start block internally. pub fn submit( &mut self, chainstate: &mut StacksChainState, @@ -2490,8 +2488,7 @@ impl MemPoolDB { } /// Directly submit to the mempool, and don't do any admissions checks. - /// This method is only used during testing, but because it is used by the - /// integration tests, it cannot be marked #[cfg(test)]. + #[cfg(any(test, feature = "testing"))] pub fn submit_raw( &mut self, chainstate: &mut StacksChainState, @@ -2698,8 +2695,8 @@ impl MemPoolDB { self.bloom_counter.to_bloom_filter(&self.conn()) } - /// Find maximum height represented in the mempool - pub fn get_max_height(conn: &DBConn) -> Result, db_error> { + /// Find maximum Stacks coinbase height represented in the mempool. + pub fn get_max_coinbase_height(conn: &DBConn) -> Result, db_error> { let sql = "SELECT 1 FROM mempool WHERE height >= 0"; let count = query_rows::(conn, sql, NO_PARAMS)?.len(); if count == 0 { @@ -2713,7 +2710,7 @@ impl MemPoolDB { /// Get the transaction ID list that represents the set of transactions that are represented in /// the bloom counter. pub fn get_bloom_txids(&self) -> Result, db_error> { - let max_height = match MemPoolDB::get_max_height(&self.conn())? { + let max_height = match MemPoolDB::get_max_coinbase_height(&self.conn())? { Some(h) => h, None => { // mempool is empty @@ -2738,10 +2735,10 @@ impl MemPoolDB { }) } - /// How many recent transactions are there -- i.e. within BLOOM_COUNTER_DEPTH block heights of + /// How many recent transactions are there -- i.e. within BLOOM_COUNTER_DEPTH coinbase heights of /// the chain tip? pub fn get_num_recent_txs(conn: &DBConn) -> Result { - let max_height = match MemPoolDB::get_max_height(conn)? { + let max_height = match MemPoolDB::get_max_coinbase_height(conn)? { Some(h) => h, None => { // mempool is empty @@ -2778,7 +2775,7 @@ impl MemPoolDB { pub fn find_next_missing_transactions( &self, data: &MemPoolSyncData, - height: u64, + coinbase_height: u64, last_randomized_txid: &Txid, max_txs: u64, max_run: u64, @@ -2786,7 +2783,7 @@ impl MemPoolDB { Self::static_find_next_missing_transactions( self.conn(), data, - height, + coinbase_height, last_randomized_txid, max_txs, max_run, @@ -2803,7 +2800,7 @@ impl MemPoolDB { pub fn static_find_next_missing_transactions( conn: &DBConn, data: &MemPoolSyncData, - height: u64, + coinbase_height: u64, last_randomized_txid: &Txid, max_txs: u64, max_run: u64, @@ -2820,7 +2817,7 @@ impl MemPoolDB { let args = params![ last_randomized_txid, - u64_to_sql(height.saturating_sub(BLOOM_COUNTER_DEPTH as u64))?, + u64_to_sql(coinbase_height.saturating_sub(BLOOM_COUNTER_DEPTH as u64))?, u64_to_sql(max_run)?, ]; diff --git a/stackslib/src/core/tests/mod.rs b/stackslib/src/core/tests/mod.rs index 8e70f4dcbd..158feeeba5 100644 --- a/stackslib/src/core/tests/mod.rs +++ b/stackslib/src/core/tests/mod.rs @@ -240,6 +240,7 @@ fn mempool_walk_over_fork() { &mut chainstate, &block.0, &block.1, + true, txid, tx_bytes, tx_fee, @@ -275,7 +276,6 @@ fn mempool_walk_over_fork() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -314,7 +314,6 @@ fn mempool_walk_over_fork() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -352,7 +351,6 @@ fn mempool_walk_over_fork() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 3, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -395,7 +393,6 @@ fn mempool_walk_over_fork() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -436,7 +433,6 @@ fn mempool_walk_over_fork() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 3, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -498,6 +494,7 @@ fn mempool_walk_over_fork() { &mut chainstate, &block.0, &block.1, + true, txid, tx_bytes, tx_fee, @@ -551,6 +548,7 @@ fn mempool_walk_over_fork() { &mut chainstate, &block.0, &block.1, + true, txid, tx_bytes, tx_fee, @@ -630,6 +628,7 @@ fn test_iterate_candidates_consider_no_estimate_tx_prob() { &mut chainstate, &b_1.0, &b_1.1, + true, txid, tx_bytes, tx_fee, @@ -672,7 +671,6 @@ fn test_iterate_candidates_consider_no_estimate_tx_prob() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -710,7 +708,6 @@ fn test_iterate_candidates_consider_no_estimate_tx_prob() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -748,7 +745,6 @@ fn test_iterate_candidates_consider_no_estimate_tx_prob() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -826,6 +822,7 @@ fn test_iterate_candidates_skipped_transaction() { &mut chainstate, &b_1.0, &b_1.1, + true, txid, tx_bytes, tx_fee, @@ -850,7 +847,6 @@ fn test_iterate_candidates_skipped_transaction() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -939,6 +935,7 @@ fn test_iterate_candidates_processing_error_transaction() { &mut chainstate, &b_1.0, &b_1.1, + true, txid, tx_bytes, tx_fee, @@ -963,7 +960,6 @@ fn test_iterate_candidates_processing_error_transaction() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -1054,6 +1050,7 @@ fn test_iterate_candidates_problematic_transaction() { &mut chainstate, &b_1.0, &b_1.1, + true, txid, tx_bytes, tx_fee, @@ -1078,7 +1075,6 @@ fn test_iterate_candidates_problematic_transaction() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -1183,6 +1179,7 @@ fn test_iterate_candidates_concurrent_write_lock() { &mut chainstate, &b_1.0, &b_1.1, + true, txid, tx_bytes, tx_fee, @@ -1239,7 +1236,6 @@ fn test_iterate_candidates_concurrent_write_lock() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -1342,6 +1338,7 @@ fn mempool_do_not_replace_tx() { &mut chainstate, &b_1.0, &b_1.1, + true, txid, tx_bytes, tx_fee, @@ -1370,6 +1367,7 @@ fn mempool_do_not_replace_tx() { &mut chainstate, &b_2.0, &b_2.1, + true, txid, tx_bytes, tx_fee, @@ -1446,6 +1444,7 @@ fn mempool_db_load_store_replace_tx(#[case] behavior: MempoolCollectionBehavior) &mut chainstate, &ConsensusHash([0x1; 20]), &BlockHeaderHash([0x2; 32]), + false, // don't resolve the above chain tip since it doesn't exist txid, tx_bytes, tx_fee, @@ -1471,12 +1470,15 @@ fn mempool_db_load_store_replace_tx(#[case] behavior: MempoolCollectionBehavior) assert_eq!(tx_info.metadata.origin_nonce, origin_nonce); assert_eq!(tx_info.metadata.sponsor_address, sponsor_address); assert_eq!(tx_info.metadata.sponsor_nonce, sponsor_nonce); - assert_eq!(tx_info.metadata.consensus_hash, ConsensusHash([0x1; 20])); assert_eq!( - tx_info.metadata.block_header_hash, + tx_info.metadata.tenure_consensus_hash, + ConsensusHash([0x1; 20]) + ); + assert_eq!( + tx_info.metadata.tenure_block_header_hash, BlockHeaderHash([0x2; 32]) ); - assert_eq!(tx_info.metadata.block_height, height); + assert_eq!(tx_info.metadata.coinbase_height, height); // test replace-by-fee with a higher fee let old_txid = txid; @@ -1503,6 +1505,7 @@ fn mempool_db_load_store_replace_tx(#[case] behavior: MempoolCollectionBehavior) &mut chainstate, &ConsensusHash([0x1; 20]), &BlockHeaderHash([0x2; 32]), + false, // don't resolve the above chain tip since it doesn't exist txid, tx_bytes, tx_fee, @@ -1539,12 +1542,15 @@ fn mempool_db_load_store_replace_tx(#[case] behavior: MempoolCollectionBehavior) assert_eq!(tx_info.metadata.origin_nonce, origin_nonce); assert_eq!(tx_info.metadata.sponsor_address, sponsor_address); assert_eq!(tx_info.metadata.sponsor_nonce, sponsor_nonce); - assert_eq!(tx_info.metadata.consensus_hash, ConsensusHash([0x1; 20])); assert_eq!( - tx_info.metadata.block_header_hash, + tx_info.metadata.tenure_consensus_hash, + ConsensusHash([0x1; 20]) + ); + assert_eq!( + tx_info.metadata.tenure_block_header_hash, BlockHeaderHash([0x2; 32]) ); - assert_eq!(tx_info.metadata.block_height, height); + assert_eq!(tx_info.metadata.coinbase_height, height); // test replace-by-fee with a lower fee let old_txid = txid; @@ -1563,6 +1569,7 @@ fn mempool_db_load_store_replace_tx(#[case] behavior: MempoolCollectionBehavior) &mut chainstate, &ConsensusHash([0x1; 20]), &BlockHeaderHash([0x2; 32]), + false, // don't resolve the above chain tip since it doesn't exist txid, tx_bytes, tx_fee, @@ -1622,7 +1629,7 @@ fn mempool_db_load_store_replace_tx(#[case] behavior: MempoolCollectionBehavior) let mut mempool_tx = mempool.tx_begin().unwrap(); match behavior { MempoolCollectionBehavior::ByStacksHeight => { - MemPoolDB::garbage_collect_by_height(&mut mempool_tx, 101, None) + MemPoolDB::garbage_collect_by_coinbase_height(&mut mempool_tx, 101, None) } MempoolCollectionBehavior::ByReceiveTime => { let test_max_age = Duration::from_secs(1); @@ -1712,6 +1719,7 @@ fn mempool_db_test_rbf() { &mut chainstate, &ConsensusHash([0x1; 20]), &BlockHeaderHash([0x2; 32]), + false, // don't resolve the above chain tip since it doesn't exist txid, tx_bytes, tx_fee, @@ -1761,6 +1769,7 @@ fn mempool_db_test_rbf() { &mut chainstate, &ConsensusHash([0x1; 20]), &BlockHeaderHash([0x2; 32]), + false, // don't resolve the above chain tip since it doesn't exist txid, tx_bytes, tx_fee, @@ -1843,6 +1852,7 @@ fn test_add_txs_bloom_filter() { &mut chainstate, &ConsensusHash([0x1 + (block_height as u8); 20]), &BlockHeaderHash([0x2 + (block_height as u8); 32]), + false, // don't resolve the above chain tip since it doesn't exist txid, tx_bytes, tx_fee, @@ -1953,6 +1963,7 @@ fn test_txtags() { &mut chainstate, &ConsensusHash([0x1 + (block_height as u8); 20]), &BlockHeaderHash([0x2 + (block_height as u8); 32]), + false, // don't resolve the above chain tip since it doesn't exist txid, tx_bytes, tx_fee, @@ -2046,6 +2057,7 @@ fn test_make_mempool_sync_data() { &mut chainstate, &ConsensusHash([0x1 + (block_height as u8); 20]), &BlockHeaderHash([0x2 + (block_height as u8); 32]), + false, // don't resolve the above chain tip since it doesn't exist txid.clone(), tx_bytes, tx_fee, @@ -2084,7 +2096,7 @@ fn test_make_mempool_sync_data() { let recent_txids = mempool.get_bloom_txids().unwrap(); assert!(recent_txids.len() <= MAX_BLOOM_COUNTER_TXS as usize); - let max_height = MemPoolDB::get_max_height(mempool.conn()) + let max_height = MemPoolDB::get_max_coinbase_height(mempool.conn()) .unwrap() .unwrap_or(0); eprintln!( @@ -2223,6 +2235,7 @@ fn test_find_next_missing_transactions() { &mut chainstate, &ConsensusHash([0x1 + (block_height as u8); 20]), &BlockHeaderHash([0x2 + (block_height as u8); 32]), + false, // don't resolve the above chain tip since it doesn't exist txid.clone(), tx_bytes, tx_fee, @@ -2492,6 +2505,7 @@ fn test_drop_and_blacklist_txs_by_time() { &mut chainstate, &ConsensusHash([0x1 + (block_height as u8); 20]), &BlockHeaderHash([0x2 + (block_height as u8); 32]), + false, // don't resolve the above chain tip since it doesn't exist txid.clone(), tx_bytes, tx_fee, @@ -2611,6 +2625,7 @@ fn test_drop_and_blacklist_txs_by_size() { &mut chainstate, &ConsensusHash([0x1 + (block_height as u8); 20]), &BlockHeaderHash([0x2 + (block_height as u8); 32]), + false, // don't resolve the above chain tip since it doesn't exist txid.clone(), tx_bytes, tx_fee, @@ -2728,6 +2743,7 @@ fn test_filter_txs_by_type() { &mut chainstate, &b_2.0, &b_2.1, + true, txid.clone(), tx_bytes, tx_fee, @@ -2763,7 +2779,6 @@ fn test_filter_txs_by_type() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; @@ -2799,7 +2814,6 @@ fn test_filter_txs_by_type() { .iterate_candidates::<_, ChainstateError, _>( clarity_conn, &mut tx_events, - 2, mempool_settings.clone(), |_, available_tx, _| { count_txs += 1; diff --git a/stackslib/src/net/api/postmempoolquery.rs b/stackslib/src/net/api/postmempoolquery.rs index 1e8caa1843..2155863220 100644 --- a/stackslib/src/net/api/postmempoolquery.rs +++ b/stackslib/src/net/api/postmempoolquery.rs @@ -29,6 +29,7 @@ use url::form_urlencoded; use {serde, serde_json}; use crate::burnchains::Txid; +use crate::chainstate::nakamoto::NakamotoChainState; use crate::chainstate::stacks::db::StacksChainState; use crate::chainstate::stacks::{Error as ChainError, StacksTransaction}; use crate::core::mempool::{decode_tx_stream, MemPoolDB, MemPoolSyncData}; @@ -89,8 +90,8 @@ pub struct StacksMemPoolStream { pub num_txs: u64, /// maximum we can visit in the query pub max_txs: u64, - /// height of the chain at time of query - pub height: u64, + /// coinbase height of the chain at time of query + pub coinbase_height: u64, /// Are we done sending transactions, and are now in the process of sending the trailing page /// ID? pub corked: bool, @@ -105,7 +106,7 @@ impl StacksMemPoolStream { mempool_db: DBConn, tx_query: MemPoolSyncData, max_txs: u64, - height: u64, + coinbase_height: u64, page_id_opt: Option, ) -> Self { let last_randomized_txid = page_id_opt.unwrap_or_else(|| { @@ -118,7 +119,7 @@ impl StacksMemPoolStream { last_randomized_txid: last_randomized_txid, num_txs: 0, max_txs: max_txs, - height: height, + coinbase_height, corked: false, finished: false, mempool_db, @@ -159,7 +160,7 @@ impl HttpChunkGenerator for StacksMemPoolStream { MemPoolDB::static_find_next_missing_transactions( &self.mempool_db, &self.tx_query, - self.height, + self.coinbase_height, &self.last_randomized_txid, 1, remaining, @@ -275,12 +276,18 @@ impl RPCRequestHandler for RPCMempoolQueryRequestHandler { let page_id = self.page_id.take(); let stream_res = node.with_node_state(|network, sortdb, chainstate, mempool, _rpc_args| { - let height = self.get_stacks_chain_tip(&preamble, sortdb, chainstate).map(|hdr| hdr.anchored_header.height()).unwrap_or(0); + let header = self.get_stacks_chain_tip(&preamble, sortdb, chainstate) + .map_err(|e| StacksHttpResponse::new_error(&preamble, &HttpServerError::new(format!("Failed to load chain tip: {:?}", &e))))?; + + let coinbase_height = NakamotoChainState::get_coinbase_height(&mut chainstate.index_conn(), &header.index_block_hash()) + .map_err(|e| StacksHttpResponse::new_error(&preamble, &HttpServerError::new(format!("Failed to load coinbase height: {:?}", &e))))? + .unwrap_or(0); + let max_txs = network.connection_opts.mempool_max_tx_query; debug!( "Begin mempool query"; "page_id" => %page_id.map(|txid| format!("{}", &txid)).unwrap_or("(none".to_string()), - "block_height" => height, + "coinbase_height" => coinbase_height, "max_txs" => max_txs ); @@ -291,7 +298,7 @@ impl RPCRequestHandler for RPCMempoolQueryRequestHandler { } }; - Ok(StacksMemPoolStream::new(mempool_db, mempool_query, max_txs, height, page_id)) + Ok(StacksMemPoolStream::new(mempool_db, mempool_query, max_txs, coinbase_height, page_id)) }); let stream = match stream_res { diff --git a/stackslib/src/net/api/tests/mod.rs b/stackslib/src/net/api/tests/mod.rs index fffce02e1a..f0a537d045 100644 --- a/stackslib/src/net/api/tests/mod.rs +++ b/stackslib/src/net/api/tests/mod.rs @@ -600,6 +600,7 @@ impl<'a> TestRPC<'a> { peer_1.chainstate(), &consensus_hash, &stacks_block.block_hash(), + true, txid.clone(), tx_bytes, tx_fee, diff --git a/stackslib/src/net/api/tests/postmempoolquery.rs b/stackslib/src/net/api/tests/postmempoolquery.rs index b669beb2e4..6954024844 100644 --- a/stackslib/src/net/api/tests/postmempoolquery.rs +++ b/stackslib/src/net/api/tests/postmempoolquery.rs @@ -174,6 +174,7 @@ fn test_stream_mempool_txs() { &mut chainstate, &ConsensusHash([0x1 + (block_height as u8); 20]), &BlockHeaderHash([0x2 + (block_height as u8); 32]), + false, // don't resolve the above chain tip since it doesn't exist txid.clone(), tx_bytes, tx_fee, diff --git a/stackslib/src/net/mempool/mod.rs b/stackslib/src/net/mempool/mod.rs new file mode 100644 index 0000000000..2a4232ad2f --- /dev/null +++ b/stackslib/src/net/mempool/mod.rs @@ -0,0 +1,620 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2024 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::net::SocketAddr; + +use rand::prelude::*; +use rand::thread_rng; +use stacks_common::types::net::{PeerAddress, PeerHost}; +use stacks_common::util::{get_epoch_time_ms, get_epoch_time_secs}; +use url; + +use crate::burnchains::Txid; +use crate::chainstate::stacks::StacksTransaction; +use crate::core::MemPoolDB; +use crate::net::chat::ConversationP2P; +use crate::net::dns::{DNSClient, DNSRequest}; +use crate::net::httpcore::StacksHttpRequest; +use crate::net::inv::inv2x::*; +use crate::net::p2p::PeerNetwork; +use crate::net::{Error as NetError, HttpRequestContents}; +use crate::util_lib::strings::UrlString; + +/// The four states the mempool sync state machine can be in +#[derive(Debug, Clone, PartialEq)] +pub enum MempoolSyncState { + /// Picking an outbound peer + PickOutboundPeer, + /// Resolving its data URL to a SocketAddr. Contains the data URL, DNS request handle, and + /// mempool page ID + ResolveURL(UrlString, DNSRequest, Txid), + /// Sending the request for mempool transactions. Contains the data URL, resolved socket, and + /// mempool page. + SendQuery(UrlString, SocketAddr, Txid), + /// Receiving the mempool response. Contains the URL, socket address, and event ID + RecvResponse(UrlString, SocketAddr, usize), +} + +/// Mempool synchronization state machine +#[derive(Debug, Clone, PartialEq)] +pub struct MempoolSync { + /// what state are we in? + mempool_state: MempoolSyncState, + /// when's the next mempool sync start? + mempool_sync_deadline: u64, + /// how long can the sync go for? + mempool_sync_timeout: u64, + /// how many complete syncs have happened + mempool_sync_completions: u64, + /// how many txs have been sync'ed? + pub(crate) mempool_sync_txs: u64, + /// what's the API endpoint? + api_endpoint: String, +} + +impl MempoolSync { + pub fn new() -> Self { + Self { + mempool_state: MempoolSyncState::PickOutboundPeer, + mempool_sync_deadline: 0, + mempool_sync_timeout: 0, + mempool_sync_completions: 0, + mempool_sync_txs: 0, + api_endpoint: "/v2/mempool/query".to_string(), + } + } + + /// Do a mempool sync. Return any transactions we might receive. + #[cfg_attr(test, mutants::skip)] + pub fn run( + &mut self, + network: &mut PeerNetwork, + dns_client_opt: &mut Option<&mut DNSClient>, + mempool: &MemPoolDB, + ibd: bool, + ) -> Option> { + if ibd { + return None; + } + + return match self.do_mempool_sync(network, dns_client_opt, mempool) { + (true, txs_opt) => { + // did we run to completion? + if let Some(txs) = txs_opt { + debug!( + "{:?}: Mempool sync obtained {} transactions from mempool sync, and done receiving", + &network.get_local_peer(), + txs.len() + ); + + self.mempool_sync_deadline = + get_epoch_time_secs() + network.get_connection_opts().mempool_sync_interval; + self.mempool_sync_completions = self.mempool_sync_completions.saturating_add(1); + self.mempool_sync_txs = self.mempool_sync_txs.saturating_add(txs.len() as u64); + Some(txs) + } else { + None + } + } + (false, txs_opt) => { + // did we get some transactions, but have more to get? + if let Some(txs) = txs_opt { + debug!( + "{:?}: Mempool sync obtained {} transactions from mempool sync, but have more", + &network.get_local_peer(), + txs.len() + ); + + self.mempool_sync_txs = self.mempool_sync_txs.saturating_add(txs.len() as u64); + Some(txs) + } else { + None + } + } + }; + } + + /// Reset a mempool sync + fn mempool_sync_reset(&mut self) { + self.mempool_state = MempoolSyncState::PickOutboundPeer; + self.mempool_sync_timeout = 0; + } + + /// Pick a peer to mempool sync with. + /// Returns Ok(None) if we're done syncing the mempool. + /// Returns Ok(Some(..)) if we're not done, and can proceed + /// Returns the new sync state -- either ResolveURL if we need to resolve a data URL, + /// or SendQuery if we got the IP address and can just issue the query. + #[cfg_attr(test, mutants::skip)] + fn mempool_sync_pick_outbound_peer( + &mut self, + network: &mut PeerNetwork, + dns_client_opt: &mut Option<&mut DNSClient>, + page_id: &Txid, + ) -> Result, NetError> { + let num_peers = network.get_num_p2p_convos(); + if num_peers == 0 { + debug!("No peers connected; cannot do mempool sync"); + return Ok(None); + } + + let mut idx = thread_rng().gen::() % num_peers; + let mut mempool_sync_data_url = None; + let mut mempool_sync_data_url_and_sockaddr = None; + for _ in 0..num_peers { + let Some((_event_id, convo)) = network.iter_peer_convos().skip(idx).next() else { + idx = 0; + continue; + }; + idx = (idx + 1) % num_peers; + + // only talk to authenticated, outbound peers + if !convo.is_authenticated() || !convo.is_outbound() { + continue; + } + // peer must support mempool protocol + if !ConversationP2P::supports_mempool_query(convo.peer_services) { + continue; + } + // has a data URL? + if convo.data_url.len() == 0 { + continue; + } + // already resolved? + if let Some(sockaddr) = convo.data_ip.as_ref() { + mempool_sync_data_url_and_sockaddr = + Some((convo.data_url.clone(), sockaddr.clone())); + break; + } + // can we resolve the data URL? + let url = convo.data_url.clone(); + if dns_client_opt.is_none() { + if let Ok(Some(_)) = PeerNetwork::try_get_url_ip(&url) { + } else { + // need a DNS client for this one + continue; + } + } + + // will resolve + mempool_sync_data_url = Some(url); + break; + } + + if let Some((url_str, sockaddr)) = mempool_sync_data_url_and_sockaddr { + // already resolved + return Ok(Some(MempoolSyncState::SendQuery( + url_str, + sockaddr, + page_id.clone(), + ))); + } else if let Some(url) = mempool_sync_data_url { + // will need to resolve + self.mempool_sync_begin_resolve_data_url(network, url, dns_client_opt, page_id) + } else { + debug!("No peer has a data URL, so no mempool sync can happen"); + Ok(None) + } + } + + /// Begin resolving the DNS host of a data URL for mempool sync. + /// Returns Ok(None) if we're done syncing the mempool. + /// Returns Ok(Some(..)) if we're not done, and can proceed + /// Returns the new sync state -- either ResolveURL if we need to resolve a data URL, + /// or SendQuery if we got the IP address and can just issue the query. + #[cfg_attr(test, mutants::skip)] + fn mempool_sync_begin_resolve_data_url( + &self, + network: &PeerNetwork, + url_str: UrlString, + dns_client_opt: &mut Option<&mut DNSClient>, + page_id: &Txid, + ) -> Result, NetError> { + // start resolving + let url = url_str.parse_to_block_url()?; + let port = match url.port_or_known_default() { + Some(p) => p, + None => { + warn!("Unsupported URL {:?}: unknown port", &url); + return Ok(None); + } + }; + + // bare IP address? + if let Some(addr) = PeerNetwork::try_get_url_ip(&url_str)? { + return Ok(Some(MempoolSyncState::SendQuery( + url_str, + addr, + page_id.clone(), + ))); + } else if let Some(url::Host::Domain(domain)) = url.host() { + if let Some(ref mut dns_client) = dns_client_opt { + // begin DNS query + match dns_client.queue_lookup( + domain, + port, + get_epoch_time_ms() + network.get_connection_opts().dns_timeout, + ) { + Ok(_) => {} + Err(_) => { + warn!("Failed to queue DNS lookup on {}", &url_str); + return Ok(None); + } + } + return Ok(Some(MempoolSyncState::ResolveURL( + url_str, + DNSRequest::new(domain.to_string(), port, 0), + page_id.clone(), + ))); + } else { + // can't proceed -- no DNS client + return Ok(None); + } + } else { + // can't proceed + return Ok(None); + } + } + + /// Resolve our picked mempool sync peer's data URL. + /// Returns Ok(true, ..) if we're done syncing the mempool. + /// Returns Ok(false, ..) if there's more to do + /// Returns the socket addr if we ever succeed in resolving it. + #[cfg_attr(test, mutants::skip)] + fn mempool_sync_resolve_data_url( + url_str: &UrlString, + request: &DNSRequest, + dns_client_opt: &mut Option<&mut DNSClient>, + ) -> Result<(bool, Option), NetError> { + if let Ok(Some(addr)) = PeerNetwork::try_get_url_ip(url_str) { + // URL contains an IP address -- go with that + Ok((false, Some(addr))) + } else if let Some(dns_client) = dns_client_opt { + // keep trying to resolve + match dns_client.poll_lookup(&request.host, request.port) { + Ok(Some(dns_response)) => match dns_response.result { + Ok(mut addrs) => { + if let Some(addr) = addrs.pop() { + // resolved! + return Ok((false, Some(addr))); + } else { + warn!("DNS returned no results for {}", url_str); + return Ok((true, None)); + } + } + Err(msg) => { + warn!("DNS failed to look up {:?}: {}", &url_str, msg); + return Ok((true, None)); + } + }, + Ok(None) => { + // still in-flight + return Ok((false, None)); + } + Err(e) => { + warn!("DNS lookup failed on {:?}: {:?}", url_str, &e); + return Ok((true, None)); + } + } + } else { + // can't do anything + debug!("No DNS client, and URL contains a domain, so no mempool sync can happen"); + return Ok((true, None)); + } + } + + /// Ask the remote peer for its mempool, connecting to it in the process if need be. + /// Returns Ok((true, ..)) if we're done mempool syncing + /// Returns Ok((false, ..)) if there's more to do + /// Returns the event ID on success + #[cfg_attr(test, mutants::skip)] + fn mempool_sync_send_query( + &mut self, + network: &mut PeerNetwork, + url: &UrlString, + addr: &SocketAddr, + mempool: &MemPoolDB, + page_id: Txid, + ) -> Result<(bool, Option), NetError> { + let sync_data = mempool.make_mempool_sync_data()?; + let request = StacksHttpRequest::new_for_peer( + PeerHost::from_socketaddr(addr), + "POST".into(), + self.api_endpoint.clone(), + HttpRequestContents::new() + .query_arg("page_id".into(), format!("{}", &page_id)) + .payload_stacks(&sync_data), + )?; + + let event_id = network.connect_or_send_http_request(url.clone(), addr.clone(), request)?; + return Ok((false, Some(event_id))); + } + + /// Receive the mempool sync response. + /// Return Ok(true, ..) if we're done with the mempool sync. + /// Return Ok(false, ..) if we have more work to do. + /// Returns the page ID of the next request to make, and the list of transactions we got + #[cfg_attr(test, mutants::skip)] + fn mempool_sync_recv_response( + &mut self, + network: &mut PeerNetwork, + event_id: usize, + ) -> Result<(bool, Option, Option>), NetError> { + PeerNetwork::with_http(network, |network, http| { + match http.get_conversation(event_id) { + None => { + if http.is_connecting(event_id) { + debug!( + "{:?}: Mempool sync event {} is not connected yet", + &network.local_peer, event_id + ); + return Ok((false, None, None)); + } else { + // conversation died + debug!("{:?}: Mempool sync peer hung up", &network.local_peer); + return Ok((true, None, None)); + } + } + Some(ref mut convo) => { + match convo.try_get_response() { + None => { + // still waiting + debug!( + "{:?}: Mempool sync event {} still waiting for a response", + &network.get_local_peer(), + event_id + ); + return Ok((false, None, None)); + } + Some(http_response) => match http_response.decode_mempool_txs_page() { + Ok((txs, page_id_opt)) => { + debug!("{:?}: Mempool sync received response for {} txs, next page {:?}", &network.local_peer, txs.len(), &page_id_opt); + return Ok((true, page_id_opt, Some(txs))); + } + Err(e) => { + warn!( + "{:?}: Mempool sync request did not receive a txs page: {:?}", + &network.local_peer, &e + ); + return Ok((true, None, None)); + } + }, + } + } + } + }) + } + + /// Do a mempool sync + /// Return true if we're done and can advance to the next state. + /// Returns the transactions as well if the sync ran to completion. + #[cfg_attr(test, mutants::skip)] + fn do_mempool_sync( + &mut self, + network: &mut PeerNetwork, + dns_client_opt: &mut Option<&mut DNSClient>, + mempool: &MemPoolDB, + ) -> (bool, Option>) { + if get_epoch_time_secs() <= self.mempool_sync_deadline { + debug!( + "{:?}: Wait until {} to do a mempool sync", + &network.get_local_peer(), + self.mempool_sync_deadline + ); + return (true, None); + } + + if self.mempool_sync_timeout == 0 { + // begin new sync + self.mempool_sync_timeout = + get_epoch_time_secs() + network.get_connection_opts().mempool_sync_timeout; + } else { + if get_epoch_time_secs() > self.mempool_sync_timeout { + debug!( + "{:?}: Mempool sync took too long; terminating", + &network.get_local_peer() + ); + self.mempool_sync_reset(); + return (true, None); + } + } + + // try advancing states until we get blocked. + // Once we get blocked, return. + loop { + let cur_state = self.mempool_state.clone(); + debug!( + "{:?}: Mempool sync state is {:?}", + &network.get_local_peer(), + &cur_state + ); + match cur_state { + MempoolSyncState::PickOutboundPeer => { + // 1. pick a random outbound conversation. + match self.mempool_sync_pick_outbound_peer( + network, + dns_client_opt, + &Txid([0u8; 32]), + ) { + Ok(Some(next_state)) => { + // success! can advance to either resolve a URL or to send a query + self.mempool_state = next_state; + } + Ok(None) => { + // done + self.mempool_sync_reset(); + return (true, None); + } + Err(e) => { + // done; need reset + warn!("mempool_sync_pick_outbound_peer returned {:?}", &e); + self.mempool_sync_reset(); + return (true, None); + } + } + } + MempoolSyncState::ResolveURL(ref url_str, ref dns_request, ref page_id) => { + // 2. resolve its data URL + match Self::mempool_sync_resolve_data_url(url_str, dns_request, dns_client_opt) + { + Ok((false, Some(addr))) => { + // success! advance + self.mempool_state = + MempoolSyncState::SendQuery(url_str.clone(), addr, page_id.clone()); + } + Ok((false, None)) => { + // try again later + return (false, None); + } + Ok((true, _)) => { + // done + self.mempool_sync_reset(); + return (true, None); + } + Err(e) => { + // failed + warn!( + "mempool_sync_resolve_data_url({}) failed: {:?}", + url_str, &e + ); + self.mempool_sync_reset(); + return (true, None); + } + } + } + MempoolSyncState::SendQuery(ref url, ref addr, ref page_id) => { + // 3. ask for the remote peer's mempool's novel txs + // address must be resolvable + if !network.get_connection_opts().private_neighbors + && PeerAddress::from_socketaddr(&addr).is_in_private_range() + { + debug!( + "{:?}: Mempool sync skips {}, which has private IP", + network.get_local_peer(), + &addr + ); + self.mempool_sync_reset(); + return (true, None); + } + debug!( + "{:?}: Mempool sync will query {} for mempool transactions at {}", + &network.get_local_peer(), + url, + page_id + ); + match self.mempool_sync_send_query(network, url, addr, mempool, page_id.clone()) + { + Ok((false, Some(event_id))) => { + // success! advance + debug!("{:?}: Mempool sync query {} for mempool transactions at {} on event {}", &network.get_local_peer(), url, page_id, event_id); + self.mempool_state = + MempoolSyncState::RecvResponse(url.clone(), addr.clone(), event_id); + } + Ok((false, None)) => { + // try again later + return (false, None); + } + Ok((true, _)) => { + // done + self.mempool_sync_reset(); + return (true, None); + } + Err(e) => { + // done + warn!("mempool_sync_send_query({}) returned {:?}", url, &e); + self.mempool_sync_reset(); + return (true, None); + } + } + } + MempoolSyncState::RecvResponse(ref url, ref addr, ref event_id) => { + match self.mempool_sync_recv_response(network, *event_id) { + Ok((true, next_page_id_opt, Some(txs))) => { + debug!( + "{:?}: Mempool sync received {} transactions; next page is {:?}", + &network.get_local_peer(), + txs.len(), + &next_page_id_opt + ); + + // done! got data + let ret = match next_page_id_opt { + Some(next_page_id) => { + // get the next page + self.mempool_state = MempoolSyncState::SendQuery( + url.clone(), + addr.clone(), + next_page_id, + ); + false + } + None => { + // done + self.mempool_sync_reset(); + true + } + }; + return (ret, Some(txs)); + } + Ok((true, _, None)) => { + // done! did not get data + self.mempool_sync_reset(); + return (true, None); + } + Ok((false, _, None)) => { + // still receiving; try again later + return (false, None); + } + Ok((false, _, Some(_))) => { + // should never happen + if cfg!(test) { + panic!("Reached invalid state in {:?}, aborting...", &cur_state); + } + warn!("Reached invalid state in {:?}, resetting...", &cur_state); + self.mempool_sync_reset(); + return (true, None); + } + Err(e) => { + // likely a network error + warn!("mempool_sync_recv_response returned {:?}", &e); + self.mempool_sync_reset(); + return (true, None); + } + } + } + } + } + } +} + +impl PeerNetwork { + /// Run the internal mempool sync machine + pub fn run_mempool_sync( + &mut self, + dns_client: &mut Option<&mut DNSClient>, + mempool: &MemPoolDB, + ibd: bool, + ) -> Option> { + let Some(mut mempool_sync) = self.mempool_sync.take() else { + return None; + }; + + let res = mempool_sync.run(self, dns_client, mempool, ibd); + + self.mempool_sync = Some(mempool_sync); + res + } +} diff --git a/stackslib/src/net/mod.rs b/stackslib/src/net/mod.rs index b865476fd2..e836bdfec2 100644 --- a/stackslib/src/net/mod.rs +++ b/stackslib/src/net/mod.rs @@ -124,6 +124,7 @@ pub mod http; /// Links http crate to Stacks pub mod httpcore; pub mod inv; +pub mod mempool; pub mod neighbors; pub mod p2p; /// Implements wrapper around `mio` crate, which itself is a wrapper around Linux's `epoll(2)` syscall. diff --git a/stackslib/src/net/p2p.rs b/stackslib/src/net/p2p.rs index ac4bbe28e4..861a6e6cfa 100644 --- a/stackslib/src/net/p2p.rs +++ b/stackslib/src/net/p2p.rs @@ -63,6 +63,7 @@ use crate::net::http::HttpRequestContents; use crate::net::httpcore::StacksHttpRequest; use crate::net::inv::inv2x::*; use crate::net::inv::nakamoto::{InvGenerator, NakamotoInvStateMachine}; +use crate::net::mempool::MempoolSync; use crate::net::neighbors::*; use crate::net::poll::{NetworkPollState, NetworkState}; use crate::net::prune::*; @@ -194,21 +195,6 @@ pub enum PeerNetworkWorkState { Prune, } -/// The four states the mempool sync state machine can be in -#[derive(Debug, Clone, PartialEq)] -pub enum MempoolSyncState { - /// Picking an outbound peer - PickOutboundPeer, - /// Resolving its data URL to a SocketAddr. Contains the data URL, DNS request handle, and - /// mempool page ID - ResolveURL(UrlString, DNSRequest, Txid), - /// Sending the request for mempool transactions. Contains the data URL, resolved socket, and - /// mempool page. - SendQuery(UrlString, SocketAddr, Txid), - /// Receiving the mempool response. Contains the URL, socket address, and event ID - RecvResponse(UrlString, SocketAddr, usize), -} - pub type PeerMap = HashMap; pub type PendingMessages = HashMap>; @@ -348,6 +334,9 @@ pub struct PeerNetwork { pub nakamoto_work_state: PeerNetworkWorkState, pub(crate) have_data_to_download: bool, + /// Mempool sync machine + pub mempool_sync: Option, + // neighbor walk state pub walk: Option>, pub walk_deadline: u64, @@ -389,15 +378,6 @@ pub struct PeerNetwork { // handle to all stacker DB state pub stackerdbs: StackerDBs, - // outstanding request to perform a mempool sync - // * mempool_sync_deadline is when the next mempool sync must start - // * mempool_sync_timeout is when the current mempool sync must stop - mempool_state: MempoolSyncState, - mempool_sync_deadline: u64, - mempool_sync_timeout: u64, - mempool_sync_completions: u64, - mempool_sync_txs: u64, - // how often we pruned a given inbound/outbound peer pub prune_outbound_counts: HashMap, pub prune_inbound_counts: HashMap, @@ -541,6 +521,8 @@ impl PeerNetwork { nakamoto_work_state: PeerNetworkWorkState::GetPublicIP, have_data_to_download: false, + mempool_sync: Some(MempoolSync::new()), + walk: None, walk_deadline: 0, walk_attempts: 0, @@ -565,12 +547,6 @@ impl PeerNetwork { stacker_db_configs: stacker_db_configs, stackerdbs: stackerdbs, - mempool_state: MempoolSyncState::PickOutboundPeer, - mempool_sync_deadline: 0, - mempool_sync_timeout: 0, - mempool_sync_completions: 0, - mempool_sync_txs: 0, - prune_outbound_counts: HashMap::new(), prune_inbound_counts: HashMap::new(), @@ -2624,55 +2600,6 @@ impl PeerNetwork { done } - /// Do a mempool sync. Return any transactions we might receive. - #[cfg_attr(test, mutants::skip)] - fn do_network_mempool_sync( - &mut self, - dns_client_opt: &mut Option<&mut DNSClient>, - mempool: &MemPoolDB, - ibd: bool, - ) -> Option> { - if ibd { - return None; - } - - return match self.do_mempool_sync(dns_client_opt, mempool) { - (true, txs_opt) => { - // did we run to completion? - if let Some(txs) = txs_opt { - debug!( - "{:?}: Mempool sync obtained {} transactions from mempool sync, and done receiving", - &self.local_peer, - txs.len() - ); - - self.mempool_sync_deadline = - get_epoch_time_secs() + self.connection_opts.mempool_sync_interval; - self.mempool_sync_completions = self.mempool_sync_completions.saturating_add(1); - self.mempool_sync_txs = self.mempool_sync_txs.saturating_add(txs.len() as u64); - Some(txs) - } else { - None - } - } - (false, txs_opt) => { - // did we get some transactions, but have more to get? - if let Some(txs) = txs_opt { - debug!( - "{:?}: Mempool sync obtained {} transactions from mempool sync, but have more", - &self.local_peer, - txs.len() - ); - - self.mempool_sync_txs = self.mempool_sync_txs.saturating_add(txs.len() as u64); - Some(txs) - } else { - None - } - } - }; - } - /// Begin the process of learning this peer's public IP address. /// Return Ok(finished with this step) /// Return Err(..) on failure @@ -3602,435 +3529,6 @@ impl PeerNetwork { } } - /// Reset a mempool sync - fn mempool_sync_reset(&mut self) { - self.mempool_state = MempoolSyncState::PickOutboundPeer; - self.mempool_sync_timeout = 0; - } - - /// Pick a peer to mempool sync with. - /// Returns Ok(None) if we're done syncing the mempool. - /// Returns Ok(Some(..)) if we're not done, and can proceed - /// Returns the new sync state -- either ResolveURL if we need to resolve a data URL, - /// or SendQuery if we got the IP address and can just issue the query. - #[cfg_attr(test, mutants::skip)] - fn mempool_sync_pick_outbound_peer( - &mut self, - dns_client_opt: &mut Option<&mut DNSClient>, - page_id: &Txid, - ) -> Result, net_error> { - if self.peers.len() == 0 { - debug!("No peers connected; cannot do mempool sync"); - return Ok(None); - } - - let mut idx = thread_rng().gen::() % self.peers.len(); - let mut mempool_sync_data_url = None; - for _ in 0..self.peers.len() + 1 { - let event_id = match self.peers.keys().skip(idx).next() { - Some(eid) => *eid, - None => { - idx = 0; - continue; - } - }; - idx = (idx + 1) % self.peers.len(); - - if let Some(convo) = self.peers.get(&event_id) { - if !convo.is_authenticated() || !convo.is_outbound() { - continue; - } - if !ConversationP2P::supports_mempool_query(convo.peer_services) { - continue; - } - if convo.data_url.len() == 0 { - continue; - } - let url = convo.data_url.clone(); - if dns_client_opt.is_none() { - if let Ok(Some(_)) = PeerNetwork::try_get_url_ip(&url) { - } else { - // need a DNS client for this one - continue; - } - } - - mempool_sync_data_url = Some(url); - break; - } - } - - if let Some(url) = mempool_sync_data_url { - self.mempool_sync_begin_resolve_data_url(url, dns_client_opt, page_id) - } else { - debug!("No peer has a data URL, so no mempool sync can happen"); - Ok(None) - } - } - - /// Begin resolving the DNS host of a data URL for mempool sync. - /// Returns Ok(None) if we're done syncing the mempool. - /// Returns Ok(Some(..)) if we're not done, and can proceed - /// Returns the new sync state -- either ResolveURL if we need to resolve a data URL, - /// or SendQuery if we got the IP address and can just issue the query. - #[cfg_attr(test, mutants::skip)] - fn mempool_sync_begin_resolve_data_url( - &self, - url_str: UrlString, - dns_client_opt: &mut Option<&mut DNSClient>, - page_id: &Txid, - ) -> Result, net_error> { - // start resolving - let url = url_str.parse_to_block_url()?; - let port = match url.port_or_known_default() { - Some(p) => p, - None => { - warn!("Unsupported URL {:?}: unknown port", &url); - return Ok(None); - } - }; - - // bare IP address? - if let Some(addr) = PeerNetwork::try_get_url_ip(&url_str)? { - return Ok(Some(MempoolSyncState::SendQuery( - url_str, - addr, - page_id.clone(), - ))); - } else if let Some(url::Host::Domain(domain)) = url.host() { - if let Some(ref mut dns_client) = dns_client_opt { - // begin DNS query - match dns_client.queue_lookup( - domain, - port, - get_epoch_time_ms() + self.connection_opts.dns_timeout, - ) { - Ok(_) => {} - Err(_) => { - warn!("Failed to queue DNS lookup on {}", &url_str); - return Ok(None); - } - } - return Ok(Some(MempoolSyncState::ResolveURL( - url_str, - DNSRequest::new(domain.to_string(), port, 0), - page_id.clone(), - ))); - } else { - // can't proceed -- no DNS client - return Ok(None); - } - } else { - // can't proceed - return Ok(None); - } - } - - /// Resolve our picked mempool sync peer's data URL. - /// Returns Ok(true, ..) if we're done syncing the mempool. - /// Returns Ok(false, ..) if there's more to do - /// Returns the socket addr if we ever succeed in resolving it. - #[cfg_attr(test, mutants::skip)] - fn mempool_sync_resolve_data_url( - &mut self, - url_str: &UrlString, - request: &DNSRequest, - dns_client_opt: &mut Option<&mut DNSClient>, - ) -> Result<(bool, Option), net_error> { - if let Ok(Some(addr)) = PeerNetwork::try_get_url_ip(url_str) { - // URL contains an IP address -- go with that - Ok((false, Some(addr))) - } else if let Some(dns_client) = dns_client_opt { - // keep trying to resolve - match dns_client.poll_lookup(&request.host, request.port) { - Ok(Some(dns_response)) => match dns_response.result { - Ok(mut addrs) => { - if let Some(addr) = addrs.pop() { - // resolved! - return Ok((false, Some(addr))); - } else { - warn!("DNS returned no results for {}", url_str); - return Ok((true, None)); - } - } - Err(msg) => { - warn!("DNS failed to look up {:?}: {}", &url_str, msg); - return Ok((true, None)); - } - }, - Ok(None) => { - // still in-flight - return Ok((false, None)); - } - Err(e) => { - warn!("DNS lookup failed on {:?}: {:?}", url_str, &e); - return Ok((true, None)); - } - } - } else { - // can't do anything - debug!("No DNS client, and URL contains a domain, so no mempool sync can happen"); - return Ok((true, None)); - } - } - - /// Ask the remote peer for its mempool, connecting to it in the process if need be. - /// Returns Ok((true, ..)) if we're done mempool syncing - /// Returns Ok((false, ..)) if there's more to do - /// Returns the event ID on success - #[cfg_attr(test, mutants::skip)] - fn mempool_sync_send_query( - &mut self, - url: &UrlString, - addr: &SocketAddr, - mempool: &MemPoolDB, - page_id: Txid, - ) -> Result<(bool, Option), net_error> { - let sync_data = mempool.make_mempool_sync_data()?; - let request = StacksHttpRequest::new_for_peer( - PeerHost::from_socketaddr(addr), - "POST".into(), - "/v2/mempool/query".into(), - HttpRequestContents::new() - .query_arg("page_id".into(), format!("{}", &page_id)) - .payload_stacks(&sync_data), - )?; - - let event_id = self.connect_or_send_http_request(url.clone(), addr.clone(), request)?; - return Ok((false, Some(event_id))); - } - - /// Receive the mempool sync response. - /// Return Ok(true, ..) if we're done with the mempool sync. - /// Return Ok(false, ..) if we have more work to do. - /// Returns the page ID of the next request to make, and the list of transactions we got - #[cfg_attr(test, mutants::skip)] - fn mempool_sync_recv_response( - &mut self, - event_id: usize, - ) -> Result<(bool, Option, Option>), net_error> { - PeerNetwork::with_http(self, |network, http| { - match http.get_conversation(event_id) { - None => { - if http.is_connecting(event_id) { - debug!( - "{:?}: Mempool sync event {} is not connected yet", - &network.local_peer, event_id - ); - return Ok((false, None, None)); - } else { - // conversation died - debug!("{:?}: Mempool sync peer hung up", &network.local_peer); - return Ok((true, None, None)); - } - } - Some(ref mut convo) => { - match convo.try_get_response() { - None => { - // still waiting - debug!( - "{:?}: Mempool sync event {} still waiting for a response", - &network.local_peer, event_id - ); - return Ok((false, None, None)); - } - Some(http_response) => match http_response.decode_mempool_txs_page() { - Ok((txs, page_id_opt)) => { - debug!("{:?}: Mempool sync received response for {} txs, next page {:?}", &network.local_peer, txs.len(), &page_id_opt); - return Ok((true, page_id_opt, Some(txs))); - } - Err(e) => { - warn!( - "{:?}: Mempool sync request did not receive a txs page: {:?}", - &network.local_peer, &e - ); - return Ok((true, None, None)); - } - }, - } - } - } - }) - } - - /// Do a mempool sync - /// Return true if we're done and can advance to the next state. - /// Returns the transactions as well if the sync ran to completion. - #[cfg_attr(test, mutants::skip)] - fn do_mempool_sync( - &mut self, - dns_client_opt: &mut Option<&mut DNSClient>, - mempool: &MemPoolDB, - ) -> (bool, Option>) { - if get_epoch_time_secs() <= self.mempool_sync_deadline { - debug!( - "{:?}: Wait until {} to do a mempool sync", - &self.local_peer, self.mempool_sync_deadline - ); - return (true, None); - } - - if self.mempool_sync_timeout == 0 { - // begin new sync - self.mempool_sync_timeout = - get_epoch_time_secs() + self.connection_opts.mempool_sync_timeout; - } else { - if get_epoch_time_secs() > self.mempool_sync_timeout { - debug!( - "{:?}: Mempool sync took too long; terminating", - &self.local_peer - ); - self.mempool_sync_reset(); - return (true, None); - } - } - - // try advancing states until we get blocked. - // Once we get blocked, return. - loop { - let cur_state = self.mempool_state.clone(); - debug!( - "{:?}: Mempool sync state is {:?}", - &self.local_peer, &cur_state - ); - match cur_state { - MempoolSyncState::PickOutboundPeer => { - // 1. pick a random outbound conversation. - match self.mempool_sync_pick_outbound_peer(dns_client_opt, &Txid([0u8; 32])) { - Ok(Some(next_state)) => { - // success! can advance to either resolve a URL or to send a query - self.mempool_state = next_state; - } - Ok(None) => { - // done - self.mempool_sync_reset(); - return (true, None); - } - Err(e) => { - // done; need reset - warn!("mempool_sync_pick_outbound_peer returned {:?}", &e); - self.mempool_sync_reset(); - return (true, None); - } - } - } - MempoolSyncState::ResolveURL(ref url_str, ref dns_request, ref page_id) => { - // 2. resolve its data URL - match self.mempool_sync_resolve_data_url(url_str, dns_request, dns_client_opt) { - Ok((false, Some(addr))) => { - // success! advance - self.mempool_state = - MempoolSyncState::SendQuery(url_str.clone(), addr, page_id.clone()); - } - Ok((false, None)) => { - // try again later - return (false, None); - } - Ok((true, _)) => { - // done - self.mempool_sync_reset(); - return (true, None); - } - Err(e) => { - // failed - warn!( - "mempool_sync_resolve_data_url({}) failed: {:?}", - url_str, &e - ); - self.mempool_sync_reset(); - return (true, None); - } - } - } - MempoolSyncState::SendQuery(ref url, ref addr, ref page_id) => { - // 3. ask for the remote peer's mempool's novel txs - debug!( - "{:?}: Mempool sync will query {} for mempool transactions at {}", - &self.local_peer, url, page_id - ); - match self.mempool_sync_send_query(url, addr, mempool, page_id.clone()) { - Ok((false, Some(event_id))) => { - // success! advance - debug!("{:?}: Mempool sync query {} for mempool transactions at {} on event {}", &self.local_peer, url, page_id, event_id); - self.mempool_state = - MempoolSyncState::RecvResponse(url.clone(), addr.clone(), event_id); - } - Ok((false, None)) => { - // try again later - return (false, None); - } - Ok((true, _)) => { - // done - self.mempool_sync_reset(); - return (true, None); - } - Err(e) => { - // done - warn!("mempool_sync_send_query({}) returned {:?}", url, &e); - self.mempool_sync_reset(); - return (true, None); - } - } - } - MempoolSyncState::RecvResponse(ref url, ref addr, ref event_id) => { - match self.mempool_sync_recv_response(*event_id) { - Ok((true, next_page_id_opt, Some(txs))) => { - debug!( - "{:?}: Mempool sync received {} transactions; next page is {:?}", - &self.local_peer, - txs.len(), - &next_page_id_opt - ); - - // done! got data - let ret = match next_page_id_opt { - Some(next_page_id) => { - // get the next page - self.mempool_state = MempoolSyncState::SendQuery( - url.clone(), - addr.clone(), - next_page_id, - ); - false - } - None => { - // done - self.mempool_sync_reset(); - true - } - }; - return (ret, Some(txs)); - } - Ok((true, _, None)) => { - // done! did not get data - self.mempool_sync_reset(); - return (true, None); - } - Ok((false, _, None)) => { - // still receiving; try again later - return (false, None); - } - Ok((false, _, Some(_))) => { - // should never happen - if cfg!(test) { - panic!("Reached invalid state in {:?}, aborting...", &cur_state); - } - warn!("Reached invalid state in {:?}, resetting...", &cur_state); - self.mempool_sync_reset(); - return (true, None); - } - Err(e) => { - // likely a network error - warn!("mempool_sync_recv_response returned {:?}", &e); - self.mempool_sync_reset(); - return (true, None); - } - } - } - } - } - } - /// Do the actual work in the state machine. /// Return true if we need to prune connections. /// This will call the epoch-appropriate network worker @@ -5191,7 +4689,7 @@ impl PeerNetwork { // In parallel, do a mempool sync. // Remember any txs we get, so we can feed them to the relayer thread. - if let Some(mut txs) = self.do_network_mempool_sync(&mut dns_client_opt, mempool, ibd) { + if let Some(mut txs) = self.run_mempool_sync(&mut dns_client_opt, mempool, ibd) { network_result.synced_transactions.append(&mut txs); } @@ -5950,847 +5448,6 @@ mod test { }) } - #[test] - fn test_mempool_sync_2_peers() { - // peer 1 gets some transactions; verify peer 2 gets the recent ones and not the old - // ones - let mut peer_1_config = TestPeerConfig::new(function_name!(), 2210, 2211); - let mut peer_2_config = TestPeerConfig::new(function_name!(), 2212, 2213); - - peer_1_config.add_neighbor(&peer_2_config.to_neighbor()); - peer_2_config.add_neighbor(&peer_1_config.to_neighbor()); - - peer_1_config.connection_opts.mempool_sync_interval = 1; - peer_2_config.connection_opts.mempool_sync_interval = 1; - - let num_txs = 10; - let pks: Vec<_> = (0..num_txs).map(|_| StacksPrivateKey::new()).collect(); - let addrs: Vec<_> = pks.iter().map(|pk| to_addr(pk)).collect(); - let initial_balances: Vec<_> = addrs - .iter() - .map(|a| (a.to_account_principal(), 1000000000)) - .collect(); - - peer_1_config.initial_balances = initial_balances.clone(); - peer_2_config.initial_balances = initial_balances.clone(); - - let mut peer_1 = TestPeer::new(peer_1_config); - let mut peer_2 = TestPeer::new(peer_2_config); - - let num_blocks = 10; - let first_stacks_block_height = { - let sn = - SortitionDB::get_canonical_burn_chain_tip(&peer_1.sortdb.as_ref().unwrap().conn()) - .unwrap(); - sn.block_height + 1 - }; - - for i in 0..(num_blocks / 2) { - let (burn_ops, stacks_block, microblocks) = peer_2.make_default_tenure(); - - peer_1.next_burnchain_block(burn_ops.clone()); - peer_2.next_burnchain_block(burn_ops.clone()); - - peer_1.process_stacks_epoch_at_tip(&stacks_block, µblocks); - peer_2.process_stacks_epoch_at_tip(&stacks_block, µblocks); - } - - let addr = StacksAddress { - version: C32_ADDRESS_VERSION_TESTNET_SINGLESIG, - bytes: Hash160([0xff; 20]), - }; - - // old transactions - let num_txs = 10; - let mut old_txs = HashMap::new(); - let mut peer_1_mempool = peer_1.mempool.take().unwrap(); - let mut mempool_tx = peer_1_mempool.tx_begin().unwrap(); - for i in 0..num_txs { - let pk = &pks[i]; - let mut tx = StacksTransaction { - version: TransactionVersion::Testnet, - chain_id: 0x80000000, - auth: TransactionAuth::from_p2pkh(&pk).unwrap(), - anchor_mode: TransactionAnchorMode::Any, - post_condition_mode: TransactionPostConditionMode::Allow, - post_conditions: vec![], - payload: TransactionPayload::TokenTransfer( - addr.to_account_principal(), - 123, - TokenTransferMemo([0u8; 34]), - ), - }; - tx.set_tx_fee(1000); - tx.set_origin_nonce(0); - - let mut tx_signer = StacksTransactionSigner::new(&tx); - tx_signer.sign_origin(&pk).unwrap(); - - let tx = tx_signer.get_tx().unwrap(); - - let txid = tx.txid(); - let tx_bytes = tx.serialize_to_vec(); - let origin_addr = tx.origin_address(); - let origin_nonce = tx.get_origin_nonce(); - let sponsor_addr = tx.sponsor_address().unwrap_or(origin_addr.clone()); - let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); - let tx_fee = tx.get_tx_fee(); - - old_txs.insert(tx.txid(), tx.clone()); - - // should succeed - MemPoolDB::try_add_tx( - &mut mempool_tx, - peer_1.chainstate(), - &ConsensusHash([0x1 + (num_blocks as u8); 20]), - &BlockHeaderHash([0x2 + (num_blocks as u8); 32]), - txid.clone(), - tx_bytes, - tx_fee, - (num_blocks / 2) as u64, - &origin_addr, - origin_nonce, - &sponsor_addr, - sponsor_nonce, - None, - ) - .unwrap(); - - eprintln!("Added {} {}", i, &txid); - } - mempool_tx.commit().unwrap(); - peer_1.mempool = Some(peer_1_mempool); - - // keep mining to make these txs old - for i in (num_blocks / 2)..num_blocks { - let (burn_ops, stacks_block, microblocks) = peer_2.make_default_tenure(); - - peer_1.next_burnchain_block(burn_ops.clone()); - peer_2.next_burnchain_block(burn_ops.clone()); - - peer_1.process_stacks_epoch_at_tip(&stacks_block, µblocks); - peer_2.process_stacks_epoch_at_tip(&stacks_block, µblocks); - } - - let num_burn_blocks = { - let sn = - SortitionDB::get_canonical_burn_chain_tip(peer_1.sortdb.as_ref().unwrap().conn()) - .unwrap(); - sn.block_height + 1 - }; - - let mut txs = HashMap::new(); - let mut peer_1_mempool = peer_1.mempool.take().unwrap(); - let mut mempool_tx = peer_1_mempool.tx_begin().unwrap(); - for i in 0..num_txs { - let pk = &pks[i]; - let mut tx = StacksTransaction { - version: TransactionVersion::Testnet, - chain_id: 0x80000000, - auth: TransactionAuth::from_p2pkh(&pk).unwrap(), - anchor_mode: TransactionAnchorMode::Any, - post_condition_mode: TransactionPostConditionMode::Allow, - post_conditions: vec![], - payload: TransactionPayload::TokenTransfer( - addr.to_account_principal(), - 123, - TokenTransferMemo([0u8; 34]), - ), - }; - tx.set_tx_fee(1000); - tx.set_origin_nonce(1); - - let mut tx_signer = StacksTransactionSigner::new(&tx); - tx_signer.sign_origin(&pk).unwrap(); - - let tx = tx_signer.get_tx().unwrap(); - - let txid = tx.txid(); - let tx_bytes = tx.serialize_to_vec(); - let origin_addr = tx.origin_address(); - let origin_nonce = tx.get_origin_nonce(); - let sponsor_addr = tx.sponsor_address().unwrap_or(origin_addr.clone()); - let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); - let tx_fee = tx.get_tx_fee(); - - txs.insert(tx.txid(), tx.clone()); - - // should succeed - MemPoolDB::try_add_tx( - &mut mempool_tx, - peer_1.chainstate(), - &ConsensusHash([0x1 + (num_blocks as u8); 20]), - &BlockHeaderHash([0x2 + (num_blocks as u8); 32]), - txid.clone(), - tx_bytes, - tx_fee, - num_blocks as u64, - &origin_addr, - origin_nonce, - &sponsor_addr, - sponsor_nonce, - None, - ) - .unwrap(); - - eprintln!("Added {} {}", i, &txid); - } - mempool_tx.commit().unwrap(); - peer_1.mempool = Some(peer_1_mempool); - - let mut round = 0; - let mut peer_1_mempool_txs = 0; - let mut peer_2_mempool_txs = 0; - - while peer_1_mempool_txs < num_txs || peer_2_mempool_txs < num_txs { - if let Ok(mut result) = peer_1.step_with_ibd(false) { - let lp = peer_1.network.local_peer.clone(); - let burnchain = peer_1.network.burnchain.clone(); - peer_1 - .with_db_state(|sortdb, chainstate, relayer, mempool| { - relayer.process_network_result( - &lp, - &mut result, - &burnchain, - sortdb, - chainstate, - mempool, - false, - None, - None, - ) - }) - .unwrap(); - } - - if let Ok(mut result) = peer_2.step_with_ibd(false) { - let lp = peer_2.network.local_peer.clone(); - let burnchain = peer_2.network.burnchain.clone(); - peer_2 - .with_db_state(|sortdb, chainstate, relayer, mempool| { - relayer.process_network_result( - &lp, - &mut result, - &burnchain, - sortdb, - chainstate, - mempool, - false, - None, - None, - ) - }) - .unwrap(); - } - - round += 1; - - let mp = peer_1.mempool.take().unwrap(); - peer_1_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); - peer_1.mempool.replace(mp); - - let mp = peer_2.mempool.take().unwrap(); - peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); - peer_2.mempool.replace(mp); - - info!( - "Peer 1: {}, Peer 2: {}", - peer_1_mempool_txs, peer_2_mempool_txs - ); - } - - info!("Completed mempool sync in {} step(s)", round); - - let mp = peer_2.mempool.take().unwrap(); - let peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap(); - peer_2.mempool.replace(mp); - - // peer 2 has all the recent txs - // peer 2 has none of the old ones - for tx in peer_2_mempool_txs { - assert_eq!(&tx.tx, txs.get(&tx.tx.txid()).unwrap()); - assert!(old_txs.get(&tx.tx.txid()).is_none()); - } - } - - #[test] - fn test_mempool_sync_2_peers_paginated() { - // peer 1 gets some transactions; verify peer 2 gets them all - let mut peer_1_config = TestPeerConfig::new(function_name!(), 2214, 2215); - let mut peer_2_config = TestPeerConfig::new(function_name!(), 2216, 2217); - - peer_1_config.add_neighbor(&peer_2_config.to_neighbor()); - peer_2_config.add_neighbor(&peer_1_config.to_neighbor()); - - peer_1_config.connection_opts.mempool_sync_interval = 1; - peer_2_config.connection_opts.mempool_sync_interval = 1; - - let num_txs = 1024; - let pks: Vec<_> = (0..num_txs).map(|_| StacksPrivateKey::new()).collect(); - let addrs: Vec<_> = pks.iter().map(|pk| to_addr(pk)).collect(); - let initial_balances: Vec<_> = addrs - .iter() - .map(|a| (a.to_account_principal(), 1000000000)) - .collect(); - - peer_1_config.initial_balances = initial_balances.clone(); - peer_2_config.initial_balances = initial_balances.clone(); - - let mut peer_1 = TestPeer::new(peer_1_config); - let mut peer_2 = TestPeer::new(peer_2_config); - - let num_blocks = 10; - let first_stacks_block_height = { - let sn = - SortitionDB::get_canonical_burn_chain_tip(&peer_1.sortdb.as_ref().unwrap().conn()) - .unwrap(); - sn.block_height + 1 - }; - - for i in 0..num_blocks { - let (burn_ops, stacks_block, microblocks) = peer_2.make_default_tenure(); - - peer_1.next_burnchain_block(burn_ops.clone()); - peer_2.next_burnchain_block(burn_ops.clone()); - - peer_1.process_stacks_epoch_at_tip(&stacks_block, µblocks); - peer_2.process_stacks_epoch_at_tip(&stacks_block, µblocks); - } - - let addr = StacksAddress { - version: C32_ADDRESS_VERSION_TESTNET_SINGLESIG, - bytes: Hash160([0xff; 20]), - }; - - // fill peer 1 with lots of transactions - let mut txs = HashMap::new(); - let mut peer_1_mempool = peer_1.mempool.take().unwrap(); - let mut mempool_tx = peer_1_mempool.tx_begin().unwrap(); - for i in 0..num_txs { - let pk = &pks[i]; - let mut tx = StacksTransaction { - version: TransactionVersion::Testnet, - chain_id: 0x80000000, - auth: TransactionAuth::from_p2pkh(&pk).unwrap(), - anchor_mode: TransactionAnchorMode::Any, - post_condition_mode: TransactionPostConditionMode::Allow, - post_conditions: vec![], - payload: TransactionPayload::TokenTransfer( - addr.to_account_principal(), - 123, - TokenTransferMemo([0u8; 34]), - ), - }; - tx.set_tx_fee(1000); - tx.set_origin_nonce(0); - - let mut tx_signer = StacksTransactionSigner::new(&tx); - tx_signer.sign_origin(&pk).unwrap(); - - let tx = tx_signer.get_tx().unwrap(); - - let txid = tx.txid(); - let tx_bytes = tx.serialize_to_vec(); - let origin_addr = tx.origin_address(); - let origin_nonce = tx.get_origin_nonce(); - let sponsor_addr = tx.sponsor_address().unwrap_or(origin_addr.clone()); - let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); - let tx_fee = tx.get_tx_fee(); - - txs.insert(tx.txid(), tx.clone()); - - // should succeed - MemPoolDB::try_add_tx( - &mut mempool_tx, - peer_1.chainstate(), - &ConsensusHash([0x1 + (num_blocks as u8); 20]), - &BlockHeaderHash([0x2 + (num_blocks as u8); 32]), - txid.clone(), - tx_bytes, - tx_fee, - num_blocks, - &origin_addr, - origin_nonce, - &sponsor_addr, - sponsor_nonce, - None, - ) - .unwrap(); - - eprintln!("Added {} {}", i, &txid); - } - mempool_tx.commit().unwrap(); - peer_1.mempool = Some(peer_1_mempool); - - let num_burn_blocks = { - let sn = - SortitionDB::get_canonical_burn_chain_tip(peer_1.sortdb.as_ref().unwrap().conn()) - .unwrap(); - sn.block_height + 1 - }; - - let mut round = 0; - let mut peer_1_mempool_txs = 0; - let mut peer_2_mempool_txs = 0; - - while peer_1_mempool_txs < num_txs || peer_2_mempool_txs < num_txs { - if let Ok(mut result) = peer_1.step_with_ibd(false) { - let lp = peer_1.network.local_peer.clone(); - let burnchain = peer_1.network.burnchain.clone(); - peer_1 - .with_db_state(|sortdb, chainstate, relayer, mempool| { - relayer.process_network_result( - &lp, - &mut result, - &burnchain, - sortdb, - chainstate, - mempool, - false, - None, - None, - ) - }) - .unwrap(); - } - - if let Ok(mut result) = peer_2.step_with_ibd(false) { - let lp = peer_2.network.local_peer.clone(); - let burnchain = peer_2.network.burnchain.clone(); - peer_2 - .with_db_state(|sortdb, chainstate, relayer, mempool| { - relayer.process_network_result( - &lp, - &mut result, - &burnchain, - sortdb, - chainstate, - mempool, - false, - None, - None, - ) - }) - .unwrap(); - } - - round += 1; - - let mp = peer_1.mempool.take().unwrap(); - peer_1_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); - peer_1.mempool.replace(mp); - - let mp = peer_2.mempool.take().unwrap(); - peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); - peer_2.mempool.replace(mp); - - info!( - "Peer 1: {}, Peer 2: {}", - peer_1_mempool_txs, peer_2_mempool_txs - ); - } - - info!("Completed mempool sync in {} step(s)", round); - - let mp = peer_2.mempool.take().unwrap(); - let peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap(); - peer_2.mempool.replace(mp); - - for tx in peer_2_mempool_txs { - assert_eq!(&tx.tx, txs.get(&tx.tx.txid()).unwrap()); - } - } - - #[test] - fn test_mempool_sync_2_peers_blacklisted() { - // peer 1 gets some transactions; peer 2 blacklists some of them; - // verify peer 2 gets only the non-blacklisted ones. - let mut peer_1_config = TestPeerConfig::new(function_name!(), 2218, 2219); - let mut peer_2_config = TestPeerConfig::new(function_name!(), 2220, 2221); - - peer_1_config.add_neighbor(&peer_2_config.to_neighbor()); - peer_2_config.add_neighbor(&peer_1_config.to_neighbor()); - - peer_1_config.connection_opts.mempool_sync_interval = 1; - peer_2_config.connection_opts.mempool_sync_interval = 1; - - let num_txs = 1024; - let pks: Vec<_> = (0..num_txs).map(|_| StacksPrivateKey::new()).collect(); - let addrs: Vec<_> = pks.iter().map(|pk| to_addr(pk)).collect(); - let initial_balances: Vec<_> = addrs - .iter() - .map(|a| (a.to_account_principal(), 1000000000)) - .collect(); - - peer_1_config.initial_balances = initial_balances.clone(); - peer_2_config.initial_balances = initial_balances.clone(); - - let mut peer_1 = TestPeer::new(peer_1_config); - let mut peer_2 = TestPeer::new(peer_2_config); - - let num_blocks = 10; - let first_stacks_block_height = { - let sn = - SortitionDB::get_canonical_burn_chain_tip(&peer_1.sortdb.as_ref().unwrap().conn()) - .unwrap(); - sn.block_height + 1 - }; - - for i in 0..num_blocks { - let (burn_ops, stacks_block, microblocks) = peer_2.make_default_tenure(); - - peer_1.next_burnchain_block(burn_ops.clone()); - peer_2.next_burnchain_block(burn_ops.clone()); - - peer_1.process_stacks_epoch_at_tip(&stacks_block, µblocks); - peer_2.process_stacks_epoch_at_tip(&stacks_block, µblocks); - } - - let addr = StacksAddress { - version: C32_ADDRESS_VERSION_TESTNET_SINGLESIG, - bytes: Hash160([0xff; 20]), - }; - - // fill peer 1 with lots of transactions - let mut txs = HashMap::new(); - let mut peer_1_mempool = peer_1.mempool.take().unwrap(); - let mut mempool_tx = peer_1_mempool.tx_begin().unwrap(); - let mut peer_2_blacklist = vec![]; - for i in 0..num_txs { - let pk = &pks[i]; - let mut tx = StacksTransaction { - version: TransactionVersion::Testnet, - chain_id: 0x80000000, - auth: TransactionAuth::from_p2pkh(&pk).unwrap(), - anchor_mode: TransactionAnchorMode::Any, - post_condition_mode: TransactionPostConditionMode::Allow, - post_conditions: vec![], - payload: TransactionPayload::TokenTransfer( - addr.to_account_principal(), - 123, - TokenTransferMemo([0u8; 34]), - ), - }; - tx.set_tx_fee(1000); - tx.set_origin_nonce(0); - - let mut tx_signer = StacksTransactionSigner::new(&tx); - tx_signer.sign_origin(&pk).unwrap(); - - let tx = tx_signer.get_tx().unwrap(); - - let txid = tx.txid(); - let tx_bytes = tx.serialize_to_vec(); - let origin_addr = tx.origin_address(); - let origin_nonce = tx.get_origin_nonce(); - let sponsor_addr = tx.sponsor_address().unwrap_or(origin_addr.clone()); - let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); - let tx_fee = tx.get_tx_fee(); - - txs.insert(tx.txid(), tx.clone()); - - // should succeed - MemPoolDB::try_add_tx( - &mut mempool_tx, - peer_1.chainstate(), - &ConsensusHash([0x1 + (num_blocks as u8); 20]), - &BlockHeaderHash([0x2 + (num_blocks as u8); 32]), - txid.clone(), - tx_bytes, - tx_fee, - num_blocks, - &origin_addr, - origin_nonce, - &sponsor_addr, - sponsor_nonce, - None, - ) - .unwrap(); - - eprintln!("Added {} {}", i, &txid); - - if i % 2 == 0 { - // peer 2 blacklists even-numbered txs - peer_2_blacklist.push(txid); - } - } - mempool_tx.commit().unwrap(); - peer_1.mempool = Some(peer_1_mempool); - - // peer 2 blacklists them all - let mut peer_2_mempool = peer_2.mempool.take().unwrap(); - - // blacklisted txs never time out - peer_2_mempool.blacklist_timeout = u64::MAX / 2; - - let mempool_tx = peer_2_mempool.tx_begin().unwrap(); - MemPoolDB::inner_blacklist_txs(&mempool_tx, &peer_2_blacklist, get_epoch_time_secs()) - .unwrap(); - mempool_tx.commit().unwrap(); - - peer_2.mempool = Some(peer_2_mempool); - - let num_burn_blocks = { - let sn = - SortitionDB::get_canonical_burn_chain_tip(peer_1.sortdb.as_ref().unwrap().conn()) - .unwrap(); - sn.block_height + 1 - }; - - let mut round = 0; - let mut peer_1_mempool_txs = 0; - let mut peer_2_mempool_txs = 0; - - while peer_1_mempool_txs < num_txs || peer_2_mempool_txs < num_txs / 2 { - if let Ok(mut result) = peer_1.step_with_ibd(false) { - let lp = peer_1.network.local_peer.clone(); - let burnchain = peer_1.network.burnchain.clone(); - peer_1 - .with_db_state(|sortdb, chainstate, relayer, mempool| { - relayer.process_network_result( - &lp, - &mut result, - &burnchain, - sortdb, - chainstate, - mempool, - false, - None, - None, - ) - }) - .unwrap(); - } - - if let Ok(mut result) = peer_2.step_with_ibd(false) { - let lp = peer_2.network.local_peer.clone(); - let burnchain = peer_2.network.burnchain.clone(); - peer_2 - .with_db_state(|sortdb, chainstate, relayer, mempool| { - relayer.process_network_result( - &lp, - &mut result, - &burnchain, - sortdb, - chainstate, - mempool, - false, - None, - None, - ) - }) - .unwrap(); - } - - round += 1; - - let mp = peer_1.mempool.take().unwrap(); - peer_1_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); - peer_1.mempool.replace(mp); - - let mp = peer_2.mempool.take().unwrap(); - peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); - peer_2.mempool.replace(mp); - - info!( - "Peer 1: {}, Peer 2: {}", - peer_1_mempool_txs, peer_2_mempool_txs - ); - } - - info!("Completed mempool sync in {} step(s)", round); - - let mp = peer_2.mempool.take().unwrap(); - let peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap(); - peer_2.mempool.replace(mp); - - for tx in peer_2_mempool_txs { - assert_eq!(&tx.tx, txs.get(&tx.tx.txid()).unwrap()); - assert!(!peer_2_blacklist.contains(&tx.tx.txid())); - } - } - - /// Make sure mempool sync never stores problematic transactions - #[test] - fn test_mempool_sync_2_peers_problematic() { - // peer 1 gets some transactions; peer 2 blacklists them all due to being invalid. - // verify peer 2 stores nothing. - let mut peer_1_config = TestPeerConfig::new(function_name!(), 2218, 2219); - let mut peer_2_config = TestPeerConfig::new(function_name!(), 2220, 2221); - - peer_1_config.add_neighbor(&peer_2_config.to_neighbor()); - peer_2_config.add_neighbor(&peer_1_config.to_neighbor()); - - peer_1_config.connection_opts.mempool_sync_interval = 1; - peer_2_config.connection_opts.mempool_sync_interval = 1; - - let num_txs = 128; - let pks: Vec<_> = (0..num_txs).map(|_| StacksPrivateKey::new()).collect(); - let addrs: Vec<_> = pks.iter().map(|pk| to_addr(pk)).collect(); - let initial_balances: Vec<_> = addrs - .iter() - .map(|a| (a.to_account_principal(), 1000000000)) - .collect(); - - peer_1_config.initial_balances = initial_balances.clone(); - peer_2_config.initial_balances = initial_balances.clone(); - - let mut peer_1 = TestPeer::new(peer_1_config); - let mut peer_2 = TestPeer::new(peer_2_config); - - let num_blocks = 10; - let first_stacks_block_height = { - let sn = - SortitionDB::get_canonical_burn_chain_tip(&peer_1.sortdb.as_ref().unwrap().conn()) - .unwrap(); - sn.block_height + 1 - }; - - for i in 0..num_blocks { - let (burn_ops, stacks_block, microblocks) = peer_2.make_default_tenure(); - - peer_1.next_burnchain_block(burn_ops.clone()); - peer_2.next_burnchain_block(burn_ops.clone()); - - peer_1.process_stacks_epoch_at_tip(&stacks_block, µblocks); - peer_2.process_stacks_epoch_at_tip(&stacks_block, µblocks); - } - - let addr = StacksAddress { - version: C32_ADDRESS_VERSION_TESTNET_SINGLESIG, - bytes: Hash160([0xff; 20]), - }; - - // fill peer 1 with lots of transactions - let mut txs = HashMap::new(); - let mut peer_1_mempool = peer_1.mempool.take().unwrap(); - let mut mempool_tx = peer_1_mempool.tx_begin().unwrap(); - for i in 0..num_txs { - let pk = &pks[i]; - - let exceeds_repeat_factor = AST_CALL_STACK_DEPTH_BUFFER + (MAX_CALL_STACK_DEPTH as u64); - let tx_exceeds_body_start = "{ a : ".repeat(exceeds_repeat_factor as usize); - let tx_exceeds_body_end = "} ".repeat(exceeds_repeat_factor as usize); - let tx_exceeds_body = format!("{}u1 {}", tx_exceeds_body_start, tx_exceeds_body_end); - - let tx = make_contract_tx( - &pk, - 0, - (tx_exceeds_body.len() * 100) as u64, - "test-exceeds", - &tx_exceeds_body, - ); - - let txid = tx.txid(); - let tx_bytes = tx.serialize_to_vec(); - let origin_addr = tx.origin_address(); - let origin_nonce = tx.get_origin_nonce(); - let sponsor_addr = tx.sponsor_address().unwrap_or(origin_addr.clone()); - let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); - let tx_fee = tx.get_tx_fee(); - - txs.insert(tx.txid(), tx.clone()); - - // should succeed - MemPoolDB::try_add_tx( - &mut mempool_tx, - peer_1.chainstate(), - &ConsensusHash([0x1 + (num_blocks as u8); 20]), - &BlockHeaderHash([0x2 + (num_blocks as u8); 32]), - txid.clone(), - tx_bytes, - tx_fee, - num_blocks, - &origin_addr, - origin_nonce, - &sponsor_addr, - sponsor_nonce, - None, - ) - .unwrap(); - - eprintln!("Added {} {}", i, &txid); - } - mempool_tx.commit().unwrap(); - peer_1.mempool = Some(peer_1_mempool); - - // blacklisted txs never time out - let mut peer_2_mempool = peer_2.mempool.take().unwrap(); - peer_2_mempool.blacklist_timeout = u64::MAX / 2; - peer_2.mempool = Some(peer_2_mempool); - - let num_burn_blocks = { - let sn = - SortitionDB::get_canonical_burn_chain_tip(peer_1.sortdb.as_ref().unwrap().conn()) - .unwrap(); - sn.block_height + 1 - }; - - let mut round = 0; - let mut peer_1_mempool_txs = 0; - - while peer_1_mempool_txs < num_txs || peer_2.network.mempool_sync_txs < (num_txs as u64) { - if let Ok(mut result) = peer_1.step_with_ibd(false) { - let lp = peer_1.network.local_peer.clone(); - let burnchain = peer_1.network.burnchain.clone(); - peer_1 - .with_db_state(|sortdb, chainstate, relayer, mempool| { - relayer.process_network_result( - &lp, - &mut result, - &burnchain, - sortdb, - chainstate, - mempool, - false, - None, - None, - ) - }) - .unwrap(); - } - - if let Ok(mut result) = peer_2.step_with_ibd(false) { - let lp = peer_2.network.local_peer.clone(); - let burnchain = peer_2.network.burnchain.clone(); - peer_2 - .with_db_state(|sortdb, chainstate, relayer, mempool| { - relayer.process_network_result( - &lp, - &mut result, - &burnchain, - sortdb, - chainstate, - mempool, - false, - None, - None, - ) - }) - .unwrap(); - } - - round += 1; - - let mp = peer_1.mempool.take().unwrap(); - peer_1_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); - peer_1.mempool.replace(mp); - - info!( - "Peer 1: {}, Peer 2: {}", - peer_1_mempool_txs, peer_2.network.mempool_sync_txs - ); - } - - info!("Completed mempool sync in {} step(s)", round); - - let mp = peer_2.mempool.take().unwrap(); - let peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap(); - peer_2.mempool.replace(mp); - - assert_eq!(peer_2_mempool_txs.len(), 128); - } - #[test] fn test_is_connecting() { let peer_1_config = TestPeerConfig::new(function_name!(), 0, 0); diff --git a/stackslib/src/net/rpc.rs b/stackslib/src/net/rpc.rs index 6f0e1db01b..efa0484d4b 100644 --- a/stackslib/src/net/rpc.rs +++ b/stackslib/src/net/rpc.rs @@ -194,7 +194,7 @@ impl ConversationHttp { /// Is a request in-progress? pub fn is_request_inflight(&self) -> bool { - self.pending_request.is_some() + self.pending_request.is_some() || self.pending_response.is_some() } /// Start a HTTP request from this peer, and expect a response. diff --git a/stackslib/src/net/tests/inv/nakamoto.rs b/stackslib/src/net/tests/inv/nakamoto.rs index 539233812f..fd9f1dcc1f 100644 --- a/stackslib/src/net/tests/inv/nakamoto.rs +++ b/stackslib/src/net/tests/inv/nakamoto.rs @@ -20,6 +20,7 @@ use std::sync::mpsc::sync_channel; use std::thread; use std::thread::JoinHandle; +use clarity::vm::types::PrincipalData; use stacks_common::address::{AddressHashMode, C32_ADDRESS_VERSION_TESTNET_SINGLESIG}; use stacks_common::codec::{read_next, StacksMessageCodec}; use stacks_common::types::chainstate::{StacksAddress, StacksPrivateKey, StacksPublicKey}; @@ -336,6 +337,49 @@ pub fn make_nakamoto_peers_from_invs<'a>( prepare_len: u32, bitvecs: Vec>, num_peers: usize, +) -> (TestPeer<'a>, Vec>) { + inner_make_nakamoto_peers_from_invs( + test_name, + observer, + rc_len, + prepare_len, + bitvecs, + num_peers, + vec![], + ) +} + +/// NOTE: The second return value does _not_ need `<'a>`, since `observer` is never installed into +/// the peers here. However, it appears unavoidable to the borrow-checker. +pub fn make_nakamoto_peers_from_invs_and_balances<'a>( + test_name: &str, + observer: &'a TestEventObserver, + rc_len: u32, + prepare_len: u32, + bitvecs: Vec>, + num_peers: usize, + initial_balances: Vec<(PrincipalData, u64)>, +) -> (TestPeer<'a>, Vec>) { + inner_make_nakamoto_peers_from_invs( + test_name, + observer, + rc_len, + prepare_len, + bitvecs, + num_peers, + initial_balances, + ) +} + +/// Make peers from inventories and balances +fn inner_make_nakamoto_peers_from_invs<'a>( + test_name: &str, + observer: &'a TestEventObserver, + rc_len: u32, + prepare_len: u32, + bitvecs: Vec>, + num_peers: usize, + mut initial_balances: Vec<(PrincipalData, u64)>, ) -> (TestPeer<'a>, Vec>) { for bitvec in bitvecs.iter() { assert_eq!(bitvec.len() as u32, rc_len); @@ -415,10 +459,11 @@ pub fn make_nakamoto_peers_from_invs<'a>( 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, ]); + initial_balances.push((addr.into(), 1_000_000)); let plan = NakamotoBootPlan::new(test_name) .with_private_key(private_key) .with_pox_constants(rc_len, prepare_len) - .with_initial_balances(vec![(addr.into(), 1_000_000)]) + .with_initial_balances(initial_balances) .with_extra_peers(num_peers) .with_test_signers(test_signers) .with_test_stackers(test_stackers); diff --git a/stackslib/src/net/tests/mempool/mod.rs b/stackslib/src/net/tests/mempool/mod.rs new file mode 100644 index 0000000000..7a44a56788 --- /dev/null +++ b/stackslib/src/net/tests/mempool/mod.rs @@ -0,0 +1,1302 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2024 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::cell::RefCell; +use std::{thread, time}; + +use clarity::vm::ast::stack_depth_checker::AST_CALL_STACK_DEPTH_BUFFER; +use clarity::vm::types::StacksAddressExtensions; +use clarity::vm::MAX_CALL_STACK_DEPTH; +use rand; +use rand::RngCore; +use stacks_common::types::chainstate::BurnchainHeaderHash; +use stacks_common::util::secp256k1::Secp256k1PrivateKey; +use stacks_common::util::{log, sleep_ms}; + +use super::*; +use crate::burnchains::burnchain::*; +use crate::burnchains::*; +use crate::chainstate::nakamoto::coordinator::tests::make_token_transfer; +use crate::chainstate::stacks::test::*; +use crate::chainstate::stacks::*; +use crate::core::StacksEpochExtension; +use crate::net::atlas::*; +use crate::net::codec::*; +use crate::net::db::*; +use crate::net::test::*; +use crate::net::tests::inv::nakamoto::make_nakamoto_peers_from_invs_and_balances; +use crate::net::tests::relay::epoch2x::make_contract_tx; +use crate::net::*; +use crate::util_lib::test::*; + +#[test] +fn test_mempool_sync_2_peers() { + // peer 1 gets some transactions; verify peer 2 gets the recent ones and not the old + // ones + let mut peer_1_config = TestPeerConfig::new(function_name!(), 0, 0); + let mut peer_2_config = TestPeerConfig::new(function_name!(), 0, 0); + + peer_1_config.connection_opts.mempool_sync_interval = 1; + peer_2_config.connection_opts.mempool_sync_interval = 1; + + let num_txs = 10; + let pks: Vec<_> = (0..num_txs).map(|_| StacksPrivateKey::new()).collect(); + let addrs: Vec<_> = pks.iter().map(|pk| to_addr(pk)).collect(); + let initial_balances: Vec<_> = addrs + .iter() + .map(|a| (a.to_account_principal(), 1000000000)) + .collect(); + + peer_1_config.initial_balances = initial_balances.clone(); + peer_2_config.initial_balances = initial_balances.clone(); + + let mut peer_1 = TestPeer::new(peer_1_config); + let mut peer_2 = TestPeer::new(peer_2_config); + + peer_1.add_neighbor(&mut peer_2.to_neighbor(), None, true); + peer_2.add_neighbor(&mut peer_1.to_neighbor(), None, true); + + let num_blocks = 10; + let first_stacks_block_height = { + let sn = SortitionDB::get_canonical_burn_chain_tip(&peer_1.sortdb.as_ref().unwrap().conn()) + .unwrap(); + sn.block_height + 1 + }; + + for i in 0..(num_blocks / 2) { + let (burn_ops, stacks_block, microblocks) = peer_2.make_default_tenure(); + + peer_1.next_burnchain_block(burn_ops.clone()); + peer_2.next_burnchain_block(burn_ops.clone()); + + peer_1.process_stacks_epoch_at_tip(&stacks_block, µblocks); + peer_2.process_stacks_epoch_at_tip(&stacks_block, µblocks); + } + + let addr = StacksAddress { + version: C32_ADDRESS_VERSION_TESTNET_SINGLESIG, + bytes: Hash160([0xff; 20]), + }; + + let stacks_tip_ch = peer_1.network.stacks_tip.consensus_hash.clone(); + let stacks_tip_bhh = peer_1.network.stacks_tip.block_hash.clone(); + + // old transactions + let num_txs = 10; + let mut old_txs = HashMap::new(); + let mut peer_1_mempool = peer_1.mempool.take().unwrap(); + let mut mempool_tx = peer_1_mempool.tx_begin().unwrap(); + for i in 0..num_txs { + let pk = &pks[i]; + let mut tx = StacksTransaction { + version: TransactionVersion::Testnet, + chain_id: 0x80000000, + auth: TransactionAuth::from_p2pkh(&pk).unwrap(), + anchor_mode: TransactionAnchorMode::Any, + post_condition_mode: TransactionPostConditionMode::Allow, + post_conditions: vec![], + payload: TransactionPayload::TokenTransfer( + addr.to_account_principal(), + 123, + TokenTransferMemo([0u8; 34]), + ), + }; + tx.set_tx_fee(1000); + tx.set_origin_nonce(0); + + let mut tx_signer = StacksTransactionSigner::new(&tx); + tx_signer.sign_origin(&pk).unwrap(); + + let tx = tx_signer.get_tx().unwrap(); + + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let origin_addr = tx.origin_address(); + let origin_nonce = tx.get_origin_nonce(); + let sponsor_addr = tx.sponsor_address().unwrap_or(origin_addr.clone()); + let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); + let tx_fee = tx.get_tx_fee(); + + old_txs.insert(tx.txid(), tx.clone()); + + // should succeed + MemPoolDB::try_add_tx( + &mut mempool_tx, + peer_1.chainstate(), + &stacks_tip_ch, + &stacks_tip_bhh, + true, + txid.clone(), + tx_bytes, + tx_fee, + (num_blocks / 2) as u64, + &origin_addr, + origin_nonce, + &sponsor_addr, + sponsor_nonce, + None, + ) + .unwrap(); + + eprintln!("Added {} {}", i, &txid); + } + mempool_tx.commit().unwrap(); + peer_1.mempool = Some(peer_1_mempool); + + // keep mining to make these txs old + for i in (num_blocks / 2)..num_blocks { + let (burn_ops, stacks_block, microblocks) = peer_2.make_default_tenure(); + + peer_1.next_burnchain_block(burn_ops.clone()); + peer_2.next_burnchain_block(burn_ops.clone()); + + peer_1.process_stacks_epoch_at_tip(&stacks_block, µblocks); + peer_2.process_stacks_epoch_at_tip(&stacks_block, µblocks); + } + + let num_burn_blocks = { + let sn = SortitionDB::get_canonical_burn_chain_tip(peer_1.sortdb.as_ref().unwrap().conn()) + .unwrap(); + sn.block_height + 1 + }; + + let stacks_tip_ch = peer_1.network.stacks_tip.consensus_hash.clone(); + let stacks_tip_bhh = peer_1.network.stacks_tip.block_hash.clone(); + + let mut txs = HashMap::new(); + let mut peer_1_mempool = peer_1.mempool.take().unwrap(); + let mut mempool_tx = peer_1_mempool.tx_begin().unwrap(); + for i in 0..num_txs { + let pk = &pks[i]; + let mut tx = StacksTransaction { + version: TransactionVersion::Testnet, + chain_id: 0x80000000, + auth: TransactionAuth::from_p2pkh(&pk).unwrap(), + anchor_mode: TransactionAnchorMode::Any, + post_condition_mode: TransactionPostConditionMode::Allow, + post_conditions: vec![], + payload: TransactionPayload::TokenTransfer( + addr.to_account_principal(), + 123, + TokenTransferMemo([0u8; 34]), + ), + }; + tx.set_tx_fee(1000); + tx.set_origin_nonce(1); + + let mut tx_signer = StacksTransactionSigner::new(&tx); + tx_signer.sign_origin(&pk).unwrap(); + + let tx = tx_signer.get_tx().unwrap(); + + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let origin_addr = tx.origin_address(); + let origin_nonce = tx.get_origin_nonce(); + let sponsor_addr = tx.sponsor_address().unwrap_or(origin_addr.clone()); + let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); + let tx_fee = tx.get_tx_fee(); + + txs.insert(tx.txid(), tx.clone()); + + // should succeed + MemPoolDB::try_add_tx( + &mut mempool_tx, + peer_1.chainstate(), + &stacks_tip_ch, + &stacks_tip_bhh, + true, + txid.clone(), + tx_bytes, + tx_fee, + num_blocks as u64, + &origin_addr, + origin_nonce, + &sponsor_addr, + sponsor_nonce, + None, + ) + .unwrap(); + + eprintln!("Added {} {}", i, &txid); + } + mempool_tx.commit().unwrap(); + peer_1.mempool = Some(peer_1_mempool); + + let mut round = 0; + let mut peer_1_mempool_txs = 0; + let mut peer_2_mempool_txs = 0; + + while peer_1_mempool_txs < num_txs || peer_2_mempool_txs < num_txs { + if let Ok(mut result) = peer_1.step_with_ibd(false) { + let lp = peer_1.network.local_peer.clone(); + let burnchain = peer_1.network.burnchain.clone(); + peer_1 + .with_db_state(|sortdb, chainstate, relayer, mempool| { + relayer.process_network_result( + &lp, + &mut result, + &burnchain, + sortdb, + chainstate, + mempool, + false, + None, + None, + ) + }) + .unwrap(); + } + + if let Ok(mut result) = peer_2.step_with_ibd(false) { + let lp = peer_2.network.local_peer.clone(); + let burnchain = peer_2.network.burnchain.clone(); + peer_2 + .with_db_state(|sortdb, chainstate, relayer, mempool| { + relayer.process_network_result( + &lp, + &mut result, + &burnchain, + sortdb, + chainstate, + mempool, + false, + None, + None, + ) + }) + .unwrap(); + } + + round += 1; + + let mp = peer_1.mempool.take().unwrap(); + peer_1_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); + peer_1.mempool.replace(mp); + + let mp = peer_2.mempool.take().unwrap(); + peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); + peer_2.mempool.replace(mp); + + info!( + "Peer 1: {}, Peer 2: {}", + peer_1_mempool_txs, peer_2_mempool_txs + ); + } + + info!("Completed mempool sync in {} step(s)", round); + + let mp = peer_2.mempool.take().unwrap(); + let peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap(); + peer_2.mempool.replace(mp); + + // peer 2 has all the recent txs + // peer 2 has none of the old ones + for tx in peer_2_mempool_txs { + assert_eq!(&tx.tx, txs.get(&tx.tx.txid()).unwrap()); + assert!(old_txs.get(&tx.tx.txid()).is_none()); + } +} + +#[test] +fn test_mempool_sync_2_peers_paginated() { + // peer 1 gets some transactions; verify peer 2 gets them all + let mut peer_1_config = TestPeerConfig::new(function_name!(), 0, 0); + let mut peer_2_config = TestPeerConfig::new(function_name!(), 0, 0); + + peer_1_config.connection_opts.mempool_sync_interval = 1; + peer_2_config.connection_opts.mempool_sync_interval = 1; + + let num_txs = 1024; + let pks: Vec<_> = (0..num_txs).map(|_| StacksPrivateKey::new()).collect(); + let addrs: Vec<_> = pks.iter().map(|pk| to_addr(pk)).collect(); + let initial_balances: Vec<_> = addrs + .iter() + .map(|a| (a.to_account_principal(), 1000000000)) + .collect(); + + peer_1_config.initial_balances = initial_balances.clone(); + peer_2_config.initial_balances = initial_balances.clone(); + + let mut peer_1 = TestPeer::new(peer_1_config); + let mut peer_2 = TestPeer::new(peer_2_config); + + peer_1.add_neighbor(&mut peer_2.to_neighbor(), None, true); + peer_2.add_neighbor(&mut peer_1.to_neighbor(), None, true); + + let num_blocks = 10; + let first_stacks_block_height = { + let sn = SortitionDB::get_canonical_burn_chain_tip(&peer_1.sortdb.as_ref().unwrap().conn()) + .unwrap(); + sn.block_height + 1 + }; + + for i in 0..num_blocks { + let (burn_ops, stacks_block, microblocks) = peer_2.make_default_tenure(); + + peer_1.next_burnchain_block(burn_ops.clone()); + peer_2.next_burnchain_block(burn_ops.clone()); + + peer_1.process_stacks_epoch_at_tip(&stacks_block, µblocks); + peer_2.process_stacks_epoch_at_tip(&stacks_block, µblocks); + } + + let addr = StacksAddress { + version: C32_ADDRESS_VERSION_TESTNET_SINGLESIG, + bytes: Hash160([0xff; 20]), + }; + + let stacks_tip_ch = peer_1.network.stacks_tip.consensus_hash.clone(); + let stacks_tip_bhh = peer_1.network.stacks_tip.block_hash.clone(); + + // fill peer 1 with lots of transactions + let mut txs = HashMap::new(); + let mut peer_1_mempool = peer_1.mempool.take().unwrap(); + let mut mempool_tx = peer_1_mempool.tx_begin().unwrap(); + for i in 0..num_txs { + let pk = &pks[i]; + let mut tx = StacksTransaction { + version: TransactionVersion::Testnet, + chain_id: 0x80000000, + auth: TransactionAuth::from_p2pkh(&pk).unwrap(), + anchor_mode: TransactionAnchorMode::Any, + post_condition_mode: TransactionPostConditionMode::Allow, + post_conditions: vec![], + payload: TransactionPayload::TokenTransfer( + addr.to_account_principal(), + 123, + TokenTransferMemo([0u8; 34]), + ), + }; + tx.set_tx_fee(1000); + tx.set_origin_nonce(0); + + let mut tx_signer = StacksTransactionSigner::new(&tx); + tx_signer.sign_origin(&pk).unwrap(); + + let tx = tx_signer.get_tx().unwrap(); + + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let origin_addr = tx.origin_address(); + let origin_nonce = tx.get_origin_nonce(); + let sponsor_addr = tx.sponsor_address().unwrap_or(origin_addr.clone()); + let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); + let tx_fee = tx.get_tx_fee(); + + txs.insert(tx.txid(), tx.clone()); + + // should succeed + MemPoolDB::try_add_tx( + &mut mempool_tx, + peer_1.chainstate(), + &stacks_tip_ch, + &stacks_tip_bhh, + true, + txid.clone(), + tx_bytes, + tx_fee, + num_blocks, + &origin_addr, + origin_nonce, + &sponsor_addr, + sponsor_nonce, + None, + ) + .unwrap(); + + eprintln!("Added {} {}", i, &txid); + } + mempool_tx.commit().unwrap(); + peer_1.mempool = Some(peer_1_mempool); + + let num_burn_blocks = { + let sn = SortitionDB::get_canonical_burn_chain_tip(peer_1.sortdb.as_ref().unwrap().conn()) + .unwrap(); + sn.block_height + 1 + }; + + let mut round = 0; + let mut peer_1_mempool_txs = 0; + let mut peer_2_mempool_txs = 0; + + while peer_1_mempool_txs < num_txs || peer_2_mempool_txs < num_txs { + if let Ok(mut result) = peer_1.step_with_ibd(false) { + let lp = peer_1.network.local_peer.clone(); + let burnchain = peer_1.network.burnchain.clone(); + peer_1 + .with_db_state(|sortdb, chainstate, relayer, mempool| { + relayer.process_network_result( + &lp, + &mut result, + &burnchain, + sortdb, + chainstate, + mempool, + false, + None, + None, + ) + }) + .unwrap(); + } + + if let Ok(mut result) = peer_2.step_with_ibd(false) { + let lp = peer_2.network.local_peer.clone(); + let burnchain = peer_2.network.burnchain.clone(); + peer_2 + .with_db_state(|sortdb, chainstate, relayer, mempool| { + relayer.process_network_result( + &lp, + &mut result, + &burnchain, + sortdb, + chainstate, + mempool, + false, + None, + None, + ) + }) + .unwrap(); + } + + round += 1; + + let mp = peer_1.mempool.take().unwrap(); + peer_1_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); + peer_1.mempool.replace(mp); + + let mp = peer_2.mempool.take().unwrap(); + peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); + peer_2.mempool.replace(mp); + + info!( + "Peer 1: {}, Peer 2: {}", + peer_1_mempool_txs, peer_2_mempool_txs + ); + } + + info!("Completed mempool sync in {} step(s)", round); + + let mp = peer_2.mempool.take().unwrap(); + let peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap(); + peer_2.mempool.replace(mp); + + for tx in peer_2_mempool_txs { + assert_eq!(&tx.tx, txs.get(&tx.tx.txid()).unwrap()); + } +} + +#[test] +fn test_mempool_sync_2_peers_blacklisted() { + // peer 1 gets some transactions; peer 2 blacklists some of them; + // verify peer 2 gets only the non-blacklisted ones. + let mut peer_1_config = TestPeerConfig::new(function_name!(), 0, 0); + let mut peer_2_config = TestPeerConfig::new(function_name!(), 0, 0); + + peer_1_config.connection_opts.mempool_sync_interval = 1; + peer_2_config.connection_opts.mempool_sync_interval = 1; + + let num_txs = 1024; + let pks: Vec<_> = (0..num_txs).map(|_| StacksPrivateKey::new()).collect(); + let addrs: Vec<_> = pks.iter().map(|pk| to_addr(pk)).collect(); + let initial_balances: Vec<_> = addrs + .iter() + .map(|a| (a.to_account_principal(), 1000000000)) + .collect(); + + peer_1_config.initial_balances = initial_balances.clone(); + peer_2_config.initial_balances = initial_balances.clone(); + + let mut peer_1 = TestPeer::new(peer_1_config); + let mut peer_2 = TestPeer::new(peer_2_config); + + peer_1.add_neighbor(&mut peer_2.to_neighbor(), None, true); + peer_2.add_neighbor(&mut peer_1.to_neighbor(), None, true); + + let num_blocks = 10; + let first_stacks_block_height = { + let sn = SortitionDB::get_canonical_burn_chain_tip(&peer_1.sortdb.as_ref().unwrap().conn()) + .unwrap(); + sn.block_height + 1 + }; + + for i in 0..num_blocks { + let (burn_ops, stacks_block, microblocks) = peer_2.make_default_tenure(); + + peer_1.next_burnchain_block(burn_ops.clone()); + peer_2.next_burnchain_block(burn_ops.clone()); + + peer_1.process_stacks_epoch_at_tip(&stacks_block, µblocks); + peer_2.process_stacks_epoch_at_tip(&stacks_block, µblocks); + } + + let addr = StacksAddress { + version: C32_ADDRESS_VERSION_TESTNET_SINGLESIG, + bytes: Hash160([0xff; 20]), + }; + + let stacks_tip_ch = peer_1.network.stacks_tip.consensus_hash.clone(); + let stacks_tip_bhh = peer_1.network.stacks_tip.block_hash.clone(); + + // fill peer 1 with lots of transactions + let mut txs = HashMap::new(); + let mut peer_1_mempool = peer_1.mempool.take().unwrap(); + let mut mempool_tx = peer_1_mempool.tx_begin().unwrap(); + let mut peer_2_blacklist = vec![]; + for i in 0..num_txs { + let pk = &pks[i]; + let mut tx = StacksTransaction { + version: TransactionVersion::Testnet, + chain_id: 0x80000000, + auth: TransactionAuth::from_p2pkh(&pk).unwrap(), + anchor_mode: TransactionAnchorMode::Any, + post_condition_mode: TransactionPostConditionMode::Allow, + post_conditions: vec![], + payload: TransactionPayload::TokenTransfer( + addr.to_account_principal(), + 123, + TokenTransferMemo([0u8; 34]), + ), + }; + tx.set_tx_fee(1000); + tx.set_origin_nonce(0); + + let mut tx_signer = StacksTransactionSigner::new(&tx); + tx_signer.sign_origin(&pk).unwrap(); + + let tx = tx_signer.get_tx().unwrap(); + + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let origin_addr = tx.origin_address(); + let origin_nonce = tx.get_origin_nonce(); + let sponsor_addr = tx.sponsor_address().unwrap_or(origin_addr.clone()); + let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); + let tx_fee = tx.get_tx_fee(); + + txs.insert(tx.txid(), tx.clone()); + + // should succeed + MemPoolDB::try_add_tx( + &mut mempool_tx, + peer_1.chainstate(), + &stacks_tip_ch, + &stacks_tip_bhh, + true, + txid.clone(), + tx_bytes, + tx_fee, + num_blocks, + &origin_addr, + origin_nonce, + &sponsor_addr, + sponsor_nonce, + None, + ) + .unwrap(); + + eprintln!("Added {} {}", i, &txid); + + if i % 2 == 0 { + // peer 2 blacklists even-numbered txs + peer_2_blacklist.push(txid); + } + } + mempool_tx.commit().unwrap(); + peer_1.mempool = Some(peer_1_mempool); + + // peer 2 blacklists them all + let mut peer_2_mempool = peer_2.mempool.take().unwrap(); + + // blacklisted txs never time out + peer_2_mempool.blacklist_timeout = u64::MAX / 2; + + let mempool_tx = peer_2_mempool.tx_begin().unwrap(); + MemPoolDB::inner_blacklist_txs(&mempool_tx, &peer_2_blacklist, get_epoch_time_secs()).unwrap(); + mempool_tx.commit().unwrap(); + + peer_2.mempool = Some(peer_2_mempool); + + let num_burn_blocks = { + let sn = SortitionDB::get_canonical_burn_chain_tip(peer_1.sortdb.as_ref().unwrap().conn()) + .unwrap(); + sn.block_height + 1 + }; + + let mut round = 0; + let mut peer_1_mempool_txs = 0; + let mut peer_2_mempool_txs = 0; + + while peer_1_mempool_txs < num_txs || peer_2_mempool_txs < num_txs / 2 { + if let Ok(mut result) = peer_1.step_with_ibd(false) { + let lp = peer_1.network.local_peer.clone(); + let burnchain = peer_1.network.burnchain.clone(); + peer_1 + .with_db_state(|sortdb, chainstate, relayer, mempool| { + relayer.process_network_result( + &lp, + &mut result, + &burnchain, + sortdb, + chainstate, + mempool, + false, + None, + None, + ) + }) + .unwrap(); + } + + if let Ok(mut result) = peer_2.step_with_ibd(false) { + let lp = peer_2.network.local_peer.clone(); + let burnchain = peer_2.network.burnchain.clone(); + peer_2 + .with_db_state(|sortdb, chainstate, relayer, mempool| { + relayer.process_network_result( + &lp, + &mut result, + &burnchain, + sortdb, + chainstate, + mempool, + false, + None, + None, + ) + }) + .unwrap(); + } + + round += 1; + + let mp = peer_1.mempool.take().unwrap(); + peer_1_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); + peer_1.mempool.replace(mp); + + let mp = peer_2.mempool.take().unwrap(); + peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); + peer_2.mempool.replace(mp); + + info!( + "Peer 1: {}, Peer 2: {}", + peer_1_mempool_txs, peer_2_mempool_txs + ); + } + + info!("Completed mempool sync in {} step(s)", round); + + let mp = peer_2.mempool.take().unwrap(); + let peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap(); + peer_2.mempool.replace(mp); + + for tx in peer_2_mempool_txs { + assert_eq!(&tx.tx, txs.get(&tx.tx.txid()).unwrap()); + assert!(!peer_2_blacklist.contains(&tx.tx.txid())); + } +} + +/// Make sure mempool sync never stores problematic transactions +#[test] +fn test_mempool_sync_2_peers_problematic() { + // peer 1 gets some transactions; peer 2 blacklists them all due to being invalid. + // verify peer 2 stores nothing. + let mut peer_1_config = TestPeerConfig::new(function_name!(), 0, 0); + let mut peer_2_config = TestPeerConfig::new(function_name!(), 0, 0); + + peer_1_config.connection_opts.mempool_sync_interval = 1; + peer_2_config.connection_opts.mempool_sync_interval = 1; + + let num_txs = 128; + let pks: Vec<_> = (0..num_txs).map(|_| StacksPrivateKey::new()).collect(); + let addrs: Vec<_> = pks.iter().map(|pk| to_addr(pk)).collect(); + let initial_balances: Vec<_> = addrs + .iter() + .map(|a| (a.to_account_principal(), 1000000000)) + .collect(); + + peer_1_config.initial_balances = initial_balances.clone(); + peer_2_config.initial_balances = initial_balances.clone(); + + let mut peer_1 = TestPeer::new(peer_1_config); + let mut peer_2 = TestPeer::new(peer_2_config); + + peer_1.add_neighbor(&mut peer_2.to_neighbor(), None, true); + peer_2.add_neighbor(&mut peer_1.to_neighbor(), None, true); + + let num_blocks = 10; + let first_stacks_block_height = { + let sn = SortitionDB::get_canonical_burn_chain_tip(&peer_1.sortdb.as_ref().unwrap().conn()) + .unwrap(); + sn.block_height + 1 + }; + + for i in 0..num_blocks { + let (burn_ops, stacks_block, microblocks) = peer_2.make_default_tenure(); + + peer_1.next_burnchain_block(burn_ops.clone()); + peer_2.next_burnchain_block(burn_ops.clone()); + + peer_1.process_stacks_epoch_at_tip(&stacks_block, µblocks); + peer_2.process_stacks_epoch_at_tip(&stacks_block, µblocks); + } + + let addr = StacksAddress { + version: C32_ADDRESS_VERSION_TESTNET_SINGLESIG, + bytes: Hash160([0xff; 20]), + }; + + let stacks_tip_ch = peer_1.network.stacks_tip.consensus_hash.clone(); + let stacks_tip_bhh = peer_1.network.stacks_tip.block_hash.clone(); + + // fill peer 1 with lots of transactions + let mut txs = HashMap::new(); + let mut peer_1_mempool = peer_1.mempool.take().unwrap(); + let mut mempool_tx = peer_1_mempool.tx_begin().unwrap(); + for i in 0..num_txs { + let pk = &pks[i]; + + let exceeds_repeat_factor = AST_CALL_STACK_DEPTH_BUFFER + (MAX_CALL_STACK_DEPTH as u64); + let tx_exceeds_body_start = "{ a : ".repeat(exceeds_repeat_factor as usize); + let tx_exceeds_body_end = "} ".repeat(exceeds_repeat_factor as usize); + let tx_exceeds_body = format!("{}u1 {}", tx_exceeds_body_start, tx_exceeds_body_end); + + let tx = make_contract_tx( + &pk, + 0, + (tx_exceeds_body.len() * 100) as u64, + "test-exceeds", + &tx_exceeds_body, + ); + + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let origin_addr = tx.origin_address(); + let origin_nonce = tx.get_origin_nonce(); + let sponsor_addr = tx.sponsor_address().unwrap_or(origin_addr.clone()); + let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); + let tx_fee = tx.get_tx_fee(); + + txs.insert(tx.txid(), tx.clone()); + + // should succeed + MemPoolDB::try_add_tx( + &mut mempool_tx, + peer_1.chainstate(), + &stacks_tip_ch, + &stacks_tip_bhh, + true, + txid.clone(), + tx_bytes, + tx_fee, + num_blocks, + &origin_addr, + origin_nonce, + &sponsor_addr, + sponsor_nonce, + None, + ) + .unwrap(); + + eprintln!("Added {} {}", i, &txid); + } + mempool_tx.commit().unwrap(); + peer_1.mempool = Some(peer_1_mempool); + + // blacklisted txs never time out + let mut peer_2_mempool = peer_2.mempool.take().unwrap(); + peer_2_mempool.blacklist_timeout = u64::MAX / 2; + peer_2.mempool = Some(peer_2_mempool); + + let num_burn_blocks = { + let sn = SortitionDB::get_canonical_burn_chain_tip(peer_1.sortdb.as_ref().unwrap().conn()) + .unwrap(); + sn.block_height + 1 + }; + + let mut round = 0; + let mut peer_1_mempool_txs = 0; + + while peer_1_mempool_txs < num_txs + || peer_2 + .network + .mempool_sync + .as_ref() + .unwrap() + .mempool_sync_txs + < (num_txs as u64) + { + if let Ok(mut result) = peer_1.step_with_ibd(false) { + let lp = peer_1.network.local_peer.clone(); + let burnchain = peer_1.network.burnchain.clone(); + peer_1 + .with_db_state(|sortdb, chainstate, relayer, mempool| { + relayer.process_network_result( + &lp, + &mut result, + &burnchain, + sortdb, + chainstate, + mempool, + false, + None, + None, + ) + }) + .unwrap(); + } + + if let Ok(mut result) = peer_2.step_with_ibd(false) { + let lp = peer_2.network.local_peer.clone(); + let burnchain = peer_2.network.burnchain.clone(); + peer_2 + .with_db_state(|sortdb, chainstate, relayer, mempool| { + relayer.process_network_result( + &lp, + &mut result, + &burnchain, + sortdb, + chainstate, + mempool, + false, + None, + None, + ) + }) + .unwrap(); + } + + round += 1; + + let mp = peer_1.mempool.take().unwrap(); + peer_1_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); + peer_1.mempool.replace(mp); + + info!( + "Peer 1: {}, Peer 2: {}", + peer_1_mempool_txs, + peer_2 + .network + .mempool_sync + .as_ref() + .unwrap() + .mempool_sync_txs + ); + } + + info!("Completed mempool sync in {} step(s)", round); + + let mp = peer_2.mempool.take().unwrap(); + let peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap(); + peer_2.mempool.replace(mp); + + assert_eq!(peer_2_mempool_txs.len(), 128); +} + +/// Verify that when transactions get stored into the mempool, they are always keyed to the +/// tenure-start block and its coinbase height +#[test] +pub fn test_mempool_storage_nakamoto() { + let private_key = StacksPrivateKey::from_seed(&[2]); + let addr = StacksAddress::from_public_keys( + C32_ADDRESS_VERSION_TESTNET_SINGLESIG, + &AddressHashMode::SerializeP2PKH, + 1, + &vec![StacksPublicKey::from_private(&private_key)], + ) + .unwrap(); + + let (mut test_signers, test_stackers) = TestStacker::common_signing_set(); + let mut peer = boot_nakamoto( + function_name!(), + vec![(addr.into(), 100_000_000)], + &mut test_signers, + &test_stackers, + None, + ); + + let mut total_blocks = 0; + let mut all_txs = vec![]; + let stx_miner_key = peer.miner.nakamoto_miner_key(); + let stx_miner_addr = StacksAddress::from_public_keys( + C32_ADDRESS_VERSION_TESTNET_SINGLESIG, + &AddressHashMode::SerializeP2PKH, + 1, + &vec![StacksPublicKey::from_private(&private_key)], + ) + .unwrap(); + + // duplicate handles to the chainstates so we can submit txs + let mut mempool = + MemPoolDB::open_test(false, peer.config.network_id, &peer.chainstate_path).unwrap(); + let (mut chainstate, _) = peer.chainstate().reopen().unwrap(); + let sortdb = peer.sortdb().reopen().unwrap(); + + for i in 0..10 { + debug!("Tenure {}", i); + let (burn_ops, mut tenure_change, miner_key) = + peer.begin_nakamoto_tenure(TenureChangeCause::BlockFound); + let (_, _, consensus_hash) = peer.next_burnchain_block(burn_ops.clone()); + let vrf_proof = peer.make_nakamoto_vrf_proof(miner_key); + + tenure_change.tenure_consensus_hash = consensus_hash.clone(); + tenure_change.burn_view_consensus_hash = consensus_hash.clone(); + + let tenure_change_tx = peer + .miner + .make_nakamoto_tenure_change(tenure_change.clone()); + let coinbase_tx = peer.miner.make_nakamoto_coinbase(None, vrf_proof); + + debug!("Next burnchain block: {}", &consensus_hash); + + let num_blocks: usize = (thread_rng().gen::() % 10) + 1; + + let block_height = peer.get_burn_block_height(); + + // do a stx transfer in each block to a given recipient + let recipient_addr = + StacksAddress::from_string("ST2YM3J4KQK09V670TD6ZZ1XYNYCNGCWCVTASN5VM").unwrap(); + + let mempool_txs = RefCell::new(vec![]); + let blocks_and_sizes = peer.make_nakamoto_tenure_and( + tenure_change_tx, + coinbase_tx, + &mut test_signers, + |_| {}, + |miner, chainstate, sortdb, blocks_so_far| { + let mut txs = vec![]; + if blocks_so_far.len() < num_blocks { + let account = get_account(chainstate, sortdb, &addr); + + let stx_transfer = make_token_transfer( + chainstate, + sortdb, + &private_key, + account.nonce, + 200, + 200, + &recipient_addr, + ); + txs.push(stx_transfer.clone()); + (*mempool_txs.borrow_mut()).push(stx_transfer.clone()); + all_txs.push(stx_transfer.clone()); + } + txs + }, + |_| { + let tip = NakamotoChainState::get_canonical_block_header(chainstate.db(), &sortdb) + .unwrap() + .unwrap(); + let sort_tip = + SortitionDB::get_block_snapshot_consensus(sortdb.conn(), &tip.consensus_hash) + .unwrap() + .unwrap(); + let epoch = SortitionDB::get_stacks_epoch(sortdb.conn(), sort_tip.block_height) + .unwrap() + .unwrap(); + + // submit each transaction to the mempool + for mempool_tx in (*mempool_txs.borrow()).as_slice() { + mempool + .submit( + &mut chainstate, + &sortdb, + &tip.consensus_hash, + &tip.anchored_header.block_hash(), + &mempool_tx, + None, + &epoch.block_limit, + &epoch.epoch_id, + ) + .unwrap(); + } + + (*mempool_txs.borrow_mut()).clear(); + true + }, + ); + + total_blocks += num_blocks; + } + + let tip = { + let chainstate = &mut peer.stacks_node.as_mut().unwrap().chainstate; + let sort_db = peer.sortdb.as_mut().unwrap(); + NakamotoChainState::get_canonical_block_header(chainstate.db(), sort_db) + .unwrap() + .unwrap() + }; + + // each transaction is present, and is paired with a tenure-start block + let mut recovered_txs = HashSet::new(); + let tip_block_id = tip.index_block_hash(); + let mut tenure_id = tip.consensus_hash; + loop { + let tenure_start = NakamotoChainState::get_tenure_start_block_header( + &mut chainstate.index_conn(), + &tip_block_id, + &tenure_id, + ) + .unwrap() + .unwrap(); + + let all_txdata = MemPoolDB::get_txs_after( + mempool.conn(), + &tenure_start.consensus_hash, + &tenure_start.anchored_header.block_hash(), + 0, + u64::try_from(i64::MAX - 1).unwrap(), + ) + .unwrap(); + for txdata in all_txdata { + recovered_txs.insert(txdata.tx.txid()); + } + + let Some(parent_tenure_id) = + NakamotoChainState::get_nakamoto_parent_tenure_id_consensus_hash( + &mut chainstate.index_conn(), + &tip_block_id, + &tenure_id, + ) + .unwrap() + else { + break; + }; + tenure_id = parent_tenure_id; + } + + let all_txs_set: HashSet<_> = all_txs.into_iter().map(|tx| tx.txid()).collect(); + assert_eq!(all_txs_set, recovered_txs); +} + +#[test] +fn test_mempool_sync_2_peers_nakamoto_paginated() { + let observer = TestEventObserver::new(); + let bitvecs = vec![ + // full rc + vec![true, true, true, true, true, true, true, true, true, true], + ]; + let num_txs = 1024; + let pks: Vec<_> = (0..num_txs).map(|_| StacksPrivateKey::new()).collect(); + let addrs: Vec<_> = pks.iter().map(|pk| to_addr(pk)).collect(); + let initial_balances: Vec<_> = addrs + .iter() + .map(|a| (a.to_account_principal(), 1000000000)) + .collect(); + + let (mut peer_1, mut other_peers) = make_nakamoto_peers_from_invs_and_balances( + function_name!(), + &observer, + 10, + 3, + bitvecs.clone(), + 1, + initial_balances, + ); + let mut peer_2 = other_peers.pop().unwrap(); + + let nakamoto_start = + NakamotoBootPlan::nakamoto_first_tenure_height(&peer_1.config.burnchain.pox_constants); + + let tip = { + let sort_db = peer_1.sortdb.as_mut().unwrap(); + SortitionDB::get_canonical_burn_chain_tip(sort_db.conn()).unwrap() + }; + let total_rcs = peer_1 + .config + .burnchain + .block_height_to_reward_cycle(tip.block_height) + .unwrap(); + + // run peer and other_peer until they connect + loop { + let _ = peer_1.step_with_ibd(false); + let _ = peer_2.step_with_ibd(false); + + let event_ids: Vec = peer_1 + .network + .iter_peer_event_ids() + .map(|e_id| *e_id) + .collect(); + let other_event_ids: Vec = peer_2 + .network + .iter_peer_event_ids() + .map(|e_id| *e_id) + .collect(); + + if event_ids.len() > 0 && other_event_ids.len() > 0 { + break; + } + } + + debug!("Peers are connected"); + + let addr = StacksAddress { + version: C32_ADDRESS_VERSION_TESTNET_SINGLESIG, + bytes: Hash160([0xff; 20]), + }; + + let stacks_tip_ch = peer_1.network.stacks_tip.consensus_hash.clone(); + let stacks_tip_bhh = peer_1.network.stacks_tip.block_hash.clone(); + + // find coinbase height + let coinbase_height = NakamotoChainState::get_coinbase_height( + &mut peer_1.chainstate().index_conn(), + &StacksBlockId::new(&stacks_tip_ch, &stacks_tip_bhh), + ) + .unwrap() + .unwrap(); + + // fill peer 1 with lots of transactions + let mut txs = HashMap::new(); + let mut peer_1_mempool = peer_1.mempool.take().unwrap(); + let mut mempool_tx = peer_1_mempool.tx_begin().unwrap(); + for i in 0..num_txs { + let pk = &pks[i]; + let mut tx = StacksTransaction { + version: TransactionVersion::Testnet, + chain_id: 0x80000000, + auth: TransactionAuth::from_p2pkh(&pk).unwrap(), + anchor_mode: TransactionAnchorMode::Any, + post_condition_mode: TransactionPostConditionMode::Allow, + post_conditions: vec![], + payload: TransactionPayload::TokenTransfer( + addr.to_account_principal(), + 123, + TokenTransferMemo([0u8; 34]), + ), + }; + tx.set_tx_fee(1000); + tx.set_origin_nonce(0); + + let mut tx_signer = StacksTransactionSigner::new(&tx); + tx_signer.sign_origin(&pk).unwrap(); + + let tx = tx_signer.get_tx().unwrap(); + + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let origin_addr = tx.origin_address(); + let origin_nonce = tx.get_origin_nonce(); + let sponsor_addr = tx.sponsor_address().unwrap_or(origin_addr.clone()); + let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); + let tx_fee = tx.get_tx_fee(); + + txs.insert(tx.txid(), tx.clone()); + + // should succeed + MemPoolDB::try_add_tx( + &mut mempool_tx, + peer_1.chainstate(), + &stacks_tip_ch, + &stacks_tip_bhh, + true, + txid.clone(), + tx_bytes, + tx_fee, + coinbase_height, + &origin_addr, + origin_nonce, + &sponsor_addr, + sponsor_nonce, + None, + ) + .unwrap(); + + eprintln!("Added {} {}", i, &txid); + } + mempool_tx.commit().unwrap(); + peer_1.mempool = Some(peer_1_mempool); + + let num_burn_blocks = { + let sn = SortitionDB::get_canonical_burn_chain_tip(peer_1.sortdb.as_ref().unwrap().conn()) + .unwrap(); + sn.block_height + 1 + }; + + let mut round = 0; + let mut peer_1_mempool_txs = 0; + let mut peer_2_mempool_txs = 0; + + while peer_1_mempool_txs < num_txs || peer_2_mempool_txs < num_txs { + if let Ok(mut result) = peer_1.step_with_ibd(false) { + let lp = peer_1.network.local_peer.clone(); + let burnchain = peer_1.network.burnchain.clone(); + peer_1 + .with_db_state(|sortdb, chainstate, relayer, mempool| { + relayer.process_network_result( + &lp, + &mut result, + &burnchain, + sortdb, + chainstate, + mempool, + false, + None, + None, + ) + }) + .unwrap(); + } + + if let Ok(mut result) = peer_2.step_with_ibd(false) { + let lp = peer_2.network.local_peer.clone(); + let burnchain = peer_2.network.burnchain.clone(); + peer_2 + .with_db_state(|sortdb, chainstate, relayer, mempool| { + relayer.process_network_result( + &lp, + &mut result, + &burnchain, + sortdb, + chainstate, + mempool, + false, + None, + None, + ) + }) + .unwrap(); + } + + round += 1; + + let mp = peer_1.mempool.take().unwrap(); + peer_1_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); + peer_1.mempool.replace(mp); + + let mp = peer_2.mempool.take().unwrap(); + peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap().len(); + peer_2.mempool.replace(mp); + + info!( + "Peer 1: {}, Peer 2: {}", + peer_1_mempool_txs, peer_2_mempool_txs + ); + } + + info!("Completed mempool sync in {} step(s)", round); + + let mp = peer_2.mempool.take().unwrap(); + let peer_2_mempool_txs = MemPoolDB::get_all_txs(mp.conn()).unwrap(); + peer_2.mempool.replace(mp); + + for tx in peer_2_mempool_txs { + assert_eq!(&tx.tx, txs.get(&tx.tx.txid()).unwrap()); + } +} diff --git a/stackslib/src/net/tests/mod.rs b/stackslib/src/net/tests/mod.rs index 63e19a50c7..05477bb08c 100644 --- a/stackslib/src/net/tests/mod.rs +++ b/stackslib/src/net/tests/mod.rs @@ -17,6 +17,7 @@ pub mod download; pub mod httpcore; pub mod inv; +pub mod mempool; pub mod neighbors; pub mod relay;