Skip to content

Commit

Permalink
refactor(runtime/v2): untie runtimev2 from the legacy usage of gRPC (#…
Browse files Browse the repository at this point in the history
…22043)

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
  • Loading branch information
testinginprod and coderabbitai[bot] authored Oct 3, 2024
1 parent 947ffe0 commit 0f63ade
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 116 deletions.
12 changes: 5 additions & 7 deletions runtime/v2/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"errors"
"slices"

gogoproto "github.com/cosmos/gogoproto/proto"

runtimev2 "cosmossdk.io/api/cosmos/app/runtime/v2"
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/registry"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
Expand Down Expand Up @@ -43,9 +42,8 @@ type App[T transaction.Tx] struct {
amino registry.AminoRegistrar
moduleManager *MM[T]

// GRPCMethodsToMessageMap maps gRPC method name to a function that decodes the request
// bytes into a gogoproto.Message, which then can be passed to appmanager.
GRPCMethodsToMessageMap map[string]func() gogoproto.Message
// QueryHandlers defines the query handlers
QueryHandlers map[string]appmodulev2.Handler

storeLoader StoreLoader
}
Expand Down Expand Up @@ -120,6 +118,6 @@ func (a *App[T]) GetAppManager() *appmanager.AppManager[T] {
return a.AppManager
}

func (a *App[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message {
return a.GRPCMethodsToMessageMap
func (a *App[T]) GetQueryHandlers() map[string]appmodulev2.Handler {
return a.QueryHandlers
}
156 changes: 88 additions & 68 deletions runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,46 +615,46 @@ func (m *MM[T]) assertNoForgottenModules(
}

func registerServices[T transaction.Tx](s appmodulev2.AppModule, app *App[T], registry *protoregistry.Files) error {
c := &configurator{
grpcQueryDecoders: map[string]func() gogoproto.Message{},
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
err: nil,
}

// case module with services
if services, ok := s.(hasServicesV1); ok {
c := &configurator{
queryHandlers: map[string]appmodulev2.Handler{},
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
err: nil,
}
if err := services.RegisterServices(c); err != nil {
return fmt.Errorf("unable to register services: %w", err)
}
} else {
// If module not implement RegisterServices, register msg & query handler.
if module, ok := s.(appmodulev2.HasMsgHandlers); ok {
wrapper := stfRouterWrapper{stfRouter: app.msgRouterBuilder}
module.RegisterMsgHandlers(&wrapper)
if wrapper.error != nil {
return fmt.Errorf("unable to register handlers: %w", wrapper.error)
}
}

if module, ok := s.(appmodulev2.HasQueryHandlers); ok {
wrapper := stfRouterWrapper{stfRouter: app.msgRouterBuilder}
module.RegisterQueryHandlers(&wrapper)

for path, decoder := range wrapper.decoders {
app.GRPCMethodsToMessageMap[path] = decoder
}
if c.err != nil {
app.logger.Warn("error registering services", "error", c.err)
}

// merge maps
for path, decoder := range c.queryHandlers {
app.QueryHandlers[path] = decoder
}
}

if c.err != nil {
app.logger.Warn("error registering services", "error", c.err)
// if module implements register msg handlers
if module, ok := s.(appmodulev2.HasMsgHandlers); ok {
wrapper := stfRouterWrapper{stfRouter: app.msgRouterBuilder}
module.RegisterMsgHandlers(&wrapper)
if wrapper.error != nil {
return fmt.Errorf("unable to register handlers: %w", wrapper.error)
}
}

// merge maps
for path, decoder := range c.grpcQueryDecoders {
app.GRPCMethodsToMessageMap[path] = decoder
// if module implements register query handlers
if module, ok := s.(appmodulev2.HasQueryHandlers); ok {
wrapper := stfRouterWrapper{stfRouter: app.queryRouterBuilder}
module.RegisterQueryHandlers(&wrapper)

for path, handler := range wrapper.handlers {
app.QueryHandlers[path] = handler
}
}

return nil
Expand All @@ -663,9 +663,7 @@ func registerServices[T transaction.Tx](s appmodulev2.AppModule, app *App[T], re
var _ grpc.ServiceRegistrar = (*configurator)(nil)

type configurator struct {
// grpcQueryDecoders is required because module expose queries through gRPC
// this provides a way to route to modules using gRPC.
grpcQueryDecoders map[string]func() gogoproto.Message
queryHandlers map[string]appmodulev2.Handler

stfQueryRouter *stf.MsgRouterBuilder
stfMsgRouter *stf.MsgRouterBuilder
Expand Down Expand Up @@ -697,61 +695,59 @@ func (c *configurator) RegisterService(sd *grpc.ServiceDesc, ss interface{}) {
func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{}) error {
for _, md := range sd.Methods {
// TODO(tip): what if a query is not deterministic?
requestFullName, err := registerMethod(c.stfQueryRouter, sd, md, ss)

handler, err := grpcHandlerToAppModuleHandler(sd, md, ss)
if err != nil {
return fmt.Errorf("unable to register query handler %s.%s: %w", sd.ServiceName, md.MethodName, err)
return fmt.Errorf("unable to make a appmodulev2.HandlerFunc from gRPC handler (%s, %s): %w", sd.ServiceName, md.MethodName, err)
}

// register gRPC query method.
typ := gogoproto.MessageType(requestFullName)
if typ == nil {
return fmt.Errorf("unable to find message in gogotype registry: %w", err)
}
decoderFunc := func() gogoproto.Message {
return reflect.New(typ.Elem()).Interface().(gogoproto.Message)
// register to stf query router.
err = c.stfQueryRouter.RegisterHandler(gogoproto.MessageName(handler.MakeMsg()), handler.Func)
if err != nil {
return fmt.Errorf("unable to register handler to stf router (%s, %s): %w", sd.ServiceName, md.MethodName, err)
}
methodName := fmt.Sprintf("/%s/%s", sd.ServiceName, md.MethodName)
c.grpcQueryDecoders[methodName] = decoderFunc

// register query handler using the same mapping used in stf
c.queryHandlers[gogoproto.MessageName(handler.MakeMsg())] = handler
}
return nil
}

func (c *configurator) registerMsgHandlers(sd *grpc.ServiceDesc, ss interface{}) error {
for _, md := range sd.Methods {
_, err := registerMethod(c.stfMsgRouter, sd, md, ss)
handler, err := grpcHandlerToAppModuleHandler(sd, md, ss)
if err != nil {
return err
}
err = c.stfMsgRouter.RegisterHandler(gogoproto.MessageName(handler.MakeMsg()), handler.Func)
if err != nil {
return fmt.Errorf("unable to register msg handler %s.%s: %w", sd.ServiceName, md.MethodName, err)
}
}
return nil
}

// requestFullNameFromMethodDesc returns the fully-qualified name of the request message of the provided service's method.
func requestFullNameFromMethodDesc(sd *grpc.ServiceDesc, method grpc.MethodDesc) (protoreflect.FullName, error) {
methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName))
desc, err := gogoproto.HybridResolver.FindDescriptorByName(methodFullName)
if err != nil {
return "", fmt.Errorf("cannot find method descriptor %s", methodFullName)
}
methodDesc, ok := desc.(protoreflect.MethodDescriptor)
if !ok {
return "", fmt.Errorf("invalid method descriptor %s", methodFullName)
}
return methodDesc.Input().FullName(), nil
}

func registerMethod(
stfRouter *stf.MsgRouterBuilder,
// grpcHandlerToAppModuleHandler converts a gRPC handler into an appmodulev2.HandlerFunc.
func grpcHandlerToAppModuleHandler(
sd *grpc.ServiceDesc,
md grpc.MethodDesc,
ss interface{},
) (string, error) {
requestName, err := requestFullNameFromMethodDesc(sd, md)
) (appmodulev2.Handler, error) {
requestName, responseName, err := requestFullNameFromMethodDesc(sd, md)
if err != nil {
return "", err
return appmodulev2.Handler{}, err
}

return string(requestName), stfRouter.RegisterHandler(string(requestName), func(
requestTyp := gogoproto.MessageType(string(requestName))
if requestTyp == nil {
return appmodulev2.Handler{}, fmt.Errorf("no proto message found for %s", requestName)
}
responseTyp := gogoproto.MessageType(string(responseName))
if responseTyp == nil {
return appmodulev2.Handler{}, fmt.Errorf("no proto message found for %s", responseName)
}

handlerFunc := func(
ctx context.Context,
msg transaction.Msg,
) (resp transaction.Msg, err error) {
Expand All @@ -760,7 +756,17 @@ func registerMethod(
return nil, err
}
return res.(transaction.Msg), nil
})
}

return appmodulev2.Handler{
Func: handlerFunc,
MakeMsg: func() transaction.Msg {
return reflect.New(requestTyp.Elem()).Interface().(transaction.Msg)
},
MakeMsgResp: func() transaction.Msg {
return reflect.New(responseTyp.Elem()).Interface().(transaction.Msg)
},
}, nil
}

func noopDecoder(_ interface{}) error { return nil }
Expand All @@ -776,6 +782,20 @@ func messagePassingInterceptor(msg transaction.Msg) grpc.UnaryServerInterceptor
}
}

// requestFullNameFromMethodDesc returns the fully-qualified name of the request message and response of the provided service's method.
func requestFullNameFromMethodDesc(sd *grpc.ServiceDesc, method grpc.MethodDesc) (protoreflect.FullName, protoreflect.FullName, error) {
methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName))
desc, err := gogoproto.HybridResolver.FindDescriptorByName(methodFullName)
if err != nil {
return "", "", fmt.Errorf("cannot find method descriptor %s", methodFullName)
}
methodDesc, ok := desc.(protoreflect.MethodDescriptor)
if !ok {
return "", "", fmt.Errorf("invalid method descriptor %s", methodFullName)
}
return methodDesc.Input().FullName(), methodDesc.Output().FullName(), nil
}

// defaultMigrationsOrder returns a default migrations order: ascending alphabetical by module name,
// except x/auth which will run last, see:
// https://github.com/cosmos/cosmos-sdk/issues/10591
Expand Down Expand Up @@ -815,7 +835,7 @@ type stfRouterWrapper struct {

error error

decoders map[string]func() gogoproto.Message
handlers map[string]appmodulev2.Handler
}

func (s *stfRouterWrapper) RegisterHandler(handler appmodulev2.Handler) {
Expand All @@ -831,7 +851,7 @@ func (s *stfRouterWrapper) RegisterHandler(handler appmodulev2.Handler) {

// also make the decoder
if s.error == nil {
s.decoders = map[string]func() gogoproto.Message{}
s.handlers = map[string]appmodulev2.Handler{}
}
s.decoders[requestName] = handler.MakeMsg
s.handlers[requestName] = handler
}
14 changes: 7 additions & 7 deletions runtime/v2/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ func ProvideAppBuilder[T transaction.Tx](

msgRouterBuilder := stf.NewMsgRouterBuilder()
app := &App[T]{
storeKeys: nil,
interfaceRegistrar: interfaceRegistrar,
amino: amino,
msgRouterBuilder: msgRouterBuilder,
queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router
GRPCMethodsToMessageMap: map[string]func() proto.Message{},
storeLoader: DefaultStoreLoader,
storeKeys: nil,
interfaceRegistrar: interfaceRegistrar,
amino: amino,
msgRouterBuilder: msgRouterBuilder,
queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router
QueryHandlers: map[string]appmodulev2.Handler{},
storeLoader: DefaultStoreLoader,
}
appBuilder := &AppBuilder[T]{app: app}

Expand Down
35 changes: 29 additions & 6 deletions server/v2/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ import (
"net"
"slices"
"strconv"
"strings"
"sync"

"github.com/cosmos/gogoproto/proto"
gogoproto "github.com/cosmos/gogoproto/proto"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/reflect/protoreflect"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"
Expand Down Expand Up @@ -53,7 +57,7 @@ func (s *Server[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.L
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}
methodsMap := appI.GetGPRCMethodsToMessageMap()
methodsMap := appI.GetQueryHandlers()

grpcSrv := grpc.NewServer(
grpc.ForceServerCodec(newProtoCodec(appI.InterfaceRegistry()).GRPCCodec()),
Expand All @@ -80,21 +84,40 @@ func (s *Server[T]) StartCmdFlags() *pflag.FlagSet {
return flags
}

func makeUnknownServiceHandler(messageMap map[string]func() proto.Message, querier interface {
Query(ctx context.Context, version uint64, msg proto.Message) (proto.Message, error)
func makeUnknownServiceHandler(handlers map[string]appmodulev2.Handler, querier interface {
Query(ctx context.Context, version uint64, msg gogoproto.Message) (gogoproto.Message, error)
},
) grpc.StreamHandler {
getRegistry := sync.OnceValues(gogoproto.MergedRegistry)

return func(srv any, stream grpc.ServerStream) error {
method, ok := grpc.MethodFromServerStream(stream)
if !ok {
return status.Error(codes.InvalidArgument, "unable to get method")
}
makeMsg, exists := messageMap[method]
// if this fails we cannot serve queries anymore...
registry, err := getRegistry()
if err != nil {
return fmt.Errorf("failed to get registry: %w", err)
}
fullName := protoreflect.FullName(strings.ReplaceAll(method, "/", "."))
// get descriptor from the invoke method
desc, err := registry.FindDescriptorByName(fullName)
if err != nil {
return fmt.Errorf("failed to find descriptor %s: %w", method, err)
}
md, ok := desc.(protoreflect.MethodDescriptor)
if !ok {
return fmt.Errorf("%s is not a method", method)
}
// find handler
handler, exists := handlers[string(md.Input().FullName())]
if !exists {
return status.Errorf(codes.Unimplemented, "gRPC method %s is not handled", method)
}

for {
req := makeMsg()
req := handler.MakeMsg()
err := stream.RecvMsg(req)
if err != nil {
if errors.Is(err, io.EOF) {
Expand Down
Loading

0 comments on commit 0f63ade

Please sign in to comment.