Skip to content

Commit

Permalink
initialize tables on the fly for nonexisting peers
Browse files Browse the repository at this point in the history
  • Loading branch information
yu-re-ka committed Dec 27, 2023
1 parent 5f7cea0 commit f05b160
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions src/bmp_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
use zettabgp::bmp::prelude::{
BmpMessagePeerDown, BmpMessagePeerHeader, BmpMessagePeerUp, BmpMessageRouteMonitoring,
BmpMessagePeerDown, BmpMessagePeerHeader, BmpMessageRouteMonitoring,
BmpMessageTermination,
};
use zettabgp::bmp::BmpMessage;
Expand Down Expand Up @@ -59,15 +59,15 @@ async fn process_route_monitoring(

pub fn run_peer(
client_addr: SocketAddr,
n: BmpMessagePeerUp,
peer: BmpMessagePeerHeader,
store: &impl Store,
) -> mpsc::Sender<Result<BmpMessageRouteMonitoring, BmpMessagePeerDown>> {
let (tx, mut rx) = mpsc::channel(16);
let store = store.clone();

tokio::task::spawn(async move {
trace!("{} {:?}", client_addr, n);
if let Some(session_id) = table_selector_for_peer(client_addr, &n.peer)
trace!("{} {:?}", client_addr, peer);
if let Some(session_id) = table_selector_for_peer(client_addr, &peer)
.and_then(|store| store.session_id().cloned())
{
store.session_up(session_id, Session {}).await;
Expand All @@ -83,12 +83,12 @@ pub fn run_peer(
break;
}
None => {
trace!("{} {:?} stream ended", client_addr, n.peer);
trace!("{} {:?} stream ended", client_addr, peer);
break;
}
}
}
if let Some(session_id) = table_selector_for_peer(client_addr, &n.peer)
if let Some(session_id) = table_selector_for_peer(client_addr, &peer)
.and_then(|store| store.session_id().cloned())
{
store.session_down(session_id, None).await;
Expand Down Expand Up @@ -166,12 +166,15 @@ pub async fn run_client(
.ok_or(anyhow::anyhow!("unexpected end of stream"))?;

match msg {
BmpMessage::RouteMonitoring(rm) => match channels.get(&rm.peer.peeraddress) {
Some(channel) => channel.send(Ok(rm)).await.unwrap(),
None => warn!("message for nonexisting peer: {:?}", &rm),
BmpMessage::RouteMonitoring(rm) => {
let channel = channels.entry(rm.peer.peeraddress).or_insert_with(|| {
warn!("the bmp device {} sent a message for a nonexisting peer, we'll initialize the table now: {:?}", &client_addr, &rm);
run_peer(client_addr, rm.peer.clone(), store)
});
channel.send(Ok(rm)).await.unwrap();
},
BmpMessage::PeerUpNotification(n) => {
channels.insert(n.peer.peeraddress, run_peer(client_addr, n, store));
channels.insert(n.peer.peeraddress, run_peer(client_addr, n.peer, store));
}
BmpMessage::PeerDownNotification(n) => match channels.remove(&n.peer.peeraddress) {
Some(channel) => channel.send(Err(n)).await.unwrap(),
Expand Down

0 comments on commit f05b160

Please sign in to comment.