Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make thriftbp.NewBaseplateClientPool() behave more like the grpcbp one #602

Merged
merged 4 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions clientpool/bench_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clientpool_test

import (
"context"
"testing"

"github.com/reddit/baseplate.go/clientpool"
Expand All @@ -11,8 +12,8 @@ func BenchmarkPoolGetRelease(b *testing.B) {
return &testClient{}, nil
}

const min, max = 0, 100
channelPool, _ := clientpool.NewChannelPool(min, max, opener)
const min, init, max = 0, 0, 100
channelPool, _ := clientpool.NewChannelPool(context.Background(), min, init, max, opener)

for label, pool := range map[string]clientpool.Pool{
"channel": channelPool,
Expand Down
64 changes: 51 additions & 13 deletions clientpool/channel.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package clientpool

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/reddit/baseplate.go/log"
"golang.org/x/time/rate"
)

type channelPool struct {
Expand All @@ -19,25 +24,58 @@ var _ Pool = (*channelPool)(nil)
// NewChannelPool creates a new client pool implemented via channel.
//
// Note that this function could return both non-nil Pool and error,
// when we failed to create all asked initialClients.
// In such case the returned Pool would have the clients we already established.
func NewChannelPool(initialClients, maxClients int, opener ClientOpener) (Pool, error) {
if initialClients > maxClients {
// when we failed to create all asked bestEffortInitialClients.
// In such case the returned Pool would have at least requiredInitialClients
// clients we already established.
func NewChannelPool(ctx context.Context, requiredInitialClients, bestEffortInitialClients, maxClients int, opener ClientOpener) (Pool, error) {
if !(requiredInitialClients <= bestEffortInitialClients && bestEffortInitialClients <= maxClients) {
return nil, &ConfigError{
InitialClients: initialClients,
MaxClients: maxClients,
BestEffortInitialClients: bestEffortInitialClients,
RequiredInitialClients: requiredInitialClients,
MaxClients: maxClients,
}
}

var finalErr error
var lastAttemptErr error
pool := make(chan Client, maxClients)
for i := 0; i < initialClients; i++ {
chatty := rate.NewLimiter(rate.Every(2*time.Second), 1)

for i := 0; i < requiredInitialClients; {
if ctxErr := ctx.Err(); ctxErr != nil {
if lastAttemptErr == nil {
// In case the user sets a deadline so short that we don't have
// time to open all the client serially despite all of them working.
// In that case lastAttempErr would be nil so we need to indicate to
// the user that their timeout being too short is the issue.
lastAttemptErr = ctxErr
}
return &channelPool{
pool: pool,
opener: opener,
initialClients: len(pool),
maxClients: maxClients,
}, lastAttemptErr
}
c, err := opener()
if err == nil {
pool <- c
i++
} else {
lastAttemptErr = err
if chatty.Allow() {
log.Warnf("clientpool: error creating required client (will retry): %w", err)
}
}
}
lastAttemptErr = nil

for i := requiredInitialClients; i < bestEffortInitialClients; i++ {
c, err := opener()
if err != nil {
finalErr = fmt.Errorf(
"clientpool: error creating client #%d/%d: %w",
lastAttemptErr = fmt.Errorf(
"clientpool: error creating best-effort client #%d/%d: %w",
i,
initialClients,
bestEffortInitialClients,
err,
)
break
Expand All @@ -48,9 +86,9 @@ func NewChannelPool(initialClients, maxClients int, opener ClientOpener) (Pool,
return &channelPool{
pool: pool,
opener: opener,
initialClients: initialClients,
initialClients: bestEffortInitialClients,
maxClients: maxClients,
}, finalErr
}, lastAttemptErr
}

// Get returns a client from the pool.
Expand Down
25 changes: 13 additions & 12 deletions clientpool/channel_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clientpool_test

import (
"context"
"errors"
"sync/atomic"
"testing"
Expand All @@ -9,8 +10,8 @@ import (
)

func TestChannelPoolInvalidConfig(t *testing.T) {
const min, max = 5, 1
_, err := clientpool.NewChannelPool(min, max, nil)
const min, init, max = 5, 1, 1
_, err := clientpool.NewChannelPool(context.Background(), min, init, max, nil)
if err == nil {
t.Errorf(
"NewChannelPool with min %d and max %d expected an error, got nil.",
Expand All @@ -30,15 +31,15 @@ func TestChannelPool(t *testing.T) {
}
}

const min, max = 2, 5
const min, init, max = 1, 2, 5
var openerCalled atomic.Int32
pool, err := clientpool.NewChannelPool(min, max, opener(&openerCalled))
pool, err := clientpool.NewChannelPool(context.Background(), min, init, max, opener(&openerCalled))
if err != nil {
t.Fatal(err)
}
t.Logf("min: %d, max: %d", min, max)
t.Logf("min: %d, init: %d, max: %d", min, init, max)

testPool(t, pool, &openerCalled, min, max)
testPool(t, pool, &openerCalled, init, max)
}

func TestChannelPoolWithOpenerFailure(t *testing.T) {
Expand All @@ -54,11 +55,11 @@ func TestChannelPoolWithOpenerFailure(t *testing.T) {
}
}

const min, max = 1, 5
const min, init, max = 0, 1, 5
t.Run(
"new-with-min-2-should-fail-initialization",
func(t *testing.T) {
pool, err := clientpool.NewChannelPool(2, max, opener())
pool, err := clientpool.NewChannelPool(context.Background(), min, 2, max, opener())
if err == nil {
t.Error("NewChannelPool with min = 2 should fail but did not.")
}
Expand All @@ -68,7 +69,7 @@ func TestChannelPoolWithOpenerFailure(t *testing.T) {
},
)

pool, err := clientpool.NewChannelPool(min, max, opener())
pool, err := clientpool.NewChannelPool(context.Background(), min, init, max, opener())
if err != nil {
t.Fatal(err)
}
Expand All @@ -77,14 +78,14 @@ func TestChannelPoolWithOpenerFailure(t *testing.T) {
t.Run(
"drain-the-pool",
func(t *testing.T) {
for i := 0; i < min; i++ {
for i := 0; i < init; i++ {
_, err := pool.Get()
if err != nil {
t.Errorf("pool.Get returned error: %v", err)
}
}

checkActiveAndAllocated(t, pool, min, 0)
checkActiveAndAllocated(t, pool, init, 0)
},
)

Expand All @@ -97,7 +98,7 @@ func TestChannelPoolWithOpenerFailure(t *testing.T) {
t.Error("pool.Get should return error, got nil")
}

checkActiveAndAllocated(t, pool, min, 0)
checkActiveAndAllocated(t, pool, init, 0)
},
)
}
10 changes: 6 additions & 4 deletions clientpool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ func (exhaustedError) Retryable() int {
// ConfigError is the error type returned when trying to open a new
// client pool, but the configuration values passed in won't work.
type ConfigError struct {
InitialClients int
MaxClients int
BestEffortInitialClients int
RequiredInitialClients int
MaxClients int
}

var _ error = (*ConfigError)(nil)

func (e *ConfigError) Error() string {
return fmt.Sprintf(
"clientpool: initialClients (%d) > maxClients (%d)",
e.InitialClients,
"clientpool: need requiredClients (%d) <= initialClients (%d) <= maxClients (%d)",
e.RequiredInitialClients,
e.BestEffortInitialClients,
e.MaxClients,
)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
go.uber.org/automaxprocs v1.5.1
go.uber.org/zap v1.21.0
golang.org/x/sys v0.5.0
golang.org/x/time v0.0.0-20220609170525-579cf78fd858
google.golang.org/grpc v1.47.0
gopkg.in/yaml.v2 v2.4.0
sigs.k8s.io/secrets-store-csi-driver v1.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181221001348-537d06c36207/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
27 changes: 23 additions & 4 deletions thriftbp/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ func SingleAddressGenerator(addr string) AddressGenerator {
// It always uses SingleAddressGenerator with the server address configured in
// cfg, and THeader+TCompact as the protocol factory.
func NewBaseplateClientPool(cfg ClientPoolConfig, middlewares ...thrift.ClientMiddleware) (ClientPool, error) {
return NewBaseplateClientPoolWithContext(context.Background(), cfg, middlewares...)
}
func NewBaseplateClientPoolWithContext(ctx context.Context, cfg ClientPoolConfig, middlewares ...thrift.ClientMiddleware) (ClientPool, error) {
err := BaseplateClientPoolConfig(cfg).Validate()
if err != nil {
return nil, fmt.Errorf("thriftbp.NewBaseplateClientPool: %w", err)
Expand All @@ -370,7 +373,11 @@ func NewBaseplateClientPool(cfg ClientPoolConfig, middlewares ...thrift.ClientMi
},
)
middlewares = append(middlewares, defaults...)
return NewCustomClientPool(
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("thriftbp.NewCustomClientPool: %w", err)
}
return newClientPool(
ctx,
cfg,
SingleAddressGenerator(cfg.Addr),
thrift.NewTHeaderProtocolFactoryConf(cfg.ToTConfiguration()),
Expand All @@ -388,14 +395,24 @@ func NewCustomClientPool(
genAddr AddressGenerator,
protoFactory thrift.TProtocolFactory,
middlewares ...thrift.ClientMiddleware,
) (ClientPool, error) {
return NewCustomClientPoolWithContext(context.Background(), cfg, genAddr, protoFactory, middlewares...)
}
func NewCustomClientPoolWithContext(
ctx context.Context,
cfg ClientPoolConfig,
genAddr AddressGenerator,
protoFactory thrift.TProtocolFactory,
middlewares ...thrift.ClientMiddleware,
) (ClientPool, error) {
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("thriftbp.NewCustomClientPool: %w", err)
}
return newClientPool(cfg, genAddr, protoFactory, middlewares...)
return newClientPool(ctx, cfg, genAddr, protoFactory, middlewares...)
}

func newClientPool(
ctx context.Context,
cfg ClientPoolConfig,
genAddr AddressGenerator,
proto thrift.TProtocolFactory,
Expand Down Expand Up @@ -433,6 +450,8 @@ func newClientPool(
)
}
pool, err := clientpool.NewChannelPool(
ctx,
cfg.RequiredInitialConnections,
cfg.InitialConnections,
cfg.MaxConnections,
opener,
Expand All @@ -446,13 +465,13 @@ func newClientPool(
err = batch.Compile()
}
return nil, fmt.Errorf(
"thriftbp: error initializing thrift clientpool for %q: %w",
"thriftbp: error initializing the required number of connections in the thrift clientpool for %q: %w",
cfg.ServiceSlug,
err,
)
}

cfg.InitialConnectionsFallbackLogger.Log(context.Background(), fmt.Sprintf(
cfg.InitialConnectionsFallbackLogger.Log(ctx, fmt.Sprintf(
"thriftbp: Established %d of %d InitialConnections asked for thrift clientpool for %q: %v",
pool.NumAllocated(),
cfg.InitialConnections,
Expand Down
Loading