Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

channeldb: node channels cache [poc] #5631

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions channeldb/channel_cache.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package channeldb

import "github.com/lightningnetwork/lnd/routing/route"

// channelCache is an in-memory cache used to improve the performance of
// ChanUpdatesInHorizon. It caches the chan info and edge policies for a
// particular channel.
type channelCache struct {
n int
channels map[uint64]ChannelEdge

nodeChannels map[route.Vertex][]ChannelEdge
}

// newChannelCache creates a new channelCache with maximum capacity of n
Expand All @@ -14,6 +18,8 @@ func newChannelCache(n int) *channelCache {
return &channelCache{
n: n,
channels: make(map[uint64]ChannelEdge),

nodeChannels: make(map[route.Vertex][]ChannelEdge),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the chan edge is a pointer, then it can just point into the entry in the channels map so things don't need to be allocated+stored twice for each channel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I that thought also crossed my mind. It does make things more complicated, so left it out for this PoC. For a first version in production, redundant storage could be an acceptable option. The cache size is limited anyway.

}
}

Expand All @@ -23,28 +29,55 @@ func (c *channelCache) get(chanid uint64) (ChannelEdge, bool) {
return channel, ok
}

func (c *channelCache) getNodeChannels(node route.Vertex) ([]ChannelEdge, bool) {
channels, ok := c.nodeChannels[node]
return channels, ok
}

func (c *channelCache) insertNodeChannels(node route.Vertex, channels []ChannelEdge) {
// TODO: Manage cache capacity.

c.nodeChannels[node] = channels
}

// insert adds the entry to the channel cache. If an entry for chanid already
// exists, it will be replaced with the new entry. If the entry doesn't exist,
// it will be inserted to the cache, performing a random eviction if the cache
// is at capacity.
func (c *channelCache) insert(chanid uint64, channel ChannelEdge) {
// If entry exists, replace it.
if _, ok := c.channels[chanid]; ok {
c.channels[chanid] = channel
return
}
if _, ok := c.channels[chanid]; !ok {
if len(c.channels) == c.n {
for id := range c.channels {
channel := c.channels[id]

// Otherwise, evict an entry at random and insert.
if len(c.channels) == c.n {
for id := range c.channels {
delete(c.channels, id)
break
delete(c.channels, id)

// Invalidate node channels cache.
delete(c.nodeChannels, channel.Info.NodeKey1Bytes)
delete(c.nodeChannels, channel.Info.NodeKey2Bytes)

break
}
}
}

c.channels[chanid] = channel

// Invalidate node channels cache.
delete(c.nodeChannels, channel.Info.NodeKey1Bytes)
delete(c.nodeChannels, channel.Info.NodeKey2Bytes)
}

// remove deletes an edge for chanid from the cache, if it exists.
func (c *channelCache) remove(chanid uint64) {
channel, ok := c.channels[chanid]
if !ok {
return
}

delete(c.channels, chanid)

// Invalidate node channels cache.
delete(c.nodeChannels, channel.Info.NodeKey1Bytes)
delete(c.nodeChannels, channel.Info.NodeKey2Bytes)
}
47 changes: 45 additions & 2 deletions channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,48 @@ func (c *ChannelGraph) ForEachNodeChannel(tx kvdb.RTx, nodePub []byte,
cb func(kvdb.RTx, *ChannelEdgeInfo, *ChannelEdgePolicy,
*ChannelEdgePolicy) error) error {

db := c.db
var channels []ChannelEdge

return nodeTraversal(tx, nodePub, db, cb)
node, err := route.NewVertexFromBytes(nodePub)
if err != nil {
return err
}

// Retrieve channels from cache if possible.
var ok bool
channels, ok = c.chanCache.getNodeChannels(node)
ok = false
if !ok {
// Cache miss, retrieve from database.
db := c.db

add := func(_ kvdb.RTx, info *ChannelEdgeInfo,
policy1 *ChannelEdgePolicy, policy2 *ChannelEdgePolicy) error {

channels = append(channels, ChannelEdge{
Info: info,
Policy1: policy1,
Policy2: policy2,
})

return nil
}
if err := nodeTraversal(tx, nodePub, db, add); err != nil {
return err
}

// Store in cache.
c.chanCache.insertNodeChannels(node, channels)
}

// Execute callback.
for _, channel := range channels {
err := cb(tx, channel.Info, channel.Policy1, channel.Policy2)
if err != nil {
return err
}
}
return nil
}

// DisabledChannelIDs returns the channel ids of disabled channels.
Expand Down Expand Up @@ -2132,6 +2171,10 @@ func (c *ChannelGraph) updateEdgeCache(e *ChannelEdgePolicy, isUpdate1 bool) {
}
c.chanCache.insert(e.ChannelID, channel)
}

// Short-cut: invalidate all cached node channels. This needs to be
// replaced by more selective invalidation or an update.
c.chanCache.nodeChannels = make(map[route.Vertex][]ChannelEdge)
}

// updateEdgePolicy attempts to update an edge's policy within the relevant
Expand Down