diff --git a/baseapp/grpcrouter.go b/baseapp/grpcrouter.go index 3570648d48ab..9c15b695176c 100644 --- a/baseapp/grpcrouter.go +++ b/baseapp/grpcrouter.go @@ -2,7 +2,6 @@ package baseapp import ( "fmt" - "reflect" "github.com/cosmos/cosmos-sdk/client/grpc/reflection" @@ -14,19 +13,13 @@ import ( codectypes "github.com/cosmos/cosmos-sdk/codec/types" sdk "github.com/cosmos/cosmos-sdk/types" - sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) var protoCodec = encoding.GetCodec(proto.Name) // GRPCQueryRouter routes ABCI Query requests to GRPC handlers type GRPCQueryRouter struct { - routes map[string]GRPCQueryHandler - // returnTypes is a map of FQ method name => its return type. It is used - // for cache purposes: the first time a method handler is run, we save its - // return type in this map. Then, on subsequent method handler calls, we - // decode the ABCI response bytes using the cached return type. - returnTypes map[string]reflect.Type + routes map[string]GRPCQueryHandler interfaceRegistry codectypes.InterfaceRegistry serviceData []serviceData } @@ -42,8 +35,7 @@ var _ gogogrpc.Server = &GRPCQueryRouter{} // NewGRPCQueryRouter creates a new GRPCQueryRouter func NewGRPCQueryRouter() *GRPCQueryRouter { return &GRPCQueryRouter{ - returnTypes: map[string]reflect.Type{}, - routes: map[string]GRPCQueryHandler{}, + routes: map[string]GRPCQueryHandler{}, } } @@ -98,17 +90,8 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf if qrt.interfaceRegistry != nil { return codectypes.UnpackInterfaces(i, qrt.interfaceRegistry) } - return nil }, nil) - - // If it's the first time we call this handler, then we save - // the return type of the handler in the `returnTypes` map. - // The return type will be used for decoding subsequent requests. - if _, found := qrt.returnTypes[fqName]; !found { - qrt.returnTypes[fqName] = reflect.TypeOf(res) - } - if err != nil { return abci.ResponseQuery{}, err } @@ -144,16 +127,3 @@ func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.In reflection.NewReflectionServiceServer(interfaceRegistry), ) } - -// returnTypeOf returns the return type of a gRPC method handler. With the way the -// `returnTypes` cache map is set up, the return type of a method handler is -// guaranteed to be found if it's retrieved **after** the method handler ran at -// least once. If not, then a logic error is return. -func (qrt *GRPCQueryRouter) returnTypeOf(method string) (reflect.Type, error) { - returnType, found := qrt.returnTypes[method] - if !found { - return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot find %s return type", method) - } - - return returnType, nil -} diff --git a/baseapp/grpcserver.go b/baseapp/grpcserver.go index c1db08a555aa..a4342e0b8955 100644 --- a/baseapp/grpcserver.go +++ b/baseapp/grpcserver.go @@ -2,78 +2,67 @@ package baseapp import ( "context" - "reflect" + "strconv" gogogrpc "github.com/gogo/protobuf/grpc" grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" - "github.com/cosmos/cosmos-sdk/client" + sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" - "github.com/cosmos/cosmos-sdk/types/tx" + grpctypes "github.com/cosmos/cosmos-sdk/types/grpc" ) // GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp. func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter } // RegisterGRPCServer registers gRPC services directly with the gRPC server. -func (app *BaseApp) RegisterGRPCServer(clientCtx client.Context, server gogogrpc.Server) { - // Define an interceptor for all gRPC queries: this interceptor will route - // the query through the `clientCtx`, which itself queries Tendermint. - interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (interface{}, error) { - // Two things can happen here: - // 1. either we're broadcasting a Tx, in which case we call Tendermint's broadcast endpoint directly, - // 2. or we are querying for state, in which case we call ABCI's Query. - - // Case 1. Broadcasting a Tx. - if reqProto, ok := req.(*tx.BroadcastTxRequest); ok { - if !ok { - return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req) - } - - return client.TxServiceBroadcast(grpcCtx, clientCtx, reqProto) +func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) { + // Define an interceptor for all gRPC queries: this interceptor will create + // a new sdk.Context, and pass it into the query handler. + interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + // If there's some metadata in the context, retrieve it. + md, ok := metadata.FromIncomingContext(grpcCtx) + if !ok { + return nil, status.Error(codes.Internal, "unable to retrieve metadata") } - // Case 2. Querying state. - inMd, _ := metadata.FromIncomingContext(grpcCtx) - abciRes, outMd, err := client.RunGRPCQuery(clientCtx, grpcCtx, info.FullMethod, req, inMd) - if err != nil { - return nil, err + // Get height header from the request context, if present. + var height int64 + if heightHeaders := md.Get(grpctypes.GRPCBlockHeightHeader); len(heightHeaders) > 0 { + height, err = strconv.ParseInt(heightHeaders[0], 10, 64) + if err != nil { + return nil, sdkerrors.Wrapf( + sdkerrors.ErrInvalidRequest, + "Baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err) + } + if err := checkNegativeHeight(height); err != nil { + return nil, err + } } - // We need to know the return type of the grpc method for - // unmarshalling abciRes.Value. - // - // When we call each method handler for the first time, we save its - // return type in the `returnTypes` map (see the method handler in - // `grpcrouter.go`). By this time, the method handler has already run - // at least once (in the RunGRPCQuery call), so we're sure the - // returnType maps is populated for this method. We're retrieving it - // for decoding. - returnType, err := app.GRPCQueryRouter().returnTypeOf(info.FullMethod) + // Create the sdk.Context. Passing false as 2nd arg, as we can't + // actually support proofs with gRPC right now. + sdkCtx, err := app.createQueryContext(height, false) if err != nil { return nil, err } - // returnType is a pointer to a struct. Here, we're creating res which - // is a new pointer to the underlying struct. - res := reflect.New(returnType.Elem()).Interface() + // Attach the sdk.Context into the gRPC's context.Context. + grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx) - err = protoCodec.Unmarshal(abciRes.Value, res) - if err != nil { - return nil, err - } - - // Send the metadata header back. The metadata currently includes: - // - block height. - err = grpc.SendHeader(grpcCtx, outMd) - if err != nil { - return nil, err + // Add relevant gRPC headers + if height == 0 { + height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest } + md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(height, 10)) + grpc.SetHeader(grpcCtx, md) - return res, nil + return handler(grpcCtx, req) } // Loop through all services and methods, add the interceptor, and register diff --git a/client/grpc_query.go b/client/grpc_query.go index 011523944c54..c587fe996e17 100644 --- a/client/grpc_query.go +++ b/client/grpc_query.go @@ -24,86 +24,51 @@ var _ gogogrpc.ClientConn = Context{} var protoCodec = encoding.GetCodec(proto.Name) // Invoke implements the grpc ClientConn.Invoke method -func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) { +func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) { // Two things can happen here: // 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly, // 2. or we are querying for state, in which case we call ABCI's Query. - // In both cases, we don't allow empty request req (it will panic unexpectedly). - if reflect.ValueOf(req).IsNil() { + // In both cases, we don't allow empty request args (it will panic unexpectedly). + if reflect.ValueOf(args).IsNil() { return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil") } // Case 1. Broadcasting a Tx. - if reqProto, ok := req.(*tx.BroadcastTxRequest); ok { + if isBroadcast(method) { + req, ok := args.(*tx.BroadcastTxRequest) if !ok { - return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req) + return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), args) } - resProto, ok := reply.(*tx.BroadcastTxResponse) + res, ok := reply.(*tx.BroadcastTxResponse) if !ok { - return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), req) + return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), args) } - broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, reqProto) + broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, req) if err != nil { return err } - *resProto = *broadcastRes + *res = *broadcastRes return err } // Case 2. Querying state. - inMd, _ := metadata.FromOutgoingContext(grpcCtx) - abciRes, outMd, err := RunGRPCQuery(ctx, grpcCtx, method, req, inMd) + reqBz, err := protoCodec.Marshal(args) if err != nil { return err } - err = protoCodec.Unmarshal(abciRes.Value, reply) - if err != nil { - return err - } - - for _, callOpt := range opts { - header, ok := callOpt.(grpc.HeaderCallOption) - if !ok { - continue - } - - *header.HeaderAddr = outMd - } - - if ctx.InterfaceRegistry != nil { - return types.UnpackInterfaces(reply, ctx.InterfaceRegistry) - } - - return nil -} - -// NewStream implements the grpc ClientConn.NewStream method -func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) { - return nil, fmt.Errorf("streaming rpc not supported") -} - -// RunGRPCQuery runs a gRPC query from the clientCtx, given all necessary -// arguments for the gRPC method, and returns the ABCI response. It is used -// to factorize code between client (Invoke) and server (RegisterGRPCServer) -// gRPC handlers. -func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req interface{}, md metadata.MD) (abci.ResponseQuery, metadata.MD, error) { - reqBz, err := protoCodec.Marshal(req) - if err != nil { - return abci.ResponseQuery{}, nil, err - } - // parse height header + md, _ := metadata.FromOutgoingContext(grpcCtx) if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 { height, err := strconv.ParseInt(heights[0], 10, 64) if err != nil { - return abci.ResponseQuery{}, nil, err + return err } if height < 0 { - return abci.ResponseQuery{}, nil, sdkerrors.Wrapf( + return sdkerrors.Wrapf( sdkerrors.ErrInvalidRequest, "client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader) } @@ -111,14 +76,20 @@ func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req int ctx = ctx.WithHeight(height) } - abciReq := abci.RequestQuery{ - Path: method, - Data: reqBz, + req := abci.RequestQuery{ + Path: method, + Data: reqBz, + Height: ctx.Height, } - abciRes, err := ctx.QueryABCI(abciReq) + res, err := ctx.QueryABCI(req) if err != nil { - return abci.ResponseQuery{}, nil, err + return err + } + + err = protoCodec.Unmarshal(res.Value, reply) + if err != nil { + return err } // Create header metadata. For now the headers contain: @@ -126,7 +97,28 @@ func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req int // We then parse all the call options, if the call option is a // HeaderCallOption, then we manually set the value of that header to the // metadata. - md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(abciRes.Height, 10)) + md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10)) + for _, callOpt := range opts { + header, ok := callOpt.(grpc.HeaderCallOption) + if !ok { + continue + } + + *header.HeaderAddr = md + } + + if ctx.InterfaceRegistry != nil { + return types.UnpackInterfaces(reply, ctx.InterfaceRegistry) + } + + return nil +} + +// NewStream implements the grpc ClientConn.NewStream method +func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) { + return nil, fmt.Errorf("streaming rpc not supported") +} - return abciRes, md, nil +func isBroadcast(method string) bool { + return method == "/cosmos.tx.v1beta1.Service/BroadcastTx" } diff --git a/server/grpc/server.go b/server/grpc/server.go index 40a3c7716d97..0b41a57cd323 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -17,7 +17,7 @@ import ( // StartGRPCServer starts a gRPC server on the given address. func StartGRPCServer(clientCtx client.Context, app types.Application, address string) (*grpc.Server, error) { grpcSrv := grpc.NewServer() - app.RegisterGRPCServer(clientCtx, grpcSrv) + app.RegisterGRPCServer(grpcSrv) // reflection allows consumers to build dynamic clients that can write // to any cosmos-sdk application without relying on application packages at compile time err := reflection.Register(grpcSrv, reflection.Config{ diff --git a/server/grpc/server_test.go b/server/grpc/server_test.go index 7f3c7a742cb4..9119b1ec25be 100644 --- a/server/grpc/server_test.go +++ b/server/grpc/server_test.go @@ -104,7 +104,7 @@ func (s *IntegrationTestSuite) TestGRPCServer_BankBalance() { ) s.Require().NoError(err) blockHeight = header.Get(grpctypes.GRPCBlockHeightHeader) - s.Require().NotEmpty(blockHeight[0]) // blockHeight is []string, first element is block height. + s.Require().Equal([]string{"1"}, blockHeight) } func (s *IntegrationTestSuite) TestGRPCServer_Reflection() { @@ -163,6 +163,8 @@ func (s *IntegrationTestSuite) TestGRPCServer_GetTxsEvent() { Events: []string{"message.action='send'"}, }, ) + // TODO Once https://github.com/cosmos/cosmos-sdk/pull/8029 is merged, this + // should not error anymore. s.Require().NoError(err) } @@ -199,9 +201,9 @@ func (s *IntegrationTestSuite) TestGRPCServerInvalidHeaderHeights() { value string wantErr string }{ - {"-1", "\"x-cosmos-block-height\" must be >= 0"}, + {"-1", "height < 0"}, {"9223372036854775808", "value out of range"}, // > max(int64) by 1 - {"-10", "\"x-cosmos-block-height\" must be >= 0"}, + {"-10", "height < 0"}, {"18446744073709551615", "value out of range"}, // max uint64, which is > max(int64) {"-9223372036854775809", "value out of range"}, // Out of the range of for negative int64 } diff --git a/server/types/app.go b/server/types/app.go index cf6b6ad9a1ff..467f627c605f 100644 --- a/server/types/app.go +++ b/server/types/app.go @@ -43,7 +43,7 @@ type ( // RegisterGRPCServer registers gRPC services directly with the gRPC // server. - RegisterGRPCServer(client.Context, grpc.Server) + RegisterGRPCServer(grpc.Server) // RegisterTxService registers the gRPC Query service for tx (such as tx // simulation, fetching txs by hash...).