Skip to content

Commit

Permalink
test: start of testing for #2913. adds a mined_block event to the dis…
Browse files Browse the repository at this point in the history
…patcher
  • Loading branch information
kantai committed Nov 9, 2021
1 parent 7abd0c2 commit f688e8c
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/chainstate/coordinator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ pub trait BlockEventDispatcher {
parent_burn_block_hash: BurnchainHeaderHash,
parent_burn_block_height: u32,
parent_burn_block_timestamp: u64,
anchored_consumed: &ExecutionCost,
mblock_confirmed_consumed: &ExecutionCost,
);

/// called whenever a burn block is about to be
Expand Down Expand Up @@ -833,6 +835,8 @@ impl<
block_receipt.parent_burn_block_hash,
block_receipt.parent_burn_block_height,
block_receipt.parent_burn_block_timestamp,
&block_receipt.anchored_block_cost,
&block_receipt.parent_microblocks_cost,
);
}

Expand Down
9 changes: 9 additions & 0 deletions src/chainstate/stacks/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,15 @@ impl StacksBlockBuilder {

let ts_end = get_epoch_time_ms();

if let Some(observer) = event_observer {
observer.mined_block_event(
SortitionDB::get_canonical_burn_chain_tip(burn_dbconn.conn())?.block_height + 1,
&block,
size,
&consumed,
);
}

debug!(
"Miner: mined anchored block";
"block_hash" => %block.block_hash(),
Expand Down
8 changes: 8 additions & 0 deletions src/core/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use vm::types::PrincipalData;
use clarity_vm::clarity::ClarityConnection;

use crate::chainstate::stacks::events::StacksTransactionReceipt;
use crate::chainstate::stacks::StacksBlock;
use crate::codec::StacksMessageCodec;
use crate::cost_estimates;
use crate::cost_estimates::metrics::CostMetric;
Expand Down Expand Up @@ -144,6 +145,13 @@ impl std::fmt::Display for MemPoolDropReason {

pub trait MemPoolEventDispatcher {
fn mempool_txs_dropped(&self, txids: Vec<Txid>, reason: MemPoolDropReason);
fn mined_block_event(
&self,
target_burn_height: u64,
block: &StacksBlock,
block_size_bytes: u64,
consumed: &ExecutionCost,
);
}

#[derive(Debug, PartialEq, Clone)]
Expand Down
1 change: 1 addition & 0 deletions testnet/stacks-node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1464,6 +1464,7 @@ pub enum EventKeyType {
Microblocks,
AnyEvent,
BurnchainBlocks,
MinedBlocks,
}

impl EventKeyType {
Expand Down
73 changes: 73 additions & 0 deletions testnet/stacks-node/src/event_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use stacks::net::atlas::{Attachment, AttachmentInstance};
use stacks::types::chainstate::{BurnchainHeaderHash, StacksAddress, StacksBlockId};
use stacks::util::hash::bytes_to_hex;
use stacks::vm::analysis::contract_interface_builder::build_contract_interface;
use stacks::vm::costs::ExecutionCost;
use stacks::vm::types::{AssetIdentifier, QualifiedContractIdentifier, Value};

use super::config::{EventKeyType, EventObserverConfig};
Expand Down Expand Up @@ -59,10 +60,20 @@ const STATUS_RESP_POST_CONDITION: &str = "abort_by_post_condition";
pub const PATH_MICROBLOCK_SUBMIT: &str = "new_microblocks";
pub const PATH_MEMPOOL_TX_SUBMIT: &str = "new_mempool_tx";
pub const PATH_MEMPOOL_TX_DROP: &str = "drop_mempool_tx";
pub const PATH_MINED_BLOCK: &str = "mined_block";
pub const PATH_BURN_BLOCK_SUBMIT: &str = "new_burn_block";
pub const PATH_BLOCK_PROCESSED: &str = "new_block";
pub const PATH_ATTACHMENT_PROCESSED: &str = "attachments/new";

#[derive(Clone, Serialize, Deserialize)]
pub struct MinedBlockEvent {
pub target_burn_block: u64,
pub block_hash: String,
pub stacks_height: u64,
pub block_size: u64,
pub anchor_consumed: ExecutionCost,
}

impl EventObserver {
fn send_payload(&self, payload: &serde_json::Value, path: &str) {
let body = match serde_json::to_vec(&payload) {
Expand Down Expand Up @@ -295,6 +306,10 @@ impl EventObserver {
self.send_payload(payload, PATH_MEMPOOL_TX_DROP);
}

fn send_mined_block(&self, payload: &serde_json::Value) {
self.send_payload(payload, PATH_MINED_BLOCK);
}

fn send_new_burn_block(&self, payload: &serde_json::Value) {
self.send_payload(payload, PATH_BURN_BLOCK_SUBMIT);
}
Expand All @@ -310,6 +325,8 @@ impl EventObserver {
parent_burn_block_hash: BurnchainHeaderHash,
parent_burn_block_height: u32,
parent_burn_block_timestamp: u64,
anchored_consumed: &ExecutionCost,
mblock_confirmed_consumed: &ExecutionCost,
) {
// Serialize events to JSON
let serialized_events: Vec<serde_json::Value> = filtered_events
Expand Down Expand Up @@ -347,6 +364,8 @@ impl EventObserver {
"parent_burn_block_hash": format!("0x{}", parent_burn_block_hash),
"parent_burn_block_height": parent_burn_block_height,
"parent_burn_block_timestamp": parent_burn_block_timestamp,
"anchored_block_consumed_cost": anchored_consumed,
"microblocks_confirmed_consumed_cost": mblock_confirmed_consumed,
});

// Send payload
Expand All @@ -364,6 +383,7 @@ pub struct EventDispatcher {
microblock_observers_lookup: HashSet<u16>,
stx_observers_lookup: HashSet<u16>,
any_event_observers_lookup: HashSet<u16>,
miner_observers_lookup: HashSet<u16>,
boot_receipts: Arc<Mutex<Option<Vec<StacksTransactionReceipt>>>>,
}

Expand All @@ -373,6 +393,16 @@ impl MemPoolEventDispatcher for EventDispatcher {
self.process_dropped_mempool_txs(txids, reason)
}
}

fn mined_block_event(
&self,
target_burn_height: u64,
block: &StacksBlock,
block_size_bytes: u64,
consumed: &ExecutionCost,
) {
self.process_mined_block_event(target_burn_height, block, block_size_bytes, consumed)
}
}

impl BlockEventDispatcher for EventDispatcher {
Expand All @@ -388,6 +418,8 @@ impl BlockEventDispatcher for EventDispatcher {
parent_burn_block_hash: BurnchainHeaderHash,
parent_burn_block_height: u32,
parent_burn_block_timestamp: u64,
anchored_consumed: &ExecutionCost,
mblock_confirmed_consumed: &ExecutionCost,
) {
let chain_tip = ChainTip {
metadata,
Expand All @@ -403,6 +435,8 @@ impl BlockEventDispatcher for EventDispatcher {
parent_burn_block_hash,
parent_burn_block_height,
parent_burn_block_timestamp,
anchored_consumed,
mblock_confirmed_consumed,
)
}

Expand Down Expand Up @@ -440,6 +474,7 @@ impl EventDispatcher {
mempool_observers_lookup: HashSet::new(),
microblock_observers_lookup: HashSet::new(),
boot_receipts: Arc::new(Mutex::new(None)),
miner_observers_lookup: HashSet::new(),
}
}

Expand Down Expand Up @@ -587,6 +622,8 @@ impl EventDispatcher {
parent_burn_block_hash: BurnchainHeaderHash,
parent_burn_block_height: u32,
parent_burn_block_timestamp: u64,
anchored_consumed: &ExecutionCost,
mblock_confirmed_consumed: &ExecutionCost,
) {
let boot_receipts = if chain_tip.metadata.block_height == 1 {
let mut boot_receipts_result = self
Expand Down Expand Up @@ -649,6 +686,8 @@ impl EventDispatcher {
parent_burn_block_hash,
parent_burn_block_height,
parent_burn_block_timestamp,
anchored_consumed,
mblock_confirmed_consumed,
);
}
}
Expand Down Expand Up @@ -736,6 +775,37 @@ impl EventDispatcher {
}
}

pub fn process_mined_block_event(
&self,
target_burn_height: u64,
block: &StacksBlock,
block_size_bytes: u64,
consumed: &ExecutionCost,
) {
let interested_observers: Vec<_> = self
.registered_observers
.iter()
.enumerate()
.filter(|(obs_id, _observer)| self.miner_observers_lookup.contains(&(*obs_id as u16)))
.collect();
if interested_observers.len() < 1 {
return;
}

let payload = serde_json::to_value(MinedBlockEvent {
target_burn_block: target_burn_height,
block_hash: block.block_hash().to_string(),
stacks_height: block.header.total_work.work,
block_size: block_size_bytes,
anchor_consumed: consumed.clone(),
})
.unwrap();

for (_, observer) in interested_observers.iter() {
observer.send_mined_block(&payload);
}
}

pub fn process_dropped_mempool_txs(&self, txs: Vec<Txid>, reason: MemPoolDropReason) {
// lazily assemble payload only if we have observers
let interested_observers: Vec<_> = self
Expand Down Expand Up @@ -857,6 +927,9 @@ impl EventDispatcher {
EventKeyType::AnyEvent => {
self.any_event_observers_lookup.insert(observer_index);
}
EventKeyType::MinedBlocks => {
self.miner_observers_lookup.insert(observer_index);
}
}
}

Expand Down
8 changes: 7 additions & 1 deletion testnet/stacks-node/src/neon_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1857,7 +1857,7 @@ impl InitializedNeonNode {
}
}

let (anchored_block, _, _) = match StacksBlockBuilder::build_anchored_block(
let (anchored_block, consumed_cost, _) = match StacksBlockBuilder::build_anchored_block(
chain_state,
&burn_db.index_conn(),
mem_pool,
Expand Down Expand Up @@ -1935,6 +1935,12 @@ impl InitializedNeonNode {
attempt
);

eprintln!(
"Assembled block at burn height = {}, consumed.runtime = {}",
burn_block.block_height + 1,
consumed_cost.runtime
);

// let's figure out the recipient set!
let recipients = match get_next_recipients(
&burn_block,
Expand Down
2 changes: 2 additions & 0 deletions testnet/stacks-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,8 @@ impl Node {
parent_burn_block_hash,
parent_burn_block_height,
parent_burn_block_timestamp,
&processed_block.anchored_block_cost,
&processed_block.parent_microblocks_cost,
);

self.chain_tip = Some(chain_tip.clone());
Expand Down
Loading

0 comments on commit f688e8c

Please sign in to comment.