Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

thriftbp: Metrics cleanup #581

Merged
merged 1 commit into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions thriftbp/client_middlewares_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func TestPrometheusClientMiddleware(t *testing.T) {
latencyLabels := prometheus.Labels{
methodLabel: methodIsHealthy,
successLabel: prometheusbp.BoolString(!tt.wantFail),
remoteServiceClientNameLabel: thrifttest.DefaultServiceSlug,
remoteServiceClientNameLabel: tt.name,
}

totalRequestLabels := prometheus.Labels{
Expand All @@ -511,20 +511,20 @@ func TestPrometheusClientMiddleware(t *testing.T) {
exceptionLabel: tt.exceptionType,
baseplateStatusCodeLabel: "",
baseplateStatusLabel: "",
remoteServiceClientNameLabel: thrifttest.DefaultServiceSlug,
remoteServiceClientNameLabel: tt.name,
}

activeRequestLabels := prometheus.Labels{
methodLabel: methodIsHealthy,
remoteServiceClientNameLabel: thrifttest.DefaultServiceSlug,
remoteServiceClientNameLabel: tt.name,
}

defer thriftbp.PrometheusClientMetricsTest(t, latencyLabels, totalRequestLabels, activeRequestLabels).CheckMetrics()
defer spectest.ValidateSpec(t, "thrift", "client")

ctx := context.Background()
handler := mockBaseplateService{fail: tt.wantFail, err: tt.wantErr}
client := setupFake(ctx, t, handler)
client := setupFake(ctx, t, handler, tt.name)
bpClient := baseplatethrift.NewBaseplateServiceV2Client(client.TClient())
result, err := bpClient.IsHealthy(
ctx,
Expand Down Expand Up @@ -554,9 +554,12 @@ func (srv mockBaseplateService) IsHealthy(ctx context.Context, req *baseplatethr
return !srv.fail, srv.err
}

func setupFake(ctx context.Context, t *testing.T, handler baseplatethrift.BaseplateServiceV2) thriftbp.ClientPool {
func setupFake(ctx context.Context, t *testing.T, handler baseplatethrift.BaseplateServiceV2, slug string) thriftbp.ClientPool {
srv, err := thrifttest.NewBaseplateServer(thrifttest.ServerConfig{
Processor: baseplatethrift.NewBaseplateServiceV2Processor(handler),
ClientConfig: thriftbp.ClientPoolConfig{
ServiceSlug: slug,
},
})
if err != nil {
t.Fatalf("SETUP: Setting up baseplate server: %s", err)
Expand Down
85 changes: 36 additions & 49 deletions thriftbp/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

// DefaultPoolGaugeInterval is the fallback value to be used when
// ClientPoolConfig.PoolGaugeInterval <= 0.
//
// Deprecated: Prometheus gauges are auto scraped.
const DefaultPoolGaugeInterval = time.Second * 10

// PoolError is returned by ClientPool.TClient.Call when it fails to get a
Expand Down Expand Up @@ -144,6 +146,8 @@ type ClientPoolConfig struct {

// Any tags that should be applied to metrics logged by the ClientPool.
// This includes the optional pool stats.
//
// Deprecated: We no longer emit any statsd metrics so this has no effect.
MetricsTags metricsbp.Tags `yaml:"metricsTags"`

// DefaultRetryOptions is the list of retry.Options to apply as the defaults
Expand All @@ -168,13 +172,18 @@ type ClientPoolConfig struct {
//
// The reporting goroutine is cancelled when the global metrics client
// context is Done.
//
// Deprecated: The statsd metrics are deprecated and the prometheus metrics
// are always reported.
ReportPoolStats bool `yaml:"reportPoolStats"`

// PoolGaugeInterval indicates how often we should update the active
// connections gauge when collecting pool stats.
//
// When PoolGaugeInterval <= 0 and ReportPoolStats is true,
// DefaultPoolGaugeInterval will be used instead.
//
// Deprecated: Not used any more. Prometheus gauges are auto scraped.
PoolGaugeInterval time.Duration `yaml:"poolGaugeInterval"`

// Suppress some of the errors returned by the server before sending them to
Expand Down Expand Up @@ -396,7 +405,6 @@ func newClientPool(
"thrift_pool": cfg.ServiceSlug,
}).Set(float64(cfg.MaxConnections))
tConfig := cfg.ToTConfiguration()
tags := cfg.MetricsTags.AsStatsdTags()
jitter := DefaultMaxConnectionAgeJitter
if cfg.MaxConnectionAgeJitter != nil {
jitter = *cfg.MaxConnectionAgeJitter
Expand All @@ -405,7 +413,6 @@ func newClientPool(
return newClient(
tConfig,
cfg.ServiceSlug,
cfg.MetricsTags,
cfg.MaxConnectionAge,
jitter,
genAddr,
Expand Down Expand Up @@ -440,33 +447,24 @@ func newClientPool(
err,
))
}
if cfg.ReportPoolStats {
go reportPoolStats(
metricsbp.M.Ctx(),
cfg.ServiceSlug,
pool,
cfg.PoolGaugeInterval,
tags,
)

if err := prometheusbpint.GlobalRegistry.Register(&clientPoolGaugeExporter{
slug: cfg.ServiceSlug,
pool: pool,
}); err != nil {
// Register should never fail because clientPoolGaugeExporter.Describe is
// a no-op, but just in case.

var batch errorsbp.Batch
batch.Add(err)
if err := pool.Close(); err != nil {
batch.AddPrefix("close pool", err)
}
return nil, fmt.Errorf(
"thriftbp: error registering prometheus exporter for client pool %q: %w",
cfg.ServiceSlug,
batch.Compile(),
)
if err := prometheusbpint.GlobalRegistry.Register(&clientPoolGaugeExporter{
slug: cfg.ServiceSlug,
pool: pool,
}); err != nil {
// Register should never fail because clientPoolGaugeExporter.Describe is
// a no-op, but just in case.

var batch errorsbp.Batch
batch.Add(err)
if err := pool.Close(); err != nil {
batch.AddPrefix("close pool", err)
}
return nil, fmt.Errorf(
"thriftbp: error registering prometheus exporter for client pool %q: %w",
cfg.ServiceSlug,
batch.Compile(),
)
}

// create the base clientPool, this is not ready for use.
Expand All @@ -489,14 +487,15 @@ func newClientPool(
clientPoolExhaustedCounter.With(labels)
clientPoolClosedConnectionsCounter.With(labels)
clientPoolReleaseErrorCounter.With(labels)
legacyClientPoolExhaustedCounter.With(labels)
legacyClientPoolReleaseErrorCounter.With(labels)

return pooledClient, nil
}

func newClient(
cfg *thrift.TConfiguration,
slug string,
tags metricsbp.Tags,
maxConnectionAge time.Duration,
maxConnectionAgeJitter float64,
genAddr AddressGenerator,
Expand All @@ -517,27 +516,7 @@ func newClient(
protoFactory.GetProtocol(transport),
protoFactory.GetProtocol(transport),
), transport, nil
}, maxConnectionAge, maxConnectionAgeJitter, slug, tags)
}

func reportPoolStats(ctx context.Context, slug string, pool clientpool.Pool, tickerDuration time.Duration, tags []string) {
activeGauge := metricsbp.M.RuntimeGauge(slug + ".pool-active-connections").With(tags...)
allocatedGauge := metricsbp.M.RuntimeGauge(slug + ".pool-allocated-clients").With(tags...)

if tickerDuration <= 0 {
tickerDuration = DefaultPoolGaugeInterval
}
ticker := time.NewTicker(tickerDuration)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
activeGauge.Set(float64(pool.NumActiveClients()))
allocatedGauge.Set(float64(pool.NumAllocated()))
}
}
}, maxConnectionAge, maxConnectionAgeJitter, slug)
}

type clientPool struct {
Expand Down Expand Up @@ -615,6 +594,10 @@ func (p *clientPool) getClient() (_ Client, err error) {
clientNameLabel: p.slug,
"thrift_pool": p.slug,
}).Inc()
legacyClientPoolExhaustedCounter.With(prometheus.Labels{
clientNameLabel: p.slug,
"thrift_pool": p.slug,
}).Inc()
}
log.Errorw(
"Failed to get client from pool",
Expand All @@ -637,6 +620,10 @@ func (p *clientPool) releaseClient(c Client) {
clientNameLabel: p.slug,
"thrift_pool": p.slug,
}).Inc()
legacyClientPoolReleaseErrorCounter.With(prometheus.Labels{
clientNameLabel: p.slug,
"thrift_pool": p.slug,
}).Inc()
}
}

Expand Down
36 changes: 26 additions & 10 deletions thriftbp/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,17 @@ var (
methodLabel,
}

panicRecoverCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
// TODO: Remove after the next release (v0.9.12)
legacyPanicRecoverCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: subsystemServer,
Name: "panic_recover_total",
Help: "The number of panics recovered from thrift server handlers",
Help: "Deprecated: Use thriftbp_server_recovered_panics_total instead",
}, panicRecoverLabels)

panicRecoverCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Name: "thriftbp_server_recovered_panics_total",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically (according to some dictionaries) "panic" is uncountable as a noun, but I think we pretty much use it as a countable noun in Go.

Help: "The number of panics recovered from thrift server handlers",
}, panicRecoverLabels)
)

Expand All @@ -181,25 +187,35 @@ var (
"thrift_pool",
}

clientPoolExhaustedCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
// TODO: Remove after the next release (v0.9.12)
legacyClientPoolExhaustedCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: subsystemClientPool,
Name: "exhausted_total",
Help: "The number of pool exhaustion for a thrift client pool",
Help: "Deprecated: Use thriftbp_client_pool_exhaustions_total instead",
}, clientPoolLabels)

clientPoolExhaustedCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Name: "thriftbp_client_pool_exhaustions_total",
Help: "The number of pool exhaustions for a thrift client pool",
}, clientPoolLabels)

clientPoolClosedConnectionsCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: subsystemClientPool,
Name: "closed_connections_total",
Help: "The number of times we closed the client after used it from the pool",
Name: "thriftbp_client_pool_closed_connections_total",
Help: "The number of times we closed the client after used it from the pool",
}, clientPoolLabels)

clientPoolReleaseErrorCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
// TODO: Remove after the next release (v0.9.12)
legacyClientPoolReleaseErrorCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: subsystemClientPool,
Name: "release_error_total",
Help: "The number of times we failed to release a client back to the pool",
Help: "Deprecated: Use thriftbp_client_pool_release_errors_total instead",
}, clientPoolLabels)

clientPoolReleaseErrorCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Name: "thriftbp_client_pool_release_errors_total",
Help: "The number of times we failed to release a client back to the pool",
}, clientPoolLabels)

clientPoolGetsCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Expand Down
13 changes: 2 additions & 11 deletions thriftbp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ import (
//lint:ignore SA1019 This library is internal only, not actually deprecated
"github.com/reddit/baseplate.go/internalv2compat"
"github.com/reddit/baseplate.go/log"
"github.com/reddit/baseplate.go/metricsbp"
)

const (
meterNameThriftSocketErrorCounter = "thrift.socket.timeout"
)

// ServerConfig is the arg struct for both NewServer and NewBaseplateServer.
Expand Down Expand Up @@ -58,6 +53,8 @@ type ServerConfig struct {
//
// Report the number of clients connected to the server as a runtime gauge
// with metric name of 'thrift.connections'
//
// Deprecated: This feature is removed.
ReportConnectionCount bool

// Optional, used only by NewServer.
Expand Down Expand Up @@ -108,10 +105,6 @@ func NewServer(cfg ServerConfig) (*thrift.TSimpleServer, error) {
transport = cfg.Socket
}

if cfg.ReportConnectionCount {
transport = &CountedTServerTransport{transport}
}

server := thrift.NewTSimpleServer4(
thrift.WrapProcessor(cfg.Processor, cfg.Middlewares...),
transport,
Expand Down Expand Up @@ -160,10 +153,8 @@ func NewBaseplateServer(
}

func suppressTimeoutLogger(logger thrift.Logger) thrift.Logger {
c := metricsbp.M.Counter(meterNameThriftSocketErrorCounter)
return func(msg string) {
if strings.Contains(msg, "i/o timeout") {
c.Add(1)
return
}

Expand Down
Loading