Skip to content

Commit

Permalink
Merge pull request #309 from jzwlqx/feature/unixsock
Browse files Browse the repository at this point in the history
Feature/unixsock
  • Loading branch information
BSWANG authored Jul 23, 2024
2 parents f6810e3 + d077ed0 commit 49b115c
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 9 deletions.
8 changes: 6 additions & 2 deletions pkg/exporter/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ import (
)

type InspServerConfig struct {
DebugMode bool `yaml:"debugMode" mapstructure:"debugMode" json:"debugMode"`
Port uint16 `yaml:"port" mapstructure:"port" json:"port"`
DebugMode bool `yaml:"debugMode" mapstructure:"debugMode" json:"debugMode"`
// A better way to set listen port is a `address` field that can be used for bind interface ip, or even unix domain socket
// Deprecated: use address instead
Port uint16 `yaml:"port" mapstructure:"port" json:"port"`

Address string `yaml:"address" mapstructure:"address" json:"address"`
EnableController bool `yaml:"enableController" mapstructure:"enableController" json:"enableController"`
MetricsConfig MetricsConfig `yaml:"metrics" mapstructure:"metrics" json:"metrics"`
EventConfig EventConfig `yaml:"event" mapstructure:"event" json:"event"`
Expand Down
79 changes: 72 additions & 7 deletions pkg/exporter/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ package cmd
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"reflect"
"strings"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -92,7 +97,6 @@ type ProbeManager[T probe.Probe] interface {
StartProbe(ctx context.Context, probe T) error
StopProbe(ctx context.Context, probe T) error
}

type DynamicProbeServer[T probe.Probe] struct {
lock sync.Mutex
probeManager ProbeManager[T]
Expand Down Expand Up @@ -280,7 +284,57 @@ func (i *inspServer) reload() error {
return nil
}

func (i *inspServer) newHTTPServer(cfg *InspServerConfig) *http.Server {
func (i *inspServer) createListener(cfg *InspServerConfig) (net.Listener, error) {
if cfg.Address == "" {
if cfg.Port != 0 {
cfg.Address = fmt.Sprintf(":%d", cfg.Port)
log.Warningf("port is derepcated, use address instead")
} else {
return nil, fmt.Errorf("listen address is empty")
}
}

rawAddr := cfg.Address
if !strings.Contains(rawAddr, "//") {
log.Infof("address contains no protocol part, use tcp:// by default")
rawAddr = "tcp://" + rawAddr
}

protocol := ""
addr := ""

u, err := url.Parse(rawAddr)
if err != nil {
return nil, fmt.Errorf("invalid address %s, valid format is [protocol://]addr[:port]", cfg.Address)
}
protocol = u.Scheme
switch protocol {
case "unix":
// For Unix domain sockets, the path is in the opaque part
addr = strings.TrimPrefix(cfg.Address, "unix://")
if _, err = os.Stat(addr); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("failed stat sock file %s: %w", addr, err)
}
parent := path.Dir(addr)
if err = os.MkdirAll(parent, 0o755); err != nil {
return nil, fmt.Errorf("failed create director %s: %w", parent, err)
}
} else {
//socket file exists, just remove it
_ = os.Remove(addr)
}
case "tcp":
host, port, _ := net.SplitHostPort(cfg.Address)
addr = fmt.Sprintf("%s:%s", host, port)
default:
return nil, fmt.Errorf("unsupported protocol %s, only `tcp` and 'unix' are supported", protocol)
}

return net.Listen(protocol, addr)
}

func (i *inspServer) newHTTPServer(cfg *InspServerConfig) (*http.Server, net.Listener, error) {
http.Handle("/metrics", i.metricsServer)
http.Handle("/", http.HandlerFunc(defaultPage))
http.Handle("/status", http.HandlerFunc(i.statusPage))
Expand All @@ -293,9 +347,14 @@ func (i *inspServer) newHTTPServer(cfg *InspServerConfig) *http.Server {
)
http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
}
listenAddr := fmt.Sprintf(":%d", cfg.Port)
log.Infof("inspector start metric server, listenAddr: %s", listenAddr)
return &http.Server{Addr: listenAddr}

listener, err := i.createListener(cfg)
if err != nil {
return nil, nil, fmt.Errorf("failed create listener: %w", err)
}

log.Infof("inspector start metric server, listenAddr: %s", listener.Addr())
return &http.Server{}, listener, nil
}

func (i *inspServer) start(cfg *InspServerConfig) error {
Expand Down Expand Up @@ -350,11 +409,14 @@ func (i *inspServer) start(cfg *InspServerConfig) error {
log.Errorf("failed watch config, dynamic load would not work: %v", err)
}

srv := i.newHTTPServer(cfg)
srv, listener, err := i.newHTTPServer(cfg)
if err != nil {
return fmt.Errorf("failed start http server: %w", err)
}
serverClosedChan := make(chan struct{})
serverClosed := false
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
if err := srv.Serve(listener); err != nil && err != http.ErrServerClosed {
log.Errorf("server error: %v", err)
}

Expand All @@ -364,9 +426,12 @@ func (i *inspServer) start(cfg *InspServerConfig) error {

WaitSignals(serverClosedChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
close(done)

if !serverClosed {
_ = srv.Shutdown(ctx)
}
_ = listener.Close()

return nil
}

Expand Down

0 comments on commit 49b115c

Please sign in to comment.