Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix file handle leak when handling errors in filestream #37973

Merged
merged 3 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ fields added to events containing the Beats version. {pull}37553[37553]
- Fix TCP/UDP metric queue length parsing base. {pull}37714[37714]
- Update github.com/lestrrat-go/jwx dependency. {pull}37799[37799]
- [threatintel] MISP pagination fixes {pull}37898[37898]
- Fix file handle leak when handling errors in filestream {pull}37973[37973]

*Heartbeat*

Expand Down
11 changes: 6 additions & 5 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo
return nil, err
}

ok := false // used for cleanup
defer cleanup.IfNot(&ok, cleanup.IgnoreError(f.Close))

log.Debug("newLogFileReader with config.MaxBytes:", inp.readerConfig.MaxBytes)

// if the file is archived, it means that it is not going to be updated in the future
Expand All @@ -203,7 +206,6 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo

dbgReader, err := debug.AppendReaders(logReader)
if err != nil {
f.Close()
return nil, err
}

Expand All @@ -221,7 +223,6 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo
MaxBytes: encReaderMaxBytes,
})
if err != nil {
f.Close()
return nil, err
}

Expand All @@ -233,6 +234,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo

r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)

ok = true // no need to close the file
return r, nil
}

Expand All @@ -252,11 +254,11 @@ func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*o
return nil, nil, fmt.Errorf("failed to open file %s, named pipes are not supported", fi.Name())
}

ok := false
f, err := file.ReadOpen(path)
if err != nil {
return nil, nil, fmt.Errorf("failed opening %s: %w", path, err)
}
ok := false
defer cleanup.IfNot(&ok, cleanup.IgnoreError(f.Close))

fi, err = f.Stat()
Expand All @@ -280,14 +282,13 @@ func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*o

encoding, err := inp.encodingFactory(f)
if err != nil {
f.Close()
if errors.Is(err, transform.ErrShortSrc) {
return nil, nil, fmt.Errorf("initialising encoding for '%v' failed due to file being too short", f)
}
return nil, nil, fmt.Errorf("initialising encoding for '%v' failed: %w", f, err)
}
ok = true

ok = true // no need to close the file
return f, encoding, nil
}

Expand Down
Loading