Skip to content

Commit

Permalink
thriftbp: Metrics cleanup
Browse files Browse the repository at this point in the history
Remove statsd metrics, and rename some prometheus metrics to comply with
the prometheus naming convention. For renamed prometheus metrics, we
temporarily emit both the old and the new names, but mark the old ones as
deprecated, to be removed later.

Also remove some statsd only features:

1. CountedTServerTransport is marked as deprecated and no longer used by
   other code.
2. In ReportPayloadSizeMetrics server middleware, the rate arg is
   deprecated and the prometheus metrics are always reported.
3. ClientPoolConfig.MetricsTags & ClientPoolConfig.ReportPoolStats are
   deprecated. The pool stats are always reported as prometheus
   metrics.
  • Loading branch information
fishy committed Nov 2, 2022
1 parent 0ee5876 commit bcdec55
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 145 deletions.
13 changes: 8 additions & 5 deletions thriftbp/client_middlewares_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func TestPrometheusClientMiddleware(t *testing.T) {
latencyLabels := prometheus.Labels{
methodLabel: methodIsHealthy,
successLabel: prometheusbp.BoolString(!tt.wantFail),
remoteServiceClientNameLabel: thrifttest.DefaultServiceSlug,
remoteServiceClientNameLabel: tt.name,
}

totalRequestLabels := prometheus.Labels{
Expand All @@ -511,20 +511,20 @@ func TestPrometheusClientMiddleware(t *testing.T) {
exceptionLabel: tt.exceptionType,
baseplateStatusCodeLabel: "",
baseplateStatusLabel: "",
remoteServiceClientNameLabel: thrifttest.DefaultServiceSlug,
remoteServiceClientNameLabel: tt.name,
}

activeRequestLabels := prometheus.Labels{
methodLabel: methodIsHealthy,
remoteServiceClientNameLabel: thrifttest.DefaultServiceSlug,
remoteServiceClientNameLabel: tt.name,
}

defer thriftbp.PrometheusClientMetricsTest(t, latencyLabels, totalRequestLabels, activeRequestLabels).CheckMetrics()
defer spectest.ValidateSpec(t, "thrift", "client")

ctx := context.Background()
handler := mockBaseplateService{fail: tt.wantFail, err: tt.wantErr}
client := setupFake(ctx, t, handler)
client := setupFake(ctx, t, handler, tt.name)
bpClient := baseplatethrift.NewBaseplateServiceV2Client(client.TClient())
result, err := bpClient.IsHealthy(
ctx,
Expand Down Expand Up @@ -554,9 +554,12 @@ func (srv mockBaseplateService) IsHealthy(ctx context.Context, req *baseplatethr
return !srv.fail, srv.err
}

func setupFake(ctx context.Context, t *testing.T, handler baseplatethrift.BaseplateServiceV2) thriftbp.ClientPool {
func setupFake(ctx context.Context, t *testing.T, handler baseplatethrift.BaseplateServiceV2, slug string) thriftbp.ClientPool {
srv, err := thrifttest.NewBaseplateServer(thrifttest.ServerConfig{
Processor: baseplatethrift.NewBaseplateServiceV2Processor(handler),
ClientConfig: thriftbp.ClientPoolConfig{
ServiceSlug: slug,
},
})
if err != nil {
t.Fatalf("SETUP: Setting up baseplate server: %s", err)
Expand Down
85 changes: 36 additions & 49 deletions thriftbp/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

// DefaultPoolGaugeInterval is the fallback value to be used when
// ClientPoolConfig.PoolGaugeInterval <= 0.
//
// Deprecated: Prometheus gauges are auto scraped.
const DefaultPoolGaugeInterval = time.Second * 10

// PoolError is returned by ClientPool.TClient.Call when it fails to get a
Expand Down Expand Up @@ -144,6 +146,8 @@ type ClientPoolConfig struct {

// Any tags that should be applied to metrics logged by the ClientPool.
// This includes the optional pool stats.
//
// Deprecated: We no longer emit any statsd metrics so this has no effect.
MetricsTags metricsbp.Tags `yaml:"metricsTags"`

// DefaultRetryOptions is the list of retry.Options to apply as the defaults
Expand All @@ -168,13 +172,18 @@ type ClientPoolConfig struct {
//
// The reporting goroutine is cancelled when the global metrics client
// context is Done.
//
// Deprecated: The statsd metrics are deprecated and the prometheus metrics
// are always reported.
ReportPoolStats bool `yaml:"reportPoolStats"`

// PoolGaugeInterval indicates how often we should update the active
// connections gauge when collecting pool stats.
//
// When PoolGaugeInterval <= 0 and ReportPoolStats is true,
// DefaultPoolGaugeInterval will be used instead.
//
// Deprecated: Not used any more. Prometheus gauges are auto scraped.
PoolGaugeInterval time.Duration `yaml:"poolGaugeInterval"`

// Suppress some of the errors returned by the server before sending them to
Expand Down Expand Up @@ -396,7 +405,6 @@ func newClientPool(
"thrift_pool": cfg.ServiceSlug,
}).Set(float64(cfg.MaxConnections))
tConfig := cfg.ToTConfiguration()
tags := cfg.MetricsTags.AsStatsdTags()
jitter := DefaultMaxConnectionAgeJitter
if cfg.MaxConnectionAgeJitter != nil {
jitter = *cfg.MaxConnectionAgeJitter
Expand All @@ -405,7 +413,6 @@ func newClientPool(
return newClient(
tConfig,
cfg.ServiceSlug,
cfg.MetricsTags,
cfg.MaxConnectionAge,
jitter,
genAddr,
Expand Down Expand Up @@ -440,33 +447,24 @@ func newClientPool(
err,
))
}
if cfg.ReportPoolStats {
go reportPoolStats(
metricsbp.M.Ctx(),
cfg.ServiceSlug,
pool,
cfg.PoolGaugeInterval,
tags,
)

if err := prometheusbpint.GlobalRegistry.Register(&clientPoolGaugeExporter{
slug: cfg.ServiceSlug,
pool: pool,
}); err != nil {
// Register should never fail because clientPoolGaugeExporter.Describe is
// a no-op, but just in case.

var batch errorsbp.Batch
batch.Add(err)
if err := pool.Close(); err != nil {
batch.AddPrefix("close pool", err)
}
return nil, fmt.Errorf(
"thriftbp: error registering prometheus exporter for client pool %q: %w",
cfg.ServiceSlug,
batch.Compile(),
)
if err := prometheusbpint.GlobalRegistry.Register(&clientPoolGaugeExporter{
slug: cfg.ServiceSlug,
pool: pool,
}); err != nil {
// Register should never fail because clientPoolGaugeExporter.Describe is
// a no-op, but just in case.

var batch errorsbp.Batch
batch.Add(err)
if err := pool.Close(); err != nil {
batch.AddPrefix("close pool", err)
}
return nil, fmt.Errorf(
"thriftbp: error registering prometheus exporter for client pool %q: %w",
cfg.ServiceSlug,
batch.Compile(),
)
}

// create the base clientPool, this is not ready for use.
Expand All @@ -489,14 +487,15 @@ func newClientPool(
clientPoolExhaustedCounter.With(labels)
clientPoolClosedConnectionsCounter.With(labels)
clientPoolReleaseErrorCounter.With(labels)
legacyClientPoolExhaustedCounter.With(labels)
legacyClientPoolReleaseErrorCounter.With(labels)

return pooledClient, nil
}

func newClient(
cfg *thrift.TConfiguration,
slug string,
tags metricsbp.Tags,
maxConnectionAge time.Duration,
maxConnectionAgeJitter float64,
genAddr AddressGenerator,
Expand All @@ -517,27 +516,7 @@ func newClient(
protoFactory.GetProtocol(transport),
protoFactory.GetProtocol(transport),
), transport, nil
}, maxConnectionAge, maxConnectionAgeJitter, slug, tags)
}

// reportPoolStats periodically publishes the usage of pool as two runtime
// gauges, named slug + ".pool-active-connections" and
// slug + ".pool-allocated-clients", both created with the given tags.
//
// It loops until ctx is done, updating both gauges once per tick. When
// tickerDuration <= 0, DefaultPoolGaugeInterval is used as the tick interval.
//
// NOTE(review): this appears to be the statsd pool reporter that this commit
// removes in favor of the prometheus exporter — confirm before relying on it.
func reportPoolStats(ctx context.Context, slug string, pool clientpool.Pool, tickerDuration time.Duration, tags []string) {
activeGauge := metricsbp.M.RuntimeGauge(slug + ".pool-active-connections").With(tags...)
allocatedGauge := metricsbp.M.RuntimeGauge(slug + ".pool-allocated-clients").With(tags...)

// Fall back to the package default for unset or invalid intervals.
if tickerDuration <= 0 {
tickerDuration = DefaultPoolGaugeInterval
}
ticker := time.NewTicker(tickerDuration)
// Stop the ticker when this function returns.
defer ticker.Stop()
for {
select {
case <-ctx.Done():
// Context is done: stop reporting and return.
return
case <-ticker.C:
// Take a snapshot of the pool counters on every tick.
activeGauge.Set(float64(pool.NumActiveClients()))
allocatedGauge.Set(float64(pool.NumAllocated()))
}
}
}
}, maxConnectionAge, maxConnectionAgeJitter, slug)
}

type clientPool struct {
Expand Down Expand Up @@ -615,6 +594,10 @@ func (p *clientPool) getClient() (_ Client, err error) {
clientNameLabel: p.slug,
"thrift_pool": p.slug,
}).Inc()
legacyClientPoolExhaustedCounter.With(prometheus.Labels{
clientNameLabel: p.slug,
"thrift_pool": p.slug,
}).Inc()
}
log.Errorw(
"Failed to get client from pool",
Expand All @@ -637,6 +620,10 @@ func (p *clientPool) releaseClient(c Client) {
clientNameLabel: p.slug,
"thrift_pool": p.slug,
}).Inc()
legacyClientPoolReleaseErrorCounter.With(prometheus.Labels{
clientNameLabel: p.slug,
"thrift_pool": p.slug,
}).Inc()
}
}

Expand Down
36 changes: 26 additions & 10 deletions thriftbp/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,17 @@ var (
methodLabel,
}

panicRecoverCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
// TODO: Remove after the next release (v0.9.12)
legacyPanicRecoverCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: subsystemServer,
Name: "panic_recover_total",
Help: "The number of panics recovered from thrift server handlers",
Help: "Deprecated: Use thriftbp_server_recovered_panics_total instead",
}, panicRecoverLabels)

panicRecoverCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Name: "thriftbp_server_recovered_panics_total",
Help: "The number of panics recovered from thrift server handlers",
}, panicRecoverLabels)
)

Expand All @@ -181,25 +187,35 @@ var (
"thrift_pool",
}

clientPoolExhaustedCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
// TODO: Remove after the next release (v0.9.12)
legacyClientPoolExhaustedCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: subsystemClientPool,
Name: "exhausted_total",
Help: "The number of pool exhaustion for a thrift client pool",
Help: "Deprecated: Use thriftbp_client_pool_exhaustions_total instead",
}, clientPoolLabels)

clientPoolExhaustedCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Name: "thriftbp_client_pool_exhaustions_total",
Help: "The number of pool exhaustions for a thrift client pool",
}, clientPoolLabels)

clientPoolClosedConnectionsCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: subsystemClientPool,
Name: "closed_connections_total",
Help: "The number of times we closed the client after used it from the pool",
Name: "thriftbp_client_pool_closed_connections_total",
Help: "The number of times we closed the client after used it from the pool",
}, clientPoolLabels)

clientPoolReleaseErrorCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
// TODO: Remove after the next release (v0.9.12)
legacyClientPoolReleaseErrorCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: subsystemClientPool,
Name: "release_error_total",
Help: "The number of times we failed to release a client back to the pool",
Help: "Deprecated: Use thriftbp_client_pool_release_errors_total instead",
}, clientPoolLabels)

clientPoolReleaseErrorCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Name: "thriftbp_client_pool_release_errors_total",
Help: "The number of times we failed to release a client back to the pool",
}, clientPoolLabels)

clientPoolGetsCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
Expand Down
13 changes: 2 additions & 11 deletions thriftbp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ import (
//lint:ignore SA1019 This library is internal only, not actually deprecated
"github.com/reddit/baseplate.go/internalv2compat"
"github.com/reddit/baseplate.go/log"
"github.com/reddit/baseplate.go/metricsbp"
)

const (
meterNameThriftSocketErrorCounter = "thrift.socket.timeout"
)

// ServerConfig is the arg struct for both NewServer and NewBaseplateServer.
Expand Down Expand Up @@ -58,6 +53,8 @@ type ServerConfig struct {
//
// Report the number of clients connected to the server as a runtime gauge
// with metric name of 'thrift.connections'
//
// Deprecated: This feature is removed.
ReportConnectionCount bool

// Optional, used only by NewServer.
Expand Down Expand Up @@ -108,10 +105,6 @@ func NewServer(cfg ServerConfig) (*thrift.TSimpleServer, error) {
transport = cfg.Socket
}

if cfg.ReportConnectionCount {
transport = &CountedTServerTransport{transport}
}

server := thrift.NewTSimpleServer4(
thrift.WrapProcessor(cfg.Processor, cfg.Middlewares...),
transport,
Expand Down Expand Up @@ -160,10 +153,8 @@ func NewBaseplateServer(
}

func suppressTimeoutLogger(logger thrift.Logger) thrift.Logger {
c := metricsbp.M.Counter(meterNameThriftSocketErrorCounter)
return func(msg string) {
if strings.Contains(msg, "i/o timeout") {
c.Add(1)
return
}

Expand Down
Loading

0 comments on commit bcdec55

Please sign in to comment.