Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
Add stats support for net/http servers (#534)
Browse files Browse the repository at this point in the history
  • Loading branch information
rakyll authored Mar 7, 2018
1 parent 9d3e85d commit 09d3108
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 32 deletions.
3 changes: 2 additions & 1 deletion plugin/ochttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func TestClient(t *testing.T) {
case *view.DistributionData:
count = data.Count
default:
t.Errorf("don't know how to handle data type: %v", data)
t.Errorf("Unkown data type: %v", data)
continue
}
if got := count; got != reqCount {
t.Fatalf("%s = %d; want %d", viewName, got, reqCount)
Expand Down
63 changes: 62 additions & 1 deletion plugin/ochttp/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,28 @@ var (
ClientLatency, _ = stats.Float64("opencensus.io/http/client/latency", "End-to-end latency", stats.UnitMilliseconds)
)

// The following server HTTP measures are supported for use in custom views:
var (
ServerRequestCount, _ = stats.Int64("opencensus.io/http/server/request_count", "Number of HTTP requests started", stats.UnitNone)
ServerRequestBytes, _ = stats.Int64("opencensus.io/http/server/request_bytes", "HTTP request body size if set as ContentLength (uncompressed)", stats.UnitBytes)
ServerResponseBytes, _ = stats.Int64("opencensus.io/http/server/response_bytes", "HTTP response body size (uncompressed)", stats.UnitBytes)
ServerLatency, _ = stats.Float64("opencensus.io/http/server/latency", "End-to-end latency", stats.UnitMilliseconds)
)

// The following tags are applied to stats recorded by this package. Host, Path
// and Method are applied to all measures. StatusCode is not applied to
// ClientRequestCount, since it is recorded before the status is known.
// ClientRequestCount or ServerRequestCount, since it is recorded before the status is known.
var (
// Host is the value of the HTTP Host header.
Host, _ = tag.NewKey("http.host")

// StatusCode is the numeric HTTP response status code,
// or "error" if a transport error occurred and no status code was read.
StatusCode, _ = tag.NewKey("http.status")

// Path is the URL path (not including query string) in the request.
Path, _ = tag.NewKey("http.path")

// Method is the HTTP method of the request, capitalized (GET, POST, etc.).
Method, _ = tag.NewKey("http.method")
)
Expand Down Expand Up @@ -95,12 +106,62 @@ var (
Aggregation: view.CountAggregation{},
}

ServerRequestCountView = &view.View{
Name: "opencensus.io/http/server/request_count",
Description: "Count of HTTP requests started",
Measure: ServerRequestCount,
Aggregation: view.CountAggregation{},
}

ServerRequestBytesView = &view.View{
Name: "opencensus.io/http/server/request_bytes",
Description: "Size distribution of HTTP request body",
Measure: ServerRequestBytes,
Aggregation: DefaultSizeDistribution,
}

ServerResponseBytesView = &view.View{
Name: "opencensus.io/http/server/response_bytes",
Description: "Size distribution of HTTP response body",
Measure: ServerResponseBytes,
Aggregation: DefaultSizeDistribution,
}

ServerLatencyView = &view.View{
Name: "opencensus.io/http/server/latency",
Description: "Latency distribution of HTTP requests",
Measure: ServerLatency,
Aggregation: DefaultLatencyDistribution,
}

ServerRequestCountByMethod = &view.View{
Name: "opencensus.io/http/server/request_count_by_method",
Description: "Server request count by HTTP method",
TagKeys: []tag.Key{Method},
Measure: ServerRequestCount,
Aggregation: view.CountAggregation{},
}

ServerResponseCountByStatusCode = &view.View{
Name: "opencensus.io/http/server/response_count_by_status_code",
Description: "Server response count by status code",
TagKeys: []tag.Key{StatusCode},
Measure: ServerLatency,
Aggregation: view.CountAggregation{},
}

DefaultViews = []*view.View{
ClientRequestCountView,
ClientRequestBytesView,
ClientResponseBytesView,
ClientLatencyView,
ClientRequestCountByMethod,
ClientResponseCountByStatusCode,
ServerRequestCountView,
ServerRequestBytesView,
ServerResponseBytesView,
ServerLatencyView,
ServerRequestCountByMethod,
ServerResponseCountByStatusCode,
}
)
118 changes: 104 additions & 14 deletions plugin/ochttp/stats_test.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,117 @@
package ochttp

import (
"bytes"
"net/http"
"net/http/httptest"
"testing"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opencensus.io/stats/view"
)

func TestVarsInitialized(t *testing.T) {
// Test that global initialization was successful
for i, k := range []tag.Key{Host, StatusCode, Path, Method} {
if k.Name() == "" {
t.Errorf("key not initialized: %d", i)
}
func httpHandler(statusCode, respSize int) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(statusCode)
body := make([]byte, respSize)
w.Write(body)
})
}

func updateMean(mean float64, sample, count int) float64 {
if count == 1 {
return float64(sample)
}
for i, m := range []stats.Measure{ClientRequestCount, ClientResponseBytes, ClientRequestBytes, ClientLatency} {
if m == nil {
t.Errorf("measure not initialized: %d", i)
}
return mean + (float64(sample)-mean)/float64(count)
}

func TestHandlerStatsCollection(t *testing.T) {
for _, v := range DefaultViews {
v.Subscribe()
}

views := []string{
"opencensus.io/http/server/request_count",
"opencensus.io/http/server/latency",
"opencensus.io/http/server/request_bytes",
"opencensus.io/http/server/response_bytes",
}

// TODO: test latency measurements?
tests := []struct {
name, method, target string
count, statusCode, reqSize, respSize int
}{
{"get 200", "GET", "http://opencensus.io/request/one", 10, 200, 512, 512},
{"post 503", "POST", "http://opencensus.io/request/two", 5, 503, 1024, 16384},
{"no body 302", "GET", "http://opencensus.io/request/three", 2, 302, 0, 0},
}
totalCount, meanReqSize, meanRespSize := 0, 0.0, 0.0

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
body := bytes.NewBuffer(make([]byte, test.reqSize))
r := httptest.NewRequest(test.method, test.target, body)
w := httptest.NewRecorder()
h := &Handler{
NoTrace: true,
Handler: httpHandler(test.statusCode, test.respSize),
}

for i := 0; i < test.count; i++ {
h.ServeHTTP(w, r)
totalCount++
// Distributions do not track sum directly, we must
// mimic their behaviour to avoid rounding failures.
meanReqSize = updateMean(meanReqSize, test.reqSize, totalCount)
meanRespSize = updateMean(meanRespSize, test.respSize, totalCount)
}
})
}
for i, v := range DefaultViews {

for _, viewName := range views {
v := view.Find(viewName)
if v == nil {
t.Errorf("view not initialized: %d", i)
t.Errorf("view not found %q", viewName)
continue
}
rows, err := view.RetrieveData(viewName)
if err != nil {
t.Error(err)
continue
}
if got, want := len(rows), 1; got != want {
t.Errorf("len(%q) = %d; want %d", viewName, got, want)
continue
}
data := rows[0].Data

var count int
var sum float64
switch data := data.(type) {
case *view.CountData:
count = int(*data)
case *view.DistributionData:
count = int(data.Count)
sum = data.Sum()
default:
t.Errorf("Unkown data type: %v", data)
continue
}

if got, want := count, totalCount; got != want {
t.Fatalf("%s = %d; want %d", viewName, got, want)
}

// We can only check sum for distribution views.
switch viewName {
case "opencensus.io/http/server/request_bytes":
if got, want := sum, meanReqSize*float64(totalCount); got != want {
t.Fatalf("%s = %g; want %g", viewName, got, want)
}
case "opencensus.io/http/server/response_bytes":
if got, want := sum, meanRespSize*float64(totalCount); got != want {
t.Fatalf("%s = %g; want %g", viewName, got, want)
}
}
}
}
70 changes: 70 additions & 0 deletions plugin/ochttp/stats_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,73 @@ func (t *tracker) Close() error {
t.end()
return t.body.Close()
}

func (h *Handler) startStats(w http.ResponseWriter, r *http.Request) (http.ResponseWriter, *http.Request, func()) {
ctx, _ := tag.New(r.Context(),
tag.Upsert(Host, r.URL.Host),
tag.Upsert(Path, r.URL.Path),
tag.Upsert(Method, r.Method))
track := &trackingResponseWriter{
start: time.Now(),
ctx: ctx,
writer: w,
}
if r.Body == nil {
// TODO: Handle cases where ContentLength is not set.
track.reqSize = -1
} else if r.ContentLength > 0 {
track.reqSize = r.ContentLength
}
stats.Record(ctx, ServerRequestCount.M(1))
return track, r.WithContext(ctx), func() { track.end() }
}

type trackingResponseWriter struct {
reqSize int64
respSize int64
ctx context.Context
start time.Time
statusCode string
endOnce sync.Once
writer http.ResponseWriter
}

var _ http.ResponseWriter = (*trackingResponseWriter)(nil)

func (t *trackingResponseWriter) end() {
t.endOnce.Do(func() {
if t.statusCode == "" {
t.statusCode = "200"
}
m := []stats.Measurement{
ServerLatency.M(float64(time.Since(t.start)) / float64(time.Millisecond)),
ServerResponseBytes.M(t.respSize),
}
if t.reqSize >= 0 {
m = append(m, ServerRequestBytes.M(t.reqSize))
}
ctx, _ := tag.New(t.ctx, tag.Upsert(StatusCode, t.statusCode))
stats.Record(ctx, m...)
})
}

func (t *trackingResponseWriter) Header() http.Header {
return t.writer.Header()
}

func (t *trackingResponseWriter) Write(data []byte) (int, error) {
n, err := t.writer.Write(data)
t.respSize += int64(n)
return n, err
}

func (t *trackingResponseWriter) WriteHeader(statusCode int) {
t.writer.WriteHeader(statusCode)
t.statusCode = strconv.Itoa(statusCode)
}

func (t *trackingResponseWriter) Flush() {
if flusher, ok := t.writer.(http.Flusher); ok {
flusher.Flush()
}
}
45 changes: 29 additions & 16 deletions plugin/ochttp/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,30 +158,43 @@ type Handler struct {
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var endTrace, endStats func()
if !h.NoTrace {
name := spanNameFromURL("Recv", r.URL)
p := h.Propagation
if p == nil {
p = defaultFormat
}
ctx := r.Context()
var span *trace.Span
if sc, ok := p.SpanContextFromRequest(r); ok {
ctx, span = trace.StartSpanWithRemoteParent(ctx, name, sc, trace.StartOptions{})
} else {
ctx, span = trace.StartSpan(ctx, name)
}
defer span.End()

span.SetAttributes(requestAttrs(r)...)
r = r.WithContext(ctx)
w, r, endTrace = h.startTrace(w, r)
}
if !h.NoStats {
w, r, endStats = h.startStats(w, r)
}

handler := h.Handler
if handler == nil {
handler = http.DefaultServeMux
}
handler.ServeHTTP(w, r)
if endTrace != nil {
endTrace()
}
if endStats != nil {
endStats()
}
}

func (h *Handler) startTrace(w http.ResponseWriter, r *http.Request) (http.ResponseWriter, *http.Request, func()) {
name := spanNameFromURL("Recv", r.URL)
p := h.Propagation
if p == nil {
p = defaultFormat
}
ctx := r.Context()
var span *trace.Span
if sc, ok := p.SpanContextFromRequest(r); ok {
ctx, span = trace.StartSpanWithRemoteParent(ctx, name, sc, trace.StartOptions{})
} else {
ctx, span = trace.StartSpan(ctx, name)
}

span.SetAttributes(requestAttrs(r)...)
return w, r.WithContext(ctx), func() { span.End() }
}

func spanNameFromURL(prefix string, u *url.URL) string {
Expand Down

0 comments on commit 09d3108

Please sign in to comment.