From 57b66218c7230ac29e4d984380ab0a9442ad41ea Mon Sep 17 00:00:00 2001 From: Adam Harrison Date: Fri, 14 Oct 2016 15:09:33 +0100 Subject: [PATCH] Gather metrics from ulogd pcap pipe Read and parse packets from the ulogd named pipe, and serve statistics via the Prometheus client. --- cmd/weave-npc/main.go | 26 ++++++++++++-- pkg/metrics/metrics.go | 78 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 2 deletions(-) create mode 100644 pkg/metrics/metrics.go diff --git a/cmd/weave-npc/main.go b/cmd/weave-npc/main.go index bbc3ebd..78304fd 100644 --- a/cmd/weave-npc/main.go +++ b/cmd/weave-npc/main.go @@ -6,6 +6,7 @@ import ( "syscall" log "github.com/Sirupsen/logrus" + "github.com/spf13/cobra" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" @@ -19,11 +20,15 @@ import ( "k8s.io/kubernetes/pkg/util/wait" weavenpc "github.com/weaveworks/weave-npc/pkg/controller" + "github.com/weaveworks/weave-npc/pkg/metrics" "github.com/weaveworks/weave-npc/pkg/ulogd" "github.com/weaveworks/weave-npc/pkg/util/ipset" ) -var version = "(unreleased)" +var ( + version = "(unreleased)" + metricsAddr string +) func handleError(err error) { if err != nil { @@ -99,9 +104,13 @@ func resetIPSets(ips ipset.Interface) error { return nil } -func main() { +func root(cmd *cobra.Command, args []string) { log.Infof("Starting Weaveworks NPC %s", version) + if err := metrics.Start(metricsAddr); err != nil { + log.Fatalf("Failed to start metrics: %v", err) + } + if err := ulogd.Start(); err != nil { log.Fatalf("Failed to start ulogd: %v", err) } @@ -163,3 +172,16 @@ func main() { signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) log.Fatalf("Exiting: %v", <-signals) } + +func main() { + rootCmd := &cobra.Command{ + Use: "weave-npc", + Short: "Weaveworks Kubernetes Network Policy Controller", + Run: root} + + rootCmd.PersistentFlags().StringVar(&metricsAddr, "metrics-addr", ":8686", "metrics server bind address") + + if err := rootCmd.Execute(); err != nil { + log.Fatal(err) + } +} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 0000000..9ee8639 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,78 @@ +package metrics + +import ( + log "github.com/Sirupsen/logrus" + "net/http" + "os" + "strconv" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcapgo" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var ( + blockedConnections = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "weavenpc_blocked_connections_total", + Help: "Connection attempts blocked by policy controller.", + }, + []string{"protocol", "dport"}, + ) +) + +func gatherMetrics() { + pipe, err := os.Open("/var/log/ulogd.pcap") + if err != nil { + log.Fatalf("Failed to open pcap: %v", err) + } + + reader, err := pcapgo.NewReader(pipe) + if err != nil { + log.Fatalf("Failed to read pcap header: %v", err) + } + + for { + data, _, err := reader.ReadPacketData() + if err != nil { + log.Fatalf("Failed to read pcap packet: %v", err) + } + + packet := gopacket.NewPacket(data, layers.LayerTypeIPv4, gopacket.Default) + + if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil { + tcp, _ := tcpLayer.(*layers.TCP) + if tcp.SYN && !tcp.ACK { // Only plain SYN constitutes a NEW TCP connection + blockedConnections.With(prometheus.Labels{"protocol": "tcp", "dport": strconv.Itoa(int(tcp.DstPort))}).Inc() + continue + } + } + + if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil { + udp, _ := udpLayer.(*layers.UDP) + blockedConnections.With(prometheus.Labels{"protocol": "udp", "dport": strconv.Itoa(int(udp.DstPort))}).Inc() + continue + } + } +} + +func Start(addr string) error { + if err := prometheus.Register(blockedConnections); err != nil { + return err + } + + http.Handle("/metrics", promhttp.Handler()) + + go func() { + log.Infof("Serving /metrics on %s", addr) + if err := http.ListenAndServe(addr, nil); err != nil { + log.Fatalf("Failed to bind metrics server: %v", err) + } + }() + + go gatherMetrics() + + return nil +}