diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..c0a2c78 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,7 @@ +debug.env +debug.sh +debugds2.env +exports.sh +env.env +router-metrics + diff --git a/.gitignore b/.gitignore index 32505e4..c0a2c78 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,6 @@ debug.env debug.sh debugds2.env exports.sh +env.env +router-metrics diff --git a/Dockerfile b/Dockerfile index 45d1e2c..db84ddf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,12 +1,12 @@ -FROM golang:1.10 +FROM golang:1.12 RUN apt-get update RUN apt-get install -y tzdata git RUN cp /usr/share/zoneinfo/America/Denver /etc/localtime RUN mkdir -p /go/src/router-metrics -ADD router-metrics.go /go/src/router-metrics/router-metrics.go -ADD build.sh /build.sh -RUN chmod +x /build.sh -RUN /build.sh +WORKDIR /go/src/router-metrics +ADD . . +RUN chmod +x ./build.sh +RUN ./build.sh CMD ["/go/src/router-metrics/router-metrics"] diff --git a/build.sh b/build.sh index 855bdfb..a1aa032 100755 --- a/build.sh +++ b/build.sh @@ -1,7 +1,6 @@ #!/bin/sh -cd /go/src -go get "github.com/bsm/sarama-cluster" cd /go/src/router-metrics -go build router-metrics.go +export GO111MODULE=on +go build . diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cf284d1 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module router-metrics + +go 1.12 + +require github.com/Shopify/sarama v1.26.1 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..94c6f78 --- /dev/null +++ b/go.sum @@ -0,0 +1,62 @@ +github.com/Shopify/sarama v1.26.1 h1:3jnfWKD7gVwbB1KSy/lE0szA9duPuSFLViK0o/d3DgA= +github.com/Shopify/sarama v1.26.1/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= +github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg= +github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ= +github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 h1:+ELyKg6m8UBf0nPFSqD0mi7zUfwPyXo23HNjMnXPz7w= +golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= +gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg= +gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/router-metrics.go b/router-metrics.go index 6eb3202..85b6930 100644 --- a/router-metrics.go +++ b/router-metrics.go @@ -1,24 +1,25 @@ package main import ( + "context" "fmt" - cluster "github.com/bsm/sarama-cluster" + "github.com/Shopify/sarama" "log" "net" "os" "os/signal" "strconv" "strings" + "sync" + "syscall" "time" ) var conn net.Conn var err error -var processed int var debugmode bool func main() { - processed = 0 envdebug := os.Getenv("DEBUG") if envdebug == "" { envdebug = "false" @@ -35,62 +36,102 @@ func main() { os.Exit(1) } - config := cluster.NewConfig() + version, err := sarama.ParseKafkaVersion("2.0.0") + if err != nil { + log.Panicf("Error parsing Kafka version: %v", err) + } + + config := sarama.NewConfig() + config.Version = version config.Consumer.Return.Errors = true - config.Group.Return.Notifications = true brokers := strings.Split(os.Getenv("KAFKA_BROKERS"), ",") + fmt.Println(brokers) topics := []string{"alamoweblogs"} - consumer, err := cluster.NewConsumer(brokers, os.Getenv("CONSUMER_GROUP_NAME"), topics, config) + group := os.Getenv("CONSUMER_GROUP_NAME") + consumer := Consumer{ + ready: make(chan bool), + } + ctx, cancel := context.WithCancel(context.Background()) + client, err := sarama.NewConsumerGroup(brokers, group, config) if err != nil { - fmt.Println(err) - os.Exit(1) + log.Panicf("Error creating consumer group client: %v", err) } - defer consumer.Close() - - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt) - signal.Notify(signals, os.Kill) - - consumed := 0 - - for { - select { - case msg, more := <-consumer.Messages(): - if more { - if debugmode { - fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) - } - sock(string(msg.Value[:])) - consumer.MarkOffset(msg, "") // mark message as processed - consumed++ - } - case err, more := <-consumer.Errors(): - if more { - log.Printf("Error: %s\n", err.Error()) + + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for { + // `Consume` should be called inside an infinite loop, when a + // server-side rebalance happens, the consumer session will need to be + // recreated to get the new claims + if err := client.Consume(ctx, topics, &consumer); err != nil { + log.Panicf("Error from consumer: %v", err) } - case ntf, more := <-consumer.Notifications(): - if more { - log.Printf("Rebalanced: %+v\n", ntf) + // check if context was cancelled, signaling that the consumer should stop + if ctx.Err() != nil { + return } - case <-signals: - return + consumer.ready = make(chan bool) } + }() + + <-consumer.ready // Await till the consumer has been set up + log.Println("Sarama consumer up and running!...") + + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) + select { + case <-ctx.Done(): + log.Println("terminating: context cancelled") + case <-sigterm: + log.Println("terminating: via signal") + } + cancel() + wg.Wait() + if err = client.Close(); err != nil { + log.Panicf("Error closing client: %v", err) } +} + +type Consumer struct { + ready chan bool +} - log.Printf("Consumed: %d\n", consumed) - log.Printf("Processed: %d\n", processed) - conn.Close() +func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { + // Mark the consumer as ready + close(consumer.ready) + return nil +} +func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { + return nil } +func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + + // NOTE: + // Do not move the code below to a goroutine. + // The `ConsumeClaim` itself is called within a goroutine, see: + // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 + for message := range claim.Messages() { + if (debugmode){ + log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic) + } + sock(string(message.Value[:])) + session.MarkMessage(message, "") + } + + return nil +} func sock(logline string) { words := strings.Fields(logline) var fieldmap map[string]string fieldmap = make(map[string]string) for _, element := range words { - if strings.Contains(element, "=") { - fieldmap[strings.Split(element, "=")[0]] = strings.Split(element, "=")[1] - } + if strings.Contains(element, "=") { + fieldmap[strings.Split(element, "=")[0]] = strings.Split(element, "=")[1] + } } host := fieldmap["hostname"] if !(strings.HasPrefix(host, "alamotest")) && len(words) > 9 && !strings.Contains(strings.Join(words, " "), "4813") { @@ -209,6 +250,5 @@ func sock(logline string) { } } - processed++ }