diff --git a/pkg/exporter/cmd/config.go b/pkg/exporter/cmd/config.go index 0c2deadd..644fc9f8 100644 --- a/pkg/exporter/cmd/config.go +++ b/pkg/exporter/cmd/config.go @@ -8,8 +8,12 @@ import ( ) type InspServerConfig struct { - DebugMode bool `yaml:"debugMode" mapstructure:"debugMode" json:"debugMode"` - Port uint16 `yaml:"port" mapstructure:"port" json:"port"` + DebugMode bool `yaml:"debugMode" mapstructure:"debugMode" json:"debugMode"` + // A better way to set listen port is a `address` field that can be used for bind interface ip, or even unix domain socket + // Deprecated: use address instead + Port uint16 `yaml:"port" mapstructure:"port" json:"port"` + + Address string `yaml:"address" mapstructure:"address" json:"address"` EnableController bool `yaml:"enableController" mapstructure:"enableController" json:"enableController"` MetricsConfig MetricsConfig `yaml:"metrics" mapstructure:"metrics" json:"metrics"` EventConfig EventConfig `yaml:"event" mapstructure:"event" json:"event"` diff --git a/pkg/exporter/cmd/server.go b/pkg/exporter/cmd/server.go index 3f8d90b2..f1b4d835 100644 --- a/pkg/exporter/cmd/server.go +++ b/pkg/exporter/cmd/server.go @@ -6,11 +6,16 @@ package cmd import ( "context" "encoding/json" + "errors" "fmt" + "net" "net/http" + "net/url" "os" "os/signal" + "path" "reflect" + "strings" "sync" "sync/atomic" "syscall" @@ -92,7 +97,6 @@ type ProbeManager[T probe.Probe] interface { StartProbe(ctx context.Context, probe T) error StopProbe(ctx context.Context, probe T) error } - type DynamicProbeServer[T probe.Probe] struct { lock sync.Mutex probeManager ProbeManager[T] @@ -280,7 +284,57 @@ func (i *inspServer) reload() error { return nil } -func (i *inspServer) newHTTPServer(cfg *InspServerConfig) *http.Server { +func (i *inspServer) createListener(cfg *InspServerConfig) (net.Listener, error) { + if cfg.Address == "" { + if cfg.Port != 0 { + cfg.Address = fmt.Sprintf(":%d", cfg.Port) + log.Warningf("port is derepcated, use address instead") + } else { + return nil, fmt.Errorf("listen address is empty") + } + } + + rawAddr := cfg.Address + if !strings.Contains(rawAddr, "//") { + log.Infof("address contains no protocol part, use tcp:// by default") + rawAddr = "tcp://" + rawAddr + } + + protocol := "" + addr := "" + + u, err := url.Parse(rawAddr) + if err != nil { + return nil, fmt.Errorf("invalid address %s, valid format is [protocol://]addr[:port]", cfg.Address) + } + protocol = u.Scheme + switch protocol { + case "unix": + // For Unix domain sockets, the path is in the opaque part + addr = strings.TrimPrefix(cfg.Address, "unix://") + if _, err = os.Stat(addr); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("failed stat sock file %s: %w", addr, err) + } + parent := path.Dir(addr) + if err = os.MkdirAll(parent, 0o755); err != nil { + return nil, fmt.Errorf("failed create director %s: %w", parent, err) + } + } else { + //socket file exists, just remove it + _ = os.Remove(addr) + } + case "tcp": + host, port, _ := net.SplitHostPort(cfg.Address) + addr = fmt.Sprintf("%s:%s", host, port) + default: + return nil, fmt.Errorf("unsupported protocol %s, only `tcp` and 'unix' are supported", protocol) + } + + return net.Listen(protocol, addr) +} + +func (i *inspServer) newHTTPServer(cfg *InspServerConfig) (*http.Server, net.Listener, error) { http.Handle("/metrics", i.metricsServer) http.Handle("/", http.HandlerFunc(defaultPage)) http.Handle("/status", http.HandlerFunc(i.statusPage)) @@ -293,9 +347,14 @@ func (i *inspServer) newHTTPServer(cfg *InspServerConfig) *http.Server { ) http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) } - listenAddr := fmt.Sprintf(":%d", cfg.Port) - log.Infof("inspector start metric server, listenAddr: %s", listenAddr) - return &http.Server{Addr: listenAddr} + + listener, err := i.createListener(cfg) + if err != nil { + return nil, nil, fmt.Errorf("failed create listener: %w", err) + } + + log.Infof("inspector start metric server, listenAddr: %s", listener.Addr()) + return &http.Server{}, listener, nil } func (i *inspServer) start(cfg *InspServerConfig) error { @@ -350,11 +409,14 @@ func (i *inspServer) start(cfg *InspServerConfig) error { log.Errorf("failed watch config, dynamic load would not work: %v", err) } - srv := i.newHTTPServer(cfg) + srv, listener, err := i.newHTTPServer(cfg) + if err != nil { + return fmt.Errorf("failed start http server: %w", err) + } serverClosedChan := make(chan struct{}) serverClosed := false go func() { - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + if err := srv.Serve(listener); err != nil && err != http.ErrServerClosed { log.Errorf("server error: %v", err) } @@ -364,9 +426,12 @@ func (i *inspServer) start(cfg *InspServerConfig) error { WaitSignals(serverClosedChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) close(done) + if !serverClosed { _ = srv.Shutdown(ctx) } + _ = listener.Close() + return nil }