From bdac62531a2b5e0fdf92fd2cc7c1ea3c32e29e2e Mon Sep 17 00:00:00 2001 From: Aaron Blankstein Date: Wed, 6 Apr 2022 17:47:50 -0500 Subject: [PATCH 1/9] feat: replace atlas sync channel with direct db insert --- src/chainstate/coordinator/mod.rs | 38 ++-- src/net/atlas/db.rs | 239 +++++++++++++++++++---- src/net/atlas/download.rs | 153 +++++++++------ src/net/atlas/tests.rs | 39 +++- src/net/mod.rs | 2 - src/net/p2p.rs | 8 +- testnet/stacks-node/src/neon_node.rs | 9 - testnet/stacks-node/src/node.rs | 9 - testnet/stacks-node/src/run_loop/neon.rs | 15 +- 9 files changed, 361 insertions(+), 151 deletions(-) diff --git a/src/chainstate/coordinator/mod.rs b/src/chainstate/coordinator/mod.rs index 01e3c70c84..616efc5b3d 100644 --- a/src/chainstate/coordinator/mod.rs +++ b/src/chainstate/coordinator/mod.rs @@ -45,7 +45,7 @@ use crate::core::StacksEpoch; use crate::monitoring::{ increment_contract_calls_processed, increment_stx_blocks_processed_counter, }; -use crate::net::atlas::{AtlasConfig, AttachmentInstance}; +use crate::net::atlas::{AtlasConfig, AtlasDB, AttachmentInstance}; use crate::util_lib::db::Error as DBError; use clarity::vm::{ costs::ExecutionCost, @@ -160,7 +160,7 @@ pub struct ChainsCoordinator< chain_state_db: StacksChainState, sortition_db: SortitionDB, burnchain: Burnchain, - attachments_tx: SyncSender>, + atlas_db: Option, dispatcher: Option<&'a T>, cost_estimator: Option<&'a mut CE>, fee_estimator: Option<&'a mut FE>, @@ -264,12 +264,12 @@ impl<'a, T: BlockEventDispatcher, CE: CostEstimator + ?Sized, FE: FeeEstimator + pub fn run( chain_state_db: StacksChainState, burnchain: Burnchain, - attachments_tx: SyncSender>, dispatcher: &'a mut T, comms: CoordinatorReceivers, atlas_config: AtlasConfig, cost_estimator: Option<&mut CE>, fee_estimator: Option<&mut FE>, + atlas_db: AtlasDB, ) where T: BlockEventDispatcher, { @@ -296,13 +296,13 @@ impl<'a, T: BlockEventDispatcher, CE: CostEstimator + ?Sized, FE: FeeEstimator + chain_state_db, sortition_db, 
burnchain, - attachments_tx, dispatcher: Some(dispatcher), notifier: arc_notices, reward_set_provider: OnChainRewardSetProvider(), cost_estimator, fee_estimator, atlas_config, + atlas_db: Some(atlas_db), }; loop { @@ -355,7 +355,7 @@ impl<'a, T: BlockEventDispatcher, U: RewardSetProvider> ChainsCoordinator<'a, T, chain_id: u32, path: &str, reward_set_provider: U, - attachments_tx: SyncSender>, + _attachments_tx: SyncSender>, dispatcher: Option<&'a T>, ) -> ChainsCoordinator<'a, T, (), U, (), ()> { let burnchain = burnchain.clone(); @@ -388,8 +388,8 @@ impl<'a, T: BlockEventDispatcher, U: RewardSetProvider> ChainsCoordinator<'a, T, fee_estimator: None, reward_set_provider, notifier: (), - attachments_tx, atlas_config: AtlasConfig::default(false), + atlas_db: None, } } } @@ -769,15 +769,27 @@ impl< } if !attachments_instances.is_empty() { info!( - "Atlas: {} attachment instances emitted from events", - attachments_instances.len() + "Atlas: New attachment instances emitted by block"; + "attachments_count" => attachments_instances.len(), + "index_block_hash" => %block_receipt.header.index_block_hash(), + "stacks_height" => block_receipt.header.stacks_block_height, ); - match self.attachments_tx.send(attachments_instances) { - Ok(_) => {} - Err(e) => { - error!("Atlas: error dispatching attachments {}", e); + if let Some(atlas_db) = self.atlas_db.as_mut() { + for new_attachment in attachments_instances.into_iter() { + if let Err(e) = atlas_db.queue_attachment_instance(&new_attachment) + { + warn!( + "Atlas: Error writing attachment instance to DB"; + "err" => ?e, + "index_block_hash" => %new_attachment.index_block_hash, + "contract_id" => %new_attachment.contract_id, + "attachment_index" => %new_attachment.attachment_index, + ); + } } - }; + } else { + warn!("Atlas: attempted to write attachments, but stacks-node not configured with Atlas DB"); + } } if let Some(ref mut estimator) = self.cost_estimator { diff --git a/src/net/atlas/db.rs b/src/net/atlas/db.rs index 
ba9672d49c..183de453e8 100644 --- a/src/net/atlas/db.rs +++ b/src/net/atlas/db.rs @@ -14,7 +14,12 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use rusqlite::types::FromSql; +use rusqlite::types::FromSqlError; use rusqlite::types::ToSql; +use rusqlite::types::ToSqlOutput; +use rusqlite::types::ValueRef; +use rusqlite::OptionalExtension; use rusqlite::Row; use rusqlite::Transaction; use rusqlite::{Connection, OpenFlags, NO_PARAMS}; @@ -47,7 +52,7 @@ use crate::types::chainstate::StacksBlockId; use super::{AtlasConfig, Attachment, AttachmentInstance}; -pub const ATLASDB_VERSION: &'static str = "1"; +pub const ATLASDB_VERSION: &'static str = "2"; const ATLASDB_INITIAL_SCHEMA: &'static [&'static str] = &[ r#" @@ -73,9 +78,31 @@ const ATLASDB_INITIAL_SCHEMA: &'static [&'static str] = &[ "CREATE TABLE db_config(version TEXT NOT NULL);", ]; +const ATLASDB_SCHEMA_2: &'static [&'static str] = &[ + r#" + ALTER TABLE attachment_instances + ADD status INTEGER NOT NULL + ;"#, + r#" + UPDATE attachment_instances SET status = 2; + "#, + r#" + CREATE INDEX IF NOT EXISTS index_instance_status ON attachment_instances(status); + "#, +]; + const ATLASDB_INDEXES: &'static [&'static str] = &["CREATE INDEX IF NOT EXISTS index_was_instantiated ON attachments(was_instantiated);"]; +pub enum AttachmentInstanceStatus { + /// This variant indicates that the attachments instance has been written, + /// but the downloader has not yet checked that the attachment matched + Queued, + /// This variant indicates that the attachments instance has been written, + /// and checked for whether or not an already existing attachment matched + Checked, +} + impl FromRow for Attachment { fn from_row<'a>(row: &'a Row) -> Result { let content: Vec = row.get_unwrap("content"); @@ -117,6 +144,27 @@ impl FromRow<(u32, u32)> for (u32, u32) { } } +impl ToSql for AttachmentInstanceStatus { + fn to_sql(&self) -> Result, rusqlite::Error> { + let 
integer_rep: i64 = match self { + AttachmentInstanceStatus::Queued => 1, + AttachmentInstanceStatus::Checked => 2, + }; + Ok(integer_rep.into()) + } +} + +impl FromSql for AttachmentInstanceStatus { + fn column_result(value: ValueRef<'_>) -> Result { + let integer_rep: i64 = value.as_i64()?; + match integer_rep { + 1 => Ok(AttachmentInstanceStatus::Queued), + 2 => Ok(AttachmentInstanceStatus::Checked), + x => Err(FromSqlError::OutOfRange(x)), + } + } +} + #[derive(Debug)] pub struct AtlasDB { pub atlas_config: AtlasConfig, @@ -134,20 +182,32 @@ impl AtlasDB { Ok(()) } + /// Get the database schema version, given a DB connection + fn get_schema_version(conn: &Connection) -> Result { + let version = conn.query_row( + "SELECT MAX(version) from db_config", + rusqlite::NO_PARAMS, + |row| row.get(0), + )?; + Ok(version) + } + fn instantiate(&mut self) -> Result<(), db_error> { let genesis_attachments = self.atlas_config.genesis_attachments.take(); let tx = self.tx_begin()?; for row_text in ATLASDB_INITIAL_SCHEMA { - tx.execute_batch(row_text).map_err(db_error::SqliteError)?; + tx.execute_batch(row_text)?; + } + for row_text in ATLASDB_SCHEMA_2 { + tx.execute_batch(row_text)?; } tx.execute( "INSERT INTO db_config (version) VALUES (?1)", &[&ATLASDB_VERSION], - ) - .map_err(db_error::SqliteError)?; + )?; if let Some(attachments) = genesis_attachments { let now = util::get_epoch_time_secs() as i64; @@ -225,10 +285,64 @@ impl AtlasDB { } if readwrite { db.add_indexes()?; + db.check_schema_version_and_update()?; + } else { + db.check_schema_version_or_error()?; } + Ok(db) } + fn check_schema_version_or_error(&mut self) -> Result<(), db_error> { + match Self::get_schema_version(self.conn()) { + Ok(version) => { + let expected_version = ATLASDB_VERSION.to_string(); + if version == expected_version { + Ok(()) + } else { + Err(db_error::Other(format!( + "The version of the Atlas DB {} does not match the expected {} and cannot be updated from AtlasDB::open()", + version, 
expected_version + ))) + } + } + Err(e) => panic!("Error obtaining the version of the Atlas DB: {:?}", e), + } + } + + fn apply_schema_2(db_conn: &Connection) -> Result<(), db_error> { + for row_text in ATLASDB_SCHEMA_2 { + db_conn.execute_batch(row_text)?; + } + + db_conn.execute( + "INSERT OR REPLACE INTO db_config (version) VALUES (?1)", + &["2"], + )?; + + Ok(()) + } + + fn check_schema_version_and_update(&mut self) -> Result<(), db_error> { + let tx = self.tx_begin()?; + match AtlasDB::get_schema_version(&tx) { + Ok(version) => { + let expected_version = ATLASDB_VERSION.to_string(); + if version == expected_version { + return Ok(()); + } + if version == "1" { + Self::apply_schema_2(&tx)?; + tx.commit()?; + Ok(()) + } else { + panic!("The schema version of the Atlas DB is invalid.") + } + } + Err(e) => panic!("Error obtaining the version of the Atlas DB: {:?}", e), + } + } + // Open an atlas database in memory (used for testing) #[cfg(test)] pub fn connect_memory(atlas_config: AtlasConfig) -> Result { @@ -387,19 +501,13 @@ impl AtlasDB { let tx = self.tx_begin()?; tx.execute( "INSERT OR REPLACE INTO attachments (hash, content, was_instantiated, created_at) VALUES (?, ?, 1, ?)", - &[ - &attachment.hash() as &dyn ToSql, - &attachment.content as &dyn ToSql, - &now as &dyn ToSql, - ], - ) - .map_err(db_error::SqliteError)?; + rusqlite::params![&attachment.hash(), &attachment.content, &now], + )?; tx.execute( - "UPDATE attachment_instances SET is_available = 1 WHERE content_hash = ?1", - &[&attachment.hash() as &dyn ToSql], - ) - .map_err(db_error::SqliteError)?; - tx.commit().map_err(db_error::SqliteError)?; + "UPDATE attachment_instances SET is_available = 1 WHERE content_hash = ?1 AND status = ?2", + rusqlite::params![&attachment.hash(), &AttachmentInstanceStatus::Checked], + )?; + tx.commit()?; Ok(()) } @@ -434,8 +542,8 @@ impl AtlasDB { pub fn find_unresolved_attachment_instances( &mut self, ) -> Result, db_error> { - let qry = "SELECT * FROM 
attachment_instances WHERE is_available = 0".to_string(); - let rows = query_rows::(&self.conn, &qry, NO_PARAMS)?; + let qry = "SELECT * FROM attachment_instances WHERE is_available = 0 AND status = ?"; + let rows = query_rows(&self.conn, qry, &[&AttachmentInstanceStatus::Checked])?; Ok(rows) } @@ -444,9 +552,9 @@ impl AtlasDB { content_hash: &Hash160, ) -> Result, db_error> { let hex_content_hash = to_hex(&content_hash.0[..]); - let qry = "SELECT * FROM attachment_instances WHERE content_hash = ?1".to_string(); - let args = [&hex_content_hash as &dyn ToSql]; - let rows = query_rows::(&self.conn, &qry, &args)?; + let qry = "SELECT * FROM attachment_instances WHERE content_hash = ?1 AND status = ?2"; + let args = rusqlite::params![&hex_content_hash, &AttachmentInstanceStatus::Checked]; + let rows = query_rows(&self.conn, qry, args)?; Ok(rows) } @@ -462,31 +570,82 @@ impl AtlasDB { Ok(row) } - pub fn insert_uninstantiated_attachment_instance( + /// Queue a new attachment instance, status will be set to "queued", + /// and the is_available field set to false + pub fn queue_attachment_instance( + &mut self, + attachment: &AttachmentInstance, + ) -> Result<(), db_error> { + self.insert_attachment_instance(attachment, AttachmentInstanceStatus::Queued, false) + } + + /// Insert an attachment instance from an initial batch. + /// All such instances are marked "checked", and is_available = true + pub fn insert_initial_attachment_instance( + &mut self, + attachment: &AttachmentInstance, + ) -> Result<(), db_error> { + self.insert_attachment_instance(attachment, AttachmentInstanceStatus::Checked, true) + } + + /// Return all the queued attachment instances + pub fn queued_attachments(&self) -> Result, db_error> { + query_rows( + &self.conn, + "SELECT * FROM attachment_instances WHERE status = ?1", + &[&AttachmentInstanceStatus::Queued], + ) + } + + /// Update a queued attachment to "checked", setting the `is_available` field. 
+ pub fn mark_attachment_instance_checked( &mut self, attachment: &AttachmentInstance, is_available: bool, ) -> Result<(), db_error> { - let hex_content_hash = to_hex(&attachment.content_hash.0[..]); - let hex_tx_id = attachment.tx_id.to_hex(); - let tx = self.tx_begin()?; + self.conn.execute( + "UPDATE attachment_instances SET status = ?1, is_available = ?2 + WHERE index_block_hash = ?3, contract_id = ?4, attachment_index = ?5", + rusqlite::params![ + &AttachmentInstanceStatus::Checked, + &is_available, + &attachment.index_block_hash, + &attachment.contract_id.to_string(), + &attachment.attachment_index, + ], + )?; + Ok(()) + } + + /// Insert an attachment instance. + fn insert_attachment_instance( + &mut self, + attachment: &AttachmentInstance, + status: AttachmentInstanceStatus, + is_available: bool, + ) -> Result<(), db_error> { + let sql_tx = self.tx_begin()?; let now = util::get_epoch_time_secs() as i64; - let res = tx.execute( - "INSERT OR REPLACE INTO attachment_instances (content_hash, created_at, index_block_hash, attachment_index, block_height, is_available, metadata, contract_id, tx_id) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", - &[ - &hex_content_hash as &dyn ToSql, - &now as &dyn ToSql, - &attachment.index_block_hash as &dyn ToSql, - &attachment.attachment_index as &dyn ToSql, + sql_tx.execute( + "INSERT OR REPLACE INTO attachment_instances ( + content_hash, created_at, index_block_hash, + attachment_index, block_height, is_available, + metadata, contract_id, tx_id, status) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", + rusqlite::params![ + &attachment.content_hash, + &now, + &attachment.index_block_hash, + &attachment.attachment_index, &u64_to_sql(attachment.stacks_block_height)?, - &is_available as &dyn ToSql, - &attachment.metadata as &dyn ToSql, - &attachment.contract_id.to_string() as &dyn ToSql, - &hex_tx_id as &dyn ToSql, - ] - ); - res.map_err(db_error::SqliteError)?; - tx.commit().map_err(db_error::SqliteError)?; + &is_available, 
+ &attachment.metadata, + &attachment.contract_id.to_string(), + &attachment.tx_id, + &status + ], + )?; + sql_tx.commit()?; Ok(()) } } diff --git a/src/net/atlas/download.rs b/src/net/atlas/download.rs index a2ab7e1d88..d6fc60e9cf 100644 --- a/src/net/atlas/download.rs +++ b/src/net/atlas/download.rs @@ -113,12 +113,7 @@ impl AttachmentsDownloader { // Handle initial batch if self.initial_batch.len() > 0 { - let mut batch = HashSet::new(); - for attachment_instance in self.initial_batch.drain(..) { - batch.insert(attachment_instance); - } - let mut resolved = - self.enqueue_new_attachments(&mut batch, &mut network.atlasdb, true)?; + let mut resolved = self.enqueue_initial_attachments(&mut network.atlasdb)?; resolved_attachments.append(&mut resolved); } @@ -235,74 +230,57 @@ impl AttachmentsDownloader { Ok((resolved_attachments, events_to_deregister)) } - pub fn enqueue_new_attachments( + fn check_attachment_instances( &mut self, - new_attachments: &mut HashSet, - atlasdb: &mut AtlasDB, - initial_batch: bool, - ) -> Result, net_error> { - if new_attachments.is_empty() { - return Ok(vec![]); - } - + atlas_db: &mut AtlasDB, + iterator: Vec, + do_if_found: F, + do_if_not_found: G, + ) -> Result, net_error> + where + F: Fn(&mut AtlasDB, &AttachmentInstance) -> Result<(), net_error>, + G: Fn(&mut AtlasDB, &AttachmentInstance) -> Result<(), net_error>, + { let mut attachments_batches: HashMap = HashMap::new(); let mut resolved_attachments = vec![]; - for attachment_instance in new_attachments.drain() { - // Are we dealing with an empty hash - allowed for undoing onchain binding + for attachment_instance in iterator { if attachment_instance.content_hash == Hash160::empty() { - // todo(ludo) insert or update ? 
- atlasdb - .insert_uninstantiated_attachment_instance(&attachment_instance, true) - .map_err(|e| net_error::DBError(e))?; + // Are we dealing with an empty hash - allowed for undoing onchain binding + do_if_found(atlas_db, &attachment_instance)?; debug!("Atlas: inserting and pairing new attachment instance with empty hash"); resolved_attachments.push((attachment_instance, Attachment::empty())); - continue; - } - - // Do we already have a matching validated attachment - if let Ok(Some(entry)) = atlasdb.find_attachment(&attachment_instance.content_hash) { - atlasdb - .insert_uninstantiated_attachment_instance(&attachment_instance, true) - .map_err(|e| net_error::DBError(e))?; + } else if let Ok(Some(entry)) = + atlas_db.find_attachment(&attachment_instance.content_hash) + { + // Do we already have a matching validated attachment + do_if_found(atlas_db, &attachment_instance)?; debug!( "Atlas: inserting and pairing new attachment instance to existing attachment" ); resolved_attachments.push((attachment_instance, entry)); - continue; - } - - // Do we already have a matching inboxed attachment - if let Ok(Some(attachment)) = - atlasdb.find_uninstantiated_attachment(&attachment_instance.content_hash) + } else if let Ok(Some(attachment)) = + atlas_db.find_uninstantiated_attachment(&attachment_instance.content_hash) { - atlasdb - .insert_instantiated_attachment(&attachment) - .map_err(|e| net_error::DBError(e))?; - atlasdb - .insert_uninstantiated_attachment_instance(&attachment_instance, true) - .map_err(|e| net_error::DBError(e))?; + // Do we already have a matching inboxed attachment + atlas_db.insert_instantiated_attachment(&attachment)?; + do_if_found(atlas_db, &attachment_instance)?; debug!("Atlas: inserting and pairing new attachment instance to inboxed attachment, now validated"); resolved_attachments.push((attachment_instance, attachment)); - continue; - } - - // This attachment in refering to an unknown attachment. 
- // Let's append it to the batch being constructed in this routine. - match attachments_batches.entry(attachment_instance.index_block_hash) { - Entry::Occupied(entry) => { - entry.into_mut().track_attachment(&attachment_instance); - } - Entry::Vacant(v) => { - let mut batch = AttachmentsBatch::new(); - batch.track_attachment(&attachment_instance); - v.insert(batch); - } - }; + } else { + // This attachment refers to an unknown attachment. + // Let's append it to the batch being constructed in this routine. + match attachments_batches.entry(attachment_instance.index_block_hash) { + Entry::Occupied(entry) => { + entry.into_mut().track_attachment(&attachment_instance); + } + Entry::Vacant(v) => { + let mut batch = AttachmentsBatch::new(); + batch.track_attachment(&attachment_instance); + v.insert(batch); + } + }; - if !initial_batch { - atlasdb - .insert_uninstantiated_attachment_instance(&attachment_instance, false) - .map_err(|e| net_error::DBError(e))?; + do_if_not_found(atlas_db, &attachment_instance)?; } } @@ -312,6 +290,61 @@ impl AttachmentsDownloader { Ok(resolved_attachments) } + + /// Check any queued attachment instances to see if we already have data for them, + /// returning a vector of (instance, attachment) pairs for any of the queued attachments + /// which already had the associated data + /// Marks any processed attachments as checked + pub fn check_queued_attachment_instances( + &mut self, + atlas_db: &mut AtlasDB, + ) -> Result, net_error> { + let new_attachments = atlas_db.queued_attachments()?; + + self.check_attachment_instances( + atlas_db, + new_attachments, + |atlas_db, attachment_instance| { + atlas_db + .mark_attachment_instance_checked(&attachment_instance, true) + .map_err(net_error::from) + }, + |atlas_db, attachment_instance| { + atlas_db + .mark_attachment_instance_checked(&attachment_instance, false) + .map_err(net_error::from) + }, + ) + } + + /// Insert the initial attachments set. 
Only add the attachment instance if associated data + /// was found. + pub fn enqueue_initial_attachments( + &mut self, + atlas_db: &mut AtlasDB, + ) -> Result, net_error> { + if self.initial_batch.is_empty() { + return Ok(vec![]); + } + + // we're draining the initial batch, so to avoid angering The Borrow Checker + // use mem replace to just take the whole vec. + let initial_batch = std::mem::replace(&mut self.initial_batch, vec![]); + + self.check_attachment_instances( + atlas_db, + initial_batch, + |atlas_db, attachment_instance| { + atlas_db + .insert_initial_attachment_instance(&attachment_instance) + .map_err(net_error::from) + }, + |_atlas_db, _attachment_instance| { + // If attachment not found, don't insert attachment instance + Ok(()) + }, + ) + } } #[derive(Debug)] diff --git a/src/net/atlas/tests.rs b/src/net/atlas/tests.rs index ef1622710c..333aeb82a1 100644 --- a/src/net/atlas/tests.rs +++ b/src/net/atlas/tests.rs @@ -1003,7 +1003,7 @@ fn test_evict_expired_unresolved_attachment_instances() { }; let mut atlas_db = AtlasDB::connect_memory(atlas_config).unwrap(); - // Insert some uninstanciated attachments + // Insert some uninstantiated attachments let uninstantiated_attachment_instances = [ new_attachment_instance_from(&new_attachment_from("facade11"), 0, 1), new_attachment_instance_from(&new_attachment_from("facade12"), 1, 1), @@ -1016,7 +1016,10 @@ fn test_evict_expired_unresolved_attachment_instances() { ]; for attachment_instance in uninstantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, false) + .queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, false) .unwrap(); } @@ -1029,7 +1032,10 @@ fn test_evict_expired_unresolved_attachment_instances() { ]; for attachment_instance in instantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, true) + 
.queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, true) .unwrap(); } @@ -1043,7 +1049,10 @@ fn test_evict_expired_unresolved_attachment_instances() { ]; for attachment_instance in uninstantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, false) + .queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, false) .unwrap(); } @@ -1092,7 +1101,7 @@ fn test_bit_vectors() { let mut atlas_db = AtlasDB::connect_memory(atlas_config).unwrap(); - // Insert some uninstanciated attachments + // Insert some uninstantiated attachments let uninstantiated_attachment_instances = [ new_attachment_instance_from(&new_attachment_from("facade11"), 0, 1), new_attachment_instance_from(&new_attachment_from("facade12"), 1, 1), @@ -1101,7 +1110,10 @@ fn test_bit_vectors() { ]; for attachment_instance in uninstantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, false) + .queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, false) .unwrap(); } let block_id_1 = uninstantiated_attachment_instances[0].index_block_hash; @@ -1118,7 +1130,10 @@ fn test_bit_vectors() { ]; for attachment_instance in uninstantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, false) + .queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, false) .unwrap(); } let bit_vector = atlas_db @@ -1134,7 +1149,10 @@ fn test_bit_vectors() { ]; for attachment_instance in instantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, true) + .queue_attachment_instance(attachment_instance) + .unwrap(); + 
atlas_db + .mark_attachment_instance_checked(attachment_instance, true) .unwrap(); } @@ -1160,7 +1178,10 @@ fn test_bit_vectors() { let block_id_2 = instantiated_attachment_instances[0].index_block_hash; for attachment_instance in instantiated_attachment_instances.iter() { atlas_db - .insert_uninstantiated_attachment_instance(attachment_instance, true) + .queue_attachment_instance(attachment_instance) + .unwrap(); + atlas_db + .mark_attachment_instance_checked(attachment_instance, true) .unwrap(); } diff --git a/src/net/mod.rs b/src/net/mod.rs index 93accf0fcb..7e88f05ff9 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -2835,7 +2835,6 @@ pub mod test { false, 100, &RPCHandlerArgs::default(), - &mut HashSet::new(), ); self.sortdb = Some(sortdb); @@ -2859,7 +2858,6 @@ pub mod test { false, 100, &RPCHandlerArgs::default(), - &mut HashSet::new(), ); self.sortdb = Some(sortdb); diff --git a/src/net/p2p.rs b/src/net/p2p.rs index c160924a29..a7b79118c1 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -5235,7 +5235,6 @@ impl PeerNetwork { ibd: bool, poll_timeout: u64, handler_args: &RPCHandlerArgs, - attachment_requests: &mut HashSet, ) -> Result { debug!(">>>>>>>>>>>>>>>>>>>>>>> Begin Network Dispatch (poll for {}) >>>>>>>>>>>>>>>>>>>>>>>>>>>>", poll_timeout); let mut poll_states = match self.network { @@ -5276,11 +5275,8 @@ impl PeerNetwork { // Events are being parsed and dispatched here once and we want to // enqueue them. 
match PeerNetwork::with_attachments_downloader(self, |network, attachments_downloader| { - let mut known_attachments = attachments_downloader.enqueue_new_attachments( - attachment_requests, - &mut network.atlasdb, - false, - )?; + let mut known_attachments = + attachments_downloader.check_queued_attachment_instances(&mut network.atlasdb)?; network_result.attachments.append(&mut known_attachments); Ok(()) }) { diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index d0bf8a8dcd..6e117af396 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -711,14 +711,6 @@ fn spawn_peer( cmp::min(poll_timeout, config.node.microblock_frequency) }; - let mut expected_attachments = match attachments_rx.try_recv() { - Ok(expected_attachments) => expected_attachments, - _ => { - debug!("Atlas: attachment channel is empty"); - HashSet::new() - } - }; - let _ = Relayer::setup_unconfirmed_state_readonly(&mut chainstate, &sortdb); recv_unconfirmed_txs(&mut chainstate, unconfirmed_txs.clone()); @@ -731,7 +723,6 @@ fn spawn_peer( ibd, poll_ms, &handler_args, - &mut expected_attachments, ) { Ok(network_result) => { if num_p2p_state_machine_passes < network_result.num_state_machine_passes { diff --git a/testnet/stacks-node/src/node.rs b/testnet/stacks-node/src/node.rs index 1a32b1a40b..7161e9cca9 100644 --- a/testnet/stacks-node/src/node.rs +++ b/testnet/stacks-node/src/node.rs @@ -225,14 +225,6 @@ fn spawn_peer( } }; - let mut expected_attachments = match attachments_rx.try_recv() { - Ok(expected_attachments) => expected_attachments, - _ => { - debug!("Atlas: attachment channel is empty"); - HashSet::new() - } - }; - let net_result = this .run( &sortdb, @@ -243,7 +235,6 @@ fn spawn_peer( false, poll_timeout, &handler_args, - &mut expected_attachments, ) .unwrap(); if net_result.has_transactions() { diff --git a/testnet/stacks-node/src/run_loop/neon.rs b/testnet/stacks-node/src/run_loop/neon.rs index 
9f9cb0a839..60012c88a8 100644 --- a/testnet/stacks-node/src/run_loop/neon.rs +++ b/testnet/stacks-node/src/run_loop/neon.rs @@ -24,7 +24,9 @@ use stacks::chainstate::coordinator::{ check_chainstate_db_versions, BlockEventDispatcher, ChainsCoordinator, CoordinatorCommunication, }; use stacks::chainstate::stacks::db::{ChainStateBootData, StacksChainState}; -use stacks::net::atlas::{AtlasConfig, Attachment, AttachmentInstance, ATTACHMENTS_CHANNEL_SIZE}; +use stacks::net::atlas::{ + AtlasConfig, AtlasDB, Attachment, AttachmentInstance, ATTACHMENTS_CHANNEL_SIZE, +}; use stx_genesis::GenesisData; use crate::monitoring::start_serving_monitoring_metrics; @@ -438,7 +440,14 @@ impl RunLoop { let moved_config = self.config.clone(); let moved_burnchain_config = burnchain_config.clone(); let mut coordinator_dispatcher = self.event_dispatcher.clone(); - let (attachments_tx, attachments_rx) = sync_channel(ATTACHMENTS_CHANNEL_SIZE); + let (_attachments_tx, attachments_rx) = sync_channel(ATTACHMENTS_CHANNEL_SIZE); + + let atlas_db = AtlasDB::connect( + moved_atlas_config.clone(), + &self.config.get_atlas_db_file_path(), + true, + ) + .expect("Failed to connect Atlas DB during startup"); let coordinator_thread_handle = thread::Builder::new() .name("chains-coordinator".to_string()) @@ -449,12 +458,12 @@ impl RunLoop { ChainsCoordinator::run( chain_state_db, moved_burnchain_config, - attachments_tx, &mut coordinator_dispatcher, coordinator_receivers, moved_atlas_config, cost_estimator.as_deref_mut(), fee_estimator.as_deref_mut(), + atlas_db, ); }) .expect("FATAL: failed to start chains coordinator thread"); From 3a7c700fe93b4df4a76c6db0f2b956c76eba0707 Mon Sep 17 00:00:00 2001 From: Aaron Blankstein Date: Fri, 8 Apr 2022 11:35:22 -0500 Subject: [PATCH 2/9] add limit to queue result size, clean out attachments_rx/tx instances --- src/net/atlas/db.rs | 17 +++++-- testnet/stacks-node/src/neon_node.rs | 3 -- testnet/stacks-node/src/node.rs | 58 +++++++++++++--------- 
testnet/stacks-node/src/run_loop/helium.rs | 14 +++--- testnet/stacks-node/src/run_loop/neon.rs | 8 ++- 5 files changed, 58 insertions(+), 42 deletions(-) diff --git a/src/net/atlas/db.rs b/src/net/atlas/db.rs index 183de453e8..1fc9ee51d8 100644 --- a/src/net/atlas/db.rs +++ b/src/net/atlas/db.rs @@ -54,6 +54,15 @@ use super::{AtlasConfig, Attachment, AttachmentInstance}; pub const ATLASDB_VERSION: &'static str = "2"; +/// The maximum number of atlas attachment instances that should be +/// checked at once (this is used to limit the return size of +/// `queued_attachments`). Because these checks will sometimes surface +/// existing attachment data associated with those instances, the +/// memory impact of these checks is not limited to the +/// AttachmentInstance size (which is small), but can include the +/// Attachment as well (which is larger). +pub const MAX_PROCESS_PER_ROUND: u32 = 1_000; + const ATLASDB_INITIAL_SCHEMA: &'static [&'static str] = &[ r#" CREATE TABLE attachments( @@ -588,12 +597,12 @@ impl AtlasDB { self.insert_attachment_instance(attachment, AttachmentInstanceStatus::Checked, true) } - /// Return all the queued attachment instances + /// Return all the queued attachment instances, limited by `MAX_PROCESS_PER_ROUND` pub fn queued_attachments(&self) -> Result, db_error> { query_rows( &self.conn, - "SELECT * FROM attachment_instances WHERE status = ?1", - &[&AttachmentInstanceStatus::Queued], + "SELECT * FROM attachment_instances WHERE status = ?1 LIMIT ?2", + rusqlite::params![&AttachmentInstanceStatus::Queued, MAX_PROCESS_PER_ROUND], ) } @@ -605,7 +614,7 @@ impl AtlasDB { ) -> Result<(), db_error> { self.conn.execute( "UPDATE attachment_instances SET status = ?1, is_available = ?2 - WHERE index_block_hash = ?3, contract_id = ?4, attachment_index = ?5", + WHERE index_block_hash = ?3 AND contract_id = ?4 AND attachment_index = ?5", rusqlite::params![ &AttachmentInstanceStatus::Checked, &is_available, diff --git 
a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index 6e117af396..4d987ac9be 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -624,7 +624,6 @@ fn spawn_peer( rpc_sock: &SocketAddr, poll_timeout: u64, relay_channel: SyncSender, - attachments_rx: Receiver>, unconfirmed_txs: Arc>, ) -> Result, NetError> { let config = runloop.config().clone(); @@ -1212,7 +1211,6 @@ impl StacksNode { runloop: &RunLoop, last_burn_block: Option, coord_comms: CoordinatorChannels, - attachments_rx: Receiver>, ) -> StacksNode { let config = runloop.config().clone(); let miner = runloop.is_miner(); @@ -1427,7 +1425,6 @@ impl StacksNode { &rpc_sock, 5000, relay_send.clone(), - attachments_rx, shared_unconfirmed_txs, ) .expect("Failed to initialize p2p thread"); diff --git a/testnet/stacks-node/src/node.rs b/testnet/stacks-node/src/node.rs index 7161e9cca9..bf5e08bdf3 100644 --- a/testnet/stacks-node/src/node.rs +++ b/testnet/stacks-node/src/node.rs @@ -92,7 +92,6 @@ pub struct Node { last_sortitioned_block: Option, event_dispatcher: EventDispatcher, nonce: u64, - attachments_tx: SyncSender>, } pub fn get_account_lockups( @@ -165,8 +164,8 @@ fn spawn_peer( exit_at_block_height: Option, genesis_chainstate_hash: Sha256Sum, poll_timeout: u64, - attachments_rx: Receiver>, config: Config, + atlas_db: AtlasDB, ) -> Result, NetError> { this.bind(p2p_sock, rpc_sock).unwrap(); let server_thread = thread::spawn(move || { @@ -263,11 +262,7 @@ pub fn use_test_genesis_chainstate(config: &Config) -> bool { impl Node { /// Instantiate and initialize a new node, given a config - pub fn new( - config: Config, - boot_block_exec: Box ()>, - attachments_tx: SyncSender>, - ) -> Self { + pub fn new(config: Config, boot_block_exec: Box ()>) -> Self { let use_test_genesis_data = if config.burnchain.mode == "mocknet" { use_test_genesis_chainstate(&config) } else { @@ -354,7 +349,6 @@ impl Node { burnchain_tip: None, nonce: 0, 
event_dispatcher, - attachments_tx, } } @@ -384,7 +378,6 @@ impl Node { Err(_e) => panic!(), }; - let (attachments_tx, attachments_rx) = sync_channel(1); let mut node = Node { active_registered_key: None, bootstraping_chain: false, @@ -396,10 +389,9 @@ impl Node { burnchain_tip: None, nonce: 0, event_dispatcher, - attachments_tx, }; - node.spawn_peer_server(attachments_rx); + node.spawn_peer_server(); loop { let sortdb = @@ -422,7 +414,20 @@ impl Node { node } - pub fn spawn_peer_server(&mut self, attachments_rx: Receiver>) { + fn make_atlas_config() -> AtlasConfig { + AtlasConfig::default(false) + } + + pub fn make_atlas_db(&self) -> AtlasDB { + AtlasDB::connect( + Self::make_atlas_config(), + &self.config.get_atlas_db_file_path(), + true, + ) + .unwrap() + } + + pub fn spawn_peer_server(&mut self) { // we can call _open_ here rather than _connect_, since connect is first called in // make_genesis_block let sortdb = SortitionDB::open(&self.config.get_burn_db_file_path(), true) @@ -503,9 +508,9 @@ impl Node { } tx.commit().unwrap(); } - let atlas_config = AtlasConfig::default(false); - let atlasdb = - AtlasDB::connect(atlas_config, &self.config.get_atlas_db_file_path(), true).unwrap(); + let atlas_config = Self::make_atlas_config(); + let atlasdb = self.make_atlas_db(); + let peer_atlasdb = self.make_atlas_db(); let local_peer = match PeerDB::get_local_peer(peerdb.conn()) { Ok(local_peer) => local_peer, @@ -537,8 +542,8 @@ impl Node { exit_at_block_height, Sha256Sum::from_hex(stx_genesis::GENESIS_CHAINSTATE_HASH).unwrap(), 1000, - attachments_rx, self.config.clone(), + peer_atlasdb, ) .unwrap(); @@ -750,6 +755,7 @@ impl Node { consensus_hash: &ConsensusHash, microblocks: Vec, db: &mut SortitionDB, + atlas_db: &mut AtlasDB, ) -> ChainTip { let parent_consensus_hash = { // look up parent consensus hash @@ -826,7 +832,7 @@ impl Node { } }; - let atlas_config = AtlasConfig::default(false); + let atlas_config = Self::make_atlas_config(); let mut processed_blocks = 
vec![]; loop { let mut process_blocks_at_tip = { @@ -846,13 +852,19 @@ impl Node { let attachments_instances = self.get_attachment_instances(epoch_receipt, &atlas_config); if !attachments_instances.is_empty() { - match self.attachments_tx.send(attachments_instances) { - Ok(_) => {} - Err(e) => { - error!("Error dispatching attachments {}", e); - panic!(); + for new_attachment in attachments_instances.into_iter() { + if let Err(e) = + atlas_db.queue_attachment_instance(&new_attachment) + { + warn!( + "Atlas: Error writing attachment instance to DB"; + "err" => ?e, + "index_block_hash" => %new_attachment.index_block_hash, + "contract_id" => %new_attachment.contract_id, + "attachment_index" => %new_attachment.attachment_index, + ); } - }; + } } } _ => {} diff --git a/testnet/stacks-node/src/run_loop/helium.rs b/testnet/stacks-node/src/run_loop/helium.rs index fa7d7ba853..33d9b0f88f 100644 --- a/testnet/stacks-node/src/run_loop/helium.rs +++ b/testnet/stacks-node/src/run_loop/helium.rs @@ -15,7 +15,6 @@ pub struct RunLoop { config: Config, pub node: Node, pub callbacks: RunLoopCallbacks, - attachments_rx: Option>>, } impl RunLoop { @@ -28,16 +27,13 @@ impl RunLoop { config: Config, boot_exec: Box ()>, ) -> Self { - let (attachments_tx, attachments_rx) = sync_channel(1); - // Build node based on config - let node = Node::new(config.clone(), boot_exec, attachments_tx); + let node = Node::new(config.clone(), boot_exec); Self { config, node, callbacks: RunLoopCallbacks::new(), - attachments_rx: Some(attachments_rx), } } @@ -74,8 +70,7 @@ impl RunLoop { self.node.process_burnchain_state(&burnchain_tip); // todo(ludo): should return genesis? let mut chain_tip = ChainTip::genesis(&BurnchainHeaderHash::zero(), 0, 0); - let attachments_rx = self.attachments_rx.take().unwrap(); - self.node.spawn_peer_server(attachments_rx); + self.node.spawn_peer_server(); // Bootstrap the chain: node will start a new tenure, // using the sortition hash from block #1 for generating a VRF. 
@@ -128,12 +123,14 @@ impl RunLoop { // Have the node process its own tenure. // We should have some additional checks here, and ensure that the previous artifacts are legit. + let mut atlas_db = self.node.make_atlas_db(); chain_tip = self.node.process_tenure( &artifacts_from_1st_tenure.anchored_block, &last_sortitioned_block.block_snapshot.consensus_hash, artifacts_from_1st_tenure.microblocks.clone(), burnchain.sortdb_mut(), + &mut atlas_db, ); self.callbacks.invoke_new_stacks_chain_state( @@ -204,11 +201,14 @@ impl RunLoop { Some(ref artifacts) => { // Have the node process its tenure. // We should have some additional checks here, and ensure that the previous artifacts are legit. + let mut atlas_db = self.node.make_atlas_db(); + chain_tip = self.node.process_tenure( &artifacts.anchored_block, &last_sortitioned_block.block_snapshot.consensus_hash, artifacts.microblocks.clone(), burnchain.sortdb_mut(), + &mut atlas_db, ); self.callbacks.invoke_new_stacks_chain_state( diff --git a/testnet/stacks-node/src/run_loop/neon.rs b/testnet/stacks-node/src/run_loop/neon.rs index 60012c88a8..3497908157 100644 --- a/testnet/stacks-node/src/run_loop/neon.rs +++ b/testnet/stacks-node/src/run_loop/neon.rs @@ -386,7 +386,7 @@ impl RunLoop { &mut self, burnchain_config: &Burnchain, coordinator_receivers: CoordinatorReceivers, - ) -> (JoinHandle<()>, Receiver>) { + ) -> JoinHandle<()> { let use_test_genesis_data = use_test_genesis_chainstate(&self.config); // load up genesis balances @@ -440,7 +440,6 @@ impl RunLoop { let moved_config = self.config.clone(); let moved_burnchain_config = burnchain_config.clone(); let mut coordinator_dispatcher = self.event_dispatcher.clone(); - let (_attachments_tx, attachments_rx) = sync_channel(ATTACHMENTS_CHANNEL_SIZE); let atlas_db = AtlasDB::connect( moved_atlas_config.clone(), @@ -468,7 +467,7 @@ impl RunLoop { }) .expect("FATAL: failed to start chains coordinator thread"); - (coordinator_thread_handle, attachments_rx) + 
coordinator_thread_handle } /// Instantiate the PoX watchdog @@ -548,7 +547,7 @@ impl RunLoop { self.is_miner = Some(is_miner); // have headers; boot up the chains coordinator and instantiate the chain state - let (coordinator_thread_handle, attachments_rx) = + let coordinator_thread_handle = self.spawn_chains_coordinator(&burnchain_config, coordinator_receivers); self.instantiate_pox_watchdog(); @@ -568,7 +567,6 @@ impl RunLoop { self, Some(burnchain_tip.clone()), coordinator_senders.clone(), - attachments_rx, ); let sortdb = burnchain.sortdb_mut(); let mut sortition_db_height = RunLoop::get_sortition_db_height(&sortdb, &burnchain_config); From 6bac33f054baae811f16652786f537cf81250fa4 Mon Sep 17 00:00:00 2001 From: Aaron Blankstein Date: Fri, 8 Apr 2022 13:04:37 -0500 Subject: [PATCH 3/9] update CHANGELOG add more rustdocs --- CHANGELOG.md | 4 ++++ src/net/atlas/db.rs | 7 +++++++ 2 files changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b794df647..70f270288e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE - Expose a node's public key and public key hash160 (i.e. what appears in /v2/neighbors) via the /v2/info API endpoint (#3046) +### Fixed +- The AtlasDB previously could lose `AttachmentInstance` data during shutdown + or crashes (#3082). This release resolves that. + ## [2.05.0.1.0] ### Added diff --git a/src/net/atlas/db.rs b/src/net/atlas/db.rs index 1fc9ee51d8..b7704feb3c 100644 --- a/src/net/atlas/db.rs +++ b/src/net/atlas/db.rs @@ -103,6 +103,13 @@ const ATLASDB_SCHEMA_2: &'static [&'static str] = &[ const ATLASDB_INDEXES: &'static [&'static str] = &["CREATE INDEX IF NOT EXISTS index_was_instantiated ON attachments(was_instantiated);"]; +/// Attachment instances pass through different states once written to the AtlasDB. +/// These instances are initially written as a new Stacks block is processed, and marked +/// as `Queued`. 
These queued instances contain all the data of the new attachment instance, +/// but they have not yet been checked against the AtlasDB to determine if there is extant +/// Attachment content/data associated with them. The network run loop (`p2p` thread) checks +/// for any queued attachment instances on each pass, and performs that check. Once the check +/// is completed, any checked instances are updated to `Checked`. pub enum AttachmentInstanceStatus { /// This variant indicates that the attachments instance has been written, /// but the downloader has not yet checked that the attachment matched From 97ee5f6b33d2f60cf9b66cff2efa91ff4a5be2d6 Mon Sep 17 00:00:00 2001 From: Aaron Blankstein Date: Wed, 8 Feb 2023 11:49:48 -0600 Subject: [PATCH 4/9] cleanup warns --- testnet/stacks-node/src/neon_node.rs | 4 ++-- testnet/stacks-node/src/node.rs | 5 +---- testnet/stacks-node/src/run_loop/helium.rs | 4 +--- testnet/stacks-node/src/run_loop/neon.rs | 8 ++------ testnet/stacks-node/src/tests/epoch_21.rs | 4 ++-- 5 files changed, 8 insertions(+), 17 deletions(-) diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index ded09284b4..e83c7a3537 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -139,7 +139,7 @@ /// This file may be refactored in the future into a full-fledged module. 
use std::cmp; use std::collections::HashMap; -use std::collections::{HashSet, VecDeque}; +use std::collections::VecDeque; use std::convert::{TryFrom, TryInto}; use std::default::Default; use std::mem; @@ -184,7 +184,7 @@ use stacks::cost_estimates::UnitEstimator; use stacks::cost_estimates::{CostEstimator, FeeEstimator}; use stacks::monitoring::{increment_stx_blocks_mined_counter, update_active_miners_count_gauge}; use stacks::net::{ - atlas::{AtlasConfig, AtlasDB, AttachmentInstance}, + atlas::{AtlasConfig, AtlasDB}, db::{LocalPeer, PeerDB}, dns::DNSClient, dns::DNSResolver, diff --git a/testnet/stacks-node/src/node.rs b/testnet/stacks-node/src/node.rs index 7557678842..092b64b6e6 100644 --- a/testnet/stacks-node/src/node.rs +++ b/testnet/stacks-node/src/node.rs @@ -177,7 +177,6 @@ fn spawn_peer( genesis_chainstate_hash: Sha256Sum, poll_timeout: u64, config: Config, - atlas_db: AtlasDB, ) -> Result, NetError> { this.bind(p2p_sock, rpc_sock).unwrap(); let server_thread = thread::spawn(move || { @@ -545,9 +544,8 @@ impl Node { } tx.commit().unwrap(); } - let atlas_config = Self::make_atlas_config(); + let _atlas_config = Self::make_atlas_config(); let atlasdb = self.make_atlas_db(); - let peer_atlasdb = self.make_atlas_db(); let local_peer = match PeerDB::get_local_peer(peerdb.conn()) { Ok(local_peer) => local_peer, @@ -581,7 +579,6 @@ impl Node { Sha256Sum::from_hex(stx_genesis::GENESIS_CHAINSTATE_HASH).unwrap(), 1000, self.config.clone(), - peer_atlasdb, ) .unwrap(); diff --git a/testnet/stacks-node/src/run_loop/helium.rs b/testnet/stacks-node/src/run_loop/helium.rs index 33d9b0f88f..048d5f34e6 100644 --- a/testnet/stacks-node/src/run_loop/helium.rs +++ b/testnet/stacks-node/src/run_loop/helium.rs @@ -4,10 +4,8 @@ use crate::{ BitcoinRegtestController, BurnchainController, ChainTip, Config, MocknetController, Node, }; use stacks::chainstate::stacks::db::ClarityTx; -use stacks::net::atlas::AttachmentInstance; + use stacks::types::chainstate::BurnchainHeaderHash; 
-use std::collections::HashSet; -use std::sync::mpsc::{sync_channel, Receiver}; /// RunLoop is coordinating a simulated burnchain and some simulated nodes /// taking turns in producing blocks. diff --git a/testnet/stacks-node/src/run_loop/neon.rs b/testnet/stacks-node/src/run_loop/neon.rs index 0d031abcab..75950ecebb 100644 --- a/testnet/stacks-node/src/run_loop/neon.rs +++ b/testnet/stacks-node/src/run_loop/neon.rs @@ -5,14 +5,12 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::AtomicU64; use std::sync::mpsc::sync_channel; -use std::sync::mpsc::Receiver; + use std::sync::Arc; use std::sync::Mutex; use std::thread; use std::thread::JoinHandle; -use std::collections::HashSet; - use stacks::deps::ctrlc as termination; use stacks::deps::ctrlc::SignalId; @@ -28,9 +26,7 @@ use stacks::chainstate::coordinator::{ }; use stacks::chainstate::stacks::db::{ChainStateBootData, StacksChainState}; use stacks::core::StacksEpochId; -use stacks::net::atlas::{ - AtlasConfig, AtlasDB, Attachment, AttachmentInstance, ATTACHMENTS_CHANNEL_SIZE, -}; +use stacks::net::atlas::{AtlasConfig, AtlasDB, Attachment}; use stacks::util_lib::db::Error as db_error; use stx_genesis::GenesisData; diff --git a/testnet/stacks-node/src/tests/epoch_21.rs b/testnet/stacks-node/src/tests/epoch_21.rs index 3b0675bcb0..da33f49065 100644 --- a/testnet/stacks-node/src/tests/epoch_21.rs +++ b/testnet/stacks-node/src/tests/epoch_21.rs @@ -4711,7 +4711,7 @@ fn trait_invocation_cross_epoch() { test_observer::spawn(); - let (mut conf, miner_account) = neon_integration_test_conf(); + let (mut conf, _miner_account) = neon_integration_test_conf(); let mut initial_balances = vec![InitialBalance { address: spender_addr.clone(), amount: 200_000_000, @@ -4728,7 +4728,7 @@ fn trait_invocation_cross_epoch() { epochs[3].start_height = epoch_2_1; conf.burnchain.epochs = Some(epochs); - let http_origin = format!("http://{}", &conf.node.rpc_bind); + let _http_origin = format!("http://{}", 
&conf.node.rpc_bind); let mut burnchain_config = Burnchain::regtest(&conf.get_burn_db_path()); From 6eb017d42e8e456a4f4fe0bfd36ab4984019a970 Mon Sep 17 00:00:00 2001 From: Aaron Blankstein Date: Wed, 22 Feb 2023 15:34:12 -0600 Subject: [PATCH 5/9] test: add unit test for schema migration --- src/net/atlas/db.rs | 75 ++++++++++++++++++++++++++++++++++--- src/net/atlas/tests.rs | 84 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 5 deletions(-) diff --git a/src/net/atlas/db.rs b/src/net/atlas/db.rs index b7704feb3c..2df4da2f60 100644 --- a/src/net/atlas/db.rs +++ b/src/net/atlas/db.rs @@ -88,10 +88,15 @@ const ATLASDB_INITIAL_SCHEMA: &'static [&'static str] = &[ ]; const ATLASDB_SCHEMA_2: &'static [&'static str] = &[ + // We have to allow status to be null, because SQLite won't let us add + // a not null column without a default. The default defeats the point of + // having not-null here anyways, so we leave this field nullable. r#" ALTER TABLE attachment_instances - ADD status INTEGER NOT NULL + ADD status INTEGER ;"#, + // All of the attachment instances that previously existed in the database + // already were "checked" r#" UPDATE attachment_instances SET status = 2; "#, @@ -128,6 +133,7 @@ impl FromRow for Attachment { impl FromRow for AttachmentInstance { fn from_row<'a>(row: &'a Row) -> Result { + eprintln!("{:?}", row.get_raw("index_block_hash")); let hex_content_hash: String = row.get_unwrap("content_hash"); let attachment_index: u32 = row.get_unwrap("attachment_index"); let block_height = @@ -269,7 +275,7 @@ impl AtlasDB { // If opened for read/write and it doesn't exist, instantiate it. 
pub fn connect( atlas_config: AtlasConfig, - path: &String, + path: &str, readwrite: bool, ) -> Result { let mut create_flag = false; @@ -289,8 +295,17 @@ impl AtlasDB { OpenFlags::SQLITE_OPEN_READ_ONLY } }; - let conn = sqlite_open(path, open_flags, false)?; + Self::check_instantiate_db(atlas_config, conn, readwrite, create_flag) + } + + /// Inner method for instantiating the db if necessary, updating the schema, or adding indexes + fn check_instantiate_db( + atlas_config: AtlasConfig, + conn: Connection, + readwrite: bool, + create_flag: bool, + ) -> Result { let mut db = AtlasDB { atlas_config, conn, @@ -300,8 +315,8 @@ impl AtlasDB { db.instantiate()?; } if readwrite { - db.add_indexes()?; db.check_schema_version_and_update()?; + db.add_indexes()?; } else { db.check_schema_version_or_error()?; } @@ -373,6 +388,56 @@ impl AtlasDB { Ok(db) } + #[cfg(test)] + /// Only ever to be used in testing, open and instantiate a V1 atlasdb + pub fn connect_memory_db_v1(atlas_config: AtlasConfig) -> Result { + let conn = Connection::open_in_memory()?; + let mut db = AtlasDB { + atlas_config, + conn, + readwrite: true, + }; + + let genesis_attachments = db.atlas_config.genesis_attachments.take(); + + let tx = db.tx_begin()?; + + for row_text in ATLASDB_INITIAL_SCHEMA { + tx.execute_batch(row_text)?; + } + + tx.execute("INSERT INTO db_config (version) VALUES (?1)", &["1"])?; + + if let Some(attachments) = genesis_attachments { + let now = util::get_epoch_time_secs() as i64; + for attachment in attachments { + tx.execute( + "INSERT INTO attachments (hash, content, was_instantiated, created_at) VALUES (?, ?, 1, ?)", + rusqlite::params![ + &attachment.hash(), + &attachment.content, + &now, + ], + )?; + } + } + + tx.commit()?; + + db.add_indexes()?; + + Ok(db) + } + + #[cfg(test)] + /// Only ever to be used in testing, connect to db, but using existing sqlconn + pub fn connect_with_sqlconn( + atlas_config: AtlasConfig, + conn: Connection, + ) -> Result { + 
Self::check_instantiate_db(atlas_config, conn, true, false) + } + pub fn conn(&self) -> &Connection { &self.conn } @@ -564,7 +629,7 @@ impl AtlasDB { } pub fn find_all_attachment_instances( - &mut self, + &self, content_hash: &Hash160, ) -> Result, db_error> { let hex_content_hash = to_hex(&content_hash.0[..]); diff --git a/src/net/atlas/tests.rs b/src/net/atlas/tests.rs index 333aeb82a1..1d2937ad22 100644 --- a/src/net/atlas/tests.rs +++ b/src/net/atlas/tests.rs @@ -28,6 +28,7 @@ use crate::net::{ PeerHost, Requestable, }; use crate::util_lib::boot::boot_code_id; +use crate::util_lib::db::u64_to_sql; use crate::util_lib::strings::UrlString; use clarity::vm::types::QualifiedContractIdentifier; use stacks_common::types::chainstate::BlockHeaderHash; @@ -774,6 +775,89 @@ fn test_keep_uninstantiated_attachments() { ); } +#[test] +fn schema_2_migration() { + let atlas_config = AtlasConfig { + contracts: HashSet::new(), + attachments_max_size: 1024, + max_uninstantiated_attachments: 10, + uninstantiated_attachments_expire_after: 0, + unresolved_attachment_instances_expire_after: 10, + genesis_attachments: None, + }; + + let atlas_db = AtlasDB::connect_memory_db_v1(atlas_config.clone()).unwrap(); + let conn = atlas_db.conn; + + let attachments = [ + AttachmentInstance { + // content_hash, index_block_hash, and txid must contain hex letters! 
+ // because their fields are declared `STRING`, if you supply all numerals, + // sqlite assigns the field a REAL affinity (instead of TEXT) + content_hash: Hash160([0xa0; 20]), + attachment_index: 1, + stacks_block_height: 1, + index_block_hash: StacksBlockId([0x1b; 32]), + metadata: "".into(), + contract_id: QualifiedContractIdentifier::transient(), + tx_id: Txid([0x2f; 32]), + canonical_stacks_tip_height: Some(1), + }, + AttachmentInstance { + content_hash: Hash160([0x00; 20]), + attachment_index: 1, + stacks_block_height: 1, + index_block_hash: StacksBlockId([0x01; 32]), + metadata: "".into(), + contract_id: QualifiedContractIdentifier::transient(), + tx_id: Txid([0x02; 32]), + canonical_stacks_tip_height: Some(1), + }, + ]; + + for attachment in attachments { + // need to manually insert data, because the insertion routine in the codebase + // sets `status` which doesn't exist in v1 + conn.execute( + "INSERT OR REPLACE INTO attachment_instances ( + content_hash, created_at, index_block_hash, + attachment_index, block_height, is_available, + metadata, contract_id, tx_id) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + rusqlite::params![ + &attachment.content_hash, + &0, + &attachment.index_block_hash, + &attachment.attachment_index, + &u64_to_sql(attachment.stacks_block_height).unwrap(), + &true, + &attachment.metadata, + &attachment.contract_id.to_string(), + &attachment.tx_id, + ], + ) + .unwrap(); + } + + // perform the migration and unwrap() to assert that it runs okay + let atlas_db = AtlasDB::connect_with_sqlconn(atlas_config, conn).unwrap(); + + assert_eq!( + atlas_db + .find_all_attachment_instances(&Hash160([0xa0; 20])) + .unwrap() + .len(), + 1, + "Should have one attachment instance marked 'checked' with hash `0xa0a0a0..`" + ); + + assert_eq!( + atlas_db.queued_attachments().unwrap().len(), + 0, + "Should have no attachment instance marked 'queued'" + ); +} + #[test] fn test_evict_k_oldest_uninstantiated_attachments() { let atlas_config = 
AtlasConfig { From d9a5e1852e601178ff7394d25488e764a1337cd8 Mon Sep 17 00:00:00 2001 From: Aaron Blankstein Date: Wed, 22 Feb 2023 15:57:38 -0600 Subject: [PATCH 6/9] chore: add comments/docstrings --- src/net/atlas/db.rs | 2 +- src/net/atlas/download.rs | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/net/atlas/db.rs b/src/net/atlas/db.rs index 2df4da2f60..832daf4f13 100644 --- a/src/net/atlas/db.rs +++ b/src/net/atlas/db.rs @@ -96,7 +96,7 @@ const ATLASDB_SCHEMA_2: &'static [&'static str] = &[ ADD status INTEGER ;"#, // All of the attachment instances that previously existed in the database - // already were "checked" + // already were "checked", so set status to 2 (which corresponds to "checked"). r#" UPDATE attachment_instances SET status = 2; "#, diff --git a/src/net/atlas/download.rs b/src/net/atlas/download.rs index a01111b63b..619d8a03e7 100644 --- a/src/net/atlas/download.rs +++ b/src/net/atlas/download.rs @@ -231,6 +231,18 @@ impl AttachmentsDownloader { Ok((resolved_attachments, events_to_deregister)) } + /// Given a list of `AttachmentInstance`, check if the content corresponding to that + /// instance is (1) already validated (2) inboxed or (3) unknown. + /// + /// In the event of (1) or (2), `do_if_found` is invoked, and the attachment instance will + /// be returned (with the attachment data) in the result set. If the attachment was inboxed (case 2), + /// the attachment is marked as instantiated in the atlas db. + /// + /// In the event of (3), `do_if_not_found` is invoked, and the attachment instance is added + /// to `self.priority_queue`. + /// + /// The return value of this function is a vector of all the instances from `iterator` which + /// resolved to Attachment data, paired with that data. 
fn check_attachment_instances( &mut self, atlas_db: &mut AtlasDB, From d3bf2911b3ccd5dc776ca5637e1f417b121c74d3 Mon Sep 17 00:00:00 2001 From: Aaron Blankstein Date: Thu, 23 Feb 2023 15:50:30 -0600 Subject: [PATCH 7/9] test: atlas instance queuing through restart --- src/chainstate/coordinator/mod.rs | 19 +- src/chainstate/coordinator/tests.rs | 350 +++++++++++++++++++++++++--- src/net/mod.rs | 6 +- 3 files changed, 337 insertions(+), 38 deletions(-) diff --git a/src/chainstate/coordinator/mod.rs b/src/chainstate/coordinator/mod.rs index c333b3f91a..614483d4a2 100644 --- a/src/chainstate/coordinator/mod.rs +++ b/src/chainstate/coordinator/mod.rs @@ -419,35 +419,36 @@ impl< impl<'a, T: BlockEventDispatcher, U: RewardSetProvider, B: BurnchainHeaderReader> ChainsCoordinator<'a, T, (), U, (), (), B> { + /// Create a coordinator for testing, with some parameters defaulted to None #[cfg(test)] pub fn test_new( burnchain: &Burnchain, chain_id: u32, path: &str, reward_set_provider: U, - attachments_tx: SyncSender>, indexer: B, ) -> ChainsCoordinator<'a, T, (), U, (), (), B> { - ChainsCoordinator::test_new_with_observer( + ChainsCoordinator::test_new_full( burnchain, chain_id, path, reward_set_provider, - attachments_tx, None, indexer, + None, ) } + /// Create a coordinator for testing allowing for all configurable params #[cfg(test)] - pub fn test_new_with_observer( + pub fn test_new_full( burnchain: &Burnchain, chain_id: u32, path: &str, reward_set_provider: U, - _attachments_tx: SyncSender>, dispatcher: Option<&'a T>, burnchain_indexer: B, + atlas_config: Option, ) -> ChainsCoordinator<'a, T, (), U, (), (), B> { let burnchain = burnchain.clone(); @@ -472,6 +473,10 @@ impl<'a, T: BlockEventDispatcher, U: RewardSetProvider, B: BurnchainHeaderReader let canonical_sortition_tip = SortitionDB::get_canonical_sortition_tip(sortition_db.conn()).unwrap(); + let atlas_config = atlas_config.unwrap_or(AtlasConfig::default(false)); + let atlas_db = + 
AtlasDB::connect(atlas_config.clone(), &format!("{}/atlas", path), true).unwrap(); + ChainsCoordinator { canonical_sortition_tip: Some(canonical_sortition_tip), burnchain_blocks_db, @@ -483,8 +488,8 @@ impl<'a, T: BlockEventDispatcher, U: RewardSetProvider, B: BurnchainHeaderReader fee_estimator: None, reward_set_provider, notifier: (), - atlas_config: AtlasConfig::default(false), - atlas_db: None, + atlas_config, + atlas_db: Some(atlas_db), config: ChainsCoordinatorConfig::new(), burnchain_indexer, } diff --git a/src/chainstate/coordinator/tests.rs b/src/chainstate/coordinator/tests.rs index aa3ee613e8..f9e1676103 100644 --- a/src/chainstate/coordinator/tests.rs +++ b/src/chainstate/coordinator/tests.rs @@ -51,6 +51,7 @@ use crate::core; use crate::core::*; use crate::monitoring::increment_stx_blocks_processed_counter; use crate::util_lib::boot::boot_code_addr; +use crate::util_lib::strings::StacksString; use crate::vm::errors::Error as InterpreterError; use clarity::vm::{ costs::{ExecutionCost, LimitedCostTracker}, @@ -88,6 +89,14 @@ lazy_static! 
{ pub static ref STACKS_BLOCK_HEADERS: Arc = Arc::new(AtomicU64::new(1)); } +fn test_path(name: &str) -> String { + format!( + "/tmp/stacks-node-tests/coordinator-tests/{}/{}", + get_epoch_time_secs(), + name + ) +} + pub fn next_block_hash() -> BlockHeaderHash { let cur = STACKS_BLOCK_HEADERS.fetch_add(1, Ordering::SeqCst); let mut bytes = vec![]; @@ -452,7 +461,6 @@ pub fn make_coordinator<'a>( burnchain: Option, ) -> ChainsCoordinator<'a, NullEventDispatcher, (), OnChainRewardSetProvider, (), (), BitcoinIndexer> { - let (tx, _) = sync_channel(100000); let burnchain = burnchain.unwrap_or_else(|| get_burnchain(path, None)); let indexer = BitcoinIndexer::new_unit_test(&burnchain.working_dir); ChainsCoordinator::test_new( @@ -460,11 +468,29 @@ pub fn make_coordinator<'a>( 0x80000000, path, OnChainRewardSetProvider(), - tx, indexer, ) } +pub fn make_coordinator_atlas<'a>( + path: &str, + burnchain: Option, + atlas_config: Option, +) -> ChainsCoordinator<'a, NullEventDispatcher, (), OnChainRewardSetProvider, (), (), BitcoinIndexer> +{ + let burnchain = burnchain.unwrap_or_else(|| get_burnchain(path, None)); + let indexer = BitcoinIndexer::new_unit_test(&burnchain.working_dir); + ChainsCoordinator::test_new_full( + &burnchain, + 0x80000000, + path, + OnChainRewardSetProvider(), + None, + indexer, + atlas_config, + ) +} + struct StubbedRewardSetProvider(Vec); impl RewardSetProvider for StubbedRewardSetProvider { @@ -491,7 +517,6 @@ fn make_reward_set_coordinator<'a>( pox_consts: Option, ) -> ChainsCoordinator<'a, NullEventDispatcher, (), StubbedRewardSetProvider, (), (), BitcoinIndexer> { - let (tx, _) = sync_channel(100000); let burnchain = get_burnchain(path, None); let indexer = BitcoinIndexer::new_unit_test(&burnchain.working_dir); ChainsCoordinator::test_new( @@ -499,7 +524,6 @@ fn make_reward_set_coordinator<'a>( 0x80000000, path, StubbedRewardSetProvider(addrs), - tx, indexer, ) } @@ -729,6 +753,7 @@ fn make_stacks_block_from_parent_sortition( false, (Txid([0; 
32]), 0), Some(parent_sortition), + &[], ) } @@ -795,12 +820,14 @@ fn make_stacks_block_with_recipients_and_sunset_burn( post_sunset_burn, (Txid([0; 32]), 0), None, + &[], ) } /// build a stacks block with just the coinbase off of /// parent_block, in the canonical sortition fork of SortitionDB. /// parent_block _must_ be included in the StacksChainState +/// `txs`: transactions to try to include in block fn make_stacks_block_with_input( sort_db: &SortitionDB, state: &mut StacksChainState, @@ -816,6 +843,7 @@ fn make_stacks_block_with_input( post_sunset_burn: bool, input: (Txid, u32), parents_sortition_opt: Option, + txs: &[StacksTransaction], ) -> (BlockstackOperationType, StacksBlock) { let tx_auth = TransactionAuth::from_p2pkh(miner).unwrap(); @@ -888,6 +916,10 @@ fn make_stacks_block_with_input( .try_mine_tx(&mut epoch_tx, &coinbase_op, ast_rules) .unwrap(); + for tx in txs { + builder.try_mine_tx(&mut epoch_tx, tx, ast_rules).unwrap(); + } + let block = builder.mine_anchored_block(&mut epoch_tx); builder.epoch_finish(epoch_tx); @@ -940,7 +972,7 @@ fn make_stacks_block_with_input( #[test] fn missed_block_commits_2_05() { - let path = "/tmp/stacks-blockchain-missed_block_commits_2_05"; + let path = &test_path("missed_block_commits_2_05"); let _r = std::fs::remove_dir_all(path); let sunset_ht = 8000; @@ -1047,6 +1079,7 @@ fn missed_block_commits_2_05() { false, last_input.as_ref().unwrap().clone(), None, + &[], ); // NOTE: intended for block block_height - 2 last_input = Some(( @@ -1100,6 +1133,7 @@ fn missed_block_commits_2_05() { false, last_input.as_ref().unwrap().clone(), None, + &[], ) }; @@ -1256,7 +1290,7 @@ fn missed_block_commits_2_05() { /// in 2.1 due to the bad missed block-commit *not* counting towards the miner's sortition weight. 
#[test] fn missed_block_commits_2_1() { - let path = "/tmp/stacks-blockchain-missed_block_commits_2_1"; + let path = &test_path("missed_block_commits_2_1"); let _r = std::fs::remove_dir_all(path); let sunset_ht = 8000; @@ -1368,6 +1402,7 @@ fn missed_block_commits_2_1() { false, last_input.as_ref().unwrap().clone(), None, + &[], ); // NOTE: intended for block block_height - 2 last_input = Some(( @@ -1423,6 +1458,7 @@ fn missed_block_commits_2_1() { false, last_input.as_ref().unwrap().clone(), None, + &[], ) }; @@ -1596,7 +1632,7 @@ fn missed_block_commits_2_1() { /// the UTXO chain #[test] fn late_block_commits_2_1() { - let path = "/tmp/stacks-blockchain-late_block_commits_2_1"; + let path = &test_path("late_block_commits_2_1"); let _r = std::fs::remove_dir_all(path); let sunset_ht = 8000; @@ -1705,6 +1741,7 @@ fn late_block_commits_2_1() { false, last_input.as_ref().unwrap().clone(), None, + &[], ); // NOTE: intended for block block_height - 3 last_input = Some(( @@ -1760,6 +1797,7 @@ fn late_block_commits_2_1() { false, last_input.as_ref().unwrap().clone(), None, + &[], ) }; @@ -1933,9 +1971,9 @@ fn late_block_commits_2_1() { #[test] fn test_simple_setup() { - let path = "/tmp/stacks-blockchain-simple-setup"; + let path = &test_path("simple-setup"); // setup a second set of states that won't see the broadcasted blocks - let path_blinded = "/tmp/stacks-blockchain-simple-setup.blinded"; + let path_blinded = &test_path("simple-setup.blinded"); let _r = std::fs::remove_dir_all(path); let _r = std::fs::remove_dir_all(path_blinded); @@ -2146,7 +2184,7 @@ fn test_simple_setup() { #[test] fn test_sortition_with_reward_set() { - let path = "/tmp/stacks-blockchain-simple-reward-set"; + let path = &test_path("simple-reward-set"); let _r = std::fs::remove_dir_all(path); let mut vrf_keys: Vec<_> = (0..150).map(|_| VRFPrivateKey::new()).collect(); @@ -2415,7 +2453,7 @@ fn test_sortition_with_reward_set() { #[test] fn test_sortition_with_burner_reward_set() { - let path = 
"/tmp/stacks-blockchain-burner-reward-set"; + let path = &test_path("burner-reward-set"); let _r = std::fs::remove_dir_all(path); let mut vrf_keys: Vec<_> = (0..150).map(|_| VRFPrivateKey::new()).collect(); @@ -2658,7 +2696,7 @@ fn test_sortition_with_burner_reward_set() { #[test] fn test_pox_btc_ops() { - let path = "/tmp/stacks-blockchain-pox-btc-ops"; + let path = &test_path("pox-btc-ops"); let _r = std::fs::remove_dir_all(path); let sunset_ht = 8000; @@ -2935,7 +2973,7 @@ fn test_pox_btc_ops() { #[test] fn test_stx_transfer_btc_ops() { - let path = "/tmp/stacks-blockchain-stx_transfer-btc-ops"; + let path = &test_path("stx_transfer-btc-ops"); let _r = std::fs::remove_dir_all(path); let pox_v1_unlock_ht = u32::max_value(); @@ -3326,7 +3364,7 @@ fn get_delegation_info_pox_2( // \ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ S30 -> S31 -> ... #[test] fn test_delegate_stx_btc_ops() { - let path = "/tmp/stacks-blockchain-delegate-stx-btc-ops"; + let path = &test_path("delegate-stx-btc-ops"); let _r = std::fs::remove_dir_all(path); let pox_v1_unlock_ht = 12; @@ -3867,7 +3905,7 @@ fn test_initial_coinbase_reward_distributions() { // panic when trying to re-create the costs-2 contract. #[test] fn test_epoch_switch_cost_contract_instantiation() { - let path = "/tmp/stacks-blockchain-epoch-switch-cost-contract-instantiation"; + let path = &test_path("epoch-switch-cost-contract-instantiation"); let _r = std::fs::remove_dir_all(path); let sunset_ht = 8000; @@ -4066,7 +4104,7 @@ fn test_epoch_switch_cost_contract_instantiation() { // the test would panic when trying to re-create the pox-2 contract. 
#[test] fn test_epoch_switch_pox_contract_instantiation() { - let path = "/tmp/stacks-blockchain-epoch-switch-pox-contract-instantiation"; + let path = &test_path("epoch-switch-pox-contract-instantiation"); let _r = std::fs::remove_dir_all(path); let sunset_ht = 8000; @@ -4270,6 +4308,265 @@ fn test_epoch_switch_pox_contract_instantiation() { } } +#[test] +fn atlas_stop_start() { + let path = &test_path("atlas_stop_start"); + let _r = std::fs::remove_dir_all(path); + + let sunset_ht = 8000; + let pox_consts = Some(PoxConstants::new(6, 3, 3, 25, 5, 10, sunset_ht, 10)); + let burnchain_conf = get_burnchain(path, pox_consts.clone()); + + // publish a simple contract used to generate atlas attachment instances + let atlas_contract_content = " + (define-data-var attachment-index uint u1) + (define-public (make-attach (zonefile-hash (buff 20))) + (let ((current-index (var-get attachment-index))) + (print { + attachment: { + hash: zonefile-hash, + attachment-index: current-index, + metadata: \"test-meta\" + } + }) + (var-set attachment-index (+ u1 current-index)) + (ok true)))"; + let atlas_name: clarity::vm::ContractName = "atlas-test".into(); + + let vrf_keys: Vec<_> = (0..15).map(|_| VRFPrivateKey::new()).collect(); + let committers: Vec<_> = (0..15).map(|_| StacksPrivateKey::new()).collect(); + + let signer_sk = StacksPrivateKey::new(); + let signer_pk = p2pkh_from(&signer_sk); + let balance = 6_000_000_000 * (core::MICROSTACKS_PER_STACKS as u64); + let stacked_amt = 1_000_000_000 * (core::MICROSTACKS_PER_STACKS as u128); + let initial_balances = vec![(signer_pk.clone().into(), balance)]; + let atlas_qci = QualifiedContractIdentifier::new(signer_pk.clone().into(), atlas_name.clone()); + // include our simple contract in the atlas config + let mut atlas_config = AtlasConfig::default(false); + atlas_config.contracts.insert(atlas_qci.clone()); + + setup_states( + &[path], + &vrf_keys, + &committers, + pox_consts.clone(), + Some(initial_balances), + 
StacksEpochId::Epoch21, + ); + + let mut coord = make_coordinator_atlas( + path, + Some(burnchain_conf.clone()), + Some(atlas_config.clone()), + ); + + coord.handle_new_burnchain_block().unwrap(); + + let sort_db = get_sortition_db(path, pox_consts.clone()); + + let tip = SortitionDB::get_canonical_burn_chain_tip(sort_db.conn()).unwrap(); + assert_eq!(tip.block_height, 1); + assert_eq!(tip.sortition, false); + let (_, ops) = sort_db + .get_sortition_result(&tip.sortition_id) + .unwrap() + .unwrap(); + + // we should have all the VRF registrations accepted + assert_eq!(ops.accepted_ops.len(), vrf_keys.len()); + assert_eq!(ops.consumed_leader_keys.len(), 0); + + // process sequential blocks, and their sortitions... + let mut stacks_blocks: Vec<(SortitionId, StacksBlock)> = vec![]; + + let mut contract_publish = StacksTransaction::new( + TransactionVersion::Testnet, + TransactionAuth::from_p2pkh(&signer_sk).unwrap(), + TransactionPayload::SmartContract( + TransactionSmartContract { + name: atlas_name.clone(), + code_body: StacksString::from_str(atlas_contract_content).unwrap(), + }, + None, + ), + ); + contract_publish.chain_id = 0x80000000; + contract_publish.anchor_mode = TransactionAnchorMode::OnChainOnly; + contract_publish.auth.set_origin_nonce(0); + contract_publish.auth.set_tx_fee(100); + let mut signer = StacksTransactionSigner::new(&contract_publish); + signer.sign_origin(&signer_sk).unwrap(); + let contract_publish = signer.get_tx().unwrap(); + + let make_attachments: Vec = (0..5) + .map(|ix| { + ( + ix, + StacksTransaction::new( + TransactionVersion::Testnet, + TransactionAuth::from_p2pkh(&signer_sk).unwrap(), + TransactionPayload::ContractCall(TransactionContractCall { + address: signer_pk.clone().into(), + contract_name: atlas_name.clone(), + function_name: "make-attach".into(), + function_args: vec![Value::buff_from(vec![ix; 20]).unwrap()], + }), + ), + ) + }) + .map(|(ix, mut cc_tx)| { + cc_tx.chain_id = 0x80000000; + cc_tx.anchor_mode = 
TransactionAnchorMode::OnChainOnly; + cc_tx.auth.set_origin_nonce(ix as u64 + 1); + cc_tx.auth.set_tx_fee(100); + let mut signer = StacksTransactionSigner::new(&cc_tx); + signer.sign_origin(&signer_sk).unwrap(); + signer.get_tx().unwrap() + }) + .collect(); + + for ix in 0..3 { + let vrf_key = &vrf_keys[ix]; + let miner = &committers[ix]; + + let mut burnchain = get_burnchain_db(path, pox_consts.clone()); + let mut chainstate = get_chainstate(path); + + let parent = if ix == 0 { + BlockHeaderHash([0; 32]) + } else { + stacks_blocks[ix - 1].1.header.block_hash() + }; + + let burnchain_tip = burnchain.get_canonical_chain_tip().unwrap(); + let b = get_burnchain(path, pox_consts.clone()); + + let next_mock_header = BurnchainBlockHeader { + block_height: burnchain_tip.block_height + 1, + block_hash: BurnchainHeaderHash([0; 32]), + parent_block_hash: burnchain_tip.block_hash, + num_txs: 0, + timestamp: 1, + }; + + let reward_cycle_info = coord.get_reward_cycle_info(&next_mock_header).unwrap(); + + let txs = if ix == 1 { + vec![contract_publish.clone()] + } else if ix == 2 { + make_attachments.clone() + } else { + vec![] + }; + + let (good_op, block) = if ix == 0 { + make_genesis_block_with_recipients( + &sort_db, + &mut chainstate, + &parent, + miner, + 10000, + vrf_key, + ix as u32, + None, + ) + } else { + make_stacks_block_with_input( + &sort_db, + &mut chainstate, + &b, + &parent, + burnchain_tip.block_height, + miner, + 1000, + vrf_key, + ix as u32, + None, + 0, + false, + (Txid([0; 32]), 0), + None, + &txs, + ) + }; + + let expected_winner = good_op.txid(); + let ops = vec![good_op]; + + let burnchain_tip = burnchain.get_canonical_chain_tip().unwrap(); + produce_burn_block( + &b, + &mut burnchain, + &burnchain_tip.block_hash, + ops, + vec![].iter_mut(), + ); + // handle the sortition + coord.handle_new_burnchain_block().unwrap(); + + let tip = SortitionDB::get_canonical_burn_chain_tip(sort_db.conn()).unwrap(); + assert_eq!(&tip.winning_block_txid, 
&expected_winner); + + // load the block into staging + let block_hash = block.header.block_hash(); + + assert_eq!(&tip.winning_stacks_block_hash, &block_hash); + stacks_blocks.push((tip.sortition_id.clone(), block.clone())); + + preprocess_block(&mut chainstate, &sort_db, &tip, block); + + // handle the stacks block + coord.handle_new_stacks_block().unwrap(); + + let stacks_tip = SortitionDB::get_canonical_stacks_chain_tip_hash(sort_db.conn()).unwrap(); + let burn_block_height = tip.block_height; + + // check that the bns contract exists + let does_bns_contract_exist = chainstate + .with_read_only_clarity_tx( + &sort_db.index_conn(), + &StacksBlockId::new(&stacks_tip.0, &stacks_tip.1), + |conn| { + conn.with_clarity_db_readonly(|db| db.get_contract(&boot_code_id("bns", false))) + }, + ) + .unwrap(); + + assert!(does_bns_contract_exist.is_ok()); + } + + // okay, we've broadcasted some transactions, lets check that the atlas db has a queue + let atlas_queue = coord + .atlas_db + .as_ref() + .unwrap() + .queued_attachments() + .unwrap(); + assert_eq!( + atlas_queue.len(), + make_attachments.len(), + "Should be as many queued attachments, as attachment txs submitted" + ); + + // now, we'll shut down all the coordinator connections and reopen them + // to ensure that the queue remains in place + let coord = (); // dispose of the coordinator, closing all its connections + let coord = make_coordinator_atlas(path, Some(burnchain_conf), Some(atlas_config)); + + let atlas_queue = coord + .atlas_db + .as_ref() + .unwrap() + .queued_attachments() + .unwrap(); + assert_eq!( + atlas_queue.len(), + make_attachments.len(), + "Should be as many queued attachments, as attachment txs submitted" + ); +} + fn get_total_stacked_info( chainstate: &mut StacksChainState, burn_dbconn: &dyn BurnStateDB, @@ -4309,7 +4606,7 @@ fn get_total_stacked_info( // sent should occur in the "pox.clar" contract. 
#[test] fn test_epoch_verify_active_pox_contract() { - let path = "/tmp/stacks-blockchain-verify-active-pox-contract"; + let path = &test_path("verify-active-pox-contract"); let _r = std::fs::remove_dir_all(path); let pox_v1_unlock_ht = 12; @@ -4598,7 +4895,7 @@ fn test_epoch_verify_active_pox_contract() { } fn test_sortition_with_sunset() { - let path = "/tmp/stacks-blockchain-sortition-with-sunset"; + let path = &test_path("sortition-with-sunset"); let _r = std::fs::remove_dir_all(path); @@ -4903,7 +5200,7 @@ fn test_sortition_with_sunset() { /// Epoch 2.1 activates at block 50 (n.b. reward cycles are 6 blocks long) #[test] fn test_sortition_with_sunset_and_epoch_switch() { - let path = "/tmp/stacks-blockchain-sortition-with-sunset-and-epoch-switch"; + let path = &test_path("sortition-with-sunset-and-epoch-switch"); let _r = std::fs::remove_dir_all(path); let rc_len = 6; @@ -5251,10 +5548,9 @@ fn test_sortition_with_sunset_and_epoch_switch() { /// (because its parent is block `0`, and nobody stacks in /// this test, all block commits must burn) fn test_pox_processable_block_in_different_pox_forks() { - let path = "/tmp/stacks-blockchain.test.pox_processable_block_in_different_pox_forks"; + let path = &test_path("pox_processable_block_in_different_pox_forks"); // setup a second set of states that won't see the broadcasted blocks - let path_blinded = - "/tmp/stacks-blockchain.test.pox_processable_block_in_different_pox_forks.blinded"; + let path_blinded = &test_path("pox_processable_block_in_different_pox_forks.blinded"); let _r = std::fs::remove_dir_all(path); let _r = std::fs::remove_dir_all(path_blinded); @@ -5551,9 +5847,9 @@ fn test_pox_processable_block_in_different_pox_forks() { #[test] fn test_pox_no_anchor_selected() { - let path = "/tmp/stacks-blockchain.test.pox_fork_no_anchor_selected"; + let path = &test_path("pox_fork_no_anchor_selected"); // setup a second set of states that won't see the broadcasted blocks - let path_blinded = 
"/tmp/stacks-blockchain.test.pox_fork_no_anchor_selected.blinded"; + let path_blinded = &test_path("pox_fork_no_anchor_selected.blinded"); let _r = std::fs::remove_dir_all(path); let _r = std::fs::remove_dir_all(path_blinded); @@ -5765,9 +6061,9 @@ fn test_pox_no_anchor_selected() { #[test] fn test_pox_fork_out_of_order() { - let path = "/tmp/stacks-blockchain.test.pox_fork_out_of_order"; + let path = &test_path("pox_fork_out_of_order"); // setup a second set of states that won't see the broadcasted blocks - let path_blinded = "/tmp/stacks-blockchain.test.pox_fork_out_of_order.blinded"; + let path_blinded = &test_path("pox_fork_out_of_order.blinded"); let _r = std::fs::remove_dir_all(path); let _r = std::fs::remove_dir_all(path_blinded); @@ -6179,7 +6475,7 @@ fn preprocess_block( #[test] fn test_check_chainstate_db_versions() { - let path = "/tmp/stacks-blockchain-check_chainstate_db_versions"; + let path = &test_path("check_chainstate_db_versions"); let _ = std::fs::remove_dir_all(path); let sortdb_path = format!("{}/sortdb", &path); diff --git a/src/net/mod.rs b/src/net/mod.rs index 880c5abe55..5aa80cd8bd 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -2788,17 +2788,15 @@ pub mod test { ) .unwrap(); - let (tx, _) = sync_channel(100000); - let indexer = BitcoinIndexer::new_unit_test(&config.burnchain.working_dir); - let mut coord = ChainsCoordinator::test_new_with_observer( + let mut coord = ChainsCoordinator::test_new_full( &config.burnchain, config.network_id, &test_path, OnChainRewardSetProvider(), - tx, observer, indexer, + None, ); coord.handle_new_burnchain_block().unwrap(); From cb9f8dca0f9697e7cfe04c1b2e776de4b8aec8ce Mon Sep 17 00:00:00 2001 From: Aaron Blankstein Date: Tue, 28 Feb 2023 17:48:54 -0600 Subject: [PATCH 8/9] docs: add more rustdocs --- src/net/atlas/db.rs | 29 +++++++++++++++++++++++++++-- src/net/atlas/download.rs | 3 +++ src/net/atlas/mod.rs | 5 +++++ testnet/stacks-node/src/node.rs | 1 - 4 files changed, 35 insertions(+), 3 
deletions(-) diff --git a/src/net/atlas/db.rs index 832daf4f13..d4992450b5 100644 --- a/src/net/atlas/db.rs +++ b/src/net/atlas/db.rs @@ -14,6 +14,25 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>. +//! +//! The `AtlasDB` stores `Attachment` and `AttachmentInstance` objects. +//! `AttachmentInstance` objects indicate what corresponding `Attachment` +//! data a node is interested in fetching and storing. +//! +//! In the `AtlasDB`, `AttachmentInstance` objects have a status field +//! and an availability field. The status field indicates whether or +//! not the attachment instance has been checked by the +//! `AttachmentDownloader`. The `AttachmentDownloader` does not +//! immediately check entries before insertion in the database: +//! Atlas event processing and downloader logic proceed in +//! different threads. +//! +//! Once the `AttachmentDownloader` checks an attachment instance, it +//! either marks the instance as available (if the content data is +//! already stored on the node) or it adds the attachment instance +//! to its download queue. +//! + use rusqlite::types::FromSql; use rusqlite::types::FromSqlError; use rusqlite::types::ToSql; @@ -117,7 +136,7 @@ const ATLASDB_INDEXES: &'static [&'static str] = /// is completed, any checked instances are updated to `Checked`.
pub enum AttachmentInstanceStatus { /// This variant indicates that the attachments instance has been written, - /// but the downloader has not yet checked that the attachment matched + /// but the AtlasDownloader has not yet checked that the attachment matched Queued, /// This variant indicates that the attachments instance has been written, /// and checked for whether or not an already existing attachment matched @@ -652,7 +671,10 @@ impl AtlasDB { } /// Queue a new attachment instance, status will be set to "queued", - /// and the is_available field set to false + /// and the is_available field set to false. + /// + /// This is invoked after block processing by the coordinator thread (which + /// handles atlas event logic). pub fn queue_attachment_instance( &mut self, attachment: &AttachmentInstance, @@ -662,6 +684,9 @@ impl AtlasDB { /// Insert an attachment instance from an initial batch. /// All such instances are marked "checked", and is_available = true + /// + /// This is invoked by the AtlasDownloader when it first runs. The AtlasDownloader + /// is currently managed in the P2P thread. pub fn insert_initial_attachment_instance( &mut self, attachment: &AttachmentInstance, diff --git a/src/net/atlas/download.rs b/src/net/atlas/download.rs index 619d8a03e7..9efe57efde 100644 --- a/src/net/atlas/download.rs +++ b/src/net/atlas/download.rs @@ -308,6 +308,9 @@ impl AttachmentsDownloader { /// returning a vector of (instance, attachment) pairs for any of the queued attachments /// which already had the associated data /// Marks any processed attachments as checked + /// + /// This method is invoked in the thread managing the AttachmentDownloader. This is currently + /// the P2P thread. 
pub fn check_queued_attachment_instances( &mut self, atlas_db: &mut AtlasDB, diff --git a/src/net/atlas/mod.rs b/src/net/atlas/mod.rs index 073cf25ed6..0d351502ad 100644 --- a/src/net/atlas/mod.rs +++ b/src/net/atlas/mod.rs @@ -80,6 +80,7 @@ impl AtlasConfig { } #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)] +/// Attachments are the content associated with an AttachmentInstance pub struct Attachment { pub content: Vec, } @@ -99,6 +100,10 @@ impl Attachment { } #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)] +/// An attachment instance is a reference to atlas data: a commitment +/// to track the content that is the inverse of `content_hash`. +/// Attachment instances are created by atlas events issued by contracts +/// specified in a node's `AtlasConfig`. pub struct AttachmentInstance { pub content_hash: Hash160, pub attachment_index: u32, diff --git a/testnet/stacks-node/src/node.rs b/testnet/stacks-node/src/node.rs index 092b64b6e6..10da48829f 100644 --- a/testnet/stacks-node/src/node.rs +++ b/testnet/stacks-node/src/node.rs @@ -544,7 +544,6 @@ impl Node { } tx.commit().unwrap(); } - let _atlas_config = Self::make_atlas_config(); let atlasdb = self.make_atlas_db(); let local_peer = match PeerDB::get_local_peer(peerdb.conn()) { From 9b97a39191133a53cfdcb07cccceebbab99246f7 Mon Sep 17 00:00:00 2001 From: Aaron Blankstein Date: Thu, 2 Mar 2023 11:29:35 -0600 Subject: [PATCH 9/9] chore: address PR feedback --- src/net/atlas/db.rs | 24 +++++++++++++----------- src/net/atlas/tests.rs | 35 +++++++++++++++++++++++++---------- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/src/net/atlas/db.rs b/src/net/atlas/db.rs index d4992450b5..4270caa00f 100644 --- a/src/net/atlas/db.rs +++ b/src/net/atlas/db.rs @@ -119,13 +119,12 @@ const ATLASDB_SCHEMA_2: &'static [&'static str] = &[ r#" UPDATE attachment_instances SET status = 2; "#, - r#" - CREATE INDEX IF NOT EXISTS index_instance_status ON attachment_instances(status); 
- "#, ]; -const ATLASDB_INDEXES: &'static [&'static str] = - &["CREATE INDEX IF NOT EXISTS index_was_instantiated ON attachments(was_instantiated);"]; +const ATLASDB_INDEXES: &'static [&'static str] = &[ + "CREATE INDEX IF NOT EXISTS index_was_instantiated ON attachments(was_instantiated);", + "CREATE INDEX IF NOT EXISTS index_instance_status ON attachment_instances(status);", +]; /// Attachment instances pass through different states once written to the AtlasDB. /// These instances are initially written as a new Stacks block is processed, and marked @@ -152,7 +151,6 @@ impl FromRow for Attachment { impl FromRow for AttachmentInstance { fn from_row<'a>(row: &'a Row) -> Result { - eprintln!("{:?}", row.get_raw("index_block_hash")); let hex_content_hash: String = row.get_unwrap("content_hash"); let attachment_index: u32 = row.get_unwrap("attachment_index"); let block_height = @@ -350,10 +348,10 @@ impl AtlasDB { if version == expected_version { Ok(()) } else { - Err(db_error::Other(format!( - "The version of the Atlas DB {} does not match the expected {} and cannot be updated from AtlasDB::open()", - version, expected_version - ))) + let version = version.parse().expect( + "Invalid schema version for AtlasDB: should be a parseable integer", + ); + Err(db_error::OldSchema(version)) } } Err(e) => panic!("Error obtaining the version of the Atlas DB: {:?}", e), @@ -443,7 +441,11 @@ impl AtlasDB { tx.commit()?; - db.add_indexes()?; + let tx = db.tx_begin()?; + for row_text in &ATLASDB_INDEXES[0..1] { + tx.execute_batch(row_text)?; + } + tx.commit()?; Ok(db) } diff --git a/src/net/atlas/tests.rs b/src/net/atlas/tests.rs index 1d2937ad22..efb5a397aa 100644 --- a/src/net/atlas/tests.rs +++ b/src/net/atlas/tests.rs @@ -797,25 +797,25 @@ fn schema_2_migration() { content_hash: Hash160([0xa0; 20]), attachment_index: 1, stacks_block_height: 1, - index_block_hash: StacksBlockId([0x1b; 32]), + index_block_hash: StacksBlockId([0xb1; 32]), metadata: "".into(), contract_id: 
QualifiedContractIdentifier::transient(), tx_id: Txid([0x2f; 32]), - canonical_stacks_tip_height: Some(1), + canonical_stacks_tip_height: None, }, AttachmentInstance { content_hash: Hash160([0x00; 20]), attachment_index: 1, stacks_block_height: 1, - index_block_hash: StacksBlockId([0x01; 32]), + index_block_hash: StacksBlockId([0x0a; 32]), metadata: "".into(), contract_id: QualifiedContractIdentifier::transient(), - tx_id: Txid([0x02; 32]), - canonical_stacks_tip_height: Some(1), + tx_id: Txid([0x0b; 32]), + canonical_stacks_tip_height: None, }, ]; - for attachment in attachments { + for attachment in attachments.iter() { // need to manually insert data, because the insertion routine in the codebase // sets `status` which doesn't exist in v1 conn.execute( @@ -842,15 +842,30 @@ fn schema_2_migration() { // perform the migration and unwrap() to assert that it runs okay let atlas_db = AtlasDB::connect_with_sqlconn(atlas_config, conn).unwrap(); + let mut attachments_fetched_a0 = atlas_db + .find_all_attachment_instances(&Hash160([0xa0; 20])) + .unwrap(); assert_eq!( - atlas_db - .find_all_attachment_instances(&Hash160([0xa0; 20])) - .unwrap() - .len(), + attachments_fetched_a0.len(), 1, "Should have one attachment instance marked 'checked' with hash `0xa0a0a0..`" ); + let attachment_a0 = attachments_fetched_a0.pop().unwrap(); + assert_eq!(&attachment_a0, &attachments[0]); + + let mut attachments_fetched_00 = atlas_db + .find_all_attachment_instances(&Hash160([0x00; 20])) + .unwrap(); + assert_eq!( + attachments_fetched_00.len(), + 1, + "Should have one attachment instance marked 'checked' with hash `0x000000..`" + ); + + let attachment_00 = attachments_fetched_00.pop().unwrap(); + assert_eq!(&attachment_00, &attachments[1]); + assert_eq!( atlas_db.queued_attachments().unwrap().len(), 0,