From f05b160f40d3488a9a3b5d3b04320da979a15489 Mon Sep 17 00:00:00 2001 From: Yureka Date: Wed, 27 Dec 2023 12:05:57 +0100 Subject: [PATCH] initialize tables on the fly for nonexisting peers --- src/bmp_collector.rs | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/bmp_collector.rs b/src/bmp_collector.rs index 2f2c86c..b200a9d 100644 --- a/src/bmp_collector.rs +++ b/src/bmp_collector.rs @@ -11,7 +11,7 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc; use tokio_util::codec::length_delimited::LengthDelimitedCodec; use zettabgp::bmp::prelude::{ - BmpMessagePeerDown, BmpMessagePeerHeader, BmpMessagePeerUp, BmpMessageRouteMonitoring, + BmpMessagePeerDown, BmpMessagePeerHeader, BmpMessageRouteMonitoring, BmpMessageTermination, }; use zettabgp::bmp::BmpMessage; @@ -59,15 +59,15 @@ async fn process_route_monitoring( pub fn run_peer( client_addr: SocketAddr, - n: BmpMessagePeerUp, + peer: BmpMessagePeerHeader, store: &impl Store, ) -> mpsc::Sender> { let (tx, mut rx) = mpsc::channel(16); let store = store.clone(); tokio::task::spawn(async move { - trace!("{} {:?}", client_addr, n); - if let Some(session_id) = table_selector_for_peer(client_addr, &n.peer) + trace!("{} {:?}", client_addr, peer); + if let Some(session_id) = table_selector_for_peer(client_addr, &peer) .and_then(|store| store.session_id().cloned()) { store.session_up(session_id, Session {}).await; @@ -83,12 +83,12 @@ pub fn run_peer( break; } None => { - trace!("{} {:?} stream ended", client_addr, n.peer); + trace!("{} {:?} stream ended", client_addr, peer); break; } } } - if let Some(session_id) = table_selector_for_peer(client_addr, &n.peer) + if let Some(session_id) = table_selector_for_peer(client_addr, &peer) .and_then(|store| store.session_id().cloned()) { store.session_down(session_id, None).await; @@ -166,12 +166,15 @@ pub async fn run_client( .ok_or(anyhow::anyhow!("unexpected end of stream"))?; match msg { - BmpMessage::RouteMonitoring(rm) => match channels.get(&rm.peer.peeraddress) { - Some(channel) => channel.send(Ok(rm)).await.unwrap(), - None => warn!("message for nonexisting peer: {:?}", &rm), + BmpMessage::RouteMonitoring(rm) => { + let channel = channels.entry(rm.peer.peeraddress).or_insert_with(|| { + warn!("the bmp device {} sent a message for a nonexisting peer, we'll initialize the table now: {:?}", &client_addr, &rm); + run_peer(client_addr, rm.peer.clone(), store) + }); + channel.send(Ok(rm)).await.unwrap(); }, BmpMessage::PeerUpNotification(n) => { - channels.insert(n.peer.peeraddress, run_peer(client_addr, n, store)); + channels.insert(n.peer.peeraddress, run_peer(client_addr, n.peer, store)); } BmpMessage::PeerDownNotification(n) => match channels.remove(&n.peer.peeraddress) { Some(channel) => channel.send(Err(n)).await.unwrap(),