From 2d0ae43f21e9b0215d53ffcc7fd3ae45194ffaef Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Wed, 24 Nov 2021 09:40:39 +0100 Subject: [PATCH] Skip config check in autodiscover for duplicated configurations (#29048) If the configuration is already running, it has been already checked, don't try to check it again to avoid problems with configuration checks that fail if some resource already exist with the same identifiers. (cherry picked from commit 1207d635d4636412652e52d457af96091b8336a2) --- CHANGELOG.next.asciidoc | 1 + libbeat/autodiscover/autodiscover.go | 39 +++++++++----- libbeat/autodiscover/autodiscover_test.go | 64 +++++++++++++++++++++++ 3 files changed, 91 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2478cb65d0a..7077cda3bfd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -153,6 +153,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix discovery of Nomad allocations with multiple events during startup. {pull}28700[28700] - Fix `fingerprint` processor to give it access to the `@timestamp` field. {issue}28683[28683] - Fix the wrong beat name on monitoring and state endpoint {issue}27755[27755] +- Skip configuration checks in autodiscover for configurations that are already running {pull}29048[29048] *Auditbeat* diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 8c6977e8362..e1f1d8d2bfc 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -64,6 +64,10 @@ type Autodiscover struct { meta *meta.Map listener bus.Listener logger *logp.Logger + + // workDone is a channel used for testing purpouses, to know when the worker has + // done some work. + workDone chan struct{} } // NewAutodiscover instantiates and returns a new Autodiscover manager @@ -165,6 +169,11 @@ func (a *Autodiscover) worker() { // reset updated status updated = false } + + // For testing purpouses. + if a.workDone != nil { + a.workDone <- struct{}{} + } } } @@ -207,26 +216,30 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { continue } - err = a.factory.CheckConfig(config) - if err != nil { - a.logger.Error(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s', won't start runner", - common.DebugString(config, true)))) - continue - } - // Update meta no matter what dynFields := a.meta.Store(hash, meta) + if _, ok := newCfg[hash]; ok { + a.logger.Debugf("Config %v duplicated in start event", common.DebugString(config, true)) + continue + } + if cfg, ok := a.configs[eventID][hash]; ok { a.logger.Debugf("Config %v is already running", common.DebugString(config, true)) newCfg[hash] = cfg continue - } else { - newCfg[hash] = &reload.ConfigWithMeta{ - Config: config, - Meta: &dynFields, - } + } + + err = a.factory.CheckConfig(config) + if err != nil { + a.logger.Error(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s', won't start runner", + common.DebugString(config, true)))) + continue + } + newCfg[hash] = &reload.ConfigWithMeta{ + Config: config, + Meta: &dynFields, } updated = true diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index 4b2ecfef128..2d0ea26b689 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -75,6 +75,8 @@ type mockAdapter struct { mutex sync.Mutex configs []*common.Config runners []*mockRunner + + CheckConfigCallCount int } // CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter` @@ -87,6 +89,8 @@ func (m *mockAdapter) CreateConfig(event bus.Event) ([]*common.Config, error) { // CheckConfig tests given config to check if it will work or not, returns errors in case it won't work func (m *mockAdapter) CheckConfig(c *common.Config) error { + m.CheckConfigCallCount++ + config := struct { Broken bool `config:"broken"` }{} @@ -324,6 +328,66 @@ func TestAutodiscoverHash(t *testing.T) { assert.False(t, runners[1].stopped) } +func TestAutodiscoverDuplicatedConfigConfigCheckCalledOnce(t *testing.T) { + goroutines := resources.NewGoroutinesChecker() + defer goroutines.Check(t) + + // Register mock autodiscover provider + busChan := make(chan bus.Bus, 1) + + Registry = NewRegistry() + Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) { + // intercept bus to mock events + busChan <- b + + return &mockProvider{}, nil + }) + + // Create a mock adapter that returns a duplicated config + runnerConfig, _ := common.NewConfigFrom(map[string]string{ + "id": "foo", + }) + adapter := mockAdapter{ + configs: []*common.Config{runnerConfig, runnerConfig}, + } + + // and settings: + providerConfig, _ := common.NewConfigFrom(map[string]string{ + "type": "mock", + }) + config := Config{ + Providers: []*common.Config{providerConfig}, + } + k, _ := keystore.NewFileKeystore("test") + // Create autodiscover manager + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k) + if err != nil { + t.Fatal(err) + } + + autodiscover.workDone = make(chan struct{}) + + // Start it + autodiscover.Start() + defer autodiscover.Stop() + eventBus := <-busChan + + // Publish a couple of events. + for i := 0; i < 2; i++ { + eventBus.Publish(bus.Event{ + "id": "foo", + "provider": "mock", + "start": true, + "meta": common.MapStr{ + "foo": "bar", + }, + }) + <-autodiscover.workDone + assert.Equal(t, 1, len(adapter.Runners()), "Only one runner should be started") + assert.Equal(t, 1, adapter.CheckConfigCallCount, "Check config should have been called only once") + } +} + func TestAutodiscoverWithConfigCheckFailures(t *testing.T) { goroutines := resources.NewGoroutinesChecker() defer goroutines.Check(t)