diff --git a/cmd/autoscaler/main.go b/cmd/autoscaler/main.go index 8226e62685dc..f206ff0f34e6 100644 --- a/cmd/autoscaler/main.go +++ b/cmd/autoscaler/main.go @@ -80,6 +80,7 @@ func main() { // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() + statsCh := make(chan *autoscaler.StatMessage, statsBufferLen) cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig) if err != nil { @@ -139,7 +140,8 @@ func main() { hpaInformer := kubeInformerFactory.Autoscaling().V1().HorizontalPodAutoscalers() // uniScalerFactory depends endpointsInformer to be set. - multiScaler := autoscaler.NewMultiScaler(dynConfig, stopCh, uniScalerFactoryFunc(endpointsInformer), logger) + multiScaler := autoscaler.NewMultiScaler( + dynConfig, stopCh, statsCh, uniScalerFactoryFunc(endpointsInformer), statsScraperFactoryFunc(endpointsInformer), logger) kpaScaler := kpa.NewKPAScaler(servingClientSet, scaleClient, logger, configMapWatcher) kpaCtl := kpa.NewController(&opt, paInformer, endpointsInformer, multiScaler, kpaScaler, dynConfig) hpaCtl := hpa.NewController(&opt, paInformer, hpaInformer) @@ -171,8 +173,6 @@ func main() { return hpaCtl.Run(controllerThreads, stopCh) }) - statsCh := make(chan *autoscaler.StatMessage, statsBufferLen) - statsServer := statserver.New(statsServerAddr, statsCh, logger) eg.Go(func() error { return statsServer.ListenAndServe() @@ -237,6 +237,12 @@ func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer) f } } +func statsScraperFactoryFunc(endpointsInformer corev1informers.EndpointsInformer) func(metric *autoscaler.Metric, config *autoscaler.DynamicConfig) (autoscaler.StatsScraper, error) { + return func(metric *autoscaler.Metric, config *autoscaler.DynamicConfig) (autoscaler.StatsScraper, error) { + return autoscaler.NewServiceScraper(metric, config, endpointsInformer) + } +} + func labelValueOrEmpty(metric *autoscaler.Metric, labelKey string) string { if metric.Labels != nil { if value, ok := metric.Labels[labelKey]; ok { diff --git a/cmd/queue/main.go b/cmd/queue/main.go index ea747db2fba0..69a65fb0cd9b 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -18,7 +18,6 @@ package main import ( "context" - "errors" "flag" "fmt" "net/http" @@ -31,7 +30,6 @@ import ( "github.com/knative/pkg/signals" "github.com/knative/pkg/logging/logkey" - "github.com/knative/pkg/websocket" "github.com/knative/serving/cmd/util" activatorutil "github.com/knative/serving/pkg/activator/util" "github.com/knative/serving/pkg/apis/serving/v1alpha1" @@ -41,7 +39,6 @@ import ( "github.com/knative/serving/pkg/network" "github.com/knative/serving/pkg/queue" "github.com/knative/serving/pkg/queue/health" - "github.com/knative/serving/pkg/utils" "go.opencensus.io/exporter/prometheus" "go.opencensus.io/stats/view" "go.uber.org/zap" @@ -62,10 +59,6 @@ const ( // from its configuration and propagate that to all istio-proxies // in the mesh. quitSleepDuration = 20 * time.Second - - // Only report errors about a non-existent websocket connection after - // having been up and running for this long. 
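The statsScraperFactoryFunc added in cmd/autoscaler/main.go above follows the same closure pattern as uniScalerFactoryFunc: main closes over the shared EndpointsInformer so the autoscaler package can build scrapers without knowing about informer wiring. Below is a minimal, self-contained sketch of that pattern; the types are simplified stand-ins, not the real Knative ones.

```go
package main

import "fmt"

// Simplified stand-ins for the real autoscaler types; only the shape of the
// factory pattern matters here.
type Metric struct{ Namespace, Name string }
type DynamicConfig struct{}

type StatsScraper interface {
	Scrape()
}

// endpointsInformer stands in for the shared corev1 EndpointsInformer.
type endpointsInformer struct{}

type serviceScraper struct {
	informer *endpointsInformer
	key      string
}

func (s *serviceScraper) Scrape() { fmt.Println("scraping", s.key) }

// statsScraperFactory returns a factory closed over the informer, so callers
// only need a Metric and a DynamicConfig to obtain a scraper.
func statsScraperFactory(informer *endpointsInformer) func(*Metric, *DynamicConfig) (StatsScraper, error) {
	return func(m *Metric, _ *DynamicConfig) (StatsScraper, error) {
		return &serviceScraper{informer: informer, key: m.Namespace + "/" + m.Name}, nil
	}
}

func main() {
	factory := statsScraperFactory(&endpointsInformer{})
	scraper, _ := factory(&Metric{Namespace: "default", Name: "my-revision"}, &DynamicConfig{})
	scraper.Scrape()
}
```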
- startupConnectionGrace = 10 * time.Second ) var ( @@ -83,7 +76,6 @@ var ( revisionTimeoutSeconds int statChan = make(chan *autoscaler.Stat, statReportingQueueLength) reqChan = make(chan queue.ReqEvent, requestCountingQueueLength) - statSink *websocket.ManagedConnection logger *zap.SugaredLogger breaker *queue.Breaker @@ -119,38 +111,15 @@ func initEnv() { reporter = _reporter } -func statReporter() { +func reportStats() { for { s := <-statChan - if err := sendStat(s); err != nil { - // Hide "not-established" errors until the startupConnectionGrace has passed. - if err != websocket.ErrConnectionNotEstablished || time.Since(startupTime) > startupConnectionGrace { - logger.Errorw("Error while sending stat", zap.Error(err)) - } + if err := reporter.Report(float64(s.RequestCount), s.AverageConcurrentRequests); err != nil { + logger.Errorw("Error while sending stat", zap.Error(err)) } } } -// sendStat sends a single StatMessage to the autoscaler. -func sendStat(s *autoscaler.Stat) error { - if statSink == nil { - return errors.New("stat sink not (yet) connected") - } - reporter.Report( - float64(s.RequestCount), - float64(s.AverageConcurrentRequests), - ) - if healthState.IsShuttingDown() { - // Do not send metrics if the pods is shutting down. - return nil - } - sm := autoscaler.StatMessage{ - Stat: *s, - Key: servingRevisionKey, - } - return statSink.Send(sm) -} - func proxyForRequest(req *http.Request) *httputil.ReverseProxy { if req.ProtoMajor == 2 { return h2cProxy @@ -296,11 +265,7 @@ func main() { http.ListenAndServe(fmt.Sprintf(":%d", v1alpha1.RequestQueueMetricsPort), mux) }() - // Open a websocket connection to the autoscaler - autoscalerEndpoint := fmt.Sprintf("ws://%s.%s.svc.%s:%d", servingAutoscaler, autoscalerNamespace, utils.GetClusterDomainName(), servingAutoscalerPort) - logger.Infof("Connecting to autoscaler at %s", autoscalerEndpoint) - statSink = websocket.NewDurableSendingConnection(autoscalerEndpoint, logger) - go statReporter() + go reportStats() reportTicker := time.NewTicker(queue.ReporterReportingPeriod).C queue.NewStats(podName, queue.Channels{ @@ -347,11 +312,5 @@ func main() { if err := adminServer.Shutdown(context.Background()); err != nil { logger.Errorw("Failed to shutdown admin-server", zap.Error(err)) } - - if statSink != nil { - if err := statSink.Shutdown(); err != nil { - logger.Errorw("Failed to shutdown websocket connection", zap.Error(err)) - } - } } } diff --git a/pkg/autoscaler/autoscaler.go b/pkg/autoscaler/autoscaler.go index 18a1ca769935..1294e300587b 100644 --- a/pkg/autoscaler/autoscaler.go +++ b/pkg/autoscaler/autoscaler.go @@ -168,11 +168,13 @@ func (a *Autoscaler) Record(ctx context.Context, stat Stat) { func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) { logger := logging.FromContext(ctx) - readyPods, err := a.readyPods() + originalReadyPodsCount, err := readyPodsCountOfEndpoints(a.endpointsLister, a.namespace, a.revisionService) if err != nil { logger.Errorw("Failed to get Endpoints via K8S Lister", zap.Error(err)) return 0, false } + // Use 1 if there are zero current pods. + readyPodsCount := math.Max(1, float64(originalReadyPodsCount)) config := a.Current() @@ -188,8 +190,8 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) { target := a.targetConcurrency() // Desired pod count is observed concurrency of the revision over desired (stable) concurrency per pod. // The scaling up rate is limited to the MaxScaleUpRate. 
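The Scale hunk above is also where the floor of one ready pod now lives, feeding the desired-pod-count calculation in the lines that follow. A small worked sketch of that math with made-up numbers (the function below is a simplified stand-in for the Autoscaler method):

```go
package main

import (
	"fmt"
	"math"
)

// podCountLimited mirrors the capping logic: never scale up faster than
// MaxScaleUpRate times the current ready-pod count.
func podCountLimited(desired, current, maxScaleUpRate float64) int32 {
	return int32(math.Min(desired, maxScaleUpRate*current))
}

func main() {
	const (
		target         = 10.0 // desired concurrency per pod
		maxScaleUpRate = 10.0
	)
	observedConcurrency := 120.0     // observed across the whole revision
	readyPodsCount := math.Max(1, 3) // floor of 1, as in the hunk above

	desired := podCountLimited(math.Ceil(observedConcurrency/target), readyPodsCount, maxScaleUpRate)
	fmt.Println(desired) // 12: ceil(120/10), well under the cap of 10*3
}
```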
- desiredStablePodCount := a.podCountLimited(math.Ceil(observedStableConcurrency/target), readyPods) - desiredPanicPodCount := a.podCountLimited(math.Ceil(observedPanicConcurrency/target), readyPods) + desiredStablePodCount := a.podCountLimited(math.Ceil(observedStableConcurrency/target), readyPodsCount) + desiredPanicPodCount := a.podCountLimited(math.Ceil(observedPanicConcurrency/target), readyPodsCount) a.reporter.ReportStableRequestConcurrency(observedStableConcurrency) a.reporter.ReportPanicRequestConcurrency(observedPanicConcurrency) @@ -198,7 +200,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) { logger.Debugf("STABLE: Observed average %0.3f concurrency over %v seconds.", observedStableConcurrency, config.StableWindow) logger.Debugf("PANIC: Observed average %0.3f concurrency over %v seconds.", observedPanicConcurrency, config.PanicWindow) - isOverPanicThreshold := observedPanicConcurrency/readyPods >= target*2 + isOverPanicThreshold := observedPanicConcurrency/readyPodsCount >= target*2 if a.panicTime == nil && isOverPanicThreshold { // Begin panicking when we cross the concurrency threshold in the panic window. @@ -217,7 +219,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) { a.reporter.ReportPanic(1) // We do not scale down while in panic mode. Only increases will be applied. if desiredPanicPodCount > a.maxPanicPods { - logger.Infof("Increasing pods from %v to %v.", readyPods, desiredPanicPodCount) + logger.Infof("Increasing pods from %v to %v.", originalReadyPodsCount, desiredPanicPodCount) a.panicTime = &now a.maxPanicPods = desiredPanicPodCount } @@ -289,9 +291,11 @@ func (a *Autoscaler) podCountLimited(desiredPodCount, currentPodCount float64) i return int32(math.Min(desiredPodCount, a.Current().MaxScaleUpRate*currentPodCount)) } -func (a *Autoscaler) readyPods() (float64, error) { +// readyPodsCountOfEndpoints returns the ready IP count in the K8S Endpoints object returned by +// the given K8S Informer with given namespace and name. This is same as ready Pod count. +func readyPodsCountOfEndpoints(lister corev1listers.EndpointsLister, ns, name string) (int, error) { readyPods := 0 - endpoints, err := a.endpointsLister.Endpoints(a.namespace).Get(a.revisionService) + endpoints, err := lister.Endpoints(ns).Get(name) if apierrors.IsNotFound(err) { // Treat not found as zero endpoints, it either hasn't been created // or it has been torn down. @@ -303,6 +307,5 @@ func (a *Autoscaler) readyPods() (float64, error) { } } - // Use 1 as minimum for multiplication and division. 
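readyPodsCountOfEndpoints fetches the Endpoints object through the lister; the elided body presumably sums the ready addresses across its subsets, with NotFound treated as zero. A standalone sketch of that counting against the Kubernetes API types (the helper name is illustrative):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// readyAddressCount counts ready IPs across all subsets of an Endpoints
// object; in the real function the object comes from the EndpointsLister and
// a NotFound error is treated as zero pods.
func readyAddressCount(ep *corev1.Endpoints) int {
	count := 0
	for _, subset := range ep.Subsets {
		count += len(subset.Addresses)
	}
	return count
}

func main() {
	ep := &corev1.Endpoints{
		Subsets: []corev1.EndpointSubset{
			{Addresses: []corev1.EndpointAddress{{IP: "10.0.0.1"}, {IP: "10.0.0.2"}}},
		},
	}
	fmt.Println(readyAddressCount(ep)) // 2
}
```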
- return math.Max(1, float64(readyPods)), nil + return readyPods, nil } diff --git a/pkg/autoscaler/autoscaler_test.go b/pkg/autoscaler/autoscaler_test.go index 2f34af6e38a1..6adce0f42f17 100644 --- a/pkg/autoscaler/autoscaler_test.go +++ b/pkg/autoscaler/autoscaler_test.go @@ -307,29 +307,6 @@ func TestAutoscaler_PanicThenUnPanic_ScaleDown(t *testing.T) { a.expectScale(t, now, 10, true) // back to stable mode } -func TestAutoscaler_PodsAreWeightedBasedOnLatestStatTime(t *testing.T) { - a := newTestAutoscaler(10.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 10, - endConcurrency: 10, - duration: 30 * time.Second, - podCount: 10, - }) - now = a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 0, - endConcurrency: 0, - duration: 30 * time.Second, - podCount: 10, - }) - a.expectScale(t, now, 5, true) // 10 pods lameducked half the time count for 5 -} - func TestAutoscaler_Activator_CausesInstantScale(t *testing.T) { a := newTestAutoscaler(10.0) diff --git a/pkg/autoscaler/multiscaler.go b/pkg/autoscaler/multiscaler.go index ce939b767eeb..d5b6e40e3730 100644 --- a/pkg/autoscaler/multiscaler.go +++ b/pkg/autoscaler/multiscaler.go @@ -35,6 +35,11 @@ const ( // seconds while an http request is taking the full timeout of 5 // second. scaleBufferSize = 10 + + // scrapeTickInterval is the interval of time between scraping metrics across + // all pods of a revision. + // TODO(yanweiguo): tuning this value. To be based on pod population? + scrapeTickInterval = time.Second / 3 ) // Metric is a resource which observes the request load of a Revision and @@ -72,6 +77,9 @@ type UniScaler interface { // UniScalerFactory creates a UniScaler for a given PA using the given dynamic configuration. type UniScalerFactory func(*Metric, *DynamicConfig) (UniScaler, error) +// StatsScraperFactory creates a StatsScraper for a given PA using the given dynamic configuration. +type StatsScraperFactory func(*Metric, *DynamicConfig) (StatsScraper, error) + // scalerRunner wraps a UniScaler and a channel for implementing shutdown behavior. type scalerRunner struct { scaler UniScaler @@ -110,10 +118,12 @@ type MultiScaler struct { scalers map[string]*scalerRunner scalersMutex sync.RWMutex scalersStopCh <-chan struct{} + statsCh chan<- *StatMessage dynConfig *DynamicConfig - uniScalerFactory UniScalerFactory + uniScalerFactory UniScalerFactory + statsScraperFactory StatsScraperFactory logger *zap.SugaredLogger @@ -122,14 +132,22 @@ type MultiScaler struct { } // NewMultiScaler constructs a MultiScaler. 
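A scrapeTickInterval of time.Second / 3 means each revision is scraped roughly three times per second through its K8s service, independent of how many pods back that service, which appears to be what the tuning TODO is about. An illustrative back-of-the-envelope calculation (pod counts are examples only):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	scrapeTickInterval := time.Second / 3 // ~333ms, i.e. about 3 scrapes/s per revision

	// Scrapes go through the revision's K8s service, so each tick hits one
	// arbitrary pod; the per-pod probe load therefore shrinks as the revision
	// scales out. Pod counts below are illustrative only.
	for _, pods := range []int{1, 10, 100} {
		perPodPerSecond := float64(time.Second) / float64(scrapeTickInterval) / float64(pods)
		fmt.Printf("%3d pods: ~%.2f scrapes per pod per second\n", pods, perPodPerSecond)
	}
}
```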
-func NewMultiScaler(dynConfig *DynamicConfig, stopCh <-chan struct{}, uniScalerFactory UniScalerFactory, logger *zap.SugaredLogger) *MultiScaler { +func NewMultiScaler( + dynConfig *DynamicConfig, + stopCh <-chan struct{}, + statsCh chan<- *StatMessage, + uniScalerFactory UniScalerFactory, + statsScraperFactory StatsScraperFactory, + logger *zap.SugaredLogger) *MultiScaler { logger.Debugf("Creating MultiScaler with configuration %#v", dynConfig) return &MultiScaler{ - scalers: make(map[string]*scalerRunner), - scalersStopCh: stopCh, - dynConfig: dynConfig, - uniScalerFactory: uniScalerFactory, - logger: logger, + scalers: make(map[string]*scalerRunner), + scalersStopCh: stopCh, + statsCh: statsCh, + dynConfig: dynConfig, + uniScalerFactory: uniScalerFactory, + statsScraperFactory: statsScraperFactory, + logger: logger, } } @@ -266,6 +284,26 @@ func (m *MultiScaler) createScaler(ctx context.Context, metric *Metric) (*scaler } }() + scraper, err := m.statsScraperFactory(metric, m.dynConfig) + if err != nil { + return nil, fmt.Errorf("failed to create a stats scraper for metric %q: %v", metric.Name, err) + } + scraperTicker := time.NewTicker(scrapeTickInterval) + go func() { + for { + select { + case <-m.scalersStopCh: + scraperTicker.Stop() + return + case <-stopCh: + scraperTicker.Stop() + return + case <-scraperTicker.C: + scraper.Scrape(ctx, m.statsCh) + } + } + }() + metricKey := NewMetricKey(metric.Namespace, metric.Name) go func() { for { diff --git a/pkg/autoscaler/multiscaler_test.go b/pkg/autoscaler/multiscaler_test.go index a36e958d736a..e6a41bec00bf 100644 --- a/pkg/autoscaler/multiscaler_test.go +++ b/pkg/autoscaler/multiscaler_test.go @@ -35,6 +35,10 @@ const ( tickTimeout = 50 * time.Millisecond ) +var testStatMessage = StatMessage{ + Key: testKPAKey, +} + // watchFunc generates a function to assert the changes happening in the multiscaler. func watchFunc(ctx context.Context, ms *MultiScaler, metric *Metric, desiredScale int, errCh chan error) func(key string) { metricKey := fmt.Sprintf("%s/%s", metric.Namespace, metric.Name) @@ -80,12 +84,24 @@ func verifyNoTick(errCh chan error) error { } } +// verifyTick verifies that we get a tick in a certain amount of time. +func verifyStatMessageTick(statsCh chan *StatMessage) error { + select { + case <-statsCh: + // We got the StatMessage! 
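The goroutine added to createScaler above drives each revision's scraper off a ticker and stops on either the shared stop channel or the per-scaler one. A self-contained sketch of that pattern using simplified stand-in types:

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for StatMessage and StatsScraper.
type statMessage struct{ Key string }

type scraper interface {
	Scrape(statsCh chan<- *statMessage)
}

type fakeScraper struct{ key string }

func (f *fakeScraper) Scrape(statsCh chan<- *statMessage) { statsCh <- &statMessage{Key: f.key} }

// runScrapeLoop mirrors the per-revision goroutine: scrape on every tick and
// stop when either the global stop channel or the per-scaler one closes.
func runScrapeLoop(s scraper, interval time.Duration, globalStop, scalerStop <-chan struct{}, statsCh chan<- *statMessage) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-globalStop:
			return
		case <-scalerStop:
			return
		case <-ticker.C:
			s.Scrape(statsCh)
		}
	}
}

func main() {
	statsCh := make(chan *statMessage, 1)
	globalStop := make(chan struct{})
	scalerStop := make(chan struct{})
	go runScrapeLoop(&fakeScraper{key: "default/my-revision"}, 100*time.Millisecond, globalStop, scalerStop, statsCh)

	fmt.Println((<-statsCh).Key) // default/my-revision
	close(scalerStop)
}
```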
+ return nil + case <-time.After(2 * scrapeTickInterval): + return errors.New("Did not get expected StatMessage") + } +} + func TestMultiScalerScaling(t *testing.T) { ctx := context.TODO() - ms, stopCh, uniScaler := createMultiScaler(t, &Config{ + ms, stopCh, statCh, uniScaler := createMultiScaler(t, &Config{ TickInterval: tickInterval, }) defer close(stopCh) + defer close(statCh) metric := newMetric() uniScaler.setScaleResult(1, true) @@ -116,6 +132,10 @@ func TestMultiScalerScaling(t *testing.T) { t.Fatal(err) } + if err := verifyStatMessageTick(statCh); err != nil { + t.Fatal(err) + } + if err := ms.Delete(ctx, metric.Namespace, metric.Name); err != nil { t.Errorf("Delete() = %v", err) } @@ -128,11 +148,12 @@ func TestMultiScalerScaling(t *testing.T) { func TestMultiScalerScaleToZero(t *testing.T) { ctx := context.TODO() - ms, stopCh, uniScaler := createMultiScaler(t, &Config{ + ms, stopCh, statCh, uniScaler := createMultiScaler(t, &Config{ TickInterval: tickInterval, EnableScaleToZero: true, }) defer close(stopCh) + defer close(statCh) metric := newMetric() uniScaler.setScaleResult(0, true) @@ -170,11 +191,12 @@ func TestMultiScalerScaleToZero(t *testing.T) { func TestMultiScalerScaleFromZero(t *testing.T) { ctx := context.TODO() - ms, stopCh, uniScaler := createMultiScaler(t, &Config{ + ms, stopCh, statCh, uniScaler := createMultiScaler(t, &Config{ TickInterval: time.Second * 60, EnableScaleToZero: true, }) defer close(stopCh) + defer close(statCh) metric := newMetric() uniScaler.setScaleResult(1, true) @@ -208,11 +230,12 @@ func TestMultiScalerScaleFromZero(t *testing.T) { func TestMultiScalerWithoutScaleToZero(t *testing.T) { ctx := context.TODO() - ms, stopCh, uniScaler := createMultiScaler(t, &Config{ + ms, stopCh, statCh, uniScaler := createMultiScaler(t, &Config{ TickInterval: tickInterval, EnableScaleToZero: false, }) defer close(stopCh) + defer close(statCh) metric := newMetric() uniScaler.setScaleResult(0, true) @@ -253,11 +276,12 @@ func TestMultiScalerWithoutScaleToZero(t *testing.T) { func TestMultiScalerIgnoreNegativeScale(t *testing.T) { ctx := context.TODO() - ms, stopCh, uniScaler := createMultiScaler(t, &Config{ + ms, stopCh, statCh, uniScaler := createMultiScaler(t, &Config{ TickInterval: tickInterval, EnableScaleToZero: true, }) defer close(stopCh) + defer close(statCh) metric := newMetric() @@ -299,10 +323,11 @@ func TestMultiScalerIgnoreNegativeScale(t *testing.T) { func TestMultiScalerRecordsStatistics(t *testing.T) { ctx := context.TODO() - ms, stopCh, uniScaler := createMultiScaler(t, &Config{ + ms, stopCh, statCh, uniScaler := createMultiScaler(t, &Config{ TickInterval: tickInterval, }) defer close(stopCh) + defer close(statCh) metric := newMetric() @@ -342,11 +367,12 @@ func TestMultiScalerRecordsStatistics(t *testing.T) { func TestMultiScalerUpdate(t *testing.T) { ctx := context.TODO() - ms, stopCh, uniScaler := createMultiScaler(t, &Config{ + ms, stopCh, statCh, uniScaler := createMultiScaler(t, &Config{ TickInterval: tickInterval, EnableScaleToZero: false, }) defer close(stopCh) + defer close(statCh) metric := newMetric() metric.Spec.TargetConcurrency = 1.0 @@ -379,15 +405,17 @@ func TestMultiScalerUpdate(t *testing.T) { } } -func createMultiScaler(t *testing.T, config *Config) (*MultiScaler, chan<- struct{}, *fakeUniScaler) { +func createMultiScaler(t *testing.T, config *Config) (*MultiScaler, chan<- struct{}, chan *StatMessage, *fakeUniScaler) { logger := TestLogger(t) uniscaler := &fakeUniScaler{} + statsScraper := &fakeStatsScraper{} stopChan := 
make(chan struct{}) + statChan := make(chan *StatMessage) ms := NewMultiScaler(NewDynamicConfig(config, logger), - stopChan, uniscaler.fakeUniScalerFactory, logger) + stopChan, statChan, uniscaler.fakeUniScalerFactory, statsScraper.fakeStatsScraperFactory, logger) - return ms, stopChan, uniscaler + return ms, stopChan, statChan, uniscaler } type fakeUniScaler struct { @@ -453,3 +481,15 @@ func newMetric() *Metric { Status: MetricStatus{}, } } + +type fakeStatsScraper struct { +} + +func (s *fakeStatsScraper) fakeStatsScraperFactory(*Metric, *DynamicConfig) (StatsScraper, error) { + return s, nil +} + +// Scrape always sends the same test StatMessage. +func (s *fakeStatsScraper) Scrape(ctx context.Context, statsCh chan<- *StatMessage) { + statsCh <- &testStatMessage +} diff --git a/pkg/autoscaler/stats_scraper.go b/pkg/autoscaler/stats_scraper.go index c02ef6bcce34..9266d4e49580 100644 --- a/pkg/autoscaler/stats_scraper.go +++ b/pkg/autoscaler/stats_scraper.go @@ -17,29 +17,42 @@ limitations under the License. package autoscaler import ( + "context" "errors" "fmt" "io" "net/http" "time" + "github.com/knative/pkg/logging" "github.com/knative/serving/pkg/apis/serving" "github.com/knative/serving/pkg/apis/serving/v1alpha1" "github.com/knative/serving/pkg/reconciler" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "go.uber.org/zap" + corev1informers "k8s.io/client-go/informers/core/v1" + corev1listers "k8s.io/client-go/listers/core/v1" ) const ( httpClientTimeout = 3 * time.Second + + cacheTimeout = time.Second + + // scraperPodName is the name used in all stats sent from the scraper to + // the autoscaler. The actual customer pods are hidden behind the scraper. The + // autoscaler does need to know how many customer pods are reporting metrics. + // Instead, the autoscaler knows the stats it receives are either from the + // scraper or the activator. + scraperPodName = "service-scraper" ) // StatsScraper defines the interface for collecting Revision metrics type StatsScraper interface { // Scrape scrapes the Revision queue metric endpoint then sends it as a // StatMessage to the given channel. - Scrape(statsCh chan<- *StatMessage) + Scrape(ctx context.Context, statsCh chan<- *StatMessage) } // cacheDisabledClient is a http client with cache disabled. It is shared by @@ -57,19 +70,37 @@ var cacheDisabledClient = &http.Client{ // https://kubernetes.io/docs/concepts/services-networking/network-policies/ // for details. type ServiceScraper struct { - httpClient *http.Client - url string - metricKey string - logger *zap.SugaredLogger + httpClient *http.Client + endpointsLister corev1listers.EndpointsLister + url string + namespace string + revisionService string + metricKey string } // NewServiceScraper creates a new StatsScraper for the Revision which // the given Metric is responsible for. 
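Scrape, shown in the hunks that follow, delegates to scrapeViaURL and extractData, which parse the queue-proxy's Prometheus text endpoint via expfmt. A minimal standalone sketch of that parsing step (the sample payload is made up):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
)

// A made-up Prometheus text payload in the shape the queue-proxy exposes.
const sample = `# TYPE queue_average_concurrent_requests gauge
queue_average_concurrent_requests{destination_namespace="test-namespace",destination_revision="test-revision"} 2.0
`

func main() {
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(sample))
	if err != nil {
		panic(err)
	}
	if family, ok := families["queue_average_concurrent_requests"]; ok && len(family.Metric) > 0 {
		fmt.Println(*family.Metric[0].Gauge.Value) // 2
	}
}
```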
-func NewServiceScraper(metric *Metric, logger *zap.SugaredLogger) (*ServiceScraper, error) { - return newServiceScraperWithClient(metric, logger, cacheDisabledClient) +func NewServiceScraper(metric *Metric, dynamicConfig *DynamicConfig, endpointsInformer corev1informers.EndpointsInformer) (*ServiceScraper, error) { + return newServiceScraperWithClient(metric, dynamicConfig, endpointsInformer, cacheDisabledClient) } -func newServiceScraperWithClient(metric *Metric, logger *zap.SugaredLogger, httpClient *http.Client) (*ServiceScraper, error) { +func newServiceScraperWithClient( + metric *Metric, + dynamicConfig *DynamicConfig, + endpointsInformer corev1informers.EndpointsInformer, + httpClient *http.Client) (*ServiceScraper, error) { + if metric == nil { + return nil, errors.New("metric must not be nil") + } + if dynamicConfig == nil { + return nil, errors.New("dynamic config must not be nil") + } + if endpointsInformer == nil { + return nil, errors.New("endpoints informer must not be nil") + } + if httpClient == nil { + return nil, errors.New("HTTP client must not be nil") + } revName := metric.Labels[serving.RevisionLabelKey] if revName == "" { return nil, fmt.Errorf("no Revision label found for Metric %s", metric.Name) @@ -77,23 +108,51 @@ func newServiceScraperWithClient(metric *Metric, logger *zap.SugaredLogger, http serviceName := reconciler.GetServingK8SServiceNameForObj(revName) return &ServiceScraper{ - httpClient: httpClient, - url: fmt.Sprintf("http://%s.%s:%d/metrics", serviceName, metric.Namespace, v1alpha1.RequestQueueMetricsPort), - metricKey: NewMetricKey(metric.Namespace, metric.Name), - logger: logger, + httpClient: httpClient, + endpointsLister: endpointsInformer.Lister(), + url: fmt.Sprintf("http://%s.%s:%d/metrics", serviceName, metric.Namespace, v1alpha1.RequestQueueMetricsPort), + metricKey: NewMetricKey(metric.Namespace, metric.Name), + namespace: metric.Namespace, + revisionService: reconciler.GetServingK8SServiceNameForObj(revName), }, nil } // Scrape call the destination service then send it // to the given stats chanel -func (s *ServiceScraper) Scrape(statsCh chan<- *StatMessage) { +func (s *ServiceScraper) Scrape(ctx context.Context, statsCh chan<- *StatMessage) { + logger := logging.FromContext(ctx) + + readyPodsCount, err := readyPodsCountOfEndpoints(s.endpointsLister, s.namespace, s.revisionService) + if err != nil { + logger.Errorw("Failed to get Endpoints via K8S Lister", zap.Error(err)) + return + } + + if readyPodsCount == 0 { + logger.Debug("No ready pods found, nothing to scrape.") + return + } + stat, err := s.scrapeViaURL() if err != nil { - s.logger.Errorw("Failed to get metrics", zap.Error(err)) + logger.Errorw("Failed to get metrics", zap.Error(err)) return } - s.sendStatMessage(*stat, statsCh) + // Assume traffic is route to pods evenly. A particular pod can stand for + // other pods, i.e. other pods have similar concurrency and QPS. + // Hide the actual pods behind scraper and send only one stat for all the + // customer pods per scraping. The pod name is set to a unique value, i.e. + // scraperPodName so in autoscaler all stats are either from activator or + // scraper. 
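The comment block above captures the central assumption: the service routes each scrape to an arbitrary pod, that pod is treated as representative, and its averages are multiplied by the ready-pod count and reported under the single scraperPodName. A condensed sketch of that extrapolation with a simplified Stat stand-in:

```go
package main

import "fmt"

// stat is a simplified stand-in for autoscaler.Stat.
type stat struct {
	PodName                   string
	AverageConcurrentRequests float64
	RequestCount              int32
}

// extrapolate turns one sampled pod's stat into a revision-wide stat, on the
// assumption that the service routed the scrape to a representative pod.
func extrapolate(sample stat, readyPodsCount int) stat {
	return stat{
		PodName:                   "service-scraper",
		AverageConcurrentRequests: sample.AverageConcurrentRequests * float64(readyPodsCount),
		RequestCount:              sample.RequestCount * int32(readyPodsCount),
	}
}

func main() {
	sample := stat{PodName: "some-pod", AverageConcurrentRequests: 2.0, RequestCount: 1}
	fmt.Printf("%+v\n", extrapolate(sample, 2))
	// {PodName:service-scraper AverageConcurrentRequests:4 RequestCount:2}
}
```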
+ newStat := Stat{ + Time: stat.Time, + PodName: scraperPodName, + AverageConcurrentRequests: stat.AverageConcurrentRequests * float64(readyPodsCount), + RequestCount: stat.RequestCount * int32(readyPodsCount), + } + + s.sendStatMessage(newStat, statsCh) } func (s *ServiceScraper) scrapeViaURL() (*Stat, error) { @@ -126,13 +185,6 @@ func extractData(body io.Reader) (*Stat, error) { } if pMetric := getPrometheusMetric(metricFamilies, "queue_average_concurrent_requests"); pMetric != nil { - // The autoscaler should decide what to do with a Stat with empty pod name - for _, label := range pMetric.Label { - if *label.Name == "destination_pod" { - stat.PodName = *label.Value - break - } - } stat.AverageConcurrentRequests = *pMetric.Gauge.Value } else { return nil, errors.New("Could not find value for queue_average_concurrent_requests in response") diff --git a/pkg/autoscaler/stats_scraper_test.go b/pkg/autoscaler/stats_scraper_test.go index 3c40f300ebab..be36be751fb8 100644 --- a/pkg/autoscaler/stats_scraper_test.go +++ b/pkg/autoscaler/stats_scraper_test.go @@ -19,14 +19,15 @@ package autoscaler import ( "bytes" "errors" - "fmt" "io/ioutil" "net/http" "testing" + "time" . "github.com/knative/pkg/logging/testing" "github.com/knative/serving/pkg/apis/serving" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1informers "k8s.io/client-go/informers/core/v1" ) const ( @@ -48,10 +49,9 @@ queue_operations_per_second{destination_namespace="test-namespace",destination_r ) func TestNewServiceScraperWithClient_HappyCase(t *testing.T) { - metric := getTestMetric() client := newTestClient(nil, nil) - if scraper, err := newServiceScraperWithClient(&metric, TestLogger(t), client); err != nil { - t.Errorf("newServiceScraperWithClient=%v, want no error", err) + if scraper, err := serviceScraperForTest(client); err != nil { + t.Fatalf("newServiceScraperWithClient=%v, want no error", err) } else { if scraper.url != testURL { t.Errorf("scraper.url=%v, want %v", scraper.url, testURL) @@ -62,27 +62,71 @@ func TestNewServiceScraperWithClient_HappyCase(t *testing.T) { } } -func TestNewServiceScraperWithClient_ReturnErrorIfRevisionLabelIsMissing(t *testing.T) { +func TestNewServiceScraperWithClient_ErrorCases(t *testing.T) { metric := getTestMetric() - metric.Labels = map[string]string{} + invalidMetric := getTestMetric() + invalidMetric.Labels = map[string]string{} + dynConfig := &DynamicConfig{} client := newTestClient(nil, nil) - if _, err := newServiceScraperWithClient(&metric, TestLogger(t), client); err != nil { - got := err.Error() - want := fmt.Sprintf("no Revision label found for Metric %s", testRevision) - if got != want { - t.Errorf("Got error message: %v. 
Want: %v", got, want) + informer := kubeInformer.Core().V1().Endpoints() + testCases := []struct { + name string + metric *Metric + dynConfig *DynamicConfig + client *http.Client + informer corev1informers.EndpointsInformer + expectedErr string + }{{ + name: "Empty Metric", + dynConfig: dynConfig, + client: client, + informer: informer, + expectedErr: "metric must not be nil", + }, { + name: "Missing revision label in Metric", + metric: &invalidMetric, + dynConfig: dynConfig, + client: client, + informer: informer, + expectedErr: "no Revision label found for Metric test-revision", + }, { + name: "Empty DynamicConfig", + metric: &metric, + client: client, + informer: informer, + expectedErr: "dynamic config must not be nil", + }, { + name: "Empty HTTP client", + metric: &metric, + dynConfig: dynConfig, + informer: informer, + expectedErr: "HTTP client must not be nil", + }, { + name: "Empty informer", + metric: &metric, + dynConfig: dynConfig, + client: client, + expectedErr: "endpoints informer must not be nil", + }} + + for _, test := range testCases { + if _, err := newServiceScraperWithClient(test.metric, test.dynConfig, test.informer, test.client); err != nil { + got := err.Error() + want := test.expectedErr + if got != want { + t.Errorf("Got error message: %v. Want: %v", got, want) + } + } else { + t.Errorf("Expected error from CreateNewServiceScraper, got nil") } - } else { - t.Errorf("Expected error from CreateNewServiceScraper, got nil") } } func TestScrapeViaURL_HappyCase(t *testing.T) { - metric := getTestMetric() client := newTestClient(getHTTPResponse(http.StatusOK, testAverageConcurrenyContext+testQPSContext), nil) - scraper, err := newServiceScraperWithClient(&metric, TestLogger(t), client) + scraper, err := serviceScraperForTest(client) if err != nil { - t.Errorf("newServiceScraperWithClient=%v, want no error", err) + t.Fatalf("newServiceScraperWithClient=%v, want no error", err) } stat, err := scraper.scrapeViaURL() if err != nil { @@ -94,9 +138,6 @@ func TestScrapeViaURL_HappyCase(t *testing.T) { if stat.RequestCount != 1 { t.Errorf("stat.RequestCount=%v, want 1", stat.RequestCount) } - if stat.PodName != testPodName { - t.Errorf("stat.PodName=%v, want %v", stat.RequestCount, testPodName) - } } func TestScrapeViaURL_ErrorCases(t *testing.T) { @@ -132,10 +173,9 @@ func TestScrapeViaURL_ErrorCases(t *testing.T) { expectedErr: "Could not find value for queue_operations_per_second in response", }} - metric := getTestMetric() for _, test := range testCases { client := newTestClient(getHTTPResponse(test.responseCode, test.responseContext), test.responseErr) - scraper, err := newServiceScraperWithClient(&metric, TestLogger(t), client) + scraper, err := serviceScraperForTest(client) if err != nil { t.Errorf("newServiceScraperWithClient=%v, want no error", err) } @@ -149,30 +189,70 @@ func TestScrapeViaURL_ErrorCases(t *testing.T) { } } -func TestSendStatMessage(t *testing.T) { - metric := getTestMetric() +func TestScrape_HappyCase(t *testing.T) { client := newTestClient(getHTTPResponse(http.StatusOK, testAverageConcurrenyContext+testQPSContext), nil) - scraper, err := newServiceScraperWithClient(&metric, TestLogger(t), client) + scraper, err := serviceScraperForTest(client) if err != nil { - t.Errorf("newServiceScraperWithClient=%v, want no error", err) + t.Fatalf("newServiceScraperWithClient=%v, want no error", err) } - wantConcurrency := 2.1 - stat := Stat{AverageConcurrentRequests: wantConcurrency} + // Make an Endpoints with 2 pods. 
+ createEndpoints(addIps(makeEndpoints(), 2)) + // Scrape will set a timestamp bigger than this. + now := time.Now() statsCh := make(chan *StatMessage, 1) defer close(statsCh) - scraper.sendStatMessage(stat, statsCh) + scraper.Scrape(TestContextWithLogger(t), statsCh) got := <-statsCh if got.Key != testKPAKey { t.Errorf("StatMessage.Key=%v, want %v", got.Key, testKPAKey) } - if got.Stat.AverageConcurrentRequests != wantConcurrency { + if got.Stat.Time.Before(now) { + t.Errorf("StatMessage.Stat.Time=%v, want bigger than %v", got.Stat.Time, now) + } + if got.Stat.PodName != scraperPodName { + t.Errorf("StatMessage.Stat.PodName=%v, want %v", got.Stat.PodName, scraperPodName) + } + // 2 pods times 2.0 + if got.Stat.AverageConcurrentRequests != 4.0 { t.Errorf("StatMessage.Stat.AverageConcurrentRequests=%v, want %v", - got.Stat.AverageConcurrentRequests, wantConcurrency) + got.Stat.AverageConcurrentRequests, 4.0) + } + // 2 pods times 1 + if got.Stat.RequestCount != 2 { + t.Errorf("StatMessage.Stat.RequestCount=%v, want %v", got.Stat.RequestCount, 2) + } +} + +func TestScrape_DoNotScrapeIfNoPodsFound(t *testing.T) { + client := newTestClient(getHTTPResponse(200, testAverageConcurrenyContext+testQPSContext), nil) + scraper, err := serviceScraperForTest(client) + if err != nil { + t.Fatalf("newServiceScraperWithClient=%v, want no error", err) + } + + // Override the Endpoints with 0 pods. + createEndpoints(addIps(makeEndpoints(), 0)) + + statsCh := make(chan *StatMessage, 1) + defer close(statsCh) + scraper.Scrape(TestContextWithLogger(t), statsCh) + + select { + case <-statsCh: + t.Error("Received unexpected StatMessage.") + case <-time.After(300 * time.Millisecond): + // We got nothing! } } +func serviceScraperForTest(httpClient *http.Client) (*ServiceScraper, error) { + metric := getTestMetric() + dynConfig := &DynamicConfig{} + return newServiceScraperWithClient(&metric, dynConfig, kubeInformer.Core().V1().Endpoints(), httpClient) +} + func getTestMetric() Metric { return Metric{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/autoscaler/statserver/server.go b/pkg/autoscaler/statserver/server.go index 8f130debdf97..6d50ecbf7ca2 100644 --- a/pkg/autoscaler/statserver/server.go +++ b/pkg/autoscaler/statserver/server.go @@ -22,6 +22,7 @@ import ( "encoding/gob" "net" "net/http" + "strings" "sync" "time" @@ -155,8 +156,9 @@ func (s *Server) Handler(w http.ResponseWriter, r *http.Request) { } s.logger.Debugf("Received stat message: %+v", sm) - // Drop stats from lameducked pods - if !sm.Stat.LameDuck { + // TODO(yanweiguo): Remove this after version 0.5. 
+ // Drop stats not from activator + if isActivator(sm.Stat.PodName) { s.statsCh <- &sm } } @@ -195,3 +197,7 @@ func (s *Server) Shutdown(timeout time.Duration) { } close(s.statsCh) } + +func isActivator(podName string) bool { + return strings.HasPrefix(podName, "activator") +} diff --git a/pkg/autoscaler/statserver/server_test.go b/pkg/autoscaler/statserver/server_test.go index 18bc198bd487..285c4149ac7a 100644 --- a/pkg/autoscaler/statserver/server_test.go +++ b/pkg/autoscaler/statserver/server_test.go @@ -60,8 +60,8 @@ func TestStatsReceived(t *testing.T) { statSink := dialOk(server.ListenAddr(), t) - assertReceivedOk(newStatMessage("test-namespace/test-revision", "pod1", 2.1, 51), statSink, statsCh, t) - assertReceivedOk(newStatMessage("test-namespace/test-revision2", "pod2", 2.2, 30), statSink, statsCh, t) + assertReceivedOk(newStatMessage("test-namespace/test-revision", "activator1", 2.1, 51), statSink, statsCh, t) + assertReceivedOk(newStatMessage("test-namespace/test-revision2", "activator2", 2.2, 30), statSink, statsCh, t) closeSink(statSink, t) } @@ -75,12 +75,12 @@ func TestServerShutdown(t *testing.T) { listenAddr := server.ListenAddr() statSink := dialOk(listenAddr, t) - assertReceivedOk(newStatMessage("test-namespace/test-revision", "pod1", 2.1, 51), statSink, statsCh, t) + assertReceivedOk(newStatMessage("test-namespace/test-revision", "activator1", 2.1, 51), statSink, statsCh, t) server.Shutdown(time.Second) // Send a statistic to the server - send(statSink, newStatMessage("test-namespace/test-revision2", "pod2", 2.2, 30), t) + send(statSink, newStatMessage("test-namespace/test-revision2", "activator2", 2.2, 30), t) // Check the statistic was not received _, ok := <-statsCh @@ -120,7 +120,7 @@ func TestServerDoesNotLeakGoroutines(t *testing.T) { listenAddr := server.ListenAddr() statSink := dialOk(listenAddr, t) - assertReceivedOk(newStatMessage("test-namespace/test-revision", "pod1", 2.1, 51), statSink, statsCh, t) + assertReceivedOk(newStatMessage("test-namespace/test-revision", "activator1", 2.1, 51), statSink, statsCh, t) closeSink(statSink, t) diff --git a/test/e2e/autoscale_test.go b/test/e2e/autoscale_test.go index 33eef0b2e8bd..c771720c60d1 100644 --- a/test/e2e/autoscale_test.go +++ b/test/e2e/autoscale_test.go @@ -320,9 +320,9 @@ func TestAutoscaleUpCountPods(t *testing.T) { ctx.t.Log("The autoscaler spins up additional replicas when traffic increases.") // note: without the warm-up / gradual increase of load the test is retrieving a 503 (overload) from the envoy - // Increase workload for 2 replicas for 30s - // Assert the number of expected replicas is between n-1 and n+1, where n is the # of desired replicas for 30s. - // Assert the number of expected replicas is n and n+1 at the end of 30s, where n is the # of desired replicas. + // Increase workload for 2 replicas for 60s + // Assert the number of expected replicas is between n-1 and n+1, where n is the # of desired replicas for 60s. + // Assert the number of expected replicas is n and n+1 at the end of 60s, where n is the # of desired replicas. assertAutoscaleUpToNumPods(ctx, 2) // Increase workload scale to 3 replicas, assert between [n-1, n+1] during scale up, assert between [n, n+1] after scaleup. assertAutoscaleUpToNumPods(ctx, 3)
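With scraped stats now produced inside the autoscaler process itself, the websocket stat server only needs to accept pushes from the activator, which is the prefix check the tests above exercise. A tiny usage sketch:

```go
package main

import (
	"fmt"
	"strings"
)

// isActivator mirrors the filter added to the stats server: only stats pushed
// over the websocket by activator pods are accepted now.
func isActivator(podName string) bool {
	return strings.HasPrefix(podName, "activator")
}

func main() {
	for _, name := range []string{"activator-7d4b8c", "service-scraper", "my-revision-deployment-abc12"} {
		fmt.Printf("%-30s accepted=%v\n", name, isActivator(name))
	}
}
```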