Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
file_input should not attempt to track lost files on windows (#366)
Browse files Browse the repository at this point in the history
* WIP - filelog receiver should not attempt to track lost files on windows

* Pull os-specific file handle management into separate struct
  • Loading branch information
djaglowski authored Feb 10, 2022
1 parent 105a41f commit 89d49a3
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 36 deletions.
1 change: 1 addition & 0 deletions operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
firstCheck: true,
cancel: func() {},
knownFiles: make([]*Reader, 0, 10),
roller: newRoller(),
fingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
MaxConcurrentFiles: c.MaxConcurrentFiles,
Expand Down
43 changes: 7 additions & 36 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ type InputOperator struct {

persister operator.Persister

knownFiles []*Reader
queuedMatches []string
maxBatchFiles int
lastPollReaders []*Reader
knownFiles []*Reader
queuedMatches []string
maxBatchFiles int
roller roller

startAtBeginning bool

Expand All @@ -70,6 +70,7 @@ func (f *InputOperator) Start(persister operator.Persister) error {
f.firstCheck = true

f.persister = persister

// Load offsets from disk
if err := f.loadLastPollFiles(ctx); err != nil {
return fmt.Errorf("read known files from database: %s", err)
Expand All @@ -85,9 +86,7 @@ func (f *InputOperator) Start(persister operator.Persister) error {
func (f *InputOperator) Stop() error {
f.cancel()
f.wg.Wait()
for _, reader := range f.lastPollReaders {
reader.Close()
}
f.roller.cleanup()
for _, reader := range f.knownFiles {
reader.Close()
}
Expand Down Expand Up @@ -148,45 +147,17 @@ func (f *InputOperator) poll(ctx context.Context) {
readers := f.makeReaders(matches)
f.firstCheck = false

// Detect files that have been rotated out of matching pattern
lostReaders := make([]*Reader, 0, len(f.lastPollReaders))
OUTER:
for _, oldReader := range f.lastPollReaders {
for _, reader := range readers {
if reader.Fingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
}
}
lostReaders = append(lostReaders, oldReader)
}

var wg sync.WaitGroup
for _, reader := range lostReaders {
wg.Add(1)
go func(r *Reader) {
defer wg.Done()
r.ReadToEnd(ctx)
}(reader)
}

for _, reader := range readers {
wg.Add(1)
go func(r *Reader) {
defer wg.Done()
r.ReadToEnd(ctx)
}(reader)
}

// Wait until all the reader goroutines are finished
wg.Wait()

// Close all files
for _, reader := range f.lastPollReaders {
reader.Close()
}

f.lastPollReaders = readers

f.roller.roll(ctx, readers)
f.saveCurrent(readers)
f.syncLastPollFiles(ctx)
}
Expand Down
22 changes: 22 additions & 0 deletions operator/builtin/input/file/roller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package file

import "context"

type roller interface {
roll(context.Context, []*Reader)
cleanup()
}
67 changes: 67 additions & 0 deletions operator/builtin/input/file/roller_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package file

import (
"context"
"sync"
)

type detectLostFiles struct {
oldReaders []*Reader
}

func newRoller() roller {
return &detectLostFiles{[]*Reader{}}
}

func (r *detectLostFiles) roll(ctx context.Context, readers []*Reader) {
// Detect files that have been rotated out of matching pattern
lostReaders := make([]*Reader, 0, len(r.oldReaders))
OUTER:
for _, oldReader := range r.oldReaders {
for _, reader := range readers {
if reader.Fingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
}
}
lostReaders = append(lostReaders, oldReader)
}

var lostWG sync.WaitGroup
for _, reader := range lostReaders {
lostWG.Add(1)
go func(r *Reader) {
defer lostWG.Done()
r.ReadToEnd(ctx)
}(reader)
}
lostWG.Wait()

for _, reader := range r.oldReaders {
reader.Close()
}

r.oldReaders = readers
}

func (r *detectLostFiles) cleanup() {
for _, reader := range r.oldReaders {
reader.Close()
}
}
36 changes: 36 additions & 0 deletions operator/builtin/input/file/roller_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build windows
// +build windows

package file

import "context"

type closeImmediately struct{}

func newRoller() roller {
return &closeImmediately{}
}

func (r *closeImmediately) roll(_ context.Context, readers []*Reader) {
for _, reader := range readers {
reader.Close()
}
}

func (r *closeImmediately) cleanup() {
return
}

0 comments on commit 89d49a3

Please sign in to comment.