Skip to content

Commit

Permalink
remove gin-gonic dependency, add metrics, clean up and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver006 committed May 11, 2015
1 parent 64f9e51 commit 1c3c9ba
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 88 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ Adding a new storage provider is easy. I you're interested in adding a new stora

- some sort of tag management endpoint would be nice to support deleting of tags
- more storage providers
- make the annotation server export its own set of metrics
- ...


Expand Down
151 changes: 106 additions & 45 deletions prom_annotation_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,21 @@ package main
*/

import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"time"

"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/prometheus/client_golang/prometheus"
)

const VERSION = "0.2"
const VERSION = "0.3"

var (
/*
Expand All @@ -28,71 +29,136 @@ var (
currently only "local" is available for storage type
for rethinkdb use this format: rethinkdb:<HOST:PORT>/<DBNAME>
*/
storageConfig = flag.String("storage", "local:/tmp/annotations.db", "Storage config, format is \"type:options\". \"local\" is currently the only supported type with options being the location of the DB file.")
listenAddress = flag.String("listen-addr", ":9119", "Address to listen on for web interface")
endpoint = flag.String("endpoint", "/annotations", "Path under which to expose the annotation server")
showVersion = flag.Bool("version", false, "Show version information")
storageConfig = flag.String("storage", "local:/tmp/annotations.db", "Storage config, format is \"type:options\". \"local\" and \"rethinkdb\" are currently the only supported types.")
listenAddress = flag.String("listen-addr", ":9119", "Address to listen on for web interface")
annoEndpoint = flag.String("endpoint", "/annotations", "Path under which to expose the annotation server")
metricsEndpoint = flag.String("metris", "/metrics", "Path under which to expose the metrics of the annotation server")
showVersion = flag.Bool("version", false, "Show version information")
)

type ServerContext struct {
storage Storage
router http.Handler
storage Storage
annotationStats *prometheus.GaugeVec
}

func (s *ServerContext) put(c *gin.Context) {
func newAnnotationStats() *prometheus.GaugeVec {

return prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "annotations_total",
Help: "Number of annotations per tag.",
}, []string{"tag"})
}

func NewServerContext(storage string) (*ServerContext, error) {

st, err := NewStorage(storage)
if err != nil {
return nil, err
}
srvr := ServerContext{
storage: st,
annotationStats: newAnnotationStats(),
}
prometheus.MustRegister(&srvr)
return &srvr, nil
}

func (s *ServerContext) ServeHTTP(w http.ResponseWriter, req *http.Request) {

log.Printf("Request: %s %s", req.Method, req.URL.Path)

switch req.URL.Path {
case *metricsEndpoint:
prometheus.Handler().ServeHTTP(w, req)
case *annoEndpoint:
prometheus.InstrumentHandlerFunc("annotations", s.annotations)(w, req)
default:
http.Error(w, "Not found", 404)
}
}

func (s *ServerContext) Describe(ch chan<- *prometheus.Desc) {
s.annotationStats.Describe(ch)
}

func (s *ServerContext) Collect(ch chan<- prometheus.Metric) {
s.annotationStats = newAnnotationStats()
defer s.annotationStats.Collect(ch)

stats, err := s.storage.TagStats()
if err != nil {
log.Printf("stats err: %s")
return
}

for tag, count := range stats {
s.annotationStats.WithLabelValues(tag).Set(float64(count))
}
}

func (s *ServerContext) writeJSON(w http.ResponseWriter, code int, data interface{}) {
w.WriteHeader(code)
w.Header().Set("Content-Type", "application/json")
temp, _ := json.Marshal(data)
fmt.Fprintln(w, string(temp))
}

func (s *ServerContext) annotations(w http.ResponseWriter, req *http.Request) {

switch req.Method {

case "GET":
s.get(w, req)

case "PUT":
s.put(w, req)

default:
http.Error(w, "Not supported", 405)
}
}

func (s *ServerContext) put(w http.ResponseWriter, req *http.Request) {

defer req.Body.Close()
body, _ := ioutil.ReadAll(req.Body)
var a Annotation
if ok := c.BindWith(&a, binding.JSON); ok && a.Message != "" && len(a.Tags) > 0 {
if err := json.Unmarshal(body, &a); err == nil {
if a.CreatedAt == 0 {
a.CreatedAt = int(time.Now().Unix())
}

if err := s.storage.Add(a); err == nil {
c.JSON(200, map[string]string{"result": "ok"})
s.writeJSON(w, 200, map[string]string{"result": "ok"})
return
}
}

log.Printf("unmarshal annotion error or mad bad data")
c.JSON(200, map[string]string{"result": "invalid_json"})
return
log.Printf("unmarshal annotion error or mad bad data: %s", body)
s.writeJSON(w, 500, map[string]string{"result": "invalid_json"})
}

func (s *ServerContext) get(c *gin.Context) {
c.Request.ParseForm()
func (s *ServerContext) get(w http.ResponseWriter, req *http.Request) {
req.ParseForm()

r, err := strconv.Atoi(c.Request.Form.Get("range"))
r, err := strconv.Atoi(req.Form.Get("range"))
if err != nil || r == 0 {
r = 3600
}

until, err := strconv.Atoi(c.Request.Form.Get("until"))
until, err := strconv.Atoi(req.Form.Get("until"))
if err != nil || until == 0 {
until = int(time.Now().Unix())
}
tags, _ := c.Request.Form["tags[]"]
tags, _ := req.Form["tags[]"]

list, err := s.storage.Posts(tags, r, until)
if err != nil {
c.JSON(200, map[string]string{"result": fmt.Sprintf("err: %s", err)})
s.writeJSON(w, 500, map[string]string{"result": fmt.Sprintf("err: %s", err)})
return
}

c.JSON(200, list)
}

func NewServerContext(storage string) (*ServerContext, error) {

st, err := NewStorage(storage)
if err != nil {
return nil, err
}
srvr := ServerContext{storage: st}

r := gin.Default()
r.GET(*endpoint, srvr.get)
r.PUT(*endpoint, srvr.put)
srvr.router = r
return &srvr, nil
s.writeJSON(w, 200, list)
}

func main() {
Expand All @@ -108,15 +174,10 @@ func main() {
}
defer ctx.storage.Close()

s := &http.Server{
Addr: *listenAddress,
Handler: ctx.router,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
MaxHeaderBytes: 1 << 20,
}
http.Handle("/", ctx)

log.Printf("Running server listening at %s, ", *listenAddress)
go s.ListenAndServe()
go http.ListenAndServe(*listenAddress, nil)

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
Expand Down
Loading

0 comments on commit 1c3c9ba

Please sign in to comment.