Skip to content

Commit

Permalink
remove viper config manage which set all map key to lowercase
Browse files Browse the repository at this point in the history
  • Loading branch information
jzwlqx committed Apr 2, 2024
1 parent a39b091 commit 3c16ac6
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 80 deletions.
50 changes: 50 additions & 0 deletions pkg/exporter/cmd/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package cmd

import (
"fmt"
"os"

"gopkg.in/yaml.v3"
)

type InspServerConfig struct {
DebugMode bool `yaml:"debugmode" mapstructure:"debugmode" json:"debugmode"`
Port uint16 `yaml:"port" mapstructure:"port" json:"port"`
EnableController bool `yaml:"enable_controller" mapstructure:"enable_controller" json:"enable_controller"`
MetricsConfig MetricsConfig `yaml:"metrics" mapstructure:"metrics" json:"metrics"`
EventConfig EventConfig `yaml:"event" mapstructure:"event" json:"event"`
}

type MetricsConfig struct {
Probes []ProbeConfig `yaml:"probes" mapstructure:"probes" json:"probes"`
}

type EventConfig struct {
EventSinks []EventSinkConfig `yaml:"sinks" mapstructure:"sinks" json:"sinks"`
Probes []ProbeConfig `yaml:"probes" mapstructure:"probes" json:"probes"`
}

type EventSinkConfig struct {
Name string `yaml:"name" mapstructure:"name" json:"name"`
Args interface{} `yaml:"args" mapstructure:"args" json:"args"`
}

type ProbeConfig struct {
Name string `yaml:"name" mapstructure:"name" json:"name"`
Args map[string]interface{} `yaml:"args" mapstructure:"args" json:"args"`
}

func loadConfig(path string) (*InspServerConfig, error) {
cfg := InspServerConfig{}
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed read config file %s: %w", path, err)
}

if err = yaml.Unmarshal(data, &cfg); err != nil {
return nil, fmt.Errorf("failed parse config file %s: %w", path, err)
}

return &cfg, nil

}
138 changes: 58 additions & 80 deletions pkg/exporter/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ import (
"os/signal"
"reflect"
"sync"
"sync/atomic"
"syscall"
"time"

task_agent "github.com/alibaba/kubeskoop/pkg/exporter/task-agent"
"github.com/fsnotify/fsnotify"

"github.com/alibaba/kubeskoop/pkg/exporter/sink"

"github.com/fsnotify/fsnotify"

_ "net/http" //for golangci-lint
_ "net/http/pprof" //for golangci-lint once more

Expand All @@ -32,7 +33,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

// serverCmd represents the server command
Expand All @@ -42,48 +42,41 @@ var (
Short: "start inspector server",
Run: func(cmd *cobra.Command, args []string) {
insp := &inspServer{
v: *viper.New(),
ctx: context.Background(),
configPath: configPath,
ctx: context.Background(),
}

log.Infof("start with config file %s", configPath)
insp.v.SetConfigFile(configPath)
err := insp.MergeConfig()

cfg, err := loadConfig(insp.configPath)
if err != nil {
log.Errorf("merge config err: %v", err)
return
}

if insp.config.DebugMode {
if debug {
cfg.DebugMode = true
}

if cfg.DebugMode {
log.SetLevel(log.DebugLevel)
}

// nolint
if err := nettop.StartCache(insp.ctx, sidecar); err != nil {
if err = nettop.StartCache(insp.ctx, sidecar); err != nil {
log.Errorf("failed start cache: %v", err)
return
}

defer nettop.StopCache()

// config hot reload process
insp.v.OnConfigChange(func(e fsnotify.Event) {
log.Info("Start reload config")
if err := insp.reload(); err != nil {
log.Warnf("Reload config error: %v", err)
}
log.Info("Config reload succeed.")
})
insp.v.WatchConfig()

if insp.config.EnableController {
if cfg.EnableController {
if err := task_agent.NewTaskAgent().Run(); err != nil {
log.Errorf("failed start agent: %v", err)
return
}
}
// block here
err = insp.start()
err = insp.start(cfg)
if err != nil {
log.Infof("start server err: %v", err)
return
Expand Down Expand Up @@ -223,65 +216,51 @@ func init() {
serverCmd.PersistentFlags().StringVarP(&configPath, "config", "c", "/etc/config/config.yaml", "config file path")
}

type InspServerConfig struct {
DebugMode bool `yaml:"debugmode" mapstructure:"debugmode" json:"debugmode"`
Port uint16 `yaml:"port" mapstructure:"port" json:"port"`
EnableController bool `yaml:"enable_controller" mapstructure:"enable_controller" json:"enable_controller"`
MetricsConfig MetricsConfig `yaml:"metrics" mapstructure:"metrics" json:"metrics"`
EventConfig EventConfig `yaml:"event" mapstructure:"event" json:"event"`
}

type MetricsConfig struct {
Probes []ProbeConfig `yaml:"probes" mapstructure:"probes" json:"probes"`
}

type EventConfig struct {
EventSinks []EventSinkConfig `yaml:"sinks" mapstructure:"sinks" json:"sinks"`
Probes []ProbeConfig `yaml:"probes" mapstructure:"probes" json:"probes"`
}

type EventSinkConfig struct {
Name string `yaml:"name" mapstructure:"name" json:"name"`
Args interface{} `yaml:"args" mapstructure:"args" json:"args"`
}

type ProbeConfig struct {
Name string `yaml:"name" mapstructure:"name" json:"name"`
Args map[string]interface{} `yaml:"args" mapstructure:"args" json:"args"`
}

type inspServer struct {
v viper.Viper
config InspServerConfig
configPath string
ctx context.Context
metricsServer *MetricsServer
eventServer *EventServer
}

func (i *inspServer) MergeConfig() error {
err := i.v.ReadInConfig()
func (i *inspServer) WatchConfig(done <-chan struct{}) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
log.Infof("validate config err: %v", err)
return fmt.Errorf("config file %s not found", i.v.ConfigFileUsed())
}
return fmt.Errorf("config file err: %w", err)
return err
}

cfg := &InspServerConfig{}
err = i.v.Unmarshal(cfg)
if err != nil {
return fmt.Errorf("config file err: %w", err)
if err = watcher.Add(i.configPath); err != nil {
return err
}

i.config = *cfg
var delaying atomic.Bool

go func() {
for {
select {
case <-watcher.Events:
if delaying.CompareAndSwap(false, true) {
time.AfterFunc(1*time.Second, func() {
delaying.Store(false)
if err = i.reload(); err != nil {
log.Errorf("failed reload config %s: %v", i.configPath, err)
}
})
}
case err = <-watcher.Errors:
log.Errorf("error watch %s: %v", i.configPath, err)
case <-done:
_ = watcher.Close()
return
}
}
}()

return nil
}

func (i *inspServer) reload() error {
cfg := InspServerConfig{}
err := i.v.Unmarshal(&cfg)
cfg, err := loadConfig(i.configPath)
if err != nil {
return err
}
Expand All @@ -298,11 +277,10 @@ func (i *inspServer) reload() error {
return fmt.Errorf("reload event server error: %s", err)
}

i.config = cfg
return nil
}

func (i *inspServer) start() error {
func (i *inspServer) start(cfg *InspServerConfig) error {
if err := gops.Listen(gops.Options{}); err != nil {
log.Infof("start gops err: %v", err)
}
Expand All @@ -322,13 +300,13 @@ func (i *inspServer) start() error {
_ = i.metricsServer.Stop(ctx)
}()

if err := i.metricsServer.Start(ctx, i.config.MetricsConfig.Probes); err != nil {
if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil {
log.Errorf("failed start metrics server: %v", err)
return
}

//sink
sinks, err := createSink(i.config.EventConfig.EventSinks)
sinks, err := createSink(cfg.EventConfig.EventSinks)
if err != nil {
log.Errorf("failed create sinks, err: %v", err)
return
Expand All @@ -346,16 +324,15 @@ func (i *inspServer) start() error {
_ = i.eventServer.Stop(context.TODO())
}()

if err := i.eventServer.Start(ctx, i.config.EventConfig.Probes); err != nil {
if err := i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil {
log.Errorf("failed start event server: %v", err)
return
}

http.Handle("/metrics", i.metricsServer)
http.Handle("/", http.HandlerFunc(defaultPage))
http.Handle("/config", http.HandlerFunc(i.configPage))
http.Handle("/status", http.HandlerFunc(i.statusPage))
if i.config.DebugMode {
if cfg.DebugMode {
reg := prometheus.NewRegistry()

reg.MustRegister(
Expand All @@ -364,15 +341,23 @@ func (i *inspServer) start() error {
)
http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
}
listenAddr := fmt.Sprintf(":%d", i.config.Port)
listenAddr := fmt.Sprintf(":%d", cfg.Port)
log.Infof("inspector start metric server, listenAddr: %s", listenAddr)
srv := &http.Server{Addr: listenAddr}
if err := srv.ListenAndServe(); err != nil {
log.Errorf("inspector start metric server err: %v", err)
}
}()

done := make(chan struct{})

if err := i.WatchConfig(done); err != nil {
log.Errorf("failed watch config, dynamic load would not work: %v", err)
}

WaitSignals(i, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
close(done)

return nil
}

Expand Down Expand Up @@ -412,13 +397,6 @@ func defaultPage(w http.ResponseWriter, _ *http.Request) {
</html>`))
}

func (i *inspServer) configPage(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
rawText, _ := json.MarshalIndent(i.config, " ", " ")
w.WriteHeader(http.StatusOK)
w.Write(rawText) // nolint
}

func (i *inspServer) statusPage(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
Expand Down

0 comments on commit 3c16ac6

Please sign in to comment.