Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: snappy downloader #5393

Open
wants to merge 38 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
28fb59b
fix: drain the relayer channel to alleviate block download pressure
jcnelson Oct 28, 2024
61d701b
chore: only consider block-bearing network results if in ibd mode or …
jcnelson Oct 28, 2024
e8cb18f
chore: shed network results when in ibd or with download backpressure…
jcnelson Oct 28, 2024
be96888
chore: fix compile issues
jcnelson Oct 28, 2024
c5ec5b3
fix: drive main loop wakeups when we're backlogged
jcnelson Oct 28, 2024
0d26d50
Merge branch 'develop' into fix/relayer-drain-channel
jcnelson Oct 29, 2024
0685670
chore: option to disable block pushes
jcnelson Oct 30, 2024
9bc3125
Merge branch 'fix/relayer-drain-channel' of https://github.com/stacks…
jcnelson Oct 30, 2024
babd3d9
feat: make NetworkResults mergeable
jcnelson Oct 31, 2024
596d41d
chore: make StackerDBSyncResult Debug and PartialEq
jcnelson Oct 31, 2024
1fc9d72
chore: test NetworkResult::update()
jcnelson Oct 31, 2024
225ada1
chore: remove logic to drain the network result channel (it's not nee…
jcnelson Oct 31, 2024
07b65cb
chore: remove dead code
jcnelson Oct 31, 2024
27e7301
chore: count download attempts, and don't download processed tenures,…
jcnelson Oct 31, 2024
d1bf24f
chore: p2p --> relayer channel only needs one slot
jcnelson Oct 31, 2024
72c9f54
chore: pub(crate) visibility to avoid private leakage
jcnelson Oct 31, 2024
9361bea
chore: log attempt failures, and only start as many downloaders as given
jcnelson Oct 31, 2024
c88d0e6
chore: log more downloader diagnostics
jcnelson Oct 31, 2024
fa493d5
chore: deprioritize unreliable peers
jcnelson Oct 31, 2024
71eccc9
Merge branch 'develop' into fix/relayer-drain-channel
jcnelson Nov 1, 2024
26d8a4d
Merge branch 'develop' into fix/relayer-drain-channel
jcnelson Nov 1, 2024
1074fe0
fix: off-by-one error in reward set caching logic in p2p stack
jcnelson Nov 1, 2024
e4e9b18
Merge branch 'fix/relayer-drain-channel' of https://github.com/stacks…
jcnelson Nov 1, 2024
e3b41fa
Merge branch 'develop' into fix/relayer-drain-channel
jcnelson Nov 1, 2024
ca3b2ae
chore: API sync
jcnelson Nov 4, 2024
06108f2
fix: use burnchain tip reward cycle to infer whether or not to sync t…
jcnelson Nov 4, 2024
3369de5
chore: API sync
jcnelson Nov 4, 2024
0e1058e
chore: store burnchain DB handle in p2p network and load burnchain he…
jcnelson Nov 4, 2024
ad4faaf
chore: API sync, and test fixes
jcnelson Nov 4, 2024
4365ebf
chore: API sync
jcnelson Nov 4, 2024
19b7c94
Merge branch 'fix/relayer-drain-channel' of https://github.com/stacks…
jcnelson Nov 4, 2024
30292cb
Merge branch 'develop' into fix/relayer-drain-channel
jcnelson Nov 4, 2024
18be1fe
chore: address PR feedback
jcnelson Nov 4, 2024
bce9839
fix: disconnect from neighbors serving unconfirmed tenures that are s…
jcnelson Nov 5, 2024
5ed737a
Merge branch 'develop' into fix/relayer-drain-channel
jcnelson Nov 5, 2024
ead50a8
fix: fix unit tests that broke due to PeerNetwork needing an existing…
jcnelson Nov 5, 2024
9731059
Merge branch 'develop' into fix/relayer-drain-channel
jcnelson Nov 5, 2024
45adc33
chore: address remaining PR feedback and get tests to pass
jcnelson Nov 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 155 additions & 131 deletions stackslib/src/net/chat.rs

Large diffs are not rendered by default.

46 changes: 36 additions & 10 deletions stackslib/src/net/download/nakamoto/download_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ use crate::net::server::HttpPeer;
use crate::net::{Error as NetError, Neighbor, NeighborAddress, NeighborKey};
use crate::util_lib::db::{DBConn, Error as DBError};

/// How often to check for unconfirmed tenures
const CHECK_UNCONFIRMED_TENURES_MS: u128 = 1_000;

/// The overall downloader can operate in one of two states:
/// * it's doing IBD, in which case it's downloading tenures using neighbor inventories and
/// the start/end block ID hashes obtained from block-commits. This works up until the last two
Expand Down Expand Up @@ -118,6 +121,10 @@ pub struct NakamotoDownloadStateMachine {
pub(super) neighbor_rpc: NeighborRPC,
/// Nakamoto chain tip
nakamoto_tip: StacksBlockId,
/// do we need to fetch unconfirmed tenures?
fetch_unconfirmed_tenures: bool,
/// last time an unconfirmed tenures was checked
last_unconfirmed_download_check_ms: u128,
/// last time an unconfirmed downloader was run
last_unconfirmed_download_run_ms: u128,
}
Expand All @@ -139,6 +146,8 @@ impl NakamotoDownloadStateMachine {
unconfirmed_tenure_downloads: HashMap::new(),
neighbor_rpc: NeighborRPC::new(),
nakamoto_tip,
fetch_unconfirmed_tenures: false,
last_unconfirmed_download_check_ms: 0,
last_unconfirmed_download_run_ms: 0,
}
}
Expand Down Expand Up @@ -1218,6 +1227,7 @@ impl NakamotoDownloadStateMachine {
) {
Ok(blocks_opt) => blocks_opt,
Err(NetError::StaleView) => {
neighbor_rpc.add_dead(network, &naddr);
continue;
}
Err(e) => {
Expand Down Expand Up @@ -1426,14 +1436,30 @@ impl NakamotoDownloadStateMachine {
);

// check this now, since we mutate self.available
let need_unconfirmed_tenures = Self::need_unconfirmed_tenures(
burnchain_height,
&network.burnchain_tip,
&self.wanted_tenures,
self.prev_wanted_tenures.as_ref().unwrap_or(&vec![]),
&self.tenure_block_ids,
&self.available_tenures,
);
self.fetch_unconfirmed_tenures = if self
.last_unconfirmed_download_check_ms
.saturating_add(CHECK_UNCONFIRMED_TENURES_MS)
> get_epoch_time_ms()
{
debug!(
"Throttle checking for unconfirmed tenures until {}",
self.last_unconfirmed_download_check_ms
.saturating_add(CHECK_UNCONFIRMED_TENURES_MS)
/ 1000
);
jcnelson marked this conversation as resolved.
Show resolved Hide resolved
false
} else {
let do_fetch = Self::need_unconfirmed_tenures(
burnchain_height,
&network.burnchain_tip,
&self.wanted_tenures,
self.prev_wanted_tenures.as_ref().unwrap_or(&vec![]),
&self.tenure_block_ids,
&self.available_tenures,
);
self.last_unconfirmed_download_check_ms = get_epoch_time_ms();
do_fetch
};

match self.state {
NakamotoDownloadState::Confirmed => {
Expand All @@ -1443,7 +1469,7 @@ impl NakamotoDownloadStateMachine {
.expect("FATAL: max_inflight_blocks exceeds usize::MAX"),
);

if self.tenure_downloads.is_empty() && need_unconfirmed_tenures {
if self.tenure_downloads.is_empty() && self.fetch_unconfirmed_tenures {
debug!(
"Transition from {} to {}",
&self.state,
Expand Down Expand Up @@ -1488,7 +1514,7 @@ impl NakamotoDownloadStateMachine {
} else if self.unconfirmed_tenure_downloads.is_empty()
&& self.unconfirmed_tenure_download_schedule.is_empty()
{
if need_unconfirmed_tenures {
if self.fetch_unconfirmed_tenures {
// do this again
self.unconfirmed_tenure_download_schedule =
Self::make_unconfirmed_tenure_download_schedule(
Expand Down
26 changes: 4 additions & 22 deletions stackslib/src/net/download/nakamoto/tenure_downloader_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl From<&mut NakamotoTenureDownloader> for CompletedTenure {
}
}

pub const PEER_DEPRIORITIZATION_TIME: u64 = 60;
pub const PEER_DEPRIORITIZATION_TIME_SECS: u64 = 60;

/// A set of confirmed downloader state machines assigned to one or more neighbors. The block
/// downloader runs tenure-downloaders in parallel, since the downloader for the N+1'st tenure
Expand Down Expand Up @@ -152,7 +152,7 @@ impl NakamotoTenureDownloaderSet {
) {
deprioritized_peers.insert(
peer.clone(),
get_epoch_time_secs() + PEER_DEPRIORITIZATION_TIME,
get_epoch_time_secs() + PEER_DEPRIORITIZATION_TIME_SECS,
);
}

Expand Down Expand Up @@ -482,20 +482,11 @@ impl NakamotoTenureDownloaderSet {
continue;
};

let attempt_count = if let Some(attempt_count) = self.attempted_tenures.get(&ch) {
*attempt_count
} else {
0
};
let attempt_count = *self.attempted_tenures.get(&ch).unwrap_or(&0);
self.attempted_tenures
.insert(ch.clone(), attempt_count.saturating_add(1));

let attempt_failed_count =
if let Some(attempt_failed_count) = self.attempt_failed_tenures.get(&ch) {
*attempt_failed_count
} else {
0
};
let attempt_failed_count = *self.attempt_failed_tenures.get(&ch).unwrap_or(&0);

info!("Download tenure {}", &ch;
"peer" => %naddr,
Expand All @@ -511,15 +502,6 @@ impl NakamotoTenureDownloaderSet {
"tenure_end_reward_cycle" => tenure_info.end_reward_cycle,
"tenure_burn_height" => tenure_info.tenure_id_burn_block_height);

debug!(
"Download tenure {} (start={}, end={}) (rc {},{}) burn_height {}",
&ch,
&tenure_info.start_block_id,
&tenure_info.end_block_id,
tenure_info.start_reward_cycle,
tenure_info.end_reward_cycle,
tenure_info.tenure_id_burn_block_height,
);
let tenure_download = NakamotoTenureDownloader::new(
ch.clone(),
tenure_info.start_block_id.clone(),
Expand Down
37 changes: 26 additions & 11 deletions stackslib/src/net/inv/nakamoto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,10 +579,10 @@ impl NakamotoTenureInv {

/// Reset synchronization state for this peer. Don't remove inventory data; just make it so we
/// can talk to the peer again
pub fn try_reset_comms(&mut self, inv_sync_interval: u64, start_rc: u64, cur_rc: u64) {
pub fn try_reset_comms(&mut self, inv_sync_interval: u64, start_rc: u64, max_rc: u64) {
let now = get_epoch_time_secs();
if self.start_sync_time + inv_sync_interval <= now
&& (self.cur_reward_cycle >= cur_rc || !self.online)
&& (self.cur_reward_cycle >= max_rc || !self.online)
{
self.reset_comms(start_rc);
}
Expand Down Expand Up @@ -618,20 +618,20 @@ impl NakamotoTenureInv {
pub fn getnakamotoinv_begin(
&mut self,
network: &mut PeerNetwork,
current_reward_cycle: u64,
max_reward_cycle: u64,
) -> bool {
debug!(
"{:?}: Begin Nakamoto inventory sync for {} in cycle {}",
network.get_local_peer(),
self.neighbor_address,
current_reward_cycle,
max_reward_cycle,
);

// possibly reset communications with this peer, if it's time to do so.
self.try_reset_comms(
network.get_connection_opts().inv_sync_interval,
current_reward_cycle.saturating_sub(network.get_connection_opts().inv_reward_cycles),
current_reward_cycle,
max_reward_cycle.saturating_sub(network.get_connection_opts().inv_reward_cycles),
max_reward_cycle,
);
if !self.is_online() {
// don't talk to this peer for now
Expand All @@ -643,7 +643,7 @@ impl NakamotoTenureInv {
return false;
}

if self.reward_cycle() > current_reward_cycle {
if self.reward_cycle() > max_reward_cycle {
// we've fully sync'ed with this peer
debug!(
"{:?}: fully sync'ed: {}",
Expand Down Expand Up @@ -908,10 +908,24 @@ impl<NC: NeighborComms> NakamotoInvStateMachine<NC> {
)
});

// try to get all of the reward cycles we know about, plus the next one. We try to get
// the next one as well in case we're at a reward cycle boundary, but we're not at the
// chain tip -- the block downloader still needs that next inventory to proceed.
let proceed = inv.getnakamotoinv_begin(network, current_reward_cycle.saturating_add(1));
let burnchain_tip_reward_cycle = sortdb
.pox_constants
.block_height_to_reward_cycle(
sortdb.first_block_height,
network.stacks_tip.burnchain_height,
)
.ok_or(NetError::ChainstateError(
"block height comes before system start".into(),
))?;

let max_reward_cycle = if burnchain_tip_reward_cycle > current_reward_cycle {
// try to sync up to the next reward cycle
current_reward_cycle.saturating_add(1)
} else {
current_reward_cycle
};

let proceed = inv.getnakamotoinv_begin(network, max_reward_cycle);
let inv_rc = inv.reward_cycle();
new_inventories.insert(naddr.clone(), inv);

Expand Down Expand Up @@ -946,6 +960,7 @@ impl<NC: NeighborComms> NakamotoInvStateMachine<NC> {
"peer" => ?naddr,
"error" => ?e
);
continue;
}
}

Expand Down
3 changes: 3 additions & 0 deletions stackslib/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3169,10 +3169,13 @@ pub mod test {
let stackerdb_contracts: Vec<_> =
stacker_db_syncs.keys().map(|cid| cid.clone()).collect();

let burnchain_db = config.burnchain.open_burnchain_db(false).unwrap();

let mut peer_network = PeerNetwork::new(
peerdb,
atlasdb,
p2p_stacker_dbs,
burnchain_db,
local_peer,
config.peer_version,
config.burnchain.clone(),
Expand Down
30 changes: 24 additions & 6 deletions stackslib/src/net/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,18 @@ impl CurrentRewardSet {
/// Cached stacks chain tip info, consumed by RPC endpoints
#[derive(Clone, Debug, PartialEq)]
pub struct StacksTipInfo {
/// consensus hash of the highest processed stacks block
pub consensus_hash: ConsensusHash,
/// block hash of the highest processed stacks block
pub block_hash: BlockHeaderHash,
/// height of the highest processed stacks block
pub height: u64,
/// coinbase height of the highest processed tenure
pub coinbase_height: u64,
/// whether or not the system has transitioned to Nakamoto
pub is_nakamoto: bool,
/// highest burnchain block discovered
pub burnchain_height: u64,
}

impl StacksTipInfo {
Expand All @@ -258,6 +265,7 @@ impl StacksTipInfo {
height: 0,
coinbase_height: 0,
is_nakamoto: false,
burnchain_height: 0,
}
}

Expand Down Expand Up @@ -306,6 +314,9 @@ pub struct PeerNetwork {
pub peerdb: PeerDB,
pub atlasdb: AtlasDB,

// handle to burnchain DB
pub burnchain_db: BurnchainDB,

// ongoing p2p conversations (either they reached out to us, or we to them)
pub peers: PeerMap,
pub sockets: HashMap<usize, mio_net::TcpStream>,
Expand Down Expand Up @@ -444,6 +455,7 @@ impl PeerNetwork {
peerdb: PeerDB,
atlasdb: AtlasDB,
stackerdbs: StackerDBs,
burnchain_db: BurnchainDB,
mut local_peer: LocalPeer,
peer_version: u32,
burnchain: Burnchain,
Expand Down Expand Up @@ -509,6 +521,8 @@ impl PeerNetwork {
peerdb,
atlasdb,

burnchain_db,

peers: PeerMap::new(),
sockets: HashMap::new(),
events: HashMap::new(),
Expand Down Expand Up @@ -4257,6 +4271,7 @@ impl PeerNetwork {
.anchored_header
.as_stacks_nakamoto()
.is_some(),
burnchain_height: self.stacks_tip.burnchain_height,
};
debug!(
"{:?}: Parent Stacks tip off of {} is {:?}",
Expand Down Expand Up @@ -4387,6 +4402,7 @@ impl PeerNetwork {
let (stacks_tip_ch, stacks_tip_bhh, stacks_tip_height) =
SortitionDB::get_canonical_stacks_chain_tip_hash_and_height(sortdb.conn())?;

let new_burnchain_tip = self.burnchain_db.get_canonical_chain_tip()?;
let burnchain_tip_changed = canonical_sn.block_height != self.chain_view.burn_block_height
|| self.num_state_machine_passes == 0
|| canonical_sn.sortition_id != self.burnchain_tip.sortition_id;
Expand Down Expand Up @@ -4465,6 +4481,7 @@ impl PeerNetwork {
height: 0,
coinbase_height: 0,
is_nakamoto: false,
burnchain_height: 0,
}
}
Err(e) => return Err(e),
Expand Down Expand Up @@ -4536,12 +4553,10 @@ impl PeerNetwork {

if self.get_current_epoch().epoch_id < StacksEpochId::Epoch30 {
// update heaviest affirmation map view
let burnchain_db = self.burnchain.open_burnchain_db(false)?;

self.heaviest_affirmation_map = static_get_heaviest_affirmation_map(
&self.burnchain,
indexer,
&burnchain_db,
&self.burnchain_db,
sortdb,
&canonical_sn.sortition_id,
)
Expand All @@ -4552,7 +4567,7 @@ impl PeerNetwork {
self.tentative_best_affirmation_map = static_get_canonical_affirmation_map(
&self.burnchain,
indexer,
&burnchain_db,
&self.burnchain_db,
sortdb,
chainstate,
&canonical_sn.sortition_id,
Expand Down Expand Up @@ -4593,9 +4608,8 @@ impl PeerNetwork {
if stacks_tip_changed && self.get_current_epoch().epoch_id < StacksEpochId::Epoch30 {
// update stacks tip affirmation map view
// (NOTE: this check has to happen _after_ self.chain_view gets updated!)
let burnchain_db = self.burnchain.open_burnchain_db(false)?;
self.stacks_tip_affirmation_map = static_get_stacks_tip_affirmation_map(
&burnchain_db,
&self.burnchain_db,
sortdb,
&canonical_sn.sortition_id,
&canonical_sn.canonical_stacks_tip_consensus_hash,
Expand Down Expand Up @@ -4661,8 +4675,10 @@ impl PeerNetwork {
height: stacks_tip_height,
coinbase_height,
is_nakamoto: stacks_tip_is_nakamoto,
burnchain_height: new_burnchain_tip.block_height,
};
self.parent_stacks_tip = parent_stacks_tip;
self.parent_stacks_tip.burnchain_height = new_burnchain_tip.block_height;

debug!(
"{:?}: canonical Stacks tip is now {:?}",
Expand Down Expand Up @@ -5299,12 +5315,14 @@ mod test {
let atlas_config = AtlasConfig::new(false);
let atlasdb = AtlasDB::connect_memory(atlas_config).unwrap();
let stacker_db = StackerDBs::connect_memory();
let burnchain_db = burnchain.open_burnchain_db(false).unwrap();

let local_peer = PeerDB::get_local_peer(db.conn()).unwrap();
let p2p = PeerNetwork::new(
db,
atlasdb,
stacker_db,
burnchain_db,
local_peer,
0x12345678,
burnchain,
Expand Down
Loading
Loading