Skip to content
This repository has been archived by the owner on Apr 18, 2023. It is now read-only.

Commit

Permalink
Add streaming message receive and send histograms
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Mar 25, 2019
1 parent 68e3a13 commit 0c422c3
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 8 deletions.
87 changes: 79 additions & 8 deletions client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ import (
// ClientMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC client.
type ClientMetrics struct {
clientStartedCounter *prom.CounterVec
clientHandledCounter *prom.CounterVec
clientStreamMsgReceived *prom.CounterVec
clientStreamMsgSent *prom.CounterVec
clientHandledHistogramEnabled bool
clientHandledHistogramOpts prom.HistogramOpts
clientHandledHistogram *prom.HistogramVec
clientStartedCounter *prom.CounterVec
clientHandledCounter *prom.CounterVec
clientStreamMsgReceived *prom.CounterVec
clientStreamMsgSent *prom.CounterVec
clientHandledHistogramEnabled bool
clientHandledHistogramOpts prom.HistogramOpts
clientHandledHistogram *prom.HistogramVec
clientStreamRecvHistogramEnabled bool
clientStreamRecvHistogramOpts prom.HistogramOpts
clientStreamRecvHistogram *prom.HistogramVec
clientStreamSendHistogramEnabled bool
clientStreamSendHistogramOpts prom.HistogramOpts
clientStreamSendHistogram *prom.HistogramVec
}

// NewClientMetrics returns a ClientMetrics object. Use a new instance of
Expand Down Expand Up @@ -59,7 +65,21 @@ func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
Buckets: prom.DefBuckets,
},
clientHandledHistogram: nil,
clientHandledHistogram: nil,
clientStreamRecvHistogramEnabled: false,
clientStreamRecvHistogramOpts: prom.HistogramOpts{
Name: "grpc_client_msg_recv_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC single message receive.",
Buckets: prom.DefBuckets,
},
clientStreamRecvHistogram: nil,
clientStreamSendHistogramEnabled: false,
clientStreamSendHistogramOpts: prom.HistogramOpts{
Name: "grpc_client_msg_send_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC single message send.",
Buckets: prom.DefBuckets,
},
clientStreamSendHistogram: nil,
}
}

Expand All @@ -74,6 +94,12 @@ func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
if m.clientHandledHistogramEnabled {
m.clientHandledHistogram.Describe(ch)
}
if m.clientStreamRecvHistogramEnabled {
m.clientStreamRecvHistogram.Describe(ch)
}
if m.clientStreamSendHistogramEnabled {
m.clientStreamSendHistogram.Describe(ch)
}
}

// Collect is called by the Prometheus registry when collecting
Expand All @@ -87,6 +113,12 @@ func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
if m.clientHandledHistogramEnabled {
m.clientHandledHistogram.Collect(ch)
}
if m.clientStreamRecvHistogramEnabled {
m.clientStreamRecvHistogram.Collect(ch)
}
if m.clientStreamSendHistogramEnabled {
m.clientStreamSendHistogram.Collect(ch)
}
}

// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
Expand All @@ -104,6 +136,40 @@ func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOptio
m.clientHandledHistogramEnabled = true
}

// EnableClientStreamReceiveTimeHistogram turns on recording of single message receive time of streaming RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func (m *ClientMetrics) EnableClientStreamReceiveTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.clientHandledHistogramOpts)
}

if !m.clientStreamRecvHistogramEnabled {
m.clientStreamRecvHistogram = prom.NewHistogramVec(
m.clientStreamRecvHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}

m.clientStreamRecvHistogramEnabled = true
}

// EnableClientStreamSendTimeHistogram turns on recording of single message send time of streaming RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func (m *ClientMetrics) EnableClientStreamSendTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.clientHandledHistogramOpts)
}

if !m.clientStreamSendHistogramEnabled {
m.clientStreamSendHistogram = prom.NewHistogramVec(
m.clientStreamSendHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}

m.clientStreamSendHistogramEnabled = true
}

// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
Expand Down Expand Up @@ -149,15 +215,20 @@ type monitoredClientStream struct {
}

func (s *monitoredClientStream) SendMsg(m interface{}) error {
timer := s.monitor.SendMessageTimer()
err := s.ClientStream.SendMsg(m)
timer.ObserveDuration()
if err == nil {
s.monitor.SentMessage()
}
return err
}

func (s *monitoredClientStream) RecvMsg(m interface{}) error {
timer := s.monitor.ReceiveMessageTimer()
err := s.ClientStream.RecvMsg(m)
timer.ObserveDuration()

if err == nil {
s.monitor.ReceivedMessage()
} else if err == io.EOF {
Expand Down
34 changes: 34 additions & 0 deletions client_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package grpc_prometheus
import (
"time"

"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"
)

Expand All @@ -30,11 +31,44 @@ func newClientReporter(m *ClientMetrics, rpcType grpcType, fullMethod string) *c
return r
}

// timer is a helper interface to time functions.
type timer interface {
ObserveDuration() time.Duration
}

type noOpTimer struct {
}

func (noOpTimer) ObserveDuration() time.Duration {
return 0
}

var emptyTimer = noOpTimer{}

func (r *clientReporter) ReceiveMessageTimer() timer {
if r.metrics.clientStreamRecvHistogramEnabled {
hist := r.metrics.clientStreamRecvHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName)
return prometheus.NewTimer(hist)
}

return emptyTimer
}

func (r *clientReporter) ReceivedMessage() {
r.metrics.clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}

func (r *clientReporter) SendMessageTimer() timer {
if r.metrics.clientStreamSendHistogramEnabled {
hist := r.metrics.clientStreamSendHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName)
return prometheus.NewTimer(hist)
}

return emptyTimer
}

func (r *clientReporter) SentMessage() {

r.metrics.clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}

Expand Down

0 comments on commit 0c422c3

Please sign in to comment.