From 66a43a8453b6e6902b713d27e8416d3722bbd852 Mon Sep 17 00:00:00 2001 From: Yuxuan 'fishy' Wang Date: Thu, 23 May 2024 10:19:24 -0700 Subject: [PATCH] Fixup --- filewatcher/filewatcher.go | 35 ++--- filewatcher/v2/filewatcher.go | 215 ++++++----------------------- filewatcher/v2/filewatcher_test.go | 90 +----------- filewatcher/v2/fwtest/doc.go | 2 + filewatcher/v2/fwtest/fake.go | 95 +++++++++++++ filewatcher/v2/fwtest/fake_test.go | 98 +++++++++++++ 6 files changed, 258 insertions(+), 277 deletions(-) create mode 100644 filewatcher/v2/fwtest/doc.go create mode 100644 filewatcher/v2/fwtest/fake.go create mode 100644 filewatcher/v2/fwtest/fake_test.go diff --git a/filewatcher/filewatcher.go b/filewatcher/filewatcher.go index 2f471542b..75cae4658 100644 --- a/filewatcher/filewatcher.go +++ b/filewatcher/filewatcher.go @@ -7,6 +7,7 @@ import ( "time" v2 "github.com/reddit/baseplate.go/filewatcher/v2" + "github.com/reddit/baseplate.go/filewatcher/v2/fwtest" "github.com/reddit/baseplate.go/log" ) @@ -188,22 +189,22 @@ func New(ctx context.Context, cfg Config) (*Result, error) { return &Result{result: result}, nil } +// MockFileWatcher is an implementation of FileWatcher that does not actually read +// from a file, it simply returns the data given to it when it was initialized +// with NewMockFilewatcher. It provides an additional Update method that allows +// you to update this data after it has been created. +type MockFileWatcher struct { + fake *fwtest.FakeFileWatcher[any] +} + // NewMockFilewatcher returns a pointer to a new MockFileWatcher object // initialized with the given io.Reader and Parser. func NewMockFilewatcher(r io.Reader, parser Parser) (*MockFileWatcher, error) { - mock, err := v2.NewMockFilewatcher(r, parser) + fake, err := fwtest.NewFakeFilewatcher(r, parser) if err != nil { return nil, err } - return &MockFileWatcher{mock: mock}, nil -} - -// MockFileWatcher is an implementation of FileWatcher that does not actually read -// from a file, it simply returns the data given to it when it was initialized -// with NewMockFilewatcher. It provides an additional Update method that allows -// you to update this data after it has been created. -type MockFileWatcher struct { - mock *v2.MockFileWatcher[any] + return &MockFileWatcher{fake: fake}, nil } // Update updates the data of the MockFileWatcher using the given io.Reader and @@ -211,12 +212,12 @@ type MockFileWatcher struct { // // This method is not threadsafe. func (fw *MockFileWatcher) Update(r io.Reader) error { - return fw.mock.Update(r) + return fw.fake.Update(r) } // Get returns the parsed data. func (fw *MockFileWatcher) Get() any { - return fw.mock.Get() + return fw.fake.Get() } // Stop is a no-op. @@ -225,7 +226,7 @@ func (fw *MockFileWatcher) Stop() {} // MockDirWatcher is an implementation of FileWatcher for testing with watching // directories. type MockDirWatcher struct { - mock *v2.MockDirWatcher[any] + fake *fwtest.FakeDirWatcher[any] } // NewMockDirWatcher creates a MockDirWatcher with the initial data and the @@ -233,21 +234,21 @@ type MockDirWatcher struct { // // It provides Update function to update the data after it's been created. func NewMockDirWatcher(dir fs.FS, parser DirParser) (*MockDirWatcher, error) { - mock, err := v2.NewMockDirWatcher(dir, parser) + fake, err := fwtest.NewFakeDirWatcher(dir, parser) if err != nil { return nil, err } - return &MockDirWatcher{mock: mock}, nil + return &MockDirWatcher{fake: fake}, nil } // Update updates the data stored in this MockDirWatcher. func (dw *MockDirWatcher) Update(dir fs.FS) error { - return dw.mock.Update(dir) + return dw.fake.Update(dir) } // Get implements FileWatcher by returning the last updated data. func (dw *MockDirWatcher) Get() any { - return dw.mock.Get() + return dw.fake.Get() } // Stop is a no-op. diff --git a/filewatcher/v2/filewatcher.go b/filewatcher/v2/filewatcher.go index 6c191803d..0e48d4ded 100644 --- a/filewatcher/v2/filewatcher.go +++ b/filewatcher/v2/filewatcher.go @@ -81,7 +81,7 @@ func WrapDirParser[T any](dp DirParser[T]) Parser[T] { // Result is the return type of New. Use Get function to get the actual data. type Result[T any] struct { - data atomic.Value + data atomic.Pointer[dataAndMtime[T]] ctx context.Context cancel context.CancelFunc @@ -89,7 +89,7 @@ type Result[T any] struct { // Get returns the latest parsed data from the FileWatcher. func (r *Result[T]) Get() T { - return r.data.Load().(*atomicData[T]).data + return r.data.Load().data } // Close stops the FileWatcher. @@ -131,20 +131,20 @@ func (r *Result[T]) watcherLoop( d, mtime, files, err := openAndParse(path, parser, softLimit, hardLimit) if err != nil { slog.ErrorContext(r.ctx, "filewatcher: openAndParse returned error", "err", err) - } else { - r.data.Store(&atomicData[T]{ - data: d, - mtime: mtime, - }) - // remove all previously watched files - for _, path := range watcher.WatchList() { - watcher.Remove(path) - } - // then read all new files to watch - for _, path := range files { - if err := watcher.Add(path); err != nil { - slog.ErrorContext(r.ctx, "filewatcher: failed to watch file", "err", err, "path", path) - } + return + } + r.data.Store(&dataAndMtime[T]{ + data: d, + mtime: mtime, + }) + // remove all previously watched files + for _, path := range watcher.WatchList() { + watcher.Remove(path) + } + // then read all new files to watch + for _, path := range files { + if err := watcher.Add(path); err != nil { + slog.ErrorContext(r.ctx, "filewatcher: failed to watch file", "err", err, "path", path) } } } @@ -155,7 +155,7 @@ func (r *Result[T]) watcherLoop( slog.ErrorContext(r.ctx, "filewatcher: failed to get mtime for file", "err", err, "path", path) return } - if r.data.Load().(*atomicData[T]).mtime.Before(mtime) { + if r.data.Load().mtime.Before(mtime) { forceReload() } } @@ -213,8 +213,8 @@ func (r *Result[T]) watcherLoop( } } -// The actual data held in Result.data. -type atomicData[T any] struct { +// The actual data and mtime held in Result.data. +type dataAndMtime[T any] struct { // actual parsed data data T @@ -226,55 +226,6 @@ var ( _ FileWatcher[any] = (*Result[any])(nil) ) -// Config defines the config to be used in New function. -// -// Can be deserialized from YAML. -type Config struct { - // The path to the file to be watched, required. - Path string `yaml:"path"` - - // Optional. When <=0 DefaultMaxFileSize will be used instead. - // - // This is the soft limit, - // we will also auto add a hard limit which is 10x (see HardLimitMultiplier) - // of soft limit. - // - // If the soft limit is violated, - // the violation will be reported via log.DefaultWrapper and prometheus - // counter of limitopen_softlimit_violation_total, - // but it does not stop the normal parsing process. - // - // If the hard limit is violated, - // The loading of the file will fail immediately. - MaxFileSize int64 `yaml:"maxFileSize"` - - // Optional, the interval to check file changes proactively. - // - // Default to DefaultPollingInterval. - // To disable polling completely, set it to a negative value. - // - // Without polling filewatcher relies solely on fs events from the parent - // directory. This works for most cases but will not work in the cases that - // the parent directory will be remount upon change - // (for example, k8s ConfigMap). - PollingInterval time.Duration `yaml:"pollingInterval"` - - // Optional, the delay between receiving the fs events and actually reading - // and parsing the changes. - // - // It's used to avoid short bursts of fs events (for example, when watching a - // directory) causing reading and parsing repetively. - // - // Defaut to DefaultFSEventsDelay. - FSEventsDelay time.Duration `yaml:"fsEventsDelay"` - - // Optional, the interval to keep retrying to open the file when creating a - // new FileWatcher, when the file was not initially available. - // - // Default to DefaultInitialReadInterval. - InitialReadInterval time.Duration `yaml:"initialReadInterval"` -} - type fakeDirectoryReader string func (fakeDirectoryReader) Read([]byte) (int, error) { @@ -335,6 +286,15 @@ type opts struct { // Option used in New. type Option func(*opts) +// WithOptions is a sugar to curry zero or more options. +func WithOptions(options ...Option) Option { + return func(o *opts) { + for _, opt := range options { + opt(o) + } + } +} + // WithFSEventsDelay sets the delay between receiving the fs events and actually // reading and parsing the changes. // @@ -353,7 +313,7 @@ func WithFSEventsDelay(delay time.Duration) Option { // Default to DefaultPollingInterval. // To disable polling completely, set it to a negative value. // -// Without polling filewatcher relies solely on fs events from the parent +// Without polling, filewatcher relies solely on fs events from the parent // directory. This works for most cases but will not work in the cases that // the parent directory will be remount upon change // (for example, k8s ConfigMap). @@ -393,27 +353,26 @@ func WithFileSizeLimit(limit int64) Option { } } +func defaultOptions() Option { + return WithOptions( + WithFSEventsDelay(DefaultFSEventsDelay), + WithPollingInterval(DefaultPollingInterval), + WithInitialReadInterval(DefaultInitialReadInterval), + WithFileSizeLimit(DefaultMaxFileSize), + ) +} + // New creates a new FileWatcher. // // If the path is not available at the time of calling, // it blocks until the file becomes available, or context is cancelled, // whichever comes first. -// -// When logger is non-nil, it will be used to log errors, -// either returned by parser or by the underlying file system watcher. -// Please note that this does not include errors returned by the first parser -// call, which will be returned directly. func New[T any](ctx context.Context, path string, parser Parser[T], options ...Option) (*Result[T], error) { - opt := opts{ - fsEventsDelay: DefaultFSEventsDelay, - pollingInterval: DefaultPollingInterval, - initialReadInterval: DefaultInitialReadInterval, - - fileSizeLimit: DefaultMaxFileSize, - } - for _, o := range options { - o(&opt) - } + var opt opts + WithOptions( + defaultOptions(), + WithOptions(options...), + )(&opt) hardLimit := opt.fileSizeLimit * HardLimitMultiplier var data T @@ -461,7 +420,7 @@ func New[T any](ctx context.Context, path string, parser Parser[T], options ...O } res := new(Result[T]) - res.data.Store(&atomicData[T]{ + res.data.Store(&dataAndMtime[T]{ data: data, mtime: mtime, }) @@ -479,89 +438,3 @@ func New[T any](ctx context.Context, path string, parser Parser[T], options ...O return res, nil } - -// NewMockFilewatcher returns a pointer to a new MockFileWatcher object -// initialized with the given io.Reader and Parser. -func NewMockFilewatcher[T any](r io.Reader, parser Parser[T]) (*MockFileWatcher[T], error) { - fw := &MockFileWatcher[T]{parser: parser} - if err := fw.Update(r); err != nil { - return nil, err - } - return fw, nil -} - -// MockFileWatcher is an implementation of FileWatcher that does not actually read -// from a file, it simply returns the data given to it when it was initialized -// with NewMockFilewatcher. It provides an additional Update method that allows -// you to update this data after it has been created. -type MockFileWatcher[T any] struct { - data atomic.Value - parser Parser[T] -} - -// Update updates the data of the MockFileWatcher using the given io.Reader and -// the Parser used to initialize the FileWatcher. -// -// This method is not threadsafe. -func (fw *MockFileWatcher[T]) Update(r io.Reader) error { - data, err := fw.parser(r) - if err != nil { - return err - } - fw.data.Store(data) - return nil -} - -// Get returns the parsed data. -func (fw *MockFileWatcher[T]) Get() T { - return fw.data.Load().(T) -} - -// Close is a no-op. -func (fw *MockFileWatcher[T]) Close() error { - return nil -} - -// MockDirWatcher is an implementation of FileWatcher for testing with watching -// directories. -type MockDirWatcher[T any] struct { - data atomic.Value - parser DirParser[T] -} - -// NewMockDirWatcher creates a MockDirWatcher with the initial data and the -// given DirParser. -// -// It provides Update function to update the data after it's been created. -func NewMockDirWatcher[T any](dir fs.FS, parser DirParser[T]) (*MockDirWatcher[T], error) { - dw := &MockDirWatcher[T]{parser: parser} - if err := dw.Update(dir); err != nil { - return nil, err - } - return dw, nil -} - -// Update updates the data stored in this MockDirWatcher. -func (dw *MockDirWatcher[T]) Update(dir fs.FS) error { - data, err := dw.parser(dir) - if err != nil { - return err - } - dw.data.Store(data) - return nil -} - -// Get implements FileWatcher by returning the last updated data. -func (dw *MockDirWatcher[T]) Get() T { - return dw.data.Load().(T) -} - -// Close is a no-op. -func (dw *MockDirWatcher[T]) Close() error { - return nil -} - -var ( - _ FileWatcher[any] = (*MockFileWatcher[any])(nil) - _ FileWatcher[any] = (*MockDirWatcher[any])(nil) -) diff --git a/filewatcher/v2/filewatcher_test.go b/filewatcher/v2/filewatcher_test.go index 2b036e69e..03a31c59a 100644 --- a/filewatcher/v2/filewatcher_test.go +++ b/filewatcher/v2/filewatcher_test.go @@ -10,7 +10,6 @@ import ( "log/slog" "os" "path/filepath" - "strings" "sync/atomic" "testing" "time" @@ -365,7 +364,7 @@ func TestFileWatcherDir(t *testing.T) { m := make(map[string]string) if err := fs.WalkDir(dir, ".", func(path string, de fs.DirEntry, err error) error { if err != nil { - return nil + return nil // skip to the next file } if de.IsDir() { return nil @@ -523,90 +522,3 @@ func TestParserSizeLimit(t *testing.T) { t.Errorf("Expected log.Wrapper to be called at least %d times, actual %d", expectedCalledMin, called) } } - -func TestMockFileWatcher(t *testing.T) { - t.Parallel() - - const ( - foo = "foo" - bar = "bar" - ) - - r := strings.NewReader(foo) - fw, err := filewatcher.NewMockFilewatcher(r, func(r io.Reader) (string, error) { - var buf bytes.Buffer - _, err := io.Copy(&buf, r) - if err != nil { - return "", err - } - return buf.String(), nil - }) - if err != nil { - t.Fatal(err) - } - - t.Run( - "get", - func(t *testing.T) { - data := fw.Get() - if strings.Compare(data, foo) != 0 { - t.Fatalf("%q does not match %q", data, foo) - } - }, - ) - - t.Run( - "update", - func(t *testing.T) { - if err := fw.Update(strings.NewReader(bar)); err != nil { - t.Fatal(err) - } - - data := fw.Get() - if strings.Compare(data, bar) != 0 { - t.Fatalf("%q does not match %q", data, foo) - } - }, - ) - - t.Run( - "errors", - func(t *testing.T) { - t.Run( - "NewMockFilewatcher", - func(t *testing.T) { - if _, err := filewatcher.NewMockFilewatcher(r, func(r io.Reader) (interface{}, error) { - return "", errors.New("test") - }); err == nil { - t.Fatal("expected an error, got nil") - } - }, - ) - - t.Run( - "update", - func(t *testing.T) { - fw, err := filewatcher.NewMockFilewatcher(r, func(r io.Reader) (interface{}, error) { - var buf bytes.Buffer - _, err := io.Copy(&buf, r) - if err != nil { - return "", err - } - data := buf.String() - if strings.Compare(data, bar) == 0 { - return "", errors.New("test") - } - return data, nil - }) - if err != nil { - t.Fatal(err) - } - - if err := fw.Update(strings.NewReader(bar)); err == nil { - t.Fatal("expected an error, got nil") - } - }, - ) - }, - ) -} diff --git a/filewatcher/v2/fwtest/doc.go b/filewatcher/v2/fwtest/doc.go new file mode 100644 index 000000000..53d567a65 --- /dev/null +++ b/filewatcher/v2/fwtest/doc.go @@ -0,0 +1,2 @@ +// Package fwtest provides test helpers for filewatcher v2. +package fwtest diff --git a/filewatcher/v2/fwtest/fake.go b/filewatcher/v2/fwtest/fake.go new file mode 100644 index 000000000..c3276e7b9 --- /dev/null +++ b/filewatcher/v2/fwtest/fake.go @@ -0,0 +1,95 @@ +package fwtest + +import ( + "io" + "io/fs" + "sync/atomic" + + "github.com/reddit/baseplate.go/filewatcher/v2" +) + +// FakeFileWatcher is an implementation of FileWatcher that does not actually +// read from a file, it simply returns the data given to it when it was +// initialized with NewFakeFilewatcher. It provides an additional Update method +// that allows you to update this data after it has been created. +type FakeFileWatcher[T any] struct { + data atomic.Value // type: T + parser filewatcher.Parser[T] +} + +// NewFakeFilewatcher returns a pointer to a new FakeFileWatcher object +// initialized with the given io.Reader and Parser. +func NewFakeFilewatcher[T any](r io.Reader, parser filewatcher.Parser[T]) (*FakeFileWatcher[T], error) { + fw := &FakeFileWatcher[T]{parser: parser} + if err := fw.Update(r); err != nil { + return nil, err + } + return fw, nil +} + +// Update updates the data of the FakeFileWatcher using the given io.Reader and +// the Parser used to initialize the FileWatcher. +// +// This method is not threadsafe. +func (fw *FakeFileWatcher[T]) Update(r io.Reader) error { + data, err := fw.parser(r) + if err != nil { + return err + } + fw.data.Store(data) + return nil +} + +// Get returns the parsed data. +func (fw *FakeFileWatcher[T]) Get() T { + return fw.data.Load().(T) +} + +// Close is a no-op. +func (fw *FakeFileWatcher[T]) Close() error { + return nil +} + +// FakeDirWatcher is an implementation of FileWatcher for testing with watching +// directories. +type FakeDirWatcher[T any] struct { + data atomic.Value // type: T + parser filewatcher.DirParser[T] +} + +// NewFakeDirWatcher creates a FakeDirWatcher with the initial data and the +// given DirParser. +// +// It provides Update function to update the data after it's been created. +func NewFakeDirWatcher[T any](dir fs.FS, parser filewatcher.DirParser[T]) (*FakeDirWatcher[T], error) { + dw := &FakeDirWatcher[T]{parser: parser} + if err := dw.Update(dir); err != nil { + return nil, err + } + return dw, nil +} + +// Update updates the data stored in this FakeDirWatcher. +func (dw *FakeDirWatcher[T]) Update(dir fs.FS) error { + data, err := dw.parser(dir) + if err != nil { + return err + } + dw.data.Store(data) + return nil +} + +// Get implements FileWatcher by returning the last updated data. +func (dw *FakeDirWatcher[T]) Get() T { + return dw.data.Load().(T) +} + +// Close is a no-op. +func (dw *FakeDirWatcher[T]) Close() error { + return nil +} + +var ( + _ filewatcher.FileWatcher[any] = (*FakeFileWatcher[any])(nil) + _ filewatcher.FileWatcher[any] = (*FakeDirWatcher[any])(nil) +) diff --git a/filewatcher/v2/fwtest/fake_test.go b/filewatcher/v2/fwtest/fake_test.go new file mode 100644 index 000000000..b2822976c --- /dev/null +++ b/filewatcher/v2/fwtest/fake_test.go @@ -0,0 +1,98 @@ +package fwtest_test + +import ( + "bytes" + "errors" + "io" + "strings" + "testing" + + "github.com/reddit/baseplate.go/filewatcher/v2/fwtest" +) + +func TestFakeFileWatcher(t *testing.T) { + t.Parallel() + + const ( + foo = "foo" + bar = "bar" + ) + + r := strings.NewReader(foo) + fw, err := fwtest.NewFakeFilewatcher(r, func(r io.Reader) (string, error) { + var buf bytes.Buffer + _, err := io.Copy(&buf, r) + if err != nil { + return "", err + } + return buf.String(), nil + }) + if err != nil { + t.Fatal(err) + } + + t.Run( + "get", + func(t *testing.T) { + data := fw.Get() + if strings.Compare(data, foo) != 0 { + t.Fatalf("%q does not match %q", data, foo) + } + }, + ) + + t.Run( + "update", + func(t *testing.T) { + if err := fw.Update(strings.NewReader(bar)); err != nil { + t.Fatal(err) + } + + data := fw.Get() + if strings.Compare(data, bar) != 0 { + t.Fatalf("%q does not match %q", data, foo) + } + }, + ) + + t.Run( + "errors", + func(t *testing.T) { + t.Run( + "NewFakeFilewatcher", + func(t *testing.T) { + if _, err := fwtest.NewFakeFilewatcher(r, func(r io.Reader) (string, error) { + return "", errors.New("test") + }); err == nil { + t.Fatal("expected an error, got nil") + } + }, + ) + + t.Run( + "update", + func(t *testing.T) { + fw, err := fwtest.NewFakeFilewatcher(r, func(r io.Reader) (string, error) { + var buf bytes.Buffer + _, err := io.Copy(&buf, r) + if err != nil { + return "", err + } + data := buf.String() + if strings.Compare(data, bar) == 0 { + return "", errors.New("test") + } + return data, nil + }) + if err != nil { + t.Fatal(err) + } + + if err := fw.Update(strings.NewReader(bar)); err == nil { + t.Fatal("expected an error, got nil") + } + }, + ) + }, + ) +}