Skip to content

Commit

Permalink
connectd: tie gossip query responses into ratelimiting code.
Browse files Browse the repository at this point in the history
A bit tricky, since we get more than one message at a time.  However,
this just means we go over quota for a bit, and will get caught when
those are sent (we do this for a single message already, so it's not
that much worse).

Note: this not only limits sending, but it limits the actuall query
processing, which is nice.

Signed-off-by: Rusty Russell <[email protected]>
  • Loading branch information
rustyrussell committed Jul 3, 2024
1 parent aff77fa commit 572ed95
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 47 deletions.
53 changes: 32 additions & 21 deletions connectd/multiplex.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,24 +420,14 @@ static void wake_gossip(struct peer *peer)
peer->gs.gossip_timer = gossip_stream_timer(peer);
}

/* If we are streaming gossip, get something from gossip store */
static const u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)
/* Gossip response or something from gossip store */
static const u8 *maybe_gossip_msg(const tal_t *ctx, struct peer *peer)
{
const u8 *msg;
struct timemono now;
struct gossmap *gossmap;
u32 timestamp;

/* dev-mode can suppress all gossip */
if (peer->daemon->dev_suppress_gossip)
return NULL;

/* Not streaming right now? */
if (!peer->gs.active)
return NULL;

/* This should be around to kick us every 60 seconds */
assert(peer->gs.gossip_timer);
const u8 **msgs;

/* If it's been over a second, make a fresh start. */
now = time_mono();
Expand All @@ -462,6 +452,31 @@ static const u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)

gossmap = get_gossmap(peer->daemon);

/* This can return more than one. */
msgs = maybe_create_query_responses(tmpctx, peer, gossmap);
if (tal_count(msgs) > 0) {
/* We return the first one for immediate sending, and queue
* others for future. We add all the lengths now though! */
for (size_t i = 0; i < tal_count(msgs); i++) {
peer->gs.bytes_this_second += tal_bytelen(msgs[i]);
status_peer_io(LOG_IO_OUT, &peer->id, msgs[i]);
if (i > 0)
msg_enqueue(peer->peer_outq, take(msgs[i]));
}
return msgs[0];
}

/* dev-mode can suppress all gossip */
if (peer->daemon->dev_suppress_gossip)
return NULL;

/* Not streaming right now? */
if (!peer->gs.active)
return NULL;

/* This should be around to kick us every 60 seconds */
assert(peer->gs.gossip_timer);

again:
msg = gossmap_stream_next(ctx, gossmap, peer->gs.iter, &timestamp);
if (msg) {
Expand All @@ -482,6 +497,7 @@ static const u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)
return msg;
}

/* No gossip left to send */
peer->gs.active = false;
return NULL;
}
Expand Down Expand Up @@ -945,14 +961,9 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
return io_sock_shutdown(peer_conn);

/* If they want us to send gossip, do so now. */
if (!peer->draining) {
/* FIXME: make it return the message? */
if (maybe_send_query_responses(peer, get_gossmap(peer->daemon))) {
msg = msg_dequeue(peer->peer_outq);
} else {
msg = maybe_from_gossip_store(NULL, peer);
}
}
if (!peer->draining)
msg = maybe_gossip_msg(NULL, peer);

if (!msg) {
/* Tell them to read again, */
io_wake(&peer->subds);
Expand Down
38 changes: 18 additions & 20 deletions connectd/queries.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,22 @@ static void uniquify_node_ids(struct node_id **ids)
}

/* We are fairly careful to avoid the peer DoSing us with channel queries:
* this routine sends information about a single short_channel_id, unless
* this routine creates messages about a single short_channel_id, unless
* it's finished all of them. */
bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
const u8 **maybe_create_query_responses(const tal_t *ctx,
struct peer *peer,
struct gossmap *gossmap)
{
size_t i, num;
bool sent = false;
const u8 *msg;
const u8 **msgs = tal_arr(ctx, const u8 *, 0);

/* BOLT #7:
*
* - MUST respond to each known `short_channel_id`:
*/
/* Search for next short_channel_id we know about. */
num = tal_count(peer->scid_queries);
for (i = peer->scid_query_idx; !sent && i < num; i++) {
for (i = peer->scid_query_idx; tal_count(msgs) == 0 && i < num; i++) {
struct gossmap_chan *chan;
struct gossmap_node *node;
struct node_id node_id;
Expand All @@ -136,9 +137,8 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
* - MUST reply with a `channel_announcement`
*/
if (peer->scid_query_flags[i] & SCID_QF_ANNOUNCE) {
msg = gossmap_chan_get_announce(NULL, gossmap, chan);
inject_peer_msg(peer, take(msg));
sent = true;
tal_arr_expand(&msgs,
gossmap_chan_get_announce(msgs, gossmap, chan));
}

/* BOLT #7:
Expand All @@ -152,15 +152,13 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
* `node_id_2` */
if ((peer->scid_query_flags[i] & SCID_QF_UPDATE1)
&& gossmap_chan_set(chan, 0)) {
msg = gossmap_chan_get_update(NULL, gossmap, chan, 0);
inject_peer_msg(peer, take(msg));
sent = true;
tal_arr_expand(&msgs,
gossmap_chan_get_update(msgs, gossmap, chan, 0));
}
if ((peer->scid_query_flags[i] & SCID_QF_UPDATE2)
&& gossmap_chan_set(chan, 1)) {
msg = gossmap_chan_get_update(NULL, gossmap, chan, 1);
inject_peer_msg(peer, take(msg));
sent = true;
tal_arr_expand(&msgs,
gossmap_chan_get_update(msgs, gossmap, chan, 1));
}

/* BOLT #7:
Expand Down Expand Up @@ -212,7 +210,7 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
/* If we haven't sent anything above, we look for the next
* node_announcement to send. */
num = tal_count(peer->scid_query_nodes);
for (i = peer->scid_query_nodes_idx; !sent && i < num; i++) {
for (i = peer->scid_query_nodes_idx; tal_count(msgs) == 0 && i < num; i++) {
const struct gossmap_node *n;

/* Not every node announces itself (we know it exists because
Expand All @@ -221,9 +219,8 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
if (!n || !gossmap_node_announced(n))
continue;

msg = gossmap_node_get_announce(NULL, gossmap, n);
inject_peer_msg(peer, take(msg));
sent = true;
tal_arr_expand(&msgs,
gossmap_node_get_announce(msgs, gossmap, n));
}
peer->scid_query_nodes_idx = i;

Expand All @@ -245,7 +242,7 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
u8 *end = towire_reply_short_channel_ids_end(peer,
&chainparams->genesis_blockhash,
true);
inject_peer_msg(peer, take(end));
tal_arr_expand(&msgs, end);

/* We're done! Clean up so we simply pass-through next time. */
peer->scid_queries = tal_free(peer->scid_queries);
Expand All @@ -254,7 +251,8 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
peer->scid_query_nodes = tal_free(peer->scid_query_nodes);
peer->scid_query_nodes_idx = 0;
}
return sent;

return msgs;
}

/* The peer can ask about an array of short channel ids: we don't assemble the
Expand Down
6 changes: 4 additions & 2 deletions connectd/queries.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
#define LIGHTNING_CONNECTD_QUERIES_H
#include "config.h"

/* See if there's a query to respond to, if so, do it and return true */
bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap);
/* See if there's a query to respond to: if so, return some msgs */
const u8 **maybe_create_query_responses(const tal_t *ctx,
struct peer *peer,
struct gossmap *gossmap);

void handle_query_short_channel_ids(struct peer *peer, const u8 *msg);
void handle_query_channel_range(struct peer *peer, const u8 *msg);
Expand Down
62 changes: 58 additions & 4 deletions tests/test_gossip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2043,7 +2043,7 @@ def test_listchannels_deprecated_local(node_factory, bitcoind):
assert vals == [(True, True, l1l2)] * 2 + [(True, False, l2l3)] * 2 or vals == [(True, False, l2l3)] * 2 + [(True, True, l1l2)] * 2


def test_gossip_throttle(node_factory, bitcoind):
def test_gossip_throttle(node_factory, bitcoind, chainparams):
"""Make some gossip, test it gets throttled"""
l1, l2, l3, l4 = node_factory.line_graph(4, wait_for_announce=True,
opts=[{}, {}, {}, {'dev-throttle-gossip': None}])
Expand All @@ -2063,9 +2063,11 @@ def test_gossip_throttle(node_factory, bitcoind):
'--max-messages={}'.format(expected),
'{}@localhost:{}'.format(l1.info['id'], l1.port)],
check=True,
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout.split()
time_fast = time.time() - start_fast
assert time_fast < 2
# Remove timestamp filter, since timestamp will change!
out1 = [m for m in out1 if not m.startswith(b'0109')]

# l4 is throttled
start_slow = time.time()
Expand All @@ -2076,10 +2078,62 @@ def test_gossip_throttle(node_factory, bitcoind):
'--max-messages={}'.format(expected),
'{}@localhost:{}'.format(l4.info['id'], l4.port)],
check=True,
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout.split()
time_slow = time.time() - start_slow
assert time_slow > 3

# Remove timestamp filter, since timestamp will change!
out2 = [m for m in out2 if not m.startswith(b'0109')]

# Contents should be identical (once uniquified, since each
# doubles-up on its own gossip)
assert set(out1.split()) == set(out2.split())
assert set(out1) == set(out2)

encoded = subprocess.run(['devtools/mkencoded', '--scids', '00',
first_scid(l1, l2),
first_scid(l2, l3),
first_scid(l3, l4)],
check=True,
timeout=TIMEOUT,
stdout=subprocess.PIPE).stdout.strip().decode()

query = subprocess.run(['devtools/mkquery',
'query_short_channel_ids',
chainparams['chain_hash'],
encoded,
# We want channel announce, updates and node ann.
'00', '1F1F1F'],
check=True,
timeout=TIMEOUT,
stdout=subprocess.PIPE).stdout.strip()

# Queries should also be ratelimited, so compare l1 vs l4.
start_fast = time.time()
out3 = subprocess.run(['devtools/gossipwith',
'--no-gossip',
'--hex',
'--network={}'.format(TEST_NETWORK),
'--max-messages={}'.format(expected),
'{}@localhost:{}'.format(l1.info['id'], l1.port),
query],
check=True,
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout.split()
time_fast = time.time() - start_fast
assert time_fast < 2
out3 = [m for m in out3 if not m.startswith(b'0109')]
assert set(out1) == set(out3)

start_slow = time.time()
out4 = subprocess.run(['devtools/gossipwith',
'--no-gossip',
'--hex',
'--network={}'.format(TEST_NETWORK),
'--max-messages={}'.format(expected),
'{}@localhost:{}'.format(l4.info['id'], l4.port),
query],
check=True,
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout.split()
time_slow = time.time() - start_slow
assert time_slow > 3
out4 = [m for m in out4 if not m.startswith(b'0109')]
assert set(out2) == set(out4)

0 comments on commit 572ed95

Please sign in to comment.