Skip to content
This repository has been archived by the owner on Apr 18, 2023. It is now read-only.

Non default registry #20

Merged
merged 2 commits into from
Jun 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 23 additions & 56 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,67 +6,34 @@
package grpc_prometheus

import (
"io"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
prom "github.com/prometheus/client_golang/prometheus"
)

// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
monitor := newClientReporter(Unary, method)
monitor.SentMessage()
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
monitor.ReceivedMessage()
}
monitor.Handled(grpc.Code(err))
return err
}
var (
// DefaultClientMetrics is the default instance of ClientMetrics. It is
// intended to be used in conjunction the default Prometheus metrics
// registry.
DefaultClientMetrics = NewClientMetrics()

// StreamServerInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
monitor := newClientReporter(clientStreamType(desc), method)
clientStream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
monitor.Handled(grpc.Code(err))
return nil, err
}
return &monitoredClientStream{clientStream, monitor}, nil
}
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
UnaryClientInterceptor = DefaultClientMetrics.UnaryClientInterceptor()

func clientStreamType(desc *grpc.StreamDesc) grpcType {
if desc.ClientStreams && !desc.ServerStreams {
return ClientStream
} else if !desc.ClientStreams && desc.ServerStreams {
return ServerStream
}
return BidiStream
}

// monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
type monitoredClientStream struct {
grpc.ClientStream
monitor *clientReporter
}
// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
StreamClientInterceptor = DefaultClientMetrics.StreamClientInterceptor()
)

func (s *monitoredClientStream) SendMsg(m interface{}) error {
err := s.ClientStream.SendMsg(m)
if err == nil {
s.monitor.SentMessage()
}
return err
func init() {
prom.MustRegister(DefaultClientMetrics.clientStartedCounter)
prom.MustRegister(DefaultClientMetrics.clientHandledCounter)
prom.MustRegister(DefaultClientMetrics.clientStreamMsgReceived)
prom.MustRegister(DefaultClientMetrics.clientStreamMsgSent)
}

func (s *monitoredClientStream) RecvMsg(m interface{}) error {
err := s.ClientStream.RecvMsg(m)
if err == nil {
s.monitor.ReceivedMessage()
} else if err == io.EOF {
s.monitor.Handled(codes.OK)
} else {
s.monitor.Handled(grpc.Code(err))
}
return err
// EnableClientHandlingTimeHistogram turns on recording of handling time of
// RPCs. Histogram metrics can be very expensive for Prometheus to retain and
// query. This function acts on the DefaultClientMetrics variable and the
// default Prometheus metrics registry.
func EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
DefaultClientMetrics.EnableClientHandlingTimeHistogram(opts...)
prom.Register(DefaultClientMetrics.clientHandledHistogram)
}
139 changes: 139 additions & 0 deletions client_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package grpc_prometheus

import (
"io"

prom "github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)

// ClientMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC client.
type ClientMetrics struct {
clientStartedCounter *prom.CounterVec
clientHandledCounter *prom.CounterVec
clientStreamMsgReceived *prom.CounterVec
clientStreamMsgSent *prom.CounterVec
clientHandledHistogramEnabled bool
clientHandledHistogramOpts prom.HistogramOpts
clientHandledHistogram *prom.HistogramVec
}

// NewClientMetrics returns a ClientMetrics object. Use a new instance of
// ClientMetrics when not using the default Prometheus metrics registry, for
// example when wanting to control which metrics are added to a registry as
// opposed to automatically adding metrics via init functions.
func NewClientMetrics() *ClientMetrics {
return &ClientMetrics{
clientStartedCounter: prom.NewCounterVec(
prom.CounterOpts{
Name: "grpc_client_started_total",
Help: "Total number of RPCs started on the client.",
}, []string{"grpc_type", "grpc_service", "grpc_method"}),

clientHandledCounter: prom.NewCounterVec(
prom.CounterOpts{
Name: "grpc_client_handled_total",
Help: "Total number of RPCs completed by the client, regardless of success or failure.",
}, []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),

clientStreamMsgReceived: prom.NewCounterVec(
prom.CounterOpts{
Name: "grpc_client_msg_received_total",
Help: "Total number of RPC stream messages received by the client.",
}, []string{"grpc_type", "grpc_service", "grpc_method"}),

clientStreamMsgSent: prom.NewCounterVec(
prom.CounterOpts{
Name: "grpc_client_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the client.",
}, []string{"grpc_type", "grpc_service", "grpc_method"}),

clientHandledHistogramEnabled: false,
clientHandledHistogramOpts: prom.HistogramOpts{
Name: "grpc_client_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
Buckets: prom.DefBuckets,
},
clientHandledHistogram: nil,
}
}

// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.clientHandledHistogramOpts)
}
if !m.clientHandledHistogramEnabled {
m.clientHandledHistogram = prom.NewHistogramVec(
m.clientHandledHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}
m.clientHandledHistogramEnabled = true
}

// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
monitor := newClientReporter(m, Unary, method)
monitor.SentMessage()
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
monitor.ReceivedMessage()
}
monitor.Handled(grpc.Code(err))
return err
}
}

// StreamServerInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
monitor := newClientReporter(m, clientStreamType(desc), method)
clientStream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
monitor.Handled(grpc.Code(err))
return nil, err
}
return &monitoredClientStream{clientStream, monitor}, nil
}
}

func clientStreamType(desc *grpc.StreamDesc) grpcType {
if desc.ClientStreams && !desc.ServerStreams {
return ClientStream
} else if !desc.ClientStreams && desc.ServerStreams {
return ServerStream
}
return BidiStream
}

// monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
type monitoredClientStream struct {
grpc.ClientStream
monitor *clientReporter
}

func (s *monitoredClientStream) SendMsg(m interface{}) error {
err := s.ClientStream.SendMsg(m)
if err == nil {
s.monitor.SentMessage()
}
return err
}

func (s *monitoredClientStream) RecvMsg(m interface{}) error {
err := s.ClientStream.RecvMsg(m)
if err == nil {
s.monitor.ReceivedMessage()
} else if err == io.EOF {
s.monitor.Handled(codes.OK)
} else {
s.monitor.Handled(grpc.Code(err))
}
return err
}
91 changes: 13 additions & 78 deletions client_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,105 +7,40 @@ import (
"time"

"google.golang.org/grpc/codes"

prom "github.com/prometheus/client_golang/prometheus"
)

var (
clientStartedCounter = prom.NewCounterVec(
prom.CounterOpts{
Namespace: "grpc",
Subsystem: "client",
Name: "started_total",
Help: "Total number of RPCs started on the client.",
}, []string{"grpc_type", "grpc_service", "grpc_method"})

clientHandledCounter = prom.NewCounterVec(
prom.CounterOpts{
Namespace: "grpc",
Subsystem: "client",
Name: "handled_total",
Help: "Total number of RPCs completed by the client, regardless of success or failure.",
}, []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"})

clientStreamMsgReceived = prom.NewCounterVec(
prom.CounterOpts{
Namespace: "grpc",
Subsystem: "client",
Name: "msg_received_total",
Help: "Total number of RPC stream messages received by the client.",
}, []string{"grpc_type", "grpc_service", "grpc_method"})

clientStreamMsgSent = prom.NewCounterVec(
prom.CounterOpts{
Namespace: "grpc",
Subsystem: "client",
Name: "msg_sent_total",
Help: "Total number of gRPC stream messages sent by the client.",
}, []string{"grpc_type", "grpc_service", "grpc_method"})

clientHandledHistogramEnabled = false
clientHandledHistogramOpts = prom.HistogramOpts{
Namespace: "grpc",
Subsystem: "client",
Name: "handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
Buckets: prom.DefBuckets,
}
clientHandledHistogram *prom.HistogramVec
)

func init() {
prom.MustRegister(clientStartedCounter)
prom.MustRegister(clientHandledCounter)
prom.MustRegister(clientStreamMsgReceived)
prom.MustRegister(clientStreamMsgSent)
}

// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&clientHandledHistogramOpts)
}
if !clientHandledHistogramEnabled {
clientHandledHistogram = prom.NewHistogramVec(
clientHandledHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
prom.Register(clientHandledHistogram)
}
clientHandledHistogramEnabled = true
}

type clientReporter struct {
metrics *ClientMetrics
rpcType grpcType
serviceName string
methodName string
startTime time.Time
}

func newClientReporter(rpcType grpcType, fullMethod string) *clientReporter {
r := &clientReporter{rpcType: rpcType}
if clientHandledHistogramEnabled {
func newClientReporter(m *ClientMetrics, rpcType grpcType, fullMethod string) *clientReporter {
r := &clientReporter{
metrics: m,
rpcType: rpcType,
}
if r.metrics.clientHandledHistogramEnabled {
r.startTime = time.Now()
}
r.serviceName, r.methodName = splitMethodName(fullMethod)
clientStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
r.metrics.clientStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
return r
}

func (r *clientReporter) ReceivedMessage() {
clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
r.metrics.clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}

func (r *clientReporter) SentMessage() {
clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
r.metrics.clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}

func (r *clientReporter) Handled(code codes.Code) {
clientHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
if clientHandledHistogramEnabled {
clientHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
r.metrics.clientHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
if r.metrics.clientHandledHistogramEnabled {
r.metrics.clientHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
}
}
Loading