Skip to content

Commit

Permalink
Refactoring of filestream input backend (elastic#22448) (elastic#22866)
Browse files Browse the repository at this point in the history
## What does this PR do?

This PR refactors parts of the filestream input.

A new interface named `resourceLocker` is added to let the `HarvesterGroup` lock resources from the store with fewer hops.

Another new interface is added for cleaning up the internal states before a `loginp.Prospector` is started. These cleaner functions can clean entries of removed files and update IDs if the `file_identity` configuration is changed.

The bookkeeping of readers is now threadsafe.

From now on `source.Name()` returns only a suffix of the state ID. The prefix `{input_type}::{user_id}` is added by a new interface, `sourceIder` (I am open to better names).

```golang
// sourceIder generates an ID for a Source.
type sourceIder interface {
    // ID returns the full ID of the given Source.
    ID(Source) string
}
```

A special store is added as well which accepts `Source` interface as parameters and gets the full ID from a `sourceIder` instance. Only the interface `StateMetadataUpdater` is now accepted by `prospector.Run`.

```golang
 // StateMetadataUpdater updates and removes the state information for a given Source.
type StateMetadataUpdater interface {
    // FindCursorMeta retrieves and unpacks the cursor metadata of a Source.
    FindCursorMeta(s Source, v interface{}) error
    // UpdateMetadata updates the source metadata of a registry entry for a given Source.
    UpdateMetadata(s Source, v interface{}) error
    // Remove marks the state of a given Source for deletion.
    Remove(s Source) error
}
```

An important difference between `sourceStore` and `store` is that the former only deals with `Source` parameters. When you use `store`, you need to specify the full ID of an entry in the format `{input_type}::{user_id}::{source_id}`.

## Why is it important?

Cleaner architecture, responsibilities are separated even more. Prospector can stop/start readers out of band.

(cherry picked from commit f8f1be7)
  • Loading branch information
kvch authored Dec 3, 2020
1 parent d70e0be commit 2fae7f6
Show file tree
Hide file tree
Showing 15 changed files with 727 additions and 234 deletions.
2 changes: 2 additions & 0 deletions filebeat/docs/inputs/input-filestream-file-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ instead and let {beatname_uc} pick up the file again.
Different `file_identity` methods can be configured to suit the
environment where you are collecting log messages.

WARNING: Changing `file_identity` methods between runs may result in
duplicated events in the output.

*`native`*:: The default behaviour of {beatname_uc} is to differentiate
between files using their inodes and device ids.
Expand Down
3 changes: 3 additions & 0 deletions filebeat/docs/inputs/input-filestream.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ values might change during the lifetime of the file. If this happens
of the file. To solve this problem you can configure the `file_identity` option. Possible
values besides the default `inode_deviceid` are `path` and `inode_marker`.

WARNING: Changing `file_identity` methods between runs may result in
duplicated events in the output.

Selecting `path` instructs {beatname_uc} to identify files based on their
paths. This is a quick way to avoid rereading files if inode and device ids
might change. However, keep in mind if the files are rotated (renamed), they
Expand Down
4 changes: 4 additions & 0 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ func (w *fileWatcher) Event() loginp.FSEvent {
return <-w.events
}

// GetFiles returns the files reported by the watcher's scanner by
// delegating to the scanner's own GetFiles.
// NOTE(review): the map is presumably keyed by file path — confirm against the scanner.
func (w *fileWatcher) GetFiles() map[string]os.FileInfo {
return w.scanner.GetFiles()
}

type fileScannerConfig struct {
ExcludedFiles []match.Matcher `config:"exclude_files"`
Symlinks bool `config:"symlinks"`
Expand Down
26 changes: 24 additions & 2 deletions filebeat/input/filestream/identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ import (
"github.com/elastic/beats/v7/libbeat/common/file"
)

type identifierFeature uint8

const (
// trackRename is a feature of an identifier which changes
// IDs if a source is renamed.
trackRename identifierFeature = iota

nativeName = "native"
pathName = "path"
inodeMarkerName = "inode_marker"
Expand All @@ -48,6 +54,7 @@ type identifierFactory func(*common.Config) (fileIdentifier, error)
type fileIdentifier interface {
// GetSource builds the fileSource that belongs to a file system event.
GetSource(loginp.FSEvent) fileSource
// Name returns the name of the identifier.
Name() string
// Supports reports whether the identifier provides the given feature.
Supports(identifierFeature) bool
}

// fileSource implements the Source interface
Expand Down Expand Up @@ -96,7 +103,7 @@ func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource {
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
name: pluginName + identitySep + i.name + identitySep + file.GetOSState(e.Info).String(),
name: i.name + identitySep + file.GetOSState(e.Info).String(),
identifierGenerator: i.name,
}
}
Expand All @@ -105,6 +112,15 @@ func (i *inodeDeviceIdentifier) Name() string {
return i.name
}

// Supports reports whether the identifier provides the given feature.
// trackRename is the only feature it supports; every other feature
// yields false.
func (i *inodeDeviceIdentifier) Supports(f identifierFeature) bool {
	return f == trackRename
}

type pathIdentifier struct {
name string
}
Expand All @@ -124,7 +140,7 @@ func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource {
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
name: pluginName + identitySep + p.name + identitySep + path,
name: p.name + identitySep + path,
identifierGenerator: p.name,
}
}
Expand All @@ -133,6 +149,10 @@ func (p *pathIdentifier) Name() string {
return p.name
}

// Supports reports false for every feature, including trackRename.
func (p *pathIdentifier) Supports(f identifierFeature) bool {
return false
}

// MockIdentifier is used for testing.
type MockIdentifier struct{}

Expand All @@ -141,3 +161,5 @@ func (m *MockIdentifier) GetSource(e loginp.FSEvent) fileSource {
}

// Name returns the fixed name "mock".
func (m *MockIdentifier) Name() string { return "mock" }

// Supports reports false for every feature.
func (m *MockIdentifier) Supports(_ identifierFeature) bool { return false }
11 changes: 10 additions & 1 deletion filebeat/input/filestream/identifier_inode_deviceid.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,20 @@ func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource {
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
name: fmt.Sprintf("%s%s%s-%s", i.name, identitySep, osstate.InodeString(), i.markerContents()),
name: i.name + identitySep + osstate.InodeString() + "-" + i.markerContents(),
identifierGenerator: i.name,
}
}

// Name returns the name of the identifier.
func (i *inodeMarkerIdentifier) Name() string {
return i.name
}

// Supports reports whether the identifier provides the given feature.
// trackRename is the only feature it supports; every other feature
// yields false.
func (i *inodeMarkerIdentifier) Supports(f identifierFeature) bool {
	return f == trackRename
}
48 changes: 33 additions & 15 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ import (
const pluginName = "filestream"

// state is the cursor state kept for a source of the filestream input.
type state struct {
// Offset is the position handed to open when a reader is created for
// the source.
// NOTE(review): presumably a byte offset into the file — confirm in openFile.
Offset int64 `json:"offset" struct:"offset"`
}

type fileMeta struct {
Source string `json:"source" struct:"source"`
Offset int64 `json:"offset" struct:"offset"`
IdentifierName string `json:"identifier_name" struct:"identifier_name"`
}

Expand Down Expand Up @@ -84,22 +87,30 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
return nil, nil, err
}

prospector, err := newFileProspector(
config.Paths,
config.IgnoreOlder,
config.FileWatcher,
config.FileIdentity,
)
filewatcher, err := newFileWatcher(config.Paths, config.FileWatcher)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("error while creating filewatcher %v", err)
}

identifier, err := newFileIdentifier(config.FileIdentity)
if err != nil {
return nil, nil, fmt.Errorf("error while creating file identifier: %v", err)
}

encodingFactory, ok := encoding.FindEncoding(config.Encoding)
if !ok || encodingFactory == nil {
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Encoding)
}

return prospector, &filestream{
prospector := &fileProspector{
filewatcher: filewatcher,
identifier: identifier,
ignoreOlder: config.IgnoreOlder,
cleanRemoved: config.CleanRemoved,
stateChangeCloser: config.Close.OnStateChange,
}

filestream := &filestream{
readerConfig: config.readerConfig,
bufferSize: config.BufferSize,
encodingFactory: encodingFactory,
Expand All @@ -108,13 +119,20 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
includeLines: config.IncludeLines,
maxBytes: config.MaxBytes,
closerConfig: config.Close,
}, nil
}

return prospector, filestream, nil
}

// Name returns the name of the input, which is the pluginName constant.
func (inp *filestream) Name() string { return pluginName }

func (inp *filestream) Test(src loginp.Source, ctx input.TestContext) error {
reader, err := inp.open(ctx.Logger, ctx.Cancelation, state{})
fs, ok := src.(fileSource)
if !ok {
return fmt.Errorf("not file source")
}

reader, err := inp.open(ctx.Logger, ctx.Cancelation, fs.newPath, 0)
if err != nil {
return err
}
Expand All @@ -135,7 +153,7 @@ func (inp *filestream) Run(
log := ctx.Logger.With("path", fs.newPath).With("state-id", src.Name())
state := initState(log, cursor, fs)

r, err := inp.open(log, ctx.Cancelation, state)
r, err := inp.open(log, ctx.Cancelation, fs.newPath, state.Offset)
if err != nil {
log.Errorf("File could not be opened for reading: %v", err)
return err
Expand All @@ -154,7 +172,7 @@ func (inp *filestream) Run(
}

func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
state := state{Source: s.newPath, IdentifierName: s.identifierGenerator}
var state state
if c.IsNew() {
return state
}
Expand All @@ -167,8 +185,8 @@ func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
return state
}

func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, s state) (reader.Reader, error) {
f, err := inp.openFile(s.Source, s.Offset)
func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path string, offset int64) (reader.Reader, error) {
f, err := inp.openFile(path, offset)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type FSScanner interface {

// FSWatcher returns file events of the monitored files.
type FSWatcher interface {
FSScanner

// Run is the event loop which watchers for changes
// in the file system and returns events based on the data.
Run(unison.Canceler)
Expand Down
Loading

0 comments on commit 2fae7f6

Please sign in to comment.