diff --git a/.chloggen/fileconsumer-fix-windows.yaml b/.chloggen/fileconsumer-fix-windows.yaml new file mode 100755 index 000000000000..8d0ea6fddff7 --- /dev/null +++ b/.chloggen/fileconsumer-fix-windows.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where files were unnecessarily kept open on Windows + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29149] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index bbb072db9078..cc458513469d 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -80,6 +80,7 @@ func (m *Manager) closePreviousFiles() { for _, r := range m.previousPollFiles { m.knownFiles = append(m.knownFiles, r.Close()) } + m.previousPollFiles = nil } // Stop will stop the file monitoring process @@ -159,11 +160,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) { m.Debug("Consuming files", zap.Strings("paths", paths)) readers := m.makeReaders(paths) - // take care of files which disappeared from the pattern since the last poll cycle - // this can mean either files which were removed, or rotated into a name not matching the pattern - // we do this before reading existing files to ensure we emit older log lines before newer ones - m.readLostFiles(ctx, readers) - m.closePreviousFiles() + m.preConsume(ctx, readers) // read new readers to end var wg sync.WaitGroup @@ -176,7 +173,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) { } wg.Wait() - m.previousPollFiles = readers + m.postConsume(readers) } func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) { diff --git a/pkg/stanza/fileconsumer/file_other.go b/pkg/stanza/fileconsumer/file_other.go index d613433bd940..0c2374b9aa2c 100644 --- a/pkg/stanza/fileconsumer/file_other.go +++ b/pkg/stanza/fileconsumer/file_other.go @@ -13,8 +13,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" ) -func (m *Manager) readLostFiles(ctx context.Context, newReaders []*reader.Reader) { - // Detect files that have been rotated out of matching pattern +// Take care of files which disappeared from the pattern since the last poll cycle +// this can mean either files which were removed, or rotated into a name not matching the pattern +// we do this before reading existing files to ensure we emit older log lines before newer ones +func (m *Manager) preConsume(ctx context.Context, newReaders []*reader.Reader) { lostReaders := make([]*reader.Reader, 0, len(m.previousPollFiles)) OUTER: for _, oldReader := range m.previousPollFiles { @@ -48,3 +50,10 @@ OUTER: } lostWG.Wait() } + +// On non-windows platforms, we keep files open between poll cycles so that we can detect +// and read "lost" files, which have been moved out of the matching pattern. +func (m *Manager) postConsume(readers []*reader.Reader) { + m.closePreviousFiles() + m.previousPollFiles = readers +} diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index cc34a6bcddca..cbd0c1cd797c 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -1071,6 +1071,7 @@ func TestFileBatchingRespectsStartAtEnd(t *testing.T) { operator, emitChan := buildTestManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() + operator.movingAverageMatches = 10 temps := make([]*os.File, 0, initFiles+moreFiles) for i := 0; i < initFiles; i++ { @@ -1662,7 +1663,10 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) { waitForToken(t, emitCalls, []byte(content)) expectNoTokens(t, emitCalls) operator.wg.Wait() - require.Len(t, operator.previousPollFiles, 1) + if runtime.GOOS != "windows" { + // On windows, we never keep files in previousPollFiles, so we don't expect to see them here + require.Len(t, operator.previousPollFiles, 1) + } // keep append data to file1 and file2 newContent := "bbbbbbbbbbbb" @@ -1675,3 +1679,22 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) { waitForTokens(t, emitCalls, []byte(content), []byte(newContent1), []byte(newContent)) operator.wg.Wait() } + +func TestWindowsFilesClosedImmediately(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + cfg := NewConfig().includeDir(tempDir) + cfg.StartAt = "beginning" + operator, emitCalls := buildTestManager(t, cfg) + + temp := openTemp(t, tempDir) + writeString(t, temp, "testlog\n") + require.NoError(t, temp.Close()) + + operator.poll(context.Background()) + waitForToken(t, emitCalls, []byte("testlog")) + + // On Windows, poll should close the file after reading it. We can test this by trying to move it. + require.NoError(t, os.Rename(temp.Name(), temp.Name()+"_renamed")) +} diff --git a/pkg/stanza/fileconsumer/file_windows.go b/pkg/stanza/fileconsumer/file_windows.go index eb79821c3569..e67c61ab2e2a 100644 --- a/pkg/stanza/fileconsumer/file_windows.go +++ b/pkg/stanza/fileconsumer/file_windows.go @@ -12,6 +12,12 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" ) -func (m *Manager) readLostFiles(ctx context.Context, newReaders []*reader.Reader) { +func (m *Manager) preConsume(ctx context.Context, newReaders []*reader.Reader) { return } + +// On windows, we close files immediately after reading becauase they cannot be moved while open. +func (m *Manager) postConsume(readers []*reader.Reader) { + m.previousPollFiles = readers + m.closePreviousFiles() +}