-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[chore] [exporterhelper] Refactor queue initialization #8284
[chore] [exporterhelper] Refactor queue initialization #8284
Conversation
Codecov ReportPatch coverage is 📢 Thoughts on this report? Let us know!. |
3390d83
to
25e16d6
Compare
Hi @swiatekm-sumo, can you please take a look at this PR? |
5920592
to
23d1e4d
Compare
} | ||
} | ||
|
||
// StartConsumers starts a given number of goroutines consuming items from the queue | ||
// and passing them into the consumer callback. | ||
func (q *boundedMemoryQueue) StartConsumers(numWorkers int, callback func(item Request)) { | ||
func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set QueueSettings) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not passing the callback as well in the ctor vs passing set here?
CreateSettings: set, | ||
DataType: qrs.signal, | ||
Callback: func(item internal.Request) { | ||
_ = qrs.consumerSender.send(item) | ||
item.OnProcessingFinished() | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as previous comment, better to pass all of these in the ctor if possible or at least pass only the callback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is to expose the constructors to the user, as you suggested in #8248 (comment). I wanted to reuse the same structures in the public and private packages. If we want to move all these arguments that users should not be providing, we will need different public structs encapsulating the internal ones. Also, we would likely need to change the signature of the Option from type Option func(*baseSettings)
to type Option func(*baseExporter)
, which should not be considered as a breaking change I believe. Let me try this out
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This involves too many additional changes. I would like to merge this one and try it in a separate PR. This PR essentially doesn't change the way how the Callback
argument is passed anyway
@@ -294,7 +240,7 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger { | |||
|
|||
// send implements the requestSender interface | |||
func (qrs *queuedRetrySender) send(req internal.Request) error { | |||
if !qrs.queueSettings.config.Enabled { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a separate PR, I think we should always have a "queue" instead of all these if nil checks, and use a "direct call queue" that just pass to the callback.
Make the queue initialization process consistent for both queue types. Instead of different workflows for memory and persistent queues, this change breaks it into two generic steps: 1. Queue factory 2. Start method This change: - reduces coupling; - allows future refactoring of queuedRetrySender; - allows extracting of the queue package from the exporterhelper; - makes it possible to have `WithRequestQueue` option for the new exporter helper API.
23d1e4d
to
8b03839
Compare
Make the queue initialization process consistent for both queue types. Instead of having different workflows for memory and persistent queues, this change breaks the initialization of both into two generic steps:
NewBoundedMemoryQueue
,NewPersistentQueue
queue.Start(context.Context, component.Host, QueueSettings)
This change:
queuedRetrySender
and the queues;queuedRetrySender
;WithRequestQueue
option for the new exporter helper API as drafted in [exporterhelper] Add WithRequestQueue option to the exporter #8275.