diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 97609354e15..c014d22322f 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -64,6 +64,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Remove `queue.Consumer`. Queues can now be read via a `Get` call directly on the queue object. {pull}31502[31502] - The `queue.Batch` API now provides access to individual events instead of an array. {pull}31699[31699] - Rename `queue.Batch.ACK()` to `queue.Batch.Done()`. {pull}31903[31903] +- `queue.ACKListener` has been removed. Queue configurations now accept an explicit callback function for ACK handling. {pull}35078[35078] ==== Bugfixes diff --git a/filebeat/beater/acker.go b/filebeat/beater/acker.go index 244a99fd9cf..00da2762b1d 100644 --- a/filebeat/beater/acker.go +++ b/filebeat/beater/acker.go @@ -34,7 +34,7 @@ type statelessLogger interface { // eventAcker handles publisher pipeline ACKs and forwards // them to the registrar or directly to the stateless logger. -func eventACKer(statelessOut statelessLogger, statefulOut statefulLogger) beat.ACKer { +func eventACKer(statelessOut statelessLogger, statefulOut statefulLogger) beat.EventListener { log := logp.NewLogger("acker") return acker.EventPrivateReporter(func(_ int, data []interface{}) { diff --git a/filebeat/beater/channels.go b/filebeat/beater/channels.go index 5d4283f609f..276a5f699f3 100644 --- a/filebeat/beater/channels.go +++ b/filebeat/beater/channels.go @@ -52,12 +52,12 @@ type countingClient struct { client beat.Client } -type countingEventer struct { +type countingClientListener struct { wgEvents *eventCounter } -type combinedEventer struct { - a, b beat.ClientEventer +type combinedClientListener struct { + a, b beat.ClientListener } func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger { @@ -111,13 +111,13 @@ func (c *eventCounter) Wait() { // all events published, dropped and ACKed by any active client. // The type accepted by counter is compatible with sync.WaitGroup. 
func withPipelineEventCounter(pipeline beat.PipelineConnector, counter *eventCounter) beat.PipelineConnector { - counterListener := &countingEventer{counter} + counterListener := &countingClientListener{counter} pipeline = pipetool.WithClientConfigEdit(pipeline, func(config beat.ClientConfig) (beat.ClientConfig, error) { - if evts := config.Events; evts != nil { - config.Events = &combinedEventer{evts, counterListener} + if evts := config.ClientListener; evts != nil { + config.ClientListener = &combinedClientListener{evts, counterListener} } else { - config.Events = counterListener + config.ClientListener = counterListener } return config, nil }) @@ -145,36 +145,36 @@ func (c *countingClient) Close() error { return c.client.Close() } -func (*countingEventer) Closing() {} -func (*countingEventer) Closed() {} -func (*countingEventer) Published() {} +func (*countingClientListener) Closing() {} +func (*countingClientListener) Closed() {} +func (*countingClientListener) Published() {} -func (c *countingEventer) FilteredOut(_ beat.Event) {} -func (c *countingEventer) DroppedOnPublish(_ beat.Event) { +func (c *countingClientListener) FilteredOut(_ beat.Event) {} +func (c *countingClientListener) DroppedOnPublish(_ beat.Event) { c.wgEvents.Done() } -func (c *combinedEventer) Closing() { +func (c *combinedClientListener) Closing() { c.a.Closing() c.b.Closing() } -func (c *combinedEventer) Closed() { +func (c *combinedClientListener) Closed() { c.a.Closed() c.b.Closed() } -func (c *combinedEventer) Published() { +func (c *combinedClientListener) Published() { c.a.Published() c.b.Published() } -func (c *combinedEventer) FilteredOut(event beat.Event) { +func (c *combinedClientListener) FilteredOut(event beat.Event) { c.a.FilteredOut(event) c.b.FilteredOut(event) } -func (c *combinedEventer) DroppedOnPublish(event beat.Event) { +func (c *combinedClientListener) DroppedOnPublish(event beat.Event) { c.a.DroppedOnPublish(event) c.b.DroppedOnPublish(event) } diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index 6af52ded862..479e939ff39 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -484,7 +484,7 @@ func (s *testInputStore) CleanupInterval() time.Duration { type mockClient struct { publishing []beat.Event published []beat.Event - ackHandler beat.ACKer + ackHandler beat.EventListener closed bool mtx sync.Mutex canceler context.CancelFunc @@ -599,15 +599,15 @@ func (pc *mockPipelineConnector) cancelClient(i int) { pc.clients[i].canceler() } -func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.ACKer { +func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.EventListener { if !blocking { - return config.ACKHandler + return config.EventListener } - return acker.Combine(blockingACKer(starter), config.ACKHandler) + return acker.Combine(blockingACKer(starter), config.EventListener) } -func blockingACKer(starter context.Context) beat.ACKer { +func blockingACKer(starter context.Context) beat.EventListener { return acker.EventPrivateReporter(func(acked int, private []interface{}) { for starter.Err() == nil { } diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go index 4f7241afc78..592f05d8a11 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester.go +++ 
b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -26,7 +26,6 @@ import ( "time" input "github.com/elastic/beats/v7/filebeat/input/v2" - v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/ctxtool" @@ -73,7 +72,7 @@ func newReaderGroupWithLimit(limit uint64) *readerGroup { // function is nil in that case and must not be called. // // The context will be automatically cancelled once the ID is removed from the group. Calling `cancel` is optional. -func (r *readerGroup) newContext(id string, cancelation v2.Canceler) (context.Context, context.CancelFunc, error) { +func (r *readerGroup) newContext(id string, cancelation input.Canceler) (context.Context, context.CancelFunc, error) { r.mu.Lock() defer r.mu.Unlock() @@ -144,7 +143,7 @@ func (hg *defaultHarvesterGroup) Start(ctx input.Context, s Source) { ctx.Logger = ctx.Logger.With("source_file", sourceName) ctx.Logger.Debug("Starting harvester for file") - hg.tg.Go(startHarvester(ctx, hg, s, false)) + _ = hg.tg.Go(startHarvester(ctx, hg, s, false)) } // Restart starts the Harvester for a Source if a Harvester is already running it waits for it @@ -155,7 +154,7 @@ func (hg *defaultHarvesterGroup) Restart(ctx input.Context, s Source) { ctx.Logger = ctx.Logger.With("source_file", sourceName) ctx.Logger.Debug("Restarting harvester for file") - hg.tg.Go(startHarvester(ctx, hg, s, true)) + _ = hg.tg.Go(startHarvester(ctx, hg, s, true)) } func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, restart bool) func(context.Context) error { @@ -178,7 +177,7 @@ func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, rest harvesterCtx, cancelHarvester, err := hg.readers.newContext(srcID, canceler) if err != nil { - return fmt.Errorf("error while adding new reader to the bookkeeper %v", err) + return fmt.Errorf("error while adding new reader to the bookkeeper %w", err) } ctx.Cancelation = harvesterCtx defer cancelHarvester() @@ -186,17 +185,17 @@ func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, rest resource, err := lock(ctx, hg.store, srcID) if err != nil { hg.readers.remove(srcID) - return fmt.Errorf("error while locking resource: %v", err) + return fmt.Errorf("error while locking resource: %w", err) } defer releaseResource(resource) client, err := hg.pipeline.ConnectWith(beat.ClientConfig{ - CloseRef: ctx.Cancelation, - ACKHandler: newInputACKHandler(hg.ackCH, ctx.Logger), + CloseRef: ctx.Cancelation, + EventListener: newInputACKHandler(hg.ackCH, ctx.Logger), }) if err != nil { hg.readers.remove(srcID) - return fmt.Errorf("error while connecting to output with pipeline: %v", err) + return fmt.Errorf("error while connecting to output with pipeline: %w", err) } defer client.Close() @@ -205,14 +204,14 @@ func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, rest publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor} err = hg.harvester.Run(ctx, s, cursor, publisher) - if err != nil && err != context.Canceled { + if err != nil && !errors.Is(err, context.Canceled) { hg.readers.remove(srcID) - return fmt.Errorf("error while running harvester: %v", err) + return fmt.Errorf("error while running harvester: %w", err) } // If the context was not cancelled it means that the Harvester is stopping because of // some internal decision, not due to outside interaction. 
// If it is stopping itself, it must clean up the bookkeeper. - if ctx.Cancelation.Err() != context.Canceled { + if !errors.Is(ctx.Cancelation.Err(), context.Canceled) { hg.readers.remove(srcID) } @@ -226,18 +225,18 @@ func (hg *defaultHarvesterGroup) Continue(ctx input.Context, previous, next Sour prevID := hg.identifier.ID(previous) nextID := hg.identifier.ID(next) - hg.tg.Go(func(canceler context.Context) error { + _ = hg.tg.Go(func(canceler context.Context) error { previousResource, err := lock(ctx, hg.store, prevID) if err != nil { - return fmt.Errorf("error while locking previous resource: %v", err) + return fmt.Errorf("error while locking previous resource: %w", err) } // mark previous state out of date // so when reading starts again the offset is set to zero - hg.store.remove(prevID) + _ = hg.store.remove(prevID) nextResource, err := lock(ctx, hg.store, nextID) if err != nil { - return fmt.Errorf("error while locking next resource: %v", err) + return fmt.Errorf("error while locking next resource: %w", err) } hg.store.UpdateTTL(nextResource, hg.cleanTimeout) @@ -252,7 +251,7 @@ func (hg *defaultHarvesterGroup) Continue(ctx input.Context, previous, next Sour // Stop stops the running Harvester for a given Source. func (hg *defaultHarvesterGroup) Stop(s Source) { - hg.tg.Go(func(_ context.Context) error { + _ = hg.tg.Go(func(_ context.Context) error { hg.readers.remove(hg.identifier.ID(s)) return nil }) diff --git a/filebeat/input/filestream/internal/input-logfile/input.go b/filebeat/input/filestream/internal/input-logfile/input.go index 25f9cdac630..4b9a9fc1a57 100644 --- a/filebeat/input/filestream/internal/input-logfile/input.go +++ b/filebeat/input/filestream/internal/input-logfile/input.go @@ -88,7 +88,7 @@ func (inp *managedInput) Run( return nil } -func newInputACKHandler(ch *updateChan, log *logp.Logger) beat.ACKer { +func newInputACKHandler(ch *updateChan, log *logp.Logger) beat.EventListener { return acker.EventPrivateReporter(func(acked int, private []interface{}) { var n uint var last int diff --git a/filebeat/input/journald/environment_test.go b/filebeat/input/journald/environment_test.go index 8b19a849ea9..38be35f0b94 100644 --- a/filebeat/input/journald/environment_test.go +++ b/filebeat/input/journald/environment_test.go @@ -153,7 +153,7 @@ func (s *testInputStore) CleanupInterval() time.Duration { type mockClient struct { publishing []beat.Event published []beat.Event - ackHandler beat.ACKer + ackHandler beat.EventListener closed bool mtx sync.Mutex canceler context.CancelFunc @@ -268,15 +268,15 @@ func (pc *mockPipelineConnector) cancelClient(i int) { pc.clients[i].canceler() } -func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.ACKer { +func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.EventListener { if !blocking { - return config.ACKHandler + return config.EventListener } - return acker.Combine(blockingACKer(starter), config.ACKHandler) + return acker.Combine(blockingACKer(starter), config.EventListener) } -func blockingACKer(starter context.Context) beat.ACKer { +func blockingACKer(starter context.Context) beat.EventListener { return acker.EventPrivateReporter(func(acked int, private []interface{}) { for starter.Err() == nil { } diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 78f0e8bf088..83114f2c630 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -20,6 +20,7 @@ package kafka import ( "context" "encoding/json" 
+ "errors" "fmt" "io" "strings" @@ -30,7 +31,6 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" "github.com/Shopify/sarama" - "github.com/pkg/errors" input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" @@ -66,7 +66,7 @@ func configure(cfg *conf.C) (input.Input, error) { saramaConfig, err := newSaramaConfig(config) if err != nil { - return nil, errors.Wrap(err, "initializing Sarama config") + return nil, fmt.Errorf("initializing Sarama config: %w", err) } return NewInput(config, saramaConfig) } @@ -111,7 +111,7 @@ func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error { log := ctx.Logger.Named("kafka input").With("hosts", input.config.Hosts) client, err := pipeline.ConnectWith(beat.ClientConfig{ - ACKHandler: acker.ConnectionOnly( + EventListener: acker.ConnectionOnly( acker.EventPrivateReporter(func(_ int, events []interface{}) { for _, event := range events { if meta, ok := event.(eventMeta); ok { @@ -165,7 +165,7 @@ func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error { input.runConsumerGroup(log, client, goContext, consumerGroup) } - if ctx.Cancelation.Err() == context.Canceled { + if errors.Is(ctx.Cancelation.Err(), context.Canceled) { return nil } else { return ctx.Cancelation.Err() @@ -254,6 +254,7 @@ func doneChannelContext(ctx input.Context) context.Context { } func (c channelCtx) Deadline() (deadline time.Time, ok bool) { + //nolint:nakedret // omitting the return gives a build error return } @@ -311,7 +312,7 @@ func (h *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim s parser := h.parsers.Create(reader) for h.session.Context().Err() == nil { message, err := parser.Next() - if err == io.EOF { + if errors.Is(err, io.EOF) { return nil } if err != nil { diff --git a/filebeat/input/v2/input-cursor/input.go b/filebeat/input/v2/input-cursor/input.go index 92879df8dca..88e28dde2fb 100644 --- a/filebeat/input/v2/input-cursor/input.go +++ b/filebeat/input/v2/input-cursor/input.go @@ -146,8 +146,8 @@ func (inp *managedInput) runSource( }() client, err := pipeline.ConnectWith(beat.ClientConfig{ - CloseRef: ctx.Cancelation, - ACKHandler: newInputACKHandler(ctx.Logger), + CloseRef: ctx.Cancelation, + EventListener: newInputACKHandler(ctx.Logger), }) if err != nil { return err @@ -175,7 +175,7 @@ func (inp *managedInput) createSourceID(s Source) string { return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name()) } -func newInputACKHandler(log *logp.Logger) beat.ACKer { +func newInputACKHandler(log *logp.Logger) beat.EventListener { return acker.EventPrivateReporter(func(acked int, private []interface{}) { var n uint var last int diff --git a/filebeat/input/v2/input-cursor/manager_test.go b/filebeat/input/v2/input-cursor/manager_test.go index 6b97763c9be..8cb7a015d3d 100644 --- a/filebeat/input/v2/input-cursor/manager_test.go +++ b/filebeat/input/v2/input-cursor/manager_test.go @@ -31,7 +31,6 @@ import ( "github.com/stretchr/testify/require" input "github.com/elastic/beats/v7/filebeat/input/v2" - v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" "github.com/elastic/beats/v7/libbeat/tests/resources" @@ -63,11 +62,11 @@ func TestManager_Init(t *testing.T) { DefaultCleanTimeout: 10 * time.Millisecond, } - err := manager.Init(&grp, v2.ModeRun) + err := manager.Init(&grp, input.ModeRun) require.NoError(t, err) time.Sleep(200 * time.Millisecond) - grp.Stop() + _ = grp.Stop() 
// wait for all go-routines to be gone @@ -86,6 +85,7 @@ func TestManager_Init(t *testing.T) { store.GCPeriod = 10 * time.Millisecond var grp unison.TaskGroup + //nolint:errcheck // We don't need the error from grp.Stop() defer grp.Stop() manager := &InputManager{ Logger: logp.NewLogger("test"), @@ -94,7 +94,7 @@ func TestManager_Init(t *testing.T) { DefaultCleanTimeout: 10 * time.Millisecond, } - err := manager.Init(&grp, v2.ModeRun) + err := manager.Init(&grp, input.ModeRun) require.NoError(t, err) for len(store.snapshot()) > 0 { @@ -157,7 +157,7 @@ func TestManager_InputsTest(t *testing.T) { defer resources.NewGoroutinesChecker().Check(t) manager := constInput(t, sources, &fakeTestInput{ - OnTest: func(source Source, _ v2.TestContext) error { + OnTest: func(source Source, _ input.TestContext) error { mu.Lock() defer mu.Unlock() seen = append(seen, source.Name()) @@ -179,7 +179,7 @@ func TestManager_InputsTest(t *testing.T) { defer resources.NewGoroutinesChecker().Check(t) manager := constInput(t, sources, &fakeTestInput{ - OnTest: func(_ Source, ctx v2.TestContext) error { + OnTest: func(_ Source, ctx input.TestContext) error { <-ctx.Cancelation.Done() return nil }, @@ -209,7 +209,7 @@ func TestManager_InputsTest(t *testing.T) { sources := []Source{failing, stringSource("source2")} manager := constInput(t, sources, &fakeTestInput{ - OnTest: func(source Source, _ v2.TestContext) error { + OnTest: func(source Source, _ input.TestContext) error { if source == failing { t.Log("return error") return errors.New("oops") @@ -238,7 +238,7 @@ func TestManager_InputsTest(t *testing.T) { defer resources.NewGoroutinesChecker().Check(t) manager := constInput(t, sources, &fakeTestInput{ - OnTest: func(source Source, _ v2.TestContext) error { + OnTest: func(source Source, _ input.TestContext) error { panic("oops") }, }) @@ -278,7 +278,7 @@ func TestManager_InputsRun(t *testing.T) { defer cancel() var clientCounters pubtest.ClientCounter - err = inp.Run(v2.Context{ + err = inp.Run(input.Context{ Logger: manager.Logger, Cancelation: cancelCtx, }, clientCounters.BuildConnector()) @@ -302,7 +302,7 @@ func TestManager_InputsRun(t *testing.T) { defer cancel() var clientCounters pubtest.ClientCounter - err = inp.Run(v2.Context{ + err = inp.Run(input.Context{ Logger: manager.Logger, Cancelation: cancelCtx, }, clientCounters.BuildConnector()) @@ -331,7 +331,7 @@ func TestManager_InputsRun(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err = inp.Run(v2.Context{ + err = inp.Run(input.Context{ Logger: manager.Logger, Cancelation: cancelCtx, }, clientCounters.BuildConnector()) @@ -369,7 +369,7 @@ func TestManager_InputsRun(t *testing.T) { for i := 0; i < config.Max; i++ { event := beat.Event{Fields: mapstr.M{"n": state.N}} state.N++ - pub.Publish(event, state) + _ = pub.Publish(event, state) } return nil }, @@ -397,7 +397,7 @@ func TestManager_InputsRun(t *testing.T) { // create and run second instance instance inp, err = manager.Create(conf.MustNewConfigFrom(runConfig{Max: 3})) require.NoError(t, err) - inp.Run(input.Context{ + _ = inp.Run(input.Context{ Logger: log, Cancelation: context.Background(), }, pipeline) @@ -416,13 +416,13 @@ func TestManager_InputsRun(t *testing.T) { OnRun: func(ctx input.Context, _ Source, _ Cursor, pub Publisher) error { defer wgSend.Done() fields := mapstr.M{"hello": "world"} - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state1") - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state2") - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state3") - 
pub.Publish(beat.Event{Fields: fields}, nil) - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state4") - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state5") - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state6") + _ = pub.Publish(beat.Event{Fields: fields}, "test-cursor-state1") + _ = pub.Publish(beat.Event{Fields: fields}, "test-cursor-state2") + _ = pub.Publish(beat.Event{Fields: fields}, "test-cursor-state3") + _ = pub.Publish(beat.Event{Fields: fields}, nil) + _ = pub.Publish(beat.Event{Fields: fields}, "test-cursor-state4") + _ = pub.Publish(beat.Event{Fields: fields}, "test-cursor-state5") + _ = pub.Publish(beat.Event{Fields: fields}, "test-cursor-state6") return nil }, }) @@ -435,13 +435,13 @@ func TestManager_InputsRun(t *testing.T) { defer cancel() // setup publishing pipeline and capture ACKer, so we can simulate progress in the Output - var acker beat.ACKer + var acker beat.EventListener var wgACKer sync.WaitGroup wgACKer.Add(1) pipeline := &pubtest.FakeConnector{ ConnectFunc: func(cfg beat.ClientConfig) (beat.Client, error) { defer wgACKer.Done() - acker = cfg.ACKHandler + acker = cfg.EventListener return &pubtest.FakeClient{ PublishFunc: func(event beat.Event) { acker.AddEvent(event, true) @@ -455,7 +455,7 @@ func TestManager_InputsRun(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err = inp.Run(v2.Context{ + err = inp.Run(input.Context{ Logger: manager.Logger, Cancelation: cancelCtx, }, pipeline) diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index c6702257833..114e4f0e2cb 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -54,24 +54,24 @@ type ClientConfig struct { // is configured WaitClose time.Duration - // Configure ACK callback. - ACKHandler ACKer + // Callbacks for when events are added / acknowledged + EventListener EventListener - // Events configures callbacks for common client callbacks - Events ClientEventer + // ClientListener configures callbacks for monitoring pipeline clients + ClientListener ClientListener } -// ACKer can be registered with a Client when connecting to the pipeline. -// The ACKer will be informed when events are added or dropped by the processors, +// EventListener can be registered with a Client when connecting to the pipeline. +// The EventListener will be informed when events are added or dropped by the processors, // and when an event has been ACKed by the outputs. // // Due to event publishing and ACKing are asynchronous operations, the -// operations on ACKer are normally executed in different go routines. ACKers +// operations on EventListener are normally executed in different go routines. ACKers // are required to be multi-threading safe. -type ACKer interface { - // AddEvent informs the ACKer that a new event has been sent to the client. +type EventListener interface { + // AddEvent informs the listener that a new event has been sent to the client. // AddEvent is called after the processors have handled the event. If the - // event has been dropped by the processor `published` will be set to true. + // event has been dropped by the processor `published` will be set to false. // This allows the ACKer to do some bookkeeping for dropped events. AddEvent(event Event, published bool) @@ -80,12 +80,12 @@ type ACKer interface { // ACKers might need to keep track of dropped events by themselves. ACKEvents(n int) - // Close informs the ACKer that the Client used to publish to the pipeline has been closed. - // No new events should be published anymore. 
The ACKEvents method still will be actively called + // ClientClosed informs the ACKer that the Client used to publish to the pipeline has been closed. + // No new events should be published anymore. The ACKEvents method still will be called as long // as long as there are pending events for the client in the pipeline. The Close signal can be used // to suppress any ACK event propagation if required. // Close might be called from another go-routine than AddEvent and ACKEvents. - Close() + ClientClosed() } // CloseRef allows users to close the client asynchronously. @@ -130,8 +130,8 @@ type ProcessingConfig struct { Private interface{} } -// ClientEventer provides access to internal client events. -type ClientEventer interface { +// ClientListener provides access to internal client events. +type ClientListener interface { Closing() // Closing indicates the client is being shutdown next Closed() // Closed indicates the client being fully shutdown @@ -161,13 +161,6 @@ const ( // DefaultGuarantees are up to the pipeline configuration itself. DefaultGuarantees PublishMode = iota - // OutputChooses mode fully depends on the output and its configuration. - // Events might be dropped based on the users output configuration. - // In this mode no events are dropped within the pipeline. Events are only removed - // after the output has ACKed the events to the pipeline, even if the output - // did drop the events. - OutputChooses - // GuaranteedSend ensures events are retried until acknowledged by the output. // Normally guaranteed sending should be used with some client ACK-handling // to update state keeping track of the sending status. diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 830ec37aec2..0f1d44a777c 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -96,7 +96,7 @@ type Beat struct { IdxSupporter idxmgmt.Supporter keystore keystore.Keystore - processing processing.Supporter + processors processing.Supporter InputQueueSize int // Size of the producer queue used by most queues. 
@@ -376,16 +376,10 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { } outputFactory := b.makeOutputFactory(b.Config.Output) settings := pipeline.Settings{ - WaitClose: 0, - WaitCloseMode: pipeline.NoWaitOnClose, - Processors: b.processing, + Processors: b.processors, InputQueueSize: b.InputQueueSize, } - if settings.InputQueueSize > 0 { - publisher, err = pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings) - } else { - publisher, err = pipeline.Load(b.Info, monitors, b.Config.Pipeline, b.processing, outputFactory) - } + publisher, err = pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings) if err != nil { return nil, fmt.Errorf("error initializing publisher: %w", err) } @@ -412,7 +406,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { defer logp.Info("%s stopped.", b.Info.Beat) defer func() { - if err := b.processing.Close(); err != nil { + if err := b.processors.Close(); err != nil { logp.Warn("Failed to close global processing: %v", err) } }() @@ -845,7 +839,7 @@ func (b *Beat) configure(settings Settings) error { if processingFactory == nil { processingFactory = processing.MakeDefaultBeatSupport(true) } - b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig) + b.processors, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig) b.Manager.RegisterDiagnosticHook("global processors", "a list of currently configured global beat processors", "global_processors.txt", "text/plain", b.agentDiagnosticHook) @@ -858,7 +852,7 @@ func (b *Beat) configure(settings Settings) error { // to expand this to other components of the beat state. // To anyone refactoring: be careful to make sure the callback is registered after the global processors are initialized func (b *Beat) agentDiagnosticHook() []byte { - list := b.processing.Processors() + list := b.processors.Processors() var debugBytes []byte for _, proc := range list { diff --git a/libbeat/common/acker/acker.go b/libbeat/common/acker/acker.go index 1b9928bf302..c75aec13564 100644 --- a/libbeat/common/acker/acker.go +++ b/libbeat/common/acker/acker.go @@ -25,7 +25,7 @@ import ( ) // Nil creates an ACKer that does nothing. -func Nil() beat.ACKer { +func Nil() beat.EventListener { return nilACKer{} } @@ -33,12 +33,12 @@ type nilACKer struct{} func (nilACKer) AddEvent(event beat.Event, published bool) {} func (nilACKer) ACKEvents(n int) {} -func (nilACKer) Close() {} +func (nilACKer) ClientClosed() {} // RawCounting reports the number of ACKed events as has been reported by the outputs or queue. // The ACKer does not keep track of dropped events. Events after the client has // been closed will still be reported. -func RawCounting(fn func(int)) beat.ACKer { +func RawCounting(fn func(int)) beat.EventListener { return countACKer(fn) } @@ -46,7 +46,7 @@ type countACKer func(int) func (countACKer) AddEvent(_ beat.Event, _ bool) {} func (fn countACKer) ACKEvents(n int) { fn(n) } -func (countACKer) Close() {} +func (countACKer) ClientClosed() {} // TrackingCounter keeps track of published and dropped events. 
It reports // the number of acked events from the queue in the 'acked' argument and the @@ -59,12 +59,12 @@ func (countACKer) Close() {} // event: X X D D X D D X D X X X // // If the output ACKs 3 events, then all events from index 0 to 6 will be reported because: -// - the drop sequence for events 2 and 3 is inbetween the number of forwarded and ACKed events +// - the drop sequence for events 2 and 3 is in between the number of forwarded and ACKed events // - events 5-6 have been dropped as well, but event 7 is not ACKed yet // // If there is no event currently tracked by this ACKer and the next event is dropped by the processors, // then `fn` will be called immediately with acked=0 and total=1. -func TrackingCounter(fn func(acked, total int)) beat.ACKer { +func TrackingCounter(fn func(acked, total int)) beat.EventListener { a := &trackingACKer{fn: fn} init := &gapInfo{} a.lst.head = init @@ -74,7 +74,7 @@ func TrackingCounter(fn func(acked, total int)) beat.ACKer { // Counting returns an ACK count for all events a client has tried to publish. // The ACKer keeps track of dropped events as well, and adjusts the ACK from the outputs accordingly. -func Counting(fn func(n int)) beat.ACKer { +func Counting(fn func(n int)) beat.EventListener { return TrackingCounter(func(_ int, total int) { fn(total) }) @@ -206,7 +206,7 @@ func (a *trackingACKer) ACKEvents(n int) { a.fn(acked, total) } -func (a *trackingACKer) Close() {} +func (a *trackingACKer) ClientClosed() {} // EventPrivateReporter reports all private fields from all events that have // been published or removed. @@ -221,16 +221,16 @@ func (a *trackingACKer) Close() {} // event: X X D D X D D X D X X X // // If the output ACKs 3 events, then all events from index 0 to 6 will be reported because: -// - the drop sequence for events 2 and 3 is inbetween the number of forwarded and ACKed events +// - the drop sequence for events 2 and 3 is in between the number of forwarded and ACKed events // - events 5-6 have been dropped as well, but event 7 is not ACKed yet -func EventPrivateReporter(fn func(acked int, data []interface{})) beat.ACKer { +func EventPrivateReporter(fn func(acked int, data []interface{})) beat.EventListener { a := &eventDataACKer{fn: fn} - a.ACKer = TrackingCounter(a.onACK) + a.EventListener = TrackingCounter(a.onACK) return a } type eventDataACKer struct { - beat.ACKer + beat.EventListener mu sync.Mutex data []interface{} fn func(acked int, data []interface{}) @@ -240,7 +240,7 @@ func (a *eventDataACKer) AddEvent(event beat.Event, published bool) { a.mu.Lock() a.data = append(a.data, event.Private) a.mu.Unlock() - a.ACKer.AddEvent(event, published) + a.EventListener.AddEvent(event, published) } func (a *eventDataACKer) onACK(acked, total int) { @@ -260,7 +260,7 @@ func (a *eventDataACKer) onACK(acked, total int) { // LastEventPrivateReporter reports only the 'latest' published and acked // event if a batch of events have been ACKed. -func LastEventPrivateReporter(fn func(acked int, data interface{})) beat.ACKer { +func LastEventPrivateReporter(fn func(acked int, data interface{})) beat.EventListener { ignored := 0 return EventPrivateReporter(func(acked int, data []interface{}) { for i := len(data) - 1; i >= 0; i-- { @@ -277,11 +277,11 @@ func LastEventPrivateReporter(fn func(acked int, data interface{})) beat.ACKer { } // Combine forwards events to a list of ackers. 
-func Combine(as ...beat.ACKer) beat.ACKer { +func Combine(as ...beat.EventListener) beat.EventListener { return ackerList(as) } -type ackerList []beat.ACKer +type ackerList []beat.EventListener func (l ackerList) AddEvent(event beat.Event, published bool) { for _, a := range l { @@ -295,22 +295,22 @@ func (l ackerList) ACKEvents(n int) { } } -func (l ackerList) Close() { +func (l ackerList) ClientClosed() { for _, a := range l { - a.Close() + a.ClientClosed() } } // ConnectionOnly ensures that the given ACKer is only used for as long as the // pipeline Client is active. Once the Client is closed, the ACKer will drop // its internal state and no more ACK events will be processed. -func ConnectionOnly(a beat.ACKer) beat.ACKer { +func ConnectionOnly(a beat.EventListener) beat.EventListener { return &clientOnlyACKer{acker: a} } type clientOnlyACKer struct { mu sync.Mutex - acker beat.ACKer + acker beat.EventListener } func (a *clientOnlyACKer) AddEvent(event beat.Event, published bool) { @@ -330,12 +330,12 @@ func (a *clientOnlyACKer) ACKEvents(n int) { } } -func (a *clientOnlyACKer) Close() { +func (a *clientOnlyACKer) ClientClosed() { a.mu.Lock() sub := a.acker a.acker = nil // drop the internal ACKer on Close and allow the runtime to gc accumulated state. a.mu.Unlock() if sub != nil { - sub.Close() + sub.ClientClosed() } } diff --git a/libbeat/common/acker/acker_test.go b/libbeat/common/acker/acker_test.go index 2f02ed5f673..6e109819837 100644 --- a/libbeat/common/acker/acker_test.go +++ b/libbeat/common/acker/acker_test.go @@ -39,7 +39,7 @@ func TestNil(t *testing.T) { acker.AddEvent(beat.Event{}, false) acker.AddEvent(beat.Event{}, true) acker.ACKEvents(3) - acker.Close() + acker.ClientClosed() } func TestCounting(t *testing.T) { @@ -200,7 +200,7 @@ func TestCombine(t *testing.T) { t.Run("Close distributes", func(t *testing.T) { var c1, c2 int acker := Combine(countACKerOps(nil, nil, &c1), countACKerOps(nil, nil, &c2)) - acker.Close() + acker.ClientClosed() require.Equal(t, 1, c1) require.Equal(t, 1, c2) }) @@ -217,13 +217,13 @@ func TestConnectionOnly(t *testing.T) { t.Run("ignores ACKs after close", func(t *testing.T) { var n int acker := ConnectionOnly(RawCounting(func(acked int) { n = acked })) - acker.Close() + acker.ClientClosed() acker.ACKEvents(3) require.Equal(t, 0, n) }) } -func countACKerOps(add, acked, close *int) beat.ACKer { +func countACKerOps(add, acked, close *int) beat.EventListener { return &fakeACKer{ AddEventFunc: func(_ beat.Event, _ bool) { *add++ }, ACKEventsFunc: func(_ int) { *acked++ }, @@ -243,7 +243,7 @@ func (f *fakeACKer) ACKEvents(n int) { } } -func (f *fakeACKer) Close() { +func (f *fakeACKer) ClientClosed() { if f.CloseFunc != nil { f.CloseFunc() } diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index ad367fa1b10..fbc4fcef772 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -20,14 +20,13 @@ package elasticsearch import ( "context" "encoding/json" + "errors" "fmt" "net/http" "time" "go.elastic.co/apm/v2" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/monitoring/report" @@ -65,7 +64,7 @@ func (c *publishClient) Connect() error { err := c.es.Connect() if err != nil { - return errors.Wrap(err, "cannot connect underlying Elasticsearch client") + return fmt.Errorf("cannot connect underlying 
Elasticsearch client: %w", err) } params := map[string]string{ @@ -73,7 +72,7 @@ func (c *publishClient) Connect() error { } status, body, err := c.es.Request("GET", "/_xpack", "", params, nil) if err != nil { - return fmt.Errorf("X-Pack capabilities query failed with: %v", err) + return fmt.Errorf("X-Pack capabilities query failed with: %w", err) } if status != 200 { @@ -172,6 +171,7 @@ func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event, meta["_type"] = "doc" } + //nolint:typecheck // typecheck linter is buggy and thinks opType is unused. opType := events.OpTypeCreate if esVersion.LessThan(createDocPrivAvailableESVersion) { opType = events.OpTypeIndex @@ -181,7 +181,7 @@ func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event, opType.String(): meta, } - event.Content.Fields.Put("timestamp", event.Content.Timestamp) + _, _ = event.Content.Fields.Put("timestamp", event.Content.Timestamp) fields := mapstr.M{ "type": typ, @@ -190,15 +190,15 @@ func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event, interval, err := event.Content.Meta.GetValue("interval_ms") if err != nil { - return errors.Wrap(err, "could not determine interval_ms field") + return fmt.Errorf("could not determine interval_ms field: %w", err) } - fields.Put("interval_ms", interval) + _, _ = fields.Put("interval_ms", interval) clusterUUID, err := event.Content.Meta.GetValue("cluster_uuid") - if err != nil && err != mapstr.ErrKeyNotFound { - return errors.Wrap(err, "could not determine cluster_uuid field") + if err != nil && !errors.Is(err, mapstr.ErrKeyNotFound) { + return fmt.Errorf("could not determine cluster_uuid field: %w", err) } - fields.Put("cluster_uuid", clusterUUID) + _, _ = fields.Put("cluster_uuid", clusterUUID) document := report.Event{ Timestamp: event.Content.Timestamp, diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 112bec5045c..fa0780b5ee1 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -31,8 +31,6 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/publisher/processing" - "github.com/elastic/beats/v7/libbeat/publisher/queue" - "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" @@ -136,14 +134,6 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *conf.C) (report clients = append(clients, client) } - queueFactory := func(ackListener queue.ACKListener) (queue.Queue, error) { - return memqueue.NewQueue(log, - memqueue.Settings{ - ACKListener: ackListener, - Events: 20, - }), nil - } - monitoring := monitoring.Default.GetRegistry("monitoring") outClient := outputs.NewFailoverClient(clients) @@ -154,13 +144,23 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *conf.C) (report return nil, err } + queueConfig := conf.Namespace{} + conf, err := conf.NewConfigFrom("mem.events: 20") + if err != nil { + return nil, err + } + err = queueConfig.Unpack(conf) + if err != nil { + return nil, err + } + pipeline, err := pipeline.New( beat, pipeline.Monitors{ Metrics: monitoring, Logger: log, }, - queueFactory, + queueConfig, outputs.Group{ Clients: []outputs.Client{outClient}, BatchSize: windowSize, 
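For orientation, a minimal Go sketch (not part of the patch) of how a pipeline client registers its ACK callback after this rename: the ClientConfig field is now EventListener rather than ACKHandler, and helpers such as acker.Counting return a beat.EventListener. The pipeline variable and the callback body are illustrative assumptions only; ConnectWith, ClientConfig, EventListener and acker.Counting come from the changes above.

    client, err := pipeline.ConnectWith(beat.ClientConfig{
        // EventListener replaces the old ACKHandler field; acker.Counting
        // invokes the callback with the number of events acknowledged.
        EventListener: acker.Counting(func(n int) {
            logp.L().Debugf("%d events acknowledged by the output", n)
        }),
    })
    if err != nil {
        return err
    }
    defer client.Close()
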
diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index 9ccf7d170f9..31a21999761 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -35,7 +35,6 @@ type client struct { processors beat.Processor producer queue.Producer mutex sync.Mutex - acker beat.ACKer waiter *clientCloseWaiter eventFlags publisher.EventFlags @@ -48,7 +47,8 @@ type client struct { closeRef beat.CloseRef // extern closeRef for sending a signal that the client should be closed. done chan struct{} // the done channel will be closed if the closeReg gets closed, or Close is run. - eventer beat.ClientEventer + eventListener beat.EventListener + clientListener beat.ClientListener } type clientCloseWaiter struct { @@ -107,7 +107,7 @@ func (c *client) publish(e beat.Event) { e = *event } - c.acker.AddEvent(e, publish) + c.eventListener.AddEvent(e, publish) if !publish { c.onFilteredOut(e) return @@ -119,10 +119,6 @@ func (c *client) publish(e beat.Event) { Flags: c.eventFlags, } - if c.reportEvents { - c.pipeline.waitCloseGroup.Add(1) - } - var published bool if c.canDrop { _, published = c.producer.TryPublish(pubEvent) @@ -134,9 +130,6 @@ func (c *client) publish(e beat.Event) { c.onPublished() } else { c.onDroppedOnPublish(e) - if c.reportEvents { - c.pipeline.waitCloseGroup.Add(-1) - } } } @@ -155,12 +148,13 @@ func (c *client) Close() error { c.waiter.signalClose() c.waiter.wait() - c.acker.Close() + c.eventListener.ClientClosed() log.Debug("client: done closing acker") - log.Debug("client: unlink from queue") - c.unlink() - log.Debug("client: done unlink") + log.Debug("client: close queue producer") + cancelledEventCount := c.producer.Cancel() + c.onClosed(cancelledEventCount) + log.Debug("client: done producer close") if c.processors != nil { log.Debug("client: closing processors") @@ -174,38 +168,30 @@ func (c *client) Close() error { return nil } -// unlink is the final step of closing a client. It cancels the connect of the -// client as producer to the queue. 
-func (c *client) unlink() { - log := c.logger() - - n := c.producer.Cancel() // close connection to queue - log.Debugf("client: cancelled %v events", n) - - if c.reportEvents { - log.Debugf("client: remove client events") - if n > 0 { - c.pipeline.waitCloseGroup.Add(-n) - } - } - - c.onClosed() -} - func (c *client) logger() *logp.Logger { return c.pipeline.monitors.Logger } func (c *client) onClosing() { - if c.eventer != nil { - c.eventer.Closing() + if c.clientListener != nil { + c.clientListener.Closing() } } -func (c *client) onClosed() { +func (c *client) onClosed(cancelledEventCount int) { + log := c.logger() + log.Debugf("client: cancelled %v events", cancelledEventCount) + + if c.reportEvents { + log.Debugf("client: remove client events") + if cancelledEventCount > 0 { + c.pipeline.waitCloseGroup.Add(-cancelledEventCount) + } + } + c.pipeline.observer.clientClosed() - if c.eventer != nil { - c.eventer.Closed() + if c.clientListener != nil { + c.clientListener.Closed() } } @@ -214,9 +200,12 @@ func (c *client) onNewEvent() { } func (c *client) onPublished() { + if c.reportEvents { + c.pipeline.waitCloseGroup.Add(1) + } c.pipeline.observer.publishedEvent() - if c.eventer != nil { - c.eventer.Published() + if c.clientListener != nil { + c.clientListener.Published() } } @@ -225,8 +214,8 @@ func (c *client) onFilteredOut(e beat.Event) { log.Debugf("Pipeline client receives callback 'onFilteredOut' for event: %+v", e) c.pipeline.observer.filteredEvent() - if c.eventer != nil { - c.eventer.FilteredOut(e) + if c.clientListener != nil { + c.clientListener.FilteredOut(e) } } @@ -235,8 +224,8 @@ func (c *client) onDroppedOnPublish(e beat.Event) { log.Debugf("Pipeline client receives callback 'onDroppedOnPublish' for event: %+v", e) c.pipeline.observer.failedPublishEvent() - if c.eventer != nil { - c.eventer.DroppedOnPublish(e) + if c.clientListener != nil { + c.clientListener.DroppedOnPublish(e) } } @@ -266,10 +255,10 @@ func (w *clientCloseWaiter) ACKEvents(n int) { } } -// The Close signal from the pipeline is ignored. Instead the client +// The client's close signal is ignored. Instead the client // explicitly uses `signalClose` and `wait` before it continues with the // closing sequence. -func (w *clientCloseWaiter) Close() {} +func (w *clientCloseWaiter) ClientClosed() {} func (w *clientCloseWaiter) signalClose() { if w == nil { diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go index a23763841f5..51a03a63f74 100644 --- a/libbeat/publisher/pipeline/client_test.go +++ b/libbeat/publisher/pipeline/client_test.go @@ -42,15 +42,16 @@ func TestClient(t *testing.T) { makePipeline := func(settings Settings, qu queue.Queue) *Pipeline { p, err := New(beat.Info{}, Monitors{}, - func(_ queue.ACKListener) (queue.Queue, error) { - return qu, nil - }, + conf.Namespace{}, outputs.Group{}, settings, ) if err != nil { panic(err) } + // Close the built-in queue and replace with the given one. + p.outputController.queue.Close() + p.outputController.queue = qu return p } @@ -129,13 +130,16 @@ func TestClientWaitClose(t *testing.T) { makePipeline := func(settings Settings, qu queue.Queue) *Pipeline { p, err := New(beat.Info{}, Monitors{}, - func(queue.ACKListener) (queue.Queue, error) { return qu, nil }, + conf.Namespace{}, outputs.Group{}, settings, ) if err != nil { panic(err) } + // Close the built-in queue and replace with the given one. 
+ p.outputController.queue.Close() + p.outputController.queue = qu return p } @@ -192,8 +196,8 @@ func TestClientWaitClose(t *testing.T) { return nil }) defer output.Close() - pipeline.output.Set(outputs.Group{Clients: []outputs.Client{output}}) - defer pipeline.output.Set(outputs.Group{}) + pipeline.outputController.Set(outputs.Group{Clients: []outputs.Client{output}}) + defer pipeline.outputController.Set(outputs.Group{}) client.Publish(beat.Event{}) diff --git a/libbeat/publisher/pipeline/client_worker.go b/libbeat/publisher/pipeline/client_worker.go index 4eca919d447..e05658d9749 100644 --- a/libbeat/publisher/pipeline/client_worker.go +++ b/libbeat/publisher/pipeline/client_worker.go @@ -29,9 +29,8 @@ import ( ) type worker struct { - observer outputObserver - qu chan publisher.Batch - done chan struct{} + qu chan publisher.Batch + done chan struct{} } // clientWorker manages output client of type outputs.Client, not supporting reconnect. @@ -50,11 +49,10 @@ type netClientWorker struct { tracer *apm.Tracer } -func makeClientWorker(observer outputObserver, qu chan publisher.Batch, client outputs.Client, logger logger, tracer *apm.Tracer) outputWorker { +func makeClientWorker(qu chan publisher.Batch, client outputs.Client, logger logger, tracer *apm.Tracer) outputWorker { w := worker{ - observer: observer, - qu: qu, - done: make(chan struct{}), + qu: qu, + done: make(chan struct{}), } var c interface { diff --git a/libbeat/publisher/pipeline/client_worker_test.go b/libbeat/publisher/pipeline/client_worker_test.go index 0aec54e47b9..97692b2aada 100644 --- a/libbeat/publisher/pipeline/client_worker_test.go +++ b/libbeat/publisher/pipeline/client_worker_test.go @@ -64,7 +64,7 @@ func TestMakeClientWorker(t *testing.T) { client := ctor(publishFn) - worker := makeClientWorker(nilObserver, workQueue, client, logger, nil) + worker := makeClientWorker(workQueue, client, logger, nil) defer worker.Close() for i := uint(0); i < numBatches; i++ { @@ -158,7 +158,7 @@ func TestReplaceClientWorker(t *testing.T) { } client := ctor(blockingPublishFn) - worker := makeClientWorker(nilObserver, workQueue, client, logger, nil) + worker := makeClientWorker(workQueue, client, logger, nil) // Allow the worker to make *some* progress before we close it timeout := 10 * time.Second @@ -185,7 +185,7 @@ func TestReplaceClientWorker(t *testing.T) { } client = ctor(countingPublishFn) - makeClientWorker(nilObserver, workQueue, client, logger, nil) + makeClientWorker(workQueue, client, logger, nil) wg.Wait() // Make sure that all events have eventually been published @@ -231,7 +231,7 @@ func TestMakeClientTracer(t *testing.T) { recorder := apmtest.NewRecordingTracer() defer recorder.Close() - worker := makeClientWorker(nilObserver, workQueue, client, logger, recorder.Tracer) + worker := makeClientWorker(workQueue, client, logger, recorder.Tracer) defer worker.Close() for i := 0; i < numBatches; i++ { diff --git a/libbeat/publisher/pipeline/config.go b/libbeat/publisher/pipeline/config.go index dc8cc4da4d4..cb5c81395c3 100644 --- a/libbeat/publisher/pipeline/config.go +++ b/libbeat/publisher/pipeline/config.go @@ -42,7 +42,7 @@ func validateClientConfig(c *beat.ClientConfig) error { withDrop := false switch m := c.PublishMode; m { - case beat.DefaultGuarantees, beat.GuaranteedSend, beat.OutputChooses: + case beat.DefaultGuarantees, beat.GuaranteedSend: case beat.DropIfFull: withDrop = true default: @@ -51,7 +51,7 @@ func validateClientConfig(c *beat.ClientConfig) error { // ACK handlers can not be registered DropIfFull 
is set, as dropping events // due to full broker can not be accounted for in the clients acker. - if c.ACKHandler != nil && withDrop { + if c.EventListener != nil && withDrop { return errors.New("ACK handlers with DropIfFull mode not supported") } diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index f7961adb8ca..1ff8c1bc95d 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -53,14 +53,12 @@ type eventConsumer struct { // This waitgroup is released when this eventConsumer's worker // goroutines return. wg sync.WaitGroup - - // The queue the eventConsumer will retrieve batches from. - queue queue.Queue } -// consumerTarget specifies the output channel and parameters needed for -// eventConsumer to generate a batch. +// consumerTarget specifies the queue to read from, the parameters needed +// to generate a batch, and the output channel to send batches to. type consumerTarget struct { + queue queue.Queue ch chan publisher.Batch timeToLive int batchSize int @@ -75,13 +73,11 @@ type retryRequest struct { func newEventConsumer( log *logp.Logger, - queue queue.Queue, observer outputObserver, ) *eventConsumer { c := &eventConsumer{ logger: log, observer: observer, - queue: queue, queueReader: makeQueueReader(), targetChan: make(chan consumerTarget), @@ -133,10 +129,10 @@ outerLoop: // If possible, start reading the next batch in the background. // We require a non-nil target channel so we don't queue up a large // batch before we know the real requested size for our output. - if queueBatch == nil && !pendingRead && target.ch != nil { + if queueBatch == nil && !pendingRead && target.queue != nil && target.ch != nil { pendingRead = true c.queueReader.req <- queueReaderRequest{ - queue: c.queue, + queue: target.queue, retryer: c, batchSize: target.batchSize, timeToLive: target.timeToLive, diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 648a3b35575..41ab03a6451 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -18,13 +18,18 @@ package pipeline import ( + "fmt" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" - "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" + conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" ) // outputController manages the pipelines output capabilities, like: @@ -36,7 +41,10 @@ type outputController struct { monitors Monitors observer outputObserver - workQueue chan publisher.Batch + queueConfig queueConfig + queue queue.Queue + + workerChan chan publisher.Batch consumer *eventConsumer workers []outputWorker @@ -48,28 +56,51 @@ type outputWorker interface { Close() error } +type queueConfig struct { + logger *logp.Logger + queueType string + userConfig *conf.C + ackCallback func(eventCount int) + inQueueSize int +} + func newOutputController( beat beat.Info, monitors Monitors, observer outputObserver, - queue queue.Queue, -) *outputController { - return &outputController{ - beat: beat, - monitors: monitors, - observer: observer, - workQueue: make(chan 
publisher.Batch), - consumer: newEventConsumer(monitors.Logger, queue, observer), + queueConfig queueConfig, +) (*outputController, error) { + + controller := &outputController{ + beat: beat, + monitors: monitors, + observer: observer, + queueConfig: queueConfig, + workerChan: make(chan publisher.Batch), + consumer: newEventConsumer(monitors.Logger, observer), + } + + err := controller.createQueue() + if err != nil { + return nil, err } + + return controller, nil } func (c *outputController) Close() error { c.consumer.close() - close(c.workQueue) + close(c.workerChan) for _, out := range c.workers { out.Close() } + + // Closing the queue stops ACKs from propagating, so we close everything + // else first to give it a chance to wait for any outstanding events to be + // acknowledged. + c.queue.Close() + return nil } @@ -88,10 +119,10 @@ func (c *outputController) Set(outGrp outputs.Group) { c.workers = make([]outputWorker, len(clients)) for i, client := range clients { logger := logp.NewLogger("publisher_pipeline_output") - c.workers[i] = makeClientWorker(c.observer, c.workQueue, client, logger, c.monitors.Tracer) + c.workers[i] = makeClientWorker(c.workerChan, client, logger, c.monitors.Tracer) } - targetChan := c.workQueue + targetChan := c.workerChan if len(clients) == 0 { // If there are no output clients, we are probably still waiting // for our output config from Agent via BeatV2Manager.reloadOutput. @@ -105,6 +136,7 @@ func (c *outputController) Set(outGrp outputs.Group) { // Resume consumer targeting the new work queue c.consumer.setTarget( consumerTarget{ + queue: c.queue, ch: targetChan, batchSize: outGrp.BatchSize, timeToLive: outGrp.Retry + 1, @@ -114,9 +146,9 @@ func (c *outputController) Set(outGrp outputs.Group) { // Reload the output func (c *outputController) Reload( cfg *reload.ConfigWithMeta, - outFactory func(outputs.Observer, config.Namespace) (outputs.Group, error), + outFactory func(outputs.Observer, conf.Namespace) (outputs.Group, error), ) error { - outCfg := config.Namespace{} + outCfg := conf.Namespace{} if cfg != nil { if err := cfg.Config.Unpack(&outCfg); err != nil { return err @@ -136,3 +168,46 @@ func (c *outputController) Reload( return nil } + +func (c *outputController) queueProducer(config queue.ProducerConfig) queue.Producer { + return c.queue.Producer(config) +} + +func (c *outputController) createQueue() error { + config := c.queueConfig + + switch config.queueType { + case memqueue.QueueType: + settings, err := memqueue.SettingsForUserConfig(config.userConfig) + if err != nil { + return err + } + // The memory queue has a special override during pipeline + // initialization for the size of its API channel buffer. 
+		settings.InputQueueSize = config.inQueueSize
+		settings.ACKCallback = config.ackCallback
+		c.queue = memqueue.NewQueue(config.logger, settings)
+	case diskqueue.QueueType:
+		settings, err := diskqueue.SettingsForUserConfig(config.userConfig)
+		if err != nil {
+			return err
+		}
+		settings.WriteToDiskCallback = config.ackCallback
+		queue, err := diskqueue.NewQueue(config.logger, settings)
+		if err != nil {
+			return err
+		}
+		c.queue = queue
+	default:
+		return fmt.Errorf("'%v' is not a valid queue type", config.queueType)
+	}
+
+	if c.monitors.Telemetry != nil {
+		queueReg := c.monitors.Telemetry.NewRegistry("queue")
+		monitoring.NewString(queueReg, "name").Set(config.queueType)
+	}
+	maxEvents := c.queue.BufferConfig().MaxEvents
+	c.observer.queueMaxEvents(maxEvents)
+
+	return nil
+}
diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go
index e6d67b79532..fdc408fcc30 100644
--- a/libbeat/publisher/pipeline/controller_test.go
+++ b/libbeat/publisher/pipeline/controller_test.go
@@ -18,6 +18,7 @@ package pipeline
 
 import (
+	"fmt"
 	"sync"
 	"testing"
 	"testing/quick"
@@ -28,9 +29,7 @@ import (
 	"github.com/elastic/beats/v7/libbeat/internal/testutil"
 	"github.com/elastic/beats/v7/libbeat/outputs"
 	"github.com/elastic/beats/v7/libbeat/publisher"
-	"github.com/elastic/beats/v7/libbeat/publisher/queue"
-	"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
-	"github.com/elastic/elastic-agent-libs/logp"
+	conf "github.com/elastic/elastic-agent-libs/config"
 
 	//"github.com/elastic/beats/v7/libbeat/tests/resources"
 
@@ -53,17 +52,13 @@ func TestOutputReload(t *testing.T) {
 	//defer goroutines.Check(t)
 
 	err := quick.Check(func(q uint) bool {
-		numEventsToPublish := 15000 + (q % 500) // 15000 to 19999
-		numOutputReloads := 350 + (q % 150)     // 350 to 499
-
-		queueFactory := func(ackListener queue.ACKListener) (queue.Queue, error) {
-			return memqueue.NewQueue(
-				logp.L(),
-				memqueue.Settings{
-					ACKListener: ackListener,
-					Events:      int(numEventsToPublish),
-				}), nil
-		}
+		numEventsToPublish := 15000 + (q % 5000) // 15000 to 19999
+		numOutputReloads := 350 + (q % 150)      // 350 to 499
+
+		queueConfig := conf.Namespace{}
+		conf, _ := conf.NewConfigFrom(
+			fmt.Sprintf("mem.events: %v", numEventsToPublish))
+		_ = queueConfig.Unpack(conf)
 
 		var publishedCount atomic.Uint
 		countingPublishFn := func(batch publisher.Batch) error {
@@ -74,7 +69,7 @@ func TestOutputReload(t *testing.T) {
 		pipeline, err := New(
 			beat.Info{},
 			Monitors{},
-			queueFactory,
+			queueConfig,
 			outputs.Group{},
 			Settings{},
 		)
@@ -99,7 +94,7 @@ func TestOutputReload(t *testing.T) {
 				out := outputs.Group{
 					Clients: []outputs.Client{outputClient},
 				}
-				pipeline.output.Set(out)
+				pipeline.outputController.Set(out)
 			}
 			wg.Wait()
diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go
index 7c36ea87fdb..92cbf914314 100644
--- a/libbeat/publisher/pipeline/module.go
+++ b/libbeat/publisher/pipeline/module.go
@@ -19,17 +19,12 @@ package pipeline
 
 import (
 	"flag"
-	"fmt"
 
 	"go.elastic.co/apm/v2"
 
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/outputs"
 	"github.com/elastic/beats/v7/libbeat/publisher/processing"
-	"github.com/elastic/beats/v7/libbeat/publisher/queue"
-	"github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
-	"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
-	conf "github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/monitoring"
 )
@@ -97,17 +92,12 @@ func LoadWithSettings(
 	name := beatInfo.Name
 
-	queueFactory, err := createQueueFactory(config.Queue, monitors, settings.InputQueueSize)
-	if err != nil {
-		return nil, err
-	}
-
 	out, err := loadOutput(monitors, makeOutput)
 	if err != nil {
 		return nil, err
 	}
 
-	p, err := New(beatInfo, monitors, queueFactory, out, settings)
+	p, err := New(beatInfo, monitors, config.Queue, out, settings)
 	if err != nil {
 		return nil, err
 	}
@@ -171,55 +161,3 @@ func loadOutput(
 
 	return out, nil
 }
-
-func createQueueFactory(
-	config conf.Namespace,
-	monitors Monitors,
-	inQueueSize int,
-) (queueFactory, error) {
-	queueType := defaultQueueType
-	if b := config.Name(); b != "" {
-		queueType = b
-	}
-
-	if monitors.Telemetry != nil {
-		queueReg := monitors.Telemetry.NewRegistry("queue")
-		monitoring.NewString(queueReg, "name").Set(queueType)
-	}
-
-	switch queueType {
-	case memqueue.QueueType:
-		settings, err := memqueue.SettingsForUserConfig(config.Config())
-		if err != nil {
-			return nil, err
-		}
-		// The memory queue has a special override during pipeline
-		// initialization for the size of its API channel buffer.
-		settings.InputQueueSize = inQueueSize
-		return memQueueFactory(monitors.Logger, settings), nil
-	case diskqueue.QueueType:
-		settings, err := diskqueue.SettingsForUserConfig(config.Config())
-		if err != nil {
-			return nil, err
-		}
-		return diskQueueFactory(monitors.Logger, settings), nil
-	default:
-		return nil, fmt.Errorf("'%v' is not a valid queue type", queueType)
-	}
-}
-
-func memQueueFactory(logger *logp.Logger, settings memqueue.Settings) queueFactory {
-	return func(ackListener queue.ACKListener) (queue.Queue, error) {
-		factorySettings := settings
-		factorySettings.ACKListener = ackListener
-		return memqueue.NewQueue(logger, factorySettings), nil
-	}
-}
-
-func diskQueueFactory(logger *logp.Logger, settings diskqueue.Settings) queueFactory {
-	return func(ackListener queue.ACKListener) (queue.Queue, error) {
-		factorySettings := settings
-		factorySettings.WriteToDiskListener = ackListener
-		return diskqueue.NewQueue(logger, factorySettings)
-	}
-}
diff --git a/libbeat/publisher/pipeline/monitoring.go b/libbeat/publisher/pipeline/monitoring.go
index e5c49ad2ad2..69a21c2c71c 100644
--- a/libbeat/publisher/pipeline/monitoring.go
+++ b/libbeat/publisher/pipeline/monitoring.go
@@ -22,7 +22,6 @@ import "github.com/elastic/elastic-agent-libs/monitoring"
 type observer interface {
 	pipelineObserver
 	clientObserver
-	queueObserver
 	outputObserver
 
 	cleanup()
@@ -40,14 +39,11 @@ type clientObserver interface {
 	failedPublishEvent()
 }
 
-type queueObserver interface {
-	queueACKed(n int)
-	queueMaxEvents(n int)
-}
-
 type outputObserver interface {
 	eventsDropped(int)
 	eventsRetry(int)
+	queueACKed(n int)
+	queueMaxEvents(n int)
 }
 
 // metricsObserver is used by many component in the publisher pipeline, to report
diff --git a/libbeat/publisher/pipeline/nilpipeline.go b/libbeat/publisher/pipeline/nilpipeline.go
index 6d699dff27b..beb8267fabf 100644
--- a/libbeat/publisher/pipeline/nilpipeline.go
+++ b/libbeat/publisher/pipeline/nilpipeline.go
@@ -24,8 +24,8 @@ import (
 type nilPipeline struct{}
 
 type nilClient struct {
-	eventer beat.ClientEventer
-	acker   beat.ACKer
+	clientListener beat.ClientListener
+	acker          beat.EventListener
 }
 
 var _nilPipeline = (*nilPipeline)(nil)
@@ -42,8 +42,8 @@ func (p *nilPipeline) Connect() (beat.Client, error) {
 
 func (p *nilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
 	return &nilClient{
-		eventer: cfg.Events,
-		acker:   cfg.ACKHandler,
+		clientListener: cfg.ClientListener,
+		acker:          cfg.EventListener,
 	}, nil
 }
 
@@ -66,9 +66,9 @@ func (c *nilClient) PublishAll(events []beat.Event) {
 }
 
 func (c *nilClient) Close() error {
-	if c.eventer != nil {
-		c.eventer.Closing()
-		c.eventer.Closed()
+	if c.clientListener != nil {
+		c.clientListener.Closing()
+		c.clientListener.Closed()
 	}
 	return nil
 }
diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go
index 27280182a2f..d3c35b63830 100644
--- a/libbeat/publisher/pipeline/pipeline.go
+++ b/libbeat/publisher/pipeline/pipeline.go
@@ -33,7 +33,7 @@ import (
 	"github.com/elastic/beats/v7/libbeat/publisher"
 	"github.com/elastic/beats/v7/libbeat/publisher/processing"
 	"github.com/elastic/beats/v7/libbeat/publisher/queue"
-	"github.com/elastic/elastic-agent-libs/config"
+	conf "github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/logp"
 )
 
@@ -58,8 +58,7 @@ type Pipeline struct {
 
 	monitors Monitors
 
-	queue  queue.Queue
-	output *outputController
+	outputController *outputController
 
 	observer observer
 
@@ -107,24 +106,20 @@ const (
 type OutputReloader interface {
 	Reload(
 		cfg *reload.ConfigWithMeta,
-		factory func(outputs.Observer, config.Namespace) (outputs.Group, error),
+		factory func(outputs.Observer, conf.Namespace) (outputs.Group, error),
 	) error
 }
 
-type queueFactory func(queue.ACKListener) (queue.Queue, error)
-
 // New create a new Pipeline instance from a queue instance and a set of outputs.
 // The new pipeline will take ownership of queue and outputs. On Close, the
 // queue and outputs will be closed.
 func New(
 	beat beat.Info,
 	monitors Monitors,
-	queueFactory queueFactory,
+	userQueueConfig conf.Namespace,
 	out outputs.Group,
 	settings Settings,
 ) (*Pipeline, error) {
-	var err error
-
 	if monitors.Logger == nil {
 		monitors.Logger = logp.NewLogger("publish")
 	}
@@ -142,35 +137,35 @@ func New(
 		p.observer = newMetricsObserver(monitors.Metrics)
 	}
 
-	p.queue, err = queueFactory(p)
-	if err != nil {
-		return nil, err
+	ackCallback := func(eventCount int) {
+		p.observer.queueACKed(eventCount)
+		if p.waitOnClose {
+			p.waitCloseGroup.Add(-eventCount)
+		}
 	}
 
-	maxEvents := p.queue.BufferConfig().MaxEvents
-	if maxEvents <= 0 {
-		// Maximum number of events until acker starts blocking.
-		// Only active if pipeline can drop events.
-		maxEvents = 64000
+	queueType := defaultQueueType
+	if b := userQueueConfig.Name(); b != "" {
+		queueType = b
+	}
+	queueConfig := queueConfig{
+		logger:      monitors.Logger,
+		queueType:   queueType,
+		userConfig:  userQueueConfig.Config(),
+		ackCallback: ackCallback,
+		inQueueSize: settings.InputQueueSize,
 	}
-	p.observer.queueMaxEvents(maxEvents)
 
-	p.output = newOutputController(beat, monitors, p.observer, p.queue)
-	p.output.Set(out)
+	output, err := newOutputController(beat, monitors, p.observer, queueConfig)
+	if err != nil {
+		return nil, err
+	}
+	p.outputController = output
+	p.outputController.Set(out)
 
 	return p, nil
 }
 
-// OnACK implements the queue.ACKListener interface, so the queue can notify the
-// Pipeline when events are acknowledged.
-func (p *Pipeline) OnACK(n int) {
-	p.observer.queueACKed(n)
-
-	if p.waitOnClose {
-		p.waitCloseGroup.Add(-n)
-	}
-}
-
 // Close stops the pipeline, outputs and queue.
 // If WaitClose with WaitOnPipelineClose mode is configured, Close will block
 // for a duration of WaitClose, if there are still active events in the pipeline.
@@ -197,15 +192,7 @@ func (p *Pipeline) Close() error {
 	}
 
 	// Note: active clients are not closed / disconnected.
-
-	// Closing the queue stops ACKs from propagating, so we close the output first
-	// to give it a chance to wait for any outstanding events to be acknowledged.
-	p.output.Close()
-	// shutdown queue
-	err := p.queue.Close()
-	if err != nil {
-		log.Error("pipeline queue shutdown error: ", err)
-	}
+	p.outputController.Close()
 
 	p.observer.cleanup()
 	if p.sigNewClient != nil {
@@ -250,26 +237,26 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
 	}
 
 	client := &client{
-		pipeline:     p,
-		closeRef:     cfg.CloseRef,
-		done:         make(chan struct{}),
-		isOpen:       atomic.MakeBool(true),
-		eventer:      cfg.Events,
-		processors:   processors,
-		eventFlags:   eventFlags,
-		canDrop:      canDrop,
-		reportEvents: reportEvents,
+		pipeline:       p,
+		closeRef:       cfg.CloseRef,
+		done:           make(chan struct{}),
+		isOpen:         atomic.MakeBool(true),
+		clientListener: cfg.ClientListener,
+		processors:     processors,
+		eventFlags:     eventFlags,
+		canDrop:        canDrop,
+		reportEvents:   reportEvents,
 	}
 
-	ackHandler := cfg.ACKHandler
+	ackHandler := cfg.EventListener
 
 	producerCfg := queue.ProducerConfig{}
 
-	if reportEvents || cfg.Events != nil {
+	if reportEvents || cfg.ClientListener != nil {
 		producerCfg.OnDrop = func(event interface{}) {
 			publisherEvent, _ := event.(publisher.Event)
-			if cfg.Events != nil {
-				cfg.Events.DroppedOnPublish(publisherEvent.Content)
+			if cfg.ClientListener != nil {
+				cfg.ClientListener.DroppedOnPublish(publisherEvent.Content)
 			}
 			if reportEvents {
 				p.waitCloseGroup.Add(-1)
@@ -296,9 +283,9 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
 		ackHandler = acker.Nil()
 	}
 
-	client.acker = ackHandler
+	client.eventListener = ackHandler
 	client.waiter = waiter
-	client.producer = p.queue.Producer(producerCfg)
+	client.producer = p.outputController.queueProducer(producerCfg)
 
 	p.observer.clientConnected()
@@ -397,5 +384,5 @@ func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bo
 
 // OutputReloader returns a reloadable object for the output section of this pipeline
 func (p *Pipeline) OutputReloader() OutputReloader {
-	return p.output
+	return p.outputController
 }
diff --git a/libbeat/publisher/pipeline/stress/gen.go b/libbeat/publisher/pipeline/stress/gen.go
index 175dd45e7b0..2a4d8c72ef0 100644
--- a/libbeat/publisher/pipeline/stress/gen.go
+++ b/libbeat/publisher/pipeline/stress/gen.go
@@ -69,7 +69,7 @@ func generate(
 	logger := logp.NewLogger("publisher_pipeline_stress_generate")
 
 	if config.ACK {
-		settings.ACKHandler = acker.Counting(func(n int) {
+		settings.EventListener = acker.Counting(func(n int) {
 			logger.Infof("Pipeline client (%v) ACKS; %v", id, n)
 		})
 	}
@@ -129,7 +129,7 @@ func generate(
 			if last == current {
 				// collect all active go-routines stack-traces:
 				var buf bytes.Buffer
-				pprof.Lookup("goroutine").WriteTo(&buf, 2)
+				_ = pprof.Lookup("goroutine").WriteTo(&buf, 2)
 				err := fmt.Errorf("no progress in generator %v (last=%v, current=%v):\n%s", id, last, current, buf.Bytes())
 				errors(err)
diff --git a/libbeat/publisher/pipetool/pipetool.go b/libbeat/publisher/pipetool/pipetool.go
index b4b479485a3..709fa3f68e1 100644
--- a/libbeat/publisher/pipetool/pipetool.go
+++ b/libbeat/publisher/pipetool/pipetool.go
@@ -87,12 +87,12 @@ func WithDefaultGuarantees(pipeline beat.PipelineConnector, mode beat.PublishMod
 	})
 }
 
-func WithACKer(pipeline beat.PipelineConnector, a beat.ACKer) beat.PipelineConnector {
+func WithACKer(pipeline beat.PipelineConnector, a beat.EventListener) beat.PipelineConnector {
 	return WithClientConfigEdit(pipeline, func(cfg beat.ClientConfig) (beat.ClientConfig, error) {
-		if h := cfg.ACKHandler; h != nil {
-			cfg.ACKHandler = acker.Combine(a, h)
+		if h := cfg.EventListener; h != nil {
+			cfg.EventListener = acker.Combine(a, h)
 		} else {
-			cfg.ACKHandler = a
+			cfg.EventListener = a
 		}
 		return cfg, nil
 	})
diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go
index 111c849cd42..c14b6272920 100644
--- a/libbeat/publisher/queue/diskqueue/benchmark_test.go
+++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go
@@ -78,7 +78,7 @@ func makeMessagesEvent() *messages.Event {
 		Timestamp: timestamppb.New(eventTime),
 		Fields: &messages.Struct{
 			Data: map[string]*messages.Value{
-				"message": &messages.Value{
+				"message": {
 					Kind: &messages.Value_StringValue{
 						StringValue: msgs[rand.Intn(len(msgs))],
 					},
diff --git a/libbeat/publisher/queue/diskqueue/checksum.go b/libbeat/publisher/queue/diskqueue/checksum.go
index 87cdb7b1aef..3f5f92399ae 100644
--- a/libbeat/publisher/queue/diskqueue/checksum.go
+++ b/libbeat/publisher/queue/diskqueue/checksum.go
@@ -27,7 +27,7 @@ import (
 func computeChecksum(data []byte) uint32 {
 	hash := crc32.NewIEEE()
 	frameLength := uint32(len(data) + frameMetadataSize)
-	binary.Write(hash, binary.LittleEndian, &frameLength)
+	_ = binary.Write(hash, binary.LittleEndian, &frameLength)
 	hash.Write(data)
 	return hash.Sum32()
 }
diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go
index 9deba4b2ee4..47d861045af 100644
--- a/libbeat/publisher/queue/diskqueue/config.go
+++ b/libbeat/publisher/queue/diskqueue/config.go
@@ -24,7 +24,6 @@ import (
 	"time"
 
 	"github.com/elastic/beats/v7/libbeat/common/cfgtype"
-	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 	"github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/paths"
 )
@@ -59,9 +58,9 @@ type Settings struct {
 	// this limit can keep it from overflowing memory.
 	WriteAheadLimit int
 
-	// A listener that should be sent ACKs when an event is successfully
+	// A callback that is called when an event is successfully
 	// written to disk.
-	WriteToDiskListener queue.ACKListener
+	WriteToDiskCallback func(eventCount int)
 
 	// RetryInterval specifies how long to wait before retrying a fatal error
 	// writing to disk. If MaxRetryInterval is nonzero, subsequent retries will
diff --git a/libbeat/publisher/queue/diskqueue/serialize_test.go b/libbeat/publisher/queue/diskqueue/serialize_test.go
index 9095cc49a3c..2ee5de6de6d 100644
--- a/libbeat/publisher/queue/diskqueue/serialize_test.go
+++ b/libbeat/publisher/queue/diskqueue/serialize_test.go
@@ -69,7 +69,7 @@ func TestSerialize(t *testing.T) {
 			event = &messages.Event{
 				Fields: &messages.Struct{
 					Data: map[string]*messages.Value{
-						"test_field": &messages.Value{
+						"test_field": {
 							Kind: &messages.Value_StringValue{
 								StringValue: tc.value,
 							},
diff --git a/libbeat/publisher/queue/diskqueue/writer_loop.go b/libbeat/publisher/queue/diskqueue/writer_loop.go
index d40da0ca272..96e8cd1ac96 100644
--- a/libbeat/publisher/queue/diskqueue/writer_loop.go
+++ b/libbeat/publisher/queue/diskqueue/writer_loop.go
@@ -235,8 +235,8 @@ outerLoop:
 	_ = wl.outputFile.Sync()
 
 	// If the queue has an ACK listener, notify it the frames were written.
-	if wl.settings.WriteToDiskListener != nil {
-		wl.settings.WriteToDiskListener.OnACK(totalACKCount)
+	if wl.settings.WriteToDiskCallback != nil {
+		wl.settings.WriteToDiskCallback(totalACKCount)
 	}
 
 	// Notify any producers with ACK listeners that their frames were written.
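Not part of the patch: a minimal sketch, assuming only the SettingsForUserConfig and NewQueue signatures visible in the hunks above, of how a caller might wire the disk queue's new WriteToDiskCallback field. The helper name, logger, and userConfig parameter are illustrative.

package example

import (
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
	"github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"
)

// newDiskQueueWithCallback builds a disk queue whose ACK notifications are
// delivered through the new WriteToDiskCallback function instead of the
// removed queue.ACKListener interface. Illustrative sketch only.
func newDiskQueueWithCallback(logger *logp.Logger, userConfig *conf.C, onACK func(eventCount int)) (queue.Queue, error) {
	settings, err := diskqueue.SettingsForUserConfig(userConfig)
	if err != nil {
		return nil, err
	}
	// Invoked by the writer loop after frames are synced to disk, mirroring
	// the old WriteToDiskListener.OnACK(totalACKCount) notification.
	settings.WriteToDiskCallback = onACK
	q, err := diskqueue.NewQueue(logger, settings)
	if err != nil {
		return nil, err
	}
	return q, nil
}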
diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go
index 1c6cc43d265..f61439a6d50 100644
--- a/libbeat/publisher/queue/memqueue/ackloop.go
+++ b/libbeat/publisher/queue/memqueue/ackloop.go
@@ -65,8 +65,8 @@ func (l *ackLoop) handleBatchSig() int {
 	}
 
 	if count > 0 {
-		if listener := l.broker.ackListener; listener != nil {
-			listener.OnACK(count)
+		if callback := l.broker.ackCallback; callback != nil {
+			callback(count)
 		}
 
 		// report acks to waiting clients
diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go
index 07371a90f62..722fb501b07 100644
--- a/libbeat/publisher/queue/memqueue/broker.go
+++ b/libbeat/publisher/queue/memqueue/broker.go
@@ -64,14 +64,11 @@ type broker struct {
 	// chanList of all outstanding ACK channels.
 	scheduledACKs chan chanList
 
-	// A listener that should be notified when ACKs are processed.
-	// ackLoop calls this listener's OnACK function when it advances
-	// the consumer ACK position.
-	// Right now this listener always points at the Pipeline associated with
-	// this queue. Pipeline.OnACK then forwards the notification to
-	// Pipeline.observer.queueACKed(), which updates the beats registry
-	// if needed.
-	ackListener queue.ACKListener
+	// A callback that should be invoked when ACKs are processed.
+	// ackLoop calls this function when it advances the consumer ACK position.
+	// Right now this forwards the notification to queueACKed() in
+	// the pipeline observer, which updates the beats registry if needed.
+	ackCallback func(eventCount int)
 
 	// This channel is used to request/return metrics where such metrics require insight into
 	// the actual eventloop itself. This seems like it might be overkill, but it seems that
@@ -84,7 +81,7 @@ type broker struct {
 }
 
 type Settings struct {
-	ACKListener    queue.ACKListener
+	ACKCallback    func(eventCount int)
 	Events         int
 	FlushMinEvents int
 	FlushTimeout   time.Duration
@@ -162,7 +159,7 @@ func NewQueue(
 
 		// internal broker and ACK handler channels
 		scheduledACKs: make(chan chanList),
-		ackListener:   settings.ACKListener,
+		ackCallback:   settings.ACKCallback,
 
 		metricChan: make(chan metricsRequest),
 	}
diff --git a/libbeat/publisher/queue/proxy/broker.go b/libbeat/publisher/queue/proxy/broker.go
index d13e4bc0f3e..1f0f1412e7c 100644
--- a/libbeat/publisher/queue/proxy/broker.go
+++ b/libbeat/publisher/queue/proxy/broker.go
@@ -42,13 +42,12 @@ type broker struct {
 	// Consumers send requests to getChan to read entries from the queue.
 	getChan chan getRequest
 
-	// A listener that should be notified when ACKs are processed.
-	// Right now this listener always points at the Pipeline associated with
-	// this queue, and Pipeline.OnACK forwards the notification to
-	// Pipeline.observer.queueACKed(), which updates the beats registry
-	// if needed. This pointer is included in batches created by the proxy
-	// queue, so they can call it when they receive a Done call.
-	ackListener queue.ACKListener
+	// A callback that should be invoked when ACKs are processed.
+	// This is used to forward notifications back to the pipeline observer,
+	// which updates the beats registry if needed. This callback is included
+	// in batches created by the proxy queue, so they can invoke it when they
+	// receive a Done call.
+	ackCallback func(eventCount int)
 
 	// Internal state for the broker's run loop.
 	queuedEntries []queueEntry
@@ -60,7 +59,7 @@ type broker struct {
 }
 
 type Settings struct {
-	ACKListener queue.ACKListener
+	ACKCallback func(eventCount int)
 	BatchSize   int
 }
 
@@ -103,7 +102,7 @@ func NewQueue(
 		pushChan: make(chan *pushRequest),
 		getChan:  make(chan getRequest),
 
-		ackListener: settings.ACKListener,
+		ackCallback: settings.ACKCallback,
 	}
 
 	b.wg.Add(1)
@@ -184,8 +183,8 @@ func (b *broker) run() {
 			}
 			// Notify the pipeline's metrics reporter
 			//nolint:typecheck // this nil check is ok
-			if b.ackListener != nil {
-				b.ackListener.OnACK(ackedBatch.originalEntryCount)
+			if b.ackCallback != nil {
+				b.ackCallback(ackedBatch.originalEntryCount)
 			}
 		}
 	}
diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go
index 3b639f7b635..c985f02f3e9 100644
--- a/libbeat/publisher/queue/queue.go
+++ b/libbeat/publisher/queue/queue.go
@@ -24,11 +24,6 @@ import (
 	"github.com/elastic/elastic-agent-libs/opt"
 )
 
-// ACKListener listens to special events to be send by queue implementations.
-type ACKListener interface {
-	OnACK(eventCount int) // number of consecutively published events acked by producers
-}
-
 // Metrics is a set of basic-user friendly metrics that report the current state of the queue. These metrics are meant to be relatively generic and high-level, and when reported directly, can be comprehensible to a user.
 type Metrics struct {
 	//EventCount is the total events currently in the queue
@@ -95,11 +90,10 @@ type ProducerConfig struct {
 	// by the producer instance and being ACKed by the queue.
 	ACK func(count int)
 
-	// OnDrop provided to the queue, to report events being silently dropped by
-	// the queue. For example an async producer close and publish event,
-	// with close happening early might result in the event being dropped. The callback
-	// gives a queue user a chance to keep track of total number of events
-	// being buffered by the queue.
+	// OnDrop is called to report events being silently dropped by
+	// the queue. Currently this can only happen when a Publish call is sent
+	// to the memory queue's request channel but the producer is cancelled
+	// before it reaches the queue buffer.
 	OnDrop func(interface{})
 
 	// DropOnCancel is a hint to the queue to drop events if the producer disconnects
diff --git a/winlogbeat/beater/eventlogger.go b/winlogbeat/beater/eventlogger.go
index dfe6d046dfb..2071e534f38 100644
--- a/winlogbeat/beater/eventlogger.go
+++ b/winlogbeat/beater/eventlogger.go
@@ -87,7 +87,7 @@ func (e *eventLogger) connect(pipeline beat.Pipeline) (beat.Client, error) {
 			Processor: e.processors,
 			KeepNull:  e.keepNull,
 		},
-		ACKHandler: acker.Counting(func(n int) {
+		EventListener: acker.Counting(func(n int) {
 			addPublished(e.source.Name(), n)
 			e.log.Debugw("Successfully published events.", "event.count", n)
 		}),
diff --git a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
index 39e0e6941f9..3fd25630f80 100644
--- a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
+++ b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
@@ -5,6 +5,7 @@
 package pipelinemanager
 
 import (
+	"errors"
 	"io"
 	"strings"
 	"time"
@@ -51,7 +52,7 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade
 		WaitClose: 0,
 	}
 	clientLogger := logp.NewLogger("clientLogReader")
-	settings.ACKHandler = acker.Counting(func(n int) {
+	settings.EventListener = acker.Counting(func(n int) {
 		clientLogger.Debugf("Pipeline client ACKS; %v", n)
 	})
 	settings.PublishMode = beat.DefaultGuarantees
@@ -94,7 +95,7 @@ func (cl *ClientLogger) ConsumePipelineAndSend() {
 	for {
 		err := cl.logFile.ReadMessage(&log)
 		if err != nil {
-			if err == io.EOF {
+			if errors.Is(err, io.EOF) {
 				return
 			}
 			cl.logger.Errorf("Error getting message: %s\n", err)
@@ -137,7 +138,7 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
 			return
 		}
 
-		cl.localLog.Log(constructLogSpoolMsg(entry))
+		_ = cl.localLog.Log(constructLogSpoolMsg(entry))
 		line := strings.TrimSpace(string(entry.Line))
 
 		cl.client.Publish(beat.Event{
diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go
index 5b0f84441f8..f9d69fe1184 100644
--- a/x-pack/filebeat/input/awscloudwatch/input.go
+++ b/x-pack/filebeat/input/awscloudwatch/input.go
@@ -112,8 +112,8 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)
 
 	// Create client for publishing events and receive notification of their ACKs.
 	client, err := pipeline.ConnectWith(beat.ClientConfig{
-		CloseRef:   inputContext.Cancelation,
-		ACKHandler: awscommon.NewEventACKHandler(),
+		CloseRef:      inputContext.Cancelation,
+		EventListener: awscommon.NewEventACKHandler(),
 	})
 	if err != nil {
 		return fmt.Errorf("failed to create pipeline client: %w", err)
diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go
index bc14a594a44..6b76108b0e1 100644
--- a/x-pack/filebeat/input/awss3/input.go
+++ b/x-pack/filebeat/input/awss3/input.go
@@ -140,8 +140,8 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
 	if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" {
 		// Create client for publishing events and receive notification of their ACKs.
 		client, err := pipeline.ConnectWith(beat.ClientConfig{
-			CloseRef:   inputContext.Cancelation,
-			ACKHandler: awscommon.NewEventACKHandler(),
+			CloseRef:      inputContext.Cancelation,
+			EventListener: awscommon.NewEventACKHandler(),
 			Processing: beat.ProcessingConfig{
 				// This input only produces events with basic types so normalization
 				// is not required.
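Not part of the patch: a small sketch of how an input might connect a pipeline client after the rename, using the EventListener field in place of the old ACKHandler. The helper name and callback are assumptions; acker.Counting is the same helper used in the hunks above.

package example

import (
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common/acker"
)

// connectCountingClient connects a pipeline client and reports how many
// events were acknowledged, via the renamed EventListener field.
func connectCountingClient(pipeline beat.Pipeline, onACK func(n int)) (beat.Client, error) {
	return pipeline.ConnectWith(beat.ClientConfig{
		// EventListener replaces the former ACKHandler field; acker.Counting
		// calls onACK with the number of events ACKed downstream.
		EventListener: acker.Counting(onACK),
	})
}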
diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go
index 08700351e99..10df3322497 100644
--- a/x-pack/filebeat/input/awss3/sqs_s3_event.go
+++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go
@@ -309,7 +309,7 @@ func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Log
 
 	// Create a pipeline client scoped to this goroutine.
 	client, err := p.pipeline.ConnectWith(beat.ClientConfig{
-		ACKHandler: awscommon.NewEventACKHandler(),
+		EventListener: awscommon.NewEventACKHandler(),
 		Processing: beat.ProcessingConfig{
 			// This input only produces events with basic types so normalization
 			// is not required.
diff --git a/x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go b/x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go
index ccbae03c7df..b786acf29c7 100644
--- a/x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go
+++ b/x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go
@@ -65,8 +65,8 @@ func (n *input) Run(runCtx v2.Context, connector beat.PipelineConnector) (err er
 	}()
 
 	client, err := connector.ConnectWith(beat.ClientConfig{
-		CloseRef:   runCtx.Cancelation,
-		ACKHandler: NewTxACKHandler(),
+		CloseRef:      runCtx.Cancelation,
+		EventListener: NewTxACKHandler(),
 	})
 
 	dataDir := paths.Resolve(paths.Data, "kvstore")
diff --git a/x-pack/filebeat/input/entityanalytics/internal/kvstore/tracker.go b/x-pack/filebeat/input/entityanalytics/internal/kvstore/tracker.go
index 2423fb701c5..b18229b7795 100644
--- a/x-pack/filebeat/input/entityanalytics/internal/kvstore/tracker.go
+++ b/x-pack/filebeat/input/entityanalytics/internal/kvstore/tracker.go
@@ -55,9 +55,9 @@ func NewTxTracker(ctx context.Context) *TxTracker {
 	return &t
 }
 
-// NewTxACKHandler creates a new beat.ACKer. As events are ACK-ed and if the event
+// NewTxACKHandler creates a new beat.EventListener. As events are ACK-ed and if the event
 // contains a TxTracker, Ack will be called on the TxTracker.
-func NewTxACKHandler() beat.ACKer {
+func NewTxACKHandler() beat.EventListener {
 	return acker.ConnectionOnly(acker.EventPrivateReporter(func(acked int, privates []interface{}) {
 		for _, private := range privates {
 			if t, ok := private.(*TxTracker); ok {
diff --git a/x-pack/filebeat/input/gcppubsub/input.go b/x-pack/filebeat/input/gcppubsub/input.go
index 512c7dc561b..f49caf3a20a 100644
--- a/x-pack/filebeat/input/gcppubsub/input.go
+++ b/x-pack/filebeat/input/gcppubsub/input.go
@@ -114,7 +114,7 @@ func NewInput(
 
 	// Build outlet for events.
 	in.outlet, err = connector.ConnectWith(cfg, beat.ClientConfig{
-		ACKHandler: acker.ConnectionOnly(
+		EventListener: acker.ConnectionOnly(
 			acker.EventPrivateReporter(func(_ int, privates []interface{}) {
 				for _, priv := range privates {
 					if msg, ok := priv.(*pubsub.Message); ok {
diff --git a/x-pack/filebeat/input/gcppubsub/pubsub_test.go b/x-pack/filebeat/input/gcppubsub/pubsub_test.go
index ead35dfebe6..123738102d2 100644
--- a/x-pack/filebeat/input/gcppubsub/pubsub_test.go
+++ b/x-pack/filebeat/input/gcppubsub/pubsub_test.go
@@ -6,6 +6,7 @@ package gcppubsub
 
 import (
 	"context"
+	"errors"
 	"io/ioutil"
 	"net/http"
 	"os"
@@ -56,12 +57,13 @@ func testSetup(t *testing.T) (*pubsub.Client, context.CancelFunc) {
 	}
 
 	once.Do(func() {
-		logp.TestingSetup()
+		_ = logp.TestingSetup()
 
 		// Disable HTTP keep-alives to ensure no extra goroutines hang around.
 		httpClient := http.Client{Transport: &http.Transport{DisableKeepAlives: true}}
 
 		// Sanity check the emulator.
+		//nolint:noctx // this is just for the tests
 		resp, err := httpClient.Get("http://" + host)
 		if err != nil {
 			t.Fatalf("pubsub emulator at %s is not healthy: %v", host, err)
 		}
@@ -95,7 +97,7 @@ func resetPubSub(t *testing.T, client *pubsub.Client) {
 	topics := client.Topics(ctx)
 	for {
 		topic, err := topics.Next()
-		if err == iterator.Done {
+		if errors.Is(err, iterator.Done) {
 			break
 		}
 		if err != nil {
@@ -110,7 +112,7 @@ func resetPubSub(t *testing.T, client *pubsub.Client) {
 	subs := client.Subscriptions(ctx)
 	for {
 		sub, err := subs.Next()
-		if err == iterator.Done {
+		if errors.Is(err, iterator.Done) {
 			break
 		}
 		if err != nil {
@@ -277,12 +279,12 @@ func newStubOutlet(onEvent eventHandler) *stubOutleter {
 }
 
 func ackEvent(ev beat.Event, cfg beat.ClientConfig) bool {
-	if cfg.ACKHandler == nil {
+	if cfg.EventListener == nil {
 		return false
 	}
 
-	cfg.ACKHandler.AddEvent(ev, true)
-	cfg.ACKHandler.ACKEvents(1)
+	cfg.EventListener.AddEvent(ev, true)
+	cfg.EventListener.ACKEvents(1)
 	return true
 }
 
@@ -345,7 +347,7 @@ func TestTopicDoesNotExist(t *testing.T) {
 
 func TestSubscriptionDoesNotExistError(t *testing.T) {
 	cfg := defaultTestConfig()
-	cfg.SetBool("subscription.create", -1, false)
+	_ = cfg.SetBool("subscription.create", -1, false)
 
 	runTest(t, cfg, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) {
 		createTopic(t, client)
diff --git a/x-pack/filebeat/input/lumberjack/ack.go b/x-pack/filebeat/input/lumberjack/ack.go
index ab15ad157dc..809a7e7d135 100644
--- a/x-pack/filebeat/input/lumberjack/ack.go
+++ b/x-pack/filebeat/input/lumberjack/ack.go
@@ -65,7 +65,7 @@ func (t *batchACKTracker) ACK() {
 // an event has been ACKed an output. If the event contains a private metadata
 // pointing to a batchACKTracker then it will invoke the tracker's ACK() method
 // to decrement the number of pending ACKs.
-func newEventACKHandler() beat.ACKer {
+func newEventACKHandler() beat.EventListener {
 	return acker.ConnectionOnly(
 		acker.EventPrivateReporter(func(_ int, privates []interface{}) {
 			for _, private := range privates {
diff --git a/x-pack/filebeat/input/lumberjack/input.go b/x-pack/filebeat/input/lumberjack/input.go
index 61016fff67c..caa966a3814 100644
--- a/x-pack/filebeat/input/lumberjack/input.go
+++ b/x-pack/filebeat/input/lumberjack/input.go
@@ -62,8 +62,8 @@ func (i *lumberjackInput) Run(inputCtx inputv2.Context, pipeline beat.Pipeline)
 
 	// Create client for publishing events and receive notification of their ACKs.
 	client, err := pipeline.ConnectWith(beat.ClientConfig{
-		CloseRef:   inputCtx.Cancelation,
-		ACKHandler: newEventACKHandler(),
+		CloseRef:      inputCtx.Cancelation,
+		EventListener: newEventACKHandler(),
 	})
 	if err != nil {
 		return fmt.Errorf("failed to create pipeline client: %w", err)
diff --git a/x-pack/libbeat/common/aws/acker.go b/x-pack/libbeat/common/aws/acker.go
index 347347dde67..95fbe14b774 100644
--- a/x-pack/libbeat/common/aws/acker.go
+++ b/x-pack/libbeat/common/aws/acker.go
@@ -72,7 +72,7 @@ func (a *EventACKTracker) Wait() {
 // an event has been ACKed an output. If the event contains a private metadata
 // pointing to an eventACKTracker then it will invoke the trackers ACK() method
 // to decrement the number of pending ACKs.
-func NewEventACKHandler() beat.ACKer {
+func NewEventACKHandler() beat.EventListener {
 	return acker.ConnectionOnly(
 		acker.EventPrivateReporter(func(_ int, privates []interface{}) {
 			for _, private := range privates {