From c0e1315130adf9b3771afea5e06915df71c7adb5 Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: Fri, 18 Nov 2022 11:01:31 +0900 Subject: [PATCH 1/5] fix in handling states and purging --- x-pack/filebeat/input/awss3/s3.go | 6 +++--- x-pack/filebeat/input/awss3/states.go | 6 ++++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 5b1187e4317..bf59f1a634f 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -282,8 +282,8 @@ func (p *s3Poller) Purge() { for _, state := range p.states.GetStatesByListingID(listingID) { // it is not stored, keep - if !state.Stored { - p.log.Debugw("state not stored, skip purge", "state", state) + if !state.Stored && !state.Error { + p.log.Debugw("state not stored or with error, skip purge", "state", state) continue } @@ -294,7 +294,7 @@ func (p *s3Poller) Purge() { var commitWriteState commitWriteState err := p.store.Get(awsS3WriteCommitPrefix+state.Bucket+state.ListPrefix, &commitWriteState) if err == nil { - // we have no entry in the map and we have no entry in the store + // we have no entry in the map, and we have no entry in the store // set zero time latestStoredTime = time.Time{} p.log.Debugw("last stored time is zero time", "bucket", state.Bucket, "listPrefix", state.ListPrefix) diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go index 46ecfa1a320..d20ed25ce96 100644 --- a/x-pack/filebeat/input/awss3/states.go +++ b/x-pack/filebeat/input/awss3/states.go @@ -78,6 +78,12 @@ func (s *states) MustSkip(state state, store *statestore.Store) bool { return true } + // the previous state is stored or has error: let's skip + if !previousState.IsEmpty() && (previousState.Stored || previousState.Error) { + s.log.Debugw("previous state is stored or has error", "state", state) + return true + } + // we have no previous state or the previous state // is not stored: refresh the state if previousState.IsEmpty() || (!previousState.Stored && !previousState.Error) { From 2419b6b118398d34d95f56d41ec3a81ac5aae0a3 Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: Fri, 18 Nov 2022 17:05:16 +0900 Subject: [PATCH 2/5] changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d851c22b8d9..c896d55d2fb 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix reporting of `filebeat.events.active` in log events such that the current value is always reported instead of the difference from the last value. {pull}33597[33597] - Fix splitting array of strings/arrays in httpjson input {issue}30345[30345] {pull}33609[33609] - Fix Google workspace pagination and document ID generation. {pull}33666[33666] +- Fix handling of error in states in direct aws-s3 listing input {issue}33513[33513] {pull}33722[33722] *Heartbeat* - Fix bug affecting let's encrypt and other users of cross-signed certs, where cert expiration was incorrectly calculated. {issue}33215[33215] From d462858c327c6b9fa4a6c176182177cbae331099 Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: Fri, 18 Nov 2022 17:05:23 +0900 Subject: [PATCH 3/5] increase coverage --- .../input/awss3/input_integration_test.go | 25 --- x-pack/filebeat/input/awss3/states_test.go | 159 ++++++++++++++++++ 2 files changed, 159 insertions(+), 25 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 024caa23ab5..6f4d793f42a 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -31,11 +31,8 @@ import ( "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" - "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" - "github.com/elastic/beats/v7/libbeat/statestore" - "github.com/elastic/beats/v7/libbeat/statestore/storetest" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -134,28 +131,6 @@ file_selectors: `, queueURL)) } -type testInputStore struct { - registry *statestore.Registry -} - -func openTestStatestore() beater.StateStore { - return &testInputStore{ - registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), - } -} - -func (s *testInputStore) Close() { - s.registry.Close() -} - -func (s *testInputStore) Access() (*statestore.Store, error) { - return s.registry.Get("filebeat") -} - -func (s *testInputStore) CleanupInterval() time.Duration { - return 24 * time.Hour -} - func createInput(t *testing.T, cfg *conf.C) *s3Input { inputV2, err := Plugin(openTestStatestore()).Manager.Create(cfg) if err != nil { diff --git a/x-pack/filebeat/input/awss3/states_test.go b/x-pack/filebeat/input/awss3/states_test.go index b8a7cbb63d1..7893d669159 100644 --- a/x-pack/filebeat/input/awss3/states_test.go +++ b/x-pack/filebeat/input/awss3/states_test.go @@ -9,12 +9,38 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/filebeat/beater" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" + "github.com/stretchr/testify/assert" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/elastic-agent-libs/logp" ) +type testInputStore struct { + registry *statestore.Registry +} + +func openTestStatestore() beater.StateStore { + return &testInputStore{ + registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), + } +} + +func (s *testInputStore) Close() { + _ = s.registry.Close() +} + +func (s *testInputStore) Access() (*statestore.Store, error) { + return s.registry.Get("filebeat") +} + +func (s *testInputStore) CleanupInterval() time.Duration { + return 24 * time.Hour +} + var inputCtx = v2.Context{ Logger: logp.NewLogger("test"), Cancelation: context.Background(), @@ -83,6 +109,139 @@ func TestStatesIsNew(t *testing.T) { } } +func TestMustSkip(t *testing.T) { + type stateTestCase struct { + states func() *states + state state + mustBeNew bool + persistentStoreKV map[string]interface{} + expected bool + } + lastModified := time.Date(2022, time.June, 30, 14, 13, 00, 0, time.UTC) + tests := map[string]stateTestCase{ + "with empty states": { + states: func() *states { + return newStates(inputCtx) + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + expected: false, + }, + "not existing state": { + states: func() *states { + states := newStates(inputCtx) + states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") + return states + }, + state: newState("bucket1", "key1", "etag1", "listPrefix1", lastModified), + expected: false, + }, + "existing state": { + states: func() *states { + states := newStates(inputCtx) + states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") + return states + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + expected: true, + }, + "with different etag": { + states: func() *states { + states := newStates(inputCtx) + states.Update(newState("bucket", "key", "etag1", "listPrefix", lastModified), "") + return states + }, + state: newState("bucket", "key", "etag2", "listPrefix", lastModified), + expected: false, + }, + "with different lastmodified": { + states: func() *states { + states := newStates(inputCtx) + states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") + return states + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(1*time.Second)), + expected: false, + }, + "with stored state": { + states: func() *states { + states := newStates(inputCtx) + aState := newState("bucket", "key", "etag", "listPrefix", lastModified) + aState.Stored = true + states.Update(aState, "") + return states + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + mustBeNew: true, + expected: true, + }, + "with error state": { + states: func() *states { + states := newStates(inputCtx) + aState := newState("bucket", "key", "etag", "listPrefix", lastModified) + aState.Error = true + states.Update(aState, "") + return states + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + mustBeNew: true, + expected: true, + }, + "before commit write": { + states: func() *states { + return newStates(inputCtx) + }, + persistentStoreKV: map[string]interface{}{ + awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified}, + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(-1*time.Second)), + expected: true, + }, + "same commit write": { + states: func() *states { + return newStates(inputCtx) + }, + persistentStoreKV: map[string]interface{}{ + awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified}, + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + expected: true, + }, + "after commit write": { + states: func() *states { + return newStates(inputCtx) + }, + persistentStoreKV: map[string]interface{}{ + awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified}, + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(time.Second)), + expected: false, + }, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + states := test.states() + store := openTestStatestore() + persistentStore, err := store.Access() + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + for key, value := range test.persistentStoreKV { + _ = persistentStore.Set(key, value) + } + + if test.mustBeNew { + test.state.LastModified = test.state.LastModified.Add(1 * time.Second) + } + + isNew := states.MustSkip(test.state, persistentStore) + assert.Equal(t, test.expected, isNew) + _ = persistentStore.Close() + }) + } +} + func TestStatesDelete(t *testing.T) { type stateTestCase struct { states func() *states From 1e76705136ad57e9b3255d07d8b212c7bee2e5cd Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: Tue, 29 Nov 2022 11:26:12 +0900 Subject: [PATCH 4/5] cr fixes --- x-pack/filebeat/input/awss3/s3.go | 2 +- x-pack/filebeat/input/awss3/state.go | 5 +++++ x-pack/filebeat/input/awss3/states.go | 6 +++--- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index bf59f1a634f..f4d3295e13e 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -282,7 +282,7 @@ func (p *s3Poller) Purge() { for _, state := range p.states.GetStatesByListingID(listingID) { // it is not stored, keep - if !state.Stored && !state.Error { + if !state.IsProcessed() { p.log.Debugw("state not stored or with error, skip purge", "state", state) continue } diff --git a/x-pack/filebeat/input/awss3/state.go b/x-pack/filebeat/input/awss3/state.go index 1e2761ce742..97fb8d538cd 100644 --- a/x-pack/filebeat/input/awss3/state.go +++ b/x-pack/filebeat/input/awss3/state.go @@ -60,6 +60,11 @@ func (s *state) MarkAsError() { s.Error = true } +// IsProcessed checks if the state is either Stored or Error +func (s *state) IsProcessed() bool { + return s.Stored || s.Error +} + // IsEqual checks if the two states point to the same s3 object. func (s *state) IsEqual(c *state) bool { return s.Bucket == c.Bucket && s.Key == c.Key && s.Etag == c.Etag && s.LastModified.Equal(c.LastModified) diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go index d20ed25ce96..de100007d12 100644 --- a/x-pack/filebeat/input/awss3/states.go +++ b/x-pack/filebeat/input/awss3/states.go @@ -79,14 +79,14 @@ func (s *states) MustSkip(state state, store *statestore.Store) bool { } // the previous state is stored or has error: let's skip - if !previousState.IsEmpty() && (previousState.Stored || previousState.Error) { + if !previousState.IsEmpty() && previousState.IsProcessed() { s.log.Debugw("previous state is stored or has error", "state", state) return true } // we have no previous state or the previous state // is not stored: refresh the state - if previousState.IsEmpty() || (!previousState.Stored && !previousState.Error) { + if previousState.IsEmpty() || !previousState.IsProcessed() { s.Update(state, "") } @@ -172,7 +172,7 @@ func (s *states) Update(newState state, listingID string) { s.log.Debug("New state added for ", newState.ID) } - if listingID == "" || (!newState.Stored && !newState.Error) { + if listingID == "" || !newState.IsProcessed() { return } From 53eb4dc95631873b9ceecdb48e58190e16cf364c Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: Fri, 2 Dec 2022 12:00:48 +0900 Subject: [PATCH 5/5] remove code duplication in tests --- x-pack/filebeat/input/awss3/s3.go | 7 +- x-pack/filebeat/input/awss3/states.go | 6 - x-pack/filebeat/input/awss3/states_test.go | 130 +++++++-------------- 3 files changed, 46 insertions(+), 97 deletions(-) diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 4f71885a3ae..bb367c755b4 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -208,7 +208,12 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- continue } - p.states.Update(state, "") + // we have no previous state or the previous state + // is not stored: refresh the state + previousState := p.states.FindPrevious(state) + if previousState.IsEmpty() || !previousState.IsProcessed() { + p.states.Update(state, "") + } event := s3EventV2{} event.AWSRegion = p.region diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go index de100007d12..449219a867f 100644 --- a/x-pack/filebeat/input/awss3/states.go +++ b/x-pack/filebeat/input/awss3/states.go @@ -84,12 +84,6 @@ func (s *states) MustSkip(state state, store *statestore.Store) bool { return true } - // we have no previous state or the previous state - // is not stored: refresh the state - if previousState.IsEmpty() || !previousState.IsProcessed() { - s.Update(state, "") - } - return false } diff --git a/x-pack/filebeat/input/awss3/states_test.go b/x-pack/filebeat/input/awss3/states_test.go index 7893d669159..39dc4cf82e6 100644 --- a/x-pack/filebeat/input/awss3/states_test.go +++ b/x-pack/filebeat/input/awss3/states_test.go @@ -46,76 +46,14 @@ var inputCtx = v2.Context{ Cancelation: context.Background(), } -func TestStatesIsNew(t *testing.T) { - type stateTestCase struct { - states func() *states - state state - expected bool - } - lastModified := time.Date(2022, time.June, 30, 14, 13, 00, 0, time.UTC) - tests := map[string]stateTestCase{ - "with empty states": { - states: func() *states { - return newStates(inputCtx) - }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified), - expected: true, - }, - "not existing state": { - states: func() *states { - states := newStates(inputCtx) - states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") - return states - }, - state: newState("bucket1", "key1", "etag1", "listPrefix1", lastModified), - expected: true, - }, - "existing state": { - states: func() *states { - states := newStates(inputCtx) - states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") - return states - }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified), - expected: false, - }, - "with different etag": { - states: func() *states { - states := newStates(inputCtx) - states.Update(newState("bucket", "key", "etag1", "listPrefix", lastModified), "") - return states - }, - state: newState("bucket", "key", "etag2", "listPrefix", lastModified), - expected: true, - }, - "with different lastmodified": { - states: func() *states { - states := newStates(inputCtx) - states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") - return states - }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(1*time.Second)), - expected: true, - }, - } - - for name, test := range tests { - test := test - t.Run(name, func(t *testing.T) { - states := test.states() - isNew := states.IsNew(test.state) - assert.Equal(t, test.expected, isNew) - }) - } -} - -func TestMustSkip(t *testing.T) { +func TestStatesIsNewAndMustSkip(t *testing.T) { type stateTestCase struct { states func() *states state state mustBeNew bool persistentStoreKV map[string]interface{} - expected bool + expectedMustSkip bool + expectedIsNew bool } lastModified := time.Date(2022, time.June, 30, 14, 13, 00, 0, time.UTC) tests := map[string]stateTestCase{ @@ -123,8 +61,9 @@ func TestMustSkip(t *testing.T) { states: func() *states { return newStates(inputCtx) }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified), - expected: false, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + expectedMustSkip: false, + expectedIsNew: true, }, "not existing state": { states: func() *states { @@ -132,8 +71,9 @@ func TestMustSkip(t *testing.T) { states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") return states }, - state: newState("bucket1", "key1", "etag1", "listPrefix1", lastModified), - expected: false, + state: newState("bucket1", "key1", "etag1", "listPrefix1", lastModified), + expectedMustSkip: false, + expectedIsNew: true, }, "existing state": { states: func() *states { @@ -141,8 +81,9 @@ func TestMustSkip(t *testing.T) { states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") return states }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified), - expected: true, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + expectedMustSkip: true, + expectedIsNew: false, }, "with different etag": { states: func() *states { @@ -150,8 +91,9 @@ func TestMustSkip(t *testing.T) { states.Update(newState("bucket", "key", "etag1", "listPrefix", lastModified), "") return states }, - state: newState("bucket", "key", "etag2", "listPrefix", lastModified), - expected: false, + state: newState("bucket", "key", "etag2", "listPrefix", lastModified), + expectedMustSkip: false, + expectedIsNew: true, }, "with different lastmodified": { states: func() *states { @@ -159,8 +101,9 @@ func TestMustSkip(t *testing.T) { states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") return states }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(1*time.Second)), - expected: false, + state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(1*time.Second)), + expectedMustSkip: false, + expectedIsNew: true, }, "with stored state": { states: func() *states { @@ -170,9 +113,10 @@ func TestMustSkip(t *testing.T) { states.Update(aState, "") return states }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified), - mustBeNew: true, - expected: true, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + mustBeNew: true, + expectedMustSkip: true, + expectedIsNew: true, }, "with error state": { states: func() *states { @@ -182,9 +126,10 @@ func TestMustSkip(t *testing.T) { states.Update(aState, "") return states }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified), - mustBeNew: true, - expected: true, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + mustBeNew: true, + expectedMustSkip: true, + expectedIsNew: true, }, "before commit write": { states: func() *states { @@ -193,8 +138,9 @@ func TestMustSkip(t *testing.T) { persistentStoreKV: map[string]interface{}{ awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified}, }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(-1*time.Second)), - expected: true, + state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(-1*time.Second)), + expectedMustSkip: true, + expectedIsNew: true, }, "same commit write": { states: func() *states { @@ -203,8 +149,9 @@ func TestMustSkip(t *testing.T) { persistentStoreKV: map[string]interface{}{ awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified}, }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified), - expected: true, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + expectedMustSkip: true, + expectedIsNew: true, }, "after commit write": { states: func() *states { @@ -213,8 +160,9 @@ func TestMustSkip(t *testing.T) { persistentStoreKV: map[string]interface{}{ awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified}, }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(time.Second)), - expected: false, + state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(time.Second)), + expectedMustSkip: false, + expectedIsNew: true, }, } @@ -235,9 +183,11 @@ func TestMustSkip(t *testing.T) { test.state.LastModified = test.state.LastModified.Add(1 * time.Second) } - isNew := states.MustSkip(test.state, persistentStore) - assert.Equal(t, test.expected, isNew) - _ = persistentStore.Close() + isNew := states.IsNew(test.state) + assert.Equal(t, test.expectedIsNew, isNew) + + mustSkip := states.MustSkip(test.state, persistentStore) + assert.Equal(t, test.expectedMustSkip, mustSkip) }) } }