diff --git a/pkg/exporter/cmd/config.go b/pkg/exporter/cmd/config.go new file mode 100644 index 00000000..42649d4f --- /dev/null +++ b/pkg/exporter/cmd/config.go @@ -0,0 +1,50 @@ +package cmd + +import ( + "fmt" + "os" + + "gopkg.in/yaml.v3" +) + +type InspServerConfig struct { + DebugMode bool `yaml:"debugmode" mapstructure:"debugmode" json:"debugmode"` + Port uint16 `yaml:"port" mapstructure:"port" json:"port"` + EnableController bool `yaml:"enable_controller" mapstructure:"enable_controller" json:"enable_controller"` + MetricsConfig MetricsConfig `yaml:"metrics" mapstructure:"metrics" json:"metrics"` + EventConfig EventConfig `yaml:"event" mapstructure:"event" json:"event"` +} + +type MetricsConfig struct { + Probes []ProbeConfig `yaml:"probes" mapstructure:"probes" json:"probes"` +} + +type EventConfig struct { + EventSinks []EventSinkConfig `yaml:"sinks" mapstructure:"sinks" json:"sinks"` + Probes []ProbeConfig `yaml:"probes" mapstructure:"probes" json:"probes"` +} + +type EventSinkConfig struct { + Name string `yaml:"name" mapstructure:"name" json:"name"` + Args interface{} `yaml:"args" mapstructure:"args" json:"args"` +} + +type ProbeConfig struct { + Name string `yaml:"name" mapstructure:"name" json:"name"` + Args map[string]interface{} `yaml:"args" mapstructure:"args" json:"args"` +} + +func loadConfig(path string) (*InspServerConfig, error) { + cfg := InspServerConfig{} + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed read config file %s: %w", path, err) + } + + if err = yaml.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("failed parse config file %s: %w", path, err) + } + + return &cfg, nil + +} diff --git a/pkg/exporter/cmd/server.go b/pkg/exporter/cmd/server.go index 3b6fa1ec..d3e1abdf 100644 --- a/pkg/exporter/cmd/server.go +++ b/pkg/exporter/cmd/server.go @@ -12,14 +12,15 @@ import ( "os/signal" "reflect" "sync" + "sync/atomic" "syscall" + "time" task_agent "github.com/alibaba/kubeskoop/pkg/exporter/task-agent" + "github.com/fsnotify/fsnotify" "github.com/alibaba/kubeskoop/pkg/exporter/sink" - "github.com/fsnotify/fsnotify" - _ "net/http" //for golangci-lint _ "net/http/pprof" //for golangci-lint once more @@ -32,7 +33,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "github.com/spf13/viper" ) // serverCmd represents the server command @@ -42,48 +42,41 @@ var ( Short: "start inspector server", Run: func(cmd *cobra.Command, args []string) { insp := &inspServer{ - v: *viper.New(), - ctx: context.Background(), + configPath: configPath, + ctx: context.Background(), } log.Infof("start with config file %s", configPath) - insp.v.SetConfigFile(configPath) - err := insp.MergeConfig() + + cfg, err := loadConfig(insp.configPath) if err != nil { log.Errorf("merge config err: %v", err) return } - if insp.config.DebugMode { + if debug { + cfg.DebugMode = true + } + + if cfg.DebugMode { log.SetLevel(log.DebugLevel) } - // nolint - if err := nettop.StartCache(insp.ctx, sidecar); err != nil { + if err = nettop.StartCache(insp.ctx, sidecar); err != nil { log.Errorf("failed start cache: %v", err) return } defer nettop.StopCache() - // config hot reload process - insp.v.OnConfigChange(func(e fsnotify.Event) { - log.Info("Start reload config") - if err := insp.reload(); err != nil { - log.Warnf("Reload config error: %v", err) - } - log.Info("Config reload succeed.") - }) - insp.v.WatchConfig() - - if insp.config.EnableController { + if cfg.EnableController { if err := task_agent.NewTaskAgent().Run(); err != nil { log.Errorf("failed start agent: %v", err) return } } // block here - err = insp.start() + err = insp.start(cfg) if err != nil { log.Infof("start server err: %v", err) return @@ -223,65 +216,51 @@ func init() { serverCmd.PersistentFlags().StringVarP(&configPath, "config", "c", "/etc/config/config.yaml", "config file path") } -type InspServerConfig struct { - DebugMode bool `yaml:"debugmode" mapstructure:"debugmode" json:"debugmode"` - Port uint16 `yaml:"port" mapstructure:"port" json:"port"` - EnableController bool `yaml:"enable_controller" mapstructure:"enable_controller" json:"enable_controller"` - MetricsConfig MetricsConfig `yaml:"metrics" mapstructure:"metrics" json:"metrics"` - EventConfig EventConfig `yaml:"event" mapstructure:"event" json:"event"` -} - -type MetricsConfig struct { - Probes []ProbeConfig `yaml:"probes" mapstructure:"probes" json:"probes"` -} - -type EventConfig struct { - EventSinks []EventSinkConfig `yaml:"sinks" mapstructure:"sinks" json:"sinks"` - Probes []ProbeConfig `yaml:"probes" mapstructure:"probes" json:"probes"` -} - -type EventSinkConfig struct { - Name string `yaml:"name" mapstructure:"name" json:"name"` - Args interface{} `yaml:"args" mapstructure:"args" json:"args"` -} - -type ProbeConfig struct { - Name string `yaml:"name" mapstructure:"name" json:"name"` - Args map[string]interface{} `yaml:"args" mapstructure:"args" json:"args"` -} - type inspServer struct { - v viper.Viper - config InspServerConfig + configPath string ctx context.Context metricsServer *MetricsServer eventServer *EventServer } -func (i *inspServer) MergeConfig() error { - err := i.v.ReadInConfig() +func (i *inspServer) WatchConfig(done <-chan struct{}) error { + watcher, err := fsnotify.NewWatcher() if err != nil { - if _, ok := err.(viper.ConfigFileNotFoundError); ok { - log.Infof("validate config err: %v", err) - return fmt.Errorf("config file %s not found", i.v.ConfigFileUsed()) - } - return fmt.Errorf("config file err: %w", err) + return err } - cfg := &InspServerConfig{} - err = i.v.Unmarshal(cfg) - if err != nil { - return fmt.Errorf("config file err: %w", err) + if err = watcher.Add(i.configPath); err != nil { + return err } - i.config = *cfg + var delaying atomic.Bool + + go func() { + for { + select { + case <-watcher.Events: + if delaying.CompareAndSwap(false, true) { + time.AfterFunc(1*time.Second, func() { + delaying.Store(false) + if err = i.reload(); err != nil { + log.Errorf("failed reload config %s: %v", i.configPath, err) + } + }) + } + case err = <-watcher.Errors: + log.Errorf("error watch %s: %v", i.configPath, err) + case <-done: + _ = watcher.Close() + return + } + } + }() return nil } func (i *inspServer) reload() error { - cfg := InspServerConfig{} - err := i.v.Unmarshal(&cfg) + cfg, err := loadConfig(i.configPath) if err != nil { return err } @@ -298,11 +277,10 @@ func (i *inspServer) reload() error { return fmt.Errorf("reload event server error: %s", err) } - i.config = cfg return nil } -func (i *inspServer) start() error { +func (i *inspServer) start(cfg *InspServerConfig) error { if err := gops.Listen(gops.Options{}); err != nil { log.Infof("start gops err: %v", err) } @@ -322,13 +300,13 @@ func (i *inspServer) start() error { _ = i.metricsServer.Stop(ctx) }() - if err := i.metricsServer.Start(ctx, i.config.MetricsConfig.Probes); err != nil { + if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil { log.Errorf("failed start metrics server: %v", err) return } //sink - sinks, err := createSink(i.config.EventConfig.EventSinks) + sinks, err := createSink(cfg.EventConfig.EventSinks) if err != nil { log.Errorf("failed create sinks, err: %v", err) return @@ -346,16 +324,15 @@ func (i *inspServer) start() error { _ = i.eventServer.Stop(context.TODO()) }() - if err := i.eventServer.Start(ctx, i.config.EventConfig.Probes); err != nil { + if err := i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil { log.Errorf("failed start event server: %v", err) return } http.Handle("/metrics", i.metricsServer) http.Handle("/", http.HandlerFunc(defaultPage)) - http.Handle("/config", http.HandlerFunc(i.configPage)) http.Handle("/status", http.HandlerFunc(i.statusPage)) - if i.config.DebugMode { + if cfg.DebugMode { reg := prometheus.NewRegistry() reg.MustRegister( @@ -364,7 +341,7 @@ func (i *inspServer) start() error { ) http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) } - listenAddr := fmt.Sprintf(":%d", i.config.Port) + listenAddr := fmt.Sprintf(":%d", cfg.Port) log.Infof("inspector start metric server, listenAddr: %s", listenAddr) srv := &http.Server{Addr: listenAddr} if err := srv.ListenAndServe(); err != nil { @@ -372,7 +349,15 @@ func (i *inspServer) start() error { } }() + done := make(chan struct{}) + + if err := i.WatchConfig(done); err != nil { + log.Errorf("failed watch config, dynamic load would not work: %v", err) + } + WaitSignals(i, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) + close(done) + return nil } @@ -412,13 +397,6 @@ func defaultPage(w http.ResponseWriter, _ *http.Request) { `)) } -func (i *inspServer) configPage(w http.ResponseWriter, _ *http.Request) { - w.Header().Set("Content-Type", "application/json") - rawText, _ := json.MarshalIndent(i.config, " ", " ") - w.WriteHeader(http.StatusOK) - w.Write(rawText) // nolint -} - func (i *inspServer) statusPage(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK)