Skip to content

Commit

Permalink
Merge pull request #297 from prymitive/stream
Browse files Browse the repository at this point in the history
feat: stream JSON responses when talking to alertmanager
  • Loading branch information
prymitive authored Dec 14, 2021
2 parents d27384d + 6e6e368 commit 7c46c47
Show file tree
Hide file tree
Showing 12 changed files with 201 additions and 491 deletions.
5 changes: 1 addition & 4 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,5 @@ jobs:
- name: Check out code
uses: actions/checkout@v2

- name: Install GolangCI
run: go install -modfile=tools/golangci-lint/go.mod github.com/golangci/golangci-lint/cmd/golangci-lint

- name: Lint Go code
run: golangci-lint run
run: make lint
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
NAME := kthxbye

GOBIN := $(shell go env GOBIN)
ifeq ($(GOBIN),)
GOBIN = $(shell go env GOPATH)/bin
endif

word-split = $(word $2,$(subst -, ,$1))
cc-%: go.mod go.sum cmd/kthxbye/*.go
$(eval GOOS := $(call word-split,$*,1))
Expand Down Expand Up @@ -28,3 +33,9 @@ $(NAME): go.mod go.sum cmd/kthxbye/*.go
.PHONY: tools-go-mod-tidy
tools-go-mod-tidy:
@for f in $(wildcard tools/*/go.mod) ; do echo ">>> $$f" && cd $(CURDIR)/`dirname "$$f"` && go mod tidy && cd $(CURDIR) ; done

$(GOBIN)/golangci-lint: tools/golangci-lint/go.mod tools/golangci-lint/go.sum
go install -modfile=tools/golangci-lint/go.mod github.com/golangci/golangci-lint/cmd/golangci-lint
.PHONY: lint
lint: $(GOBIN)/golangci-lint
$(ENV) golangci-lint run -v --timeout 5m -E staticcheck,misspell,promlinter,revive,tenv,errorlint,exportloopref,predeclared
37 changes: 17 additions & 20 deletions cmd/kthxbye/ack.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
package main

import (
"context"
"strings"
"time"

"github.com/go-openapi/strfmt"
"github.com/rs/zerolog/log"
)

func extendACKs(cfg *ackConfig) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

silences, err := querySilences(ctx, cfg)
func extendACKs(cfg ackConfig) error {
silences, err := querySilences(cfg)
if err != nil {
return err
}

alerts, err := queryAlerts(ctx, cfg)
alerts, err := queryAlerts(cfg)
if err != nil {
return err
}
Expand All @@ -29,41 +24,43 @@ func extendACKs(cfg *ackConfig) error {

silencesExpiring := 0
for _, sil := range silences {
if !strings.HasPrefix(*sil.Comment, cfg.extendWithPrefix) {
if !strings.HasPrefix(sil.Comment, cfg.extendWithPrefix) {
continue
}

usedBy := 0
for _, alert := range alerts {
for _, silenceID := range alert.Status.SilencedBy {
if silenceID == *sil.ID {
if silenceID == sil.ID {
usedBy++
}
}
}
if usedBy > 0 {
if time.Time(*sil.EndsAt).Before(extendIfBefore) {
duration := time.Time(*sil.EndsAt).Sub(time.Time(*sil.StartsAt))
if sil.EndsAt.Before(extendIfBefore) {
duration := time.Time(sil.EndsAt).Sub(time.Time(sil.StartsAt))
if cfg.maxDuration > 0 && duration > cfg.maxDuration {
log.Info().
Str("id", *sil.ID).
Str("id", sil.ID).
Strs("matchers", silenceMatchersToLogField(sil)).
Str("maxDuration", cfg.maxDuration.String()).
Msgf("Silence is used by %d alert(s) but it already reached the maximum duration, letting it expire", usedBy)
} else {
log.Info().
Str("id", *sil.ID).
Str("id", sil.ID).
Strs("matchers", silenceMatchersToLogField(sil)).
Msgf("Silence expires in %s and matches %d alert(s), extending it by %s",
time.Time(*sil.EndsAt).Sub(time.Now().UTC()), usedBy, cfg.extendBy)
endsAt := strfmt.DateTime(time.Now().UTC().Add(cfg.extendBy))
sil.EndsAt = &endsAt
updateSilence(ctx, cfg, sil)
sil.EndsAt.Sub(time.Now().UTC()), usedBy, cfg.extendBy)
sil.EndsAt = time.Now().UTC().Add(cfg.extendBy)
err = updateSilence(cfg, sil)
if err != nil {
log.Error().Err(err).Msg("Silence update failed")
}
}
}
} else {
log.Info().
Str("id", *sil.ID).
Str("id", sil.ID).
Strs("matchers", silenceMatchersToLogField(sil)).
Msg("Silence is not used by any alert, letting it expire")
silencesExpiring++
Expand All @@ -74,7 +71,7 @@ func extendACKs(cfg *ackConfig) error {
return nil
}

func ackLoop(cfg *ackConfig) {
func ackLoop(cfg ackConfig) {
metricsCycleStatus.Set(1)
for {
err := extendACKs(cfg)
Expand Down
62 changes: 44 additions & 18 deletions cmd/kthxbye/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,63 @@ package main

import (
"context"

"github.com/prometheus/alertmanager/api/v2/client/alert"
"github.com/prometheus/alertmanager/api/v2/models"
"encoding/json"
"fmt"
"net/http"
)

func queryAlerts(ctx context.Context, cfg *ackConfig) ([]*models.GettableAlert, error) {
type Alert struct {
Status struct {
SilencedBy []interface{} `json:"silencedBy"`
} `json:"status"`
}

func queryAlerts(cfg ackConfig) (alerts []Alert, err error) {
uri := fmt.Sprintf(
"%s?silenced=true&inhibited=false&active=false&unprocessed=false",
joinURI(cfg.alertmanagerURI, "api/v2/alerts"))

alerts := []*models.GettableAlert{}
ctx, cancel := context.WithTimeout(context.Background(), cfg.alertmanagerTimeout)
defer cancel()

withUnprocessed := false
withActive := false
withInhibited := false
withSilenced := true
req, err := http.NewRequestWithContext(ctx, http.MethodGet, uri, nil)
if err != nil {
return nil, err
}

alertParams := alert.NewGetAlertsParams().WithContext(ctx).
WithUnprocessed(&withUnprocessed).
WithActive(&withActive).
WithInhibited(&withInhibited).
WithSilenced(&withSilenced)
client := newAMClient(cfg.alertmanagerURI)

amclient := newAMClient(cfg.alertmanagerURI)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

getOk, err := amclient.Alert.GetAlerts(alertParams)
dec := json.NewDecoder(resp.Body)

tok, err := dec.Token()
if err != nil {
return alerts, err
return nil, err
}
if tok != json.Delim('[') {
return nil, fmt.Errorf("invalid JSON token, expected [, got %s", tok)
}

for _, alert := range getOk.Payload {
var alert Alert
for dec.More() {
if err = dec.Decode(&alert); err != nil {
return nil, err
}
alerts = append(alerts, alert)
}

tok, err = dec.Token()
if err != nil {
return nil, err
}
if tok != json.Delim(']') {
return nil, fmt.Errorf("invalid JSON token, expected ], got %s", tok)
}

return alerts, nil
}
26 changes: 14 additions & 12 deletions cmd/kthxbye/am.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package main

import (
"fmt"
"net/http"
"net/url"
"path"

httptransport "github.com/go-openapi/runtime/client"

"github.com/prometheus/alertmanager/api/v2/client"
"strings"
)

func setAuth(inner http.RoundTripper, username string, password string) http.RoundTripper {
Expand All @@ -29,17 +26,22 @@ func (art *authRoundTripper) RoundTrip(r *http.Request) (*http.Response, error)
return art.inner.RoundTrip(r)
}

func newAMClient(uri string) *client.Alertmanager {
u, _ := url.Parse(uri)

transport := httptransport.New(u.Host, path.Join(u.Path, "/api/v2"), []string{u.Scheme})
func newAMClient(uri string) http.Client {
client := http.Client{}

u, _ := url.Parse(uri)
if u.User.Username() != "" {
username := u.User.Username()
password, _ := u.User.Password()
transport.Transport = setAuth(transport.Transport, username, password)
client.Transport = setAuth(client.Transport, username, password)
}

c := client.New(transport, nil)
return c
return client
}

func joinURI(base, path string) string {
if strings.HasSuffix(base, "/") {
return fmt.Sprintf("%s%s", base, path)
}
return fmt.Sprintf("%s/%s", base, path)
}
7 changes: 3 additions & 4 deletions cmd/kthxbye/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"os"

"github.com/prometheus/alertmanager/api/v2/models"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
Expand All @@ -28,13 +27,13 @@ func setupLogger() {
})
}

func silenceMatchersToLogField(s *models.GettableSilence) (matchers []string) {
func silenceMatchersToLogField(s Silence) (matchers []string) {
for _, sm := range s.Matchers {
op := "="
if *sm.IsRegex {
if sm.IsRegex {
op = "=~"
}
matchers = append(matchers, fmt.Sprintf("%s%s%s", *sm.Name, op, *sm.Value))
matchers = append(matchers, fmt.Sprintf("%s%s%s", sm.Name, op, sm.Value))
}
return matchers
}
20 changes: 11 additions & 9 deletions cmd/kthxbye/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,22 @@ var (
)

type ackConfig struct {
alertmanagerURI string
loopInterval time.Duration
extendIfExpiringIn time.Duration
extendBy time.Duration
extendWithPrefix string
maxDuration time.Duration
logJSON bool
alertmanagerURI string
alertmanagerTimeout time.Duration
loopInterval time.Duration
extendIfExpiringIn time.Duration
extendBy time.Duration
extendWithPrefix string
maxDuration time.Duration
logJSON bool
}

func main() {
addr := flag.String("listen", ":8080", "The address to listen on for HTTP requests.")

cfg := ackConfig{}
flag.StringVar(&cfg.alertmanagerURI, "alertmanager.uri", "http://localhost:9093", "Alertmanager URI to use")
flag.DurationVar(&cfg.alertmanagerTimeout, "alertmanager.timeout", time.Minute, "Alertmanager request timeout")
flag.DurationVar(&cfg.loopInterval, "interval", time.Duration(time.Second*45), "Silence check interval")
flag.DurationVar(&cfg.extendIfExpiringIn, "extend-if-expiring-in", time.Duration(time.Minute*5), "Extend silences that are about to expire in the next DURATION seconds")
flag.DurationVar(&cfg.extendBy, "extend-by", time.Duration(time.Minute*15), "Extend silences by adding DURATION seconds")
Expand All @@ -66,11 +68,11 @@ func main() {
setupLogger()
}

if cfg.extendBy.Seconds() < cfg.extendIfExpiringIn.Seconds() {
if cfg.extendBy.Seconds() <= cfg.extendIfExpiringIn.Seconds() {
log.Fatal().Msg("-extend-by value must be greater than -extend-if-expiring-in")
}

go ackLoop(&cfg)
go ackLoop(cfg)

http.Handle("/metrics", promhttp.Handler())
http.HandleFunc("/", index)
Expand Down
Loading

0 comments on commit 7c46c47

Please sign in to comment.