From 7a75907db6c8a1b39cc344314f8d70fccaaf3e73 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Tue, 20 Dec 2022 17:04:42 +0100 Subject: [PATCH] chore: Migrate to new SEDA-based chainsync stage (#501) --- src/sources/mod.rs | 2 + src/sources/n2n/chainsync.rs | 114 +++++++++++++++++++++++++++++++++++ src/sources/n2n/mod.rs | 3 - 3 files changed, 116 insertions(+), 3 deletions(-) create mode 100644 src/sources/n2n/chainsync.rs diff --git a/src/sources/mod.rs b/src/sources/mod.rs index 08769ec3..479f9b0e 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -7,6 +7,8 @@ use crate::model; pub mod n2c; pub mod n2n; +pub mod plexer; +pub mod prelude; pub mod utils; #[derive(Deserialize)] diff --git a/src/sources/n2n/chainsync.rs b/src/sources/n2n/chainsync.rs new file mode 100644 index 00000000..5199e4a3 --- /dev/null +++ b/src/sources/n2n/chainsync.rs @@ -0,0 +1,114 @@ +use gasket::error::AsWorkError; +use pallas::ledger::traverse::MultiEraHeader; +use pallas::network::miniprotocols::chainsync; +use pallas::network::miniprotocols::chainsync::{HeaderContent, NextResponse}; +use tracing::{debug, info}; + +use crate::prelude::*; + +fn to_traverse<'b>(header: &'b chainsync::HeaderContent) -> Result, Error> { + let out = match header.byron_prefix { + Some((subtag, _)) => MultiEraHeader::decode(header.variant, Some(subtag), &header.cbor), + None => MultiEraHeader::decode(header.variant, None, &header.cbor), + }; + + out.map_err(Error::parse) +} + +pub type DownstreamPort = gasket::messaging::OutputPort; + +pub type OuroborosClient = pallas::network::miniprotocols::chainsync::N2NClient; + +pub struct Worker { + chain_cursor: Cursor, + client: OuroborosClient, + downstream: DownstreamPort, + block_count: gasket::metrics::Counter, + chain_tip: gasket::metrics::Gauge, +} + +impl Worker { + pub fn new(chain_cursor: Cursor, plexer: ProtocolChannel, downstream: DownstreamPort) -> Self { + let client = OuroborosClient::new(plexer); + + Self { + chain_cursor, + client, + downstream, + block_count: Default::default(), + chain_tip: Default::default(), + } + } + + fn process_next( + &mut self, + next: NextResponse, + ) -> Result<(), gasket::error::Error> { + match next { + chainsync::NextResponse::RollForward(h, t) => { + let h = to_traverse(&h).or_panic()?; + self.downstream + .send(ChainSyncEvent::RollForward(h.slot(), h.hash()).into())?; + + debug!(slot = h.slot(), hash = %h.hash(), "chain sync roll forward"); + self.chain_tip.set(t.1 as i64); + Ok(()) + } + chainsync::NextResponse::RollBackward(p, t) => { + self.downstream.send(ChainSyncEvent::Rollback(p).into())?; + self.chain_tip.set(t.1 as i64); + Ok(()) + } + chainsync::NextResponse::Await => { + info!("chain-sync reached the tip of the chain"); + Ok(()) + } + } + } + + fn request_next(&mut self) -> Result<(), gasket::error::Error> { + info!("requesting next block"); + let next = self.client.request_next().or_restart()?; + self.process_next(next) + } + + fn await_next(&mut self) -> Result<(), gasket::error::Error> { + info!("awaiting next block (blocking)"); + let next = self.client.recv_while_must_reply().or_restart()?; + self.process_next(next) + } +} + +impl gasket::runtime::Worker for Worker { + fn metrics(&self) -> gasket::metrics::Registry { + gasket::metrics::Builder::new() + .with_counter("received_blocks", &self.block_count) + .with_gauge("chain_tip", &self.chain_tip) + .build() + } + + fn bootstrap(&mut self) -> Result<(), gasket::error::Error> { + let intersects = self.chain_cursor.intersections().or_panic()?; + + info!("intersecting chain at points: {:?}", intersects); + + let (point, _) = self + .client + .find_intersect(intersects) + .map_err(Error::client) + .or_restart()?; + + info!(?point, "chain-sync intersected"); + + Ok(()) + } + + fn work(&mut self) -> gasket::runtime::WorkResult { + match self.client.has_agency() { + true => self.request_next()?, + false => self.await_next()?, + }; + + Ok(gasket::runtime::WorkOutcome::Partial) + } +} diff --git a/src/sources/n2n/mod.rs b/src/sources/n2n/mod.rs index b488ea9d..048d19f0 100644 --- a/src/sources/n2n/mod.rs +++ b/src/sources/n2n/mod.rs @@ -1,5 +1,2 @@ pub mod blockfetch; pub mod chainsync; -pub mod plexer; -pub mod prelude; -pub mod reducer;