Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Channel range queries: send back node announcements #1108

Merged
merged 5 commits into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ eclair {
channel-exclude-duration = 60 seconds // when a temporary channel failure is returned, we exclude the channel from our payment routes for this duration
broadcast-interval = 60 seconds // see BOLT #7
init-timeout = 5 minutes
request-node-announcements = true // if true we will ask for node announcements when we receive channel ids that we don't know

// the values below will be used to perform route searching
path-finding {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ object NodeParams {
channelExcludeDuration = FiniteDuration(config.getDuration("router.channel-exclude-duration").getSeconds, TimeUnit.SECONDS),
routerBroadcastInterval = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS),
randomizeRouteSelection = config.getBoolean("router.randomize-route-selection"),
requestNodeAnnouncements = config.getBoolean("router.request-node-announcements"),
searchMaxRouteLength = config.getInt("router.path-finding.max-route-length"),
searchMaxCltv = config.getInt("router.path-finding.max-cltv"),
searchMaxFeeBase = Satoshi(config.getLong("router.path-finding.fee-threshold-sat")),
Expand Down
194 changes: 145 additions & 49 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import scala.util.{Random, Try}
case class RouterConf(randomizeRouteSelection: Boolean,
channelExcludeDuration: FiniteDuration,
routerBroadcastInterval: FiniteDuration,
requestNodeAnnouncements: Boolean,
searchMaxFeeBase: Satoshi,
searchMaxFeePct: Double,
searchMaxRouteLength: Int,
Expand Down Expand Up @@ -535,7 +536,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
ids match {
case Nil => acc.reverse
case head :: tail =>
val flag = computeFlag(d.channels, d.updates)(head, timestamps.headOption, checksums.headOption)
val flag = computeFlag(d.channels, d.updates)(head, timestamps.headOption, checksums.headOption, nodeParams.routerConf.requestNodeAnnouncements)
// 0 means nothing to query, just don't include it
val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc
loop(tail, timestamps.drop(1), checksums.drop(1), acc1)
Expand All @@ -549,7 +550,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[

val (channelCount, updatesCount) = shortChannelIdAndFlags.foldLeft((0, 0)) {
case ((c, u), ShortChannelIdAndFlag(_, flag)) =>
val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeAnnouncement(flag)) 1 else 0)
val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement(flag)) 1 else 0)
val u1 = u + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) 1 else 0)
(c1, u1)
}
Expand All @@ -573,26 +574,29 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[

case Event(PeerRoutingMessage(transport, _, routingMessage@QueryShortChannelIds(chainHash, shortChannelIds, queryFlags_opt)), d) =>
sender ! TransportHandler.ReadAck(routingMessage)
val (channelCount, updatesCount) = shortChannelIds.array
.zipWithIndex
.foldLeft((0, 0)) {
case ((c, u), (shortChannelId, idx)) =>
var c1 = c
var u1 = u
val flag = routingMessage.queryFlags_opt.map(_.array(idx)).getOrElse(QueryShortChannelIdsTlv.QueryFlagType.INCLUDE_ALL)
d.channels.get(shortChannelId) match {
case None => log.warning("received query for shortChannelId={} that we don't have", shortChannelId)
case Some(ca) =>
if (QueryShortChannelIdsTlv.QueryFlagType.includeAnnouncement(flag)) {
transport ! ca
c1 = c1 + 1
}
if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId1, ca.nodeId2)).foreach { u => transport ! u; u1 = u1 + 1 }
if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId2, ca.nodeId1)).foreach { u => transport ! u; u1 = u1 + 1 }
}
(c1, u1)
val flags = routingMessage.queryFlags_opt.map(_.array).getOrElse(List.empty[Long])

var channelCount = 0
var updateCount = 0
var nodeCount = 0

Router.handleQuery(d.nodes, d.channels, d.updates)(
shortChannelIds.array,
flags,
ca => {
channelCount = channelCount + 1
transport ! ca
},
cu => {
updateCount = updateCount + 1
transport ! cu
},
na => {
nodeCount = nodeCount + 1
transport ! na
}
log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates", shortChannelIds.array.size, channelCount, updatesCount)
)
log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", shortChannelIds.array.size, channelCount, updateCount, nodeCount)
transport ! ReplyShortChannelIdsEnd(chainHash, 1)
stay

Expand Down Expand Up @@ -853,43 +857,135 @@ object Router {
height >= firstBlockNum && height <= (firstBlockNum + numberOfBlocks)
}

def computeFlag(channels: SortedMap[ShortChannelId, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate])(
shortChannelId: ShortChannelId,
timestamps_opt: Option[ReplyChannelRangeTlv.Timestamps],
checksums_opt: Option[ReplyChannelRangeTlv.Checksums]): Long = {
import QueryShortChannelIdsTlv.QueryFlagType
var flag = 0L
(timestamps_opt, checksums_opt) match {
case (Some(theirTimestamps), Some(theirChecksums)) if channels.contains(shortChannelId) =>
val (ourTimestamps, ourChecksums) = Router.getChannelDigestInfo(channels, updates)(shortChannelId)
def shouldRequestUpdate(ourTimestamp: Long, ourChecksum: Long, theirTimestamp_opt: Option[Long], theirChecksum_opt: Option[Long]): Boolean = {
(theirTimestamp_opt, theirChecksum_opt) match {
case (Some(theirTimestamp), Some(theirChecksum)) =>
// we request their channel_update if all those conditions are met:
// - it is more recent than ours
// - it is different from ours, or it is the same but ours is about to be stale
// - it is not stale itself
if (ourTimestamps.timestamp1 < theirTimestamps.timestamp1 && (ourChecksums.checksum1 != theirChecksums.checksum1 || isAlmostStale(ourTimestamps.timestamp1)) && !isStale(theirTimestamps.timestamp1)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1
if (ourTimestamps.timestamp2 < theirTimestamps.timestamp2 && (ourChecksums.checksum2 != theirChecksums.checksum2 || isAlmostStale(ourTimestamps.timestamp1)) && !isStale(theirTimestamps.timestamp2)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
case (Some(theirTimestamps), None) if channels.contains(shortChannelId) =>
val (ourTimestamps, _) = Router.getChannelDigestInfo(channels, updates)(shortChannelId)
// we request their channel_update if all those conditions are met:
// - it is more recent than ours
// - it is not stale itself
if (ourTimestamps.timestamp1 < theirTimestamps.timestamp1 && !isStale(theirTimestamps.timestamp1)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1
if (ourTimestamps.timestamp2 < theirTimestamps.timestamp2 && !isStale(theirTimestamps.timestamp2)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
case (None, Some(theirChecksums)) if channels.contains(shortChannelId) =>
val (_, ourChecksums) = Router.getChannelDigestInfo(channels, updates)(shortChannelId)
// - it is not stale
val theirsIsMoreRecent = ourTimestamp < theirTimestamp
val theirsIsDifferent = ourChecksum != theirChecksum
val oursIsAlmostStale = isAlmostStale(ourTimestamp)
val theirsIsStale = isStale(theirTimestamp)
theirsIsMoreRecent && (theirsIsDifferent || oursIsAlmostStale) && !theirsIsStale
case (Some(theirTimestamp), None) =>
val theirsIsMoreRecent = ourTimestamp < theirTimestamp
val theirsIsStale = isStale(theirTimestamp)
theirsIsMoreRecent && !theirsIsStale
case (None, Some(theirChecksum)) =>
// this should not happen as we will not ask for checksums without asking for timestamps too
if (ourChecksums.checksum1 != theirChecksums.checksum1 && theirChecksums.checksum1 != 0) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1
if (ourChecksums.checksum2 != theirChecksums.checksum2 && theirChecksums.checksum2 != 0) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
case (None, None) if channels.contains(shortChannelId) =>
// we know this channel: we only request their channel updates
flag = QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
val theirsIsDifferent = theirChecksum != 0 && ourChecksum != theirChecksum
theirsIsDifferent
case _ =>
// we don't know this channel: we request everything
flag = QueryFlagType.INCLUDE_CHANNEL_ANNOUNCEMENT | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
// they did not include timestamp or checksum => ask for the update
true
}
}

def computeFlag(channels: SortedMap[ShortChannelId, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate])(
shortChannelId: ShortChannelId,
theirTimestamps_opt: Option[ReplyChannelRangeTlv.Timestamps],
theirChecksums_opt: Option[ReplyChannelRangeTlv.Checksums],
includeNodeAnnouncements: Boolean): Long = {
import QueryShortChannelIdsTlv.QueryFlagType._

val flag = channels.contains(shortChannelId) match {
case false if includeNodeAnnouncements =>
INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2 | INCLUDE_NODE_ANNOUNCEMENT_1 | INCLUDE_NODE_ANNOUNCEMENT_2
case false =>
INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2
case true =>
// we already know this channel
val (ourTimestamps, ourChecksums) = Router.getChannelDigestInfo(channels, updates)(shortChannelId)
// if they don't provide timestamps or checksums, we set appropriate default values:
// - we assume their timestamp is more recent than ours by setting timestamp = Long.MaxValue
// - we assume their update is different from ours by setting checkum = Long.MaxValue (NB: our default value for checksum is 0)
val shouldRequestUpdate1 = shouldRequestUpdate(ourTimestamps.timestamp1, ourChecksums.checksum1, theirTimestamps_opt.map(_.timestamp1), theirChecksums_opt.map(_.checksum1))
val shouldRequestUpdate2 = shouldRequestUpdate(ourTimestamps.timestamp2, ourChecksums.checksum2, theirTimestamps_opt.map(_.timestamp2), theirChecksums_opt.map(_.checksum2))
val flagUpdate1 = if (shouldRequestUpdate1) INCLUDE_CHANNEL_UPDATE_1 else 0
val flagUpdate2 = if (shouldRequestUpdate2) INCLUDE_CHANNEL_UPDATE_2 else 0
flagUpdate1 | flagUpdate2
}
flag
}

/**
* Handle a query message, which includes a list of channel ids and flags.
*
* @param nodes node id -> node announcement
* @param channels channel id -> channel announcement
* @param updates channel description -> channel update
* @param ids list of channel ids
* @param flags list of query flags, either empty one flag per channel id
* @param onChannel called when a channel announcement matches (i.e. its bit is set in the query flag and we have it)
* @param onUpdate called when a channel update matches
* @param onNode called when a node announcement matches
*
*/
def handleQuery(nodes: Map[PublicKey, NodeAnnouncement],
channels: SortedMap[ShortChannelId, ChannelAnnouncement],
updates: Map[ChannelDesc, ChannelUpdate])(
ids: List[ShortChannelId],
flags: List[Long],
onChannel: ChannelAnnouncement => Unit,
onUpdate: ChannelUpdate => Unit,
onNode: NodeAnnouncement => Unit): Unit = {
import QueryShortChannelIdsTlv.QueryFlagType

// we loop over channel ids and query flag. We track node Ids for node announcement
// we've already sent to avoid sending them multiple times, as requested by the BOLTs
@tailrec
def loop(ids: List[ShortChannelId], flags: List[Long], numca: Int = 0, numcu: Int = 0, nodesSent: Set[PublicKey] = Set.empty[PublicKey]): (Int, Int, Int) = ids match {
case Nil => (numca, numcu, nodesSent.size)
case head :: tail if !channels.contains(head) =>
//log.warning("received query for shortChannelId={} that we don't have", head)
loop(tail, flags.drop(1), numca, numcu, nodesSent)
case head :: tail =>
var numca1 = numca
var numcu1 = numcu
var sent1 = nodesSent
val ca = channels(head)
val flag_opt = flags.headOption
// no flag means send everything

val includeChannel = flag_opt.forall(QueryFlagType.includeChannelAnnouncement)
val includeUpdate1 = flag_opt.forall(QueryFlagType.includeUpdate1)
val includeUpdate2 = flag_opt.forall(QueryFlagType.includeUpdate2)
val includeNode1 = flag_opt.forall(QueryFlagType.includeNodeAnnouncement1)
val includeNode2 = flag_opt.forall(QueryFlagType.includeNodeAnnouncement2)

if (includeChannel) {
onChannel(ca)
}
if (includeUpdate1) {
updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId1, ca.nodeId2)).foreach { u =>
onUpdate(u)
}
}
if (includeUpdate2) {
updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId2, ca.nodeId1)).foreach { u =>
onUpdate(u)
}
}
if (includeNode1 && !sent1.contains(ca.nodeId1)) {
nodes.get(ca.nodeId1).foreach { n =>
onNode(n)
sent1 = sent1 + ca.nodeId1
}
}
if (includeNode2 && !sent1.contains(ca.nodeId2)) {
nodes.get(ca.nodeId2).foreach { n =>
onNode(n)
sent1 = sent1 + ca.nodeId2
}
}
loop(tail, flags.drop(1), numca1, numcu1, sent1)
}

loop(ids, flags)
}

/**
* Returns overall progress on synchronization
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package fr.acinq.eclair.wire

import fr.acinq.eclair.UInt64
import fr.acinq.eclair.wire.CommonCodecs.{shortchannelid, varint, varintoverflow}
import fr.acinq.eclair.wire.CommonCodecs.{varint, varintoverflow}
import scodec.Codec
import scodec.codecs.{byte, discriminated, list, provide, variableSizeBytesLong, zlib}

Expand All @@ -20,13 +20,18 @@ object QueryShortChannelIdsTlv {
val INCLUDE_CHANNEL_ANNOUNCEMENT: Long = 1
val INCLUDE_CHANNEL_UPDATE_1: Long = 2
val INCLUDE_CHANNEL_UPDATE_2: Long = 4
val INCLUDE_ALL: Long = (INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2)
val INCLUDE_NODE_ANNOUNCEMENT_1: Long = 8
val INCLUDE_NODE_ANNOUNCEMENT_2: Long = 16

def includeAnnouncement(flag: Long) = (flag & INCLUDE_CHANNEL_ANNOUNCEMENT) != 0
def includeChannelAnnouncement(flag: Long) = (flag & INCLUDE_CHANNEL_ANNOUNCEMENT) != 0

def includeUpdate1(flag: Long) = (flag & INCLUDE_CHANNEL_UPDATE_1) != 0

def includeUpdate2(flag: Long) = (flag & INCLUDE_CHANNEL_UPDATE_2) != 0

def includeNodeAnnouncement1(flag: Long) = (flag & INCLUDE_NODE_ANNOUNCEMENT_1) != 0

def includeNodeAnnouncement2(flag: Long) = (flag & INCLUDE_NODE_ANNOUNCEMENT_2) != 0
}

val encodedQueryFlagsCodec: Codec[EncodedQueryFlags] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ object TestConstants {
randomizeRouteSelection = false,
channelExcludeDuration = 60 seconds,
routerBroadcastInterval = 5 seconds,
requestNodeAnnouncements = true,
searchMaxFeeBase = Satoshi(21),
searchMaxFeePct = 0.03,
searchMaxCltv = 2016,
Expand Down Expand Up @@ -176,6 +177,7 @@ object TestConstants {
randomizeRouteSelection = false,
channelExcludeDuration = 60 seconds,
routerBroadcastInterval = 5 seconds,
requestNodeAnnouncements = true,
searchMaxFeeBase = Satoshi(21),
searchMaxFeePct = 0.03,
searchMaxCltv = 2016,
Expand Down
Loading