Skip to content

Commit

Permalink
Merge pull request #460 from mathetake/datadog-metrics-provider
Browse files Browse the repository at this point in the history
feature: add datadog metrics provider
  • Loading branch information
stefanprodan authored Feb 27, 2020
2 parents 6c398c2 + a157824 commit c17c69e
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 5 deletions.
1 change: 1 addition & 0 deletions artifacts/flagger/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ spec:
enum:
- prometheus
- influxdb
- datadog
address:
description: API address of this provider
type: string
Expand Down
1 change: 1 addition & 0 deletions charts/flagger/crds/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ spec:
enum:
- prometheus
- influxdb
- datadog
address:
description: API address of this provider
type: string
Expand Down
1 change: 1 addition & 0 deletions kustomize/base/flagger/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ spec:
enum:
- prometheus
- influxdb
- datadog
address:
description: API address of this provider
type: string
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ func (c *Controller) runMetricChecks(canary *flaggerv1.Canary) bool {
}

factory := providers.Factory{}
provider, err := factory.Provider(template.Spec.Provider, credentials)
provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials)
if err != nil {
c.recordEventErrorf(canary, "Metric template %s.%s provider %s error: %v",
metric.TemplateRef.Name, namespace, template.Spec.Provider.Type, err)
Expand Down
168 changes: 168 additions & 0 deletions pkg/metrics/providers/datadog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package providers

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"time"

flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
)

// https://docs.datadoghq.com/api/
const (
datadogDefaultHost = "https://api.datadoghq.com"

datadogMetricsQueryPath = "/api/v1/query"
datadogAPIKeyValidationPath = "/api/v1/validate"

datadogAPIKeySecretKey = "datadog_api_key"
datadogAPIKeyHeaderKey = "DD-API-KEY"

datadogApplicationKeySecretKey = "datadog_application_key"
datadogApplicationKeyHeaderKey = "DD-APPLICATION-KEY"

datadogFromDeltaMultiplierOnMetricInterval = 10
)

// DatadogProvider executes datadog queries
type DatadogProvider struct {
metricsQueryEndpoint string
apiKeyValidationEndpoint string

timeout time.Duration
apiKey string
applicationKey string
fromDelta int64
}

type datadogResponse struct {
Series []struct {
Pointlist [][]float64 `json:"pointlist"`
}
}

// NewDatadogProvider takes a canary spec, a provider spec and the credentials map, and
// returns a Datadog client ready to execute queries against the API
func NewDatadogProvider(metricInterval string,
provider flaggerv1.MetricTemplateProvider,
credentials map[string][]byte) (*DatadogProvider, error) {

address := provider.Address
if address == "" {
address = datadogDefaultHost
}

dd := DatadogProvider{
timeout: 5 * time.Second,
metricsQueryEndpoint: address + datadogMetricsQueryPath,
apiKeyValidationEndpoint: address + datadogAPIKeyValidationPath,
}

if b, ok := credentials[datadogAPIKeySecretKey]; ok {
dd.apiKey = string(b)
} else {
return nil, fmt.Errorf("datadog credentials does not contain datadog_api_key")
}

if b, ok := credentials[datadogApplicationKeySecretKey]; ok {
dd.applicationKey = string(b)
} else {
return nil, fmt.Errorf("datadog credentials does not contain datadog_application_key")
}

md, err := time.ParseDuration(metricInterval)
if err != nil {
return nil, fmt.Errorf("error parsing metric interval: %s", err.Error())
}

dd.fromDelta = int64(datadogFromDeltaMultiplierOnMetricInterval * md.Seconds())
return &dd, nil
}

// RunQuery executes the datadog query against DatadogProvider.metricsQueryEndpoint
// and returns the the first result as float64
func (p *DatadogProvider) RunQuery(query string) (float64, error) {

req, err := http.NewRequest("GET", p.metricsQueryEndpoint, nil)
if err != nil {
return 0, fmt.Errorf("error http.NewRequest: %s", err.Error())
}

req.Header.Set(datadogAPIKeyHeaderKey, p.apiKey)
req.Header.Set(datadogApplicationKeyHeaderKey, p.applicationKey)
now := time.Now().Unix()
q := req.URL.Query()
q.Add("query", query)
q.Add("from", strconv.FormatInt(now-p.fromDelta, 10))
q.Add("to", strconv.FormatInt(now, 10))
req.URL.RawQuery = q.Encode()

ctx, cancel := context.WithTimeout(req.Context(), p.timeout)
defer cancel()
r, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return 0, err
}

defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return 0, fmt.Errorf("error reading body: %s", err.Error())
}

if r.StatusCode != http.StatusOK {
return 0, fmt.Errorf("error response: %s", string(b))
}

var res datadogResponse
if err := json.Unmarshal(b, &res); err != nil {
return 0, fmt.Errorf("error unmarshaling result: %s, '%s'", err.Error(), string(b))
}

if len(res.Series) < 1 {
return 0, fmt.Errorf("no values found in response: %s", string(b))
}

s := res.Series[0]
vs := s.Pointlist[len(s.Pointlist)-1]
if len(vs) < 1 {
return 0, fmt.Errorf("no values found in response: %s", string(b))
}

return vs[1], nil
}

// IsOnline calls the Datadog's validation endpoint with api keys
// and returns an error if the validation fails
func (p *DatadogProvider) IsOnline() (bool, error) {
req, err := http.NewRequest("GET", p.apiKeyValidationEndpoint, nil)
if err != nil {
return false, fmt.Errorf("error http.NewRequest: %s", err.Error())
}

req.Header.Add(datadogAPIKeyHeaderKey, p.apiKey)
req.Header.Add(datadogApplicationKeyHeaderKey, p.applicationKey)

ctx, cancel := context.WithTimeout(req.Context(), p.timeout)
defer cancel()
r, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return false, err
}
defer r.Body.Close()

b, err := ioutil.ReadAll(r.Body)
if err != nil {
return false, fmt.Errorf("error reading body: %s", err.Error())
}

if r.StatusCode != http.StatusOK {
return false, fmt.Errorf("error response: %s", string(b))
}

return true, nil
}
156 changes: 156 additions & 0 deletions pkg/metrics/providers/datadog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package providers

import (
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"

flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
)

func TestNewDatadogProvider(t *testing.T) {
appKey := "app-key"
apiKey := "api-key"
cs := map[string][]byte{
datadogApplicationKeySecretKey: []byte(appKey),
datadogAPIKeySecretKey: []byte(apiKey),
}

mi := "100s"
md, err := time.ParseDuration(mi)
if err != nil {
t.Fatal(err)
}

dp, err := NewDatadogProvider("100s", flaggerv1.MetricTemplateProvider{}, cs)

if err != nil {
t.Fatal(err)
}

if exp := "https://api.datadoghq.com/api/v1/validate"; dp.apiKeyValidationEndpoint != exp {
t.Fatalf("apiKeyValidationEndpoint expected %s but got %s", exp, dp.apiKeyValidationEndpoint)
}

if exp := "https://api.datadoghq.com/api/v1/query"; dp.metricsQueryEndpoint != exp {
t.Fatalf("metricsQueryEndpoint expected %s but got %s", exp, dp.metricsQueryEndpoint)
}

if exp := int64(md.Seconds() * datadogFromDeltaMultiplierOnMetricInterval); dp.fromDelta != exp {
t.Fatalf("fromDelta expected %d but got %d", exp, dp.fromDelta)
}

if dp.applicationKey != appKey {
t.Fatalf("application key expected %s but got %s", appKey, dp.applicationKey)
}

if dp.apiKey != apiKey {
t.Fatalf("api key expected %s but got %s", apiKey, dp.apiKey)
}
}

func TestDatadogProvider_RunQuery(t *testing.T) {
eq := `avg:system.cpu.user\{*}by{host}`
appKey := "app-key"
apiKey := "api-key"
expected := 1.11111

now := time.Now().Unix()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
aq := r.URL.Query().Get("query")
if aq != eq {
t.Errorf("\nquery expected %s bug got %s", eq, aq)
}

if vs := r.Header.Get(datadogApplicationKeyHeaderKey); vs != appKey {
t.Errorf("\n%s header expected %s but got %s", datadogApplicationKeyHeaderKey, appKey, vs)
}
if vs := r.Header.Get(datadogAPIKeyHeaderKey); vs != apiKey {
t.Errorf("\n%s header expected %s but got %s", datadogAPIKeyHeaderKey, apiKey, vs)
}

rf := r.URL.Query().Get("from")
if from, err := strconv.ParseInt(rf, 10, 64); err == nil && from >= now {
t.Errorf("\nfrom %d should be less than %d", from, now)
} else if err != nil {
t.Errorf("\nfailed to parse from: %v", err)
}

rt := r.URL.Query().Get("to")
if to, err := strconv.ParseInt(rt, 10, 64); err == nil && to < now {
t.Errorf("\nto %d should be greater than or equals %d", to, now)
} else if err != nil {
t.Errorf("\nfailed to parse to: %v", err)
}

json := fmt.Sprintf(`{"series": [{"pointlist": [[1577232000000,29325.102158814265],[1577318400000,56294.46758591842],[1577404800000,%f]]}]}`, expected)
w.Write([]byte(json))
}))
defer ts.Close()

dp, err := NewDatadogProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
map[string][]byte{
datadogApplicationKeySecretKey: []byte(appKey),
datadogAPIKeySecretKey: []byte(apiKey),
},
)
if err != nil {
t.Fatal(err)
}

f, err := dp.RunQuery(eq)
if err != nil {
t.Fatal(err)
}

if f != expected {
t.Fatalf("metric value expected %f but got %f", expected, f)
}
}

func TestDatadogProvider_IsOnline(t *testing.T) {
for _, c := range []struct {
code int
errExpected bool
}{
{code: http.StatusOK, errExpected: false},
{code: http.StatusUnauthorized, errExpected: true},
} {
t.Run(fmt.Sprintf("%d", c.code), func(t *testing.T) {
appKey := "app-key"
apiKey := "api-key"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if vs := r.Header.Get(datadogApplicationKeyHeaderKey); vs != appKey {
t.Errorf("\n%s header expected %s but got %s", datadogApplicationKeyHeaderKey, appKey, vs)
}
if vs := r.Header.Get(datadogAPIKeyHeaderKey); vs != apiKey {
t.Errorf("\n%s header expected %s but got %s", datadogAPIKeyHeaderKey, apiKey, vs)
}
w.WriteHeader(c.code)
}))
defer ts.Close()

dp, err := NewDatadogProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
map[string][]byte{
datadogApplicationKeySecretKey: []byte(appKey),
datadogAPIKeySecretKey: []byte(apiKey),
},
)
if err != nil {
t.Fatal(err)
}

_, err = dp.IsOnline()
if c.errExpected && err == nil {
t.Fatal("error expected but got no error")
} else if !c.errExpected && err != nil {
t.Fatalf("no error expected but got %v", err)
}
})
}
}
16 changes: 12 additions & 4 deletions pkg/metrics/providers/factory.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package providers

import flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
import (
flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
)

type Factory struct {
}
type Factory struct{}

func (factory Factory) Provider(
metricInterval string,
provider flaggerv1.MetricTemplateProvider,
credentials map[string][]byte,
) (Interface, error) {

func (factory Factory) Provider(provider flaggerv1.MetricTemplateProvider, credentials map[string][]byte) (Interface, error) {
switch {
case provider.Type == "prometheus":
return NewPrometheusProvider(provider, credentials)
case provider.Type == "datadog":
return NewDatadogProvider(metricInterval, provider, credentials)
default:
return NewPrometheusProvider(provider, credentials)
}
Expand Down

0 comments on commit c17c69e

Please sign in to comment.