diff --git a/pkg/metrics/providers/chronosphere.go b/pkg/metrics/providers/chronosphere.go new file mode 100644 index 000000000..24124d5ba --- /dev/null +++ b/pkg/metrics/providers/chronosphere.go @@ -0,0 +1,189 @@ +/* +Copyright 2020 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package providers + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "math" + "net/http" + "net/url" + "path" + "regexp" + "strconv" + "time" + + flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" +) + +const chronosphereOnlineQuery = "vector(1)" + +// ChronosphereProvider executes promQL queries +type ChronosphereProvider struct { + timeout time.Duration + url url.URL + username string + password string + token string + client *http.Client +} + +type chronosphereResponse struct { + Data struct { + Result []struct { + Metric struct { + Name string `json:"name"` + } + Value []interface{} `json:"value"` + Values []interface{} `json:"values"` + } + } +} + +// NewChronosphereProvider takes a provider spec and the credentials map, +// validates the address, extracts the bearer token or username and password values if provided and +// returns a Chronosphere client ready to execute queries against the API +func NewChronosphereProvider(provider flaggerv1.MetricTemplateProvider, credentials map[string][]byte) (*ChronosphereProvider, error) { + promURL, err := url.Parse(provider.Address) + if provider.Address == "" || err != nil { + return nil, fmt.Errorf("%s address %s is not a valid URL", provider.Type, provider.Address) + } + + prom := ChronosphereProvider{ + timeout: 5 * time.Second, + url: *promURL, + client: http.DefaultClient, + } + + if provider.InsecureSkipVerify { + t := http.DefaultTransport.(*http.Transport).Clone() + t.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + prom.client = &http.Client{Transport: t} + } + + if provider.SecretRef != nil { + if token, ok := credentials["token"]; ok { + prom.token = string(token) + } else { + + if username, ok := credentials["username"]; ok { + prom.username = string(username) + } else { + return nil, fmt.Errorf("%s credentials does not contain a username", provider.Type) + } + + if password, ok := credentials["password"]; ok { + prom.password = string(password) + } else { + return nil, fmt.Errorf("%s credentials does not contain a password", provider.Type) + } + } + } + + return &prom, nil +} + +// RunQuery executes the promQL query and returns the the first result as float64 +func (p *ChronosphereProvider) RunQuery(query string) (float64, error) { + query = url.QueryEscape(p.trimQuery(query)) + u, err := url.Parse(fmt.Sprintf("./data/metrics/api/v1/query?query=%s", query)) + if err != nil { + return 0, fmt.Errorf("url.Parse failed: %w", err) + } + u.Path = path.Join(p.url.Path, u.Path) + + u = p.url.ResolveReference(u) + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return 0, fmt.Errorf("http.NewRequest failed: %w", err) + } + + if p.token != "" { + req.Header.Add("Authorization", "Bearer "+p.token) + } else if p.username != "" && p.password != "" { + req.SetBasicAuth(p.username, p.password) + } + + ctx, cancel := context.WithTimeout(req.Context(), p.timeout) + defer cancel() + + r, err := p.client.Do(req.WithContext(ctx)) + if err != nil { + return 0, fmt.Errorf("request failed: %w", err) + } + defer r.Body.Close() + + b, err := io.ReadAll(r.Body) + if err != nil { + return 0, fmt.Errorf("error reading body: %w", err) + } + + if 400 <= r.StatusCode { + return 0, fmt.Errorf("error response: %s", string(b)) + } + + var result chronosphereResponse + err = json.Unmarshal(b, &result) + if err != nil { + return 0, fmt.Errorf("error unmarshaling result: %w, '%s'", err, string(b)) + } + + var value *float64 + for _, v := range result.Data.Result { + if v.Values != nil { + return 0, fmt.Errorf("%w", ErrMultipleValuesReturned) + } + metricValue := v.Value[1] + switch metricValue.(type) { + case string: + f, err := strconv.ParseFloat(metricValue.(string), 64) + if err != nil { + return 0, err + } + value = &f + } + } + if value == nil || math.IsNaN(*value) { + return 0, fmt.Errorf("%w", ErrNoValuesFound) + } + + return *value, nil +} + +// IsOnline run simple Chronosphere query and returns an error if the API is unreachable +func (p *ChronosphereProvider) IsOnline() (bool, error) { + value, err := p.RunQuery(chronosphereOnlineQuery) + if err != nil { + return false, fmt.Errorf("running query failed: %w", err) + } + + if value != float64(1) { + return false, fmt.Errorf("value is not 1 for query: %s", chronosphereOnlineQuery) + } + + return true, nil +} + +// trimQuery takes a promql query and removes whitespace +func (p *ChronosphereProvider) trimQuery(query string) string { + space := regexp.MustCompile(`\s+`) + return space.ReplaceAllString(query, " ") +} diff --git a/pkg/metrics/providers/chronosphere_test.go b/pkg/metrics/providers/chronosphere_test.go new file mode 100644 index 000000000..b14773080 --- /dev/null +++ b/pkg/metrics/providers/chronosphere_test.go @@ -0,0 +1,309 @@ +/* +Copyright 2020 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package providers + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + + flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" + clientset "github.com/fluxcd/flagger/pkg/client/clientset/versioned" + fakeFlagger "github.com/fluxcd/flagger/pkg/client/clientset/versioned/fake" +) + +type fakeChronosphereClients struct { + kubeClient kubernetes.Interface + flaggerClient clientset.Interface +} + +func chronosphereFake() fakeChronosphereClients { + provider := flaggerv1.MetricTemplateProvider{ + Type: "chronosphere", + Address: "https://fake.chronosphere.io:443", + SecretRef: &corev1.LocalObjectReference{Name: "chronosphere"}, + } + + template := &flaggerv1.MetricTemplate{ + TypeMeta: metav1.TypeMeta{APIVersion: flaggerv1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "chronosphere", + }, + Spec: flaggerv1.MetricTemplateSpec{ + Provider: provider, + Query: "sum(envoy_cluster_upstream_rq)", + }, + } + + flaggerClient := fakeFlagger.NewSimpleClientset(template) + + secret := &corev1.Secret{ + TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "chronosphere", + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "username": []byte("username"), + "password": []byte("password"), + }, + } + + bearerTokenSecret := &corev1.Secret{ + TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "chronosphere-bearer", + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "token": []byte("bearer_token"), + }, + } + + kubeClient := fake.NewSimpleClientset(secret, bearerTokenSecret) + + return fakeChronosphereClients{ + kubeClient: kubeClient, + flaggerClient: flaggerClient, + } +} + +func TestNewChronosphereProvider(t *testing.T) { + clients := chronosphereFake() + + template, err := clients.flaggerClient.FlaggerV1beta1().MetricTemplates("default").Get(context.TODO(), "chronosphere", metav1.GetOptions{}) + require.NoError(t, err) + + secret, err := clients.kubeClient.CoreV1().Secrets("default").Get(context.TODO(), "chronosphere", metav1.GetOptions{}) + require.NoError(t, err) + + prom, err := NewChronosphereProvider(template.Spec.Provider, secret.Data) + require.NoError(t, err) + + assert.Equal(t, "https://fake.chronosphere.io:443", prom.url.String()) + assert.Equal(t, "password", prom.password) +} + +func TestChronosphereProvider_RunQueryWithBasicAuth(t *testing.T) { + t.Run("ok", func(t *testing.T) { + expected := `sum(envoy_cluster_upstream_rq)` + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + promql := r.URL.Query()["query"][0] + assert.Equal(t, expected, promql) + + header, ok := r.Header["Authorization"] + if assert.True(t, ok, "Authorization header not found") { + assert.True(t, strings.Contains(header[0], "Basic"), "Basic authorization header not found") + } + + json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"100"]}]}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + clients := chronosphereFake() + + template, err := clients.flaggerClient.FlaggerV1beta1().MetricTemplates("default").Get(context.TODO(), "chronosphere", metav1.GetOptions{}) + require.NoError(t, err) + template.Spec.Provider.Address = ts.URL + + secret, err := clients.kubeClient.CoreV1().Secrets("default").Get(context.TODO(), "chronosphere", metav1.GetOptions{}) + require.NoError(t, err) + + prom, err := NewChronosphereProvider(template.Spec.Provider, secret.Data) + require.NoError(t, err) + + val, err := prom.RunQuery(template.Spec.Query) + require.NoError(t, err) + + assert.Equal(t, float64(100), val) + }) + + noResultTests := []struct { + name string + queryResult string + }{ + {name: "no values result", queryResult: `{"status":"success","data":{"resultType":"vector","result":[]}}`}, + {name: "NaN result", queryResult: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1643023250.379,"NaN"]}]}}`}, + } + + for _, tt := range noResultTests { + t.Run(tt.name, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json := tt.queryResult + w.Write([]byte(json)) + })) + defer ts.Close() + + clients := chronosphereFake() + + template, err := clients.flaggerClient.FlaggerV1beta1(). + MetricTemplates("default").Get(context.TODO(), "chronosphere", metav1.GetOptions{}) + require.NoError(t, err) + template.Spec.Provider.Address = ts.URL + + secret, err := clients.kubeClient.CoreV1().Secrets("default").Get(context.TODO(), "chronosphere", metav1.GetOptions{}) + require.NoError(t, err) + + prom, err := NewChronosphereProvider(template.Spec.Provider, secret.Data) + require.NoError(t, err) + + _, err = prom.RunQuery(template.Spec.Query) + require.True(t, errors.Is(err, ErrNoValuesFound)) + }) + } + + multipleResultTests := []struct { + name string + queryResult string + }{ + {name: "values instead of value", queryResult: `{"status": "success","data": {"resultType": "matrix","result": [{"metric": {"__name__": "processTime_seconds:avg"},"values": [[1714404069.294,"NaN"],[1714404071.3,"NaN"],[1714404099.294,"NaN"],[1714404101.3,"NaN"]]},{"metric": {"__name__": "processTime_seconds:avg"},"values": [[1714404069.294,"NaN"],[1714404071.3,"NaN"],[1714404099.294,"NaN"],[1714404101.3,"NaN"]]}]}}`}, + } + + for _, tt := range multipleResultTests { + t.Run(tt.name, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json := tt.queryResult + w.Write([]byte(json)) + })) + defer ts.Close() + + clients := chronosphereFake() + + template, err := clients.flaggerClient.FlaggerV1beta1(). + MetricTemplates("default").Get(context.TODO(), "chronosphere", metav1.GetOptions{}) + require.NoError(t, err) + template.Spec.Provider.Address = ts.URL + + secret, err := clients.kubeClient.CoreV1().Secrets("default").Get(context.TODO(), "chronosphere", metav1.GetOptions{}) + require.NoError(t, err) + + prom, err := NewChronosphereProvider(template.Spec.Provider, secret.Data) + require.NoError(t, err) + + _, err = prom.RunQuery(template.Spec.Query) + require.True(t, errors.Is(err, ErrMultipleValuesReturned)) + }) + } + +} + +func TestChronosphereProvider_RunQueryWithBearerAuth(t *testing.T) { + t.Run("ok", func(t *testing.T) { + expected := `sum(envoy_cluster_upstream_rq)` + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + promql := r.URL.Query()["query"][0] + assert.Equal(t, expected, promql) + + header, ok := r.Header["Authorization"] + if assert.True(t, ok, "Authorization header not found") { + assert.True(t, strings.Contains(header[0], "Bearer"), "Bearer authorization header not found") + } + + json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"100"]}]}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + clients := chronosphereFake() + + template, err := clients.flaggerClient.FlaggerV1beta1().MetricTemplates("default").Get(context.TODO(), "chronosphere", metav1.GetOptions{}) + require.NoError(t, err) + template.Spec.Provider.Address = ts.URL + + secret, err := clients.kubeClient.CoreV1().Secrets("default").Get(context.TODO(), "chronosphere-bearer", metav1.GetOptions{}) + require.NoError(t, err) + + prom, err := NewChronosphereProvider(template.Spec.Provider, secret.Data) + require.NoError(t, err) + + val, err := prom.RunQuery(template.Spec.Query) + require.NoError(t, err) + + assert.Equal(t, float64(100), val) + }) +} + +func TestChronosphereProvider_IsOnline(t *testing.T) { + t.Run("fail", func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + })) + defer ts.Close() + + clients := chronosphereFake() + + template, err := clients.flaggerClient.FlaggerV1beta1().MetricTemplates("default").Get(context.TODO(), "chronosphere", metav1.GetOptions{}) + require.NoError(t, err) + template.Spec.Provider.Address = ts.URL + template.Spec.Provider.SecretRef = nil + + prom, err := NewChronosphereProvider(template.Spec.Provider, nil) + require.NoError(t, err) + + ok, err := prom.IsOnline() + assert.Error(t, err, "Got no error wanted %v", http.StatusBadGateway) + assert.False(t, ok) + }) + + t.Run("ok", func(t *testing.T) { + expected := `vector(1)` + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + promql := r.URL.Query()["query"][0] + assert.Equal(t, expected, promql) + + header, ok := r.Header["Authorization"] + if assert.True(t, ok, "Authorization header not found") { + assert.True(t, strings.Contains(header[0], "Basic"), "Basic authorization header not found") + } + + json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"1"]}]}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + clients := chronosphereFake() + + template, err := clients.flaggerClient.FlaggerV1beta1().MetricTemplates("default").Get(context.TODO(), "chronosphere", metav1.GetOptions{}) + require.NoError(t, err) + template.Spec.Provider.Address = ts.URL + + secret, err := clients.kubeClient.CoreV1().Secrets("default").Get(context.TODO(), "chronosphere", metav1.GetOptions{}) + require.NoError(t, err) + + prom, err := NewChronosphereProvider(template.Spec.Provider, secret.Data) + require.NoError(t, err) + + ok, err := prom.IsOnline() + require.NoError(t, err) + + assert.Equal(t, true, ok) + }) +}