Skip to content

Commit

Permalink
refactor(ntp): refactor ntp util
Browse files Browse the repository at this point in the history
  • Loading branch information
ambersun1234 committed May 13, 2024
1 parent 2f1a71c commit a749e67
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 63 deletions.
3 changes: 0 additions & 3 deletions cmd/gtk/model_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@ package main

import (
"github.com/pactus-project/pactus/node"
"github.com/pactus-project/pactus/util/ntp"
)

type nodeModel struct {
node *node.Node
ntp *ntp.Server
}

func newNodeModel(nde *node.Node) *nodeModel {
return &nodeModel{
node: nde,
ntp: ntp.NewNtpServer(),
}
}
15 changes: 10 additions & 5 deletions cmd/gtk/widget_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,23 @@ func (wn *widgetNode) timeout10() bool {
if wn.model.node.ConsManager().HasActiveInstance() {
isInCommittee = "Yes"
}
offset, offsetErr := wn.model.node.Sync().GetClockOffset()

glib.IdleAdd(func() bool {
offset := wn.model.ntp.ClockOffset()

wn.labelClockOffset.SetText(offset.String())
styleContext, err := wn.labelClockOffset.GetStyleContext()
fatalErrorCheck(err)

if wn.model.ntp.OutOfSync(offset) {
if offsetErr != nil {
styleContext.AddClass("warning")
wn.labelClockOffset.SetText("Error response from NTP server.")
} else {
styleContext.RemoveClass("warning")
wn.labelClockOffset.SetText(offset.String())

if wn.model.node.Sync().OutOfSync(offset) {
styleContext.AddClass("warning")
} else {
styleContext.RemoveClass("warning")
}
}

wn.labelCommitteeSize.SetText(fmt.Sprintf("%v", committeeSize))
Expand Down
4 changes: 4 additions & 0 deletions sync/interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package sync

import (
"time"

"github.com/pactus-project/pactus/sync/peerset"
"github.com/pactus-project/pactus/sync/peerset/peer"
"github.com/pactus-project/pactus/sync/peerset/peer/service"
Expand All @@ -13,4 +15,6 @@ type Synchronizer interface {
SelfID() peer.ID
PeerSet() *peerset.PeerSet
Services() service.Services
GetClockOffset() (time.Duration, error)
OutOfSync(time.Duration) bool
}
8 changes: 8 additions & 0 deletions sync/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,11 @@ func (m *MockSync) PeerSet() *peerset.PeerSet {
func (m *MockSync) Services() service.Services {
return m.TestServices
}

func (m *MockSync) GetClockOffset() (time.Duration, error) {

Check failure on line 76 in sync/mock.go

View workflow job for this annotation

GitHub Actions / linting

unused-receiver: method receiver 'm' is not referenced in method's body, consider removing or renaming it as _ (revive)

Check failure on line 76 in sync/mock.go

View workflow job for this annotation

GitHub Actions / build-linux

unused-receiver: method receiver 'm' is not referenced in method's body, consider removing or renaming it as _ (revive)
return 1 * time.Second, nil
}

func (m *MockSync) OutOfSync(_ time.Duration) bool {

Check failure on line 80 in sync/mock.go

View workflow job for this annotation

GitHub Actions / linting

unused-receiver: method receiver 'm' is not referenced in method's body, consider removing or renaming it as _ (revive)

Check failure on line 80 in sync/mock.go

View workflow job for this annotation

GitHub Actions / build-linux

unused-receiver: method receiver 'm' is not referenced in method's body, consider removing or renaming it as _ (revive)
return false
}
40 changes: 13 additions & 27 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type synchronizer struct {
networkCh <-chan network.Event
network network.Network
logger *logger.SubLogger
ntp *ntp.Server
ntp *ntp.Checker
}

func NewSynchronizer(
Expand All @@ -69,7 +69,7 @@ func NewSynchronizer(
network: net,
broadcastCh: broadcastCh,
networkCh: net.EventChannel(),
ntp: ntp.NewNtpServer(),
ntp: ntp.NewNtpChecker(1 * time.Minute),
}

sync.peerSet = peerset.NewPeerSet(conf.SessionTimeout)
Expand Down Expand Up @@ -111,7 +111,7 @@ func (sync *synchronizer) Start() error {
return err
}

go sync.checkClockLoop()
go sync.ntp.Start()
go sync.receiveLoop()
go sync.broadcastLoop()

Expand All @@ -120,9 +120,19 @@ func (sync *synchronizer) Start() error {

func (sync *synchronizer) Stop() {
sync.cancel()
sync.ntp.Stop()

sync.logger.Debug("context closed", "reason", sync.ctx.Err())
}

func (sync *synchronizer) GetClockOffset() (time.Duration, error) {
return sync.ntp.GetClockOffset()

Check warning on line 129 in sync/sync.go

View check run for this annotation

Codecov / codecov/patch

sync/sync.go#L129

Added line #L129 was not covered by tests
}

func (sync *synchronizer) OutOfSync(offset time.Duration) bool {
return sync.ntp.OutOfSync(offset)

Check warning on line 133 in sync/sync.go

View check run for this annotation

Codecov / codecov/patch

sync/sync.go#L133

Added line #L133 was not covered by tests
}

func (sync *synchronizer) stateHeight() uint32 {
stateHeight := sync.state.LastBlockHeight()

Expand Down Expand Up @@ -259,30 +269,6 @@ func (sync *synchronizer) broadcastLoop() {
}
}

func (sync *synchronizer) checkClockLoop() {
checkInterval := 10 * time.Second
ntpIntervalTicker := time.NewTicker(checkInterval)

for {
select {
case <-sync.ctx.Done():
return

case <-ntpIntervalTicker.C:
// if the offset is more than 1 second, we assume the node is out of sync
offset := sync.ntp.ClockOffset()

if sync.ntp.OutOfSync(offset) {
sync.logger.Error(
"The node is out of sync with the network time",
"threshold", sync.ntp.GetThreshold(),
"offset", offset,
)
}
}
}
}

func (sync *synchronizer) receiveLoop() {
for {
select {
Expand Down
120 changes: 96 additions & 24 deletions util/ntp/ntp.go
Original file line number Diff line number Diff line change
@@ -1,62 +1,134 @@
package ntp

import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"

"github.com/beevik/ntp"
"github.com/pactus-project/pactus/util/logger"
)

type Server struct {
const (
maxClockOffset = time.Duration(math.MinInt64)
)

type Checker struct {
lock sync.RWMutex
ctx context.Context
cancel func()

threshold time.Duration
offset time.Duration
interval time.Duration
logger *logger.SubLogger
serverList []string
}

func NewNtpServer() *Server {
server := &Server{
threshold: 1 * time.Second,
serverList: []string{
"0.pool.ntp.org",
"1.pool.ntp.org",
"2.pool.ntp.org",
"3.pool.ntp.org",
},
retryTimes int
}

func NewNtpChecker(interval time.Duration) *Checker {
ctxWithCancel, cancel := context.WithCancel(context.Background())
server := &Checker{
ctx: ctxWithCancel,
cancel: cancel,
interval: interval,
threshold: 1 * time.Second,
retryTimes: 3,
}

server.logger = logger.NewSubLogger("_ntp", server)

return server
}

func (s *Server) ClockOffset() time.Duration {
clockOffset := time.Duration(math.MinInt64)
func (c *Checker) clockOffset() time.Duration {
clockOffset := maxClockOffset

Check warning on line 47 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L47

Added line #L47 was not covered by tests

for _, server := range s.serverList {
remoteTime, err := ntp.Time(server)
for i := 0; i < c.retryTimes; i++ {
server := "pool.ntp.org"
response, err := ntp.Query(server)
if err != nil {
s.logger.Debug("ntp error", "server", server, "error", err)
c.logger.Debug(
fmt.Sprintf("ntp error(retry %v/%v times)", i+1, c.retryTimes),
"server", server, "error", err,
)

Check warning on line 56 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L50-L56

Added lines #L50 - L56 were not covered by tests

continue

Check warning on line 58 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L58

Added line #L58 was not covered by tests
}

clockOffset = time.Since(remoteTime)
if err := response.Validate(); err != nil {
c.logger.Debug(
fmt.Sprintf("ntp error(retry %v/%v times)", i+1, c.retryTimes),
"server", server, "error", err,
)

Check warning on line 65 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L61-L65

Added lines #L61 - L65 were not covered by tests

continue

Check warning on line 67 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L67

Added line #L67 was not covered by tests
}

clockOffset = response.ClockOffset

Check warning on line 70 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L70

Added line #L70 was not covered by tests

break

Check warning on line 72 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L72

Added line #L72 was not covered by tests
}

return clockOffset

Check warning on line 75 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L75

Added line #L75 was not covered by tests
}

func (s *Server) String() string {
func (c *Checker) checkClockOffset() {
ticker := time.NewTicker(c.interval)

for {
select {
case <-c.ctx.Done():
return

case <-ticker.C:
c.lock.Lock()
c.offset = c.clockOffset()

Check warning on line 88 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L86-L88

Added lines #L86 - L88 were not covered by tests

if c.offset == maxClockOffset {
c.logger.Error("error on getting clock offset")
} else if c.OutOfSync(c.offset) {
c.logger.Error(
"The node is out of sync with the network time",
"threshold", c.GetThreshold(),
"offset", c.offset,
)

Check warning on line 97 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L90-L97

Added lines #L90 - L97 were not covered by tests
}
c.lock.Unlock()

Check warning on line 99 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L99

Added line #L99 was not covered by tests
}
}
}

func (c *Checker) OutOfSync(offset time.Duration) bool {
return math.Abs(float64(offset)) > float64(c.threshold)

Check warning on line 105 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L105

Added line #L105 was not covered by tests
}

func (c *Checker) String() string {

Check failure on line 108 in util/ntp/ntp.go

View workflow job for this annotation

GitHub Actions / linting

unused-receiver: method receiver 'c' is not referenced in method's body, consider removing or renaming it as _ (revive)

Check failure on line 108 in util/ntp/ntp.go

View workflow job for this annotation

GitHub Actions / build-linux

unused-receiver: method receiver 'c' is not referenced in method's body, consider removing or renaming it as _ (revive)
return "ntp"

Check warning on line 109 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L109

Added line #L109 was not covered by tests
}

func (s *Server) GetThreshold() time.Duration {
return s.threshold
func (c *Checker) Start() {
go c.checkClockOffset()
}

func (c *Checker) Stop() {
c.cancel()
}

func (c *Checker) GetThreshold() time.Duration {
return c.threshold

Check warning on line 121 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L121

Added line #L121 was not covered by tests
}

func (s *Server) OutOfSync(offset time.Duration) bool {
return math.Abs(float64(offset)) > float64(s.threshold)
func (c *Checker) GetClockOffset() (time.Duration, error) {
c.lock.RLock()
offset := c.offset
c.lock.RUnlock()

Check warning on line 127 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L125-L127

Added lines #L125 - L127 were not covered by tests

if offset == maxClockOffset {
return 0, errors.New("error on getting clock offset")

Check warning on line 130 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L129-L130

Added lines #L129 - L130 were not covered by tests
}

return offset, nil

Check warning on line 133 in util/ntp/ntp.go

View check run for this annotation

Codecov / codecov/patch

util/ntp/ntp.go#L133

Added line #L133 was not covered by tests
}
10 changes: 6 additions & 4 deletions www/grpc/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,17 @@ import (
"github.com/fxamacker/cbor/v2"
"github.com/pactus-project/pactus/sync/peerset/peer"
"github.com/pactus-project/pactus/sync/peerset/peer/service"
"github.com/pactus-project/pactus/util/ntp"
"github.com/pactus-project/pactus/version"
pactus "github.com/pactus-project/pactus/www/grpc/gen/go"
)

type networkServer struct {
*Server
ntp *ntp.Server
}

func newNetworkServer(server *Server) *networkServer {
return &networkServer{
Server: server,
ntp: ntp.NewNtpServer(),
}
}

Expand All @@ -37,6 +34,11 @@ func (s *networkServer) GetNodeInfo(_ context.Context,
servicesNames = append(servicesNames, "NETWORK")
}

clockOffset, err := s.sync.GetClockOffset()
if err != nil {
s.logger.Warn("failed to get clock offset", "err", err)

Check warning on line 39 in www/grpc/network.go

View check run for this annotation

Codecov / codecov/patch

www/grpc/network.go#L39

Added line #L39 was not covered by tests
}

return &pactus.GetNodeInfoResponse{
Moniker: s.sync.Moniker(),
Agent: version.NodeAgent.String(),
Expand All @@ -50,7 +52,7 @@ func (s *networkServer) GetNodeInfo(_ context.Context,
Connections: uint64(s.net.NumConnectedPeers()),
InboundConnections: uint64(s.net.NumInbound()),
OutboundConnections: uint64(s.net.NumOutbound()),
ClockOffset: s.ntp.ClockOffset().Seconds(),
ClockOffset: clockOffset.Seconds(),
}, nil
}

Expand Down

0 comments on commit a749e67

Please sign in to comment.