Skip to content

Commit

Permalink
Backport: remember backoff counts
Browse files Browse the repository at this point in the history
Remember backoff counts between calls to publish, so backoff is not reset in
between multiple calls to publish
  • Loading branch information
urso committed Jun 8, 2016
1 parent 43b4197 commit 1715e8e
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// Template, add newest changes here

=== Beats version HEAD
https://github.com/elastic/beats/compare/v1.2.3...1.2[Check the HEAD diff]
https://github.com/elastic/beats/compare/v1.2.3...1.3[Check the HEAD diff]

==== Breaking changes

Expand All @@ -26,6 +26,7 @@ https://github.com/elastic/beats/compare/v1.2.3...1.2[Check the HEAD diff]
==== Bugfixes

*Affecting all Beats*
- Fix output modes backoff counter reset. {issue}1803[1803] {pull}1814[1814] {pull}1818[1818]

*Packetbeat*

Expand Down
9 changes: 6 additions & 3 deletions libbeat/outputs/mode/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type LoadBalancerMode struct {
// block until event has been successfully published.
maxAttempts int

backoffCount uint // number of consecutive failed retry attempts.

// waitGroup + signaling channel for handling shutdown
wg sync.WaitGroup
done chan struct{}
Expand Down Expand Up @@ -191,7 +193,6 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {
} else {
events := msg.events
total := len(events)
var backoffCount uint

for len(events) > 0 {
var err error
Expand Down Expand Up @@ -222,11 +223,11 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {
}

// wait before retry
backoff := time.Duration(int64(m.waitRetry) * (1 << backoffCount))
backoff := time.Duration(int64(m.waitRetry) * (1 << m.backoffCount))
if backoff > m.maxWaitRetry {
backoff = m.maxWaitRetry
} else {
backoffCount++
m.backoffCount++
}
select {
case <-m.done: // shutdown
Expand All @@ -240,6 +241,8 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {
}
}
}

m.backoffCount = 0
outputs.SignalCompleted(msg.signaler)
}

Expand Down
10 changes: 5 additions & 5 deletions libbeat/outputs/mode/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type SingleConnectionMode struct {
timeout time.Duration // connection timeout
waitRetry time.Duration // wait time until reconnect
maxWaitRetry time.Duration // Maximum send/retry timeout in backoff case.
backoffCount uint // number of consecutive failed retry attempts.

// maximum number of configured send attempts. If set to 0, publisher will
// block until event has been successfully published.
Expand Down Expand Up @@ -113,7 +114,6 @@ func (s *SingleConnectionMode) publish(
send func() (ok bool, resetFail bool),
) error {
fails := 0
var backoffCount uint
var err error

guaranteed := opts.Guaranteed || s.maxAttempts == 0
Expand All @@ -132,8 +132,8 @@ func (s *SingleConnectionMode) publish(
goto sendFail
}

backoffCount = 0
debug("send completed")
s.backoffCount = 0
outputs.SignalCompleted(signaler)
return nil

Expand All @@ -142,7 +142,7 @@ func (s *SingleConnectionMode) publish(
if resetFail {
debug("reset fails")
fails = 0
backoffCount = 0
s.backoffCount = 0
}

if !guaranteed && (s.maxAttempts > 0 && fails == s.maxAttempts) {
Expand All @@ -152,11 +152,11 @@ func (s *SingleConnectionMode) publish(
}

logp.Info("send fail")
backoff := time.Duration(int64(s.waitRetry) * (1 << backoffCount))
backoff := time.Duration(int64(s.waitRetry) * (1 << s.backoffCount))
if backoff > s.maxWaitRetry {
backoff = s.maxWaitRetry
} else {
backoffCount++
s.backoffCount++
}
logp.Info("backoff retry: %v", backoff)
time.Sleep(backoff)
Expand Down

0 comments on commit 1715e8e

Please sign in to comment.