From 008281cfd6e668e85e287cd7dcd4cbb6d3aea746 Mon Sep 17 00:00:00 2001 From: c-node Date: Thu, 19 Jan 2023 01:54:57 -0600 Subject: [PATCH] Introduce light node & refactor RPC client (#681) * introduce light node * add config option to start light node * refactor `rpc/server.go/Server` to accept the `tendermint/rpc/client` interface Closes #662; Closes #663; Closes #664; Closes #682. Co-authored-by: Connor O'Hara --- config/config.go | 4 + config/defaults.go | 1 + node/full.go | 406 ++++++++++++++++++ rpc/client/client.go => node/full_client.go | 88 ++-- .../full_client_test.go | 40 +- ..._test.go => full_node_integration_test.go} | 14 +- node/{node_test.go => full_node_test.go} | 4 +- node/light.go | 20 + node/light_client.go | 180 ++++++++ node/node.go | 395 +---------------- rpc/json/service.go | 55 +-- rpc/json/service_test.go | 12 +- rpc/server.go | 7 +- 13 files changed, 726 insertions(+), 500 deletions(-) create mode 100644 node/full.go rename rpc/client/client.go => node/full_client.go (86%) rename rpc/client/client_test.go => node/full_client_test.go (95%) rename node/{integration_test.go => full_node_integration_test.go} (94%) rename node/{node_test.go => full_node_test.go} (86%) create mode 100644 node/light.go create mode 100644 node/light_client.go diff --git a/config/config.go b/config/config.go index 8d1e43aac8f..1b9b1209968 100644 --- a/config/config.go +++ b/config/config.go @@ -19,6 +19,7 @@ const ( flagDAStartHeight = "rollmint.da_start_height" flagNamespaceID = "rollmint.namespace_id" flagFraudProofs = "rollmint.experimental_insecure_fraud_proofs" + flagLight = "rollmint.light" ) // NodeConfig stores rollmint node configuration. @@ -33,6 +34,7 @@ type NodeConfig struct { BlockManagerConfig `mapstructure:",squash"` DALayer string `mapstructure:"da_layer"` DAConfig string `mapstructure:"da_config"` + Light bool `mapstructure:"light"` } // BlockManagerConfig consists of all parameters required by BlockManagerConfig @@ -59,6 +61,7 @@ func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error { nc.BlockTime = v.GetDuration(flagBlockTime) nsID := v.GetString(flagNamespaceID) nc.FraudProofs = v.GetBool(flagFraudProofs) + nc.Light = v.GetBool(flagLight) bytes, err := hex.DecodeString(nsID) if err != nil { return err @@ -80,4 +83,5 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Uint64(flagDAStartHeight, def.DAStartHeight, "starting DA block height (for syncing)") cmd.Flags().BytesHex(flagNamespaceID, def.NamespaceID[:], "namespace identifies (8 bytes in hex)") cmd.Flags().Bool(flagFraudProofs, def.FraudProofs, "enable fraud proofs (experimental & insecure)") + cmd.Flags().Bool(flagLight, def.Light, "run light client") } diff --git a/config/defaults.go b/config/defaults.go index 6a12576498a..37af3aac623 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -26,4 +26,5 @@ var DefaultNodeConfig = NodeConfig{ }, DALayer: "mock", DAConfig: "", + Light: false, } diff --git a/node/full.go b/node/full.go new file mode 100644 index 00000000000..a756e96afac --- /dev/null +++ b/node/full.go @@ -0,0 +1,406 @@ +package node + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + + ds "github.com/ipfs/go-datastore" + ktds "github.com/ipfs/go-datastore/keytransform" + "github.com/libp2p/go-libp2p/core/crypto" + "go.uber.org/multierr" + + abciclient "github.com/tendermint/tendermint/abci/client" + abci "github.com/tendermint/tendermint/abci/types" + llcfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" + 
"github.com/tendermint/tendermint/libs/service" + corep2p "github.com/tendermint/tendermint/p2p" + tmtypes "github.com/tendermint/tendermint/types" + + "github.com/celestiaorg/rollmint/block" + "github.com/celestiaorg/rollmint/config" + "github.com/celestiaorg/rollmint/da" + "github.com/celestiaorg/rollmint/da/registry" + "github.com/celestiaorg/rollmint/mempool" + mempoolv1 "github.com/celestiaorg/rollmint/mempool/v1" + "github.com/celestiaorg/rollmint/p2p" + "github.com/celestiaorg/rollmint/state/indexer" + blockidxkv "github.com/celestiaorg/rollmint/state/indexer/block/kv" + "github.com/celestiaorg/rollmint/state/txindex" + "github.com/celestiaorg/rollmint/state/txindex/kv" + "github.com/celestiaorg/rollmint/store" + "github.com/celestiaorg/rollmint/types" +) + +// prefixes used in KV store to separate main node data from DALC data +var ( + mainPrefix = "0" + dalcPrefix = "1" + indexerPrefix = "2" // indexPrefix uses "i", so using "0-2" to avoid clash +) + +const ( + // genesisChunkSize is the maximum size, in bytes, of each + // chunk in the genesis structure for the chunked API + genesisChunkSize = 16 * 1024 * 1024 // 16 MiB +) + +var _ Node = &FullNode{} + +// FullNode represents a client node in rollmint network. +// It connects all the components and orchestrates their work. +type FullNode struct { + service.BaseService + eventBus *tmtypes.EventBus + appClient abciclient.Client + + genesis *tmtypes.GenesisDoc + // cache of chunked genesis data. + genChunks []string + + conf config.NodeConfig + P2P *p2p.Client + + // TODO(tzdybal): consider extracting "mempool reactor" + Mempool mempool.Mempool + mempoolIDs *mempoolIDs + incomingTxCh chan *p2p.GossipMessage + + Store store.Store + blockManager *block.Manager + dalc da.DataAvailabilityLayerClient + + TxIndexer txindex.TxIndexer + BlockIndexer indexer.BlockIndexer + IndexerService *txindex.IndexerService + + // keep context here only because of API compatibility + // - it's used in `OnStart` (defined in service.Service interface) + ctx context.Context +} + +// NewNode creates new rollmint node. 
+func newFullNode( + ctx context.Context, + conf config.NodeConfig, + p2pKey crypto.PrivKey, + signingKey crypto.PrivKey, + appClient abciclient.Client, + genesis *tmtypes.GenesisDoc, + logger log.Logger, +) (*FullNode, error) { + eventBus := tmtypes.NewEventBus() + eventBus.SetLogger(logger.With("module", "events")) + if err := eventBus.Start(); err != nil { + return nil, err + } + + var baseKV ds.TxnDatastore + var err error + if conf.RootDir == "" && conf.DBPath == "" { // this is used for testing + logger.Info("WARNING: working in in-memory mode") + baseKV, err = store.NewDefaultInMemoryKVStore() + } else { + baseKV, err = store.NewDefaultKVStore(conf.RootDir, conf.DBPath, "rollmint") + } + if err != nil { + return nil, err + } + + mainKV := newPrefixKV(baseKV, mainPrefix) + dalcKV := newPrefixKV(baseKV, dalcPrefix) + indexerKV := newPrefixKV(baseKV, indexerPrefix) + + client, err := p2p.NewClient(conf.P2P, p2pKey, genesis.ChainID, baseKV, logger.With("module", "p2p")) + if err != nil { + return nil, err + } + s := store.New(ctx, mainKV) + + dalc := registry.GetClient(conf.DALayer) + if dalc == nil { + return nil, fmt.Errorf("couldn't get data availability client named '%s'", conf.DALayer) + } + err = dalc.Init(conf.NamespaceID, []byte(conf.DAConfig), dalcKV, logger.With("module", "da_client")) + if err != nil { + return nil, fmt.Errorf("data availability layer client initialization error: %w", err) + } + + indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(ctx, conf, indexerKV, eventBus, logger) + if err != nil { + return nil, err + } + + mp := mempoolv1.NewTxMempool(logger, llcfg.DefaultMempoolConfig(), appClient, 0) + mpIDs := newMempoolIDs() + + blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, appClient, dalc, eventBus, logger.With("module", "BlockManager")) + if err != nil { + return nil, fmt.Errorf("BlockManager initialization error: %w", err) + } + + node := &FullNode{ + appClient: appClient, + eventBus: eventBus, + genesis: genesis, + conf: conf, + P2P: client, + blockManager: blockManager, + dalc: dalc, + Mempool: mp, + mempoolIDs: mpIDs, + incomingTxCh: make(chan *p2p.GossipMessage), + Store: s, + TxIndexer: txIndexer, + IndexerService: indexerService, + BlockIndexer: blockIndexer, + ctx: ctx, + } + + node.BaseService = *service.NewBaseService(logger, "Node", node) + + node.P2P.SetTxValidator(node.newTxValidator()) + node.P2P.SetHeaderValidator(node.newHeaderValidator()) + node.P2P.SetCommitValidator(node.newCommitValidator()) + node.P2P.SetFraudProofValidator(node.newFraudProofValidator()) + + return node, nil +} + +// initGenesisChunks creates a chunked format of the genesis document to make it easier to +// iterate through larger genesis structures. 
+func (n *FullNode) initGenesisChunks() error { + if n.genChunks != nil { + return nil + } + + if n.genesis == nil { + return nil + } + + data, err := json.Marshal(n.genesis) + if err != nil { + return err + } + + for i := 0; i < len(data); i += genesisChunkSize { + end := i + genesisChunkSize + + if end > len(data) { + end = len(data) + } + + n.genChunks = append(n.genChunks, base64.StdEncoding.EncodeToString(data[i:end])) + } + + return nil +} + +func (n *FullNode) headerPublishLoop(ctx context.Context) { + for { + select { + case signedHeader := <-n.blockManager.HeaderOutCh: + headerBytes, err := signedHeader.MarshalBinary() + if err != nil { + n.Logger.Error("failed to serialize signed block header", "error", err) + } + err = n.P2P.GossipSignedHeader(ctx, headerBytes) + if err != nil { + n.Logger.Error("failed to gossip signed block header", "error", err) + } + case <-ctx.Done(): + return + } + } +} + +// OnStart is a part of Service interface. +func (n *FullNode) OnStart() error { + n.Logger.Info("starting P2P client") + err := n.P2P.Start(n.ctx) + if err != nil { + return fmt.Errorf("error while starting P2P client: %w", err) + } + err = n.dalc.Start() + if err != nil { + return fmt.Errorf("error while starting data availability layer client: %w", err) + } + if n.conf.Aggregator { + n.Logger.Info("working in aggregator mode", "block time", n.conf.BlockTime) + go n.blockManager.AggregationLoop(n.ctx) + go n.headerPublishLoop(n.ctx) + } + go n.blockManager.RetrieveLoop(n.ctx) + go n.blockManager.SyncLoop(n.ctx) + + return nil +} + +// GetGenesis returns entire genesis doc. +func (n *FullNode) GetGenesis() *tmtypes.GenesisDoc { + return n.genesis +} + +// GetGenesisChunks returns chunked version of genesis. +func (n *FullNode) GetGenesisChunks() ([]string, error) { + err := n.initGenesisChunks() + if err != nil { + return nil, err + } + return n.genChunks, err +} + +// OnStop is a part of Service interface. +func (n *FullNode) OnStop() { + err := n.dalc.Stop() + err = multierr.Append(err, n.P2P.Close()) + n.Logger.Error("errors while stopping node:", "errors", err) +} + +// OnReset is a part of Service interface. +func (n *FullNode) OnReset() error { + panic("OnReset - not implemented!") +} + +// SetLogger sets the logger used by node. +func (n *FullNode) SetLogger(logger log.Logger) { + n.Logger = logger +} + +// GetLogger returns logger. +func (n *FullNode) GetLogger() log.Logger { + return n.Logger +} + +// EventBus gives access to Node's event bus. +func (n *FullNode) EventBus() *tmtypes.EventBus { + return n.eventBus +} + +// AppClient returns ABCI proxy connections to communicate with application. +func (n *FullNode) AppClient() abciclient.Client { + return n.appClient +} + +// newTxValidator creates a pubsub validator that uses the node's mempool to check the +// transaction. 
If the transaction is valid, then it is added to the mempool +func (n *FullNode) newTxValidator() p2p.GossipValidator { + return func(m *p2p.GossipMessage) bool { + n.Logger.Debug("transaction received", "bytes", len(m.Data)) + checkTxResCh := make(chan *abci.Response, 1) + err := n.Mempool.CheckTx(m.Data, func(resp *abci.Response) { + checkTxResCh <- resp + }, mempool.TxInfo{ + SenderID: n.mempoolIDs.GetForPeer(m.From), + SenderP2PID: corep2p.ID(m.From), + }) + switch { + case errors.Is(err, mempool.ErrTxInCache): + return true + case errors.Is(err, mempool.ErrMempoolIsFull{}): + return true + case errors.Is(err, mempool.ErrTxTooLarge{}): + return false + case errors.Is(err, mempool.ErrPreCheck{}): + return false + default: + } + res := <-checkTxResCh + checkTxResp := res.GetCheckTx() + + return checkTxResp.Code == abci.CodeTypeOK + } +} + +// newHeaderValidator returns a pubsub validator that runs basic checks and forwards +// the deserialized header for further processing +func (n *FullNode) newHeaderValidator() p2p.GossipValidator { + return func(headerMsg *p2p.GossipMessage) bool { + n.Logger.Debug("header received", "from", headerMsg.From, "bytes", len(headerMsg.Data)) + var header types.SignedHeader + err := header.UnmarshalBinary(headerMsg.Data) + if err != nil { + n.Logger.Error("failed to deserialize header", "error", err) + return false + } + err = header.ValidateBasic() + if err != nil { + n.Logger.Error("failed to validate header", "error", err) + return false + } + n.blockManager.HeaderInCh <- &header + return true + } +} + +// newCommitValidator returns a pubsub validator that runs basic checks and forwards +// the deserialized commit for further processing +func (n *FullNode) newCommitValidator() p2p.GossipValidator { + return func(commitMsg *p2p.GossipMessage) bool { + n.Logger.Debug("commit received", "from", commitMsg.From, "bytes", len(commitMsg.Data)) + var commit types.Commit + err := commit.UnmarshalBinary(commitMsg.Data) + if err != nil { + n.Logger.Error("failed to deserialize commit", "error", err) + return false + } + err = commit.ValidateBasic() + if err != nil { + n.Logger.Error("failed to validate commit", "error", err) + return false + } + n.Logger.Debug("commit received", "height", commit.Height) + n.blockManager.CommitInCh <- &commit + return true + } +} + +// newFraudProofValidator returns a pubsub validator that validates a fraud proof and forwards +// it to be verified +func (n *FullNode) newFraudProofValidator() p2p.GossipValidator { + return func(fraudProofMsg *p2p.GossipMessage) bool { + n.Logger.Debug("fraud proof received", "from", fraudProofMsg.From, "bytes", len(fraudProofMsg.Data)) + var fraudProof types.FraudProof + err := fraudProof.UnmarshalBinary(fraudProofMsg.Data) + if err != nil { + n.Logger.Error("failed to deserialize fraud proof", "error", err) + return false + } + // TODO(manav): Add validation checks for fraud proof here + n.blockManager.FraudProofCh <- &fraudProof + return true + } +} + +func newPrefixKV(kvStore ds.Datastore, prefix string) ds.TxnDatastore { + return (ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)}).Children()[0]).(ds.TxnDatastore) +} + +func createAndStartIndexerService( + ctx context.Context, + conf config.NodeConfig, + kvStore ds.TxnDatastore, + eventBus *tmtypes.EventBus, + logger log.Logger, +) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) { + + var ( + txIndexer txindex.TxIndexer + blockIndexer indexer.BlockIndexer + ) + + txIndexer = kv.NewTxIndex(ctx, kvStore) + 
blockIndexer = blockidxkv.New(ctx, newPrefixKV(kvStore, "block_events")) + + indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) + indexerService.SetLogger(logger.With("module", "txindex")) + + if err := indexerService.Start(); err != nil { + return nil, nil, nil, err + } + + return indexerService, txIndexer, blockIndexer, nil +} diff --git a/rpc/client/client.go b/node/full_client.go similarity index 86% rename from rpc/client/client.go rename to node/full_client.go index 31e4df20e27..07c6e9d3b6f 100644 --- a/rpc/client/client.go +++ b/node/full_client.go @@ -1,4 +1,4 @@ -package client +package node import ( "context" @@ -25,7 +25,6 @@ import ( rconfig "github.com/celestiaorg/rollmint/config" abciconv "github.com/celestiaorg/rollmint/conv/abci" "github.com/celestiaorg/rollmint/mempool" - "github.com/celestiaorg/rollmint/node" ) const ( @@ -41,29 +40,33 @@ var ( ErrConsensusStateNotAvailable = errors.New("consensus state not available in rollmint") ) -var _ rpcclient.Client = &Client{} +var _ rpcclient.Client = &FullClient{} -// Client implements tendermint RPC client interface. +// FullClient implements tendermint RPC client interface. // // This is the type that is used in communication between cosmos-sdk app and rollmint. -type Client struct { +type FullClient struct { *types.EventBus config *config.RPCConfig - node *node.Node + node *FullNode } -// NewClient returns Client working with given node. -func NewClient(node *node.Node) *Client { - return &Client{ +// NewFullClient returns Client working with given node. +func NewFullClient(node *FullNode) *FullClient { + return &FullClient{ EventBus: node.EventBus(), config: config.DefaultRPCConfig(), node: node, } } +func (n *FullNode) GetClient() rpcclient.Client { + return NewFullClient(n) +} + // ABCIInfo returns basic information about application state. -func (c *Client) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { +func (c *FullClient) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { resInfo, err := c.appClient().InfoSync(proxy.RequestInfo) if err != nil { return nil, err @@ -72,12 +75,12 @@ func (c *Client) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { } // ABCIQuery queries for data from application. -func (c *Client) ABCIQuery(ctx context.Context, path string, data tmbytes.HexBytes) (*ctypes.ResultABCIQuery, error) { +func (c *FullClient) ABCIQuery(ctx context.Context, path string, data tmbytes.HexBytes) (*ctypes.ResultABCIQuery, error) { return c.ABCIQueryWithOptions(ctx, path, data, rpcclient.DefaultABCIQueryOptions) } // ABCIQueryWithOptions queries for data from application. -func (c *Client) ABCIQueryWithOptions(ctx context.Context, path string, data tmbytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { +func (c *FullClient) ABCIQueryWithOptions(ctx context.Context, path string, data tmbytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { resQuery, err := c.appClient().QuerySync(abci.RequestQuery{ Path: path, Data: data, @@ -93,7 +96,7 @@ func (c *Client) ABCIQueryWithOptions(ctx context.Context, path string, data tmb // BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. 
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit -func (c *Client) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { +func (c *FullClient) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // This implementation corresponds to Tendermints implementation from rpc/core/mempool.go. // ctx.RemoteAddr godoc: If neither HTTPReq nor WSConn is set, an empty string is returned. // This code is a local client, so we can assume that subscriber is "" @@ -184,7 +187,7 @@ func (c *Client) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.Re // BroadcastTxAsync returns right away, with no response. Does not wait for // CheckTx nor DeliverTx results. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async -func (c *Client) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func (c *FullClient) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { err := c.node.Mempool.CheckTx(tx, nil, mempool.TxInfo{}) if err != nil { return nil, err @@ -200,7 +203,7 @@ func (c *Client) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.Res // BroadcastTxSync returns with the response from CheckTx. Does not wait for // DeliverTx result. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync -func (c *Client) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func (c *FullClient) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *abci.Response, 1) err := c.node.Mempool.CheckTx(tx, func(res *abci.Response) { resCh <- res @@ -236,7 +239,7 @@ func (c *Client) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.Resu } // Subscribe subscribe given subscriber to a query. -func (c *Client) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { +func (c *FullClient) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { q, err := tmquery.New(query) if err != nil { return nil, fmt.Errorf("failed to parse query: %w", err) @@ -264,7 +267,7 @@ func (c *Client) Subscribe(ctx context.Context, subscriber, query string, outCap } // Unsubscribe unsubscribes given subscriber from a query. -func (c *Client) Unsubscribe(ctx context.Context, subscriber, query string) error { +func (c *FullClient) Unsubscribe(ctx context.Context, subscriber, query string) error { q, err := tmquery.New(query) if err != nil { return fmt.Errorf("failed to parse query: %w", err) @@ -273,12 +276,12 @@ func (c *Client) Unsubscribe(ctx context.Context, subscriber, query string) erro } // Genesis returns entire genesis. -func (c *Client) Genesis(_ context.Context) (*ctypes.ResultGenesis, error) { +func (c *FullClient) Genesis(_ context.Context) (*ctypes.ResultGenesis, error) { return &ctypes.ResultGenesis{Genesis: c.node.GetGenesis()}, nil } // GenesisChunked returns given chunk of genesis. 
-func (c *Client) GenesisChunked(context context.Context, id uint) (*ctypes.ResultGenesisChunk, error) { +func (c *FullClient) GenesisChunked(context context.Context, id uint) (*ctypes.ResultGenesisChunk, error) { genChunks, err := c.node.GetGenesisChunks() if err != nil { return nil, fmt.Errorf("error while creating chunks of the genesis document: %w", err) @@ -304,7 +307,7 @@ func (c *Client) GenesisChunked(context context.Context, id uint) (*ctypes.Resul } // BlockchainInfo returns ABCI block meta information for given height range. -func (c *Client) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { +func (c *FullClient) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { const limit int64 = 20 // Currently blocks are not pruned and are synced linearly so the base height is 0 @@ -342,7 +345,7 @@ func (c *Client) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) } // NetInfo returns basic information about client P2P connections. -func (c *Client) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) { +func (c *FullClient) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) { res := ctypes.ResultNetInfo{ Listening: true, } @@ -364,19 +367,19 @@ func (c *Client) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) { } // DumpConsensusState always returns error as there is no consensus state in rollmint. -func (c *Client) DumpConsensusState(ctx context.Context) (*ctypes.ResultDumpConsensusState, error) { +func (c *FullClient) DumpConsensusState(ctx context.Context) (*ctypes.ResultDumpConsensusState, error) { return nil, ErrConsensusStateNotAvailable } // ConsensusState always returns error as there is no consensus state in rollmint. -func (c *Client) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusState, error) { +func (c *FullClient) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusState, error) { return nil, ErrConsensusStateNotAvailable } // ConsensusParams returns consensus params at given height. // // Currently, consensus params changes are not supported and this method returns params as defined in genesis. -func (c *Client) ConsensusParams(ctx context.Context, height *int64) (*ctypes.ResultConsensusParams, error) { +func (c *FullClient) ConsensusParams(ctx context.Context, height *int64) (*ctypes.ResultConsensusParams, error) { // TODO(tzdybal): implement consensus params handling: https://github.com/celestiaorg/rollmint/issues/291 params := c.node.GetGenesis().ConsensusParams return &ctypes.ResultConsensusParams{ @@ -403,14 +406,14 @@ func (c *Client) ConsensusParams(ctx context.Context, height *int64) (*ctypes.Re } // Health endpoint returns empty value. It can be used to monitor service availability. -func (c *Client) Health(ctx context.Context) (*ctypes.ResultHealth, error) { +func (c *FullClient) Health(ctx context.Context) (*ctypes.ResultHealth, error) { return &ctypes.ResultHealth{}, nil } // Block method returns BlockID and block itself for given height. // // If height is nil, it returns information about last known block. 
-func (c *Client) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { +func (c *FullClient) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { heightValue := c.normalizeHeight(height) block, err := c.node.Store.LoadBlock(heightValue) if err != nil { @@ -434,7 +437,7 @@ func (c *Client) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, } // BlockByHash returns BlockID and block itself for given hash. -func (c *Client) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error) { +func (c *FullClient) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error) { block, err := c.node.Store.LoadBlockByHash(hash) if err != nil { return nil, err @@ -457,7 +460,7 @@ func (c *Client) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBl } // BlockResults returns information about transactions, events and updates of validator set and consensus params. -func (c *Client) BlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) { +func (c *FullClient) BlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) { var h uint64 if height == nil { h = c.node.Store.Height() @@ -480,7 +483,7 @@ func (c *Client) BlockResults(ctx context.Context, height *int64) (*ctypes.Resul } // Commit returns signed header (aka commit) at given height. -func (c *Client) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) { +func (c *FullClient) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) { heightValue := c.normalizeHeight(height) com, err := c.node.Store.LoadCommit(heightValue) if err != nil { @@ -500,7 +503,7 @@ func (c *Client) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommi } // Validators returns paginated list of validators at given height. -func (c *Client) Validators(ctx context.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) { +func (c *FullClient) Validators(ctx context.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) { height := c.normalizeHeight(heightPtr) validators, err := c.node.Store.LoadValidators(height) if err != nil { @@ -525,7 +528,7 @@ func (c *Client) Validators(ctx context.Context, heightPtr *int64, pagePtr, perP } // Tx returns detailed information about transaction identified by its hash. -func (c *Client) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { +func (c *FullClient) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { res, err := c.node.TxIndexer.Get(hash) if err != nil { return nil, err @@ -560,7 +563,7 @@ func (c *Client) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.Resul } // TxSearch returns detailed information about transactions matching query. -func (c *Client) TxSearch(ctx context.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) (*ctypes.ResultTxSearch, error) { +func (c *FullClient) TxSearch(ctx context.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) (*ctypes.ResultTxSearch, error) { q, err := tmquery.New(query) if err != nil { return nil, err @@ -628,7 +631,7 @@ func (c *Client) TxSearch(ctx context.Context, query string, prove bool, pagePtr // BlockSearch defines a method to search for a paginated set of blocks by // BeginBlock and EndBlock event search criteria. 
-func (c *Client) BlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) { +func (c *FullClient) BlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) { q, err := tmquery.New(query) if err != nil { return nil, err @@ -689,7 +692,7 @@ func (c *Client) BlockSearch(ctx context.Context, query string, page, perPage *i } // Status returns detailed information about current status of the node. -func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { +func (c *FullClient) Status(ctx context.Context) (*ctypes.ResultStatus, error) { latest, err := c.node.Store.LoadBlock(c.node.Store.Height()) if err != nil { return nil, fmt.Errorf("failed to find latest block: %w", err) @@ -752,14 +755,14 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { } // BroadcastEvidence is not yet implemented. -func (c *Client) BroadcastEvidence(ctx context.Context, evidence types.Evidence) (*ctypes.ResultBroadcastEvidence, error) { +func (c *FullClient) BroadcastEvidence(ctx context.Context, evidence types.Evidence) (*ctypes.ResultBroadcastEvidence, error) { return &ctypes.ResultBroadcastEvidence{ Hash: evidence.Hash(), }, nil } // NumUnconfirmedTxs returns information about transactions in mempool. -func (c *Client) NumUnconfirmedTxs(ctx context.Context) (*ctypes.ResultUnconfirmedTxs, error) { +func (c *FullClient) NumUnconfirmedTxs(ctx context.Context) (*ctypes.ResultUnconfirmedTxs, error) { return &ctypes.ResultUnconfirmedTxs{ Count: c.node.Mempool.Size(), Total: c.node.Mempool.Size(), @@ -769,7 +772,7 @@ func (c *Client) NumUnconfirmedTxs(ctx context.Context) (*ctypes.ResultUnconfirm } // UnconfirmedTxs returns transactions in mempool. -func (c *Client) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { +func (c *FullClient) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { // reuse per_page validator limit := validatePerPage(limitPtr) @@ -784,7 +787,7 @@ func (c *Client) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*ctypes.Res // CheckTx executes a new transaction against the application to determine its validity. // // If valid, the tx is automatically added to the mempool. -func (c *Client) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { +func (c *FullClient) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { res, err := c.appClient().CheckTxSync(abci.RequestCheckTx{Tx: tx}) if err != nil { return nil, err @@ -792,7 +795,8 @@ func (c *Client) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckT return &ctypes.ResultCheckTx{ResponseCheckTx: *res}, nil } -func (c *Client) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) { +func (c *FullClient) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) { + defer close(outc) for { select { case msg := <-sub.Out(): @@ -823,7 +827,7 @@ func (c *Client) eventsRoutine(sub types.Subscription, subscriber string, q tmpu } // Try to resubscribe with exponential backoff. 
-func (c *Client) resubscribe(subscriber string, q tmpubsub.Query) types.Subscription { +func (c *FullClient) resubscribe(subscriber string, q tmpubsub.Query) types.Subscription { attempts := 0 for { if !c.IsRunning() { @@ -840,11 +844,11 @@ func (c *Client) resubscribe(subscriber string, q tmpubsub.Query) types.Subscrip } } -func (c *Client) appClient() abcicli.Client { +func (c *FullClient) appClient() abcicli.Client { return c.node.AppClient() } -func (c *Client) normalizeHeight(height *int64) uint64 { +func (c *FullClient) normalizeHeight(height *int64) uint64 { var heightValue uint64 if height == nil { heightValue = c.node.Store.Height() diff --git a/rpc/client/client_test.go b/node/full_client_test.go similarity index 95% rename from rpc/client/client_test.go rename to node/full_client_test.go index 0a9550f4147..114b685f9d8 100644 --- a/rpc/client/client_test.go +++ b/node/full_client_test.go @@ -1,9 +1,8 @@ -package client +package node import ( "context" crand "crypto/rand" - cryptorand "crypto/rand" "fmt" "math/rand" "testing" @@ -31,7 +30,6 @@ import ( "github.com/celestiaorg/rollmint/config" abciconv "github.com/celestiaorg/rollmint/conv/abci" "github.com/celestiaorg/rollmint/mocks" - "github.com/celestiaorg/rollmint/node" "github.com/celestiaorg/rollmint/types" ) @@ -89,11 +87,11 @@ func TestGenesisChunked(t *testing.T) { mockApp := &mocks.Application{} mockApp.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) - privKey, _, _ := crypto.GenerateEd25519Key(cryptorand.Reader) - signingKey, _, _ := crypto.GenerateEd25519Key(cryptorand.Reader) - n, _ := node.NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, abcicli.NewLocalClient(nil, mockApp), genDoc, log.TestingLogger()) + privKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) + signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) + n, _ := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, abcicli.NewLocalClient(nil, mockApp), genDoc, log.TestingLogger()) - rpc := NewClient(n) + rpc := NewFullClient(n) var expectedID uint = 2 gc, err := rpc.GenesisChunked(context.Background(), expectedID) @@ -404,7 +402,7 @@ func TestTx(t *testing.T) { mockApp.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - node, err := node.NewNode(context.Background(), config.NodeConfig{ + node, err := newFullNode(context.Background(), config.NodeConfig{ DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{ @@ -416,7 +414,7 @@ func TestTx(t *testing.T) { require.NoError(err) require.NotNil(node) - rpc := NewClient(node) + rpc := NewFullClient(node) require.NotNil(rpc) mockApp.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) mockApp.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}) @@ -661,11 +659,11 @@ func TestValidatorSetHandling(t *testing.T) { waitCh <- nil }) - node, err := node.NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) + node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}}, key, 
signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) require.NoError(err) require.NotNil(node) - rpc := NewClient(node) + rpc := NewFullClient(node) require.NotNil(rpc) err = node.Start() @@ -767,7 +765,7 @@ func getRandomBytes(n int) []byte { return data } -func getBlockMeta(rpc *Client, n int64) *tmtypes.BlockMeta { +func getBlockMeta(rpc *FullClient, n int64) *tmtypes.BlockMeta { b, err := rpc.node.Store.LoadBlock(uint64(n)) if err != nil { return nil @@ -780,25 +778,25 @@ func getBlockMeta(rpc *Client, n int64) *tmtypes.BlockMeta { return bmeta } -func getRPC(t *testing.T) (*mocks.Application, *Client) { +func getRPC(t *testing.T) (*mocks.Application, *FullClient) { t.Helper() require := require.New(t) app := &mocks.Application{} app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - node, err := node.NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) - rpc := NewClient(node) + rpc := NewFullClient(node) require.NotNil(rpc) return app, rpc } // From state/indexer/block/kv/kv_test -func indexBlocks(t *testing.T, rpc *Client, heights []int64) { +func indexBlocks(t *testing.T, rpc *FullClient, heights []int64) { t.Helper() for _, h := range heights { @@ -852,7 +850,7 @@ func TestMempool2Nodes(t *testing.T) { id1, err := peer.IDFromPrivateKey(key1) require.NoError(err) - node1, err := node.NewNode(context.Background(), config.NodeConfig{ + node1, err := newFullNode(context.Background(), config.NodeConfig{ DALayer: "mock", P2P: config.P2PConfig{ ListenAddress: "/ip4/127.0.0.1/tcp/9001", @@ -861,7 +859,7 @@ func TestMempool2Nodes(t *testing.T) { require.NoError(err) require.NotNil(node1) - node2, err := node.NewNode(context.Background(), config.NodeConfig{ + node2, err := newFullNode(context.Background(), config.NodeConfig{ DALayer: "mock", P2P: config.P2PConfig{ ListenAddress: "/ip4/127.0.0.1/tcp/9002", @@ -882,7 +880,7 @@ func TestMempool2Nodes(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - local := NewClient(node1) + local := NewFullClient(node1) require.NotNil(local) // broadcast the bad Tx, this should not be propogated or added to the local mempool @@ -936,7 +934,7 @@ func TestStatus(t *testing.T) { } } - node, err := node.NewNode( + node, err := newFullNode( context.Background(), config.NodeConfig{ DALayer: "mock", @@ -968,7 +966,7 @@ func TestStatus(t *testing.T) { err = node.Store.UpdateState(types.State{LastValidators: validatorSet, NextValidators: validatorSet, Validators: validatorSet}) assert.NoError(err) - rpc := NewClient(node) + rpc := NewFullClient(node) assert.NotNil(rpc) earliestBlock := getRandomBlockWithProposer(1, 1, validators[0].Address.Bytes()) diff --git a/node/integration_test.go b/node/full_node_integration_test.go similarity index 94% rename from node/integration_test.go rename to node/full_node_integration_test.go index 7fce22d1eda..883c3254c14 100644 --- a/node/integration_test.go +++ b/node/full_node_integration_test.go @@ -52,7 +52,7 @@ 
func TestAggregatorMode(t *testing.T) { BlockTime: 1 * time.Second, NamespaceID: rmtypes.NamespaceID{1, 2, 3, 4, 5, 6, 7, 8}, } - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -191,7 +191,7 @@ func TestFraudProofTrigger(t *testing.T) { } // Creates a starts the given number of client nodes along with an aggregator node. Uses the given flag to decide whether to have the aggregator produce malicious blocks. -func createAndStartNodes(clientNodes int, isMalicious bool, t *testing.T) ([]*Node, []*mocks.Application) { +func createAndStartNodes(clientNodes int, isMalicious bool, t *testing.T) ([]*FullNode, []*mocks.Application) { var wg sync.WaitGroup aggCtx, aggCancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background()) @@ -209,7 +209,7 @@ func createAndStartNodes(clientNodes int, isMalicious bool, t *testing.T) ([]*No // Starts the given nodes using the given wait group to synchronize them // and wait for them to gossip transactions -func startNodes(nodes []*Node, wg *sync.WaitGroup, t *testing.T) { +func startNodes(nodes []*FullNode, wg *sync.WaitGroup, t *testing.T) { numNodes := len(nodes) wg.Add((numNodes) * (numNodes - 1)) for _, n := range nodes { @@ -238,7 +238,7 @@ func startNodes(nodes []*Node, wg *sync.WaitGroup, t *testing.T) { } // Creates the given number of nodes the given nodes using the given wait group to synchornize them -func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *sync.WaitGroup, t *testing.T) ([]*Node, []*mocks.Application) { +func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *sync.WaitGroup, t *testing.T) ([]*FullNode, []*mocks.Application) { t.Helper() if aggCtx == nil { @@ -254,7 +254,7 @@ func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *syn keys[i], _, _ = crypto.GenerateEd25519Key(rand.Reader) } - nodes := make([]*Node, num) + nodes := make([]*FullNode, num) apps := make([]*mocks.Application, num) dalc := &mockda.DataAvailabilityLayerClient{} ds, _ := store.NewDefaultInMemoryKVStore() @@ -268,7 +268,7 @@ func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *syn return nodes, apps } -func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, dalc da.DataAvailabilityLayerClient, keys []crypto.PrivKey, wg *sync.WaitGroup, t *testing.T) (*Node, *mocks.Application) { +func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, dalc da.DataAvailabilityLayerClient, keys []crypto.PrivKey, wg *sync.WaitGroup, t *testing.T) (*FullNode, *mocks.Application) { t.Helper() require := require.New(t) // nodes will listen on consecutive ports on local interface @@ -318,7 +318,7 @@ func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, d } signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - node, err := NewNode( + node, err := newFullNode( ctx, config.NodeConfig{ P2P: p2pConfig, diff --git a/node/node_test.go b/node/full_node_test.go similarity index 86% rename from 
node/node_test.go rename to node/full_node_test.go index d6877945f67..81212255796 100644 --- a/node/node_test.go +++ b/node/full_node_test.go @@ -31,7 +31,7 @@ func TestStartup(t *testing.T) { app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(rand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -57,7 +57,7 @@ func TestMempoolDirectly(t *testing.T) { signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) anotherKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) diff --git a/node/light.go b/node/light.go new file mode 100644 index 00000000000..a8ce6ad395b --- /dev/null +++ b/node/light.go @@ -0,0 +1,20 @@ +package node + +import ( + "github.com/tendermint/tendermint/libs/service" + rpcclient "github.com/tendermint/tendermint/rpc/client" +) + +var _ Node = &LightNode{} + +type LightNode struct { + service.BaseService +} + +func (n *LightNode) GetClient() rpcclient.Client { + return NewLightClient(n) +} + +func newLightNode() (Node, error) { + return &LightNode{}, nil +} diff --git a/node/light_client.go b/node/light_client.go new file mode 100644 index 00000000000..7ca08551b89 --- /dev/null +++ b/node/light_client.go @@ -0,0 +1,180 @@ +package node + +import ( + "context" + + tmbytes "github.com/tendermint/tendermint/libs/bytes" + rpcclient "github.com/tendermint/tendermint/rpc/client" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/types" +) + +var _ rpcclient.Client = &LightClient{} + +type LightClient struct { + types.EventBus + node *LightNode +} + +func NewLightClient(node *LightNode) *LightClient { + return &LightClient{ + node: node, + } +} + +// ABCIInfo returns basic information about application state. +func (c *LightClient) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { + panic("Not implemented") +} + +// ABCIQuery queries for data from application. +func (c *LightClient) ABCIQuery(ctx context.Context, path string, data tmbytes.HexBytes) (*ctypes.ResultABCIQuery, error) { + panic("Not implemented") +} + +// ABCIQueryWithOptions queries for data from application. +func (c *LightClient) ABCIQueryWithOptions(ctx context.Context, path string, data tmbytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { + panic("Not implemented") +} + +// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. 
+// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit +func (c *LightClient) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { + panic("Not implemented") +} + +// BroadcastTxAsync returns right away, with no response. Does not wait for +// CheckTx nor DeliverTx results. +// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async +func (c *LightClient) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + panic("Not implemented") +} + +// BroadcastTxSync returns with the response from CheckTx. Does not wait for +// DeliverTx result. +// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync +func (c *LightClient) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + panic("Not implemented") +} + +// Subscribe subscribe given subscriber to a query. +func (c *LightClient) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { + panic("Not implemented") +} + +// Unsubscribe unsubscribes given subscriber from a query. +func (c *LightClient) Unsubscribe(ctx context.Context, subscriber, query string) error { + panic("Not implemented") +} + +// Genesis returns entire genesis. +func (c *LightClient) Genesis(_ context.Context) (*ctypes.ResultGenesis, error) { + panic("Not implemented") +} + +// GenesisChunked returns given chunk of genesis. +func (c *LightClient) GenesisChunked(context context.Context, id uint) (*ctypes.ResultGenesisChunk, error) { + panic("Not implemented") +} + +// BlockchainInfo returns ABCI block meta information for given height range. +func (c *LightClient) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { + panic("Not implemented") +} + +// NetInfo returns basic information about client P2P connections. +func (c *LightClient) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) { + panic("Not implemented") +} + +// DumpConsensusState always returns error as there is no consensus state in rollmint. +func (c *LightClient) DumpConsensusState(ctx context.Context) (*ctypes.ResultDumpConsensusState, error) { + panic("Not implemented") +} + +// ConsensusState always returns error as there is no consensus state in rollmint. +func (c *LightClient) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusState, error) { + panic("Not implemented") +} + +// ConsensusParams returns consensus params at given height. +// +// Currently, consensus params changes are not supported and this method returns params as defined in genesis. +func (c *LightClient) ConsensusParams(ctx context.Context, height *int64) (*ctypes.ResultConsensusParams, error) { + panic("Not implemented") +} + +// Health endpoint returns empty value. It can be used to monitor service availability. +func (c *LightClient) Health(ctx context.Context) (*ctypes.ResultHealth, error) { + panic("Not implemented") +} + +// Block method returns BlockID and block itself for given height. +// +// If height is nil, it returns information about last known block. +func (c *LightClient) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { + panic("Not implemented") +} + +// BlockByHash returns BlockID and block itself for given hash. 
+func (c *LightClient) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error) { + panic("Not implemented") +} + +// BlockResults returns information about transactions, events and updates of validator set and consensus params. +func (c *LightClient) BlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) { + panic("Not implemented") +} + +// Commit returns signed header (aka commit) at given height. +func (c *LightClient) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) { + panic("Not implemented") +} + +// Validators returns paginated list of validators at given height. +func (c *LightClient) Validators(ctx context.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) { + panic("Not implemented") +} + +// Tx returns detailed information about transaction identified by its hash. +func (c *LightClient) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { + panic("Not implemented") +} + +// TxSearch returns detailed information about transactions matching query. +func (c *LightClient) TxSearch(ctx context.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) (*ctypes.ResultTxSearch, error) { + panic("Not implemented") +} + +// BlockSearch defines a method to search for a paginated set of blocks by +// BeginBlock and EndBlock event search criteria. +func (c *LightClient) BlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) { + panic("Not implemented") +} + +// Status returns detailed information about current status of the node. +func (c *LightClient) Status(ctx context.Context) (*ctypes.ResultStatus, error) { + panic("Not implemented") +} + +// BroadcastEvidence is not yet implemented. +func (c *LightClient) BroadcastEvidence(ctx context.Context, evidence types.Evidence) (*ctypes.ResultBroadcastEvidence, error) { + panic("Not implemented") +} + +// NumUnconfirmedTxs returns information about transactions in mempool. +func (c *LightClient) NumUnconfirmedTxs(ctx context.Context) (*ctypes.ResultUnconfirmedTxs, error) { + panic("Not implemented") +} + +// UnconfirmedTxs returns transactions in mempool. +func (c *LightClient) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { + panic("Not implemented") +} + +// CheckTx executes a new transaction against the application to determine its validity. +// +// If valid, the tx is automatically added to the mempool. 
+func (c *LightClient) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { + panic("Not implemented") +} diff --git a/node/node.go b/node/node.go index 0cd8592bf1b..c6ea7300b61 100644 --- a/node/node.go +++ b/node/node.go @@ -2,85 +2,22 @@ package node import ( "context" - "encoding/base64" - "encoding/json" - "errors" - "fmt" - ds "github.com/ipfs/go-datastore" - ktds "github.com/ipfs/go-datastore/keytransform" "github.com/libp2p/go-libp2p/core/crypto" - "go.uber.org/multierr" abciclient "github.com/tendermint/tendermint/abci/client" - abci "github.com/tendermint/tendermint/abci/types" - llcfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/libs/service" - corep2p "github.com/tendermint/tendermint/p2p" + rpcclient "github.com/tendermint/tendermint/rpc/client" tmtypes "github.com/tendermint/tendermint/types" - "github.com/celestiaorg/rollmint/block" "github.com/celestiaorg/rollmint/config" - "github.com/celestiaorg/rollmint/da" - "github.com/celestiaorg/rollmint/da/registry" - "github.com/celestiaorg/rollmint/mempool" - mempoolv1 "github.com/celestiaorg/rollmint/mempool/v1" - "github.com/celestiaorg/rollmint/p2p" - "github.com/celestiaorg/rollmint/state/indexer" - blockidxkv "github.com/celestiaorg/rollmint/state/indexer/block/kv" - "github.com/celestiaorg/rollmint/state/txindex" - "github.com/celestiaorg/rollmint/state/txindex/kv" - "github.com/celestiaorg/rollmint/store" - "github.com/celestiaorg/rollmint/types" ) -// prefixes used in KV store to separate main node data from DALC data -var ( - mainPrefix = "0" - dalcPrefix = "1" - indexerPrefix = "2" // indexPrefix uses "i", so using "0-2" to avoid clash -) - -const ( - // genesisChunkSize is the maximum size, in bytes, of each - // chunk in the genesis structure for the chunked API - genesisChunkSize = 16 * 1024 * 1024 // 16 MiB -) - -// Node represents a client node in rollmint network. -// It connects all the components and orchestrates their work. -type Node struct { - service.BaseService - eventBus *tmtypes.EventBus - appClient abciclient.Client - - genesis *tmtypes.GenesisDoc - // cache of chunked genesis data. - genChunks []string - - conf config.NodeConfig - P2P *p2p.Client - - // TODO(tzdybal): consider extracting "mempool reactor" - Mempool mempool.Mempool - mempoolIDs *mempoolIDs - incomingTxCh chan *p2p.GossipMessage - - Store store.Store - blockManager *block.Manager - dalc da.DataAvailabilityLayerClient - - TxIndexer txindex.TxIndexer - BlockIndexer indexer.BlockIndexer - IndexerService *txindex.IndexerService - - // keep context here only because of API compatibility - // - it's used in `OnStart` (defined in service.Service interface) - ctx context.Context +type Node interface { + Start() error + GetClient() rpcclient.Client } -// NewNode creates new rollmint node. 
func NewNode( ctx context.Context, conf config.NodeConfig, @@ -89,318 +26,18 @@ func NewNode( appClient abciclient.Client, genesis *tmtypes.GenesisDoc, logger log.Logger, -) (*Node, error) { - eventBus := tmtypes.NewEventBus() - eventBus.SetLogger(logger.With("module", "events")) - if err := eventBus.Start(); err != nil { - return nil, err - } - - var baseKV ds.TxnDatastore - - var err error - if conf.RootDir == "" && conf.DBPath == "" { // this is used for testing - logger.Info("WARNING: working in in-memory mode") - baseKV, err = store.NewDefaultInMemoryKVStore() +) (Node, error) { + if !conf.Light { + return newFullNode( + ctx, + conf, + p2pKey, + signingKey, + appClient, + genesis, + logger, + ) } else { - baseKV, err = store.NewDefaultKVStore(conf.RootDir, conf.DBPath, "rollmint") - } - if err != nil { - return nil, err - } - - mainKV := newPrefixKV(baseKV, mainPrefix) - dalcKV := newPrefixKV(baseKV, dalcPrefix) - indexerKV := newPrefixKV(baseKV, indexerPrefix) - - client, err := p2p.NewClient(conf.P2P, p2pKey, genesis.ChainID, baseKV, logger.With("module", "p2p")) - if err != nil { - return nil, err - } - - s := store.New(ctx, mainKV) - - dalc := registry.GetClient(conf.DALayer) - if dalc == nil { - return nil, fmt.Errorf("couldn't get data availability client named '%s'", conf.DALayer) - } - err = dalc.Init(conf.NamespaceID, []byte(conf.DAConfig), dalcKV, logger.With("module", "da_client")) - if err != nil { - return nil, fmt.Errorf("data availability layer client initialization error: %w", err) - } - - indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(ctx, conf, indexerKV, eventBus, logger) - if err != nil { - return nil, err - } - - mp := mempoolv1.NewTxMempool(logger, llcfg.DefaultMempoolConfig(), appClient, 0) - mpIDs := newMempoolIDs() - - blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, appClient, dalc, eventBus, logger.With("module", "BlockManager")) - if err != nil { - return nil, fmt.Errorf("BlockManager initialization error: %w", err) - } - - node := &Node{ - appClient: appClient, - eventBus: eventBus, - genesis: genesis, - conf: conf, - P2P: client, - blockManager: blockManager, - dalc: dalc, - Mempool: mp, - mempoolIDs: mpIDs, - incomingTxCh: make(chan *p2p.GossipMessage), - Store: s, - TxIndexer: txIndexer, - IndexerService: indexerService, - BlockIndexer: blockIndexer, - ctx: ctx, - } - - node.BaseService = *service.NewBaseService(logger, "Node", node) - - node.P2P.SetTxValidator(node.newTxValidator()) - node.P2P.SetHeaderValidator(node.newHeaderValidator()) - node.P2P.SetCommitValidator(node.newCommitValidator()) - node.P2P.SetFraudProofValidator(node.newFraudProofValidator()) - - return node, nil -} - -// initGenesisChunks creates a chunked format of the genesis document to make it easier to -// iterate through larger genesis structures. 
-func (n *Node) initGenesisChunks() error { - if n.genChunks != nil { - return nil - } - - if n.genesis == nil { - return nil + return newLightNode() } - - data, err := json.Marshal(n.genesis) - if err != nil { - return err - } - - for i := 0; i < len(data); i += genesisChunkSize { - end := i + genesisChunkSize - - if end > len(data) { - end = len(data) - } - - n.genChunks = append(n.genChunks, base64.StdEncoding.EncodeToString(data[i:end])) - } - - return nil -} - -func (n *Node) headerPublishLoop(ctx context.Context) { - for { - select { - case signedHeader := <-n.blockManager.HeaderOutCh: - headerBytes, err := signedHeader.MarshalBinary() - if err != nil { - n.Logger.Error("failed to serialize signed block header", "error", err) - } - err = n.P2P.GossipSignedHeader(ctx, headerBytes) - if err != nil { - n.Logger.Error("failed to gossip signed block header", "error", err) - } - case <-ctx.Done(): - return - } - } -} - -// OnStart is a part of Service interface. -func (n *Node) OnStart() error { - n.Logger.Info("starting P2P client") - err := n.P2P.Start(n.ctx) - if err != nil { - return fmt.Errorf("error while starting P2P client: %w", err) - } - err = n.dalc.Start() - if err != nil { - return fmt.Errorf("error while starting data availability layer client: %w", err) - } - if n.conf.Aggregator { - n.Logger.Info("working in aggregator mode", "block time", n.conf.BlockTime) - go n.blockManager.AggregationLoop(n.ctx) - go n.headerPublishLoop(n.ctx) - } - go n.blockManager.RetrieveLoop(n.ctx) - go n.blockManager.SyncLoop(n.ctx) - - return nil -} - -// GetGenesis returns entire genesis doc. -func (n *Node) GetGenesis() *tmtypes.GenesisDoc { - return n.genesis -} - -// GetGenesisChunks returns chunked version of genesis. -func (n *Node) GetGenesisChunks() ([]string, error) { - err := n.initGenesisChunks() - if err != nil { - return nil, err - } - return n.genChunks, err -} - -// OnStop is a part of Service interface. -func (n *Node) OnStop() { - err := n.dalc.Stop() - err = multierr.Append(err, n.P2P.Close()) - n.Logger.Error("errors while stopping node:", "errors", err) -} - -// OnReset is a part of Service interface. -func (n *Node) OnReset() error { - panic("OnReset - not implemented!") -} - -// SetLogger sets the logger used by node. -func (n *Node) SetLogger(logger log.Logger) { - n.Logger = logger -} - -// GetLogger returns logger. -func (n *Node) GetLogger() log.Logger { - return n.Logger -} - -// EventBus gives access to Node's event bus. -func (n *Node) EventBus() *tmtypes.EventBus { - return n.eventBus -} - -// AppClient returns ABCI proxy connections to communicate with application. -func (n *Node) AppClient() abciclient.Client { - return n.appClient -} - -// newTxValidator creates a pubsub validator that uses the node's mempool to check the -// transaction. 
-// newTxValidator creates a pubsub validator that uses the node's mempool to check the
-// transaction. If the transaction is valid, then it is added to the mempool
-func (n *Node) newTxValidator() p2p.GossipValidator {
-    return func(m *p2p.GossipMessage) bool {
-        n.Logger.Debug("transaction received", "bytes", len(m.Data))
-        checkTxResCh := make(chan *abci.Response, 1)
-        err := n.Mempool.CheckTx(m.Data, func(resp *abci.Response) {
-            checkTxResCh <- resp
-        }, mempool.TxInfo{
-            SenderID: n.mempoolIDs.GetForPeer(m.From),
-            SenderP2PID: corep2p.ID(m.From),
-        })
-        switch {
-        case errors.Is(err, mempool.ErrTxInCache):
-            return true
-        case errors.Is(err, mempool.ErrMempoolIsFull{}):
-            return true
-        case errors.Is(err, mempool.ErrTxTooLarge{}):
-            return false
-        case errors.Is(err, mempool.ErrPreCheck{}):
-            return false
-        default:
-        }
-        res := <-checkTxResCh
-        checkTxResp := res.GetCheckTx()
-
-        return checkTxResp.Code == abci.CodeTypeOK
-    }
-}
-
-// newHeaderValidator returns a pubsub validator that runs basic checks and forwards
-// the deserialized header for further processing
-func (n *Node) newHeaderValidator() p2p.GossipValidator {
-    return func(headerMsg *p2p.GossipMessage) bool {
-        n.Logger.Debug("header received", "from", headerMsg.From, "bytes", len(headerMsg.Data))
-        var header types.SignedHeader
-        err := header.UnmarshalBinary(headerMsg.Data)
-        if err != nil {
-            n.Logger.Error("failed to deserialize header", "error", err)
-            return false
-        }
-        err = header.ValidateBasic()
-        if err != nil {
-            n.Logger.Error("failed to validate header", "error", err)
-            return false
-        }
-        n.blockManager.HeaderInCh <- &header
-        return true
-    }
-}
-
-// newCommitValidator returns a pubsub validator that runs basic checks and forwards
-// the deserialized commit for further processing
-func (n *Node) newCommitValidator() p2p.GossipValidator {
-    return func(commitMsg *p2p.GossipMessage) bool {
-        n.Logger.Debug("commit received", "from", commitMsg.From, "bytes", len(commitMsg.Data))
-        var commit types.Commit
-        err := commit.UnmarshalBinary(commitMsg.Data)
-        if err != nil {
-            n.Logger.Error("failed to deserialize commit", "error", err)
-            return false
-        }
-        err = commit.ValidateBasic()
-        if err != nil {
-            n.Logger.Error("failed to validate commit", "error", err)
-            return false
-        }
-        n.Logger.Debug("commit received", "height", commit.Height)
-        n.blockManager.CommitInCh <- &commit
-        return true
-    }
-}
-
-// newFraudProofValidator returns a pubsub validator that validates a fraud proof and forwards
-// it to be verified
-func (n *Node) newFraudProofValidator() p2p.GossipValidator {
-    return func(fraudProofMsg *p2p.GossipMessage) bool {
-        n.Logger.Debug("fraud proof received", "from", fraudProofMsg.From, "bytes", len(fraudProofMsg.Data))
-        var fraudProof types.FraudProof
-        err := fraudProof.UnmarshalBinary(fraudProofMsg.Data)
-        if err != nil {
-            n.Logger.Error("failed to deserialize fraud proof", "error", err)
-            return false
-        }
-        // TODO(manav): Add validation checks for fraud proof here
-        n.blockManager.FraudProofCh <- &fraudProof
-        return true
-    }
-}
-
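Each validator above has the p2p.GossipValidator shape, func(*p2p.GossipMessage) bool, and is attached through the SetTxValidator/SetHeaderValidator/SetCommitValidator/SetFraudProofValidator calls that now happen in newFullNode. A hypothetical extra validator with the same shape, shown only to illustrate that contract; maxGossipBytes and newSizeLimitValidator are not part of rollmint:

package example

import (
	"github.com/tendermint/tendermint/libs/log"

	"github.com/celestiaorg/rollmint/p2p"
)

// maxGossipBytes is an illustrative limit, not a value used anywhere in rollmint.
const maxGossipBytes = 1024 * 1024

// newSizeLimitValidator returns a p2p.GossipValidator that accepts a gossiped
// message only if its payload fits within maxGossipBytes; returning false
// tells the P2P layer to drop the message instead of propagating it.
func newSizeLimitValidator(logger log.Logger) p2p.GossipValidator {
	return func(m *p2p.GossipMessage) bool {
		if len(m.Data) > maxGossipBytes {
			logger.Debug("rejecting oversized gossip message", "from", m.From, "bytes", len(m.Data))
			return false
		}
		return true
	}
}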
-func newPrefixKV(kvStore ds.Datastore, prefix string) ds.TxnDatastore {
-    return (ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)}).Children()[0]).(ds.TxnDatastore)
-}
-
-func createAndStartIndexerService(
-    ctx context.Context,
-    conf config.NodeConfig,
-    kvStore ds.TxnDatastore,
-    eventBus *tmtypes.EventBus,
-    logger log.Logger,
-) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) {
-
-    var (
-        txIndexer txindex.TxIndexer
-        blockIndexer indexer.BlockIndexer
-    )
-
-    txIndexer = kv.NewTxIndex(ctx, kvStore)
-    blockIndexer = blockidxkv.New(ctx, newPrefixKV(kvStore, "block_events"))
-
-    indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
-    indexerService.SetLogger(logger.With("module", "txindex"))
-
-    if err := indexerService.Start(); err != nil {
-        return nil, nil, nil, err
-    }
-
-    return indexerService, txIndexer, blockIndexer, nil
 }
diff --git a/rpc/json/service.go b/rpc/json/service.go
index af27c367a60..a8840f57af6 100644
--- a/rpc/json/service.go
+++ b/rpc/json/service.go
@@ -9,17 +9,14 @@ import (
     "time"
 
     "github.com/gorilla/rpc/v2/json2"
-    "github.com/tendermint/tendermint/libs/pubsub"
-    tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
     rpcclient "github.com/tendermint/tendermint/rpc/client"
     ctypes "github.com/tendermint/tendermint/rpc/core/types"
 
     "github.com/celestiaorg/rollmint/log"
-    "github.com/celestiaorg/rollmint/rpc/client"
 )
 
 // GetHTTPHandler returns handler configured to serve Tendermint-compatible RPC.
-func GetHTTPHandler(l *client.Client, logger log.Logger) (http.Handler, error) {
+func GetHTTPHandler(l rpcclient.Client, logger log.Logger) (http.Handler, error) {
     return newHandler(newService(l, logger), json2.NewCodec(), logger), nil
 }
 
@@ -42,12 +39,12 @@ func newMethod(m interface{}) *method {
 }
 
 type service struct {
-    client *client.Client
+    client rpcclient.Client
     methods map[string]*method
     logger log.Logger
 }
 
-func newService(c *client.Client, l log.Logger) *service {
+func newService(c rpcclient.Client, l log.Logger) *service {
     s := service{
         client: c,
         logger: l,
@@ -87,51 +84,31 @@ func newService(c *client.Client, l log.Logger) *service {
 }
 
 func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsConn) (*ctypes.ResultSubscribe, error) {
-    addr := req.RemoteAddr
-    // TODO(tzdybal): pass config and check subscriptions limits
-
-    q, err := tmquery.New(args.Query)
-    if err != nil {
-        return nil, fmt.Errorf("failed to parse query: %w", err)
-    }
-
-    s.logger.Debug("subscribe to query", "remote", addr, "query", args.Query)
-
     // TODO(tzdybal): extract consts or configs
     const SubscribeTimeout = 5 * time.Second
     const subBufferSize = 100
+
+    addr := req.RemoteAddr
+    query := args.Query
+
     ctx, cancel := context.WithTimeout(req.Context(), SubscribeTimeout)
     defer cancel()
-    sub, err := s.client.EventBus.Subscribe(ctx, addr, q, subBufferSize)
+    sub, err := s.client.Subscribe(ctx, addr, query, subBufferSize)
     if err != nil {
         return nil, fmt.Errorf("failed to subscribe: %w", err)
     }
 
     go func() {
-        for {
-            select {
-            case msg := <-sub.Out():
-                data, err := json.Marshal(msg.Data())
-                if err != nil {
-                    s.logger.Error("failed to marshal response data", "error", err)
-                    continue
-                }
-                if wsConn != nil {
-                    wsConn.queue <- data
-                }
-            case <-sub.Cancelled():
-                if sub.Err() != pubsub.ErrUnsubscribed {
-                    var reason string
-                    if sub.Err() == nil {
-                        reason = "unknown failure"
-                    } else {
-                        reason = sub.Err().Error()
-                    }
-                    s.logger.Error("subscription was cancelled", "reason", reason)
-                }
-                return
+        for msg := range sub {
+            data, err := json.Marshal(msg.Data)
+            if err != nil {
+                s.logger.Error("failed to marshal response data", "error", err)
+                continue
+            }
+            if wsConn != nil {
+                wsConn.queue <- data
             }
         }
     }()
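Because service now holds the generic rpcclient.Client interface instead of the concrete rollmint client, the JSON-RPC handler can be wired from whichever client a node exposes. A usage sketch built only from GetHTTPHandler and Node.GetClient as they appear in this patch; serveRPC and the listen address are assumptions, not part of the change:

package example

import (
	"net/http"

	"github.com/tendermint/tendermint/libs/log"

	"github.com/celestiaorg/rollmint/node"
	rpcjson "github.com/celestiaorg/rollmint/rpc/json"
)

// serveRPC is a hypothetical helper: it builds the Tendermint-compatible
// JSON-RPC handler from the node's client interface and serves it over HTTP.
func serveRPC(n node.Node, logger log.Logger) error {
	handler, err := rpcjson.GetHTTPHandler(n.GetClient(), logger)
	if err != nil {
		return err
	}
	return http.ListenAndServe("localhost:26657", handler)
}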
"github.com/tendermint/tendermint/rpc/client" "github.com/tendermint/tendermint/types" "github.com/celestiaorg/rollmint/config" "github.com/celestiaorg/rollmint/mocks" "github.com/celestiaorg/rollmint/node" - "github.com/celestiaorg/rollmint/rpc/client" ) func TestHandlerMapping(t *testing.T) { @@ -269,7 +269,7 @@ func TestSubscription(t *testing.T) { } // copied from rpc -func getRPC(t *testing.T) (*mocks.Application, *client.Client) { +func getRPC(t *testing.T) (*mocks.Application, rpcclient.Client) { t.Helper() require := require.New(t) app := &mocks.Application{} @@ -292,14 +292,14 @@ func getRPC(t *testing.T) (*mocks.Application, *client.Client) { }) key, _, _ := crypto.GenerateEd25519Key(rand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - node, err := node.NewNode(context.Background(), config.NodeConfig{Aggregator: true, DALayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second}}, key, signingKey, abciclient.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + n, err := node.NewNode(context.Background(), config.NodeConfig{Aggregator: true, DALayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second}, Light: false}, key, signingKey, abciclient.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) - require.NotNil(node) + require.NotNil(n) - err = node.Start() + err = n.Start() require.NoError(err) - local := client.NewClient(node) + local := n.GetClient() require.NotNil(local) return app, local diff --git a/rpc/server.go b/rpc/server.go index 3aa1b78381a..a74f12cbec1 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -16,7 +16,6 @@ import ( "golang.org/x/net/netutil" "github.com/celestiaorg/rollmint/node" - "github.com/celestiaorg/rollmint/rpc/client" "github.com/celestiaorg/rollmint/rpc/json" ) @@ -25,16 +24,16 @@ type Server struct { *service.BaseService config *config.RPCConfig - client *client.Client + client rpcclient.Client server http.Server } // NewServer creates new instance of Server with given configuration. -func NewServer(node *node.Node, config *config.RPCConfig, logger log.Logger) *Server { +func NewServer(node node.Node, config *config.RPCConfig, logger log.Logger) *Server { srv := &Server{ config: config, - client: client.NewClient(node), + client: node.GetClient(), } srv.BaseService = service.NewBaseService(logger, "RPC", srv) return srv