
Commit

Fixup
fishy committed May 23, 2024
1 parent 126d45b commit 66a43a8
Showing 6 changed files with 258 additions and 277 deletions.
35 changes: 18 additions & 17 deletions filewatcher/filewatcher.go
@@ -7,6 +7,7 @@ import (
"time"

v2 "github.com/reddit/baseplate.go/filewatcher/v2"
"github.com/reddit/baseplate.go/filewatcher/v2/fwtest"
"github.com/reddit/baseplate.go/log"
)

@@ -188,35 +189,35 @@ func New(ctx context.Context, cfg Config) (*Result, error) {
return &Result{result: result}, nil
}

// MockFileWatcher is an implementation of FileWatcher that does not actually read
// from a file; it simply returns the data given to it when it was initialized
// with NewMockFilewatcher. It provides an additional Update method that allows
// you to update this data after it has been created.
type MockFileWatcher struct {
fake *fwtest.FakeFileWatcher[any]
}

// NewMockFilewatcher returns a pointer to a new MockFileWatcher object
// initialized with the given io.Reader and Parser.
func NewMockFilewatcher(r io.Reader, parser Parser) (*MockFileWatcher, error) {
mock, err := v2.NewMockFilewatcher(r, parser)
fake, err := fwtest.NewFakeFilewatcher(r, parser)
if err != nil {
return nil, err
}
return &MockFileWatcher{mock: mock}, nil
}

// MockFileWatcher is an implementation of FileWatcher that does not actually read
// from a file; it simply returns the data given to it when it was initialized
// with NewMockFilewatcher. It provides an additional Update method that allows
// you to update this data after it has been created.
type MockFileWatcher struct {
mock *v2.MockFileWatcher[any]
return &MockFileWatcher{fake: fake}, nil
}

// Update updates the data of the MockFileWatcher using the given io.Reader and
// the Parser used to initialize the FileWatcher.
//
// This method is not threadsafe.
func (fw *MockFileWatcher) Update(r io.Reader) error {
return fw.mock.Update(r)
return fw.fake.Update(r)
}

// Get returns the parsed data.
func (fw *MockFileWatcher) Get() any {
return fw.mock.Get()
return fw.fake.Get()
}

// Stop is a no-op.
@@ -225,29 +226,29 @@ func (fw *MockFileWatcher) Stop() {}
// MockDirWatcher is an implementation of FileWatcher for testing with watching
// directories.
type MockDirWatcher struct {
mock *v2.MockDirWatcher[any]
fake *fwtest.FakeDirWatcher[any]
}

// NewMockDirWatcher creates a MockDirWatcher with the initial data and the
// given DirParser.
//
// It provides an Update function to update the data after it's been created.
func NewMockDirWatcher(dir fs.FS, parser DirParser) (*MockDirWatcher, error) {
mock, err := v2.NewMockDirWatcher(dir, parser)
fake, err := fwtest.NewFakeDirWatcher(dir, parser)
if err != nil {
return nil, err
}
return &MockDirWatcher{mock: mock}, nil
return &MockDirWatcher{fake: fake}, nil
}

// Update updates the data stored in this MockDirWatcher.
func (dw *MockDirWatcher) Update(dir fs.FS) error {
return dw.mock.Update(dir)
return dw.fake.Update(dir)
}

// Get implements FileWatcher by returning the last updated data.
func (dw *MockDirWatcher) Get() any {
return dw.mock.Get()
return dw.fake.Get()
}

// Stop is a no-op.
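For orientation, here is a minimal sketch (not part of this commit) of how a test might exercise the MockFileWatcher wrapper kept in this file; the string parser and the values are placeholders.

package filewatcher_test

import (
	"io"
	"strings"
	"testing"

	"github.com/reddit/baseplate.go/filewatcher"
)

func TestMockFileWatcher(t *testing.T) {
	// A toy parser; real code would typically decode JSON/YAML here.
	var parser filewatcher.Parser = func(r io.Reader) (any, error) {
		b, err := io.ReadAll(r)
		return string(b), err
	}

	fw, err := filewatcher.NewMockFilewatcher(strings.NewReader("v1"), parser)
	if err != nil {
		t.Fatal(err)
	}
	if got := fw.Get().(string); got != "v1" {
		t.Errorf("initial data: got %q, want %q", got, "v1")
	}

	// Update replaces the data returned by subsequent Get calls.
	if err := fw.Update(strings.NewReader("v2")); err != nil {
		t.Fatal(err)
	}
	if got := fw.Get().(string); got != "v2" {
		t.Errorf("after Update: got %q, want %q", got, "v2")
	}
}

Stop remains a no-op on the mock, as noted above.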
215 changes: 44 additions & 171 deletions filewatcher/v2/filewatcher.go
@@ -81,15 +81,15 @@ func WrapDirParser[T any](dp DirParser[T]) Parser[T] {

// Result is the return type of New. Use Get function to get the actual data.
type Result[T any] struct {
data atomic.Value
data atomic.Pointer[dataAndMtime[T]]

ctx context.Context
cancel context.CancelFunc
}

// Get returns the latest parsed data from the FileWatcher.
func (r *Result[T]) Get() T {
return r.data.Load().(*atomicData[T]).data
return r.data.Load().data
}

// Close stops the FileWatcher.
@@ -131,20 +131,20 @@ func (r *Result[T]) watcherLoop(
d, mtime, files, err := openAndParse(path, parser, softLimit, hardLimit)
if err != nil {
slog.ErrorContext(r.ctx, "filewatcher: openAndParse returned error", "err", err)
} else {
r.data.Store(&atomicData[T]{
data: d,
mtime: mtime,
})
// remove all previously watched files
for _, path := range watcher.WatchList() {
watcher.Remove(path)
}
// then read all new files to watch
for _, path := range files {
if err := watcher.Add(path); err != nil {
slog.ErrorContext(r.ctx, "filewatcher: failed to watch file", "err", err, "path", path)
}
return
}
r.data.Store(&dataAndMtime[T]{
data: d,
mtime: mtime,
})
// remove all previously watched files
for _, path := range watcher.WatchList() {
watcher.Remove(path)
}
// then read all new files to watch
for _, path := range files {
if err := watcher.Add(path); err != nil {
slog.ErrorContext(r.ctx, "filewatcher: failed to watch file", "err", err, "path", path)
}
}
}
@@ -155,7 +155,7 @@ func (r *Result[T]) watcherLoop(
slog.ErrorContext(r.ctx, "filewatcher: failed to get mtime for file", "err", err, "path", path)
return
}
if r.data.Load().(*atomicData[T]).mtime.Before(mtime) {
if r.data.Load().mtime.Before(mtime) {
forceReload()
}
}
@@ -213,8 +213,8 @@ func (r *Result[T]) watcherLoop(
}
}

// The actual data held in Result.data.
type atomicData[T any] struct {
// The actual data and mtime held in Result.data.
type dataAndMtime[T any] struct {
// actual parsed data
data T

@@ -226,55 +226,6 @@ var (
_ FileWatcher[any] = (*Result[any])(nil)
)
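A side note on the atomicData to dataAndMtime rename above: the commit also moves Result.data from atomic.Value to a typed atomic.Pointer, which is what lets Get drop the runtime type assertion. A standalone sketch of that pattern follows; the snapshot and holder names are placeholders, not part of the package.

package example

import "sync/atomic"

// snapshot plays the role of dataAndMtime in the real code.
type snapshot[T any] struct {
	data T
}

type holder[T any] struct {
	// atomic.Pointer is generic, so Load returns *snapshot[T] directly;
	// with atomic.Value the caller would need value.Load().(*snapshot[T]).
	p atomic.Pointer[snapshot[T]]
}

func (h *holder[T]) set(v T) {
	h.p.Store(&snapshot[T]{data: v})
}

// get assumes set was called at least once, otherwise Load returns nil
// and dereferencing it would panic.
func (h *holder[T]) get() T {
	return h.p.Load().data
}

In the real Result, New stores the first dataAndMtime before returning, so Get never observes a nil pointer.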

// Config defines the config to be used in New function.
//
// Can be deserialized from YAML.
type Config struct {
// The path to the file to be watched, required.
Path string `yaml:"path"`

// Optional. When <=0 DefaultMaxFileSize will be used instead.
//
// This is the soft limit,
// we will also auto add a hard limit which is 10x (see HardLimitMultiplier)
// of soft limit.
//
// If the soft limit is violated,
// the violation will be reported via log.DefaultWrapper and prometheus
// counter of limitopen_softlimit_violation_total,
// but it does not stop the normal parsing process.
//
// If the hard limit is violated,
// The loading of the file will fail immediately.
MaxFileSize int64 `yaml:"maxFileSize"`

// Optional, the interval to check file changes proactively.
//
// Default to DefaultPollingInterval.
// To disable polling completely, set it to a negative value.
//
// Without polling, filewatcher relies solely on fs events from the parent
// directory. This works for most cases, but will not work in cases where
// the parent directory is remounted upon change
// (for example, k8s ConfigMap).
PollingInterval time.Duration `yaml:"pollingInterval"`

// Optional, the delay between receiving the fs events and actually reading
// and parsing the changes.
//
// It's used to avoid short bursts of fs events (for example, when watching a
// directory) causing reading and parsing repetitively.
//
// Default to DefaultFSEventsDelay.
FSEventsDelay time.Duration `yaml:"fsEventsDelay"`

// Optional, the interval to keep retrying to open the file when creating a
// new FileWatcher, when the file was not initially available.
//
// Default to DefaultInitialReadInterval.
InitialReadInterval time.Duration `yaml:"initialReadInterval"`
}

type fakeDirectoryReader string

func (fakeDirectoryReader) Read([]byte) (int, error) {
@@ -335,6 +286,15 @@ type opts struct {
// Option used in New.
type Option func(*opts)

// WithOptions is syntactic sugar that combines zero or more options into one.
func WithOptions(options ...Option) Option {
return func(o *opts) {
for _, opt := range options {
opt(o)
}
}
}
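To make the new WithOptions helper concrete, here is a hedged usage sketch; the config type, JSON parser, and path are placeholders, and only the New, Parser, Result, and With* names visible in this diff are assumed.

package example

import (
	"context"
	"encoding/json"
	"io"
	"time"

	filewatcher "github.com/reddit/baseplate.go/filewatcher/v2"
)

type config struct {
	Endpoint string `json:"endpoint"`
}

func watchConfig(ctx context.Context) (*filewatcher.Result[config], error) {
	var parser filewatcher.Parser[config] = func(r io.Reader) (config, error) {
		var c config
		err := json.NewDecoder(r).Decode(&c)
		return c, err
	}

	// WithOptions bundles several options into a single Option value,
	// the same mechanism defaultOptions uses below.
	bundle := filewatcher.WithOptions(
		filewatcher.WithPollingInterval(30*time.Second),
		filewatcher.WithFSEventsDelay(10*time.Millisecond),
	)

	// Individual options can still be passed alongside a bundle;
	// options apply in order, so later ones win for the same field.
	return filewatcher.New(ctx, "/path/to/config.json", parser, bundle,
		filewatcher.WithFileSizeLimit(1<<20))
}

The returned Result's Get yields the latest parsed value, and Close stops the watcher.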

// WithFSEventsDelay sets the delay between receiving the fs events and actually
// reading and parsing the changes.
//
@@ -353,7 +313,7 @@ func WithFSEventsDelay(delay time.Duration) Option {
// Default to DefaultPollingInterval.
// To disable polling completely, set it to a negative value.
//
// Without polling filewatcher relies solely on fs events from the parent
// Without polling, filewatcher relies solely on fs events from the parent
// directory. This works for most cases, but will not work in cases where
// the parent directory is remounted upon change
// (for example, k8s ConfigMap).
@@ -393,27 +353,26 @@ func WithFileSizeLimit(limit int64) Option {
}
}

func defaultOptions() Option {
return WithOptions(
WithFSEventsDelay(DefaultFSEventsDelay),
WithPollingInterval(DefaultPollingInterval),
WithInitialReadInterval(DefaultInitialReadInterval),
WithFileSizeLimit(DefaultMaxFileSize),
)
}

// New creates a new FileWatcher.
//
// If the path is not available at the time of calling,
// it blocks until the file becomes available, or context is cancelled,
// whichever comes first.
//
// When logger is non-nil, it will be used to log errors,
// either returned by parser or by the underlying file system watcher.
// Please note that this does not include errors returned by the first parser
// call, which will be returned directly.
func New[T any](ctx context.Context, path string, parser Parser[T], options ...Option) (*Result[T], error) {
opt := opts{
fsEventsDelay: DefaultFSEventsDelay,
pollingInterval: DefaultPollingInterval,
initialReadInterval: DefaultInitialReadInterval,

fileSizeLimit: DefaultMaxFileSize,
}
for _, o := range options {
o(&opt)
}
var opt opts
WithOptions(
defaultOptions(),
WithOptions(options...),
)(&opt)
hardLimit := opt.fileSizeLimit * HardLimitMultiplier

var data T
@@ -461,7 +420,7 @@ func New[T any](ctx context.Context, path string, parser Parser[T], options ...O
}

res := new(Result[T])
res.data.Store(&atomicData[T]{
res.data.Store(&dataAndMtime[T]{
data: data,
mtime: mtime,
})
@@ -479,89 +438,3 @@ func New[T any](ctx context.Context, path string, parser Parser[T], options ...O

return res, nil
}

// NewMockFilewatcher returns a pointer to a new MockFileWatcher object
// initialized with the given io.Reader and Parser.
func NewMockFilewatcher[T any](r io.Reader, parser Parser[T]) (*MockFileWatcher[T], error) {
fw := &MockFileWatcher[T]{parser: parser}
if err := fw.Update(r); err != nil {
return nil, err
}
return fw, nil
}

// MockFileWatcher is an implementation of FileWatcher that does not actually read
// from a file; it simply returns the data given to it when it was initialized
// with NewMockFilewatcher. It provides an additional Update method that allows
// you to update this data after it has been created.
type MockFileWatcher[T any] struct {
data atomic.Value
parser Parser[T]
}

// Update updates the data of the MockFileWatcher using the given io.Reader and
// the Parser used to initialize the FileWatcher.
//
// This method is not threadsafe.
func (fw *MockFileWatcher[T]) Update(r io.Reader) error {
data, err := fw.parser(r)
if err != nil {
return err
}
fw.data.Store(data)
return nil
}

// Get returns the parsed data.
func (fw *MockFileWatcher[T]) Get() T {
return fw.data.Load().(T)
}

// Close is a no-op.
func (fw *MockFileWatcher[T]) Close() error {
return nil
}

// MockDirWatcher is an implementation of FileWatcher for testing with watching
// directories.
type MockDirWatcher[T any] struct {
data atomic.Value
parser DirParser[T]
}

// NewMockDirWatcher creates a MockDirWatcher with the initial data and the
// given DirParser.
//
// It provides an Update function to update the data after it's been created.
func NewMockDirWatcher[T any](dir fs.FS, parser DirParser[T]) (*MockDirWatcher[T], error) {
dw := &MockDirWatcher[T]{parser: parser}
if err := dw.Update(dir); err != nil {
return nil, err
}
return dw, nil
}

// Update updates the data stored in this MockDirWatcher.
func (dw *MockDirWatcher[T]) Update(dir fs.FS) error {
data, err := dw.parser(dir)
if err != nil {
return err
}
dw.data.Store(data)
return nil
}

// Get implements FileWatcher by returning the last updated data.
func (dw *MockDirWatcher[T]) Get() T {
return dw.data.Load().(T)
}

// Close is a no-op.
func (dw *MockDirWatcher[T]) Close() error {
return nil
}

var (
_ FileWatcher[any] = (*MockFileWatcher[any])(nil)
_ FileWatcher[any] = (*MockDirWatcher[any])(nil)
)
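The generic mocks removed above now live in the new fwtest package (see the fwtest.NewFakeFilewatcher and NewFakeDirWatcher calls in filewatcher/filewatcher.go earlier in this commit). Assuming the fwtest API mirrors the removed generics, which the wrapper code suggests but this diff does not show, a test might look like the sketch below; the flags type and JSON payloads are placeholders.

package example

import (
	"encoding/json"
	"io"
	"strings"
	"testing"

	filewatcher "github.com/reddit/baseplate.go/filewatcher/v2"
	"github.com/reddit/baseplate.go/filewatcher/v2/fwtest"
)

type flags struct {
	Enabled bool `json:"enabled"`
}

func TestFakeFileWatcher(t *testing.T) {
	var parser filewatcher.Parser[flags] = func(r io.Reader) (flags, error) {
		var f flags
		err := json.NewDecoder(r).Decode(&f)
		return f, err
	}

	fake, err := fwtest.NewFakeFilewatcher(strings.NewReader(`{"enabled": false}`), parser)
	if err != nil {
		t.Fatal(err)
	}
	if fake.Get().Enabled {
		t.Error("expected enabled=false before Update")
	}

	// Update re-runs the parser on the new payload for later Get calls.
	if err := fake.Update(strings.NewReader(`{"enabled": true}`)); err != nil {
		t.Fatal(err)
	}
	if !fake.Get().Enabled {
		t.Error("expected enabled=true after Update")
	}
}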