Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add auto-scaling calculation based by CPU load (#1722) #1726

Merged
merged 17 commits into from
Feb 19, 2020
109 changes: 109 additions & 0 deletions pkg/autoscaler/autoscaler/calculate/calculate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package calculate

import (
"encoding/json"
"fmt"
"math"
"net/http"
"strconv"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
promClient "github.com/prometheus/client_golang/api"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/util/sets"
)

const (
TikvSumCpuMetricsPattern = `sum(increase(tikv_thread_cpu_seconds_total{cluster="%s"}[%s])) by (instance)`
TidbSumCpuMetricsPattern = `sum(increase(process_cpu_seconds_total{cluster="%s",job="tidb"}[%s])) by (instance)`
InvalidTacMetricConfigureMsg = "tac[%s/%s] metric configuration invalid"
queryPath = "/api/v1/query"

float64EqualityThreshold = 1e-9
)

type SingleQuery struct {
Timestamp int64
Quary string
Instances []string
Metric autoscalingv2beta2.MetricSpec
}

func queryMetricsFromPrometheus(tac *v1alpha1.TidbClusterAutoScaler, client promClient.Client, sq *SingleQuery, resp *Response) error {
query := sq.Quary
timestamp := sq.Timestamp
req, err := http.NewRequest("GET", fmt.Sprintf("%s%s", *tac.Spec.MetricsUrl, queryPath), nil)
if err != nil {
return err
}
q := req.URL.Query()
q.Add("query", query)
q.Add("time", fmt.Sprintf("%d", timestamp))
req.URL.RawQuery = q.Encode()
r, body, err := client.Do(req.Context(), req)
if err != nil {
return err
}
if r.StatusCode != http.StatusOK {
return fmt.Errorf("tac[%s/%s] query error, status code:%d", tac.Namespace, tac.Name, r.StatusCode)
}
err = json.Unmarshal(body, resp)
if err != nil {
return err
}
if resp.Status != statusSuccess {
return fmt.Errorf("tac[%s/%s] query error, response status: %v", tac.Namespace, tac.Name, resp.Status)
}
return nil
}

// sumForEachInstance sum the value in Response of each instance from Prometheus
func sumForEachInstance(instances []string, resp *Response) (float64, error) {
if resp == nil {
return 0, fmt.Errorf("metrics response from Promethus can't be empty")
}
s := sets.String{}
for _, instance := range instances {
s.Insert(instance)
}
sum := 0.0
if len(resp.Data.Result) < 1 {
return 0, fmt.Errorf("metrics Response return zero info")
}
for _, r := range resp.Data.Result {
if s.Has(r.Metric.Instance) {
v, err := strconv.ParseFloat(r.Value[1].(string), 64)
if err != nil {
return 0.0, err
}
sum = sum + v
}
}
return sum, nil
}

// calculate func calculate the recommended replicas by given usageRatio and currentReplicas
func calculate(currentValue float64, targetValue float64, currentReplicas int32) (int32, error) {
if almostEqual(targetValue, 0.0) {
return -1, fmt.Errorf("targetValue in calculate func can't be zero")
}
usageRatio := currentValue / targetValue
return int32(math.Ceil(usageRatio * float64(currentReplicas))), nil
}

func almostEqual(a, b float64) bool {
return math.Abs(a-b) <= float64EqualityThreshold
}
76 changes: 76 additions & 0 deletions pkg/autoscaler/autoscaler/calculate/cpu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package calculate

import (
"fmt"
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
promClient "github.com/prometheus/client_golang/api"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

const (
CpuSumMetricsErrorMsg = "tac[%s/%s] cpu sum metrics error, can't calculate the past %s cpu metrics, may caused by prometheus restart while data persistence not enabled"
)

//TODO: create issue to explain how auto-scaling algorithm based on cpu metrics work
func CalculateRecomendedReplicasByCpuCosts(tac *v1alpha1.TidbClusterAutoScaler, sq *SingleQuery, sts *appsv1.StatefulSet,
client promClient.Client, memberType v1alpha1.MemberType, duration time.Duration) (int32, error) {
metric := sq.Metric
instances := sq.Instances

if metric.Resource == nil || metric.Resource.Target.AverageUtilization == nil {
return -1, fmt.Errorf(InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
}
currentReplicas := len(instances)
c, err := filterContainer(tac, sts, memberType.String())
if err != nil {
return -1, err
}
cpuRequestsRatio, err := extractCpuRequestsRatio(c)
if err != nil {
return -1, err
}
r := &Response{}
err = queryMetricsFromPrometheus(tac, client, sq, r)
if err != nil {
return -1, err
}
sum, err := sumForEachInstance(instances, r)
if err != nil {
return -1, err
}
if sum < 0 {
return -1, fmt.Errorf(CpuSumMetricsErrorMsg, tac.Namespace, tac.Name, duration.String())
}
cpuSecsTotal := sum
durationSeconds := duration.Seconds()
utilizationRatio := float64(*metric.Resource.Target.AverageUtilization) / 100.0
expectedCpuSecsTotal := cpuRequestsRatio * durationSeconds * float64(currentReplicas) * utilizationRatio
rc, err := calculate(cpuSecsTotal, expectedCpuSecsTotal, int32(currentReplicas))
if err != nil {
return -1, err
}
return rc, nil
}

func extractCpuRequestsRatio(c *corev1.Container) (float64, error) {
if c.Resources.Requests.Cpu() == nil || c.Resources.Requests.Cpu().MilliValue() < 1 {
return 0, fmt.Errorf("container[%s] cpu requests is empty", c.Name)
}
return float64(c.Resources.Requests.Cpu().MilliValue()) / 1000.0, nil
}
89 changes: 89 additions & 0 deletions pkg/autoscaler/autoscaler/calculate/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package calculate

import (
"fmt"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
)

// MetricType describe the current Supported Metric Type to calculate the recommended Replicas
type MetricType string

const (
MetricTypeCPU MetricType = "cpu"
//metricTypeQPS MetricType = "qps"
)

// currently, we only choose one metrics to be computed.
// If there exists several metrics, we tend to choose ResourceMetricSourceType metric
func FilterMetrics(metrics []autoscalingv2beta2.MetricSpec) autoscalingv2beta2.MetricSpec {
for _, m := range metrics {
if m.Type == autoscalingv2beta2.ResourceMetricSourceType && m.Resource != nil {
return m
}
}
return metrics[0]
}

// genMetricType return the supported MetricType in Operator by kubernetes auto-scaling MetricType
func GenMetricType(tac *v1alpha1.TidbClusterAutoScaler, metric autoscalingv2beta2.MetricSpec) (MetricType, error) {
if metric.Type == autoscalingv2beta2.ResourceMetricSourceType && metric.Resource != nil && metric.Resource.Name == corev1.ResourceCPU {
return MetricTypeCPU, nil
}
return "", fmt.Errorf(InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
}

// filterContainer is to filter the specific container from the given statefulset(tidb/tikv)
func filterContainer(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, containerName string) (*corev1.Container, error) {
for _, c := range sts.Spec.Template.Spec.Containers {
if c.Name == containerName {
return &c, nil
}
}
return nil, fmt.Errorf("tac[%s/%s]'s Target have not %s container", tac.Namespace, tac.Name, containerName)
}

const (
statusSuccess = "success"
)

// Response is used to marshal the data queried from Prometheus
type Response struct {
Status string `json:"status"`
Data Data `json:"data"`
}

type Data struct {
ResultType string `json:"resultType"`
Result []Result `json:"result"`
}

type Result struct {
Metric Metric `json:"metric"`
Value []interface{} `json:"value"`
}

type Metric struct {
Cluster string `json:"cluster,omitempty"`
Instance string `json:"instance"`
Job string `json:"job,omitempty"`
KubernetesNamespace string `json:"kubernetes_namespace,omitempty"`
KubernetesNode string `json:"kubernetes_node,omitempty"`
KubernetesPodIp string `json:"kubernetes_pod_ip,omitempty"`
}
45 changes: 44 additions & 1 deletion pkg/autoscaler/autoscaler/tidb_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
package autoscaler

import (
"fmt"
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/autoscaler/autoscaler/calculate"
"github.com/pingcap/tidb-operator/pkg/label"
operatorUtils "github.com/pingcap/tidb-operator/pkg/util"
promClient "github.com/prometheus/client_golang/api"
appsv1 "k8s.io/api/apps/v1"
)

func (am *autoScalerManager) syncTiDB(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, client promClient.Client) error {
Expand All @@ -36,7 +39,11 @@ func (am *autoScalerManager) syncTiDB(tc *v1alpha1.TidbCluster, tac *v1alpha1.Ti
return nil
}
currentReplicas := tc.Spec.TiDB.Replicas
targetReplicas := calculateRecommendedReplicas(tac, v1alpha1.TiDBMemberType, client)
instances := filterTidbInstances(tc)
targetReplicas, err := calculateTidbMetrics(tac, sts, client, instances)
if err != nil {
return err
}
targetReplicas = limitTargetReplicas(targetReplicas, tac, v1alpha1.TiDBMemberType)
if targetReplicas == tc.Spec.TiDB.Replicas {
emptyAutoScalingCountAnn(tac, v1alpha1.TiDBMemberType)
Expand Down Expand Up @@ -80,3 +87,39 @@ func updateTcTiDBAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
tac.Annotations[label.AnnTiDBLastAutoScalingTimestamp] = time.Now().String()
emptyAutoScalingCountAnn(tac, v1alpha1.TiDBMemberType)
}

func calculateTidbMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, client promClient.Client, instances []string) (int32, error) {
metric := calculate.FilterMetrics(tac.Spec.TiDB.Metrics)
mType, err := calculate.GenMetricType(tac, metric)
if err != nil {
return -1, err
}
duration, err := time.ParseDuration(*tac.Spec.TiDB.MetricsTimeDuration)
if err != nil {
return -1, err
}
sq := &calculate.SingleQuery{
Timestamp: time.Now().Unix(),
Instances: instances,
Metric: metric,
Quary: fmt.Sprintf(calculate.TidbSumCpuMetricsPattern, tac.Spec.Cluster.Name, *tac.Spec.TiDB.MetricsTimeDuration),
}

switch mType {
case calculate.MetricTypeCPU:
return calculate.CalculateRecomendedReplicasByCpuCosts(tac, sq, sts, client, v1alpha1.TiDBMemberType, duration)
default:
return -1, fmt.Errorf(calculate.InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
}
}

func filterTidbInstances(tc *v1alpha1.TidbCluster) []string {
var instances []string
for i := 0; int32(i) < tc.Status.TiDB.StatefulSet.Replicas; i++ {
podName := operatorUtils.GetPodName(tc, v1alpha1.TiDBMemberType, int32(i))
if _, existed := tc.Status.TiDB.FailureMembers[podName]; !existed {
instances = append(instances, podName)
}
}
return instances
}
1 change: 1 addition & 0 deletions pkg/autoscaler/autoscaler/tidb_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestSyncTiDBAfterCalculated(t *testing.T) {
tc.Spec.TiDB.Replicas = test.currentReplicas
tac.Annotations[label.AnnTiDBConsecutiveScaleInCount] = fmt.Sprintf("%d", test.currentScaleInCount)
tac.Annotations[label.AnnTiDBConsecutiveScaleOutCount] = fmt.Sprintf("%d", test.currentScaleOutCount)
tac.Spec.TiKV = nil

err := syncTiDBAfterCalculated(tc, tac, test.currentReplicas, test.recommendedReplicas)
g.Expect(err).ShouldNot(HaveOccurred())
Expand Down
Loading