Skip to content

Commit

Permalink
add support for the traffic weight > 100.
Browse files Browse the repository at this point in the history
  • Loading branch information
andyliuliming committed Dec 11, 2023
1 parent a90a901 commit c9ec175
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 109 deletions.
3 changes: 3 additions & 0 deletions pkg/apis/rollouts/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,14 @@ type RolloutTrafficRouting struct {
ManagedRoutes []MangedRoutes `json:"managedRoutes,omitempty" protobuf:"bytes,8,rep,name=managedRoutes"`
// Apisix holds specific configuration to use Apisix to route traffic
Apisix *ApisixTrafficRouting `json:"apisix,omitempty" protobuf:"bytes,9,opt,name=apisix"`

// +kubebuilder:validation:Schemaless
// +kubebuilder:pruning:PreserveUnknownFields
// +kubebuilder:validation:Type=object
// Plugins holds specific configuration that traffic router plugins can use for routing traffic
Plugins map[string]json.RawMessage `json:"plugins,omitempty" protobuf:"bytes,10,opt,name=plugins"`

MaxTrafficWeight *int32 `json:"maxTrafficWeight,omitempty" protobuf:"varint,11,opt,name=maxTrafficWeight"`
}

type MangedRoutes struct {
Expand Down
12 changes: 8 additions & 4 deletions pkg/apis/rollouts/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ import (

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/defaults"
"github.com/argoproj/argo-rollouts/utils/weightutil"
)

const (
// Validate Spec constants

// MissingFieldMessage the message to indicate rollout is missing a field
MissingFieldMessage = "Rollout has missing field '%s'"
// InvalidSetWeightMessage indicates the setweight value needs to be between 0 and 100
InvalidSetWeightMessage = "SetWeight needs to be between 0 and 100"
// InvalidSetWeightMessage indicates the setweight value needs to be between 0 and max weight
InvalidSetWeightMessage = "SetWeight needs to be between 0 and %d"
// InvalidCanaryExperimentTemplateWeightWithoutTrafficRouting indicates experiment weight cannot be set without trafficRouting
InvalidCanaryExperimentTemplateWeightWithoutTrafficRouting = "Experiment template weight cannot be set unless TrafficRouting is enabled"
// InvalidSetCanaryScaleTrafficPolicy indicates that TrafficRouting, required for SetCanaryScale, is missing
Expand Down Expand Up @@ -292,8 +293,11 @@ func ValidateRolloutStrategyCanary(rollout *v1alpha1.Rollout, fldPath *field.Pat
step.Experiment == nil, step.Pause == nil, step.SetWeight == nil, step.Analysis == nil, step.SetCanaryScale == nil, step.SetHeaderRoute == nil, step.SetMirrorRoute == nil)
allErrs = append(allErrs, field.Invalid(stepFldPath, errVal, InvalidStepMessage))
}
if step.SetWeight != nil && (*step.SetWeight < 0 || *step.SetWeight > 100) {
allErrs = append(allErrs, field.Invalid(stepFldPath.Child("setWeight"), *canary.Steps[i].SetWeight, InvalidSetWeightMessage))

maxTrafficWeight := weightutil.MaxTrafficWeight(rollout)

if step.SetWeight != nil && (*step.SetWeight < 0 || *step.SetWeight > maxTrafficWeight) {
allErrs = append(allErrs, field.Invalid(stepFldPath.Child("setWeight"), *canary.Steps[i].SetWeight, fmt.Sprintf(InvalidSetWeightMessage, maxTrafficWeight)))
}
if step.Pause != nil && step.Pause.DurationSeconds() < 0 {
allErrs = append(allErrs, field.Invalid(stepFldPath.Child("pause").Child("duration"), step.Pause.DurationSeconds(), InvalidDurationMessage))
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/rollouts/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func TestValidateRolloutStrategyCanary(t *testing.T) {
invalidRo := ro.DeepCopy()
invalidRo.Spec.Strategy.Canary.Steps[0].SetWeight = &setWeight
allErrs := ValidateRolloutStrategyCanary(invalidRo, field.NewPath(""))
assert.Equal(t, InvalidSetWeightMessage, allErrs[0].Detail)
assert.Equal(t, fmt.Sprintf(InvalidSetWeightMessage, 100), allErrs[0].Detail)
})

t.Run("invalid duration set in paused step", func(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/kubectl-argo-rollouts/info/rollout_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/argoproj/argo-rollouts/utils/defaults"
replicasetutil "github.com/argoproj/argo-rollouts/utils/replicaset"
rolloututil "github.com/argoproj/argo-rollouts/utils/rollout"
"github.com/argoproj/argo-rollouts/utils/weightutil"
)

func NewRolloutInfo(
Expand Down Expand Up @@ -58,12 +59,12 @@ func NewRolloutInfo(
currentStep, _ := replicasetutil.GetCurrentCanaryStep(ro)

if currentStep == nil {
roInfo.ActualWeight = "100"
roInfo.ActualWeight = fmt.Sprintf("%d", weightutil.MaxTrafficWeight(ro))
} else if ro.Status.AvailableReplicas > 0 {
if ro.Spec.Strategy.Canary.TrafficRouting == nil {
for _, rs := range roInfo.ReplicaSets {
if rs.Canary {
roInfo.ActualWeight = fmt.Sprintf("%d", (rs.Available*100)/ro.Status.AvailableReplicas)
roInfo.ActualWeight = fmt.Sprintf("%d", (rs.Available*weightutil.MaxTrafficWeight(ro))/ro.Status.AvailableReplicas)
}
}
} else {
Expand Down
14 changes: 10 additions & 4 deletions rollout/trafficrouting.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/argoproj/argo-rollouts/utils/record"
replicasetutil "github.com/argoproj/argo-rollouts/utils/replicaset"
rolloututil "github.com/argoproj/argo-rollouts/utils/rollout"
"github.com/argoproj/argo-rollouts/utils/weightutil"
)

// NewTrafficRoutingReconciler identifies return the TrafficRouting Plugin that the rollout wants to modify
Expand Down Expand Up @@ -132,6 +133,7 @@ func (c *Controller) NewTrafficRoutingReconciler(roCtx *rolloutContext) ([]traff
return nil, nil
}

// this currently only be used in the canary strategy
func (c *rolloutContext) reconcileTrafficRouting() error {
reconcilers, err := c.newTrafficRoutingReconciler(c)
// a return here does ensure that all trafficReconcilers are healthy
Expand Down Expand Up @@ -199,7 +201,8 @@ func (c *rolloutContext) reconcileTrafficRouting() error {
// But we can only increase canary weight according to available replica counts of the canary.
// we will need to set the desiredWeight to 0 when the newRS is not available.
if c.rollout.Spec.Strategy.Canary.DynamicStableScale {
desiredWeight = (100 * c.newRS.Status.AvailableReplicas) / *c.rollout.Spec.Replicas
// TODO: handle total weight
desiredWeight = (weightutil.MaxTrafficWeight(c.rollout) * c.newRS.Status.AvailableReplicas) / *c.rollout.Spec.Replicas
} else if c.rollout.Status.Canary.Weights != nil {
desiredWeight = c.rollout.Status.Canary.Weights.Canary.Weight
}
Expand Down Expand Up @@ -227,7 +230,7 @@ func (c *rolloutContext) reconcileTrafficRouting() error {
desiredWeight = replicasetutil.GetCurrentSetWeight(c.rollout)
weightDestinations = append(weightDestinations, c.calculateWeightDestinationsFromExperiment()...)
} else {
desiredWeight = 100
desiredWeight = weightutil.MaxTrafficWeight(c.rollout)
}
}
// We need to check for revision > 1 because when we first install the rollout we run step 0 this prevents that.
Expand Down Expand Up @@ -301,7 +304,9 @@ func (c *rolloutContext) calculateDesiredWeightOnAbortOrStableRollback() int32 {
}
// When using dynamic stable scaling, we must dynamically decreasing the weight to the canary
// according to the availability of the stable (whatever it can support).
desiredWeight := 100 - ((100 * c.stableRS.Status.AvailableReplicas) / *c.rollout.Spec.Replicas)
// TODO: handle total weight
// c.rollout.Spec.Strategy.Canary.TrafficRouting.MaxTrafficWeight
desiredWeight := weightutil.MaxTrafficWeight(c.rollout) - ((weightutil.MaxTrafficWeight(c.rollout) * c.stableRS.Status.AvailableReplicas) / *c.rollout.Spec.Replicas)
if c.rollout.Status.Canary.Weights != nil {
// This ensures that if we are already at a lower weight, then we will not
// increase the weight because stable availability is flapping (e.g. pod restarts)
Expand Down Expand Up @@ -334,7 +339,8 @@ func calculateWeightStatus(ro *v1alpha1.Rollout, canaryHash, stableHash string,
ServiceName: ro.Spec.Strategy.Canary.CanaryService,
},
}
stableWeight := 100 - desiredWeight
// TODO: handle total weight
stableWeight := weightutil.MaxTrafficWeight(ro) - desiredWeight
for _, weightDest := range weightDestinations {
weights.Additional = append(weights.Additional, weightDest)
stableWeight -= weightDest.Weight
Expand Down
4 changes: 4 additions & 0 deletions rollout/trafficrouting/nginx/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ func (r *Reconciler) buildLegacyCanaryIngress(stableIngress *extensionsv1beta1.I
desiredCanaryIngress.Annotations[fmt.Sprintf("%s/canary", annotationPrefix)] = "true"
desiredCanaryIngress.Annotations[fmt.Sprintf("%s/canary-weight", annotationPrefix)] = fmt.Sprintf("%d", desiredWeight)

if r.cfg.Rollout.Spec.Strategy.Canary.TrafficRouting.MaxTrafficWeight != nil {
weightTotal := *r.cfg.Rollout.Spec.Strategy.Canary.TrafficRouting.MaxTrafficWeight
desiredCanaryIngress.Annotations[fmt.Sprintf("%s/canary-weight-total", annotationPrefix)] = fmt.Sprintf("%d", weightTotal)
}
return ingressutil.NewLegacyIngress(desiredCanaryIngress), nil
}

Expand Down
52 changes: 28 additions & 24 deletions utils/replicaset/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/defaults"
"github.com/argoproj/argo-rollouts/utils/weightutil"
)

const (
Expand Down Expand Up @@ -56,11 +57,11 @@ func AtDesiredReplicaCountsForCanary(ro *v1alpha1.Rollout, newRS, stableRS *apps
// when using the basic canary strategy. The function calculates the desired number of replicas for
// the new and stable RS using the following equations:
//
// newRS Replica count = spec.Replica * (setweight / 100)
// stableRS Replica count = spec.Replica * (1 - setweight / 100)
// newRS Replica count = spec.Replica * (setweight / maxweight)
// stableRS Replica count = spec.Replica * (1 - setweight / maxweight)
//
// In both equations, the function rounds the desired replica count up if the math does not divide into whole numbers
// because the rollout guarantees at least one replica for both the stable and new RS when the setWeight is not 0 or 100.
// because the rollout guarantees at least one replica for both the stable and new RS when the setWeight is not 0 or maxweight.
// Then, the function finds the number of replicas it can scale up using the following equation:
//
// scaleUpCount := (maxSurge + rollout.Spec.Replica) - sum of rollout's RSs spec.Replica
Expand Down Expand Up @@ -91,8 +92,9 @@ func CalculateReplicaCountsForBasicCanary(rollout *v1alpha1.Rollout, newRS *apps
rolloutSpecReplica := defaults.GetReplicasOrDefault(rollout.Spec.Replicas)
_, desiredWeight := GetCanaryReplicasOrWeight(rollout)
maxSurge := MaxSurge(rollout)
maxWeight := weightutil.MaxTrafficWeight(rollout)

desiredNewRSReplicaCount, desiredStableRSReplicaCount := approximateWeightedCanaryStableReplicaCounts(rolloutSpecReplica, desiredWeight, maxSurge)
desiredNewRSReplicaCount, desiredStableRSReplicaCount := approximateWeightedCanaryStableReplicaCounts(rolloutSpecReplica, desiredWeight, maxWeight, maxSurge)

stableRSReplicaCount := int32(0)
newRSReplicaCount := int32(0)
Expand Down Expand Up @@ -180,7 +182,7 @@ func CalculateReplicaCountsForBasicCanary(rollout *v1alpha1.Rollout, newRS *apps
// canary/stable replica counts might sum to either spec.replicas or spec.replicas + 1 but will not
// exceed spec.replicas if maxSurge is 0. If the canary weight is between 1-99, and spec.replicas is > 1,
// we will always return a minimum of 1 for stable and canary as to not return 0.
func approximateWeightedCanaryStableReplicaCounts(specReplicas, desiredWeight, maxSurge int32) (int32, int32) {
func approximateWeightedCanaryStableReplicaCounts(specReplicas, desiredWeight, maxWeight, maxSurge int32) (int32, int32) {
if specReplicas == 0 {
return 0, 0
}
Expand All @@ -192,14 +194,14 @@ func approximateWeightedCanaryStableReplicaCounts(specReplicas, desiredWeight, m
}
var options []canaryOption

ceilWeightedCanaryCount := int32(math.Ceil(float64(specReplicas*desiredWeight) / 100.0))
floorWeightedCanaryCount := int32(math.Floor(float64(specReplicas*desiredWeight) / 100.0))
ceilWeightedCanaryCount := int32(math.Ceil(float64(specReplicas*desiredWeight) / float64(maxWeight)))
floorWeightedCanaryCount := int32(math.Floor(float64(specReplicas*desiredWeight) / float64(maxWeight)))

tied := floorCeilingTied(desiredWeight, specReplicas)
tied := floorCeilingTied(desiredWeight, maxWeight, specReplicas)

// zeroAllowed indicates if are allowed to return the floored value if it is zero. We don't allow
// the value to be zero if when user has a weight from 1-99, and they run 2+ replicas (surge included)
zeroAllowed := desiredWeight == 100 || desiredWeight == 0 || (specReplicas == 1 && maxSurge == 0)
zeroAllowed := desiredWeight == (maxWeight) || desiredWeight == 0 || (specReplicas == 1 && maxSurge == 0)

if ceilWeightedCanaryCount < specReplicas || zeroAllowed {
options = append(options, canaryOption{ceilWeightedCanaryCount, specReplicas})
Expand All @@ -213,7 +215,7 @@ func approximateWeightedCanaryStableReplicaCounts(specReplicas, desiredWeight, m
// in order to achieve a closer canary weight
if maxSurge > 0 {
options = append(options, canaryOption{ceilWeightedCanaryCount, specReplicas + 1})
surgeIsTied := floorCeilingTied(desiredWeight, specReplicas+1)
surgeIsTied := floorCeilingTied(desiredWeight, maxWeight, specReplicas+1)
if !surgeIsTied && (floorWeightedCanaryCount != 0 || zeroAllowed) {
options = append(options, canaryOption{floorWeightedCanaryCount, specReplicas + 1})
}
Expand All @@ -225,10 +227,10 @@ func approximateWeightedCanaryStableReplicaCounts(specReplicas, desiredWeight, m
}

bestOption := options[0]
bestDelta := weightDelta(desiredWeight, bestOption.canary, bestOption.total)
bestDelta := weightDelta(desiredWeight, maxWeight, bestOption.canary, bestOption.total)
for i := 1; i < len(options); i++ {
currOption := options[i]
currDelta := weightDelta(desiredWeight, currOption.canary, currOption.total)
currDelta := weightDelta(desiredWeight, maxWeight, currOption.canary, currOption.total)
if currDelta < bestDelta {
bestOption = currOption
bestDelta = currDelta
Expand All @@ -241,15 +243,15 @@ func approximateWeightedCanaryStableReplicaCounts(specReplicas, desiredWeight, m
// For example: replicas: 3, desiredWeight: 50%
// A canary count of 1 (33.33%) or 2 (66.66%) are both equidistant from desired weight of 50%.
// When this happens, we will pick the larger canary count
func floorCeilingTied(desiredWeight, totalReplicas int32) bool {
_, frac := math.Modf(float64(totalReplicas) * (float64(desiredWeight) / 100))
func floorCeilingTied(desiredWeight, maxWeight, totalReplicas int32) bool {
_, frac := math.Modf(float64(totalReplicas) * (float64(desiredWeight) / float64(maxWeight)))
return frac == 0.5
}

// weightDelta calculates the difference that the canary replicas will be from the desired weight
// This is used to pick the closest approximation of canary counts.
func weightDelta(desiredWeight, canaryReplicas, totalReplicas int32) float64 {
actualWeight := float64(canaryReplicas*100) / float64(totalReplicas)
func weightDelta(desiredWeight, maxWeight, canaryReplicas, totalReplicas int32) float64 {
actualWeight := float64(canaryReplicas*maxWeight) / float64(totalReplicas)
return math.Abs(actualWeight - float64(desiredWeight))
}

Expand Down Expand Up @@ -337,11 +339,12 @@ func CalculateReplicaCountsForTrafficRoutedCanary(rollout *v1alpha1.Rollout, wei
var canaryCount, stableCount int32
rolloutSpecReplica := defaults.GetReplicasOrDefault(rollout.Spec.Replicas)
setCanaryScaleReplicas, desiredWeight := GetCanaryReplicasOrWeight(rollout)
maxWeight := weightutil.MaxTrafficWeight(rollout)
if setCanaryScaleReplicas != nil {
// a canary count was explicitly set
canaryCount = *setCanaryScaleReplicas
} else {
canaryCount = CheckMinPodsPerReplicaSet(rollout, trafficWeightToReplicas(rolloutSpecReplica, desiredWeight))
canaryCount = CheckMinPodsPerReplicaSet(rollout, trafficWeightToReplicas(rolloutSpecReplica, desiredWeight, maxWeight))
}

if !rollout.Spec.Strategy.Canary.DynamicStableScale {
Expand All @@ -357,9 +360,10 @@ func CalculateReplicaCountsForTrafficRoutedCanary(rollout *v1alpha1.Rollout, wei
// high, until we reduce traffic to it.
// Case 2 occurs when we are going from high to low canary weight. In this scenario,
// we need to increase the stable scale in preparation for increase of traffic to stable.
stableCount = trafficWeightToReplicas(rolloutSpecReplica, 100-desiredWeight)
// TODO calculate the replica set count from the max traffic weight.
stableCount = trafficWeightToReplicas(rolloutSpecReplica, maxWeight-desiredWeight, maxWeight)
if weights != nil {
actualStableWeightReplicaCount := trafficWeightToReplicas(rolloutSpecReplica, weights.Stable.Weight)
actualStableWeightReplicaCount := trafficWeightToReplicas(rolloutSpecReplica, weights.Stable.Weight, maxWeight)
stableCount = max(stableCount, actualStableWeightReplicaCount)

if rollout.Status.Abort {
Expand All @@ -368,7 +372,7 @@ func CalculateReplicaCountsForTrafficRoutedCanary(rollout *v1alpha1.Rollout, wei
// 1. actual canary traffic weight
// 2. desired canary traffic weight
// This if block makes sure we don't scale down the canary prematurely
trafficWeightReplicaCount := trafficWeightToReplicas(rolloutSpecReplica, weights.Canary.Weight)
trafficWeightReplicaCount := trafficWeightToReplicas(rolloutSpecReplica, weights.Canary.Weight, maxWeight)
canaryCount = max(trafficWeightReplicaCount, canaryCount)
}
}
Expand All @@ -377,8 +381,8 @@ func CalculateReplicaCountsForTrafficRoutedCanary(rollout *v1alpha1.Rollout, wei

// trafficWeightToReplicas returns the appropriate replicas given the full spec.replicas and a weight
// Rounds up if not evenly divisible.
func trafficWeightToReplicas(replicas, weight int32) int32 {
return int32(math.Ceil(float64(weight*replicas) / 100))
func trafficWeightToReplicas(replicas, weight, maxWeight int32) int32 {
return int32(math.Ceil(float64(weight*replicas) / float64(maxWeight)))
}

func max(left, right int32) int32 {
Expand Down Expand Up @@ -459,7 +463,7 @@ func GetCurrentCanaryStep(rollout *v1alpha1.Rollout) (*v1alpha1.CanaryStep, *int
// GetCanaryReplicasOrWeight either returns a static set of replicas or a weight percentage
func GetCanaryReplicasOrWeight(rollout *v1alpha1.Rollout) (*int32, int32) {
if rollout.Status.PromoteFull || rollout.Status.StableRS == "" || rollout.Status.CurrentPodHash == rollout.Status.StableRS {
return nil, 100
return nil, weightutil.MaxTrafficWeight(rollout)
}
if scs := UseSetCanaryScale(rollout); scs != nil {
if scs.Replicas != nil {
Expand All @@ -480,7 +484,7 @@ func GetCurrentSetWeight(rollout *v1alpha1.Rollout) int32 {
}
currentStep, currentStepIndex := GetCurrentCanaryStep(rollout)
if currentStep == nil {
return 100
return weightutil.MaxTrafficWeight(rollout)
}

for i := *currentStepIndex; i >= 0; i-- {
Expand Down
Loading

0 comments on commit c9ec175

Please sign in to comment.