-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
pkg/client/apiutil: add a dynamic RESTMapper will reload the delegated
meta.RESTMapper on a cache miss. API calls will return ErrRateLimited if a rate limit is hit. pkg/manager: use dynamic RESTMapper as default.
- Loading branch information
Showing
5 changed files
with
246 additions
and
2 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,226 @@ | ||
/* | ||
Copyright 2019 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package apiutil | ||
|
||
import ( | ||
"time" | ||
|
||
"golang.org/x/time/rate" | ||
"golang.org/x/xerrors" | ||
"k8s.io/apimachinery/pkg/api/meta" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||
"k8s.io/client-go/discovery" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/restmapper" | ||
) | ||
|
||
// ErrRateLimited is returned by a dynamicRESTMapper method if the number | ||
// of API calls has exceeded a limit within a certain time period. | ||
type ErrRateLimited struct { | ||
// Duration to wait until the next API call can be made. | ||
Delay time.Duration | ||
} | ||
|
||
func (e ErrRateLimited) Error() string { | ||
return "too many API calls to the dynamicRESTMapper within a timeframe" | ||
} | ||
|
||
// DelayIfRateLimited returns the delay time until the next API call is | ||
// allowed and true if err is of type ErrRateLimited. The zero | ||
// time.Duration value and false are returned if err is not a ErrRateLimited. | ||
func DelayIfRateLimited(err error) (time.Duration, bool) { | ||
var rlerr ErrRateLimited | ||
if xerrors.As(err, &rlerr) { | ||
return rlerr.Delay, true | ||
} | ||
return 0, false | ||
} | ||
|
||
// dynamicRESTMapper is a RESTMapper that dynamically discovers resource | ||
// types at runtime. | ||
type dynamicRESTMapper struct { | ||
client discovery.DiscoveryInterface | ||
staticMapper meta.RESTMapper | ||
limiter *dynamicLimiter | ||
lazy bool | ||
} | ||
|
||
// WithLimiter sets the RESTMapper's underlying limiter to lim. | ||
func WithLimiter(lim *rate.Limiter) func(*dynamicRESTMapper) error { | ||
return func(drm *dynamicRESTMapper) error { | ||
drm.limiter = &dynamicLimiter{lim} | ||
return nil | ||
} | ||
} | ||
|
||
// WithLazyDiscovery prevents the RESTMapper from discovering REST mappings | ||
// until an API call is made. | ||
var WithLazyDiscovery = func(drm *dynamicRESTMapper) error { | ||
drm.lazy = true | ||
return nil | ||
} | ||
|
||
// NewDynamicRESTMapper returns a dynamic RESTMapper for cfg. The dynamic | ||
// RESTMapper dynamically discovers resource types at runtime. opts | ||
// configure the RESTMapper. | ||
func NewDynamicRESTMapper(cfg *rest.Config, opts ...func(*dynamicRESTMapper) error) (meta.RESTMapper, error) { | ||
client, err := discovery.NewDiscoveryClientForConfig(cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
drm := &dynamicRESTMapper{ | ||
client: client, | ||
limiter: &dynamicLimiter{ | ||
rate.NewLimiter(rate.Limit(defaultLimitRate), defaultLimitSize), | ||
}, | ||
} | ||
for _, opt := range opts { | ||
if err = opt(drm); err != nil { | ||
return nil, err | ||
} | ||
} | ||
if !drm.lazy { | ||
if err := drm.setStaticMapper(); err != nil { | ||
return nil, err | ||
} | ||
} | ||
return drm, nil | ||
} | ||
|
||
var ( | ||
// defaultLimitRate is the number of RESTMapper API calls allowed | ||
// per second assuming the rate of API calls <= defaultLimitRate. | ||
defaultLimitRate = 600 | ||
// defaultLimitSize is the maximum number of simultaneous RESTMapper | ||
// API calls allowed. | ||
defaultLimitSize = 5 | ||
) | ||
|
||
// setStaticMapper sets drm's staticMapper by querying its client. | ||
func (drm *dynamicRESTMapper) setStaticMapper() error { | ||
groupResources, err := restmapper.GetAPIGroupResources(drm.client) | ||
if err != nil { | ||
return err | ||
} | ||
drm.staticMapper = restmapper.NewDiscoveryRESTMapper(groupResources) | ||
return nil | ||
} | ||
|
||
// reload reloads the static RESTMapper, and will return an error only | ||
// if a rate limit has been hit. | ||
func (drm *dynamicRESTMapper) reload() error { | ||
if err := drm.limiter.checkRate(); err != nil { | ||
return err | ||
} | ||
if err := drm.setStaticMapper(); err != nil { | ||
utilruntime.HandleError(err) | ||
} | ||
return nil | ||
} | ||
|
||
func (drm *dynamicRESTMapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) { | ||
gvk, err := drm.staticMapper.KindFor(resource) | ||
if xerrors.Is(err, &meta.NoKindMatchError{}) { | ||
if rerr := drm.reload(); rerr != nil { | ||
return schema.GroupVersionKind{}, rerr | ||
} | ||
gvk, err = drm.staticMapper.KindFor(resource) | ||
} | ||
return gvk, err | ||
} | ||
|
||
func (drm *dynamicRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) { | ||
gvks, err := drm.staticMapper.KindsFor(resource) | ||
if xerrors.Is(err, &meta.NoKindMatchError{}) { | ||
if rerr := drm.reload(); rerr != nil { | ||
return nil, rerr | ||
} | ||
gvks, err = drm.staticMapper.KindsFor(resource) | ||
} | ||
return gvks, err | ||
} | ||
|
||
func (drm *dynamicRESTMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) { | ||
gvr, err := drm.staticMapper.ResourceFor(input) | ||
if xerrors.Is(err, &meta.NoKindMatchError{}) { | ||
if rerr := drm.reload(); rerr != nil { | ||
return schema.GroupVersionResource{}, rerr | ||
} | ||
gvr, err = drm.staticMapper.ResourceFor(input) | ||
} | ||
return gvr, err | ||
} | ||
|
||
func (drm *dynamicRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) { | ||
gvrs, err := drm.staticMapper.ResourcesFor(input) | ||
if xerrors.Is(err, &meta.NoKindMatchError{}) { | ||
if rerr := drm.reload(); rerr != nil { | ||
return nil, rerr | ||
} | ||
gvrs, err = drm.staticMapper.ResourcesFor(input) | ||
} | ||
return gvrs, err | ||
} | ||
|
||
func (drm *dynamicRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) { | ||
mapping, err := drm.staticMapper.RESTMapping(gk, versions...) | ||
if xerrors.Is(err, &meta.NoKindMatchError{}) { | ||
if rerr := drm.reload(); rerr != nil { | ||
return nil, rerr | ||
} | ||
mapping, err = drm.staticMapper.RESTMapping(gk, versions...) | ||
} | ||
return mapping, err | ||
} | ||
|
||
func (drm *dynamicRESTMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) { | ||
mappings, err := drm.staticMapper.RESTMappings(gk, versions...) | ||
if xerrors.Is(err, &meta.NoKindMatchError{}) { | ||
if rerr := drm.reload(); rerr != nil { | ||
return nil, rerr | ||
} | ||
mappings, err = drm.staticMapper.RESTMappings(gk, versions...) | ||
} | ||
return mappings, err | ||
} | ||
|
||
func (drm *dynamicRESTMapper) ResourceSingularizer(resource string) (string, error) { | ||
singular, err := drm.staticMapper.ResourceSingularizer(resource) | ||
if xerrors.Is(err, &meta.NoKindMatchError{}) { | ||
if rerr := drm.reload(); rerr != nil { | ||
return "", rerr | ||
} | ||
singular, err = drm.staticMapper.ResourceSingularizer(resource) | ||
} | ||
return singular, err | ||
} | ||
|
||
// dynamicLimiter holds a rate limiter used to throttle chatty RESTMapper users. | ||
type dynamicLimiter struct { | ||
*rate.Limiter | ||
} | ||
|
||
// checkRate returns an ErrRateLimited if too many API calls have been made | ||
// within the set limit. | ||
func (b *dynamicLimiter) checkRate() error { | ||
res := b.Reserve() | ||
if res.Delay() == 0 { | ||
return nil | ||
} | ||
return ErrRateLimited{res.Delay()} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters