Skip to content

Commit

Permalink
Make thriftbp.NewBaseplateClientPool() behave more like the `grpcbp…
Browse files Browse the repository at this point in the history
…` one (#602)

* Retry the creation of the `requiredInitialConnections`

* add tests

* Update clientpool/channel.go

Co-authored-by: Andrew Boyle <[email protected]>

* Rate limit the warning logs

---------

Co-authored-by: Dorian Jaminais-Grellier <[email protected]>
Co-authored-by: Andrew Boyle <[email protected]>
  • Loading branch information
3 people authored Feb 27, 2023
1 parent c1c012a commit 6f97656
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 49 deletions.
5 changes: 3 additions & 2 deletions clientpool/bench_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clientpool_test

import (
"context"
"testing"

"github.com/reddit/baseplate.go/clientpool"
Expand All @@ -11,8 +12,8 @@ func BenchmarkPoolGetRelease(b *testing.B) {
return &testClient{}, nil
}

const min, max = 0, 100
channelPool, _ := clientpool.NewChannelPool(min, max, opener)
const min, init, max = 0, 0, 100
channelPool, _ := clientpool.NewChannelPool(context.Background(), min, init, max, opener)

for label, pool := range map[string]clientpool.Pool{
"channel": channelPool,
Expand Down
64 changes: 51 additions & 13 deletions clientpool/channel.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package clientpool

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/reddit/baseplate.go/log"
"golang.org/x/time/rate"
)

type channelPool struct {
Expand All @@ -19,25 +24,58 @@ var _ Pool = (*channelPool)(nil)
// NewChannelPool creates a new client pool implemented via channel.
//
// Note that this function could return both non-nil Pool and error,
// when we failed to create all asked initialClients.
// In such case the returned Pool would have the clients we already established.
func NewChannelPool(initialClients, maxClients int, opener ClientOpener) (Pool, error) {
if initialClients > maxClients {
// when we failed to create all asked bestEffortInitialClients.
// In such case the returned Pool would have at least requiredInitialClients
// clients we already established.
func NewChannelPool(ctx context.Context, requiredInitialClients, bestEffortInitialClients, maxClients int, opener ClientOpener) (Pool, error) {
if !(requiredInitialClients <= bestEffortInitialClients && bestEffortInitialClients <= maxClients) {
return nil, &ConfigError{
InitialClients: initialClients,
MaxClients: maxClients,
BestEffortInitialClients: bestEffortInitialClients,
RequiredInitialClients: requiredInitialClients,
MaxClients: maxClients,
}
}

var finalErr error
var lastAttemptErr error
pool := make(chan Client, maxClients)
for i := 0; i < initialClients; i++ {
chatty := rate.NewLimiter(rate.Every(2*time.Second), 1)

for i := 0; i < requiredInitialClients; {
if ctxErr := ctx.Err(); ctxErr != nil {
if lastAttemptErr == nil {
// In case the user sets a deadline so short that we don't have
// time to open all the client serially despite all of them working.
// In that case lastAttempErr would be nil so we need to indicate to
// the user that their timeout being too short is the issue.
lastAttemptErr = ctxErr
}
return &channelPool{
pool: pool,
opener: opener,
initialClients: len(pool),
maxClients: maxClients,
}, lastAttemptErr
}
c, err := opener()
if err == nil {
pool <- c
i++
} else {
lastAttemptErr = err
if chatty.Allow() {
log.Warnf("clientpool: error creating required client (will retry): %w", err)
}
}
}
lastAttemptErr = nil

for i := requiredInitialClients; i < bestEffortInitialClients; i++ {
c, err := opener()
if err != nil {
finalErr = fmt.Errorf(
"clientpool: error creating client #%d/%d: %w",
lastAttemptErr = fmt.Errorf(
"clientpool: error creating best-effort client #%d/%d: %w",
i,
initialClients,
bestEffortInitialClients,
err,
)
break
Expand All @@ -48,9 +86,9 @@ func NewChannelPool(initialClients, maxClients int, opener ClientOpener) (Pool,
return &channelPool{
pool: pool,
opener: opener,
initialClients: initialClients,
initialClients: bestEffortInitialClients,
maxClients: maxClients,
}, finalErr
}, lastAttemptErr
}

// Get returns a client from the pool.
Expand Down
25 changes: 13 additions & 12 deletions clientpool/channel_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clientpool_test

import (
"context"
"errors"
"sync/atomic"
"testing"
Expand All @@ -9,8 +10,8 @@ import (
)

func TestChannelPoolInvalidConfig(t *testing.T) {
const min, max = 5, 1
_, err := clientpool.NewChannelPool(min, max, nil)
const min, init, max = 5, 1, 1
_, err := clientpool.NewChannelPool(context.Background(), min, init, max, nil)
if err == nil {
t.Errorf(
"NewChannelPool with min %d and max %d expected an error, got nil.",
Expand All @@ -30,15 +31,15 @@ func TestChannelPool(t *testing.T) {
}
}

const min, max = 2, 5
const min, init, max = 1, 2, 5
var openerCalled atomic.Int32
pool, err := clientpool.NewChannelPool(min, max, opener(&openerCalled))
pool, err := clientpool.NewChannelPool(context.Background(), min, init, max, opener(&openerCalled))
if err != nil {
t.Fatal(err)
}
t.Logf("min: %d, max: %d", min, max)
t.Logf("min: %d, init: %d, max: %d", min, init, max)

testPool(t, pool, &openerCalled, min, max)
testPool(t, pool, &openerCalled, init, max)
}

func TestChannelPoolWithOpenerFailure(t *testing.T) {
Expand All @@ -54,11 +55,11 @@ func TestChannelPoolWithOpenerFailure(t *testing.T) {
}
}

const min, max = 1, 5
const min, init, max = 0, 1, 5
t.Run(
"new-with-min-2-should-fail-initialization",
func(t *testing.T) {
pool, err := clientpool.NewChannelPool(2, max, opener())
pool, err := clientpool.NewChannelPool(context.Background(), min, 2, max, opener())
if err == nil {
t.Error("NewChannelPool with min = 2 should fail but did not.")
}
Expand All @@ -68,7 +69,7 @@ func TestChannelPoolWithOpenerFailure(t *testing.T) {
},
)

pool, err := clientpool.NewChannelPool(min, max, opener())
pool, err := clientpool.NewChannelPool(context.Background(), min, init, max, opener())
if err != nil {
t.Fatal(err)
}
Expand All @@ -77,14 +78,14 @@ func TestChannelPoolWithOpenerFailure(t *testing.T) {
t.Run(
"drain-the-pool",
func(t *testing.T) {
for i := 0; i < min; i++ {
for i := 0; i < init; i++ {
_, err := pool.Get()
if err != nil {
t.Errorf("pool.Get returned error: %v", err)
}
}

checkActiveAndAllocated(t, pool, min, 0)
checkActiveAndAllocated(t, pool, init, 0)
},
)

Expand All @@ -97,7 +98,7 @@ func TestChannelPoolWithOpenerFailure(t *testing.T) {
t.Error("pool.Get should return error, got nil")
}

checkActiveAndAllocated(t, pool, min, 0)
checkActiveAndAllocated(t, pool, init, 0)
},
)
}
10 changes: 6 additions & 4 deletions clientpool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ func (exhaustedError) Retryable() int {
// ConfigError is the error type returned when trying to open a new
// client pool, but the configuration values passed in won't work.
type ConfigError struct {
InitialClients int
MaxClients int
BestEffortInitialClients int
RequiredInitialClients int
MaxClients int
}

var _ error = (*ConfigError)(nil)

func (e *ConfigError) Error() string {
return fmt.Sprintf(
"clientpool: initialClients (%d) > maxClients (%d)",
e.InitialClients,
"clientpool: need requiredClients (%d) <= initialClients (%d) <= maxClients (%d)",
e.RequiredInitialClients,
e.BestEffortInitialClients,
e.MaxClients,
)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
go.uber.org/automaxprocs v1.5.1
go.uber.org/zap v1.21.0
golang.org/x/sys v0.5.0
golang.org/x/time v0.0.0-20220609170525-579cf78fd858
google.golang.org/grpc v1.47.0
gopkg.in/yaml.v2 v2.4.0
sigs.k8s.io/secrets-store-csi-driver v1.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181221001348-537d06c36207/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
27 changes: 23 additions & 4 deletions thriftbp/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ func SingleAddressGenerator(addr string) AddressGenerator {
// It always uses SingleAddressGenerator with the server address configured in
// cfg, and THeader+TCompact as the protocol factory.
func NewBaseplateClientPool(cfg ClientPoolConfig, middlewares ...thrift.ClientMiddleware) (ClientPool, error) {
return NewBaseplateClientPoolWithContext(context.Background(), cfg, middlewares...)
}
func NewBaseplateClientPoolWithContext(ctx context.Context, cfg ClientPoolConfig, middlewares ...thrift.ClientMiddleware) (ClientPool, error) {
err := BaseplateClientPoolConfig(cfg).Validate()
if err != nil {
return nil, fmt.Errorf("thriftbp.NewBaseplateClientPool: %w", err)
Expand All @@ -370,7 +373,11 @@ func NewBaseplateClientPool(cfg ClientPoolConfig, middlewares ...thrift.ClientMi
},
)
middlewares = append(middlewares, defaults...)
return NewCustomClientPool(
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("thriftbp.NewCustomClientPool: %w", err)
}
return newClientPool(
ctx,
cfg,
SingleAddressGenerator(cfg.Addr),
thrift.NewTHeaderProtocolFactoryConf(cfg.ToTConfiguration()),
Expand All @@ -388,14 +395,24 @@ func NewCustomClientPool(
genAddr AddressGenerator,
protoFactory thrift.TProtocolFactory,
middlewares ...thrift.ClientMiddleware,
) (ClientPool, error) {
return NewCustomClientPoolWithContext(context.Background(), cfg, genAddr, protoFactory, middlewares...)
}
func NewCustomClientPoolWithContext(
ctx context.Context,
cfg ClientPoolConfig,
genAddr AddressGenerator,
protoFactory thrift.TProtocolFactory,
middlewares ...thrift.ClientMiddleware,
) (ClientPool, error) {
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("thriftbp.NewCustomClientPool: %w", err)
}
return newClientPool(cfg, genAddr, protoFactory, middlewares...)
return newClientPool(ctx, cfg, genAddr, protoFactory, middlewares...)
}

func newClientPool(
ctx context.Context,
cfg ClientPoolConfig,
genAddr AddressGenerator,
proto thrift.TProtocolFactory,
Expand Down Expand Up @@ -433,6 +450,8 @@ func newClientPool(
)
}
pool, err := clientpool.NewChannelPool(
ctx,
cfg.RequiredInitialConnections,
cfg.InitialConnections,
cfg.MaxConnections,
opener,
Expand All @@ -446,13 +465,13 @@ func newClientPool(
err = batch.Compile()
}
return nil, fmt.Errorf(
"thriftbp: error initializing thrift clientpool for %q: %w",
"thriftbp: error initializing the required number of connections in the thrift clientpool for %q: %w",
cfg.ServiceSlug,
err,
)
}

cfg.InitialConnectionsFallbackLogger.Log(context.Background(), fmt.Sprintf(
cfg.InitialConnectionsFallbackLogger.Log(ctx, fmt.Sprintf(
"thriftbp: Established %d of %d InitialConnections asked for thrift clientpool for %q: %v",
pool.NumAllocated(),
cfg.InitialConnections,
Expand Down
Loading

0 comments on commit 6f97656

Please sign in to comment.