Skip to content

Commit

Permalink
Merge pull request #148 from rramkumar1/rate-limit
Browse files Browse the repository at this point in the history
Initial implementation for ingress rate limiting
  • Loading branch information
nicksardo authored Mar 16, 2018
2 parents 211d992 + 8030415 commit 765707e
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 2 deletions.
7 changes: 7 additions & 0 deletions cmd/glbc/app/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/ratelimit"
"k8s.io/ingress-gce/pkg/utils"
)

Expand Down Expand Up @@ -95,6 +96,12 @@ func NewGCEClient() *gce.GCECloud {
provider, err := cloudprovider.GetCloudProvider("gce", configReader())
if err == nil {
cloud := provider.(*gce.GCECloud)
// Configure GCE rate limiting
rl, err := ratelimit.NewGCERateLimiter(flags.F.GCERateLimit.Values())
if err != nil {
glog.Fatalf("Error configuring rate limiting: %v", err)
}
cloud.SetRateLimiter(rl)
// If this controller is scheduled on a node without compute/rw
// it won't be allowed to list backends. We can assume that the
// user has no need for Ingress in this case. If they grant
Expand Down
37 changes: 37 additions & 0 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
ConfigFilePath string
DefaultSvc string
DeleteAllOnQuit bool
GCERateLimit RateLimitSpecs
HealthCheckPath string
HealthzPort int
InCluster bool
Expand All @@ -58,6 +59,7 @@ var (

func init() {
F.NodePortRanges.ports = []string{DefaultNodePortRange}
F.GCERateLimit.specs = []string{"alpha.Operations.Get,qps,10,100", "beta.Operations.Get,qps,10,100", "ga.Operations.Get,qps,10,100"}
}

// Register flags with the command line parser.
Expand Down Expand Up @@ -85,6 +87,15 @@ the default backend.`)
external cloud resources as it's shutting down. Mostly used for testing. In
normal environments the controller should only delete a loadbalancer if the
associated Ingress is deleted.`)
flag.Var(&F.GCERateLimit, "gce-ratelimit",
`Optional, can be used to rate limit certain GCE API calls. Example usage:
--gce-ratelimit=ga.Addresses.Get,qps,1.5,5
(limit ga.Addresses.Get to maximum of 1.5 qps with a burst of 5).
Use the flag more than once to rate limit more than one call. If you do not
specify this flag, the default is to rate limit Operations.Get for all versions.
If you do specify this flag one or more times, this default will be overwritten.
If you want to still use the default, simply specify it along with your other
values.`)
flag.StringVar(&F.HealthCheckPath, "health-check-path", "/",
`Path used to health-check a backend service. All Services must serve a
200 page on this path. Currently this is only configurable globally.`)
Expand Down Expand Up @@ -113,6 +124,32 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
`This flag has been deprecated and no longer has any effect.`)
}

type RateLimitSpecs struct {
specs []string
isSet bool
}

// Part of the flag.Value interface.
func (r *RateLimitSpecs) String() string {
return strings.Join(r.specs, ";")
}

// Set supports the flag being repeated multiple times. Part of the flag.Value interface.
func (r *RateLimitSpecs) Set(value string) error {
// On first Set(), clear the original defaults
// On subsequent Set()'s, append.
if !r.isSet {
r.specs = []string{}
r.isSet = true
}
r.specs = append(r.specs, value)
return nil
}

func (r *RateLimitSpecs) Values() []string {
return r.specs
}

type PortRanges struct {
ports []string
isSet bool
Expand Down
144 changes: 144 additions & 0 deletions pkg/ratelimit/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ratelimit

import (
"context"
"fmt"
"strconv"
"strings"

"github.com/golang/glog"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)

// GCERateLimiter implements cloud.RateLimiter
type GCERateLimiter struct {
// Map a RateLimitKey to its rate limiter implementation.
rateLimitImpls map[cloud.RateLimitKey]flowcontrol.RateLimiter
}

// NewGCERateLimiter parses the list of rate limiting specs passed in and
// returns a properly configured cloud.RateLimiter implementation.
// Expected format of specs: {"[version].[service].[operation],[type],[param1],[param2],..", "..."}
func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) {
rateLimitImpls := make(map[cloud.RateLimitKey]flowcontrol.RateLimiter)
// Within each specification, split on comma to get the operation,
// rate limiter type, and extra parameters.
for _, spec := range specs {
params := strings.Split(spec, ",")
if len(params) < 2 {
return nil, fmt.Errorf("Must at least specify operation and rate limiter type.")
}
// params[0] should consist of the operation to rate limit.
key, err := constructRateLimitKey(params[0])
if err != nil {
return nil, err
}
// params[1:] should consist of the rate limiter type and extra params.
impl, err := constructRateLimitImpl(params[1:])
if err != nil {
return nil, err
}
rateLimitImpls[key] = impl
glog.Infof("Configured rate limiting for: %v", key)
}
if len(rateLimitImpls) == 0 {
return nil, nil
}
return &GCERateLimiter{rateLimitImpls}, nil
}

// Implementation of cloud.RateLimiter
func (l *GCERateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) error {
ch := make(chan struct{})
go func() {
// Call flowcontrol.RateLimiter implementation.
impl := l.rateLimitImpl(key)
if impl != nil {
impl.Accept()
}
close(ch)
}()
select {
case <-ch:
break
case <-ctx.Done():
return ctx.Err()
}
return nil
}

// rateLimitImpl returns the flowcontrol.RateLimiter implementation
// associated with the passed in key.
func (l *GCERateLimiter) rateLimitImpl(key *cloud.RateLimitKey) flowcontrol.RateLimiter {
// Since the passed in key will have the ProjectID field filled in, we need to
// create a copy which does not, so that retreiving the rate limiter implementation
// through the map works as expected.
keyCopy := cloud.RateLimitKey{
ProjectID: "",
Operation: key.Operation,
Version: key.Version,
Service: key.Service,
}
return l.rateLimitImpls[keyCopy]
}

// Expected format of param is [version].[service].[operation]
func constructRateLimitKey(param string) (cloud.RateLimitKey, error) {
var retVal cloud.RateLimitKey
params := strings.Split(param, ".")
if len(params) != 3 {
return retVal, fmt.Errorf("Must specify rate limit in [version].[service].[operation] format: %v", param)
}
// TODO(rramkumar): Add another layer of validation here?
version := meta.Version(params[0])
service := params[1]
operation := params[2]
retVal = cloud.RateLimitKey{
ProjectID: "",
Operation: operation,
Version: version,
Service: service,
}
return retVal, nil
}

// constructRateLimitImpl parses the slice and returns a flowcontrol.RateLimiter
// Expected format is [type],[param1],[param2],...
func constructRateLimitImpl(params []string) (flowcontrol.RateLimiter, error) {
// For now, only the "qps" type is supported.
rlType := params[0]
implArgs := params[1:]
if rlType == "qps" {
if len(implArgs) != 2 {
return nil, fmt.Errorf("Invalid number of args for rate limiter type %v. Expected %d, Got %v", rlType, 2, len(implArgs))
}
qps, err := strconv.ParseFloat(implArgs[0], 32)
if err != nil || qps <= 0 {
return nil, fmt.Errorf("Invalid argument for rate limiter type %v. Either %v is not a float or not greater than 0.", rlType, implArgs[0])
}
burst, err := strconv.Atoi(implArgs[1])
if err != nil {
return nil, fmt.Errorf("Invalid argument for rate limiter type %v. Expected %v to be a int.", rlType, implArgs[1])
}
return flowcontrol.NewTokenBucketRateLimiter(float32(qps), burst), nil
}
return nil, fmt.Errorf("Invalid rate limiter type provided: %v", rlType)
}
56 changes: 56 additions & 0 deletions pkg/ratelimit/ratelimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ratelimit

import (
"testing"
)

func TestConfigureGCERateLimiting(t *testing.T) {
validTestCases := [][]string{
[]string{"ga.Addresses.Get,qps,1.5,5"},
[]string{"ga.Addresses.List,qps,2,10"},
[]string{"ga.Addresses.Get,qps,1.5,5", "ga.Firewalls.Get,qps,1.5,5"},
[]string{"ga.Operations.Get,qps,10,100"},
}
invalidTestCases := [][]string{
[]string{"gaAddresses.Get,qps,1.5,5"},
[]string{"gaAddresses.Get,qps,0,5"},
[]string{"gaAddresses.Get,qps,-1,5"},
[]string{"ga.Addresses.Get,qps,1.5.5"},
[]string{"gaAddresses.Get,qps,1.5,5.5"},
[]string{"gaAddressesGet,qps,1.5,5.5"},
[]string{"gaAddressesGet,qps,1.5"},
[]string{"ga.Addresses.Get,foo,1.5,5"},
[]string{"ga.Addresses.Get,1.5,5"},
[]string{"ga.Addresses.Get,qps,1.5,5", "gaFirewalls.Get,qps,1.5,5"},
}

for _, testCase := range validTestCases {
_, err := NewGCERateLimiter(testCase)
if err != nil {
t.Errorf("Did not expect an error for test case: %v", testCase)
}
}

for _, testCase := range invalidTestCases {
_, err := NewGCERateLimiter(testCase)
if err == nil {
t.Errorf("Expected an error for test case: %v", testCase)
}
}
}
17 changes: 15 additions & 2 deletions vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 765707e

Please sign in to comment.