From 3102cce5ac5ff3f72054d64e146bdfd15e074e3b Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Thu, 25 Apr 2024 18:15:02 +0100 Subject: [PATCH 1/4] fix(promtail): Fix bug with Promtail config reloading getting stuck indefinitely Signed-off-by: Paulin Todev --- .../pkg/promtail/targets/file/filetarget.go | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/clients/pkg/promtail/targets/file/filetarget.go b/clients/pkg/promtail/targets/file/filetarget.go index 0ade51902b49..df97117c8ae5 100644 --- a/clients/pkg/promtail/targets/file/filetarget.go +++ b/clients/pkg/promtail/targets/file/filetarget.go @@ -25,6 +25,8 @@ const ( FilenameLabel = "filename" ) +var errFileTargetStopped = errors.New("File target is stopped") + // Config describes behavior for Target type Config struct { SyncPeriod time.Duration `mapstructure:"sync_period" yaml:"sync_period"` @@ -223,6 +225,11 @@ func (t *FileTarget) run() { } case <-ticker.C: err := t.sync() + if errors.Is(err, errFileTargetStopped) { + // This file target has been stopped. + // This is normal and there is no need to log an error. + return + } if err != nil { level.Error(t.logger).Log("msg", "error running sync function", "error", err) } @@ -291,14 +298,20 @@ func (t *FileTarget) sync() error { t.watchesMutex.Lock() toStartWatching := missing(t.watches, dirs) t.watchesMutex.Unlock() - t.startWatching(toStartWatching) + err := t.startWatching(toStartWatching) + if errors.Is(err, errFileTargetStopped) { + return err + } // Remove any directories which no longer need watching. t.watchesMutex.Lock() toStopWatching := missing(dirs, t.watches) t.watchesMutex.Unlock() - t.stopWatching(toStopWatching) + err = t.stopWatching(toStopWatching) + if errors.Is(err, errFileTargetStopped) { + return err + } // fsnotify.Watcher doesn't allow us to see what is currently being watched so we have to track it ourselves. 
t.watchesMutex.Lock() @@ -321,32 +334,44 @@ func (t *FileTarget) sync() error { return nil } -func (t *FileTarget) startWatching(dirs map[string]struct{}) { +func (t *FileTarget) startWatching(dirs map[string]struct{}) error { for dir := range dirs { if _, ok := t.getWatch(dir); ok { continue } level.Info(t.logger).Log("msg", "watching new directory", "directory", dir) - t.targetEventHandler <- fileTargetEvent{ + select { + case <-t.quit: + return errFileTargetStopped + case t.targetEventHandler <- fileTargetEvent{ path: dir, eventType: fileTargetEventWatchStart, + }: + // continue } } + return nil } -func (t *FileTarget) stopWatching(dirs map[string]struct{}) { +func (t *FileTarget) stopWatching(dirs map[string]struct{}) error { for dir := range dirs { if _, ok := t.getWatch(dir); !ok { continue } level.Info(t.logger).Log("msg", "removing directory from watcher", "directory", dir) - t.targetEventHandler <- fileTargetEvent{ + select { + case <-t.quit: + return errFileTargetStopped + case t.targetEventHandler <- fileTargetEvent{ path: dir, eventType: fileTargetEventWatchStop, + }: + // continue } } + return nil } func (t *FileTarget) startTailing(ps []string) { From 0bcaabcd961b397cee1318dafa277a5f23566d12 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Fri, 26 Apr 2024 19:20:51 +0100 Subject: [PATCH 2/4] fix(promtail): Add a unit test Signed-off-by: Paulin Todev --- .../promtail/targets/file/filetarget_test.go | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/clients/pkg/promtail/targets/file/filetarget_test.go b/clients/pkg/promtail/targets/file/filetarget_test.go index 579ea19e2e56..fc2503dc8ec3 100644 --- a/clients/pkg/promtail/targets/file/filetarget_test.go +++ b/clients/pkg/promtail/targets/file/filetarget_test.go @@ -336,6 +336,90 @@ func TestFileTarget_StopsTailersCleanly_Parallel(t *testing.T) { ps.Stop() } +// Make sure that Stop() doesn't hang if FileTarget is waiting on a channel send. 
+func TestFileTarget_StopAbruptly(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + dirName := newTestLogDirectories(t) + positionsFileName := filepath.Join(dirName, "positions.yml") + logDir1 := filepath.Join(dirName, "log1") + logDir2 := filepath.Join(dirName, "log2") + logDir3 := filepath.Join(dirName, "log3") + + logfile1 := filepath.Join(logDir1, "test1.log") + logfile2 := filepath.Join(logDir2, "test1.log") + logfile3 := filepath.Join(logDir3, "test1.log") + + ps, err := positions.New(logger, positions.Config{ + SyncPeriod: 10 * time.Millisecond, + PositionsFile: positionsFileName, + }) + require.NoError(t, err) + + client := fake.New(func() {}) + defer client.Stop() + + // fakeHandler has to be a buffered channel so that we can call the len() function on it. + // We need to call len() to check if the channel is full. + fakeHandler := make(chan fileTargetEvent, 1) + pathToWatch := filepath.Join(dirName, "**", "*.log") + registry := prometheus.NewRegistry() + target, err := NewFileTarget(NewMetrics(registry), logger, client, ps, pathToWatch, "", nil, nil, &Config{ + SyncPeriod: 10 * time.Millisecond, + }, DefaultWatchConig, nil, fakeHandler, "", nil) + assert.NoError(t, err) + + // Create a directory, still nothing is watched. + err = os.MkdirAll(logDir1, 0750) + assert.NoError(t, err) + _, err = os.Create(logfile1) + assert.NoError(t, err) + + // There should be only one WatchStart event in the channel so far. + ftEvent := <-fakeHandler + require.Equal(t, fileTargetEventWatchStart, ftEvent.eventType) + + requireEventually(t, func() bool { + return target.getReadersLen() == 1 + }, "expected 1 tailer to be created") + + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP promtail_files_active_total Number of active files. 
+ # TYPE promtail_files_active_total gauge + promtail_files_active_total 1 + `), "promtail_files_active_total")) + + // Create two directories - one more than the buffer of fakeHandler, + // so that the file target hangs until we call Stop(). + err = os.MkdirAll(logDir2, 0750) + assert.NoError(t, err) + _, err = os.Create(logfile2) + assert.NoError(t, err) + + err = os.MkdirAll(logDir3, 0750) + assert.NoError(t, err) + _, err = os.Create(logfile3) + assert.NoError(t, err) + + // Wait until the file target is waiting on a channel send due to a full channel buffer. + requireEventually(t, func() bool { + return len(fakeHandler) == 1 + }, "expected an event in the fakeHandler channel") + + // If FileHandler works well, then it will stop waiting for + // the blocked fakeHandler and stop cleanly. + // This is why this time we don't drain fakeHandler. + target.Stop() + ps.Stop() + + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP promtail_files_active_total Number of active files. + # TYPE promtail_files_active_total gauge + promtail_files_active_total 0 + `), "promtail_files_active_total")) +} + func TestFileTargetPathExclusion(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) From d4e30c290e5b75a1c768c6f207fdca9d35138d89 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Tue, 30 Apr 2024 18:40:55 +0100 Subject: [PATCH 3/4] Fail test if FileTarget doesn't stop within a few seconds. 
--- clients/pkg/promtail/targets/file/filetarget_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/clients/pkg/promtail/targets/file/filetarget_test.go b/clients/pkg/promtail/targets/file/filetarget_test.go index fc2503dc8ec3..caf33395ba20 100644 --- a/clients/pkg/promtail/targets/file/filetarget_test.go +++ b/clients/pkg/promtail/targets/file/filetarget_test.go @@ -410,8 +410,11 @@ func TestFileTarget_StopAbruptly(t *testing.T) { // If FileHandler works well, then it will stop waiting for // the blocked fakeHandler and stop cleanly. // This is why this time we don't drain fakeHandler. - target.Stop() - ps.Stop() + requireEventually(t, func() bool { + target.Stop() + ps.Stop() + return true + }, "expected FileTarget not to hang") require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` # HELP promtail_files_active_total Number of active files. From eec3a967b608b9aee46c63b8032cc35a2b32b912 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Wed, 8 May 2024 18:12:30 +0100 Subject: [PATCH 4/4] Remove unnecessary "continue" statements. --- clients/pkg/promtail/targets/file/filetarget.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/clients/pkg/promtail/targets/file/filetarget.go b/clients/pkg/promtail/targets/file/filetarget.go index df97117c8ae5..ffa168fde43d 100644 --- a/clients/pkg/promtail/targets/file/filetarget.go +++ b/clients/pkg/promtail/targets/file/filetarget.go @@ -348,7 +348,6 @@ func (t *FileTarget) startWatching(dirs map[string]struct{}) error { path: dir, eventType: fileTargetEventWatchStart, }: - // continue } } return nil @@ -368,7 +367,6 @@ func (t *FileTarget) stopWatching(dirs map[string]struct{}) error { path: dir, eventType: fileTargetEventWatchStop, }: - // continue } } return nil