diff --git a/Gopkg.lock b/Gopkg.lock index cbdf41f0478..8515273a9bc 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1281,6 +1281,8 @@ "github.com/bsm/sarama-cluster", "github.com/crossdock/crossdock-go", "github.com/dgraph-io/badger", + "github.com/dgraph-io/badger/options", + "github.com/fsnotify/fsnotify", "github.com/go-openapi/errors", "github.com/go-openapi/loads", "github.com/go-openapi/runtime", diff --git a/cmd/query/app/grpc_handler_test.go b/cmd/query/app/grpc_handler_test.go index 0ffc1944b53..63a2bd3d11c 100644 --- a/cmd/query/app/grpc_handler_test.go +++ b/cmd/query/app/grpc_handler_test.go @@ -40,7 +40,6 @@ import ( ) var ( - grpcServerPort = ":0" errStorageMsgGRPC = "Storage error" errStorageGRPC = errors.New(errStorageMsgGRPC) errStatusStorageGRPC = status.Error(2, errStorageMsgGRPC) @@ -138,7 +137,7 @@ type grpcClient struct { } func newGRPCServer(t *testing.T, q *querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, net.Addr) { - lis, _ := net.Listen("tcp", grpcServerPort) + lis, _ := net.Listen("tcp", ":0") grpcServer := grpc.NewServer() grpcHandler := NewGRPCHandler(q, logger, tracer) api_v2.RegisterQueryServiceServer(grpcServer, grpcHandler) @@ -151,8 +150,10 @@ func newGRPCServer(t *testing.T, q *querysvc.QueryService, logger *zap.Logger, t return grpcServer, lis.Addr() } -func newGRPCClient(t *testing.T, addr net.Addr) *grpcClient { - conn, err := grpc.Dial(addr.String(), grpc.WithInsecure()) +func newGRPCClient(t *testing.T, addr string) *grpcClient { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure()) require.NoError(t, err) return &grpcClient{ @@ -192,7 +193,7 @@ func initializeTestServerGRPCWithOptions(t *testing.T) *grpcServer { func withServerAndClient(t *testing.T, actualTest func(server *grpcServer, client *grpcClient)) { server := initializeTestServerGRPCWithOptions(t) - client := newGRPCClient(t, server.lisAddr) + client := newGRPCClient(t, server.lisAddr.String()) defer server.server.Stop() defer client.conn.Close() diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 5d2841aac74..17759e6f879 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -102,9 +102,10 @@ func (s *Server) Start() error { // cmux server acts as a reverse-proxy between HTTP and GRPC backends. cmuxServer := cmux.New(s.conn) - grpcListener := cmuxServer.Match( - cmux.HTTP2HeaderField("content-type", "application/grpc"), - cmux.HTTP2HeaderField("content-type", "application/grpc+proto")) + grpcListener := cmuxServer.MatchWithWriters( + cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"), + cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"), + ) httpListener := cmuxServer.Match(cmux.Any()) go func() { diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 3cde1d21bbc..c32aeb43160 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -15,12 +15,14 @@ package app import ( + "context" "fmt" "testing" "time" opentracing "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" @@ -28,6 +30,9 @@ import ( "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/ports" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" + spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" ) func TestServerError(t *testing.T) { @@ -40,18 +45,30 @@ func TestServerError(t *testing.T) { } func TestServer(t *testing.T) { - flagsSvc := flags.NewService(ports.AgentAdminHTTP) + flagsSvc := flags.NewService(ports.QueryAdminHTTP) flagsSvc.Logger = zap.NewNop() - querySvc := &querysvc.QueryService{} - tracer := opentracing.NoopTracer{} + spanReader := &spanstoremocks.Reader{} + dependencyReader := &depsmocks.Reader{} + expectedServices := []string{"test"} + spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil) + + querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) - server := NewServer(flagsSvc, querySvc, &QueryOptions{Port: ports.QueryAdminHTTP, - BearerTokenPropagation: true}, tracer) + server := NewServer(flagsSvc, querySvc, + &QueryOptions{Port: ports.QueryHTTP, BearerTokenPropagation: true}, + opentracing.NoopTracer{}) assert.NoError(t, server.Start()) - // TODO wait for servers to come up and test http and grpc endpoints - time.Sleep(1 * time.Second) + client := newGRPCClient(t, fmt.Sprintf(":%d", ports.QueryHTTP)) + defer client.conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{}) + assert.NoError(t, err) + assert.Equal(t, expectedServices, res.Services) server.Close() for i := 0; i < 10; i++ { @@ -64,7 +81,7 @@ func TestServer(t *testing.T) { } func TestServerGracefulExit(t *testing.T) { - flagsSvc := flags.NewService(ports.AgentAdminHTTP) + flagsSvc := flags.NewService(ports.QueryAdminHTTP) zapCore, logs := observer.New(zap.ErrorLevel) assert.Equal(t, 0, logs.Len(), "Expected initial ObservedLogs to have zero length.") @@ -77,11 +94,12 @@ func TestServerGracefulExit(t *testing.T) { assert.NoError(t, server.Start()) // Wait for servers to come up before we can call .Close() + // TODO Find a way to wait only as long as necessary. Unconditional sleep slows down the tests. time.Sleep(1 * time.Second) server.Close() for _, logEntry := range logs.All() { assert.True(t, logEntry.Level != zap.ErrorLevel, - fmt.Sprintf("Error log found on server exit: %v", logEntry)) + "Error log found on server exit: %v", logEntry) } }