BOLT#2: Add message retransmission sub-system #156
Conversation
c18e4ec → 480beb6 (force-push)
480beb6 → bddc034 (force-push)
channeldb/messagestore.go
Outdated
	return nil
}

// Add indexes in additional array, because
As the keys are stored on disk in big-endian order, the sorting isn't necessary here. When one performs an in-order scan, the items will be retrieved in chronological order by the sequence number.
See the comment above about re-working the schema to eliminate this sorting step.
channeldb/messagestore.go
Outdated
	basePeerBucketKey = []byte("peermessages")
)

// MessagesStore represents the boltdb storage for messages inside
Keeping with the theme of only storing generic LN data with channeldb (the graph, invoices, etc.), I think this entire file should instead be moved to reside in the root lnd directory.
channeldb/messagestore.go
Outdated
}

// Generate next sequence number to preserver the message order.
sequence, err := peerBucket.NextSequence()
Hmm, what's implemented here currently isn't quite what we discussed offline. As is now, a sorting step is inserted before retrieving all the messages for a peer, as the messages are stored in distinct buckets.
Instead, what I originally described was this:
- A single top-level bucket that maps: `index -> code || msg`. Concatenating the message code to the stored data allows the db logic to properly parse the wire message without trial and error or needing an additional index.
- Within this top-level bucket, another bucket would be stored which acts as an index into the top-level bucket. This bucket will be used to locate which messages can be deleted from the log in response to a retrieved ACK message. The mapping for items in this bucket would be: `messageCode -> {index_1, index_2, index_3, etc.}`. So when receiving a new message, you check for the existence of the message code in this index bucket, then delete all the indexes from the top-level bucket that are returned.
- Similarly, when adding a new message to the top-level bucket, another compile-time constant set of mappings needs to be consulted to determine which message ACKs the message being stored. So in addition to storing it in the top-level bucket, you'd also append to the record for the messageCode mappings.
Switching to the schema above eliminates the unnecessary sorting logic and also still retains the message order required to properly perform retransmissions.
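The proposed schema can be sketched without boltdb by modeling the two buckets as in-memory maps. All names here (`messageLog`, `ackedBy`) and the numeric message codes are hypothetical stand-ins for illustration, not code from the PR:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// ackedBy is a stand-in for the compile-time mapping: for a stored
// message's code, the code of the message whose receipt ACKs it.
// Codes 1 and 100 are arbitrary placeholders.
var ackedBy = map[uint32]uint32{
	1: 100, // e.g. an update message acked by a revoke-style message
}

// messageLog mocks the two proposed buckets with plain maps.
type messageLog struct {
	nextIndex uint64
	messages  map[uint64][]byte   // top-level bucket: index -> code || msg
	ackIndex  map[uint32][]uint64 // index bucket: acking code -> indexes to delete
}

// add stores code||msg under the next index and records which future
// message code will delete it.
func (l *messageLog) add(code uint32, msg []byte) {
	l.nextIndex++
	record := make([]byte, 4+len(msg))
	binary.BigEndian.PutUint32(record, code)
	copy(record[4:], msg)
	l.messages[l.nextIndex] = record

	if acker, ok := ackedBy[code]; ok {
		l.ackIndex[acker] = append(l.ackIndex[acker], l.nextIndex)
	}
}

// ack performs the unconditional delete: every index recorded under the
// received code is removed from the top-level bucket.
func (l *messageLog) ack(code uint32) {
	for _, idx := range l.ackIndex[code] {
		delete(l.messages, idx)
	}
	delete(l.ackIndex, code)
}

func main() {
	l := &messageLog{
		messages: map[uint64][]byte{},
		ackIndex: map[uint32][]uint64{},
	}
	l.add(1, []byte("htlc update"))
	fmt.Println(len(l.messages)) // 1: message waiting for its ACK
	l.ack(100)
	fmt.Println(len(l.messages)) // 0: ACK deleted it
}
```

Because insertion order is preserved by the monotonically increasing index, iterating the top-level bucket by key already yields the retransmission order.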
lnd_test.go
Outdated
@@ -1459,6 +1459,9 @@ func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) {
	return bobChannelInfo.Channels[0], nil
}

// Wait for channel to be acquired by router.
time.Sleep(time.Second)
The current set of tests seems to pass relatively reliably without these added sleeps. Rather than adding additional sleeps, with the topology notification code merged in, we can add hooks into the integration testing framework to properly wait for messages to propagate before attempting to dispatch payments through newly opened channels.
lnwire/message.go
Outdated
@@ -20,50 +20,124 @@ const MessageHeaderSize = 12
// individual limits imposed by messages themselves.
const MaxMessagePayload = 1024 * 1024 * 32 // 32MB

// Code represent the unique identifier of the lnwire command.
type Code uint32
Naming suggestion: MessageCode.
networktest.go
Outdated
// CopyLogs copy/dumps Alice and Bob lnd daemon logs with specified period
// of time in temporary directory.
// NOTE: Panics error logs will not be logged.
func (n *networkHarness) CopyLogs(period time.Duration) (func(), <-chan error) {
What's the purpose of this method? It doesn't look to be used anywhere with the PR currently.
I am using it during debugging and I thought it might be useful to others, but essentially this is just copying of the lnd logs into the temp directory.
retranmission.go
Outdated
type MessageStore interface {
	// Get returns the sorted set of messages in the order they were
	// added in storage.
	Get() ([]lnwire.Message, error)
Naming suggestion: GetUnackedMessages.
retranmission.go
Outdated
//
// NOTE: The original purpose of creating this interface was in separation
// between the retransmission logic and specifics of storage itself. In case of
// such interface we may create the test storage and register the add,remove,get
Missing spaces after the commas at the end of this sentence.
retranmission.go
Outdated
	// Is due to the fact of logic of retransmission subsystem, where we
	// need remove messages not one by one but in contrary by groups of
	// messages.
	Remove(codes ...lnwire.Code) error
With the modification to the schema I suggested, I think this method would be changed to something along the lines of an Ack method and instead take a single lnwire.MessageCode.
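A minimal sketch of what the revised interface might look like, assuming the storage layer owns the ACK mapping. `MessageCode` stands in for `lnwire.MessageCode`, and `memStore` plus the code value are purely illustrative, not code from the PR:

```go
package main

import "fmt"

// MessageCode stands in for lnwire.MessageCode.
type MessageCode uint32

// MessageStore replaces the variadic Remove(codes ...) with a single Ack:
// the caller hands over the code of the message just received, and the
// store decides which records that receipt deletes.
type MessageStore interface {
	Ack(code MessageCode) error
}

// memStore is a toy in-memory implementation where unacked messages are
// grouped under the code that will acknowledge them.
type memStore struct {
	unacked map[MessageCode][][]byte
}

func (s *memStore) Ack(code MessageCode) error {
	// Single unconditional delete: everything waiting on this code goes.
	delete(s.unacked, code)
	return nil
}

func main() {
	s := &memStore{unacked: map[MessageCode][][]byte{
		42: {[]byte("pending update")}, // 42 is an arbitrary placeholder code
	}}
	var store MessageStore = s
	store.Ack(42)
	fmt.Println(len(s.unacked)) // 0: the pending message was acked away
}
```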
// Ack encapsulates the specification logic about which messages should be
// acknowledged by receiving this one.
func (rt *retransmitter) Ack(msg lnwire.Message) error {
With the comment above, this method would be simplified a good bit as it would attempt to perform a single unconditional delete from the database. The mapping here (what gets deleted on receipt of a message) would be moved into the storage layer as it would need to be consulted each time a message is written.
08df3e9 → 31cb691 (force-push)
Nice work on the latest iteration!
This PR is getting pretty close, I'm going to move to some local testing of the functionality while the latest comments are being addressed.
channeldb/error.go
Outdated
@@ -30,4 +30,8 @@ var (
	ErrNodeAliasNotFound = fmt.Errorf("alias for node not found")

	ErrSourceNodeNotSet = fmt.Errorf("source node does not exist")

	// ErrPeerMessagesNotFound is returned when no message have been
have been found -> has been found
channeldb/messagestore.go
Outdated
// MessagesStore represents the boltdb storage for messages inside
// retransmission sub-system.
type MessagesStore struct {
	// id is a unique identificator of peer.
id is a unique slice of bytes identifying a peer. This value is typically a peer's identity public key serialized in compressed format
I re-thought the meaning of this field a bit due to the recent changes in the discovery PR. I think it would be better to keep id as not something coupled with a peer at all, but mention that usually it is a compressed pub key.
What changes in the discovery PR? Peers within the network are identified globally by their public keys.
In any case, the comment should be replaced with the first sentence of my suggestion: "id is a unique slice of bytes identifying a peer."
channeldb/messagestore.go
Outdated
}

return peerBucket.Put(indexBytes, b.Bytes())

Minor nit: there's an extra new line here.
channeldb/messagestore_test.go
Outdated
if m.ID != 1 {
	t.Fatal("wrong order of message")
}
The test should also assert the deep equality of the message read from disk vs the original message.
channeldb/messagestore_test.go
Outdated
if err != nil && err != ErrPeerMessagesNotFound {
	t.Fatalf("can't get the message: %v", err)
} else if len(messages) != 0 {
	t.Fatal("wrong lenght of messages")
lenght -> length
	lnwire.CmdUpdateFailHTLC,
	lnwire.CmdUpdateFufillHTLC,
	lnwire.CmdCommitSig,
	lnwire.CmdCloseRequest,
If I'm reading the current spec draft correctly, CloseRequest and FundingLocked should be omitted. They're not ACK'd by a RevokeAndAck message.
Hmm, maybe I am missing something, but from the spec:

funding_locked: acknowledged by update_ messages, commitment_signed, revoke_and_ack or shutdown messages.
shutdown: acknowledged by closing_signed or revoke_and_ack.
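The quoted spec text can be captured as a plain data table, which is essentially what the compile-time ACK mapping would be. A sketch, with message names as strings purely for illustration:

```go
package main

import "fmt"

// ackedBy maps each stored message type to the set of message types whose
// receipt acknowledges it, transcribed from the quoted spec text.
var ackedBy = map[string][]string{
	"funding_locked": {
		"update_add_htlc", "update_fulfill_htlc", "update_fail_htlc",
		"commitment_signed", "revoke_and_ack", "shutdown",
	},
	"shutdown": {"closing_signed", "revoke_and_ack"},
}

// acks reports whether receiving `received` acknowledges a stored `stored`.
func acks(received, stored string) bool {
	for _, m := range ackedBy[stored] {
		if m == received {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(acks("revoke_and_ack", "funding_locked")) // true per the quoted spec
	fmt.Println(acks("revoke_and_ack", "shutdown"))       // true
	fmt.Println(acks("closing_signed", "funding_locked")) // false
}
```

Under this reading, a received revoke_and_ack does ACK both funding_locked and shutdown, which is the point of disagreement with the comment above.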
server.go
Outdated
}
p.Disconnect()
Why was the disconnect removed?
If we're unable to create the peer, then it must be removed from the connmgr's set of pending persistent connections, hence the use of Disconnect here.

Honestly, we need to revisit the current connmgr integration for coherency as it was put together rather quickly in order to get the functionality out the door.
In this case p is nil, which causes a panic if an error occurred at this stage. Maybe we should always return an instance of peer from the newPeer function? In that case we can restore the previous logic.
Ahh, nice find! In the future, I'd prefer for fixes like this to be either included in the PR in a distinct commit, or entirely within its own PR. Otherwise, it's easy to miss amidst all the other changes within the PR.
Sure, I will create an additional PR for that.
server.go
Outdated
@@ -542,7 +544,7 @@ func (s *server) addPeer(p *peer) {
	return
}

-// Track the new peer in our indexes so we can quickly look it up either
+// Track the new peer in our messages so we can quickly look it up either
Similar comment here about reverting this line diff.
oops, nice catch!
server.go
Outdated
@@ -551,7 +553,7 @@ func (s *server) addPeer(p *peer) {
	s.peersByPub[string(p.addr.IdentityKey.SerializeCompressed())] = p
	s.peersMtx.Unlock()

-// Once the peer has been added to our indexes, send a message to the
+// Once the peer has been added to our messages, send a message to the
Similar comment here about reverting this line diff.
@@ -236,7 +235,7 @@ func TestSerializeKidOutput(t *testing.T) {

	deserializedKid, err := deserializeKidOutput(&b)
	if err != nil {
-		fmt.Printf(err.Error())
+		t.Fatalf("can't deserialize kid output: %v", err)
Nice catch! I'd missed this during my initial review.
c37d1ba → 19239c7 (force-push)
Coverage increased (+0.09%) to 67.801% when pulling 19239c7541bdc368fe75652bd5b75b3b6dd7199c on AndrewSamokhvalov:retransmission_subsystem into d723aad on lightningnetwork:master.
In this commit, lnwire message header encode/decode tests were added. Without them, a newcomer programmer may change a type inside the message header and spend hours debugging the integration tests, trying to understand why their node can't start and interact properly.
Issue: lightningnetwork#137. In this commit the retransmission subsystem and boltdb message storage were added. The retransmission subsystem is described in detail in the BOLT #2 (Message Retransmission) section. This subsystem keeps records of all messages that were sent to the other peer, waits for an ACK message to be received from the other side, and after that removes all acked messages from the storage.
Issue: lightningnetwork#137. In this commit the retransmission subsystem was integrated into lnd; now upon peer reconnection we fetch all messages from the message storage that were not acked and send them again to the remote side.
19239c7 → f8b2624 (force-push)
I have added the …
I've started to test this PR locally and noticed that the way it currently goes about implementing the retransmission is missing a key feature. The description in the original issue stated that the retransmission sub-system should actually sit between the …

Such behavior would allow sub-systems like the …

As an example, let's say we're nearing the completion of a funding workflow. The ultimate block finalizing the channel arrives, the …

This PR is 80% of the way there, functionality wise. To get that last 20%, the following behavior needs to be implemented:
Here's an alternative to what's described above: …
Decided that what I've described w.r.t the …
	"to the peer(%v)", len(messages), p)

for _, message := range messages {
	// Sending over sendToPeer will cause block because of
Can you insert a logging message here that just logs the MessageCode itself? Thanks!
func (rt *retransmitter) Ack(msg lnwire.Message) error {
	switch msg.Command() {

	case lnwire.CmdSingleFundingResponse:
For now, all funding messages should be omitted from retransmission other than the FundingLocked message. While testing locally I just hit a bug that causes the funding manager to deadlock if lnd is restarted mid-way through a >1-conf-required channel opening.

Atm, the spec is incorrect. No funding messages up until the point at which either side is committed to a funding transaction should be retransmitted at all.
	)
case lnwire.CmdCloseComplete:
	return rt.remove(
		lnwire.CmdCloseRequest,
Atm CloseComplete is never sent within the daemon. Therefore, this entry should be removed. Otherwise, the node will keep sending the same CloseRequest message indefinitely upon each restart. The responding node will simply ignore the message as the channel has already been closed.
lnwire.CmdCloseRequest:
	return rt.remove(
		lnwire.CmdFundingLocked,
		lnwire.CmdRevokeAndAck,
For now, all instances of RevokeAndAck should be omitted from retransmission. As is now, because we still use an "initial revocation window" of 1, peer restarts will cause lnd to send the initial RevokeAndAck twice with the same revocation values. This'll cause the channel to fail down the line as a state transition will re-use the same preimage rather than going to the next leaf node in the tree.
20:52:08 2017-03-16 [INF] PEER: retransmission subsystem resends 1 messages to the peer(020dbf0df13b994e562c9ac52098b86afd2b2099463370fc78124ab3c88ef87a6b@127.0.0.1:10019)
20:52:08 2017-03-16 [INF] CRTR: Synchronizing channel graph with 020dbf0df13b994e562c9ac52098b86afd2b2099463370fc78124ab3c88ef87a6b
20:52:08 2017-03-16 [TRC] PEER: writeMessage to 020dbf0df13b994e562c9ac52098b86afd2b2099463370fc78124ab3c88ef87a6b@127.0.0.1:10019: (*lnwire.RevokeAndAck)(0xc42053f2d0)({
ChannelPoint: (wire.OutPoint) 397d9ba617b1b2d81a8249e3de3749f4dd2efd792ce34b758ed97326b68bf0b9:0,
Revocation: ([32]uint8) (len=32 cap=32) {
00000000 6a 00 62 86 31 55 b1 4d 8f 20 e6 53 f2 8c f7 78 |j.b.1U.M. .S...x|
00000010 1c b2 72 d3 07 86 57 2d 5d bc 55 4f b4 a4 c8 a1 |..r...W-].UO....|
},
NextRevocationKey: (*btcec.PublicKey)(0xc420318160)({
Curve: (elliptic.Curve) <nil>,
X: (*big.Int)(0xc420318180)(105223291483128089908537415774962877536378315872169081183677829390620736225739),
Y: (*big.Int)(0xc4203181a0)(5542066621571236556856056711647061449395836182811543325992215950193357130663)
}),
NextRevocationHash: ([32]uint8) (len=32 cap=32) {
00000000 46 b5 6c 1c 0e 0d 50 d4 a1 3c 97 c6 8c 8e 5d 6e |F.l...P..<....]n|
00000010 15 5b 62 f1 de 12 ec af 4a 11 a2 21 b2 4e a1 89 |.[b.....J..!.N..|
}
})
......
20:52:08 2017-03-16 [TRC] PEER: writeMessage to 020dbf0df13b994e562c9ac52098b86afd2b2099463370fc78124ab3c88ef87a6b@127.0.0.1:10019: (*lnwire.RevokeAndAck)(0xc420559f80)({
ChannelPoint: (wire.OutPoint) 397d9ba617b1b2d81a8249e3de3749f4dd2efd792ce34b758ed97326b68bf0b9:0,
Revocation: ([32]uint8) (len=32 cap=32) {
00000000 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
00000010 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
},
NextRevocationKey: (*btcec.PublicKey)(0xc42053bec0)({
Curve: (elliptic.Curve) <nil>,
X: (*big.Int)(0xc42053be40)(105223291483128089908537415774962877536378315872169081183677829390620736225739),
Y: (*big.Int)(0xc42053be60)(5542066621571236556856056711647061449395836182811543325992215950193357130663)
}),
NextRevocationHash: ([32]uint8) (len=32 cap=32) {
00000000 46 b5 6c 1c 0e 0d 50 d4 a1 3c 97 c6 8c 8e 5d 6e |F.l...P..<....]n|
00000010 15 5b 62 f1 de 12 ec af 4a 11 a2 21 b2 4e a1 89 |.[b.....J..!.N..|
}
})
In this state, the state machines of both channels will actually enter a negative feedback cycle, continually failing as the wrong revocation message is being sent over and over again. As a result, the channels are no longer usable after a single restart.
// and may need to be re-established from time to time and reconnection
// introduces doubt as to what has been received such logic is needed to be sure
// that peers are in consistent state in terms of message communication.
type retransmitter struct {
just noticed the filename has a typo, should probably be retransmission.go
Closing this as it has been replaced by #231. We might possibly integrate some sections of this into the project at a later point though.
Issue: #137
I know that we discussed that funding manager messages shouldn't be included in retransmission, but after some thinking about this I decided to include them anyway, because the specification is not about us, but more about convergence with other lightning network clients. If the funding messages are included in the specification, it means that other clients might be built with this logic in mind.
Instead, I believe that we should tolerantly ignore the funding messages that were already processed, or have expired.