Skip to content

Commit

Permalink
Stop waiting for signals on closed outleters (elastic#11263)
Browse files Browse the repository at this point in the history
Outleters start a goroutine to handle the finalization of filebeat. If the
outleter is closed by other means the goroutine will be kept running
even if it has nothing to do, leaking goroutines.

Stop this goroutine if the outleter is closed.

(cherry picked from commit 57c9891)
  • Loading branch information
jsoriano committed Mar 27, 2019
1 parent 755d257 commit fd8f949
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ https://github.com/elastic/beats/compare/v7.0.0-rc1...master[Check the HEAD diff
*Filebeat*

- Don't apply multiline rules in Logstash json logs. {pull}11346[11346]
- Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263]

*Heartbeat*

Expand Down
1 change: 1 addition & 0 deletions filebeat/channel/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error)
// Outleter is the outlet for an input
type Outleter interface {
Close() error
Done() <-chan struct{}
OnEvent(data *util.Data) bool
}
7 changes: 7 additions & 0 deletions filebeat/channel/outlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,32 @@ type outlet struct {
wg eventCounter
client beat.Client
isOpen atomic.Bool
done chan struct{}
}

func newOutlet(client beat.Client, wg eventCounter) *outlet {
o := &outlet{
wg: wg,
client: client,
isOpen: atomic.MakeBool(true),
done: make(chan struct{}),
}
return o
}

func (o *outlet) Close() error {
isOpen := o.isOpen.Swap(false)
if isOpen {
close(o.done)
return o.client.Close()
}
return nil
}

func (o *outlet) Done() <-chan struct{} {
return o.done
}

func (o *outlet) OnEvent(d *util.Data) bool {
if !o.isOpen.Load() {
return false
Expand Down
12 changes: 10 additions & 2 deletions filebeat/channel/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func (o *subOutlet) Close() error {
return nil
}

func (o *subOutlet) Done() <-chan struct{} {
return o.done
}

func (o *subOutlet) OnEvent(d *util.Data) bool {

o.mutex.Lock()
Expand Down Expand Up @@ -114,8 +118,12 @@ func (o *subOutlet) OnEvent(d *util.Data) bool {
func CloseOnSignal(outlet Outleter, sig <-chan struct{}) Outleter {
if sig != nil {
go func() {
<-sig
outlet.Close()
select {
case <-outlet.Done():
return
case <-sig:
outlet.Close()
}
}()
}
return outlet
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/log/input_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,4 @@ type TestOutlet struct{}

func (o TestOutlet) OnEvent(event *util.Data) bool { return true }
func (o TestOutlet) Close() error { return nil }
func (o TestOutlet) Done() <-chan struct{} { return nil }

0 comments on commit fd8f949

Please sign in to comment.