Skip to content

Commit

Permalink
query: end completed queries in sendTransaction
Browse files Browse the repository at this point in the history
(*ChainService).sendTransaction always timed out even if each peer did
respond. This is because after receiving a getdata message in response
to the outgoing inv, the checkResponse closure provided to
queryAllPeers neglected to close the peerQuit channel. As a result,
sendTransaction always blocks for the full configured BroadcastTimeout.

This change adds the missing close(peerQuit) when a matching response
(either a getdata or reject) is received.

Note that as before this change, a non-nil error from sendTransaction
does not guarantee that a remote peer actually received the MsgTx that
was queued since the outgoing tx message is handled asynchronously.
To account for the removed delay, the TestNeutrinoSync unit test is
updated with a waitTx helper to ensure the mining node has the sent
transaction prior to requesting a new block.
  • Loading branch information
chappjc committed Feb 8, 2022
1 parent 53b628c commit 4dee907
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
15 changes: 12 additions & 3 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,8 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
)

numReplied++

close(peerQuit)
}
}

Expand All @@ -1096,6 +1098,13 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
response, sp.Addr(),
)
rejections[*broadcastErr]++

log.Debugf("Transaction %v rejected by peer "+
"%v: code = %v, reason = %q", txHash,
sp.Addr(), broadcastErr.Code,
broadcastErr.Reason)

close(peerQuit)
}
},
append(
Expand All @@ -1109,7 +1118,7 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
// transaction upon every block connected/disconnected.
if numReplied == 0 {
log.Debugf("No peers replied to inv message for transaction %v",
tx.TxHash())
txHash)
return nil
}

Expand All @@ -1118,15 +1127,15 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
// it so we'll return the most rejected error between all of our peers.
//
// TODO(wilmer): This might be too naive, some rejections are more
// critical than others.
// critical than others. e.g. pushtx.Mempool and pushtx.Confirmed are OK.
//
// TODO(wilmer): This does not cover the case where a peer also rejected
// our transaction but didn't send the response within our given timeout
// and certain other cases. Due to this, we should probably decide on a
// threshold of rejections instead.
if numReplied == len(rejections) {
log.Warnf("All peers rejected transaction %v checking errors",
tx.TxHash())
txHash)

mostRejectedCount := 0
var mostRejectedErr pushtx.BroadcastError
Expand Down
27 changes: 27 additions & 0 deletions sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,29 @@ func testStartRescan(harness *neutrinoHarness, t *testing.T) {
}
}

// waitTx will poll for a transaction to appear on given node for up to
// 5 seconds.
waitTx := func(node *rpcclient.Client, hash chainhash.Hash) {
t.Helper()
exitTimer := time.NewTimer(5 * time.Second)
defer exitTimer.Stop()
for {
<-time.After(200 * time.Millisecond)

select {
case <-exitTimer.C:
t.Fatalf("Timeout waiting to see transaction.")
default:
}

if _, err := node.GetRawTransaction(&hash); err != nil {
continue
}

return
}
}

// Create another address to send to so we don't trip the rescan with
// the old address and we can test monitoring both OutPoint usage and
// receipt by addresses.
Expand Down Expand Up @@ -517,6 +540,9 @@ func testStartRescan(harness *neutrinoHarness, t *testing.T) {
if err != nil && !strings.Contains(err.Error(), "already have") {
t.Fatalf("Unable to send transaction to network: %s", err)
}
// SendTransaction does not know when the MsgTx was actually sent, only
// that a getdata request was received and a MsgTx queued to send.
waitTx(harness.h1.Node, authTx1.Tx.TxHash())
_, err = harness.h1.Node.Generate(1)
if err != nil {
t.Fatalf("Couldn't generate/submit block: %s", err)
Expand Down Expand Up @@ -559,6 +585,7 @@ func testStartRescan(harness *neutrinoHarness, t *testing.T) {
if err != nil && !strings.Contains(err.Error(), "already have") {
t.Fatalf("Unable to send transaction to network: %s", err)
}
waitTx(harness.h1.Node, authTx2.Tx.TxHash())
_, err = harness.h1.Node.Generate(1)
if err != nil {
t.Fatalf("Couldn't generate/submit block: %s", err)
Expand Down

0 comments on commit 4dee907

Please sign in to comment.