Skip to content

Commit

Permalink
Merge pull request #3139 from algorand/rel/beta-3.1.3
Browse files Browse the repository at this point in the history
go-algorand v3.1.3-beta
  • Loading branch information
algojohnlee authored Oct 26, 2021
2 parents 53cf013 + 615fee2 commit 378816d
Show file tree
Hide file tree
Showing 33 changed files with 1,178 additions and 440 deletions.
2 changes: 1 addition & 1 deletion buildnumber.dat
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2
3
6 changes: 6 additions & 0 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/url"
"strings"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -291,6 +292,11 @@ func (p *testUnicastPeer) IsOutgoing() bool {
return false
}

// GetConnectionLatency returns the connection latency between the local node and this peer.
func (p *testUnicastPeer) GetConnectionLatency() time.Duration {
return time.Duration(0)
}

func (p *testUnicastPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag, callback network.UnicastWebsocketMessageStateCallback) error {
ps := p.gn.(*httpTestPeerSource)
var dispather network.MessageHandler
Expand Down
5 changes: 5 additions & 0 deletions catchup/peerSelector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ func (d *mockUnicastPeer) IsOutgoing() bool {
return false
}

// GetConnectionLatency returns the connection latency between the local node and this peer.
func (d *mockUnicastPeer) GetConnectionLatency() time.Duration {
return time.Duration(0)
}

func TestPeerAddress(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down
3 changes: 3 additions & 0 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,

if err != nil {
switch err.(type) {
case ledgercore.ErrNonSequentialBlockEval:
s.log.Infof("fetchAndWrite(%d): no need to re-evaluate historical block", r)
return true
case ledgercore.BlockInLedgerError:
s.log.Infof("fetchAndWrite(%d): block already in ledger", r)
return true
Expand Down
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type Local struct {
// Version tracks the current version of the defaults so we can migrate old -> new
// This is specifically important whenever we decide to change the default value
// for an existing parameter. This field tag must be updated any time we add a new version.
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17"`
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18"`

// environmental (may be overridden)
// When enabled, stores blocks indefinitally, otherwise, only the most recents blocks
Expand All @@ -84,7 +84,7 @@ type Local struct {
MaxConnectionsPerIP int `version[3]:"30"`

// 0 == disable
PeerPingPeriodSeconds int `version[0]:"0"`
PeerPingPeriodSeconds int `version[0]:"0" version[18]:"10"`

// for https serving
TLSCertFile string `version[0]:""`
Expand Down
4 changes: 2 additions & 2 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package config

var defaultLocal = Local{
Version: 17,
Version: 18,
AccountUpdatesStatsInterval: 5000000000,
AccountsRebuildSynchronousMode: 1,
AnnounceParticipationKey: true,
Expand Down Expand Up @@ -92,7 +92,7 @@ var defaultLocal = Local{
OutgoingMessageFilterBucketSize: 128,
ParticipationKeysRefreshInterval: 60000000000,
PeerConnectionsUpdateInterval: 3600,
PeerPingPeriodSeconds: 0,
PeerPingPeriodSeconds: 10,
PriorityPeers: map[string]bool{},
PublicAddress: "",
ReconnectTime: 60000000000,
Expand Down
2 changes: 1 addition & 1 deletion data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (handler *solicitedAsyncTxHandler) loop(ctx context.Context) {
// We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings
err := handler.txHandler.net.Relay(ctx, protocol.TxnTag, reencode(txnGroup.Transactions), false, groups.networkPeer)
if err != nil {
logging.Base().Infof("solicitedAsyncTxHandler was unable to relay transaction message : %v")
logging.Base().Infof("solicitedAsyncTxHandler was unable to relay transaction message : %v", err)
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/algorand/graphtrace v0.0.0-20201117160756-e524ed1a6f64
github.com/algorand/msgp v1.1.48
github.com/algorand/oapi-codegen v1.3.5-algorand5
github.com/algorand/websocket v1.4.2
github.com/algorand/websocket v1.4.3
github.com/algorand/xorfilter v0.2.0
github.com/aws/aws-sdk-go v1.16.5
github.com/chrismcguire/gobberish v0.0.0-20150821175641-1d8adb509a0e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ github.com/algorand/msgp v1.1.48 h1:5P+gVmTnk0m37r+rA3ZsFZW219ZqmCLulW5f8Z+3nx8=
github.com/algorand/msgp v1.1.48/go.mod h1:LtOntbYiCHj/Sl/Sqxtf8CZOrDt2a8Dv3tLaS6mcnUE=
github.com/algorand/oapi-codegen v1.3.5-algorand5 h1:y576Ca2/guQddQrQA7dtL5KcOx5xQgPeIupiuFMGyCI=
github.com/algorand/oapi-codegen v1.3.5-algorand5/go.mod h1:/k0Ywn0lnt92uBMyE+yiRf/Wo3/chxHHsAfenD09EbY=
github.com/algorand/websocket v1.4.2 h1:zMB7ukz+c7tcef8rVqmKQTv6KQtxXtCFuiAqKaE7n9I=
github.com/algorand/websocket v1.4.2/go.mod h1:0nFSn+xppw/GZS9hgWPS3b8/4FcA3Pj7XQxm+wqHGx8=
github.com/algorand/websocket v1.4.3 h1:8YiA+ZtwqAyg0K30lQyl7gUdKUArYXvBtd/cTFwA4uQ=
github.com/algorand/websocket v1.4.3/go.mod h1:0nFSn+xppw/GZS9hgWPS3b8/4FcA3Pj7XQxm+wqHGx8=
github.com/algorand/xorfilter v0.2.0 h1:YC31ANxdZ2jmtbwqv1+USskVSqjkeiRZcQGc6//ro9Q=
github.com/algorand/xorfilter v0.2.0/go.mod h1:f5cJsYrFbJhXkbjnV4odJB44np05/PvwvdBnABnQoUs=
github.com/aws/aws-sdk-go v1.16.5 h1:NVxzZXIuwX828VcJrpNxxWjur1tlOBISdMdDdHIKHcc=
Expand Down
4 changes: 2 additions & 2 deletions installer/config.json.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"Version": 17,
"Version": 18,
"AccountUpdatesStatsInterval": 5000000000,
"AccountsRebuildSynchronousMode": 1,
"AnnounceParticipationKey": true,
Expand Down Expand Up @@ -71,7 +71,7 @@
"OutgoingMessageFilterBucketSize": 128,
"ParticipationKeysRefreshInterval": 60000000000,
"PeerConnectionsUpdateInterval": 3600,
"PeerPingPeriodSeconds": 0,
"PeerPingPeriodSeconds": 10,
"PriorityPeers": {},
"PublicAddress": "",
"ReconnectTime": 60000000000,
Expand Down
170 changes: 170 additions & 0 deletions network/latencyTracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright (C) 2019-2021 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package network

import (
"errors"
"net"
"strconv"
"sync/atomic"
"time"

"github.com/algorand/websocket"

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/config"
)

const pongMessageWriteDuration = time.Second
const pingMessageWriteDuration = time.Second

var errInvalidPongMessageContent = errors.New("invalid pong message content")
var errInvalidPingMessageContent = errors.New("invalid ping message content")

// latencyTracker works in conjunction with the wspeer in measuring the
// communication latency over the websocket connection.
type latencyTracker struct {
// receivedPacketCounter is a counter for all incoming messages
// placed here to be aligned with 64bit address.
receivedPacketCounter uint64

// latency is the effective latency of the connection.
// placed here to be aligned with 64bit address.
latency int64

// lastPingSentTime is the timestamp at which we last sent a message.
// this variable is only touched by checkPingSending, and therefore doesn't
// need to be syncronized. The "clone" of this variable lastPingSentTimeSynced,
// is being used by both the checkPingSending as well as by the pongHandler
// and therefore require synchronization.
lastPingSentTime int64

// static variables
// ( doesn't get changed after init, hence, no synchronization needed )

// conn is the underlying connection object.
conn wsPeerWebsocketConn

// enabled indicates whether the pingpong is currently enabled or not.
enabled bool

// pingInterval is the max interval at which the client would send ping messages.
pingInterval time.Duration

// lastPingMu synchronize the protected variables that might be modified across
// the checkPingSending and the pongHandler. All the variable below this point
// need to be syncronized with the mutex.
lastPingMu deadlock.Mutex

// lastPingID is the last ping ID, a monotonic growing number used to ensure
// that the pong message we've receive corresponds to the latest ping message
// that we've sent.
lastPingID uint64

// lastPingReceivedCounter stores message counter at the time we sent the ping.
// In order to ensure the timing accuracy, we want to have no other messages
// being exchanged. This, of course, would only delay the ping-pong until a
// better measurement could be taken.
lastPingReceivedCounter uint64

// lastPingSentTimeSynced, as stated above, is the syncronized version of lastPingSentTime.
// it is used only in the case where we end up sending the ping message.
lastPingSentTimeSynced int64
}

func (lt *latencyTracker) init(conn wsPeerWebsocketConn, cfg config.Local, initialConnectionLatency time.Duration) {
lt.conn = conn
lt.enabled = cfg.PeerPingPeriodSeconds > 0 && cfg.EnablePingHandler
lt.latency = int64(initialConnectionLatency)
lt.pingInterval = time.Duration(cfg.PeerPingPeriodSeconds) * time.Second
conn.SetPingHandler(lt.pingHandler)
conn.SetPongHandler(lt.pongHandler)
}

func (lt *latencyTracker) pingHandler(message string) error {
if _, err := strconv.Atoi(message); err != nil {
return errInvalidPingMessageContent
}
err := lt.conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(pongMessageWriteDuration))
if err == websocket.ErrCloseSent {
return nil
} else if e, ok := err.(net.Error); ok && e.Temporary() {
return nil
}
return err
}

func (lt *latencyTracker) pongHandler(message string) error {
pongID, err := strconv.Atoi(message)
if err != nil {
return errInvalidPongMessageContent
}

lt.lastPingMu.Lock()
defer lt.lastPingMu.Unlock()

if uint64(pongID) != lt.lastPingID {
// we've sent more than one ping since; ignore this message.
return nil
}
if lt.receivedPacketCounter != lt.lastPingReceivedCounter {
// we've received other messages since the one that we sent. The timing
// here would not be accurate.
return nil
}
lastPingSentTime := time.Unix(0, lt.lastPingSentTimeSynced)
roundtripDuration := time.Since(lastPingSentTime)
atomic.StoreInt64(&lt.latency, roundtripDuration.Nanoseconds())
return nil
}

func (lt *latencyTracker) getConnectionLatency() time.Duration {
return time.Duration(atomic.LoadInt64(&lt.latency))
}

func (lt *latencyTracker) checkPingSending(now *time.Time) error {
if !lt.enabled {
return nil
}
if now.Sub(time.Unix(0, lt.lastPingSentTime)) < lt.pingInterval {
return nil
}

// it looks like it's time to send a ping :
lt.lastPingMu.Lock()
defer lt.lastPingMu.Unlock()

lt.lastPingID++
err := lt.conn.WriteControl(websocket.PingMessage, []byte(strconv.Itoa(int(lt.lastPingID))), now.Add(pingMessageWriteDuration))
if err == websocket.ErrCloseSent {
return nil
} else if e, ok := err.(net.Error); ok && e.Temporary() {
return nil
}
if err != nil {
return err
}
lt.lastPingSentTimeSynced = now.UnixNano()
lt.lastPingReceivedCounter = atomic.LoadUint64(&lt.receivedPacketCounter)
lt.lastPingSentTime = lt.lastPingSentTimeSynced
return nil
}

func (lt *latencyTracker) increaseReceivedCounter() {
atomic.AddUint64(&lt.receivedPacketCounter, 1)
}
Loading

0 comments on commit 378816d

Please sign in to comment.