Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[reloader] Add support for multiple config files #171

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions cmd/reloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,26 @@ import (
"log"
"os"
"os/signal"
"strings"
"syscall"

"github.com/nats-io/nats-operator/pkg/reloader"
natsreloader "github.com/nats-io/nats-operator/pkg/reloader"
"github.com/nats-io/nats-operator/version"
)

// StringSet is a wrapper for []string to allow using it with the flags package.
type StringSet []string

func (s *StringSet) String() string {
return strings.Join([]string(*s), ", ")
}

// Set appends the value provided to the list of strings.
func (s *StringSet) Set(val string) error {
*s = append(*s, val)
return nil
}

schancel marked this conversation as resolved.
Show resolved Hide resolved
func main() {
fs := flag.NewFlagSet("nats-server-config-reloader", flag.ExitOnError)
flag.Usage = func() {
Expand All @@ -25,22 +39,29 @@ func main() {
var (
showHelp bool
showVersion bool
fileSet StringSet
)

nconfig := &natsreloader.Config{}
fs.BoolVar(&showHelp, "h", false, "Show help")
fs.BoolVar(&showHelp, "help", false, "Show help")
fs.BoolVar(&showVersion, "v", false, "Show version")
fs.BoolVar(&showVersion, "version", false, "Show version")

nconfig := &natsreloader.Config{}
fs.StringVar(&nconfig.PidFile, "P", "/var/run/nats/gnatsd.pid", "NATS Server Pid File")
fs.StringVar(&nconfig.PidFile, "pid", "/var/run/nats/gnatsd.pid", "NATS Server Pid File")
fs.StringVar(&nconfig.ConfigFile, "c", "/etc/nats/gnatsd.conf", "NATS Server Config File")
fs.StringVar(&nconfig.ConfigFile, "config", "/etc/nats/gnatsd.conf", "NATS Server Config File")
fs.Var(&fileSet, "c", "NATS Server Config File (may be repeated to specify more than one)")
fs.Var(&fileSet, "config", "NATS Server Config File (may be repeated to specify more than one)")
fs.IntVar(&nconfig.MaxRetries, "max-retries", 5, "Max attempts to trigger reload")
fs.IntVar(&nconfig.RetryWaitSecs, "retry-wait-secs", 2, "Time to back off when reloading fails before retrying")

fs.Parse(os.Args[1:])

nconfig.ConfigFiles = fileSet
if len(fileSet) == 0 {
nconfig.ConfigFiles = []string{"/etc/nats-config/gnatsd.conf"}
}

switch {
case showHelp:
flag.Usage()
Expand Down
87 changes: 61 additions & 26 deletions pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"io/ioutil"
"log"
"os"
"path"
"path/filepath"
"strconv"
"syscall"
"time"
Expand All @@ -20,7 +20,7 @@ import (
// Config represents the configuration of the reloader.
type Config struct {
PidFile string
ConfigFile string
ConfigFiles []string
MaxRetries int
RetryWaitSecs int
}
Expand All @@ -39,10 +39,6 @@ type Reloader struct {

// quit shutsdown the reloader.
quit func()

// lastAppliedVersion is the last config update
// done by the proces..
lastAppliedVersion []byte
}

// Run starts the main loop.
Expand Down Expand Up @@ -94,41 +90,80 @@ func (r *Reloader) Run(ctx context.Context) error {
// Follow configuration updates in the directory where
// the config file is located and trigger reload when
// it is either recreated or written into.
if err := configWatcher.Add(path.Dir(r.ConfigFile)); err != nil {
return err
for i := range r.ConfigFiles {
// Ensure our paths are canonical
r.ConfigFiles[i], _ = filepath.Abs(r.ConfigFiles[i])
// Use directory here because k8s remounts the entire folder
// the config file lives in. So, watch the folder so we properly receive events.
if err := configWatcher.Add(filepath.Dir(r.ConfigFiles[i])); err != nil {
schancel marked this conversation as resolved.
Show resolved Hide resolved
return err
}
}

attempts = 0
// lastConfigAppliedCache is the last config update
// applied by us
lastConfigAppliedCache := make(map[string][]byte)

// Preload config hashes, so we know their digests
// up front and avoid potentially reloading when unnecessary.
//for _, configFile := range r.ConfigFiles {
// h := sha256.New()
// f, err := os.Open(configFile)
// if err != nil {
// return err
// }
// if _, err := io.Copy(h, f); err != nil {
// return err
// }
// digest := h.Sum(nil)
// lastConfigAppliedCache[configFile] = digest
//}

WaitForEvent:
for {
select {
case <-ctx.Done():
return nil
case event := <-configWatcher.Events:
log.Printf("Event: %+v \n", event)
// FIXME: This captures all events in the same folder, should
// narrow down to updates to the config file involved only.
if event.Op != fsnotify.Write && event.Op != fsnotify.Create {
continue
}

h := sha256.New()
f, err := os.Open(r.ConfigFile)
touchedInfo, err := os.Stat(event.Name)
if err != nil {
log.Printf("Error: %s\n", err)
continue
}
if _, err := io.Copy(h, f); err != nil {
log.Printf("Error: %s\n", err)
continue
}
digest := h.Sum(nil)
if r.lastAppliedVersion != nil {
if bytes.Equal(r.lastAppliedVersion, digest) {
// Skip since no meaningful change

for _, configFile := range r.ConfigFiles {
configInfo, err := os.Stat(configFile)
if err != nil {
log.Printf("Error: %s\n", err)
continue WaitForEvent
}
if !os.SameFile(touchedInfo, configInfo) {
continue
}

h := sha256.New()
f, err := os.Open(configFile)
if err != nil {
log.Printf("Error: %s\n", err)
continue WaitForEvent
}
if _, err := io.Copy(h, f); err != nil {
log.Printf("Error: %s\n", err)
continue WaitForEvent
}
digest := h.Sum(nil)
lastConfigHash, ok := lastConfigAppliedCache[configFile]
if ok && bytes.Equal(lastConfigHash, digest) {
// No meaningful change or this is the first time we've checked
continue WaitForEvent
}
lastConfigAppliedCache[configFile] = digest

// We only get an event for one file at a time, we can stop checking
// config files here and continue with our business.
break
}
r.lastAppliedVersion = digest

case err := <-configWatcher.Errors:
log.Printf("Error: %s\n", err)
Expand Down
63 changes: 41 additions & 22 deletions test/reloader/reloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@ import (
"testing"
"time"

"github.com/nats-io/nats-operator/pkg/reloader"
natsreloader "github.com/nats-io/nats-operator/pkg/reloader"
)

var configContents = `port = 2222`
var newConfigContents = `port = 2222
someOtherThing = "bar"
`

func TestReloader(t *testing.T) {
// Setup a pidfile that points to us
pid := os.Getpid()
pidfile, err := ioutil.TempFile(os.TempDir(), "nats-pid-")
if err != nil {
Expand All @@ -26,28 +32,34 @@ func TestReloader(t *testing.T) {
}
defer os.Remove(pidfile.Name())

configfile, err := ioutil.TempFile(os.TempDir(), "nats-conf-")
if err != nil {
t.Fatal(err)
// Create tempfile with contents, then update it
nconfig := &natsreloader.Config{
PidFile: pidfile.Name(),
ConfigFiles: []string{},
}
defer os.Remove(configfile.Name())

if _, err := configfile.WriteString("port = 4222"); err != nil {
t.Fatal(err)
}
var configFiles []*os.File
for i := 0; i < 2; i++ {
configFile, err := ioutil.TempFile(os.TempDir(), "nats-conf-")
if err != nil {
t.Fatal(err)
}
defer os.Remove(configFile.Name())

// Create tempfile with contents, then update it
nconfig := &natsreloader.Config{
PidFile: pidfile.Name(),
ConfigFile: configfile.Name(),
if _, err := configFile.WriteString(configContents); err != nil {
t.Fatal(err)
}
configFiles = append(configFiles, configFile)
nconfig.ConfigFiles = append(nconfig.ConfigFiles, configFile.Name())
}

r, err := natsreloader.NewReloader(nconfig)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %s\n", err)
os.Exit(1)
}

var success = false
var signals = 0

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -59,26 +71,33 @@ func TestReloader(t *testing.T) {

// Success when receiving the first signal
for range c {
success = true
cancel()
return
signals++
}
}()

go func() {
for i := 0; i < 5; i++ {
if _, err := configfile.WriteString("port = 4222"); err != nil {
return
// This is terrible, but we need this thread to wait until r.Run(ctx) has finished starting up
// before we start mucking with the file.
// There isn't any other good way to synchronize on this happening.
time.Sleep(100 * time.Millisecond)
for _, configfile := range configFiles {
for i := 0; i < 5; i++ {
// Append some more stuff to the config
if _, err := configfile.WriteAt([]byte(newConfigContents), 0); err != nil {
return
}
time.Sleep(10 * time.Millisecond)
}
time.Sleep(1 * time.Second)
}
cancel()
}()

err = r.Run(ctx)
if err != nil && err != context.Canceled {
t.Fatal(err)
}
if !success {
t.Fatalf("Timed out waiting for reloading signal")
// We should have gotten only one signal for each configuration file
if signals != len(configFiles) {
t.Fatalf("Wrong number of signals received.")
}
}