Skip to content

Commit

Permalink
add record_payment_fail
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Oct 14, 2024
1 parent 4330e76 commit 7f04471
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 24 deletions.
135 changes: 122 additions & 13 deletions src/fiber/graph.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::history::PaymentHistory;
use super::history::{PairResult, PaymentHistory};
use super::network::{get_chain_hash, SendPaymentData, SendPaymentResponse};
use super::path::NodeHeap;
use super::types::Pubkey;
use super::types::{ChannelAnnouncement, ChannelUpdate, Hash256, NodeAnnouncement};
use super::types::{Pubkey, TlcErr};
use crate::fiber::channel::CHANNEL_DISABLED_FLAG;
use crate::fiber::fee::calculate_tlc_forward_fee;
use crate::fiber::path::{NodeHeapElement, ProbabilityEvaluator};
Expand All @@ -13,7 +13,7 @@ use ckb_jsonrpc_types::JsonBytes;
use ckb_types::packed::{OutPoint, Script};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use tentacle::multiaddr::Multiaddr;
use tentacle::secio::PeerId;
use thiserror::Error;
Expand Down Expand Up @@ -146,7 +146,6 @@ pub struct NetworkGraph<S> {
nodes: HashMap<Pubkey, NodeInfo>,
store: S,
chain_hash: Hash256,
#[allow(dead_code)]
history: PaymentHistory,
}

Expand Down Expand Up @@ -504,14 +503,77 @@ where
}
}

pub(crate) fn record_payment_success(&mut self, payment_session: &PaymentSession) {
let session_route = &payment_session.route;
for i in 0..session_route.nodes.len() - 1 {
let node = &session_route.nodes[i];
let next = &session_route.nodes[i + 1].pubkey;
let result = PairResult {
fail_time: 0,
fail_amount: 0,
success_time: std::time::UNIX_EPOCH.elapsed().unwrap().as_millis(),
success_amount: node.amount,
};

self.history.add_result(node.pubkey, *next, result);
}
}

pub(crate) fn record_payment_fail(
&mut self,
payment_session: &PaymentSession,
tlc_err: TlcErr,
) {
// first let's find out the node that failed
// TODO: with the real onion packet, we may use the decrypt times to find out the failed node?
let Some(failed_node) = tlc_err.error_node_id() else {
return;
};

let Some(index) = payment_session
.route
.nodes
.iter()
.position(|node| node.pubkey == failed_node)
else {
return;
};

match index {
0 => {
// the first node failed, we should mark the channel failed
//match tlc_err.error_code {}
}
_ => {
todo!()
}
}

let session_route = &payment_session.route;
for i in 0..index {
let node = &session_route.nodes[i];
let next = &session_route.nodes[i + 1].pubkey;
let result = PairResult {
fail_time: std::time::UNIX_EPOCH.elapsed().unwrap().as_millis(),
fail_amount: node.amount,
success_time: 0,
success_amount: 0,
};

self.history.add_result(node.pubkey, *next, result);
}
}

#[cfg(test)]
pub fn reset(&mut self) {
self.channels.clear();
self.nodes.clear();
self.connected_peer_addresses.clear();
self.history = PaymentHistory::new();
}

/// Returns a list of `PaymentHopData` for all nodes in the route, including the origin and the target node.
/// Returns a list of `PaymentHopData` for all nodes in the route,
/// including the origin and the target node.
pub fn build_route(
&self,
payment_request: SendPaymentData,
Expand Down Expand Up @@ -546,7 +608,7 @@ where

let mut current_amount = amount;
let mut current_expiry = 0;
let mut onion_infos = vec![];
let mut hops_data = vec![];
for i in (0..route.len()).rev() {
let is_last = i == route.len() - 1;
let (next_hop, next_channel_outpoint) = if is_last {
Expand Down Expand Up @@ -577,7 +639,7 @@ where

// make sure the final hop's amount is the same as the payment amount
// the last hop will check the amount from TLC and the amount from the onion packet
onion_infos.push(PaymentHopData {
hops_data.push(PaymentHopData {
amount: current_amount,
payment_hash,
next_hop,
Expand All @@ -590,7 +652,7 @@ where
current_expiry += expiry;
}
// Add the first hop as the instruction for the current node, so the logic for send HTLC can be reused.
onion_infos.push(PaymentHopData {
hops_data.push(PaymentHopData {
amount: current_amount,
payment_hash,
next_hop: Some(route[0].target),
Expand All @@ -599,10 +661,20 @@ where
channel_outpoint: Some(route[0].channel_outpoint.clone()),
preimage: None,
});
onion_infos.reverse();
assert_eq!(onion_infos.len(), route.len() + 1);
assert_eq!(onion_infos[route.len()].amount, amount);
Ok(onion_infos)
hops_data.reverse();
assert_eq!(hops_data.len(), route.len() + 1);
assert_eq!(hops_data[route.len()].amount, amount);
// assert there is no duplicate node in the route
assert_eq!(
hops_data
.iter()
.filter_map(|x| x.next_hop)
.collect::<HashSet<_>>()
.len(),
route.len()
);

Ok(hops_data)
}

// the algorithm works from target-to-source to find the shortest path
Expand Down Expand Up @@ -831,6 +903,35 @@ pub enum PaymentSessionStatus {
Failed,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SessionRouteNode {
pub pubkey: Pubkey,
pub amount: u128,
}

// The router is a list of nodes that the payment will go through.
// We store in the payment session and then will use it to track the payment history.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct SessionRoute {
pub nodes: Vec<SessionRouteNode>,
}

impl SessionRoute {
pub fn new(payment_hops: &Vec<PaymentHopData>) -> Self {
let mut router = Self::default();
for hop in payment_hops {
if let Some(key) = hop.next_hop {
router.add_node(key, hop.amount);
}
}
router
}

fn add_node(&mut self, pubkey: Pubkey, amount: u128) {
self.nodes.push(SessionRouteNode { pubkey, amount });
}
}

#[serde_as]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PaymentSession {
Expand All @@ -845,6 +946,7 @@ pub struct PaymentSession {
#[serde_as(as = "Option<EntityHex>")]
pub first_hop_channel_outpoint: Option<OutPoint>,
pub first_hop_tlc_id: Option<u64>,
pub route: SessionRoute,
}

impl PaymentSession {
Expand All @@ -860,6 +962,7 @@ impl PaymentSession {
last_updated_at: now,
first_hop_channel_outpoint: None,
first_hop_tlc_id: None,
route: SessionRoute::default(),
}
}

Expand All @@ -872,10 +975,16 @@ impl PaymentSession {
self.last_updated_at = std::time::UNIX_EPOCH.elapsed().unwrap().as_micros();
}

pub fn set_inflight_status(&mut self, channel_outpoint: OutPoint, tlc_id: u64) {
pub fn set_inflight_status(
&mut self,
channel_outpoint: OutPoint,
tlc_id: u64,
session_route: SessionRoute,
) {
self.set_status(PaymentSessionStatus::Inflight);
self.first_hop_channel_outpoint = Some(channel_outpoint);
self.first_hop_tlc_id = Some(tlc_id);
self.route = session_route;
}

pub fn set_success_status(&mut self) {
Expand Down
9 changes: 4 additions & 5 deletions src/fiber/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,26 @@ pub(crate) struct PaymentResult {

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PaymentHistory {
pub latest_results: HashMap<Pubkey, PaymentResult>,
pub inner: HashMap<Pubkey, PaymentResult>,
}

impl PaymentHistory {
pub fn new() -> Self {
PaymentHistory {
latest_results: HashMap::new(),
inner: HashMap::new(),
}
}

#[allow(dead_code)]
pub fn add_result(&mut self, source: Pubkey, target: Pubkey, result: PairResult) {
let payment_result = self.latest_results.entry(source).or_insert(PaymentResult {
let payment_result = self.inner.entry(source).or_insert(PaymentResult {
pairs: HashMap::new(),
});
payment_result.pairs.insert(target, result);
}

#[allow(dead_code)]
pub fn get_result(&self, pubkey: &Pubkey) -> Option<&PaymentResult> {
self.latest_results.get(pubkey)
self.inner.get(pubkey)
}

#[allow(dead_code)]
Expand Down
28 changes: 22 additions & 6 deletions src/fiber/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use super::channel::{
};
use super::config::AnnouncedNodeName;
use super::fee::{calculate_commitment_tx_fee, default_minimal_ckb_amount};
use super::graph::{NetworkGraph, NetworkGraphStateStore};
use super::graph::{NetworkGraph, NetworkGraphStateStore, SessionRoute};
use super::graph_syncer::{GraphSyncer, GraphSyncerMessage};
use super::key::blake2b_hash_with_salt;
use super::types::{
Expand Down Expand Up @@ -1199,7 +1199,7 @@ where
}
NetworkActorEvent::TlcRemoveReceived(payment_hash, remove_tlc) => {
// When a node is restarted, RemoveTLC will also be resent if necessary
self.on_tlc_remove_received(state, payment_hash, remove_tlc.reason)
self.on_remove_tlc_event(state, payment_hash, remove_tlc.reason)
.await;
}
}
Expand Down Expand Up @@ -2055,7 +2055,7 @@ where
reply.send(add_tlc_res).expect("send error");
}

async fn on_tlc_remove_received(
async fn on_remove_tlc_event(
&self,
state: &mut NetworkActorState<S>,
payment_hash: Hash256,
Expand All @@ -2066,11 +2066,19 @@ where
match reason {
RemoveTlcReason::RemoveTlcFulfill(_) => {
payment_session.set_success_status();
self.network_graph
.write()
.await
.record_payment_success(&payment_session);
self.store.insert_payment_session(payment_session);
}
RemoveTlcReason::RemoveTlcFail(reason) => {
let detail_error = reason.decode().expect("decoded error");
self.update_with_tcl_fail(&detail_error).await;
self.network_graph
.write()
.await
.record_payment_fail(&payment_session, detail_error.clone());
if payment_session.can_retry() && !detail_error.error_code.payment_failed()
{
let res = self.try_payment_session(state, payment_session).await;
Expand All @@ -2096,8 +2104,11 @@ where
match extra_data {
TlcErrData::ChannelFailed { channel_update, .. } => {
if let Some(channel_update) = channel_update {
let mut graph = self.network_graph.write().await;
let _ = graph.process_channel_update(channel_update.clone());
let _ = self
.network_graph
.write()
.await
.process_channel_update(channel_update.clone());
}
}
_ => {}
Expand Down Expand Up @@ -2162,6 +2173,7 @@ where
.clone()
.expect("first hop channel outpoint");

let session_route = SessionRoute::new(&hops_infos);
// generate session key
let session_key = Privkey::from_slice(KeyPair::generate_random_key().as_ref());
let peeled_packet = PeeledPaymentOnionPacket::create(
Expand Down Expand Up @@ -2194,7 +2206,11 @@ where
continue;
}
Ok(tlc_id) => {
payment_session.set_inflight_status(first_channel_outpoint, tlc_id);
payment_session.set_inflight_status(
first_channel_outpoint,
tlc_id,
session_route,
);
self.store.insert_payment_session(payment_session.clone());
return Ok(payment_session);
}
Expand Down

0 comments on commit 7f04471

Please sign in to comment.