Skip to content

Commit

Permalink
fix: more granular interval checking of data rates
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Dec 16, 2020
1 parent 4939195 commit c6c9ac2
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 71 deletions.
88 changes: 68 additions & 20 deletions pushchannelmonitor/pushchannelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pushchannelmonitor

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -32,12 +33,14 @@ type PushChannelMonitor struct {
}

type PushMonitorConfig struct {
Interval time.Duration
MinBytesSent uint64
RestartBackoff time.Duration
Interval time.Duration
MinBytesSent uint64
ChecksPerInterval uint32
RestartBackoff time.Duration
}

func NewPushChannelMonitor(mgr monitorAPI, cfg *PushMonitorConfig) *PushChannelMonitor {
checkConfig(cfg)
ctx, cancel := context.WithCancel(context.Background())
return &PushChannelMonitor{
ctx: ctx,
Expand All @@ -48,6 +51,23 @@ func NewPushChannelMonitor(mgr monitorAPI, cfg *PushMonitorConfig) *PushChannelM
}
}

// checkConfig validates a push channel monitor config and panics with a
// descriptive message if any setting is unusable. A nil config is accepted
// without any checks.
//
// The following must all hold, and are checked in this order:
//   - Interval > 0
//   - ChecksPerInterval > 0
//   - MinBytesSent > 0
//   - Interval / ChecksPerInterval > 0
//
// The last check exists because the monitor uses that quotient as its ticker
// period, and time.NewTicker panics when given a non-positive duration.
// Rejecting the config here reports the problem with a clear message instead
// of crashing later inside the monitoring goroutine.
func checkConfig(cfg *PushMonitorConfig) {
	if cfg == nil {
		return
	}

	const prefix = "data-transfer channel push monitor config "
	switch {
	case cfg.Interval <= 0:
		panic(fmt.Sprintf(prefix+"Interval is %s but must be > 0", cfg.Interval))
	case cfg.ChecksPerInterval == 0:
		panic(fmt.Sprintf(prefix+"ChecksPerInterval is %d but must be > 0", cfg.ChecksPerInterval))
	case cfg.MinBytesSent == 0:
		panic(fmt.Sprintf(prefix+"MinBytesSent is %d but must be > 0", cfg.MinBytesSent))
	case cfg.Interval/time.Duration(cfg.ChecksPerInterval) <= 0:
		// Integer division truncates to zero when there are more checks than
		// nanoseconds in the interval. ChecksPerInterval is known to be
		// non-zero here because the earlier case did not match.
		panic(fmt.Sprintf(prefix+"Interval is %s which is too short for ChecksPerInterval %d (Interval / ChecksPerInterval must be > 0)",
			cfg.Interval, cfg.ChecksPerInterval))
	}
}

// AddChannel adds a channel to the push channel monitor
func (m *PushChannelMonitor) AddChannel(chid datatransfer.ChannelID) *monitoredChannel {
if !m.enabled() {
Expand Down Expand Up @@ -88,7 +108,7 @@ func (m *PushChannelMonitor) onMonitoredChannelShutdown(mpc *monitoredChannel) {

// enabled reports whether the push channel monitor has been configured; a nil config means monitoring is disabled
func (m *PushChannelMonitor) enabled() bool {
return m.cfg != nil && m.cfg.Interval > 0
return m.cfg != nil
}

func (m *PushChannelMonitor) Start() {
Expand All @@ -102,8 +122,9 @@ func (m *PushChannelMonitor) Start() {
func (m *PushChannelMonitor) run() {
defer m.onShutdown()

// Check the data-rate of all monitored channels on every tick
ticker := time.NewTicker(m.cfg.Interval)
// Check data-rate ChecksPerInterval times per interval
tickInterval := m.cfg.Interval / time.Duration(m.cfg.ChecksPerInterval)
ticker := time.NewTicker(tickInterval)
defer ticker.Stop()

for {
Expand Down Expand Up @@ -137,10 +158,10 @@ type monitoredChannel struct {
unsub datatransfer.Unsubscribe
onShutdown func(*monitoredChannel)

statsLk sync.RWMutex
queued uint64
sent uint64
lastSent uint64
statsLk sync.RWMutex
queued uint64
sent uint64
dataRatePoints chan *dataRatePoint

restartLk sync.RWMutex
restarting bool
Expand All @@ -154,12 +175,13 @@ func newMonitoredChannel(
) *monitoredChannel {
ctx, cancel := context.WithCancel(context.Background())
mpc := &monitoredChannel{
ctx: ctx,
cancel: cancel,
mgr: mgr,
chid: chid,
cfg: cfg,
onShutdown: onShutdown,
ctx: ctx,
cancel: cancel,
mgr: mgr,
chid: chid,
cfg: cfg,
onShutdown: onShutdown,
dataRatePoints: make(chan *dataRatePoint, cfg.ChecksPerInterval),
}
mpc.start()
return mpc
Expand Down Expand Up @@ -202,16 +224,42 @@ func (mc *monitoredChannel) start() {
})
}

// dataRatePoint is a snapshot of a monitored channel's transfer counters,
// recorded each time the data rate is checked so that a later check can
// compare against the values from one interval earlier.
type dataRatePoint struct {
	// pending is the number of bytes queued but not yet sent at the time of
	// the snapshot; sent is the total number of bytes sent at that time.
	pending, sent uint64
}

// check if the amount of data sent in the interval was too low, and if so
// restart the channel
func (mc *monitoredChannel) checkDataRate() {
mc.statsLk.Lock()
defer mc.statsLk.Unlock()

sentInInterval := mc.sent - mc.lastSent
mc.lastSent = mc.sent
pending := mc.queued - mc.sent
if pending > 0 && sentInInterval < mc.cfg.MinBytesSent {
// Before returning, add the current data rate stats to the queue
defer func() {
var pending uint64
if mc.queued > mc.sent { // should always be true but just in case
pending = mc.queued - mc.sent
}
mc.dataRatePoints <- &dataRatePoint{
pending: pending,
sent: mc.sent,
}
}()

// Check that there are enough data points that an interval has elapsed
if len(mc.dataRatePoints) < int(mc.cfg.ChecksPerInterval) {
return
}

// Pop the data point from one interval ago
atIntervalStart := <-mc.dataRatePoints

// If there was enough pending data to cover the minimum required amount,
// and the amount sent was lower than the minimum required, restart the
// channel
sentInInterval := mc.sent - atIntervalStart.sent
if atIntervalStart.pending > sentInInterval && sentInInterval < mc.cfg.MinBytesSent {
go mc.restartChannel()
}
}
Expand Down
Loading

0 comments on commit c6c9ac2

Please sign in to comment.