Skip to content

Commit

Permalink
Improve Bandwidth Estimation in Txnsync (#3096)
Browse files Browse the repository at this point in the history

## Summary


Improve the bandwidth estimation within the transaction sync by having the estimation account for latency, transaction compression time, and time spent waiting in the incoming queue.

## Test Plan


Wrote unit tests for correctness, ran network on mainnet model and observed measured bandwidths. Before the bandwidth would converge to the minimum over time as well have erratic inaccuracies. Now the numbers look much more in range, at most a factor of 2 off.
  • Loading branch information
nicholasguoalgorand authored Oct 24, 2021
1 parent 2c79cfd commit b93ad64
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 64 deletions.
7 changes: 6 additions & 1 deletion node/txnSyncConn.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ func (tsnc *transactionSyncNodeConnector) SendPeerMessage(netPeer interface{}, m
}
}

func (tsnc *transactionSyncNodeConnector) GetPeerLatency(netPeer interface{}) time.Duration {
unicastPeer := netPeer.(network.UnicastPeer)
return unicastPeer.GetConnectionLatency()
}

// GetPendingTransactionGroups is called by the transaction sync when it needs to look into the transaction
// pool and get the updated set of pending transactions. The second returned argument is the latest locally originated
// group counter within the given transaction groups list. If there is no group that is locally originated, the expected
Expand Down Expand Up @@ -215,7 +220,7 @@ func (tsnc *transactionSyncNodeConnector) Handle(raw network.IncomingMessage) ne
peer = peerData.(*txnsync.Peer)
}

err := tsnc.messageHandler(raw.Sender, peer, raw.Data, raw.Sequence)
err := tsnc.messageHandler(raw.Sender, peer, raw.Data, raw.Sequence, raw.Received)
if err != nil {
return network.OutgoingMessage{
Action: network.Disconnect,
Expand Down
6 changes: 6 additions & 0 deletions txnsync/bloomFilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/binary"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -357,6 +358,11 @@ func (fn *justRandomFakeNode) UpdatePeers(txsyncPeers []*Peer, netPeers []interf
}
func (fn *justRandomFakeNode) SendPeerMessage(netPeer interface{}, msg []byte, callback SendMessageCallback) {
}

func (fn *justRandomFakeNode) GetPeerLatency(netPeer interface{}) time.Duration {
return 0
}

func (fn *justRandomFakeNode) GetPendingTransactionGroups() (txGroups []pooldata.SignedTxGroup, latestLocallyOriginatedGroupCounter uint64) {
return
}
Expand Down
6 changes: 5 additions & 1 deletion txnsync/emulatorNode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ func (n *emulatedNode) SendPeerMessage(netPeer interface{}, msg []byte, callback
peer.outSeq++
}

func (n *emulatedNode) GetPeerLatency(netPeer interface{}) time.Duration {
return 0
}

func (n *emulatedNode) GetPendingTransactionGroups() ([]pooldata.SignedTxGroup, uint64) {
return n.txpoolEntries, n.latestLocallyOriginatedGroupCounter
}
Expand Down Expand Up @@ -356,7 +360,7 @@ func (n *emulatedNode) step() {

peer.mu.Unlock()

msgHandler(peer, peer.peer, msgBytes, msgInSeq)
msgHandler(peer, peer.peer, msgBytes, msgInSeq, 0)
n.unblock()
n.waitBlocked()
peer.mu.Lock()
Expand Down
13 changes: 9 additions & 4 deletions txnsync/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type incomingMessage struct {
encodedSize int // the byte length of the incoming network message
bloomFilter *testableBloomFilter
transactionGroups []pooldata.SignedTxGroup
timeReceived int64
}

// incomingMessageQueue manages the global incoming message queue across all the incoming peers.
Expand Down Expand Up @@ -108,7 +109,7 @@ func (imq *incomingMessageQueue) clear(m incomingMessage) {

// incomingMessageHandler
// note - this message is called by the network go-routine dispatch pool, and is not synchronized with the rest of the transaction synchronizer
func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64) (err error) {
func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64, receivedTimestamp int64) (err error) {
// increase number of incoming messages metric.
txsyncIncomingMessagesTotal.Inc(nil)

Expand All @@ -120,7 +121,7 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
}
}()

incomingMessage := incomingMessage{networkPeer: networkPeer, sequenceNumber: sequenceNumber, encodedSize: len(message), peer: peer}
incomingMessage := incomingMessage{networkPeer: networkPeer, sequenceNumber: sequenceNumber, encodedSize: len(message), peer: peer, timeReceived: receivedTimestamp}
_, err = incomingMessage.message.UnmarshalMsg(message)
if err != nil {
// if we received a message that we cannot parse, disconnect.
Expand Down Expand Up @@ -194,7 +195,7 @@ func (s *syncState) evaluateIncomingMessage(message incomingMessage) {
}
if peerInfo.TxnSyncPeer == nil {
// we couldn't really do much about this message previously, since we didn't have the peer.
peer = makePeer(message.networkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log)
peer = makePeer(message.networkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log, s.node.GetPeerLatency(message.networkPeer))
// let the network peer object know about our peer
s.node.UpdatePeers([]*Peer{peer}, []interface{}{message.networkPeer}, 0)
} else {
Expand Down Expand Up @@ -249,7 +250,11 @@ incomingMessageLoop:
}

peer.updateRequestParams(incomingMsg.message.UpdatedRequestParams.Modulator, incomingMsg.message.UpdatedRequestParams.Offset)
peer.updateIncomingMessageTiming(incomingMsg.message.MsgSync, s.round, s.clock.Since(), incomingMsg.encodedSize)
timeInQueue := time.Duration(0)
if incomingMsg.timeReceived > 0 {
timeInQueue = time.Since(time.Unix(0, incomingMsg.timeReceived))
}
peer.updateIncomingMessageTiming(incomingMsg.message.MsgSync, s.round, s.clock.Since(), timeInQueue, peer.cachedLatency, incomingMsg.encodedSize)

// if the peer's round is more than a single round behind the local node, then we don't want to
// try and load the transactions. The other peer should first catch up before getting transactions.
Expand Down
21 changes: 11 additions & 10 deletions txnsync/incoming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,27 +58,28 @@ func TestAsyncIncomingMessageHandlerAndErrors(t *testing.T) {
cfg := config.GetDefaultLocal()
mNodeConnector := &mockNodeConnector{transactionPoolSize: 3}
s := syncState{
log: wrapLogger(&incLogger, &cfg),
node: mNodeConnector,
log: wrapLogger(&incLogger, &cfg),
node: mNodeConnector,
clock: mNodeConnector.Clock(),
}

// expect UnmarshalMsg error
messageBytes[0] = 0
err := s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
err := s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
msgpe := msgp.TypeError{}
require.True(t, errors.As(err, &msgpe))

// expect wrong version error
message = transactionBlockMessage{Version: -3}
messageBytes = message.MarshalMsg(nil)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errUnsupportedTransactionSyncMessageVersion, err)

// expect error decoding bloomFilter
message.Version = 1
message.TxnBloomFilter.BloomFilterType = byte(multiHashBloomFilter)
messageBytes = message.MarshalMsg(nil)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errInvalidBloomFilter, err)

// error decoding transaction groups
Expand All @@ -89,33 +90,33 @@ func TestAsyncIncomingMessageHandlerAndErrors(t *testing.T) {
require.NoError(t, err)
message.TransactionGroups = packedTransactionGroups{Bytes: []byte{1}}
messageBytes = message.MarshalMsg(nil)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errDecodingReceivedTransactionGroupsFailed, err)

// error queue full
message.TransactionGroups = packedTransactionGroups{}
messageBytes = message.MarshalMsg(nil)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errTransactionSyncIncomingMessageQueueFull, err)

// Success where peer == nil
s.incomingMessagesQ = makeIncomingMessageQueue()
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.NoError(t, err)

peer := Peer{}

// error when placing the peer message on the main queue (incomingMessages cannot accept messages)
s.incomingMessagesQ = incomingMessageQueue{}
err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber, 0)
require.Equal(t, errTransactionSyncIncomingMessageQueueFull, err)

s.incomingMessagesQ = makeIncomingMessageQueue()
err = nil
// fill up the incoming message queue (one was already added)
for x := 1; x <= messageOrderingHeapLimit; x++ {
require.NoError(t, err)
err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber, 0)
}
require.Equal(t, errHeapReachedCapacity, err)
}
Expand Down
5 changes: 4 additions & 1 deletion txnsync/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package txnsync

import (
"time"

"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/pooldata"
"github.com/algorand/go-algorand/util/timers"
Expand Down Expand Up @@ -46,7 +48,7 @@ type Event struct {
}

// IncomingMessageHandler is the signature of the incoming message handler used by the transaction sync to receive network messages
type IncomingMessageHandler func(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64) error
type IncomingMessageHandler func(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64, receivedTimestamp int64) error

// SendMessageCallback define a message sent feedback for performing message tracking
type SendMessageCallback func(enqueued bool, sequenceNumber uint64) error
Expand Down Expand Up @@ -79,6 +81,7 @@ type NodeConnector interface {
// across all the connected peers.
UpdatePeers(txsyncPeers []*Peer, netPeers []interface{}, peersAverageDataExchangeRate uint64)
SendPeerMessage(netPeer interface{}, msg []byte, callback SendMessageCallback)
GetPeerLatency(netPeer interface{}) time.Duration
// GetPendingTransactionGroups is called by the transaction sync when it needs to look into the transaction
// pool and get the updated set of pending transactions. The second returned argument is the latest locally originated
// group counter within the given transaction groups list. If there is no group that is locally originated, the expected
Expand Down
9 changes: 8 additions & 1 deletion txnsync/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ func (s *syncState) onNewRoundEvent(ent Event) {
if !s.isRelay {
s.nextOffsetRollingCh = s.clock.TimeoutAt(kickoffTime + 2*s.lastBeta)
}
s.updatePeersLatency(peers)
s.updatePeersRequestParams(peers)
}

Expand Down Expand Up @@ -395,7 +396,7 @@ func (s *syncState) getPeers() (result []*Peer) {
// some of the network peers might not have a sync peer, so we need to create one for these.
for _, peerInfo := range peersInfo {
if peerInfo.TxnSyncPeer == nil {
syncPeer := makePeer(peerInfo.NetworkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log)
syncPeer := makePeer(peerInfo.NetworkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log, s.node.GetPeerLatency(peerInfo.NetworkPeer))
peerInfo.TxnSyncPeer = syncPeer
updatedNetworkPeers = append(updatedNetworkPeers, peerInfo.NetworkPeer)
updatedNetworkPeersSync = append(updatedNetworkPeersSync, syncPeer)
Expand Down Expand Up @@ -435,3 +436,9 @@ func (s *syncState) updatePeersRequestParams(peers []*Peer) {
}
}
}

func (s *syncState) updatePeersLatency(peers []*Peer) {
for _, peer := range peers {
peer.cachedLatency = s.node.GetPeerLatency(peer.networkPeer)
}
}
25 changes: 16 additions & 9 deletions txnsync/outgoing.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ type sentMessageMetadata struct {
// could be a lengthy operation which does't need to be blocking the main loop. Moving the actual encoding into an
// execution pool thread frees up the main loop, allowing smoother operation.
type messageAsyncEncoder struct {
state *syncState
messageData sentMessageMetadata
roundClock timers.WallClock
peerDataExchangeRate uint64
state *syncState
messageData sentMessageMetadata
roundClock timers.WallClock
lastReceivedMessageTimestamp time.Duration
peerDataExchangeRate uint64
// sentMessagesCh is a copy of the outgoingMessagesCallbackCh in the syncState object. We want to create a copy of
// the channel so that in case of a txnsync restart ( i.e. fast catchup ), we can still generate a new channel
// without triggering a data race. The alternative is to block the txnsync.Shutdown() until we receive the feedback
Expand Down Expand Up @@ -94,6 +95,12 @@ func (encoder *messageAsyncEncoder) asyncEncodeAndSend(interface{}) interface{}
encoder.messageData.transactionGroups = nil // clear out to allow GC to reclaim
}

if encoder.lastReceivedMessageTimestamp >= 0 {
// adding a nanosecond to the elapsed time is meaningless for the data rate calculation, but would ensure that
// the ResponseElapsedTime field has a clear distinction between "being set" vs. "not being set"
encoder.messageData.message.MsgSync.ResponseElapsedTime = uint64((encoder.roundClock.Since() - encoder.lastReceivedMessageTimestamp).Nanoseconds())
}

encodedMessage := encoder.messageData.message.MarshalMsg(getMessageBuffer())
encoder.messageData.encodedMessageSize = len(encodedMessage)
// now that the message is ready, we can discard the encoded transaction group slice to allow the GC to collect it.
Expand Down Expand Up @@ -143,7 +150,7 @@ func (s *syncState) sendMessageLoop(currentTime time.Duration, deadline timers.D
for _, peer := range peers {
msgEncoder := &messageAsyncEncoder{state: s, roundClock: s.clock, peerDataExchangeRate: peer.dataExchangeRate, sentMessagesCh: s.outgoingMessagesCallbackCh}
profAssembleMessage.start()
msgEncoder.messageData, assembledBloomFilter = s.assemblePeerMessage(peer, &pendingTransactions)
msgEncoder.messageData, assembledBloomFilter, msgEncoder.lastReceivedMessageTimestamp = s.assemblePeerMessage(peer, &pendingTransactions)
profAssembleMessage.end()
isPartialMessage := msgEncoder.messageData.partialMessage
// The message that we've just encoded is expected to be sent out with the next sequence number.
Expand Down Expand Up @@ -183,7 +190,7 @@ func (s *syncState) sendMessageLoop(currentTime time.Duration, deadline timers.D
}
}

func (s *syncState) assemblePeerMessage(peer *Peer, pendingTransactions *pendingTransactionGroupsSnapshot) (metaMessage sentMessageMetadata, assembledBloomFilter bloomFilter) {
func (s *syncState) assemblePeerMessage(peer *Peer, pendingTransactions *pendingTransactionGroupsSnapshot) (metaMessage sentMessageMetadata, assembledBloomFilter bloomFilter, lastReceivedMessageTimestamp time.Duration) {
metaMessage = sentMessageMetadata{
peer: peer,
message: &transactionBlockMessage{
Expand Down Expand Up @@ -281,10 +288,10 @@ notxns:
}

metaMessage.message.MsgSync.RefTxnBlockMsgSeq = peer.nextReceivedMessageSeq - 1
// signify that timestamp is not set
lastReceivedMessageTimestamp = time.Duration(-1)
if peer.lastReceivedMessageTimestamp != 0 && peer.lastReceivedMessageLocalRound == s.round {
// adding a nanosecond to the elapsed time is meaningless for the data rate calculation, but would ensure that
// the ResponseElapsedTime field has a clear distinction between "being set" vs. "not being set"
metaMessage.message.MsgSync.ResponseElapsedTime = uint64((s.clock.Since() - peer.lastReceivedMessageTimestamp).Nanoseconds()) + 1
lastReceivedMessageTimestamp = peer.lastReceivedMessageTimestamp
// reset the lastReceivedMessageTimestamp so that we won't be using that again on a subsequent outgoing message.
peer.lastReceivedMessageTimestamp = 0
}
Expand Down
14 changes: 7 additions & 7 deletions txnsync/outgoing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,14 @@ func TestAssemblePeerMessage_messageConstBloomFilter(t *testing.T) {
peer.isOutgoing = true
peer.state = peerStateLateBloom

metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions)

a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222))
a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111))
a.Equal(metaMessage.peer, &peer)
a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion))
a.Equal(metaMessage.message.Round, s.round)
a.True(metaMessage.message.MsgSync.ResponseElapsedTime != 0)
a.True(responseTime >= 0)
a.Equal(s.lastBloomFilter, expectedFilter)
}

Expand Down Expand Up @@ -330,14 +330,14 @@ func TestAssemblePeerMessage_messageConstBloomFilterNonRelay(t *testing.T) {
peer.isOutgoing = true
peer.state = peerStateLateBloom

metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions)

a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222))
a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111))
a.Equal(metaMessage.peer, &peer)
a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion))
a.Equal(metaMessage.message.Round, s.round)
a.True(metaMessage.message.MsgSync.ResponseElapsedTime != 0)
a.True(responseTime >= 0)
a.NotEqual(s.lastBloomFilter, expectedFilter)
}

Expand All @@ -362,14 +362,14 @@ func TestAssemblePeerMessage_messageConstNextMinDelay_messageConstUpdateRequestP
s.isRelay = true
s.lastBeta = 123 * time.Nanosecond

metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions)

a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222))
a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111))
a.Equal(metaMessage.peer, &peer)
a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion))
a.Equal(metaMessage.message.Round, s.round)
a.True(metaMessage.message.MsgSync.ResponseElapsedTime != 0)
a.True(responseTime >= 0)
a.Equal(metaMessage.message.MsgSync.NextMsgMinDelay, uint64(s.lastBeta.Nanoseconds())*2)

}
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestAssemblePeerMessage_messageConstTransactions(t *testing.T) {
peer.isOutgoing = true
peer.state = peerStateHoldsoff

metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
metaMessage, _, _ := s.assemblePeerMessage(&peer, &pendingTransactions)

a.Equal(len(metaMessage.transactionGroups), 1)
a.True(reflect.DeepEqual(metaMessage.transactionGroups[0], pendingTransactions.pendingTransactionsGroups[0]))
Expand Down
Loading

0 comments on commit b93ad64

Please sign in to comment.