
Feat/stackerdb event observer #3870

Merged (19 commits) on Sep 1, 2023
Changes from 14 commits
Commits (19):
d18ef3a
feat: track uploaded stackerdb chunks in the network result
jcnelson Aug 22, 2023
6a87551
feat: unify MemPoolEventDispatcher and StackerDBEventDispatcher behin…
jcnelson Aug 22, 2023
395effe
feat: POST /v2/stackerdb/... will now report the chunk to the HTTP se…
jcnelson Aug 22, 2023
57919d9
feat: StackerDBEventObserver trait
jcnelson Aug 22, 2023
6640032
feat: "stackerdb" event key
jcnelson Aug 22, 2023
d5ea39e
feat: push stackerdb events to observers with the "stackerdb" or "any…
jcnelson Aug 22, 2023
13884dc
feat: add event handler for stackerdb events to test observer
jcnelson Aug 22, 2023
f890c6e
feat: integration test for stackerdb event observer
jcnelson Aug 22, 2023
1276dcb
docs: describe /stackerdb_chunks endpoint for event observer
jcnelson Aug 22, 2023
765dc61
fix: use Bitcoin 0.25 for integration tests
jcnelson Aug 23, 2023
7b7b032
Merge branch 'feat/stackerdb-rpc' into feat/stackerdb-event-observer
jcnelson Aug 23, 2023
739c085
fix: API sync with unit tests
jcnelson Aug 23, 2023
54d66d8
chore: cargo fmt
jcnelson Aug 23, 2023
1f3bd2e
fix: use Bitcoin 25 for all CI tests
jcnelson Aug 23, 2023
9c4e540
fix: the stackerdb event observer now sends chunks over the event obs…
jcnelson Aug 25, 2023
8d9ef90
fix: API sync with unit tests
jcnelson Aug 25, 2023
a45cfdf
chore: run stackerdb tests in CI
jcnelson Aug 29, 2023
bdcb0e8
chore: address PR feedback -- make stackerdb events consumptive, and …
jcnelson Aug 29, 2023
9480c71
Merge branch 'feat/stackerdb-rpc' into feat/stackerdb-event-observer
jcnelson Aug 29, 2023
6 changes: 3 additions & 3 deletions .github/actions/bitcoin-int-tests/Dockerfile.atlas-test
@@ -6,10 +6,10 @@ COPY . .

RUN cargo test --no-run --workspace

RUN cd / && wget https://bitcoin.org/bin/bitcoin-core-0.20.0/bitcoin-0.20.0-x86_64-linux-gnu.tar.gz
RUN cd / && tar -xvzf bitcoin-0.20.0-x86_64-linux-gnu.tar.gz
RUN cd / && wget https://bitcoin.org/bin/bitcoin-core-25.0/bitcoin-25.0-x86_64-linux-gnu.tar.gz
RUN cd / && tar -xvzf bitcoin-25.0-x86_64-linux-gnu.tar.gz

RUN ln -s /bitcoin-0.20.0/bin/bitcoind /bin/
RUN ln -s /bitcoin-25.0/bin/bitcoind /bin/

ENV BITCOIND_TEST 1
WORKDIR /src/testnet/stacks-node
6 changes: 3 additions & 3 deletions .github/actions/bitcoin-int-tests/Dockerfile.large-genesis
@@ -4,10 +4,10 @@ WORKDIR /src

COPY . .

RUN cd / && wget https://bitcoin.org/bin/bitcoin-core-0.20.0/bitcoin-0.20.0-x86_64-linux-gnu.tar.gz
RUN cd / && tar -xvzf bitcoin-0.20.0-x86_64-linux-gnu.tar.gz
RUN cd / && wget https://bitcoin.org/bin/bitcoin-core-25.0/bitcoin-25.0-x86_64-linux-gnu.tar.gz
RUN cd / && tar -xvzf bitcoin-25.0-x86_64-linux-gnu.tar.gz

RUN ln -s /bitcoin-0.20.0/bin/bitcoind /bin/
RUN ln -s /bitcoin-25.0/bin/bitcoind /bin/

RUN rustup component add llvm-tools-preview && \
cargo install grcov
8 changes: 4 additions & 4 deletions .github/actions/bitcoin-int-tests/Dockerfile.net-tests
@@ -4,11 +4,11 @@ WORKDIR /src

COPY . .

RUN cd / && wget https://bitcoin.org/bin/bitcoin-core-0.20.0/bitcoin-0.20.0-x86_64-linux-gnu.tar.gz
RUN cd / && tar -xvzf bitcoin-0.20.0-x86_64-linux-gnu.tar.gz
RUN cd / && wget https://bitcoin.org/bin/bitcoin-core-25.0/bitcoin-25.0-x86_64-linux-gnu.tar.gz
RUN cd / && tar -xvzf bitcoin-25.0-x86_64-linux-gnu.tar.gz

RUN ln -s /bitcoin-0.20.0/bin/bitcoind /bin/
RUN ln -s /bitcoin-0.20.0/bin/bitcoin-cli /bin/
RUN ln -s /bitcoin-25.0/bin/bitcoind /bin/
RUN ln -s /bitcoin-25.0/bin/bitcoin-cli /bin/

RUN apt-get update
RUN apt-get install -y jq screen net-tools ncat sqlite3 xxd openssl curl
27 changes: 27 additions & 0 deletions docs/event-dispatcher.md
@@ -458,3 +458,30 @@ Example:
]
}
```

### `POST /stackerdb_chunks`

This payload describes one or more mutations to a StackerDB replica that this
node subscribes to. An event is delivered only after the corresponding chunk
has been successfully stored. The payload carries the StackerDB's smart
contract ID and, for each modified slot, the slot ID, slot version, data hash,
and signature; the consumer may fetch the chunk data itself with a follow-up
GET request to the node.

This endpoint delivers events to `AnyEvent` observers as well as to
`StackerDBChunks` observers.

Example:

```json
{
"contract_id": "STVN97YYA10MY5F6KQJHKNYJNM24C4A1AT39WRW.hello-world",
"modified_slots": [
{
"slot_id": 0,
"slot_version": 20,
"data_hash": "9d26b8009ab4693d3792791a4ea5e338822b5631af94e567a485b7265ba0f107",
"signature": "015814daf929d8700af344987681f44e913890a12e38550abe8e40f149ef5269f40f4008083a0f2e0ddf65dcd05ecfc151c7ff8a5308ad04c77c0e87b5aeadad31"
}
]
}
```
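As the docs above note, the event carries only slot metadata; the chunk body comes from a separate GET against the node. A minimal sketch of building that follow-up request path — the `/v2/stackerdb/{addr}/{contract}/{slot}/{version}` route shape is an assumption for illustration based on this PR, not a verified API reference:

```rust
// Build the follow-up GET path for fetching a chunk body after receiving a
// `/stackerdb_chunks` event. The route shape is assumed, not verified.
fn stackerdb_chunk_path(contract_id: &str, slot_id: u32, slot_version: u32) -> String {
    // `contract_id` arrives as "ADDRESS.contract-name"; the RPC route is
    // assumed to take the address and contract name as separate segments.
    let (addr, name) = contract_id
        .split_once('.')
        .expect("contract id must look like ADDRESS.contract-name");
    format!("/v2/stackerdb/{addr}/{name}/{slot_id}/{slot_version}")
}

fn main() {
    let path = stackerdb_chunk_path(
        "STVN97YYA10MY5F6KQJHKNYJNM24C4A1AT39WRW.hello-world",
        0,
        20,
    );
    // -> /v2/stackerdb/STVN97YYA10MY5F6KQJHKNYJNM24C4A1AT39WRW/hello-world/0/20
    println!("{path}");
}
```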
14 changes: 14 additions & 0 deletions stackslib/src/net/mod.rs
@@ -2186,6 +2186,7 @@ pub struct NetworkResult {
pub uploaded_transactions: Vec<StacksTransaction>, // transactions sent to us by the http server
pub uploaded_blocks: Vec<BlocksData>, // blocks sent to us via the http server
pub uploaded_microblocks: Vec<MicroblocksData>, // microblocks sent to us by the http server
pub uploaded_stackerdb_chunks: Vec<StackerDBPushChunkData>, // chunks we received from the HTTP server
pub attachments: Vec<(AttachmentInstance, Attachment)>,
pub synced_transactions: Vec<StacksTransaction>, // transactions we downloaded via a mempool sync
pub stacker_db_sync_results: Vec<StackerDBSyncResult>, // chunks for stacker DBs we downloaded
@@ -2217,6 +2218,7 @@ impl NetworkResult {
uploaded_transactions: vec![],
uploaded_blocks: vec![],
uploaded_microblocks: vec![],
uploaded_stackerdb_chunks: vec![],
attachments: vec![],
synced_transactions: vec![],
stacker_db_sync_results: vec![],
@@ -2249,6 +2251,14 @@ impl NetworkResult {
self.attachments.len() > 0
}

pub fn has_stackerdb_chunks(&self) -> bool {
self.stacker_db_sync_results
.iter()
.fold(0, |acc, x| acc + x.chunks_to_store.len())
> 0
|| self.uploaded_stackerdb_chunks.len() > 0
}

pub fn transactions(&self) -> Vec<StacksTransaction> {
self.pushed_transactions
.values()
@@ -2263,6 +2273,7 @@
|| self.has_microblocks()
|| self.has_transactions()
|| self.has_attachments()
|| self.has_stackerdb_chunks()
}

pub fn consume_unsolicited(
@@ -2324,6 +2335,9 @@
StacksMessageType::Microblocks(mblock_data) => {
self.uploaded_microblocks.push(mblock_data);
}
StacksMessageType::StackerDBPushChunk(chunk_data) => {
self.uploaded_stackerdb_chunks.push(chunk_data);
}
_ => {
// drop
warn!("Dropping unknown HTTP message");
98 changes: 94 additions & 4 deletions stackslib/src/net/relay.rs
@@ -44,7 +44,9 @@ use crate::net::http::*;
use crate::net::p2p::*;
use crate::net::poll::*;
use crate::net::rpc::*;
use crate::net::stackerdb::{StackerDBConfig, StackerDBSyncResult, StackerDBs};
use crate::net::stackerdb::{
StackerDBConfig, StackerDBEventDispatcher, StackerDBSyncResult, StackerDBs,
};
use crate::net::Error as net_error;
use crate::net::*;
use crate::types::chainstate::StacksBlockId;
@@ -103,6 +105,39 @@ pub struct ProcessedNetReceipts {
pub num_new_unconfirmed_microblocks: u64,
}

/// A trait for implementing both mempool event observer methods and stackerdb methods.
/// This is required for event observers to fully report on newly-relayed data.
pub trait RelayEventDispatcher:
MemPoolEventDispatcher
+ StackerDBEventDispatcher
+ AsMemPoolEventDispatcher
+ AsStackerDBEventDispatcher
{
}
impl<T: MemPoolEventDispatcher + StackerDBEventDispatcher> RelayEventDispatcher for T {}

/// Trait for upcasting to MemPoolEventDispatcher
pub trait AsMemPoolEventDispatcher {
fn as_mempool_event_dispatcher(&self) -> &dyn MemPoolEventDispatcher;
}

/// Trait for upcasting to StackerDBEventDispatcher
pub trait AsStackerDBEventDispatcher {
fn as_stackerdb_event_dispatcher(&self) -> &dyn StackerDBEventDispatcher;
}

impl<T: RelayEventDispatcher> AsMemPoolEventDispatcher for T {
fn as_mempool_event_dispatcher(&self) -> &dyn MemPoolEventDispatcher {
self
}
}

impl<T: RelayEventDispatcher> AsStackerDBEventDispatcher for T {
fn as_stackerdb_event_dispatcher(&self) -> &dyn StackerDBEventDispatcher {
self
}
}

/// Private trait for keeping track of messages that can be relayed, so we can identify the peers
/// who frequently send us duplicates.
pub trait RelayPayload {
@@ -1701,11 +1736,38 @@ impl Relayer {
}
}

/// Process HTTP-uploaded stackerdb chunks.
/// They're already stored by the RPC handler, so just forward events for them.
pub fn process_uploaded_stackerdb_chunks(
uploaded_chunks: &[StackerDBPushChunkData],
event_observer: Option<&dyn StackerDBEventDispatcher>,
) {
if let Some(observer) = event_observer {
let mut all_events: HashMap<QualifiedContractIdentifier, Vec<SlotMetadata>> =
HashMap::new();
for chunk in uploaded_chunks {
debug!("Got uploaded StackerDB chunk"; "stackerdb_contract_id" => &format!("{}", &chunk.contract_id), "slot_id" => chunk.chunk_data.slot_id, "slot_version" => chunk.chunk_data.slot_version);
if let Some(events) = all_events.get_mut(&chunk.contract_id) {
events.push(chunk.chunk_data.get_slot_metadata());
} else {
all_events.insert(
chunk.contract_id.clone(),
vec![chunk.chunk_data.get_slot_metadata()],
);
}
}
for (contract_id, new_chunks) in all_events.iter() {
observer.new_stackerdb_chunks(contract_id, new_chunks);
}
}
}

/// Process newly-arrived chunks obtained from a peer stackerdb replica.
pub fn process_stacker_db_chunks(
stackerdbs: &mut StackerDBs,
stackerdb_configs: &HashMap<QualifiedContractIdentifier, StackerDBConfig>,
sync_results: &[StackerDBSyncResult],
event_observer: Option<&dyn StackerDBEventDispatcher>,
) -> Result<(), Error> {
// sort stacker results by contract, so as to minimize the number of transactions.
let mut sync_results_map: HashMap<&QualifiedContractIdentifier, Vec<&StackerDBSyncResult>> =
@@ -1719,6 +1781,9 @@
}
}

let mut all_events: HashMap<QualifiedContractIdentifier, Vec<SlotMetadata>> =
HashMap::new();

for (sc, sync_results) in sync_results_map.iter() {
if let Some(config) = stackerdb_configs.get(sc) {
let tx = stackerdbs.tx_begin(config.clone())?;
@@ -1737,6 +1802,12 @@
} else {
debug!("Stored chunk"; "stackerdb_contract_id" => &format!("{}", &sync_result.contract_id), "slot_id" => md.slot_id, "slot_version" => md.slot_version);
}

if let Some(event_list) = all_events.get_mut(&sync_result.contract_id) {
event_list.push(md);
} else {
all_events.insert(sync_result.contract_id.clone(), vec![md]);
}
}
}
tx.commit()?;
@@ -1745,6 +1816,11 @@
}
}

if let Some(observer) = event_observer.as_ref() {
for (contract_id, new_chunks) in all_events.iter() {
observer.new_stackerdb_chunks(contract_id, new_chunks);
}
}
Ok(())
}

@@ -1754,6 +1830,7 @@
stackerdbs: &mut StackerDBs,
stackerdb_configs: &HashMap<QualifiedContractIdentifier, StackerDBConfig>,
unhandled_messages: &mut HashMap<NeighborKey, Vec<StacksMessage>>,
event_observer: Option<&dyn StackerDBEventDispatcher>,
) -> Result<(), Error> {
// synthesize StackerDBSyncResults from each chunk
let mut sync_results = vec![];
@@ -1769,7 +1846,12 @@
});
}

Relayer::process_stacker_db_chunks(stackerdbs, stackerdb_configs, &sync_results)
Relayer::process_stacker_db_chunks(
stackerdbs,
stackerdb_configs,
&sync_results,
event_observer,
)
}

/// Given a network result, consume and store all data.
@@ -1791,7 +1873,7 @@
mempool: &mut MemPoolDB,
ibd: bool,
coord_comms: Option<&CoordinatorChannels>,
event_observer: Option<&dyn MemPoolEventDispatcher>,
event_observer: Option<&dyn RelayEventDispatcher>,
) -> Result<ProcessedNetReceipts, net_error> {
let mut num_new_blocks = 0;
let mut num_new_confirmed_microblocks = 0;
@@ -1891,7 +1973,7 @@
sortdb,
chainstate,
mempool,
event_observer,
event_observer.map(|obs| obs.as_mempool_event_dispatcher()),
)?;

if new_txs.len() > 0 {
@@ -1920,18 +2002,26 @@
processed_unconfirmed_state = Relayer::refresh_unconfirmed(chainstate, sortdb);
}

// push events for HTTP-uploaded stacker DB chunks
Relayer::process_uploaded_stackerdb_chunks(
&network_result.uploaded_stackerdb_chunks,
event_observer.map(|obs| obs.as_stackerdb_event_dispatcher()),
);

// store downloaded stacker DB chunks
Relayer::process_stacker_db_chunks(
&mut self.stacker_dbs,
&network_result.stacker_db_configs,
&network_result.stacker_db_sync_results,
event_observer.map(|obs| obs.as_stackerdb_event_dispatcher()),
)?;

// store pushed stacker DB chunks
Relayer::process_pushed_stacker_db_chunks(
&mut self.stacker_dbs,
&network_result.stacker_db_configs,
&mut network_result.unhandled_messages,
event_observer.map(|obs| obs.as_stackerdb_event_dispatcher()),
)?;

let receipts = ProcessedNetReceipts {
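The `RelayEventDispatcher` plumbing in relay.rs exists because stable Rust (at the time of this PR) did not implicitly upcast a `&dyn` combined trait object to one of its supertrait objects; the `As*` helper traits with blanket impls perform the narrowing by hand. A standalone sketch of the same pattern — all trait and method names here (`Mempool`, `StackerDb`, `Combined`, etc.) are hypothetical stand-ins, not the PR's actual identifiers:

```rust
// Minimal reproduction of the upcasting pattern: a combined trait plus
// `As*` helper traits with blanket impls, so a `&dyn Combined` can be
// narrowed to either base trait object.
trait Mempool {
    fn on_tx(&self) -> &'static str;
}
trait StackerDb {
    fn on_chunk(&self) -> &'static str;
}

// The combined trait requires the upcast helpers as supertraits, so they
// are callable on `dyn Combined`.
trait Combined: Mempool + StackerDb + AsMempool + AsStackerDb {}
impl<T: Mempool + StackerDb> Combined for T {}

trait AsMempool {
    fn as_mempool(&self) -> &dyn Mempool;
}
trait AsStackerDb {
    fn as_stackerdb(&self) -> &dyn StackerDb;
}

// Blanket impls: any type implementing both base traits can hand out
// base-trait views of itself.
impl<T: Mempool + StackerDb> AsMempool for T {
    fn as_mempool(&self) -> &dyn Mempool {
        self
    }
}
impl<T: Mempool + StackerDb> AsStackerDb for T {
    fn as_stackerdb(&self) -> &dyn StackerDb {
        self
    }
}

struct Observer;
impl Mempool for Observer {
    fn on_tx(&self) -> &'static str {
        "tx"
    }
}
impl StackerDb for Observer {
    fn on_chunk(&self) -> &'static str {
        "chunk"
    }
}

fn main() {
    let obs = Observer;
    let combined: &dyn Combined = &obs;
    // Narrow the combined trait object to each base trait object.
    println!("{}", combined.as_mempool().on_tx());
    println!("{}", combined.as_stackerdb().on_chunk());
}
```

This mirrors how `do_network_result` takes one `Option<&dyn RelayEventDispatcher>` and maps it to the mempool or stackerdb view at each call site.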