Skip to content

Commit

Permalink
Merge pull request #3086 from stacks-network/feat/atlas-tolerance-3082
Browse files Browse the repository at this point in the history
Improved Atlas shutdown and crash tolerance (#3082)
  • Loading branch information
kantai authored Mar 6, 2023
2 parents 4a1e837 + 2b70498 commit 154b316
Show file tree
Hide file tree
Showing 14 changed files with 976 additions and 241 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ this version of the software on it.
- The `blockstack-core` binary has been renamed to `stacks-inspect`.
This binary provides CLI tools for chain and mempool inspection.

### Fixed
- The AtlasDB previously could lose `AttachmentInstance` data during shutdown
or crashes (#3082). This release resolves that.

## [2.05.0.1.0]

### Added
Expand Down
61 changes: 40 additions & 21 deletions src/chainstate/coordinator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use crate::core::{StacksEpoch, StacksEpochId};
use crate::monitoring::{
increment_contract_calls_processed, increment_stx_blocks_processed_counter,
};
use crate::net::atlas::{AtlasConfig, AttachmentInstance};
use crate::net::atlas::{AtlasConfig, AtlasDB, AttachmentInstance};
use crate::util_lib::db::DBConn;
use crate::util_lib::db::DBTx;
use crate::util_lib::db::Error as DBError;
Expand Down Expand Up @@ -201,7 +201,7 @@ pub struct ChainsCoordinator<
chain_state_db: StacksChainState,
sortition_db: SortitionDB,
burnchain: Burnchain,
attachments_tx: SyncSender<HashSet<AttachmentInstance>>,
atlas_db: Option<AtlasDB>,
dispatcher: Option<&'a T>,
cost_estimator: Option<&'a mut CE>,
fee_estimator: Option<&'a mut FE>,
Expand Down Expand Up @@ -319,14 +319,14 @@ impl<
config: ChainsCoordinatorConfig,
chain_state_db: StacksChainState,
burnchain: Burnchain,
attachments_tx: SyncSender<HashSet<AttachmentInstance>>,
dispatcher: &'a mut T,
comms: CoordinatorReceivers,
atlas_config: AtlasConfig,
cost_estimator: Option<&mut CE>,
fee_estimator: Option<&mut FE>,
miner_status: Arc<Mutex<MinerStatus>>,
burnchain_indexer: B,
atlas_db: AtlasDB,
) where
T: BlockEventDispatcher,
{
Expand Down Expand Up @@ -356,13 +356,13 @@ impl<
chain_state_db,
sortition_db,
burnchain,
attachments_tx,
dispatcher: Some(dispatcher),
notifier: arc_notices,
reward_set_provider: OnChainRewardSetProvider(),
cost_estimator,
fee_estimator,
atlas_config,
atlas_db: Some(atlas_db),
config,
burnchain_indexer,
};
Expand Down Expand Up @@ -419,35 +419,36 @@ impl<
impl<'a, T: BlockEventDispatcher, U: RewardSetProvider, B: BurnchainHeaderReader>
ChainsCoordinator<'a, T, (), U, (), (), B>
{
/// Create a coordinator for testing, with some parameters defaulted to None
#[cfg(test)]
pub fn test_new(
burnchain: &Burnchain,
chain_id: u32,
path: &str,
reward_set_provider: U,
attachments_tx: SyncSender<HashSet<AttachmentInstance>>,
indexer: B,
) -> ChainsCoordinator<'a, T, (), U, (), (), B> {
ChainsCoordinator::test_new_with_observer(
ChainsCoordinator::test_new_full(
burnchain,
chain_id,
path,
reward_set_provider,
attachments_tx,
None,
indexer,
None,
)
}

/// Create a coordinator for testing allowing for all configurable params
#[cfg(test)]
pub fn test_new_with_observer(
pub fn test_new_full(
burnchain: &Burnchain,
chain_id: u32,
path: &str,
reward_set_provider: U,
attachments_tx: SyncSender<HashSet<AttachmentInstance>>,
dispatcher: Option<&'a T>,
burnchain_indexer: B,
atlas_config: Option<AtlasConfig>,
) -> ChainsCoordinator<'a, T, (), U, (), (), B> {
let burnchain = burnchain.clone();

Expand All @@ -472,6 +473,10 @@ impl<'a, T: BlockEventDispatcher, U: RewardSetProvider, B: BurnchainHeaderReader
let canonical_sortition_tip =
SortitionDB::get_canonical_sortition_tip(sortition_db.conn()).unwrap();

let atlas_config = atlas_config.unwrap_or(AtlasConfig::default(false));
let atlas_db =
AtlasDB::connect(atlas_config.clone(), &format!("{}/atlas", path), true).unwrap();

ChainsCoordinator {
canonical_sortition_tip: Some(canonical_sortition_tip),
burnchain_blocks_db,
Expand All @@ -483,8 +488,8 @@ impl<'a, T: BlockEventDispatcher, U: RewardSetProvider, B: BurnchainHeaderReader
fee_estimator: None,
reward_set_provider,
notifier: (),
attachments_tx,
atlas_config: AtlasConfig::default(false),
atlas_config,
atlas_db: Some(atlas_db),
config: ChainsCoordinatorConfig::new(),
burnchain_indexer,
}
Expand Down Expand Up @@ -2557,7 +2562,8 @@ impl<

/// Process any Atlas attachment events and forward them to the Atlas subsystem
fn process_atlas_attachment_events(
&self,
atlas_db: Option<&mut AtlasDB>,
atlas_config: &AtlasConfig,
block_receipt: &StacksEpochReceipt,
canonical_stacks_tip_height: u64,
) {
Expand All @@ -2567,7 +2573,7 @@ impl<
if let TransactionPayload::ContractCall(ref contract_call) = transaction.payload {
let contract_id = contract_call.to_clarity_contract_id();
increment_contract_calls_processed();
if self.atlas_config.contracts.contains(&contract_id) {
if atlas_config.contracts.contains(&contract_id) {
for event in receipt.events.iter() {
if let StacksTransactionEvent::SmartContractEvent(ref event_data) =
event
Expand All @@ -2591,15 +2597,26 @@ impl<
}
if !attachments_instances.is_empty() {
info!(
"Atlas: {} attachment instances emitted from events",
attachments_instances.len()
"Atlas: New attachment instances emitted by block";
"attachments_count" => attachments_instances.len(),
"index_block_hash" => %block_receipt.header.index_block_hash(),
"stacks_height" => block_receipt.header.stacks_block_height,
);
match self.attachments_tx.send(attachments_instances) {
Ok(_) => {}
Err(e) => {
error!("Atlas: error dispatching attachments {}", e);
if let Some(atlas_db) = atlas_db {
for new_attachment in attachments_instances.into_iter() {
if let Err(e) = atlas_db.queue_attachment_instance(&new_attachment) {
warn!(
"Atlas: Error writing attachment instance to DB";
"err" => ?e,
"index_block_hash" => %new_attachment.index_block_hash,
"contract_id" => %new_attachment.contract_id,
"attachment_index" => %new_attachment.attachment_index,
);
}
}
};
} else {
warn!("Atlas: attempted to write attachments, but stacks-node not configured with Atlas DB");
}
}
}

Expand Down Expand Up @@ -2897,7 +2914,9 @@ impl<
self.notifier.notify_stacks_block_processed();
increment_stx_blocks_processed_counter();

self.process_atlas_attachment_events(
Self::process_atlas_attachment_events(
self.atlas_db.as_mut(),
&self.atlas_config,
&block_receipt,
new_canonical_block_snapshot.canonical_stacks_tip_height,
);
Expand Down
Loading

0 comments on commit 154b316

Please sign in to comment.