Scrape queue-proxy metrics in autoscaler #3149

Merged Feb 27, 2019 (49 commits)

Changes from 32 commits

Commits:
a82ed49 refactor where we use rate (yanweiguo, Jan 28, 2019)
2896a0d extract concurrencyPerPod (yanweiguo, Jan 28, 2019)
ad11d78 use approximateZero (yanweiguo, Jan 29, 2019)
de81555 use pods count from Informer (yanweiguo, Jan 30, 2019)
9895328 get pods when record (yanweiguo, Jan 30, 2019)
28ee046 merge master (yanweiguo, Jan 30, 2019)
57527b3 use 1 as min actual pod (yanweiguo, Jan 31, 2019)
23c132b add unit test (yanweiguo, Jan 31, 2019)
10f2552 add unit tests for cmd (yanweiguo, Jan 31, 2019)
29d9b52 lint (yanweiguo, Jan 31, 2019)
43844dd address comments (yanweiguo, Feb 1, 2019)
79842cf remove unuse func (yanweiguo, Feb 1, 2019)
e7672ae short locked scope (yanweiguo, Feb 1, 2019)
efa8787 address comment (yanweiguo, Feb 1, 2019)
63e678d wrap func into func (yanweiguo, Feb 1, 2019)
c50e2cd address comments (yanweiguo, Feb 6, 2019)
f1fc3aa add dot (yanweiguo, Feb 6, 2019)
9149930 change algorithm (yanweiguo, Feb 6, 2019)
6d2ca83 Merge branch 'master' into use_actual_pods (yanweiguo, Feb 6, 2019)
efd64dd revert handle file (yanweiguo, Feb 6, 2019)
a094cf8 do not send websocket metrics (yanweiguo, Feb 6, 2019)
8302310 exclude test coverage check for main files (yanweiguo, Feb 6, 2019)
c074299 remove unuse func (yanweiguo, Feb 6, 2019)
359a1af remove websocket stuff in queue (yanweiguo, Feb 6, 2019)
df2f936 bug fixed (yanweiguo, Feb 7, 2019)
8341498 clean cache in tests (yanweiguo, Feb 7, 2019)
090f573 removed cache (yanweiguo, Feb 7, 2019)
9e3cdef merge (yanweiguo, Feb 7, 2019)
6550436 use informer in scraper (yanweiguo, Feb 7, 2019)
71fc715 add unit tests (yanweiguo, Feb 7, 2019)
556fcb6 merge master (yanweiguo, Feb 8, 2019)
adde505 small fixes (yanweiguo, Feb 9, 2019)
cf31e5c change to global average (yanweiguo, Feb 11, 2019)
1eb36c4 extract a function (yanweiguo, Feb 11, 2019)
c63c1ff merge master (yanweiguo, Feb 11, 2019)
31bb24c solve comments (yanweiguo, Feb 11, 2019)
51658c0 address comments (yanweiguo, Feb 12, 2019)
6d61757 merge master (yanweiguo, Feb 21, 2019)
4e0a593 change to 35 seconds (yanweiguo, Feb 22, 2019)
9a4c449 merge master (yanweiguo, Feb 22, 2019)
7dce271 remove a log (yanweiguo, Feb 22, 2019)
be9ec33 add some comment (yanweiguo, Feb 22, 2019)
2e08244 change a test to cover more (yanweiguo, Feb 23, 2019)
e12b80a hide pods behind scraper (yanweiguo, Feb 25, 2019)
3fe1c33 address comment (yanweiguo, Feb 26, 2019)
1b48541 fix a log (yanweiguo, Feb 26, 2019)
3aa2244 merged master (yanweiguo, Feb 26, 2019)
36a41a4 drop none activator stats (yanweiguo, Feb 27, 2019)
71fcbcd fix the test (yanweiguo, Feb 27, 2019)
12 changes: 9 additions & 3 deletions cmd/autoscaler/main.go
@@ -80,6 +80,7 @@ func main() {

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
statsCh := make(chan *autoscaler.StatMessage, statsBufferLen)

cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig)
if err != nil {
@@ -139,7 +140,8 @@ func main() {
hpaInformer := kubeInformerFactory.Autoscaling().V1().HorizontalPodAutoscalers()

// uniScalerFactory depends on endpointsInformer being set.
multiScaler := autoscaler.NewMultiScaler(dynConfig, stopCh, uniScalerFactoryFunc(endpointsInformer), logger)
multiScaler := autoscaler.NewMultiScaler(
dynConfig, stopCh, statsCh, uniScalerFactoryFunc(endpointsInformer), statsScraperFactoryFunc(endpointsInformer), logger)
kpaScaler := kpa.NewKPAScaler(servingClientSet, scaleClient, logger, configMapWatcher)
kpaCtl := kpa.NewController(&opt, paInformer, endpointsInformer, multiScaler, kpaScaler, dynConfig)
hpaCtl := hpa.NewController(&opt, paInformer, hpaInformer)
@@ -171,8 +173,6 @@ func main() {
return hpaCtl.Run(controllerThreads, stopCh)
})

statsCh := make(chan *autoscaler.StatMessage, statsBufferLen)

statsServer := statserver.New(statsServerAddr, statsCh, logger)
eg.Go(func() error {
return statsServer.ListenAndServe()
@@ -237,6 +237,12 @@ func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer) f
}
}

func statsScraperFactoryFunc(endpointsInformer corev1informers.EndpointsInformer) func(metric *autoscaler.Metric, config *autoscaler.DynamicConfig) (autoscaler.StatsScraper, error) {
return func(metric *autoscaler.Metric, config *autoscaler.DynamicConfig) (autoscaler.StatsScraper, error) {
return autoscaler.NewServiceScraper(metric, config, endpointsInformer)
}
}

func labelValueOrEmpty(metric *autoscaler.Metric, labelKey string) string {
if metric.Labels != nil {
if value, ok := metric.Labels[labelKey]; ok {
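Taken together, the cmd/autoscaler changes move the statsCh construction ahead of the MultiScaler so that push-based stats (from the websocket stats server) and pull-based stats (from the scrapers created by statsScraperFactoryFunc) can land on the same channel. The snippet below is a hypothetical sketch of what such a per-revision scrape loop could look like; the StatsScraper interface, StatMessage type, and the one-second scrape period are simplified stand-ins, not the actual knative definitions.

package stats

import "time"

// StatMessage and StatsScraper are simplified stand-ins for the real
// autoscaler types; only the shape needed by the loop is shown.
type StatMessage struct {
	Key string // namespace/name of the revision the stats belong to
}

type StatsScraper interface {
	Scrape() (*StatMessage, error)
}

// scrapeLoop polls the scraper on every tick and forwards samples to the
// shared stats channel until stopCh is closed.
func scrapeLoop(scraper StatsScraper, statsCh chan<- *StatMessage, stopCh <-chan struct{}) {
	ticker := time.NewTicker(time.Second) // assumed scrape period
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			sm, err := scraper.Scrape()
			if err != nil || sm == nil {
				continue // tolerate a missed sample; the next tick retries
			}
			statsCh <- sm
		case <-stopCh:
			return
		}
	}
}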
45 changes: 2 additions & 43 deletions cmd/queue/main.go
@@ -18,7 +18,6 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"net/http"
@@ -31,7 +30,6 @@ import (
"github.com/knative/pkg/signals"

"github.com/knative/pkg/logging/logkey"
"github.com/knative/pkg/websocket"
"github.com/knative/serving/cmd/util"
activatorutil "github.com/knative/serving/pkg/activator/util"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
@@ -40,7 +38,6 @@ import (
"github.com/knative/serving/pkg/logging"
"github.com/knative/serving/pkg/queue"
"github.com/knative/serving/pkg/queue/health"
"github.com/knative/serving/pkg/utils"
"go.opencensus.io/exporter/prometheus"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
@@ -61,10 +58,6 @@ const (
// from its configuration and propagate that to all istio-proxies
// in the mesh.
quitSleepDuration = 20 * time.Second

// Only report errors about a non-existent websocket connection after
// having been up and running for this long.
startupConnectionGrace = 10 * time.Second
)

var (
@@ -82,7 +75,6 @@
revisionTimeoutSeconds int
statChan = make(chan *autoscaler.Stat, statReportingQueueLength)
reqChan = make(chan queue.ReqEvent, requestCountingQueueLength)
statSink *websocket.ManagedConnection
logger *zap.SugaredLogger
breaker *queue.Breaker

@@ -121,35 +113,12 @@ func initEnv() {
func statReporter() {
for {
s := <-statChan
if err := sendStat(s); err != nil {
// Hide "not-established" errors until the startupConnectionGrace has passed.
if err != websocket.ErrConnectionNotEstablished || time.Since(startupTime) > startupConnectionGrace {
logger.Errorw("Error while sending stat", zap.Error(err))
}
if err := reporter.Report(float64(s.RequestCount), s.AverageConcurrentRequests); err != nil {
logger.Errorw("Error while sending stat", zap.Error(err))
}
}
}

// sendStat sends a single StatMessage to the autoscaler.
func sendStat(s *autoscaler.Stat) error {
if statSink == nil {
return errors.New("stat sink not (yet) connected")
}
reporter.Report(
float64(s.RequestCount),
float64(s.AverageConcurrentRequests),
)
if healthState.IsShuttingDown() {
// Do not send metrics if the pods is shutting down.
return nil
}
sm := autoscaler.StatMessage{
Stat: *s,
Key: servingRevisionKey,
}
return statSink.Send(sm)
}

func proxyForRequest(req *http.Request) *httputil.ReverseProxy {
if req.ProtoMajor == 2 {
return h2cProxy
@@ -277,10 +246,6 @@ func main() {
http.ListenAndServe(fmt.Sprintf(":%d", v1alpha1.RequestQueueMetricsPort), mux)
}()

// Open a websocket connection to the autoscaler
autoscalerEndpoint := fmt.Sprintf("ws://%s.%s.svc.%s:%d", servingAutoscaler, autoscalerNamespace, utils.GetClusterDomainName(), servingAutoscalerPort)
logger.Infof("Connecting to autoscaler at %s", autoscalerEndpoint)
statSink = websocket.NewDurableSendingConnection(autoscalerEndpoint)
go statReporter()

reportTicker := time.NewTicker(queue.ReporterReportingPeriod).C
@@ -328,11 +293,5 @@ func main() {
if err := adminServer.Shutdown(context.Background()); err != nil {
logger.Errorw("Failed to shutdown admin-server", zap.Error(err))
}

if statSink != nil {
if err := statSink.Close(); err != nil {
logger.Errorw("Failed to shutdown websocket connection", zap.Error(err))
}
}
}
}
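With the websocket sink gone, cmd/queue/main.go no longer pushes StatMessages to the autoscaler at all: statReporter only records the request count and average concurrency with the Prometheus reporter, and the autoscaler scrapes those values from the queue-proxy's metrics port. Below is a minimal sketch of that exposition side, assuming the same opencensus Prometheus exporter the file already imports; the port and namespace are illustrative, and the real code uses v1alpha1.RequestQueueMetricsPort.

package main

import (
	"fmt"
	"log"
	"net/http"

	"go.opencensus.io/exporter/prometheus"
	"go.opencensus.io/stats/view"
)

func main() {
	// Register a Prometheus exporter so recorded opencensus views become
	// scrapeable metrics.
	exporter, err := prometheus.NewExporter(prometheus.Options{Namespace: "queue"})
	if err != nil {
		log.Fatal("failed to create the Prometheus exporter: ", err)
	}
	view.RegisterExporter(exporter)

	// The exporter doubles as an http.Handler serving the metrics endpoint.
	mux := http.NewServeMux()
	mux.Handle("/metrics", exporter)
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", 9090), mux))
}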
135 changes: 52 additions & 83 deletions pkg/autoscaler/autoscaler.go
@@ -36,14 +36,6 @@ const (
// as defined in the metrics it sends.
ActivatorPodName string = "activator"

// If the latest received stat from a pod is in the last activeThreshold duration,
// assume the pod is still active. Otherwise, the active status of a pod is
// unknown.
activeThreshold time.Duration = time.Second

// Activator pod weight is always 1
activatorPodWeight float64 = 1

approximateZero = 1e-8
)

@@ -65,6 +57,10 @@ type Stat struct {
// Lameduck indicates this Pod has received a shutdown signal.
// Deprecated and no longer used by newly created Pods.
LameDuck bool

// Average number of requests currently being handled by all ready pods of a
// revision.
AverageRevConcurrency float64
}

// StatMessage wraps a Stat with identifying information so it can be routed
@@ -113,92 +109,68 @@ func (agg *totalAggregation) aggregate(stat Stat) {
agg.probeCount++
}

// The number of pods that are observable via stats
// Subtracts the activator pod if its not the only pod reporting stats
func (agg *totalAggregation) observedPods(now time.Time) float64 {
podCount := float64(0.0)
for _, pod := range agg.perPodAggregations {
podCount += pod.podWeight(now)
}

activatorsCount := len(agg.activatorsContained)
// Discount the activators in the pod count.
if activatorsCount > 0 {
discountedPodCount := podCount - float64(activatorsCount)
// Report a minimum of 1 pod if the activators are sending metrics.
if discountedPodCount < 1.0 {
return 1.0
}
return discountedPodCount
}
return podCount
}

// The observed concurrency of a revision (sum of all average concurrencies of
// the observed pods)
// Ignores activator sent metrics if its not the only pod reporting stats
func (agg *totalAggregation) observedConcurrency(now time.Time) float64 {
accumulatedConcurrency := float64(0)
// The observed concurrency of a revision and the observed concurrency per pod.
// Ignores activator-sent metrics if the activator is not the only pod reporting stats.
func (agg *totalAggregation) observedConcurrency() (float64, float64) {
accumulatedPodConcurrency := float64(0)
accumulatedRevConcurrency := float64(0)
activatorConcurrency := float64(0)
for podName, perPod := range agg.perPodAggregations {
if isActivator(podName) {
activatorConcurrency += perPod.calculateAverage(now)
samplePodCount := 0
for _, perPod := range agg.perPodAggregations {
if perPod.isActivator {
activatorConcurrency += perPod.averagePodConcurrency()
} else {
accumulatedConcurrency += perPod.calculateAverage(now)
accumulatedPodConcurrency += perPod.averagePodConcurrency()
accumulatedRevConcurrency += perPod.averageRevConcurrency()
samplePodCount++
}
}
if accumulatedConcurrency < approximateZero {
return activatorConcurrency
if samplePodCount != 0 {
accumulatedPodConcurrency = accumulatedPodConcurrency / float64(samplePodCount)
accumulatedRevConcurrency = accumulatedRevConcurrency / float64(samplePodCount)
}
return accumulatedConcurrency
}

// The observed concurrency per pod (sum of all average concurrencies
// distributed over the observed pods)
// Ignores activator sent metrics if its not the only pod reporting stats
func (agg *totalAggregation) observedConcurrencyPerPod(now time.Time) float64 {
return divide(agg.observedConcurrency(now), agg.observedPods(now))
if accumulatedPodConcurrency < approximateZero {
// Activator is the only pod reporting stats.
return activatorConcurrency, activatorConcurrency
}
return accumulatedRevConcurrency, accumulatedPodConcurrency
}

// Holds an aggregation per pod
type perPodAggregation struct {
accumulatedConcurrency float64
probeCount int32
window time.Duration
latestStatTime *time.Time
isActivator bool
accumulatedPodConcurrency float64
accumulatedRevConcurrency float64
probeCount int32
window time.Duration
latestStatTime *time.Time
isActivator bool
}

// Aggregates the given concurrency
func (agg *perPodAggregation) aggregate(stat Stat) {
agg.accumulatedConcurrency += stat.AverageConcurrentRequests
agg.accumulatedPodConcurrency += stat.AverageConcurrentRequests
agg.accumulatedRevConcurrency += stat.AverageRevConcurrency
agg.probeCount++
if agg.latestStatTime == nil || agg.latestStatTime.Before(*stat.Time) {
agg.latestStatTime = stat.Time
}
}

// Calculates the average concurrency over all values given
func (agg *perPodAggregation) calculateAverage(now time.Time) float64 {
// Calculates the average concurrency at the pod level over all values given.
func (agg *perPodAggregation) averagePodConcurrency() float64 {
if agg.probeCount == 0 {
return 0.0
}
return agg.accumulatedConcurrency / float64(agg.probeCount) * agg.podWeight(now)
return agg.accumulatedPodConcurrency / float64(agg.probeCount)
}

// Calculates the pod weight. Assuming the latest stat time is the point when
// pod became out of service.
func (agg *perPodAggregation) podWeight(now time.Time) float64 {
if agg.isActivator {
return activatorPodWeight
}

gapToNow := now.Sub(*agg.latestStatTime)
// Less than activeThreshold means the pod is active, give 1 weight
if gapToNow <= activeThreshold {
return 1.0
// Calculates the average concurrency at the revision level over all values given.
func (agg *perPodAggregation) averageRevConcurrency() float64 {
if agg.probeCount == 0 {
return 0.0
}
return 1.0 - (float64(gapToNow) / float64(agg.window))
return agg.accumulatedRevConcurrency / float64(agg.probeCount)
}

// Autoscaler stores current state of an instance of an autoscaler
Expand Down Expand Up @@ -283,13 +255,15 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
config := a.Current()

stableData, panicData, lastStat := a.aggregateData(now, config.StableWindow, config.PanicWindow)
observedStablePods := stableData.observedPods(now)
// Do nothing when we have no data.
if observedStablePods < 1.0 {
if stableData.probeCount < 1 {
logger.Debug("No data to scale on.")
return 0, false
}

observedStableConcurrency, observedStableConcurrencyPerPod := stableData.observedConcurrency()
observedPanicConcurrency, observedPanicConcurrencyPerPod := panicData.observedConcurrency()

// Log system totals
totalCurrentQPS := int32(0)
totalCurrentConcurrency := float64(0)
@@ -299,27 +273,20 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
}
logger.Debugf("Current QPS: %v Current concurrent clients: %v", totalCurrentQPS, totalCurrentConcurrency)

observedPanicPods := panicData.observedPods(now)
observedStableConcurrency := stableData.observedConcurrency(now)
observedPanicConcurrency := panicData.observedConcurrency(now)
observedStableConcurrencyPerPod := stableData.observedConcurrencyPerPod(now)
observedPanicConcurrencyPerPod := panicData.observedConcurrencyPerPod(now)

target := a.targetConcurrency()
// Desired pod count is the observed revision concurrency over the desired (stable) concurrency per pod.
// The scale-up rate is limited to MaxScaleUpRate.
desiredStablePodCount := a.podCountLimited(observedStableConcurrency/target, readyPods)
desiredPanicPodCount := a.podCountLimited(observedPanicConcurrency/target, readyPods)

a.reporter.ReportObservedPodCount(observedStablePods)
a.reporter.ReportStableRequestConcurrency(observedStableConcurrencyPerPod)
a.reporter.ReportPanicRequestConcurrency(observedPanicConcurrencyPerPod)
a.reporter.ReportTargetRequestConcurrency(target)

logger.Debugf("STABLE: Observed average %0.3f concurrency over %v seconds over %v samples over %v pods.",
observedStableConcurrencyPerPod, config.StableWindow, stableData.probeCount, observedStablePods)
logger.Debugf("PANIC: Observed average %0.3f concurrency over %v seconds over %v samples over %v pods.",
observedPanicConcurrencyPerPod, config.PanicWindow, panicData.probeCount, observedPanicPods)
logger.Debugf("STABLE: Observed average %0.3f concurrency over %v seconds over %v samples.",
observedStableConcurrencyPerPod, config.StableWindow, stableData.probeCount)
logger.Debugf("PANIC: Observed average %0.3f concurrency over %v seconds over %v samples.",
observedPanicConcurrencyPerPod, config.PanicWindow, panicData.probeCount)

// Stop panicking after the surge has made its way into the stable metric.
if a.panicking && a.panicTime.Add(config.StableWindow).Before(now) {
@@ -330,7 +297,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
}

// Begin panicking when we cross the 6 second concurrency threshold.
if !a.panicking && observedPanicPods > 0.0 && observedPanicConcurrencyPerPod >= (target*2) {
if !a.panicking && panicData.probeCount > 0 && observedPanicConcurrencyPerPod >= (target*2) {
logger.Info("PANICKING")
a.panicking = true
a.panicTime = &now
@@ -342,7 +309,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
logger.Debug("Operating in panic mode.")
a.reporter.ReportPanic(1)
if desiredPanicPodCount > a.maxPanicPods {
logger.Infof("Increasing pods from %v to %v.", observedPanicPods, int(desiredPanicPodCount))
logger.Infof("Increasing pods from %v to %v.", readyPods, int(desiredPanicPodCount))
a.panicTime = &now
a.maxPanicPods = desiredPanicPodCount
}
@@ -404,6 +371,8 @@ func (a *Autoscaler) podCountLimited(desiredPodCount, currentPodCount float64) f
return math.Min(desiredPodCount, a.Current().MaxScaleUpRate*currentPodCount)
}

// readyPods returns the ready IP count in the K8s Endpoints object for a Revision,
// as obtained via the K8s Informer. This is the same as the ready Pod count.
func (a *Autoscaler) readyPods() (float64, error) {
readyPods := 0
endpoints, err := a.endpointsLister.Endpoints(a.namespace).Get(a.revisionService)
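The pkg/autoscaler rewrite replaces the time-decayed podWeight/observedPods estimate with plain averages over the pods that actually reported: observedConcurrency now returns both the revision-level and per-pod averages, and the ready pod count comes from the Endpoints informer. The shape of the scale decision is unchanged: desired pods are the observed concurrency over the per-pod target, clamped to MaxScaleUpRate times the current pod count. A small illustrative sketch of that arithmetic follows; all numbers are made up.

package main

import (
	"fmt"
	"math"
)

// podCountLimited mirrors the helper above: never scale up by more than
// maxScaleUpRate times the current pod count in a single decision.
func podCountLimited(desired, current, maxScaleUpRate float64) float64 {
	return math.Min(desired, maxScaleUpRate*current)
}

func main() {
	observedRevConcurrency := 60.0 // average concurrency across the revision
	target := 10.0                 // desired concurrency per pod
	readyPods := 4.0               // ready IPs from the Endpoints informer
	maxScaleUpRate := 2.0          // at most double the pods per decision

	desired := podCountLimited(observedRevConcurrency/target, readyPods, maxScaleUpRate)
	fmt.Printf("desired pods: %v\n", desired) // min(60/10, 2*4) = 6
}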