diff --git a/baseapp/grpcrouter.go b/baseapp/grpcrouter.go index de553589c975..95186f0b16da 100644 --- a/baseapp/grpcrouter.go +++ b/baseapp/grpcrouter.go @@ -2,6 +2,7 @@ package baseapp import ( "fmt" + "reflect" gogogrpc "github.com/gogo/protobuf/grpc" abci "github.com/tendermint/tendermint/abci/types" @@ -12,13 +13,19 @@ import ( "github.com/cosmos/cosmos-sdk/client/grpc/reflection" codectypes "github.com/cosmos/cosmos-sdk/codec/types" sdk "github.com/cosmos/cosmos-sdk/types" + sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) var protoCodec = encoding.GetCodec(proto.Name) // GRPCQueryRouter routes ABCI Query requests to GRPC handlers type GRPCQueryRouter struct { - routes map[string]GRPCQueryHandler + routes map[string]GRPCQueryHandler + // returnTypes is a map of FQ method name => its return type. It is used + // for cache purposes: the first time a method handler is run, we save its + // return type in this map. Then, on subsequent method handler calls, we + // decode the ABCI response bytes using the cached return type. + returnTypes map[string]reflect.Type interfaceRegistry codectypes.InterfaceRegistry serviceData []serviceData } @@ -34,7 +41,8 @@ var _ gogogrpc.Server = &GRPCQueryRouter{} // NewGRPCQueryRouter creates a new GRPCQueryRouter func NewGRPCQueryRouter() *GRPCQueryRouter { return &GRPCQueryRouter{ - routes: map[string]GRPCQueryHandler{}, + returnTypes: map[string]reflect.Type{}, + routes: map[string]GRPCQueryHandler{}, } } @@ -89,8 +97,17 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf if qrt.interfaceRegistry != nil { return codectypes.UnpackInterfaces(i, qrt.interfaceRegistry) } + return nil }, nil) + + // If it's the first time we call this handler, then we save + // the return type of the handler in the `returnTypes` map. + // The return type will be used for decoding subsequent requests. + if _, found := qrt.returnTypes[fqName]; !found { + qrt.returnTypes[fqName] = reflect.TypeOf(res) + } + if err != nil { return abci.ResponseQuery{}, err } @@ -127,3 +144,16 @@ func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.In reflection.NewReflectionServiceServer(interfaceRegistry), ) } + +// returnTypeOf returns the return type of a gRPC method handler. With the way the +// `returnTypes` cache map is set up, the return type of a method handler is +// guaranteed to be found if it's retrieved **after** the method handler ran at +// least once. If not, then a logic error is return. +func (qrt *GRPCQueryRouter) returnTypeOf(method string) (reflect.Type, error) { + returnType, found := qrt.returnTypes[method] + if !found { + return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot find %s return type", method) + } + + return returnType, nil +} diff --git a/baseapp/grpcserver.go b/baseapp/grpcserver.go index a4342e0b8955..c1db08a555aa 100644 --- a/baseapp/grpcserver.go +++ b/baseapp/grpcserver.go @@ -2,67 +2,78 @@ package baseapp import ( "context" - "strconv" + "reflect" gogogrpc "github.com/gogo/protobuf/grpc" grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/client" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" - grpctypes "github.com/cosmos/cosmos-sdk/types/grpc" + "github.com/cosmos/cosmos-sdk/types/tx" ) // GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp. func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter } // RegisterGRPCServer registers gRPC services directly with the gRPC server. -func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) { - // Define an interceptor for all gRPC queries: this interceptor will create - // a new sdk.Context, and pass it into the query handler. - interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - // If there's some metadata in the context, retrieve it. - md, ok := metadata.FromIncomingContext(grpcCtx) - if !ok { - return nil, status.Error(codes.Internal, "unable to retrieve metadata") - } +func (app *BaseApp) RegisterGRPCServer(clientCtx client.Context, server gogogrpc.Server) { + // Define an interceptor for all gRPC queries: this interceptor will route + // the query through the `clientCtx`, which itself queries Tendermint. + interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (interface{}, error) { + // Two things can happen here: + // 1. either we're broadcasting a Tx, in which case we call Tendermint's broadcast endpoint directly, + // 2. or we are querying for state, in which case we call ABCI's Query. - // Get height header from the request context, if present. - var height int64 - if heightHeaders := md.Get(grpctypes.GRPCBlockHeightHeader); len(heightHeaders) > 0 { - height, err = strconv.ParseInt(heightHeaders[0], 10, 64) - if err != nil { - return nil, sdkerrors.Wrapf( - sdkerrors.ErrInvalidRequest, - "Baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err) - } - if err := checkNegativeHeight(height); err != nil { - return nil, err + // Case 1. Broadcasting a Tx. + if reqProto, ok := req.(*tx.BroadcastTxRequest); ok { + if !ok { + return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req) } + + return client.TxServiceBroadcast(grpcCtx, clientCtx, reqProto) + } + + // Case 2. Querying state. + inMd, _ := metadata.FromIncomingContext(grpcCtx) + abciRes, outMd, err := client.RunGRPCQuery(clientCtx, grpcCtx, info.FullMethod, req, inMd) + if err != nil { + return nil, err } - // Create the sdk.Context. Passing false as 2nd arg, as we can't - // actually support proofs with gRPC right now. - sdkCtx, err := app.createQueryContext(height, false) + // We need to know the return type of the grpc method for + // unmarshalling abciRes.Value. + // + // When we call each method handler for the first time, we save its + // return type in the `returnTypes` map (see the method handler in + // `grpcrouter.go`). By this time, the method handler has already run + // at least once (in the RunGRPCQuery call), so we're sure the + // returnType maps is populated for this method. We're retrieving it + // for decoding. + returnType, err := app.GRPCQueryRouter().returnTypeOf(info.FullMethod) if err != nil { return nil, err } - // Attach the sdk.Context into the gRPC's context.Context. - grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx) + // returnType is a pointer to a struct. Here, we're creating res which + // is a new pointer to the underlying struct. + res := reflect.New(returnType.Elem()).Interface() - // Add relevant gRPC headers - if height == 0 { - height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest + err = protoCodec.Unmarshal(abciRes.Value, res) + if err != nil { + return nil, err + } + + // Send the metadata header back. The metadata currently includes: + // - block height. + err = grpc.SendHeader(grpcCtx, outMd) + if err != nil { + return nil, err } - md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(height, 10)) - grpc.SetHeader(grpcCtx, md) - return handler(grpcCtx, req) + return res, nil } // Loop through all services and methods, add the interceptor, and register diff --git a/client/grpc_query.go b/client/grpc_query.go index fc8cdeeb80d5..011523944c54 100644 --- a/client/grpc_query.go +++ b/client/grpc_query.go @@ -24,51 +24,86 @@ var _ gogogrpc.ClientConn = Context{} var protoCodec = encoding.GetCodec(proto.Name) // Invoke implements the grpc ClientConn.Invoke method -func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) { +func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) { // Two things can happen here: // 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly, // 2. or we are querying for state, in which case we call ABCI's Query. - // In both cases, we don't allow empty request args (it will panic unexpectedly). - if reflect.ValueOf(args).IsNil() { + // In both cases, we don't allow empty request req (it will panic unexpectedly). + if reflect.ValueOf(req).IsNil() { return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil") } // Case 1. Broadcasting a Tx. - if isBroadcast(method) { - req, ok := args.(*tx.BroadcastTxRequest) + if reqProto, ok := req.(*tx.BroadcastTxRequest); ok { if !ok { - return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), args) + return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req) } - res, ok := reply.(*tx.BroadcastTxResponse) + resProto, ok := reply.(*tx.BroadcastTxResponse) if !ok { - return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), args) + return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), req) } - broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, req) + broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, reqProto) if err != nil { return err } - *res = *broadcastRes + *resProto = *broadcastRes return err } // Case 2. Querying state. - reqBz, err := protoCodec.Marshal(args) + inMd, _ := metadata.FromOutgoingContext(grpcCtx) + abciRes, outMd, err := RunGRPCQuery(ctx, grpcCtx, method, req, inMd) if err != nil { return err } + err = protoCodec.Unmarshal(abciRes.Value, reply) + if err != nil { + return err + } + + for _, callOpt := range opts { + header, ok := callOpt.(grpc.HeaderCallOption) + if !ok { + continue + } + + *header.HeaderAddr = outMd + } + + if ctx.InterfaceRegistry != nil { + return types.UnpackInterfaces(reply, ctx.InterfaceRegistry) + } + + return nil +} + +// NewStream implements the grpc ClientConn.NewStream method +func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) { + return nil, fmt.Errorf("streaming rpc not supported") +} + +// RunGRPCQuery runs a gRPC query from the clientCtx, given all necessary +// arguments for the gRPC method, and returns the ABCI response. It is used +// to factorize code between client (Invoke) and server (RegisterGRPCServer) +// gRPC handlers. +func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req interface{}, md metadata.MD) (abci.ResponseQuery, metadata.MD, error) { + reqBz, err := protoCodec.Marshal(req) + if err != nil { + return abci.ResponseQuery{}, nil, err + } + // parse height header - md, _ := metadata.FromOutgoingContext(grpcCtx) if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 { height, err := strconv.ParseInt(heights[0], 10, 64) if err != nil { - return err + return abci.ResponseQuery{}, nil, err } if height < 0 { - return sdkerrors.Wrapf( + return abci.ResponseQuery{}, nil, sdkerrors.Wrapf( sdkerrors.ErrInvalidRequest, "client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader) } @@ -76,19 +111,14 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply ctx = ctx.WithHeight(height) } - req := abci.RequestQuery{ + abciReq := abci.RequestQuery{ Path: method, Data: reqBz, } - res, err := ctx.QueryABCI(req) + abciRes, err := ctx.QueryABCI(abciReq) if err != nil { - return err - } - - err = protoCodec.Unmarshal(res.Value, reply) - if err != nil { - return err + return abci.ResponseQuery{}, nil, err } // Create header metadata. For now the headers contain: @@ -96,28 +126,7 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply // We then parse all the call options, if the call option is a // HeaderCallOption, then we manually set the value of that header to the // metadata. - md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10)) - for _, callOpt := range opts { - header, ok := callOpt.(grpc.HeaderCallOption) - if !ok { - continue - } - - *header.HeaderAddr = md - } - - if ctx.InterfaceRegistry != nil { - return types.UnpackInterfaces(reply, ctx.InterfaceRegistry) - } - - return nil -} - -// NewStream implements the grpc ClientConn.NewStream method -func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) { - return nil, fmt.Errorf("streaming rpc not supported") -} + md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(abciRes.Height, 10)) -func isBroadcast(method string) bool { - return method == "/cosmos.tx.v1beta1.Service/BroadcastTx" + return abciRes, md, nil } diff --git a/server/grpc/server.go b/server/grpc/server.go index f03c1d87b9fd..d9d0be1a032c 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -8,13 +8,14 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/reflection" + "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/server/types" ) // StartGRPCServer starts a gRPC server on the given address. -func StartGRPCServer(app types.Application, address string) (*grpc.Server, error) { +func StartGRPCServer(clientCtx client.Context, app types.Application, address string) (*grpc.Server, error) { grpcSrv := grpc.NewServer() - app.RegisterGRPCServer(grpcSrv) + app.RegisterGRPCServer(clientCtx, grpcSrv) // Reflection allows external clients to see what services and methods // the gRPC server exposes. diff --git a/server/grpc/server_test.go b/server/grpc/server_test.go index 846ce31a5968..5750b9e2e2b7 100644 --- a/server/grpc/server_test.go +++ b/server/grpc/server_test.go @@ -92,7 +92,7 @@ func (s *IntegrationTestSuite) TestGRPCServer_BankBalance() { grpc.Header(&header), ) blockHeight = header.Get(grpctypes.GRPCBlockHeightHeader) - s.Require().Equal([]string{"1"}, blockHeight) + s.Require().NotEmpty(blockHeight[0]) // blockHeight is []string, first element is block height. } func (s *IntegrationTestSuite) TestGRPCServer_Reflection() { @@ -124,8 +124,6 @@ func (s *IntegrationTestSuite) TestGRPCServer_GetTxsEvent() { Events: []string{"message.action=send"}, }, ) - // TODO Once https://github.com/cosmos/cosmos-sdk/pull/8029 is merged, this - // should not error anymore. s.Require().NoError(err) } @@ -185,9 +183,9 @@ func (s *IntegrationTestSuite) TestGRPCServerInvalidHeaderHeights() { value string wantErr string }{ - {"-1", "height < 0"}, + {"-1", "\"x-cosmos-block-height\" must be >= 0"}, {"9223372036854775808", "value out of range"}, // > max(int64) by 1 - {"-10", "height < 0"}, + {"-10", "\"x-cosmos-block-height\" must be >= 0"}, {"18446744073709551615", "value out of range"}, // max uint64, which is > max(int64) {"-9223372036854775809", "value out of range"}, // Out of the range of for negative int64 } diff --git a/server/start.go b/server/start.go index 764a67be52f4..eef353fa7ef8 100644 --- a/server/start.go +++ b/server/start.go @@ -318,7 +318,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App grpcWebSrv *http.Server ) if config.GRPC.Enable { - grpcSrv, err = servergrpc.StartGRPCServer(app, config.GRPC.Address) + grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC.Address) if err != nil { return err } diff --git a/server/types/app.go b/server/types/app.go index bac708729488..ba702074b2a6 100644 --- a/server/types/app.go +++ b/server/types/app.go @@ -38,7 +38,7 @@ type ( // RegisterGRPCServer registers gRPC services directly with the gRPC // server. - RegisterGRPCServer(grpc.Server) + RegisterGRPCServer(client.Context, grpc.Server) // RegisterTxService registers the gRPC Query service for tx (such as tx // simulation, fetching txs by hash...). diff --git a/testutil/network/util.go b/testutil/network/util.go index 3a66039179ef..4f36d464018c 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -93,7 +93,7 @@ func startInProcess(cfg Config, val *Validator) error { } if val.AppConfig.GRPC.Enable { - grpcSrv, err := servergrpc.StartGRPCServer(app, val.AppConfig.GRPC.Address) + grpcSrv, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC.Address) if err != nil { return err }