Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
tsachiherman committed Oct 24, 2021
1 parent 27b96d6 commit 92a8c4c
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 22 deletions.
68 changes: 54 additions & 14 deletions txnsync/incomingMsgQ.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,17 @@ type incomingMessageQueue struct {
shutdownRequest chan struct{}
shutdownConfirmed chan struct{}
deletePeersCh chan interface{}
peerlessCount int
}

// maxPeersCount defines the maximum number of supported peers that can have their messages waiting
// in the incoming message queue at the same time. This number can be lower then the actual number of
// connected peers, as it's used only for pending messages.
const maxPeersCount = 1024
const maxPeersCount = 2048

// maxPeerlessCount is the number of messages that we've received that doesn't have a Peer object allocated
// for them ( yet )
const maxPeerlessCount = 512

// makeIncomingMessageQueue creates an incomingMessageQueue object and initializes all the internal variables.
func makeIncomingMessageQueue() *incomingMessageQueue {
Expand All @@ -67,13 +72,14 @@ func makeIncomingMessageQueue() *incomingMessageQueue {
return imq
}

// dequeueHead removes the first head message from the linked list.
func (ml *queuedMsgList) dequeueHead() (out *queuedMsgEntry) {
if ml.head == nil {
return nil
}
entry := ml.head
out = entry
if entry.next == entry.next {
if entry.next == entry {
ml.head = nil
return
}
Expand All @@ -85,36 +91,47 @@ func (ml *queuedMsgList) dequeueHead() (out *queuedMsgEntry) {
return
}

// dequeueHead initialize a list to have msgCount entries.
func (ml *queuedMsgList) initialize(msgCount int) {
msgs := make([]queuedMsgEntry, msgCount)
for i := 0; i < msgCount; i++ {
msgs[i].next = &msgs[(i+1)%maxPeersCount]
msgs[i].prev = &msgs[(i+maxPeersCount-1)%maxPeersCount]
msgs[i].next = &msgs[(i+1)%msgCount]
msgs[i].prev = &msgs[(i+msgCount-1)%msgCount]
}
ml.head = &msgs[0]
}

// empty methods tests to see if the linked list is empty
func (ml *queuedMsgList) empty() bool {
return ml.head == nil
}

// remove removes the given msg from the linked list. The method
// is written with the assumption that the given msg is known to be
// part of the linked list.
func (ml *queuedMsgList) remove(msg *queuedMsgEntry) {
if msg.next == msg.next {
if msg.next == msg {
ml.head = nil
return
}
msg.prev.next = msg.next
msg.next.prev = msg.prev
if ml.head == msg {
ml.head = msg.next
}
msg.prev = msg
msg.next = msg
return
}

// filterRemove removes zero or more messages from the linked list, for which the given
// removeFunc returns true. The removed linked list entries are returned as a linked list.
func (ml *queuedMsgList) filterRemove(removeFunc func(*queuedMsgEntry) bool) *queuedMsgEntry {
if ml.empty() {
return nil
}
if ml.head.next == ml.head.prev {
// do we have a single item ?
if ml.head.next == ml.head {
if removeFunc(ml.head) {
out := ml.head
ml.head = nil
Expand All @@ -123,24 +140,23 @@ func (ml *queuedMsgList) filterRemove(removeFunc func(*queuedMsgEntry) bool) *qu
return nil
}
current := ml.head
last := ml.head.prev
var letGo queuedMsgList
for current != nil {
for {
next := current.next
if next == current {
next = nil
}
if removeFunc(current) {
ml.remove(current)
letGo.enqueueTail(current)
}
if next == ml.head {
if current == last {
break
}
current = next
}
return letGo.head
}

// enqueueTail adds to the current linked list another linked list whose head is msg.
func (ml *queuedMsgList) enqueueTail(msg *queuedMsgEntry) {
if ml.head == nil {
ml.head = msg
Expand All @@ -156,6 +172,8 @@ func (ml *queuedMsgList) enqueueTail(msg *queuedMsgEntry) {
lastEntryNew.next = ml.head
}

// shutdown signals to the message pump to shut down and waits until the message pump goroutine
// aborts.
func (imq *incomingMessageQueue) shutdown() {
imq.enqueuedPeersMu.Lock()
close(imq.shutdownRequest)
Expand All @@ -164,6 +182,8 @@ func (imq *incomingMessageQueue) shutdown() {
<-imq.shutdownConfirmed
}

// messagePump is the incoming message queue message pump. It takes messages from the messages list
// and attempt to write these to the outboundPeerCh.
func (imq *incomingMessageQueue) messagePump() {
defer close(imq.shutdownConfirmed)
imq.enqueuedPeersMu.Lock()
Expand All @@ -184,8 +204,9 @@ func (imq *incomingMessageQueue) messagePump() {
imq.freelist.enqueueTail(msgEntry)
if msg.peer != nil {
delete(imq.enqueuedPeersMap, msg.peer)
} else {
imq.peerlessCount--
}

imq.enqueuedPeersMu.Unlock()
writeOutboundMessage:
select {
Expand Down Expand Up @@ -227,6 +248,11 @@ func (imq *incomingMessageQueue) enqueue(m incomingMessage) bool {
if _, has := imq.enqueuedPeersMap[m.peer]; has {
return true
}
} else {
// do we have enough "room" for peerless messages ?
if imq.peerlessCount >= maxPeerlessCount {
return false
}
}
// do we have enough room in the message queue for the new message ?
if imq.freelist.empty() {
Expand All @@ -239,6 +265,8 @@ func (imq *incomingMessageQueue) enqueue(m incomingMessage) bool {
// if we successfully enqueued the message, set the enqueuedPeersMap so that we won't enqueue the same peer twice.
if m.peer != nil {
imq.enqueuedPeersMap[m.peer] = freeMsgEntry
} else {
imq.peerlessCount++
}
imq.enqueuedPeersCond.Signal()
return true
Expand Down Expand Up @@ -292,11 +320,19 @@ func (imq *incomingMessageQueue) erase(peer *Peer, networkPeer interface{}) {
// queue.
// note : the method expect that the enqueuedPeersMu lock would be taken.
func (imq *incomingMessageQueue) removeMessageByNetworkPeer(networkPeer interface{}) {
peerlessCount := 0
removeByNetworkPeer := func(msg *queuedMsgEntry) bool {
return msg.msg.networkPeer == networkPeer
if msg.msg.networkPeer == networkPeer {
if msg.msg.peer == nil {
peerlessCount++
}
return true
}
return false
}
removeList := imq.messages.filterRemove(removeByNetworkPeer)
imq.freelist.enqueueTail(removeList)
imq.peerlessCount -= peerlessCount
}

// prunePeers removes from the enqueuedMessages queue all the entries that are not provided in the
Expand All @@ -314,20 +350,24 @@ func (imq *incomingMessageQueue) prunePeers(activePeers []PeerInfo) (peerRemoved
}
imq.enqueuedPeersMu.Lock()
defer imq.enqueuedPeersMu.Unlock()

peerlessCount := 0
isPeerMissing := func(msg *queuedMsgEntry) bool {
if msg.msg.peer != nil {
if !activePeersMap[msg.msg.peer] {
return true
}
}
if !activeNetworkPeersMap[msg.msg.networkPeer] {
if msg.msg.peer == nil {
peerlessCount++
}
return true
}
return false
}
removeList := imq.messages.filterRemove(isPeerMissing)
peerRemoved = removeList != nil
imq.freelist.enqueueTail(removeList)
imq.peerlessCount -= peerlessCount
return
}
120 changes: 116 additions & 4 deletions txnsync/incomingMsgQ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,136 @@
package txnsync

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

// fillMessageQueue fills the message queue with the given message.
func (imq *incomingMessageQueue) fillMessageQueue(msg incomingMessage) {
imq.enqueuedPeersMu.Lock()
for i := 0; i < maxPeersCount; i++ {
msgEntry := imq.freelist.dequeueHead()
msgEntry.msg = msg
imq.messages.enqueueTail(msgEntry)
}
if msg.peer == nil {
imq.peerlessCount += maxPeersCount
}
imq.enqueuedPeersCond.Signal()
imq.enqueuedPeersMu.Unlock()

// reading this channel would fill up the staging "msg" in messagePump
<-imq.getIncomingMessageChannel()
imq.enqueue(msg)
for !imq.enqueue(msg) {
// wait for a single message to be consumed by the message pump.
for {
imq.enqueuedPeersMu.Lock()
if !imq.freelist.empty() {
break
}
imq.enqueuedPeersMu.Unlock()
time.Sleep(time.Millisecond)
}
for !imq.freelist.empty() {
msgEntry := imq.freelist.dequeueHead()
msgEntry.msg = msg
imq.messages.enqueueTail(msgEntry)
}
imq.enqueuedPeersCond.Signal()
imq.enqueuedPeersMu.Unlock()
}

// count counts teh number of messages in the list
func (ml *queuedMsgList) count() int {
first := ml.head
cur := first
count := 0
for cur != nil {
next := cur.next
if next == first {
next = nil
}
count++
cur = next
}
return count
}

// validateLinking test to see the the entries in the list are correctly connected.
func (ml *queuedMsgList) validateLinking(t *testing.T) {
cur := ml.head
if cur == nil {
return
}
seen := make(map[*queuedMsgEntry]bool)
list := make([]*queuedMsgEntry, 0)
for {
if seen[cur] {
break
}
seen[cur] = true
require.NotNil(t, cur.prev)
require.NotNil(t, cur.next)
list = append(list, cur)
cur = cur.next
}
for i := range list {
require.Equal(t, list[i], list[(i+len(list)-1)%len(list)].next)
require.Equal(t, list[i], list[(i+1)%len(list)].prev)
}
}

// TestMsgQCounts tests the message queue add/remove manipulations
func TestMsgQCounts(t *testing.T) {
var list queuedMsgList
list.initialize(7)
list.validateLinking(t)
require.Equal(t, 7, list.count())
list.dequeueHead()
list.validateLinking(t)
require.Equal(t, 6, list.count())
var anotherList queuedMsgList
anotherList.initialize(4)
require.Equal(t, 4, anotherList.count())
list.enqueueTail(anotherList.head)
list.validateLinking(t)
require.Equal(t, 10, list.count())
}

// TestMsgQFiltering tests the message queue filtering
func TestMsgQFiltering(t *testing.T) {
item1 := &queuedMsgEntry{}
item2 := &queuedMsgEntry{}
item3 := &queuedMsgEntry{}
item1.next = item1
item1.prev = item1
item2.next = item2
item2.prev = item2
item3.next = item3
item3.prev = item3

var list queuedMsgList
list.enqueueTail(item1)
list.enqueueTail(item2)
list.enqueueTail(item3)

// test removing head.
removedItem1 := list.filterRemove(func(msg *queuedMsgEntry) bool {
return msg == item1
})
require.Equal(t, item1, removedItem1)
require.Equal(t, 2, list.count())

// test removing tail
removedItem3 := list.filterRemove(func(msg *queuedMsgEntry) bool {
return msg == item3
})
require.Equal(t, item3, removedItem3)
require.Equal(t, 1, list.count())

// test removing last item
removedItem2 := list.filterRemove(func(msg *queuedMsgEntry) bool {
return msg == item2
})
require.Equal(t, item2, removedItem2)
require.True(t, list.empty())
}
8 changes: 4 additions & 4 deletions txnsync/incoming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,14 @@ func TestAsyncIncomingMessageHandlerAndErrors(t *testing.T) {
require.Equal(t, errDecodingReceivedTransactionGroupsFailed, err)
s.incomingMessagesQ.shutdown()

peer := Peer{networkPeer: &s}

// error queue full
message.TransactionGroups = packedTransactionGroups{}
messageBytes = message.MarshalMsg(nil)
s.incomingMessagesQ = makeIncomingMessageQueue()
s.incomingMessagesQ.fillMessageQueue(incomingMessage{peer: nil, networkPeer: &s.incomingMessagesQ})
mNodeConnector.peers = append(mNodeConnector.peers, PeerInfo{NetworkPeer: &s.incomingMessagesQ})
s.incomingMessagesQ.fillMessageQueue(incomingMessage{peer: &peer, networkPeer: &s.incomingMessagesQ})
mNodeConnector.peers = append(mNodeConnector.peers, PeerInfo{TxnSyncPeer: &peer, NetworkPeer: &s.incomingMessagesQ})
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errTransactionSyncIncomingMessageQueueFull, err)
s.incomingMessagesQ.shutdown()
Expand All @@ -111,8 +113,6 @@ func TestAsyncIncomingMessageHandlerAndErrors(t *testing.T) {
require.NoError(t, err)
s.incomingMessagesQ.shutdown()

peer := Peer{networkPeer: &s}

// error when placing the peer message on the main queue (incomingMessages cannot accept messages)
s.incomingMessagesQ = makeIncomingMessageQueue()
s.incomingMessagesQ.fillMessageQueue(incomingMessage{peer: nil, networkPeer: &s})
Expand Down

0 comments on commit 92a8c4c

Please sign in to comment.