
[Filebeat] aws-s3 - create beat.Client for each SQS worker #33658

Merged

Conversation

@andrewkroh (Member) commented Nov 13, 2022

What does this PR do?

To address mutex contention in the single beat.Client used for publishing S3 events, create a unique beat.Client for each worker goroutine that is processing an SQS message. The beat.Client is used for all S3 objects contained within the SQS message (they are processed serially). After all events are ACKed, the beat.Client is closed.
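A minimal sketch of the pattern. The names sqsMessage, processS3Objects, and worker are illustrative stand-ins, not the real aws-s3 input types; beat.Pipeline.ConnectWith and beat.Client are the actual libbeat interfaces.

package awss3

import (
	"context"

	"github.com/elastic/beats/v7/libbeat/beat"
)

// sqsMessage stands in for the real SQS message type.
type sqsMessage struct{}

// processS3Objects stands in for serially processing every S3 object
// referenced by one SQS message, publishing all events through client.
func processS3Objects(ctx context.Context, client beat.Client, msg sqsMessage) error {
	return nil
}

func worker(ctx context.Context, pipeline beat.Pipeline, msgs <-chan sqsMessage) error {
	for msg := range msgs {
		// One beat.Client per SQS message, so worker goroutines never
		// contend on a shared Publish mutex. The real input also wires
		// up ACK handling through the ClientConfig.
		client, err := pipeline.ConnectWith(beat.ClientConfig{})
		if err != nil {
			return err
		}
		err = processS3Objects(ctx, client, msg)
		// Sketch simplification: the real code closes only after all
		// events for this message are ACKed.
		client.Close()
		if err != nil {
			return err
		}
	}
	return nil
}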

Why is it important?

A mutex profile of the aws-s3 input showed that there was a lot of contention for the "Publish()" lock.

func (c *client) Publish(e beat.Event) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.publish(e)
}

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Related

Pprof Mutex

I ran Filebeat for 90s with max_number_of_messages: 5.
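For context, a mutex profile like the ones below can be collected with the standard library alone. This is a generic sketch of the mechanism, not the exact Filebeat wiring (Filebeat itself can expose profiles over HTTP for go tool pprof):

package main

import (
	"os"
	"runtime"
	"runtime/pprof"
)

func main() {
	// Record every mutex contention event (0 disables sampling).
	runtime.SetMutexProfileFraction(1)

	runWorkload() // stand-in for the code under test

	// Write the accumulated contention profile for `go tool pprof`.
	f, err := os.Create("mutex.pprof")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if err := pprof.Lookup("mutex").WriteTo(f, 0); err != nil {
		panic(err)
	}
}

func runWorkload() {}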

Before

(pprof) top10
Showing nodes accounting for 48.18s, 100% of 48.18s total
      flat  flat%   sum%        cum   cum%
    48.18s   100%   100%     48.18s   100%  sync.(*Mutex).Unlock
         0     0%   100%     48.18s   100%  github.com/elastic/beats/v7/filebeat/beater.(*countingClient).Publish
         0     0%   100%     48.18s   100%  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Publish
         0     0%   100%     48.18s   100%  github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3ObjectProcessor).ProcessS3Object
         0     0%   100%     48.18s   100%  github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3ObjectProcessor).publish
         0     0%   100%     48.18s   100%  github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3ObjectProcessor).readFile
         0     0%   100%     48.18s   100%  github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*sqsReader).Receive.func1
         0     0%   100%     48.18s   100%  github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*sqsS3EventProcessor).ProcessSQS
         0     0%   100%     48.18s   100%  github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*sqsS3EventProcessor).processS3Events

After

(pprof) top10
Showing nodes accounting for 46.26ms, 100% of 46.26ms total
Showing top 10 nodes out of 14
      flat  flat%   sum%        cum   cum%
   46.26ms   100%   100%    46.26ms   100%  sync.(*Mutex).Unlock (inline)
         0     0%   100%    46.26ms   100%  github.com/elastic/beats/v7/filebeat/beater.(*countingClient).Publish
         0     0%   100%    46.26ms   100%  github.com/elastic/beats/v7/libbeat/common/acker.(*eventDataACKer).AddEvent
         0     0%   100%     1.67ms  3.61%  github.com/elastic/beats/v7/libbeat/common/acker.(*trackingACKer).AddEvent
         0     0%   100%     1.67ms  3.61%  github.com/elastic/beats/v7/libbeat/common/acker.(*trackingACKer).addPublishedEvent
         0     0%   100%    46.26ms   100%  github.com/elastic/beats/v7/libbeat/common/acker.ackerList.AddEvent
         0     0%   100%    46.26ms   100%  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Publish
         0     0%   100%    46.26ms   100%  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).publish
         0     0%   100%    46.26ms   100%  github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3ObjectProcessor).ProcessS3Object
         0     0%   100%    46.26ms   100%  github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3ObjectProcessor).publish

@botelastic bot added the needs_team (Indicates that the issue/PR needs a Team:* label) label on Nov 13, 2022
@andrewkroh force-pushed the feature/fb/awss3-per-reader-beat-client branch from 53cb8c3 to f231b1e on November 13, 2022 20:04
@mergify bot (Contributor) commented Nov 13, 2022

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @andrewkroh? 🙏.
To do so, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fix up this pull request, add the backport labels for the needed
branches, such as:

  • backport-v8.\d.0 is the label to automatically backport to the 8.\d branch, where \d is the version digit

To address mutex contention in the single beat.Client used for publishing S3
events, create a unique beat.Client for each worker goroutine that is processing
an SQS message. The beat.Client is used for all S3 objects contained within the
SQS message (they are processed serially). After all events are ACKed the beat.Client
is closed.
@andrewkroh force-pushed the feature/fb/awss3-per-reader-beat-client branch from f231b1e to ae78fa0 on November 13, 2022 20:19
@elasticmachine (Collaborator) commented Nov 13, 2022

💚 Build Succeeded

Build stats

  • Start Time: 2022-11-15T11:39:39.179+0000

  • Duration: 127 min 38 sec

Test stats 🧪

  • Failed: 0
  • Passed: 4924
  • Skipped: 340
  • Total: 5264

💚 Flaky test report

Tests succeeded.

🤖 GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

@botelastic bot removed the needs_team (Indicates that the issue/PR needs a Team:* label) label on Nov 13, 2022
@andrewkroh andrewkroh marked this pull request as ready for review November 13, 2022 20:31
@andrewkroh andrewkroh requested a review from a team as a code owner November 13, 2022 20:31
@elasticmachine (Collaborator) commented

Pinging @elastic/security-external-integrations (Team:Security-External Integrations)

@andrewkroh (Member, Author) commented

/test

@efd6 (Contributor) commented Nov 13, 2022

I'm wondering if

func (c *client) Publish(e beat.Event) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.publish(e)
}

could be further improved to reduce contention.

If it's guaranteed that c.publish can't panic, then it can be rewritten as

func (c *client) Publish(e beat.Event) {
	c.mutex.Lock()
	c.publish(e)
	c.mutex.Unlock()
}

which reduces the time spent in the locked state.

If that can't be guaranteed, then the following still reduces the locked window (though by less) without the risk of leaving the mutex locked: it retains the deferred unlock but does not take the lock until after the defer has been constructed.

func (c *client) Publish(e beat.Event) {
	defer c.mutex.Unlock()
	c.mutex.Lock()

	c.publish(e)
}

@andrewkroh (Member, Author) commented

I remember that back in Go 1.14 the release notes said defer incurred almost zero overhead thanks to open-coded defers (https://github.com/golang/proposal/blob/master/design/34481-opencoded-defers.md). I've never tried to measure the speed of defer vs no defer.

It is possible that a processor could panic, so I would be wary of removing the defer.
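A small self-contained demonstration of that hazard (illustrative, not Filebeat code; Mutex.TryLock needs Go 1.18+): if publish panics and a caller recovers, a non-deferred Unlock never runs, so the mutex stays locked for every later call.

package main

import (
	"fmt"
	"sync"
)

type client struct{ mutex sync.Mutex }

// publish models a processor that panics mid-publish.
func (c *client) publish() { panic("processor panicked") }

// Unsafe variant: the Unlock is skipped when publish panics.
func (c *client) publishNoDefer() {
	c.mutex.Lock()
	c.publish()
	c.mutex.Unlock()
}

func main() {
	c := &client{}
	func() {
		defer func() { recover() }()
		c.publishNoDefer()
	}()
	// The mutex was never released: TryLock reports false, and any
	// subsequent Publish would block forever.
	fmt.Println(c.mutex.TryLock())
}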

defer c.mutex.Unlock()
c.mutex.Lock()

Interesting, I've never seen code that uses this ordering. I realize that it has the same outcome. What makes this any different?

@mergify bot's comment was marked as outdated.

@efd6 (Contributor) commented Nov 15, 2022

Interesting, I've never seen code that uses this ordering. I realize that it has the same outcome. What makes this any different?

The time taken is the same, but less of that time is in the locked state.

https://godbolt.org/z/4Ka9GTWsd
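A rough benchmark sketch of the two orderings (my own illustration in a _test.go file, not from the thread): the total work is identical, but lockAfterDefer constructs the defer record before entering the critical section, so slightly less time is spent holding the lock under parallel load.

package main

import (
	"sync"
	"testing"
)

var mu sync.Mutex

// Defer record is constructed while the lock is held.
func deferAfterLock() {
	mu.Lock()
	defer mu.Unlock()
}

// Defer record is constructed before the lock is taken.
func lockAfterDefer() {
	defer mu.Unlock()
	mu.Lock()
}

func BenchmarkDeferAfterLock(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			deferAfterLock()
		}
	})
}

func BenchmarkLockAfterDefer(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			lockAfterDefer()
		}
	})
}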

@aspacca aspacca mentioned this pull request Nov 15, 2022
chrisberkhout pushed a commit that referenced this pull request Jun 1, 2023
Labels: enhancement, Filebeat, Team:Cloud-Monitoring