Scrape queue-proxy metrics in autoscaler #3149

Merged
merged 49 commits on Feb 27, 2019
Changes from 43 commits
Commits
49 commits
a82ed49
refactor where we use rate
yanweiguo Jan 28, 2019
2896a0d
extract concurrencyPerPod
yanweiguo Jan 28, 2019
ad11d78
use approximateZero
yanweiguo Jan 29, 2019
de81555
use pods count from Informer
yanweiguo Jan 30, 2019
9895328
get pods when record
yanweiguo Jan 30, 2019
28ee046
merge master
yanweiguo Jan 30, 2019
57527b3
use 1 as min actual pod
yanweiguo Jan 31, 2019
23c132b
add unit test
yanweiguo Jan 31, 2019
10f2552
add unit tests for cmd
yanweiguo Jan 31, 2019
29d9b52
lint
yanweiguo Jan 31, 2019
43844dd
address comments
yanweiguo Feb 1, 2019
79842cf
remove unuse func
yanweiguo Feb 1, 2019
e7672ae
short locked scope
yanweiguo Feb 1, 2019
efa8787
address comment
yanweiguo Feb 1, 2019
63e678d
wrap func into func
yanweiguo Feb 1, 2019
c50e2cd
address comments
yanweiguo Feb 6, 2019
f1fc3aa
add dot
yanweiguo Feb 6, 2019
9149930
change algorithm
yanweiguo Feb 6, 2019
6d2ca83
Merge branch 'master' into use_actual_pods
yanweiguo Feb 6, 2019
efd64dd
revert handle file
yanweiguo Feb 6, 2019
a094cf8
do not send websocket metrics
yanweiguo Feb 6, 2019
8302310
exclude test coverage check for main files
yanweiguo Feb 6, 2019
c074299
remove unuse func
yanweiguo Feb 6, 2019
359a1af
remove websocket stuff in queue
yanweiguo Feb 6, 2019
df2f936
bug fixed
yanweiguo Feb 7, 2019
8341498
clean cache in tests
yanweiguo Feb 7, 2019
090f573
removed cache
yanweiguo Feb 7, 2019
9e3cdef
merge
yanweiguo Feb 7, 2019
6550436
use informer in scraper
yanweiguo Feb 7, 2019
71fc715
add unit tests
yanweiguo Feb 7, 2019
556fcb6
merge master
yanweiguo Feb 8, 2019
adde505
small fixes
yanweiguo Feb 9, 2019
cf31e5c
change to global average
yanweiguo Feb 11, 2019
1eb36c4
extract a function
yanweiguo Feb 11, 2019
c63c1ff
merge master
yanweiguo Feb 11, 2019
31bb24c
solve comments
yanweiguo Feb 11, 2019
51658c0
address comments
yanweiguo Feb 12, 2019
6d61757
merge master
yanweiguo Feb 21, 2019
4e0a593
change to 35 seconds
yanweiguo Feb 22, 2019
9a4c449
merge master
yanweiguo Feb 22, 2019
7dce271
remove a log
yanweiguo Feb 22, 2019
be9ec33
add some comment
yanweiguo Feb 22, 2019
2e08244
change a test to cover more
yanweiguo Feb 23, 2019
e12b80a
hide pods behind scraper
yanweiguo Feb 25, 2019
3fe1c33
address comment
yanweiguo Feb 26, 2019
1b48541
fix a log
yanweiguo Feb 26, 2019
3aa2244
merged master
yanweiguo Feb 26, 2019
36a41a4
drop none activator stats
yanweiguo Feb 27, 2019
71fcbcd
fix the test
yanweiguo Feb 27, 2019
12 changes: 9 additions & 3 deletions cmd/autoscaler/main.go
@@ -80,6 +80,7 @@ func main() {

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
statsCh := make(chan *autoscaler.StatMessage, statsBufferLen)

cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig)
if err != nil {
@@ -139,7 +140,8 @@ func main() {
hpaInformer := kubeInformerFactory.Autoscaling().V1().HorizontalPodAutoscalers()

// uniScalerFactory depends on endpointsInformer being set.
multiScaler := autoscaler.NewMultiScaler(dynConfig, stopCh, uniScalerFactoryFunc(endpointsInformer), logger)
multiScaler := autoscaler.NewMultiScaler(
dynConfig, stopCh, statsCh, uniScalerFactoryFunc(endpointsInformer), statsScraperFactoryFunc(endpointsInformer), logger)
kpaScaler := kpa.NewKPAScaler(servingClientSet, scaleClient, logger, configMapWatcher)
kpaCtl := kpa.NewController(&opt, paInformer, endpointsInformer, multiScaler, kpaScaler, dynConfig)
hpaCtl := hpa.NewController(&opt, paInformer, hpaInformer)
@@ -171,8 +173,6 @@ func main() {
return hpaCtl.Run(controllerThreads, stopCh)
})

statsCh := make(chan *autoscaler.StatMessage, statsBufferLen)

statsServer := statserver.New(statsServerAddr, statsCh, logger)
eg.Go(func() error {
return statsServer.ListenAndServe()
@@ -237,6 +237,12 @@ func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer) f
}
}

func statsScraperFactoryFunc(endpointsInformer corev1informers.EndpointsInformer) func(metric *autoscaler.Metric, config *autoscaler.DynamicConfig) (autoscaler.StatsScraper, error) {
return func(metric *autoscaler.Metric, config *autoscaler.DynamicConfig) (autoscaler.StatsScraper, error) {
return autoscaler.NewServiceScraper(metric, config, endpointsInformer)
}
}

func labelValueOrEmpty(metric *autoscaler.Metric, labelKey string) string {
if metric.Labels != nil {
if value, ok := metric.Labels[labelKey]; ok {
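Not part of the diff: a rough sketch of what a service-level stats scraper along these lines might do, namely poll a revision's metrics endpoint over HTTP and turn a concurrency gauge into a number the autoscaler can use. The URL, metric name, and helper below are placeholders, and a real implementation would use a proper Prometheus text parser instead of line scanning.

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strconv"
	"strings"
	"time"
)

// scrapeAverageConcurrency fetches a Prometheus text endpoint and returns the
// value of a single gauge. The URL and metric name passed in are illustrative.
func scrapeAverageConcurrency(url, metricName string) (float64, error) {
	client := http.Client{Timeout: 3 * time.Second}
	resp, err := client.Get(url)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		// Skip comments and unrelated metrics.
		if strings.HasPrefix(line, "#") || !strings.HasPrefix(line, metricName) {
			continue
		}
		fields := strings.Fields(line)
		if len(fields) < 2 {
			continue
		}
		// The last field of a Prometheus text line is the sample value.
		return strconv.ParseFloat(fields[len(fields)-1], 64)
	}
	return 0, fmt.Errorf("metric %q not found", metricName)
}

func main() {
	// Hypothetical scrape target; the real scraper resolves the revision
	// service and the queue-proxy metrics port instead.
	v, err := scrapeAverageConcurrency("http://revision-service:9090/metrics",
		"queue_average_concurrent_requests")
	if err != nil {
		fmt.Println("scrape failed:", err)
		return
	}
	fmt.Printf("observed average concurrency: %f\n", v)
}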
49 changes: 4 additions & 45 deletions cmd/queue/main.go
@@ -18,7 +18,6 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"net/http"
@@ -31,7 +30,6 @@ import (
"github.com/knative/pkg/signals"

"github.com/knative/pkg/logging/logkey"
"github.com/knative/pkg/websocket"
"github.com/knative/serving/cmd/util"
activatorutil "github.com/knative/serving/pkg/activator/util"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
@@ -41,7 +39,6 @@ import (
"github.com/knative/serving/pkg/network"
"github.com/knative/serving/pkg/queue"
"github.com/knative/serving/pkg/queue/health"
"github.com/knative/serving/pkg/utils"
"go.opencensus.io/exporter/prometheus"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
@@ -62,10 +59,6 @@ const (
// from its configuration and propagate that to all istio-proxies
// in the mesh.
quitSleepDuration = 20 * time.Second

// Only report errors about a non-existent websocket connection after
// having been up and running for this long.
startupConnectionGrace = 10 * time.Second
)

var (
@@ -83,7 +76,6 @@ var (
revisionTimeoutSeconds int
statChan = make(chan *autoscaler.Stat, statReportingQueueLength)
reqChan = make(chan queue.ReqEvent, requestCountingQueueLength)
statSink *websocket.ManagedConnection
logger *zap.SugaredLogger
breaker *queue.Breaker

@@ -119,38 +111,15 @@ func initEnv() {
reporter = _reporter
}

func statReporter() {
func reportStats() {
for {
s := <-statChan
if err := sendStat(s); err != nil {
// Hide "not-established" errors until the startupConnectionGrace has passed.
if err != websocket.ErrConnectionNotEstablished || time.Since(startupTime) > startupConnectionGrace {
logger.Errorw("Error while sending stat", zap.Error(err))
}
if err := reporter.Report(float64(s.RequestCount), s.AverageConcurrentRequests); err != nil {
logger.Errorw("Error while sending stat", zap.Error(err))
}
}
}

// sendStat sends a single StatMessage to the autoscaler.
func sendStat(s *autoscaler.Stat) error {
if statSink == nil {
return errors.New("stat sink not (yet) connected")
}
reporter.Report(
float64(s.RequestCount),
float64(s.AverageConcurrentRequests),
)
if healthState.IsShuttingDown() {
// Do not send metrics if the pod is shutting down.
return nil
}
sm := autoscaler.StatMessage{
Stat: *s,
Key: servingRevisionKey,
}
return statSink.Send(sm)
}

func proxyForRequest(req *http.Request) *httputil.ReverseProxy {
if req.ProtoMajor == 2 {
return h2cProxy
@@ -296,11 +265,7 @@ func main() {
http.ListenAndServe(fmt.Sprintf(":%d", v1alpha1.RequestQueueMetricsPort), mux)
}()

// Open a websocket connection to the autoscaler
autoscalerEndpoint := fmt.Sprintf("ws://%s.%s.svc.%s:%d", servingAutoscaler, autoscalerNamespace, utils.GetClusterDomainName(), servingAutoscalerPort)
logger.Infof("Connecting to autoscaler at %s", autoscalerEndpoint)
statSink = websocket.NewDurableSendingConnection(autoscalerEndpoint, logger)
go statReporter()
go reportStats()

reportTicker := time.NewTicker(queue.ReporterReportingPeriod).C
queue.NewStats(podName, queue.Channels{
@@ -347,11 +312,5 @@ func main() {
if err := adminServer.Shutdown(context.Background()); err != nil {
logger.Errorw("Failed to shutdown admin-server", zap.Error(err))
}

if statSink != nil {
if err := statSink.Shutdown(); err != nil {
logger.Errorw("Failed to shutdown websocket connection", zap.Error(err))
}
}
}
}
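Not part of the diff: with the websocket push path removed, the queue-proxy's stats reach the autoscaler only through the Prometheus endpoint it already serves. Below is a minimal, hedged sketch of that exporter wiring using the OpenCensus Prometheus exporter; the measure name, namespace, port, and recorded value are placeholders, not the queue-proxy's actual metrics.

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"

	"go.opencensus.io/exporter/prometheus"
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
)

// A placeholder measure standing in for the queue-proxy's concurrency metric.
var concurrencyMeasure = stats.Float64(
	"average_concurrent_requests", "Average in-flight requests", stats.UnitDimensionless)

func main() {
	// Register a view so the measure shows up on the /metrics page.
	if err := view.Register(&view.View{
		Name:        "average_concurrent_requests",
		Measure:     concurrencyMeasure,
		Aggregation: view.LastValue(),
	}); err != nil {
		log.Fatal(err)
	}

	exporter, err := prometheus.NewExporter(prometheus.Options{Namespace: "queue"})
	if err != nil {
		log.Fatal(err)
	}
	view.RegisterExporter(exporter)

	// Record one sample value so the gauge has something to export.
	stats.Record(context.Background(), concurrencyMeasure.M(3.0))

	// Serve the metrics endpoint the autoscaler scrapes; 9090 stands in for
	// the real queue-proxy metrics port.
	mux := http.NewServeMux()
	mux.Handle("/metrics", exporter)
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", 9090), mux))
}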
88 changes: 67 additions & 21 deletions pkg/autoscaler/autoscaler.go
@@ -20,6 +20,7 @@ import (
"context"
"errors"
"math"
"strings"
"sync"
"time"

@@ -57,6 +58,10 @@ type Stat struct {
// Lameduck indicates this Pod has received a shutdown signal.
// Deprecated and no longer used by newly created Pods.
LameDuck bool

// Average number of requests currently being handled by all ready pods of a
// revision.
AverageRevConcurrency float64
}

// StatMessage wraps a Stat with identifying information so it can be routed
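Not part of the diff: per the comment in concurrency() further down, a sampled queue-proxy stat is extrapolated to the whole revision using the ready-pod count observed at scrape time. A toy sketch of that extrapolation (the helper name is made up; the real logic lives in the new stats scraper):

package main

import "fmt"

// extrapolateRevConcurrency is a hypothetical helper: given the concurrency
// measured on one sampled queue-proxy pod and the number of ready pods seen
// at scrape time, estimate the concurrency of the whole revision.
func extrapolateRevConcurrency(samplePodConcurrency float64, readyPods int) float64 {
	return samplePodConcurrency * float64(readyPods)
}

func main() {
	// One sampled pod handling 4 concurrent requests, 5 pods ready in total.
	fmt.Println(extrapolateRevConcurrency(4, 5)) // 20
}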
@@ -75,21 +80,56 @@ func (b statsBucket) add(stat *Stat) {
b[stat.PodName] = append(b[stat.PodName], stat)
}

// concurrency calculates the overall concurrency as measured by this
// concurrency calculates the overall revision concurrency as measured by this
// bucket. All stats that belong to the same pod will be averaged.
// The overall concurrency is the sum of the measured concurrency of all
// pods (including activator metrics).
// The overall revision concurrency is the average revision concurrency
// reported by the sampled queue proxies plus the sum of the measured
// concurrency of all activator pods. This is because the autoscaler receives
// stats from every activator, but only from a sample of the user pods (queue
// proxies); each queue proxy stat therefore carries a revision concurrency
// estimated from its own pod concurrency and the ready-pod count at that time.
func (b statsBucket) concurrency() float64 {
var total float64
for _, podStats := range b {
var subtotal float64
for _, stat := range podStats {
subtotal += stat.AverageConcurrentRequests
var (
activatorPodTotal float64
averageQueueProxyRevTotal float64
accumulatedQueueProxyRevTotal float64
queueProxyCount int
)
for podName, podStats := range b {
if isActivator(podName) {
activatorPodTotal += averagePodConcurrency(podStats)
} else {
accumulatedQueueProxyRevTotal += averageRevConcurrency(podStats)
queueProxyCount++
}
total += subtotal / float64(len(podStats))
}

return total
if queueProxyCount != 0 {
averageQueueProxyRevTotal = accumulatedQueueProxyRevTotal / float64(queueProxyCount)
}
return averageQueueProxyRevTotal + activatorPodTotal
}

// averagePodConcurrency calculates the average of AverageConcurrentRequests
// for the given array of Stat points. The points in the array MUST NOT be nil
// and the array MUST NOT be empty.
func averagePodConcurrency(podStats []*Stat) float64 {
total := 0.0
for _, stat := range podStats {
total += stat.AverageConcurrentRequests
}
return total / float64(len(podStats))
}

// averageRevConcurrency calculates the average of AverageRevConcurrency
// for the given array of Stat points. The points in the array MUST NOT be nil
// and the array MUST NOT be empty.
func averageRevConcurrency(podStats []*Stat) float64 {
total := 0.0
for _, stat := range podStats {
total += stat.AverageRevConcurrency
}
return total / float64(len(podStats))
}

// Autoscaler stores current state of an instance of an autoscaler
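Not part of the diff: a self-contained toy run of the bucketing arithmetic above, showing how sampled queue-proxy estimates are averaged while activator stats are summed.

package main

import "fmt"

func main() {
	// Two sampled queue-proxy pods each report an estimate of the *whole
	// revision's* concurrency; those estimates are averaged, not summed.
	queueProxyRevEstimates := []float64{10, 14}

	// Activator pods report only their own concurrency; those are summed.
	activatorPodConcurrency := []float64{2, 1}

	sum := func(xs []float64) float64 {
		t := 0.0
		for _, x := range xs {
			t += x
		}
		return t
	}

	revFromQueueProxies := sum(queueProxyRevEstimates) / float64(len(queueProxyRevEstimates)) // 12
	revFromActivators := sum(activatorPodConcurrency)                                         // 3

	// Overall revision concurrency as computed by statsBucket.concurrency().
	fmt.Println(revFromQueueProxies + revFromActivators) // 15
}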
@@ -168,7 +208,7 @@ func (a *Autoscaler) Record(ctx context.Context, stat Stat) {
func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
logger := logging.FromContext(ctx)

readyPods, err := a.readyPods()
readyPodsCount, err := readyPodsCountOfEndpoints(a.endpointsLister, a.namespace, a.revisionService)
if err != nil {
logger.Errorw("Failed to get Endpoints via K8S Lister", zap.Error(err))
return 0, false
@@ -188,8 +228,8 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
target := a.targetConcurrency()
// Desired pod count is observed concurrency of the revision over desired (stable) concurrency per pod.
// The scaling up rate is limited to the MaxScaleUpRate.
desiredStablePodCount := a.podCountLimited(math.Ceil(observedStableConcurrency/target), readyPods)
desiredPanicPodCount := a.podCountLimited(math.Ceil(observedPanicConcurrency/target), readyPods)
desiredStablePodCount := a.podCountLimited(math.Ceil(observedStableConcurrency/target), readyPodsCount)
desiredPanicPodCount := a.podCountLimited(math.Ceil(observedPanicConcurrency/target), readyPodsCount)

a.reporter.ReportStableRequestConcurrency(observedStableConcurrency)
a.reporter.ReportPanicRequestConcurrency(observedPanicConcurrency)
@@ -198,7 +238,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
logger.Debugf("STABLE: Observed average %0.3f concurrency over %v seconds.", observedStableConcurrency, config.StableWindow)
logger.Debugf("PANIC: Observed average %0.3f concurrency over %v seconds.", observedPanicConcurrency, config.PanicWindow)

isOverPanicThreshold := observedPanicConcurrency/readyPods >= target*2
isOverPanicThreshold := observedPanicConcurrency/float64(readyPodsCount) >= target*2

if a.panicTime == nil && isOverPanicThreshold {
// Begin panicking when we cross the concurrency threshold in the panic window.
@@ -217,7 +257,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
a.reporter.ReportPanic(1)
// We do not scale down while in panic mode. Only increases will be applied.
if desiredPanicPodCount > a.maxPanicPods {
logger.Infof("Increasing pods from %v to %v.", readyPods, desiredPanicPodCount)
logger.Infof("Increasing pods from %v to %v.", readyPodsCount, desiredPanicPodCount)
a.panicTime = &now
a.maxPanicPods = desiredPanicPodCount
}
@@ -285,13 +325,20 @@ func (a *Autoscaler) targetConcurrency() float64 {
return a.target
}

func (a *Autoscaler) podCountLimited(desiredPodCount, currentPodCount float64) int32 {
return int32(math.Min(desiredPodCount, a.Current().MaxScaleUpRate*currentPodCount))
func (a *Autoscaler) podCountLimited(desiredPodCount float64, currentPodCount int) int32 {
return int32(math.Min(desiredPodCount, a.Current().MaxScaleUpRate*math.Max(1, float64(currentPodCount))))
}

func isActivator(podName string) bool {
// TODO(#2282): This can cause naming collisions.
return strings.HasPrefix(podName, ActivatorPodName)
}

func (a *Autoscaler) readyPods() (float64, error) {
// readyPodsCountOfEndpoints returns the number of ready IPs in the K8s Endpoints
// object fetched from the given K8s lister for the given namespace and name.
// This is the same as the ready Pod count.
func readyPodsCountOfEndpoints(lister corev1listers.EndpointsLister, ns, name string) (int, error) {
readyPods := 0
endpoints, err := a.endpointsLister.Endpoints(a.namespace).Get(a.revisionService)
endpoints, err := lister.Endpoints(ns).Get(name)
if apierrors.IsNotFound(err) {
// Treat not found as zero endpoints, it either hasn't been created
// or it has been torn down.
@@ -303,6 +350,5 @@ func (a *Autoscaler) readyPods() (float64, error) {
}
}

// Use 1 as minimum for multiplication and division.
return math.Max(1, float64(readyPods)), nil
return readyPods, nil
}
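Not part of the diff: the arithmetic behind the pod-count decision in Scale(), as a toy calculation. With a per-pod target of 5, an observed stable concurrency of 42 asks for ceil(42/5) = 9 pods, and the answer is capped at MaxScaleUpRate times the current ready-pod count, with a floor of 1 so a revision with zero ready pods can still scale up. The constants below are illustrative, not Knative defaults.

package main

import (
	"fmt"
	"math"
)

// podCountLimited mirrors the capping logic in the diff: never scale up by
// more than maxScaleUpRate relative to the current ready pods, treating zero
// ready pods as one so the cap is never zero.
func podCountLimited(desired float64, readyPods int, maxScaleUpRate float64) int32 {
	return int32(math.Min(desired, maxScaleUpRate*math.Max(1, float64(readyPods))))
}

func main() {
	const target = 5.0          // desired concurrency per pod
	const maxScaleUpRate = 10.0 // illustrative config value

	observedStable := 42.0
	readyPods := 3

	desired := math.Ceil(observedStable / target)                     // 9
	fmt.Println(podCountLimited(desired, readyPods, maxScaleUpRate))  // 9, well under the cap of 30

	// With zero ready pods the floor of 1 still allows scaling up to maxScaleUpRate pods.
	fmt.Println(podCountLimited(desired, 0, maxScaleUpRate)) // 9
}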