Skip to content

Commit

Permalink
bugfix: exit when recive signals(sigint/sighup)
Browse files Browse the repository at this point in the history
  • Loading branch information
jzwlqx committed May 16, 2024
1 parent 621803d commit cb09e8c
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 75 deletions.
2 changes: 1 addition & 1 deletion pkg/exporter/cmd/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type EventServer struct {
*DynamicProbeServer[probe.EventProbe]
}

func NewEventServer(sinks []sink.Sink) (*EventServer, error) {
func newEventServer(sinks []sink.Sink) (*EventServer, error) {
var sinkWrappers []*sinkWrapper

done := make(chan struct{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/exporter/cmd/metricserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
log "github.com/sirupsen/logrus"
)

func NewMetricsServer() (*MetricsServer, error) {
func newMetricsServer() (*MetricsServer, error) {

r := prometheus.NewRegistry()
handler := promhttp.HandlerFor(prometheus.Gatherers{
Expand Down
146 changes: 74 additions & 72 deletions pkg/exporter/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,90 +280,93 @@ func (i *inspServer) reload() error {
return nil
}

func (i *inspServer) newHTTPServer(cfg *InspServerConfig) *http.Server {
http.Handle("/metrics", i.metricsServer)
http.Handle("/", http.HandlerFunc(defaultPage))
http.Handle("/status", http.HandlerFunc(i.statusPage))
if cfg.DebugMode {
reg := prometheus.NewRegistry()

reg.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)
http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
}
listenAddr := fmt.Sprintf(":%d", cfg.Port)
log.Infof("inspector start metric server, listenAddr: %s", listenAddr)
return &http.Server{Addr: listenAddr}
}

func (i *inspServer) start(cfg *InspServerConfig) error {
if err := gops.Listen(gops.Options{}); err != nil {
log.Infof("start gops err: %v", err)
}

go func() {
var err error
ctx := context.TODO()

err = probe.InitAdditionalLabels(cfg.MetricsConfig.AdditionalLabels)
if err != nil {
log.Errorf("failed init additional labels: %v", err)
return
}

log.Infof("start metrics server")
i.metricsServer, err = NewMetricsServer()
if err != nil {
log.Errorf("failed create metrics server: %v", err)
return
}
var err error
ctx := context.TODO()
err = probe.InitAdditionalLabels(cfg.MetricsConfig.AdditionalLabels)
if err != nil {
return fmt.Errorf("failed init additional labels: %w", err)
}

defer func() {
_ = i.metricsServer.Stop(ctx)
}()
i.metricsServer, err = newMetricsServer()
if err != nil {
return fmt.Errorf("failed create metrics server: %w", err)
}

if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil {
log.Errorf("failed start metrics server: %v", err)
return
}
if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil {
return fmt.Errorf("failed start metrics server: %w", err)
}

//sink
sinks, err := createSink(cfg.EventConfig.EventSinks)
if err != nil {
log.Errorf("failed create sinks, err: %v", err)
} else if len(sinks) != len(cfg.EventConfig.EventSinks) {
log.Warnf("expected to create %d sinks , but %d were created", len(cfg.EventConfig.EventSinks), len(sinks))
}
defer func() {
_ = i.metricsServer.Stop(ctx)
}()

log.Infof("start event server")
i.eventServer, err = NewEventServer(sinks)
if err != nil {
log.Errorf("failed create event server: %v", err)
return
}
sinks, err := createSink(cfg.EventConfig.EventSinks)
if err != nil {
return fmt.Errorf("failed create sinks, err: %w", err)
}
if len(sinks) != len(cfg.EventConfig.EventSinks) {
log.Warnf("expected to create %d sinks , but %d were created", len(cfg.EventConfig.EventSinks), len(sinks))
}

defer func() {
_ = i.eventServer.Stop(context.TODO())
}()
i.eventServer, err = newEventServer(sinks)
if err != nil {
return fmt.Errorf("failed create event server: %w", err)
}

if err := i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil {
log.Errorf("failed start event server: %v", err)
return
}
if err = i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil {
return fmt.Errorf("failed start event server: %w", err)
}

http.Handle("/metrics", i.metricsServer)
http.Handle("/", http.HandlerFunc(defaultPage))
http.Handle("/status", http.HandlerFunc(i.statusPage))
if cfg.DebugMode {
reg := prometheus.NewRegistry()

reg.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)
http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
}
listenAddr := fmt.Sprintf(":%d", cfg.Port)
log.Infof("inspector start metric server, listenAddr: %s", listenAddr)
srv := &http.Server{Addr: listenAddr}
if err := srv.ListenAndServe(); err != nil {
log.Errorf("inspector start metric server err: %v", err)
}
defer func() {
_ = i.eventServer.Stop(ctx)
}()

done := make(chan struct{})

if err := i.WatchConfig(done); err != nil {
if err = i.WatchConfig(done); err != nil {
log.Errorf("failed watch config, dynamic load would not work: %v", err)
}

WaitSignals(i, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
close(done)
srv := i.newHTTPServer(cfg)
serverClosedChan := make(chan struct{})
serverClosed := false
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Errorf("server error: %v", err)
}

close(serverClosedChan)
serverClosed = true
}()

WaitSignals(serverClosedChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
close(done)
if !serverClosed {
_ = srv.Shutdown(ctx)
}
return nil
}

Expand All @@ -380,16 +383,15 @@ func createSink(sinkConfigs []EventSinkConfig) ([]sink.Sink, error) {
return ret, nil
}

func WaitSignals(i *inspServer, sgs ...os.Signal) {
func WaitSignals(done <-chan struct{}, sgs ...os.Signal) {
s := make(chan os.Signal, 1)
signal.Notify(s, sgs...)
sig := <-s
log.Warnf("recive signal %s, stopping", sig.String())
if err := i.metricsServer.Stop(i.ctx); err != nil {
log.Errorf("failed stop metrics server, err: %v", err)
}
if err := i.eventServer.Stop(i.ctx); err != nil {
log.Errorf("failed stop event server, err: %v", err)
select {
case sig := <-s:
log.Warnf("recive signal %s, stopping", sig.String())
return
case <-done:
log.Warnf("recive server close signal")
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/exporter/nettop/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func StartCache(ctx context.Context, sidecarMode bool) error {
}

func StopCache() {
control <- struct{}{}
close(control)
}

func cacheDaemonLoop(_ context.Context, control chan struct{}) {
Expand All @@ -249,10 +249,12 @@ func cacheDaemonLoop(_ context.Context, control chan struct{}) {
t := time.NewTicker(cacheUpdateInterval)
defer t.Stop()

loop:
for {
select {
case <-control:
log.Info("cache daemon loop exit of control signal")
break loop
case <-t.C:
if err := cachePodsWithTimeout(cacheUpdateInterval); err != nil {
log.Errorf("failed cache pods: %v", err)
Expand Down

0 comments on commit cb09e8c

Please sign in to comment.