From 38a67d92a82f61a2c3b3e3241e3649869be3482a Mon Sep 17 00:00:00 2001 From: Shammah Chancellor Date: Thu, 11 Apr 2019 13:13:57 -0700 Subject: [PATCH 1/2] [reloader] Add support for multiple config files Currently the config file expects there to only be one config file watched. However, the nats config file can include other paths, and we may also want to watch those. This commit implements being able to specify an arbtirary number of configuration files to be watched. --- cmd/reloader/main.go | 29 ++++++++++-- pkg/reloader/reloader.go | 87 ++++++++++++++++++++++++---------- test/reloader/reloader_test.go | 63 +++++++++++++++--------- 3 files changed, 127 insertions(+), 52 deletions(-) diff --git a/cmd/reloader/main.go b/cmd/reloader/main.go index d67d58cb..f4e89288 100644 --- a/cmd/reloader/main.go +++ b/cmd/reloader/main.go @@ -7,12 +7,26 @@ import ( "log" "os" "os/signal" + "strings" "syscall" - "github.com/nats-io/nats-operator/pkg/reloader" + natsreloader "github.com/nats-io/nats-operator/pkg/reloader" "github.com/nats-io/nats-operator/version" ) +// StringSet is a wrapper for []string to allow using it with the flags package. +type StringSet []string + +func (s *StringSet) String() string { + return strings.Join([]string(*s), ", ") +} + +// Set appends the value provided to the list of strings. +func (s *StringSet) Set(val string) error { + *s = append(*s, val) + return nil +} + func main() { fs := flag.NewFlagSet("nats-server-config-reloader", flag.ExitOnError) flag.Usage = func() { @@ -25,22 +39,29 @@ func main() { var ( showHelp bool showVersion bool + fileSet StringSet ) + + nconfig := &natsreloader.Config{} fs.BoolVar(&showHelp, "h", false, "Show help") fs.BoolVar(&showHelp, "help", false, "Show help") fs.BoolVar(&showVersion, "v", false, "Show version") fs.BoolVar(&showVersion, "version", false, "Show version") - nconfig := &natsreloader.Config{} fs.StringVar(&nconfig.PidFile, "P", "/var/run/nats/gnatsd.pid", "NATS Server Pid File") fs.StringVar(&nconfig.PidFile, "pid", "/var/run/nats/gnatsd.pid", "NATS Server Pid File") - fs.StringVar(&nconfig.ConfigFile, "c", "/etc/nats/gnatsd.conf", "NATS Server Config File") - fs.StringVar(&nconfig.ConfigFile, "config", "/etc/nats/gnatsd.conf", "NATS Server Config File") + fs.Var(&fileSet, "c", "NATS Server Config File (may be repeated to specify more than one)") + fs.Var(&fileSet, "config", "NATS Server Config File (may be repeated to specify more than one)") fs.IntVar(&nconfig.MaxRetries, "max-retries", 5, "Max attempts to trigger reload") fs.IntVar(&nconfig.RetryWaitSecs, "retry-wait-secs", 2, "Time to back off when reloading fails before retrying") fs.Parse(os.Args[1:]) + nconfig.ConfigFiles = fileSet + if len(fileSet) == 0 { + nconfig.ConfigFiles = []string{"/etc/nats-config/gnatsd.conf"} + } + switch { case showHelp: flag.Usage() diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index bd4cd463..e6e1a797 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -9,7 +9,7 @@ import ( "io/ioutil" "log" "os" - "path" + "path/filepath" "strconv" "syscall" "time" @@ -20,7 +20,7 @@ import ( // Config represents the configuration of the reloader. type Config struct { PidFile string - ConfigFile string + ConfigFiles []string MaxRetries int RetryWaitSecs int } @@ -39,10 +39,6 @@ type Reloader struct { // quit shutsdown the reloader. quit func() - - // lastAppliedVersion is the last config update - // done by the proces.. - lastAppliedVersion []byte } // Run starts the main loop. @@ -94,41 +90,80 @@ func (r *Reloader) Run(ctx context.Context) error { // Follow configuration updates in the directory where // the config file is located and trigger reload when // it is either recreated or written into. - if err := configWatcher.Add(path.Dir(r.ConfigFile)); err != nil { - return err + for i := range r.ConfigFiles { + // Ensure our paths are canonical + r.ConfigFiles[i], _ = filepath.Abs(r.ConfigFiles[i]) + // Use directory here because k8s remounts the entire folder + // the config file lives in. So, watch the folder so we properly receive events. + if err := configWatcher.Add(filepath.Dir(r.ConfigFiles[i])); err != nil { + return err + } } attempts = 0 + // lastConfigAppliedCache is the last config update + // applied by us + lastConfigAppliedCache := make(map[string][]byte) + + // Preload config hashes, so we know their digests + // up front and avoid potentially reloading when unnecessary. + for _, configFile := range r.ConfigFiles { + h := sha256.New() + f, err := os.Open(configFile) + if err != nil { + return err + } + if _, err := io.Copy(h, f); err != nil { + return err + } + digest := h.Sum(nil) + lastConfigAppliedCache[configFile] = digest + } + +WaitForEvent: for { select { case <-ctx.Done(): return nil case event := <-configWatcher.Events: log.Printf("Event: %+v \n", event) - // FIXME: This captures all events in the same folder, should - // narrow down to updates to the config file involved only. - if event.Op != fsnotify.Write && event.Op != fsnotify.Create { - continue - } - - h := sha256.New() - f, err := os.Open(r.ConfigFile) + touchedInfo, err := os.Stat(event.Name) if err != nil { - log.Printf("Error: %s\n", err) - continue - } - if _, err := io.Copy(h, f); err != nil { - log.Printf("Error: %s\n", err) continue } - digest := h.Sum(nil) - if r.lastAppliedVersion != nil { - if bytes.Equal(r.lastAppliedVersion, digest) { - // Skip since no meaningful change + + for _, configFile := range r.ConfigFiles { + configInfo, err := os.Stat(configFile) + if err != nil { + log.Printf("Error: %s\n", err) + continue WaitForEvent + } + if !os.SameFile(touchedInfo, configInfo) { continue } + + h := sha256.New() + f, err := os.Open(configFile) + if err != nil { + log.Printf("Error: %s\n", err) + continue WaitForEvent + } + if _, err := io.Copy(h, f); err != nil { + log.Printf("Error: %s\n", err) + continue WaitForEvent + } + digest := h.Sum(nil) + lastConfigHash, ok := lastConfigAppliedCache[configFile] + if ok && bytes.Equal(lastConfigHash, digest) { + // No meaningful change or this is the first time we've checked + continue WaitForEvent + } + lastConfigAppliedCache[configFile] = digest + + // We only get an event for one file at a time, we can stop checking + // config files here and continue with our business. + break } - r.lastAppliedVersion = digest case err := <-configWatcher.Errors: log.Printf("Error: %s\n", err) diff --git a/test/reloader/reloader_test.go b/test/reloader/reloader_test.go index ddbe6240..015a13c0 100644 --- a/test/reloader/reloader_test.go +++ b/test/reloader/reloader_test.go @@ -10,10 +10,16 @@ import ( "testing" "time" - "github.com/nats-io/nats-operator/pkg/reloader" + natsreloader "github.com/nats-io/nats-operator/pkg/reloader" ) +var configContents = `port = 2222` +var newConfigContents = `port = 2222 +someOtherThing = "bar" +` + func TestReloader(t *testing.T) { + // Setup a pidfile that points to us pid := os.Getpid() pidfile, err := ioutil.TempFile(os.TempDir(), "nats-pid-") if err != nil { @@ -26,28 +32,34 @@ func TestReloader(t *testing.T) { } defer os.Remove(pidfile.Name()) - configfile, err := ioutil.TempFile(os.TempDir(), "nats-conf-") - if err != nil { - t.Fatal(err) + // Create tempfile with contents, then update it + nconfig := &natsreloader.Config{ + PidFile: pidfile.Name(), + ConfigFiles: []string{}, } - defer os.Remove(configfile.Name()) - if _, err := configfile.WriteString("port = 4222"); err != nil { - t.Fatal(err) - } + var configFiles []*os.File + for i := 0; i < 2; i++ { + configFile, err := ioutil.TempFile(os.TempDir(), "nats-conf-") + if err != nil { + t.Fatal(err) + } + defer os.Remove(configFile.Name()) - // Create tempfile with contents, then update it - nconfig := &natsreloader.Config{ - PidFile: pidfile.Name(), - ConfigFile: configfile.Name(), + if _, err := configFile.WriteString(configContents); err != nil { + t.Fatal(err) + } + configFiles = append(configFiles, configFile) + nconfig.ConfigFiles = append(nconfig.ConfigFiles, configFile.Name()) } + r, err := natsreloader.NewReloader(nconfig) if err != nil { fmt.Fprintf(os.Stderr, "Error: %s\n", err) os.Exit(1) } - var success = false + var signals = 0 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -59,26 +71,33 @@ func TestReloader(t *testing.T) { // Success when receiving the first signal for range c { - success = true - cancel() - return + signals++ } }() go func() { - for i := 0; i < 5; i++ { - if _, err := configfile.WriteString("port = 4222"); err != nil { - return + // This is terrible, but we need this thread to wait until r.Run(ctx) has finished starting up + // before we start mucking with the file. + // There isn't any other good way to synchronize on this happening. + time.Sleep(100 * time.Millisecond) + for _, configfile := range configFiles { + for i := 0; i < 5; i++ { + // Append some more stuff to the config + if _, err := configfile.WriteAt([]byte(newConfigContents), 0); err != nil { + return + } + time.Sleep(10 * time.Millisecond) } - time.Sleep(1 * time.Second) } + cancel() }() err = r.Run(ctx) if err != nil && err != context.Canceled { t.Fatal(err) } - if !success { - t.Fatalf("Timed out waiting for reloading signal") + // We should have gotten only one signal for each configuration file + if signals != len(configFiles) { + t.Fatalf("Wrong number of signals received.") } } From 4af7d2368b2534b5b402f1c71da9d5d244b80583 Mon Sep 17 00:00:00 2001 From: Shammah Chancellor Date: Tue, 16 Apr 2019 09:19:47 -0700 Subject: [PATCH 2/2] Did this break e2e??? --- pkg/reloader/reloader.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index e6e1a797..4fc1cdc9 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -107,18 +107,18 @@ func (r *Reloader) Run(ctx context.Context) error { // Preload config hashes, so we know their digests // up front and avoid potentially reloading when unnecessary. - for _, configFile := range r.ConfigFiles { - h := sha256.New() - f, err := os.Open(configFile) - if err != nil { - return err - } - if _, err := io.Copy(h, f); err != nil { - return err - } - digest := h.Sum(nil) - lastConfigAppliedCache[configFile] = digest - } + //for _, configFile := range r.ConfigFiles { + // h := sha256.New() + // f, err := os.Open(configFile) + // if err != nil { + // return err + // } + // if _, err := io.Copy(h, f); err != nil { + // return err + // } + // digest := h.Sum(nil) + // lastConfigAppliedCache[configFile] = digest + //} WaitForEvent: for {