Skip to content

Commit

Permalink
Add feature flag for circuitbreaker
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaochao Dong (@damnever) <[email protected]>
  • Loading branch information
damnever committed Jan 25, 2024
1 parent 4dd89b7 commit 135a1af
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6887](https://github.com/thanos-io/thanos/pull/6887) Query Frontend: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label.
- [#7028](https://github.com/thanos-io/thanos/pull/7028) Query|Query Frontend: Add new `--query-frontend.enable-x-functions` flag to enable experimental extended functions.
- [#6884](https://github.com/thanos-io/thanos/pull/6884) Tools: Add upload-block command to upload blocks to object storage.
- [#7010](https://github.com/thanos-io/thanos/pull/7010) Cache: Added `set_async_circuit_breaker_*` to utilize the circuit breaker pattern for dynamically thresholding asynchronous set operations.

### Changed

Expand Down
22 changes: 22 additions & 0 deletions pkg/cacheutil/cacheutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"golang.org/x/sync/errgroup"

"github.com/sony/gobreaker"

"github.com/thanos-io/thanos/pkg/gate"
)

Expand Down Expand Up @@ -40,3 +42,23 @@ func doWithBatch(ctx context.Context, totalSize int, batchSize int, ga gate.Gate
}
return g.Wait()
}

// CircuitBreaker implements the circuit breaker pattern https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern.
type CircuitBreaker interface {
Execute(func() error) error
}

type noopCircuitBreaker struct{}

func (noopCircuitBreaker) Execute(f func() error) error { return f() }

type gobreakerCircuitBreaker struct {
*gobreaker.CircuitBreaker
}

func (cb gobreakerCircuitBreaker) Execute(f func() error) error {
_, err := cb.CircuitBreaker.Execute(func() (any, error) {
return nil, f()
})
return err
}
29 changes: 23 additions & 6 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ var (
MaxGetMultiBatchSize: 0,
DNSProviderUpdateInterval: 10 * time.Second,
AutoDiscovery: false,

SetAsyncCircuitBreakerEnabled: false,
SetAsyncCircuitBreakerHalfOpenMaxRequests: 10,
SetAsyncCircuitBreakerOpenDuration: 5 * time.Second,
SetAsyncCircuitBreakerMinRequests: 50,
Expand Down Expand Up @@ -150,6 +152,18 @@ type MemcachedClientConfig struct {
// AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution
AutoDiscovery bool `yaml:"auto_discovery"`

// SetAsyncCircuitBreakerEnabled enables circuite breaker for SetAsync operations.
//
// The circuit breaker consists of three states: closed, half-open, and open.
// It begins in the closed state. When the total requests exceed SetAsyncCircuitBreakerMinRequests,
// and either consecutive failures occur or the failure percentage is excessively high according
// to the configured values, the circuit breaker transitions to the open state.
// This results in the rejection of all SetAsync requests. After SetAsyncCircuitBreakerOpenDuration,
// the circuit breaker transitions to the half-open state, where it allows SetAsyncCircuitBreakerHalfOpenMaxRequests
// SetAsync requests to be processed in order to test if the conditions have improved. If they have not,
// the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds
// interval in the closed state, the circuit breaker resets its metrics and repeats this cycle.
SetAsyncCircuitBreakerEnabled bool `yaml:"set_async_circuit_breaker_enabled"`
// SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through
// when the circuit breaker is half-open.
// If set to 0, the circuit breaker allows only 1 request.
Expand Down Expand Up @@ -224,7 +238,7 @@ type memcachedClient struct {

p *AsyncOperationProcessor

setAsyncCircuitBreaker *gobreaker.CircuitBreaker
setAsyncCircuitBreaker CircuitBreaker
}

// AddressProvider performs node address resolution given a list of clusters.
Expand Down Expand Up @@ -307,8 +321,11 @@ func newMemcachedClient(
config.MaxGetMultiConcurrency,
gate.Gets,
),
p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency),
setAsyncCircuitBreaker: gobreaker.NewCircuitBreaker(gobreaker.Settings{
p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency),
setAsyncCircuitBreaker: noopCircuitBreaker{},
}
if config.SetAsyncCircuitBreakerEnabled {
c.setAsyncCircuitBreaker = gobreakerCircuitBreaker{gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "memcached-set-async",
MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests,
Interval: 10 * time.Second,
Expand All @@ -318,7 +335,7 @@ func newMemcachedClient(
(counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) ||
float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent)
},
}),
})}
}

c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Expand Down Expand Up @@ -416,8 +433,8 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration)
start := time.Now()
c.operations.WithLabelValues(opSet).Inc()

_, err := c.setAsyncCircuitBreaker.Execute(func() (any, error) {
return nil, c.client.Set(&memcache.Item{
err := c.setAsyncCircuitBreaker.Execute(func() error {
return c.client.Set(&memcache.Item{
Key: key,
Value: value,
Expiration: int32(time.Now().Add(ttl).Unix()),
Expand Down
6 changes: 4 additions & 2 deletions pkg/cacheutil/memcached_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,7 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) {
t.Run(testdata.name, func(t *testing.T) {
config := defaultMemcachedClientConfig
config.Addresses = []string{"127.0.0.1:11211"}
config.SetAsyncCircuitBreakerEnabled = true
config.SetAsyncCircuitBreakerOpenDuration = 1 * time.Millisecond
config.SetAsyncCircuitBreakerHalfOpenMaxRequests = 100
config.SetAsyncCircuitBreakerMinRequests = testdata.minRequests
Expand All @@ -743,15 +744,16 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) {
}

testutil.Ok(t, backendMock.waitSetCount(testdata.setErrors))
cbimpl := client.setAsyncCircuitBreaker.(gobreakerCircuitBreaker).CircuitBreaker
if testdata.expectCircuitBreakerOpen {
testutil.Equals(t, gobreaker.StateOpen, client.setAsyncCircuitBreaker.State())
testutil.Equals(t, gobreaker.StateOpen, cbimpl.State())
time.Sleep(config.SetAsyncCircuitBreakerOpenDuration)
for i := testdata.setErrors; i < testdata.setErrors+10; i++ {
testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second))
}
testutil.Ok(t, backendMock.waitItems(10))
} else {
testutil.Equals(t, gobreaker.StateClosed, client.setAsyncCircuitBreaker.State())
testutil.Equals(t, gobreaker.StateClosed, cbimpl.State())
}
})
}
Expand Down
26 changes: 21 additions & 5 deletions pkg/cacheutil/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,18 @@ type RedisClientConfig struct {
// MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines.
MaxAsyncConcurrency int `yaml:"max_async_concurrency"`

// SetAsyncCircuitBreakerEnabled enables circuite breaker for SetAsync operations.
//
// The circuit breaker consists of three states: closed, half-open, and open.
// It begins in the closed state. When the total requests exceed SetAsyncCircuitBreakerMinRequests,
// and either consecutive failures occur or the failure percentage is excessively high according
// to the configured values, the circuit breaker transitions to the open state.
// This results in the rejection of all SetAsync requests. After SetAsyncCircuitBreakerOpenDuration,
// the circuit breaker transitions to the half-open state, where it allows SetAsyncCircuitBreakerHalfOpenMaxRequests
// SetAsync requests to be processed in order to test if the conditions have improved. If they have not,
// the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds
// interval in the closed state, the circuit breaker resets its metrics and repeats this cycle.
SetAsyncCircuitBreakerEnabled bool `yaml:"set_async_circuit_breaker_enabled"`
// SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through
// when the circuit breaker is half-open.
// If set to 0, the circuit breaker allows only 1 request.
Expand Down Expand Up @@ -177,7 +189,7 @@ type RedisClient struct {

p *AsyncOperationProcessor

setAsyncCircuitBreaker *gobreaker.CircuitBreaker
setAsyncCircuitBreaker CircuitBreaker
}

// NewRedisClient makes a new RedisClient.
Expand Down Expand Up @@ -260,7 +272,10 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient
config.MaxSetMultiConcurrency,
gate.Sets,
),
setAsyncCircuitBreaker: gobreaker.NewCircuitBreaker(gobreaker.Settings{
setAsyncCircuitBreaker: noopCircuitBreaker{},
}
if config.SetAsyncCircuitBreakerEnabled {
c.setAsyncCircuitBreaker = gobreakerCircuitBreaker{gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "redis-set-async",
MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests,
Interval: 10 * time.Second,
Expand All @@ -270,8 +285,9 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient
(counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) ||
float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent)
},
}),
})}
}

duration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_redis_operation_duration_seconds",
Help: "Duration of operations against redis.",
Expand All @@ -288,8 +304,8 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient
func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) error {
return c.p.EnqueueAsync(func() {
start := time.Now()
_, err := c.setAsyncCircuitBreaker.Execute(func() (any, error) {
return nil, c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error()
err := c.setAsyncCircuitBreaker.Execute(func() error {
return c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error()
})
if err != nil {
level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value))
Expand Down

0 comments on commit 135a1af

Please sign in to comment.