diff --git a/CHANGELOG.md b/CHANGELOG.md index c24ea0c996..a5db3826cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -258,6 +258,10 @@ this version of the software on it. - The `blockstack-core` binary has been renamed to `stacks-inspect`. This binary provides CLI tools for chain and mempool inspection. +### Fixed +- The AtlasDB previously could lose `AttachmentInstance` data during shutdown + or crashes (#3082). This release resolves that. + ## [2.05.0.1.0] ### Added diff --git a/src/chainstate/coordinator/mod.rs b/src/chainstate/coordinator/mod.rs index 322efe133f..614483d4a2 100644 --- a/src/chainstate/coordinator/mod.rs +++ b/src/chainstate/coordinator/mod.rs @@ -58,7 +58,7 @@ use crate::core::{StacksEpoch, StacksEpochId}; use crate::monitoring::{ increment_contract_calls_processed, increment_stx_blocks_processed_counter, }; -use crate::net::atlas::{AtlasConfig, AttachmentInstance}; +use crate::net::atlas::{AtlasConfig, AtlasDB, AttachmentInstance}; use crate::util_lib::db::DBConn; use crate::util_lib::db::DBTx; use crate::util_lib::db::Error as DBError; @@ -201,7 +201,7 @@ pub struct ChainsCoordinator< chain_state_db: StacksChainState, sortition_db: SortitionDB, burnchain: Burnchain, - attachments_tx: SyncSender>, + atlas_db: Option, dispatcher: Option<&'a T>, cost_estimator: Option<&'a mut CE>, fee_estimator: Option<&'a mut FE>, @@ -319,7 +319,6 @@ impl< config: ChainsCoordinatorConfig, chain_state_db: StacksChainState, burnchain: Burnchain, - attachments_tx: SyncSender>, dispatcher: &'a mut T, comms: CoordinatorReceivers, atlas_config: AtlasConfig, @@ -327,6 +326,7 @@ impl< fee_estimator: Option<&mut FE>, miner_status: Arc>, burnchain_indexer: B, + atlas_db: AtlasDB, ) where T: BlockEventDispatcher, { @@ -356,13 +356,13 @@ impl< chain_state_db, sortition_db, burnchain, - attachments_tx, dispatcher: Some(dispatcher), notifier: arc_notices, reward_set_provider: OnChainRewardSetProvider(), cost_estimator, fee_estimator, atlas_config, + atlas_db: 
Some(atlas_db), config, burnchain_indexer, }; @@ -419,35 +419,36 @@ impl< impl<'a, T: BlockEventDispatcher, U: RewardSetProvider, B: BurnchainHeaderReader> ChainsCoordinator<'a, T, (), U, (), (), B> { + /// Create a coordinator for testing, with some parameters defaulted to None #[cfg(test)] pub fn test_new( burnchain: &Burnchain, chain_id: u32, path: &str, reward_set_provider: U, - attachments_tx: SyncSender>, indexer: B, ) -> ChainsCoordinator<'a, T, (), U, (), (), B> { - ChainsCoordinator::test_new_with_observer( + ChainsCoordinator::test_new_full( burnchain, chain_id, path, reward_set_provider, - attachments_tx, None, indexer, + None, ) } + /// Create a coordinator for testing allowing for all configurable params #[cfg(test)] - pub fn test_new_with_observer( + pub fn test_new_full( burnchain: &Burnchain, chain_id: u32, path: &str, reward_set_provider: U, - attachments_tx: SyncSender>, dispatcher: Option<&'a T>, burnchain_indexer: B, + atlas_config: Option, ) -> ChainsCoordinator<'a, T, (), U, (), (), B> { let burnchain = burnchain.clone(); @@ -472,6 +473,10 @@ impl<'a, T: BlockEventDispatcher, U: RewardSetProvider, B: BurnchainHeaderReader let canonical_sortition_tip = SortitionDB::get_canonical_sortition_tip(sortition_db.conn()).unwrap(); + let atlas_config = atlas_config.unwrap_or(AtlasConfig::default(false)); + let atlas_db = + AtlasDB::connect(atlas_config.clone(), &format!("{}/atlas", path), true).unwrap(); + ChainsCoordinator { canonical_sortition_tip: Some(canonical_sortition_tip), burnchain_blocks_db, @@ -483,8 +488,8 @@ impl<'a, T: BlockEventDispatcher, U: RewardSetProvider, B: BurnchainHeaderReader fee_estimator: None, reward_set_provider, notifier: (), - attachments_tx, - atlas_config: AtlasConfig::default(false), + atlas_config, + atlas_db: Some(atlas_db), config: ChainsCoordinatorConfig::new(), burnchain_indexer, } @@ -2557,7 +2562,8 @@ impl< /// Process any Atlas attachment events and forward them to the Atlas subsystem fn 
process_atlas_attachment_events( - &self, + atlas_db: Option<&mut AtlasDB>, + atlas_config: &AtlasConfig, block_receipt: &StacksEpochReceipt, canonical_stacks_tip_height: u64, ) { @@ -2567,7 +2573,7 @@ impl< if let TransactionPayload::ContractCall(ref contract_call) = transaction.payload { let contract_id = contract_call.to_clarity_contract_id(); increment_contract_calls_processed(); - if self.atlas_config.contracts.contains(&contract_id) { + if atlas_config.contracts.contains(&contract_id) { for event in receipt.events.iter() { if let StacksTransactionEvent::SmartContractEvent(ref event_data) = event @@ -2591,15 +2597,26 @@ impl< } if !attachments_instances.is_empty() { info!( - "Atlas: {} attachment instances emitted from events", - attachments_instances.len() + "Atlas: New attachment instances emitted by block"; + "attachments_count" => attachments_instances.len(), + "index_block_hash" => %block_receipt.header.index_block_hash(), + "stacks_height" => block_receipt.header.stacks_block_height, ); - match self.attachments_tx.send(attachments_instances) { - Ok(_) => {} - Err(e) => { - error!("Atlas: error dispatching attachments {}", e); + if let Some(atlas_db) = atlas_db { + for new_attachment in attachments_instances.into_iter() { + if let Err(e) = atlas_db.queue_attachment_instance(&new_attachment) { + warn!( + "Atlas: Error writing attachment instance to DB"; + "err" => ?e, + "index_block_hash" => %new_attachment.index_block_hash, + "contract_id" => %new_attachment.contract_id, + "attachment_index" => %new_attachment.attachment_index, + ); + } } - }; + } else { + warn!("Atlas: attempted to write attachments, but stacks-node not configured with Atlas DB"); + } } } @@ -2897,7 +2914,9 @@ impl< self.notifier.notify_stacks_block_processed(); increment_stx_blocks_processed_counter(); - self.process_atlas_attachment_events( + Self::process_atlas_attachment_events( + self.atlas_db.as_mut(), + &self.atlas_config, &block_receipt, 
new_canonical_block_snapshot.canonical_stacks_tip_height, ); diff --git a/src/chainstate/coordinator/tests.rs b/src/chainstate/coordinator/tests.rs index aa3ee613e8..f9e1676103 100644 --- a/src/chainstate/coordinator/tests.rs +++ b/src/chainstate/coordinator/tests.rs @@ -51,6 +51,7 @@ use crate::core; use crate::core::*; use crate::monitoring::increment_stx_blocks_processed_counter; use crate::util_lib::boot::boot_code_addr; +use crate::util_lib::strings::StacksString; use crate::vm::errors::Error as InterpreterError; use clarity::vm::{ costs::{ExecutionCost, LimitedCostTracker}, @@ -88,6 +89,14 @@ lazy_static! { pub static ref STACKS_BLOCK_HEADERS: Arc = Arc::new(AtomicU64::new(1)); } +fn test_path(name: &str) -> String { + format!( + "/tmp/stacks-node-tests/coordinator-tests/{}/{}", + get_epoch_time_secs(), + name + ) +} + pub fn next_block_hash() -> BlockHeaderHash { let cur = STACKS_BLOCK_HEADERS.fetch_add(1, Ordering::SeqCst); let mut bytes = vec![]; @@ -452,7 +461,6 @@ pub fn make_coordinator<'a>( burnchain: Option, ) -> ChainsCoordinator<'a, NullEventDispatcher, (), OnChainRewardSetProvider, (), (), BitcoinIndexer> { - let (tx, _) = sync_channel(100000); let burnchain = burnchain.unwrap_or_else(|| get_burnchain(path, None)); let indexer = BitcoinIndexer::new_unit_test(&burnchain.working_dir); ChainsCoordinator::test_new( @@ -460,11 +468,29 @@ pub fn make_coordinator<'a>( 0x80000000, path, OnChainRewardSetProvider(), - tx, indexer, ) } +pub fn make_coordinator_atlas<'a>( + path: &str, + burnchain: Option, + atlas_config: Option, +) -> ChainsCoordinator<'a, NullEventDispatcher, (), OnChainRewardSetProvider, (), (), BitcoinIndexer> +{ + let burnchain = burnchain.unwrap_or_else(|| get_burnchain(path, None)); + let indexer = BitcoinIndexer::new_unit_test(&burnchain.working_dir); + ChainsCoordinator::test_new_full( + &burnchain, + 0x80000000, + path, + OnChainRewardSetProvider(), + None, + indexer, + atlas_config, + ) +} + struct StubbedRewardSetProvider(Vec); 
impl RewardSetProvider for StubbedRewardSetProvider { @@ -491,7 +517,6 @@ fn make_reward_set_coordinator<'a>( pox_consts: Option, ) -> ChainsCoordinator<'a, NullEventDispatcher, (), StubbedRewardSetProvider, (), (), BitcoinIndexer> { - let (tx, _) = sync_channel(100000); let burnchain = get_burnchain(path, None); let indexer = BitcoinIndexer::new_unit_test(&burnchain.working_dir); ChainsCoordinator::test_new( @@ -499,7 +524,6 @@ fn make_reward_set_coordinator<'a>( 0x80000000, path, StubbedRewardSetProvider(addrs), - tx, indexer, ) } @@ -729,6 +753,7 @@ fn make_stacks_block_from_parent_sortition( false, (Txid([0; 32]), 0), Some(parent_sortition), + &[], ) } @@ -795,12 +820,14 @@ fn make_stacks_block_with_recipients_and_sunset_burn( post_sunset_burn, (Txid([0; 32]), 0), None, + &[], ) } /// build a stacks block with just the coinbase off of /// parent_block, in the canonical sortition fork of SortitionDB. /// parent_block _must_ be included in the StacksChainState +/// `txs`: transactions to try to include in block fn make_stacks_block_with_input( sort_db: &SortitionDB, state: &mut StacksChainState, @@ -816,6 +843,7 @@ fn make_stacks_block_with_input( post_sunset_burn: bool, input: (Txid, u32), parents_sortition_opt: Option, + txs: &[StacksTransaction], ) -> (BlockstackOperationType, StacksBlock) { let tx_auth = TransactionAuth::from_p2pkh(miner).unwrap(); @@ -888,6 +916,10 @@ fn make_stacks_block_with_input( .try_mine_tx(&mut epoch_tx, &coinbase_op, ast_rules) .unwrap(); + for tx in txs { + builder.try_mine_tx(&mut epoch_tx, tx, ast_rules).unwrap(); + } + let block = builder.mine_anchored_block(&mut epoch_tx); builder.epoch_finish(epoch_tx); @@ -940,7 +972,7 @@ fn make_stacks_block_with_input( #[test] fn missed_block_commits_2_05() { - let path = "/tmp/stacks-blockchain-missed_block_commits_2_05"; + let path = &test_path("missed_block_commits_2_05"); let _r = std::fs::remove_dir_all(path); let sunset_ht = 8000; @@ -1047,6 +1079,7 @@ fn missed_block_commits_2_05() { 
false, last_input.as_ref().unwrap().clone(), None, + &[], ); // NOTE: intended for block block_height - 2 last_input = Some(( @@ -1100,6 +1133,7 @@ fn missed_block_commits_2_05() { false, last_input.as_ref().unwrap().clone(), None, + &[], ) }; @@ -1256,7 +1290,7 @@ fn missed_block_commits_2_05() { /// in 2.1 due to the bad missed block-commit *not* counting towards the miner's sortition weight. #[test] fn missed_block_commits_2_1() { - let path = "/tmp/stacks-blockchain-missed_block_commits_2_1"; + let path = &test_path("missed_block_commits_2_1"); let _r = std::fs::remove_dir_all(path); let sunset_ht = 8000; @@ -1368,6 +1402,7 @@ fn missed_block_commits_2_1() { false, last_input.as_ref().unwrap().clone(), None, + &[], ); // NOTE: intended for block block_height - 2 last_input = Some(( @@ -1423,6 +1458,7 @@ fn missed_block_commits_2_1() { false, last_input.as_ref().unwrap().clone(), None, + &[], ) }; @@ -1596,7 +1632,7 @@ fn missed_block_commits_2_1() { /// the UTXO chain #[test] fn late_block_commits_2_1() { - let path = "/tmp/stacks-blockchain-late_block_commits_2_1"; + let path = &test_path("late_block_commits_2_1"); let _r = std::fs::remove_dir_all(path); let sunset_ht = 8000; @@ -1705,6 +1741,7 @@ fn late_block_commits_2_1() { false, last_input.as_ref().unwrap().clone(), None, + &[], ); // NOTE: intended for block block_height - 3 last_input = Some(( @@ -1760,6 +1797,7 @@ fn late_block_commits_2_1() { false, last_input.as_ref().unwrap().clone(), None, + &[], ) }; @@ -1933,9 +1971,9 @@ fn late_block_commits_2_1() { #[test] fn test_simple_setup() { - let path = "/tmp/stacks-blockchain-simple-setup"; + let path = &test_path("simple-setup"); // setup a second set of states that won't see the broadcasted blocks - let path_blinded = "/tmp/stacks-blockchain-simple-setup.blinded"; + let path_blinded = &test_path("simple-setup.blinded"); let _r = std::fs::remove_dir_all(path); let _r = std::fs::remove_dir_all(path_blinded); @@ -2146,7 +2184,7 @@ fn test_simple_setup() 
{ #[test] fn test_sortition_with_reward_set() { - let path = "/tmp/stacks-blockchain-simple-reward-set"; + let path = &test_path("simple-reward-set"); let _r = std::fs::remove_dir_all(path); let mut vrf_keys: Vec<_> = (0..150).map(|_| VRFPrivateKey::new()).collect(); @@ -2415,7 +2453,7 @@ fn test_sortition_with_reward_set() { #[test] fn test_sortition_with_burner_reward_set() { - let path = "/tmp/stacks-blockchain-burner-reward-set"; + let path = &test_path("burner-reward-set"); let _r = std::fs::remove_dir_all(path); let mut vrf_keys: Vec<_> = (0..150).map(|_| VRFPrivateKey::new()).collect(); @@ -2658,7 +2696,7 @@ fn test_sortition_with_burner_reward_set() { #[test] fn test_pox_btc_ops() { - let path = "/tmp/stacks-blockchain-pox-btc-ops"; + let path = &test_path("pox-btc-ops"); let _r = std::fs::remove_dir_all(path); let sunset_ht = 8000; @@ -2935,7 +2973,7 @@ fn test_pox_btc_ops() { #[test] fn test_stx_transfer_btc_ops() { - let path = "/tmp/stacks-blockchain-stx_transfer-btc-ops"; + let path = &test_path("stx_transfer-btc-ops"); let _r = std::fs::remove_dir_all(path); let pox_v1_unlock_ht = u32::max_value(); @@ -3326,7 +3364,7 @@ fn get_delegation_info_pox_2( // \ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ S30 -> S31 -> ... #[test] fn test_delegate_stx_btc_ops() { - let path = "/tmp/stacks-blockchain-delegate-stx-btc-ops"; + let path = &test_path("delegate-stx-btc-ops"); let _r = std::fs::remove_dir_all(path); let pox_v1_unlock_ht = 12; @@ -3867,7 +3905,7 @@ fn test_initial_coinbase_reward_distributions() { // panic when trying to re-create the costs-2 contract. 
#[test] fn test_epoch_switch_cost_contract_instantiation() { - let path = "/tmp/stacks-blockchain-epoch-switch-cost-contract-instantiation"; + let path = &test_path("epoch-switch-cost-contract-instantiation"); let _r = std::fs::remove_dir_all(path); let sunset_ht = 8000; @@ -4066,7 +4104,7 @@ fn test_epoch_switch_cost_contract_instantiation() { // the test would panic when trying to re-create the pox-2 contract. #[test] fn test_epoch_switch_pox_contract_instantiation() { - let path = "/tmp/stacks-blockchain-epoch-switch-pox-contract-instantiation"; + let path = &test_path("epoch-switch-pox-contract-instantiation"); let _r = std::fs::remove_dir_all(path); let sunset_ht = 8000; @@ -4270,6 +4308,265 @@ fn test_epoch_switch_pox_contract_instantiation() { } } +#[test] +fn atlas_stop_start() { + let path = &test_path("atlas_stop_start"); + let _r = std::fs::remove_dir_all(path); + + let sunset_ht = 8000; + let pox_consts = Some(PoxConstants::new(6, 3, 3, 25, 5, 10, sunset_ht, 10)); + let burnchain_conf = get_burnchain(path, pox_consts.clone()); + + // publish a simple contract used to generate atlas attachment instances + let atlas_contract_content = " + (define-data-var attachment-index uint u1) + (define-public (make-attach (zonefile-hash (buff 20))) + (let ((current-index (var-get attachment-index))) + (print { + attachment: { + hash: zonefile-hash, + attachment-index: current-index, + metadata: \"test-meta\" + } + }) + (var-set attachment-index (+ u1 current-index)) + (ok true)))"; + let atlas_name: clarity::vm::ContractName = "atlas-test".into(); + + let vrf_keys: Vec<_> = (0..15).map(|_| VRFPrivateKey::new()).collect(); + let committers: Vec<_> = (0..15).map(|_| StacksPrivateKey::new()).collect(); + + let signer_sk = StacksPrivateKey::new(); + let signer_pk = p2pkh_from(&signer_sk); + let balance = 6_000_000_000 * (core::MICROSTACKS_PER_STACKS as u64); + let stacked_amt = 1_000_000_000 * (core::MICROSTACKS_PER_STACKS as u128); + let initial_balances = 
vec![(signer_pk.clone().into(), balance)]; + let atlas_qci = QualifiedContractIdentifier::new(signer_pk.clone().into(), atlas_name.clone()); + // include our simple contract in the atlas config + let mut atlas_config = AtlasConfig::default(false); + atlas_config.contracts.insert(atlas_qci.clone()); + + setup_states( + &[path], + &vrf_keys, + &committers, + pox_consts.clone(), + Some(initial_balances), + StacksEpochId::Epoch21, + ); + + let mut coord = make_coordinator_atlas( + path, + Some(burnchain_conf.clone()), + Some(atlas_config.clone()), + ); + + coord.handle_new_burnchain_block().unwrap(); + + let sort_db = get_sortition_db(path, pox_consts.clone()); + + let tip = SortitionDB::get_canonical_burn_chain_tip(sort_db.conn()).unwrap(); + assert_eq!(tip.block_height, 1); + assert_eq!(tip.sortition, false); + let (_, ops) = sort_db + .get_sortition_result(&tip.sortition_id) + .unwrap() + .unwrap(); + + // we should have all the VRF registrations accepted + assert_eq!(ops.accepted_ops.len(), vrf_keys.len()); + assert_eq!(ops.consumed_leader_keys.len(), 0); + + // process sequential blocks, and their sortitions... 
+ let mut stacks_blocks: Vec<(SortitionId, StacksBlock)> = vec![]; + + let mut contract_publish = StacksTransaction::new( + TransactionVersion::Testnet, + TransactionAuth::from_p2pkh(&signer_sk).unwrap(), + TransactionPayload::SmartContract( + TransactionSmartContract { + name: atlas_name.clone(), + code_body: StacksString::from_str(atlas_contract_content).unwrap(), + }, + None, + ), + ); + contract_publish.chain_id = 0x80000000; + contract_publish.anchor_mode = TransactionAnchorMode::OnChainOnly; + contract_publish.auth.set_origin_nonce(0); + contract_publish.auth.set_tx_fee(100); + let mut signer = StacksTransactionSigner::new(&contract_publish); + signer.sign_origin(&signer_sk).unwrap(); + let contract_publish = signer.get_tx().unwrap(); + + let make_attachments: Vec = (0..5) + .map(|ix| { + ( + ix, + StacksTransaction::new( + TransactionVersion::Testnet, + TransactionAuth::from_p2pkh(&signer_sk).unwrap(), + TransactionPayload::ContractCall(TransactionContractCall { + address: signer_pk.clone().into(), + contract_name: atlas_name.clone(), + function_name: "make-attach".into(), + function_args: vec![Value::buff_from(vec![ix; 20]).unwrap()], + }), + ), + ) + }) + .map(|(ix, mut cc_tx)| { + cc_tx.chain_id = 0x80000000; + cc_tx.anchor_mode = TransactionAnchorMode::OnChainOnly; + cc_tx.auth.set_origin_nonce(ix as u64 + 1); + cc_tx.auth.set_tx_fee(100); + let mut signer = StacksTransactionSigner::new(&cc_tx); + signer.sign_origin(&signer_sk).unwrap(); + signer.get_tx().unwrap() + }) + .collect(); + + for ix in 0..3 { + let vrf_key = &vrf_keys[ix]; + let miner = &committers[ix]; + + let mut burnchain = get_burnchain_db(path, pox_consts.clone()); + let mut chainstate = get_chainstate(path); + + let parent = if ix == 0 { + BlockHeaderHash([0; 32]) + } else { + stacks_blocks[ix - 1].1.header.block_hash() + }; + + let burnchain_tip = burnchain.get_canonical_chain_tip().unwrap(); + let b = get_burnchain(path, pox_consts.clone()); + + let next_mock_header = 
BurnchainBlockHeader { + block_height: burnchain_tip.block_height + 1, + block_hash: BurnchainHeaderHash([0; 32]), + parent_block_hash: burnchain_tip.block_hash, + num_txs: 0, + timestamp: 1, + }; + + let reward_cycle_info = coord.get_reward_cycle_info(&next_mock_header).unwrap(); + + let txs = if ix == 1 { + vec![contract_publish.clone()] + } else if ix == 2 { + make_attachments.clone() + } else { + vec![] + }; + + let (good_op, block) = if ix == 0 { + make_genesis_block_with_recipients( + &sort_db, + &mut chainstate, + &parent, + miner, + 10000, + vrf_key, + ix as u32, + None, + ) + } else { + make_stacks_block_with_input( + &sort_db, + &mut chainstate, + &b, + &parent, + burnchain_tip.block_height, + miner, + 1000, + vrf_key, + ix as u32, + None, + 0, + false, + (Txid([0; 32]), 0), + None, + &txs, + ) + }; + + let expected_winner = good_op.txid(); + let ops = vec![good_op]; + + let burnchain_tip = burnchain.get_canonical_chain_tip().unwrap(); + produce_burn_block( + &b, + &mut burnchain, + &burnchain_tip.block_hash, + ops, + vec![].iter_mut(), + ); + // handle the sortition + coord.handle_new_burnchain_block().unwrap(); + + let tip = SortitionDB::get_canonical_burn_chain_tip(sort_db.conn()).unwrap(); + assert_eq!(&tip.winning_block_txid, &expected_winner); + + // load the block into staging + let block_hash = block.header.block_hash(); + + assert_eq!(&tip.winning_stacks_block_hash, &block_hash); + stacks_blocks.push((tip.sortition_id.clone(), block.clone())); + + preprocess_block(&mut chainstate, &sort_db, &tip, block); + + // handle the stacks block + coord.handle_new_stacks_block().unwrap(); + + let stacks_tip = SortitionDB::get_canonical_stacks_chain_tip_hash(sort_db.conn()).unwrap(); + let burn_block_height = tip.block_height; + + // check that the bns contract exists + let does_bns_contract_exist = chainstate + .with_read_only_clarity_tx( + &sort_db.index_conn(), + &StacksBlockId::new(&stacks_tip.0, &stacks_tip.1), + |conn| { + 
conn.with_clarity_db_readonly(|db| db.get_contract(&boot_code_id("bns", false))) + }, + ) + .unwrap(); + + assert!(does_bns_contract_exist.is_ok()); + } + + // okay, we've broadcasted some transactions, lets check that the atlas db has a queue + let atlas_queue = coord + .atlas_db + .as_ref() + .unwrap() + .queued_attachments() + .unwrap(); + assert_eq!( + atlas_queue.len(), + make_attachments.len(), + "Should be as many queued attachments, as attachment txs submitted" + ); + + // now, we'll shut down all the coordinator connections and reopen them + // to ensure that the queue remains in place + let coord = (); // dispose of the coordinator, closing all its connections + let coord = make_coordinator_atlas(path, Some(burnchain_conf), Some(atlas_config)); + + let atlas_queue = coord + .atlas_db + .as_ref() + .unwrap() + .queued_attachments() + .unwrap(); + assert_eq!( + atlas_queue.len(), + make_attachments.len(), + "Should be as many queued attachments, as attachment txs submitted" + ); +} + fn get_total_stacked_info( chainstate: &mut StacksChainState, burn_dbconn: &dyn BurnStateDB, @@ -4309,7 +4606,7 @@ fn get_total_stacked_info( // sent should occur in the "pox.clar" contract. #[test] fn test_epoch_verify_active_pox_contract() { - let path = "/tmp/stacks-blockchain-verify-active-pox-contract"; + let path = &test_path("verify-active-pox-contract"); let _r = std::fs::remove_dir_all(path); let pox_v1_unlock_ht = 12; @@ -4598,7 +4895,7 @@ fn test_epoch_verify_active_pox_contract() { } fn test_sortition_with_sunset() { - let path = "/tmp/stacks-blockchain-sortition-with-sunset"; + let path = &test_path("sortition-with-sunset"); let _r = std::fs::remove_dir_all(path); @@ -4903,7 +5200,7 @@ fn test_sortition_with_sunset() { /// Epoch 2.1 activates at block 50 (n.b. 
reward cycles are 6 blocks long) #[test] fn test_sortition_with_sunset_and_epoch_switch() { - let path = "/tmp/stacks-blockchain-sortition-with-sunset-and-epoch-switch"; + let path = &test_path("sortition-with-sunset-and-epoch-switch"); let _r = std::fs::remove_dir_all(path); let rc_len = 6; @@ -5251,10 +5548,9 @@ fn test_sortition_with_sunset_and_epoch_switch() { /// (because its parent is block `0`, and nobody stacks in /// this test, all block commits must burn) fn test_pox_processable_block_in_different_pox_forks() { - let path = "/tmp/stacks-blockchain.test.pox_processable_block_in_different_pox_forks"; + let path = &test_path("pox_processable_block_in_different_pox_forks"); // setup a second set of states that won't see the broadcasted blocks - let path_blinded = - "/tmp/stacks-blockchain.test.pox_processable_block_in_different_pox_forks.blinded"; + let path_blinded = &test_path("pox_processable_block_in_different_pox_forks.blinded"); let _r = std::fs::remove_dir_all(path); let _r = std::fs::remove_dir_all(path_blinded); @@ -5551,9 +5847,9 @@ fn test_pox_processable_block_in_different_pox_forks() { #[test] fn test_pox_no_anchor_selected() { - let path = "/tmp/stacks-blockchain.test.pox_fork_no_anchor_selected"; + let path = &test_path("pox_fork_no_anchor_selected"); // setup a second set of states that won't see the broadcasted blocks - let path_blinded = "/tmp/stacks-blockchain.test.pox_fork_no_anchor_selected.blinded"; + let path_blinded = &test_path("pox_fork_no_anchor_selected.blinded"); let _r = std::fs::remove_dir_all(path); let _r = std::fs::remove_dir_all(path_blinded); @@ -5765,9 +6061,9 @@ fn test_pox_no_anchor_selected() { #[test] fn test_pox_fork_out_of_order() { - let path = "/tmp/stacks-blockchain.test.pox_fork_out_of_order"; + let path = &test_path("pox_fork_out_of_order"); // setup a second set of states that won't see the broadcasted blocks - let path_blinded = "/tmp/stacks-blockchain.test.pox_fork_out_of_order.blinded"; + let path_blinded = 
&test_path("pox_fork_out_of_order.blinded"); let _r = std::fs::remove_dir_all(path); let _r = std::fs::remove_dir_all(path_blinded); @@ -6179,7 +6475,7 @@ fn preprocess_block( #[test] fn test_check_chainstate_db_versions() { - let path = "/tmp/stacks-blockchain-check_chainstate_db_versions"; + let path = &test_path("check_chainstate_db_versions"); let _ = std::fs::remove_dir_all(path); let sortdb_path = format!("{}/sortdb", &path); diff --git a/src/net/atlas/db.rs b/src/net/atlas/db.rs index ba9672d49c..4270caa00f 100644 --- a/src/net/atlas/db.rs +++ b/src/net/atlas/db.rs @@ -14,7 +14,31 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +//! +//! The `AtlasDB` stores `Attachment` and `AttachmentInstance` objects. +//! `AttachmentInstance` objects indicate what corresponding `Attachment` +//! data a node is interested in fetching and storing. +//! +//! In the `AtlasDB`, `AttachmentInstance` objects have a status field +//! and an availability field. The status field indicates whether or +//! not the attachment instance has been checked by the +//! `AttachmentDownloader`. The `AttachmentDownloader` does not +//! immediately check entries before insertion in the database: +//! Atlas event processing and downloader logic proceed in +//! different threads. +//! +//! Once the `AttachmentDownloader` checks an attachment instance, it +//! either marks the instance as available (if the content data is +//! already stored on the node) or it adds the attachment instance +//! to its download queue. +//! 
+ +use rusqlite::types::FromSql; +use rusqlite::types::FromSqlError; use rusqlite::types::ToSql; +use rusqlite::types::ToSqlOutput; +use rusqlite::types::ValueRef; +use rusqlite::OptionalExtension; use rusqlite::Row; use rusqlite::Transaction; use rusqlite::{Connection, OpenFlags, NO_PARAMS}; @@ -47,7 +71,16 @@ use crate::types::chainstate::StacksBlockId; use super::{AtlasConfig, Attachment, AttachmentInstance}; -pub const ATLASDB_VERSION: &'static str = "1"; +pub const ATLASDB_VERSION: &'static str = "2"; + +/// The maximum number of atlas attachment instances that should be +/// checked at once (this is used to limit the return size of +/// `queued_attachments`). Because these checks will sometimes surface +/// existing attachment data associated with those instances, the +/// memory impact of these checks is not limited to the +/// AttachmentInstance size (which is small), but can include the +/// Attachment as well (which is larger). +pub const MAX_PROCESS_PER_ROUND: u32 = 1_000; const ATLASDB_INITIAL_SCHEMA: &'static [&'static str] = &[ r#" @@ -73,8 +106,41 @@ const ATLASDB_INITIAL_SCHEMA: &'static [&'static str] = &[ "CREATE TABLE db_config(version TEXT NOT NULL);", ]; -const ATLASDB_INDEXES: &'static [&'static str] = - &["CREATE INDEX IF NOT EXISTS index_was_instantiated ON attachments(was_instantiated);"]; +const ATLASDB_SCHEMA_2: &'static [&'static str] = &[ + // We have to allow status to be null, because SQLite won't let us add + // a not null column without a default. The default defeats the point of + // having not-null here anyways, so we leave this field nullable. + r#" + ALTER TABLE attachment_instances + ADD status INTEGER + ;"#, + // All of the attachment instances that previously existed in the database + // already were "checked", so set status to 2 (which corresponds to "checked"). 
+ r#" + UPDATE attachment_instances SET status = 2; + "#, +]; + +const ATLASDB_INDEXES: &'static [&'static str] = &[ + "CREATE INDEX IF NOT EXISTS index_was_instantiated ON attachments(was_instantiated);", + "CREATE INDEX IF NOT EXISTS index_instance_status ON attachment_instances(status);", +]; + +/// Attachment instances pass through different states once written to the AtlasDB. +/// These instances are initially written as a new Stacks block is processed, and marked +/// as `Queued`. These queued instances contain all the data of the new attachment instance, +/// but they have not yet been checked against the AtlasDB to determine if there is extant +/// Attachment content/data associated with them. The network run loop (`p2p` thread) checks +/// for any queued attachment instances on each pass, and performs that check. Once the check +/// is completed, any checked instances are updated to `Checked`. +pub enum AttachmentInstanceStatus { + /// This variant indicates that the attachments instance has been written, + /// but the AtlasDownloader has not yet checked that the attachment matched + Queued, + /// This variant indicates that the attachments instance has been written, + /// and checked for whether or not an already existing attachment matched + Checked, +} impl FromRow for Attachment { fn from_row<'a>(row: &'a Row) -> Result { @@ -117,6 +183,27 @@ impl FromRow<(u32, u32)> for (u32, u32) { } } +impl ToSql for AttachmentInstanceStatus { + fn to_sql(&self) -> Result, rusqlite::Error> { + let integer_rep: i64 = match self { + AttachmentInstanceStatus::Queued => 1, + AttachmentInstanceStatus::Checked => 2, + }; + Ok(integer_rep.into()) + } +} + +impl FromSql for AttachmentInstanceStatus { + fn column_result(value: ValueRef<'_>) -> Result { + let integer_rep: i64 = value.as_i64()?; + match integer_rep { + 1 => Ok(AttachmentInstanceStatus::Queued), + 2 => Ok(AttachmentInstanceStatus::Checked), + x => Err(FromSqlError::OutOfRange(x)), + } + } +} + #[derive(Debug)] 
pub struct AtlasDB { pub atlas_config: AtlasConfig, @@ -134,20 +221,32 @@ impl AtlasDB { Ok(()) } + /// Get the database schema version, given a DB connection + fn get_schema_version(conn: &Connection) -> Result { + let version = conn.query_row( + "SELECT MAX(version) from db_config", + rusqlite::NO_PARAMS, + |row| row.get(0), + )?; + Ok(version) + } + fn instantiate(&mut self) -> Result<(), db_error> { let genesis_attachments = self.atlas_config.genesis_attachments.take(); let tx = self.tx_begin()?; for row_text in ATLASDB_INITIAL_SCHEMA { - tx.execute_batch(row_text).map_err(db_error::SqliteError)?; + tx.execute_batch(row_text)?; + } + for row_text in ATLASDB_SCHEMA_2 { + tx.execute_batch(row_text)?; } tx.execute( "INSERT INTO db_config (version) VALUES (?1)", &[&ATLASDB_VERSION], - ) - .map_err(db_error::SqliteError)?; + )?; if let Some(attachments) = genesis_attachments { let now = util::get_epoch_time_secs() as i64; @@ -193,7 +292,7 @@ impl AtlasDB { // If opened for read/write and it doesn't exist, instantiate it. 
pub fn connect( atlas_config: AtlasConfig, - path: &String, + path: &str, readwrite: bool, ) -> Result { let mut create_flag = false; @@ -213,8 +312,17 @@ impl AtlasDB { OpenFlags::SQLITE_OPEN_READ_ONLY } }; - let conn = sqlite_open(path, open_flags, false)?; + Self::check_instantiate_db(atlas_config, conn, readwrite, create_flag) + } + + /// Inner method for instantiating the db if necessary, updating the schema, or adding indexes + fn check_instantiate_db( + atlas_config: AtlasConfig, + conn: Connection, + readwrite: bool, + create_flag: bool, + ) -> Result { let mut db = AtlasDB { atlas_config, conn, @@ -224,11 +332,65 @@ impl AtlasDB { db.instantiate()?; } if readwrite { + db.check_schema_version_and_update()?; db.add_indexes()?; + } else { + db.check_schema_version_or_error()?; } + Ok(db) } + fn check_schema_version_or_error(&mut self) -> Result<(), db_error> { + match Self::get_schema_version(self.conn()) { + Ok(version) => { + let expected_version = ATLASDB_VERSION.to_string(); + if version == expected_version { + Ok(()) + } else { + let version = version.parse().expect( + "Invalid schema version for AtlasDB: should be a parseable integer", + ); + Err(db_error::OldSchema(version)) + } + } + Err(e) => panic!("Error obtaining the version of the Atlas DB: {:?}", e), + } + } + + fn apply_schema_2(db_conn: &Connection) -> Result<(), db_error> { + for row_text in ATLASDB_SCHEMA_2 { + db_conn.execute_batch(row_text)?; + } + + db_conn.execute( + "INSERT OR REPLACE INTO db_config (version) VALUES (?1)", + &["2"], + )?; + + Ok(()) + } + + fn check_schema_version_and_update(&mut self) -> Result<(), db_error> { + let tx = self.tx_begin()?; + match AtlasDB::get_schema_version(&tx) { + Ok(version) => { + let expected_version = ATLASDB_VERSION.to_string(); + if version == expected_version { + return Ok(()); + } + if version == "1" { + Self::apply_schema_2(&tx)?; + tx.commit()?; + Ok(()) + } else { + panic!("The schema version of the Atlas DB is invalid.") + } + } + Err(e) 
=> panic!("Error obtaining the version of the Atlas DB: {:?}", e), + } + } + // Open an atlas database in memory (used for testing) #[cfg(test)] pub fn connect_memory(atlas_config: AtlasConfig) -> Result { @@ -243,6 +405,60 @@ impl AtlasDB { Ok(db) } + #[cfg(test)] + /// Only ever to be used in testing, open and instantiate a V1 atlasdb + pub fn connect_memory_db_v1(atlas_config: AtlasConfig) -> Result { + let conn = Connection::open_in_memory()?; + let mut db = AtlasDB { + atlas_config, + conn, + readwrite: true, + }; + + let genesis_attachments = db.atlas_config.genesis_attachments.take(); + + let tx = db.tx_begin()?; + + for row_text in ATLASDB_INITIAL_SCHEMA { + tx.execute_batch(row_text)?; + } + + tx.execute("INSERT INTO db_config (version) VALUES (?1)", &["1"])?; + + if let Some(attachments) = genesis_attachments { + let now = util::get_epoch_time_secs() as i64; + for attachment in attachments { + tx.execute( + "INSERT INTO attachments (hash, content, was_instantiated, created_at) VALUES (?, ?, 1, ?)", + rusqlite::params![ + &attachment.hash(), + &attachment.content, + &now, + ], + )?; + } + } + + tx.commit()?; + + let tx = db.tx_begin()?; + for row_text in &ATLASDB_INDEXES[0..1] { + tx.execute_batch(row_text)?; + } + tx.commit()?; + + Ok(db) + } + + #[cfg(test)] + /// Only ever to be used in testing, connect to db, but using existing sqlconn + pub fn connect_with_sqlconn( + atlas_config: AtlasConfig, + conn: Connection, + ) -> Result { + Self::check_instantiate_db(atlas_config, conn, true, false) + } + pub fn conn(&self) -> &Connection { &self.conn } @@ -387,19 +603,13 @@ impl AtlasDB { let tx = self.tx_begin()?; tx.execute( "INSERT OR REPLACE INTO attachments (hash, content, was_instantiated, created_at) VALUES (?, ?, 1, ?)", - &[ - &attachment.hash() as &dyn ToSql, - &attachment.content as &dyn ToSql, - &now as &dyn ToSql, - ], - ) - .map_err(db_error::SqliteError)?; + rusqlite::params![&attachment.hash(), &attachment.content, &now], + )?; tx.execute( - 
"UPDATE attachment_instances SET is_available = 1 WHERE content_hash = ?1", - &[&attachment.hash() as &dyn ToSql], - ) - .map_err(db_error::SqliteError)?; - tx.commit().map_err(db_error::SqliteError)?; + "UPDATE attachment_instances SET is_available = 1 WHERE content_hash = ?1 AND status = ?2", + rusqlite::params![&attachment.hash(), &AttachmentInstanceStatus::Checked], + )?; + tx.commit()?; Ok(()) } @@ -434,19 +644,19 @@ impl AtlasDB { pub fn find_unresolved_attachment_instances( &mut self, ) -> Result, db_error> { - let qry = "SELECT * FROM attachment_instances WHERE is_available = 0".to_string(); - let rows = query_rows::(&self.conn, &qry, NO_PARAMS)?; + let qry = "SELECT * FROM attachment_instances WHERE is_available = 0 AND status = ?"; + let rows = query_rows(&self.conn, qry, &[&AttachmentInstanceStatus::Checked])?; Ok(rows) } pub fn find_all_attachment_instances( - &mut self, + &self, content_hash: &Hash160, ) -> Result, db_error> { let hex_content_hash = to_hex(&content_hash.0[..]); - let qry = "SELECT * FROM attachment_instances WHERE content_hash = ?1".to_string(); - let args = [&hex_content_hash as &dyn ToSql]; - let rows = query_rows::(&self.conn, &qry, &args)?; + let qry = "SELECT * FROM attachment_instances WHERE content_hash = ?1 AND status = ?2"; + let args = rusqlite::params![&hex_content_hash, &AttachmentInstanceStatus::Checked]; + let rows = query_rows(&self.conn, qry, args)?; Ok(rows) } @@ -462,31 +672,88 @@ impl AtlasDB { Ok(row) } - pub fn insert_uninstantiated_attachment_instance( + /// Queue a new attachment instance, status will be set to "queued", + /// and the is_available field set to false. + /// + /// This is invoked after block processing by the coordinator thread (which + /// handles atlas event logic). 
+ pub fn queue_attachment_instance( + &mut self, + attachment: &AttachmentInstance, + ) -> Result<(), db_error> { + self.insert_attachment_instance(attachment, AttachmentInstanceStatus::Queued, false) + } + + /// Insert an attachment instance from an initial batch. + /// All such instances are marked "checked", and is_available = true + /// + /// This is invoked by the AtlasDownloader when it first runs. The AtlasDownloader + /// is currently managed in the P2P thread. + pub fn insert_initial_attachment_instance( + &mut self, + attachment: &AttachmentInstance, + ) -> Result<(), db_error> { + self.insert_attachment_instance(attachment, AttachmentInstanceStatus::Checked, true) + } + + /// Return all the queued attachment instances, limited by `MAX_PROCESS_PER_ROUND` + pub fn queued_attachments(&self) -> Result, db_error> { + query_rows( + &self.conn, + "SELECT * FROM attachment_instances WHERE status = ?1 LIMIT ?2", + rusqlite::params![&AttachmentInstanceStatus::Queued, MAX_PROCESS_PER_ROUND], + ) + } + + /// Update a queued attachment to "checked", setting the `is_available` field. + pub fn mark_attachment_instance_checked( &mut self, attachment: &AttachmentInstance, is_available: bool, ) -> Result<(), db_error> { - let hex_content_hash = to_hex(&attachment.content_hash.0[..]); - let hex_tx_id = attachment.tx_id.to_hex(); - let tx = self.tx_begin()?; + self.conn.execute( + "UPDATE attachment_instances SET status = ?1, is_available = ?2 + WHERE index_block_hash = ?3 AND contract_id = ?4 AND attachment_index = ?5", + rusqlite::params![ + &AttachmentInstanceStatus::Checked, + &is_available, + &attachment.index_block_hash, + &attachment.contract_id.to_string(), + &attachment.attachment_index, + ], + )?; + Ok(()) + } + + /// Insert an attachment instance. 
+ fn insert_attachment_instance( + &mut self, + attachment: &AttachmentInstance, + status: AttachmentInstanceStatus, + is_available: bool, + ) -> Result<(), db_error> { + let sql_tx = self.tx_begin()?; let now = util::get_epoch_time_secs() as i64; - let res = tx.execute( - "INSERT OR REPLACE INTO attachment_instances (content_hash, created_at, index_block_hash, attachment_index, block_height, is_available, metadata, contract_id, tx_id) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", - &[ - &hex_content_hash as &dyn ToSql, - &now as &dyn ToSql, - &attachment.index_block_hash as &dyn ToSql, - &attachment.attachment_index as &dyn ToSql, + sql_tx.execute( + "INSERT OR REPLACE INTO attachment_instances ( + content_hash, created_at, index_block_hash, + attachment_index, block_height, is_available, + metadata, contract_id, tx_id, status) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", + rusqlite::params![ + &attachment.content_hash, + &now, + &attachment.index_block_hash, + &attachment.attachment_index, &u64_to_sql(attachment.stacks_block_height)?, - &is_available as &dyn ToSql, - &attachment.metadata as &dyn ToSql, - &attachment.contract_id.to_string() as &dyn ToSql, - &hex_tx_id as &dyn ToSql, - ] - ); - res.map_err(db_error::SqliteError)?; - tx.commit().map_err(db_error::SqliteError)?; + &is_available, + &attachment.metadata, + &attachment.contract_id.to_string(), + &attachment.tx_id, + &status + ], + )?; + sql_tx.commit()?; Ok(()) } } diff --git a/src/net/atlas/download.rs b/src/net/atlas/download.rs index 273fff3d58..9efe57efde 100644 --- a/src/net/atlas/download.rs +++ b/src/net/atlas/download.rs @@ -114,12 +114,7 @@ impl AttachmentsDownloader { // Handle initial batch if self.initial_batch.len() > 0 { - let mut batch = HashSet::new(); - for attachment_instance in self.initial_batch.drain(..) 
{ - batch.insert(attachment_instance); - } - let mut resolved = - self.enqueue_new_attachments(&mut batch, &mut network.atlasdb, true)?; + let mut resolved = self.enqueue_initial_attachments(&mut network.atlasdb)?; resolved_attachments.append(&mut resolved); } @@ -236,64 +231,69 @@ impl AttachmentsDownloader { Ok((resolved_attachments, events_to_deregister)) } - pub fn enqueue_new_attachments( + /// Given a list of `AttachmentInstance`, check if the content corresponding to that + /// instance is (1) already validated (2) inboxed or (3) unknown. + /// + /// In the event of (1) or (2), `do_if_found` is invoked, and the attachment instance will + /// be returned (with the attachment data) in the result set. If the attachment was inboxed (case 2), + /// the attachment is marked as instantiated in the atlas db. + /// + /// In the event of (3), `do_if_not_found` is invoked, and the attachment instance is added + /// to `self.priority_queue`. + /// + /// The return value of this function is a vector of all the instances from `iterator` which + /// resolved to Attachment data, paired with that data. + fn check_attachment_instances( &mut self, - new_attachments: &mut HashSet, - atlasdb: &mut AtlasDB, - initial_batch: bool, - ) -> Result, DBError> { - if new_attachments.is_empty() { - return Ok(vec![]); - } - + atlas_db: &mut AtlasDB, + iterator: Vec, + do_if_found: F, + do_if_not_found: G, + ) -> Result, DBError> + where + F: Fn(&mut AtlasDB, &AttachmentInstance) -> Result<(), DBError>, + G: Fn(&mut AtlasDB, &AttachmentInstance) -> Result<(), DBError>, + { let mut attachments_batches: HashMap = HashMap::new(); let mut resolved_attachments = vec![]; - for attachment_instance in new_attachments.drain() { - // Are we dealing with an empty hash - allowed for undoing onchain binding + for attachment_instance in iterator { if attachment_instance.content_hash == Hash160::empty() { - // todo(ludo) insert or update ? 
- atlasdb.insert_uninstantiated_attachment_instance(&attachment_instance, true)?; + // Are we dealing with an empty hash - allowed for undoing onchain binding + do_if_found(atlas_db, &attachment_instance)?; debug!("Atlas: inserting and pairing new attachment instance with empty hash"); resolved_attachments.push((attachment_instance, Attachment::empty())); - continue; - } - - // Do we already have a matching validated attachment - if let Ok(Some(entry)) = atlasdb.find_attachment(&attachment_instance.content_hash) { - atlasdb.insert_uninstantiated_attachment_instance(&attachment_instance, true)?; + } else if let Ok(Some(entry)) = + atlas_db.find_attachment(&attachment_instance.content_hash) + { + // Do we already have a matching validated attachment + do_if_found(atlas_db, &attachment_instance)?; debug!( "Atlas: inserting and pairing new attachment instance to existing attachment" ); resolved_attachments.push((attachment_instance, entry)); - continue; - } - - // Do we already have a matching inboxed attachment - if let Ok(Some(attachment)) = - atlasdb.find_uninstantiated_attachment(&attachment_instance.content_hash) + } else if let Ok(Some(attachment)) = + atlas_db.find_uninstantiated_attachment(&attachment_instance.content_hash) { - atlasdb.insert_instantiated_attachment(&attachment)?; - atlasdb.insert_uninstantiated_attachment_instance(&attachment_instance, true)?; + // Do we already have a matching inboxed attachment + atlas_db.insert_instantiated_attachment(&attachment)?; + do_if_found(atlas_db, &attachment_instance)?; debug!("Atlas: inserting and pairing new attachment instance to inboxed attachment, now validated"); resolved_attachments.push((attachment_instance, attachment)); - continue; - } - - // This attachment in refering to an unknown attachment. - // Let's append it to the batch being constructed in this routine. 
- match attachments_batches.entry(attachment_instance.index_block_hash) { - Entry::Occupied(entry) => { - entry.into_mut().track_attachment(&attachment_instance); - } - Entry::Vacant(v) => { - let mut batch = AttachmentsBatch::new(); - batch.track_attachment(&attachment_instance); - v.insert(batch); - } - }; + } else { + // This attachment refers to an unknown attachment. + // Let's append it to the batch being constructed in this routine. + match attachments_batches.entry(attachment_instance.index_block_hash) { + Entry::Occupied(entry) => { + entry.into_mut().track_attachment(&attachment_instance); + } + Entry::Vacant(v) => { + let mut batch = AttachmentsBatch::new(); + batch.track_attachment(&attachment_instance); + v.insert(batch); + } + }; - if !initial_batch { - atlasdb.insert_uninstantiated_attachment_instance(&attachment_instance, false)?; + do_if_not_found(atlas_db, &attachment_instance)?; } } @@ -303,6 +303,58 @@ impl AttachmentsDownloader { Ok(resolved_attachments) } + + /// Check any queued attachment instances to see if we already have data for them, + /// returning a vector of (instance, attachment) pairs for any of the queued attachments + /// which already had the associated data + /// Marks any processed attachments as checked + /// + /// This method is invoked in the thread managing the AttachmentDownloader. This is currently + /// the P2P thread. + pub fn check_queued_attachment_instances( + &mut self, + atlas_db: &mut AtlasDB, + ) -> Result, DBError> { + let new_attachments = atlas_db.queued_attachments()?; + + self.check_attachment_instances( + atlas_db, + new_attachments, + |atlas_db, attachment_instance| { + atlas_db.mark_attachment_instance_checked(&attachment_instance, true) + }, + |atlas_db, attachment_instance| { + atlas_db.mark_attachment_instance_checked(&attachment_instance, false) + }, + ) + } + + /// Insert the initial attachments set. Only add the attachment instance if associated data + /// was found. 
+ pub fn enqueue_initial_attachments( + &mut self, + atlas_db: &mut AtlasDB, + ) -> Result, DBError> { + if self.initial_batch.is_empty() { + return Ok(vec![]); + } + + // we're draining the initial batch, so to avoid angering The Borrow Checker + // use mem replace to just take the whole vec. + let initial_batch = std::mem::replace(&mut self.initial_batch, vec![]); + + self.check_attachment_instances( + atlas_db, + initial_batch, + |atlas_db, attachment_instance| { + atlas_db.insert_initial_attachment_instance(&attachment_instance) + }, + |_atlas_db, _attachment_instance| { + // If attachment not found, don't insert attachment instance + Ok(()) + }, + ) + } } #[derive(Debug)] diff --git a/src/net/atlas/mod.rs b/src/net/atlas/mod.rs index 073cf25ed6..0d351502ad 100644 --- a/src/net/atlas/mod.rs +++ b/src/net/atlas/mod.rs @@ -80,6 +80,7 @@ impl AtlasConfig { } #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)] +/// Attachments are the content associated with an AttachmentInstance pub struct Attachment { pub content: Vec, } @@ -99,6 +100,10 @@ impl Attachment { } #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)] +/// An attachment instance is a reference to atlas data: a commitment +/// to track the content that is the inverse of `content_hash`. +/// Attachment instances are created by atlas events issued by contracts +/// specified in a node's `AtlasConfig`. 
pub struct AttachmentInstance { pub content_hash: Hash160, pub attachment_index: u32, diff --git a/src/net/atlas/tests.rs b/src/net/atlas/tests.rs index ef1622710c..efb5a397aa 100644 --- a/src/net/atlas/tests.rs +++ b/src/net/atlas/tests.rs @@ -28,6 +28,7 @@ use crate::net::{ PeerHost, Requestable, }; use crate::util_lib::boot::boot_code_id; +use crate::util_lib::db::u64_to_sql; use crate::util_lib::strings::UrlString; use clarity::vm::types::QualifiedContractIdentifier; use stacks_common::types::chainstate::BlockHeaderHash; @@ -774,6 +775,104 @@ fn test_keep_uninstantiated_attachments() { ); } +#[test] +fn schema_2_migration() { + let atlas_config = AtlasConfig { + contracts: HashSet::new(), + attachments_max_size: 1024, + max_uninstantiated_attachments: 10, + uninstantiated_attachments_expire_after: 0, + unresolved_attachment_instances_expire_after: 10, + genesis_attachments: None, + }; + + let atlas_db = AtlasDB::connect_memory_db_v1(atlas_config.clone()).unwrap(); + let conn = atlas_db.conn; + + let attachments = [ + AttachmentInstance { + // content_hash, index_block_hash, and txid must contain hex letters! 
+ // because their fields are declared `STRING`, if you supply all numerals, + // sqlite assigns the field a REAL affinity (instead of TEXT) + content_hash: Hash160([0xa0; 20]), + attachment_index: 1, + stacks_block_height: 1, + index_block_hash: StacksBlockId([0xb1; 32]), + metadata: "".into(), + contract_id: QualifiedContractIdentifier::transient(), + tx_id: Txid([0x2f; 32]), + canonical_stacks_tip_height: None, + }, + AttachmentInstance { + content_hash: Hash160([0x00; 20]), + attachment_index: 1, + stacks_block_height: 1, + index_block_hash: StacksBlockId([0x0a; 32]), + metadata: "".into(), + contract_id: QualifiedContractIdentifier::transient(), + tx_id: Txid([0x0b; 32]), + canonical_stacks_tip_height: None, + }, + ]; + + for attachment in attachments.iter() { + // need to manually insert data, because the insertion routine in the codebase + // sets `status` which doesn't exist in v1 + conn.execute( + "INSERT OR REPLACE INTO attachment_instances ( + content_hash, created_at, index_block_hash, + attachment_index, block_height, is_available, + metadata, contract_id, tx_id) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + rusqlite::params![ + &attachment.content_hash, + &0, + &attachment.index_block_hash, + &attachment.attachment_index, + &u64_to_sql(attachment.stacks_block_height).unwrap(), + &true, + &attachment.metadata, + &attachment.contract_id.to_string(), + &attachment.tx_id, + ], + ) + .unwrap(); + } + + // perform the migration and unwrap() to assert that it runs okay + let atlas_db = AtlasDB::connect_with_sqlconn(atlas_config, conn).unwrap(); + + let mut attachments_fetched_a0 = atlas_db + .find_all_attachment_instances(&Hash160([0xa0; 20])) + .unwrap(); + assert_eq!( + attachments_fetched_a0.len(), + 1, + "Should have one attachment instance marked 'checked' with hash `0xa0a0a0..`" + ); + + let attachment_a0 = attachments_fetched_a0.pop().unwrap(); + assert_eq!(&attachment_a0, &attachments[0]); + + let mut attachments_fetched_00 = atlas_db + 
.find_all_attachment_instances(&Hash160([0x00; 20])) + .unwrap(); + assert_eq!( + attachments_fetched_00.len(), + 1, + "Should have one attachment instance marked 'checked' with hash `0x000000..`" + ); + + let attachment_00 = attachments_fetched_00.pop().unwrap(); + assert_eq!(&attachment_00, &attachments[1]); + + assert_eq!( + atlas_db.queued_attachments().unwrap().len(), + 0, + "Should have no attachment instance marked 'queued'" + ); +} + #[test] fn test_evict_k_oldest_uninstantiated_attachments() { let atlas_config = AtlasConfig { @@ -1003,7 +1102,7 @@ fn test_evict_expired_unresolved_attachment_instances() { }; let mut atlas_db = AtlasDB::connect_memory(atlas_config).unwrap(); - // Insert some uninstanciated attachments + // Insert some uninstantiated attachments let uninstantiated_attachment_instances = [ new_attachment_instance_from(&new_attachment_from("facade11"), 0, 1), new_attachment_instance_from(&new_attachment_from("facade12"), 1, 1), @@ -1016,7 +1115,10 @@ fn test_evict_expired_unresolved_attachment_instances() { ]; for attachment_instance in uninstantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, false) + .queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, false) .unwrap(); } @@ -1029,7 +1131,10 @@ fn test_evict_expired_unresolved_attachment_instances() { ]; for attachment_instance in instantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, true) + .queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, true) .unwrap(); } @@ -1043,7 +1148,10 @@ fn test_evict_expired_unresolved_attachment_instances() { ]; for attachment_instance in uninstantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, false) + 
.queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, false) .unwrap(); } @@ -1092,7 +1200,7 @@ fn test_bit_vectors() { let mut atlas_db = AtlasDB::connect_memory(atlas_config).unwrap(); - // Insert some uninstanciated attachments + // Insert some uninstantiated attachments let uninstantiated_attachment_instances = [ new_attachment_instance_from(&new_attachment_from("facade11"), 0, 1), new_attachment_instance_from(&new_attachment_from("facade12"), 1, 1), @@ -1101,7 +1209,10 @@ fn test_bit_vectors() { ]; for attachment_instance in uninstantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, false) + .queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, false) .unwrap(); } let block_id_1 = uninstantiated_attachment_instances[0].index_block_hash; @@ -1118,7 +1229,10 @@ fn test_bit_vectors() { ]; for attachment_instance in uninstantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, false) + .queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, false) .unwrap(); } let bit_vector = atlas_db @@ -1134,7 +1248,10 @@ fn test_bit_vectors() { ]; for attachment_instance in instantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, true) + .queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, true) .unwrap(); } @@ -1160,7 +1277,10 @@ fn test_bit_vectors() { let block_id_2 = instantiated_attachment_instances[0].index_block_hash; for attachment_instance in instantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, true) + 
.queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, true) .unwrap(); } diff --git a/src/net/mod.rs b/src/net/mod.rs index 76b5a14782..5aa80cd8bd 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -2788,17 +2788,15 @@ pub mod test { ) .unwrap(); - let (tx, _) = sync_channel(100000); - let indexer = BitcoinIndexer::new_unit_test(&config.burnchain.working_dir); - let mut coord = ChainsCoordinator::test_new_with_observer( + let mut coord = ChainsCoordinator::test_new_full( &config.burnchain, config.network_id, &test_path, OnChainRewardSetProvider(), - tx, observer, indexer, + None, ); coord.handle_new_burnchain_block().unwrap(); @@ -2968,7 +2966,6 @@ pub mod test { ibd, 100, &RPCHandlerArgs::default(), - &mut HashSet::new(), ); self.sortdb = Some(sortdb); @@ -3010,7 +3007,6 @@ pub mod test { ibd, 100, &RPCHandlerArgs::default(), - &mut HashSet::new(), ); self.sortdb = Some(sortdb); diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 62070ea5a8..014a8e965d 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -5340,7 +5340,6 @@ impl PeerNetwork { ibd: bool, poll_timeout: u64, handler_args: &RPCHandlerArgs, - attachment_requests: &mut HashSet, ) -> Result { debug!(">>>>>>>>>>>>>>>>>>>>>>> Begin Network Dispatch (poll for {}) >>>>>>>>>>>>>>>>>>>>>>>>>>>>", poll_timeout); let mut poll_states = match self.network { @@ -5393,7 +5392,7 @@ impl PeerNetwork { // enqueue them. 
PeerNetwork::with_attachments_downloader(self, |network, attachments_downloader| { let mut known_attachments = attachments_downloader - .enqueue_new_attachments(attachment_requests, &mut network.atlasdb, false) + .check_queued_attachment_instances(&mut network.atlasdb) .expect("FATAL: failed to store new attachments to the atlas DB"); network_result.attachments.append(&mut known_attachments); Ok(()) diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index dce07cc854..e83c7a3537 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -139,7 +139,7 @@ /// This file may be refactored in the future into a full-fledged module. use std::cmp; use std::collections::HashMap; -use std::collections::{HashSet, VecDeque}; +use std::collections::VecDeque; use std::convert::{TryFrom, TryInto}; use std::default::Default; use std::mem; @@ -184,7 +184,7 @@ use stacks::cost_estimates::UnitEstimator; use stacks::cost_estimates::{CostEstimator, FeeEstimator}; use stacks::monitoring::{increment_stx_blocks_mined_counter, update_active_miners_count_gauge}; use stacks::net::{ - atlas::{AtlasConfig, AtlasDB, AttachmentInstance}, + atlas::{AtlasConfig, AtlasDB}, db::{LocalPeer, PeerDB}, dns::DNSClient, dns::DNSResolver, @@ -3524,8 +3524,6 @@ pub struct PeerThread { globals: Globals, /// how long to wait for network messages on each poll, in millis poll_timeout: u64, - /// receiver for attachments discovered by the chains coordinator thread - attachments_rx: Receiver>, /// handle to the sortition DB (optional so we can take/replace it) sortdb: Option, /// handle to the chainstate DB (optional so we can take/replace it) @@ -3577,11 +3575,7 @@ impl PeerThread { /// Binds the addresses in the config (which may panic if the port is blocked). /// This is so the node will crash "early" before any new threads start if there's going to be /// a bind error anyway. 
- pub fn new( - runloop: &RunLoop, - mut net: PeerNetwork, - attachments_rx: Receiver>, - ) -> PeerThread { + pub fn new(runloop: &RunLoop, mut net: PeerNetwork) -> PeerThread { let config = runloop.config().clone(); let mempool = Self::connect_mempool_db(&config); let burn_db_path = config.get_burn_db_file_path(); @@ -3611,7 +3605,6 @@ impl PeerThread { net: Some(net), globals: runloop.get_globals(), poll_timeout, - attachments_rx, sortdb: Some(sortdb), chainstate: Some(chainstate), mempool: Some(mempool), @@ -3693,17 +3686,6 @@ impl PeerThread { self.poll_timeout }; - let mut expected_attachments = match self.attachments_rx.try_recv() { - Ok(expected_attachments) => { - debug!("Atlas: received attachments: {:?}", &expected_attachments); - expected_attachments - } - _ => { - debug!("Atlas: attachment channel is empty"); - HashSet::new() - } - }; - // move over unconfirmed state obtained from the relayer self.with_chainstate(|p2p_thread, sortdb, chainstate, _mempool| { let _ = Relayer::setup_unconfirmed_state_readonly(chainstate, sortdb); @@ -3739,7 +3721,6 @@ impl PeerThread { ibd, poll_ms, &handler_args, - &mut expected_attachments, ) }) }); @@ -4133,9 +4114,6 @@ impl StacksNode { globals: Globals, // relay receiver endpoint for the p2p thread, so the relayer can feed it data to push relay_recv: Receiver, - // attachments receiver endpoint for the p2p thread, so the chains coordinator can feed it - // attachments it discovers - attachments_receiver: Receiver>, ) -> StacksNode { let config = runloop.config().clone(); let is_miner = runloop.is_miner(); @@ -4195,7 +4173,7 @@ impl StacksNode { .expect("FATAL: failed to start relayer thread"); let p2p_event_dispatcher = runloop.get_event_dispatcher(); - let p2p_thread = PeerThread::new(runloop, p2p_net, attachments_receiver); + let p2p_thread = PeerThread::new(runloop, p2p_net); let p2p_thread_handle = thread::Builder::new() .stack_size(BLOCK_PROCESSOR_STACK_SIZE) .name(format!( diff --git 
a/testnet/stacks-node/src/node.rs b/testnet/stacks-node/src/node.rs index 78f5838309..10da48829f 100644 --- a/testnet/stacks-node/src/node.rs +++ b/testnet/stacks-node/src/node.rs @@ -1,7 +1,6 @@ use std::convert::TryFrom; use std::default::Default; use std::net::SocketAddr; -use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::{collections::HashSet, env}; use std::{thread, thread::JoinHandle, time}; @@ -102,7 +101,6 @@ pub struct Node { last_sortitioned_block: Option, event_dispatcher: EventDispatcher, nonce: u64, - attachments_tx: SyncSender>, leader_key_registers: HashSet, block_commits: HashSet, } @@ -178,7 +176,6 @@ fn spawn_peer( exit_at_block_height: Option, genesis_chainstate_hash: Sha256Sum, poll_timeout: u64, - attachments_rx: Receiver>, config: Config, ) -> Result, NetError> { this.bind(p2p_sock, rpc_sock).unwrap(); @@ -242,14 +239,6 @@ fn spawn_peer( } }; - let mut expected_attachments = match attachments_rx.try_recv() { - Ok(expected_attachments) => expected_attachments, - _ => { - debug!("Atlas: attachment channel is empty"); - HashSet::new() - } - }; - let indexer = make_bitcoin_indexer(&config); let net_result = this @@ -263,7 +252,6 @@ fn spawn_peer( false, poll_timeout, &handler_args, - &mut expected_attachments, ) .unwrap(); if net_result.has_transactions() { @@ -292,11 +280,7 @@ pub fn use_test_genesis_chainstate(config: &Config) -> bool { impl Node { /// Instantiate and initialize a new node, given a config - pub fn new( - config: Config, - boot_block_exec: Box ()>, - attachments_tx: SyncSender>, - ) -> Self { + pub fn new(config: Config, boot_block_exec: Box ()>) -> Self { let use_test_genesis_data = if config.burnchain.mode == "mocknet" { use_test_genesis_chainstate(&config) } else { @@ -390,7 +374,6 @@ impl Node { burnchain_tip: None, nonce: 0, event_dispatcher, - attachments_tx, leader_key_registers: HashSet::new(), block_commits: HashSet::new(), } @@ -423,7 +406,6 @@ impl Node { Err(_e) => panic!(), }; - let (attachments_tx, 
attachments_rx) = sync_channel(1); let mut node = Node { active_registered_key: None, bootstraping_chain: false, @@ -435,12 +417,11 @@ impl Node { burnchain_tip: None, nonce: 0, event_dispatcher, - attachments_tx, leader_key_registers: HashSet::new(), block_commits: HashSet::new(), }; - node.spawn_peer_server(attachments_rx); + node.spawn_peer_server(); let pox_constants = burnchain_controller.sortdb_ref().pox_constants.clone(); loop { @@ -464,7 +445,20 @@ impl Node { node } - pub fn spawn_peer_server(&mut self, attachments_rx: Receiver>) { + fn make_atlas_config() -> AtlasConfig { + AtlasConfig::default(false) + } + + pub fn make_atlas_db(&self) -> AtlasDB { + AtlasDB::connect( + Self::make_atlas_config(), + &self.config.get_atlas_db_file_path(), + true, + ) + .unwrap() + } + + pub fn spawn_peer_server(&mut self) { // we can call _open_ here rather than _connect_, since connect is first called in // make_genesis_block let burnchain = self.config.get_burnchain(); @@ -550,9 +544,7 @@ impl Node { } tx.commit().unwrap(); } - let atlas_config = AtlasConfig::default(false); - let atlasdb = - AtlasDB::connect(atlas_config, &self.config.get_atlas_db_file_path(), true).unwrap(); + let atlasdb = self.make_atlas_db(); let local_peer = match PeerDB::get_local_peer(peerdb.conn()) { Ok(local_peer) => local_peer, @@ -585,7 +577,6 @@ impl Node { exit_at_block_height, Sha256Sum::from_hex(stx_genesis::GENESIS_CHAINSTATE_HASH).unwrap(), 1000, - attachments_rx, self.config.clone(), ) .unwrap(); @@ -841,6 +832,7 @@ impl Node { consensus_hash: &ConsensusHash, microblocks: Vec, db: &mut SortitionDB, + atlas_db: &mut AtlasDB, ) -> ChainTip { let _parent_consensus_hash = { // look up parent consensus hash @@ -898,7 +890,7 @@ impl Node { BurnchainDB::connect(&burnchain.get_burnchaindb_path(), &burnchain, true) .expect("FATAL: failed to connect to burnchain DB"); - let atlas_config = AtlasConfig::default(false); + let atlas_config = Self::make_atlas_config(); let mut processed_blocks = 
vec![]; loop { let mut process_blocks_at_tip = { @@ -922,13 +914,19 @@ impl Node { let attachments_instances = self.get_attachment_instances(epoch_receipt, &atlas_config); if !attachments_instances.is_empty() { - match self.attachments_tx.send(attachments_instances) { - Ok(_) => {} - Err(e) => { - error!("Error dispatching attachments {}", e); - panic!(); + for new_attachment in attachments_instances.into_iter() { + if let Err(e) = + atlas_db.queue_attachment_instance(&new_attachment) + { + warn!( + "Atlas: Error writing attachment instance to DB"; + "err" => ?e, + "index_block_hash" => %new_attachment.index_block_hash, + "contract_id" => %new_attachment.contract_id, + "attachment_index" => %new_attachment.attachment_index, + ); } - }; + } } } _ => {} diff --git a/testnet/stacks-node/src/run_loop/helium.rs b/testnet/stacks-node/src/run_loop/helium.rs index fa7d7ba853..048d5f34e6 100644 --- a/testnet/stacks-node/src/run_loop/helium.rs +++ b/testnet/stacks-node/src/run_loop/helium.rs @@ -4,10 +4,8 @@ use crate::{ BitcoinRegtestController, BurnchainController, ChainTip, Config, MocknetController, Node, }; use stacks::chainstate::stacks::db::ClarityTx; -use stacks::net::atlas::AttachmentInstance; + use stacks::types::chainstate::BurnchainHeaderHash; -use std::collections::HashSet; -use std::sync::mpsc::{sync_channel, Receiver}; /// RunLoop is coordinating a simulated burnchain and some simulated nodes /// taking turns in producing blocks. 
@@ -15,7 +13,6 @@ pub struct RunLoop { config: Config, pub node: Node, pub callbacks: RunLoopCallbacks, - attachments_rx: Option>>, } impl RunLoop { @@ -28,16 +25,13 @@ impl RunLoop { config: Config, boot_exec: Box ()>, ) -> Self { - let (attachments_tx, attachments_rx) = sync_channel(1); - // Build node based on config - let node = Node::new(config.clone(), boot_exec, attachments_tx); + let node = Node::new(config.clone(), boot_exec); Self { config, node, callbacks: RunLoopCallbacks::new(), - attachments_rx: Some(attachments_rx), } } @@ -74,8 +68,7 @@ impl RunLoop { self.node.process_burnchain_state(&burnchain_tip); // todo(ludo): should return genesis? let mut chain_tip = ChainTip::genesis(&BurnchainHeaderHash::zero(), 0, 0); - let attachments_rx = self.attachments_rx.take().unwrap(); - self.node.spawn_peer_server(attachments_rx); + self.node.spawn_peer_server(); // Bootstrap the chain: node will start a new tenure, // using the sortition hash from block #1 for generating a VRF. @@ -128,12 +121,14 @@ impl RunLoop { // Have the node process its own tenure. // We should have some additional checks here, and ensure that the previous artifacts are legit. + let mut atlas_db = self.node.make_atlas_db(); chain_tip = self.node.process_tenure( &artifacts_from_1st_tenure.anchored_block, &last_sortitioned_block.block_snapshot.consensus_hash, artifacts_from_1st_tenure.microblocks.clone(), burnchain.sortdb_mut(), + &mut atlas_db, ); self.callbacks.invoke_new_stacks_chain_state( @@ -204,11 +199,14 @@ impl RunLoop { Some(ref artifacts) => { // Have the node process its tenure. // We should have some additional checks here, and ensure that the previous artifacts are legit. 
+ let mut atlas_db = self.node.make_atlas_db(); + chain_tip = self.node.process_tenure( &artifacts.anchored_block, &last_sortitioned_block.block_snapshot.consensus_hash, artifacts.microblocks.clone(), burnchain.sortdb_mut(), + &mut atlas_db, ); self.callbacks.invoke_new_stacks_chain_state( diff --git a/testnet/stacks-node/src/run_loop/neon.rs b/testnet/stacks-node/src/run_loop/neon.rs index 7c5d387124..75950ecebb 100644 --- a/testnet/stacks-node/src/run_loop/neon.rs +++ b/testnet/stacks-node/src/run_loop/neon.rs @@ -5,14 +5,12 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::AtomicU64; use std::sync::mpsc::sync_channel; -use std::sync::mpsc::Receiver; + use std::sync::Arc; use std::sync::Mutex; use std::thread; use std::thread::JoinHandle; -use std::collections::HashSet; - use stacks::deps::ctrlc as termination; use stacks::deps::ctrlc::SignalId; @@ -28,7 +26,7 @@ use stacks::chainstate::coordinator::{ }; use stacks::chainstate::stacks::db::{ChainStateBootData, StacksChainState}; use stacks::core::StacksEpochId; -use stacks::net::atlas::{AtlasConfig, Attachment, AttachmentInstance, ATTACHMENTS_CHANNEL_SIZE}; +use stacks::net::atlas::{AtlasConfig, AtlasDB, Attachment}; use stacks::util_lib::db::Error as db_error; use stx_genesis::GenesisData; @@ -502,7 +500,7 @@ impl RunLoop { burnchain_config: &Burnchain, coordinator_receivers: CoordinatorReceivers, miner_status: Arc>, - ) -> (JoinHandle<()>, Receiver>) { + ) -> JoinHandle<()> { let use_test_genesis_data = use_test_genesis_chainstate(&self.config); // load up genesis Atlas attachments @@ -521,7 +519,12 @@ impl RunLoop { let moved_config = self.config.clone(); let moved_burnchain_config = burnchain_config.clone(); let mut coordinator_dispatcher = self.event_dispatcher.clone(); - let (attachments_tx, attachments_rx) = sync_channel(ATTACHMENTS_CHANNEL_SIZE); + let atlas_db = AtlasDB::connect( + moved_atlas_config.clone(), + &self.config.get_atlas_db_file_path(), + true, + ) + .expect("Failed to 
connect Atlas DB during startup"); let coordinator_indexer = make_bitcoin_indexer(&self.config); let coordinator_thread_handle = thread::Builder::new() @@ -549,7 +552,6 @@ impl RunLoop { coord_config, chain_state_db, moved_burnchain_config, - attachments_tx, &mut coordinator_dispatcher, coordinator_receivers, moved_atlas_config, @@ -557,11 +559,12 @@ impl RunLoop { fee_estimator.as_deref_mut(), miner_status, coordinator_indexer, + atlas_db, ); }) .expect("FATAL: failed to start chains coordinator thread"); - (coordinator_thread_handle, attachments_rx) + coordinator_thread_handle } /// Instantiate the PoX watchdog @@ -979,7 +982,7 @@ impl RunLoop { self.set_globals(globals.clone()); // have headers; boot up the chains coordinator and instantiate the chain state - let (coordinator_thread_handle, attachments_rx) = self.spawn_chains_coordinator( + let coordinator_thread_handle = self.spawn_chains_coordinator( &burnchain_config, coordinator_receivers, globals.get_miner_status(), @@ -1011,7 +1014,7 @@ impl RunLoop { // Boot up the p2p network and relayer, and figure out how many sortitions we have so far // (it could be non-zero if the node is resuming from chainstate) - let mut node = StacksNode::spawn(self, globals.clone(), relay_recv, attachments_rx); + let mut node = StacksNode::spawn(self, globals.clone(), relay_recv); let liveness_thread = self.spawn_chain_liveness_thread(globals.clone()); // Wait for all pending sortitions to process diff --git a/testnet/stacks-node/src/tests/epoch_21.rs b/testnet/stacks-node/src/tests/epoch_21.rs index 3b0675bcb0..da33f49065 100644 --- a/testnet/stacks-node/src/tests/epoch_21.rs +++ b/testnet/stacks-node/src/tests/epoch_21.rs @@ -4711,7 +4711,7 @@ fn trait_invocation_cross_epoch() { test_observer::spawn(); - let (mut conf, miner_account) = neon_integration_test_conf(); + let (mut conf, _miner_account) = neon_integration_test_conf(); let mut initial_balances = vec![InitialBalance { address: spender_addr.clone(), amount: 
200_000_000, @@ -4728,7 +4728,7 @@ fn trait_invocation_cross_epoch() { epochs[3].start_height = epoch_2_1; conf.burnchain.epochs = Some(epochs); - let http_origin = format!("http://{}", &conf.node.rpc_bind); + let _http_origin = format!("http://{}", &conf.node.rpc_bind); let mut burnchain_config = Burnchain::regtest(&conf.get_burn_db_path());