From 1c617f5101ed2d6e12ffd571b581a5f40a51b362 Mon Sep 17 00:00:00 2001 From: Eric Stroczynski Date: Wed, 7 Aug 2019 17:21:37 -0700 Subject: [PATCH] pkg/client/apiutil: add a dynamic RESTMapper will reload the delegated meta.RESTMapper on a cache miss. API calls will return ErrRateLimited if a rate limit is hit. pkg/manager: use dynamic RESTMapper as default. --- Gopkg.lock | 13 ++ go.mod | 3 +- go.sum | 2 + pkg/client/apiutil/dynamicrestmapper.go | 226 ++++++++++++++++++++++++ pkg/manager/manager.go | 4 +- 5 files changed, 246 insertions(+), 2 deletions(-) create mode 100644 pkg/client/apiutil/dynamicrestmapper.go diff --git a/Gopkg.lock b/Gopkg.lock index 0996047409..0648562c67 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -516,6 +516,17 @@ pruneopts = "UT" revision = "fbb02b2291d28baffd63558aa44b4b56f178d650" +[[projects]] + branch = "master" + digest = "0:" + name = "golang.org/x/xerrors" + packages = [ + ".", + "internal", + ] + pruneopts = "UT" + revision = "a985d3407aa71f30cf86696ee0a2f409709f22e1" + [[projects]] digest = "1:7d4fd782f2a710d08834b2c01742b3bba7fb0248383f9cc6c3dc95b3025689d7" name = "gomodules.xyz/jsonpatch" @@ -929,6 +940,8 @@ "go.uber.org/zap", "go.uber.org/zap/buffer", "go.uber.org/zap/zapcore", + "golang.org/x/time/rate", + "golang.org/x/xerrors", "gomodules.xyz/jsonpatch/v2", "gopkg.in/fsnotify.v1", "k8s.io/api/admission/v1beta1", diff --git a/go.mod b/go.mod index 928622f224..24c956abad 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,8 @@ require ( golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac // indirect golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be // indirect golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect - golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect + golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 + golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 gomodules.xyz/jsonpatch/v2 v2.0.0 google.golang.org/appengine v1.1.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect diff --git a/go.sum b/go.sum index e092650230..9b6ccf5524 100644 --- a/go.sum +++ b/go.sum @@ -87,6 +87,8 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gomodules.xyz/jsonpatch/v2 v2.0.0 h1:OyHbl+7IOECpPKfVK42oFr6N7+Y2dR+Jsb/IiDV3hOo= gomodules.xyz/jsonpatch/v2 v2.0.0/go.mod h1:IhYNNY4jnS53ZnfE4PAmpKtDpTCj1JFXc+3mwe7XcUU= google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs= diff --git a/pkg/client/apiutil/dynamicrestmapper.go b/pkg/client/apiutil/dynamicrestmapper.go new file mode 100644 index 0000000000..fd40b655df --- /dev/null +++ b/pkg/client/apiutil/dynamicrestmapper.go @@ -0,0 +1,226 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiutil + +import ( + "time" + + "golang.org/x/time/rate" + "golang.org/x/xerrors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" +) + +// ErrRateLimited is returned by a dynamicRESTMapper method if the number +// of API calls has exceeded a limit within a certain time period. +type ErrRateLimited struct { + // Duration to wait until the next API call can be made. + Delay time.Duration +} + +func (e ErrRateLimited) Error() string { + return "too many API calls to the dynamicRESTMapper within a timeframe" +} + +// DelayIfRateLimited returns the delay time until the next API call is +// allowed and true if err is of type ErrRateLimited. The zero +// time.Duration value and false are returned if err is not a ErrRateLimited. +func DelayIfRateLimited(err error) (time.Duration, bool) { + var rlerr ErrRateLimited + if xerrors.As(err, &rlerr) { + return rlerr.Delay, true + } + return 0, false +} + +// dynamicRESTMapper is a RESTMapper that dynamically discovers resource +// types at runtime. +type dynamicRESTMapper struct { + client discovery.DiscoveryInterface + staticMapper meta.RESTMapper + limiter *dynamicLimiter + lazy bool +} + +// WithLimiter sets the RESTMapper's underlying limiter to lim. +func WithLimiter(lim *rate.Limiter) func(*dynamicRESTMapper) error { + return func(drm *dynamicRESTMapper) error { + drm.limiter = &dynamicLimiter{lim} + return nil + } +} + +// WithLazyDiscovery prevents the RESTMapper from discovering REST mappings +// until an API call is made. +var WithLazyDiscovery = func(drm *dynamicRESTMapper) error { + drm.lazy = true + return nil +} + +// NewDynamicRESTMapper returns a dynamic RESTMapper for cfg. The dynamic +// RESTMapper dynamically discovers resource types at runtime. opts +// configure the RESTMapper. +func NewDynamicRESTMapper(cfg *rest.Config, opts ...func(*dynamicRESTMapper) error) (meta.RESTMapper, error) { + client, err := discovery.NewDiscoveryClientForConfig(cfg) + if err != nil { + return nil, err + } + drm := &dynamicRESTMapper{ + client: client, + limiter: &dynamicLimiter{ + rate.NewLimiter(rate.Limit(defaultLimitRate), defaultLimitSize), + }, + } + for _, opt := range opts { + if err = opt(drm); err != nil { + return nil, err + } + } + if !drm.lazy { + if err := drm.setStaticMapper(); err != nil { + return nil, err + } + } + return drm, nil +} + +var ( + // defaultLimitRate is the number of RESTMapper API calls allowed + // per second assuming the rate of API calls <= defaultLimitRate. + defaultLimitRate = 600 + // defaultLimitSize is the maximum number of simultaneous RESTMapper + // API calls allowed. + defaultLimitSize = 5 +) + +// setStaticMapper sets drm's staticMapper by querying its client. +func (drm *dynamicRESTMapper) setStaticMapper() error { + groupResources, err := restmapper.GetAPIGroupResources(drm.client) + if err != nil { + return err + } + drm.staticMapper = restmapper.NewDiscoveryRESTMapper(groupResources) + return nil +} + +// reload reloads the static RESTMapper, and will return an error only +// if a rate limit has been hit. +func (drm *dynamicRESTMapper) reload() error { + if err := drm.limiter.checkRate(); err != nil { + return err + } + if err := drm.setStaticMapper(); err != nil { + utilruntime.HandleError(err) + } + return nil +} + +func (drm *dynamicRESTMapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) { + gvk, err := drm.staticMapper.KindFor(resource) + if xerrors.Is(err, &meta.NoKindMatchError{}) { + if rerr := drm.reload(); rerr != nil { + return schema.GroupVersionKind{}, rerr + } + gvk, err = drm.staticMapper.KindFor(resource) + } + return gvk, err +} + +func (drm *dynamicRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) { + gvks, err := drm.staticMapper.KindsFor(resource) + if xerrors.Is(err, &meta.NoKindMatchError{}) { + if rerr := drm.reload(); rerr != nil { + return nil, rerr + } + gvks, err = drm.staticMapper.KindsFor(resource) + } + return gvks, err +} + +func (drm *dynamicRESTMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) { + gvr, err := drm.staticMapper.ResourceFor(input) + if xerrors.Is(err, &meta.NoKindMatchError{}) { + if rerr := drm.reload(); rerr != nil { + return schema.GroupVersionResource{}, rerr + } + gvr, err = drm.staticMapper.ResourceFor(input) + } + return gvr, err +} + +func (drm *dynamicRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) { + gvrs, err := drm.staticMapper.ResourcesFor(input) + if xerrors.Is(err, &meta.NoKindMatchError{}) { + if rerr := drm.reload(); rerr != nil { + return nil, rerr + } + gvrs, err = drm.staticMapper.ResourcesFor(input) + } + return gvrs, err +} + +func (drm *dynamicRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) { + mapping, err := drm.staticMapper.RESTMapping(gk, versions...) + if xerrors.Is(err, &meta.NoKindMatchError{}) { + if rerr := drm.reload(); rerr != nil { + return nil, rerr + } + mapping, err = drm.staticMapper.RESTMapping(gk, versions...) + } + return mapping, err +} + +func (drm *dynamicRESTMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) { + mappings, err := drm.staticMapper.RESTMappings(gk, versions...) + if xerrors.Is(err, &meta.NoKindMatchError{}) { + if rerr := drm.reload(); rerr != nil { + return nil, rerr + } + mappings, err = drm.staticMapper.RESTMappings(gk, versions...) + } + return mappings, err +} + +func (drm *dynamicRESTMapper) ResourceSingularizer(resource string) (string, error) { + singular, err := drm.staticMapper.ResourceSingularizer(resource) + if xerrors.Is(err, &meta.NoKindMatchError{}) { + if rerr := drm.reload(); rerr != nil { + return "", rerr + } + singular, err = drm.staticMapper.ResourceSingularizer(resource) + } + return singular, err +} + +// dynamicLimiter holds a rate limiter used to throttle chatty RESTMapper users. +type dynamicLimiter struct { + *rate.Limiter +} + +// checkRate returns an ErrRateLimited if too many API calls have been made +// within the set limit. +func (b *dynamicLimiter) checkRate() error { + res := b.Reserve() + if res.Delay() == 0 { + return nil + } + return ErrRateLimited{res.Delay()} +} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 56a34cba6e..5cc63d9c1a 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -303,7 +303,9 @@ func setOptionsDefaults(options Options) Options { } if options.MapperProvider == nil { - options.MapperProvider = apiutil.NewDiscoveryRESTMapper + options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) { + return apiutil.NewDynamicRESTMapper(c) + } } // Allow newClient to be mocked