Skip to content

Commit

Permalink
Fix gRPC over cmux and add unit tests (#1758)
Browse files Browse the repository at this point in the history
* Add unit test for gRPC over cmux

Signed-off-by: Yuri Shkuro <[email protected]>

* Fix tests

Signed-off-by: Yuri Shkuro <[email protected]>

* Fix gRPC query service cmux

breaking change: grpc/grpc-go#2406
workaround described in:
- soheilhy/cmux#64
- https://github.com/soheilhy/cmux#limitations

Signed-off-by: Christian Weichel <[email protected]>

* Fix asertions

Signed-off-by: Yuri Shkuro <[email protected]>

* Use DialContext

Signed-off-by: Yuri Shkuro <[email protected]>

* Clean-up timeouts

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored and pavolloffay committed Sep 2, 2019
1 parent 6724390 commit 701f138
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 17 deletions.
2 changes: 2 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions cmd/query/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
)

var (
grpcServerPort = ":0"
errStorageMsgGRPC = "Storage error"
errStorageGRPC = errors.New(errStorageMsgGRPC)
errStatusStorageGRPC = status.Error(2, errStorageMsgGRPC)
Expand Down Expand Up @@ -138,7 +137,7 @@ type grpcClient struct {
}

func newGRPCServer(t *testing.T, q *querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, net.Addr) {
lis, _ := net.Listen("tcp", grpcServerPort)
lis, _ := net.Listen("tcp", ":0")
grpcServer := grpc.NewServer()
grpcHandler := NewGRPCHandler(q, logger, tracer)
api_v2.RegisterQueryServiceServer(grpcServer, grpcHandler)
Expand All @@ -151,8 +150,10 @@ func newGRPCServer(t *testing.T, q *querysvc.QueryService, logger *zap.Logger, t
return grpcServer, lis.Addr()
}

func newGRPCClient(t *testing.T, addr net.Addr) *grpcClient {
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
func newGRPCClient(t *testing.T, addr string) *grpcClient {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure())
require.NoError(t, err)

return &grpcClient{
Expand Down Expand Up @@ -192,7 +193,7 @@ func initializeTestServerGRPCWithOptions(t *testing.T) *grpcServer {

func withServerAndClient(t *testing.T, actualTest func(server *grpcServer, client *grpcClient)) {
server := initializeTestServerGRPCWithOptions(t)
client := newGRPCClient(t, server.lisAddr)
client := newGRPCClient(t, server.lisAddr.String())
defer server.server.Stop()
defer client.conn.Close()

Expand Down
7 changes: 4 additions & 3 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ func (s *Server) Start() error {
// cmux server acts as a reverse-proxy between HTTP and GRPC backends.
cmuxServer := cmux.New(s.conn)

grpcListener := cmuxServer.Match(
cmux.HTTP2HeaderField("content-type", "application/grpc"),
cmux.HTTP2HeaderField("content-type", "application/grpc+proto"))
grpcListener := cmuxServer.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"),
)
httpListener := cmuxServer.Match(cmux.Any())

go func() {
Expand Down
36 changes: 27 additions & 9 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@
package app

import (
"context"
"fmt"
"testing"
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)

func TestServerError(t *testing.T) {
Expand All @@ -40,18 +45,30 @@ func TestServerError(t *testing.T) {
}

func TestServer(t *testing.T) {
flagsSvc := flags.NewService(ports.AgentAdminHTTP)
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()

querySvc := &querysvc.QueryService{}
tracer := opentracing.NoopTracer{}
spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
expectedServices := []string{"test"}
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})

server := NewServer(flagsSvc, querySvc, &QueryOptions{Port: ports.QueryAdminHTTP,
BearerTokenPropagation: true}, tracer)
server := NewServer(flagsSvc, querySvc,
&QueryOptions{Port: ports.QueryHTTP, BearerTokenPropagation: true},
opentracing.NoopTracer{})
assert.NoError(t, server.Start())

// TODO wait for servers to come up and test http and grpc endpoints
time.Sleep(1 * time.Second)
client := newGRPCClient(t, fmt.Sprintf(":%d", ports.QueryHTTP))
defer client.conn.Close()

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{})
assert.NoError(t, err)
assert.Equal(t, expectedServices, res.Services)

server.Close()
for i := 0; i < 10; i++ {
Expand All @@ -64,7 +81,7 @@ func TestServer(t *testing.T) {
}

func TestServerGracefulExit(t *testing.T) {
flagsSvc := flags.NewService(ports.AgentAdminHTTP)
flagsSvc := flags.NewService(ports.QueryAdminHTTP)

zapCore, logs := observer.New(zap.ErrorLevel)
assert.Equal(t, 0, logs.Len(), "Expected initial ObservedLogs to have zero length.")
Expand All @@ -77,11 +94,12 @@ func TestServerGracefulExit(t *testing.T) {
assert.NoError(t, server.Start())

// Wait for servers to come up before we can call .Close()
// TODO Find a way to wait only as long as necessary. Unconditional sleep slows down the tests.
time.Sleep(1 * time.Second)
server.Close()

for _, logEntry := range logs.All() {
assert.True(t, logEntry.Level != zap.ErrorLevel,
fmt.Sprintf("Error log found on server exit: %v", logEntry))
"Error log found on server exit: %v", logEntry)
}
}

0 comments on commit 701f138

Please sign in to comment.