Skip to content

Commit

Permalink
query: end completed queries in sendTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
chappjc committed Jan 18, 2022
1 parent 53b628c commit 7611d10
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 15 deletions.
13 changes: 10 additions & 3 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,8 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
)

numReplied++

close(peerQuit)
}
}

Expand All @@ -1096,6 +1098,11 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
response, sp.Addr(),
)
rejections[*broadcastErr]++

log.Debugf("Transaction %v rejected by peer %v: code = %v, reason = %q",
txHash, sp.Addr(), broadcastErr.Code, broadcastErr.Reason)

close(peerQuit)
}
},
append(
Expand All @@ -1109,7 +1116,7 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
// transaction upon every block connected/disconnected.
if numReplied == 0 {
log.Debugf("No peers replied to inv message for transaction %v",
tx.TxHash())
txHash)
return nil
}

Expand All @@ -1118,15 +1125,15 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
// it so we'll return the most rejected error between all of our peers.
//
// TODO(wilmer): This might be too naive, some rejections are more
// critical than others.
// critical than others. e.g. pushtx.Mempool and pushtx.Confirmed are OK.
//
// TODO(wilmer): This does not cover the case where a peer also rejected
// our transaction but didn't send the response within our given timeout
// and certain other cases. Due to this, we should probably decide on a
// threshold of rejections instead.
if numReplied == len(rejections) {
log.Warnf("All peers rejected transaction %v checking errors",
tx.TxHash())
txHash)

mostRejectedCount := 0
var mostRejectedErr pushtx.BroadcastError
Expand Down
43 changes: 31 additions & 12 deletions sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os"
"reflect"
"runtime"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -39,7 +38,7 @@ var (
// btclog.LevelOff turns on log messages from the tests themselves as
// well. Keep in mind some log messages may not appear in order due to
// use of multiple query goroutines in the tests.
logLevel = btclog.LevelOff
logLevel = btclog.LevelWarn
syncTimeout = 30 * time.Second
syncUpdate = time.Second

Expand Down Expand Up @@ -274,14 +273,14 @@ var testCases = []*syncTestCase{
name: "start long-running rescan",
test: testStartRescan,
},
{
name: "test blocks and filters in random order",
test: testRandomBlocks,
},
{
name: "check long-running rescan results",
test: testRescanResults,
},
// {
// name: "test blocks and filters in random order",
// test: testRandomBlocks,
// },
// {
// name: "check long-running rescan results",
// test: testRescanResults,
// },
}

// Make sure the client synchronizes with the correct node.
Expand Down Expand Up @@ -399,6 +398,22 @@ func testRescan(harness *neutrinoHarness, t *testing.T) {
}
}

func waitTx(t *testing.T, node *rpcclient.Client, hash chainhash.Hash) {
timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()
for {
if _, err := node.GetRawTransaction(&hash); err == nil {
return
}
select {
case <-timeout.C:
t.Fatal("Timeout waiting to see transaction.")
default:
time.Sleep(200 * time.Millisecond)
}
}
}

func testStartRescan(harness *neutrinoHarness, t *testing.T) {
// Start a rescan with notifications in another goroutine. We'll kill
// it with a quit channel at the end and make sure we got the expected
Expand Down Expand Up @@ -514,9 +529,12 @@ func testStartRescan(harness *neutrinoHarness, t *testing.T) {
}
banPeer(t, harness.svc, harness.h2)
err = harness.svc.SendTransaction(authTx1.Tx)
if err != nil && !strings.Contains(err.Error(), "already have") {
if err != nil {
t.Fatalf("Unable to send transaction to network: %s", err)
}
// SendTransaction does not know when the MsgTx was actually sent, only that
// a getdata request was received and a MsgTx queued to send.
waitTx(t, harness.h1.Node, authTx1.Tx.TxHash())
_, err = harness.h1.Node.Generate(1)
if err != nil {
t.Fatalf("Couldn't generate/submit block: %s", err)
Expand Down Expand Up @@ -556,9 +574,10 @@ func testStartRescan(harness *neutrinoHarness, t *testing.T) {
}
banPeer(t, harness.svc, harness.h2)
err = harness.svc.SendTransaction(authTx2.Tx)
if err != nil && !strings.Contains(err.Error(), "already have") {
if err != nil {
t.Fatalf("Unable to send transaction to network: %s", err)
}
waitTx(t, harness.h1.Node, authTx2.Tx.TxHash())
_, err = harness.h1.Node.Generate(1)
if err != nil {
t.Fatalf("Couldn't generate/submit block: %s", err)
Expand Down

0 comments on commit 7611d10

Please sign in to comment.