Skip to content

Commit

Permalink
refactor: replace glog in metrics & telemetry packages (#6587)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdabelf5 authored Oct 7, 2024
1 parent 62642a5 commit 5b06b2c
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 48 deletions.
10 changes: 5 additions & 5 deletions cmd/nginx-ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ func createManagerAndControllerCollectors(ctx context.Context, constLabels map[s
registry = prometheus.NewRegistry()
mc = collectors.NewLocalManagerMetricsCollector(constLabels)
cc = collectors.NewControllerMetricsCollector(*enableCustomResources, constLabels)
processCollector := collectors.NewNginxProcessesMetricsCollector(constLabels)
processCollector := collectors.NewNginxProcessesMetricsCollector(ctx, constLabels)
workQueueCollector := collectors.NewWorkQueueMetricsCollector(constLabels)

err = mc.Register(registry)
Expand Down Expand Up @@ -781,18 +781,18 @@ func createPlusAndLatencyCollectors(
logger := kitlog.NewLogfmtLogger(os.Stdout)
logger = level.NewFilter(logger, level.AllowError())
plusCollector = nginxCollector.NewNginxPlusCollector(plusClient, "nginx_ingress_nginxplus", variableLabelNames, constLabels, logger)
go metrics.RunPrometheusListenerForNginxPlus(*prometheusMetricsListenPort, plusCollector, registry, prometheusSecret)
go metrics.RunPrometheusListenerForNginxPlus(ctx, *prometheusMetricsListenPort, plusCollector, registry, prometheusSecret)
} else {
httpClient := getSocketClient("/var/lib/nginx/nginx-status.sock")
client := metrics.NewNginxMetricsClient(httpClient)
go metrics.RunPrometheusListenerForNginx(*prometheusMetricsListenPort, client, registry, constLabels, prometheusSecret)
go metrics.RunPrometheusListenerForNginx(ctx, *prometheusMetricsListenPort, client, registry, constLabels, prometheusSecret)
}
if *enableLatencyMetrics {
lc = collectors.NewLatencyMetricsCollector(constLabels, upstreamServerVariableLabels, upstreamServerPeerVariableLabelNames)
lc = collectors.NewLatencyMetricsCollector(ctx, constLabels, upstreamServerVariableLabels, upstreamServerPeerVariableLabelNames)
if err := lc.Register(registry); err != nil {
nl.Errorf(l, "Error registering Latency Prometheus metrics: %v", err)
}
syslogListener = metrics.NewLatencyMetricsListener("/var/lib/nginx/nginx-syslog.sock", lc)
syslogListener = metrics.NewLatencyMetricsListener(ctx, "/var/lib/nginx/nginx-syslog.sock", lc)
go syslogListener.Run()
}
}
Expand Down
15 changes: 10 additions & 5 deletions internal/metrics/collectors/latency.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package collectors

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"

"github.com/golang/glog"
nl "github.com/nginxinc/kubernetes-ingress/internal/logger"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -73,10 +75,12 @@ type LatencyMetricsCollector struct {
metricsPublishedMap metricsPublishedMap
metricsPublishedMutex sync.Mutex
variableLabelsMutex sync.RWMutex
logger *slog.Logger
}

// NewLatencyMetricsCollector creates a new LatencyMetricsCollector
func NewLatencyMetricsCollector(
ctx context.Context,
constLabels map[string]string,
upstreamServerLabelNames []string,
upstreamServerPeerLabelNames []string,
Expand All @@ -96,6 +100,7 @@ func NewLatencyMetricsCollector(
metricsPublishedMap: make(metricsPublishedMap),
upstreamServerLabelNames: upstreamServerLabelNames,
upstreamServerPeerLabelNames: upstreamServerPeerLabelNames,
logger: nl.LoggerFromContext(ctx),
}
}

Expand Down Expand Up @@ -141,7 +146,7 @@ func (l *LatencyMetricsCollector) DeleteMetrics(upstreamServerPeerNames []string
for _, labelValues := range l.listAndDeleteMetricsPublished(name) {
success := l.httpLatency.DeleteLabelValues(labelValues...)
if !success {
glog.Warningf("could not delete metric for upstream server peer: %s with values: %v", name, labelValues)
nl.Warnf(l.logger, "could not delete metric for upstream server peer: %s with values: %v", name, labelValues)
}
}
}
Expand Down Expand Up @@ -178,7 +183,7 @@ func (l *LatencyMetricsCollector) Collect(ch chan<- prometheus.Metric) {
func (l *LatencyMetricsCollector) RecordLatency(syslogMsg string) {
lm, err := parseMessage(syslogMsg)
if err != nil {
glog.V(3).Infof("could not parse syslog message: %v", err)
nl.Debugf(l.logger, "could not parse syslog message: %v", err)
return
}

Expand All @@ -188,13 +193,13 @@ func (l *LatencyMetricsCollector) RecordLatency(syslogMsg string) {
// https://github.com/nginxinc/kubernetes-ingress/issues/5010
// https://github.com/nginxinc/kubernetes-ingress/issues/6124
if lm.Upstream == "-" {
glog.V(3).Infof("latency metrics for gRPC upstreams: %v", lm)
nl.Debugf(l.logger, "latency metrics for gRPC upstreams: %v", lm)
return
}

labelValues, err := l.createLatencyLabelValues(lm)
if err != nil {
glog.Errorf("cannot record latency for upstream %s and server %s: %v", lm.Upstream, lm.Server, err)
nl.Errorf(l.logger, "cannot record latency for upstream %s and server %s: %v", lm.Upstream, lm.Server, err)
return
}
l.httpLatency.WithLabelValues(labelValues...).Observe(lm.Latency * 1000)
Expand Down
10 changes: 7 additions & 3 deletions internal/metrics/collectors/processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@ package collectors

import (
"bytes"
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/golang/glog"
nl "github.com/nginxinc/kubernetes-ingress/internal/logger"
"github.com/prometheus/client_golang/prometheus"
)

// NginxProcessesMetricsCollector implements prometheus.Collector interface
type NginxProcessesMetricsCollector struct {
workerProcessTotal *prometheus.GaugeVec
logger *slog.Logger
}

// NewNginxProcessesMetricsCollector creates a new NginxProcessMetricsCollector
func NewNginxProcessesMetricsCollector(constLabels map[string]string) *NginxProcessesMetricsCollector {
func NewNginxProcessesMetricsCollector(ctx context.Context, constLabels map[string]string) *NginxProcessesMetricsCollector {
return &NginxProcessesMetricsCollector{
workerProcessTotal: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand All @@ -29,14 +32,15 @@ func NewNginxProcessesMetricsCollector(constLabels map[string]string) *NginxProc
},
[]string{"generation"},
),
logger: nl.LoggerFromContext(ctx),
}
}

// updateWorkerProcessCount sets the number of NGINX worker processes
func (pc *NginxProcessesMetricsCollector) updateWorkerProcessCount() {
currWorkerProcesses, prevWorkerProcesses, err := getWorkerProcesses()
if err != nil {
glog.Errorf("unable to collect process metrics : %v", err)
nl.Errorf(pc.logger, "unable to collect process metrics : %v", err)
return
}
pc.workerProcessTotal.WithLabelValues("current").Set(float64(currWorkerProcesses))
Expand Down
29 changes: 17 additions & 12 deletions internal/metrics/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"strconv"
Expand All @@ -13,12 +14,13 @@ import (
"github.com/go-chi/chi/v5"
kitlog "github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/glog"
prometheusClient "github.com/nginxinc/nginx-prometheus-exporter/client"
nginxCollector "github.com/nginxinc/nginx-prometheus-exporter/collector"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
v1 "k8s.io/api/core/v1"

nl "github.com/nginxinc/kubernetes-ingress/internal/logger"
)

// NewNginxMetricsClient creates an NginxClient to fetch stats from NGINX over an unix socket
Expand All @@ -27,41 +29,43 @@ func NewNginxMetricsClient(httpClient *http.Client) *prometheusClient.NginxClien
}

// RunPrometheusListenerForNginx runs an http server to expose Prometheus metrics for NGINX
func RunPrometheusListenerForNginx(port int, client *prometheusClient.NginxClient, registry *prometheus.Registry, constLabels map[string]string, prometheusSecret *v1.Secret) {
func RunPrometheusListenerForNginx(ctx context.Context, port int, client *prometheusClient.NginxClient, registry *prometheus.Registry, constLabels map[string]string, prometheusSecret *v1.Secret) {
logger := kitlog.NewLogfmtLogger(os.Stdout)
logger = level.NewFilter(logger, level.AllowError())
registry.MustRegister(nginxCollector.NewNginxCollector(client, "nginx_ingress_nginx", constLabels, logger))
runServer(strconv.Itoa(port), registry, prometheusSecret)
runServer(ctx, strconv.Itoa(port), registry, prometheusSecret)
}

// RunPrometheusListenerForNginxPlus runs an http server to expose Prometheus metrics for NGINX Plus
func RunPrometheusListenerForNginxPlus(port int, nginxPlusCollector prometheus.Collector, registry *prometheus.Registry, prometheusSecret *v1.Secret) {
func RunPrometheusListenerForNginxPlus(ctx context.Context, port int, nginxPlusCollector prometheus.Collector, registry *prometheus.Registry, prometheusSecret *v1.Secret) {
registry.MustRegister(nginxPlusCollector)
runServer(strconv.Itoa(port), registry, prometheusSecret)
runServer(ctx, strconv.Itoa(port), registry, prometheusSecret)
}

// runServer starts the metrics server.
func runServer(port string, registry prometheus.Gatherer, prometheusSecret *v1.Secret) {
func runServer(ctx context.Context, port string, registry prometheus.Gatherer, prometheusSecret *v1.Secret) {
addr := fmt.Sprintf(":%s", port)
s, err := NewServer(addr, registry, prometheusSecret)
l := nl.LoggerFromContext(ctx)
s, err := newServer(ctx, addr, registry, prometheusSecret)
if err != nil {
glog.Fatal(err)
nl.Fatal(l, err)
}
glog.Infof("Starting prometheus listener on: %s/metrics", addr)
glog.Fatal(s.ListenAndServe())
nl.Infof(l, "Starting prometheus listener on: %s/metrics", addr)
nl.Fatal(l, s.ListenAndServe())
}

// Server holds information about NIC metrics server.
type Server struct {
Server *http.Server
URL string
Registry prometheus.Gatherer
logger *slog.Logger
}

// NewServer creates HTTP server for serving NIC metrics for Prometheus.
//
// Metrics are exposed on the `/metrics` endpoint.
func NewServer(addr string, registry prometheus.Gatherer, secret *v1.Secret) (*Server, error) {
func newServer(ctx context.Context, addr string, registry prometheus.Gatherer, secret *v1.Secret) (*Server, error) {
s := Server{
Server: &http.Server{
Addr: addr,
Expand All @@ -70,6 +74,7 @@ func NewServer(addr string, registry prometheus.Gatherer, secret *v1.Secret) (*S
},
URL: fmt.Sprintf("http://%s/", addr),
Registry: registry,
logger: nl.LoggerFromContext(ctx),
}
// Secrets are read from K8s API. If the secret for Prometheus is present
// we configure Metrics Server with the key and cert.
Expand Down Expand Up @@ -99,7 +104,7 @@ func (s *Server) Home(w http.ResponseWriter, r *http.Request) { //nolint:revive
</body>
</html>`))
if err != nil {
glog.Errorf("error while sending a response for the '/' path: %v", err)
nl.Errorf(s.logger, "error while sending a response for the '/' path: %v", err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
Expand Down
19 changes: 11 additions & 8 deletions internal/metrics/syslog_listener.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package metrics

import (
"context"
"errors"
"log/slog"
"net"

"github.com/golang/glog"

nl "github.com/nginxinc/kubernetes-ingress/internal/logger"
"github.com/nginxinc/kubernetes-ingress/internal/metrics/collectors"
)

Expand All @@ -21,21 +22,23 @@ type LatencyMetricsListener struct {
conn *net.UnixConn
addr string
collector collectors.LatencyCollector
logger *slog.Logger
}

// NewLatencyMetricsListener returns a LatencyMetricsListener that listens over a unix socket
// for syslog messages from nginx.
func NewLatencyMetricsListener(sockPath string, c collectors.LatencyCollector) SyslogListener {
glog.Infof("Starting latency metrics server listening on: %s", sockPath)
func NewLatencyMetricsListener(ctx context.Context, sockPath string, c collectors.LatencyCollector) SyslogListener {
l := nl.LoggerFromContext(ctx)
nl.Infof(l, "Starting latency metrics server listening on: %s", sockPath)
conn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
Name: sockPath,
Net: "unixgram",
})
if err != nil {
glog.Errorf("Failed to create latency metrics listener: %v. Latency metrics will not be collected.", err)
nl.Errorf(l, "Failed to create latency metrics listener: %v. Latency metrics will not be collected.", err)
return NewSyslogFakeServer()
}
return &LatencyMetricsListener{conn: conn, addr: sockPath, collector: c}
return &LatencyMetricsListener{conn: conn, addr: sockPath, collector: c, logger: l}
}

// Run reads from the unix connection until an unrecoverable error occurs or the connection is closed.
Expand All @@ -45,7 +48,7 @@ func (l LatencyMetricsListener) Run() {
n, err := l.conn.Read(buffer)
if err != nil {
if !isErrorRecoverable(err) {
glog.Info("Stopping latency metrics listener")
nl.Info(l.logger, "Stopping latency metrics listener")
return
}
}
Expand All @@ -57,7 +60,7 @@ func (l LatencyMetricsListener) Run() {
func (l LatencyMetricsListener) Stop() {
err := l.conn.Close()
if err != nil {
glog.Errorf("error closing latency metrics unix connection: %v", err)
nl.Errorf(l.logger, "error closing latency metrics unix connection: %v", err)
}
}

Expand Down
Loading

0 comments on commit 5b06b2c

Please sign in to comment.