feat: enhance autoscaling peeking logic (#1432)
Signed-off-by: akashjkhamkar <[email protected]>
Signed-off-by: veds-g <[email protected]>
Signed-off-by: GitHub <[email protected]>
Co-authored-by: Vedant Gupta <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
(cherry picked from commit 7fe3225)
akashjkhamkar authored and whynowy committed Jan 13, 2024
1 parent ac716ec commit 17c9c0e
Showing 9 changed files with 90 additions and 41 deletions.
14 changes: 10 additions & 4 deletions Makefile
@@ -120,15 +120,21 @@ test-sideinputs-e2e:
test-%:
$(MAKE) cleanup-e2e
$(MAKE) image e2eapi-image
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=controller-manager,app.kubernetes.io/part-of=numaflow --ignore-not-found=true
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=numaflow-ux,app.kubernetes.io/part-of=numaflow --ignore-not-found=true
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=numaflow-webhook,app.kubernetes.io/part-of=numaflow --ignore-not-found=true
$(MAKE) restart-control-plane-components
kubectl -n numaflow-system delete po e2e-api-pod --ignore-not-found=true
cat test/manifests/e2e-api-pod.yaml | sed '[email protected]/numaproj/@$(IMAGE_NAMESPACE)/@' | sed 's/:latest/:$(VERSION)/' | kubectl -n numaflow-system apply -f -
go generate $(shell find ./test/$* -name '*.go')
go test -v -timeout 15m -count 1 --tags test -p 1 ./test/$*
$(MAKE) cleanup-e2e


image-restart:
$(MAKE) image
$(MAKE) restart-control-plane-components

restart-control-plane-components:
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=controller-manager,app.kubernetes.io/part-of=numaflow --ignore-not-found=true
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=numaflow-ux,app.kubernetes.io/part-of=numaflow --ignore-not-found=true
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=numaflow-webhook,app.kubernetes.io/part-of=numaflow --ignore-not-found=true

.PHONY: cleanup-e2e
cleanup-e2e:
2 changes: 1 addition & 1 deletion api/json-schema/schema.json
@@ -19087,7 +19087,7 @@
"type": "integer"
},
"zeroReplicaSleepSeconds": {
"description": "After scaling down to 0, sleep how many seconds before scaling up to peek.",
"description": "After scaling down the source vertex to 0, sleep how many seconds before scaling the source vertex back up to peek.",
"format": "int64",
"type": "integer"
}
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json
@@ -19074,7 +19074,7 @@
"format": "int64"
},
"zeroReplicaSleepSeconds": {
"description": "After scaling down to 0, sleep how many seconds before scaling up to peek.",
"description": "After scaling down the source vertex to 0, sleep how many seconds before scaling the source vertex back up to peek.",
"type": "integer",
"format": "int64"
}
4 changes: 2 additions & 2 deletions docs/APIs.md
@@ -4071,8 +4071,8 @@ instead. Cooldown seconds after a scaling operation before another one.
<td>
<em>(Optional)</em>
<p>
After scaling down to 0, sleep how many seconds before scaling up to
peek.
After scaling down the source vertex to 0, sleep how many seconds before
scaling the source vertex back up to peek.
</p>
</td>
</tr>
4 changes: 2 additions & 2 deletions docs/user-guide/reference/autoscaling.md
@@ -53,8 +53,8 @@ spec:
start processing, because the autoscaling algorithm will divide the TPS by the number of pods even if the pod is not `Running`.
- `scaleDownCooldownSeconds` - After a scaling operation, how many seconds to wait for the same vertex, if the follow-up
operation is a scaling down, defaults to `90`.
- `zeroReplicaSleepSeconds` - How many seconds it will wait after scaling down to `0`, defaults to `120`.
Numaflow autoscaler periodically scales up a vertex pod to "peek" the incoming data, this is the period of time to wait before peeking.
- `zeroReplicaSleepSeconds` - How many seconds it will wait after scaling a source vertex replicas down to `0`, defaults to `120`.
Numaflow autoscaler periodically scales up a source vertex pod to "peek" the incoming data, this is the period of time to wait before peeking.
- `targetProcessingSeconds` - It is used to tune the aggressiveness of autoscaling for source vertices, it measures how
fast you want the vertex to process all the pending messages, defaults to `20`. It is only effective for the `Source` vertices which
support autoscaling, typically increasing the value leads to lower processing rate, thus less replicas.
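The settings documented above map onto the `Scale` struct of the `v1alpha1` API (see the `vertex_types.go` hunk below). As a rough illustration only — the import alias and the getter's fallback behaviour are assumptions, not taken from this commit — configuring a custom peek interval from Go could look like this:

package main

import (
	"fmt"

	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

func main() {
	// Sleep 300 seconds (instead of the documented default of 120) before a
	// source vertex that was scaled down to 0 is woken up again to peek.
	sleep := uint32(300)
	scale := dfv1.Scale{
		ZeroReplicaSleepSeconds: &sleep, // optional field, hence the pointer
	}

	// GetZeroReplicaSleepSeconds is the getter the scaler calls below; it is
	// assumed here to fall back to the default when the field is unset.
	fmt.Println(scale.GetZeroReplicaSleepSeconds())
}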
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/vertex_types.go
@@ -558,7 +558,7 @@ type Scale struct {
// Cooldown seconds after a scaling operation before another one.
// +optional
DeprecatedCooldownSeconds *uint32 `json:"cooldownSeconds,omitempty" protobuf:"varint,5,opt,name=cooldownSeconds"`
// After scaling down to 0, sleep how many seconds before scaling up to peek.
// After scaling down the source vertex to 0, sleep how many seconds before scaling the source vertex back up to peek.
// +optional
ZeroReplicaSleepSeconds *uint32 `json:"zeroReplicaSleepSeconds,omitempty" protobuf:"varint,6,opt,name=zeroReplicaSleepSeconds"`
// TargetProcessingSeconds is used to tune the aggressiveness of autoscaling for source vertices, it measures how fast
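Because `ZeroReplicaSleepSeconds` is an optional pointer, the getter used by the scaler presumably resolves an unset value to the documented default of 120 seconds. A self-contained sketch of that pattern follows; the real getter's name, default constant, and return type may differ.

package main

import "fmt"

// miniScale mirrors only the field of dfv1.Scale relevant here, for illustration.
type miniScale struct {
	ZeroReplicaSleepSeconds *uint32
}

// getZeroReplicaSleepSeconds resolves the optional field, falling back to the
// default of 120 documented in docs/user-guide/reference/autoscaling.md.
func (s miniScale) getZeroReplicaSleepSeconds() uint32 {
	if s.ZeroReplicaSleepSeconds != nil {
		return *s.ZeroReplicaSleepSeconds
	}
	return 120
}

func main() {
	custom := uint32(300)
	fmt.Println(miniScale{}.getZeroReplicaSleepSeconds())                                 // 120 (default)
	fmt.Println(miniScale{ZeroReplicaSleepSeconds: &custom}.getZeroReplicaSleepSeconds()) // 300
}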
99 changes: 71 additions & 28 deletions pkg/reconciler/vertex/scaling/scaling.go
@@ -204,15 +204,7 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
log.Debugf("Vertex %s might be under processing, replicas mismatch.", vertex.Name)
return nil
}
if vertex.Status.Replicas == 0 { // Was scaled to 0
if secondsSinceLastScale >= float64(vertex.Spec.Scale.GetZeroReplicaSleepSeconds()) {
log.Debugf("Vertex %s has slept %v seconds, scaling up to peek.", vertex.Name, secondsSinceLastScale)
return s.patchVertexReplicas(ctx, vertex, 1)
} else {
log.Debugf("Vertex %q has slept %v seconds, hasn't reached zeroReplicaSleepSeconds (%v seconds).", vertex.Name, secondsSinceLastScale, vertex.Spec.Scale.GetZeroReplicaSleepSeconds())
return nil
}
}

var err error
daemonClient, _ := s.daemonClientsCache.Get(pl.GetDaemonServiceURL())
if daemonClient == nil {
@@ -223,6 +215,37 @@
s.daemonClientsCache.Add(pl.GetDaemonServiceURL(), daemonClient)
}

partitionBufferLengths, partitionAvailableBufferLengths, totalBufferLength, totalCurrentPending, err := getBufferInfos(ctx, key, daemonClient, pl, vertex)
if err != nil {
err := fmt.Errorf("error while fetching buffer info, %w", err)
return err
}

if vertex.Status.Replicas == 0 { // Was scaled to 0
// For non-source vertices,
// Check if they have any pending messages in their owned buffers,
// If yes, then scale them back to 1
if !vertex.IsASource() {
if totalCurrentPending <= 0 {
log.Debugf("Vertex %s doesn't have any pending messages, skipping scaling back to 1", vertex.Name)
return nil
} else {
log.Debugf("Vertex %s has some pending messages, scaling back to 1", vertex.Name)
return s.patchVertexReplicas(ctx, vertex, 1)
}
}

// For source vertices,
// Periodically wake them up from 0 replicas to 1, to peek for the incoming messages
if secondsSinceLastScale >= float64(vertex.Spec.Scale.GetZeroReplicaSleepSeconds()) {
log.Debugf("Vertex %s has slept %v seconds, scaling up to peek.", vertex.Name, secondsSinceLastScale)
return s.patchVertexReplicas(ctx, vertex, 1)
} else {
log.Debugf("Vertex %q has slept %v seconds, hasn't reached zeroReplicaSleepSeconds (%v seconds).", vertex.Name, secondsSinceLastScale, vertex.Spec.Scale.GetZeroReplicaSleepSeconds())
return nil
}
}

vMetrics, err := daemonClient.GetVertexMetrics(ctx, pl.Name, vertex.Spec.Name)
if err != nil {
return fmt.Errorf("failed to get metrics of vertex key %q, %w", key, err)
@@ -261,27 +284,10 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
// Add pending information to cache for back pressure calculation, if there is a backpressure it will impact all the partitions.
// So we only need to add the total pending to the cache.
_ = s.vertexMetricsCache.Add(key+"/pending", totalPending)
partitionBufferLengths := make([]int64, 0)
partitionAvailableBufferLengths := make([]int64, 0)
totalBufferLength := int64(0)
targetAvailableBufferLength := int64(0)
if !vertex.IsASource() { // Only non-source vertex has buffer to read
for _, bufferName := range vertex.OwnedBuffers() {
if bInfo, err := daemonClient.GetPipelineBuffer(ctx, pl.Name, bufferName); err != nil {
return fmt.Errorf("failed to get the read buffer information of vertex %q, %w", vertex.Name, err)
} else {
if bInfo.BufferLength == nil || bInfo.BufferUsageLimit == nil {
return fmt.Errorf("invalid read buffer information of vertex %q, length or usage limit is missing", vertex.Name)
}
partitionBufferLengths = append(partitionBufferLengths, int64(float64(bInfo.GetBufferLength())*bInfo.GetBufferUsageLimit()))
partitionAvailableBufferLengths = append(partitionAvailableBufferLengths, int64(float64(bInfo.GetBufferLength())*float64(vertex.Spec.Scale.GetTargetBufferAvailability())/100))
totalBufferLength += int64(float64(*bInfo.BufferLength) * *bInfo.BufferUsageLimit)
targetAvailableBufferLength += int64(float64(*bInfo.BufferLength) * float64(vertex.Spec.Scale.GetTargetBufferAvailability()) / 100)
}
}
// Add processing rate information to cache for back pressure calculation
if !vertex.IsASource() {
_ = s.vertexMetricsCache.Add(key+"/length", totalBufferLength)
}

var desired int32
current := int32(vertex.GetReplicas())
// if both totalRate and totalPending are 0, we scale down to 0
@@ -498,3 +504,40 @@ func (s *Scaler) patchVertexReplicas(ctx context.Context, vertex *dfv1.Vertex, d
func KeyOfVertex(vertex dfv1.Vertex) string {
return fmt.Sprintf("%s/%s", vertex.Namespace, vertex.Name)
}

func getBufferInfos(
ctx context.Context,
key string,
d *daemonclient.DaemonClient,
pl *dfv1.Pipeline,
vertex *dfv1.Vertex,
) (
partitionBufferLengths []int64,
partitionAvailableBufferLengths []int64,
totalBufferLength int64,
totalCurrentPending int64,
err error,
) {
partitionBufferLengths = make([]int64, 0)
partitionAvailableBufferLengths = make([]int64, 0)
totalBufferLength = int64(0)
totalCurrentPending = int64(0)
for _, bufferName := range vertex.OwnedBuffers() {
if bInfo, err := d.GetPipelineBuffer(ctx, pl.Name, bufferName); err != nil {
err = fmt.Errorf("failed to get the buffer information of vertex %q, %w", vertex.Name, err)
return partitionBufferLengths, partitionAvailableBufferLengths, totalBufferLength, totalCurrentPending, err
} else {
if bInfo.BufferLength == nil || bInfo.BufferUsageLimit == nil || bInfo.PendingCount == nil {
err = fmt.Errorf("invalid read buffer information of vertex %q, length, pendingCount or usage limit is missing", vertex.Name)
return partitionBufferLengths, partitionAvailableBufferLengths, totalBufferLength, totalCurrentPending, err
}

partitionBufferLengths = append(partitionBufferLengths, int64(float64(bInfo.GetBufferLength())*bInfo.GetBufferUsageLimit()))
partitionAvailableBufferLengths = append(partitionAvailableBufferLengths, int64(float64(bInfo.GetBufferLength())*float64(vertex.Spec.Scale.GetTargetBufferAvailability())/100))
totalBufferLength += int64(float64(*bInfo.BufferLength) * *bInfo.BufferUsageLimit)
totalCurrentPending += *bInfo.PendingCount
}
}

return partitionBufferLengths, partitionAvailableBufferLengths, totalBufferLength, totalCurrentPending, nil
}
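Condensed into a standalone sketch (not the real Scaler API), the scaled-to-zero branch added above decides as follows: a non-source vertex wakes up only if its owned buffers still report pending messages, while a source vertex keeps waking up on the zeroReplicaSleepSeconds timer to peek for new data.

package main

import "fmt"

// decideFromZero is a simplified sketch of the scaled-to-zero branch above,
// not the real Scaler method. It reports whether a vertex currently at 0
// replicas should be patched back to 1.
func decideFromZero(isSource bool, totalCurrentPending int64, secondsSinceLastScale, zeroReplicaSleepSeconds float64) (scaleToOne bool, reason string) {
	if !isSource {
		// Non-source vertices only wake up when their owned buffers still
		// hold pending messages.
		if totalCurrentPending <= 0 {
			return false, "no pending messages, stay at 0"
		}
		return true, "pending messages found, scale back to 1"
	}
	// Source vertices are woken up periodically to peek for incoming data.
	if secondsSinceLastScale >= zeroReplicaSleepSeconds {
		return true, "slept long enough, scale up to peek"
	}
	return false, "still within zeroReplicaSleepSeconds, keep sleeping"
}

func main() {
	fmt.Println(decideFromZero(false, 0, 30, 120))  // non-source, no backlog
	fmt.Println(decideFromZero(false, 42, 30, 120)) // non-source with backlog
	fmt.Println(decideFromZero(true, 0, 130, 120))  // source past the sleep window
}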
