Skip to content
This repository has been archived by the owner on Mar 26, 2018. It is now read-only.

Commit

Permalink
Gather metrics from ulogd pcap pipe
Browse files Browse the repository at this point in the history
Read and parse packets from the ulogd named pipe, and serve statistics
via the Prometheus client.
  • Loading branch information
awh committed Oct 19, 2016
1 parent fef0aac commit 7b89ee4
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 2 deletions.
26 changes: 24 additions & 2 deletions cmd/weave-npc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"syscall"

log "github.com/Sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
Expand All @@ -19,11 +20,15 @@ import (
"k8s.io/kubernetes/pkg/util/wait"

weavenpc "github.com/weaveworks/weave-npc/pkg/controller"
"github.com/weaveworks/weave-npc/pkg/metrics"
"github.com/weaveworks/weave-npc/pkg/util/ipset"
"github.com/weaveworks/weave-npc/pkg/util/ulogd"
)

var version = "(unreleased)"
var (
version = "(unreleased)"
metricsAddr string
)

func handleError(err error) {
if err != nil {
Expand Down Expand Up @@ -99,9 +104,13 @@ func resetIPSets(ips ipset.Interface) error {
return nil
}

func main() {
func root(cmd *cobra.Command, args []string) {
log.Infof("Starting Weaveworks NPC %s", version)

if err := metrics.Start(metricsAddr); err != nil {
log.Fatal(err)
}

if err := ulogd.Start(); err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -163,3 +172,16 @@ func main() {
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
log.Fatalf("Exiting: %v", <-signals)
}

var RootCmd = &cobra.Command{
Use: "weave-npc",
Short: "Weaveworks Kubernetes Network Policy Controller",
Run: root,
}

func main() {
RootCmd.PersistentFlags().StringVar(&metricsAddr, "metrics-addr", ":8686", "metrics server bind address")
if err := RootCmd.Execute(); err != nil {
log.Fatal(err)
}
}
78 changes: 78 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package metrics

import (
log "github.com/Sirupsen/logrus"
"net/http"
"os"
"strconv"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcapgo"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
blockedConnections = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "weavenpc_blocked_connections_total",
Help: "Connection attempts blocked by policy controller.",
},
[]string{"protocol", "dport"},
)
)

func gatherMetrics() {
pipe, err := os.Open("/var/log/ulogd.pcap")
if err != nil {
log.Fatal(err)
}

reader, err := pcapgo.NewReader(pipe)
if err != nil {
log.Fatal(err)
}

for {
data, _, err := reader.ReadPacketData()
if err != nil {
log.Fatal(err)
}

packet := gopacket.NewPacket(data, layers.LayerTypeIPv4, gopacket.Default)

if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil {
tcp, _ := tcpLayer.(*layers.TCP)
if tcp.SYN && !tcp.ACK { // Only plain SYN constitutes a NEW TCP connection
blockedConnections.With(prometheus.Labels{"protocol": "tcp", "dport": strconv.Itoa(int(tcp.DstPort))}).Inc()
continue
}
}

if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil {
udp, _ := udpLayer.(*layers.UDP)
blockedConnections.With(prometheus.Labels{"protocol": "udp", "dport": strconv.Itoa(int(udp.DstPort))}).Inc()
continue
}
}
}

func Start(addr string) error {
if err := prometheus.Register(blockedConnections); err != nil {
return err
}

http.Handle("/metrics", promhttp.Handler())

go func() {
log.Infof("Serving /metrics on %s", addr)
if err := http.ListenAndServe(addr, nil); err != nil {
log.Fatal(err)
}
}()

go gatherMetrics()

return nil
}

0 comments on commit 7b89ee4

Please sign in to comment.