Skip to content

Commit

Permalink
runner: close rate limiter on Stop (#635)
Browse files Browse the repository at this point in the history
redis client is never closed, even after graceful shutdown. It is
possible to observe this by creating a runner, and calling Stop(), and
stopping redis. Runner is expected to be stopped, and all components
closed, however still logs can be seen that redis is unavailable.

this patch allows runner to actually gracefully close rate limit
implementations, so all components are closed gracefully on Stop.

Signed-off-by: Johannes Brüderl <[email protected]>
  • Loading branch information
birdayz authored Aug 1, 2024
1 parent 05bf226 commit f4af2db
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 9 deletions.
8 changes: 6 additions & 2 deletions src/redis/cache_impl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package redis

import (
"io"
"math/rand"

"github.com/coocood/freecache"
Expand All @@ -12,15 +13,18 @@ import (
"github.com/envoyproxy/ratelimit/src/utils"
)

func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freecache.Cache, srv server.Server, timeSource utils.TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, statsManager stats.Manager) limiter.RateLimitCache {
func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freecache.Cache, srv server.Server, timeSource utils.TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, statsManager stats.Manager) (limiter.RateLimitCache, io.Closer) {
closer := &utils.MultiCloser{}
var perSecondPool Client
if s.RedisPerSecond {
perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType,
s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv)
closer.Closers = append(closer.Closers, perSecondPool)
}

otherPool := NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize,
s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv)
closer.Closers = append(closer.Closers, otherPool)

return NewFixedRateLimitCacheImpl(
otherPool,
Expand All @@ -33,5 +37,5 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca
s.CacheKeyPrefix,
statsManager,
s.StopCacheKeyIncrementWhenOverlimit,
)
), closer
}
22 changes: 15 additions & 7 deletions src/service_cmd/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (
)

type Runner struct {
statsManager stats.Manager
settings settings.Settings
srv server.Server
mu sync.Mutex
statsManager stats.Manager
settings settings.Settings
srv server.Server
mu sync.Mutex
ratelimitCloser io.Closer
}

func NewRunner(s settings.Settings) Runner {
Expand Down Expand Up @@ -80,7 +81,7 @@ func (runner *Runner) GetStatsStore() gostats.Store {
return runner.statsManager.GetStatsStore()
}

func createLimiter(srv server.Server, s settings.Settings, localCache *freecache.Cache, statsManager stats.Manager) limiter.RateLimitCache {
func createLimiter(srv server.Server, s settings.Settings, localCache *freecache.Cache, statsManager stats.Manager) (limiter.RateLimitCache, io.Closer) {
switch s.BackendType {
case "redis", "":
return redis.NewRateLimiterCacheImplFromSettings(
Expand All @@ -99,7 +100,7 @@ func createLimiter(srv server.Server, s settings.Settings, localCache *freecache
rand.New(utils.NewLockedSource(time.Now().Unix())),
localCache,
srv.Scope(),
statsManager)
statsManager), &utils.MultiCloser{} // memcache client can't be closed
default:
logger.Fatalf("Invalid setting for BackendType: %s", s.BackendType)
panic("This line should not be reachable")
Expand Down Expand Up @@ -147,8 +148,11 @@ func (runner *Runner) Run() {
runner.srv = srv
runner.mu.Unlock()

limiter, limiterCloser := createLimiter(srv, s, localCache, runner.statsManager)
runner.ratelimitCloser = limiterCloser

service := ratelimit.NewService(
createLimiter(srv, s, localCache, runner.statsManager),
limiter,
srv.Provider(),
runner.statsManager,
srv.HealthChecker(),
Expand Down Expand Up @@ -184,4 +188,8 @@ func (runner *Runner) Stop() {
if srv != nil {
srv.Stop()
}

if runner.ratelimitCloser != nil {
_ = runner.ratelimitCloser.Close()
}
}
18 changes: 18 additions & 0 deletions src/utils/multi_closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package utils

import (
"errors"
"io"
)

type MultiCloser struct {
Closers []io.Closer
}

func (m *MultiCloser) Close() error {
var e error
for _, closer := range m.Closers {
e = errors.Join(closer.Close())
}
return e
}

0 comments on commit f4af2db

Please sign in to comment.