Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
rpcz doesn't display any data
Browse files Browse the repository at this point in the history
The change to remove view.New in favor of constructing a View
directly broke rpcz because we stopped preserving the identity
of the *View pointer.

This restores the identity of *View such that the same pointer is
always returned in view.Data (and can therefore be used in maps and
switch statements like it was in rpcz).
  • Loading branch information
Ramon Nogueira committed Mar 16, 2018
1 parent a2a7d39 commit 5e1a3e8
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 82 deletions.
2 changes: 1 addition & 1 deletion examples/grpc/helloworld_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {
view.RegisterExporter(&exporter.PrintExporter{})

// Subscribe to collect server request count.
if err := ocgrpc.ServerRequestCountView.Subscribe(); err != nil {
if err := view.Subscribe(ocgrpc.DefaultServerViews...); err != nil {
log.Fatal(err)
}

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion plugin/ocgrpc/trace_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"testing"
"time"

"go.opencensus.io/plugin/ocgrpc/internal/testpb"
"go.opencensus.io/internal/testpb"
"go.opencensus.io/trace"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand Down
33 changes: 13 additions & 20 deletions stats/view/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,29 +83,26 @@ func (v *View) same(other *View) bool {

// canonicalized returns a validated View canonicalized by setting explicit
// defaults for Name and Description and sorting the TagKeys
func (v *View) canonicalized() (*View, error) {
func (v *View) canonicalize() error {
if v.Measure == nil {
return nil, fmt.Errorf("cannot subscribe view %q: measure not set", v.Name)
return fmt.Errorf("cannot subscribe view %q: measure not set", v.Name)
}
if v.Aggregation == nil {
return nil, fmt.Errorf("cannot subscribe view %q: aggregation not set", v.Name)
return fmt.Errorf("cannot subscribe view %q: aggregation not set", v.Name)
}
vc := *v
if vc.Name == "" {
vc.Name = vc.Measure.Name()
if v.Name == "" {
v.Name = v.Measure.Name()
}
if vc.Description == "" {
vc.Description = vc.Measure.Description()
if v.Description == "" {
v.Description = v.Measure.Description()
}
if err := checkViewName(vc.Name); err != nil {
return nil, err
if err := checkViewName(v.Name); err != nil {
return err
}
vc.TagKeys = make([]tag.Key, len(v.TagKeys))
copy(vc.TagKeys, v.TagKeys)
sort.Slice(vc.TagKeys, func(i, j int) bool {
return vc.TagKeys[i].Name() < vc.TagKeys[j].Name()
sort.Slice(v.TagKeys, func(i, j int) bool {
return v.TagKeys[i].Name() < v.TagKeys[j].Name()
})
return &vc, nil
return nil
}

// viewInternal is the internal representation of a View.
Expand All @@ -116,12 +113,8 @@ type viewInternal struct {
}

func newViewInternal(v *View) (*viewInternal, error) {
vc, err := v.canonicalized()
if err != nil {
return nil, err
}
return &viewInternal{
view: vc,
view: v,
collector: &collector{make(map[string]AggregationData), v.Aggregation},
}, nil
}
Expand Down
10 changes: 5 additions & 5 deletions stats/view/view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,20 +314,20 @@ func TestCanonicalize(t *testing.T) {
k2, _ := tag.NewKey("k2")
m, _ := stats.Int64("TestCanonicalize/m1", "desc desc", stats.UnitNone)
v := &View{TagKeys: []tag.Key{k2, k1}, Measure: m, Aggregation: MeanAggregation{}}
vc, err := v.canonicalized()
err := v.canonicalize()
if err != nil {
t.Fatal(err)
}
if got, want := vc.Name, "TestCanonicalize/m1"; got != want {
if got, want := v.Name, "TestCanonicalize/m1"; got != want {
t.Errorf("vc.Name = %q; want %q", got, want)
}
if got, want := vc.Description, "desc desc"; got != want {
if got, want := v.Description, "desc desc"; got != want {
t.Errorf("vc.Description = %q; want %q", got, want)
}
if got, want := len(vc.TagKeys), 2; got != want {
if got, want := len(v.TagKeys), 2; got != want {
t.Errorf("len(vc.TagKeys) = %d; want %d", got, want)
}
if got, want := vc.TagKeys[0].Name(), "k1"; got != want {
if got, want := v.TagKeys[0].Name(), "k1"; got != want {
t.Errorf("vc.TagKeys[0].Name() = %q; want %q", got, want)
}
}
Expand Down
6 changes: 6 additions & 0 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func (v *View) Subscribe() error {
// Subscribe begins collecting data for the given views.
// Once a view is subscribed, it reports data to the registered exporters.
func Subscribe(views ...*View) error {
for _, v := range views {
err := v.canonicalize()
if err != nil {
return err
}
}
req := &subscribeToViewReq{
views: views,
err: make(chan error),
Expand Down
55 changes: 0 additions & 55 deletions zpages/rpcz.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"text/tabwriter"
"time"

"go.opencensus.io/internal"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/stats/view"
)
Expand Down Expand Up @@ -142,60 +141,6 @@ type headerData struct {
Title string
}

type summaryPageData struct {
Header []string
LatencyBucketNames []string
Links bool
TracesEndpoint string
Rows []summaryPageRow
}

type summaryPageRow struct {
Name string
Active int
Latency []int
Errors int
}

func (s *summaryPageData) Len() int { return len(s.Rows) }
func (s *summaryPageData) Less(i, j int) bool { return s.Rows[i].Name < s.Rows[j].Name }
func (s *summaryPageData) Swap(i, j int) { s.Rows[i], s.Rows[j] = s.Rows[j], s.Rows[i] }

func getSummaryPageData() summaryPageData {
data := summaryPageData{
Links: true,
TracesEndpoint: "/tracez",
}
internalTrace := internal.Trace.(interface {
ReportSpansPerMethod() map[string]internal.PerMethodSummary
})
for name, s := range internalTrace.ReportSpansPerMethod() {
if len(data.Header) == 0 {
data.Header = []string{"Name", "Active"}
for _, b := range s.LatencyBuckets {
l := b.MinLatency
s := fmt.Sprintf(">%v", l)
if l == 100*time.Second {
s = ">100s"
}
data.Header = append(data.Header, s)
data.LatencyBucketNames = append(data.LatencyBucketNames, s)
}
data.Header = append(data.Header, "Errors")
}
row := summaryPageRow{Name: name, Active: s.Active}
for _, l := range s.LatencyBuckets {
row.Latency = append(row.Latency, l.Size)
}
for _, e := range s.ErrorBuckets {
row.Errors += e.Size
}
data.Rows = append(data.Rows, row)
}
sort.Sort(&data)
return data
}

// statsPage aggregates stats on the page for 'sent' and 'received' categories
type statsPage struct {
StatGroups []*statGroup
Expand Down
117 changes: 117 additions & 0 deletions zpages/rpcz_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2017, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package zpages

import (
"context"
"fmt"
"io"
"log"
"net"
"testing"
"time"

"go.opencensus.io/internal/testpb"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/stats/view"
"google.golang.org/grpc"
)

type testServer struct{}

var _ testpb.FooServer = (*testServer)(nil)

func (s *testServer) Single(ctx context.Context, in *testpb.FooRequest) (*testpb.FooResponse, error) {
if in.Fail {
return nil, fmt.Errorf("request failed")
}
return &testpb.FooResponse{}, nil
}

func (s *testServer) Multiple(stream testpb.Foo_MultipleServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
if in.Fail {
return fmt.Errorf("request failed")
}
if err := stream.Send(&testpb.FooResponse{}); err != nil {
return err
}
}
}

func newClientAndServer() (client testpb.FooClient, server *grpc.Server, cleanup func()) {
// initialize server
listener, err := net.Listen("tcp", "localhost:0")
if err != nil {
log.Fatal(err)
}
server = grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{}))
testpb.RegisterFooServer(server, &testServer{})
go server.Serve(listener)

// Initialize client.
clientConn, err := grpc.Dial(
listener.Addr().String(),
grpc.WithInsecure(),
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithBlock())

if err != nil {
log.Fatal(err)
}
client = testpb.NewFooClient(clientConn)

cleanup = func() {
server.GracefulStop()
clientConn.Close()
}

return client, server, cleanup
}

func TestRpcz(t *testing.T) {
client, _, cleanup := newClientAndServer()
defer cleanup()

_, err := client.Single(context.Background(), &testpb.FooRequest{})
if err != nil {
t.Fatal(err)
}

view.SetReportingPeriod(time.Millisecond)
time.Sleep(2 * time.Millisecond)
view.SetReportingPeriod(time.Second)

if len(snaps) == 0 {
t.Fatal("Expected len(snaps) > 0")
}

snapshot, ok := snaps[methodKey{"testpb.Foo/Single", false}]
if !ok {
t.Fatal("Expected method stats not recorded")
}

if got, want := snapshot.CountTotal, 1; got != want {
t.Errorf("snapshot.CountTotal = %d; want %d", got, want)
}
}
54 changes: 54 additions & 0 deletions zpages/tracez.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,3 +392,57 @@ func writeTextTraces(w io.Writer, data traceData) {
}
tw.Flush()
}

type summaryPageData struct {
Header []string
LatencyBucketNames []string
Links bool
TracesEndpoint string
Rows []summaryPageRow
}

type summaryPageRow struct {
Name string
Active int
Latency []int
Errors int
}

func (s *summaryPageData) Len() int { return len(s.Rows) }
func (s *summaryPageData) Less(i, j int) bool { return s.Rows[i].Name < s.Rows[j].Name }
func (s *summaryPageData) Swap(i, j int) { s.Rows[i], s.Rows[j] = s.Rows[j], s.Rows[i] }

func getSummaryPageData() summaryPageData {
data := summaryPageData{
Links: true,
TracesEndpoint: "/tracez",
}
internalTrace := internal.Trace.(interface {
ReportSpansPerMethod() map[string]internal.PerMethodSummary
})
for name, s := range internalTrace.ReportSpansPerMethod() {
if len(data.Header) == 0 {
data.Header = []string{"Name", "Active"}
for _, b := range s.LatencyBuckets {
l := b.MinLatency
s := fmt.Sprintf(">%v", l)
if l == 100*time.Second {
s = ">100s"
}
data.Header = append(data.Header, s)
data.LatencyBucketNames = append(data.LatencyBucketNames, s)
}
data.Header = append(data.Header, "Errors")
}
row := summaryPageRow{Name: name, Active: s.Active}
for _, l := range s.LatencyBuckets {
row.Latency = append(row.Latency, l.Size)
}
for _, e := range s.ErrorBuckets {
row.Errors += e.Size
}
data.Rows = append(data.Rows, row)
}
sort.Sort(&data)
return data
}

0 comments on commit 5e1a3e8

Please sign in to comment.