Defer queue creation / activate proxy queue #35118
Conversation
This pull request does not have a backport label. To fix up this pull request, you need to add the backport labels for the needed branch(es).
Pinging @elastic/elastic-agent (Team:Elastic-Agent)
I've fixed the most relevant tests and added unit tests for the new behavior. I'm still investigating some CI failures that I can't yet reproduce locally, but I think it's ready for review.
There is one remaining failing test. Edit: after working more with the python tests and various dependencies, I can now reproduce the failure locally.
Oddly enough, looking at the log files generated by the beat, I can see the log line that the test is looking for:
// parsing errors: if there is an error in the queue config, we want it to
// show up as fatal during initialization, even if the queue itself isn't
// created until later.
func queueSettingsForUserConfig(queueType string, userConfig *conf.C, inQueueSize int) (interface{}, error) {
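For context, the body of this helper is essentially a dispatch on the configured queue type. Here is a minimal sketch of that shape; the `parseMemQueueSettings`/`parseDiskQueueSettings` helpers are hypothetical stand-ins, not the exact Beats API:

```go
import (
	"fmt"

	conf "github.com/elastic/elastic-agent-libs/config"
)

// Sketch only: parseMemQueueSettings and parseDiskQueueSettings are
// hypothetical stand-ins for the real per-queue settings parsers; the proxy
// queue is absent because it isn't user-configurable.
// Parsing eagerly here means an invalid queue config fails during
// initialization even though the queue itself isn't created until the output
// is known; the interface{} return is a placeholder for the queues'
// incompatible settings structs.
func queueSettingsForUserConfig(queueType string, userConfig *conf.C, inQueueSize int) (interface{}, error) {
	switch queueType {
	case "", "mem":
		return parseMemQueueSettings(userConfig, inQueueSize)
	case "disk":
		return parseDiskQueueSettings(userConfig)
	default:
		return nil, fmt.Errorf("unrecognized queue type %q", queueType)
	}
}
```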
That bare interface really bugs me, even if I understand why we ended up with it. At the very least I kinda wonder if we could have some kind of named interface or something, just for the sake of making the API itself look less opaque.
It bugs me too and it wasn't my first choice. The constraint that led me here is that we need to be able to fail during initialization if we get an invalid queue config up front, therefore we need to call `SettingsFromConfig` for the appropriate queue since they all have incompatible settings, but we also need that to be overridable by something the output can create.
- Alternative: We could panic later when we realize the config is invalid. I don't like this one.
- Alternative: We could report an error when we see it but fall back on the default memory queue. (I'm open to this one, but in the end I decided quitting on initialization was preferable.)

The settings alone aren't enough to create the queue: we also need the logger (which we could get from other sources in a pinch) and the queue's global acknowledgment callback (which we can't), so we can't have the output actually create the queue. On the other hand, just accepting a queue type / `*conf.C` is also awkward: the proxy queue isn't user-configurable, so a natural future choice like passing along a `queue` subtree from an `output` configuration would expose that internal type. (Besides which, to me an `interface{}` that is a placeholder for 3 concrete types is still a lot easier to work with than a `*conf.C` that could contain literally anything.)

The old way of handling this tension was to use factories, which gives things a nice name but makes the internals too opaque: a factory that just took the callback and created a queue would work, but there would be no way to know what type of queue we'd created, which we currently report.

Which... having talked myself into a corner, maybe we just need to add a `QueueType()` method to the queue interface, and have both the pipeline initialization and the outputs give a queue factory for this field? At this point it could be a `func(*logp.Logger, func(eventCount int)) (queue.Queue, error)`, which is more consistent than the old types, and we could still fall back on the memory queue on error (knowing that an honest queue creation can only fail for the disk queue). I dislike losing access to the settings objects, but right now we don't actually need them, so this might make sense. Thoughts?
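For illustration, a rough sketch of the factory-based shape being floated here, including the fall-back-on-error behavior; all names are placeholders, not the final API:

```go
package pipeline // illustrative placement only

import (
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
	"github.com/elastic/elastic-agent-libs/logp"
)

// queueFactory is the proposed shape: the pipeline supplies the logger and
// the global acknowledgment callback; everything else (settings, queue type)
// is baked in by whoever provides the factory.
type queueFactory func(logger *logp.Logger, ack func(eventCount int)) (queue.Queue, error)

// createQueue sketches the discussed fallback: if the configured factory
// fails (realistically only possible for the disk queue), log the error and
// fall back on a default factory, e.g. one wrapping the memory queue.
func createQueue(
	logger *logp.Logger,
	ack func(eventCount int),
	configured, fallback queueFactory,
) (queue.Queue, error) {
	q, err := configured(logger, ack)
	if err == nil {
		return q, nil
	}
	logger.Errorf("couldn't create configured queue, falling back on default: %v", err)
	return fallback(logger, ack)
}
```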
Ok, I switched to a version using factories, see if you feel better about this one ^^
So, the tests produce a huge number of these log lines:
Not sure how much of that is the tests, but it has me worried that we might produce a bit of log spam with it.
Also curious about the large number of log messages that Alex saw.
I know why that is, and I felt conflicted about how to handle it, but the good news is that it won't happen in production: the cause is a test that reloads the pipeline's output over and over, and I'm not sure of its exact motivation, considering a real pipeline is only ever reloaded once. What it de facto tests is that events published during an output reload aren't dropped.

So I'm not sure how to resolve the tension there -- the test is somewhat useful to keep currently-working asynchronous code from degrading, but it really is an error condition in current code, so it necessarily spams the pipeline logger during tests. I could give the pipeline a nil logger for that test, but that would prevent logging of things we do want to see. Maybe it's still worth it considering the noise, and just leave an explanatory comment saying to reactivate the logger if the test ever fails? Open to suggestions :-)
Could we maybe change the test so it constantly sends events but only reloads a small number of times? That should still give us confidence that reloading doesn't drop anything, while limiting the valid error log messages.
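Something like the following shape, purely as a sketch of that suggestion; the test name and the helper functions are hypothetical:

```go
import "testing"

// Hypothetical sketch: publish continuously while reloading the output only a
// bounded number of times, then check that nothing was dropped.
func TestReloadDoesNotDropEvents(t *testing.T) {
	const maxReloads = 5

	reloadsDone := make(chan struct{})
	go func() {
		defer close(reloadsDone)
		for i := 0; i < maxReloads; i++ {
			reloadOutput() // hypothetical helper: swaps the pipeline's output group
		}
	}()

	published := 0
	for {
		select {
		case <-reloadsDone:
			// hypothetical helper: waits for acknowledgments and compares counts
			requireAllPublishedEventsAcked(t, published)
			return
		default:
			publishTestEvent() // hypothetical helper: sends one event through the pipeline
			published++
		}
	}
}
```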
Yes indeed, my local tests confirm that the exact same log line (modulo timestamp) appears in both cases, but only my PR branch considers it a failure. So it must be something in how the testing framework scans logs? Very mysterious, especially since it doesn't seem to affect any other tests. Hopefully getting close to pinning down the problem...

Edit: found it! Or, one step closer: it has nothing to do with the log lines, it's with shutdown.
@@ -72,6 +74,8 @@ type Queue interface {
	Metrics() (Metrics, error)
}

type QueueFactory func(logger *logp.Logger, ack func(eventCount int)) (Queue, error)
Small nit: can we get a comment on the QueueFactory type? Other than that, LGTM.
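For example, the requested doc comment might read something along these lines; the wording is only a suggestion, based on the description's note that the acknowledgment callback has to stay under the queue creator's control:

```go
// QueueFactory creates a queue for the publishing pipeline. The pipeline
// supplies the logger and the global acknowledgment callback, which must stay
// under the queue creator's control for pipeline bookkeeping; all other
// settings are baked into the factory by whoever provides it (the pipeline's
// defaults or an output's Group).
type QueueFactory func(logger *logp.Logger, ack func(eventCount int)) (Queue, error)
```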
Found the full issue with the failing test and it's somewhat ugly: Beat shutdown blocks on the main …

Complicating this is the fact that …

My proposal for a short-term fix, until we have time for a more thorough cleanup of pipeline shutdown: have …
Didn't even know …
Defer creation of the Beats queue until the full output configuration is received; activate the proxy queue when the Shipper output is in use.
The proxy queue is meant to track event acknowledgments without storing event data in memory, to avoid a doubled memory cost for events stored in both Beats and the Shipper. However, activating it has required multiple refactoring passes, since previously the queue was created globally before receiving any output configuration. Now that previous work has moved queue logic into `outputController`, this PR completes the work by activating the queue-related hooks so queue creation is delayed and queue clients are blocked until the output is active.

The important changes are:
- `outputs.Group` now has a new field, `QueueSettings`, with which an output can specify the settings to use when creating the queue.
- `outputController.Set`, which gives the output controller its output worker group, is now responsible for creating the queue when a nonempty output is assigned (previously this was done on initialization).
- `outputController.queueProducer` can now be called before the queue is created, in which case it accumulates requests that then block until queue creation (sketched after this list).
- The queue's global acknowledgment callback is now provided at queue creation rather than in the queues' `Settings` structs. The callback is used for bookkeeping in the pipeline and needs to be controlled by the queue creator, while everything else in `{memqueue,diskqueue,proxyqueue}.Settings` can be safely set by the output.
- Queue-related state is now shared between `client` and `outputController` so both components can update it in a consistent way instead of distributing the logic across various callbacks and explicit `Pipeline` pointers.
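A minimal sketch of the request-accumulation idea behind `outputController.queueProducer`; the struct fields, helper types, and the `onQueueCreated` hook are illustrative, not the actual implementation:

```go
import (
	"sync"

	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// producerRequest is an illustrative stand-in for a queueProducer call that
// arrived before the queue existed.
type producerRequest struct {
	config   queue.ProducerConfig
	response chan queue.Producer
}

// outputController is reduced here to the fields this sketch needs.
type outputController struct {
	mutex           sync.Mutex
	queue           queue.Queue
	pendingRequests []producerRequest
}

// queueProducer returns a producer for the controller's queue. If no output
// has been assigned yet and the queue therefore doesn't exist, the request is
// accumulated and the call blocks until queue creation.
func (c *outputController) queueProducer(cfg queue.ProducerConfig) queue.Producer {
	c.mutex.Lock()
	if c.queue == nil {
		req := producerRequest{config: cfg, response: make(chan queue.Producer)}
		c.pendingRequests = append(c.pendingRequests, req)
		c.mutex.Unlock()
		return <-req.response // blocks until an output assignment creates the queue
	}
	defer c.mutex.Unlock()
	return c.queue.Producer(cfg)
}

// onQueueCreated shows how pending requests would be answered once a nonempty
// output assignment (outputController.Set in the real code) creates the queue.
func (c *outputController) onQueueCreated(q queue.Queue) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.queue = q
	for _, req := range c.pendingRequests {
		req.response <- q.Producer(req.config)
	}
	c.pendingRequests = nil
}
```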
Checklist
- I have made corresponding changes to the documentation
- I have made corresponding changes to the default configuration files
- I have added an entry in `CHANGELOG.next.asciidoc` or `CHANGELOG-developer.next.asciidoc`.

How to test this PR locally
There are two paths to testing this PR. One is to make sure that normal queue settings continue to apply, both standalone and under Agent; this should just involve making sure that events reach the output without errors. The other is to test with the shipper output and verify that data can flow to the shipper, that event acknowledgments are received, and that the Beat's memory use remains low even when many events are queued by the shipper.
Related issues