From 4dd89b7f552d1f769dd24545c1cede42b072d42f Mon Sep 17 00:00:00 2001 From: "Xiaochao Dong (@damnever)" Date: Tue, 26 Dec 2023 19:21:56 +0800 Subject: [PATCH 1/6] Implement the circuit breaker pattern for asynchronous set operations in the cache client Signed-off-by: Xiaochao Dong (@damnever) --- pkg/cacheutil/memcached_client.go | 84 ++++++++++++---- pkg/cacheutil/memcached_client_test.go | 130 ++++++++++++++++++++++++- pkg/cacheutil/redis_client.go | 44 ++++++++- pkg/cacheutil/redis_client_test.go | 18 ++++ 4 files changed, 256 insertions(+), 20 deletions(-) diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 29caaed02b..6589ea89d3 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -17,6 +17,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/sony/gobreaker" "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/discovery/dns" @@ -40,9 +41,11 @@ const ( ) var ( - errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided") - errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive") - errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive") + errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided") + errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive") + errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive") + errCircuitBreakerConsecutiveFailuresNotPositive = errors.New("set async circuit breaker: consecutive failures must be greater than 0") + errCircuitBreakerFailurePercentInvalid = errors.New("set async circuit breaker: failure percent must be in range (0,1]") defaultMemcachedClientConfig = MemcachedClientConfig{ Timeout: 500 * time.Millisecond, @@ -54,6 +57,11 @@ var ( MaxGetMultiBatchSize: 0, DNSProviderUpdateInterval: 10 * time.Second, AutoDiscovery: false, + SetAsyncCircuitBreakerHalfOpenMaxRequests: 10, + SetAsyncCircuitBreakerOpenDuration: 5 * time.Second, + SetAsyncCircuitBreakerMinRequests: 50, + SetAsyncCircuitBreakerConsecutiveFailures: 5, + SetAsyncCircuitBreakerFailurePercent: 0.05, } ) @@ -141,6 +149,20 @@ type MemcachedClientConfig struct { // AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution AutoDiscovery bool `yaml:"auto_discovery"` + + // SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through + // when the circuit breaker is half-open. + // If set to 0, the circuit breaker allows only 1 request. + SetAsyncCircuitBreakerHalfOpenMaxRequests uint32 `yaml:"set_async_circuit_breaker_half_open_max_requests"` + // SetAsyncCircuitBreakerOpenDuration is the period of the open state after which the state of the circuit breaker becomes half-open. + // If set to 0, the circuit breaker resets it to 60 seconds. + SetAsyncCircuitBreakerOpenDuration time.Duration `yaml:"set_async_circuit_breaker_open_duration"` + // SetAsyncCircuitBreakerMinRequests is minimal requests to trigger the circuit breaker. + SetAsyncCircuitBreakerMinRequests uint32 `yaml:"set_async_circuit_breaker_min_requests"` + // SetAsyncCircuitBreakerConsecutiveFailures represents consecutive failures based on CircuitBreakerMinRequests to determine if the circuit breaker should open. + SetAsyncCircuitBreakerConsecutiveFailures uint32 `yaml:"set_async_circuit_breaker_consecutive_failures"` + // SetAsyncCircuitBreakerFailurePercent represents the failure percentage, which is based on CircuitBreakerMinRequests, to determine if the circuit breaker should open. + SetAsyncCircuitBreakerFailurePercent float64 `yaml:"set_async_circuit_breaker_failure_percent"` } func (c *MemcachedClientConfig) validate() error { @@ -158,6 +180,12 @@ func (c *MemcachedClientConfig) validate() error { return errMemcachedMaxAsyncConcurrencyNotPositive } + if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { + return errCircuitBreakerConsecutiveFailuresNotPositive + } + if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { + return errCircuitBreakerFailurePercentInvalid + } return nil } @@ -195,6 +223,8 @@ type memcachedClient struct { dataSize *prometheus.HistogramVec p *AsyncOperationProcessor + + setAsyncCircuitBreaker *gobreaker.CircuitBreaker } // AddressProvider performs node address resolution given a list of clusters. @@ -278,6 +308,17 @@ func newMemcachedClient( gate.Gets, ), p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), + setAsyncCircuitBreaker: gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "memcached-set-async", + MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests, + Interval: 10 * time.Second, + Timeout: config.SetAsyncCircuitBreakerOpenDuration, + ReadyToTrip: func(counts gobreaker.Counts) bool { + return counts.Requests >= config.SetAsyncCircuitBreakerMinRequests && + (counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) || + float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent) + }, + }), } c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ @@ -375,22 +416,31 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration) start := time.Now() c.operations.WithLabelValues(opSet).Inc() - err := c.client.Set(&memcache.Item{ - Key: key, - Value: value, - Expiration: int32(time.Now().Add(ttl).Unix()), + _, err := c.setAsyncCircuitBreaker.Execute(func() (any, error) { + return nil, c.client.Set(&memcache.Item{ + Key: key, + Value: value, + Expiration: int32(time.Now().Add(ttl).Unix()), + }) }) if err != nil { - // If the PickServer will fail for any reason the server address will be nil - // and so missing in the logs. We're OK with that (it's a best effort). - serverAddr, _ := c.selector.PickServer(key) - level.Debug(c.logger).Log( - "msg", "failed to store item to memcached", - "key", key, - "sizeBytes", len(value), - "server", serverAddr, - "err", err, - ) + if errors.Is(err, gobreaker.ErrOpenState) || errors.Is(err, gobreaker.ErrTooManyRequests) { + level.Warn(c.logger).Log( + "msg", "circuit breaker disallows storing item in memcached", + "key", key, + "err", err) + } else { + // If the PickServer will fail for any reason the server address will be nil + // and so missing in the logs. We're OK with that (it's a best effort). + serverAddr, _ := c.selector.PickServer(key) + level.Debug(c.logger).Log( + "msg", "failed to store item to memcached", + "key", key, + "sizeBytes", len(value), + "server", serverAddr, + "err", err, + ) + } c.trackError(opSet, err) return } diff --git a/pkg/cacheutil/memcached_client_test.go b/pkg/cacheutil/memcached_client_test.go index 4aa355deaa..7b0571c32e 100644 --- a/pkg/cacheutil/memcached_client_test.go +++ b/pkg/cacheutil/memcached_client_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "net" + "strconv" "sync" "testing" "time" @@ -16,9 +17,11 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/sony/gobreaker" "go.uber.org/atomic" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/model" ) @@ -33,6 +36,8 @@ func TestMemcachedClientConfig_validate(t *testing.T) { Addresses: []string{"127.0.0.1:11211"}, MaxAsyncConcurrency: 1, DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerConsecutiveFailures: 1, + SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: nil, }, @@ -41,6 +46,8 @@ func TestMemcachedClientConfig_validate(t *testing.T) { Addresses: []string{}, MaxAsyncConcurrency: 1, DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerConsecutiveFailures: 1, + SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: errMemcachedConfigNoAddrs, }, @@ -49,6 +56,8 @@ func TestMemcachedClientConfig_validate(t *testing.T) { Addresses: []string{"127.0.0.1:11211"}, MaxAsyncConcurrency: 0, DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerConsecutiveFailures: 1, + SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: errMemcachedMaxAsyncConcurrencyNotPositive, }, @@ -56,9 +65,40 @@ func TestMemcachedClientConfig_validate(t *testing.T) { config: MemcachedClientConfig{ Addresses: []string{"127.0.0.1:11211"}, MaxAsyncConcurrency: 1, + SetAsyncCircuitBreakerConsecutiveFailures: 1, + SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: errMemcachedDNSUpdateIntervalNotPositive, }, + "should fail on circuit_breaker_consecutive_failures = 0": { + config: MemcachedClientConfig{ + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerConsecutiveFailures: 0, + }, + expected: errCircuitBreakerConsecutiveFailuresNotPositive, + }, + "should fail on circuit_breaker_failure_percent <= 0": { + config: MemcachedClientConfig{ + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerConsecutiveFailures: 1, + SetAsyncCircuitBreakerFailurePercent: 0, + }, + expected: errCircuitBreakerFailurePercentInvalid, + }, + "should fail on circuit_breaker_failure_percent >= 1": { + config: MemcachedClientConfig{ + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerConsecutiveFailures: 1, + SetAsyncCircuitBreakerFailurePercent: 1.1, + }, + expected: errCircuitBreakerFailurePercentInvalid, + }, } for testName, testData := range tests { @@ -491,6 +531,9 @@ type memcachedClientBackendMock struct { items map[string]*memcache.Item getMultiCount int getMultiErrors int + + setCount int + setErrors int } func newMemcachedClientBackendMock() *memcachedClientBackendMock { @@ -522,17 +565,30 @@ func (c *memcachedClientBackendMock) Set(item *memcache.Item) error { c.lock.Lock() defer c.lock.Unlock() + c.setCount++ + if c.setCount <= c.setErrors { + return errors.New("mocked Set error") + } + c.items[item.Key] = item return nil } +func (c *memcachedClientBackendMock) waitSetCount(expected int) error { + return c.waitFor(expected, "the number of set operations", func() int { return c.setCount }) +} + func (c *memcachedClientBackendMock) waitItems(expected int) error { + return c.waitFor(expected, "items", func() int { return len(c.items) }) +} + +func (c *memcachedClientBackendMock) waitFor(expected int, name string, valueFunc func() int) error { deadline := time.Now().Add(1 * time.Second) for time.Now().Before(deadline) { c.lock.Lock() - count := len(c.items) + count := valueFunc() c.lock.Unlock() if count >= expected { @@ -540,7 +596,7 @@ func (c *memcachedClientBackendMock) waitItems(expected int) error { } } - return errors.New("timeout expired while waiting for items in the memcached mock") + return fmt.Errorf("timeout expired while waiting for %s in the memcached mock", name) } // countingGate implements gate.Gate and counts the number of times Start is called. @@ -630,3 +686,73 @@ func (c *memcachedClientBlockingMock) GetMulti([]string) (map[string]*memcache.I func (c *memcachedClientBlockingMock) Set(*memcache.Item) error { return nil } + +func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) { + for _, testdata := range []struct { + name string + setErrors int + minRequests uint32 + consecutiveFailures uint32 + failurePercent float64 + expectCircuitBreakerOpen bool + }{ + { + name: "remains closed due to min requests not satisfied", + setErrors: 10, + minRequests: 100, + consecutiveFailures: 1, + failurePercent: 0.00001, + expectCircuitBreakerOpen: false, + }, + { + name: "opened because too many consecutive failures", + setErrors: 10, + minRequests: 10, + consecutiveFailures: 10, + failurePercent: 1, + expectCircuitBreakerOpen: true, + }, + { + name: "opened because failure percent too high", + setErrors: 10, + minRequests: 10, + consecutiveFailures: 100, + failurePercent: 0.1, + expectCircuitBreakerOpen: true, + }, + } { + t.Run(testdata.name, func(t *testing.T) { + config := defaultMemcachedClientConfig + config.Addresses = []string{"127.0.0.1:11211"} + config.SetAsyncCircuitBreakerOpenDuration = 1 * time.Millisecond + config.SetAsyncCircuitBreakerHalfOpenMaxRequests = 100 + config.SetAsyncCircuitBreakerMinRequests = testdata.minRequests + config.SetAsyncCircuitBreakerConsecutiveFailures = testdata.consecutiveFailures + config.SetAsyncCircuitBreakerFailurePercent = testdata.failurePercent + + backendMock := newMemcachedClientBackendMock() + backendMock.setErrors = testdata.setErrors + + client, err := prepare(config, backendMock) + testutil.Ok(t, err) + defer client.Stop() + + // Populate memcached with the initial items. + for i := 0; i < testdata.setErrors; i++ { + testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second)) + } + + testutil.Ok(t, backendMock.waitSetCount(testdata.setErrors)) + if testdata.expectCircuitBreakerOpen { + testutil.Equals(t, gobreaker.StateOpen, client.setAsyncCircuitBreaker.State()) + time.Sleep(config.SetAsyncCircuitBreakerOpenDuration) + for i := testdata.setErrors; i < testdata.setErrors+10; i++ { + testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second)) + } + testutil.Ok(t, backendMock.waitItems(10)) + } else { + testutil.Equals(t, gobreaker.StateClosed, client.setAsyncCircuitBreaker.State()) + } + }) + } +} diff --git a/pkg/cacheutil/redis_client.go b/pkg/cacheutil/redis_client.go index 032ed2942d..c3bf986d91 100644 --- a/pkg/cacheutil/redis_client.go +++ b/pkg/cacheutil/redis_client.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/redis/rueidis" + "github.com/sony/gobreaker" "gopkg.in/yaml.v3" "github.com/thanos-io/thanos/pkg/extprom" @@ -39,6 +40,11 @@ var ( TLSConfig: TLSConfig{}, MaxAsyncConcurrency: 20, MaxAsyncBufferSize: 10000, + SetAsyncCircuitBreakerHalfOpenMaxRequests: 10, + SetAsyncCircuitBreakerOpenDuration: 5 * time.Second, + SetAsyncCircuitBreakerMinRequests: 50, + SetAsyncCircuitBreakerConsecutiveFailures: 5, + SetAsyncCircuitBreakerFailurePercent: 0.05, } ) @@ -118,6 +124,20 @@ type RedisClientConfig struct { // MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines. MaxAsyncConcurrency int `yaml:"max_async_concurrency"` + + // SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through + // when the circuit breaker is half-open. + // If set to 0, the circuit breaker allows only 1 request. + SetAsyncCircuitBreakerHalfOpenMaxRequests uint32 `yaml:"set_async_circuit_breaker_half_open_max_requests"` + // SetAsyncCircuitBreakerOpenDuration is the period of the open state after which the state of the circuit breaker becomes half-open. + // If set to 0, the circuit breaker resets it to 60 seconds. + SetAsyncCircuitBreakerOpenDuration time.Duration `yaml:"set_async_circuit_breaker_open_duration"` + // SetAsyncCircuitBreakerMinRequests is minimal requests to trigger the circuit breaker. + SetAsyncCircuitBreakerMinRequests uint32 `yaml:"set_async_circuit_breaker_min_requests"` + // SetAsyncCircuitBreakerConsecutiveFailures represents consecutive failures based on CircuitBreakerMinRequests to determine if the circuit breaker should open. + SetAsyncCircuitBreakerConsecutiveFailures uint32 `yaml:"set_async_circuit_breaker_consecutive_failures"` + // SetAsyncCircuitBreakerFailurePercent represents the failure percentage, which is based on CircuitBreakerMinRequests, to determine if the circuit breaker should open. + SetAsyncCircuitBreakerFailurePercent float64 `yaml:"set_async_circuit_breaker_failure_percent"` } func (c *RedisClientConfig) validate() error { @@ -131,6 +151,12 @@ func (c *RedisClientConfig) validate() error { } } + if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { + return errCircuitBreakerConsecutiveFailuresNotPositive + } + if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { + return errCircuitBreakerFailurePercentInvalid + } return nil } @@ -150,6 +176,8 @@ type RedisClient struct { durationGetMulti prometheus.Observer p *AsyncOperationProcessor + + setAsyncCircuitBreaker *gobreaker.CircuitBreaker } // NewRedisClient makes a new RedisClient. @@ -232,6 +260,17 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient config.MaxSetMultiConcurrency, gate.Sets, ), + setAsyncCircuitBreaker: gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "redis-set-async", + MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests, + Interval: 10 * time.Second, + Timeout: config.SetAsyncCircuitBreakerOpenDuration, + ReadyToTrip: func(counts gobreaker.Counts) bool { + return counts.Requests >= config.SetAsyncCircuitBreakerMinRequests && + (counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) || + float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent) + }, + }), } duration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_redis_operation_duration_seconds", @@ -249,7 +288,10 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) error { return c.p.EnqueueAsync(func() { start := time.Now() - if err := c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error(); err != nil { + _, err := c.setAsyncCircuitBreaker.Execute(func() (any, error) { + return nil, c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error() + }) + if err != nil { level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value)) return } diff --git a/pkg/cacheutil/redis_client_test.go b/pkg/cacheutil/redis_client_test.go index dcdb714012..6e755235fd 100644 --- a/pkg/cacheutil/redis_client_test.go +++ b/pkg/cacheutil/redis_client_test.go @@ -197,6 +197,24 @@ func TestValidateRedisConfig(t *testing.T) { }, expect_err: true, }, + { + name: "invalidCircuitBreakerFailurePercent", + config: func() RedisClientConfig { + cfg := DefaultRedisClientConfig + cfg.SetAsyncCircuitBreakerConsecutiveFailures = 0 + return cfg + }, + expect_err: true, + }, + { + name: "invalidCircuitBreakerFailurePercent", + config: func() RedisClientConfig { + cfg := DefaultRedisClientConfig + cfg.SetAsyncCircuitBreakerFailurePercent = 0 + return cfg + }, + expect_err: true, + }, } for _, tt := range tests { From c9b1f7cc9ace47801d4f7639ffeb407fbfa8bade Mon Sep 17 00:00:00 2001 From: "Xiaochao Dong (@damnever)" Date: Thu, 25 Jan 2024 15:33:04 +0800 Subject: [PATCH 2/6] Add feature flag for circuitbreaker Signed-off-by: Xiaochao Dong (@damnever) --- CHANGELOG.md | 1 + pkg/cacheutil/cacheutil.go | 22 +++++++++++++++++++ pkg/cacheutil/memcached_client.go | 29 ++++++++++++++++++++------ pkg/cacheutil/memcached_client_test.go | 6 ++++-- pkg/cacheutil/redis_client.go | 26 ++++++++++++++++++----- 5 files changed, 71 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 832465e3e0..d898fe976d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6887](https://github.com/thanos-io/thanos/pull/6887) Query Frontend: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label. - [#7028](https://github.com/thanos-io/thanos/pull/7028) Query|Query Frontend: Add new `--query-frontend.enable-x-functions` flag to enable experimental extended functions. - [#6884](https://github.com/thanos-io/thanos/pull/6884) Tools: Add upload-block command to upload blocks to object storage. +- [#7010](https://github.com/thanos-io/thanos/pull/7010) Cache: Added `set_async_circuit_breaker_*` to utilize the circuit breaker pattern for dynamically thresholding asynchronous set operations. ### Changed diff --git a/pkg/cacheutil/cacheutil.go b/pkg/cacheutil/cacheutil.go index 5d91bb9a3f..73d183d340 100644 --- a/pkg/cacheutil/cacheutil.go +++ b/pkg/cacheutil/cacheutil.go @@ -8,6 +8,8 @@ import ( "golang.org/x/sync/errgroup" + "github.com/sony/gobreaker" + "github.com/thanos-io/thanos/pkg/gate" ) @@ -40,3 +42,23 @@ func doWithBatch(ctx context.Context, totalSize int, batchSize int, ga gate.Gate } return g.Wait() } + +// CircuitBreaker implements the circuit breaker pattern https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern. +type CircuitBreaker interface { + Execute(func() error) error +} + +type noopCircuitBreaker struct{} + +func (noopCircuitBreaker) Execute(f func() error) error { return f() } + +type gobreakerCircuitBreaker struct { + *gobreaker.CircuitBreaker +} + +func (cb gobreakerCircuitBreaker) Execute(f func() error) error { + _, err := cb.CircuitBreaker.Execute(func() (any, error) { + return nil, f() + }) + return err +} diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 6589ea89d3..a7d08274b3 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -57,6 +57,8 @@ var ( MaxGetMultiBatchSize: 0, DNSProviderUpdateInterval: 10 * time.Second, AutoDiscovery: false, + + SetAsyncCircuitBreakerEnabled: false, SetAsyncCircuitBreakerHalfOpenMaxRequests: 10, SetAsyncCircuitBreakerOpenDuration: 5 * time.Second, SetAsyncCircuitBreakerMinRequests: 50, @@ -150,6 +152,18 @@ type MemcachedClientConfig struct { // AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution AutoDiscovery bool `yaml:"auto_discovery"` + // SetAsyncCircuitBreakerEnabled enables circuite breaker for SetAsync operations. + // + // The circuit breaker consists of three states: closed, half-open, and open. + // It begins in the closed state. When the total requests exceed SetAsyncCircuitBreakerMinRequests, + // and either consecutive failures occur or the failure percentage is excessively high according + // to the configured values, the circuit breaker transitions to the open state. + // This results in the rejection of all SetAsync requests. After SetAsyncCircuitBreakerOpenDuration, + // the circuit breaker transitions to the half-open state, where it allows SetAsyncCircuitBreakerHalfOpenMaxRequests + // SetAsync requests to be processed in order to test if the conditions have improved. If they have not, + // the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds + // interval in the closed state, the circuit breaker resets its metrics and repeats this cycle. + SetAsyncCircuitBreakerEnabled bool `yaml:"set_async_circuit_breaker_enabled"` // SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through // when the circuit breaker is half-open. // If set to 0, the circuit breaker allows only 1 request. @@ -224,7 +238,7 @@ type memcachedClient struct { p *AsyncOperationProcessor - setAsyncCircuitBreaker *gobreaker.CircuitBreaker + setAsyncCircuitBreaker CircuitBreaker } // AddressProvider performs node address resolution given a list of clusters. @@ -307,8 +321,11 @@ func newMemcachedClient( config.MaxGetMultiConcurrency, gate.Gets, ), - p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), - setAsyncCircuitBreaker: gobreaker.NewCircuitBreaker(gobreaker.Settings{ + p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), + setAsyncCircuitBreaker: noopCircuitBreaker{}, + } + if config.SetAsyncCircuitBreakerEnabled { + c.setAsyncCircuitBreaker = gobreakerCircuitBreaker{gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "memcached-set-async", MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests, Interval: 10 * time.Second, @@ -318,7 +335,7 @@ func newMemcachedClient( (counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) || float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent) }, - }), + })} } c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ @@ -416,8 +433,8 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration) start := time.Now() c.operations.WithLabelValues(opSet).Inc() - _, err := c.setAsyncCircuitBreaker.Execute(func() (any, error) { - return nil, c.client.Set(&memcache.Item{ + err := c.setAsyncCircuitBreaker.Execute(func() error { + return c.client.Set(&memcache.Item{ Key: key, Value: value, Expiration: int32(time.Now().Add(ttl).Unix()), diff --git a/pkg/cacheutil/memcached_client_test.go b/pkg/cacheutil/memcached_client_test.go index 7b0571c32e..297ff7d281 100644 --- a/pkg/cacheutil/memcached_client_test.go +++ b/pkg/cacheutil/memcached_client_test.go @@ -724,6 +724,7 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) { t.Run(testdata.name, func(t *testing.T) { config := defaultMemcachedClientConfig config.Addresses = []string{"127.0.0.1:11211"} + config.SetAsyncCircuitBreakerEnabled = true config.SetAsyncCircuitBreakerOpenDuration = 1 * time.Millisecond config.SetAsyncCircuitBreakerHalfOpenMaxRequests = 100 config.SetAsyncCircuitBreakerMinRequests = testdata.minRequests @@ -743,15 +744,16 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) { } testutil.Ok(t, backendMock.waitSetCount(testdata.setErrors)) + cbimpl := client.setAsyncCircuitBreaker.(gobreakerCircuitBreaker).CircuitBreaker if testdata.expectCircuitBreakerOpen { - testutil.Equals(t, gobreaker.StateOpen, client.setAsyncCircuitBreaker.State()) + testutil.Equals(t, gobreaker.StateOpen, cbimpl.State()) time.Sleep(config.SetAsyncCircuitBreakerOpenDuration) for i := testdata.setErrors; i < testdata.setErrors+10; i++ { testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second)) } testutil.Ok(t, backendMock.waitItems(10)) } else { - testutil.Equals(t, gobreaker.StateClosed, client.setAsyncCircuitBreaker.State()) + testutil.Equals(t, gobreaker.StateClosed, cbimpl.State()) } }) } diff --git a/pkg/cacheutil/redis_client.go b/pkg/cacheutil/redis_client.go index c3bf986d91..b2aef1197d 100644 --- a/pkg/cacheutil/redis_client.go +++ b/pkg/cacheutil/redis_client.go @@ -125,6 +125,18 @@ type RedisClientConfig struct { // MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines. MaxAsyncConcurrency int `yaml:"max_async_concurrency"` + // SetAsyncCircuitBreakerEnabled enables circuite breaker for SetAsync operations. + // + // The circuit breaker consists of three states: closed, half-open, and open. + // It begins in the closed state. When the total requests exceed SetAsyncCircuitBreakerMinRequests, + // and either consecutive failures occur or the failure percentage is excessively high according + // to the configured values, the circuit breaker transitions to the open state. + // This results in the rejection of all SetAsync requests. After SetAsyncCircuitBreakerOpenDuration, + // the circuit breaker transitions to the half-open state, where it allows SetAsyncCircuitBreakerHalfOpenMaxRequests + // SetAsync requests to be processed in order to test if the conditions have improved. If they have not, + // the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds + // interval in the closed state, the circuit breaker resets its metrics and repeats this cycle. + SetAsyncCircuitBreakerEnabled bool `yaml:"set_async_circuit_breaker_enabled"` // SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through // when the circuit breaker is half-open. // If set to 0, the circuit breaker allows only 1 request. @@ -177,7 +189,7 @@ type RedisClient struct { p *AsyncOperationProcessor - setAsyncCircuitBreaker *gobreaker.CircuitBreaker + setAsyncCircuitBreaker CircuitBreaker } // NewRedisClient makes a new RedisClient. @@ -260,7 +272,10 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient config.MaxSetMultiConcurrency, gate.Sets, ), - setAsyncCircuitBreaker: gobreaker.NewCircuitBreaker(gobreaker.Settings{ + setAsyncCircuitBreaker: noopCircuitBreaker{}, + } + if config.SetAsyncCircuitBreakerEnabled { + c.setAsyncCircuitBreaker = gobreakerCircuitBreaker{gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "redis-set-async", MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests, Interval: 10 * time.Second, @@ -270,8 +285,9 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient (counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) || float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent) }, - }), + })} } + duration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_redis_operation_duration_seconds", Help: "Duration of operations against redis.", @@ -288,8 +304,8 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) error { return c.p.EnqueueAsync(func() { start := time.Now() - _, err := c.setAsyncCircuitBreaker.Execute(func() (any, error) { - return nil, c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error() + err := c.setAsyncCircuitBreaker.Execute(func() error { + return c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error() }) if err != nil { level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value)) From 808bdc9ededb6eb3040e21d6ca7f9286c1fe9355 Mon Sep 17 00:00:00 2001 From: "Xiaochao Dong (@damnever)" Date: Thu, 25 Jan 2024 15:35:13 +0800 Subject: [PATCH 3/6] Sync docs Signed-off-by: Xiaochao Dong (@damnever) --- docs/components/query-frontend.md | 12 ++++++++++++ docs/components/store.md | 18 ++++++++++++++++++ pkg/cacheutil/memcached_client.go | 10 ---------- pkg/cacheutil/redis_client.go | 12 ++---------- 4 files changed, 32 insertions(+), 20 deletions(-) diff --git a/docs/components/query-frontend.md b/docs/components/query-frontend.md index e374f19ec1..16976bfbd6 100644 --- a/docs/components/query-frontend.md +++ b/docs/components/query-frontend.md @@ -77,6 +77,12 @@ config: max_get_multi_batch_size: 0 dns_provider_update_interval: 0s auto_discovery: false + set_async_circuit_breaker_enabled: false + set_async_circuit_breaker_half_open_max_requests: 0 + set_async_circuit_breaker_open_duration: 0s + set_async_circuit_breaker_min_requests: 0 + set_async_circuit_breaker_consecutive_failures: 0 + set_async_circuit_breaker_failure_percent: 0 expiration: 0s ``` @@ -132,6 +138,12 @@ config: master_name: "" max_async_buffer_size: 10000 max_async_concurrency: 20 + set_async_circuit_breaker_enabled: false + set_async_circuit_breaker_half_open_max_requests: 10 + set_async_circuit_breaker_open_duration: 5s + set_async_circuit_breaker_min_requests: 50 + set_async_circuit_breaker_consecutive_failures: 5 + set_async_circuit_breaker_failure_percent: 0.05 expiration: 24h0m0s ``` diff --git a/docs/components/store.md b/docs/components/store.md index 8ecc53d68f..d36eda6af2 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -325,6 +325,12 @@ config: max_get_multi_batch_size: 0 dns_provider_update_interval: 0s auto_discovery: false + set_async_circuit_breaker_enabled: false + set_async_circuit_breaker_half_open_max_requests: 0 + set_async_circuit_breaker_open_duration: 0s + set_async_circuit_breaker_min_requests: 0 + set_async_circuit_breaker_consecutive_failures: 0 + set_async_circuit_breaker_failure_percent: 0 enabled_items: [] ttl: 0s ``` @@ -340,6 +346,12 @@ While the remaining settings are **optional**: - `max_async_concurrency`: maximum number of concurrent asynchronous operations can occur. - `max_async_buffer_size`: maximum number of enqueued asynchronous operations allowed. - `max_get_multi_concurrency`: maximum number of concurrent connections when fetching keys. If set to `0`, the concurrency is unlimited. +- `set_async_circuit_breaker_enabled`: `true` to enable circuite breaker for asynchronous operations. The circuit breaker consists of three states: closed, half-open, and open. It begins in the closed state. When the total requests exceed `set_async_circuit_breaker_min_requests`, and either consecutive failures occur or the failure percentage is excessively high according to the configured values, the circuit breaker transitions to the open state. This results in the rejection of all asynchronous operations. After `set_async_circuit_breaker_open_duration`, the circuit breaker transitions to the half-open state, where it allows `set_async_circuit_breaker_half_open_max_requests` asynchronous operations to be processed in order to test if the conditions have improved. If they have not, the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds interval in the closed state, the circuit breaker resets its metrics and repeats this cycle. +- `set_async_circuit_breaker_half_open_max_requests`: maximum number of requests allowed to pass through when the circuit breaker is half-open. If set to 0, the circuit breaker allows only 1 request. +- `set_async_circuit_breaker_open_duration`: the period of the open state after which the state of the circuit breaker becomes half-open. If set to 0, the circuit breaker resets it to 60 seconds. +- `set_async_circuit_breaker_min_requests`: minimal requests to trigger the circuit breaker, 0 signifies no requirements. +- `set_async_circuit_breaker_consecutive_failures`: consecutive failures based on `set_async_circuit_breaker_min_requests` to determine if the circuit breaker should open. +- `set_async_circuit_breaker_failure_percent`: the failure percentage, which is based on `set_async_circuit_breaker_min_requests`, to determine if the circuit breaker should open. - `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited. - `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited. - `dns_provider_update_interval`: the DNS discovery update interval. @@ -376,6 +388,12 @@ config: master_name: "" max_async_buffer_size: 10000 max_async_concurrency: 20 + set_async_circuit_breaker_enabled: false + set_async_circuit_breaker_half_open_max_requests: 10 + set_async_circuit_breaker_open_duration: 5s + set_async_circuit_breaker_min_requests: 50 + set_async_circuit_breaker_consecutive_failures: 5 + set_async_circuit_breaker_failure_percent: 0.05 enabled_items: [] ttl: 0s ``` diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index a7d08274b3..5754709932 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -153,16 +153,6 @@ type MemcachedClientConfig struct { AutoDiscovery bool `yaml:"auto_discovery"` // SetAsyncCircuitBreakerEnabled enables circuite breaker for SetAsync operations. - // - // The circuit breaker consists of three states: closed, half-open, and open. - // It begins in the closed state. When the total requests exceed SetAsyncCircuitBreakerMinRequests, - // and either consecutive failures occur or the failure percentage is excessively high according - // to the configured values, the circuit breaker transitions to the open state. - // This results in the rejection of all SetAsync requests. After SetAsyncCircuitBreakerOpenDuration, - // the circuit breaker transitions to the half-open state, where it allows SetAsyncCircuitBreakerHalfOpenMaxRequests - // SetAsync requests to be processed in order to test if the conditions have improved. If they have not, - // the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds - // interval in the closed state, the circuit breaker resets its metrics and repeats this cycle. SetAsyncCircuitBreakerEnabled bool `yaml:"set_async_circuit_breaker_enabled"` // SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through // when the circuit breaker is half-open. diff --git a/pkg/cacheutil/redis_client.go b/pkg/cacheutil/redis_client.go index b2aef1197d..6a0a9aedc6 100644 --- a/pkg/cacheutil/redis_client.go +++ b/pkg/cacheutil/redis_client.go @@ -40,6 +40,8 @@ var ( TLSConfig: TLSConfig{}, MaxAsyncConcurrency: 20, MaxAsyncBufferSize: 10000, + + SetAsyncCircuitBreakerEnabled: false, SetAsyncCircuitBreakerHalfOpenMaxRequests: 10, SetAsyncCircuitBreakerOpenDuration: 5 * time.Second, SetAsyncCircuitBreakerMinRequests: 50, @@ -126,16 +128,6 @@ type RedisClientConfig struct { MaxAsyncConcurrency int `yaml:"max_async_concurrency"` // SetAsyncCircuitBreakerEnabled enables circuite breaker for SetAsync operations. - // - // The circuit breaker consists of three states: closed, half-open, and open. - // It begins in the closed state. When the total requests exceed SetAsyncCircuitBreakerMinRequests, - // and either consecutive failures occur or the failure percentage is excessively high according - // to the configured values, the circuit breaker transitions to the open state. - // This results in the rejection of all SetAsync requests. After SetAsyncCircuitBreakerOpenDuration, - // the circuit breaker transitions to the half-open state, where it allows SetAsyncCircuitBreakerHalfOpenMaxRequests - // SetAsync requests to be processed in order to test if the conditions have improved. If they have not, - // the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds - // interval in the closed state, the circuit breaker resets its metrics and repeats this cycle. SetAsyncCircuitBreakerEnabled bool `yaml:"set_async_circuit_breaker_enabled"` // SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through // when the circuit breaker is half-open. From f76994e59b34bffb228b777abb4b59fe30110979 Mon Sep 17 00:00:00 2001 From: "Xiaochao Dong (@damnever)" Date: Mon, 29 Jan 2024 18:55:16 +0800 Subject: [PATCH 4/6] Skip configuration validation if the circuit breaker is disabled Signed-off-by: Xiaochao Dong (@damnever) --- pkg/cacheutil/memcached_client.go | 12 +++++---- pkg/cacheutil/memcached_client_test.go | 37 +++++++++++++------------- pkg/cacheutil/redis_client.go | 12 +++++---- pkg/cacheutil/redis_client_test.go | 15 +++++++++++ 4 files changed, 47 insertions(+), 29 deletions(-) diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 5754709932..1b6a681b1b 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -184,11 +184,13 @@ func (c *MemcachedClientConfig) validate() error { return errMemcachedMaxAsyncConcurrencyNotPositive } - if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { - return errCircuitBreakerConsecutiveFailuresNotPositive - } - if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { - return errCircuitBreakerFailurePercentInvalid + if c.SetAsyncCircuitBreakerEnabled { + if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { + return errCircuitBreakerConsecutiveFailuresNotPositive + } + if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { + return errCircuitBreakerFailurePercentInvalid + } } return nil } diff --git a/pkg/cacheutil/memcached_client_test.go b/pkg/cacheutil/memcached_client_test.go index 297ff7d281..3ebf8b515d 100644 --- a/pkg/cacheutil/memcached_client_test.go +++ b/pkg/cacheutil/memcached_client_test.go @@ -36,8 +36,6 @@ func TestMemcachedClientConfig_validate(t *testing.T) { Addresses: []string{"127.0.0.1:11211"}, MaxAsyncConcurrency: 1, DNSProviderUpdateInterval: time.Second, - SetAsyncCircuitBreakerConsecutiveFailures: 1, - SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: nil, }, @@ -46,8 +44,6 @@ func TestMemcachedClientConfig_validate(t *testing.T) { Addresses: []string{}, MaxAsyncConcurrency: 1, DNSProviderUpdateInterval: time.Second, - SetAsyncCircuitBreakerConsecutiveFailures: 1, - SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: errMemcachedConfigNoAddrs, }, @@ -56,8 +52,6 @@ func TestMemcachedClientConfig_validate(t *testing.T) { Addresses: []string{"127.0.0.1:11211"}, MaxAsyncConcurrency: 0, DNSProviderUpdateInterval: time.Second, - SetAsyncCircuitBreakerConsecutiveFailures: 1, - SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: errMemcachedMaxAsyncConcurrencyNotPositive, }, @@ -65,25 +59,25 @@ func TestMemcachedClientConfig_validate(t *testing.T) { config: MemcachedClientConfig{ Addresses: []string{"127.0.0.1:11211"}, MaxAsyncConcurrency: 1, - SetAsyncCircuitBreakerConsecutiveFailures: 1, - SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: errMemcachedDNSUpdateIntervalNotPositive, }, "should fail on circuit_breaker_consecutive_failures = 0": { config: MemcachedClientConfig{ - Addresses: []string{"127.0.0.1:11211"}, - MaxAsyncConcurrency: 1, - DNSProviderUpdateInterval: time.Second, + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerEnabled: true, SetAsyncCircuitBreakerConsecutiveFailures: 0, }, expected: errCircuitBreakerConsecutiveFailuresNotPositive, }, "should fail on circuit_breaker_failure_percent <= 0": { config: MemcachedClientConfig{ - Addresses: []string{"127.0.0.1:11211"}, - MaxAsyncConcurrency: 1, - DNSProviderUpdateInterval: time.Second, + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerEnabled: true, SetAsyncCircuitBreakerConsecutiveFailures: 1, SetAsyncCircuitBreakerFailurePercent: 0, }, @@ -91,9 +85,10 @@ func TestMemcachedClientConfig_validate(t *testing.T) { }, "should fail on circuit_breaker_failure_percent >= 1": { config: MemcachedClientConfig{ - Addresses: []string{"127.0.0.1:11211"}, - MaxAsyncConcurrency: 1, - DNSProviderUpdateInterval: time.Second, + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerEnabled: true, SetAsyncCircuitBreakerConsecutiveFailures: 1, SetAsyncCircuitBreakerFailurePercent: 1.1, }, @@ -725,7 +720,7 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) { config := defaultMemcachedClientConfig config.Addresses = []string{"127.0.0.1:11211"} config.SetAsyncCircuitBreakerEnabled = true - config.SetAsyncCircuitBreakerOpenDuration = 1 * time.Millisecond + config.SetAsyncCircuitBreakerOpenDuration = 2 * time.Millisecond config.SetAsyncCircuitBreakerHalfOpenMaxRequests = 100 config.SetAsyncCircuitBreakerMinRequests = testdata.minRequests config.SetAsyncCircuitBreakerConsecutiveFailures = testdata.consecutiveFailures @@ -746,7 +741,11 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) { testutil.Ok(t, backendMock.waitSetCount(testdata.setErrors)) cbimpl := client.setAsyncCircuitBreaker.(gobreakerCircuitBreaker).CircuitBreaker if testdata.expectCircuitBreakerOpen { - testutil.Equals(t, gobreaker.StateOpen, cbimpl.State()) + // Trigger the state transaction. + time.Sleep(time.Millisecond) + testutil.Ok(t, client.SetAsync(strconv.Itoa(testdata.setErrors), []byte("value"), time.Second)) + testutil.Equals(t, gobreaker.StateOpen, cbimpl.State(), "state should be open") + time.Sleep(config.SetAsyncCircuitBreakerOpenDuration) for i := testdata.setErrors; i < testdata.setErrors+10; i++ { testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second)) diff --git a/pkg/cacheutil/redis_client.go b/pkg/cacheutil/redis_client.go index 6a0a9aedc6..b6c63bc619 100644 --- a/pkg/cacheutil/redis_client.go +++ b/pkg/cacheutil/redis_client.go @@ -155,11 +155,13 @@ func (c *RedisClientConfig) validate() error { } } - if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { - return errCircuitBreakerConsecutiveFailuresNotPositive - } - if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { - return errCircuitBreakerFailurePercentInvalid + if c.SetAsyncCircuitBreakerEnabled { + if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { + return errCircuitBreakerConsecutiveFailuresNotPositive + } + if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { + return errCircuitBreakerFailurePercentInvalid + } } return nil } diff --git a/pkg/cacheutil/redis_client_test.go b/pkg/cacheutil/redis_client_test.go index 6e755235fd..6845b45663 100644 --- a/pkg/cacheutil/redis_client_test.go +++ b/pkg/cacheutil/redis_client_test.go @@ -197,10 +197,23 @@ func TestValidateRedisConfig(t *testing.T) { }, expect_err: true, }, + { + name: "SetAsyncCircuitBreakerDisabled", + config: func() RedisClientConfig { + cfg := DefaultRedisClientConfig + cfg.Addr = "127.0.0.1:6789" + cfg.SetAsyncCircuitBreakerEnabled = false + cfg.SetAsyncCircuitBreakerConsecutiveFailures = 0 + return cfg + }, + expect_err: false, + }, { name: "invalidCircuitBreakerFailurePercent", config: func() RedisClientConfig { cfg := DefaultRedisClientConfig + cfg.Addr = "127.0.0.1:6789" + cfg.SetAsyncCircuitBreakerEnabled = true cfg.SetAsyncCircuitBreakerConsecutiveFailures = 0 return cfg }, @@ -210,6 +223,8 @@ func TestValidateRedisConfig(t *testing.T) { name: "invalidCircuitBreakerFailurePercent", config: func() RedisClientConfig { cfg := DefaultRedisClientConfig + cfg.Addr = "127.0.0.1:6789" + cfg.SetAsyncCircuitBreakerEnabled = true cfg.SetAsyncCircuitBreakerFailurePercent = 0 return cfg }, From 07d913426c8f689cf9699ca14bbd745b0734ab5b Mon Sep 17 00:00:00 2001 From: "Xiaochao Dong (@damnever)" Date: Thu, 1 Feb 2024 15:49:19 +0800 Subject: [PATCH 5/6] Make lint happy Signed-off-by: Xiaochao Dong (@damnever) --- pkg/cacheutil/redis_client_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/cacheutil/redis_client_test.go b/pkg/cacheutil/redis_client_test.go index 6845b45663..3f3f92bb02 100644 --- a/pkg/cacheutil/redis_client_test.go +++ b/pkg/cacheutil/redis_client_test.go @@ -138,6 +138,7 @@ func TestRedisClient(t *testing.T) { } func TestValidateRedisConfig(t *testing.T) { + addr := "127.0.0.1:6789" tests := []struct { name string config func() RedisClientConfig @@ -147,7 +148,7 @@ func TestValidateRedisConfig(t *testing.T) { name: "simpleConfig", config: func() RedisClientConfig { cfg := DefaultRedisClientConfig - cfg.Addr = "127.0.0.1:6789" + cfg.Addr = addr cfg.Username = "user" cfg.Password = "1234" return cfg @@ -158,7 +159,7 @@ func TestValidateRedisConfig(t *testing.T) { name: "tlsConfigDefaults", config: func() RedisClientConfig { cfg := DefaultRedisClientConfig - cfg.Addr = "127.0.0.1:6789" + cfg.Addr = addr cfg.Username = "user" cfg.Password = "1234" cfg.TLSEnabled = true @@ -170,7 +171,7 @@ func TestValidateRedisConfig(t *testing.T) { name: "tlsClientCertConfig", config: func() RedisClientConfig { cfg := DefaultRedisClientConfig - cfg.Addr = "127.0.0.1:6789" + cfg.Addr = addr cfg.Username = "user" cfg.Password = "1234" cfg.TLSEnabled = true @@ -186,7 +187,7 @@ func TestValidateRedisConfig(t *testing.T) { name: "tlsInvalidClientCertConfig", config: func() RedisClientConfig { cfg := DefaultRedisClientConfig - cfg.Addr = "127.0.0.1:6789" + cfg.Addr = addr cfg.Username = "user" cfg.Password = "1234" cfg.TLSEnabled = true @@ -201,7 +202,7 @@ func TestValidateRedisConfig(t *testing.T) { name: "SetAsyncCircuitBreakerDisabled", config: func() RedisClientConfig { cfg := DefaultRedisClientConfig - cfg.Addr = "127.0.0.1:6789" + cfg.Addr = addr cfg.SetAsyncCircuitBreakerEnabled = false cfg.SetAsyncCircuitBreakerConsecutiveFailures = 0 return cfg @@ -212,7 +213,7 @@ func TestValidateRedisConfig(t *testing.T) { name: "invalidCircuitBreakerFailurePercent", config: func() RedisClientConfig { cfg := DefaultRedisClientConfig - cfg.Addr = "127.0.0.1:6789" + cfg.Addr = addr cfg.SetAsyncCircuitBreakerEnabled = true cfg.SetAsyncCircuitBreakerConsecutiveFailures = 0 return cfg @@ -223,7 +224,7 @@ func TestValidateRedisConfig(t *testing.T) { name: "invalidCircuitBreakerFailurePercent", config: func() RedisClientConfig { cfg := DefaultRedisClientConfig - cfg.Addr = "127.0.0.1:6789" + cfg.Addr = addr cfg.SetAsyncCircuitBreakerEnabled = true cfg.SetAsyncCircuitBreakerFailurePercent = 0 return cfg From e299abe5627cbe3b2755fc6e33ccbd3f75ae34d2 Mon Sep 17 00:00:00 2001 From: "Xiaochao Dong (@damnever)" Date: Mon, 19 Feb 2024 15:09:00 +0800 Subject: [PATCH 6/6] Abstract the logic of the circuit breaker Signed-off-by: Xiaochao Dong (@damnever) --- docs/components/query-frontend.md | 26 +++++----- docs/components/store.md | 39 ++++++++------- pkg/cacheutil/cacheutil.go | 69 +++++++++++++++++++++++++- pkg/cacheutil/memcached_client.go | 56 ++++----------------- pkg/cacheutil/memcached_client_test.go | 54 +++++++++++--------- pkg/cacheutil/redis_client.go | 49 +++--------------- pkg/cacheutil/redis_client_test.go | 12 ++--- 7 files changed, 153 insertions(+), 152 deletions(-) diff --git a/docs/components/query-frontend.md b/docs/components/query-frontend.md index 16976bfbd6..5df99529f2 100644 --- a/docs/components/query-frontend.md +++ b/docs/components/query-frontend.md @@ -77,12 +77,13 @@ config: max_get_multi_batch_size: 0 dns_provider_update_interval: 0s auto_discovery: false - set_async_circuit_breaker_enabled: false - set_async_circuit_breaker_half_open_max_requests: 0 - set_async_circuit_breaker_open_duration: 0s - set_async_circuit_breaker_min_requests: 0 - set_async_circuit_breaker_consecutive_failures: 0 - set_async_circuit_breaker_failure_percent: 0 + set_async_circuit_breaker_config: + enabled: false + half_open_max_requests: 0 + open_duration: 0s + min_requests: 0 + consecutive_failures: 0 + failure_percent: 0 expiration: 0s ``` @@ -138,12 +139,13 @@ config: master_name: "" max_async_buffer_size: 10000 max_async_concurrency: 20 - set_async_circuit_breaker_enabled: false - set_async_circuit_breaker_half_open_max_requests: 10 - set_async_circuit_breaker_open_duration: 5s - set_async_circuit_breaker_min_requests: 50 - set_async_circuit_breaker_consecutive_failures: 5 - set_async_circuit_breaker_failure_percent: 0.05 + set_async_circuit_breaker_config: + enabled: false + half_open_max_requests: 10 + open_duration: 5s + min_requests: 50 + consecutive_failures: 5 + failure_percent: 0.05 expiration: 24h0m0s ``` diff --git a/docs/components/store.md b/docs/components/store.md index d36eda6af2..94a56d37f1 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -325,12 +325,13 @@ config: max_get_multi_batch_size: 0 dns_provider_update_interval: 0s auto_discovery: false - set_async_circuit_breaker_enabled: false - set_async_circuit_breaker_half_open_max_requests: 0 - set_async_circuit_breaker_open_duration: 0s - set_async_circuit_breaker_min_requests: 0 - set_async_circuit_breaker_consecutive_failures: 0 - set_async_circuit_breaker_failure_percent: 0 + set_async_circuit_breaker_config: + enabled: false + half_open_max_requests: 0 + open_duration: 0s + min_requests: 0 + consecutive_failures: 0 + failure_percent: 0 enabled_items: [] ttl: 0s ``` @@ -346,16 +347,17 @@ While the remaining settings are **optional**: - `max_async_concurrency`: maximum number of concurrent asynchronous operations can occur. - `max_async_buffer_size`: maximum number of enqueued asynchronous operations allowed. - `max_get_multi_concurrency`: maximum number of concurrent connections when fetching keys. If set to `0`, the concurrency is unlimited. -- `set_async_circuit_breaker_enabled`: `true` to enable circuite breaker for asynchronous operations. The circuit breaker consists of three states: closed, half-open, and open. It begins in the closed state. When the total requests exceed `set_async_circuit_breaker_min_requests`, and either consecutive failures occur or the failure percentage is excessively high according to the configured values, the circuit breaker transitions to the open state. This results in the rejection of all asynchronous operations. After `set_async_circuit_breaker_open_duration`, the circuit breaker transitions to the half-open state, where it allows `set_async_circuit_breaker_half_open_max_requests` asynchronous operations to be processed in order to test if the conditions have improved. If they have not, the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds interval in the closed state, the circuit breaker resets its metrics and repeats this cycle. -- `set_async_circuit_breaker_half_open_max_requests`: maximum number of requests allowed to pass through when the circuit breaker is half-open. If set to 0, the circuit breaker allows only 1 request. -- `set_async_circuit_breaker_open_duration`: the period of the open state after which the state of the circuit breaker becomes half-open. If set to 0, the circuit breaker resets it to 60 seconds. -- `set_async_circuit_breaker_min_requests`: minimal requests to trigger the circuit breaker, 0 signifies no requirements. -- `set_async_circuit_breaker_consecutive_failures`: consecutive failures based on `set_async_circuit_breaker_min_requests` to determine if the circuit breaker should open. -- `set_async_circuit_breaker_failure_percent`: the failure percentage, which is based on `set_async_circuit_breaker_min_requests`, to determine if the circuit breaker should open. - `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited. - `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited. - `dns_provider_update_interval`: the DNS discovery update interval. - `auto_discovery`: whether to use the auto-discovery mechanism for memcached. +- `set_async_circuit_breaker_config`: the configuration for the circuit breaker for asynchronous set operations. + - `enabled`: `true` to enable circuite breaker for asynchronous operations. The circuit breaker consists of three states: closed, half-open, and open. It begins in the closed state. When the total requests exceed `min_requests`, and either consecutive failures occur or the failure percentage is excessively high according to the configured values, the circuit breaker transitions to the open state. This results in the rejection of all asynchronous operations. After `open_duration`, the circuit breaker transitions to the half-open state, where it allows `half_open_max_requests` asynchronous operations to be processed in order to test if the conditions have improved. If they have not, the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds interval in the closed state, the circuit breaker resets its metrics and repeats this cycle. + - `half_open_max_requests`: maximum number of requests allowed to pass through when the circuit breaker is half-open. If set to 0, the circuit breaker allows only 1 request. + - `open_duration`: the period of the open state after which the state of the circuit breaker becomes half-open. If set to 0, the circuit breaker utilizes the default value of 60 seconds. + - `min_requests`: minimal requests to trigger the circuit breaker, 0 signifies no requirements. + - `consecutive_failures`: consecutive failures based on `min_requests` to determine if the circuit breaker should open. + - `failure_percent`: the failure percentage, which is based on `min_requests`, to determine if the circuit breaker should open. - `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached. - `ttl`: ttl to store index cache items in memcached. @@ -388,12 +390,13 @@ config: master_name: "" max_async_buffer_size: 10000 max_async_concurrency: 20 - set_async_circuit_breaker_enabled: false - set_async_circuit_breaker_half_open_max_requests: 10 - set_async_circuit_breaker_open_duration: 5s - set_async_circuit_breaker_min_requests: 50 - set_async_circuit_breaker_consecutive_failures: 5 - set_async_circuit_breaker_failure_percent: 0.05 + set_async_circuit_breaker_config: + enabled: false + half_open_max_requests: 10 + open_duration: 5s + min_requests: 50 + consecutive_failures: 5 + failure_percent: 0.05 enabled_items: [] ttl: 0s ``` diff --git a/pkg/cacheutil/cacheutil.go b/pkg/cacheutil/cacheutil.go index 73d183d340..8f6dea7b7e 100644 --- a/pkg/cacheutil/cacheutil.go +++ b/pkg/cacheutil/cacheutil.go @@ -5,14 +5,29 @@ package cacheutil import ( "context" + "time" - "golang.org/x/sync/errgroup" - + "github.com/pkg/errors" "github.com/sony/gobreaker" + "golang.org/x/sync/errgroup" "github.com/thanos-io/thanos/pkg/gate" ) +var ( + errCircuitBreakerConsecutiveFailuresNotPositive = errors.New("circuit breaker: consecutive failures must be greater than 0") + errCircuitBreakerFailurePercentInvalid = errors.New("circuit breaker: failure percent must be in range (0,1]") + + defaultCircuitBreakerConfig = CircuitBreakerConfig{ + Enabled: false, + HalfOpenMaxRequests: 10, + OpenDuration: 5 * time.Second, + MinRequests: 50, + ConsecutiveFailures: 5, + FailurePercent: 0.05, + } +) + // doWithBatch do func with batch and gate. batchSize==0 means one batch. gate==nil means no gate. func doWithBatch(ctx context.Context, totalSize int, batchSize int, ga gate.Gate, f func(startIndex, endIndex int) error) error { if totalSize == 0 { @@ -48,6 +63,39 @@ type CircuitBreaker interface { Execute(func() error) error } +// CircuitBreakerConfig is the config for the circuite breaker. +type CircuitBreakerConfig struct { + // Enabled enables circuite breaker. + Enabled bool `yaml:"enabled"` + + // HalfOpenMaxRequests is the maximum number of requests allowed to pass through + // when the circuit breaker is half-open. + // If set to 0, the circuit breaker allows only 1 request. + HalfOpenMaxRequests uint32 `yaml:"half_open_max_requests"` + // OpenDuration is the period of the open state after which the state of the circuit breaker becomes half-open. + // If set to 0, the circuit breaker utilizes the default value of 60 seconds. + OpenDuration time.Duration `yaml:"open_duration"` + // MinRequests is minimal requests to trigger the circuit breaker. + MinRequests uint32 `yaml:"min_requests"` + // ConsecutiveFailures represents consecutive failures based on CircuitBreakerMinRequests to determine if the circuit breaker should open. + ConsecutiveFailures uint32 `yaml:"consecutive_failures"` + // FailurePercent represents the failure percentage, which is based on CircuitBreakerMinRequests, to determine if the circuit breaker should open. + FailurePercent float64 `yaml:"failure_percent"` +} + +func (c CircuitBreakerConfig) validate() error { + if !c.Enabled { + return nil + } + if c.ConsecutiveFailures == 0 { + return errCircuitBreakerConsecutiveFailuresNotPositive + } + if c.FailurePercent <= 0 || c.FailurePercent > 1 { + return errCircuitBreakerFailurePercentInvalid + } + return nil +} + type noopCircuitBreaker struct{} func (noopCircuitBreaker) Execute(f func() error) error { return f() } @@ -62,3 +110,20 @@ func (cb gobreakerCircuitBreaker) Execute(f func() error) error { }) return err } + +func newCircuitBreaker(name string, config CircuitBreakerConfig) CircuitBreaker { + if !config.Enabled { + return noopCircuitBreaker{} + } + return gobreakerCircuitBreaker{gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: name, + MaxRequests: config.HalfOpenMaxRequests, + Interval: 10 * time.Second, + Timeout: config.OpenDuration, + ReadyToTrip: func(counts gobreaker.Counts) bool { + return counts.Requests >= config.MinRequests && + (counts.ConsecutiveFailures >= uint32(config.ConsecutiveFailures) || + float64(counts.TotalFailures)/float64(counts.Requests) >= config.FailurePercent) + }, + })} +} diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 1b6a681b1b..8c10a9a874 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -41,11 +41,9 @@ const ( ) var ( - errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided") - errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive") - errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive") - errCircuitBreakerConsecutiveFailuresNotPositive = errors.New("set async circuit breaker: consecutive failures must be greater than 0") - errCircuitBreakerFailurePercentInvalid = errors.New("set async circuit breaker: failure percent must be in range (0,1]") + errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided") + errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive") + errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive") defaultMemcachedClientConfig = MemcachedClientConfig{ Timeout: 500 * time.Millisecond, @@ -58,12 +56,7 @@ var ( DNSProviderUpdateInterval: 10 * time.Second, AutoDiscovery: false, - SetAsyncCircuitBreakerEnabled: false, - SetAsyncCircuitBreakerHalfOpenMaxRequests: 10, - SetAsyncCircuitBreakerOpenDuration: 5 * time.Second, - SetAsyncCircuitBreakerMinRequests: 50, - SetAsyncCircuitBreakerConsecutiveFailures: 5, - SetAsyncCircuitBreakerFailurePercent: 0.05, + SetAsyncCircuitBreaker: defaultCircuitBreakerConfig, } ) @@ -152,21 +145,8 @@ type MemcachedClientConfig struct { // AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution AutoDiscovery bool `yaml:"auto_discovery"` - // SetAsyncCircuitBreakerEnabled enables circuite breaker for SetAsync operations. - SetAsyncCircuitBreakerEnabled bool `yaml:"set_async_circuit_breaker_enabled"` - // SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through - // when the circuit breaker is half-open. - // If set to 0, the circuit breaker allows only 1 request. - SetAsyncCircuitBreakerHalfOpenMaxRequests uint32 `yaml:"set_async_circuit_breaker_half_open_max_requests"` - // SetAsyncCircuitBreakerOpenDuration is the period of the open state after which the state of the circuit breaker becomes half-open. - // If set to 0, the circuit breaker resets it to 60 seconds. - SetAsyncCircuitBreakerOpenDuration time.Duration `yaml:"set_async_circuit_breaker_open_duration"` - // SetAsyncCircuitBreakerMinRequests is minimal requests to trigger the circuit breaker. - SetAsyncCircuitBreakerMinRequests uint32 `yaml:"set_async_circuit_breaker_min_requests"` - // SetAsyncCircuitBreakerConsecutiveFailures represents consecutive failures based on CircuitBreakerMinRequests to determine if the circuit breaker should open. - SetAsyncCircuitBreakerConsecutiveFailures uint32 `yaml:"set_async_circuit_breaker_consecutive_failures"` - // SetAsyncCircuitBreakerFailurePercent represents the failure percentage, which is based on CircuitBreakerMinRequests, to determine if the circuit breaker should open. - SetAsyncCircuitBreakerFailurePercent float64 `yaml:"set_async_circuit_breaker_failure_percent"` + // SetAsyncCircuitBreaker configures the circuit breaker for SetAsync operations. + SetAsyncCircuitBreaker CircuitBreakerConfig `yaml:"set_async_circuit_breaker_config"` } func (c *MemcachedClientConfig) validate() error { @@ -184,13 +164,8 @@ func (c *MemcachedClientConfig) validate() error { return errMemcachedMaxAsyncConcurrencyNotPositive } - if c.SetAsyncCircuitBreakerEnabled { - if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { - return errCircuitBreakerConsecutiveFailuresNotPositive - } - if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { - return errCircuitBreakerFailurePercentInvalid - } + if err := c.SetAsyncCircuitBreaker.validate(); err != nil { + return err } return nil } @@ -314,20 +289,7 @@ func newMemcachedClient( gate.Gets, ), p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), - setAsyncCircuitBreaker: noopCircuitBreaker{}, - } - if config.SetAsyncCircuitBreakerEnabled { - c.setAsyncCircuitBreaker = gobreakerCircuitBreaker{gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "memcached-set-async", - MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests, - Interval: 10 * time.Second, - Timeout: config.SetAsyncCircuitBreakerOpenDuration, - ReadyToTrip: func(counts gobreaker.Counts) bool { - return counts.Requests >= config.SetAsyncCircuitBreakerMinRequests && - (counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) || - float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent) - }, - })} + setAsyncCircuitBreaker: newCircuitBreaker("memcached-set-async", config.SetAsyncCircuitBreaker), } c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ diff --git a/pkg/cacheutil/memcached_client_test.go b/pkg/cacheutil/memcached_client_test.go index 3ebf8b515d..cac450ef9e 100644 --- a/pkg/cacheutil/memcached_client_test.go +++ b/pkg/cacheutil/memcached_client_test.go @@ -64,33 +64,39 @@ func TestMemcachedClientConfig_validate(t *testing.T) { }, "should fail on circuit_breaker_consecutive_failures = 0": { config: MemcachedClientConfig{ - Addresses: []string{"127.0.0.1:11211"}, - MaxAsyncConcurrency: 1, - DNSProviderUpdateInterval: time.Second, - SetAsyncCircuitBreakerEnabled: true, - SetAsyncCircuitBreakerConsecutiveFailures: 0, + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreaker: CircuitBreakerConfig{ + Enabled: true, + ConsecutiveFailures: 0, + }, }, expected: errCircuitBreakerConsecutiveFailuresNotPositive, }, "should fail on circuit_breaker_failure_percent <= 0": { config: MemcachedClientConfig{ - Addresses: []string{"127.0.0.1:11211"}, - MaxAsyncConcurrency: 1, - DNSProviderUpdateInterval: time.Second, - SetAsyncCircuitBreakerEnabled: true, - SetAsyncCircuitBreakerConsecutiveFailures: 1, - SetAsyncCircuitBreakerFailurePercent: 0, + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreaker: CircuitBreakerConfig{ + Enabled: true, + ConsecutiveFailures: 1, + FailurePercent: 0, + }, }, expected: errCircuitBreakerFailurePercentInvalid, }, "should fail on circuit_breaker_failure_percent >= 1": { config: MemcachedClientConfig{ - Addresses: []string{"127.0.0.1:11211"}, - MaxAsyncConcurrency: 1, - DNSProviderUpdateInterval: time.Second, - SetAsyncCircuitBreakerEnabled: true, - SetAsyncCircuitBreakerConsecutiveFailures: 1, - SetAsyncCircuitBreakerFailurePercent: 1.1, + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreaker: CircuitBreakerConfig{ + Enabled: true, + ConsecutiveFailures: 1, + FailurePercent: 1.1, + }, }, expected: errCircuitBreakerFailurePercentInvalid, }, @@ -719,12 +725,12 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) { t.Run(testdata.name, func(t *testing.T) { config := defaultMemcachedClientConfig config.Addresses = []string{"127.0.0.1:11211"} - config.SetAsyncCircuitBreakerEnabled = true - config.SetAsyncCircuitBreakerOpenDuration = 2 * time.Millisecond - config.SetAsyncCircuitBreakerHalfOpenMaxRequests = 100 - config.SetAsyncCircuitBreakerMinRequests = testdata.minRequests - config.SetAsyncCircuitBreakerConsecutiveFailures = testdata.consecutiveFailures - config.SetAsyncCircuitBreakerFailurePercent = testdata.failurePercent + config.SetAsyncCircuitBreaker.Enabled = true + config.SetAsyncCircuitBreaker.OpenDuration = 2 * time.Millisecond + config.SetAsyncCircuitBreaker.HalfOpenMaxRequests = 100 + config.SetAsyncCircuitBreaker.MinRequests = testdata.minRequests + config.SetAsyncCircuitBreaker.ConsecutiveFailures = testdata.consecutiveFailures + config.SetAsyncCircuitBreaker.FailurePercent = testdata.failurePercent backendMock := newMemcachedClientBackendMock() backendMock.setErrors = testdata.setErrors @@ -746,7 +752,7 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) { testutil.Ok(t, client.SetAsync(strconv.Itoa(testdata.setErrors), []byte("value"), time.Second)) testutil.Equals(t, gobreaker.StateOpen, cbimpl.State(), "state should be open") - time.Sleep(config.SetAsyncCircuitBreakerOpenDuration) + time.Sleep(config.SetAsyncCircuitBreaker.OpenDuration) for i := testdata.setErrors; i < testdata.setErrors+10; i++ { testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second)) } diff --git a/pkg/cacheutil/redis_client.go b/pkg/cacheutil/redis_client.go index b6c63bc619..09e664b4dc 100644 --- a/pkg/cacheutil/redis_client.go +++ b/pkg/cacheutil/redis_client.go @@ -17,7 +17,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/redis/rueidis" - "github.com/sony/gobreaker" "gopkg.in/yaml.v3" "github.com/thanos-io/thanos/pkg/extprom" @@ -41,12 +40,7 @@ var ( MaxAsyncConcurrency: 20, MaxAsyncBufferSize: 10000, - SetAsyncCircuitBreakerEnabled: false, - SetAsyncCircuitBreakerHalfOpenMaxRequests: 10, - SetAsyncCircuitBreakerOpenDuration: 5 * time.Second, - SetAsyncCircuitBreakerMinRequests: 50, - SetAsyncCircuitBreakerConsecutiveFailures: 5, - SetAsyncCircuitBreakerFailurePercent: 0.05, + SetAsyncCircuitBreaker: defaultCircuitBreakerConfig, } ) @@ -127,21 +121,8 @@ type RedisClientConfig struct { // MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines. MaxAsyncConcurrency int `yaml:"max_async_concurrency"` - // SetAsyncCircuitBreakerEnabled enables circuite breaker for SetAsync operations. - SetAsyncCircuitBreakerEnabled bool `yaml:"set_async_circuit_breaker_enabled"` - // SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through - // when the circuit breaker is half-open. - // If set to 0, the circuit breaker allows only 1 request. - SetAsyncCircuitBreakerHalfOpenMaxRequests uint32 `yaml:"set_async_circuit_breaker_half_open_max_requests"` - // SetAsyncCircuitBreakerOpenDuration is the period of the open state after which the state of the circuit breaker becomes half-open. - // If set to 0, the circuit breaker resets it to 60 seconds. - SetAsyncCircuitBreakerOpenDuration time.Duration `yaml:"set_async_circuit_breaker_open_duration"` - // SetAsyncCircuitBreakerMinRequests is minimal requests to trigger the circuit breaker. - SetAsyncCircuitBreakerMinRequests uint32 `yaml:"set_async_circuit_breaker_min_requests"` - // SetAsyncCircuitBreakerConsecutiveFailures represents consecutive failures based on CircuitBreakerMinRequests to determine if the circuit breaker should open. - SetAsyncCircuitBreakerConsecutiveFailures uint32 `yaml:"set_async_circuit_breaker_consecutive_failures"` - // SetAsyncCircuitBreakerFailurePercent represents the failure percentage, which is based on CircuitBreakerMinRequests, to determine if the circuit breaker should open. - SetAsyncCircuitBreakerFailurePercent float64 `yaml:"set_async_circuit_breaker_failure_percent"` + // SetAsyncCircuitBreaker configures the circuit breaker for SetAsync operations. + SetAsyncCircuitBreaker CircuitBreakerConfig `yaml:"set_async_circuit_breaker_config"` } func (c *RedisClientConfig) validate() error { @@ -155,13 +136,8 @@ func (c *RedisClientConfig) validate() error { } } - if c.SetAsyncCircuitBreakerEnabled { - if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { - return errCircuitBreakerConsecutiveFailuresNotPositive - } - if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { - return errCircuitBreakerFailurePercentInvalid - } + if err := c.SetAsyncCircuitBreaker.validate(); err != nil { + return err } return nil } @@ -266,20 +242,7 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient config.MaxSetMultiConcurrency, gate.Sets, ), - setAsyncCircuitBreaker: noopCircuitBreaker{}, - } - if config.SetAsyncCircuitBreakerEnabled { - c.setAsyncCircuitBreaker = gobreakerCircuitBreaker{gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "redis-set-async", - MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests, - Interval: 10 * time.Second, - Timeout: config.SetAsyncCircuitBreakerOpenDuration, - ReadyToTrip: func(counts gobreaker.Counts) bool { - return counts.Requests >= config.SetAsyncCircuitBreakerMinRequests && - (counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) || - float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent) - }, - })} + setAsyncCircuitBreaker: newCircuitBreaker("redis-set-async", config.SetAsyncCircuitBreaker), } duration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ diff --git a/pkg/cacheutil/redis_client_test.go b/pkg/cacheutil/redis_client_test.go index 3f3f92bb02..938d4337fc 100644 --- a/pkg/cacheutil/redis_client_test.go +++ b/pkg/cacheutil/redis_client_test.go @@ -203,8 +203,8 @@ func TestValidateRedisConfig(t *testing.T) { config: func() RedisClientConfig { cfg := DefaultRedisClientConfig cfg.Addr = addr - cfg.SetAsyncCircuitBreakerEnabled = false - cfg.SetAsyncCircuitBreakerConsecutiveFailures = 0 + cfg.SetAsyncCircuitBreaker.Enabled = false + cfg.SetAsyncCircuitBreaker.ConsecutiveFailures = 0 return cfg }, expect_err: false, @@ -214,8 +214,8 @@ func TestValidateRedisConfig(t *testing.T) { config: func() RedisClientConfig { cfg := DefaultRedisClientConfig cfg.Addr = addr - cfg.SetAsyncCircuitBreakerEnabled = true - cfg.SetAsyncCircuitBreakerConsecutiveFailures = 0 + cfg.SetAsyncCircuitBreaker.Enabled = true + cfg.SetAsyncCircuitBreaker.ConsecutiveFailures = 0 return cfg }, expect_err: true, @@ -225,8 +225,8 @@ func TestValidateRedisConfig(t *testing.T) { config: func() RedisClientConfig { cfg := DefaultRedisClientConfig cfg.Addr = addr - cfg.SetAsyncCircuitBreakerEnabled = true - cfg.SetAsyncCircuitBreakerFailurePercent = 0 + cfg.SetAsyncCircuitBreaker.Enabled = true + cfg.SetAsyncCircuitBreaker.FailurePercent = 0 return cfg }, expect_err: true,