From 4f0f7162b699103623ab0209c748eff1c965ec0e Mon Sep 17 00:00:00 2001 From: Aaron Blankstein Date: Fri, 21 Jun 2024 13:07:07 -0500 Subject: [PATCH] feat: push blocks to signer set * add /v3/blocks/upload, and relayer handling for it * push blocks from miner to signer set, signer set pushes to their stacks-node * configure the signer set test to rely on signers to push the block (by disabling p2p block push) --- libsigner/src/libsigner.rs | 2 +- libsigner/src/v0/messages.rs | 65 ++++--- libsigner/src/v1/messages.rs | 4 +- stacks-signer/src/client/stackerdb.rs | 6 +- stacks-signer/src/client/stacks_client.rs | 19 ++ stacks-signer/src/v0/signer.rs | 20 +- stackslib/src/net/api/mod.rs | 3 + stackslib/src/net/api/postblock_v3.rs | 174 ++++++++++++++++++ stackslib/src/net/mod.rs | 10 +- stackslib/src/net/relay.rs | 61 ++++-- stackslib/src/net/stackerdb/mod.rs | 2 +- testnet/stacks-node/src/nakamoto_node.rs | 2 + .../stacks-node/src/nakamoto_node/miner.rs | 91 ++++++--- .../src/nakamoto_node/sign_coordinator.rs | 147 +++++++++------ testnet/stacks-node/src/tests/signer/v0.rs | 8 +- 15 files changed, 478 insertions(+), 136 deletions(-) create mode 100644 stackslib/src/net/api/postblock_v3.rs diff --git a/libsigner/src/libsigner.rs b/libsigner/src/libsigner.rs index 43d8e5b687..0da4e68a8f 100644 --- a/libsigner/src/libsigner.rs +++ b/libsigner/src/libsigner.rs @@ -74,5 +74,5 @@ pub trait MessageSlotID: Sized + Eq + Hash + Debug + Copy { /// A trait for signer messages used in signer communciation pub trait SignerMessage: StacksMessageCodec { /// The contract identifier for the message slot in stacker db - fn msg_id(&self) -> T; + fn msg_id(&self) -> Option; } diff --git a/libsigner/src/v0/messages.rs b/libsigner/src/v0/messages.rs index 4d32253f2e..f831aa9e99 100644 --- a/libsigner/src/v0/messages.rs +++ b/libsigner/src/v0/messages.rs @@ -64,12 +64,19 @@ define_u8_enum!( /// Enum representing the stackerdb message identifier: this is /// the contract index in the signers contracts (i.e., X in signers-0-X) MessageSlotID { - /// Block Proposal message from miners - BlockProposal = 0, /// Block Response message from signers BlockResponse = 1 }); +define_u8_enum!( +/// Enum representing the slots used by the miner +MinerSlotID { + /// Block proposal from the miner + BlockProposal = 0, + /// Block pushed from the miner + BlockPushed = 1 +}); + impl MessageSlotIDTrait for MessageSlotID { fn stacker_db_contract(&self, mainnet: bool, reward_cycle: u64) -> QualifiedContractIdentifier { NakamotoSigners::make_signers_db_contract_id(reward_cycle, self.to_u32(), mainnet) @@ -80,7 +87,7 @@ impl MessageSlotIDTrait for MessageSlotID { } impl SignerMessageTrait for SignerMessage { - fn msg_id(&self) -> MessageSlotID { + fn msg_id(&self) -> Option { self.msg_id() } } @@ -91,7 +98,9 @@ SignerMessageTypePrefix { /// Block Proposal message from miners BlockProposal = 0, /// Block Response message from signers - BlockResponse = 1 + BlockResponse = 1, + /// Block Pushed message from miners + BlockPushed = 2 }); #[cfg_attr(test, mutants::skip)] @@ -133,67 +142,65 @@ impl From<&SignerMessage> for SignerMessageTypePrefix { match message { SignerMessage::BlockProposal(_) => SignerMessageTypePrefix::BlockProposal, SignerMessage::BlockResponse(_) => SignerMessageTypePrefix::BlockResponse, + SignerMessage::BlockPushed(_) => SignerMessageTypePrefix::BlockPushed, } } } /// The messages being sent through the stacker db contracts -#[derive(Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum SignerMessage { /// The block proposal from miners for signers to observe and sign BlockProposal(BlockProposal), /// The block response from signers for miners to observe BlockResponse(BlockResponse), -} - -impl Debug for SignerMessage { - #[cfg_attr(test, mutants::skip)] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::BlockProposal(b) => Debug::fmt(b, f), - Self::BlockResponse(b) => Debug::fmt(b, f), - } - } + /// A block pushed from miners to the signers set + BlockPushed(NakamotoBlock), } impl SignerMessage { /// Helper function to determine the slot ID for the provided stacker-db writer id + /// Not every message has a `MessageSlotID`: messages from the miner do not + /// broadcast over `.signers-0-X` contracts. #[cfg_attr(test, mutants::skip)] - pub fn msg_id(&self) -> MessageSlotID { + pub fn msg_id(&self) -> Option { match self { - Self::BlockProposal(_) => MessageSlotID::BlockProposal, - Self::BlockResponse(_) => MessageSlotID::BlockResponse, + Self::BlockProposal(_) | Self::BlockPushed(_) => None, + Self::BlockResponse(_) => Some(MessageSlotID::BlockResponse), } } } impl StacksMessageCodec for SignerMessage { fn consensus_serialize(&self, fd: &mut W) -> Result<(), CodecError> { - write_next(fd, &(SignerMessageTypePrefix::from(self) as u8))?; + SignerMessageTypePrefix::from(self) + .to_u8() + .consensus_serialize(fd)?; match self { - SignerMessage::BlockProposal(block_proposal) => { - write_next(fd, block_proposal)?; - } - SignerMessage::BlockResponse(block_response) => { - write_next(fd, block_response)?; - } - }; + SignerMessage::BlockProposal(block_proposal) => block_proposal.consensus_serialize(fd), + SignerMessage::BlockResponse(block_response) => block_response.consensus_serialize(fd), + SignerMessage::BlockPushed(block) => block.consensus_serialize(fd), + }?; Ok(()) } #[cfg_attr(test, mutants::skip)] fn consensus_deserialize(fd: &mut R) -> Result { - let type_prefix_byte = read_next::(fd)?; + let type_prefix_byte = u8::consensus_deserialize(fd)?; let type_prefix = SignerMessageTypePrefix::try_from(type_prefix_byte)?; let message = match type_prefix { SignerMessageTypePrefix::BlockProposal => { - let block_proposal = read_next::(fd)?; + let block_proposal = StacksMessageCodec::consensus_deserialize(fd)?; SignerMessage::BlockProposal(block_proposal) } SignerMessageTypePrefix::BlockResponse => { - let block_response = read_next::(fd)?; + let block_response = StacksMessageCodec::consensus_deserialize(fd)?; SignerMessage::BlockResponse(block_response) } + SignerMessageTypePrefix::BlockPushed => { + let block = StacksMessageCodec::consensus_deserialize(fd)?; + SignerMessage::BlockPushed(block) + } }; Ok(message) } diff --git a/libsigner/src/v1/messages.rs b/libsigner/src/v1/messages.rs index 30bd4a5769..b412d9a66f 100644 --- a/libsigner/src/v1/messages.rs +++ b/libsigner/src/v1/messages.rs @@ -110,8 +110,8 @@ impl MessageSlotIDTrait for MessageSlotID { } impl SignerMessageTrait for SignerMessage { - fn msg_id(&self) -> MessageSlotID { - self.msg_id() + fn msg_id(&self) -> Option { + Some(self.msg_id()) } } diff --git a/stacks-signer/src/client/stackerdb.rs b/stacks-signer/src/client/stackerdb.rs index e5ccb5a89f..e0c6cb1621 100644 --- a/stacks-signer/src/client/stackerdb.rs +++ b/stacks-signer/src/client/stackerdb.rs @@ -94,7 +94,11 @@ impl StackerDB { &mut self, message: T, ) -> Result { - let msg_id = message.msg_id(); + let msg_id = message.msg_id().ok_or_else(|| { + ClientError::PutChunkRejected( + "Tried to send a SignerMessage which does not have a corresponding .signers slot identifier".into() + ) + })?; let message_bytes = message.serialize_to_vec(); self.send_message_bytes_with_retry(&msg_id, message_bytes) } diff --git a/stacks-signer/src/client/stacks_client.rs b/stacks-signer/src/client/stacks_client.rs index d85f6d9b7a..0f330a2e96 100644 --- a/stacks-signer/src/client/stacks_client.rs +++ b/stacks-signer/src/client/stacks_client.rs @@ -35,7 +35,9 @@ use blockstack_lib::net::api::getinfo::RPCPeerInfoData; use blockstack_lib::net::api::getpoxinfo::RPCPoxInfoData; use blockstack_lib::net::api::getsortition::{SortitionInfo, RPC_SORTITION_INFO_PATH}; use blockstack_lib::net::api::getstackers::GetStackersResponse; +use blockstack_lib::net::api::postblock::StacksBlockAcceptedData; use blockstack_lib::net::api::postblock_proposal::NakamotoBlockProposal; +use blockstack_lib::net::api::postblock_v3; use blockstack_lib::net::api::postfeerate::{FeeRateEstimateRequestBody, RPCFeeEstimateResponse}; use blockstack_lib::util_lib::boot::{boot_code_addr, boot_code_id}; use clarity::util::hash::to_hex; @@ -655,6 +657,23 @@ impl StacksClient { Ok(unsigned_tx) } + /// Try to post a completed nakamoto block to our connected stacks-node + /// Returns `true` if the block was accepted or `false` if the block + /// was rejected. + pub fn post_block(&self, block: &NakamotoBlock) -> Result { + let response = self + .stacks_node_client + .post(format!("{}{}", self.http_origin, postblock_v3::PATH)) + .header("Content-Type", "application/octet-stream") + .body(block.serialize_to_vec()) + .send()?; + if !response.status().is_success() { + return Err(ClientError::RequestFailure(response.status())); + } + let post_block_resp = response.json::()?; + Ok(post_block_resp.accepted) + } + /// Helper function to submit a transaction to the Stacks mempool pub fn submit_transaction(&self, tx: &StacksTransaction) -> Result { let txid = tx.txid(); diff --git a/stacks-signer/src/v0/signer.rs b/stacks-signer/src/v0/signer.rs index b1a11546f0..962c1f623b 100644 --- a/stacks-signer/src/v0/signer.rs +++ b/stacks-signer/src/v0/signer.rs @@ -21,9 +21,9 @@ use clarity::types::PrivateKey; use clarity::util::hash::MerkleHashFunc; use libsigner::v0::messages::{BlockResponse, MessageSlotID, RejectCode, SignerMessage}; use libsigner::{BlockProposal, SignerEvent}; -use slog::{slog_debug, slog_error, slog_warn}; +use slog::{slog_debug, slog_error, slog_info, slog_warn}; use stacks_common::types::chainstate::StacksAddress; -use stacks_common::{debug, error, warn}; +use stacks_common::{debug, error, info, warn}; use crate::client::{SignerSlotID, StackerDB, StacksClient}; use crate::config::SignerConfig; @@ -118,8 +118,20 @@ impl SignerTrait for Signer { messages.len(); ); for message in messages { - if let SignerMessage::BlockProposal(block_proposal) = message { - self.handle_block_proposal(stacks_client, block_proposal); + match message { + SignerMessage::BlockProposal(block_proposal) => { + self.handle_block_proposal(stacks_client, block_proposal); + } + SignerMessage::BlockPushed(b) => { + let block_push_result = stacks_client.post_block(&b); + info!( + "{self}: Got block pushed message"; + "block_id" => %b.block_id(), + "signer_sighash" => %b.header.signer_signature_hash(), + "push_result" => ?block_push_result, + ); + } + _ => {} } } } diff --git a/stackslib/src/net/api/mod.rs b/stackslib/src/net/api/mod.rs index 34fa1ec4c3..d256c15b97 100644 --- a/stackslib/src/net/api/mod.rs +++ b/stackslib/src/net/api/mod.rs @@ -66,6 +66,8 @@ pub mod gettransaction_unconfirmed; pub mod liststackerdbreplicas; pub mod postblock; pub mod postblock_proposal; +#[warn(unused_imports)] +pub mod postblock_v3; pub mod postfeerate; pub mod postmempoolquery; pub mod postmicroblock; @@ -129,6 +131,7 @@ impl StacksHttp { self.register_rpc_endpoint(postblock_proposal::RPCBlockProposalRequestHandler::new( self.block_proposal_token.clone(), )); + self.register_rpc_endpoint(postblock_v3::RPCPostBlockRequestHandler::default()); self.register_rpc_endpoint(postfeerate::RPCPostFeeRateRequestHandler::new()); self.register_rpc_endpoint(postmempoolquery::RPCMempoolQueryRequestHandler::new()); self.register_rpc_endpoint(postmicroblock::RPCPostMicroblockRequestHandler::new()); diff --git a/stackslib/src/net/api/postblock_v3.rs b/stackslib/src/net/api/postblock_v3.rs new file mode 100644 index 0000000000..8ce5a65dde --- /dev/null +++ b/stackslib/src/net/api/postblock_v3.rs @@ -0,0 +1,174 @@ +// Copyright (C) 2024 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use regex::{Captures, Regex}; +use stacks_common::codec::{Error as CodecError, StacksMessageCodec, MAX_PAYLOAD_LEN}; +use stacks_common::types::net::PeerHost; + +use super::postblock::StacksBlockAcceptedData; +use crate::chainstate::nakamoto::NakamotoBlock; +use crate::net::http::{ + parse_json, Error, HttpContentType, HttpRequest, HttpRequestContents, HttpRequestPreamble, + HttpResponse, HttpResponseContents, HttpResponsePayload, HttpResponsePreamble, +}; +use crate::net::httpcore::{HttpPreambleExtensions, RPCRequestHandler, StacksHttpRequest}; +use crate::net::relay::Relayer; +use crate::net::{Error as NetError, NakamotoBlocksData, StacksMessageType, StacksNodeState}; + +pub static PATH: &'static str = "/v3/blocks/upload/"; + +#[derive(Clone, Default)] +pub struct RPCPostBlockRequestHandler { + pub block: Option, +} + +impl RPCPostBlockRequestHandler { + /// Decode a bare block from the body + fn parse_postblock_octets(mut body: &[u8]) -> Result { + let block = NakamotoBlock::consensus_deserialize(&mut body).map_err(|e| { + if let CodecError::DeserializeError(msg) = e { + Error::DecodeError(format!("Failed to deserialize posted transaction: {}", msg)) + } else { + e.into() + } + })?; + Ok(block) + } +} + +/// Decode the HTTP request +impl HttpRequest for RPCPostBlockRequestHandler { + fn verb(&self) -> &'static str { + "POST" + } + + fn path_regex(&self) -> Regex { + Regex::new(&format!("^{PATH}$")).unwrap() + } + + fn metrics_identifier(&self) -> &str { + PATH + } + + /// Try to decode this request. + /// There's nothing to load here, so just make sure the request is well-formed. + fn try_parse_request( + &mut self, + preamble: &HttpRequestPreamble, + _captures: &Captures, + query: Option<&str>, + body: &[u8], + ) -> Result { + if preamble.get_content_length() == 0 { + return Err(Error::DecodeError( + "Invalid Http request: expected non-zero-length body for PostBlock".to_string(), + )); + } + + if preamble.get_content_length() > MAX_PAYLOAD_LEN { + return Err(Error::DecodeError( + "Invalid Http request: PostBlock body is too big".to_string(), + )); + } + + if Some(HttpContentType::Bytes) != preamble.content_type || preamble.content_type.is_none() + { + return Err(Error::DecodeError( + "Invalid Http request: PostBlock takes application/octet-stream".to_string(), + )); + } + + let block = Self::parse_postblock_octets(body)?; + + self.block = Some(block); + Ok(HttpRequestContents::new().query_string(query)) + } +} + +impl RPCRequestHandler for RPCPostBlockRequestHandler { + /// Reset internal state + fn restart(&mut self) { + self.block = None; + } + + /// Make the response + fn try_handle_request( + &mut self, + preamble: HttpRequestPreamble, + _contents: HttpRequestContents, + node: &mut StacksNodeState, + ) -> Result<(HttpResponsePreamble, HttpResponseContents), NetError> { + // get out the request body + let block = self + .block + .take() + .ok_or(NetError::SendError("`block` not set".into()))?; + + let accepted = + node.with_node_state(|network, sortdb, chainstate, _mempool, _rpc_args| { + let mut handle_conn = sortdb.index_handle_at_tip(); + Relayer::process_new_nakamoto_block( + &network.burnchain, + &sortdb, + &mut handle_conn, + chainstate, + &block, + None, + ) + })?; + + let data_resp = StacksBlockAcceptedData { + accepted, + stacks_block_id: block.block_id(), + }; + + // should set to relay... + if data_resp.accepted { + node.set_relay_message(StacksMessageType::NakamotoBlocks(NakamotoBlocksData { + blocks: vec![block], + })); + } + + let mut preamble = HttpResponsePreamble::ok_json(&preamble); + preamble.set_canonical_stacks_tip_height(Some(node.canonical_stacks_tip_height())); + let body = HttpResponseContents::try_from_json(&data_resp)?; + Ok((preamble, body)) + } +} + +/// Decode the HTTP response +impl HttpResponse for RPCPostBlockRequestHandler { + fn try_parse_response( + &self, + preamble: &HttpResponsePreamble, + body: &[u8], + ) -> Result { + let accepted: StacksBlockAcceptedData = parse_json(preamble, body)?; + Ok(HttpResponsePayload::try_from_json(accepted)?) + } +} + +impl StacksHttpRequest { + /// Make a new post-block request + pub fn new_post_block_v3(host: PeerHost, block: &NakamotoBlock) -> StacksHttpRequest { + StacksHttpRequest::new_for_peer( + host, + "POST".into(), + PATH.into(), + HttpRequestContents::new().payload_stacks(block), + ) + .expect("FATAL: failed to construct request from infallible data") + } +} diff --git a/stackslib/src/net/mod.rs b/stackslib/src/net/mod.rs index e8e52fd137..6875b336af 100644 --- a/stackslib/src/net/mod.rs +++ b/stackslib/src/net/mod.rs @@ -1468,6 +1468,8 @@ pub struct NetworkResult { pub uploaded_transactions: Vec, /// blocks sent to us via the http server pub uploaded_blocks: Vec, + /// blocks sent to us via the http server + pub uploaded_nakamoto_blocks: Vec, /// microblocks sent to us by the http server pub uploaded_microblocks: Vec, /// chunks we received from the HTTP server @@ -1515,6 +1517,7 @@ impl NetworkResult { pushed_microblocks: HashMap::new(), pushed_nakamoto_blocks: HashMap::new(), uploaded_transactions: vec![], + uploaded_nakamoto_blocks: vec![], uploaded_blocks: vec![], uploaded_microblocks: vec![], uploaded_stackerdb_chunks: vec![], @@ -1638,8 +1641,8 @@ impl NetworkResult { } } - pub fn consume_http_uploads(&mut self, mut msgs: Vec) -> () { - for msg in msgs.drain(..) { + pub fn consume_http_uploads(&mut self, msgs: Vec) -> () { + for msg in msgs.into_iter() { match msg { StacksMessageType::Transaction(tx_data) => { self.uploaded_transactions.push(tx_data); @@ -1653,6 +1656,9 @@ impl NetworkResult { StacksMessageType::StackerDBPushChunk(chunk_data) => { self.uploaded_stackerdb_chunks.push(chunk_data); } + StacksMessageType::NakamotoBlocks(data) => { + self.uploaded_nakamoto_blocks.extend(data.blocks); + } _ => { // drop warn!("Dropping unknown HTTP message"); diff --git a/stackslib/src/net/relay.rs b/stackslib/src/net/relay.rs index 11fa5f6364..3004f4d7c9 100644 --- a/stackslib/src/net/relay.rs +++ b/stackslib/src/net/relay.rs @@ -1949,22 +1949,53 @@ impl Relayer { }; // process pushed Nakamoto blocks - let (mut pushed_blocks_and_relayers, bad_neighbors) = - match Self::process_pushed_nakamoto_blocks( - network_result, - burnchain, - sortdb, - chainstate, - coord_comms, - ) { - Ok(x) => x, - Err(e) => { - warn!("Failed to process pushed Nakamoto blocks: {:?}", &e); - (vec![], vec![]) - } - }; + let (pushed_blocks_and_relayers, bad_neighbors) = match Self::process_pushed_nakamoto_blocks( + network_result, + burnchain, + sortdb, + chainstate, + coord_comms, + ) { + Ok(x) => x, + Err(e) => { + warn!("Failed to process pushed Nakamoto blocks: {:?}", &e); + (vec![], vec![]) + } + }; - accepted_nakamoto_blocks_and_relayers.append(&mut pushed_blocks_and_relayers); + let mut http_uploaded_blocks = vec![]; + for block in network_result.uploaded_nakamoto_blocks.drain(..) { + let block_id = block.block_id(); + let have_block = chainstate + .nakamoto_blocks_db() + .has_nakamoto_block(&block_id) + .unwrap_or_else(|e| { + warn!( + "Failed to determine if we have Nakamoto block"; + "stacks_block_id" => %block_id, + "err" => ?e + ); + false + }); + if have_block { + debug!( + "Received http-uploaded nakamoto block"; + "stacks_block_id" => %block_id, + ); + http_uploaded_blocks.push(block); + } + } + if !http_uploaded_blocks.is_empty() { + coord_comms.inspect(|comm| { + comm.announce_new_stacks_block(); + }); + } + + accepted_nakamoto_blocks_and_relayers.extend(pushed_blocks_and_relayers); + accepted_nakamoto_blocks_and_relayers.push(AcceptedNakamotoBlocks { + relayers: vec![], + blocks: http_uploaded_blocks, + }); Ok((accepted_nakamoto_blocks_and_relayers, bad_neighbors)) } diff --git a/stackslib/src/net/stackerdb/mod.rs b/stackslib/src/net/stackerdb/mod.rs index 53aa2f3c22..754df3fba1 100644 --- a/stackslib/src/net/stackerdb/mod.rs +++ b/stackslib/src/net/stackerdb/mod.rs @@ -151,7 +151,7 @@ pub const STACKERDB_MAX_PAGE_COUNT: u32 = 2; pub const STACKERDB_SLOTS_FUNCTION: &str = "stackerdb-get-signer-slots"; pub const STACKERDB_CONFIG_FUNCTION: &str = "stackerdb-get-config"; -pub const MINER_SLOT_COUNT: u32 = 1; +pub const MINER_SLOT_COUNT: u32 = 2; /// Final result of synchronizing state with a remote set of DB replicas #[derive(Clone)] diff --git a/testnet/stacks-node/src/nakamoto_node.rs b/testnet/stacks-node/src/nakamoto_node.rs index a556a41093..22ba5f2d7e 100644 --- a/testnet/stacks-node/src/nakamoto_node.rs +++ b/testnet/stacks-node/src/nakamoto_node.rs @@ -95,6 +95,8 @@ pub enum Error { BadVrfConstruction, CannotSelfSign, MiningFailure(ChainstateError), + /// The miner didn't accept their own block + AcceptFailure(ChainstateError), MinerSignatureError(&'static str), SignerSignatureError(String), /// A failure occurred while configuring the miner thread diff --git a/testnet/stacks-node/src/nakamoto_node/miner.rs b/testnet/stacks-node/src/nakamoto_node/miner.rs index 421702bcfb..2bc93e5cf3 100644 --- a/testnet/stacks-node/src/nakamoto_node/miner.rs +++ b/testnet/stacks-node/src/nakamoto_node/miner.rs @@ -18,10 +18,13 @@ use std::thread; use std::thread::JoinHandle; use std::time::{Duration, Instant}; +use clarity::boot_util::boot_code_id; use clarity::vm::clarity::ClarityConnection; use clarity::vm::types::{PrincipalData, QualifiedContractIdentifier}; use hashbrown::HashSet; -use libsigner::v1::messages::{MessageSlotID, SignerMessage}; +use libsigner::v0::messages::{MinerSlotID, SignerMessage as SignerMessageV0}; +use libsigner::v1::messages::{MessageSlotID, SignerMessage as SignerMessageV1}; +use libsigner::StackerDBSession; use stacks::burnchains::Burnchain; use stacks::chainstate::burn::db::sortdb::SortitionDB; use stacks::chainstate::burn::{BlockSnapshot, ConsensusHash}; @@ -30,7 +33,7 @@ use stacks::chainstate::nakamoto::coordinator::load_nakamoto_reward_set; use stacks::chainstate::nakamoto::miner::{NakamotoBlockBuilder, NakamotoTenureInfo}; use stacks::chainstate::nakamoto::signer_set::NakamotoSigners; use stacks::chainstate::nakamoto::{NakamotoBlock, NakamotoChainState}; -use stacks::chainstate::stacks::boot::RewardSet; +use stacks::chainstate::stacks::boot::{RewardSet, MINERS_NAME}; use stacks::chainstate::stacks::db::{StacksChainState, StacksHeaderInfo}; use stacks::chainstate::stacks::{ CoinbasePayload, Error as ChainstateError, StacksTransaction, StacksTransactionSigner, @@ -56,9 +59,9 @@ use crate::run_loop::RegisteredKey; use crate::{neon_node, ChainTip}; #[cfg(test)] -lazy_static::lazy_static! { - pub static ref TEST_BROADCAST_STALL: std::sync::Mutex> = std::sync::Mutex::new(None); -} +pub static TEST_BROADCAST_STALL: std::sync::Mutex> = std::sync::Mutex::new(None); +#[cfg(test)] +pub static TEST_SKIP_P2P_BROADCAST: std::sync::Mutex> = std::sync::Mutex::new(None); /// If the miner was interrupted while mining a block, how long should the /// miner thread sleep before trying again? @@ -258,7 +261,7 @@ impl BlockMinerThread { }; new_block.header.signer_signature = signer_signature; - if let Err(e) = self.broadcast(new_block.clone(), reward_set) { + if let Err(e) = self.broadcast(new_block.clone(), reward_set, &stackerdbs) { warn!("Error accepting own block: {e:?}. Will try mining again."); continue; } else { @@ -562,12 +565,12 @@ impl BlockMinerThread { let signer_chunks = stackerdbs .get_latest_chunks(&signers_contract_id, &slot_ids) .expect("FATAL: could not get latest chunks from stacker DB"); - let signer_messages: Vec<(u32, SignerMessage)> = slot_ids + let signer_messages: Vec<(u32, SignerMessageV1)> = slot_ids .iter() .zip(signer_chunks.into_iter()) .filter_map(|(slot_id, chunk)| { chunk.and_then(|chunk| { - read_next::(&mut &chunk[..]) + read_next::(&mut &chunk[..]) .ok() .map(|msg| (*slot_id, msg)) }) @@ -609,7 +612,7 @@ impl BlockMinerThread { let mut filtered_transactions: HashMap = HashMap::new(); for (_slot, signer_message) in signer_messages { match signer_message { - SignerMessage::Transactions(transactions) => { + SignerMessageV1::Transactions(transactions) => { NakamotoSigners::update_filtered_transactions( &mut filtered_transactions, &account_nonces, @@ -623,11 +626,40 @@ impl BlockMinerThread { Ok(filtered_transactions.into_values().collect()) } + fn broadcast_p2p( + sort_db: &SortitionDB, + chain_state: &mut StacksChainState, + block: &NakamotoBlock, + reward_set: RewardSet, + ) -> Result<(), ChainstateError> { + #[cfg(test)] + { + if *TEST_SKIP_P2P_BROADCAST.lock().unwrap() == Some(true) { + return Ok(()); + } + } + + let mut sortition_handle = sort_db.index_handle_at_ch(&block.header.consensus_hash)?; + let chainstate_config = chain_state.config(); + let (headers_conn, staging_tx) = chain_state.headers_conn_and_staging_tx_begin()?; + NakamotoChainState::accept_block( + &chainstate_config, + &block, + &mut sortition_handle, + &staging_tx, + headers_conn, + reward_set, + )?; + staging_tx.commit()?; + Ok(()) + } + fn broadcast( &self, block: NakamotoBlock, reward_set: RewardSet, - ) -> Result<(), ChainstateError> { + stackerdbs: &StackerDBs, + ) -> Result<(), NakamotoNodeError> { #[cfg(test)] { if *TEST_BROADCAST_STALL.lock().unwrap() == Some(true) { @@ -647,7 +679,6 @@ impl BlockMinerThread { } let mut chain_state = neon_node::open_chainstate_with_faults(&self.config) .expect("FATAL: could not open chainstate DB"); - let chainstate_config = chain_state.config(); let sort_db = SortitionDB::open( &self.config.get_burn_db_file_path(), true, @@ -655,18 +686,32 @@ impl BlockMinerThread { ) .expect("FATAL: could not open sortition DB"); - let mut sortition_handle = sort_db.index_handle_at_ch(&block.header.consensus_hash)?; - let (headers_conn, staging_tx) = chain_state.headers_conn_and_staging_tx_begin()?; - NakamotoChainState::accept_block( - &chainstate_config, - &block, - &mut sortition_handle, - &staging_tx, - headers_conn, - reward_set, - )?; - staging_tx.commit()?; - Ok(()) + Self::broadcast_p2p(&sort_db, &mut chain_state, &block, reward_set) + .map_err(NakamotoNodeError::AcceptFailure)?; + + let Some(ref miner_privkey) = self.config.miner.mining_key else { + return Err(NakamotoNodeError::MinerConfigurationFailed( + "No mining key configured, cannot mine", + )); + }; + + let rpc_socket = self.config.node.get_rpc_loopback().ok_or_else(|| { + NakamotoNodeError::MinerConfigurationFailed("Failed to get RPC loopback socket") + })?; + let miners_contract_id = boot_code_id(MINERS_NAME, chain_state.mainnet); + let mut miners_session = StackerDBSession::new(&rpc_socket.to_string(), miners_contract_id); + + SignCoordinator::send_miners_message( + miner_privkey, + &sort_db, + &self.burn_block, + &stackerdbs, + SignerMessageV0::BlockPushed(block), + MinerSlotID::BlockPushed, + chain_state.mainnet, + &mut miners_session, + ) + .map_err(NakamotoNodeError::SigningCoordinatorFailure) } /// Get the coinbase recipient address, if set in the config and if allowed in this epoch diff --git a/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs b/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs index 7fcb5bb008..63f46a9557 100644 --- a/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs +++ b/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs @@ -18,7 +18,7 @@ use std::sync::mpsc::Receiver; use std::time::{Duration, Instant}; use hashbrown::{HashMap, HashSet}; -use libsigner::v0::messages::{BlockResponse, SignerMessage as SignerMessageV0}; +use libsigner::v0::messages::{BlockResponse, MinerSlotID, SignerMessage as SignerMessageV0}; use libsigner::v1::messages::{MessageSlotID, SignerMessage as SignerMessageV1}; use libsigner::{BlockProposal, SignerEntries, SignerEvent, SignerSession, StackerDBSession}; use stacks::burnchains::Burnchain; @@ -337,19 +337,44 @@ impl SignCoordinator { tip: &BlockSnapshot, stackerdbs: &StackerDBs, message: M, + miner_slot_id: MinerSlotID, is_mainnet: bool, miners_session: &mut StackerDBSession, ) -> Result<(), String> { let mut miner_sk = StacksPrivateKey::from_slice(&message_key.to_bytes()).unwrap(); miner_sk.set_compress_public(true); + Self::send_miners_message( + &miner_sk, + sortdb, + tip, + stackerdbs, + message, + miner_slot_id, + is_mainnet, + miners_session, + ) + } + + pub fn send_miners_message( + miner_sk: &StacksPrivateKey, + sortdb: &SortitionDB, + tip: &BlockSnapshot, + stackerdbs: &StackerDBs, + message: M, + miner_slot_id: MinerSlotID, + is_mainnet: bool, + miners_session: &mut StackerDBSession, + ) -> Result<(), String> { let miner_pubkey = StacksPublicKey::from_private(&miner_sk); let Some(slot_range) = NakamotoChainState::get_miner_slot(sortdb, tip, &miner_pubkey) .map_err(|e| format!("Failed to read miner slot information: {e:?}"))? else { return Err("No slot for miner".into()); }; - // We only have one slot per miner - let slot_id = slot_range.start; + + let slot_id = slot_range + .start + .saturating_add(miner_slot_id.to_u8().into()); if !slot_range.contains(&slot_id) { return Err("Not enough slots for miner messages".into()); } @@ -420,6 +445,7 @@ impl SignCoordinator { burn_tip, &stackerdbs, nonce_req_msg.into(), + MinerSlotID::BlockProposal, self.is_mainnet, &mut self.miners_session, ) @@ -571,6 +597,9 @@ impl SignCoordinator { burn_tip, stackerdbs, msg.into(), + // TODO: note, in v1, we'll want to add a new slot, but for now, it just shares + // with the block proposal + MinerSlotID::BlockProposal, self.is_mainnet, &mut self.miners_session, ) { @@ -630,6 +659,7 @@ impl SignCoordinator { burn_tip, &stackerdbs, block_proposal_message, + MinerSlotID::BlockProposal, self.is_mainnet, &mut self.miners_session, ) @@ -709,70 +739,73 @@ impl SignCoordinator { ); for (message, slot_id) in messages.into_iter().zip(slot_ids) { - match message { + let (response_hash, signature) = match message { SignerMessageV0::BlockResponse(BlockResponse::Accepted(( response_hash, signature, - ))) => { - let block_sighash = block.header.signer_signature_hash(); - if block_sighash != response_hash { - warn!( - "Processed signature but didn't validate over the expected block. Returning error."; - "signature" => %signature, - "block_signer_signature_hash" => %block_sighash, - "slot_id" => slot_id, - ); - continue; - } - debug!("SignCoordinator: Received valid signature from signer"; "slot_id" => slot_id, "signature" => %signature); - let Some(signer_entry) = &self.signer_entries.get(&slot_id) else { - return Err(NakamotoNodeError::SignerSignatureError( - "Signer entry not found".into(), - )); - }; - let Ok(signer_pubkey) = - StacksPublicKey::from_slice(&signer_entry.signing_key) - else { - return Err(NakamotoNodeError::SignerSignatureError( - "Failed to parse signer public key".into(), - )); - }; - let Ok(valid_sig) = signer_pubkey.verify(block_sighash.bits(), &signature) - else { - warn!("Got invalid signature from a signer. Ignoring."); - continue; - }; - if !valid_sig { - warn!( - "Processed signature but didn't validate over the expected block. Ignoring"; - "signature" => %signature, - "block_signer_signature_hash" => %block_sighash, - "slot_id" => slot_id, - ); - continue; - } - if !gathered_signatures.contains_key(&slot_id) { - total_weight_signed = total_weight_signed - .checked_add(signer_entry.weight) - .expect("FATAL: total weight signed exceeds u32::MAX"); - } - debug!("Signature Added to block"; - "block_signer_sighash" => %block_sighash, - "signer_pubkey" => signer_pubkey.to_hex(), - "signer_slot_id" => slot_id, - "signature" => %signature, - "signer_weight" => signer_entry.weight, - "total_weight_signed" => total_weight_signed, - ); - gathered_signatures.insert(slot_id, signature); - } + ))) => (response_hash, signature), SignerMessageV0::BlockResponse(BlockResponse::Rejected(_)) => { debug!("Received rejected block response. Ignoring."); + continue; } SignerMessageV0::BlockProposal(_) => { debug!("Received block proposal message. Ignoring."); + continue; + } + SignerMessageV0::BlockPushed(_) => { + debug!("Received block pushed message. Ignoring."); + continue; } + }; + let block_sighash = block.header.signer_signature_hash(); + if block_sighash != response_hash { + warn!( + "Processed signature but didn't validate over the expected block. Returning error."; + "signature" => %signature, + "block_signer_signature_hash" => %block_sighash, + "slot_id" => slot_id, + ); + continue; + } + debug!("SignCoordinator: Received valid signature from signer"; "slot_id" => slot_id, "signature" => %signature); + let Some(signer_entry) = &self.signer_entries.get(&slot_id) else { + return Err(NakamotoNodeError::SignerSignatureError( + "Signer entry not found".into(), + )); + }; + let Ok(signer_pubkey) = StacksPublicKey::from_slice(&signer_entry.signing_key) + else { + return Err(NakamotoNodeError::SignerSignatureError( + "Failed to parse signer public key".into(), + )); + }; + let Ok(valid_sig) = signer_pubkey.verify(block_sighash.bits(), &signature) else { + warn!("Got invalid signature from a signer. Ignoring."); + continue; + }; + if !valid_sig { + warn!( + "Processed signature but didn't validate over the expected block. Ignoring"; + "signature" => %signature, + "block_signer_signature_hash" => %block_sighash, + "slot_id" => slot_id, + ); + continue; + } + if !gathered_signatures.contains_key(&slot_id) { + total_weight_signed = total_weight_signed + .checked_add(signer_entry.weight) + .expect("FATAL: total weight signed exceeds u32::MAX"); } + debug!("Signature Added to block"; + "block_signer_sighash" => %block_sighash, + "signer_pubkey" => signer_pubkey.to_hex(), + "signer_slot_id" => slot_id, + "signature" => %signature, + "signer_weight" => signer_entry.weight, + "total_weight_signed" => total_weight_signed, + ); + gathered_signatures.insert(slot_id, signature); } // After gathering all signatures, return them if we've hit the threshold diff --git a/testnet/stacks-node/src/tests/signer/v0.rs b/testnet/stacks-node/src/tests/signer/v0.rs index 29c9c001dd..61eafd41df 100644 --- a/testnet/stacks-node/src/tests/signer/v0.rs +++ b/testnet/stacks-node/src/tests/signer/v0.rs @@ -38,7 +38,7 @@ use tracing_subscriber::{fmt, EnvFilter}; use super::SignerTest; use crate::tests::nakamoto_integrations::{boot_to_epoch_3_reward_set, next_block_and}; use crate::tests::neon_integrations::next_block_and_wait; -use crate::BurnchainController; +use crate::{nakamoto_node, BurnchainController}; impl SignerTest { /// Run the test until the epoch 3 boundary @@ -241,6 +241,12 @@ fn miner_gather_signatures() { .with(EnvFilter::from_default_env()) .init(); + // Disable p2p broadcast of the nakamoto blocks, so that we rely + // on the signer's using StackerDB to get pushed blocks + *nakamoto_node::miner::TEST_SKIP_P2P_BROADCAST + .lock() + .unwrap() = Some(true); + info!("------------------------- Test Setup -------------------------"); let num_signers = 5; let mut signer_test: SignerTest = SignerTest::new(num_signers);