Skip to content

Commit

Permalink
Pace operation polling
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Sardo committed Jul 17, 2018
1 parent 2683a0b commit 57bc73f
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cmd/glbc/app/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewGCEClient() *gce.GCECloud {
if err == nil {
cloud := provider.(*gce.GCECloud)
// Configure GCE rate limiting
rl, err := ratelimit.NewGCERateLimiter(flags.F.GCERateLimit.Values())
rl, err := ratelimit.NewGCERateLimiter(flags.F.GCERateLimit.Values(), flags.F.GCEOperationPollInterval)
if err != nil {
glog.Fatalf("Error configuring rate limiting: %v", err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
DefaultSvcPortName string
DeleteAllOnQuit bool
GCERateLimit RateLimitSpecs
GCEOperationPollInterval time.Duration
HealthCheckPath string
HealthzPort int
Features *Features
Expand Down Expand Up @@ -111,7 +112,7 @@ func defaultLeaderElectionConfiguration() LeaderElectionConfiguration {

func init() {
F.NodePortRanges.ports = []string{DefaultNodePortRange}
F.GCERateLimit.specs = []string{"alpha.Operations.Get,qps,10,100", "beta.Operations.Get,qps,10,100", "ga.Operations.Get,qps,10,100"}
F.GCERateLimit.specs = []string{"alpha.Operations.Get,qps,10,10", "beta.Operations.Get,qps,10,10", "ga.Operations.Get,qps,10,10"}
F.Features = EnabledFeatures()
F.LeaderElection = defaultLeaderElectionConfiguration()
}
Expand Down Expand Up @@ -180,6 +181,8 @@ specify this flag, the default is to rate limit Operations.Get for all versions.
If you do specify this flag one or more times, this default will be overwritten.
If you want to still use the default, simply specify it along with your other
values.`)
flag.DurationVar(&F.GCEOperationPollInterval, "gce-operation-poll-interval", time.Second,
`Minimum time between polling requests to GCE for checking the status of an operation.`)
flag.StringVar(&F.HealthCheckPath, "health-check-path", "/",
`Path used to health-check a backend service. All Services must serve a
200 page on this path. Currently this is only configurable globally.`)
Expand Down
50 changes: 33 additions & 17 deletions pkg/ratelimit/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/golang/glog"
"k8s.io/client-go/util/flowcontrol"
Expand All @@ -32,12 +33,15 @@ import (
type GCERateLimiter struct {
// Map a RateLimitKey to its rate limiter implementation.
rateLimitImpls map[cloud.RateLimitKey]flowcontrol.RateLimiter
// Minimum polling interval for getting operations. Underlying operations rate limiter
// may increase the time.
operationPollInterval time.Duration
}

// NewGCERateLimiter parses the list of rate limiting specs passed in and
// returns a properly configured cloud.RateLimiter implementation.
// Expected format of specs: {"[version].[service].[operation],[type],[param1],[param2],..", "..."}
func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) {
func NewGCERateLimiter(specs []string, operationPollInterval time.Duration) (*GCERateLimiter, error) {
rateLimitImpls := make(map[cloud.RateLimitKey]flowcontrol.RateLimiter)
// Within each specification, split on comma to get the operation,
// rate limiter type, and extra parameters.
Expand All @@ -62,27 +66,39 @@ func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) {
if len(rateLimitImpls) == 0 {
return nil, nil
}
return &GCERateLimiter{rateLimitImpls}, nil
return &GCERateLimiter{
rateLimitImpls: rateLimitImpls,
operationPollInterval: operationPollInterval,
}, nil
}

// Implementation of cloud.RateLimiter
// Accept looks up the associated flowcontrol.RateLimiter (if exists) and waits on it.
func (l *GCERateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) error {
ch := make(chan struct{})
go func() {
// Call flowcontrol.RateLimiter implementation.
impl := l.rateLimitImpl(key)
if impl != nil {
impl.Accept()
var rl cloud.RateLimiter

impl := l.rateLimitImpl(key)
if impl != nil {
// Wrap the flowcontrol.RateLimiter with a AcceptRateLimiter and handle context.
rl = &cloud.AcceptRateLimiter{Acceptor: impl}
} else {
// Check the context then use the cloud NopRateLimiter which accepts immediately.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
close(ch)
}()
select {
case <-ch:
break
case <-ctx.Done():
return ctx.Err()
rl = &cloud.NopRateLimiter{}
}
return nil

if key.Operation == "Get" && key.Service == "Operations" {
// Wait a minimum amount of time regardless of rate limiter.
rl = &cloud.MinimumRateLimiter{
RateLimiter: rl,
Minimum: l.operationPollInterval,
}
}

return rl.Accept(ctx, key)
}

// rateLimitImpl returns the flowcontrol.RateLimiter implementation
Expand Down
5 changes: 3 additions & 2 deletions pkg/ratelimit/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package ratelimit

import (
"testing"
"time"
)

func TestGCERateLimiter(t *testing.T) {
Expand All @@ -41,14 +42,14 @@ func TestGCERateLimiter(t *testing.T) {
}

for _, testCase := range validTestCases {
_, err := NewGCERateLimiter(testCase)
_, err := NewGCERateLimiter(testCase, time.Second)
if err != nil {
t.Errorf("Did not expect an error for test case: %v", testCase)
}
}

for _, testCase := range invalidTestCases {
_, err := NewGCERateLimiter(testCase)
_, err := NewGCERateLimiter(testCase, time.Second)
if err == nil {
t.Errorf("Expected an error for test case: %v", testCase)
}
Expand Down

0 comments on commit 57bc73f

Please sign in to comment.