diff --git a/cmd/glbc/app/clients.go b/cmd/glbc/app/clients.go
index 128657d8ad..40bc5df752 100644
--- a/cmd/glbc/app/clients.go
+++ b/cmd/glbc/app/clients.go
@@ -88,7 +88,7 @@ func NewGCEClient() *gce.GCECloud {
 	if err == nil {
 		cloud := provider.(*gce.GCECloud)
 		// Configure GCE rate limiting
-		rl, err := ratelimit.NewGCERateLimiter(flags.F.GCERateLimit.Values())
+		rl, err := ratelimit.NewGCERateLimiter(flags.F.GCERateLimit.Values(), flags.F.GCEOperationPollInterval)
 		if err != nil {
 			glog.Fatalf("Error configuring rate limiting: %v", err)
 		}
diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go
index 28a8a42de8..0dfa0e6e86 100644
--- a/pkg/flags/flags.go
+++ b/pkg/flags/flags.go
@@ -68,6 +68,7 @@
 	DefaultSvcPortName       string
 	DeleteAllOnQuit          bool
 	GCERateLimit             RateLimitSpecs
+	GCEOperationPollInterval time.Duration
 	HealthCheckPath          string
 	HealthzPort              int
 	Features                 *Features
@@ -111,7 +112,7 @@ func defaultLeaderElectionConfiguration() LeaderElectionConfiguration {
 
 func init() {
 	F.NodePortRanges.ports = []string{DefaultNodePortRange}
-	F.GCERateLimit.specs = []string{"alpha.Operations.Get,qps,10,100", "beta.Operations.Get,qps,10,100", "ga.Operations.Get,qps,10,100"}
+	F.GCERateLimit.specs = []string{"alpha.Operations.Get,qps,10,10", "beta.Operations.Get,qps,10,10", "ga.Operations.Get,qps,10,10"}
 	F.Features = EnabledFeatures()
 	F.LeaderElection = defaultLeaderElectionConfiguration()
 }
@@ -180,6 +181,8 @@
 specify this flag, the default is to rate limit Operations.Get for all versions.
 If you do specify this flag one or more times, this default will be overwritten.
 If you want to still use the default, simply specify it along with your other
 values.`)
+	flag.DurationVar(&F.GCEOperationPollInterval, "gce-operation-poll-interval", time.Second,
+		`Minimum time between polling requests to GCE for checking the status of an operation.`)
 	flag.StringVar(&F.HealthCheckPath, "health-check-path", "/",
 		`Path used to health-check a backend service. All Services must serve a 200 page on this path. Currently this is only configurable globally.`)
diff --git a/pkg/ratelimit/ratelimit.go b/pkg/ratelimit/ratelimit.go
index d2543d4a62..200cf141f2 100644
--- a/pkg/ratelimit/ratelimit.go
+++ b/pkg/ratelimit/ratelimit.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/golang/glog"
 	"k8s.io/client-go/util/flowcontrol"
@@ -32,12 +33,15 @@ import (
 type GCERateLimiter struct {
 	// Map a RateLimitKey to its rate limiter implementation.
 	rateLimitImpls map[cloud.RateLimitKey]flowcontrol.RateLimiter
+	// Minimum polling interval for getting operations. Underlying operations rate limiter
+	// may increase the time.
+	operationPollInterval time.Duration
 }
 
 // NewGCERateLimiter parses the list of rate limiting specs passed in and
 // returns a properly configured cloud.RateLimiter implementation.
 // Expected format of specs: {"[version].[service].[operation],[type],[param1],[param2],..", "..."}
-func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) {
+func NewGCERateLimiter(specs []string, operationPollInterval time.Duration) (*GCERateLimiter, error) {
 	rateLimitImpls := make(map[cloud.RateLimitKey]flowcontrol.RateLimiter)
 	// Within each specification, split on comma to get the operation,
 	// rate limiter type, and extra parameters.
@@ -62,27 +66,39 @@ func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) {
 	if len(rateLimitImpls) == 0 {
 		return nil, nil
 	}
-	return &GCERateLimiter{rateLimitImpls}, nil
+	return &GCERateLimiter{
+		rateLimitImpls:        rateLimitImpls,
+		operationPollInterval: operationPollInterval,
+	}, nil
 }
 
-// Implementation of cloud.RateLimiter
+// Accept looks up the associated flowcontrol.RateLimiter (if one exists) and waits on it.
 func (l *GCERateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) error {
-	ch := make(chan struct{})
-	go func() {
-		// Call flowcontrol.RateLimiter implementation.
-		impl := l.rateLimitImpl(key)
-		if impl != nil {
-			impl.Accept()
+	var rl cloud.RateLimiter
+
+	impl := l.rateLimitImpl(key)
+	if impl != nil {
+		// Wrap the flowcontrol.RateLimiter with an AcceptRateLimiter and handle context.
+		rl = &cloud.AcceptRateLimiter{Acceptor: impl}
+	} else {
+		// Check the context then use the cloud NopRateLimiter which accepts immediately.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
 		}
-		close(ch)
-	}()
-	select {
-	case <-ch:
-		break
-	case <-ctx.Done():
-		return ctx.Err()
+		rl = &cloud.NopRateLimiter{}
 	}
-	return nil
+
+	if key.Operation == "Get" && key.Service == "Operations" {
+		// Wait a minimum amount of time regardless of rate limiter.
+		rl = &cloud.MinimumRateLimiter{
+			RateLimiter: rl,
+			Minimum:     l.operationPollInterval,
+		}
+	}
+
+	return rl.Accept(ctx, key)
 }
 
 // rateLimitImpl returns the flowcontrol.RateLimiter implementation
diff --git a/pkg/ratelimit/ratelimit_test.go b/pkg/ratelimit/ratelimit_test.go
index d428317015..d41b7c4c72 100644
--- a/pkg/ratelimit/ratelimit_test.go
+++ b/pkg/ratelimit/ratelimit_test.go
@@ -18,6 +18,7 @@ package ratelimit
 
 import (
 	"testing"
+	"time"
 )
 
 func TestGCERateLimiter(t *testing.T) {
@@ -41,14 +42,14 @@
 	}
 
 	for _, testCase := range validTestCases {
-		_, err := NewGCERateLimiter(testCase)
+		_, err := NewGCERateLimiter(testCase, time.Second)
 		if err != nil {
 			t.Errorf("Did not expect an error for test case: %v", testCase)
 		}
 	}
 
 	for _, testCase := range invalidTestCases {
-		_, err := NewGCERateLimiter(testCase)
+		_, err := NewGCERateLimiter(testCase, time.Second)
 		if err == nil {
 			t.Errorf("Expected an error for test case: %v", testCase)
 		}