Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock #5036

Merged
merged 9 commits into from
Aug 7, 2024
87 changes: 87 additions & 0 deletions stackslib/src/chainstate/nakamoto/coordinator/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::chainstate::nakamoto::coordinator::load_nakamoto_reward_set;
use crate::chainstate::nakamoto::miner::NakamotoBlockBuilder;
use crate::chainstate::nakamoto::signer_set::NakamotoSigners;
use crate::chainstate::nakamoto::test_signers::TestSigners;
use crate::chainstate::nakamoto::test_stall::*;
use crate::chainstate::nakamoto::tests::get_account;
use crate::chainstate::nakamoto::tests::node::TestStacker;
use crate::chainstate::nakamoto::{
Expand Down Expand Up @@ -2453,3 +2454,89 @@ pub fn simple_nakamoto_coordinator_10_extended_tenures_10_sortitions() -> TestPe
fn test_nakamoto_coordinator_10_tenures_and_extensions_10_blocks() {
simple_nakamoto_coordinator_10_extended_tenures_10_sortitions();
}

/// Regression test for a lock-ordering deadlock between Nakamoto block
/// processing and another holder of the sortition DB / chainstate DB.
///
/// Sequence exercised:
/// 1. Boot a peer into Nakamoto and open independent handles to the sortition
///    DB and the chainstate DB.
/// 2. Enable the test-only block-processing stall, then start a miner thread
///    that mines a single-block tenure.
/// 3. From the main thread, open a sortition DB transaction while the stall is
///    still enabled, then lift the stall.
/// 4. Give the miner time to proceed, then open a chainstate transaction while
///    still holding the sortition DB transaction.
/// 5. Release both transactions and join the miner thread.
///
/// The test passes if it terminates: there are no assertions beyond the
/// `unwrap()`s. A deadlock shows up as a hang (or, presumably, as a busy-timeout
/// error surfacing through one of the `unwrap()`s -- TODO confirm against the DB
/// connection settings).
#[test]
fn process_next_nakamoto_block_deadlock() {
    let private_key = StacksPrivateKey::from_seed(&[2]);

    // All stackers share one signing key; its seed is the big-endian stacker
    // count followed by four 1-bytes.
    let num_stackers: u32 = 4;
    let mut signing_key_seed = num_stackers.to_be_bytes().to_vec();
    signing_key_seed.extend_from_slice(&[1, 1, 1, 1]);
    let signing_key = StacksPrivateKey::from_seed(signing_key_seed.as_slice());
    let test_stackers = (0..num_stackers)
        .map(|index| TestStacker {
            signer_private_key: signing_key.clone(),
            stacker_private_key: StacksPrivateKey::from_seed(&index.to_be_bytes()),
            amount: u64::MAX as u128 - 10000,
            pox_addr: Some(PoxAddress::Standard(
                StacksAddress::new(
                    C32_ADDRESS_VERSION_TESTNET_SINGLESIG,
                    Hash160::from_data(&index.to_be_bytes()),
                ),
                Some(AddressHashMode::SerializeP2PKH),
            )),
            max_amount: None,
        })
        .collect::<Vec<_>>();
    let test_signers = TestSigners::new(vec![signing_key]);

    // Start from the default test PoX constants and override the cycle length
    // and the PoX-2/3/4 transition heights.
    let mut pox_constants = TestPeerConfig::default().burnchain.pox_constants;
    pox_constants.reward_cycle_length = 10;
    pox_constants.v2_unlock_height = 21;
    pox_constants.pox_3_activation_height = 26;
    pox_constants.v3_unlock_height = 27;
    pox_constants.pox_4_activation_height = 28;

    // Neither the stackers nor the signers are needed again, so hand them to
    // the boot plan by value instead of cloning.
    let mut boot_plan = NakamotoBootPlan::new(function_name!())
        .with_test_stackers(test_stackers)
        .with_test_signers(test_signers)
        .with_private_key(private_key);
    boot_plan.pox_constants = pox_constants;

    info!("Creating peer");

    let mut peer = boot_plan.boot_into_nakamoto_peer(vec![], None);

    // Open separate handles to both databases so the main thread can contend
    // with the miner thread, which takes ownership of `peer` below.
    let mut sortition_db = peer.sortdb().reopen().unwrap();
    let (chainstate, _) = &mut peer
        .stacks_node
        .as_mut()
        .unwrap()
        .chainstate
        .reopen()
        .unwrap();

    // Must be set before the miner starts so that its block processing parks
    // in the stall hook.
    enable_process_block_stall();

    let miner_thread = std::thread::spawn(move || {
        info!(" ------------------------------- MINING TENURE");
        // Only completion matters here; the mined block itself is not inspected.
        let _ = peer.single_block_tenure(&private_key, |_| {}, |_| {}, |_| true);
        info!(" ------------------------------- TENURE MINED");
    });

    // Wait a bit, to ensure the miner has reached the stall
    std::thread::sleep(std::time::Duration::from_secs(10));

    // Lock the sortdb
    info!(" ------------------------------- TRYING TO LOCK THE SORTDB");
    let sort_tx = sortition_db.tx_begin().unwrap();
    info!(" ------------------------------- SORTDB LOCKED");

    // Un-stall the block processing
    disable_process_block_stall();

    // Wait a bit, to ensure the tenure will have grabbed any locks it needs
    std::thread::sleep(std::time::Duration::from_secs(10));

    // Lock the chainstate db while still holding the sortdb transaction. If
    // block processing were holding the chainstate while waiting on the
    // sortdb, neither side could make progress from here.
    info!(" ------------------------------- TRYING TO LOCK THE CHAINSTATE");
    let chainstate_tx = chainstate.chainstate_tx_begin().unwrap();

    info!(" ------------------------------- SORTDB AND CHAINSTATE LOCKED");
    drop(chainstate_tx);
    drop(sort_tx);
    info!(" ------------------------------- MAIN THREAD FINISHED");

    // Wait for the miner thread to finish; a panic in it fails the test.
    miner_thread.join().unwrap();
}
70 changes: 49 additions & 21 deletions stackslib/src/chainstate/nakamoto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,31 @@ lazy_static! {
];
}

#[cfg(test)]
/// Test-only switch that lets a test park block processing at a known point
/// and release it later from another thread.
mod test_stall {
    use std::sync::Mutex;
    use std::thread;
    use std::time::Duration;

    /// Process-wide stall flag. Block processing is held only while this is
    /// `Some(true)`; both `None` (the initial value) and `Some(false)` let it
    /// run.
    pub static TEST_PROCESS_BLOCK_STALL: Mutex<Option<bool>> = Mutex::new(None);

    /// Reports whether a stall is currently requested. The lock is held only
    /// for the duration of the comparison.
    fn stall_requested() -> bool {
        matches!(*TEST_PROCESS_BLOCK_STALL.lock().unwrap(), Some(true))
    }

    /// Blocks the calling thread for as long as a stall is requested, polling
    /// the flag every 10 milliseconds. Returns immediately, without logging,
    /// when no stall is requested.
    pub fn stall_block_processing() {
        // Check once up front so the common no-stall path stays silent.
        if !stall_requested() {
            return;
        }

        warn!("Block processing is stalled due to testing directive.");
        loop {
            if !stall_requested() {
                break;
            }
            thread::sleep(Duration::from_millis(10));
        }
        info!("Block processing is no longer stalled due to testing directive.");
    }

    /// Requests that block processing stall.
    pub fn enable_process_block_stall() {
        *TEST_PROCESS_BLOCK_STALL.lock().unwrap() = Some(true);
    }

    /// Releases a previously requested stall.
    pub fn disable_process_block_stall() {
        *TEST_PROCESS_BLOCK_STALL.lock().unwrap() = Some(false);
    }
}

/// Trait for common MARF getters between StacksDBConn and StacksDBTx
pub trait StacksDBIndexed {
fn get(&mut self, tip: &StacksBlockId, key: &str) -> Result<Option<String>, DBError>;
Expand Down Expand Up @@ -1722,6 +1747,9 @@ impl NakamotoChainState {
canonical_sortition_tip: &SortitionId,
dispatcher_opt: Option<&'a T>,
) -> Result<Option<StacksEpochReceipt>, ChainstateError> {
#[cfg(test)]
test_stall::stall_block_processing();

let nakamoto_blocks_db = stacks_chain_state.nakamoto_blocks_db();
let Some((next_ready_block, block_size)) =
nakamoto_blocks_db.next_ready_nakamoto_block(stacks_chain_state.db())?
Expand Down Expand Up @@ -1992,22 +2020,14 @@ impl NakamotoChainState {
next_ready_block.header.consensus_hash
);

// set stacks block accepted
let mut sort_tx = sort_db.tx_handle_begin(canonical_sortition_tip)?;
sort_tx.set_stacks_block_accepted(
&next_ready_block.header.consensus_hash,
&next_ready_block.header.block_hash(),
next_ready_block.header.chain_length,
)?;

// this will panic if the Clarity commit fails.
clarity_commit.commit();
chainstate_tx.commit()
.unwrap_or_else(|e| {
error!("Failed to commit chainstate transaction after committing Clarity block. The chainstate database is now corrupted.";
"error" => ?e);
panic!()
});
.unwrap_or_else(|e| {
error!("Failed to commit chainstate transaction after committing Clarity block. The chainstate database is now corrupted.";
"error" => ?e);
panic!()
});

// as a separate transaction, mark this block as processed.
// This is done separately so that the staging blocks DB, which receives writes
Expand All @@ -2019,6 +2039,22 @@ impl NakamotoChainState {

let signer_bitvec = (&next_ready_block).header.pox_treatment.clone();

// set stacks block accepted
let mut sort_tx = sort_db.tx_handle_begin(canonical_sortition_tip)?;
sort_tx.set_stacks_block_accepted(
&next_ready_block.header.consensus_hash,
&next_ready_block.header.block_hash(),
next_ready_block.header.chain_length,
)?;

sort_tx
.commit()
.unwrap_or_else(|e| {
error!("Failed to commit sortition db transaction after committing chainstate and clarity block. The chainstate database is now corrupted.";
"error" => ?e);
panic!()
});

// announce the block, if we're connected to an event dispatcher
if let Some(dispatcher) = dispatcher_opt {
let block_event = (
Expand All @@ -2045,14 +2081,6 @@ impl NakamotoChainState {
);
}

sort_tx
.commit()
.unwrap_or_else(|e| {
error!("Failed to commit sortition db transaction after committing chainstate and clarity block. The chainstate database is now corrupted.";
"error" => ?e);
panic!()
});

Ok(Some(receipt))
}

Expand Down