Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bug][filestream] - Fix filebeat GC cleanup bug #40258

Merged
merged 13 commits into from
Jul 17, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]
- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121]
- Implement Elastic Agent status and health reporting for Winlog Filebeat input. {pull}40163[40163]
- Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258]

*Heartbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ filebeat.inputs:

# Files for the modification data is older than clean_inactive the state from the registry is removed
# By default this is disabled.
#clean_inactive: 0
#clean_inactive: -1

# Removes the state for files which cannot be found on disk anymore immediately
#clean_removed: true
Expand Down
2 changes: 1 addition & 1 deletion filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ filebeat.inputs:

# Files for the modification data is older than clean_inactive the state from the registry is removed
# By default this is disabled.
#clean_inactive: 0
#clean_inactive: -1

# Removes the state for files which cannot be found on disk anymore immediately
#clean_removed: true
Expand Down
16 changes: 9 additions & 7 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ import (
type config struct {
Reader readerConfig `config:",inline"`

ID string `config:"id"`
Paths []string `config:"paths"`
Close closerConfig `config:"close"`
FileWatcher *conf.Namespace `config:"prospector"`
FileIdentity *conf.Namespace `config:"file_identity"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
ID string `config:"id"`
Paths []string `config:"paths"`
Close closerConfig `config:"close"`
FileWatcher *conf.Namespace `config:"prospector"`
FileIdentity *conf.Namespace `config:"file_identity"`

// -1 means that registry will never be cleaned
CleanInactive time.Duration `config:"clean_inactive" validate:"min=-1"`
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint32 `config:"harvester_limit" validate:"min=0"`
IgnoreOlder time.Duration `config:"ignore_older"`
Expand Down Expand Up @@ -98,7 +100,7 @@ func defaultConfig() config {
Reader: defaultReaderConfig(),
Paths: []string{},
Close: defaultCloserConfig(),
CleanInactive: 0,
CleanInactive: -1,
CleanRemoved: true,
HarvesterLimit: 0,
IgnoreOlder: 0,
Expand Down
9 changes: 5 additions & 4 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ func Plugin(log *logp.Logger, store loginp.StateStore) input.Plugin {
Info: "filestream input",
Doc: "The filestream input collects logs from the local filestream service",
Manager: &loginp.InputManager{
Logger: log,
StateStore: store,
Type: pluginName,
Configure: configure,
Logger: log,
StateStore: store,
Type: pluginName,
Configure: configure,
DefaultCleanTimeout: -1,
},
}
}
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/filestream/internal/input-logfile/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,7 @@ func checkCleanResource(started, now time.Time, resource *resource) bool {
reference = started
}

return reference.Add(ttl).Before(now) && resource.stored
// if ttl is negative, we never delete the entry
// else check for time elapsed
return ttl >= 0 && reference.Add(ttl).Before(now) && resource.stored
}
20 changes: 20 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/clean_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,24 @@ func TestGCStore(t *testing.T) {
want := map[string]state{}
checkEqualStoreState(t, want, backend.snapshot())
})

t.Run("state never removed with ttl=-1", func(t *testing.T) {

// keep started as a large value
started := time.Now().Add(-1 * time.Hour * 24 * 356) // cleanup process is running for a while already

initState := map[string]state{
"test::key": {
TTL: -1,
Updated: started,
},
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)
checkEqualStoreState(t, initState, backend.snapshot())
})
}
3 changes: 0 additions & 3 deletions filebeat/input/filestream/internal/input-logfile/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ type StateStore interface {

func (cim *InputManager) init() error {
cim.initOnce.Do(func() {
if cim.DefaultCleanTimeout <= 0 {
cim.DefaultCleanTimeout = 30 * time.Minute
}

log := cim.Logger.With("input_type", cim.Type)

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2541,7 +2541,7 @@ filebeat.inputs:

# Files for the modification data is older than clean_inactive the state from the registry is removed
# By default this is disabled.
#clean_inactive: 0
#clean_inactive: -1

# Removes the state for files which cannot be found on disk anymore immediately
#clean_removed: true
Expand Down
Loading