diff --git a/filebeat/docs/inputs/input-filestream-file-options.asciidoc b/filebeat/docs/inputs/input-filestream-file-options.asciidoc index 768960323f9..b0ced1eab5b 100644 --- a/filebeat/docs/inputs/input-filestream-file-options.asciidoc +++ b/filebeat/docs/inputs/input-filestream-file-options.asciidoc @@ -358,6 +358,8 @@ instead and let {beatname_uc} pick up the file again. Different `file_identity` methods can be configured to suit the environment where you are collecting log messages. +WARNING: Changing `file_identity` methods between runs may result in +duplicated events in the output. *`native`*:: The default behaviour of {beatname_uc} is to differentiate between files using their inodes and device ids. diff --git a/filebeat/docs/inputs/input-filestream.asciidoc b/filebeat/docs/inputs/input-filestream.asciidoc index 0a02a865465..be121a4fd7e 100644 --- a/filebeat/docs/inputs/input-filestream.asciidoc +++ b/filebeat/docs/inputs/input-filestream.asciidoc @@ -74,6 +74,9 @@ values might change during the lifetime of the file. If this happens of the file. To solve this problem you can configure `file_identity` option. Possible values besides the default `inode_deviceid` are `path` and `inode_marker`. +WARNING: Changing `file_identity` methods between runs may result in +duplicated events in the output. + Selecting `path` instructs {beatname_uc} to identify files based on their paths. This is a quick way to avoid rereading files if inode and device ids might change. 
However, keep in mind if the files are rotated (renamed), they diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go index e988fb3cee9..8c285c8a4ba 100644 --- a/filebeat/input/filestream/fswatch.go +++ b/filebeat/input/filestream/fswatch.go @@ -211,6 +211,10 @@ func (w *fileWatcher) Event() loginp.FSEvent { return <-w.events } +func (w *fileWatcher) GetFiles() map[string]os.FileInfo { + return w.scanner.GetFiles() +} + type fileScannerConfig struct { ExcludedFiles []match.Matcher `config:"exclude_files"` Symlinks bool `config:"symlinks"` diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go index 63883383a1c..331b9cedc9f 100644 --- a/filebeat/input/filestream/identifier.go +++ b/filebeat/input/filestream/identifier.go @@ -26,7 +26,13 @@ import ( "github.com/elastic/beats/v7/libbeat/common/file" ) +type identifierFeature uint8 + const ( + // trackRename is a feature of an identifier which changes + // IDs if a source is renamed. 
+ trackRename identifierFeature = iota + nativeName = "native" pathName = "path" inodeMarkerName = "inode_marker" @@ -48,6 +54,7 @@ type identifierFactory func(*common.Config) (fileIdentifier, error) type fileIdentifier interface { GetSource(loginp.FSEvent) fileSource Name() string + Supports(identifierFeature) bool } // fileSource implements the Source interface @@ -96,7 +103,7 @@ func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource { info: e.Info, newPath: e.NewPath, oldPath: e.OldPath, - name: pluginName + identitySep + i.name + identitySep + file.GetOSState(e.Info).String(), + name: i.name + identitySep + file.GetOSState(e.Info).String(), identifierGenerator: i.name, } } @@ -105,6 +112,15 @@ func (i *inodeDeviceIdentifier) Name() string { return i.name } +func (i *inodeDeviceIdentifier) Supports(f identifierFeature) bool { + switch f { + case trackRename: + return true + default: + } + return false +} + type pathIdentifier struct { name string } @@ -124,7 +140,7 @@ func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource { info: e.Info, newPath: e.NewPath, oldPath: e.OldPath, - name: pluginName + identitySep + p.name + identitySep + path, + name: p.name + identitySep + path, identifierGenerator: p.name, } } @@ -133,6 +149,10 @@ func (p *pathIdentifier) Name() string { return p.name } +func (p *pathIdentifier) Supports(f identifierFeature) bool { + return false +} + // mockIdentifier is used for testing type MockIdentifier struct{} @@ -141,3 +161,5 @@ func (m *MockIdentifier) GetSource(e loginp.FSEvent) fileSource { } func (m *MockIdentifier) Name() string { return "mock" } + +func (m *MockIdentifier) Supports(_ identifierFeature) bool { return false } diff --git a/filebeat/input/filestream/identifier_inode_deviceid.go b/filebeat/input/filestream/identifier_inode_deviceid.go index 25254d97fdc..459ae90348b 100644 --- a/filebeat/input/filestream/identifier_inode_deviceid.go +++ b/filebeat/input/filestream/identifier_inode_deviceid.go @@ 
-98,7 +98,7 @@ func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource { info: e.Info, newPath: e.NewPath, oldPath: e.OldPath, - name: fmt.Sprintf("%s%s%s-%s", i.name, identitySep, osstate.InodeString(), i.markerContents()), + name: i.name + identitySep + osstate.InodeString() + "-" + i.markerContents(), identifierGenerator: i.name, } } @@ -106,3 +106,12 @@ func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource { func (i *inodeMarkerIdentifier) Name() string { return i.name } + +func (i *inodeMarkerIdentifier) Supports(f identifierFeature) bool { + switch f { + case trackRename: + return true + default: + } + return false +} diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 9f715d1183e..7e253bcc9ec 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -41,8 +41,11 @@ import ( const pluginName = "filestream" type state struct { + Offset int64 `json:"offset" struct:"offset"` +} + +type fileMeta struct { Source string `json:"source" struct:"source"` - Offset int64 `json:"offset" struct:"offset"` IdentifierName string `json:"identifier_name" struct:"identifier_name"` } @@ -84,14 +87,14 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) return nil, nil, err } - prospector, err := newFileProspector( - config.Paths, - config.IgnoreOlder, - config.FileWatcher, - config.FileIdentity, - ) + filewatcher, err := newFileWatcher(config.Paths, config.FileWatcher) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("error while creating filewatcher %v", err) + } + + identifier, err := newFileIdentifier(config.FileIdentity) + if err != nil { + return nil, nil, fmt.Errorf("error while creating file identifier: %v", err) } encodingFactory, ok := encoding.FindEncoding(config.Encoding) @@ -99,7 +102,15 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) return nil, nil, fmt.Errorf("unknown 
encoding('%v')", config.Encoding) } - return prospector, &filestream{ + prospector := &fileProspector{ + filewatcher: filewatcher, + identifier: identifier, + ignoreOlder: config.IgnoreOlder, + cleanRemoved: config.CleanRemoved, + stateChangeCloser: config.Close.OnStateChange, + } + + filestream := &filestream{ readerConfig: config.readerConfig, bufferSize: config.BufferSize, encodingFactory: encodingFactory, @@ -108,13 +119,20 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) includeLines: config.IncludeLines, maxBytes: config.MaxBytes, closerConfig: config.Close, - }, nil + } + + return prospector, filestream, nil } func (inp *filestream) Name() string { return pluginName } func (inp *filestream) Test(src loginp.Source, ctx input.TestContext) error { - reader, err := inp.open(ctx.Logger, ctx.Cancelation, state{}) + fs, ok := src.(fileSource) + if !ok { + return fmt.Errorf("not file source") + } + + reader, err := inp.open(ctx.Logger, ctx.Cancelation, fs.newPath, 0) if err != nil { return err } @@ -135,7 +153,7 @@ func (inp *filestream) Run( log := ctx.Logger.With("path", fs.newPath).With("state-id", src.Name()) state := initState(log, cursor, fs) - r, err := inp.open(log, ctx.Cancelation, state) + r, err := inp.open(log, ctx.Cancelation, fs.newPath, state.Offset) if err != nil { log.Errorf("File could not be opened for reading: %v", err) return err @@ -154,7 +172,7 @@ func (inp *filestream) Run( } func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state { - state := state{Source: s.newPath, IdentifierName: s.identifierGenerator} + var state state if c.IsNew() { return state } @@ -167,8 +185,8 @@ func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state { return state } -func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, s state) (reader.Reader, error) { - f, err := inp.openFile(s.Source, s.Offset) +func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path string, 
offset int64) (reader.Reader, error) { + f, err := inp.openFile(path, offset) if err != nil { return nil, err } diff --git a/filebeat/input/filestream/internal/input-logfile/fswatch.go b/filebeat/input/filestream/internal/input-logfile/fswatch.go index 685b54253a4..eb080bad292 100644 --- a/filebeat/input/filestream/internal/input-logfile/fswatch.go +++ b/filebeat/input/filestream/internal/input-logfile/fswatch.go @@ -57,6 +57,8 @@ type FSScanner interface { // FSWatcher returns file events of the monitored files. type FSWatcher interface { + FSScanner + // Run is the event loop which watchers for changes // in the file system and returns events based on the data. Run(unison.Canceler) diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go index 3c7573ad460..72635c194f4 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -21,10 +21,13 @@ import ( "context" "fmt" "runtime/debug" + "sync" "time" input "github.com/elastic/beats/v7/filebeat/input/v2" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/go-concert/ctxtool" "github.com/elastic/go-concert/unison" ) @@ -41,15 +44,63 @@ type Harvester interface { Run(input.Context, Source, Cursor, Publisher) error } +type readerGroup struct { + mu sync.Mutex + table map[string]context.CancelFunc +} + +func newReaderGroup() *readerGroup { + return &readerGroup{ + table: make(map[string]context.CancelFunc), + } +} + +// newContext creates a new context, cancel function and associates it with the given id within +// the reader group. Using the cancel function does not remove the association. +// An error is returned if the id is already associated with a context. The cancel +// function is nil in that case and must not be called.
+// +// The context will be automatically cancelled once the ID is removed from the group. Calling `cancel` is optional. +func (r *readerGroup) newContext(id string, cancelation v2.Canceler) (context.Context, context.CancelFunc, error) { + r.mu.Lock() + defer r.mu.Unlock() + + if _, ok := r.table[id]; ok { + return nil, nil, fmt.Errorf("harvester is already running for file") + } + + ctx, cancel := context.WithCancel(ctxtool.FromCanceller(cancelation)) + + r.table[id] = cancel + return ctx, cancel, nil +} + +func (r *readerGroup) remove(id string) { + r.mu.Lock() + defer r.mu.Unlock() + + cancel, ok := r.table[id] + if !ok { + return + } + + cancel() + delete(r.table, id) +} + // HarvesterGroup is responsible for running the // Harvesters started by the Prospector. type HarvesterGroup interface { - Run(input.Context, Source) error + // Start starts a Harvester and adds it to the readers list. + Start(input.Context, Source) + // Stop cancels the reader of a given Source. + Stop(Source) + // StopGroup cancels all running Harvesters. + StopGroup() error } type defaultHarvesterGroup struct { - manager *InputManager - readers map[string]context.CancelFunc + readers *readerGroup pipeline beat.PipelineConnector harvester Harvester cleanTimeout time.Duration @@ -57,70 +108,92 @@ type defaultHarvesterGroup struct { tg unison.TaskGroup } -// Run starts the Harvester for a Source. 
-func (hg *defaultHarvesterGroup) Run(ctx input.Context, s Source) error { - log := ctx.Logger.With("source", s.Name()) - log.Debug("Starting harvester for file") - - harvesterCtx, cancelHarvester := context.WithCancel(ctxtool.FromCanceller(ctx.Cancelation)) - ctx.Cancelation = harvesterCtx - - resource, err := hg.manager.lock(ctx, s.Name()) - if err != nil { - cancelHarvester() - return err - } - - if _, ok := hg.readers[s.Name()]; ok { - cancelHarvester() - log.Debug("A harvester is already running for file") - return nil - } - hg.readers[s.Name()] = cancelHarvester - - hg.store.UpdateTTL(resource, hg.cleanTimeout) - - client, err := hg.pipeline.ConnectWith(beat.ClientConfig{ - CloseRef: ctx.Cancelation, - ACKHandler: newInputACKHandler(ctx.Logger), - }) - if err != nil { - cancelHarvester() - return err - } - - cursor := makeCursor(hg.store, resource) - publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor} +// Start starts the Harvester for a Source. It does not block. 
+func (hg *defaultHarvesterGroup) Start(ctx input.Context, s Source) { + sourceName := s.Name() - go func(cancel context.CancelFunc) { - defer client.Close() - defer log.Debug("Stopped harvester for file") - defer cancel() - defer releaseResource(resource) - defer delete(hg.readers, s.Name()) + ctx.Logger = ctx.Logger.With("source", sourceName) + ctx.Logger.Debug("Starting harvester for file") + hg.tg.Go(func(canceler unison.Canceler) error { defer func() { if v := recover(); v != nil { err := fmt.Errorf("harvester panic with: %+v\n%s", v, debug.Stack()) ctx.Logger.Errorf("Harvester crashed with: %+v", err) } }() + defer ctx.Logger.Debug("Stopped harvester for file") - err := hg.harvester.Run(ctx, s, cursor, publisher) + harvesterCtx, cancelHarvester, err := hg.readers.newContext(sourceName, canceler) if err != nil { - log.Errorf("Harvester stopped: %v", err) + return fmt.Errorf("error while adding new reader to the bookkeeper %v", err) } - }(cancelHarvester) - return nil + ctx.Cancelation = harvesterCtx + defer cancelHarvester() + defer hg.readers.remove(sourceName) + + resource, err := lock(ctx, hg.store, sourceName) + if err != nil { + return fmt.Errorf("error while locking resource: %v", err) + } + defer releaseResource(resource) + + client, err := hg.pipeline.ConnectWith(beat.ClientConfig{ + CloseRef: ctx.Cancelation, + ACKHandler: newInputACKHandler(ctx.Logger), + }) + if err != nil { + return fmt.Errorf("error while connecting to output with pipeline: %v", err) + } + defer client.Close() + + hg.store.UpdateTTL(resource, hg.cleanTimeout) + cursor := makeCursor(hg.store, resource) + publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor} + + err = hg.harvester.Run(ctx, s, cursor, publisher) + if err != nil && err != context.Canceled { + return fmt.Errorf("error while running harvester: %v", err) + } + return nil + }) } -// Cancel stops the running Harvester for a given Source. 
-func (hg *defaultHarvesterGroup) Cancel(s Source) error { - if cancel, ok := hg.readers[s.Name()]; ok { - cancel() +// Stop stops the running Harvester for a given Source. +func (hg *defaultHarvesterGroup) Stop(s Source) { + hg.tg.Go(func(_ unison.Canceler) error { + hg.readers.remove(s.Name()) return nil + }) +} + +// StopGroup stops all running Harvesters. +func (hg *defaultHarvesterGroup) StopGroup() error { + return hg.tg.Stop() +} + +// lock locks a key for exclusive access and returns a resource that can be used to modify +// the cursor state and unlock the key. +func lock(ctx input.Context, store *store, key string) (*resource, error) { + resource := store.Get(key) + err := lockResource(ctx.Logger, resource, ctx.Cancelation) + if err != nil { + resource.Release() + return nil, err } - return fmt.Errorf("no such harvester %s", s.Name()) + return resource, nil +} + +func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error { + if !resource.lock.TryLock() { + log.Infof("Resource '%v' currently in use, waiting...", resource.key) + err := resource.lock.LockContext(canceler) + if err != nil { + log.Infof("Input for resource '%v' has been stopped while waiting", resource.key) + return err + } + } + return nil } func releaseResource(resource *resource) { diff --git a/filebeat/input/filestream/internal/input-logfile/input.go b/filebeat/input/filestream/internal/input-logfile/input.go index 11092479cf3..41219c88659 100644 --- a/filebeat/input/filestream/internal/input-logfile/input.go +++ b/filebeat/input/filestream/internal/input-logfile/input.go @@ -30,10 +30,12 @@ import ( ) type managedInput struct { - manager *InputManager - prospector Prospector - harvester Harvester - cleanTimeout time.Duration + userID string + manager *InputManager + sourceIdentifier *sourceIdentifier + prospector Prospector + harvester Harvester + cleanTimeout time.Duration } // Name is required to implement the v2.Input interface @@ -49,33 +51,29 @@ func (inp
*managedInput) Run( ctx input.Context, pipeline beat.PipelineConnector, ) (err error) { + groupStore := inp.manager.getRetainedStore() + defer groupStore.Release() + // Setup cancellation using a custom cancel context. All workers will be // stopped if one failed badly by returning an error. cancelCtx, cancel := context.WithCancel(ctxtool.FromCanceller(ctx.Cancelation)) defer cancel() ctx.Cancelation = cancelCtx - store := inp.manager.store - store.Retain() - defer store.Release() - hg := &defaultHarvesterGroup{ pipeline: pipeline, - readers: make(map[string]context.CancelFunc), - manager: inp.manager, + readers: newReaderGroup(), cleanTimeout: inp.cleanTimeout, harvester: inp.harvester, - store: store, + store: groupStore, tg: unison.TaskGroup{}, } - stateStore, err := inp.manager.StateStore.Access() - if err != nil { - return err - } - defer stateStore.Close() + prospectorStore := inp.manager.getRetainedStore() + defer prospectorStore.Release() + sourceStore := newSourceStore(prospectorStore, inp.sourceIdentifier) - inp.prospector.Run(ctx, stateStore, hg) + inp.prospector.Run(ctx, sourceStore, hg) return nil } diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go index db3c600d2bc..8d8548e22f0 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager.go +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -19,6 +19,8 @@ package input_logfile import ( "errors" + "fmt" + "strings" "sync" "time" @@ -79,6 +81,8 @@ type Source interface { var errNoSourceConfigured = errors.New("no source has been configured") var errNoInputRunner = errors.New("no input runner available") +const globalInputID = ".global" + // StateStore interface and configurations used to give the Manager access to the persistent store. 
type StateStore interface { Access() (*statestore.Store, error) @@ -92,6 +96,7 @@ func (cim *InputManager) init() error { } log := cim.Logger.With("input_type", cim.Type) + var store *store store, cim.initErr = openStore(log, cim.StateStore, cim.Type) if cim.initErr != nil { @@ -117,9 +122,8 @@ func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error { log := cim.Logger.With("input_type", cim.Type) - store := cim.store + store := cim.getRetainedStore() cleaner := &cleaner{log: log} - store.Retain() err := group.Go(func(canceler unison.Canceler) error { defer cim.shutdown() defer store.Release() @@ -166,34 +170,60 @@ func (cim *InputManager) Create(config *common.Config) (input.Input, error) { return nil, errNoInputRunner } - return &managedInput{ - manager: cim, - prospector: prospector, - harvester: harvester, - cleanTimeout: settings.CleanTimeout, - }, nil -} + sourceIdentifier, err := newSourceIdentifier(cim.Type, settings.ID) + if err != nil { + return nil, fmt.Errorf("error while creating source identifier for input: %v", err) + } -// Lock locks a key for exclusive access and returns an resource that can be used to modify -// the cursor state and unlock the key. 
-func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) { - resource := cim.store.Get(key) - err := lockResource(ctx.Logger, resource, ctx.Cancelation) + pStore := cim.getRetainedStore() + defer pStore.Release() + prospectorStore := newSourceStore(pStore, sourceIdentifier) + err = prospector.Init(prospectorStore) if err != nil { - resource.Release() return nil, err } - return resource, nil + + return &managedInput{ + manager: cim, + userID: settings.ID, + prospector: prospector, + harvester: harvester, + sourceIdentifier: sourceIdentifier, + cleanTimeout: settings.CleanTimeout, + }, nil } -func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error { - if !resource.lock.TryLock() { - log.Infof("Resource '%v' currently in use, waiting...", resource.key) - err := resource.lock.LockContext(canceler) - if err != nil { - log.Infof("Input for resource '%v' has been stopped while waiting", resource.key) - return err - } +func (cim *InputManager) getRetainedStore() *store { + store := cim.store + store.Retain() + return store +} + +type sourceIdentifier struct { + prefix string + configuredUserID bool +} + +func newSourceIdentifier(pluginName, userID string) (*sourceIdentifier, error) { + if userID == globalInputID { + return nil, fmt.Errorf("invalid user ID: .global") } - return nil + + configuredUserID := true + if userID == "" { + configuredUserID = false + userID = globalInputID + } + return &sourceIdentifier{ + prefix: pluginName + "::" + userID + "::", + configuredUserID: configuredUserID, + }, nil +} + +func (i *sourceIdentifier) ID(s Source) string { + return i.prefix + s.Name() +} + +func (i *sourceIdentifier) MatchesInput(id string) bool { + return strings.HasPrefix(id, i.prefix) } diff --git a/filebeat/input/filestream/internal/input-logfile/prospector.go b/filebeat/input/filestream/internal/input-logfile/prospector.go index 185d6f9ec7e..fbd1dd5906c 100644 --- 
a/filebeat/input/filestream/internal/input-logfile/prospector.go +++ b/filebeat/input/filestream/internal/input-logfile/prospector.go @@ -19,17 +19,44 @@ package input_logfile import ( input "github.com/elastic/beats/v7/filebeat/input/v2" - "github.com/elastic/beats/v7/libbeat/statestore" ) // Prospector is responsible for starting, stopping harvesters // based on the retrieved information about the configured paths. // It also updates the statestore with the meta data of the running harvesters. type Prospector interface { + // Init runs the cleanup processes before starting the prospector. + Init(c ProspectorCleaner) error // Run starts the event loop and handles the incoming events // either by starting/stopping a harvester, or updating the statestore. - Run(input.Context, *statestore.Store, HarvesterGroup) + Run(input.Context, StateMetadataUpdater, HarvesterGroup) // Test checks if the Prospector is able to run the configuration // specified by the user. Test() error } + +// StateMetadataUpdater updates and removes the state information for a given Source. +type StateMetadataUpdater interface { + // FindCursorMeta retrieves and unpacks the cursor metadata of an entry of the given Source. + FindCursorMeta(s Source, v interface{}) error + // UpdateMetadata updates the source metadata of a registry entry of a given Source. + UpdateMetadata(s Source, v interface{}) error + // Remove marks a state for deletion of a given Source. + Remove(s Source) error +} + +// ProspectorCleaner cleans the state store before it starts running. +type ProspectorCleaner interface { + // CleanIf removes an entry if the function returns true + CleanIf(func(v Value) bool) + // UpdateIdentifiers updates ID in the registry. + // The function passed to UpdateIdentifiers must return an empty string if the key + // remains the same. + UpdateIdentifiers(func(v Value) (string, interface{})) +} + +// Value contains the cursor metadata. 
+type Value interface { + // UnpackCursorMeta returns the cursor metadata required by the prospector. + UnpackCursorMeta(to interface{}) error +} diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go index 8267565f551..3bd3fcdb081 100644 --- a/filebeat/input/filestream/internal/input-logfile/store.go +++ b/filebeat/input/filestream/internal/input-logfile/store.go @@ -18,6 +18,7 @@ package input_logfile import ( + "fmt" "strings" "sync" "time" @@ -31,6 +32,13 @@ import ( "github.com/elastic/go-concert/unison" ) +// sourceStore is a store which can access resources using the Source +// from an input. +type sourceStore struct { + identifier *sourceIdentifier + store *store +} + // store encapsulates the persistent store and the in memory state store, that // can be ahead of the the persistent store. // The store lifetime is managed by a reference counter. Once all owners (the @@ -99,6 +107,7 @@ type resource struct { // we always write the complete state of the key/value pair. 
cursor interface{} pendingCursor interface{} + cursorMeta interface{} } type ( @@ -116,6 +125,7 @@ type ( TTL time.Duration Updated time.Time Cursor interface{} + Meta interface{} } stateInternal struct { @@ -149,6 +159,87 @@ func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, }, nil } +func newSourceStore(s *store, identifier *sourceIdentifier) *sourceStore { + return &sourceStore{ + store: s, + identifier: identifier, + } +} + +func (s *sourceStore) FindCursorMeta(src Source, v interface{}) error { + key := s.identifier.ID(src) + return s.store.findCursorMeta(key, v) +} + +func (s *sourceStore) UpdateMetadata(src Source, v interface{}) error { + key := s.identifier.ID(src) + return s.store.updateMetadata(key, v) +} + +func (s *sourceStore) Remove(src Source) error { + key := s.identifier.ID(src) + return s.store.remove(key) +} + +// CleanIf sets the TTL of a resource if the predicate return true. +func (s *sourceStore) CleanIf(pred func(v Value) bool) { + s.store.ephemeralStore.mu.Lock() + defer s.store.ephemeralStore.mu.Unlock() + + for key, res := range s.store.ephemeralStore.table { + if !s.identifier.MatchesInput(key) { + continue + } + + if !res.lock.TryLock() { + continue + } + + remove := pred(res) + if remove { + s.store.UpdateTTL(res, 0) + } + res.lock.Unlock() + } +} + +// UpdateIdentifiers copies an existing resource to a new ID and marks the previous one +// for removal. 
+func (s *sourceStore) UpdateIdentifiers(getNewID func(v Value) (string, interface{})) { + s.store.ephemeralStore.mu.Lock() + defer s.store.ephemeralStore.mu.Unlock() + + for key, res := range s.store.ephemeralStore.table { + if !s.identifier.MatchesInput(key) { + continue + } + + if !res.lock.TryLock() { + continue + } + + newKey, updatedMeta := getNewID(res) + if len(newKey) > 0 && res.internalState.TTL > 0 { + if _, ok := s.store.ephemeralStore.table[newKey]; ok { + res.lock.Unlock() + continue + } + + // Pending updates due to events that have not yet been ACKed + // are not included in the copy. Collection on + // the copy start from the last known ACKed position. + // This might lead to duplicates if configurations are adapted + // for inputs with the same ID are changed. + r := res.copyWithNewKey(newKey) + r.cursorMeta = updatedMeta + r.stored = false + s.store.writeState(r) + } + + res.lock.Unlock() + } +} + func (s *store) Retain() { s.refCount.Retain() } func (s *store) Release() { if s.refCount.Release() { @@ -169,6 +260,51 @@ func (s *store) Get(key string) *resource { return s.ephemeralStore.Find(key, true) } +func (s *store) findCursorMeta(key string, to interface{}) error { + resource := s.ephemeralStore.Find(key, false) + if resource == nil { + return fmt.Errorf("resource '%s' not found", key) + } + return typeconv.Convert(to, resource.cursorMeta) +} + +// updateMetadata updates the cursor metadata in the persistent store. +func (s *store) updateMetadata(key string, meta interface{}) error { + resource := s.ephemeralStore.Find(key, false) + if resource == nil { + return fmt.Errorf("resource '%s' not found", key) + } + + resource.cursorMeta = meta + + s.writeState(resource) + return nil +} + +// writeState writes the state to the persistent store. +// WARNING! 
it does not lock the store +func (s *store) writeState(r *resource) { + err := s.persistentStore.Set(r.key, r.inSyncStateSnapshot()) + if err != nil { + s.log.Errorf("Failed to update resource fields for '%v'", r.key) + r.internalInSync = false + } else { + r.stored = true + r.internalInSync = true + } +} + +// remove marks an entry for removal by setting its TTL to zero. +func (s *store) remove(key string) error { + resource := s.ephemeralStore.Find(key, false) + if resource == nil { + return fmt.Errorf("resource '%s' not found", key) + } + + s.UpdateTTL(resource, 0) + return nil +} + // UpdateTTL updates the time-to-live of a resource. Inactive resources with expired TTL are subject to removal. // The TTL value is part of the internal state, and will be written immediately to the persistent store. // On update the resource its `cursor` state is used, to keep the cursor state in sync with the current known @@ -185,18 +321,7 @@ func (s *store) UpdateTTL(resource *resource, ttl time.Duration) { resource.internalState.Updated = time.Now() } - err := s.persistentStore.Set(resource.key, state{ - TTL: resource.internalState.TTL, - Updated: resource.internalState.Updated, - Cursor: resource.cursor, - }) - if err != nil { - s.log.Errorf("Failed to update resource management fields for '%v'", resource.key) - resource.internalInSync = false - } else { - resource.stored = true - resource.internalInSync = true - } + s.writeState(resource) } // Find returns the resource for a given key. If the key is unknown and create is set to false nil will be returned. @@ -259,12 +384,37 @@ func (r *resource) UnpackCursor(to interface{}) error { return typeconv.Convert(to, r.pendingCursor) } +func (r *resource) UnpackCursorMeta(to interface{}) error { + return typeconv.Convert(to, r.cursorMeta) +} + // syncStateSnapshot returns the current insync state based on already ACKed update operations.
func (r *resource) inSyncStateSnapshot() state { return state{ TTL: r.internalState.TTL, Updated: r.internalState.Updated, Cursor: r.cursor, + Meta: r.cursorMeta, + } +} + +func (r *resource) copyWithNewKey(key string) *resource { + internalState := r.internalState + + // This is required to prevent the cleaner from removing the + // entry from the registry immediately. + // It still might be removed if the output is blocked for a long + // time. If removed the whole file is resent to the output when found/updated. + internalState.Updated = time.Now() + return &resource{ + key: key, + stored: r.stored, + internalInSync: true, + internalState: internalState, + activeCursorOperations: r.activeCursorOperations, + cursor: r.cursor, + pendingCursor: nil, + cursorMeta: r.cursorMeta, } } @@ -280,6 +430,7 @@ func (r *resource) stateSnapshot() state { TTL: r.internalState.TTL, Updated: r.internalState.Updated, Cursor: cursor, + Meta: r.cursorMeta, } } @@ -310,7 +461,8 @@ func readStates(log *logp.Logger, store *statestore.Store, prefix string) (*stat TTL: st.TTL, Updated: st.Updated, }, - cursor: st.Cursor, + cursor: st.Cursor, + cursorMeta: st.Meta, } states.table[resource.key] = resource diff --git a/filebeat/input/filestream/internal/input-logfile/store_test.go b/filebeat/input/filestream/internal/input-logfile/store_test.go index 71ea41298b2..d55162bd6c4 100644 --- a/filebeat/input/filestream/internal/input-logfile/store_test.go +++ b/filebeat/input/filestream/internal/input-logfile/store_test.go @@ -227,6 +227,131 @@ func TestStore_UpdateTTL(t *testing.T) { }) } +type testMeta struct { + IdentifierName string +} + +func TestSourceStore_UpdateIdentifiers(t *testing.T) { + t.Run("update identifiers when TTL is bigger than zero", func(t *testing.T) { + backend := createSampleStore(t, map[string]state{ + "test::key1": state{ + TTL: 60 * time.Second, + Meta: testMeta{IdentifierName: "method"}, + }, + "test::key2": state{ + TTL: 0 * time.Second, + Meta: 
testMeta{IdentifierName: "method"}, + }, + }) + s := testOpenStore(t, "test", backend) + defer s.Release() + store := &sourceStore{&sourceIdentifier{"test", true}, s} + + store.UpdateIdentifiers(func(v Value) (string, interface{}) { + var m testMeta + err := v.UnpackCursorMeta(&m) + if err != nil { + t.Fatalf("cannot unpack meta: %v", err) + } + if m.IdentifierName == "method" { + return "test::key1::updated", testMeta{IdentifierName: "something"} + + } + return "", nil + + }) + + var newState state + s.persistentStore.Get("test::key1::updated", &newState) + + want := map[string]state{ + "test::key1": state{ + Updated: s.Get("test::key1").internalState.Updated, + TTL: 60 * time.Second, + Meta: map[string]interface{}{"identifiername": "method"}, + }, + "test::key2": state{ + Updated: s.Get("test::key2").internalState.Updated, + TTL: 0 * time.Second, + Meta: map[string]interface{}{"identifiername": "method"}, + }, + "test::key1::updated": state{ + Updated: newState.Updated, + TTL: 60 * time.Second, + Meta: map[string]interface{}{"identifiername": "something"}, + }, + } + + checkEqualStoreState(t, want, backend.snapshot()) + }) +} + +func TestSourceStore_CleanIf(t *testing.T) { + t.Run("entries are cleaned when funtion returns true", func(t *testing.T) { + backend := createSampleStore(t, map[string]state{ + "test::key1": state{ + TTL: 60 * time.Second, + }, + "test::key2": state{ + TTL: 0 * time.Second, + }, + }) + s := testOpenStore(t, "test", backend) + defer s.Release() + store := &sourceStore{&sourceIdentifier{"test", true}, s} + + store.CleanIf(func(_ Value) bool { + return true + }) + + want := map[string]state{ + "test::key1": state{ + Updated: s.Get("test::key1").internalState.Updated, + TTL: 0 * time.Second, + }, + "test::key2": state{ + Updated: s.Get("test::key2").internalState.Updated, + TTL: 0 * time.Second, + }, + } + + checkEqualStoreState(t, want, storeMemorySnapshot(s)) + checkEqualStoreState(t, want, storeInSyncSnapshot(s)) + }) + + t.Run("entries 
are left alone when funtion returns false", func(t *testing.T) { + backend := createSampleStore(t, map[string]state{ + "test::key1": state{ + TTL: 60 * time.Second, + }, + "test::key2": state{ + TTL: 0 * time.Second, + }, + }) + s := testOpenStore(t, "test", backend) + defer s.Release() + store := &sourceStore{&sourceIdentifier{"test", true}, s} + + store.CleanIf(func(v Value) bool { + return false + }) + + want := map[string]state{ + "test::key1": state{ + Updated: s.Get("test::key1").internalState.Updated, + TTL: 60 * time.Second, + }, + "test::key2": state{ + Updated: s.Get("test::key2").internalState.Updated, + TTL: 0 * time.Second, + }, + } + + checkEqualStoreState(t, want, storeMemorySnapshot(s)) + checkEqualStoreState(t, want, storeInSyncSnapshot(s)) + }) +} + func closeStoreWith(fn func(s *store)) func() { old := closeStore closeStore = fn diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 11f479ccef8..89932773648 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -18,17 +18,13 @@ package filestream import ( - "os" - "strings" "time" "github.com/urso/sderr" loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" input "github.com/elastic/beats/v7/filebeat/input/v2" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/go-concert/unison" ) @@ -41,47 +37,61 @@ const ( // The FS events then trigger either new Harvester runs or updates // the statestore. 
type fileProspector struct { - filewatcher loginp.FSWatcher - identifier fileIdentifier - ignoreOlder time.Duration - cleanRemoved bool + filewatcher loginp.FSWatcher + identifier fileIdentifier + ignoreOlder time.Duration + cleanRemoved bool + stateChangeCloser stateChangeCloserConfig } -func newFileProspector( - paths []string, - ignoreOlder time.Duration, - fileWatcherNs, identifierNs *common.ConfigNamespace, -) (loginp.Prospector, error) { +func (p *fileProspector) Init(cleaner loginp.ProspectorCleaner) error { + files := p.filewatcher.GetFiles() - filewatcher, err := newFileWatcher(paths, fileWatcherNs) - if err != nil { - return nil, err - } + if p.cleanRemoved { + cleaner.CleanIf(func(v loginp.Value) bool { + var fm fileMeta + err := v.UnpackCursorMeta(&fm) + if err != nil { + // remove faulty entries + return true + } - identifier, err := newFileIdentifier(identifierNs) - if err != nil { - return nil, err + _, ok := files[fm.Source] + return !ok + }) } - return &fileProspector{ - filewatcher: filewatcher, - identifier: identifier, - ignoreOlder: ignoreOlder, - cleanRemoved: true, - }, nil + identifierName := p.identifier.Name() + cleaner.UpdateIdentifiers(func(v loginp.Value) (string, interface{}) { + var fm fileMeta + err := v.UnpackCursorMeta(&fm) + if err != nil { + return "", nil + } + + fi, ok := files[fm.Source] + if !ok { + return "", fm + } + + if fm.IdentifierName != identifierName { + newKey := p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Info: fi}).Name() + fm.IdentifierName = identifierName + return newKey, fm + } + return "", fm + }) + + return nil } // Run starts the fileProspector which accepts FS events from a file watcher. 
-func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg loginp.HarvesterGroup) { +func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, hg loginp.HarvesterGroup) { log := ctx.Logger.With("prospector", prospectorDebugKey) log.Debug("Starting prospector") defer log.Debug("Prospector has stopped") - if p.cleanRemoved { - p.cleanRemovedBetweenRuns(log, s) - } - - p.updateIdentifiersBetweenRuns(log, s) + defer p.stopHarvesterGroup(log, hg) var tg unison.MultiErrGroup @@ -115,15 +125,20 @@ func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg loginp.H } } - hg.Run(ctx, src) + hg.Start(ctx, src) case loginp.OpDelete: log.Debugf("File %s has been removed", fe.OldPath) + if p.stateChangeCloser.Removed { + log.Debugf("Stopping harvester as file %s has been removed and close.on_state_change.removed is enabled.", src.Name()) + hg.Stop(src) + } + if p.cleanRemoved { log.Debugf("Remove state for file as file removed: %s", fe.OldPath) - err := s.Remove(src.Name()) + err := s.Remove(src) if err != nil { log.Errorf("Error while removing state from statestore: %v", err) } @@ -131,7 +146,39 @@ func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg loginp.H case loginp.OpRename: log.Debugf("File %s has been renamed to %s", fe.OldPath, fe.NewPath) - // TODO update state information in the store + + // if file_identity is based on path, the current reader has to be cancelled + // and a new one has to start. 
+ if !p.identifier.Supports(trackRename) { + prevSrc := p.identifier.GetSource(loginp.FSEvent{NewPath: fe.OldPath}) + hg.Stop(prevSrc) + + log.Debugf("Remove state for file as file renamed and path file_identity is configured: %s", fe.OldPath) + err := s.Remove(prevSrc) + if err != nil { + log.Errorf("Error while removing old state of renamed file (%s): %v", fe.OldPath, err) + } + + hg.Start(ctx, src) + } else { + // update file metadata as the path has changed + var meta fileMeta + err := s.FindCursorMeta(src, &meta) + if err != nil { + log.Errorf("Error while getting cursor meta data of entry %s: %v", src.Name(), err) + + meta.IdentifierName = p.identifier.Name() + } + s.UpdateMetadata(src, fileMeta{Source: src.newPath, IdentifierName: meta.IdentifierName}) + + if p.stateChangeCloser.Renamed { + log.Debugf("Stopping harvester as file %s has been renamed and close.on_state_change.renamed is enabled.", src.Name()) + + fe.Op = loginp.OpDelete + srcToClose := p.identifier.GetSource(fe) + hg.Stop(srcToClose) + } + } default: log.Error("Unkown return value %v", fe.Op) @@ -146,62 +193,11 @@ } } -func (p *fileProspector) cleanRemovedBetweenRuns(log *logp.Logger, s *statestore.Store) { - keyPrefix := pluginName + "::" - s.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { - if !strings.HasPrefix(string(key), keyPrefix) { - return true, nil - } - - var st state - if err := dec.Decode(&st); err != nil { - log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. 
Error was: %+v", - key, err) - return true, nil - } - - _, err := os.Stat(st.Source) - if err != nil { - s.Remove(key) - } - - return true, nil - }) -} - -func (p *fileProspector) updateIdentifiersBetweenRuns(log *logp.Logger, s *statestore.Store) { - keyPrefix := pluginName + "::" - s.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { - if !strings.HasPrefix(string(key), keyPrefix) { - return true, nil - } - - var st state - if err := dec.Decode(&st); err != nil { - log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v", key, err) - return true, nil - } - - if st.IdentifierName == p.identifier.Name() { - return true, nil - } - - fi, err := os.Stat(st.Source) - if err != nil { - return true, nil - } - newKey := p.identifier.GetSource(loginp.FSEvent{NewPath: st.Source, Info: fi}).Name() - st.IdentifierName = p.identifier.Name() - - err = s.Set(newKey, st) - if err != nil { - log.Errorf("Failed to add updated state for '%v', cursor state will be ignored. 
Error was: %+v", - key, err) - return true, nil - } - s.Remove(key) - - return true, nil - }) +func (p *fileProspector) stopHarvesterGroup(log *logp.Logger, hg loginp.HarvesterGroup) { + err := hg.StopGroup() + if err != nil { + log.Errorf("Error while stopping harvester group: %v", err) + } } func (p *fileProspector) Test() error { diff --git a/filebeat/input/filestream/prospector_test.go b/filebeat/input/filestream/prospector_test.go index 1f75b12d2bd..eef65685a02 100644 --- a/filebeat/input/filestream/prospector_test.go +++ b/filebeat/input/filestream/prospector_test.go @@ -19,6 +19,7 @@ package filestream import ( "context" + "os" "testing" "time" @@ -27,8 +28,6 @@ import ( loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/statestore" - "github.com/elastic/beats/v7/libbeat/statestore/storetest" "github.com/elastic/go-concert/unison" ) @@ -45,13 +44,13 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) { loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file"}, loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/other/file"}, }, - expectedSources: []string{"filestream::path::/path/to/file", "filestream::path::/path/to/other/file"}, + expectedSources: []string{"path::/path/to/file", "path::/path/to/other/file"}, }, "one updated file": { events: []loginp.FSEvent{ loginp.FSEvent{Op: loginp.OpWrite, NewPath: "/path/to/file"}, }, - expectedSources: []string{"filestream::path::/path/to/file"}, + expectedSources: []string{"path::/path/to/file"}, }, "old files with ignore older configured": { events: []loginp.FSEvent{ @@ -83,7 +82,7 @@ }, }, ignoreOlder: 5 * time.Minute, - expectedSources: []string{"filestream::path::/path/to/file", "filestream::path::/path/to/other/file"}, + expectedSources: []string{"path::/path/to/file", 
"path::/path/to/other/file"}, }, } @@ -99,7 +98,7 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) { ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()} hg := getTestHarvesterGroup() - p.Run(ctx, testStateStore(), hg) + p.Run(ctx, newMockMetadataUpdater(), hg) assert.ElementsMatch(t, hg.encounteredNames, test.expectedSources) }) @@ -136,15 +135,12 @@ func TestProspectorDeletedFile(t *testing.T) { } ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()} - testStore := testStateStore() - testStore.Set("filestream::path::/path/to/file", nil) + testStore := newMockMetadataUpdater() + testStore.set("path::/path/to/file") p.Run(ctx, testStore, getTestHarvesterGroup()) - has, err := testStore.Has("filestream::path::/path/to/file") - if err != nil { - t.Fatal(err) - } + has := testStore.has("path::/path/to/file") if test.cleanRemoved { assert.False(t, has) @@ -162,8 +158,15 @@ type testHarvesterGroup struct { func getTestHarvesterGroup() *testHarvesterGroup { return &testHarvesterGroup{make([]string, 0)} } -func (t *testHarvesterGroup) Run(_ input.Context, s loginp.Source) error { +func (t *testHarvesterGroup) Start(_ input.Context, s loginp.Source) { t.encounteredNames = append(t.encounteredNames, s.Name()) +} + +func (t *testHarvesterGroup) Stop(_ loginp.Source) { + return +} + +func (t *testHarvesterGroup) StopGroup() error { return nil } @@ -180,11 +183,40 @@ func (m *mockFileWatcher) Event() loginp.FSEvent { m.nextIdx++ return evt } + func (m *mockFileWatcher) Run(_ unison.Canceler) { return } -func testStateStore() *statestore.Store { - s, _ := statestore.NewRegistry(storetest.NewMemoryStoreBackend()).Get(pluginName) - return s +func (m *mockFileWatcher) GetFiles() map[string]os.FileInfo { return nil } + +type mockMetadataUpdater struct { + table map[string]interface{} +} + +func newMockMetadataUpdater() *mockMetadataUpdater { + return &mockMetadataUpdater{ + table: make(map[string]interface{}), + } +} + +func (mu 
*mockMetadataUpdater) set(id string) { mu.table[id] = struct{}{} } + +func (mu *mockMetadataUpdater) has(id string) bool { + _, ok := mu.table[id] + return ok +} + +func (mu *mockMetadataUpdater) FindCursorMeta(s loginp.Source, v interface{}) error { + return nil +} + +func (mu *mockMetadataUpdater) UpdateMetadata(s loginp.Source, v interface{}) error { + mu.table[s.Name()] = v + return nil +} + +func (mu *mockMetadataUpdater) Remove(s loginp.Source) error { + delete(mu.table, s.Name()) + return nil } func mustPathIdentifier() fileIdentifier {