
Commit ef6c5a4

Merge pull request #3870 from stacks-network/feat/stackerdb-event-observer

Feat/stackerdb event observer

jcnelson committed Sep 1, 2023
2 parents a5cd26e + 9480c71
Showing 11 changed files with 449 additions and 34 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/bitcoin-tests.yml
@@ -138,6 +138,8 @@ jobs:
- tests::neon_integrations::bad_microblock_pubkey
- tests::epoch_24::fix_to_pox_contract
- tests::epoch_24::verify_auto_unlock_behavior
- tests::stackerdb::test_stackerdb_load_store
- tests::stackerdb::test_stackerdb_event_observer
steps:
- name: Checkout the latest code
id: git_checkout
27 changes: 27 additions & 0 deletions docs/event-dispatcher.md
@@ -458,3 +458,30 @@ Example:
]
}
```

### `POST /stackerdb_chunks`

This payload includes data related to a single mutation to a StackerDB replica
that this node subscribes to. An event is sent only after the corresponding
chunk has been successfully stored. The payload includes the slot ID, slot
version, smart contract ID, signature, and data hash; the consumer may fetch
the chunk data itself with a follow-up GET request to the node.

This endpoint broadcasts events to `AnyEvent` observers, as well as to
`StackerDBChunks` observers.

Example:

```json
{
"contract_id": "STVN97YYA10MY5F6KQJHKNYJNM24C4A1AT39WRW.hello-world",
"modified_slots": [
{
"slot_id": 0,
"slot_version": 20,
"data_hash": "9d26b8009ab4693d3792791a4ea5e338822b5631af94e567a485b7265ba0f107",
"signature": "015814daf929d8700af344987681f44e913890a12e38550abe8e40f149ef5269f40f4008083a0f2e0ddf65dcd05ecfc151c7ff8a5308ad04c77c0e87b5aeadad31"
}
]
}
```
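
For reference, a consumer written in Rust could deserialize this payload with `serde`. The sketch below simply mirrors the example JSON above; the type choices (`u32` slot fields, hex strings left unparsed) and the use of `serde_json` are illustrative assumptions, not part of the node's API:

```rust
use serde::Deserialize;

/// One entry of `modified_slots` from the example above.
#[derive(Debug, Deserialize)]
struct ModifiedSlot {
    slot_id: u32,
    slot_version: u32,
    data_hash: String, // hex-encoded hash of the stored chunk data
    signature: String, // hex-encoded signature over the slot metadata
}

/// The top-level payload POSTed to `/stackerdb_chunks`.
#[derive(Debug, Deserialize)]
struct StackerDBChunksEvent {
    contract_id: String,
    modified_slots: Vec<ModifiedSlot>,
}

fn parse_event(body: &str) -> Result<StackerDBChunksEvent, serde_json::Error> {
    serde_json::from_str(body)
}
```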
14 changes: 14 additions & 0 deletions stackslib/src/net/mod.rs
@@ -2186,6 +2186,7 @@ pub struct NetworkResult {
pub uploaded_transactions: Vec<StacksTransaction>, // transactions sent to us by the http server
pub uploaded_blocks: Vec<BlocksData>, // blocks sent to us via the http server
pub uploaded_microblocks: Vec<MicroblocksData>, // microblocks sent to us by the http server
pub uploaded_stackerdb_chunks: Vec<StackerDBPushChunkData>, // chunks we received from the HTTP server
pub attachments: Vec<(AttachmentInstance, Attachment)>,
pub synced_transactions: Vec<StacksTransaction>, // transactions we downloaded via a mempool sync
pub stacker_db_sync_results: Vec<StackerDBSyncResult>, // chunks for stacker DBs we downloaded
@@ -2217,6 +2218,7 @@ impl NetworkResult {
uploaded_transactions: vec![],
uploaded_blocks: vec![],
uploaded_microblocks: vec![],
uploaded_stackerdb_chunks: vec![],
attachments: vec![],
synced_transactions: vec![],
stacker_db_sync_results: vec![],
@@ -2249,6 +2251,14 @@
self.attachments.len() > 0
}

pub fn has_stackerdb_chunks(&self) -> bool {
self.stacker_db_sync_results
.iter()
.fold(0, |acc, x| acc + x.chunks_to_store.len())
> 0
|| self.uploaded_stackerdb_chunks.len() > 0
}

pub fn transactions(&self) -> Vec<StacksTransaction> {
self.pushed_transactions
.values()
@@ -2263,6 +2273,7 @@
|| self.has_microblocks()
|| self.has_transactions()
|| self.has_attachments()
|| self.has_stackerdb_chunks()
}

pub fn consume_unsolicited(
@@ -2324,6 +2335,9 @@
StacksMessageType::Microblocks(mblock_data) => {
self.uploaded_microblocks.push(mblock_data);
}
StacksMessageType::StackerDBPushChunk(chunk_data) => {
self.uploaded_stackerdb_chunks.push(chunk_data);
}
_ => {
// drop
warn!("Dropping unknown HTTP message");
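
The `has_stackerdb_chunks()` helper above folds over every sync result to count all downloaded chunks before comparing the total against zero. A short-circuiting equivalent, shown here as a sketch rather than what the commit ships, would be:

```rust
pub fn has_stackerdb_chunks(&self) -> bool {
    // Stop at the first sync result that actually carries chunks,
    // or at any chunk that arrived via the HTTP server.
    self.stacker_db_sync_results
        .iter()
        .any(|x| !x.chunks_to_store.is_empty())
        || !self.uploaded_stackerdb_chunks.is_empty()
}
```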
118 changes: 103 additions & 15 deletions stackslib/src/net/relay.rs
@@ -19,6 +19,7 @@ use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::mem;

use rand::prelude::*;
use rand::thread_rng;
@@ -44,7 +45,9 @@ use crate::net::http::*;
use crate::net::p2p::*;
use crate::net::poll::*;
use crate::net::rpc::*;
-use crate::net::stackerdb::{StackerDBConfig, StackerDBSyncResult, StackerDBs};
+use crate::net::stackerdb::{
+StackerDBConfig, StackerDBEventDispatcher, StackerDBSyncResult, StackerDBs,
+};
use crate::net::Error as net_error;
use crate::net::*;
use crate::types::chainstate::StacksBlockId;
@@ -103,6 +106,39 @@ pub struct ProcessedNetReceipts {
pub num_new_unconfirmed_microblocks: u64,
}

/// A trait for implementing both mempool event observer methods and stackerdb methods.
/// This is required for event observers to fully report on newly-relayed data.
pub trait RelayEventDispatcher:
MemPoolEventDispatcher
+ StackerDBEventDispatcher
+ AsMemPoolEventDispatcher
+ AsStackerDBEventDispatcher
{
}
impl<T: MemPoolEventDispatcher + StackerDBEventDispatcher> RelayEventDispatcher for T {}

/// Trait for upcasting to MemPoolEventDispatcher
pub trait AsMemPoolEventDispatcher {
fn as_mempool_event_dispatcher(&self) -> &dyn MemPoolEventDispatcher;
}

/// Trait for upcasting to StackerDBEventDispatcher
pub trait AsStackerDBEventDispatcher {
fn as_stackerdb_event_dispatcher(&self) -> &dyn StackerDBEventDispatcher;
}

impl<T: RelayEventDispatcher> AsMemPoolEventDispatcher for T {
fn as_mempool_event_dispatcher(&self) -> &dyn MemPoolEventDispatcher {
self
}
}

impl<T: RelayEventDispatcher> AsStackerDBEventDispatcher for T {
fn as_stackerdb_event_dispatcher(&self) -> &dyn StackerDBEventDispatcher {
self
}
}

/// Private trait for keeping track of messages that can be relayed, so we can identify the peers
/// who frequently send us duplicates.
pub trait RelayPayload {
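
The `As*` helper traits above exist because Rust does not implicitly upcast one trait object to another: an `Option<&dyn RelayEventDispatcher>` will not coerce to `Option<&dyn MemPoolEventDispatcher>`. The following self-contained sketch illustrates the pattern, with the upcast helper folded into a single trait for brevity; every name in it is illustrative, not from this commit:

```rust
trait SubA { fn on_a(&self); }
trait SubB { fn on_b(&self); }

/// Combined trait, analogous to RelayEventDispatcher, with the
/// upcast helper folded in for brevity.
trait Combined: SubA + SubB {
    fn as_sub_a(&self) -> &dyn SubA;
}

/// Blanket impl, analogous to `impl<T: ...> RelayEventDispatcher for T`.
impl<T: SubA + SubB> Combined for T {
    fn as_sub_a(&self) -> &dyn SubA { self }
}

struct Observer;
impl SubA for Observer { fn on_a(&self) { println!("sub-A event"); } }
impl SubB for Observer { fn on_b(&self) { println!("sub-B event"); } }

fn dispatch_a(observer: Option<&dyn SubA>) {
    if let Some(o) = observer { o.on_a(); }
}

fn main() {
    let obs = Observer;
    let combined: Option<&dyn Combined> = Some(&obs);
    // Explicit upcast, mirroring
    // `event_observer.map(|obs| obs.as_mempool_event_dispatcher())` below.
    dispatch_a(combined.map(|o| o.as_sub_a()));
}
```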
@@ -1701,31 +1737,58 @@
}
}

/// Process HTTP-uploaded stackerdb chunks.
/// They're already stored by the RPC handler, so just forward events for them.
pub fn process_uploaded_stackerdb_chunks(
uploaded_chunks: Vec<StackerDBPushChunkData>,
event_observer: Option<&dyn StackerDBEventDispatcher>,
) {
if let Some(observer) = event_observer {
let mut all_events: HashMap<QualifiedContractIdentifier, Vec<StackerDBChunkData>> =
HashMap::new();
for chunk in uploaded_chunks.into_iter() {
debug!("Got uploaded StackerDB chunk"; "stackerdb_contract_id" => &format!("{}", &chunk.contract_id), "slot_id" => chunk.chunk_data.slot_id, "slot_version" => chunk.chunk_data.slot_version);
if let Some(events) = all_events.get_mut(&chunk.contract_id) {
events.push(chunk.chunk_data);
} else {
all_events.insert(chunk.contract_id.clone(), vec![chunk.chunk_data]);
}
}
for (contract_id, new_chunks) in all_events.into_iter() {
observer.new_stackerdb_chunks(contract_id, new_chunks);
}
}
}
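
The get-or-insert grouping in `process_uploaded_stackerdb_chunks` (and again in `process_stacker_db_chunks` below) is the hand-rolled form of the standard library's entry API. A generic sketch of the same grouping step, with placeholder type parameters standing in for `QualifiedContractIdentifier` and `StackerDBChunkData`:

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Group chunks by their contract, creating each Vec on first use.
fn group_by_contract<C: Hash + Eq, D>(chunks: Vec<(C, D)>) -> HashMap<C, Vec<D>> {
    let mut all_events: HashMap<C, Vec<D>> = HashMap::new();
    for (contract_id, chunk) in chunks {
        // entry().or_default() replaces the get_mut / insert branching.
        all_events.entry(contract_id).or_default().push(chunk);
    }
    all_events
}

fn main() {
    let grouped = group_by_contract(vec![
        ("db-1", "chunk-a"),
        ("db-1", "chunk-b"),
        ("db-2", "chunk-c"),
    ]);
    assert_eq!(grouped["db-1"].len(), 2);
}
```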

/// Process newly-arrived chunks obtained from a peer stackerdb replica.
pub fn process_stacker_db_chunks(
stackerdbs: &mut StackerDBs,
stackerdb_configs: &HashMap<QualifiedContractIdentifier, StackerDBConfig>,
-sync_results: &[StackerDBSyncResult],
+sync_results: Vec<StackerDBSyncResult>,
event_observer: Option<&dyn StackerDBEventDispatcher>,
) -> Result<(), Error> {
// group stacker results by contract, so as to minimize the number of DB transactions.
-let mut sync_results_map: HashMap<&QualifiedContractIdentifier, Vec<&StackerDBSyncResult>> =
+let mut sync_results_map: HashMap<QualifiedContractIdentifier, Vec<StackerDBSyncResult>> =
HashMap::new();
-for sync_result in sync_results {
-let sc = &sync_result.contract_id;
-if let Some(result_list) = sync_results_map.get_mut(sc) {
+for sync_result in sync_results.into_iter() {
+let sc = sync_result.contract_id.clone();
+if let Some(result_list) = sync_results_map.get_mut(&sc) {
result_list.push(sync_result);
} else {
sync_results_map.insert(sc, vec![sync_result]);
}
}

-for (sc, sync_results) in sync_results_map.iter() {
-if let Some(config) = stackerdb_configs.get(sc) {
+let mut all_events: HashMap<QualifiedContractIdentifier, Vec<StackerDBChunkData>> =
+HashMap::new();
+
+for (sc, sync_results) in sync_results_map.into_iter() {
+if let Some(config) = stackerdb_configs.get(&sc) {
let tx = stackerdbs.tx_begin(config.clone())?;
-for sync_result in sync_results {
-for chunk in sync_result.chunks_to_store.iter() {
+for sync_result in sync_results.into_iter() {
+for chunk in sync_result.chunks_to_store.into_iter() {
let md = chunk.get_slot_metadata();
-if let Err(e) = tx.try_replace_chunk(sc, &md, &chunk.data) {
+if let Err(e) = tx.try_replace_chunk(&sc, &md, &chunk.data) {
warn!(
"Failed to store chunk for StackerDB";
"stackerdb_contract_id" => &format!("{}", &sync_result.contract_id),
@@ -1737,6 +1800,12 @@
} else {
debug!("Stored chunk"; "stackerdb_contract_id" => &format!("{}", &sync_result.contract_id), "slot_id" => md.slot_id, "slot_version" => md.slot_version);
}

if let Some(event_list) = all_events.get_mut(&sync_result.contract_id) {
event_list.push(chunk);
} else {
all_events.insert(sync_result.contract_id.clone(), vec![chunk]);
}
}
}
tx.commit()?;
@@ -1745,6 +1814,11 @@
}
}

if let Some(observer) = event_observer.as_ref() {
for (contract_id, new_chunks) in all_events.into_iter() {
observer.new_stackerdb_chunks(contract_id, new_chunks);
}
}
Ok(())
}

@@ -1754,6 +1828,7 @@
stackerdbs: &mut StackerDBs,
stackerdb_configs: &HashMap<QualifiedContractIdentifier, StackerDBConfig>,
unhandled_messages: &mut HashMap<NeighborKey, Vec<StacksMessage>>,
event_observer: Option<&dyn StackerDBEventDispatcher>,
) -> Result<(), Error> {
// synthesize StackerDBSyncResults from each chunk
let mut sync_results = vec![];
Expand All @@ -1769,7 +1844,12 @@ impl Relayer {
});
}

-Relayer::process_stacker_db_chunks(stackerdbs, stackerdb_configs, &sync_results)
+Relayer::process_stacker_db_chunks(
+stackerdbs,
+stackerdb_configs,
+sync_results,
+event_observer,
+)
}

/// Given a network result, consume and store all data.
@@ -1791,7 +1871,7 @@
mempool: &mut MemPoolDB,
ibd: bool,
coord_comms: Option<&CoordinatorChannels>,
-event_observer: Option<&dyn MemPoolEventDispatcher>,
+event_observer: Option<&dyn RelayEventDispatcher>,
) -> Result<ProcessedNetReceipts, net_error> {
let mut num_new_blocks = 0;
let mut num_new_confirmed_microblocks = 0;
@@ -1891,7 +1971,7 @@
sortdb,
chainstate,
mempool,
-event_observer,
+event_observer.map(|obs| obs.as_mempool_event_dispatcher()),
)?;

if new_txs.len() > 0 {
@@ -1920,18 +2000,26 @@
processed_unconfirmed_state = Relayer::refresh_unconfirmed(chainstate, sortdb);
}

// push events for HTTP-uploaded stacker DB chunks
Relayer::process_uploaded_stackerdb_chunks(
mem::replace(&mut network_result.uploaded_stackerdb_chunks, vec![]),
event_observer.map(|obs| obs.as_stackerdb_event_dispatcher()),
);

// store downloaded stacker DB chunks
Relayer::process_stacker_db_chunks(
&mut self.stacker_dbs,
&network_result.stacker_db_configs,
-&network_result.stacker_db_sync_results,
+mem::replace(&mut network_result.stacker_db_sync_results, vec![]),
+event_observer.map(|obs| obs.as_stackerdb_event_dispatcher()),
)?;

// store pushed stacker DB chunks
Relayer::process_pushed_stacker_db_chunks(
&mut self.stacker_dbs,
&network_result.stacker_db_configs,
&mut network_result.unhandled_messages,
event_observer.map(|obs| obs.as_stackerdb_event_dispatcher()),
)?;

let receipts = ProcessedNetReceipts {
25 changes: 18 additions & 7 deletions stackslib/src/net/rpc.rs
@@ -71,6 +71,7 @@ use crate::net::PeerHost;
use crate::net::ProtocolFamily;
use crate::net::RPCFeeEstimate;
use crate::net::RPCFeeEstimateResponse;
use crate::net::StackerDBPushChunkData;
use crate::net::StacksHttp;
use crate::net::StacksHttpMessage;
use crate::net::StacksMessageType;
@@ -2531,6 +2532,8 @@ impl ConversationHttp {
}

/// Handle a post for a new StackerDB chunk.
/// If we accept it, then forward it to the relayer as well
/// so an event can be generated for it.
fn handle_post_stackerdb_chunk<W: Write>(
http: &mut StacksHttp,
fd: &mut W,
@@ -2539,15 +2542,15 @@
stackerdb_contract_id: &QualifiedContractIdentifier,
stackerdb_chunk: &StackerDBChunkData,
canonical_stacks_tip_height: u64,
-) -> Result<(), net_error> {
+) -> Result<Option<StacksMessageType>, net_error> {
let response_metadata =
HttpResponseMetadata::from_http_request_type(req, Some(canonical_stacks_tip_height));

if let Err(_e) = tx.get_stackerdb_id(stackerdb_contract_id) {
// shouldn't be necessary (this is checked against the peer network's configured DBs),
// but you never know.
let resp = HttpResponseType::NotFound(response_metadata, "No such StackerDB".into());
-return resp.send(http, fd).and_then(|_| Ok(()));
+return resp.send(http, fd).and_then(|_| Ok(None));
}
if let Err(_e) = tx.try_replace_chunk(
stackerdb_contract_id,
@@ -2567,7 +2570,7 @@
response_metadata,
format!("Failed to load StackerDB chunk"),
);
-return resp.send(http, fd).and_then(|_| Ok(()));
+return resp.send(http, fd).and_then(|_| Ok(None));
}
};

@@ -2595,7 +2598,7 @@
metadata: slot_metadata_opt,
};
let resp = HttpResponseType::StackerDBChunkAck(response_metadata, ack);
-return resp.send(http, fd).and_then(|_| Ok(()));
+return resp.send(http, fd).and_then(|_| Ok(None));
}

let slot_metadata = if let Some(md) =
@@ -2608,7 +2611,7 @@
response_metadata,
format!("Failed to load slot metadata after storing chunk"),
);
-return resp.send(http, fd).and_then(|_| Ok(()));
+return resp.send(http, fd).and_then(|_| Ok(None));
};

// success!
@@ -2619,7 +2622,15 @@
};

let resp = HttpResponseType::StackerDBChunkAck(response_metadata, ack);
-return resp.send(http, fd).and_then(|_| Ok(()));
+return resp.send(http, fd).and_then(|_| {
+Ok(Some(StacksMessageType::StackerDBPushChunk(
+StackerDBPushChunkData {
+contract_id: stackerdb_contract_id.clone(),
+rc_consensus_hash: ConsensusHash([0u8; 20]), // unused
+chunk_data: stackerdb_chunk.clone(),
+},
+)))
+});
}

/// Handle an external HTTP request.
@@ -3230,7 +3241,7 @@
) => {
let tip_height = network.burnchain_tip.canonical_stacks_tip_height;
if let Ok(mut tx) = network.stackerdbs_tx_begin(stackerdb_contract_id) {
-ConversationHttp::handle_post_stackerdb_chunk(
+ret = ConversationHttp::handle_post_stackerdb_chunk(
&mut self.connection.protocol,
&mut reply,
&req,
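
Taken together, the rpc.rs, mod.rs, and relay.rs changes form a single pipeline: the POST handler stores the chunk and returns it as a `StackerDBPushChunk` message, `NetworkResult::consume_unsolicited` buffers it in `uploaded_stackerdb_chunks`, and `Relayer::process_uploaded_stackerdb_chunks` drains that buffer into the observer's `new_stackerdb_chunks`, which ultimately emits the `/stackerdb_chunks` event. A heavily condensed model of that flow (all types are simplified placeholders, not code from this commit):

```rust
struct Chunk; // stands in for StackerDBPushChunkData

enum Message {
    StackerDBPushChunk(Chunk), // stands in for StacksMessageType
}

#[derive(Default)]
struct NetworkResult {
    uploaded_stackerdb_chunks: Vec<Chunk>,
}

impl NetworkResult {
    // Step 2 (mod.rs): the p2p layer buffers the message returned by the POST handler.
    fn consume_unsolicited(&mut self, msg: Message) {
        match msg {
            Message::StackerDBPushChunk(chunk) => self.uploaded_stackerdb_chunks.push(chunk),
        }
    }
}

trait StackerDBEventDispatcher {
    // Step 3 (relay.rs): ultimately serialized and POSTed to /stackerdb_chunks observers.
    fn new_stackerdb_chunks(&self, chunks: Vec<Chunk>);
}

fn process_uploaded_stackerdb_chunks(
    buffered: Vec<Chunk>,
    observer: Option<&dyn StackerDBEventDispatcher>,
) {
    if let Some(obs) = observer {
        obs.new_stackerdb_chunks(buffered);
    }
}

fn main() {
    struct Printer;
    impl StackerDBEventDispatcher for Printer {
        fn new_stackerdb_chunks(&self, chunks: Vec<Chunk>) {
            println!("observer notified of {} chunk(s)", chunks.len());
        }
    }
    let mut result = NetworkResult::default();
    // Step 1 (rpc.rs): the POST handler returns the stored chunk as a message.
    result.consume_unsolicited(Message::StackerDBPushChunk(Chunk));
    // Step 3: the buffered chunks are drained to the observer.
    process_uploaded_stackerdb_chunks(result.uploaded_stackerdb_chunks, Some(&Printer));
}
```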