Skip to content

Commit

Permalink
thriftbp: Add background refresh for ttlClients
Browse files Browse the repository at this point in the history
For every ttlClient that has a ttl, add a background goroutine to
refresh the connection when ttl hits.

This adds one goroutine per connection in the pool, until the connection
fails to refresh.

Both IsOpen and Call are protected by a locked state now. When ttlClient
is used, we always call IsOpen first then Call, and in theory these 2
calls might not be using the same connection (e.g. IsOpen is called
right before the connection expires), but even in that scenario, Call
would use a freshly refreshed connection, so things should still work.
  • Loading branch information
fishy committed Nov 2, 2021
1 parent 5ff49d6 commit aeb1b61
Show file tree
Hide file tree
Showing 3 changed files with 285 additions and 43 deletions.
43 changes: 30 additions & 13 deletions thriftbp/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ type ClientPoolConfig struct {
// MaxConnectionAge. Default to 10% (see DefaultMaxConnectionAgeJitter).
// For example, when MaxConnectionAge is 5min and MaxConnectionAgeJitter is
// 10%, the TTL of the clients would be in range of (4:30, 5:30).
//
// When this is enabled, there will be one additional goroutine per connection
// in the pool to do background housekeeping (to replace the expired
// connections). We emit <ServiceSlug>.connection-housekeeping counter with
// success=True/False tag to provide observalibility into the background
// housekeeping.
//
// Due to a Go runtime bug [1], if you use a very small MaxConnectionAge or a
// jitter very close to 1, the background housekeeping could cause excessive
// CPU overhead.
//
// [1]: https://github.com/golang/go/issues/27707
MaxConnectionAge time.Duration `yaml:"maxConnectionAge"`
MaxConnectionAgeJitter *float64 `yaml:"maxConnectionAgeJitter"`

Expand Down Expand Up @@ -387,6 +399,8 @@ func newClientPool(
opener := func() (clientpool.Client, error) {
return newClient(
tConfig,
cfg.ServiceSlug,
cfg.MetricsTags,
cfg.MaxConnectionAge,
jitter,
genAddr,
Expand Down Expand Up @@ -465,26 +479,29 @@ func newClientPool(

func newClient(
cfg *thrift.TConfiguration,
slug string,
tags metricsbp.Tags,
maxConnectionAge time.Duration,
maxConnectionAgeJitter float64,
genAddr AddressGenerator,
protoFactory thrift.TProtocolFactory,
) (*ttlClient, error) {
addr, err := genAddr()
if err != nil {
return nil, fmt.Errorf("thriftbp: error getting next address for new Thrift client: %w", err)
}
return newTTLClient(func() (thrift.TClient, thrift.TTransport, error) {
addr, err := genAddr()
if err != nil {
return nil, nil, fmt.Errorf("thriftbp: error getting next address for new Thrift client: %w", err)
}

transport := thrift.NewTSocketConf(addr, cfg)
if err = transport.Open(); err != nil {
return nil, fmt.Errorf("thriftbp: error opening TSocket for new Thrift client: %w", err)
}
transport := thrift.NewTSocketConf(addr, cfg)
if err := transport.Open(); err != nil {
return nil, nil, fmt.Errorf("thriftbp: error opening TSocket for new Thrift client: %w", err)
}

client := thrift.NewTStandardClient(
protoFactory.GetProtocol(transport),
protoFactory.GetProtocol(transport),
)
return newTTLClient(transport, client, maxConnectionAge, maxConnectionAgeJitter), nil
return thrift.NewTStandardClient(
protoFactory.GetProtocol(transport),
protoFactory.GetProtocol(transport),
), transport, nil
}, maxConnectionAge, maxConnectionAgeJitter, slug, tags)
}

func reportPoolStats(ctx context.Context, prefix string, pool clientpool.Pool, tickerDuration time.Duration, tags []string) {
Expand Down
129 changes: 110 additions & 19 deletions thriftbp/ttl_client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package thriftbp

import (
"context"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/go-kit/kit/metrics"

"github.com/reddit/baseplate.go/metricsbp"
"github.com/reddit/baseplate.go/randbp"
)

type ttlClientGenerator func() (thrift.TClient, thrift.TTransport, error)

// DefaultMaxConnectionAge is the default max age for a Thrift client connection.
const DefaultMaxConnectionAge = time.Minute * 5

Expand All @@ -17,50 +22,136 @@ const DefaultMaxConnectionAgeJitter = 0.1

var _ Client = (*ttlClient)(nil)

type ttlClientState struct {
client thrift.TClient
transport thrift.TTransport
expiration time.Time // if expiration is zero, then the client will be kept open indefinetly.
timer *time.Timer
closed bool
}

// renew updates expiration and timer in s base on the given timestamp and
// client.
func (s *ttlClientState) renew(now time.Time, client *ttlClient) {
if client.ttl < 0 {
return
}
s.expiration = now.Add(client.ttl)
s.timer = time.AfterFunc(client.ttl, client.refresh)
}

// ttlClient is a Client implementation wrapping thrift's TTransport with a TTL.
type ttlClient struct {
thrift.TClient
// configs needed for refresh to work
generator ttlClientGenerator
ttl time.Duration

transport thrift.TTransport
replaceCounter metrics.Counter

// if expiration is zero, then the client will be kept open indefinetly.
expiration time.Time
// state guarded by lock (buffer-1 channel)
state chan *ttlClientState
}

// Close implements Client interface.
//
// It calls underlying TTransport's Close function.
func (c *ttlClient) Close() error {
return c.transport.Close()
state := <-c.state
defer func() {
c.state <- state
}()
state.closed = true
if state.timer != nil {
state.timer.Stop()
}
return state.transport.Close()
}

func (c *ttlClient) Call(ctx context.Context, method string, args, result thrift.TStruct) (thrift.ResponseMeta, error) {
state := <-c.state
defer func() {
c.state <- state
}()
return state.client.Call(ctx, method, args, result)
}

// IsOpen implements Client interface.
//
// If TTL has passed, it closes the underlying TTransport and returns false.
// Otherwise it just calls the underlying TTransport's IsOpen function.
// It checks underlying TTransport's IsOpen first,
// if that returns false, it returns false.
// Otherwise it checks TTL,
// returns false if TTL has passed and also close the underlying TTransport.
func (c *ttlClient) IsOpen() bool {
if !c.transport.IsOpen() {
state := <-c.state
defer func() {
c.state <- state
}()
if !state.transport.IsOpen() {
return false
}
if !c.expiration.IsZero() && time.Now().After(c.expiration) {
c.transport.Close()
if !state.expiration.IsZero() && time.Now().After(state.expiration) {
state.transport.Close()
return false
}
return true
}

// newTTLClient creates a ttlClient with a thrift TTransport and a ttl+jitter.
func newTTLClient(transport thrift.TTransport, client thrift.TClient, ttl time.Duration, jitter float64) *ttlClient {
var expiration time.Time
// refresh is called when the ttl hits to try to refresh the connection.
func (c *ttlClient) refresh() {
client, transport, err := c.generator()
if err != nil {
// We cannot replace this connection in the background,
// leave client and transport be,
// this connection will be replaced by the pool upon next use.
c.replaceCounter.With("success", "False").Add(1)
return
}

// replace with the refreshed connection
state := <-c.state
defer func() {
c.state <- state
}()
if state.closed {
// If Close was called after we entered this function,
// close the newly created connection and return early.
transport.Close()
return
}
state.renew(time.Now(), c)
state.client = client
if state.transport != nil {
// close the old transport before replacing it, to avoid connection leaks.
state.transport.Close()
}
state.transport = transport
c.replaceCounter.With("success", "True").Add(1)
}

// newTTLClient creates a ttlClient with a thrift TTransport and ttl+jitter.
func newTTLClient(generator ttlClientGenerator, ttl time.Duration, jitter float64, slug string, tags metricsbp.Tags) (*ttlClient, error) {
client, transport, err := generator()
if err != nil {
return nil, err
}

if ttl == 0 {
ttl = DefaultMaxConnectionAge
}
if ttl > 0 {
expiration = time.Now().Add(randbp.JitterDuration(ttl, jitter))
duration := randbp.JitterDuration(ttl, jitter)
c := &ttlClient{
generator: generator,
ttl: duration,

replaceCounter: metricsbp.M.Counter(slug + ".connection-housekeeping").With(tags.AsStatsdTags()...),

state: make(chan *ttlClientState, 1),
}
return &ttlClient{
TClient: client,
transport: transport,
expiration: expiration,
state := &ttlClientState{
client: client,
transport: transport,
}
state.renew(time.Now(), c)
c.state <- state
return c, nil
}
Loading

0 comments on commit aeb1b61

Please sign in to comment.