diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 271bae07c7..aad2c72b1b 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -450,6 +450,59 @@ func testTransactionPropagation(t *testing.T, protocol uint) { } } +// Tests that local pending transactions get propagated to peers. +func TestTransactionPendingReannounce(t *testing.T) { + t.Parallel() + + // Create a source handler to announce transactions from and a sink handler + // to receive them. + source := newTestHandler() + defer source.close() + + sink := newTestHandler() + defer sink.close() + sink.handler.acceptTxs = 1 // mark synced to accept transactions + + sourcePipe, sinkPipe := p2p.MsgPipe() + defer sourcePipe.Close() + defer sinkPipe.Close() + + sourcePeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sourcePipe, source.txpool) + sinkPeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool) + defer sourcePeer.Close() + defer sinkPeer.Close() + + go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error { + return eth.Handle((*ethHandler)(source.handler), peer) + }) + go sink.handler.runEthPeer(sinkPeer, func(peer *eth.Peer) error { + return eth.Handle((*ethHandler)(sink.handler), peer) + }) + + // Subscribe transaction pools + txCh := make(chan core.NewTxsEvent, 1024) + sub := sink.txpool.SubscribeNewTxsEvent(txCh) + defer sub.Unsubscribe() + + txs := make([]*types.Transaction, 64) + for nonce := range txs { + tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), nil) + tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey) + + txs[nonce] = tx + } + source.txpool.ReannouceTransactions(txs) + + for arrived := 0; arrived < len(txs); { + select { + case event := <-txCh: + arrived += len(event.Txs) + case <-time.NewTimer(time.Second).C: + t.Errorf("sink: transaction propagation timed out: have %d, want %d", arrived, len(txs)) + } + } +} + // Tests that post eth protocol handshake, clients perform a mutual checkpoint // challenge to validate each other's chains. Hash mismatches, or missing ones // during a fast sync should lead to the peer getting dropped. diff --git a/eth/handler_test.go b/eth/handler_test.go index a90ef5c348..c3b7b769b2 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -48,8 +48,9 @@ var ( type testTxPool struct { pool map[common.Hash]*types.Transaction // Hash map of collected transactions - txFeed event.Feed // Notification feed to allow waiting for inclusion - lock sync.RWMutex // Protects the transaction pool + txFeed event.Feed // Notification feed to allow waiting for inclusion + reannoTxFeed event.Feed // Notification feed to trigger reannouce + lock sync.RWMutex // Protects the transaction pool } // newTestTxPool creates a mock transaction pool. @@ -90,6 +91,18 @@ func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error { return make([]error, len(txs)) } +// ReannouceTransactions announce the transactions to some peers. +func (p *testTxPool) ReannouceTransactions(txs []*types.Transaction) []error { + p.lock.Lock() + defer p.lock.Unlock() + + for _, tx := range txs { + p.pool[tx.Hash()] = tx + } + p.reannoTxFeed.Send(core.ReannoTxsEvent{Txs: txs}) + return make([]error, len(txs)) +} + // Pending returns all the transactions known to the pool func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) { p.lock.RLock() @@ -112,6 +125,12 @@ func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs return p.txFeed.Subscribe(ch) } +// SubscribeReannoTxsEvent should return an event subscription of ReannoTxsEvent and +// send events to the given channel. +func (p *testTxPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription { + return p.reannoTxFeed.Subscribe(ch) +} + // testHandler is a live implementation of the Ethereum protocol handler, just // preinitialized with some sane testing defaults and the transaction pool mocked // out.