diff --git a/src/fiber/graph.rs b/src/fiber/graph.rs index 21513fa6..8a73f432 100644 --- a/src/fiber/graph.rs +++ b/src/fiber/graph.rs @@ -1,8 +1,8 @@ -use super::history::PaymentHistory; +use super::history::{PairResult, PaymentHistory}; use super::network::{get_chain_hash, SendPaymentData, SendPaymentResponse}; use super::path::NodeHeap; -use super::types::Pubkey; use super::types::{ChannelAnnouncement, ChannelUpdate, Hash256, NodeAnnouncement}; +use super::types::{Pubkey, TlcErr}; use crate::fiber::channel::CHANNEL_DISABLED_FLAG; use crate::fiber::fee::calculate_tlc_forward_fee; use crate::fiber::path::{NodeHeapElement, ProbabilityEvaluator}; @@ -13,7 +13,7 @@ use ckb_jsonrpc_types::JsonBytes; use ckb_types::packed::{OutPoint, Script}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use tentacle::multiaddr::Multiaddr; use tentacle::secio::PeerId; use thiserror::Error; @@ -146,7 +146,6 @@ pub struct NetworkGraph { nodes: HashMap, store: S, chain_hash: Hash256, - #[allow(dead_code)] history: PaymentHistory, } @@ -504,14 +503,77 @@ where } } + pub(crate) fn record_payment_success(&mut self, payment_session: &PaymentSession) { + let session_route = &payment_session.route; + for i in 0..session_route.nodes.len() - 1 { + let node = &session_route.nodes[i]; + let next = &session_route.nodes[i + 1].pubkey; + let result = PairResult { + fail_time: 0, + fail_amount: 0, + success_time: std::time::UNIX_EPOCH.elapsed().unwrap().as_millis(), + success_amount: node.amount, + }; + + self.history.add_result(node.pubkey, *next, result); + } + } + + pub(crate) fn record_payment_fail( + &mut self, + payment_session: &PaymentSession, + tlc_err: TlcErr, + ) { + // first let's find out the node that failed + // TODO: with the real onion packet, we may use the decrypt times to find out the failed node? + let Some(failed_node) = tlc_err.error_node_id() else { + return; + }; + + let Some(index) = payment_session + .route + .nodes + .iter() + .position(|node| node.pubkey == failed_node) + else { + return; + }; + + match index { + 0 => { + // the first node failed, we should mark the channel failed + //match tlc_err.error_code {} + } + _ => { + todo!() + } + } + + let session_route = &payment_session.route; + for i in 0..index { + let node = &session_route.nodes[i]; + let next = &session_route.nodes[i + 1].pubkey; + let result = PairResult { + fail_time: std::time::UNIX_EPOCH.elapsed().unwrap().as_millis(), + fail_amount: node.amount, + success_time: 0, + success_amount: 0, + }; + + self.history.add_result(node.pubkey, *next, result); + } + } + #[cfg(test)] pub fn reset(&mut self) { self.channels.clear(); self.nodes.clear(); self.connected_peer_addresses.clear(); + self.history = PaymentHistory::new(); } - /// Returns a list of `PaymentHopData` for all nodes in the route, including the origin and the target node. + /// Returns a list of `PaymentHopData` for all nodes in the route, + /// including the origin and the target node. pub fn build_route( &self, payment_request: SendPaymentData, @@ -546,7 +608,7 @@ where let mut current_amount = amount; let mut current_expiry = 0; - let mut onion_infos = vec![]; + let mut hops_data = vec![]; for i in (0..route.len()).rev() { let is_last = i == route.len() - 1; let (next_hop, next_channel_outpoint) = if is_last { @@ -577,7 +639,7 @@ where // make sure the final hop's amount is the same as the payment amount // the last hop will check the amount from TLC and the amount from the onion packet - onion_infos.push(PaymentHopData { + hops_data.push(PaymentHopData { amount: current_amount, payment_hash, next_hop, @@ -590,7 +652,7 @@ where current_expiry += expiry; } // Add the first hop as the instruction for the current node, so the logic for send HTLC can be reused. - onion_infos.push(PaymentHopData { + hops_data.push(PaymentHopData { amount: current_amount, payment_hash, next_hop: Some(route[0].target), @@ -599,10 +661,20 @@ where channel_outpoint: Some(route[0].channel_outpoint.clone()), preimage: None, }); - onion_infos.reverse(); - assert_eq!(onion_infos.len(), route.len() + 1); - assert_eq!(onion_infos[route.len()].amount, amount); - Ok(onion_infos) + hops_data.reverse(); + assert_eq!(hops_data.len(), route.len() + 1); + assert_eq!(hops_data[route.len()].amount, amount); + // assert there is no duplicate node in the route + assert_eq!( + hops_data + .iter() + .filter_map(|x| x.next_hop) + .collect::>() + .len(), + route.len() + ); + + Ok(hops_data) } // the algorithm works from target-to-source to find the shortest path @@ -831,6 +903,35 @@ pub enum PaymentSessionStatus { Failed, } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SessionRouteNode { + pub pubkey: Pubkey, + pub amount: u128, +} + +// The router is a list of nodes that the payment will go through. +// We store in the payment session and then will use it to track the payment history. +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct SessionRoute { + pub nodes: Vec, +} + +impl SessionRoute { + pub fn new(payment_hops: &Vec) -> Self { + let mut router = Self::default(); + for hop in payment_hops { + if let Some(key) = hop.next_hop { + router.add_node(key, hop.amount); + } + } + router + } + + fn add_node(&mut self, pubkey: Pubkey, amount: u128) { + self.nodes.push(SessionRouteNode { pubkey, amount }); + } +} + #[serde_as] #[derive(Clone, Debug, Serialize, Deserialize)] pub struct PaymentSession { @@ -845,6 +946,7 @@ pub struct PaymentSession { #[serde_as(as = "Option")] pub first_hop_channel_outpoint: Option, pub first_hop_tlc_id: Option, + pub route: SessionRoute, } impl PaymentSession { @@ -860,6 +962,7 @@ impl PaymentSession { last_updated_at: now, first_hop_channel_outpoint: None, first_hop_tlc_id: None, + route: SessionRoute::default(), } } @@ -872,10 +975,16 @@ impl PaymentSession { self.last_updated_at = std::time::UNIX_EPOCH.elapsed().unwrap().as_micros(); } - pub fn set_inflight_status(&mut self, channel_outpoint: OutPoint, tlc_id: u64) { + pub fn set_inflight_status( + &mut self, + channel_outpoint: OutPoint, + tlc_id: u64, + session_route: SessionRoute, + ) { self.set_status(PaymentSessionStatus::Inflight); self.first_hop_channel_outpoint = Some(channel_outpoint); self.first_hop_tlc_id = Some(tlc_id); + self.route = session_route; } pub fn set_success_status(&mut self) { diff --git a/src/fiber/history.rs b/src/fiber/history.rs index 5c6df61e..ab4f4db7 100644 --- a/src/fiber/history.rs +++ b/src/fiber/history.rs @@ -16,19 +16,18 @@ pub(crate) struct PaymentResult { #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct PaymentHistory { - pub latest_results: HashMap, + pub inner: HashMap, } impl PaymentHistory { pub fn new() -> Self { PaymentHistory { - latest_results: HashMap::new(), + inner: HashMap::new(), } } - #[allow(dead_code)] pub fn add_result(&mut self, source: Pubkey, target: Pubkey, result: PairResult) { - let payment_result = self.latest_results.entry(source).or_insert(PaymentResult { + let payment_result = self.inner.entry(source).or_insert(PaymentResult { pairs: HashMap::new(), }); payment_result.pairs.insert(target, result); @@ -36,7 +35,7 @@ impl PaymentHistory { #[allow(dead_code)] pub fn get_result(&self, pubkey: &Pubkey) -> Option<&PaymentResult> { - self.latest_results.get(pubkey) + self.inner.get(pubkey) } #[allow(dead_code)] diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 47bad5aa..b95bf41b 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -50,7 +50,7 @@ use super::channel::{ }; use super::config::AnnouncedNodeName; use super::fee::{calculate_commitment_tx_fee, default_minimal_ckb_amount}; -use super::graph::{NetworkGraph, NetworkGraphStateStore}; +use super::graph::{NetworkGraph, NetworkGraphStateStore, SessionRoute}; use super::graph_syncer::{GraphSyncer, GraphSyncerMessage}; use super::key::blake2b_hash_with_salt; use super::types::{ @@ -1199,7 +1199,7 @@ where } NetworkActorEvent::TlcRemoveReceived(payment_hash, remove_tlc) => { // When a node is restarted, RemoveTLC will also be resent if necessary - self.on_tlc_remove_received(state, payment_hash, remove_tlc.reason) + self.on_remove_tlc_event(state, payment_hash, remove_tlc.reason) .await; } } @@ -2055,7 +2055,7 @@ where reply.send(add_tlc_res).expect("send error"); } - async fn on_tlc_remove_received( + async fn on_remove_tlc_event( &self, state: &mut NetworkActorState, payment_hash: Hash256, @@ -2066,11 +2066,19 @@ where match reason { RemoveTlcReason::RemoveTlcFulfill(_) => { payment_session.set_success_status(); + self.network_graph + .write() + .await + .record_payment_success(&payment_session); self.store.insert_payment_session(payment_session); } RemoveTlcReason::RemoveTlcFail(reason) => { let detail_error = reason.decode().expect("decoded error"); self.update_with_tcl_fail(&detail_error).await; + self.network_graph + .write() + .await + .record_payment_fail(&payment_session, detail_error.clone()); if payment_session.can_retry() && !detail_error.error_code.payment_failed() { let res = self.try_payment_session(state, payment_session).await; @@ -2096,8 +2104,11 @@ where match extra_data { TlcErrData::ChannelFailed { channel_update, .. } => { if let Some(channel_update) = channel_update { - let mut graph = self.network_graph.write().await; - let _ = graph.process_channel_update(channel_update.clone()); + let _ = self + .network_graph + .write() + .await + .process_channel_update(channel_update.clone()); } } _ => {} @@ -2162,6 +2173,7 @@ where .clone() .expect("first hop channel outpoint"); + let session_route = SessionRoute::new(&hops_infos); // generate session key let session_key = Privkey::from_slice(KeyPair::generate_random_key().as_ref()); let peeled_packet = PeeledPaymentOnionPacket::create( @@ -2194,7 +2206,11 @@ where continue; } Ok(tlc_id) => { - payment_session.set_inflight_status(first_channel_outpoint, tlc_id); + payment_session.set_inflight_status( + first_channel_outpoint, + tlc_id, + session_route, + ); self.store.insert_payment_session(payment_session.clone()); return Ok(payment_session); }