Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txnsync: delete-able incoming message queue #3126

Merged
merged 12 commits into from
Oct 25, 2021
Merged
110 changes: 34 additions & 76 deletions txnsync/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"errors"
"time"

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/data/pooldata"
)

Expand All @@ -42,70 +40,6 @@ type incomingMessage struct {
transactionGroups []pooldata.SignedTxGroup
}

// incomingMessageQueue manages the global incoming message queue across all the incoming peers.
type incomingMessageQueue struct {
incomingMessages chan incomingMessage
enqueuedPeers map[*Peer]struct{}
enqueuedPeersMu deadlock.Mutex
}

// maxPeersCount defines the maximum number of supported peers that can have their messages waiting
// in the incoming message queue at the same time. This number can be lower then the actual number of
// connected peers, as it's used only for pending messages.
const maxPeersCount = 1024

// makeIncomingMessageQueue creates an incomingMessageQueue object and initializes all the internal variables.
func makeIncomingMessageQueue() incomingMessageQueue {
return incomingMessageQueue{
incomingMessages: make(chan incomingMessage, maxPeersCount),
enqueuedPeers: make(map[*Peer]struct{}, maxPeersCount),
}
}

// getIncomingMessageChannel returns the incoming messages channel, which would contain entries once
// we have one ( or more ) pending incoming messages.
func (imq *incomingMessageQueue) getIncomingMessageChannel() <-chan incomingMessage {
return imq.incomingMessages
}

// enqueue places the given message on the queue, if and only if it's associated peer doesn't
// appear on the incoming message queue already. In the case there is no peer, the message
// would be placed on the queue as is.
// The method returns false if the incoming message doesn't have it's peer on the queue and
// the method has failed to place the message on the queue. True is returned otherwise.
func (imq *incomingMessageQueue) enqueue(m incomingMessage) bool {
if m.peer != nil {
imq.enqueuedPeersMu.Lock()
defer imq.enqueuedPeersMu.Unlock()
if _, has := imq.enqueuedPeers[m.peer]; has {
return true
}
}
select {
case imq.incomingMessages <- m:
// if we successfully enqueued the message, set the enqueuedPeers so that we won't enqueue the same peer twice.
if m.peer != nil {
// at this time, the enqueuedPeersMu is still under lock ( due to the above defer ), so we can access
// the enqueuedPeers here.
imq.enqueuedPeers[m.peer] = struct{}{}
}
return true
default:
return false
}
}

// clear removes the peer that is associated with the message ( if any ) from
// the enqueuedPeers map, allowing future messages from this peer to be placed on the
// incoming message queue.
func (imq *incomingMessageQueue) clear(m incomingMessage) {
if m.peer != nil {
imq.enqueuedPeersMu.Lock()
defer imq.enqueuedPeersMu.Unlock()
delete(imq.enqueuedPeers, m.peer)
}
}

// incomingMessageHandler
// note - this message is called by the network go-routine dispatch pool, and is not synchronized with the rest of the transaction synchronizer
func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64) (err error) {
Expand All @@ -125,12 +59,14 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
if err != nil {
// if we received a message that we cannot parse, disconnect.
s.log.Infof("received unparsable transaction sync message from peer. disconnecting from peer.")
s.incomingMessagesQ.erase(peer, networkPeer)
return err
}

if incomingMessage.message.Version != txnBlockMessageVersion {
// we receive a message from a version that we don't support, disconnect.
s.log.Infof("received unsupported transaction sync message version from peer (%d). disconnecting from peer.", incomingMessage.message.Version)
s.incomingMessagesQ.erase(peer, networkPeer)
return errUnsupportedTransactionSyncMessageVersion
}

Expand All @@ -139,6 +75,7 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
bloomFilter, err := decodeBloomFilter(incomingMessage.message.TxnBloomFilter)
if err != nil {
s.log.Infof("Invalid bloom filter received from peer : %v", err)
s.incomingMessagesQ.erase(peer, networkPeer)
return errInvalidBloomFilter
}
incomingMessage.bloomFilter = bloomFilter
Expand All @@ -150,6 +87,7 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
incomingMessage.transactionGroups, err = decodeTransactionGroups(incomingMessage.message.TransactionGroups, s.genesisID, s.genesisHash)
if err != nil {
s.log.Infof("failed to decode received transactions groups: %v\n", err)
s.incomingMessagesQ.erase(peer, networkPeer)
return errDecodingReceivedTransactionGroupsFailed
}

Expand All @@ -158,10 +96,21 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
// all the peer objects are created synchronously.
enqueued := s.incomingMessagesQ.enqueue(incomingMessage)
if !enqueued {
// if we can't enqueue that, return an error, which would disconnect the peer.
// ( we have to disconnect, since otherwise, we would have no way to synchronize the sequence number)
s.log.Infof("unable to enqueue incoming message from a peer without txsync allocated data; incoming messages queue is full. disconnecting from peer.")
return errTransactionSyncIncomingMessageQueueFull
// if we failed to enqueue, it means that the queue is full. Try to remove disconnected
// peers from the queue before re-attempting.
peers := s.node.GetPeers()
if s.incomingMessagesQ.prunePeers(peers) {
// if we were successful in removing at least a single peer, then try to add the entry again.
enqueued = s.incomingMessagesQ.enqueue(incomingMessage)
}
if !enqueued {
// if we can't enqueue that, return an error, which would disconnect the peer.
// ( we have to disconnect, since otherwise, we would have no way to synchronize the sequence number)
s.log.Infof("unable to enqueue incoming message from a peer without txsync allocated data; incoming messages queue is full. disconnecting from peer.")
s.incomingMessagesQ.erase(peer, networkPeer)
return errTransactionSyncIncomingMessageQueueFull
}

}
return nil
}
Expand All @@ -170,15 +119,26 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
if err != nil {
// if the incoming message queue for this peer is full, disconnect from this peer.
s.log.Infof("unable to enqueue incoming message into peer incoming message backlog. disconnecting from peer.")
s.incomingMessagesQ.erase(peer, networkPeer)
return err
}

// (maybe) place the peer message on the main queue. This would get skipped if the peer is already on the queue.
enqueued := s.incomingMessagesQ.enqueue(incomingMessage)
if !enqueued {
// if we can't enqueue that, return an error, which would disconnect the peer.
s.log.Infof("unable to enqueue incoming message from a peer with txsync allocated data; incoming messages queue is full. disconnecting from peer.")
return errTransactionSyncIncomingMessageQueueFull
// if we failed to enqueue, it means that the queue is full. Try to remove disconnected
// peers from the queue before re-attempting.
peers := s.node.GetPeers()
if s.incomingMessagesQ.prunePeers(peers) {
// if we were successful in removing at least a single peer, then try to add the entry again.
enqueued = s.incomingMessagesQ.enqueue(incomingMessage)
}
if !enqueued {
// if we can't enqueue that, return an error, which would disconnect the peer.
s.log.Infof("unable to enqueue incoming message from a peer with txsync allocated data; incoming messages queue is full. disconnecting from peer.")
s.incomingMessagesQ.erase(peer, networkPeer)
return errTransactionSyncIncomingMessageQueueFull
}
}
return nil
}
Expand Down Expand Up @@ -207,9 +167,7 @@ func (s *syncState) evaluateIncomingMessage(message incomingMessage) {
return
}
}
// clear the peer that is associated with this incoming message from the message queue, allowing future
// messages from the peer to be placed on the message queue.
s.incomingMessagesQ.clear(message)

messageProcessed := false
transactionPoolSize := 0
totalAccumulatedTransactionsCount := 0 // the number of transactions that were added during the execution of this method
Expand Down
Loading