diff --git a/pkg/exporter/cmd/eventserver.go b/pkg/exporter/cmd/eventserver.go index 17304669..85fa0d7f 100644 --- a/pkg/exporter/cmd/eventserver.go +++ b/pkg/exporter/cmd/eventserver.go @@ -12,7 +12,7 @@ type EventServer struct { *DynamicProbeServer[probe.EventProbe] } -func NewEventServer(sinks []sink.Sink) (*EventServer, error) { +func newEventServer(sinks []sink.Sink) (*EventServer, error) { var sinkWrappers []*sinkWrapper done := make(chan struct{}) diff --git a/pkg/exporter/cmd/metricserver.go b/pkg/exporter/cmd/metricserver.go index 5d211af5..90eb6d19 100644 --- a/pkg/exporter/cmd/metricserver.go +++ b/pkg/exporter/cmd/metricserver.go @@ -11,7 +11,7 @@ import ( log "github.com/sirupsen/logrus" ) -func NewMetricsServer() (*MetricsServer, error) { +func newMetricsServer() (*MetricsServer, error) { r := prometheus.NewRegistry() handler := promhttp.HandlerFor(prometheus.Gatherers{ diff --git a/pkg/exporter/cmd/server.go b/pkg/exporter/cmd/server.go index 0dd68935..3f8d90b2 100644 --- a/pkg/exporter/cmd/server.go +++ b/pkg/exporter/cmd/server.go @@ -280,90 +280,93 @@ func (i *inspServer) reload() error { return nil } +func (i *inspServer) newHTTPServer(cfg *InspServerConfig) *http.Server { + http.Handle("/metrics", i.metricsServer) + http.Handle("/", http.HandlerFunc(defaultPage)) + http.Handle("/status", http.HandlerFunc(i.statusPage)) + if cfg.DebugMode { + reg := prometheus.NewRegistry() + + reg.MustRegister( + collectors.NewGoCollector(), + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + ) + http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) + } + listenAddr := fmt.Sprintf(":%d", cfg.Port) + log.Infof("inspector start metric server, listenAddr: %s", listenAddr) + return &http.Server{Addr: listenAddr} +} + func (i *inspServer) start(cfg *InspServerConfig) error { if err := gops.Listen(gops.Options{}); err != nil { log.Infof("start gops err: %v", err) } - go func() { - var err error - ctx := context.TODO() - - err = probe.InitAdditionalLabels(cfg.MetricsConfig.AdditionalLabels) - if err != nil { - log.Errorf("failed init additional labels: %v", err) - return - } - - log.Infof("start metrics server") - i.metricsServer, err = NewMetricsServer() - if err != nil { - log.Errorf("failed create metrics server: %v", err) - return - } + var err error + ctx := context.TODO() + err = probe.InitAdditionalLabels(cfg.MetricsConfig.AdditionalLabels) + if err != nil { + return fmt.Errorf("failed init additional labels: %w", err) + } - defer func() { - _ = i.metricsServer.Stop(ctx) - }() + i.metricsServer, err = newMetricsServer() + if err != nil { + return fmt.Errorf("failed create metrics server: %w", err) + } - if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil { - log.Errorf("failed start metrics server: %v", err) - return - } + if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil { + return fmt.Errorf("failed start metrics server: %w", err) + } - //sink - sinks, err := createSink(cfg.EventConfig.EventSinks) - if err != nil { - log.Errorf("failed create sinks, err: %v", err) - } else if len(sinks) != len(cfg.EventConfig.EventSinks) { - log.Warnf("expected to create %d sinks , but %d were created", len(cfg.EventConfig.EventSinks), len(sinks)) - } + defer func() { + _ = i.metricsServer.Stop(ctx) + }() - log.Infof("start event server") - i.eventServer, err = NewEventServer(sinks) - if err != nil { - log.Errorf("failed create event server: %v", err) - return - } + sinks, err := createSink(cfg.EventConfig.EventSinks) + if err != nil { + return fmt.Errorf("failed create sinks, err: %w", err) + } + if len(sinks) != len(cfg.EventConfig.EventSinks) { + log.Warnf("expected to create %d sinks , but %d were created", len(cfg.EventConfig.EventSinks), len(sinks)) + } - defer func() { - _ = i.eventServer.Stop(context.TODO()) - }() + i.eventServer, err = newEventServer(sinks) + if err != nil { + return fmt.Errorf("failed create event server: %w", err) + } - if err := i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil { - log.Errorf("failed start event server: %v", err) - return - } + if err = i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil { + return fmt.Errorf("failed start event server: %w", err) + } - http.Handle("/metrics", i.metricsServer) - http.Handle("/", http.HandlerFunc(defaultPage)) - http.Handle("/status", http.HandlerFunc(i.statusPage)) - if cfg.DebugMode { - reg := prometheus.NewRegistry() - - reg.MustRegister( - collectors.NewGoCollector(), - collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), - ) - http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) - } - listenAddr := fmt.Sprintf(":%d", cfg.Port) - log.Infof("inspector start metric server, listenAddr: %s", listenAddr) - srv := &http.Server{Addr: listenAddr} - if err := srv.ListenAndServe(); err != nil { - log.Errorf("inspector start metric server err: %v", err) - } + defer func() { + _ = i.eventServer.Stop(ctx) }() done := make(chan struct{}) - if err := i.WatchConfig(done); err != nil { + if err = i.WatchConfig(done); err != nil { log.Errorf("failed watch config, dynamic load would not work: %v", err) } - WaitSignals(i, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) - close(done) + srv := i.newHTTPServer(cfg) + serverClosedChan := make(chan struct{}) + serverClosed := false + go func() { + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Errorf("server error: %v", err) + } + close(serverClosedChan) + serverClosed = true + }() + + WaitSignals(serverClosedChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) + close(done) + if !serverClosed { + _ = srv.Shutdown(ctx) + } return nil } @@ -380,16 +383,15 @@ func createSink(sinkConfigs []EventSinkConfig) ([]sink.Sink, error) { return ret, nil } -func WaitSignals(i *inspServer, sgs ...os.Signal) { +func WaitSignals(done <-chan struct{}, sgs ...os.Signal) { s := make(chan os.Signal, 1) signal.Notify(s, sgs...) - sig := <-s - log.Warnf("recive signal %s, stopping", sig.String()) - if err := i.metricsServer.Stop(i.ctx); err != nil { - log.Errorf("failed stop metrics server, err: %v", err) - } - if err := i.eventServer.Stop(i.ctx); err != nil { - log.Errorf("failed stop event server, err: %v", err) + select { + case sig := <-s: + log.Warnf("recive signal %s, stopping", sig.String()) + return + case <-done: + log.Warnf("recive server close signal") } } diff --git a/pkg/exporter/nettop/cache.go b/pkg/exporter/nettop/cache.go index e202cc5e..1e82f22c 100644 --- a/pkg/exporter/nettop/cache.go +++ b/pkg/exporter/nettop/cache.go @@ -240,7 +240,7 @@ func StartCache(ctx context.Context, sidecarMode bool) error { } func StopCache() { - control <- struct{}{} + close(control) } func cacheDaemonLoop(_ context.Context, control chan struct{}) { @@ -249,10 +249,12 @@ func cacheDaemonLoop(_ context.Context, control chan struct{}) { t := time.NewTicker(cacheUpdateInterval) defer t.Stop() +loop: for { select { case <-control: log.Info("cache daemon loop exit of control signal") + break loop case <-t.C: if err := cachePodsWithTimeout(cacheUpdateInterval); err != nil { log.Errorf("failed cache pods: %v", err)