Fix clean_inactive for Filestream input (#38632)
* Add tests for store GC

* Add changelog

* Run mage check and update all necessary files

* Improve changelog entry

* Fix `clean_inactive` for Filestream input

The `clean_inactive` parameter was being parsed with the wrong key. It
is parsed by an anonymous struct in `input-logfile/manager.go`, where it
was read and used as `CleanTimeout` (`clean_timeout`). This
`CleanTimeout` setting has exactly the same effect as the
`clean_inactive` setting described in our documentation.

This commit fixes the bug by renaming `clean_timeout` to
`clean_inactive` so that the configured value takes effect.

* Add tests for `clean_inactive` and fix documentation

* Add changelog

* Update test config

* Add licence headers and build tags

* PR improvements

- Rename `CleanTimeout` to `CleanInactive`
- Remove commented out code

* Fix rebase conflicts

* Fix lint errors

* Jenkins test this PR
belimawr authored Apr 12, 2024
1 parent 9304106 commit ccd7b13
Showing 7 changed files with 126 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -102,6 +102,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Fix indexing failures by re-enabling event normalisation in netflow input. {issue}38703[38703] {pull}38780[38780]
 - Fix handling of truncated files in Filestream {issue}38070[38070] {pull}38416[38416]
 - Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556]
+- Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488]
+
 
 *Heartbeat*
 
13 changes: 7 additions & 6 deletions filebeat/docs/inputs/input-filestream-file-options.asciidoc
@@ -440,12 +440,12 @@ WARNING: Only use this option if you understand that data loss is a potential
 side effect.
 
 When this option is enabled, {beatname_uc} removes the state of a file after the
-specified period of inactivity has elapsed. The state can only be removed if
+specified period of inactivity has elapsed. The state can only be removed if
 the file is already ignored by {beatname_uc} (the file is older than
 `ignore_older`). The `clean_inactive` setting must be greater than `ignore_older +
 prospector.scanner.check_interval` to make sure that no states are removed while a file is still
 being harvested. Otherwise, the setting could result in {beatname_uc} resending
-the full content constantly because `clean_inactive` removes state for files
+the full content constantly because `clean_inactive` removes state for files
 that are still detected by {beatname_uc}. If a file is updated or appears
 again, the file is read from the beginning.
 
@@ -461,10 +461,11 @@ for `clean_inactive` starts at 0 again.
 
 TIP: During testing, you might notice that the registry contains state entries
 that should be removed based on the `clean_inactive` setting. This happens
-because {beatname_uc} doesn't remove the entries until it opens the registry
-again to read a different file. If you are testing the `clean_inactive` setting,
-make sure {beatname_uc} is configured to read from more than one file, or the
-file state will never be removed from the registry.
+because {beatname_uc} doesn't remove the entries until the registry garbage
+collector (GC) runs. Once the TTL for a state has expired, there are no active
+harvesters for the file, and the registry GC runs, then, and only then,
+the state is removed from memory and an `op: remove` is added to the registry
+log file.
 
 [float]
 [id="{beatname_lc}-input-{type}-clean-removed"]
2 changes: 1 addition & 1 deletion filebeat/input/filestream/internal/input-logfile/clean.go
@@ -36,7 +36,7 @@ type cleaner struct {
 // run starts a loop that tries to clean entries from the registry.
 // The cleaner locks the store, such that no new states can be created
 // during the cleanup phase. Only resources that are finished and whose TTL
-// (clean_timeout setting) has expired will be removed.
+// (clean_inactive setting) has expired will be removed.
 //
 // Resources are considered "Finished" if they do not have a current owner (active input), and
 // if they have no pending updates that still need to be written to the registry file after associated
6 changes: 3 additions & 3 deletions filebeat/input/filestream/internal/input-logfile/manager.go
@@ -165,9 +165,9 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
 
 	settings := struct {
 		ID             string        `config:"id"`
-		CleanTimeout   time.Duration `config:"clean_timeout"`
+		CleanInactive  time.Duration `config:"clean_inactive"`
 		HarvesterLimit uint64        `config:"harvester_limit"`
-	}{CleanTimeout: cim.DefaultCleanTimeout}
+	}{CleanInactive: cim.DefaultCleanTimeout}
 	if err := config.Unpack(&settings); err != nil {
 		return nil, err
 	}
@@ -230,7 +230,7 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
 		prospector:       prospector,
 		harvester:        harvester,
 		sourceIdentifier: sourceIdentifier,
-		cleanTimeout:     settings.CleanTimeout,
+		cleanTimeout:     settings.CleanInactive,
 		harvesterLimit:   settings.HarvesterLimit,
 	}, nil
 }
3 changes: 2 additions & 1 deletion filebeat/input/v2/input-cursor/clean.go
@@ -34,7 +34,7 @@ type cleaner struct {
 // run starts a loop that tries to clean entries from the registry.
 // The cleaner locks the store, such that no new states can be created
 // during the cleanup phase. Only resources that are finished and whose TTL
-// (clean_timeout setting) has expired will be removed.
+// (clean_inactive setting) has expired will be removed.
 //
 // Resources are considered "Finished" if they do not have a current owner (active input), and
 // if they have no pending updates that still need to be written to the registry file after associated
@@ -44,6 +44,7 @@ type cleaner struct {
 // once the last event has been ACKed.
 func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Duration) {
 	started := time.Now()
+	//nolint: errcheck // gcStore does not return an error
 	timed.Periodic(canceler, interval, func() error {
 		gcStore(c.log, started, store)
 		return nil
8 changes: 4 additions & 4 deletions filebeat/input/v2/input-cursor/manager.go
@@ -153,9 +153,9 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
 	}
 
 	settings := struct {
-		ID           string        `config:"id"`
-		CleanTimeout time.Duration `config:"clean_timeout"`
-	}{ID: "", CleanTimeout: cim.DefaultCleanTimeout}
+		ID            string        `config:"id"`
+		CleanInactive time.Duration `config:"clean_inactive"`
+	}{ID: "", CleanInactive: cim.DefaultCleanTimeout}
 	if err := config.Unpack(&settings); err != nil {
 		return nil, err
 	}
@@ -176,7 +176,7 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
 		userID:       settings.ID,
 		sources:      sources,
 		input:        inp,
-		cleanTimeout: settings.CleanTimeout,
+		cleanTimeout: settings.CleanInactive,
 	}, nil
 }
 
107 changes: 107 additions & 0 deletions filebeat/tests/integration/filestream_test.go
@@ -0,0 +1,107 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build integration

package integration

import (
	"fmt"
	"path"
	"path/filepath"
	"testing"
	"time"

	"github.com/elastic/beats/v7/libbeat/tests/integration"
)

var filestreamCleanInactiveCfg = `
filebeat.inputs:
  - type: filestream
    id: "test-clean-inactive"
    paths:
      - %s
    clean_inactive: 3s
    ignore_older: 2s
    close.on_state_change.inactive: 1s
    prospector.scanner.check_interval: 1s
filebeat.registry:
  cleanup_interval: 5s
  flush: 1s
queue.mem:
  events: 32
  flush.min_events: 8
  flush.timeout: 0.1s
path.home: %s
output.file:
  path: ${path.home}
  filename: "output-file"
  rotate_every_kb: 10000
logging:
  level: debug
  selectors:
    - input
    - input.filestream
  metrics:
    enabled: false
`

func TestFilestreamCleanInactive(t *testing.T) {
	filebeat := integration.NewBeat(
		t,
		"filebeat",
		"../../filebeat.test",
	)
	tempDir := filebeat.TempDir()

	// 1. Generate the log file path, but do not write data to it
	logFilePath := path.Join(tempDir, "log.log")

	// 2. Write configuration file and start Filebeat
	filebeat.WriteConfigFile(fmt.Sprintf(filestreamCleanInactiveCfg, logFilePath, tempDir))
	filebeat.Start()

	// 3. Create the log file
	integration.GenerateLogFile(t, logFilePath, 10, false)

	// 4. Wait for Filebeat to start scanning for files
	filebeat.WaitForLogs(
		fmt.Sprintf("A new file %s has been found", logFilePath),
		10*time.Second,
		"Filebeat did not start looking for files to ingest")

	filebeat.WaitForLogs(
		fmt.Sprintf("Reader was closed. Closing. Path='%s", logFilePath),
		10*time.Second, "Filebeat did not close the file")

	// 5. Now that the reader has been closed, nothing is holding the state
	// of the file, so once the TTL of its state expires and the store GC runs,
	// it will be removed from the registry.
	// Wait for the log message stating 1 entry has been removed from the registry
	filebeat.WaitForLogs("1 entries removed", 20*time.Second, "entry was not removed from registry")

	// 6. Then assert it has been removed from the registry
	registryFile := filepath.Join(filebeat.TempDir(), "data", "registry", "filebeat", "log.json")
	filebeat.WaitFileContains(registryFile, `"op":"remove"`, time.Second)
}
