From aa454af2e0fc771501a5219fdf1ceda5329161eb Mon Sep 17 00:00:00 2001 From: Ramon Nogueira Date: Fri, 16 Mar 2018 14:21:47 -0700 Subject: [PATCH] Add tests for gRPC span duration an Stackdriver gRPC exporting --- exporter/stackdriver/stackdriver_test.go | 26 +++ internal/testpb/impl.go | 93 +++++++++ internal/testpb/test.pb.go | 28 ++- internal/testpb/test.proto | 1 + plugin/ocgrpc/trace_common_test.go | 247 ----------------------- plugin/ocgrpc/trace_test.go | 234 +++++++++++++++++++++ stats/view/worker.go | 2 + zpages/rpcz_test.go | 67 +----- 8 files changed, 376 insertions(+), 322 deletions(-) create mode 100644 internal/testpb/impl.go create mode 100644 plugin/ocgrpc/trace_test.go diff --git a/exporter/stackdriver/stackdriver_test.go b/exporter/stackdriver/stackdriver_test.go index 796cbb166..59b91303b 100644 --- a/exporter/stackdriver/stackdriver_test.go +++ b/exporter/stackdriver/stackdriver_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "go.opencensus.io/internal/testpb" "go.opencensus.io/plugin/ochttp" "go.opencensus.io/stats/view" "go.opencensus.io/trace" @@ -81,3 +82,28 @@ func TestExport(t *testing.T) { exporter.Flush() exporter.Flush() } + +func TestGRPC(t *testing.T) { + projectID, ok := os.LookupEnv("STACKDRIVER_TEST_PROJECT_ID") + if !ok { + t.Skip("STACKDRIVER_TEST_PROJECT_ID not set") + } + + exporter, err := NewExporter(Options{ProjectID: projectID}) + if err != nil { + t.Fatal(err) + } + defer exporter.Flush() + + trace.RegisterExporter(exporter) + defer trace.UnregisterExporter(exporter) + view.RegisterExporter(exporter) + defer view.UnregisterExporter(exporter) + + trace.SetDefaultSampler(trace.AlwaysSample()) + + client, done := testpb.NewTestClient(t) + defer done() + + client.Single(context.Background(), &testpb.FooRequest{SleepNanos: int64(42 * time.Millisecond)}) +} diff --git a/internal/testpb/impl.go b/internal/testpb/impl.go new file mode 100644 index 000000000..24533afcd --- /dev/null +++ b/internal/testpb/impl.go @@ -0,0 +1,93 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testpb + +import ( + "context" + "fmt" + "io" + "net" + "testing" + "time" + + "go.opencensus.io/plugin/ocgrpc" + "go.opencensus.io/trace" + "google.golang.org/grpc" +) + +type testServer struct{} + +var _ FooServer = (*testServer)(nil) + +func (s *testServer) Single(ctx context.Context, in *FooRequest) (*FooResponse, error) { + if in.SleepNanos > 0 { + _, span := trace.StartSpan(ctx, "testpb.Single.Sleep") + span.AddAttributes(trace.Int64Attribute("sleep_nanos", in.SleepNanos)) + time.Sleep(time.Duration(in.SleepNanos)) + span.End() + } + if in.Fail { + return nil, fmt.Errorf("request failed") + } + return &FooResponse{}, nil +} + +func (s *testServer) Multiple(stream Foo_MultipleServer) error { + for { + in, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + if in.Fail { + return fmt.Errorf("request failed") + } + if err := stream.Send(&FooResponse{}); err != nil { + return err + } + } +} + +func NewTestClient(l *testing.T) (client FooClient, cleanup func()) { + // initialize server + listener, err := net.Listen("tcp", "localhost:0") + if err != nil { + l.Fatal(err) + } + server := grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{})) + RegisterFooServer(server, &testServer{}) + go server.Serve(listener) + + // Initialize client. + clientConn, err := grpc.Dial( + listener.Addr().String(), + grpc.WithInsecure(), + grpc.WithStatsHandler(&ocgrpc.ClientHandler{}), + grpc.WithBlock()) + + if err != nil { + l.Fatal(err) + } + client = NewFooClient(clientConn) + + cleanup = func() { + server.GracefulStop() + clientConn.Close() + } + + return client, cleanup +} diff --git a/internal/testpb/test.pb.go b/internal/testpb/test.pb.go index 76346fc1e..b8e2bbc95 100644 --- a/internal/testpb/test.pb.go +++ b/internal/testpb/test.pb.go @@ -34,7 +34,8 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type FooRequest struct { - Fail bool `protobuf:"varint,1,opt,name=fail" json:"fail,omitempty"` + Fail bool `protobuf:"varint,1,opt,name=fail" json:"fail,omitempty"` + SleepNanos int64 `protobuf:"varint,2,opt,name=sleep_nanos,json=sleepNanos" json:"sleep_nanos,omitempty"` } func (m *FooRequest) Reset() { *m = FooRequest{} } @@ -49,6 +50,13 @@ func (m *FooRequest) GetFail() bool { return false } +func (m *FooRequest) GetSleepNanos() int64 { + if m != nil { + return m.SleepNanos + } + return 0 +} + type FooResponse struct { } @@ -203,16 +211,18 @@ var _Foo_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("test.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 137 bytes of a gzipped FileDescriptorProto + // 165 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e, - 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x03, 0xb1, 0x0b, 0x92, 0x94, 0x14, 0xb8, 0xb8, + 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x03, 0xb1, 0x0b, 0x92, 0x94, 0x1c, 0xb9, 0xb8, 0xdc, 0xf2, 0xf3, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84, 0x84, 0xb8, 0x58, 0xd2, 0x12, - 0x33, 0x73, 0x24, 0x18, 0x15, 0x18, 0x35, 0x38, 0x82, 0xc0, 0x6c, 0x25, 0x5e, 0x2e, 0x6e, 0xb0, - 0x8a, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0xa3, 0x42, 0x2e, 0x66, 0xb7, 0xfc, 0x7c, 0x21, 0x43, - 0x2e, 0xb6, 0xe0, 0xcc, 0xbc, 0xf4, 0x9c, 0x54, 0x21, 0x21, 0x3d, 0x88, 0x51, 0x7a, 0x08, 0x73, - 0xa4, 0x84, 0x51, 0xc4, 0x20, 0x3a, 0x85, 0xcc, 0xb9, 0x38, 0x7c, 0x4b, 0x73, 0x4a, 0x32, 0x0b, - 0x48, 0xd0, 0xa4, 0xc1, 0x68, 0xc0, 0x98, 0xc4, 0x06, 0x76, 0xb2, 0x31, 0x20, 0x00, 0x00, 0xff, - 0xff, 0xda, 0xc5, 0x9f, 0x2f, 0xc0, 0x00, 0x00, 0x00, + 0x33, 0x73, 0x24, 0x18, 0x15, 0x18, 0x35, 0x38, 0x82, 0xc0, 0x6c, 0x21, 0x79, 0x2e, 0xee, 0xe2, + 0x9c, 0xd4, 0xd4, 0x82, 0xf8, 0xbc, 0xc4, 0xbc, 0xfc, 0x62, 0x09, 0x26, 0x05, 0x46, 0x0d, 0xe6, + 0x20, 0x2e, 0xb0, 0x90, 0x1f, 0x48, 0x44, 0x89, 0x97, 0x8b, 0x1b, 0x6c, 0x44, 0x71, 0x41, 0x7e, + 0x5e, 0x71, 0xaa, 0x51, 0x21, 0x17, 0xb3, 0x5b, 0x7e, 0xbe, 0x90, 0x21, 0x17, 0x5b, 0x70, 0x66, + 0x5e, 0x7a, 0x4e, 0xaa, 0x90, 0x90, 0x1e, 0xc4, 0x2e, 0x3d, 0x84, 0x45, 0x52, 0xc2, 0x28, 0x62, + 0x10, 0x9d, 0x42, 0xe6, 0x5c, 0x1c, 0xbe, 0xa5, 0x39, 0x25, 0x99, 0x05, 0x24, 0x68, 0xd2, 0x60, + 0x34, 0x60, 0x4c, 0x62, 0x03, 0xfb, 0xc9, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x37, 0xb1, 0x2d, + 0x6e, 0xe1, 0x00, 0x00, 0x00, } //go:generate ./generate.sh diff --git a/internal/testpb/test.proto b/internal/testpb/test.proto index 2a198a6f5..b82d128ac 100644 --- a/internal/testpb/test.proto +++ b/internal/testpb/test.proto @@ -4,6 +4,7 @@ package testpb; message FooRequest { bool fail = 1; + int64 sleep_nanos = 2; } message FooResponse { diff --git a/plugin/ocgrpc/trace_common_test.go b/plugin/ocgrpc/trace_common_test.go index fa36861fb..9e590d994 100644 --- a/plugin/ocgrpc/trace_common_test.go +++ b/plugin/ocgrpc/trace_common_test.go @@ -15,80 +15,14 @@ package ocgrpc import ( - "fmt" - "io" - "net" "testing" - "time" - "go.opencensus.io/internal/testpb" "go.opencensus.io/trace" "golang.org/x/net/context" - "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" ) -type testServer struct{} - -func (s *testServer) Single(ctx context.Context, in *testpb.FooRequest) (*testpb.FooResponse, error) { - if in.Fail { - return nil, fmt.Errorf("request failed") - } - return &testpb.FooResponse{}, nil -} - -func (s *testServer) Multiple(stream testpb.Foo_MultipleServer) error { - for { - in, err := stream.Recv() - if err == io.EOF { - return nil - } - if err != nil { - return err - } - if in.Fail { - return fmt.Errorf("request failed") - } - if err := stream.Send(&testpb.FooResponse{}); err != nil { - return err - } - } -} - -func newTracingOnlyTestClientAndServer() (client testpb.FooClient, server *grpc.Server, cleanup func(), err error) { - // initialize server - listener, err := net.Listen("tcp", "localhost:0") - if err != nil { - return nil, nil, nil, fmt.Errorf("net.Listen: %v", err) - } - server = grpc.NewServer(grpc.StatsHandler(&ServerHandler{})) - testpb.RegisterFooServer(server, &testServer{}) - go server.Serve(listener) - - // initialize client - clientConn, err := grpc.Dial(listener.Addr().String(), grpc.WithInsecure(), grpc.WithStatsHandler(&ClientHandler{}), grpc.WithBlock()) - if err != nil { - return nil, nil, nil, fmt.Errorf("grpc.Dial: %v", err) - } - client = testpb.NewFooClient(clientConn) - - cleanup = func() { - server.GracefulStop() - clientConn.Close() - } - - return client, server, cleanup, nil -} - -type testExporter struct { - ch chan *trace.SpanData -} - -func (t *testExporter) ExportSpan(s *trace.SpanData) { - go func() { t.ch <- s }() -} - func TestClientHandler_traceTagRPC(t *testing.T) { ch := &ClientHandler{} ch.StartOptions.Sampler = trace.AlwaysSample() @@ -110,184 +44,3 @@ func TestClientHandler_traceTagRPC(t *testing.T) { t.Fatal("no metadata") } } - -func TestStreaming(t *testing.T) { - trace.SetDefaultSampler(trace.AlwaysSample()) - te := testExporter{make(chan *trace.SpanData)} - trace.RegisterExporter(&te) - defer trace.UnregisterExporter(&te) - - client, _, cleanup, err := newTracingOnlyTestClientAndServer() - if err != nil { - t.Fatalf("initializing client and server: %v", err) - } - - stream, err := client.Multiple(context.Background()) - if err != nil { - t.Fatalf("Call failed: %v", err) - } - - err = stream.Send(&testpb.FooRequest{}) - if err != nil { - t.Fatalf("Couldn't send streaming request: %v", err) - } - stream.CloseSend() - - for { - _, err := stream.Recv() - if err == io.EOF { - break - } - if err != nil { - t.Errorf("stream.Recv() = %v; want no errors", err) - } - } - - cleanup() - - s1 := <-te.ch - s2 := <-te.ch - - checkSpanData(t, s1, s2, ".testpb.Foo.Multiple", true) - - select { - case <-te.ch: - t.Fatal("received extra exported spans") - case <-time.After(time.Second / 10): - } -} - -func TestStreamingFail(t *testing.T) { - trace.SetDefaultSampler(trace.AlwaysSample()) - te := testExporter{make(chan *trace.SpanData)} - trace.RegisterExporter(&te) - defer trace.UnregisterExporter(&te) - - client, _, cleanup, err := newTracingOnlyTestClientAndServer() - if err != nil { - t.Fatalf("initializing client and server: %v", err) - } - - stream, err := client.Multiple(context.Background()) - if err != nil { - t.Fatalf("Call failed: %v", err) - } - - err = stream.Send(&testpb.FooRequest{Fail: true}) - if err != nil { - t.Fatalf("Couldn't send streaming request: %v", err) - } - stream.CloseSend() - - for { - _, err := stream.Recv() - if err == nil || err == io.EOF { - t.Errorf("stream.Recv() = %v; want errors", err) - } else { - break - } - } - - s1 := <-te.ch - s2 := <-te.ch - - checkSpanData(t, s1, s2, ".testpb.Foo.Multiple", false) - cleanup() - - select { - case <-te.ch: - t.Fatal("received extra exported spans") - case <-time.After(time.Second / 10): - } -} - -func TestSingle(t *testing.T) { - trace.SetDefaultSampler(trace.AlwaysSample()) - te := testExporter{make(chan *trace.SpanData)} - trace.RegisterExporter(&te) - defer trace.UnregisterExporter(&te) - - client, _, cleanup, err := newTracingOnlyTestClientAndServer() - if err != nil { - t.Fatalf("initializing client and server: %v", err) - } - - _, err = client.Single(context.Background(), &testpb.FooRequest{}) - if err != nil { - t.Fatalf("Couldn't send request: %v", err) - } - - s1 := <-te.ch - s2 := <-te.ch - - checkSpanData(t, s1, s2, ".testpb.Foo.Single", true) - cleanup() - - select { - case <-te.ch: - t.Fatal("received extra exported spans") - case <-time.After(time.Second / 10): - } -} - -func TestSingleFail(t *testing.T) { - trace.SetDefaultSampler(trace.AlwaysSample()) - te := testExporter{make(chan *trace.SpanData)} - trace.RegisterExporter(&te) - defer trace.UnregisterExporter(&te) - - client, _, cleanup, err := newTracingOnlyTestClientAndServer() - if err != nil { - t.Fatalf("initializing client and server: %v", err) - } - - _, err = client.Single(context.Background(), &testpb.FooRequest{Fail: true}) - if err == nil { - t.Fatalf("Got nil error from request, want non-nil") - } - - s1 := <-te.ch - s2 := <-te.ch - - checkSpanData(t, s1, s2, ".testpb.Foo.Single", false) - cleanup() - - select { - case <-te.ch: - t.Fatal("received extra exported spans") - case <-time.After(time.Second / 10): - } -} - -func checkSpanData(t *testing.T, s1, s2 *trace.SpanData, methodName string, success bool) { - t.Helper() - - if s1.Name < s2.Name { - s1, s2 = s2, s1 - } - - if got, want := s1.Name, "Sent"+methodName; got != want { - t.Errorf("Got name %q want %q", got, want) - } - if got, want := s2.Name, "Recv"+methodName; got != want { - t.Errorf("Got name %q want %q", got, want) - } - if got, want := s2.SpanContext.TraceID, s1.SpanContext.TraceID; got != want { - t.Errorf("Got trace IDs %s and %s, want them equal", got, want) - } - if got, want := s2.ParentSpanID, s1.SpanContext.SpanID; got != want { - t.Errorf("Got ParentSpanID %s, want %s", got, want) - } - if got := (s1.Status.Code == 0); got != success { - t.Errorf("Got success=%t want %t", got, success) - } - if got := (s2.Status.Code == 0); got != success { - t.Errorf("Got success=%t want %t", got, success) - } - if s1.HasRemoteParent { - t.Errorf("Got HasRemoteParent=%t, want false", s1.HasRemoteParent) - } - if !s2.HasRemoteParent { - t.Errorf("Got HasRemoteParent=%t, want true", s2.HasRemoteParent) - } -} diff --git a/plugin/ocgrpc/trace_test.go b/plugin/ocgrpc/trace_test.go new file mode 100644 index 000000000..fffcc840f --- /dev/null +++ b/plugin/ocgrpc/trace_test.go @@ -0,0 +1,234 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ocgrpc_test + +import ( + "io" + "strings" + "testing" + "time" + + "go.opencensus.io/internal/testpb" + "go.opencensus.io/trace" + "golang.org/x/net/context" +) + +type testExporter struct { + ch chan *trace.SpanData +} + +func (t *testExporter) ExportSpan(s *trace.SpanData) { + go func() { t.ch <- s }() +} + +func TestStreaming(t *testing.T) { + trace.SetDefaultSampler(trace.AlwaysSample()) + te := testExporter{make(chan *trace.SpanData)} + trace.RegisterExporter(&te) + defer trace.UnregisterExporter(&te) + + client, cleanup := testpb.NewTestClient(t) + + stream, err := client.Multiple(context.Background()) + if err != nil { + t.Fatalf("Call failed: %v", err) + } + + err = stream.Send(&testpb.FooRequest{}) + if err != nil { + t.Fatalf("Couldn't send streaming request: %v", err) + } + stream.CloseSend() + + for { + _, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + t.Errorf("stream.Recv() = %v; want no errors", err) + } + } + + cleanup() + + s1 := <-te.ch + s2 := <-te.ch + + checkSpanData(t, s1, s2, ".testpb.Foo.Multiple", true) + + select { + case <-te.ch: + t.Fatal("received extra exported spans") + case <-time.After(time.Second / 10): + } +} + +func TestStreamingFail(t *testing.T) { + trace.SetDefaultSampler(trace.AlwaysSample()) + te := testExporter{make(chan *trace.SpanData)} + trace.RegisterExporter(&te) + defer trace.UnregisterExporter(&te) + + client, cleanup := testpb.NewTestClient(t) + + stream, err := client.Multiple(context.Background()) + if err != nil { + t.Fatalf("Call failed: %v", err) + } + + err = stream.Send(&testpb.FooRequest{Fail: true}) + if err != nil { + t.Fatalf("Couldn't send streaming request: %v", err) + } + stream.CloseSend() + + for { + _, err := stream.Recv() + if err == nil || err == io.EOF { + t.Errorf("stream.Recv() = %v; want errors", err) + } else { + break + } + } + + s1 := <-te.ch + s2 := <-te.ch + + checkSpanData(t, s1, s2, ".testpb.Foo.Multiple", false) + cleanup() + + select { + case <-te.ch: + t.Fatal("received extra exported spans") + case <-time.After(time.Second / 10): + } +} + +func TestSingle(t *testing.T) { + trace.SetDefaultSampler(trace.AlwaysSample()) + te := testExporter{make(chan *trace.SpanData)} + trace.RegisterExporter(&te) + defer trace.UnregisterExporter(&te) + + client, cleanup := testpb.NewTestClient(t) + + _, err := client.Single(context.Background(), &testpb.FooRequest{}) + if err != nil { + t.Fatalf("Couldn't send request: %v", err) + } + + s1 := <-te.ch + s2 := <-te.ch + + checkSpanData(t, s1, s2, ".testpb.Foo.Single", true) + cleanup() + + select { + case <-te.ch: + t.Fatal("received extra exported spans") + case <-time.After(time.Second / 10): + } +} + +func TestServerSpanDuration(t *testing.T) { + client, cleanup := testpb.NewTestClient(t) + defer cleanup() + + te := testExporter{make(chan *trace.SpanData, 100)} + trace.RegisterExporter(&te) + defer trace.UnregisterExporter(&te) + + trace.SetDefaultSampler(trace.AlwaysSample()) + + ctx := context.Background() + const sleep = 100 * time.Millisecond + client.Single(ctx, &testpb.FooRequest{SleepNanos: int64(sleep)}) + +loop: + for { + select { + case span := <-te.ch: + if !strings.HasPrefix(span.Name, "Recv.") { + continue loop + } + if got, want := span.EndTime.Sub(span.StartTime), sleep; got < want { + t.Errorf("span duration = %dns; want at least %dns", got, want) + } + break loop + default: + t.Fatal("no more spans") + } + } +} + +func TestSingleFail(t *testing.T) { + trace.SetDefaultSampler(trace.AlwaysSample()) + te := testExporter{make(chan *trace.SpanData)} + trace.RegisterExporter(&te) + defer trace.UnregisterExporter(&te) + + client, cleanup := testpb.NewTestClient(t) + + _, err := client.Single(context.Background(), &testpb.FooRequest{Fail: true}) + if err == nil { + t.Fatalf("Got nil error from request, want non-nil") + } + + s1 := <-te.ch + s2 := <-te.ch + + checkSpanData(t, s1, s2, ".testpb.Foo.Single", false) + cleanup() + + select { + case <-te.ch: + t.Fatal("received extra exported spans") + case <-time.After(time.Second / 10): + } +} + +func checkSpanData(t *testing.T, s1, s2 *trace.SpanData, methodName string, success bool) { + t.Helper() + + if s1.Name < s2.Name { + s1, s2 = s2, s1 + } + + if got, want := s1.Name, "Sent"+methodName; got != want { + t.Errorf("Got name %q want %q", got, want) + } + if got, want := s2.Name, "Recv"+methodName; got != want { + t.Errorf("Got name %q want %q", got, want) + } + if got, want := s2.SpanContext.TraceID, s1.SpanContext.TraceID; got != want { + t.Errorf("Got trace IDs %s and %s, want them equal", got, want) + } + if got, want := s2.ParentSpanID, s1.SpanContext.SpanID; got != want { + t.Errorf("Got ParentSpanID %s, want %s", got, want) + } + if got := (s1.Status.Code == 0); got != success { + t.Errorf("Got success=%t want %t", got, success) + } + if got := (s2.Status.Code == 0); got != success { + t.Errorf("Got success=%t want %t", got, success) + } + if s1.HasRemoteParent { + t.Errorf("Got HasRemoteParent=%t, want false", s1.HasRemoteParent) + } + if !s2.HasRemoteParent { + t.Errorf("Got HasRemoteParent=%t, want true", s2.HasRemoteParent) + } +} diff --git a/stats/view/worker.go b/stats/view/worker.go index 924ffe5c7..2b4f6bee0 100644 --- a/stats/view/worker.go +++ b/stats/view/worker.go @@ -94,6 +94,8 @@ func Subscribe(views ...*View) error { // Unsubscribe the given views. Data will not longer be exported for these views // after Unsubscribe returns. +// It is not necessary to unsubscribe from views you expect to collect for the +// duration of your program execution. func Unsubscribe(views ...*View) { names := make([]string, len(views)) for i := range views { diff --git a/zpages/rpcz_test.go b/zpages/rpcz_test.go index 3f511efb9..9764f6fb3 100644 --- a/zpages/rpcz_test.go +++ b/zpages/rpcz_test.go @@ -17,80 +17,15 @@ package zpages import ( "context" - "fmt" - "io" - "log" - "net" "testing" "time" "go.opencensus.io/internal/testpb" - "go.opencensus.io/plugin/ocgrpc" "go.opencensus.io/stats/view" - "google.golang.org/grpc" ) -type testServer struct{} - -var _ testpb.FooServer = (*testServer)(nil) - -func (s *testServer) Single(ctx context.Context, in *testpb.FooRequest) (*testpb.FooResponse, error) { - if in.Fail { - return nil, fmt.Errorf("request failed") - } - return &testpb.FooResponse{}, nil -} - -func (s *testServer) Multiple(stream testpb.Foo_MultipleServer) error { - for { - in, err := stream.Recv() - if err == io.EOF { - return nil - } - if err != nil { - return err - } - if in.Fail { - return fmt.Errorf("request failed") - } - if err := stream.Send(&testpb.FooResponse{}); err != nil { - return err - } - } -} - -func newClientAndServer() (client testpb.FooClient, server *grpc.Server, cleanup func()) { - // initialize server - listener, err := net.Listen("tcp", "localhost:0") - if err != nil { - log.Fatal(err) - } - server = grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{})) - testpb.RegisterFooServer(server, &testServer{}) - go server.Serve(listener) - - // Initialize client. - clientConn, err := grpc.Dial( - listener.Addr().String(), - grpc.WithInsecure(), - grpc.WithStatsHandler(&ocgrpc.ClientHandler{}), - grpc.WithBlock()) - - if err != nil { - log.Fatal(err) - } - client = testpb.NewFooClient(clientConn) - - cleanup = func() { - server.GracefulStop() - clientConn.Close() - } - - return client, server, cleanup -} - func TestRpcz(t *testing.T) { - client, _, cleanup := newClientAndServer() + client, cleanup := testpb.NewTestClient(t) defer cleanup() _, err := client.Single(context.Background(), &testpb.FooRequest{})