From 3b06d6b14d6150fc774ae32368ecd8644a18f75a Mon Sep 17 00:00:00 2001 From: qianbin Date: Mon, 6 May 2019 14:18:45 +0800 Subject: [PATCH 1/4] add known tx mark expiration to improve tx propagation --- comm/peer.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/comm/peer.go b/comm/peer.go index 50199308c..01ce87b33 100644 --- a/comm/peer.go +++ b/comm/peer.go @@ -13,15 +13,16 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" - lru "github.com/hashicorp/golang-lru" "github.com/inconshreveable/log15" + "github.com/vechain/thor/cache" "github.com/vechain/thor/p2psrv/rpc" "github.com/vechain/thor/thor" ) const ( - maxKnownTxs = 32768 // Maximum transactions IDs to keep in the known list (prevent DOS) - maxKnownBlocks = 1024 // Maximum block IDs to keep in the known list (prevent DOS) + maxKnownTxs = 32768 // Maximum transactions IDs to keep in the known list (prevent DOS) + maxKnownBlocks = 1024 // Maximum block IDs to keep in the known list (prevent DOS) + knownTxMarkExpiration = 10 // Time in seconds to expire known tx mark ) func init() { @@ -35,8 +36,8 @@ type Peer struct { logger log15.Logger createdTime mclock.AbsTime - knownTxs *lru.Cache - knownBlocks *lru.Cache + knownTxs *cache.RandCache + knownBlocks *cache.RandCache head struct { sync.Mutex id thor.Bytes32 @@ -53,15 +54,14 @@ func newPeer(peer *p2p.Peer, rw p2p.MsgReadWriter) *Peer { "peer", peer, "dir", dir, } - knownTxs, _ := lru.New(maxKnownTxs) - knownBlocks, _ := lru.New(maxKnownBlocks) + return &Peer{ Peer: peer, RPC: rpc.New(peer, rw), logger: log.New(ctx...), createdTime: mclock.Now(), - knownTxs: knownTxs, - knownBlocks: knownBlocks, + knownTxs: cache.NewRandCache(maxKnownTxs), + knownBlocks: cache.NewRandCache(maxKnownBlocks), } } @@ -83,17 +83,21 @@ func (p *Peer) UpdateHead(id thor.Bytes32, totalScore uint64) { // MarkTransaction marks a transaction to known. func (p *Peer) MarkTransaction(id thor.Bytes32) { - p.knownTxs.Add(id, struct{}{}) + p.knownTxs.Set(id, time.Now().Unix()) } // MarkBlock marks a block to known. func (p *Peer) MarkBlock(id thor.Bytes32) { - p.knownBlocks.Add(id, struct{}{}) + p.knownBlocks.Set(id, struct{}{}) } // IsTransactionKnown returns if the transaction is known. func (p *Peer) IsTransactionKnown(id thor.Bytes32) bool { - return p.knownTxs.Contains(id) + ts, ok := p.knownTxs.Get(id) + if !ok { + return false + } + return ts.(int64)+knownTxMarkExpiration > time.Now().Unix() } // IsBlockKnown returns if the block is known. From 343e4e1b92da409541d77730473a1a8e4390cfad Mon Sep 17 00:00:00 2001 From: qianbin Date: Mon, 6 May 2019 16:28:05 +0800 Subject: [PATCH 2/4] use tx hash to map txs in txpool and known txs set --- cmd/thor/node/packer_loop.go | 9 +++++---- cmd/thor/solo/solo.go | 9 ++++----- comm/handle_rpc.go | 6 +++--- comm/peer.go | 8 ++++---- comm/sync.go | 2 +- comm/txs_loop.go | 4 ++-- tx/transaction.go | 14 ++++++++++++++ txpool/tx_object_map.go | 20 ++++++++++---------- txpool/tx_object_map_test.go | 12 ++++++------ txpool/tx_pool.go | 26 +++++++++++++------------- 10 files changed, 62 insertions(+), 48 deletions(-) diff --git a/cmd/thor/node/packer_loop.go b/cmd/thor/node/packer_loop.go index fa3805bc6..9b4f8e19b 100644 --- a/cmd/thor/node/packer_loop.go +++ b/cmd/thor/node/packer_loop.go @@ -16,6 +16,7 @@ import ( "github.com/pkg/errors" "github.com/vechain/thor/packer" "github.com/vechain/thor/thor" + "github.com/vechain/thor/tx" ) func (n *Node) packerLoop(ctx context.Context) { @@ -83,10 +84,10 @@ func (n *Node) packerLoop(ctx context.Context) { func (n *Node) pack(flow *packer.Flow) error { txs := n.txPool.Executables() - var txsToRemove []thor.Bytes32 + var txsToRemove []*tx.Transaction defer func() { - for _, id := range txsToRemove { - n.txPool.Remove(id) + for _, tx := range txsToRemove { + n.txPool.Remove(tx.Hash(), tx.ID()) } }() @@ -99,7 +100,7 @@ func (n *Node) pack(flow *packer.Flow) error { if packer.IsTxNotAdoptableNow(err) { continue } - txsToRemove = append(txsToRemove, tx.ID()) + txsToRemove = append(txsToRemove, tx) } } diff --git a/cmd/thor/solo/solo.go b/cmd/thor/solo/solo.go index 41a71451f..0f39d20fb 100644 --- a/cmd/thor/solo/solo.go +++ b/cmd/thor/solo/solo.go @@ -22,7 +22,6 @@ import ( "github.com/vechain/thor/logdb" "github.com/vechain/thor/packer" "github.com/vechain/thor/state" - "github.com/vechain/thor/thor" "github.com/vechain/thor/tx" "github.com/vechain/thor/txpool" ) @@ -118,10 +117,10 @@ func (s *Solo) loop(ctx context.Context) { func (s *Solo) packing(pendingTxs tx.Transactions) error { best := s.chain.BestBlock() - var txsToRemove []thor.Bytes32 + var txsToRemove []*tx.Transaction defer func() { - for _, id := range txsToRemove { - s.txPool.Remove(id) + for _, tx := range txsToRemove { + s.txPool.Remove(tx.Hash(), tx.ID()) } }() @@ -142,7 +141,7 @@ func (s *Solo) packing(pendingTxs tx.Transactions) error { case packer.IsTxNotAdoptableNow(err): continue default: - txsToRemove = append(txsToRemove, tx.ID()) + txsToRemove = append(txsToRemove, tx) } } diff --git a/comm/handle_rpc.go b/comm/handle_rpc.go index ad52120a2..8f5ece2ba 100644 --- a/comm/handle_rpc.go +++ b/comm/handle_rpc.go @@ -69,7 +69,7 @@ func (c *Communicator) handleRPC(peer *Peer, msg *p2p.Msg, write func(interface{ if err := msg.Decode(&newTx); err != nil { return errors.WithMessage(err, "decode msg") } - peer.MarkTransaction(newTx.ID()) + peer.MarkTransaction(newTx.Hash()) c.txPool.StrictlyAdd(newTx) write(&struct{}{}) case proto.MsgGetBlockByID: @@ -146,10 +146,10 @@ func (c *Communicator) handleRPC(peer *Peer, msg *p2p.Msg, write func(interface{ for _, tx := range txsToSync.txs { n++ - if peer.IsTransactionKnown(tx.ID()) { + if peer.IsTransactionKnown(tx.Hash()) { continue } - peer.MarkTransaction(tx.ID()) + peer.MarkTransaction(tx.Hash()) toSend = append(toSend, tx) size += tx.Size() if size >= maxTxSyncSize { diff --git a/comm/peer.go b/comm/peer.go index 01ce87b33..d4ca78f6e 100644 --- a/comm/peer.go +++ b/comm/peer.go @@ -82,8 +82,8 @@ func (p *Peer) UpdateHead(id thor.Bytes32, totalScore uint64) { } // MarkTransaction marks a transaction to known. -func (p *Peer) MarkTransaction(id thor.Bytes32) { - p.knownTxs.Set(id, time.Now().Unix()) +func (p *Peer) MarkTransaction(hash thor.Bytes32) { + p.knownTxs.Set(hash, time.Now().Unix()) } // MarkBlock marks a block to known. @@ -92,8 +92,8 @@ func (p *Peer) MarkBlock(id thor.Bytes32) { } // IsTransactionKnown returns if the transaction is known. -func (p *Peer) IsTransactionKnown(id thor.Bytes32) bool { - ts, ok := p.knownTxs.Get(id) +func (p *Peer) IsTransactionKnown(hash thor.Bytes32) bool { + ts, ok := p.knownTxs.Get(hash) if !ok { return false } diff --git a/comm/sync.go b/comm/sync.go index a433c9274..e4349ec8b 100644 --- a/comm/sync.go +++ b/comm/sync.go @@ -193,7 +193,7 @@ func (c *Communicator) syncTxs(peer *Peer) { } for _, tx := range result { - peer.MarkTransaction(tx.ID()) + peer.MarkTransaction(tx.Hash()) c.txPool.StrictlyAdd(tx) select { case <-c.ctx.Done(): diff --git a/comm/txs_loop.go b/comm/txs_loop.go index fe0e991cc..45cb0fc65 100644 --- a/comm/txs_loop.go +++ b/comm/txs_loop.go @@ -24,12 +24,12 @@ func (c *Communicator) txsLoop() { if txEv.Executable != nil && *txEv.Executable { tx := txEv.Tx peers := c.peerSet.Slice().Filter(func(p *Peer) bool { - return !p.IsTransactionKnown(tx.ID()) + return !p.IsTransactionKnown(tx.Hash()) }) for _, peer := range peers { peer := peer - peer.MarkTransaction(tx.ID()) + peer.MarkTransaction(tx.Hash()) c.goes.Go(func() { if err := proto.NotifyNewTx(c.ctx, peer, tx); err != nil { peer.logger.Debug("failed to broadcast tx", "err", err) diff --git a/tx/transaction.go b/tx/transaction.go index 006d4d769..fb9def9db 100644 --- a/tx/transaction.go +++ b/tx/transaction.go @@ -37,6 +37,7 @@ type Transaction struct { unprovedWork atomic.Value size atomic.Value intrinsicGas atomic.Value + hash atomic.Value } } @@ -102,6 +103,19 @@ func (t *Transaction) ID() (id thor.Bytes32) { return } +// Hash returns hash of tx. +// Unlike ID, it's the hash of RLP encoded tx. +func (t *Transaction) Hash() (hash thor.Bytes32) { + if cached := t.cache.hash.Load(); cached != nil { + return cached.(thor.Bytes32) + } + defer func() { t.cache.hash.Store(hash) }() + hw := thor.NewBlake2b() + rlp.Encode(hw, t) + hw.Sum(hash[:0]) + return +} + // UnprovedWork returns unproved work of this tx. // It returns 0, if tx is not signed. func (t *Transaction) UnprovedWork() (w *big.Int) { diff --git a/txpool/tx_object_map.go b/txpool/tx_object_map.go index ddfe6f9b0..a70fa8fba 100644 --- a/txpool/tx_object_map.go +++ b/txpool/tx_object_map.go @@ -13,7 +13,7 @@ import ( "github.com/vechain/thor/tx" ) -// txObjectMap to maintain mapping of ID to tx object, and account quota. +// txObjectMap to maintain mapping of tx hash to tx object, and account quota. type txObjectMap struct { lock sync.RWMutex txObjMap map[thor.Bytes32]*txObject @@ -27,10 +27,10 @@ func newTxObjectMap() *txObjectMap { } } -func (m *txObjectMap) Contains(txID thor.Bytes32) bool { +func (m *txObjectMap) Contains(txHash thor.Bytes32) bool { m.lock.RLock() defer m.lock.RUnlock() - _, found := m.txObjMap[txID] + _, found := m.txObjMap[txHash] return found } @@ -38,7 +38,7 @@ func (m *txObjectMap) Add(txObj *txObject, limitPerAccount int) error { m.lock.Lock() defer m.lock.Unlock() - if _, found := m.txObjMap[txObj.ID()]; found { + if _, found := m.txObjMap[txObj.Hash()]; found { return nil } @@ -47,21 +47,21 @@ func (m *txObjectMap) Add(txObj *txObject, limitPerAccount int) error { } m.quota[txObj.Origin()]++ - m.txObjMap[txObj.ID()] = txObj + m.txObjMap[txObj.Hash()] = txObj return nil } -func (m *txObjectMap) Remove(txID thor.Bytes32) bool { +func (m *txObjectMap) Remove(txHash thor.Bytes32) bool { m.lock.Lock() defer m.lock.Unlock() - if txObj, ok := m.txObjMap[txID]; ok { + if txObj, ok := m.txObjMap[txHash]; ok { if m.quota[txObj.Origin()] > 1 { m.quota[txObj.Origin()]-- } else { delete(m.quota, txObj.Origin()) } - delete(m.txObjMap, txID) + delete(m.txObjMap, txHash) return true } return false @@ -93,13 +93,13 @@ func (m *txObjectMap) Fill(txObjs []*txObject) { m.lock.Lock() defer m.lock.Unlock() for _, txObj := range txObjs { - if _, found := m.txObjMap[txObj.ID()]; found { + if _, found := m.txObjMap[txObj.Hash()]; found { continue } // skip account limit check m.quota[txObj.Origin()]++ - m.txObjMap[txObj.ID()] = txObj + m.txObjMap[txObj.Hash()] = txObj } } diff --git a/txpool/tx_object_map_test.go b/txpool/tx_object_map_test.go index 047c983f7..15eab623a 100644 --- a/txpool/tx_object_map_test.go +++ b/txpool/tx_object_map_test.go @@ -41,13 +41,13 @@ func TestTxObjMap(t *testing.T) { assert.Nil(t, m.Add(txObj3, 1)) assert.Equal(t, 2, m.Len()) - assert.True(t, m.Contains(tx1.ID())) - assert.False(t, m.Contains(tx2.ID())) - assert.True(t, m.Contains(tx3.ID())) + assert.True(t, m.Contains(tx1.Hash())) + assert.False(t, m.Contains(tx2.Hash())) + assert.True(t, m.Contains(tx3.Hash())) - assert.True(t, m.Remove(tx1.ID())) - assert.False(t, m.Contains(tx1.ID())) - assert.False(t, m.Remove(tx2.ID())) + assert.True(t, m.Remove(tx1.Hash())) + assert.False(t, m.Contains(tx1.Hash())) + assert.False(t, m.Remove(tx2.Hash())) assert.Equal(t, []*txObject{txObj3}, m.ToTxObjects()) assert.Equal(t, tx.Transactions{tx3}, m.ToTxs()) diff --git a/txpool/tx_pool.go b/txpool/tx_pool.go index 55cdecaf6..e1b43e35e 100644 --- a/txpool/tx_pool.go +++ b/txpool/tx_pool.go @@ -144,7 +144,7 @@ func (p *TxPool) SubscribeTxEvent(ch chan *TxEvent) event.Subscription { } func (p *TxPool) add(newTx *tx.Transaction, rejectNonexecutable bool) error { - if p.all.Contains(newTx.ID()) { + if p.all.Contains(newTx.Hash()) { // tx already in the pool return nil } @@ -217,9 +217,9 @@ func (p *TxPool) StrictlyAdd(newTx *tx.Transaction) error { return p.add(newTx, true) } -// Remove removes tx from pool by its ID. -func (p *TxPool) Remove(txID thor.Bytes32) bool { - if p.all.Remove(txID) { +// Remove removes tx from pool by its Hash. +func (p *TxPool) Remove(txHash thor.Bytes32, txID thor.Bytes32) bool { + if p.all.Remove(txHash) { log.Debug("tx removed", "id", txID) return true } @@ -255,7 +255,7 @@ func (p *TxPool) Dump() tx.Transactions { // this method should only be called in housekeeping go routine func (p *TxPool) wash(headBlock *block.Header) (executables tx.Transactions, removed int, err error) { all := p.all.ToTxObjects() - var toRemove []thor.Bytes32 + var toRemove []*txObject defer func() { if err != nil { // in case of error, simply cut pool size to limit @@ -264,11 +264,11 @@ func (p *TxPool) wash(headBlock *block.Header) (executables tx.Transactions, rem break } removed++ - p.all.Remove(txObj.ID()) + p.all.Remove(txObj.Hash()) } } else { - for _, id := range toRemove { - p.all.Remove(id) + for _, txObj := range toRemove { + p.all.Remove(txObj.Hash()) } removed = len(toRemove) } @@ -288,14 +288,14 @@ func (p *TxPool) wash(headBlock *block.Header) (executables tx.Transactions, rem for _, txObj := range all { // out of lifetime if now > txObj.timeAdded+int64(p.options.MaxLifetime) { - toRemove = append(toRemove, txObj.ID()) + toRemove = append(toRemove, txObj) log.Debug("tx washed out", "id", txObj.ID(), "err", "out of lifetime") continue } // settled, out of energy or dep broken executable, err := txObj.Executable(p.chain, state, headBlock) if err != nil { - toRemove = append(toRemove, txObj.ID()) + toRemove = append(toRemove, txObj) log.Debug("tx washed out", "id", txObj.ID(), "err", err) continue } @@ -327,18 +327,18 @@ func (p *TxPool) wash(headBlock *block.Header) (executables tx.Transactions, rem // remove over limit txs, from non-executables to low priced if len(executableObjs) > limit { for _, txObj := range nonExecutableObjs { - toRemove = append(toRemove, txObj.ID()) + toRemove = append(toRemove, txObj) log.Debug("non-executable tx washed out due to pool limit", "id", txObj.ID()) } for _, txObj := range executableObjs[limit:] { - toRemove = append(toRemove, txObj.ID()) + toRemove = append(toRemove, txObj) log.Debug("executable tx washed out due to pool limit", "id", txObj.ID()) } executableObjs = executableObjs[:limit] } else if len(executableObjs)+len(nonExecutableObjs) > limit { // executableObjs + nonExecutableObjs over pool limit for _, txObj := range nonExecutableObjs[limit-len(executableObjs):] { - toRemove = append(toRemove, txObj.ID()) + toRemove = append(toRemove, txObj) log.Debug("non-executable tx washed out due to pool limit", "id", txObj.ID()) } } From 09ad91ce6bec40f92a2944b83ad0605327ba81e9 Mon Sep 17 00:00:00 2001 From: qianbin Date: Mon, 6 May 2019 17:49:33 +0800 Subject: [PATCH 3/4] remap stashed txs using tx hashes --- cmd/thor/node/tx_stash.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/cmd/thor/node/tx_stash.go b/cmd/thor/node/tx_stash.go index 897e0a380..1059cc602 100644 --- a/cmd/thor/node/tx_stash.go +++ b/cmd/thor/node/tx_stash.go @@ -6,6 +6,7 @@ package node import ( + "bytes" "container/list" "github.com/ethereum/go-ethereum/rlp" @@ -27,7 +28,7 @@ func newTxStash(kv kv.GetPutter, maxSize int) *txStash { } func (ts *txStash) Save(tx *tx.Transaction) error { - has, err := ts.kv.Has(tx.ID().Bytes()) + has, err := ts.kv.Has(tx.Hash().Bytes()) if err != nil { return err } @@ -40,10 +41,10 @@ func (ts *txStash) Save(tx *tx.Transaction) error { return err } - if err := ts.kv.Put(tx.ID().Bytes(), data); err != nil { + if err := ts.kv.Put(tx.Hash().Bytes(), data); err != nil { return err } - ts.fifo.PushBack(tx.ID()) + ts.fifo.PushBack(tx.Hash()) for ts.fifo.Len() > ts.maxSize { keyToDelete := ts.fifo.Remove(ts.fifo.Front()).(thor.Bytes32).Bytes() if err := ts.kv.Delete(keyToDelete); err != nil { @@ -54,6 +55,7 @@ func (ts *txStash) Save(tx *tx.Transaction) error { } func (ts *txStash) LoadAll() tx.Transactions { + batch := ts.kv.NewBatch() var txs tx.Transactions iter := ts.kv.NewIterator(*kv.NewRangeWithBytesPrefix(nil)) for iter.Next() { @@ -65,8 +67,19 @@ func (ts *txStash) LoadAll() tx.Transactions { } } else { txs = append(txs, &tx) - ts.fifo.PushBack(tx.ID()) + ts.fifo.PushBack(tx.Hash()) + + // Keys were tx ids. + // Here to remap values using tx hashes. + if !bytes.Equal(iter.Key(), tx.Hash().Bytes()) { + batch.Delete(iter.Key()) + batch.Put(tx.Hash().Bytes(), iter.Value()) + } } } + + if err := batch.Write(); err != nil { + log.Warn("remap stashed txs", "err", err) + } return txs } From 8f567acc80cfb5561184f4cbf94013bba9d00496 Mon Sep 17 00:00:00 2001 From: qianbin Date: Tue, 7 May 2019 00:50:20 +0800 Subject: [PATCH 4/4] revert known txs cache --- comm/handle_rpc.go | 2 +- comm/peer.go | 17 +++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/comm/handle_rpc.go b/comm/handle_rpc.go index 8f5ece2ba..9c1f7824e 100644 --- a/comm/handle_rpc.go +++ b/comm/handle_rpc.go @@ -70,7 +70,7 @@ func (c *Communicator) handleRPC(peer *Peer, msg *p2p.Msg, write func(interface{ return errors.WithMessage(err, "decode msg") } peer.MarkTransaction(newTx.Hash()) - c.txPool.StrictlyAdd(newTx) + c.txPool.Add(newTx) write(&struct{}{}) case proto.MsgGetBlockByID: var blockID thor.Bytes32 diff --git a/comm/peer.go b/comm/peer.go index d4ca78f6e..ee6a8009f 100644 --- a/comm/peer.go +++ b/comm/peer.go @@ -13,8 +13,8 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" + lru "github.com/hashicorp/golang-lru" "github.com/inconshreveable/log15" - "github.com/vechain/thor/cache" "github.com/vechain/thor/p2psrv/rpc" "github.com/vechain/thor/thor" ) @@ -36,8 +36,8 @@ type Peer struct { logger log15.Logger createdTime mclock.AbsTime - knownTxs *cache.RandCache - knownBlocks *cache.RandCache + knownTxs *lru.Cache + knownBlocks *lru.Cache head struct { sync.Mutex id thor.Bytes32 @@ -54,14 +54,15 @@ func newPeer(peer *p2p.Peer, rw p2p.MsgReadWriter) *Peer { "peer", peer, "dir", dir, } - + knownTxs, _ := lru.New(maxKnownTxs) + knownBlocks, _ := lru.New(maxKnownBlocks) return &Peer{ Peer: peer, RPC: rpc.New(peer, rw), logger: log.New(ctx...), createdTime: mclock.Now(), - knownTxs: cache.NewRandCache(maxKnownTxs), - knownBlocks: cache.NewRandCache(maxKnownBlocks), + knownTxs: knownTxs, + knownBlocks: knownBlocks, } } @@ -83,12 +84,12 @@ func (p *Peer) UpdateHead(id thor.Bytes32, totalScore uint64) { // MarkTransaction marks a transaction to known. func (p *Peer) MarkTransaction(hash thor.Bytes32) { - p.knownTxs.Set(hash, time.Now().Unix()) + p.knownTxs.Add(hash, time.Now().Unix()) } // MarkBlock marks a block to known. func (p *Peer) MarkBlock(id thor.Bytes32) { - p.knownBlocks.Set(id, struct{}{}) + p.knownBlocks.Add(id, struct{}{}) } // IsTransactionKnown returns if the transaction is known.