Skip to content

Commit

Permalink
Add a simple network prober to the activator.
Browse files Browse the repository at this point in the history
When the flag `-enable-network-probing` is passed (on by default) the activator will replace its retring transport logic with a simple network probe based on knative#3256 with a similar number of retries to what the retrying transport was previously configured to use.  Enabling this allows the GRPC test with streaming and cold-start fairly reliably on my cluster (and also with the GRPC ping sample in knative/docs, with my fixes).

This change also refactors the GRPC test into 4 tests, for each of the logical things tested, which will hopefully reduce the amount of time this adds to e2e dramatically when we switch to use `t.Parallel()` since it will parallelize the two times this waits for a scale-to-zero.

Fixes: knative#3239
Fixes: knative#2856
  • Loading branch information
mattmoor committed Feb 22, 2019
1 parent 924fc77 commit b5dbff6
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 119 deletions.
19 changes: 18 additions & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ var (
masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")

enableNetworkProbe = flag.Bool("enable-network-probe", true, "Whether to replace request retries with network level probing.")

logger *zap.SugaredLogger

statSink *websocket.ManagedConnection
Expand Down Expand Up @@ -237,6 +239,14 @@ func main() {
Steps: maxRetries,
}
rt := activatorutil.NewRetryRoundTripper(activatorutil.AutoTransport, logger, backoffSettings, shouldRetry)
getProbeCount := 0
// When network probing is enabled remove the retrying transport
// and pass in the retry count for our network probes instead.
if *enableNetworkProbe {
logger.Info("Enabling network probing for activation.")
getProbeCount = maxRetries
rt = activatorutil.AutoTransport
}

// Open a websocket connection to the autoscaler
autoscalerEndpoint := fmt.Sprintf("ws://%s.%s.svc.%s:%s", "autoscaler", system.Namespace(), utils.GetClusterDomainName(), "8080")
Expand All @@ -253,7 +263,14 @@ func main() {
// Create activation handler chain
// Note: innermost handlers are specified first, ie. the last handler in the chain will be executed first
var ah http.Handler
ah = &activatorhandler.ActivationHandler{Activator: a, Transport: rt, Logger: logger, Reporter: reporter, Throttler: throttler}
ah = &activatorhandler.ActivationHandler{
Activator: a,
Transport: rt,
Logger: logger,
Reporter: reporter,
Throttler: throttler,
GetProbeCount: getProbeCount,
}
ah = &activatorhandler.EnforceMaxContentLengthHandler{MaxContentLengthBytes: maxUploadBytes, NextHandler: ah}
ah = activatorhandler.NewRequestEventHandler(reqChan, ah)
ah = &activatorhandler.FilteringHandler{NextHandler: ah}
Expand Down
56 changes: 55 additions & 1 deletion pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/knative/serving/pkg/activator"
"github.com/knative/serving/pkg/activator/util"
pkghttp "github.com/knative/serving/pkg/http"
"github.com/knative/serving/pkg/network"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"
)

// ActivationHandler will wait for an active endpoint for a revision
Expand All @@ -38,6 +40,12 @@ type ActivationHandler struct {
Transport http.RoundTripper
Reporter activator.StatsReporter
Throttler *activator.Throttler

// GetProbeCount is the number of attempts we should
// make to network probe the queue-proxy after the revision becomes
// ready before forwarding the payload. If zero, a network probe
// is not required.
GetProbeCount int
}

func (a *ActivationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -61,7 +69,53 @@ func (a *ActivationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

err := a.Throttler.Try(revID, func() {
attempts, httpStatus := a.proxyRequest(w, r, target)
var (
attempts int
httpStatus int
)
// If a GET probe interval has been configured, then probe
// the queue-proxy with our network probe header until it
// returns a 200 status code.
success := (a.GetProbeCount == 0)
if !success {
probeReq := &http.Request{
Method: http.MethodGet,
URL: target,
Proto: r.Proto,
ProtoMajor: r.ProtoMajor,
ProtoMinor: r.ProtoMinor,
Host: r.Host,
Header: map[string][]string{
http.CanonicalHeaderKey(network.ProbeHeaderName): []string{"true"},
},
}
settings := wait.Backoff{
Duration: 100 * time.Millisecond,
Factor: 1.3,
Steps: a.GetProbeCount,
}
err := wait.ExponentialBackoff(settings, func() (bool, error) {
probeResp, err := a.Transport.RoundTrip(probeReq)
if err != nil {
a.Logger.Errorw("Pod probe failed", zap.Error(err))
return false, nil
}
httpStatus = probeResp.StatusCode
if httpStatus == http.StatusServiceUnavailable {
a.Logger.Errorf("Pod probe sent status: %d", httpStatus)
return false, nil
}
return true, nil
})
success = (err == nil) && httpStatus == http.StatusOK
}
if success {
// Once we see a successful probe, send traffic.
attempts, httpStatus = a.proxyRequest(w, r, target)
} else {
httpStatus = http.StatusInternalServerError
w.WriteHeader(httpStatus)
}

// Report the metrics
duration := time.Since(start)
Expand Down
86 changes: 78 additions & 8 deletions pkg/activator/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/knative/serving/pkg/activator"
"github.com/knative/serving/pkg/activator/util"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
"github.com/knative/serving/pkg/network"
"github.com/knative/serving/pkg/queue"
)

Expand Down Expand Up @@ -95,7 +96,10 @@ func TestActivationHandler(t *testing.T) {
wantBody string
wantCode int
wantErr error
probeErr error
probeCode int
attempts string
gpc int
endpointsGetter func(activator.RevisionID) (int32, error)
reporterCalls []reporterCall
}{{
Expand Down Expand Up @@ -124,6 +128,7 @@ func TestActivationHandler(t *testing.T) {
Config: "config-real-name",
StatusCode: http.StatusOK,
}},
gpc: 1,
}, {
label: "active endpoint with missing count header",
namespace: testNamespace,
Expand Down Expand Up @@ -158,6 +163,56 @@ func TestActivationHandler(t *testing.T) {
wantErr: nil,
endpointsGetter: goodEndpointsGetter,
reporterCalls: nil,
}, {
label: "active endpoint (probe failure)",
namespace: testNamespace,
name: testRevName,
probeErr: errors.New("probe error"),
wantCode: http.StatusInternalServerError,
endpointsGetter: goodEndpointsGetter,
gpc: 1,
reporterCalls: []reporterCall{{
Op: "ReportRequestCount",
Namespace: testNamespace,
Revision: testRevName,
Service: "service-real-name",
Config: "config-real-name",
StatusCode: http.StatusInternalServerError,
Attempts: 0,
Value: 1,
}, {
Op: "ReportResponseTime",
Namespace: testNamespace,
Revision: testRevName,
Service: "service-real-name",
Config: "config-real-name",
StatusCode: http.StatusInternalServerError,
}},
}, {
label: "active endpoint (probe 500)",
namespace: testNamespace,
name: testRevName,
probeCode: http.StatusServiceUnavailable,
wantCode: http.StatusInternalServerError,
endpointsGetter: goodEndpointsGetter,
gpc: 1,
reporterCalls: []reporterCall{{
Op: "ReportRequestCount",
Namespace: testNamespace,
Revision: testRevName,
Service: "service-real-name",
Config: "config-real-name",
StatusCode: http.StatusInternalServerError,
Attempts: 0,
Value: 1,
}, {
Op: "ReportResponseTime",
Namespace: testNamespace,
Revision: testRevName,
Service: "service-real-name",
Config: "config-real-name",
StatusCode: http.StatusInternalServerError,
}},
}, {
label: "request error",
namespace: testNamespace,
Expand Down Expand Up @@ -219,12 +274,20 @@ func TestActivationHandler(t *testing.T) {
attempts: "hi there",
endpointsGetter: brokenEndpointGetter,
reporterCalls: nil,
},
}
}}

for _, e := range examples {
t.Run(e.label, func(t *testing.T) {
rt := util.RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
if r.Header.Get(network.ProbeHeaderName) != "" {
if e.probeErr != nil {
return nil, e.probeErr
}
fake := httptest.NewRecorder()
fake.WriteHeader(e.probeCode)
fake.WriteString("queue")
return fake.Result(), nil
}
if e.wantErr != nil {
return nil, e.wantErr
}
Expand All @@ -242,20 +305,27 @@ func TestActivationHandler(t *testing.T) {

reporter := &fakeReporter{}
params := queue.BreakerParams{QueueDepth: 1000, MaxConcurrency: 1000, InitialCapacity: 0}
throttlerParams := activator.ThrottlerParams{BreakerParams: params, Logger: TestLogger(t), GetRevision: stubRevisionGetter, GetEndpoints: e.endpointsGetter}
throttlerParams := activator.ThrottlerParams{
BreakerParams: params,
Logger: TestLogger(t),
GetRevision: stubRevisionGetter,
GetEndpoints: e.endpointsGetter,
}
handler := ActivationHandler{
Activator: act,
Transport: rt,
Logger: TestLogger(t),
Reporter: reporter,
Throttler: activator.NewThrottler(throttlerParams),
Activator: act,
Transport: rt,
Logger: TestLogger(t),
Reporter: reporter,
Throttler: activator.NewThrottler(throttlerParams),
GetProbeCount: e.gpc,
}

resp := httptest.NewRecorder()

req := httptest.NewRequest("POST", "http://example.com", nil)
req.Header.Set(activator.RevisionHeaderNamespace, e.namespace)
req.Header.Set(activator.RevisionHeaderName, e.name)

handler.ServeHTTP(resp, req)

if resp.Code != e.wantCode {
Expand Down
Loading

0 comments on commit b5dbff6

Please sign in to comment.