Skip to content

Commit

Permalink
refactor(ntp): refactor ntp util
Browse files Browse the repository at this point in the history
  • Loading branch information
ambersun1234 committed May 14, 2024
1 parent 63c061d commit ad15e4a
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 73 deletions.
3 changes: 0 additions & 3 deletions cmd/gtk/model_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@ package main

import (
"github.com/pactus-project/pactus/node"
"github.com/pactus-project/pactus/util/ntp"
)

type nodeModel struct {
node *node.Node
ntp *ntp.Server
}

func newNodeModel(nde *node.Node) *nodeModel {
return &nodeModel{
node: nde,
ntp: ntp.NewNtpServer(),
}
}
15 changes: 10 additions & 5 deletions cmd/gtk/widget_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,23 @@ func (wn *widgetNode) timeout10() bool {
if wn.model.node.ConsManager().HasActiveInstance() {
isInCommittee = "Yes"
}
offset, offsetErr := wn.model.node.Sync().GetClockOffset()

glib.IdleAdd(func() bool {
offset := wn.model.ntp.ClockOffset()

wn.labelClockOffset.SetText(offset.String())
styleContext, err := wn.labelClockOffset.GetStyleContext()
fatalErrorCheck(err)

if wn.model.ntp.OutOfSync(offset) {
if offsetErr != nil {
styleContext.AddClass("warning")
wn.labelClockOffset.SetText("Error response from NTP server.")
} else {
styleContext.RemoveClass("warning")
wn.labelClockOffset.SetText(offset.String())

if wn.model.node.Sync().OutOfSync(offset) {
styleContext.AddClass("warning")
} else {
styleContext.RemoveClass("warning")
}
}

wn.labelCommitteeSize.SetText(fmt.Sprintf("%v", committeeSize))
Expand Down
4 changes: 4 additions & 0 deletions sync/interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package sync

import (
"time"

"github.com/pactus-project/pactus/sync/peerset"
"github.com/pactus-project/pactus/sync/peerset/peer"
"github.com/pactus-project/pactus/sync/peerset/peer/service"
Expand All @@ -13,4 +15,6 @@ type Synchronizer interface {
SelfID() peer.ID
PeerSet() *peerset.PeerSet
Services() service.Services
GetClockOffset() (time.Duration, error)
OutOfSync(time.Duration) bool
}
8 changes: 8 additions & 0 deletions sync/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,11 @@ func (m *MockSync) PeerSet() *peerset.PeerSet {
func (m *MockSync) Services() service.Services {
return m.TestServices
}

func (*MockSync) GetClockOffset() (time.Duration, error) {
return 1 * time.Second, nil
}

func (*MockSync) OutOfSync(_ time.Duration) bool {
return false
}
40 changes: 13 additions & 27 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type synchronizer struct {
networkCh <-chan network.Event
network network.Network
logger *logger.SubLogger
ntp *ntp.Server
ntp *ntp.Checker
}

func NewSynchronizer(
Expand All @@ -69,7 +69,7 @@ func NewSynchronizer(
network: net,
broadcastCh: broadcastCh,
networkCh: net.EventChannel(),
ntp: ntp.NewNtpServer(),
ntp: ntp.NewNtpChecker(1*time.Minute, 1*time.Second),
}

sync.peerSet = peerset.NewPeerSet(conf.SessionTimeout)
Expand Down Expand Up @@ -111,7 +111,7 @@ func (sync *synchronizer) Start() error {
return err
}

go sync.checkClockLoop()
go sync.ntp.Start()
go sync.receiveLoop()
go sync.broadcastLoop()

Expand All @@ -120,9 +120,19 @@ func (sync *synchronizer) Start() error {

func (sync *synchronizer) Stop() {
sync.cancel()
sync.ntp.Stop()

sync.logger.Debug("context closed", "reason", sync.ctx.Err())
}

func (sync *synchronizer) GetClockOffset() (time.Duration, error) {
return sync.ntp.GetClockOffset()
}

func (sync *synchronizer) OutOfSync(offset time.Duration) bool {
return sync.ntp.OutOfSync(offset)
}

func (sync *synchronizer) stateHeight() uint32 {
stateHeight := sync.state.LastBlockHeight()

Expand Down Expand Up @@ -259,30 +269,6 @@ func (sync *synchronizer) broadcastLoop() {
}
}

func (sync *synchronizer) checkClockLoop() {
checkInterval := 10 * time.Second
ntpIntervalTicker := time.NewTicker(checkInterval)

for {
select {
case <-sync.ctx.Done():
return

case <-ntpIntervalTicker.C:
// if the offset is more than 1 second, we assume the node is out of sync
offset := sync.ntp.ClockOffset()

if sync.ntp.OutOfSync(offset) {
sync.logger.Error(
"The node is out of sync with the network time",
"threshold", sync.ntp.GetThreshold(),
"offset", offset,
)
}
}
}
}

func (sync *synchronizer) receiveLoop() {
for {
select {
Expand Down
2 changes: 2 additions & 0 deletions util/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
ErrDuplicateVote

ErrCount
ErrNtpError
)

var messages = map[int]string{
Expand All @@ -42,6 +43,7 @@ var messages = map[int]string{
ErrInvalidVote: "invalid vote",
ErrInvalidMessage: "invalid message",
ErrDuplicateVote: "duplicate vote",
ErrNtpError: "error on getting clock offset",
}

type withCodeError struct {
Expand Down
115 changes: 81 additions & 34 deletions util/ntp/ntp.go
Original file line number Diff line number Diff line change
@@ -1,62 +1,109 @@
package ntp

import (
"context"
"math"
"sync"
"time"

"github.com/beevik/ntp"
"github.com/pactus-project/pactus/util/errors"
"github.com/pactus-project/pactus/util/logger"
)

type Server struct {
threshold time.Duration
logger *logger.SubLogger
serverList []string
}

func NewNtpServer() *Server {
server := &Server{
threshold: 1 * time.Second,
serverList: []string{
"0.pool.ntp.org",
"1.pool.ntp.org",
"2.pool.ntp.org",
"3.pool.ntp.org",
},
}
const (
maxClockOffset = time.Duration(math.MinInt64)
)

type Checker struct {
lock sync.RWMutex
ctx context.Context
cancel func()

ticker *time.Ticker
threshold time.Duration
offset time.Duration
interval time.Duration
}

server.logger = logger.NewSubLogger("_ntp", server)
func NewNtpChecker(interval, threshold time.Duration) *Checker {
ctxWithCancel, cancel := context.WithCancel(context.Background())
server := &Checker{
ctx: ctxWithCancel,
cancel: cancel,
interval: interval,
threshold: threshold,
ticker: time.NewTicker(interval),
}

return server
}

func (s *Server) ClockOffset() time.Duration {
clockOffset := time.Duration(math.MinInt64)
func (*Checker) clockOffset() time.Duration {
server := "pool.ntp.org"
response, err := ntp.Query(server)
if err != nil {
logger.Error("ntp error", "server", server, "error", err)

for _, server := range s.serverList {
remoteTime, err := ntp.Time(server)
if err != nil {
s.logger.Debug("ntp error", "server", server, "error", err)
return maxClockOffset
}

continue
}
if err := response.Validate(); err != nil {
logger.Error("ntp error", "server", server, "error", err)

clockOffset = time.Since(remoteTime)
return maxClockOffset
}

break
return response.ClockOffset
}

func (c *Checker) checkClockOffset() {
for {
select {
case <-c.ctx.Done():
return

case <-c.ticker.C:
offset := c.clockOffset()
c.lock.Lock()
c.offset = offset
c.lock.Unlock()

if c.offset == maxClockOffset {
logger.Error("error on getting clock offset")
} else if c.OutOfSync(offset) {
logger.Error(
"The node is out of sync with the network time",
"threshold", c.threshold,
"offset", offset,
)
}
}
}
}

return clockOffset
func (c *Checker) OutOfSync(offset time.Duration) bool {
return math.Abs(float64(offset)) > float64(c.threshold)
}

func (s *Server) String() string {
return "ntp"
func (c *Checker) Start() {
go c.checkClockOffset()
}

func (s *Server) GetThreshold() time.Duration {
return s.threshold
func (c *Checker) Stop() {
c.cancel()
c.ticker.Stop()
}

func (s *Server) OutOfSync(offset time.Duration) bool {
return math.Abs(float64(offset)) > float64(s.threshold)
func (c *Checker) GetClockOffset() (time.Duration, error) {
c.lock.RLock()
defer c.lock.RUnlock()

offset := c.offset

if offset == maxClockOffset {
return 0, errors.Errorf(errors.ErrNtpError, "unable to get clock offset")
}

return offset, nil
}
10 changes: 6 additions & 4 deletions www/grpc/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,17 @@ import (
"github.com/fxamacker/cbor/v2"
"github.com/pactus-project/pactus/sync/peerset/peer"
"github.com/pactus-project/pactus/sync/peerset/peer/service"
"github.com/pactus-project/pactus/util/ntp"
"github.com/pactus-project/pactus/version"
pactus "github.com/pactus-project/pactus/www/grpc/gen/go"
)

type networkServer struct {
*Server
ntp *ntp.Server
}

func newNetworkServer(server *Server) *networkServer {
return &networkServer{
Server: server,
ntp: ntp.NewNtpServer(),
}
}

Expand All @@ -36,6 +33,11 @@ func (s *networkServer) GetNodeInfo(_ context.Context,
servicesNames = append(servicesNames, "NETWORK")
}

clockOffset, err := s.sync.GetClockOffset()
if err != nil {
s.logger.Warn("failed to get clock offset", "err", err)
}

return &pactus.GetNodeInfoResponse{
Moniker: s.sync.Moniker(),
Agent: version.NodeAgent.String(),
Expand All @@ -49,7 +51,7 @@ func (s *networkServer) GetNodeInfo(_ context.Context,
Connections: uint64(s.net.NumConnectedPeers()),
InboundConnections: uint64(s.net.NumInbound()),
OutboundConnections: uint64(s.net.NumOutbound()),
ClockOffset: s.ntp.ClockOffset().Seconds(),
ClockOffset: clockOffset.Seconds(),
}, nil
}

Expand Down

0 comments on commit ad15e4a

Please sign in to comment.