Skip to content

Commit

Permalink
feat: Allow switching between Datadog v1 and v2. Fixes #2549 (#2592)
Browse files Browse the repository at this point in the history
* Update Datadog queries to use V2 instead of V1

Signed-off-by: Daniel Del Rio <[email protected]>

* Update Datadog unit tests to use V2

Signed-off-by: Daniel Del Rio <[email protected]>

* Add ApiVersion field to Datadog integration to allow toggling versions

Signed-off-by: Daniel Del Rio <[email protected]>

* Update codegen files

Signed-off-by: Daniel Del Rio <[email protected]>

* Add apiVersion to openapi generated

Signed-off-by: Daniel Del Rio <[email protected]>

* Add back correct generated files

Signed-off-by: Daniel Del Rio <[email protected]>

* Add test for default ApiVersion value

Signed-off-by: Daniel Del Rio <[email protected]>

* Check for error in parsing new URL with apiVersion

Signed-off-by: Daniel Del Rio <[email protected]>

* Check for error in parsing JSON request

Signed-off-by: Daniel Del Rio <[email protected]>

---------

Signed-off-by: Daniel Del Rio <[email protected]>
  • Loading branch information
daniddelrio authored Mar 3, 2023
1 parent 96a1b67 commit 182069e
Show file tree
Hide file tree
Showing 13 changed files with 1,018 additions and 443 deletions.
8 changes: 8 additions & 0 deletions docs/analysis/datadog.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ spec:
failureLimit: 3
provider:
datadog:
apiVersion: v2
interval: 5m
query: |
sum:requests.error.count{service:{{args.service-name}}} /
sum:requests.request.count{service:{{args.service-name}}}
```
The field `apiVersion` refers to the API version of Datadog (v1 or v2). Default value is `v1` if this is omitted.

!!! note
Datadog is moving away from the legacy v1 API. Rate limits imposed by Datadog are therefore stricter when using v1. It is recommended to switch to v2 soon. If you switch to v2, you will not need to change any other field aside from `apiVersion`.

Datadog api and app tokens can be configured in a kubernetes secret in argo-rollouts namespace.

```yaml
Expand All @@ -39,3 +45,5 @@ data:
api-key: <datadog-api-key>
app-key: <datadog-app-key>
```

`apiVersion` here is different from the `apiVersion` from the Datadog configuration above.
9 changes: 9 additions & 0 deletions docs/features/kustomize/rollout_cr_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@
},
"datadog": {
"properties": {
"apiVersion": {
"type": "string"
},
"interval": {
"type": "string"
},
Expand Down Expand Up @@ -4529,6 +4532,9 @@
},
"datadog": {
"properties": {
"apiVersion": {
"type": "string"
},
"interval": {
"type": "string"
},
Expand Down Expand Up @@ -8815,6 +8821,9 @@
},
"datadog": {
"properties": {
"apiVersion": {
"type": "string"
},
"interval": {
"type": "string"
},
Expand Down
2 changes: 2 additions & 0 deletions manifests/crds/analysis-run-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ spec:
type: object
datadog:
properties:
apiVersion:
type: string
interval:
type: string
query:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crds/analysis-template-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ spec:
type: object
datadog:
properties:
apiVersion:
type: string
interval:
type: string
query:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crds/cluster-analysis-template-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ spec:
type: object
datadog:
properties:
apiVersion:
type: string
interval:
type: string
query:
Expand Down
6 changes: 6 additions & 0 deletions manifests/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ spec:
type: object
datadog:
properties:
apiVersion:
type: string
interval:
type: string
query:
Expand Down Expand Up @@ -3068,6 +3070,8 @@ spec:
type: object
datadog:
properties:
apiVersion:
type: string
interval:
type: string
query:
Expand Down Expand Up @@ -5843,6 +5847,8 @@ spec:
type: object
datadog:
properties:
apiVersion:
type: string
interval:
type: string
query:
Expand Down
158 changes: 146 additions & 12 deletions metricproviders/datadog/datadog.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand All @@ -9,6 +10,7 @@ import (
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"strings"
"time"
Expand All @@ -33,6 +35,7 @@ const (
DatadogApiKey = "api-key"
DatadogAppKey = "app-key"
DatadogAddress = "address"
DefaultApiVersion = "v1"
)

// Provider contains all the required components to run a Datadog query
Expand All @@ -42,12 +45,33 @@ type Provider struct {
config datadogConfig
}

type datadogResponse struct {
type datadogQueryAttributes struct {
From int64 `json:"from"`
To int64 `json:"to"`
Queries []map[string]string `json:"queries"`
}

type datadogQuery struct {
Attributes datadogQueryAttributes `json:"attributes"`
QueryType string `json:"type"`
}

type datadogResponseV1 struct {
Series []struct {
Pointlist [][]float64 `json:"pointlist"`
}
}

type datadogResponseV2 struct {
Data struct {
Attributes struct {
Values [][]float64
Times []int64
}
Errors string
}
}

type datadogConfig struct {
Address string `yaml:"address,omitempty"`
ApiKey string `yaml:"api-key,omitempty"`
Expand All @@ -72,16 +96,37 @@ func (p *Provider) Run(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alph
StartedAt: &startTime,
}

endpoint := "https://api.datadoghq.com/api/v1/query"
endpoint := "https://api.datadoghq.com"
if p.config.Address != "" {
endpoint = p.config.Address + "/api/v1/query"
endpoint = p.config.Address
}

// Check if the URL is valid first before adding the endpoint
url, err := url.Parse(endpoint)
if err != nil {
return metricutil.MarkMeasurementError(measurement, err)
}

apiVersion := DefaultApiVersion
if metric.Provider.Datadog.ApiVersion != "" {
apiVersion = metric.Provider.Datadog.ApiVersion
}

if apiVersion == "v1" {
p.logCtx.Warn("Datadog will soon deprecate their API v1. Please consider switching to v2 soon.")
}

route := "/api/v1/query"
if apiVersion == "v2" {
route = "/api/v2/query/timeseries"
}

// Add endpoint after getting the API version
url, err = url.Parse(endpoint + route)
if err != nil {
return metricutil.MarkMeasurementError(measurement, err)
}

now := unixNow()
var interval int64 = 300
if metric.Provider.Datadog.Interval != "" {
Expand All @@ -93,13 +138,11 @@ func (p *Provider) Run(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alph
interval = int64(expDuration.Seconds())
}

q := url.Query()
q.Set("query", metric.Provider.Datadog.Query)
q.Set("from", strconv.FormatInt(now-interval, 10))
q.Set("to", strconv.FormatInt(now, 10))
url.RawQuery = q.Encode()
request, err := p.createRequest(metric.Provider.Datadog.Query, apiVersion, now, interval, url)
if err != nil {
return metricutil.MarkMeasurementError(measurement, err)
}

request := &http.Request{Method: "GET"}
request.URL = url
request.Header = make(http.Header)
request.Header.Set("Content-Type", "application/json")
Expand All @@ -116,7 +159,7 @@ func (p *Provider) Run(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alph
return metricutil.MarkMeasurementError(measurement, err)
}

value, status, err := p.parseResponse(metric, response)
value, status, err := p.parseResponse(metric, response, apiVersion)
if err != nil {
return metricutil.MarkMeasurementError(measurement, err)
}
Expand All @@ -129,7 +172,48 @@ func (p *Provider) Run(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alph
return measurement
}

func (p *Provider) parseResponse(metric v1alpha1.Metric, response *http.Response) (string, v1alpha1.AnalysisPhase, error) {
func (p *Provider) createRequest(query string, apiVersion string, now int64, interval int64, url *url.URL) (*http.Request, error) {
if apiVersion == "v1" {
q := url.Query()
q.Set("query", query)
q.Set("from", strconv.FormatInt(now-interval, 10))
q.Set("to", strconv.FormatInt(now, 10))
url.RawQuery = q.Encode()

return &http.Request{Method: "GET"}, nil
} else if apiVersion == "v2" {
queryBody, err := json.Marshal(datadogQuery{
QueryType: "timeseries_request",
Attributes: datadogQueryAttributes{
From: now - interval,
To: now,
Queries: []map[string]string{{
"data_source": "metrics",
"query": query,
}},
},
})
if err != nil {
return nil, fmt.Errorf("Could not parse your JSON request: %v", err)
}
request := &http.Request{Method: "POST"}
request.Body = io.NopCloser(bytes.NewReader(queryBody))
return request, nil
}

return nil, fmt.Errorf("Invalid API version: %s", apiVersion)
}

func (p *Provider) parseResponse(metric v1alpha1.Metric, response *http.Response, apiVersion string) (string, v1alpha1.AnalysisPhase, error) {
if apiVersion == "v1" {
return p.parseResponseV1(metric, response)
} else if apiVersion == "v2" {
return p.parseResponseV2(metric, response)
}
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Invalid API version: %s", apiVersion)
}

func (p *Provider) parseResponseV1(metric v1alpha1.Metric, response *http.Response) (string, v1alpha1.AnalysisPhase, error) {

bodyBytes, err := io.ReadAll(response.Body)

Expand All @@ -143,7 +227,7 @@ func (p *Provider) parseResponse(metric v1alpha1.Metric, response *http.Response
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("received non 2xx response code: %v %s", response.StatusCode, string(bodyBytes))
}

var res datadogResponse
var res datadogResponseV1
err = json.Unmarshal(bodyBytes, &res)
if err != nil {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Could not parse JSON body: %v", err)
Expand Down Expand Up @@ -173,6 +257,56 @@ func (p *Provider) parseResponse(metric v1alpha1.Metric, response *http.Response
return strconv.FormatFloat(value, 'f', -1, 64), status, err
}

func (p *Provider) parseResponseV2(metric v1alpha1.Metric, response *http.Response) (string, v1alpha1.AnalysisPhase, error) {

bodyBytes, err := io.ReadAll(response.Body)

if err != nil {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Received no bytes in response: %v", err)
}

if response.StatusCode == http.StatusForbidden || response.StatusCode == http.StatusUnauthorized {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("received authentication error response code: %v %s", response.StatusCode, string(bodyBytes))
} else if response.StatusCode != http.StatusOK {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("received non 2xx response code: %v %s", response.StatusCode, string(bodyBytes))
}

var res datadogResponseV2
err = json.Unmarshal(bodyBytes, &res)
if err != nil {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Could not parse JSON body: %v", err)
}

// Handle an error returned by Datadog
if res.Data.Errors != "" {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("There were errors in your query: %v", res.Data.Errors)
}

// Handle an empty query result
if reflect.ValueOf(res.Data.Attributes).IsZero() || len(res.Data.Attributes.Values) == 0 || len(res.Data.Attributes.Times) == 0 {
var nilFloat64 *float64
status, err := evaluate.EvaluateResult(nilFloat64, metric, p.logCtx)
attributesBytes, jsonErr := json.Marshal(res.Data.Attributes)
if jsonErr != nil {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Failed to marshall JSON empty series: %v", jsonErr)
}

return string(attributesBytes), status, err
}

// Handle a populated query result
attributes := res.Data.Attributes
datapoint := attributes.Values[0]
timepoint := attributes.Times[len(attributes.Times)-1]
if timepoint == 0 {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Datapoint does not have a corresponding time value")
}

value := datapoint[len(datapoint)-1]
status, err := evaluate.EvaluateResult(value, metric, p.logCtx)
return strconv.FormatFloat(value, 'f', -1, 64), status, err
}

// Resume should not be used the Datadog provider since all the work should occur in the Run method
func (p *Provider) Resume(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, measurement v1alpha1.Measurement) v1alpha1.Measurement {
p.logCtx.Warn("Datadog provider should not execute the Resume method")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestRunSuite(t *testing.T) {
},
expectedIntervalSeconds: 300,
expectedPhase: v1alpha1.AnalysisPhaseError,
expectedErrorMessage: "Could not parse JSON body: json: cannot unmarshal string into Go struct field datadogResponse.Series of type []struct { Pointlist [][]float64 \"json:\\\"pointlist\\\"\" }",
expectedErrorMessage: "Could not parse JSON body: json: cannot unmarshal string into Go struct field datadogResponseV1.Series of type []struct { Pointlist [][]float64 \"json:\\\"pointlist\\\"\" }",
useEnvVarForKeys: false,
},

Expand All @@ -222,7 +222,7 @@ func TestRunSuite(t *testing.T) {
serverURL: "://wrong.schema",
metric: v1alpha1.Metric{},
expectedPhase: v1alpha1.AnalysisPhaseError,
expectedErrorMessage: "parse \"://wrong.schema/api/v1/query\": missing protocol scheme",
expectedErrorMessage: "parse \"://wrong.schema\": missing protocol scheme",
useEnvVarForKeys: false,
},
}
Expand All @@ -235,6 +235,9 @@ func TestRunSuite(t *testing.T) {
if serverURL == "" {
// Server setup with response
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if test.metric.Provider.Datadog.ApiVersion == "" && DefaultApiVersion != "v1" {
t.Errorf("\nApiVersion was left blank in the tests, but the default API version is not v1 anymore.")
}

//Check query variables
actualQuery := req.URL.Query().Get("query")
Expand Down
Loading

0 comments on commit 182069e

Please sign in to comment.