From a26a6432738c7d1f387c69f6b72b6236d6dcbf34 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 27 Mar 2019 13:05:48 -0700 Subject: [PATCH 1/8] channeldb: remove unused buckets --- channeldb/graph.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index 2d924d94f1..2d5a2491b7 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -123,9 +123,6 @@ var ( // case we'll remove all entries from the prune log with a block height // that no longer exists. pruneLogBucket = []byte("prune-log") - - edgeBloomKey = []byte("edge-bloom") - nodeBloomKey = []byte("node-bloom") ) const ( From b780dfacdbb46c0b3caa18e6b67c0eeabafe179b Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 27 Mar 2019 13:06:12 -0700 Subject: [PATCH 2/8] channeldb: add zombie edge index In this commit, we add a zombie edge index to the database. This allows us to quickly determine across restarts whether we're attempting to process an edge we've previously deemed as zombie. --- channeldb/db.go | 3 ++ channeldb/graph.go | 114 ++++++++++++++++++++++++++++++++++++++++ channeldb/graph_test.go | 61 +++++++++++++++++++++ 3 files changed, 178 insertions(+) diff --git a/channeldb/db.go b/channeldb/db.go index 51f3fe9aa4..4b9476c377 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -268,6 +268,9 @@ func createChannelDB(dbPath string) error { if _, err := edges.CreateBucket(channelPointBucket); err != nil { return err } + if _, err := edges.CreateBucket(zombieBucket); err != nil { + return err + } graphMeta, err := tx.CreateBucket(graphMetaBucket) if err != nil { diff --git a/channeldb/graph.go b/channeldb/graph.go index 2d5a2491b7..60c49e4e89 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -106,6 +106,17 @@ var ( // maps: outPoint -> chanID channelPointBucket = []byte("chan-index") + // zombieBucket is a sub-bucket of the main edgeBucket bucket + // responsible for maintaining an index of zombie channels. Each entry + // exists within the bucket as follows: + // + // maps: chanID -> pubKey1 || pubKey2 + // + // The chanID represents the channel ID of the edge that is marked as a + // zombie and is used as the key, which maps to the public keys of the + // edge's participants. + zombieBucket = []byte("zombie-index") + // graphMetaBucket is a top-level bucket which stores various meta-deta // related to the on-disk channel graph. Data stored in this bucket // includes the block to which the graph has been synced to, the total @@ -2782,6 +2793,109 @@ func (c *ChannelGraph) NewChannelEdgePolicy() *ChannelEdgePolicy { return &ChannelEdgePolicy{db: c.db} } +// MarkEdgeZombie marks an edge as a zombie within the graph's zombie index. +// The public keys should represent the node public keys of the two parties +// involved in the edge. +func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, pubKey1, + pubKey2 [33]byte) error { + + return c.db.Batch(func(tx *bbolt.Tx) error { + edges := tx.Bucket(edgeBucket) + if edges == nil { + return ErrGraphNoEdgesFound + } + zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket) + if err != nil { + return err + } + return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2) + }) +} + +// markEdgeZombie marks an edge as a zombie within our zombie index. The public +// keys should represent the node public keys of the two parties involved in the +// edge. 
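+// The zombie entry is keyed by the serialized 8-byte channel ID and stores
+// the two 33-byte public keys back-to-back as a 66-byte value.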
+func markEdgeZombie(zombieIndex *bbolt.Bucket, chanID uint64, pubKey1, + pubKey2 [33]byte) error { + + var k [8]byte + byteOrder.PutUint64(k[:], chanID) + + var v [66]byte + copy(v[:33], pubKey1[:]) + copy(v[33:], pubKey2[:]) + + return zombieIndex.Put(k[:], v[:]) +} + +// MarkEdgeLive clears an edge from our zombie index, deeming it as live. +func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error { + return c.db.Batch(func(tx *bbolt.Tx) error { + edges := tx.Bucket(edgeBucket) + if edges == nil { + return ErrGraphNoEdgesFound + } + zombieIndex := edges.Bucket(zombieBucket) + if zombieIndex == nil { + return nil + } + + var k [8]byte + byteOrder.PutUint64(k[:], chanID) + return zombieIndex.Delete(k[:]) + }) +} + +// IsZombieEdge returns whether the edge is considered zombie. If it is a +// zombie, then the two node public keys corresponding to this edge are also +// returned. +func (c *ChannelGraph) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) { + var ( + isZombie bool + pubKey1, pubKey2 [33]byte + ) + + err := c.db.View(func(tx *bbolt.Tx) error { + edges := tx.Bucket(edgeBucket) + if edges == nil { + return ErrGraphNoEdgesFound + } + zombieIndex := edges.Bucket(zombieBucket) + if zombieIndex == nil { + return nil + } + + isZombie, pubKey1, pubKey2 = isZombieEdge(zombieIndex, chanID) + return nil + }) + if err != nil { + return false, [33]byte{}, [33]byte{} + } + + return isZombie, pubKey1, pubKey2 +} + +// isZombieEdge returns whether an entry exists for the given channel in the +// zombie index. If an entry exists, then the two node public keys corresponding +// to this edge are also returned. +func isZombieEdge(zombieIndex *bbolt.Bucket, + chanID uint64) (bool, [33]byte, [33]byte) { + + var k [8]byte + byteOrder.PutUint64(k[:], chanID) + + v := zombieIndex.Get(k[:]) + if v == nil { + return false, [33]byte{}, [33]byte{} + } + + var pubKey1, pubKey2 [33]byte + copy(pubKey1[:], v[:33]) + copy(pubKey2[:], v[33:]) + + return true, pubKey1, pubKey2 +} + func putLightningNode(nodeBucket *bbolt.Bucket, aliasBucket *bbolt.Bucket, updateIndex *bbolt.Bucket, node *LightningNode) error { diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 378391ec69..0c7edc0e2a 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -2786,6 +2786,67 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) { assertEdgeInfoEqual(t, dbEdgeInfo, edgeInfo) } +// TestGraphZombieIndex ensures that we can mark edges correctly as zombie/live. +func TestGraphZombieIndex(t *testing.T) { + t.Parallel() + + // We'll start by creating our test graph along with a test edge. + db, cleanUp, err := makeTestDB() + defer cleanUp() + if err != nil { + t.Fatalf("unable to create test database: %v", err) + } + graph := db.ChannelGraph() + + node1, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test vertex: %v", err) + } + node2, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test vertex: %v", err) + } + edge, _, _ := createChannelEdge(db, node1, node2) + + // If the graph is not aware of the edge, then it should not be a + // zombie. + isZombie, _, _ := graph.IsZombieEdge(edge.ChannelID) + if isZombie { + t.Fatal("expected edge to not be marked as zombie") + } + + // If we mark the edge as a zombie, then we should expect to see it + // within the index. 
+ err = graph.MarkEdgeZombie( + edge.ChannelID, node1.PubKeyBytes, node2.PubKeyBytes, + ) + if err != nil { + t.Fatalf("unable to mark edge as zombie: %v", err) + } + isZombie, pubKey1, pubKey2 := graph.IsZombieEdge(edge.ChannelID) + if !isZombie { + t.Fatal("expected edge to be marked as zombie") + } + if pubKey1 != node1.PubKeyBytes { + t.Fatalf("expected pubKey1 %x, got %x", node1.PubKeyBytes, + pubKey1) + } + if pubKey2 != node2.PubKeyBytes { + t.Fatalf("expected pubKey2 %x, got %x", node2.PubKeyBytes, + pubKey2) + } + + // Similarly, if we mark the same edge as live, we should no longer see + // it within the index. + if err := graph.MarkEdgeLive(edge.ChannelID); err != nil { + t.Fatalf("unable to mark edge as live: %v", err) + } + isZombie, _, _ = graph.IsZombieEdge(edge.ChannelID) + if isZombie { + t.Fatal("expected edge to not be marked as zombie") + } +} + // compareNodes is used to compare two LightningNodes while excluding the // Features struct, which cannot be compared as the semantics for reserializing // the featuresMap have not been defined. From e98f4d6d9da4de7326b62a5412a466992a896f2b Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 27 Mar 2019 13:06:34 -0700 Subject: [PATCH 3/8] channeldb: extend DeleteChannelEdge to mark edge as zombie We mark the edges as zombies when pruning them to ensure we don't attempt to reprocess them later on. This also applies to channels that have been removed from the graph due to being stale. --- channeldb/graph.go | 56 ++++++++++++++++++++++++++++++++--------- channeldb/graph_test.go | 4 +++ 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index 60c49e4e89..a2b72281e6 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -728,6 +728,10 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, if nodes == nil { return ErrSourceNodeNotSet } + zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket) + if err != nil { + return err + } // For each of the outpoints that have been spent within the // block, we attempt to delete them from the graph as if that @@ -761,7 +765,8 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, // a channel. If no error is returned, then a channel // was successfully pruned. err = delChannelByEdge( - edges, edgeIndex, chanIndex, nodes, chanPoint, + edges, edgeIndex, chanIndex, zombieIndex, nodes, + chanPoint, false, ) if err != nil && err != ErrEdgeNotFound { return err @@ -971,6 +976,10 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf if err != nil { return err } + zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket) + if err != nil { + return err + } nodes, err := tx.CreateBucketIfNotExists(nodeBucket) if err != nil { return err @@ -988,7 +997,8 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf return err } err = delChannelByEdge( - edges, edgeIndex, chanIndex, nodes, &edgeInfo.ChannelPoint, + edges, edgeIndex, chanIndex, zombieIndex, nodes, + &edgeInfo.ChannelPoint, false, ) if err != nil && err != ErrEdgeNotFound { return err @@ -1075,8 +1085,9 @@ func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) { } // DeleteChannelEdge removes an edge from the database as identified by its -// funding outpoint. If the edge does not exist within the database, then -// ErrEdgeNotFound will be returned. +// funding outpoint and also marks it as a zombie. This ensures that we're +// unable to re-add this to our database once again. 
If the edge does not exist +// within the database, then ErrEdgeNotFound will be returned. func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error { // TODO(roasbeef): possibly delete from node bucket if node has no more // channels @@ -1096,19 +1107,22 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error { if edgeIndex == nil { return ErrEdgeNotFound } - chanIndex := edges.Bucket(channelPointBucket) if chanIndex == nil { return ErrEdgeNotFound } - nodes := tx.Bucket(nodeBucket) if nodes == nil { return ErrGraphNodeNotFound } + zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket) + if err != nil { + return err + } return delChannelByEdge( - edges, edgeIndex, chanIndex, nodes, chanPoint, + edges, edgeIndex, chanIndex, zombieIndex, nodes, + chanPoint, true, ) }) } @@ -1579,8 +1593,9 @@ func delEdgeUpdateIndexEntry(edgesBucket *bbolt.Bucket, chanID uint64, return nil } -func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket, - chanIndex *bbolt.Bucket, nodes *bbolt.Bucket, chanPoint *wire.OutPoint) error { +func delChannelByEdge(edges, edgeIndex, chanIndex, zombieIndex, + nodes *bbolt.Bucket, chanPoint *wire.OutPoint, isZombie bool) error { + var b bytes.Buffer if err := writeOutpoint(&b, chanPoint); err != nil { return err @@ -1638,12 +1653,29 @@ func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket, } } - // Finally, with the edge data deleted, we can purge the information - // from the two edge indexes. + // With the edge data deleted, we can purge the information from the two + // edge indexes. if err := edgeIndex.Delete(chanID); err != nil { return err } - return chanIndex.Delete(b.Bytes()) + if err := chanIndex.Delete(b.Bytes()); err != nil { + return err + } + + // Finally, we'll mark the edge as a zombie within our index if it's + // being removed due to the channel becoming a zombie. We do this to + // ensure we don't store unnecessary data for spent channels. + if !isZombie { + return nil + } + + var pubKey1, pubKey2 [33]byte + copy(pubKey1[:], nodeKeys[:33]) + copy(pubKey2[:], nodeKeys[33:]) + + return markEdgeZombie( + zombieIndex, byteOrder.Uint64(chanID), pubKey1, pubKey2, + ) } // UpdateEdgePolicy updates the edge routing policy for a single directed edge diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 0c7edc0e2a..43d1c84c22 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -374,6 +374,10 @@ func TestEdgeInsertionDeletion(t *testing.T) { if _, _, _, err := graph.FetchChannelEdgesByID(chanID); err == nil { t.Fatalf("channel edge not deleted") } + isZombie, _, _ := graph.IsZombieEdge(chanID) + if !isZombie { + t.Fatal("channel edge not marked as zombie") + } // Finally, attempt to delete a (now) non-existent edge within the // database, this should result in an error. From c82d73a8264dee010e5ff01bc0876b7a6146e93c Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 27 Mar 2019 13:06:57 -0700 Subject: [PATCH 4/8] channeldb+routing: extend edge lookup methods with zombie index check In this commit, we extend the graph's FetchChannelEdgesByID and HasChannelEdge methods to also check the zombie index whenever the edge to be looked up doesn't exist within the edge index. We do this to signal to callers that the edge is known, but only as a zombie, and the only information that we have about the edge are the node public keys of the two parties involved in the edge. 
In the event that an edge does exist within the zombie index, we make an additional check on edge policies to ensure they are not within the router's pruning window, indicating that it is a fresh update. --- channeldb/error.go | 9 ++++- channeldb/graph.go | 74 +++++++++++++++++++++++++++++++++++------ channeldb/graph_test.go | 23 ++++++++++--- routing/router.go | 51 +++++++++++++++++++--------- routing/router_test.go | 52 ++++++++++++++++++++++------- 5 files changed, 166 insertions(+), 43 deletions(-) diff --git a/channeldb/error.go b/channeldb/error.go index c054810633..e0e7545220 100644 --- a/channeldb/error.go +++ b/channeldb/error.go @@ -1,6 +1,9 @@ package channeldb -import "fmt" +import ( + "errors" + "fmt" +) var ( // ErrNoChanDBExists is returned when a channel bucket hasn't been @@ -79,6 +82,10 @@ var ( // can't be found. ErrEdgeNotFound = fmt.Errorf("edge not found") + // ErrZombieEdge is an error returned when we attempt to look up an edge + // but it is marked as a zombie within the zombie index. + ErrZombieEdge = errors.New("edge marked as zombie") + // ErrEdgeAlreadyExist is returned when edge with specific // channel id can't be added because it already exist. ErrEdgeAlreadyExist = fmt.Errorf("edge already exist") diff --git a/channeldb/graph.go b/channeldb/graph.go index a2b72281e6..11a875fc96 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -595,17 +595,20 @@ func (c *ChannelGraph) addChannelEdge(tx *bbolt.Tx, edge *ChannelEdgeInfo) error // HasChannelEdge returns true if the database knows of a channel edge with the // passed channel ID, and false otherwise. If an edge with that ID is found // within the graph, then two time stamps representing the last time the edge -// was updated for both directed edges are returned along with the boolean. -func (c *ChannelGraph) HasChannelEdge(chanID uint64) (time.Time, time.Time, bool, error) { - // TODO(roasbeef): check internal bloom filter first +// was updated for both directed edges are returned along with the boolean. If +// it is not found, then the zombie index is checked and its result is returned +// as the second boolean. +func (c *ChannelGraph) HasChannelEdge(chanID uint64, +) (time.Time, time.Time, bool, bool, error) { var ( node1UpdateTime time.Time node2UpdateTime time.Time exists bool + isZombie bool ) - if err := c.db.View(func(tx *bbolt.Tx) error { + err := c.db.View(func(tx *bbolt.Tx) error { edges := tx.Bucket(edgeBucket) if edges == nil { return ErrGraphNoEdgesFound @@ -617,12 +620,21 @@ func (c *ChannelGraph) HasChannelEdge(chanID uint64) (time.Time, time.Time, bool var channelID [8]byte byteOrder.PutUint64(channelID[:], chanID) + + // If the edge doesn't exist, then we'll also check our zombie + // index. if edgeIndex.Get(channelID[:]) == nil { exists = false + zombieIndex := edges.Bucket(zombieBucket) + if zombieIndex != nil { + isZombie, _, _ = isZombieEdge(zombieIndex, chanID) + } + return nil } exists = true + isZombie = false // If the channel has been found in the graph, then retrieve // the edges itself so we can return the last updated @@ -648,11 +660,9 @@ func (c *ChannelGraph) HasChannelEdge(chanID uint64) (time.Time, time.Time, bool } return nil - }); err != nil { - return time.Time{}, time.Time{}, exists, err - } + }) - return node1UpdateTime, node2UpdateTime, exists, nil + return node1UpdateTime, node2UpdateTime, exists, isZombie, err } // UpdateChannelEdge retrieves and update edge of the graph database. 
Method @@ -2537,7 +2547,8 @@ func (c *ChannelEdgePolicy) Signature() (*btcec.Signature, error) { // found, then ErrEdgeNotFound is returned. A struct which houses the general // information for the channel itself is returned as well as two structs that // contain the routing policies for the channel in either direction. -func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy, error) { +func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint, +) (*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy, error) { var ( edgeInfo *ChannelEdgeInfo @@ -2615,7 +2626,12 @@ func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (*ChannelE // ErrEdgeNotFound is returned. A struct which houses the general information // for the channel itself is returned as well as two structs that contain the // routing policies for the channel in either direction. -func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy, error) { +// +// ErrZombieEdge an be returned if the edge is currently marked as a zombie +// within the database. In this case, the ChannelEdgePolicy's will be nil, and +// the ChannelEdgeInfo will only include the public keys of each node. +func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64, +) (*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy, error) { var ( edgeInfo *ChannelEdgeInfo @@ -2646,13 +2662,48 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, * byteOrder.PutUint64(channelID[:], chanID) + // Now, attempt to fetch edge. edge, err := fetchChanEdgeInfo(edgeIndex, channelID[:]) + + // If it doesn't exist, we'll quickly check our zombie index to + // see if we've previously marked it as so. + if err == ErrEdgeNotFound { + // If the zombie index doesn't exist, or the edge is not + // marked as a zombie within it, then we'll return the + // original ErrEdgeNotFound error. + zombieIndex := edges.Bucket(zombieBucket) + if zombieIndex == nil { + return ErrEdgeNotFound + } + + isZombie, pubKey1, pubKey2 := isZombieEdge( + zombieIndex, chanID, + ) + if !isZombie { + return ErrEdgeNotFound + } + + // Otherwise, the edge is marked as a zombie, so we'll + // populate the edge info with the public keys of each + // party as this is the only information we have about + // it and return an error signaling so. + edgeInfo = &ChannelEdgeInfo{ + NodeKey1Bytes: pubKey1, + NodeKey2Bytes: pubKey2, + } + return ErrZombieEdge + } + + // Otherwise, we'll just return the error if any. if err != nil { return err } + edgeInfo = &edge edgeInfo.db = c.db + // Then we'll attempt to fetch the accompanying policies of this + // edge. e1, e2, err := fetchChanEdgePolicies( edgeIndex, edges, nodes, channelID[:], c.db, ) @@ -2664,6 +2715,9 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, * policy2 = e2 return nil }) + if err == ErrZombieEdge { + return edgeInfo, nil, nil, err + } if err != nil { return nil, nil, nil, err } diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 43d1c84c22..f8df644876 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -526,29 +526,38 @@ func TestDisconnectBlockAtHeight(t *testing.T) { } // The two first edges should be removed from the db. 
- _, _, has, err := graph.HasChannelEdge(edgeInfo.ChannelID) + _, _, has, isZombie, err := graph.HasChannelEdge(edgeInfo.ChannelID) if err != nil { t.Fatalf("unable to query for edge: %v", err) } if has { t.Fatalf("edge1 was not pruned from the graph") } - _, _, has, err = graph.HasChannelEdge(edgeInfo2.ChannelID) + if isZombie { + t.Fatal("reorged edge1 should not be marked as zombie") + } + _, _, has, isZombie, err = graph.HasChannelEdge(edgeInfo2.ChannelID) if err != nil { t.Fatalf("unable to query for edge: %v", err) } if has { t.Fatalf("edge2 was not pruned from the graph") } + if isZombie { + t.Fatal("reorged edge2 should not be marked as zombie") + } // Edge 3 should not be removed. - _, _, has, err = graph.HasChannelEdge(edgeInfo3.ChannelID) + _, _, has, isZombie, err = graph.HasChannelEdge(edgeInfo3.ChannelID) if err != nil { t.Fatalf("unable to query for edge: %v", err) } if !has { t.Fatalf("edge3 was pruned from the graph") } + if isZombie { + t.Fatal("edge3 was marked as zombie") + } // PruneTip should be set to the blockHash we specified for the block // at height 155. @@ -759,12 +768,16 @@ func TestEdgeInfoUpdates(t *testing.T) { // Check for existence of the edge within the database, it should be // found. - _, _, found, err := graph.HasChannelEdge(chanID) + _, _, found, isZombie, err := graph.HasChannelEdge(chanID) if err != nil { t.Fatalf("unable to query for edge: %v", err) - } else if !found { + } + if !found { t.Fatalf("graph should have of inserted edge") } + if isZombie { + t.Fatal("live edge should not be marked as zombie") + } // We should also be able to retrieve the channelID only knowing the // channel point of the channel. diff --git a/routing/router.go b/routing/router.go index 7d836a8908..5ef2cb947c 100644 --- a/routing/router.go +++ b/routing/router.go @@ -76,7 +76,7 @@ type ChannelGraphSource interface { IsPublicNode(node Vertex) (bool, error) // IsKnownEdge returns true if the graph source already knows of the - // passed channel ID. + // passed channel ID either as a live or zombie edge. IsKnownEdge(chanID lnwire.ShortChannelID) bool // IsStaleEdgePolicy returns true if the graph source has a channel @@ -1009,12 +1009,19 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { // Prior to processing the announcement we first check if we // already know of this channel, if so, then we can exit early. 
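+		// Note that an edge we've previously marked as a zombie is
+		// also treated as known here and will be ignored.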
- _, _, exists, err := r.cfg.Graph.HasChannelEdge(msg.ChannelID) + _, _, exists, isZombie, err := r.cfg.Graph.HasChannelEdge( + msg.ChannelID, + ) if err != nil && err != channeldb.ErrGraphNoEdgesFound { return errors.Errorf("unable to check for edge "+ "existence: %v", err) - } else if exists { - return newErrf(ErrIgnored, "Ignoring msg for known "+ + } + if isZombie { + return newErrf(ErrIgnored, "ignoring msg for zombie "+ + "chan_id=%v", msg.ChannelID) + } + if exists { + return newErrf(ErrIgnored, "ignoring msg for known "+ "chan_id=%v", msg.ChannelID) } @@ -1130,19 +1137,29 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { r.channelEdgeMtx.Lock(msg.ChannelID) defer r.channelEdgeMtx.Unlock(msg.ChannelID) - edge1Timestamp, edge2Timestamp, exists, err := r.cfg.Graph.HasChannelEdge( - msg.ChannelID, - ) + edge1Timestamp, edge2Timestamp, exists, isZombie, err := + r.cfg.Graph.HasChannelEdge(msg.ChannelID) if err != nil && err != channeldb.ErrGraphNoEdgesFound { return errors.Errorf("unable to check for edge "+ "existence: %v", err) } + // If the channel is marked as a zombie in our database, and + // we consider this a stale update, then we should not apply the + // policy. + isStaleUpdate := time.Since(msg.LastUpdate) > r.cfg.ChannelPruneExpiry + if isZombie && isStaleUpdate { + return newErrf(ErrIgnored, "ignoring stale update "+ + "(flags=%v|%v) for zombie chan_id=%v", + msg.MessageFlags, msg.ChannelFlags, + msg.ChannelID) + } + // If the channel doesn't exist in our database, we cannot // apply the updated policy. if !exists { - return newErrf(ErrIgnored, "Ignoring update "+ + return newErrf(ErrIgnored, "ignoring update "+ "(flags=%v|%v) for unknown chan_id=%v", msg.MessageFlags, msg.ChannelFlags, msg.ChannelID) @@ -2241,12 +2258,12 @@ func (r *ChannelRouter) IsPublicNode(node Vertex) (bool, error) { } // IsKnownEdge returns true if the graph source already knows of the passed -// channel ID. +// channel ID either as a live or zombie edge. // // NOTE: This method is part of the ChannelGraphSource interface. func (r *ChannelRouter) IsKnownEdge(chanID lnwire.ShortChannelID) bool { - _, _, exists, _ := r.cfg.Graph.HasChannelEdge(chanID.ToUint64()) - return exists + _, _, exists, isZombie, _ := r.cfg.Graph.HasChannelEdge(chanID.ToUint64()) + return exists || isZombie } // IsStaleEdgePolicy returns true if the graph soruce has a channel edge for @@ -2256,14 +2273,19 @@ func (r *ChannelRouter) IsKnownEdge(chanID lnwire.ShortChannelID) bool { func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool { - edge1Timestamp, edge2Timestamp, exists, err := r.cfg.Graph.HasChannelEdge( - chanID.ToUint64(), - ) + edge1Timestamp, edge2Timestamp, exists, isZombie, err := + r.cfg.Graph.HasChannelEdge(chanID.ToUint64()) if err != nil { return false } + // If we know of the edge as a zombie, then we'll check the timestamp of + // this message to determine whether it's fresh. + if isZombie { + return time.Since(timestamp) > r.cfg.ChannelPruneExpiry + } + // If we don't know of the edge, then it means it's fresh (thus not // stale). if !exists { @@ -2275,7 +2297,6 @@ func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, // already have the most up to date information for that edge. If so, // then we can exit early. switch { - // A flag set of 0 indicates this is an announcement for the "first" // node in the channel. 
case flags&lnwire.ChanUpdateDirection == 0: diff --git a/routing/router_test.go b/routing/router_test.go index a8249183be..df9ad9e6a8 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -1549,21 +1549,27 @@ func TestWakeUpOnStaleBranch(t *testing.T) { } // Check that the fundingTxs are in the graph db. - _, _, has, err := ctx.graph.HasChannelEdge(chanID1) + _, _, has, isZombie, err := ctx.graph.HasChannelEdge(chanID1) if err != nil { t.Fatalf("error looking for edge: %v", chanID1) } if !has { t.Fatalf("could not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } - _, _, has, err = ctx.graph.HasChannelEdge(chanID2) + _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID2) if err != nil { t.Fatalf("error looking for edge: %v", chanID2) } if !has { t.Fatalf("could not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } // Stop the router, so we can reorg the chain while its offline. if err := ctx.router.Stop(); err != nil { @@ -1607,22 +1613,27 @@ func TestWakeUpOnStaleBranch(t *testing.T) { // The channel with chanID2 should not be in the database anymore, // since it is not confirmed on the longest chain. chanID1 should // still be. - _, _, has, err = ctx.graph.HasChannelEdge(chanID1) + _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID1) if err != nil { t.Fatalf("error looking for edge: %v", chanID1) } if !has { t.Fatalf("did not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } - _, _, has, err = ctx.graph.HasChannelEdge(chanID2) + _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID2) if err != nil { t.Fatalf("error looking for edge: %v", chanID2) } if has { t.Fatalf("found edge in graph") } - + if isZombie { + t.Fatal("reorged edge should not be marked as zombie") + } } // TestDisconnectedBlocks checks that the router handles a reorg happening when @@ -1755,21 +1766,27 @@ func TestDisconnectedBlocks(t *testing.T) { } // Check that the fundingTxs are in the graph db. - _, _, has, err := ctx.graph.HasChannelEdge(chanID1) + _, _, has, isZombie, err := ctx.graph.HasChannelEdge(chanID1) if err != nil { t.Fatalf("error looking for edge: %v", chanID1) } if !has { t.Fatalf("could not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } - _, _, has, err = ctx.graph.HasChannelEdge(chanID2) + _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID2) if err != nil { t.Fatalf("error looking for edge: %v", chanID2) } if !has { t.Fatalf("could not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } // Create a 15 block fork. We first let the chainView notify the router // about stale blocks, before sending the now connected blocks. We do @@ -1796,22 +1813,27 @@ func TestDisconnectedBlocks(t *testing.T) { // chanID2 should not be in the database anymore, since it is not // confirmed on the longest chain. chanID1 should still be. 
- _, _, has, err = ctx.graph.HasChannelEdge(chanID1) + _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID1) if err != nil { t.Fatalf("error looking for edge: %v", chanID1) } if !has { t.Fatalf("did not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } - _, _, has, err = ctx.graph.HasChannelEdge(chanID2) + _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID2) if err != nil { t.Fatalf("error looking for edge: %v", chanID2) } if has { t.Fatalf("found edge in graph") } - + if isZombie { + t.Fatal("reorged edge should not be marked as zombie") + } } // TestChansClosedOfflinePruneGraph tests that if channels we know of are @@ -1876,13 +1898,16 @@ func TestRouterChansClosedOfflinePruneGraph(t *testing.T) { } // The router should now be aware of the channel we created above. - _, _, hasChan, err := ctx.graph.HasChannelEdge(chanID1.ToUint64()) + _, _, hasChan, isZombie, err := ctx.graph.HasChannelEdge(chanID1.ToUint64()) if err != nil { t.Fatalf("error looking for edge: %v", chanID1) } if !hasChan { t.Fatalf("could not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } // With the transaction included, and the router's database state // updated, we'll now mine 5 additional blocks on top of it. @@ -1957,13 +1982,16 @@ func TestRouterChansClosedOfflinePruneGraph(t *testing.T) { // At this point, the channel that was pruned should no longer be known // by the router. - _, _, hasChan, err = ctx.graph.HasChannelEdge(chanID1.ToUint64()) + _, _, hasChan, isZombie, err = ctx.graph.HasChannelEdge(chanID1.ToUint64()) if err != nil { t.Fatalf("error looking for edge: %v", chanID1) } if hasChan { t.Fatalf("channel was found in graph but shouldn't have been") } + if isZombie { + t.Fatal("closed channel should not be marked as zombie") + } } // TestFindPathFeeWeighting tests that the findPath method will properly prefer From 174645fcbadf4032e0e34fee944f5fbb88c2e596 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 27 Mar 2019 13:07:13 -0700 Subject: [PATCH 5/8] routing+server: expose DefaultChannelPruneExpiry --- routing/router.go | 4 ++++ server.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/routing/router.go b/routing/router.go index 5ef2cb947c..7f39063ab1 100644 --- a/routing/router.go +++ b/routing/router.go @@ -33,6 +33,10 @@ const ( // if we should give up on a payment attempt. This will be used if a // value isn't specified in the LightningNode struct. defaultPayAttemptTimeout = time.Duration(time.Second * 60) + + // DefaultChannelPruneExpiry is the default duration used to determine + // if a channel should be pruned or not. 
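+	// It is currently set to two weeks, the value previously hard-coded
+	// in the server's routing configuration.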
+ DefaultChannelPruneExpiry = time.Duration(time.Hour * 24 * 14) ) var ( diff --git a/server.go b/server.go index 0ac457332c..6ffbe0711e 100644 --- a/server.go +++ b/server.go @@ -583,7 +583,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, firstHop, htlcAdd, errorDecryptor, ) }, - ChannelPruneExpiry: time.Duration(time.Hour * 24 * 14), + ChannelPruneExpiry: routing.DefaultChannelPruneExpiry, GraphPruneInterval: time.Duration(time.Hour), QueryBandwidth: func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { // If we aren't on either side of this edge, then we'll From 23796d32471a3bf5dfb851e090614b5a17041d67 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 27 Mar 2019 13:07:30 -0700 Subject: [PATCH 6/8] routing+discovery: extend ChannelGraphSource with zombie index methods --- discovery/gossiper.go | 1 - discovery/gossiper_test.go | 67 +++++++++++++++++++++++++++++++------- routing/router.go | 11 +++++++ 3 files changed, 66 insertions(+), 13 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index e35a192992..d019c2804e 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1793,7 +1793,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( if d.cfg.Router.IsStaleEdgePolicy( msg.ShortChannelID, timestamp, msg.ChannelFlags, ) { - nMsg.err <- nil return nil } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 560fdbb1ab..2f7ca77ca5 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -110,10 +110,11 @@ func (n *mockSigner) SignMessage(pubKey *btcec.PublicKey, type mockGraphSource struct { bestHeight uint32 - mu sync.Mutex - nodes []channeldb.LightningNode - infos map[uint64]channeldb.ChannelEdgeInfo - edges map[uint64][]channeldb.ChannelEdgePolicy + mu sync.Mutex + nodes []channeldb.LightningNode + infos map[uint64]channeldb.ChannelEdgeInfo + edges map[uint64][]channeldb.ChannelEdgePolicy + zombies map[uint64][][33]byte } func newMockRouter(height uint32) *mockGraphSource { @@ -121,6 +122,7 @@ func newMockRouter(height uint32) *mockGraphSource { bestHeight: height, infos: make(map[uint64]channeldb.ChannelEdgeInfo), edges: make(map[uint64][]channeldb.ChannelEdgePolicy), + zombies: make(map[uint64][][33]byte), } } @@ -205,9 +207,18 @@ func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) ( r.mu.Lock() defer r.mu.Unlock() - chanInfo, ok := r.infos[chanID.ToUint64()] + chanIDInt := chanID.ToUint64() + chanInfo, ok := r.infos[chanIDInt] if !ok { - return nil, nil, nil, channeldb.ErrEdgeNotFound + pubKeys, isZombie := r.zombies[chanIDInt] + if !isZombie { + return nil, nil, nil, channeldb.ErrEdgeNotFound + } + + return &channeldb.ChannelEdgeInfo{ + NodeKey1Bytes: pubKeys[0], + NodeKey2Bytes: pubKeys[1], + }, nil, nil, channeldb.ErrZombieEdge } edges := r.edges[chanID.ToUint64()] @@ -280,13 +291,15 @@ func (r *mockGraphSource) IsPublicNode(node routing.Vertex) (bool, error) { } // IsKnownEdge returns true if the graph source already knows of the passed -// channel ID. +// channel ID either as a live or zombie channel. 
func (r *mockGraphSource) IsKnownEdge(chanID lnwire.ShortChannelID) bool { r.mu.Lock() defer r.mu.Unlock() - _, ok := r.infos[chanID.ToUint64()] - return ok + chanIDInt := chanID.ToUint64() + _, exists := r.infos[chanIDInt] + _, isZombie := r.zombies[chanIDInt] + return exists || isZombie } // IsStaleEdgePolicy returns true if the graph source has a channel edge for @@ -297,13 +310,23 @@ func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, r.mu.Lock() defer r.mu.Unlock() - edges, ok := r.edges[chanID.ToUint64()] + chanIDInt := chanID.ToUint64() + edges, ok := r.edges[chanIDInt] if !ok { - return false + // Since the edge doesn't exist, we'll check our zombie index as + // well. + _, isZombie := r.zombies[chanIDInt] + if !isZombie { + return false + } + + // Since it exists within our zombie index, we'll check that it + // respects the router's live edge horizon to determine whether + // it is stale or not. + return time.Since(timestamp) > routing.DefaultChannelPruneExpiry } switch { - case len(edges) >= 1 && edges[0].ChannelFlags == flags: return !edges[0].LastUpdate.Before(timestamp) @@ -315,6 +338,26 @@ func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, } } +// MarkEdgeLive clears an edge from our zombie index, deeming it as live. +// +// NOTE: This method is part of the ChannelGraphSource interface. +func (r *mockGraphSource) MarkEdgeLive(chanID lnwire.ShortChannelID) error { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.zombies, chanID.ToUint64()) + return nil +} + +// MarkEdgeZombie marks an edge as a zombie within our zombie index. +func (r *mockGraphSource) MarkEdgeZombie(chanID lnwire.ShortChannelID, pubKey1, + pubKey2 [33]byte) error { + + r.mu.Lock() + defer r.mu.Unlock() + r.zombies[chanID.ToUint64()] = [][33]byte{pubKey1, pubKey2} + return nil +} + type mockNotifier struct { clientCounter uint32 epochClients map[uint32]chan *chainntnfs.BlockEpoch diff --git a/routing/router.go b/routing/router.go index 7f39063ab1..1025a4023e 100644 --- a/routing/router.go +++ b/routing/router.go @@ -89,6 +89,10 @@ type ChannelGraphSource interface { IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool + // MarkEdgeLive clears an edge from our zombie index, deeming it as + // live. + MarkEdgeLive(chanID lnwire.ShortChannelID) error + // ForAllOutgoingChannels is used to iterate over all channels // emanating from the "source" node which is the center of the // star-graph. @@ -2314,3 +2318,10 @@ func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, return false } + +// MarkEdgeLive clears an edge from our zombie index, deeming it as live. +// +// NOTE: This method is part of the ChannelGraphSource interface. 
+func (r *ChannelRouter) MarkEdgeLive(chanID lnwire.ShortChannelID) error { + return r.cfg.Graph.MarkEdgeLive(chanID.ToUint64()) +} From 44a01db0eff98636770c40f69cca08f2d879bc85 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 27 Mar 2019 13:07:47 -0700 Subject: [PATCH 7/8] routing: expose VerifyChannelUpdateSignature function --- routing/ann_validation.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/routing/ann_validation.go b/routing/ann_validation.go index 4b304b5a9c..184d7009c7 100644 --- a/routing/ann_validation.go +++ b/routing/ann_validation.go @@ -2,6 +2,7 @@ package routing import ( "bytes" + "fmt" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -132,20 +133,28 @@ func ValidateChannelUpdateAnn(pubKey *btcec.PublicKey, capacity btcutil.Amount, return err } - data, err := a.DataToSign() + return VerifyChannelUpdateSignature(a, pubKey) +} + +// VerifyChannelUpdateSignature verifies that the channel update message was +// signed by the party with the given node public key. +func VerifyChannelUpdateSignature(msg *lnwire.ChannelUpdate, + pubKey *btcec.PublicKey) error { + + data, err := msg.DataToSign() if err != nil { - return errors.Errorf("unable to reconstruct message: %v", err) + return fmt.Errorf("unable to reconstruct message data: %v", err) } dataHash := chainhash.DoubleHashB(data) - nodeSig, err := a.Signature.ToSignature() + nodeSig, err := msg.Signature.ToSignature() if err != nil { return err } if !nodeSig.Verify(dataHash, pubKey) { - return errors.Errorf("invalid signature for channel "+ - "update %v", spew.Sdump(a)) + return fmt.Errorf("invalid signature for channel update %v", + spew.Sdump(msg)) } return nil From 5cec4513de45f8b61c08ca134348353bc2b09fb5 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 27 Mar 2019 13:08:03 -0700 Subject: [PATCH 8/8] discovery: reject announcements for known zombie edges In this commit, we leverage the recently introduced zombie edge index to quickly reject announcements for edges we've previously deemed as zombies. Care has been taken to ensure we don't reject fresh updates for edges we've considered zombies. --- discovery/gossiper.go | 138 +++++++++++++------- discovery/gossiper_test.go | 253 +++++++++++++++++++++++++++++++++++++ 2 files changed, 344 insertions(+), 47 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index d019c2804e..171b04aba6 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1571,8 +1571,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } - // At this point, we'll now ask the router if this is a stale - // update. If so we can skip all the processing below. + // At this point, we'll now ask the router if this is a + // zombie/known edge. If so we can skip all the processing + // below. if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) { nMsg.err <- nil return nil @@ -1787,8 +1788,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } // Before we perform any of the expensive checks below, we'll - // make sure that the router doesn't already have a fresher - // announcement for this edge. + // check whether this update is stale or is for a zombie + // channel in order to quickly reject it. 
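+		// A fresh update for a zombie channel passes this check and
+		// is handled further below, where the edge is resurrected
+		// from the zombie index.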
timestamp := time.Unix(int64(msg.Timestamp), 0) if d.cfg.Router.IsStaleEdgePolicy( msg.ShortChannelID, timestamp, msg.ChannelFlags, @@ -1808,56 +1809,99 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) - if err != nil { - switch err { - case channeldb.ErrGraphNotFound: - fallthrough - case channeldb.ErrGraphNoEdgesFound: - fallthrough - case channeldb.ErrEdgeNotFound: - // If the edge corresponding to this - // ChannelUpdate was not found in the graph, - // this might be a channel in the process of - // being opened, and we haven't processed our - // own ChannelAnnouncement yet, hence it is not - // found in the graph. This usually gets - // resolved after the channel proofs are - // exchanged and the channel is broadcasted to - // the rest of the network, but in case this - // is a private channel this won't ever happen. - // Because of this, we temporarily add it to a - // map, and reprocess it after our own - // ChannelAnnouncement has been processed. - d.pChanUpdMtx.Lock() - d.prematureChannelUpdates[shortChanID] = append( - d.prematureChannelUpdates[shortChanID], - nMsg, - ) - d.pChanUpdMtx.Unlock() - - log.Debugf("Got ChannelUpdate for edge not "+ - "found in graph(shortChanID=%v), "+ - "saving for reprocessing later", - shortChanID) + switch err { + // No error, break. + case nil: + break + + case channeldb.ErrZombieEdge: + // Since we've deemed the update as not stale above, + // before marking it live, we'll make sure it has been + // signed by the correct party. The least-significant + // bit in the flag on the channel update tells us which + // edge is being updated. + var pubKey *btcec.PublicKey + switch { + case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0: + pubKey, _ = chanInfo.NodeKey1() + case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1: + pubKey, _ = chanInfo.NodeKey2() + } - // NOTE: We don't return anything on the error - // channel for this message, as we expect that - // will be done when this ChannelUpdate is - // later reprocessed. + err := routing.VerifyChannelUpdateSignature(msg, pubKey) + if err != nil { + err := fmt.Errorf("unable to verify channel "+ + "update signature: %v", err) + log.Error(err) + nMsg.err <- err return nil + } - default: - err := fmt.Errorf("unable to validate "+ - "channel update short_chan_id=%v: %v", - shortChanID, err) + // With the signature valid, we'll proceed to mark the + // edge as live and wait for the channel announcement to + // come through again. + err = d.cfg.Router.MarkEdgeLive(msg.ShortChannelID) + if err != nil { + err := fmt.Errorf("unable to remove edge with "+ + "chan_id=%v from zombie index: %v", + msg.ShortChannelID, err) log.Error(err) nMsg.err <- err - - d.rejectMtx.Lock() - d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} - d.rejectMtx.Unlock() return nil } + + log.Debugf("Removed edge with chan_id=%v from zombie "+ + "index", msg.ShortChannelID) + + // We'll fallthrough to ensure we stash the update until + // we receive its corresponding ChannelAnnouncement. + // This is needed to ensure the edge exists in the graph + // before applying the update. 
+ fallthrough + case channeldb.ErrGraphNotFound: + fallthrough + case channeldb.ErrGraphNoEdgesFound: + fallthrough + case channeldb.ErrEdgeNotFound: + // If the edge corresponding to this ChannelUpdate was + // not found in the graph, this might be a channel in + // the process of being opened, and we haven't processed + // our own ChannelAnnouncement yet, hence it is not + // found in the graph. This usually gets resolved after + // the channel proofs are exchanged and the channel is + // broadcasted to the rest of the network, but in case + // this is a private channel this won't ever happen. + // This can also happen in the case of a zombie channel + // with a fresh update for which we don't have a + // ChannelAnnouncement for since we reject them. Because + // of this, we temporarily add it to a map, and + // reprocess it after our own ChannelAnnouncement has + // been processed. + d.pChanUpdMtx.Lock() + d.prematureChannelUpdates[shortChanID] = append( + d.prematureChannelUpdates[shortChanID], nMsg, + ) + d.pChanUpdMtx.Unlock() + + log.Debugf("Got ChannelUpdate for edge not found in "+ + "graph(shortChanID=%v), saving for "+ + "reprocessing later", shortChanID) + + // NOTE: We don't return anything on the error channel + // for this message, as we expect that will be done when + // this ChannelUpdate is later reprocessed. + return nil + + default: + err := fmt.Errorf("unable to validate channel update "+ + "short_chan_id=%v: %v", shortChanID, err) + log.Error(err) + nMsg.err <- err + + d.rejectMtx.Lock() + d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} + d.rejectMtx.Unlock() + return nil } // The least-significant bit in the flag on the channel update diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 2f7ca77ca5..8b8a3917ec 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -2201,6 +2201,259 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) { } } +// TestRejectZombieEdge ensures that we properly reject any announcements for +// zombie edges. +func TestRejectZombieEdge(t *testing.T) { + t.Parallel() + + // We'll start by creating our test context with a batch of + // announcements. + ctx, cleanup, err := createTestCtx(0) + if err != nil { + t.Fatalf("unable to create test context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("unable to create announcements: %v", err) + } + remotePeer := &mockPeer{pk: nodeKeyPriv2.PubKey()} + + // processAnnouncements is a helper closure we'll use to test that we + // properly process/reject announcements based on whether they're for a + // zombie edge or not. 
+ processAnnouncements := func(isZombie bool) { + t.Helper() + + errChan := ctx.gossiper.ProcessRemoteAnnouncement( + batch.remoteChanAnn, remotePeer, + ) + select { + case err := <-errChan: + if isZombie && err != nil { + t.Fatalf("expected to reject live channel "+ + "announcement with nil error: %v", err) + } + if !isZombie && err != nil { + t.Fatalf("expected to process live channel "+ + "announcement: %v", err) + } + case <-time.After(time.Second): + t.Fatal("expected to process channel announcement") + } + select { + case <-ctx.broadcastedMessage: + if isZombie { + t.Fatal("expected to not broadcast zombie " + + "channel announcement") + } + case <-time.After(2 * trickleDelay): + if !isZombie { + t.Fatal("expected to broadcast live channel " + + "announcement") + } + } + + errChan = ctx.gossiper.ProcessRemoteAnnouncement( + batch.chanUpdAnn2, remotePeer, + ) + select { + case err := <-errChan: + if isZombie && err != nil { + t.Fatalf("expected to reject zombie channel "+ + "update with nil error: %v", err) + } + if !isZombie && err != nil { + t.Fatalf("expected to process live channel "+ + "update: %v", err) + } + case <-time.After(time.Second): + t.Fatal("expected to process channel update") + } + select { + case <-ctx.broadcastedMessage: + if isZombie { + t.Fatal("expected to not broadcast zombie " + + "channel update") + } + case <-time.After(2 * trickleDelay): + if !isZombie { + t.Fatal("expected to broadcast live channel " + + "update") + } + } + } + + // We'll mark the edge for which we'll process announcements for as a + // zombie within the router. This should reject any announcements for + // this edge while it remains as a zombie. + chanID := batch.remoteChanAnn.ShortChannelID + err = ctx.router.MarkEdgeZombie( + chanID, batch.remoteChanAnn.NodeID1, batch.remoteChanAnn.NodeID2, + ) + if err != nil { + t.Fatalf("unable to mark channel %v as zombie: %v", chanID, err) + } + + processAnnouncements(true) + + // If we then mark the edge as live, the edge's zombie status should be + // overridden and the announcements should be processed. + if err := ctx.router.MarkEdgeLive(chanID); err != nil { + t.Fatalf("unable mark channel %v as zombie: %v", chanID, err) + } + + processAnnouncements(false) +} + +// TestProcessZombieEdgeNowLive ensures that we can detect when a zombie edge +// becomes live by receiving a fresh update. +func TestProcessZombieEdgeNowLive(t *testing.T) { + t.Parallel() + + // We'll start by creating our test context with a batch of + // announcements. + ctx, cleanup, err := createTestCtx(0) + if err != nil { + t.Fatalf("unable to create test context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("unable to create announcements: %v", err) + } + + localPrivKey := nodeKeyPriv1 + remotePrivKey := nodeKeyPriv2 + + remotePeer := &mockPeer{pk: remotePrivKey.PubKey()} + + // processAnnouncement is a helper closure we'll use to ensure an + // announcement is properly processed/rejected based on whether the edge + // is a zombie or not. The expectsErr boolean can be used to determine + // whether we should expect an error when processing the message, while + // the isZombie boolean can be used to determine whether the + // announcement should be or not be broadcast. 
+ processAnnouncement := func(ann lnwire.Message, isZombie, expectsErr bool) { + t.Helper() + + errChan := ctx.gossiper.ProcessRemoteAnnouncement( + ann, remotePeer, + ) + + var err error + select { + case err = <-errChan: + case <-time.After(time.Second): + t.Fatal("expected to process announcement") + } + if expectsErr && err == nil { + t.Fatal("expected error when processing announcement") + } + if !expectsErr && err != nil { + t.Fatalf("received unexpected error when processing "+ + "announcement: %v", err) + } + + select { + case msgWithSenders := <-ctx.broadcastedMessage: + if isZombie { + t.Fatal("expected to not broadcast zombie " + + "channel message") + } + assertMessage(t, ann, msgWithSenders.msg) + + case <-time.After(2 * trickleDelay): + if !isZombie { + t.Fatal("expected to broadcast live channel " + + "message") + } + } + } + + // We'll generate a channel update with a timestamp far enough in the + // past to consider it a zombie. + zombieTimestamp := time.Now().Add(-routing.DefaultChannelPruneExpiry) + batch.chanUpdAnn2.Timestamp = uint32(zombieTimestamp.Unix()) + if err := signUpdate(remotePrivKey, batch.chanUpdAnn2); err != nil { + t.Fatalf("unable to sign update with new timestamp: %v", err) + } + + // We'll also add the edge to our zombie index. + chanID := batch.remoteChanAnn.ShortChannelID + err = ctx.router.MarkEdgeZombie( + chanID, batch.remoteChanAnn.NodeID1, batch.remoteChanAnn.NodeID2, + ) + if err != nil { + t.Fatalf("unable mark channel %v as zombie: %v", chanID, err) + } + + // Attempting to process the current channel update should fail due to + // its edge being considered a zombie and its timestamp not being within + // the live horizon. We should not expect an error here since it is just + // a stale update. + processAnnouncement(batch.chanUpdAnn2, true, false) + + // Now we'll generate a new update with a fresh timestamp. This should + // allow the channel update to be processed even though it is still + // marked as a zombie within the index, since it is a fresh new update. + // This won't work however since we'll sign it with the wrong private + // key (local rather than remote). + batch.chanUpdAnn2.Timestamp = uint32(time.Now().Unix()) + if err := signUpdate(localPrivKey, batch.chanUpdAnn2); err != nil { + t.Fatalf("unable to sign update with new timestamp: %v", err) + } + + // We should expect an error due to the signature being invalid. + processAnnouncement(batch.chanUpdAnn2, true, true) + + // Signing it with the correct private key should allow it to be + // processed. + if err := signUpdate(remotePrivKey, batch.chanUpdAnn2); err != nil { + t.Fatalf("unable to sign update with new timestamp: %v", err) + } + + // The channel update cannot be successfully processed and broadcast + // until the channel announcement is. Since the channel update indicates + // a fresh new update, the gossiper should stash it until it sees the + // corresponding channel announcement. + updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement( + batch.chanUpdAnn2, remotePeer, + ) + + select { + case <-ctx.broadcastedMessage: + t.Fatal("expected to not broadcast live channel update " + + "without announcement") + case <-time.After(2 * trickleDelay): + } + + // We'll go ahead and process the channel announcement to ensure the + // channel update is processed thereafter. + processAnnouncement(batch.remoteChanAnn, false, false) + + // After successfully processing the announcement, the channel update + // should have been processed and broadcast successfully as well. 
+ select { + case err := <-updateErrChan: + if err != nil { + t.Fatalf("expected to process live channel update: %v", + err) + } + case <-time.After(time.Second): + t.Fatal("expected to process announcement") + } + + select { + case msgWithSenders := <-ctx.broadcastedMessage: + assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg) + case <-time.After(2 * trickleDelay): + t.Fatal("expected to broadcast live channel update") + } +} + // TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate // from the remote before we have processed our own ChannelAnnouncement, it will // be reprocessed later, after our ChannelAnnouncement.