diff --git a/impl/impl.go b/impl/impl.go
index f9ea2bf8..f7188aad 100644
--- a/impl/impl.go
+++ b/impl/impl.go
@@ -95,20 +95,26 @@ func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
 // restarting push channels
 // - interval is the time over which minBytesSent must have been sent
 // - checksPerInterval is the number of times to check per interval
-// - minBytesSent is the minimum amount of data that must have been sent over the interval
+// - minBytesSent is the minimum amount of data that must have been sent over
+//   the interval
 // - restartBackoff is the time to wait before checking again for restarts
+// - maxConsecutiveRestarts is the maximum number of restarts in a row to
+//   attempt where no data is transferred. When the limit is reached the
+//   channel is closed.
 func PushChannelRestartConfig(
 	interval time.Duration,
 	checksPerInterval uint32,
 	minBytesSent uint64,
 	restartBackoff time.Duration,
+	maxConsecutiveRestarts uint32,
 ) DataTransferOption {
 	return func(m *manager) {
 		m.pushChannelMonitorCfg = &pushchannelmonitor.Config{
-			Interval:          interval,
-			ChecksPerInterval: checksPerInterval,
-			MinBytesSent:      minBytesSent,
-			RestartBackoff:    restartBackoff,
+			Interval:               interval,
+			ChecksPerInterval:      checksPerInterval,
+			MinBytesSent:           minBytesSent,
+			RestartBackoff:         restartBackoff,
+			MaxConsecutiveRestarts: maxConsecutiveRestarts,
 		}
 	}
 }
diff --git a/impl/integration_test.go b/impl/integration_test.go
index 1df4fe13..e2600cba 100644
--- a/impl/integration_test.go
+++ b/impl/integration_test.go
@@ -513,7 +513,7 @@ func TestPushRequestAutoRestart(t *testing.T) {
 	tp1 := gsData.SetupGSTransportHost1()
 	tp2 := gsData.SetupGSTransportHost2()

-	restartConf := PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond)
+	restartConf := PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond, 5)
 	dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1, restartConf)
 	require.NoError(t, err)
 	testutil.StartAndWaitForReady(ctx, t, dt1)
diff --git a/pushchannelmonitor/pushchannelmonitor.go b/pushchannelmonitor/pushchannelmonitor.go
index 47f5ba65..4a4e543a 100644
--- a/pushchannelmonitor/pushchannelmonitor.go
+++ b/pushchannelmonitor/pushchannelmonitor.go
@@ -33,10 +33,11 @@ type Monitor struct {
 }

 type Config struct {
-	Interval          time.Duration
-	MinBytesSent      uint64
-	ChecksPerInterval uint32
-	RestartBackoff    time.Duration
+	Interval               time.Duration
+	MinBytesSent           uint64
+	ChecksPerInterval      uint32
+	RestartBackoff         time.Duration
+	MaxConsecutiveRestarts uint32
 }

 func NewMonitor(mgr monitorAPI, cfg *Config) *Monitor {
@@ -66,6 +67,9 @@ func checkConfig(cfg *Config) {
 	if cfg.MinBytesSent == 0 {
 		panic(fmt.Sprintf(prefix+"MinBytesSent is %d but must be > 0", cfg.MinBytesSent))
 	}
+	if cfg.MaxConsecutiveRestarts == 0 {
+		panic(fmt.Sprintf(prefix+"MaxConsecutiveRestarts is %d but must be > 0", cfg.MaxConsecutiveRestarts))
+	}
 }

 // AddChannel adds a channel to the push channel monitor
@@ -158,10 +162,11 @@ type monitoredChannel struct {
 	unsub      datatransfer.Unsubscribe
 	onShutdown func(*monitoredChannel)

-	statsLk        sync.RWMutex
-	queued         uint64
-	sent           uint64
-	dataRatePoints chan *dataRatePoint
+	statsLk             sync.RWMutex
+	queued              uint64
+	sent                uint64
+	dataRatePoints      chan *dataRatePoint
+	consecutiveRestarts int

 	restartLk  sync.RWMutex
 	restarting bool
@@ -220,6 +225,8 @@ func (mc *monitoredChannel) start() {
 		case datatransfer.DataSent:
 			// Keep track of the amount of data sent
 			mc.sent = channelState.Sent()
+			// Some data was sent so reset the consecutive restart counter
+			mc.consecutiveRestarts = 0
 		}
 	})
 }
@@ -277,11 +284,27 @@ func (mc *monitoredChannel) restartChannel() {
 		return
 	}

+	mc.statsLk.Lock()
+	mc.consecutiveRestarts++
+	restartCount := mc.consecutiveRestarts
+	mc.statsLk.Unlock()
+
+	if uint32(restartCount) > mc.cfg.MaxConsecutiveRestarts {
+		// If no data has been transferred since the last transfer, and we've
+		// reached the consecutive restart limit, close the channel and
+		// shutdown the monitor
+		log.Errorf("Closing channel after %d consecutive restarts for push data-channel %s", restartCount, mc.chid)
+		mc.closeChannelAndShutdown()
+		return
+	}
+
 	defer func() {
-		// Backoff a little time after a restart before attempting another
-		select {
-		case <-time.After(mc.cfg.RestartBackoff):
-		case <-mc.ctx.Done():
+		if mc.cfg.RestartBackoff > 0 {
+			// Backoff a little time after a restart before attempting another
+			select {
+			case <-time.After(mc.cfg.RestartBackoff):
+			case <-mc.ctx.Done():
+			}
 		}

 		mc.restartLk.Lock()
@@ -289,19 +312,24 @@ func (mc *monitoredChannel) restartChannel() {
 		mc.restartLk.Unlock()
 	}()

-	// Send a restart message for the channel
+	// Send a restart message for the channel.
+	// Note that at the networking layer there is logic to retry if a network
+	// connection cannot be established, so this may take some time.
 	log.Infof("Sending restart message for push data-channel %s", mc.chid)
 	err := mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
 	if err != nil {
-		log.Warnf("closing channel after failing to send restart message for push data-channel %s: %s", mc.chid, err)
-
 		// If it wasn't possible to restart the channel, close the channel
 		// and shut down the monitor
-		defer mc.Shutdown()
+		log.Errorf("Closing channel after failing to send restart message for push data-channel %s: %s", mc.chid, err)
+		mc.closeChannelAndShutdown()
+	}
+}

-		err := mc.mgr.CloseDataTransferChannel(mc.ctx, mc.chid)
-		if err != nil {
-			log.Errorf("error closing data transfer channel %s: %w", mc.chid, err)
-		}
+func (mc *monitoredChannel) closeChannelAndShutdown() {
+	err := mc.mgr.CloseDataTransferChannel(mc.ctx, mc.chid)
+	if err != nil {
+		log.Errorf("Error closing data transfer channel %s: %w", mc.chid, err)
 	}
+
+	mc.Shutdown()
 }
diff --git a/pushchannelmonitor/pushchannelmonitor_test.go b/pushchannelmonitor/pushchannelmonitor_test.go
index 692ae7ff..e731a868 100644
--- a/pushchannelmonitor/pushchannelmonitor_test.go
+++ b/pushchannelmonitor/pushchannelmonitor_test.go
@@ -58,12 +58,14 @@ func TestPushChannelMonitorAutoRestart(t *testing.T) {
 			mockAPI := newMockMonitorAPI(ch, tc.errOnRestart)

 			m := NewMonitor(mockAPI, &Config{
-				Interval:          10 * time.Millisecond,
-				ChecksPerInterval: 10,
-				MinBytesSent:      1,
+				Interval:               10 * time.Millisecond,
+				ChecksPerInterval:      10,
+				MinBytesSent:           1,
+				MaxConsecutiveRestarts: 3,
 			})
 			m.Start()
 			m.AddChannel(ch1)
+			mch := getFirstMonitoredChannel(m)

 			mockAPI.dataQueued(tc.dataQueued)
 			mockAPI.dataSent(tc.dataSent)
@@ -82,12 +84,7 @@ func TestPushChannelMonitorAutoRestart(t *testing.T) {
 			select {
 			case <-time.After(100 * time.Millisecond):
 				require.Fail(t, "failed to restart channel")
-			case <-mockAPI.restarted:
-			}
-
-			require.Len(t, m.channels, 1)
-			var mch *monitoredChannel
-			for mch = range m.channels {
+			case <-mockAPI.restarts:
 			}

 			// Simulate sending the remaining data
@@ -100,11 +97,7 @@ func TestPushChannelMonitorAutoRestart(t *testing.T) {
 			mockAPI.completed()

 			// Verify that channel has been shutdown
-			select {
-			case <-time.After(100 * time.Millisecond):
-				require.Fail(t, "failed to shutdown channel")
-			case <-mch.ctx.Done():
-			}
+			verifyChannelShutdown(t, mch)
 		})
 	}
 }
@@ -115,25 +108,22 @@ func TestPushChannelMonitorDataRate(t *testing.T) {
 		sent   uint64
 	}
 	type testCase struct {
-		name              string
-		checksPerInterval uint32
-		minBytesSent      uint64
-		dataPoints        []dataPoint
-		expectRestart     bool
+		name          string
+		minBytesSent  uint64
+		dataPoints    []dataPoint
+		expectRestart bool
 	}
 	testCases := []testCase{{
-		name:              "restart when min sent (1) < pending (10)",
-		checksPerInterval: 2,
-		minBytesSent:      1,
+		name:         "restart when min sent (1) < pending (10)",
+		minBytesSent: 1,
 		dataPoints: []dataPoint{{
 			queued: 20,
 			sent:   10,
 		}},
 		expectRestart: true,
 	}, {
-		name:              "dont restart when min sent (20) >= pending (10)",
-		checksPerInterval: 2,
-		minBytesSent:      1,
+		name:         "dont restart when min sent (20) >= pending (10)",
+		minBytesSent: 1,
 		dataPoints: []dataPoint{{
 			queued: 20,
 			sent:   10,
@@ -143,9 +133,8 @@ func TestPushChannelMonitorDataRate(t *testing.T) {
 		}},
 		expectRestart: false,
 	}, {
-		name:              "restart when min sent (5) < pending (10)",
-		checksPerInterval: 2,
-		minBytesSent:      10,
+		name:         "restart when min sent (5) < pending (10)",
+		minBytesSent: 10,
 		dataPoints: []dataPoint{{
 			queued: 20,
 			sent:   10,
@@ -155,18 +144,16 @@ func TestPushChannelMonitorDataRate(t *testing.T) {
 		}},
 		expectRestart: true,
 	}, {
-		name:              "dont restart when pending is zero",
-		checksPerInterval: 2,
-		minBytesSent:      1,
+		name:         "dont restart when pending is zero",
+		minBytesSent: 1,
 		dataPoints: []dataPoint{{
 			queued: 20,
 			sent:   20,
 		}},
 		expectRestart: false,
 	}, {
-		name:              "dont restart when pending increases but sent also increases within interval",
-		checksPerInterval: 2,
-		minBytesSent:      1,
+		name:         "dont restart when pending increases but sent also increases within interval",
+		minBytesSent: 1,
 		dataPoints: []dataPoint{{
 			queued: 10,
 			sent:   10,
@@ -179,9 +166,8 @@ func TestPushChannelMonitorDataRate(t *testing.T) {
 		}},
 		expectRestart: false,
 	}, {
-		name:              "restart when pending increases and sent doesn't increase within interval",
-		checksPerInterval: 2,
-		minBytesSent:      1,
+		name:         "restart when pending increases and sent doesn't increase within interval",
+		minBytesSent: 1,
 		dataPoints: []dataPoint{{
 			queued: 10,
 			sent:   10,
@@ -200,9 +186,8 @@ func TestPushChannelMonitorDataRate(t *testing.T) {
 		}},
 		expectRestart: true,
 	}, {
-		name:              "dont restart with typical progression",
-		checksPerInterval: 2,
-		minBytesSent:      1,
+		name:         "dont restart with typical progression",
+		minBytesSent: 1,
 		dataPoints: []dataPoint{{
 			queued: 10,
 			sent:   10,
@@ -235,17 +220,19 @@ func TestPushChannelMonitorDataRate(t *testing.T) {
 			ch := &mockChannelState{chid: ch1}
 			mockAPI := newMockMonitorAPI(ch, false)

+			checksPerInterval := uint32(1)
 			m := NewMonitor(mockAPI, &Config{
-				Interval:          time.Hour,
-				ChecksPerInterval: tc.checksPerInterval,
-				MinBytesSent:      tc.minBytesSent,
+				Interval:               time.Hour,
+				ChecksPerInterval:      checksPerInterval,
+				MinBytesSent:           tc.minBytesSent,
+				MaxConsecutiveRestarts: 3,
 			})

 			// Note: Don't start monitor, we'll call checkDataRate() manually

 			m.AddChannel(ch1)

-			totalChecks := tc.checksPerInterval + uint32(len(tc.dataPoints))
+			totalChecks := checksPerInterval + uint32(len(tc.dataPoints))
 			for i := uint32(0); i < totalChecks; i++ {
 				if i < uint32(len(tc.dataPoints)) {
 					dp := tc.dataPoints[i]
@@ -261,7 +248,7 @@ func TestPushChannelMonitorDataRate(t *testing.T) {
 				if tc.expectRestart {
 					require.Fail(t, "failed to restart channel")
 				}
-			case <-mockAPI.restarted:
+			case <-mockAPI.restarts:
 				if !tc.expectRestart {
 					require.Fail(t, "expected no channel restart")
 				}
@@ -270,10 +257,81 @@ func TestPushChannelMonitorDataRate(t *testing.T) {
 	}
 }

+func TestPushChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
+	ch1 := datatransfer.ChannelID{
+		Initiator: "initiator",
+		Responder: "responder",
+		ID:        1,
+	}
+	ch := &mockChannelState{chid: ch1}
+	mockAPI := newMockMonitorAPI(ch, false)
+
+	maxConsecutiveRestarts := 3
+	m := NewMonitor(mockAPI, &Config{
+		Interval:               time.Hour,
+		ChecksPerInterval:      1,
+		MinBytesSent:           2,
+		MaxConsecutiveRestarts: uint32(maxConsecutiveRestarts),
+	})
+
+	// Note: Don't start monitor, we'll call checkDataRate() manually
+
+	m.AddChannel(ch1)
+	mch := getFirstMonitoredChannel(m)
+
+	mockAPI.dataQueued(10)
+	mockAPI.dataSent(5)
+
+	// Check once to add a data point to the queue.
+	// Subsequent checks will compare against the previous data point.
+	m.checkDataRate()
+
+	// Each check should trigger a restart up to the maximum number of restarts
+	triggerMaxRestarts := func() {
+		for i := 0; i < maxConsecutiveRestarts; i++ {
+			m.checkDataRate()
+
+			err := mockAPI.awaitRestart()
+			require.NoError(t, err)
+		}
+	}
+	triggerMaxRestarts()
+
+	// When data is sent it should reset the consecutive restarts back to zero
+	mockAPI.dataSent(6)
+
+	// Trigger restarts up to max again
+	triggerMaxRestarts()
+
+	// Reached max restarts, so now there should not be another restart
+	// attempt.
+	// Instead the channel should be closed and the monitor shut down.
+	m.checkDataRate()
+	err := mockAPI.awaitRestart()
+	require.Error(t, err) // require error because expecting no restart
+	verifyChannelShutdown(t, mch)
+}
+
+func getFirstMonitoredChannel(m *Monitor) *monitoredChannel {
+	var mch *monitoredChannel
+	for mch = range m.channels {
+		return mch
+	}
+	panic("no channels")
+}
+
+func verifyChannelShutdown(t *testing.T, mch *monitoredChannel) {
+	select {
+	case <-time.After(10 * time.Millisecond):
+		require.Fail(t, "failed to shutdown channel")
+	case <-mch.ctx.Done():
+	}
+}
+
 type mockMonitorAPI struct {
 	ch            *mockChannelState
 	restartErrors chan error
-	restarted     chan struct{}
+	restarts      chan struct{}
 	closed        chan struct{}

 	lk sync.Mutex
@@ -283,7 +341,7 @@ type mockMonitorAPI struct {
 func newMockMonitorAPI(ch *mockChannelState, errOnRestart bool) *mockMonitorAPI {
 	m := &mockMonitorAPI{
 		ch:            ch,
-		restarted:     make(chan struct{}),
+		restarts:      make(chan struct{}, 1),
 		closed:        make(chan struct{}),
 		restartErrors: make(chan error, 1),
 	}
@@ -315,11 +373,7 @@ func (m *mockMonitorAPI) callSubscriber(e datatransfer.Event, state datatransfer.ChannelState) {

 func (m *mockMonitorAPI) RestartDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
 	defer func() {
-		select {
-		case <-m.restarted:
-		default:
-			close(m.restarted)
-		}
+		m.restarts <- struct{}{}
 	}()

 	select {
@@ -330,6 +384,15 @@ func (m *mockMonitorAPI) RestartDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
 	}
 }

+func (m *mockMonitorAPI) awaitRestart() error {
+	select {
+	case <-time.After(10 * time.Millisecond):
+		return xerrors.Errorf("failed to restart channel")
+	case <-m.restarts:
+		return nil
+	}
+}
+
 func (m *mockMonitorAPI) CloseDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
 	close(m.closed)
 	return nil