diff --git a/stackslib/src/burnchains/burnchain.rs b/stackslib/src/burnchains/burnchain.rs index a5ecaa0458..c1d07994d7 100644 --- a/stackslib/src/burnchains/burnchain.rs +++ b/stackslib/src/burnchains/burnchain.rs @@ -702,6 +702,10 @@ impl Burnchain { } pub fn get_burnchaindb_path(&self) -> String { + if self.working_dir.as_str() == ":memory:" { + return ":memory:".to_string(); + } + let chainstate_dir = Burnchain::get_chainstate_path_str(&self.working_dir); let mut db_pathbuf = PathBuf::from(&chainstate_dir); db_pathbuf.push("burnchain.sqlite"); @@ -743,12 +747,14 @@ impl Burnchain { /// Open just the burnchain database pub fn open_burnchain_db(&self, readwrite: bool) -> Result { let burnchain_db_path = self.get_burnchaindb_path(); - if let Err(e) = fs::metadata(&burnchain_db_path) { - warn!( - "Failed to stat burnchain DB path '{}': {:?}", - &burnchain_db_path, &e - ); - return Err(burnchain_error::DBError(db_error::NoDBError)); + if burnchain_db_path != ":memory:" { + if let Err(e) = fs::metadata(&burnchain_db_path) { + warn!( + "Failed to stat burnchain DB path '{}': {:?}", + &burnchain_db_path, &e + ); + return Err(burnchain_error::DBError(db_error::NoDBError)); + } } test_debug!( "Open burnchain DB at {} (rw? {})", diff --git a/stackslib/src/burnchains/db.rs b/stackslib/src/burnchains/db.rs index 72ca2e8bf1..d5f1e18804 100644 --- a/stackslib/src/burnchains/db.rs +++ b/stackslib/src/burnchains/db.rs @@ -1000,33 +1000,38 @@ impl BurnchainDB { readwrite: bool, ) -> Result { let mut create_flag = false; - let open_flags = match fs::metadata(path) { - Err(e) => { - if e.kind() == io::ErrorKind::NotFound { - // need to create - if readwrite { - create_flag = true; - let ppath = Path::new(path); - let pparent_path = ppath - .parent() - .unwrap_or_else(|| panic!("BUG: no parent of '{}'", path)); - fs::create_dir_all(&pparent_path) - .map_err(|e| BurnchainError::from(DBError::IOError(e)))?; - - OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE + let open_flags = if path == ":memory:" { + create_flag = true; + OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE + } else { + match fs::metadata(path) { + Err(e) => { + if e.kind() == io::ErrorKind::NotFound { + // need to create + if readwrite { + create_flag = true; + let ppath = Path::new(path); + let pparent_path = ppath + .parent() + .unwrap_or_else(|| panic!("BUG: no parent of '{}'", path)); + fs::create_dir_all(&pparent_path) + .map_err(|e| BurnchainError::from(DBError::IOError(e)))?; + + OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE + } else { + return Err(BurnchainError::from(DBError::NoDBError)); + } } else { - return Err(BurnchainError::from(DBError::NoDBError)); + return Err(BurnchainError::from(DBError::IOError(e))); } - } else { - return Err(BurnchainError::from(DBError::IOError(e))); } - } - Ok(_md) => { - // can just open - if readwrite { - OpenFlags::SQLITE_OPEN_READ_WRITE - } else { - OpenFlags::SQLITE_OPEN_READ_ONLY + Ok(_md) => { + // can just open + if readwrite { + OpenFlags::SQLITE_OPEN_READ_WRITE + } else { + OpenFlags::SQLITE_OPEN_READ_ONLY + } } } }; @@ -1089,7 +1094,7 @@ impl BurnchainDB { let conn = sqlite_open(path, open_flags, true)?; let mut db = BurnchainDB { conn }; - if readwrite { + if readwrite || path == ":memory:" { db.add_indexes()?; } Ok(db) diff --git a/stackslib/src/net/chat.rs b/stackslib/src/net/chat.rs index 5cf32a8a56..273c1c7335 100644 --- a/stackslib/src/net/chat.rs +++ b/stackslib/src/net/chat.rs @@ -3069,6 +3069,7 @@ mod test { use std::io::prelude::*; use std::io::{Read, Write}; use std::net::{SocketAddr, SocketAddrV4}; + use std::path::PathBuf; use clarity::vm::costs::ExecutionCost; use stacks_common::types::chainstate::{BlockHeaderHash, BurnchainHeaderHash, SortitionId}; @@ -3080,6 +3081,7 @@ mod test { use super::*; use crate::burnchains::bitcoin::keys::BitcoinPublicKey; use crate::burnchains::burnchain::*; + use crate::burnchains::db::BurnchainDB; use crate::burnchains::*; use crate::chainstate::burn::db::sortdb::*; use crate::chainstate::burn::*; @@ -3123,6 +3125,8 @@ mod test { let peerdb_path = format!("{}/peers.sqlite", &test_path); let stackerdb_path = format!("{}/stackerdb.sqlite", &test_path); let chainstate_path = format!("{}/chainstate", &test_path); + let burnchain_db = + BurnchainDB::connect(&burnchain.get_burnchaindb_path(), burnchain, true).unwrap(); let mut peerdb = PeerDB::connect( &peerdb_path, @@ -3314,12 +3318,14 @@ mod test { let atlasdb = AtlasDB::connect(atlas_config, &atlasdb_path, true).unwrap(); let stackerdbs = StackerDBs::connect(&stackerdb_path, true).unwrap(); let peerdb = PeerDB::open(&peerdb_path, true).unwrap(); + let burnchain_db = burnchain.open_burnchain_db(false).unwrap(); let local_peer = PeerDB::get_local_peer(peerdb.conn()).unwrap(); let network = PeerNetwork::new( peerdb, atlasdb, stackerdbs, + burnchain_db, local_peer, peer_version, burnchain.clone(), @@ -3331,7 +3337,7 @@ mod test { network } - fn testing_burnchain_config() -> Burnchain { + fn testing_burnchain_config(test_name: &str) -> Burnchain { let first_burn_hash = BurnchainHeaderHash::from_hex( "0000000000000000000000000000000000000000000000000000000000000000", ) @@ -3342,7 +3348,7 @@ mod test { network_id: 0, chain_name: "bitcoin".to_string(), network_name: "testnet".to_string(), - working_dir: "/nope".to_string(), + working_dir: format!("/tmp/stacks-test-databases-{}", test_name), consensus_hash_lifetime: 24, stable_confirmations: 7, first_block_height: 12300, @@ -3366,8 +3372,6 @@ mod test { let socketaddr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); let socketaddr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 8081); - let burnchain = testing_burnchain_config(); - let mut chain_view_1 = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -3397,10 +3401,13 @@ mod test { &peer_2_rc_consensus_hash ); + let burnchain_1 = testing_burnchain_config(&test_name_1); + let burnchain_2 = testing_burnchain_config(&test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -3411,7 +3418,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -3422,7 +3429,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -3431,7 +3438,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -3445,7 +3452,7 @@ mod test { peerdb_1 .update_local_peer( 0x9abcdef0, - burnchain.network_id, + burnchain_1.network_id, local_peer_1.data_url, local_peer_1.port, &[ @@ -3458,7 +3465,7 @@ mod test { peerdb_2 .update_local_peer( 0x9abcdef0, - burnchain.network_id, + burnchain_2.network_id, local_peer_2.data_url, local_peer_2.port, &[ @@ -3490,7 +3497,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -3500,7 +3507,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -3708,8 +3715,6 @@ mod test { let socketaddr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); let socketaddr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 8081); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -3723,10 +3728,13 @@ mod test { let test_name_1 = "convo_handshake_accept_1"; let test_name_2 = "convo_handshake_accept_2"; + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -3737,7 +3745,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -3748,7 +3756,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -3757,7 +3765,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -3771,7 +3779,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -3781,7 +3789,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -3887,8 +3895,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -3902,10 +3908,13 @@ mod test { let test_name_1 = "convo_handshake_reject_1"; let test_name_2 = "convo_handshake_reject_2"; + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -3916,7 +3925,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -3927,7 +3936,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -3936,7 +3945,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -3950,7 +3959,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -3960,7 +3969,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -4026,8 +4035,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -4046,10 +4053,13 @@ mod test { let test_name_1 = "convo_handshake_badsignature_1"; let test_name_2 = "convo_handshake_badsignature_2"; + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -4060,7 +4070,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -4071,7 +4081,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -4080,7 +4090,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -4094,7 +4104,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -4104,7 +4114,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -4169,8 +4179,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -4189,10 +4197,13 @@ mod test { let test_name_1 = "convo_handshake_badpeeraddress_1"; let test_name_2 = "convo_handshake_badpeeraddress_2"; + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -4203,7 +4214,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -4214,7 +4225,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -4223,7 +4234,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -4237,7 +4248,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -4247,7 +4258,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -4330,8 +4341,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -4345,10 +4354,13 @@ mod test { let test_name_1 = "convo_handshake_update_key_1"; let test_name_2 = "convo_handshake_update_key_2"; + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -4359,7 +4371,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -4370,7 +4382,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -4379,7 +4391,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -4393,7 +4405,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -4403,7 +4415,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -4523,8 +4535,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -4543,10 +4553,13 @@ mod test { let test_name_1 = "convo_handshake_self_1"; let test_name_2 = "convo_handshake_self_2"; + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -4557,7 +4570,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -4568,7 +4581,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -4577,7 +4590,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -4591,7 +4604,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -4601,7 +4614,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -4666,8 +4679,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -4686,10 +4697,13 @@ mod test { let test_name_1 = "convo_ping_1"; let test_name_2 = "convo_ping_2"; + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -4700,7 +4714,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -4711,7 +4725,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -4720,7 +4734,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -4734,7 +4748,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -4744,7 +4758,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -4841,8 +4855,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -4861,10 +4873,13 @@ mod test { let test_name_1 = "convo_handshake_ping_loop_1"; let test_name_2 = "convo_handshake_ping_loop_2"; + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -4875,7 +4890,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -4886,7 +4901,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -4895,7 +4910,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -4909,7 +4924,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -4919,7 +4934,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -5067,8 +5082,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -5087,10 +5100,13 @@ mod test { let test_name_1 = "convo_nack_unsolicited_1"; let test_name_2 = "convo_nack_unsolicited_2"; + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -5101,7 +5117,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -5112,7 +5128,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -5121,7 +5137,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -5135,7 +5151,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -5145,7 +5161,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -5216,8 +5232,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -5235,10 +5249,14 @@ mod test { let test_name_1 = "convo_ignore_unsolicited_handshake_1"; let test_name_2 = "convo_ignore_unsolicited_handshake_2"; + + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -5249,7 +5267,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -5260,7 +5278,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -5269,7 +5287,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -5283,7 +5301,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -5293,7 +5311,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -5390,8 +5408,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12331, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -5404,10 +5420,14 @@ mod test { let test_name_1 = "convo_handshake_getblocksinv_1"; let test_name_2 = "convo_handshake_getblocksinv_2"; + + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -5418,7 +5438,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -5429,7 +5449,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -5438,7 +5458,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -5452,7 +5472,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -5462,7 +5482,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -5667,8 +5687,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12331, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -5681,10 +5699,14 @@ mod test { let test_name_1 = "convo_handshake_getnakamotoinv_1"; let test_name_2 = "convo_handshake_getnakamotoinv_2"; + + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12350, "http://peer1.com".into(), @@ -5695,7 +5717,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12351, "http://peer2.com".into(), @@ -5706,7 +5728,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -5715,7 +5737,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -5729,7 +5751,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -5739,7 +5761,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -5940,8 +5962,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -5959,10 +5979,14 @@ mod test { let test_name_1 = "convo_natpunch_1"; let test_name_2 = "convo_natpunch_2"; + + let burnchain_1 = testing_burnchain_config(test_name_1); + let burnchain_2 = testing_burnchain_config(test_name_2); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, mut chainstate_1) = make_test_chain_dbs( test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, 12352, "http://peer1.com".into(), @@ -5973,7 +5997,7 @@ mod test { let (mut peerdb_2, mut sortdb_2, stackerdbs_2, pox_id_2, mut chainstate_2) = make_test_chain_dbs( test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, 12353, "http://peer2.com".into(), @@ -5984,7 +6008,7 @@ mod test { let mut net_1 = db_setup( &test_name_1, - &burnchain, + &burnchain_1, 0x9abcdef0, &mut peerdb_1, &mut sortdb_1, @@ -5993,7 +6017,7 @@ mod test { ); let mut net_2 = db_setup( &test_name_2, - &burnchain, + &burnchain_2, 0x9abcdef0, &mut peerdb_2, &mut sortdb_2, @@ -6007,7 +6031,7 @@ mod test { let mut convo_1 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_1, &socketaddr_2, &conn_opts, true, @@ -6017,7 +6041,7 @@ mod test { let mut convo_2 = ConversationP2P::new( 123, 456, - &burnchain, + &burnchain_2, &socketaddr_1, &conn_opts, true, @@ -6081,8 +6105,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -6094,6 +6116,8 @@ mod test { chain_view.make_test_data(); let test_name_1 = "convo_is_preamble_valid"; + let burnchain = testing_burnchain_config(test_name_1); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, chainstate_1) = make_test_chain_dbs( test_name_1, @@ -6362,7 +6386,7 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); + let burnchain = testing_burnchain_config("unused"); let mut chain_view = BurnchainView { burn_block_height: 12348, @@ -6748,8 +6772,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -6761,6 +6783,8 @@ mod test { chain_view.make_test_data(); let test_name_1 = "sign_relay_forward_message_1"; + let burnchain = testing_burnchain_config(test_name_1); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, _) = make_test_chain_dbs( test_name_1, &burnchain, @@ -6866,8 +6890,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -6879,6 +6901,8 @@ mod test { chain_view.make_test_data(); let test_name_1 = "sign_and_forward_1"; + let burnchain = testing_burnchain_config(test_name_1); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, _) = make_test_chain_dbs( test_name_1, &burnchain, @@ -6933,8 +6957,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -6946,6 +6968,8 @@ mod test { chain_view.make_test_data(); let test_name_1 = "validate_block_push_1"; + let burnchain = testing_burnchain_config(test_name_1); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, _) = make_test_chain_dbs( test_name_1, &burnchain, @@ -7067,8 +7091,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -7080,6 +7102,8 @@ mod test { chain_view.make_test_data(); let test_name_1 = "validate_transaction_push_1"; + let burnchain = testing_burnchain_config(test_name_1); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, _) = make_test_chain_dbs( test_name_1, &burnchain, @@ -7201,8 +7225,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -7214,6 +7236,8 @@ mod test { chain_view.make_test_data(); let test_name_1 = "validate_microblocks_push_1"; + let burnchain = testing_burnchain_config(test_name_1); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, _) = make_test_chain_dbs( test_name_1, &burnchain, @@ -7335,8 +7359,6 @@ mod test { ) .unwrap(); - let burnchain = testing_burnchain_config(); - let mut chain_view = BurnchainView { burn_block_height: 12348, burn_block_hash: BurnchainHeaderHash([0x11; 32]), @@ -7348,6 +7370,8 @@ mod test { chain_view.make_test_data(); let test_name_1 = "validate_stackerdb_push_1"; + let burnchain = testing_burnchain_config(test_name_1); + let (mut peerdb_1, mut sortdb_1, stackerdbs_1, pox_id_1, _) = make_test_chain_dbs( test_name_1, &burnchain, diff --git a/stackslib/src/net/connection.rs b/stackslib/src/net/connection.rs index 85fe9d7494..4eeec0daaf 100644 --- a/stackslib/src/net/connection.rs +++ b/stackslib/src/net/connection.rs @@ -474,6 +474,8 @@ pub struct ConnectionOptions { /// the reward cycle in which Nakamoto activates, and thus needs to run both the epoch /// 2.x and Nakamoto state machines. pub force_nakamoto_epoch_transition: bool, + /// Reject blocks that were pushed + pub reject_blocks_pushed: bool, // test facilitation /// Do not require that an unsolicited message originate from an authenticated, connected @@ -583,6 +585,7 @@ impl std::default::Default for ConnectionOptions { disable_stackerdb_sync: false, force_disconnect_interval: None, force_nakamoto_epoch_transition: false, + reject_blocks_pushed: false, // no test facilitations on by default test_disable_unsolicited_message_authentication: false, diff --git a/stackslib/src/net/download/nakamoto/download_state_machine.rs b/stackslib/src/net/download/nakamoto/download_state_machine.rs index 02ed8b9419..42d228aca1 100644 --- a/stackslib/src/net/download/nakamoto/download_state_machine.rs +++ b/stackslib/src/net/download/nakamoto/download_state_machine.rs @@ -68,6 +68,9 @@ use crate::net::server::HttpPeer; use crate::net::{Error as NetError, Neighbor, NeighborAddress, NeighborKey}; use crate::util_lib::db::{DBConn, Error as DBError}; +/// How often to check for unconfirmed tenures +const CHECK_UNCONFIRMED_TENURES_MS: u128 = 1_000; + /// The overall downloader can operate in one of two states: /// * it's doing IBD, in which case it's downloading tenures using neighbor inventories and /// the start/end block ID hashes obtained from block-commits. This works up until the last two @@ -118,6 +121,10 @@ pub struct NakamotoDownloadStateMachine { pub(super) neighbor_rpc: NeighborRPC, /// Nakamoto chain tip nakamoto_tip: StacksBlockId, + /// do we need to fetch unconfirmed tenures? + fetch_unconfirmed_tenures: bool, + /// last time an unconfirmed tenures was checked + last_unconfirmed_download_check_ms: u128, /// last time an unconfirmed downloader was run last_unconfirmed_download_run_ms: u128, } @@ -139,6 +146,8 @@ impl NakamotoDownloadStateMachine { unconfirmed_tenure_downloads: HashMap::new(), neighbor_rpc: NeighborRPC::new(), nakamoto_tip, + fetch_unconfirmed_tenures: false, + last_unconfirmed_download_check_ms: 0, last_unconfirmed_download_run_ms: 0, } } @@ -465,142 +474,6 @@ impl NakamotoDownloadStateMachine { Ok(()) } - /// Determine if the set of `TenureStartEnd`s represents available but unfetched data. Used to - /// determine whether or not to update the set of wanted tenures -- we don't want to skip - /// fetching wanted tenures if they're still available! - pub(crate) fn have_unprocessed_tenures<'a>( - first_nakamoto_rc: u64, - completed_tenures: &HashSet, - prev_wanted_tenures: &[WantedTenure], - tenure_block_ids: &HashMap, - pox_constants: &PoxConstants, - first_burn_height: u64, - inventory_iter: impl Iterator, - ) -> bool { - if prev_wanted_tenures.is_empty() { - debug!("prev_wanted_tenures is empty, so we have unprocessed tenures"); - return true; - } - - // the anchor block for prev_wanted_tenures must not only be processed, but also we have to - // have seen an inventory message from the subsequent reward cycle. If we can see - // inventory messages for the reward cycle after `prev_wanted_rc`, then the former will be - // true - let prev_wanted_rc = prev_wanted_tenures - .last() - .map(|wt| { - downloader_block_height_to_reward_cycle( - pox_constants, - first_burn_height, - wt.burn_height, - ) - .expect("FATAL: wanted tenure before system start") - }) - .unwrap_or(u64::MAX); - - let cur_wanted_rc = prev_wanted_rc.saturating_add(1); - - debug!( - "have_unprocessed_tenures: prev_wanted_rc = {}, cur_wanted_rc = {}", - prev_wanted_rc, cur_wanted_rc - ); - - let mut has_prev_inv = false; - let mut has_cur_inv = false; - let mut num_invs = 0; - for inv in inventory_iter { - num_invs += 1; - if prev_wanted_rc < first_nakamoto_rc { - // assume the epoch 2.x inventory has this - has_prev_inv = true; - } else if inv.tenures_inv.get(&prev_wanted_rc).is_some() { - has_prev_inv = true; - } - - if cur_wanted_rc < first_nakamoto_rc { - // assume the epoch 2.x inventory has this - has_cur_inv = true; - } else if inv.tenures_inv.get(&cur_wanted_rc).is_some() { - has_cur_inv = true; - } - } - - if !has_prev_inv || !has_cur_inv { - debug!("No peer has an inventory for either the previous ({}: available = {}) or current ({}: available = {}) wanted tenures. Total inventories: {}", prev_wanted_rc, has_prev_inv, cur_wanted_rc, has_cur_inv, num_invs); - return true; - } - - // the state machine updates `tenure_block_ids` _after_ `wanted_tenures`, so verify that - // this isn't a stale `tenure_block_ids` by checking that it contains at least one block in - // the prev_wanted_rc and at least one in the cur_wanted_rc - let mut has_prev_rc_block = false; - let mut has_cur_rc_block = false; - let mut available_considered = 0; - for (_naddr, available) in tenure_block_ids.iter() { - available_considered += available.len(); - debug!("Consider available tenures from {}", _naddr); - for (_ch, tenure_info) in available.iter() { - debug!("Consider tenure info for {}: {:?}", _ch, tenure_info); - if tenure_info.start_reward_cycle == prev_wanted_rc - || tenure_info.end_reward_cycle == prev_wanted_rc - { - has_prev_rc_block = true; - debug!( - "Consider tenure info for {}: have a tenure in prev reward cycle {}", - _ch, prev_wanted_rc - ); - } - if tenure_info.start_reward_cycle == cur_wanted_rc - || tenure_info.end_reward_cycle == cur_wanted_rc - { - has_cur_rc_block = true; - debug!( - "Consider tenure info for {}: have a tenure in cur reward cycle {}", - _ch, cur_wanted_rc - ); - } - } - } - - if available_considered > 0 - && ((prev_wanted_rc >= first_nakamoto_rc && !has_prev_rc_block) - || (cur_wanted_rc >= first_nakamoto_rc && !has_cur_rc_block)) - { - debug!( - "tenure_block_ids stale: missing representation in reward cycles {} ({}) and {} ({})", - prev_wanted_rc, - has_prev_rc_block, - cur_wanted_rc, - has_cur_rc_block, - ); - return true; - } - - let mut ret = false; - for (_naddr, available) in tenure_block_ids.iter() { - for wt in prev_wanted_tenures.iter() { - let Some(tenure_info) = available.get(&wt.tenure_id_consensus_hash) else { - continue; - }; - if completed_tenures.contains(&tenure_info.tenure_id_consensus_hash) { - // this check is necessary because the check for .processed requires that a - // child tenure block has been processed, which isn't guaranteed at a reward - // cycle boundary - debug!("Tenure {:?} has been fully downloaded", &tenure_info); - continue; - } - if !tenure_info.processed { - debug!( - "Tenure {:?} is available from {} but not processed", - &tenure_info, &_naddr - ); - ret = true; - } - } - } - ret - } - /// Update the state machine's wanted tenures and processed tenures, if it's time to do so. /// This will only happen when the sortition DB has finished processing a reward cycle of /// tenures when in IBD mode, _OR_ when the sortition tip advances when in steady-state mode. @@ -612,8 +485,7 @@ impl NakamotoDownloadStateMachine { /// cycle boundaries, where the sortition DB is about to begin processing a new reward cycle. /// The list of wanted tenures for the current reward cycle will be saved as /// `self.prev_wanted_tenures`, and the set of wanted tenures for the next reward cycle - /// will be stored to `self.wanted_tenures`. It will only update these two lists if it is safe - /// to do so, as determined by `have_unprocessed_tenures()`. + /// will be stored to `self.wanted_tenures`. /// /// In the second case (i.e. not a reward cycle boundary), this function will load up _new_ /// wanted tenure data and append it to `self.wanted_tenures` via @@ -1355,6 +1227,7 @@ impl NakamotoDownloadStateMachine { ) { Ok(blocks_opt) => blocks_opt, Err(NetError::StaleView) => { + neighbor_rpc.add_dead(network, &naddr); continue; } Err(e) => { @@ -1545,16 +1418,19 @@ impl NakamotoDownloadStateMachine { chainstate: &StacksChainState, ibd: bool, ) -> HashMap> { - debug!("NakamotoDownloadStateMachine in state {}", &self.state); + debug!( + "run_downloads: burnchain_height={}, network.burnchain_tip.block_height={}, state={}", + burnchain_height, network.burnchain_tip.block_height, &self.state; + "has_network_inventories" => network.inv_state_nakamoto.is_some(), + "next_unconfirmed_check" => self.last_unconfirmed_download_check_ms.saturating_add(CHECK_UNCONFIRMED_TENURES_MS) / 1000, + "timestamp_ms" => get_epoch_time_ms(), + ); + let Some(invs) = network.inv_state_nakamoto.as_ref() else { // nothing to do - debug!("No network inventories"); return HashMap::new(); }; - debug!( - "run_downloads: burnchain_height={}, network.burnchain_tip.block_height={}, state={}", - burnchain_height, network.burnchain_tip.block_height, &self.state - ); + self.update_available_tenures( &invs.inventories, &sortdb.pox_constants, @@ -1563,14 +1439,24 @@ impl NakamotoDownloadStateMachine { ); // check this now, since we mutate self.available - let need_unconfirmed_tenures = Self::need_unconfirmed_tenures( - burnchain_height, - &network.burnchain_tip, - &self.wanted_tenures, - self.prev_wanted_tenures.as_ref().unwrap_or(&vec![]), - &self.tenure_block_ids, - &self.available_tenures, - ); + self.fetch_unconfirmed_tenures = if self + .last_unconfirmed_download_check_ms + .saturating_add(CHECK_UNCONFIRMED_TENURES_MS) + > get_epoch_time_ms() + { + false + } else { + let do_fetch = Self::need_unconfirmed_tenures( + burnchain_height, + &network.burnchain_tip, + &self.wanted_tenures, + self.prev_wanted_tenures.as_ref().unwrap_or(&vec![]), + &self.tenure_block_ids, + &self.available_tenures, + ); + self.last_unconfirmed_download_check_ms = get_epoch_time_ms(); + do_fetch + }; match self.state { NakamotoDownloadState::Confirmed => { @@ -1580,7 +1466,7 @@ impl NakamotoDownloadStateMachine { .expect("FATAL: max_inflight_blocks exceeds usize::MAX"), ); - if self.tenure_downloads.is_empty() && need_unconfirmed_tenures { + if self.tenure_downloads.is_empty() && self.fetch_unconfirmed_tenures { debug!( "Transition from {} to {}", &self.state, @@ -1625,7 +1511,7 @@ impl NakamotoDownloadStateMachine { } else if self.unconfirmed_tenure_downloads.is_empty() && self.unconfirmed_tenure_download_schedule.is_empty() { - if need_unconfirmed_tenures { + if self.fetch_unconfirmed_tenures { // do this again self.unconfirmed_tenure_download_schedule = Self::make_unconfirmed_tenure_download_schedule( diff --git a/stackslib/src/net/download/nakamoto/tenure.rs b/stackslib/src/net/download/nakamoto/tenure.rs index 53f9105156..ba1ac81033 100644 --- a/stackslib/src/net/download/nakamoto/tenure.rs +++ b/stackslib/src/net/download/nakamoto/tenure.rs @@ -98,6 +98,8 @@ impl WantedTenure { pub struct TenureStartEnd { /// Consensus hash that identifies the start of the tenure pub tenure_id_consensus_hash: ConsensusHash, + /// Burnchain block height of tenure ID consensus hash + pub tenure_id_burn_block_height: u64, /// Tenure-start block ID pub start_block_id: StacksBlockId, /// Last block ID @@ -119,6 +121,7 @@ pub type AvailableTenures = HashMap; impl TenureStartEnd { pub fn new( tenure_id_consensus_hash: ConsensusHash, + tenure_id_burn_block_height: u64, start_block_id: StacksBlockId, end_block_id: StacksBlockId, start_reward_cycle: u64, @@ -127,6 +130,7 @@ impl TenureStartEnd { ) -> Self { Self { tenure_id_consensus_hash, + tenure_id_burn_block_height, start_block_id, end_block_id, start_reward_cycle, @@ -214,6 +218,7 @@ impl TenureStartEnd { let tenure_start_end = TenureStartEnd::new( wt.tenure_id_consensus_hash.clone(), + wt.burn_height, wt_start.winning_block_id.clone(), wt_end.winning_block_id.clone(), rc, @@ -322,6 +327,7 @@ impl TenureStartEnd { let mut tenure_start_end = TenureStartEnd::new( wt.tenure_id_consensus_hash.clone(), + wt.burn_height, wt_start.winning_block_id.clone(), wt_end.winning_block_id.clone(), rc, diff --git a/stackslib/src/net/download/nakamoto/tenure_downloader_set.rs b/stackslib/src/net/download/nakamoto/tenure_downloader_set.rs index 49b32c2634..b5514558b8 100644 --- a/stackslib/src/net/download/nakamoto/tenure_downloader_set.rs +++ b/stackslib/src/net/download/nakamoto/tenure_downloader_set.rs @@ -67,6 +67,35 @@ use crate::net::server::HttpPeer; use crate::net::{Error as NetError, Neighbor, NeighborAddress, NeighborKey}; use crate::util_lib::db::{DBConn, Error as DBError}; +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) struct CompletedTenure { + tenure_id: ConsensusHash, + start_block: StacksBlockId, + end_block: StacksBlockId, +} + +impl From<&TenureStartEnd> for CompletedTenure { + fn from(tse: &TenureStartEnd) -> Self { + Self { + tenure_id: tse.tenure_id_consensus_hash.clone(), + start_block: tse.start_block_id.clone(), + end_block: tse.end_block_id.clone(), + } + } +} + +impl From<&mut NakamotoTenureDownloader> for CompletedTenure { + fn from(ntd: &mut NakamotoTenureDownloader) -> Self { + Self { + tenure_id: ntd.tenure_id_consensus_hash, + start_block: ntd.tenure_start_block_id, + end_block: ntd.tenure_end_block_id, + } + } +} + +pub const PEER_DEPRIORITIZATION_TIME_SECS: u64 = 60; + /// A set of confirmed downloader state machines assigned to one or more neighbors. The block /// downloader runs tenure-downloaders in parallel, since the downloader for the N+1'st tenure /// needs to feed data into the Nth tenure. This struct is responsible for scheduling peer @@ -83,7 +112,14 @@ pub struct NakamotoTenureDownloaderSet { pub(crate) peers: HashMap, /// The set of tenures that have been successfully downloaded (but possibly not yet stored or /// processed) - pub(crate) completed_tenures: HashSet, + pub(crate) completed_tenures: HashSet, + /// Number of times a tenure download was attempted + pub(crate) attempted_tenures: HashMap, + /// Number of times a tenure download failed + pub(crate) attempt_failed_tenures: HashMap, + /// Peers that should be deprioritized because they're dead (maps to when they can be used + /// again) + pub(crate) deprioritized_peers: HashMap, } impl NakamotoTenureDownloaderSet { @@ -92,15 +128,51 @@ impl NakamotoTenureDownloaderSet { downloaders: vec![], peers: HashMap::new(), completed_tenures: HashSet::new(), + attempted_tenures: HashMap::new(), + attempt_failed_tenures: HashMap::new(), + deprioritized_peers: HashMap::new(), } } + /// Mark a tenure as having failed to download. + /// Implemented statically to appease the borrow checker. + fn mark_failure(attempt_failed_tenures: &mut HashMap, ch: &ConsensusHash) { + if let Some(failures) = attempt_failed_tenures.get_mut(ch) { + *failures = (*failures).saturating_add(1); + } else { + attempt_failed_tenures.insert(ch.clone(), 1); + } + } + + /// Mark a peer as deprioritized + /// Implemented statically to appease the borrow checker. + fn mark_deprioritized( + deprioritized_peers: &mut HashMap, + peer: &NeighborAddress, + ) { + deprioritized_peers.insert( + peer.clone(), + get_epoch_time_secs() + PEER_DEPRIORITIZATION_TIME_SECS, + ); + } + + /// Mark a peer and its tenure as dead and failed + fn mark_failed_and_deprioritize_peer( + attempted_failed_tenures: &mut HashMap, + deprioritized_peers: &mut HashMap, + ch: &ConsensusHash, + peer: &NeighborAddress, + ) { + Self::mark_failure(attempted_failed_tenures, ch); + Self::mark_deprioritized(deprioritized_peers, peer); + } + /// Assign the given peer to the given downloader state machine. Allocate a slot for it if /// needed. fn add_downloader(&mut self, naddr: NeighborAddress, downloader: NakamotoTenureDownloader) { debug!( - "Add downloader for tenure {} driven by {}", - &downloader.tenure_id_consensus_hash, &naddr + "Add downloader for tenure {} driven by {naddr}", + &downloader.tenure_id_consensus_hash ); if let Some(idx) = self.peers.get(&naddr) { self.downloaders[*idx] = Some(downloader); @@ -154,7 +226,7 @@ impl NakamotoTenureDownloaderSet { ) { for (naddr, downloader) in iter { if self.has_downloader(&naddr) { - debug!("Already have downloader for {}", &naddr); + debug!("Already have downloader for {naddr}"); continue; } self.add_downloader(naddr, downloader); @@ -180,15 +252,6 @@ impl NakamotoTenureDownloaderSet { cnt } - /// Determine whether or not there exists a downloader for the given tenure, identified by its - /// consensus hash. - pub fn is_tenure_inflight(&self, ch: &ConsensusHash) -> bool { - self.downloaders - .iter() - .find(|d| d.as_ref().map(|x| &x.tenure_id_consensus_hash) == Some(ch)) - .is_some() - } - /// Determine if this downloader set is empty -- i.e. there's no in-progress downloaders. pub fn is_empty(&self) -> bool { for downloader_opt in self.downloaders.iter() { @@ -218,8 +281,8 @@ impl NakamotoTenureDownloaderSet { }; debug!( - "Peer {} already bound to downloader for {}", - &naddr, &_downloader.tenure_id_consensus_hash + "Peer {naddr} already bound to downloader for {}", + &_downloader.tenure_id_consensus_hash ); return true; } @@ -231,8 +294,8 @@ impl NakamotoTenureDownloaderSet { continue; } debug!( - "Assign peer {} to work on downloader for {} in state {}", - &naddr, &downloader.tenure_id_consensus_hash, &downloader.state + "Assign peer {naddr} to work on downloader for {} in state {}", + &downloader.tenure_id_consensus_hash, &downloader.state ); downloader.naddr = naddr.clone(); self.peers.insert(naddr, i); @@ -251,15 +314,15 @@ impl NakamotoTenureDownloaderSet { idled.push(naddr.clone()); continue; }; - let Some(downloader) = downloader_opt else { - debug!("Remove peer {} for null download {}", &naddr, i); + let Some(downloader) = downloader_opt.as_ref() else { + debug!("Remove peer {naddr} for null download {i}"); idled.push(naddr.clone()); continue; }; if downloader.idle { debug!( - "Remove idled peer {} for tenure download {}", - &naddr, &downloader.tenure_id_consensus_hash + "Remove idled peer {naddr} for tenure download {}", + &downloader.tenure_id_consensus_hash ); idled.push(naddr.clone()); } @@ -273,10 +336,12 @@ impl NakamotoTenureDownloaderSet { /// this up with a call to `clear_available_peers()`. pub fn clear_finished_downloaders(&mut self) { for downloader_opt in self.downloaders.iter_mut() { - let Some(downloader) = downloader_opt else { - continue; - }; - if downloader.is_done() { + // clear the downloader if it's done by setting it to None + if downloader_opt + .as_ref() + .map(|dl| dl.is_done()) + .unwrap_or(false) + { *downloader_opt = None; } } @@ -306,8 +371,8 @@ impl NakamotoTenureDownloaderSet { }; if &downloader.tenure_id_consensus_hash == tenure_id { debug!( - "Have downloader for tenure {} already (idle={}, state={}, naddr={})", - tenure_id, downloader.idle, &downloader.state, &downloader.naddr + "Have downloader for tenure {tenure_id} already (idle={}, state={}, naddr={})", + downloader.idle, &downloader.state, &downloader.naddr ); return true; } @@ -337,32 +402,35 @@ impl NakamotoTenureDownloaderSet { self.clear_finished_downloaders(); self.clear_available_peers(); - while self.inflight() < count { + while self.num_scheduled_downloaders() < count { let Some(ch) = schedule.front() else { break; }; - if self.completed_tenures.contains(&ch) { - debug!("Already successfully downloaded tenure {}", &ch); - schedule.pop_front(); - continue; - } let Some(neighbors) = available.get_mut(ch) else { // not found on any neighbors, so stop trying this tenure - debug!("No neighbors have tenure {}", ch); + debug!("No neighbors have tenure {ch}"); schedule.pop_front(); continue; }; if neighbors.is_empty() { // no more neighbors to try - debug!("No more neighbors can serve tenure {}", ch); + debug!("No more neighbors can serve tenure {ch}"); schedule.pop_front(); continue; } let Some(naddr) = neighbors.pop() else { - debug!("No more neighbors can serve tenure {}", ch); + debug!("No more neighbors can serve tenure {ch}"); schedule.pop_front(); continue; }; + if get_epoch_time_secs() < *self.deprioritized_peers.get(&naddr).unwrap_or(&0) { + debug!( + "Peer {} is deprioritized until {naddr}", + self.deprioritized_peers.get(&naddr).unwrap_or(&0) + ); + continue; + } + if self.try_resume_peer(naddr.clone()) { continue; }; @@ -373,23 +441,40 @@ impl NakamotoTenureDownloaderSet { let Some(available_tenures) = tenure_block_ids.get(&naddr) else { // this peer doesn't have any known tenures, so try the others - debug!("No tenures available from {}", &naddr); + debug!("No tenures available from {naddr}"); continue; }; let Some(tenure_info) = available_tenures.get(ch) else { // this peer does not have a tenure start/end block for this tenure, so try the // others. - debug!("Neighbor {} does not serve tenure {}", &naddr, ch); + debug!("Neighbor {naddr} does not serve tenure {ch}"); continue; }; + if tenure_info.processed { + // we already have this tenure + debug!("Already have processed tenure {ch}"); + self.completed_tenures + .remove(&CompletedTenure::from(tenure_info)); + continue; + } + if self + .completed_tenures + .contains(&CompletedTenure::from(tenure_info)) + { + debug!( + "Already successfully downloaded tenure {ch} ({}-{})", + &tenure_info.start_block_id, &tenure_info.end_block_id + ); + schedule.pop_front(); + continue; + } let Some(Some(start_reward_set)) = current_reward_cycles .get(&tenure_info.start_reward_cycle) .map(|cycle_info| cycle_info.reward_set()) else { debug!( - "Cannot fetch tenure-start block due to no known start reward set for cycle {}: {:?}", + "Cannot fetch tenure-start block due to no known start reward set for cycle {}: {tenure_info:?}", tenure_info.start_reward_cycle, - &tenure_info ); schedule.pop_front(); continue; @@ -399,28 +484,33 @@ impl NakamotoTenureDownloaderSet { .map(|cycle_info| cycle_info.reward_set()) else { debug!( - "Cannot fetch tenure-end block due to no known end reward set for cycle {}: {:?}", + "Cannot fetch tenure-end block due to no known end reward set for cycle {}: {tenure_info:?}", tenure_info.end_reward_cycle, - &tenure_info ); schedule.pop_front(); continue; }; - info!("Download tenure {}", &ch; + let attempt_count = *self.attempted_tenures.get(&ch).unwrap_or(&0); + self.attempted_tenures + .insert(ch.clone(), attempt_count.saturating_add(1)); + + let attempt_failed_count = *self.attempt_failed_tenures.get(&ch).unwrap_or(&0); + + info!("Download tenure {ch}"; + "peer" => %naddr, + "attempt" => attempt_count.saturating_add(1), + "failed" => attempt_failed_count, + "downloads_scheduled" => %self.num_scheduled_downloaders(), + "downloads_total" => %self.num_downloaders(), + "downloads_max_count" => count, + "downloads_inflight" => self.inflight(), "tenure_start_block" => %tenure_info.start_block_id, "tenure_end_block" => %tenure_info.end_block_id, "tenure_start_reward_cycle" => tenure_info.start_reward_cycle, - "tenure_end_reward_cycle" => tenure_info.end_reward_cycle); + "tenure_end_reward_cycle" => tenure_info.end_reward_cycle, + "tenure_burn_height" => tenure_info.tenure_id_burn_block_height); - debug!( - "Download tenure {} (start={}, end={}) (rc {},{})", - &ch, - &tenure_info.start_block_id, - &tenure_info.end_block_id, - tenure_info.start_reward_cycle, - tenure_info.end_reward_cycle - ); let tenure_download = NakamotoTenureDownloader::new( ch.clone(), tenure_info.start_block_id.clone(), @@ -430,7 +520,7 @@ impl NakamotoTenureDownloaderSet { end_reward_set.clone(), ); - debug!("Request tenure {} from neighbor {}", ch, &naddr); + debug!("Request tenure {ch} from neighbor {naddr}"); self.add_downloader(naddr, tenure_download); schedule.pop_front(); } @@ -459,28 +549,37 @@ impl NakamotoTenureDownloaderSet { // send requests for (naddr, index) in self.peers.iter() { if neighbor_rpc.has_inflight(&naddr) { - debug!("Peer {} has an inflight request", &naddr); + debug!("Peer {naddr} has an inflight request"); continue; } let Some(Some(downloader)) = self.downloaders.get_mut(*index) else { - debug!("No downloader for {}", &naddr); + debug!("No downloader for {naddr}"); continue; }; if downloader.is_done() { debug!( - "Downloader for {} on tenure {} is finished", - &naddr, &downloader.tenure_id_consensus_hash + "Downloader for {naddr} on tenure {} is finished", + &downloader.tenure_id_consensus_hash ); finished.push(naddr.clone()); - finished_tenures.push(downloader.tenure_id_consensus_hash.clone()); + finished_tenures.push(CompletedTenure::from(downloader)); continue; } debug!( - "Send request to {} for tenure {} (state {})", - &naddr, &downloader.tenure_id_consensus_hash, &downloader.state + "Send request to {naddr} for tenure {} (state {})", + &downloader.tenure_id_consensus_hash, &downloader.state ); let Ok(sent) = downloader.send_next_download_request(network, neighbor_rpc) else { - debug!("Downloader for {} failed; this peer is dead", &naddr); + info!( + "Downloader for tenure {} to {naddr} failed; this peer is dead", + &downloader.tenure_id_consensus_hash, + ); + Self::mark_failed_and_deprioritize_peer( + &mut self.attempt_failed_tenures, + &mut self.deprioritized_peers, + &downloader.tenure_id_consensus_hash, + naddr, + ); neighbor_rpc.add_dead(network, naddr); continue; }; @@ -494,12 +593,12 @@ impl NakamotoTenureDownloaderSet { // clear dead, broken, and done for naddr in addrs.iter() { if neighbor_rpc.is_dead_or_broken(network, naddr) { - debug!("Remove dead/broken downloader for {}", &naddr); + debug!("Remove dead/broken downloader for {naddr}"); self.clear_downloader(&naddr); } } for done_naddr in finished.drain(..) { - debug!("Remove finished downloader for {}", &done_naddr); + debug!("Remove finished downloader for {done_naddr}"); self.clear_downloader(&done_naddr); } for done_tenure in finished_tenures.drain(..) { @@ -509,23 +608,35 @@ impl NakamotoTenureDownloaderSet { // handle responses for (naddr, response) in neighbor_rpc.collect_replies(network) { let Some(index) = self.peers.get(&naddr) else { - debug!("No downloader for {}", &naddr); + debug!("No downloader for {naddr}"); continue; }; let Some(Some(downloader)) = self.downloaders.get_mut(*index) else { - debug!("No downloader for {}", &naddr); + debug!("No downloader for {naddr}"); continue; }; - debug!("Got response from {}", &naddr); + debug!("Got response from {naddr}"); let Ok(blocks_opt) = downloader .handle_next_download_response(response) .map_err(|e| { - debug!("Failed to handle response from {}: {:?}", &naddr, &e); + info!( + "Failed to handle response from {naddr} on tenure {}: {e}", + &downloader.tenure_id_consensus_hash, + ); e }) else { - debug!("Failed to handle download response from {}", &naddr); + debug!( + "Failed to handle download response from {naddr} on tenure {}", + &downloader.tenure_id_consensus_hash + ); + Self::mark_failed_and_deprioritize_peer( + &mut self.attempt_failed_tenures, + &mut self.deprioritized_peers, + &downloader.tenure_id_consensus_hash, + &naddr, + ); neighbor_rpc.add_dead(network, &naddr); continue; }; @@ -541,12 +652,16 @@ impl NakamotoTenureDownloaderSet { ); new_blocks.insert(downloader.tenure_id_consensus_hash.clone(), blocks); if downloader.is_done() { + info!( + "Downloader for tenure {} is finished", + &downloader.tenure_id_consensus_hash + ); debug!( - "Downloader for {} on tenure {} is finished", - &naddr, &downloader.tenure_id_consensus_hash + "Downloader for tenure {} finished on {naddr}", + &downloader.tenure_id_consensus_hash, ); finished.push(naddr.clone()); - finished_tenures.push(downloader.tenure_id_consensus_hash.clone()); + finished_tenures.push(CompletedTenure::from(downloader)); continue; } } @@ -554,12 +669,12 @@ impl NakamotoTenureDownloaderSet { // clear dead, broken, and done for naddr in addrs.iter() { if neighbor_rpc.is_dead_or_broken(network, naddr) { - debug!("Remove dead/broken downloader for {}", &naddr); + debug!("Remove dead/broken downloader for {naddr}"); self.clear_downloader(naddr); } } for done_naddr in finished.drain(..) { - debug!("Remove finished downloader for {}", &done_naddr); + debug!("Remove finished downloader for {done_naddr}"); self.clear_downloader(&done_naddr); } for done_tenure in finished_tenures.drain(..) { diff --git a/stackslib/src/net/inv/nakamoto.rs b/stackslib/src/net/inv/nakamoto.rs index 3f4fcb6165..87209e4496 100644 --- a/stackslib/src/net/inv/nakamoto.rs +++ b/stackslib/src/net/inv/nakamoto.rs @@ -579,10 +579,10 @@ impl NakamotoTenureInv { /// Reset synchronization state for this peer. Don't remove inventory data; just make it so we /// can talk to the peer again - pub fn try_reset_comms(&mut self, inv_sync_interval: u64, start_rc: u64, cur_rc: u64) { + pub fn try_reset_comms(&mut self, inv_sync_interval: u64, start_rc: u64, max_rc: u64) { let now = get_epoch_time_secs(); if self.start_sync_time + inv_sync_interval <= now - && (self.cur_reward_cycle >= cur_rc || !self.online) + && (self.cur_reward_cycle >= max_rc || !self.online) { self.reset_comms(start_rc); } @@ -618,20 +618,20 @@ impl NakamotoTenureInv { pub fn getnakamotoinv_begin( &mut self, network: &mut PeerNetwork, - current_reward_cycle: u64, + max_reward_cycle: u64, ) -> bool { debug!( "{:?}: Begin Nakamoto inventory sync for {} in cycle {}", network.get_local_peer(), self.neighbor_address, - current_reward_cycle, + max_reward_cycle, ); // possibly reset communications with this peer, if it's time to do so. self.try_reset_comms( network.get_connection_opts().inv_sync_interval, - current_reward_cycle.saturating_sub(network.get_connection_opts().inv_reward_cycles), - current_reward_cycle, + max_reward_cycle.saturating_sub(network.get_connection_opts().inv_reward_cycles), + max_reward_cycle, ); if !self.is_online() { // don't talk to this peer for now @@ -643,7 +643,7 @@ impl NakamotoTenureInv { return false; } - if self.reward_cycle() > current_reward_cycle { + if self.reward_cycle() > max_reward_cycle { // we've fully sync'ed with this peer debug!( "{:?}: fully sync'ed: {}", @@ -908,10 +908,24 @@ impl NakamotoInvStateMachine { ) }); - // try to get all of the reward cycles we know about, plus the next one. We try to get - // the next one as well in case we're at a reward cycle boundary, but we're not at the - // chain tip -- the block downloader still needs that next inventory to proceed. - let proceed = inv.getnakamotoinv_begin(network, current_reward_cycle.saturating_add(1)); + let burnchain_tip_reward_cycle = sortdb + .pox_constants + .block_height_to_reward_cycle( + sortdb.first_block_height, + network.stacks_tip.burnchain_height, + ) + .ok_or(NetError::ChainstateError( + "block height comes before system start".into(), + ))?; + + let max_reward_cycle = if burnchain_tip_reward_cycle > current_reward_cycle { + // try to sync up to the next reward cycle + current_reward_cycle.saturating_add(1) + } else { + current_reward_cycle + }; + + let proceed = inv.getnakamotoinv_begin(network, max_reward_cycle); let inv_rc = inv.reward_cycle(); new_inventories.insert(naddr.clone(), inv); @@ -946,6 +960,7 @@ impl NakamotoInvStateMachine { "peer" => ?naddr, "error" => ?e ); + continue; } } diff --git a/stackslib/src/net/mod.rs b/stackslib/src/net/mod.rs index 2210160bee..9e17c3f428 100644 --- a/stackslib/src/net/mod.rs +++ b/stackslib/src/net/mod.rs @@ -16,7 +16,6 @@ #[warn(unused_imports)] use std::collections::HashMap; -#[cfg(any(test, feature = "testing"))] use std::collections::HashSet; use std::hash::{Hash, Hasher}; use std::io::prelude::*; @@ -1466,7 +1465,7 @@ pub const DENY_BAN_DURATION: u64 = 86400; // seconds (1 day) pub const DENY_MIN_BAN_DURATION: u64 = 2; /// Result of doing network work -#[derive(Clone)] +#[derive(Clone, PartialEq, Debug)] pub struct NetworkResult { /// Stacks chain tip when we began this pass pub stacks_tip: StacksBlockId, @@ -1563,6 +1562,502 @@ impl NetworkResult { } } + /// Get the set of all StacksBlocks represented + fn all_block_ids(&self) -> HashSet { + let mut blocks: HashSet<_> = self + .blocks + .iter() + .map(|(ch, blk, _)| StacksBlockId::new(&ch, &blk.block_hash())) + .collect(); + + let pushed_blocks: HashSet<_> = self + .pushed_blocks + .iter() + .map(|(_, block_list)| { + block_list + .iter() + .map(|block_data| { + block_data + .blocks + .iter() + .map(|block_datum| { + StacksBlockId::new(&block_datum.0, &block_datum.1.block_hash()) + }) + .collect::>() + }) + .flatten() + }) + .flatten() + .collect(); + + let uploaded_blocks: HashSet<_> = self + .uploaded_blocks + .iter() + .map(|blk_data| { + blk_data + .blocks + .iter() + .map(|blk| StacksBlockId::new(&blk.0, &blk.1.block_hash())) + }) + .flatten() + .collect(); + + blocks.extend(pushed_blocks.into_iter()); + blocks.extend(uploaded_blocks.into_iter()); + blocks + } + + /// Get the set of all microblocks represented + fn all_microblock_hashes(&self) -> HashSet { + let mut mblocks: HashSet<_> = self + .confirmed_microblocks + .iter() + .map(|(_, mblocks, _)| mblocks.iter().map(|mblk| mblk.block_hash())) + .flatten() + .collect(); + + let pushed_microblocks: HashSet<_> = self + .pushed_microblocks + .iter() + .map(|(_, mblock_list)| { + mblock_list + .iter() + .map(|(_, mblock_data)| { + mblock_data + .microblocks + .iter() + .map(|mblock| mblock.block_hash()) + }) + .flatten() + }) + .flatten() + .collect(); + + let uploaded_microblocks: HashSet<_> = self + .uploaded_microblocks + .iter() + .map(|mblk_data| mblk_data.microblocks.iter().map(|mblk| mblk.block_hash())) + .flatten() + .collect(); + + mblocks.extend(pushed_microblocks.into_iter()); + mblocks.extend(uploaded_microblocks.into_iter()); + mblocks + } + + /// Get the set of all nakamoto blocks represented + fn all_nakamoto_block_ids(&self) -> HashSet { + let mut naka_block_ids: HashSet<_> = self + .nakamoto_blocks + .iter() + .map(|(_, nblk)| nblk.block_id()) + .collect(); + + let pushed_nakamoto_blocks: HashSet<_> = self + .pushed_nakamoto_blocks + .iter() + .map(|(_, naka_blocks_list)| { + naka_blocks_list + .iter() + .map(|(_, naka_blocks)| { + naka_blocks + .blocks + .iter() + .map(|nblk| nblk.block_id()) + .collect::>() + }) + .collect::>>() + }) + .collect::>>>() + .into_iter() + .flatten() + .into_iter() + .fold(HashSet::new(), |mut acc, next| { + acc.extend(next.into_iter()); + acc + }); + + let uploaded_nakamoto_blocks: HashSet<_> = self + .uploaded_nakamoto_blocks + .iter() + .map(|nblk| nblk.block_id()) + .collect(); + + naka_block_ids.extend(pushed_nakamoto_blocks.into_iter()); + naka_block_ids.extend(uploaded_nakamoto_blocks.into_iter()); + naka_block_ids + } + + /// Get the set of all txids represented + fn all_txids(&self) -> HashSet { + let mut txids: HashSet<_> = self + .uploaded_transactions + .iter() + .map(|tx| tx.txid()) + .collect(); + let pushed_txids: HashSet<_> = self + .pushed_transactions + .iter() + .map(|(_, tx_list)| { + tx_list + .iter() + .map(|(_, tx)| tx.txid()) + .collect::>() + }) + .collect::>>() + .into_iter() + .fold(HashSet::new(), |mut acc, next| { + acc.extend(next.into_iter()); + acc + }); + + let synced_txids: HashSet<_> = self + .synced_transactions + .iter() + .map(|tx| tx.txid()) + .collect(); + + txids.extend(pushed_txids.into_iter()); + txids.extend(synced_txids.into_iter()); + txids + } + + /// Get all unhandled message signatures. + /// This is unique per message. + fn all_msg_sigs(&self) -> HashSet { + self.unhandled_messages + .iter() + .map(|(_, msgs)| { + msgs.iter() + .map(|msg| msg.preamble.signature.clone()) + .collect::>() + }) + .into_iter() + .fold(HashSet::new(), |mut acc, next| { + acc.extend(next.into_iter()); + acc + }) + } + + /// Merge self into `newer`, and return `newer`. + /// deduplicate messages when possible. + pub fn update(mut self, mut newer: NetworkResult) -> Self { + // merge unhandled messaegs, but deduplicate + let newer_msgs = newer.all_msg_sigs(); + for (nk, mut msgs) in self.unhandled_messages.drain() { + msgs.retain(|msg| { + let retain = !newer_msgs.contains(&msg.preamble.signature); + if !retain { + debug!( + "Drop duplicate p2p message {} seq {}", + &msg.get_message_name(), + &msg.preamble.seq + ); + } + retain + }); + if let Some(newer_msgs) = newer.unhandled_messages.get_mut(&nk) { + newer_msgs.append(&mut msgs); + } else { + newer.unhandled_messages.insert(nk, msgs); + } + } + + let newer_blocks = newer.all_block_ids(); + let newer_microblocks = newer.all_microblock_hashes(); + let newer_naka_blocks = newer.all_nakamoto_block_ids(); + let newer_txids = newer.all_txids(); + + // only retain blocks not found in `newer` + self.blocks.retain(|(ch, blk, _)| { + let block_id = StacksBlockId::new(&ch, &blk.block_hash()); + let retain = !newer_blocks.contains(&block_id); + if !retain { + debug!("Drop duplicate downloaded block {}", &block_id); + } + retain + }); + newer.blocks.append(&mut self.blocks); + + // merge microblocks, but deduplicate + self.confirmed_microblocks + .retain_mut(|(_, ref mut mblocks, _)| { + mblocks.retain(|mblk| { + let retain = !newer_microblocks.contains(&mblk.block_hash()); + if !retain { + debug!( + "Drop duplicate downloaded microblock {}", + &mblk.block_hash() + ); + } + retain + }); + mblocks.len() > 0 + }); + newer + .confirmed_microblocks + .append(&mut self.confirmed_microblocks); + + // merge nakamoto blocks, but deduplicate + self.nakamoto_blocks.retain(|_, nblk| { + let retain = !newer_naka_blocks.contains(&nblk.block_id()); + if !retain { + debug!( + "Drop duplicate downloaded nakamoto block {}", + &nblk.block_id() + ); + } + retain + }); + newer.nakamoto_blocks.extend(self.nakamoto_blocks.drain()); + + // merge pushed transactions, but deduplicate + for (nk, mut tx_data) in self.pushed_transactions.drain() { + tx_data.retain(|(_, tx)| { + let retain = !newer_txids.contains(&tx.txid()); + if !retain { + debug!("Drop duplicate pushed transaction {}", &tx.txid()); + } + retain + }); + if tx_data.len() == 0 { + continue; + } + + if let Some(newer_tx_data) = newer.pushed_transactions.get_mut(&nk) { + newer_tx_data.append(&mut tx_data); + } else { + newer.pushed_transactions.insert(nk, tx_data); + } + } + + // merge pushed blocks, but deduplicate + for (nk, mut block_list) in self.pushed_blocks.drain() { + block_list.retain_mut(|ref mut block_data| { + block_data.blocks.retain(|blk_datum| { + let block_id = StacksBlockId::new(&blk_datum.0, &blk_datum.1.block_hash()); + let retain = !newer_blocks.contains(&block_id); + if !retain { + debug!("Drop duplicate pushed block {}", &block_id); + } + retain + }); + block_data.blocks.len() > 0 + }); + if block_list.len() == 0 { + continue; + } + + if let Some(newer_block_data) = newer.pushed_blocks.get_mut(&nk) { + newer_block_data.append(&mut block_list); + } else { + newer.pushed_blocks.insert(nk, block_list); + } + } + + // merge pushed microblocks, but deduplicate + for (nk, mut microblock_data) in self.pushed_microblocks.drain() { + microblock_data.retain_mut(|(_, ref mut mblock_data)| { + mblock_data.microblocks.retain(|mblk| { + let retain = !newer_microblocks.contains(&mblk.block_hash()); + if !retain { + debug!("Drop duplicate pushed microblock {}", &mblk.block_hash()); + } + retain + }); + mblock_data.microblocks.len() > 0 + }); + if microblock_data.len() == 0 { + continue; + } + + if let Some(newer_microblock_data) = newer.pushed_microblocks.get_mut(&nk) { + newer_microblock_data.append(&mut microblock_data); + } else { + newer.pushed_microblocks.insert(nk, microblock_data); + } + } + + // merge pushed nakamoto blocks, but deduplicate + for (nk, mut nakamoto_block_data) in self.pushed_nakamoto_blocks.drain() { + nakamoto_block_data.retain_mut(|(_, ref mut naka_blocks)| { + naka_blocks.blocks.retain(|nblk| { + let retain = !newer_naka_blocks.contains(&nblk.block_id()); + if !retain { + debug!("Drop duplicate pushed nakamoto block {}", &nblk.block_id()); + } + retain + }); + naka_blocks.blocks.len() > 0 + }); + if nakamoto_block_data.len() == 0 { + continue; + } + + if let Some(newer_nakamoto_data) = newer.pushed_nakamoto_blocks.get_mut(&nk) { + newer_nakamoto_data.append(&mut nakamoto_block_data); + } else { + newer.pushed_nakamoto_blocks.insert(nk, nakamoto_block_data); + } + } + + // merge uploaded data, but deduplicate + self.uploaded_transactions.retain(|tx| { + let retain = !newer_txids.contains(&tx.txid()); + if !retain { + debug!("Drop duplicate uploaded transaction {}", &tx.txid()); + } + retain + }); + self.uploaded_blocks.retain_mut(|ref mut blk_data| { + blk_data.blocks.retain(|blk| { + let block_id = StacksBlockId::new(&blk.0, &blk.1.block_hash()); + let retain = !newer_blocks.contains(&block_id); + if !retain { + debug!("Drop duplicate uploaded block {}", &block_id); + } + retain + }); + + blk_data.blocks.len() > 0 + }); + self.uploaded_microblocks.retain_mut(|ref mut mblock_data| { + mblock_data.microblocks.retain(|mblk| { + let retain = !newer_microblocks.contains(&mblk.block_hash()); + if !retain { + debug!("Drop duplicate uploaded microblock {}", &mblk.block_hash()); + } + retain + }); + + mblock_data.microblocks.len() > 0 + }); + self.uploaded_nakamoto_blocks.retain(|nblk| { + let retain = !newer_naka_blocks.contains(&nblk.block_id()); + if !retain { + debug!( + "Drop duplicate uploaded nakamoto block {}", + &nblk.block_id() + ); + } + retain + }); + + newer + .uploaded_transactions + .append(&mut self.uploaded_transactions); + newer.uploaded_blocks.append(&mut self.uploaded_blocks); + newer + .uploaded_microblocks + .append(&mut self.uploaded_microblocks); + newer + .uploaded_nakamoto_blocks + .append(&mut self.uploaded_nakamoto_blocks); + + // merge uploaded/pushed stackerdb, but drop stale versions + let newer_stackerdb_chunk_versions: HashMap<_, _> = newer + .uploaded_stackerdb_chunks + .iter() + .chain(newer.pushed_stackerdb_chunks.iter()) + .map(|chunk| { + ( + ( + chunk.contract_id.clone(), + chunk.rc_consensus_hash.clone(), + chunk.chunk_data.slot_id, + ), + chunk.chunk_data.slot_version, + ) + }) + .collect(); + + self.uploaded_stackerdb_chunks.retain(|push_chunk| { + if push_chunk.rc_consensus_hash != newer.rc_consensus_hash { + debug!( + "Drop pushed StackerDB chunk for {} due to stale view ({} != {}): {:?}", + &push_chunk.contract_id, + &push_chunk.rc_consensus_hash, + &newer.rc_consensus_hash, + &push_chunk.chunk_data + ); + return false; + } + if let Some(version) = newer_stackerdb_chunk_versions.get(&( + push_chunk.contract_id.clone(), + push_chunk.rc_consensus_hash.clone(), + push_chunk.chunk_data.slot_id, + )) { + let retain = push_chunk.chunk_data.slot_version > *version; + if !retain { + debug!( + "Drop pushed StackerDB chunk for {} due to stale version: {:?}", + &push_chunk.contract_id, &push_chunk.chunk_data + ); + } + retain + } else { + true + } + }); + + self.pushed_stackerdb_chunks.retain(|push_chunk| { + if push_chunk.rc_consensus_hash != newer.rc_consensus_hash { + debug!( + "Drop uploaded StackerDB chunk for {} due to stale view ({} != {}): {:?}", + &push_chunk.contract_id, + &push_chunk.rc_consensus_hash, + &newer.rc_consensus_hash, + &push_chunk.chunk_data + ); + return false; + } + if let Some(version) = newer_stackerdb_chunk_versions.get(&( + push_chunk.contract_id.clone(), + push_chunk.rc_consensus_hash.clone(), + push_chunk.chunk_data.slot_id, + )) { + let retain = push_chunk.chunk_data.slot_version > *version; + if !retain { + debug!( + "Drop uploaded StackerDB chunk for {} due to stale version: {:?}", + &push_chunk.contract_id, &push_chunk.chunk_data + ); + } + retain + } else { + true + } + }); + + newer + .uploaded_stackerdb_chunks + .append(&mut self.uploaded_stackerdb_chunks); + newer + .pushed_stackerdb_chunks + .append(&mut self.pushed_stackerdb_chunks); + + // dedup sync'ed transactions + self.synced_transactions.retain(|tx| { + let retain = !newer_txids.contains(&tx.txid()); + if !retain { + debug!("Drop duplicate sync'ed transaction {}", &tx.txid()); + } + retain + }); + + newer + .synced_transactions + .append(&mut self.synced_transactions); + + // no dedup here, but do merge + newer + .stacker_db_sync_results + .append(&mut self.stacker_db_sync_results); + newer.attachments.append(&mut self.attachments); + + newer + } + pub fn has_blocks(&self) -> bool { self.blocks.len() > 0 || self.pushed_blocks.len() > 0 } @@ -1616,6 +2111,10 @@ impl NetworkResult { || self.has_stackerdb_chunks() } + pub fn has_block_data_to_store(&self) -> bool { + self.has_blocks() || self.has_microblocks() || self.has_nakamoto_blocks() + } + pub fn consume_unsolicited(&mut self, unhandled_messages: PendingMessages) { for ((_event_id, neighbor_key), messages) in unhandled_messages.into_iter() { for message in messages.into_iter() { @@ -2633,10 +3132,13 @@ pub mod test { let stackerdb_contracts: Vec<_> = stacker_db_syncs.keys().map(|cid| cid.clone()).collect(); + let burnchain_db = config.burnchain.open_burnchain_db(false).unwrap(); + let mut peer_network = PeerNetwork::new( peerdb, atlasdb, p2p_stacker_dbs, + burnchain_db, local_peer, config.peer_version, config.burnchain.clone(), diff --git a/stackslib/src/net/p2p.rs b/stackslib/src/net/p2p.rs index 054fefaf1d..d77c0df9fa 100644 --- a/stackslib/src/net/p2p.rs +++ b/stackslib/src/net/p2p.rs @@ -243,11 +243,18 @@ impl CurrentRewardSet { /// Cached stacks chain tip info, consumed by RPC endpoints #[derive(Clone, Debug, PartialEq)] pub struct StacksTipInfo { + /// consensus hash of the highest processed stacks block pub consensus_hash: ConsensusHash, + /// block hash of the highest processed stacks block pub block_hash: BlockHeaderHash, + /// height of the highest processed stacks block pub height: u64, + /// coinbase height of the highest processed tenure pub coinbase_height: u64, + /// whether or not the system has transitioned to Nakamoto pub is_nakamoto: bool, + /// highest burnchain block discovered + pub burnchain_height: u64, } impl StacksTipInfo { @@ -258,6 +265,7 @@ impl StacksTipInfo { height: 0, coinbase_height: 0, is_nakamoto: false, + burnchain_height: 0, } } @@ -306,6 +314,9 @@ pub struct PeerNetwork { pub peerdb: PeerDB, pub atlasdb: AtlasDB, + // handle to burnchain DB + pub burnchain_db: BurnchainDB, + // ongoing p2p conversations (either they reached out to us, or we to them) pub peers: PeerMap, pub sockets: HashMap, @@ -444,6 +455,7 @@ impl PeerNetwork { peerdb: PeerDB, atlasdb: AtlasDB, stackerdbs: StackerDBs, + burnchain_db: BurnchainDB, mut local_peer: LocalPeer, peer_version: u32, burnchain: Burnchain, @@ -509,6 +521,8 @@ impl PeerNetwork { peerdb, atlasdb, + burnchain_db, + peers: PeerMap::new(), sockets: HashMap::new(), events: HashMap::new(), @@ -4257,6 +4271,7 @@ impl PeerNetwork { .anchored_header .as_stacks_nakamoto() .is_some(), + burnchain_height: self.stacks_tip.burnchain_height, }; debug!( "{:?}: Parent Stacks tip off of {} is {:?}", @@ -4304,7 +4319,9 @@ impl PeerNetwork { for rc in [cur_rc, prev_rc, prev_prev_rc] { debug!("Refresh reward cycle info for cycle {}", rc); - let rc_start_height = self.burnchain.nakamoto_first_block_of_cycle(rc); + // NOTE: + 1 needed because the sortition db indexes anchor blocks at index height 1, + // not 0 + let rc_start_height = self.burnchain.nakamoto_first_block_of_cycle(rc) + 1; let Some(ancestor_sort_id) = get_ancestor_sort_id(&ih, rc_start_height, &tip_sn.sortition_id)? else { @@ -4385,6 +4402,7 @@ impl PeerNetwork { let (stacks_tip_ch, stacks_tip_bhh, stacks_tip_height) = SortitionDB::get_canonical_stacks_chain_tip_hash_and_height(sortdb.conn())?; + let new_burnchain_tip = self.burnchain_db.get_canonical_chain_tip()?; let burnchain_tip_changed = canonical_sn.block_height != self.chain_view.burn_block_height || self.num_state_machine_passes == 0 || canonical_sn.sortition_id != self.burnchain_tip.sortition_id; @@ -4463,6 +4481,7 @@ impl PeerNetwork { height: 0, coinbase_height: 0, is_nakamoto: false, + burnchain_height: 0, } } Err(e) => return Err(e), @@ -4534,12 +4553,10 @@ impl PeerNetwork { if self.get_current_epoch().epoch_id < StacksEpochId::Epoch30 { // update heaviest affirmation map view - let burnchain_db = self.burnchain.open_burnchain_db(false)?; - self.heaviest_affirmation_map = static_get_heaviest_affirmation_map( &self.burnchain, indexer, - &burnchain_db, + &self.burnchain_db, sortdb, &canonical_sn.sortition_id, ) @@ -4550,7 +4567,7 @@ impl PeerNetwork { self.tentative_best_affirmation_map = static_get_canonical_affirmation_map( &self.burnchain, indexer, - &burnchain_db, + &self.burnchain_db, sortdb, chainstate, &canonical_sn.sortition_id, @@ -4591,9 +4608,8 @@ impl PeerNetwork { if stacks_tip_changed && self.get_current_epoch().epoch_id < StacksEpochId::Epoch30 { // update stacks tip affirmation map view // (NOTE: this check has to happen _after_ self.chain_view gets updated!) - let burnchain_db = self.burnchain.open_burnchain_db(false)?; self.stacks_tip_affirmation_map = static_get_stacks_tip_affirmation_map( - &burnchain_db, + &self.burnchain_db, sortdb, &canonical_sn.sortition_id, &canonical_sn.canonical_stacks_tip_consensus_hash, @@ -4659,8 +4675,10 @@ impl PeerNetwork { height: stacks_tip_height, coinbase_height, is_nakamoto: stacks_tip_is_nakamoto, + burnchain_height: new_burnchain_tip.block_height, }; self.parent_stacks_tip = parent_stacks_tip; + self.parent_stacks_tip.burnchain_height = new_burnchain_tip.block_height; debug!( "{:?}: canonical Stacks tip is now {:?}", @@ -5266,7 +5284,7 @@ mod test { network_id: 0x9abcdef0, chain_name: "bitcoin".to_string(), network_name: "testnet".to_string(), - working_dir: "/nope".to_string(), + working_dir: ":memory:".to_string(), consensus_hash_lifetime: 24, stable_confirmations: 7, initial_reward_start_block: 50, @@ -5297,12 +5315,14 @@ mod test { let atlas_config = AtlasConfig::new(false); let atlasdb = AtlasDB::connect_memory(atlas_config).unwrap(); let stacker_db = StackerDBs::connect_memory(); + let burnchain_db = BurnchainDB::connect(":memory:", &burnchain, true).unwrap(); let local_peer = PeerDB::get_local_peer(db.conn()).unwrap(); let p2p = PeerNetwork::new( db, atlasdb, stacker_db, + burnchain_db, local_peer, 0x12345678, burnchain, diff --git a/stackslib/src/net/relay.rs b/stackslib/src/net/relay.rs index 7e4ecbb408..f923aa1281 100644 --- a/stackslib/src/net/relay.rs +++ b/stackslib/src/net/relay.rs @@ -1703,6 +1703,7 @@ impl Relayer { sortdb: &mut SortitionDB, chainstate: &mut StacksChainState, coord_comms: Option<&CoordinatorChannels>, + reject_blocks_pushed: bool, ) -> Result<(Vec, Vec), net_error> { let mut pushed_blocks = vec![]; let mut bad_neighbors = vec![]; @@ -1731,6 +1732,14 @@ impl Relayer { for nakamoto_block in nakamoto_blocks_data.blocks.drain(..) { let block_id = nakamoto_block.block_id(); + if reject_blocks_pushed { + debug!( + "Received pushed Nakamoto block {} from {}, but configured to reject it.", + block_id, neighbor_key + ); + continue; + } + debug!( "Received pushed Nakamoto block {} from {}", block_id, neighbor_key @@ -2092,6 +2101,7 @@ impl Relayer { /// Returns the list of Nakamoto blocks we stored, as well as the list of bad neighbors that /// sent us invalid blocks. pub fn process_new_nakamoto_blocks( + connection_opts: &ConnectionOptions, network_result: &mut NetworkResult, burnchain: &Burnchain, sortdb: &mut SortitionDB, @@ -2128,6 +2138,7 @@ impl Relayer { sortdb, chainstate, coord_comms, + connection_opts.reject_blocks_pushed, ) { Ok(x) => x, Err(e) => { @@ -2848,6 +2859,7 @@ impl Relayer { coord_comms: Option<&CoordinatorChannels>, ) -> u64 { let (accepted_blocks, bad_neighbors) = match Self::process_new_nakamoto_blocks( + &self.connection_opts, network_result, burnchain, sortdb, diff --git a/stackslib/src/net/stackerdb/mod.rs b/stackslib/src/net/stackerdb/mod.rs index 57d1a427dc..bbbec21290 100644 --- a/stackslib/src/net/stackerdb/mod.rs +++ b/stackslib/src/net/stackerdb/mod.rs @@ -155,7 +155,7 @@ pub const STACKERDB_CONFIG_FUNCTION: &str = "stackerdb-get-config"; pub const MINER_SLOT_COUNT: u32 = 2; /// Final result of synchronizing state with a remote set of DB replicas -#[derive(Clone)] +#[derive(Clone, PartialEq, Debug)] pub struct StackerDBSyncResult { /// which contract this is a replica for pub contract_id: QualifiedContractIdentifier, diff --git a/stackslib/src/net/tests/mod.rs b/stackslib/src/net/tests/mod.rs index 6e61e7e610..b8e9167ad9 100644 --- a/stackslib/src/net/tests/mod.rs +++ b/stackslib/src/net/tests/mod.rs @@ -22,18 +22,24 @@ pub mod mempool; pub mod neighbors; pub mod relay; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use clarity::vm::clarity::ClarityConnection; -use clarity::vm::types::PrincipalData; +use clarity::vm::types::{PrincipalData, QualifiedContractIdentifier}; +use libstackerdb::StackerDBChunkData; use rand::prelude::SliceRandom; use rand::{thread_rng, Rng, RngCore}; use stacks_common::address::{AddressHashMode, C32_ADDRESS_VERSION_TESTNET_SINGLESIG}; +use stacks_common::bitvec::BitVec; use stacks_common::consts::{FIRST_BURNCHAIN_CONSENSUS_HASH, FIRST_STACKS_BLOCK_HASH}; use stacks_common::types::chainstate::{ - StacksAddress, StacksBlockId, StacksPrivateKey, StacksPublicKey, + BurnchainHeaderHash, ConsensusHash, StacksAddress, StacksBlockId, StacksPrivateKey, + StacksPublicKey, TrieHash, }; +use stacks_common::types::net::PeerAddress; use stacks_common::types::{Address, StacksEpochId}; +use stacks_common::util::hash::Sha512Trunc256Sum; +use stacks_common::util::secp256k1::MessageSignature; use stacks_common::util::vrf::VRFProof; use crate::burnchains::PoxConstants; @@ -45,7 +51,7 @@ use crate::chainstate::nakamoto::staging_blocks::NakamotoBlockObtainMethod; use crate::chainstate::nakamoto::test_signers::TestSigners; use crate::chainstate::nakamoto::tests::get_account; use crate::chainstate::nakamoto::tests::node::TestStacker; -use crate::chainstate::nakamoto::{NakamotoBlock, NakamotoChainState}; +use crate::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockHeader, NakamotoChainState}; use crate::chainstate::stacks::address::PoxAddress; use crate::chainstate::stacks::boot::test::{ key_to_stacks_addr, make_pox_4_lockup, make_pox_4_lockup_chain_id, make_signer_key_signature, @@ -54,8 +60,10 @@ use crate::chainstate::stacks::boot::test::{ use crate::chainstate::stacks::boot::{ MINERS_NAME, SIGNERS_VOTING_FUNCTION_NAME, SIGNERS_VOTING_NAME, }; +use crate::chainstate::stacks::db::blocks::test::make_empty_coinbase_block; use crate::chainstate::stacks::db::{MinerPaymentTxFees, StacksAccount, StacksChainState}; use crate::chainstate::stacks::events::TransactionOrigin; +use crate::chainstate::stacks::test::make_codec_test_microblock; use crate::chainstate::stacks::{ CoinbasePayload, StacksTransaction, StacksTransactionSigner, TenureChangeCause, TenureChangePayload, TokenTransferMemo, TransactionAnchorMode, TransactionAuth, @@ -66,6 +74,10 @@ use crate::core::{StacksEpoch, StacksEpochExtension}; use crate::net::relay::{BlockAcceptResponse, Relayer}; use crate::net::stackerdb::StackerDBConfig; use crate::net::test::{TestEventObserver, TestPeer, TestPeerConfig}; +use crate::net::{ + BlocksData, BlocksDatum, MicroblocksData, NakamotoBlocksData, NeighborKey, NetworkResult, + PingData, StackerDBPushChunkData, StacksMessage, StacksMessageType, +}; use crate::util_lib::boot::boot_code_id; use crate::util_lib::signed_structured_data::pox4::make_pox_4_signer_key_signature; @@ -525,7 +537,7 @@ impl NakamotoBootPlan { }) .collect(); - let old_tip = peer.network.stacks_tip.clone(); + let mut old_tip = peer.network.stacks_tip.clone(); let mut stacks_block = peer.tenure_with_txs(&stack_txs, &mut peer_nonce); let (stacks_tip_ch, stacks_tip_bh) = @@ -533,13 +545,14 @@ impl NakamotoBootPlan { let stacks_tip = StacksBlockId::new(&stacks_tip_ch, &stacks_tip_bh); assert_eq!(peer.network.stacks_tip.block_id(), stacks_tip); if old_tip.block_id() != stacks_tip { + old_tip.burnchain_height = peer.network.parent_stacks_tip.burnchain_height; assert_eq!(old_tip, peer.network.parent_stacks_tip); } for (other_peer, other_peer_nonce) in other_peers.iter_mut().zip(other_peer_nonces.iter_mut()) { - let old_tip = other_peer.network.stacks_tip.clone(); + let mut old_tip = other_peer.network.stacks_tip.clone(); other_peer.tenure_with_txs(&stack_txs, other_peer_nonce); let (stacks_tip_ch, stacks_tip_bh) = @@ -548,6 +561,7 @@ impl NakamotoBootPlan { let stacks_tip = StacksBlockId::new(&stacks_tip_ch, &stacks_tip_bh); assert_eq!(other_peer.network.stacks_tip.block_id(), stacks_tip); if old_tip.block_id() != stacks_tip { + old_tip.burnchain_height = other_peer.network.parent_stacks_tip.burnchain_height; assert_eq!(old_tip, other_peer.network.parent_stacks_tip); } } @@ -560,7 +574,7 @@ impl NakamotoBootPlan { .burnchain .is_in_prepare_phase(sortition_height.into()) { - let old_tip = peer.network.stacks_tip.clone(); + let mut old_tip = peer.network.stacks_tip.clone(); stacks_block = peer.tenure_with_txs(&[], &mut peer_nonce); let (stacks_tip_ch, stacks_tip_bh) = @@ -568,13 +582,14 @@ impl NakamotoBootPlan { let stacks_tip = StacksBlockId::new(&stacks_tip_ch, &stacks_tip_bh); assert_eq!(peer.network.stacks_tip.block_id(), stacks_tip); if old_tip.block_id() != stacks_tip { + old_tip.burnchain_height = peer.network.parent_stacks_tip.burnchain_height; assert_eq!(old_tip, peer.network.parent_stacks_tip); } other_peers .iter_mut() .zip(other_peer_nonces.iter_mut()) .for_each(|(peer, nonce)| { - let old_tip = peer.network.stacks_tip.clone(); + let mut old_tip = peer.network.stacks_tip.clone(); peer.tenure_with_txs(&[], nonce); let (stacks_tip_ch, stacks_tip_bh) = @@ -583,6 +598,7 @@ impl NakamotoBootPlan { let stacks_tip = StacksBlockId::new(&stacks_tip_ch, &stacks_tip_bh); assert_eq!(peer.network.stacks_tip.block_id(), stacks_tip); if old_tip.block_id() != stacks_tip { + old_tip.burnchain_height = peer.network.parent_stacks_tip.burnchain_height; assert_eq!(old_tip, peer.network.parent_stacks_tip); } }); @@ -595,7 +611,7 @@ impl NakamotoBootPlan { // advance to the start of epoch 3.0 while sortition_height < epoch_30_height - 1 { - let old_tip = peer.network.stacks_tip.clone(); + let mut old_tip = peer.network.stacks_tip.clone(); peer.tenure_with_txs(&vec![], &mut peer_nonce); let (stacks_tip_ch, stacks_tip_bh) = @@ -603,13 +619,14 @@ impl NakamotoBootPlan { let stacks_tip = StacksBlockId::new(&stacks_tip_ch, &stacks_tip_bh); assert_eq!(peer.network.stacks_tip.block_id(), stacks_tip); if old_tip.block_id() != stacks_tip { + old_tip.burnchain_height = peer.network.parent_stacks_tip.burnchain_height; assert_eq!(old_tip, peer.network.parent_stacks_tip); } for (other_peer, other_peer_nonce) in other_peers.iter_mut().zip(other_peer_nonces.iter_mut()) { - let old_tip = peer.network.stacks_tip.clone(); + let mut old_tip = peer.network.stacks_tip.clone(); other_peer.tenure_with_txs(&vec![], other_peer_nonce); let (stacks_tip_ch, stacks_tip_bh) = @@ -618,6 +635,8 @@ impl NakamotoBootPlan { let stacks_tip = StacksBlockId::new(&stacks_tip_ch, &stacks_tip_bh); assert_eq!(other_peer.network.stacks_tip.block_id(), stacks_tip); if old_tip.block_id() != stacks_tip { + old_tip.burnchain_height = + other_peer.network.parent_stacks_tip.burnchain_height; assert_eq!(old_tip, other_peer.network.parent_stacks_tip); } } @@ -1125,3 +1144,664 @@ fn test_boot_nakamoto_peer() { let observer = TestEventObserver::new(); let (peer, other_peers) = plan.boot_into_nakamoto_peers(boot_tenures, Some(&observer)); } + +#[test] +fn test_network_result_update() { + let mut network_result_1 = NetworkResult::new( + StacksBlockId([0x11; 32]), + 1, + 1, + 1, + 1, + 1, + ConsensusHash([0x11; 20]), + HashMap::new(), + ); + + let mut network_result_2 = NetworkResult::new( + StacksBlockId([0x22; 32]), + 2, + 2, + 2, + 2, + 2, + ConsensusHash([0x22; 20]), + HashMap::new(), + ); + + let nk1 = NeighborKey { + peer_version: 1, + network_id: 1, + addrbytes: PeerAddress([0x11; 16]), + port: 1, + }; + + let nk2 = NeighborKey { + peer_version: 2, + network_id: 2, + addrbytes: PeerAddress([0x22; 16]), + port: 2, + }; + + let msg1 = StacksMessage::new( + 1, + 1, + 1, + &BurnchainHeaderHash([0x11; 32]), + 1, + &BurnchainHeaderHash([0x11; 32]), + StacksMessageType::Ping(PingData { nonce: 1 }), + ); + + let mut msg2 = StacksMessage::new( + 2, + 2, + 2, + &BurnchainHeaderHash([0x22; 32]), + 2, + &BurnchainHeaderHash([0x22; 32]), + StacksMessageType::Ping(PingData { nonce: 2 }), + ); + msg2.sign(2, &StacksPrivateKey::new()).unwrap(); + + let pkey_1 = StacksPrivateKey::new(); + let pkey_2 = StacksPrivateKey::new(); + + let pushed_pkey_1 = StacksPrivateKey::new(); + let pushed_pkey_2 = StacksPrivateKey::new(); + + let uploaded_pkey_1 = StacksPrivateKey::new(); + let uploaded_pkey_2 = StacksPrivateKey::new(); + + let blk1 = make_empty_coinbase_block(&pkey_1); + let blk2 = make_empty_coinbase_block(&pkey_2); + + let pushed_blk1 = make_empty_coinbase_block(&pushed_pkey_1); + let pushed_blk2 = make_empty_coinbase_block(&pushed_pkey_2); + + let uploaded_blk1 = make_empty_coinbase_block(&uploaded_pkey_1); + let uploaded_blk2 = make_empty_coinbase_block(&uploaded_pkey_2); + + let mblk1 = make_codec_test_microblock(1); + let mblk2 = make_codec_test_microblock(2); + + let pushed_mblk1 = make_codec_test_microblock(3); + let pushed_mblk2 = make_codec_test_microblock(4); + + let uploaded_mblk1 = make_codec_test_microblock(5); + let uploaded_mblk2 = make_codec_test_microblock(6); + + let pushed_tx1 = make_codec_test_microblock(3).txs[2].clone(); + let pushed_tx2 = make_codec_test_microblock(4).txs[3].clone(); + + let uploaded_tx1 = make_codec_test_microblock(5).txs[4].clone(); + let uploaded_tx2 = make_codec_test_microblock(6).txs[5].clone(); + + let synced_tx1 = make_codec_test_microblock(7).txs[6].clone(); + let synced_tx2 = make_codec_test_microblock(8).txs[7].clone(); + + let naka_header_1 = NakamotoBlockHeader { + version: 1, + chain_length: 1, + burn_spent: 1, + consensus_hash: ConsensusHash([0x01; 20]), + parent_block_id: StacksBlockId([0x01; 32]), + tx_merkle_root: Sha512Trunc256Sum([0x01; 32]), + state_index_root: TrieHash([0x01; 32]), + timestamp: 1, + miner_signature: MessageSignature::empty(), + signer_signature: vec![], + pox_treatment: BitVec::zeros(1).unwrap(), + }; + + let naka_header_2 = NakamotoBlockHeader { + version: 2, + chain_length: 2, + burn_spent: 2, + consensus_hash: ConsensusHash([0x02; 20]), + parent_block_id: StacksBlockId([0x02; 32]), + tx_merkle_root: Sha512Trunc256Sum([0x02; 32]), + state_index_root: TrieHash([0x02; 32]), + timestamp: 2, + miner_signature: MessageSignature::empty(), + signer_signature: vec![], + pox_treatment: BitVec::zeros(1).unwrap(), + }; + + let naka_pushed_header_1 = NakamotoBlockHeader { + version: 3, + chain_length: 3, + burn_spent: 3, + consensus_hash: ConsensusHash([0x03; 20]), + parent_block_id: StacksBlockId([0x03; 32]), + tx_merkle_root: Sha512Trunc256Sum([0x03; 32]), + state_index_root: TrieHash([0x03; 32]), + timestamp: 3, + miner_signature: MessageSignature::empty(), + signer_signature: vec![], + pox_treatment: BitVec::zeros(1).unwrap(), + }; + + let naka_pushed_header_2 = NakamotoBlockHeader { + version: 4, + chain_length: 4, + burn_spent: 4, + consensus_hash: ConsensusHash([0x04; 20]), + parent_block_id: StacksBlockId([0x04; 32]), + tx_merkle_root: Sha512Trunc256Sum([0x04; 32]), + state_index_root: TrieHash([0x04; 32]), + timestamp: 4, + miner_signature: MessageSignature::empty(), + signer_signature: vec![], + pox_treatment: BitVec::zeros(1).unwrap(), + }; + + let naka_uploaded_header_1 = NakamotoBlockHeader { + version: 5, + chain_length: 5, + burn_spent: 5, + consensus_hash: ConsensusHash([0x05; 20]), + parent_block_id: StacksBlockId([0x05; 32]), + tx_merkle_root: Sha512Trunc256Sum([0x05; 32]), + state_index_root: TrieHash([0x05; 32]), + timestamp: 5, + miner_signature: MessageSignature::empty(), + signer_signature: vec![], + pox_treatment: BitVec::zeros(1).unwrap(), + }; + + let naka_uploaded_header_2 = NakamotoBlockHeader { + version: 6, + chain_length: 6, + burn_spent: 6, + consensus_hash: ConsensusHash([0x06; 20]), + parent_block_id: StacksBlockId([0x06; 32]), + tx_merkle_root: Sha512Trunc256Sum([0x06; 32]), + state_index_root: TrieHash([0x06; 32]), + timestamp: 6, + miner_signature: MessageSignature::empty(), + signer_signature: vec![], + pox_treatment: BitVec::zeros(1).unwrap(), + }; + + let nblk1 = NakamotoBlock { + header: naka_header_1.clone(), + txs: vec![], + }; + let nblk2 = NakamotoBlock { + header: naka_header_2.clone(), + txs: vec![], + }; + + let pushed_nblk1 = NakamotoBlock { + header: naka_pushed_header_1.clone(), + txs: vec![], + }; + let pushed_nblk2 = NakamotoBlock { + header: naka_pushed_header_2.clone(), + txs: vec![], + }; + + let uploaded_nblk1 = NakamotoBlock { + header: naka_uploaded_header_1.clone(), + txs: vec![], + }; + let uploaded_nblk2 = NakamotoBlock { + header: naka_uploaded_header_2.clone(), + txs: vec![], + }; + + let pushed_stackerdb_chunk_1 = StackerDBPushChunkData { + contract_id: QualifiedContractIdentifier::transient(), + rc_consensus_hash: ConsensusHash([0x11; 20]), + chunk_data: StackerDBChunkData { + slot_id: 1, + slot_version: 1, + sig: MessageSignature::empty(), + data: vec![1], + }, + }; + + let pushed_stackerdb_chunk_2 = StackerDBPushChunkData { + contract_id: QualifiedContractIdentifier::transient(), + rc_consensus_hash: ConsensusHash([0x22; 20]), + chunk_data: StackerDBChunkData { + slot_id: 2, + slot_version: 2, + sig: MessageSignature::empty(), + data: vec![2], + }, + }; + + let uploaded_stackerdb_chunk_1 = StackerDBPushChunkData { + contract_id: QualifiedContractIdentifier::transient(), + rc_consensus_hash: ConsensusHash([0x33; 20]), + chunk_data: StackerDBChunkData { + slot_id: 3, + slot_version: 3, + sig: MessageSignature::empty(), + data: vec![3], + }, + }; + + let uploaded_stackerdb_chunk_2 = StackerDBPushChunkData { + contract_id: QualifiedContractIdentifier::transient(), + rc_consensus_hash: ConsensusHash([0x44; 20]), + chunk_data: StackerDBChunkData { + slot_id: 4, + slot_version: 4, + sig: MessageSignature::empty(), + data: vec![4], + }, + }; + + network_result_1 + .unhandled_messages + .insert(nk1.clone(), vec![msg1.clone()]); + network_result_1 + .blocks + .push((ConsensusHash([0x11; 20]), blk1.clone(), 1)); + network_result_1.confirmed_microblocks.push(( + ConsensusHash([0x11; 20]), + vec![mblk1.clone()], + 1, + )); + network_result_1 + .nakamoto_blocks + .insert(nblk1.block_id(), nblk1.clone()); + network_result_1 + .pushed_transactions + .insert(nk1.clone(), vec![(vec![], pushed_tx1.clone())]); + network_result_1.pushed_blocks.insert( + nk1.clone(), + vec![BlocksData { + blocks: vec![BlocksDatum(ConsensusHash([0x11; 20]), pushed_blk1.clone())], + }], + ); + network_result_1.pushed_microblocks.insert( + nk1.clone(), + vec![( + vec![], + MicroblocksData { + index_anchor_block: StacksBlockId([0x11; 32]), + microblocks: vec![pushed_mblk1.clone()], + }, + )], + ); + network_result_1.pushed_nakamoto_blocks.insert( + nk1.clone(), + vec![( + vec![], + NakamotoBlocksData { + blocks: vec![pushed_nblk1], + }, + )], + ); + network_result_1 + .uploaded_transactions + .push(uploaded_tx1.clone()); + network_result_1.uploaded_blocks.push(BlocksData { + blocks: vec![BlocksDatum( + ConsensusHash([0x11; 20]), + uploaded_blk1.clone(), + )], + }); + network_result_1.uploaded_microblocks.push(MicroblocksData { + index_anchor_block: StacksBlockId([0x11; 32]), + microblocks: vec![uploaded_mblk1.clone()], + }); + network_result_1 + .uploaded_nakamoto_blocks + .push(uploaded_nblk1.clone()); + network_result_1 + .pushed_stackerdb_chunks + .push(pushed_stackerdb_chunk_1.clone()); + network_result_1 + .uploaded_stackerdb_chunks + .push(uploaded_stackerdb_chunk_1.clone()); + network_result_1.synced_transactions.push(synced_tx1); + + network_result_2 + .unhandled_messages + .insert(nk2.clone(), vec![msg2.clone()]); + network_result_2 + .blocks + .push((ConsensusHash([0x22; 20]), blk2.clone(), 2)); + network_result_2.confirmed_microblocks.push(( + ConsensusHash([0x22; 20]), + vec![mblk2.clone()], + 2, + )); + network_result_2 + .nakamoto_blocks + .insert(nblk2.block_id(), nblk2.clone()); + network_result_2 + .pushed_transactions + .insert(nk2.clone(), vec![(vec![], pushed_tx2.clone())]); + network_result_2.pushed_blocks.insert( + nk2.clone(), + vec![BlocksData { + blocks: vec![BlocksDatum(ConsensusHash([0x22; 20]), pushed_blk2.clone())], + }], + ); + network_result_2.pushed_microblocks.insert( + nk2.clone(), + vec![( + vec![], + MicroblocksData { + index_anchor_block: StacksBlockId([0x22; 32]), + microblocks: vec![pushed_mblk2.clone()], + }, + )], + ); + network_result_2.pushed_nakamoto_blocks.insert( + nk2.clone(), + vec![( + vec![], + NakamotoBlocksData { + blocks: vec![pushed_nblk2], + }, + )], + ); + network_result_2 + .uploaded_transactions + .push(uploaded_tx2.clone()); + network_result_2.uploaded_blocks.push(BlocksData { + blocks: vec![BlocksDatum( + ConsensusHash([0x22; 20]), + uploaded_blk2.clone(), + )], + }); + network_result_2.uploaded_microblocks.push(MicroblocksData { + index_anchor_block: StacksBlockId([0x22; 32]), + microblocks: vec![uploaded_mblk2.clone()], + }); + network_result_2 + .uploaded_nakamoto_blocks + .push(uploaded_nblk2.clone()); + network_result_2 + .pushed_stackerdb_chunks + .push(pushed_stackerdb_chunk_2.clone()); + network_result_2 + .uploaded_stackerdb_chunks + .push(uploaded_stackerdb_chunk_2.clone()); + network_result_2.synced_transactions.push(synced_tx2); + + let mut network_result_union = network_result_2.clone(); + let mut n1 = network_result_1.clone(); + network_result_union + .unhandled_messages + .extend(n1.unhandled_messages.into_iter()); + network_result_union.blocks.append(&mut n1.blocks); + network_result_union + .confirmed_microblocks + .append(&mut n1.confirmed_microblocks); + network_result_union + .nakamoto_blocks + .extend(n1.nakamoto_blocks.into_iter()); + network_result_union + .pushed_transactions + .extend(n1.pushed_transactions.into_iter()); + network_result_union + .pushed_blocks + .extend(n1.pushed_blocks.into_iter()); + network_result_union + .pushed_microblocks + .extend(n1.pushed_microblocks.into_iter()); + network_result_union + .pushed_nakamoto_blocks + .extend(n1.pushed_nakamoto_blocks.into_iter()); + network_result_union + .uploaded_transactions + .append(&mut n1.uploaded_transactions); + network_result_union + .uploaded_blocks + .append(&mut n1.uploaded_blocks); + network_result_union + .uploaded_microblocks + .append(&mut n1.uploaded_microblocks); + network_result_union + .uploaded_nakamoto_blocks + .append(&mut n1.uploaded_nakamoto_blocks); + // stackerdb chunks from n1 get dropped since their rc_consensus_hash no longer matches + network_result_union + .synced_transactions + .append(&mut n1.synced_transactions); + + // update is idempotent + let old = network_result_1.clone(); + let new = network_result_1.clone(); + assert_eq!(old.update(new), network_result_1); + + // disjoint results get unioned, except for stackerdb chunks + let old = network_result_1.clone(); + let new = network_result_2.clone(); + assert_eq!(old.update(new), network_result_union); + + // merging a subset is idempotent + assert_eq!( + network_result_1 + .clone() + .update(network_result_union.clone()), + network_result_union + ); + assert_eq!( + network_result_2 + .clone() + .update(network_result_union.clone()), + network_result_union + ); + + // stackerdb uploaded chunks get consolidated correctly + let mut old = NetworkResult::new( + StacksBlockId([0xaa; 32]), + 10, + 10, + 10, + 10, + 10, + ConsensusHash([0xaa; 20]), + HashMap::new(), + ); + let mut new = old.clone(); + + let old_chunk_1 = StackerDBPushChunkData { + contract_id: QualifiedContractIdentifier::transient(), + rc_consensus_hash: ConsensusHash([0xaa; 20]), + chunk_data: StackerDBChunkData { + slot_id: 1, + slot_version: 1, + sig: MessageSignature::empty(), + data: vec![3], + }, + }; + + let new_chunk_1 = StackerDBPushChunkData { + contract_id: QualifiedContractIdentifier::transient(), + rc_consensus_hash: ConsensusHash([0xaa; 20]), + chunk_data: StackerDBChunkData { + slot_id: 1, + slot_version: 2, + sig: MessageSignature::empty(), + data: vec![3], + }, + }; + + let new_chunk_2 = StackerDBPushChunkData { + contract_id: QualifiedContractIdentifier::transient(), + rc_consensus_hash: ConsensusHash([0xaa; 20]), + chunk_data: StackerDBChunkData { + slot_id: 2, + slot_version: 2, + sig: MessageSignature::empty(), + data: vec![3], + }, + }; + + old.uploaded_stackerdb_chunks.push(old_chunk_1.clone()); + // replaced + new.uploaded_stackerdb_chunks.push(new_chunk_1.clone()); + // included + new.uploaded_stackerdb_chunks.push(new_chunk_2.clone()); + + assert_eq!( + old.update(new).uploaded_stackerdb_chunks, + vec![new_chunk_1.clone(), new_chunk_2.clone()] + ); + + // stackerdb pushed chunks get consolidated correctly + let mut old = NetworkResult::new( + StacksBlockId([0xaa; 32]), + 10, + 10, + 10, + 10, + 10, + ConsensusHash([0xaa; 20]), + HashMap::new(), + ); + let mut new = old.clone(); + + let old_chunk_1 = StackerDBPushChunkData { + contract_id: QualifiedContractIdentifier::transient(), + rc_consensus_hash: ConsensusHash([0xaa; 20]), + chunk_data: StackerDBChunkData { + slot_id: 1, + slot_version: 1, + sig: MessageSignature::empty(), + data: vec![3], + }, + }; + + let new_chunk_1 = StackerDBPushChunkData { + contract_id: QualifiedContractIdentifier::transient(), + rc_consensus_hash: ConsensusHash([0xaa; 20]), + chunk_data: StackerDBChunkData { + slot_id: 1, + slot_version: 2, + sig: MessageSignature::empty(), + data: vec![3], + }, + }; + + let new_chunk_2 = StackerDBPushChunkData { + contract_id: QualifiedContractIdentifier::transient(), + rc_consensus_hash: ConsensusHash([0xaa; 20]), + chunk_data: StackerDBChunkData { + slot_id: 2, + slot_version: 2, + sig: MessageSignature::empty(), + data: vec![3], + }, + }; + + old.pushed_stackerdb_chunks.push(old_chunk_1.clone()); + // replaced + new.pushed_stackerdb_chunks.push(new_chunk_1.clone()); + // included + new.pushed_stackerdb_chunks.push(new_chunk_2.clone()); + + assert_eq!( + old.update(new).pushed_stackerdb_chunks, + vec![new_chunk_1.clone(), new_chunk_2.clone()] + ); + + // nakamoto blocks obtained via download, upload, or pushed get consoldated + let mut old = NetworkResult::new( + StacksBlockId([0xbb; 32]), + 11, + 11, + 11, + 11, + 11, + ConsensusHash([0xbb; 20]), + HashMap::new(), + ); + old.nakamoto_blocks.insert(nblk1.block_id(), nblk1.clone()); + old.pushed_nakamoto_blocks.insert( + nk1.clone(), + vec![( + vec![], + NakamotoBlocksData { + blocks: vec![nblk1.clone()], + }, + )], + ); + old.uploaded_nakamoto_blocks.push(nblk1.clone()); + + let new = NetworkResult::new( + StacksBlockId([0xbb; 32]), + 11, + 11, + 11, + 11, + 11, + ConsensusHash([0xbb; 20]), + HashMap::new(), + ); + + let mut new_pushed = new.clone(); + let mut new_uploaded = new.clone(); + let mut new_downloaded = new.clone(); + + new_downloaded + .nakamoto_blocks + .insert(nblk1.block_id(), nblk1.clone()); + new_pushed.pushed_nakamoto_blocks.insert( + nk2.clone(), + vec![( + vec![], + NakamotoBlocksData { + blocks: vec![nblk1.clone()], + }, + )], + ); + new_uploaded.uploaded_nakamoto_blocks.push(nblk1.clone()); + + debug!("===="); + let updated_downloaded = old.clone().update(new_downloaded); + assert_eq!(updated_downloaded.nakamoto_blocks.len(), 1); + assert_eq!( + updated_downloaded + .nakamoto_blocks + .get(&nblk1.block_id()) + .unwrap(), + &nblk1 + ); + assert_eq!(updated_downloaded.pushed_nakamoto_blocks.len(), 0); + assert_eq!(updated_downloaded.uploaded_nakamoto_blocks.len(), 0); + + debug!("===="); + let updated_pushed = old.clone().update(new_pushed); + assert_eq!(updated_pushed.nakamoto_blocks.len(), 0); + assert_eq!(updated_pushed.pushed_nakamoto_blocks.len(), 1); + assert_eq!( + updated_pushed + .pushed_nakamoto_blocks + .get(&nk2) + .unwrap() + .len(), + 1 + ); + assert_eq!( + updated_pushed.pushed_nakamoto_blocks.get(&nk2).unwrap()[0] + .1 + .blocks + .len(), + 1 + ); + assert_eq!( + updated_pushed.pushed_nakamoto_blocks.get(&nk2).unwrap()[0] + .1 + .blocks[0], + nblk1 + ); + assert_eq!(updated_pushed.uploaded_nakamoto_blocks.len(), 0); + + debug!("===="); + let updated_uploaded = old.clone().update(new_uploaded); + assert_eq!(updated_uploaded.nakamoto_blocks.len(), 0); + assert_eq!(updated_uploaded.pushed_nakamoto_blocks.len(), 0); + assert_eq!(updated_uploaded.uploaded_nakamoto_blocks.len(), 1); + assert_eq!(updated_uploaded.uploaded_nakamoto_blocks[0], nblk1); +} diff --git a/testnet/stacks-node/src/config.rs b/testnet/stacks-node/src/config.rs index d94dbd41de..8d41d66f5c 100644 --- a/testnet/stacks-node/src/config.rs +++ b/testnet/stacks-node/src/config.rs @@ -2250,6 +2250,7 @@ pub struct ConnectionOptionsFile { pub private_neighbors: Option, pub auth_token: Option, pub antientropy_retry: Option, + pub reject_blocks_pushed: Option, } impl ConnectionOptionsFile { @@ -2382,6 +2383,9 @@ impl ConnectionOptionsFile { private_neighbors: self.private_neighbors.unwrap_or(true), auth_token: self.auth_token, antientropy_retry: self.antientropy_retry.unwrap_or(default.antientropy_retry), + reject_blocks_pushed: self + .reject_blocks_pushed + .unwrap_or(default.reject_blocks_pushed), ..default }) } diff --git a/testnet/stacks-node/src/nakamoto_node.rs b/testnet/stacks-node/src/nakamoto_node.rs index ecf37ae0ec..56156f4c20 100644 --- a/testnet/stacks-node/src/nakamoto_node.rs +++ b/testnet/stacks-node/src/nakamoto_node.rs @@ -47,7 +47,7 @@ pub mod sign_coordinator; use self::peer::PeerThread; use self::relayer::{RelayerDirective, RelayerThread}; -pub const RELAYER_MAX_BUFFER: usize = 100; +pub const RELAYER_MAX_BUFFER: usize = 1; const VRF_MOCK_MINER_KEY: u64 = 1; pub const BLOCK_PROCESSOR_STACK_SIZE: usize = 32 * 1024 * 1024; // 32 MB diff --git a/testnet/stacks-node/src/nakamoto_node/peer.rs b/testnet/stacks-node/src/nakamoto_node/peer.rs index 78deb69b9f..f01618a14b 100644 --- a/testnet/stacks-node/src/nakamoto_node/peer.rs +++ b/testnet/stacks-node/src/nakamoto_node/peer.rs @@ -13,7 +13,6 @@ // // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::collections::VecDeque; use std::net::SocketAddr; use std::sync::mpsc::TrySendError; use std::thread; @@ -54,11 +53,9 @@ pub struct PeerThread { chainstate: StacksChainState, /// handle to the mempool DB mempool: MemPoolDB, - /// buffer of relayer commands with block data that couldn't be sent to the relayer just yet - /// (i.e. due to backpressure). We track this separately, instead of just using a bigger - /// channel, because we need to know when backpressure occurs in order to throttle the p2p - /// thread's downloader. - results_with_data: VecDeque, + /// Buffered network result relayer command. + /// P2P network results are consolidated into a single directive. + results_with_data: Option, /// total number of p2p state-machine passes so far. Used to signal when to download the next /// reward cycle of blocks num_p2p_state_machine_passes: u64, @@ -199,7 +196,7 @@ impl PeerThread { sortdb, chainstate, mempool, - results_with_data: VecDeque::new(), + results_with_data: None, num_p2p_state_machine_passes: 0, num_inv_sync_passes: 0, num_download_passes: 0, @@ -239,7 +236,18 @@ impl PeerThread { ) -> bool { // initial block download? let ibd = self.globals.sync_comms.get_ibd(); - let download_backpressure = !self.results_with_data.is_empty(); + let download_backpressure = self + .results_with_data + .as_ref() + .map(|res| { + if let RelayerDirective::HandleNetResult(netres) = &res { + netres.has_block_data_to_store() + } else { + false + } + }) + .unwrap_or(false); + let poll_ms = if !download_backpressure && self.net.has_more_downloads() { // keep getting those blocks -- drive the downloader state-machine debug!( @@ -282,7 +290,6 @@ impl PeerThread { }; match p2p_res { Ok(network_result) => { - let mut have_update = false; if self.num_p2p_state_machine_passes < network_result.num_state_machine_passes { // p2p state-machine did a full pass. Notify anyone listening. self.globals.sync_comms.notify_p2p_state_pass(); @@ -293,30 +300,28 @@ impl PeerThread { // inv-sync state-machine did a full pass. Notify anyone listening. self.globals.sync_comms.notify_inv_sync_pass(); self.num_inv_sync_passes = network_result.num_inv_sync_passes; - - // the relayer cares about the number of inventory passes, so pass this along - have_update = true; } if self.num_download_passes < network_result.num_download_passes { // download state-machine did a full pass. Notify anyone listening. self.globals.sync_comms.notify_download_pass(); self.num_download_passes = network_result.num_download_passes; - - // the relayer cares about the number of download passes, so pass this along - have_update = true; } - if network_result.has_data_to_store() - || self.last_burn_block_height != network_result.burn_height - || have_update - { - // pass along if we have blocks, microblocks, or transactions, or a status - // update on the network's view of the burnchain - self.last_burn_block_height = network_result.burn_height; - self.results_with_data - .push_back(RelayerDirective::HandleNetResult(network_result)); + self.last_burn_block_height = network_result.burn_height; + if let Some(res) = self.results_with_data.take() { + if let RelayerDirective::HandleNetResult(netres) = res { + let new_res = netres.update(network_result); + self.results_with_data = Some(RelayerDirective::HandleNetResult(new_res)); + } + } else { + self.results_with_data = + Some(RelayerDirective::HandleNetResult(network_result)); } + + self.globals.raise_initiative( + "PeerThread::run_one_pass() with data-bearing network result".to_string(), + ); } Err(e) => { // this is only reachable if the network is not instantiated correctly -- @@ -325,20 +330,21 @@ impl PeerThread { } }; - while let Some(next_result) = self.results_with_data.pop_front() { + if let Some(next_result) = self.results_with_data.take() { // have blocks, microblocks, and/or transactions (don't care about anything else), // or a directive to mine microblocks + self.globals.raise_initiative( + "PeerThread::run_one_pass() with backlogged network results".to_string(), + ); if let Err(e) = self.globals.relay_send.try_send(next_result) { debug!( - "P2P: {:?}: download backpressure detected (bufferred {})", + "P2P: {:?}: download backpressure detected", &self.net.local_peer, - self.results_with_data.len() ); match e { TrySendError::Full(directive) => { // don't lose this data -- just try it again - self.results_with_data.push_front(directive); - break; + self.results_with_data = Some(directive); } TrySendError::Disconnected(_) => { info!("P2P: Relayer hang up with p2p channel"); @@ -347,13 +353,7 @@ impl PeerThread { } } } else { - debug!( - "P2P: Dispatched result to Relayer! {} results remaining", - self.results_with_data.len() - ); - self.globals.raise_initiative( - "PeerThread::run_one_pass() with data-bearing network result".to_string(), - ); + debug!("P2P: Dispatched result to Relayer!",); } } diff --git a/testnet/stacks-node/src/nakamoto_node/relayer.rs b/testnet/stacks-node/src/nakamoto_node/relayer.rs index 63c931bba3..a42f033b20 100644 --- a/testnet/stacks-node/src/nakamoto_node/relayer.rs +++ b/testnet/stacks-node/src/nakamoto_node/relayer.rs @@ -1195,7 +1195,7 @@ impl RelayerThread { while self.globals.keep_running() { let raised_initiative = self.globals.take_initiative(); let timed_out = Instant::now() >= self.next_initiative; - let directive = if raised_initiative.is_some() || timed_out { + let mut initiative_directive = if raised_initiative.is_some() || timed_out { self.next_initiative = Instant::now() + Duration::from_millis(self.config.node.next_initiative_delay); self.initiative() @@ -1203,13 +1203,17 @@ impl RelayerThread { None }; - let directive = if let Some(directive) = directive { + let directive = if let Some(directive) = initiative_directive.take() { directive } else { + // channel was drained, so do a time-bound recv match relay_rcv.recv_timeout(Duration::from_millis( self.config.node.next_initiative_delay, )) { - Ok(directive) => directive, + Ok(directive) => { + // only do this once, so we can call .initiative() again + directive + } Err(RecvTimeoutError::Timeout) => { continue; } @@ -1221,7 +1225,7 @@ impl RelayerThread { debug!("Relayer: main loop directive"; "directive" => %directive, - "raised_initiative" => %raised_initiative.unwrap_or("relay_rcv".to_string()), + "raised_initiative" => ?raised_initiative, "timed_out" => %timed_out); if !self.handle_directive(directive) { diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index 8eaefbe432..791af3d254 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -4899,6 +4899,9 @@ impl StacksNode { stackerdb_machines.insert(contract_id, (stackerdb_config, stacker_db_sync)); } let peerdb = Self::setup_peer_db(config, &burnchain, &stackerdb_contract_ids); + let burnchain_db = burnchain + .open_burnchain_db(false) + .expect("Failed to open burnchain DB"); let local_peer = match PeerDB::get_local_peer(peerdb.conn()) { Ok(local_peer) => local_peer, @@ -4909,6 +4912,7 @@ impl StacksNode { peerdb, atlasdb, stackerdbs, + burnchain_db, local_peer, config.burnchain.peer_version, burnchain, diff --git a/testnet/stacks-node/src/node.rs b/testnet/stacks-node/src/node.rs index 8aebd4814a..62b6b094aa 100644 --- a/testnet/stacks-node/src/node.rs +++ b/testnet/stacks-node/src/node.rs @@ -490,11 +490,15 @@ impl Node { let event_dispatcher = self.event_dispatcher.clone(); let exit_at_block_height = self.config.burnchain.process_exit_at_block_height; + let burnchain_db = burnchain + .open_burnchain_db(false) + .expect("Failed to open burnchain DB"); let p2p_net = PeerNetwork::new( peerdb, atlasdb, stackerdbs, + burnchain_db, local_peer, self.config.burnchain.peer_version, burnchain.clone(),