Skip to content

Commit

Permalink
[TxIndexer] Integration of transaction indexer (issue-#168) (#302)
Browse files Browse the repository at this point in the history
## Description

Issue-#151 laid the foundation for a proper transaction indexer in isolation.

The integration of the transaction indexer into the M1 lifecycle is pending and required.

Integrate the transaction indexer into the lifecycle of M1

## Issue

Fixes #168 

## Type of change

Please mark the relevant option(s):

- [x] New feature, functionality or library
- [ ] Bug fix
- [ ] Code health or cleanup
- [ ] Major breaking change
- [ ] Documentation
- [ ] Other <!-- add details here if it a different type of change -->

## List of changes

<!-- List out all the changes made-->

- Added `TxIndexer` sub-package (previously in Utility Module)
- Added `TxIndexer` to both `PersistenceModule` and `PersistenceContext`
- Implemented `TransactionExists` and `StoreTransaction`
- Stores transactions along side blocks during `commit`
- Added current block `[]TxResult` to the module
- Moved TxIndexer package to persistence module
- Added new proto structure `DefaultTxResult`
- Integrated the `TxIndexer` into the lifecycle
  - Captured `TxResult` from each played transaction
  - Moved the storage of transactions to the Consensus Module
  - Returned the `TxResults` in the `ApplyBlock()` and `GetProposalTransactions()`
  - `AnteHandleMessage()` now returns `signer`
  - `HandleMessage()` now returns `messageType` and `recipient`
  - `ApplyTransaction()` returns `TxResult`
- Modified interface for Utility Module `ApplyBlock` and `GetProposalTransactions` to return `TxResults`
- Modified interface for Persistence Module `StoreTransaction` to store the `TxResult`
- Added shared interface `TxResult` under types.go

## Testing

- [x] `make develop_test`
- [x] [LocalNet](https://github.com/pokt-network/pocket/blob/main/docs/development/README.md) w/ all of the steps outlined in the `README`

## Required Checklist

- [x] I have performed a self-review of my own code
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have tested my changes using the available tooling
- [x] I have updated the corresponding CHANGELOG

### If Applicable Checklist
- [ ] I have updated the corresponding README(s); local and/or global
- [x] I have added tests that prove my fix is effective or that my feature works
- [ ] I have added, or updated, [mermaid.js](https://mermaid-js.github.io) diagrams in the corresponding README(s)
- [ ] I have added, or updated, documentation and [mermaid.js](https://mermaid-js.github.io) diagrams in `shared/docs/*` if I updated `shared/*`README(s)
  • Loading branch information
Andrew Nguyen authored Oct 21, 2022
1 parent 5b9f5ca commit 54f244b
Show file tree
Hide file tree
Showing 32 changed files with 363 additions and 165 deletions.
18 changes: 9 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,16 @@ protogen_clean:
## Generate go structures for all of the protobufs
protogen_local: go_protoc-go-inject-tag
$(eval proto_dir = ".")
protoc --go_opt=paths=source_relative -I=./shared/debug/proto --go_out=./shared/debug ./shared/debug/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./shared/codec/proto --go_out=./shared/codec ./shared/codec/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./utility/indexer/proto --go_out=./utility/indexer/ ./utility/indexer/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./persistence/proto --go_out=./persistence/types ./persistence/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./shared/debug/proto --go_out=./shared/debug ./shared/debug/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./shared/codec/proto --go_out=./shared/codec ./shared/codec/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./persistence/indexer/proto --go_out=./persistence/indexer/ ./persistence/indexer/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./persistence/proto --go_out=./persistence/types ./persistence/proto/*.proto --experimental_allow_proto3_optional
protoc-go-inject-tag -input="./persistence/types/*.pb.go"
protoc --go_opt=paths=source_relative -I=./utility/types/proto --go_out=./utility/types ./utility/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./consensus/types/proto --go_out=./consensus/types ./consensus/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./p2p/raintree/types/proto --go_out=./p2p/types ./p2p/raintree/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./p2p/types/proto --go_out=./p2p/types ./p2p/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./telemetry/proto --go_out=./telemetry ./telemetry/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./utility/types/proto --go_out=./utility/types ./utility/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./consensus/types/proto --go_out=./consensus/types ./consensus/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./p2p/raintree/types/proto --go_out=./p2p/types ./p2p/raintree/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./p2p/types/proto --go_out=./p2p/types ./p2p/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./telemetry/proto --go_out=./telemetry ./telemetry/proto/*.proto --experimental_allow_proto3_optional
echo "View generated proto files by running: make protogen_show"

.PHONY: protogen_docker_m1
Expand Down
2 changes: 2 additions & 0 deletions consensus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

## [0.0.0.6] - 2022-10-12
- Stores transactions alongside blocks during `commit`
- Added current block `[]TxResult` to the module

### [#235](https://github.com/pokt-network/pocket/pull/235) Config and genesis handling

Expand Down
9 changes: 9 additions & 0 deletions consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,20 @@ func (m *consensusModule) commitBlock(block *typesCons.Block) error {
return nil
}

// IMPROVE(#285): ensure these persistence operations are atomic, we can't commit block without
// committing the transactions and metadata at the same time
func (m *consensusModule) storeBlock(block *typesCons.Block, blockProtoBytes []byte) error {
store := m.utilityContext.GetPersistenceContext()
// Store in KV Store
if err := store.StoreBlock(blockProtoBytes); err != nil {
return err
}
// Store transactions in Indexer
for _, txResult := range m.TxResults {
if err := store.StoreTransaction(txResult); err != nil {
return err
}
}

// Store in SQL Store
header := block.BlockHeader
Expand All @@ -57,6 +65,7 @@ func (m *consensusModule) storeBlock(block *typesCons.Block, blockProtoBytes []b
}

// TODO: Add unit tests specific to block validation
// IMPROVE: (olshansky) rename to provide clarity of operation. ValidateBasic() is typically a stateless check not stateful
func (m *consensusModule) validateBlockBasic(block *typesCons.Block) error {
if block == nil && m.Step != NewRound {
return typesCons.ErrNilBlock
Expand Down
4 changes: 2 additions & 2 deletions consensus/consensus_tests/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,11 @@ func baseUtilityContextMock(t *testing.T) *modulesMock.MockUtilityContext {

utilityContextMock.EXPECT().
GetProposalTransactions(gomock.Any(), maxTxBytes, gomock.AssignableToTypeOf(emptyByzValidators)).
Return(make([][]byte, 0), nil).
Return(make([][]byte, 0), nil, nil).
AnyTimes()
utilityContextMock.EXPECT().
ApplyBlock(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(appHash, nil).
Return(appHash, nil, nil).
AnyTimes()
utilityContextMock.EXPECT().CommitPersistenceContext().Return(nil).AnyTimes()
utilityContextMock.EXPECT().ReleaseContext().Return().AnyTimes()
Expand Down
1 change: 1 addition & 0 deletions consensus/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (m *consensusModule) isOptimisticThresholdMet(n int) error {
func (m *consensusModule) resetForNewHeight() {
m.Round = 0
m.Block = nil
m.TxResults = nil
m.highPrepareQC = nil
m.lockedQC = nil
}
Expand Down
24 changes: 15 additions & 9 deletions consensus/hotstuff_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consensus

import (
"encoding/hex"
"github.com/pokt-network/pocket/shared/modules"
"unsafe"

consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry"
Expand Down Expand Up @@ -53,22 +54,27 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusM
// TODO: Add more unit tests for these checks...
if m.shouldPrepareNewBlock(highPrepareQC) {
// Leader prepares a new block if `highPrepareQC` is not applicable
block, err := m.prepareAndApplyBlock()
block, txResults, err := m.prepareAndApplyBlock()
if err != nil {
m.nodeLogError(typesCons.ErrPrepareBlock.Error(), err)
m.paceMaker.InterruptRound()
return
}
m.Block = block
m.TxResults = txResults
} else {
// DISCUSS: Do we need to call `validateProposal` here?
// Leader acts like a replica if `highPrepareQC` is not `nil`
if err := m.applyBlock(highPrepareQC.Block); err != nil {
// TODO(olshansky): Add test to make sure same block is not applied twice if round is interrrupted.
// been 'Applied'
txResults, err := m.applyBlock(highPrepareQC.Block)
if err != nil {
m.nodeLogError(typesCons.ErrApplyBlock.Error(), err)
m.paceMaker.InterruptRound()
return
}
m.Block = highPrepareQC.Block
m.TxResults = txResults
}

m.Step = Prepare
Expand Down Expand Up @@ -331,9 +337,9 @@ func (m *consensusModule) tempIndexHotstuffMessage(msg *typesCons.HotstuffMessag

// This is a helper function intended to be called by a leader/validator during a view change
// to prepare a new block that is applied to the new underlying context.
func (m *consensusModule) prepareAndApplyBlock() (*typesCons.Block, error) {
func (m *consensusModule) prepareAndApplyBlock() (*typesCons.Block, []modules.TxResult, error) {
if m.isReplica() {
return nil, typesCons.ErrReplicaPrepareBlock
return nil, nil, typesCons.ErrReplicaPrepareBlock
}

// TECHDEBT: Retrieve this from consensus consensus config
Expand All @@ -343,16 +349,16 @@ func (m *consensusModule) prepareAndApplyBlock() (*typesCons.Block, error) {
lastByzValidators := make([][]byte, 0)

// Reap the mempool for transactions to be applied in this block
txs, err := m.utilityContext.GetProposalTransactions(m.privateKey.Address(), maxTxBytes, lastByzValidators)
txs, _, err := m.utilityContext.GetProposalTransactions(m.privateKey.Address(), maxTxBytes, lastByzValidators)
if err != nil {
return nil, err
return nil, nil, err
}

// OPTIMIZE: Determine if we can avoid the `ApplyBlock` call here
// Apply all the transactions in the block
appHash, err := m.utilityContext.ApplyBlock(int64(m.Height), m.privateKey.Address(), txs, lastByzValidators)
appHash, txResults, err := m.utilityContext.ApplyBlock(int64(m.Height), m.privateKey.Address(), txs, lastByzValidators)
if err != nil {
return nil, err
return nil, nil, err
}

// Construct the block
Expand All @@ -369,7 +375,7 @@ func (m *consensusModule) prepareAndApplyBlock() (*typesCons.Block, error) {
Transactions: txs,
}

return block, nil
return block, txResults, nil
}

// Return true if this node, the leader, should prepare a new block
Expand Down
16 changes: 9 additions & 7 deletions consensus/hotstuff_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consensus
import (
"encoding/hex"
"fmt"
"github.com/pokt-network/pocket/shared/modules"

consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry"
"github.com/pokt-network/pocket/consensus/types"
Expand Down Expand Up @@ -60,13 +61,14 @@ func (handler *HotstuffReplicaMessageHandler) HandlePrepareMessage(m *consensusM
}

block := msg.GetBlock()
if err := m.applyBlock(block); err != nil {
txResults, err := m.applyBlock(block)
if err != nil {
m.nodeLogError(typesCons.ErrApplyBlock.Error(), err)
m.paceMaker.InterruptRound()
return
}
m.Block = block

m.TxResults = txResults
m.Step = PreCommit

prepareVoteMessage, err := CreateVoteMessage(m.Height, m.Round, Prepare, m.Block, m.privateKey)
Expand Down Expand Up @@ -226,22 +228,22 @@ func (m *consensusModule) validateProposal(msg *typesCons.HotstuffMessage) error
}

// This helper applies the block metadata to the utility & persistence layers
func (m *consensusModule) applyBlock(block *typesCons.Block) error {
func (m *consensusModule) applyBlock(block *typesCons.Block) ([]modules.TxResult, error) {
// TECHDEBT: Retrieve this from persistence
lastByzValidators := make([][]byte, 0)

// Apply all the transactions in the block and get the appHash
appHash, err := m.utilityContext.ApplyBlock(int64(m.Height), block.BlockHeader.ProposerAddress, block.Transactions, lastByzValidators)
appHash, txResults, err := m.utilityContext.ApplyBlock(int64(m.Height), block.BlockHeader.ProposerAddress, block.Transactions, lastByzValidators)
if err != nil {
return err
return txResults, err
}

// CONSOLIDATE: Terminology of `blockHash`, `appHash` and `stateHash`
if block.BlockHeader.Hash != hex.EncodeToString(appHash) {
return typesCons.ErrInvalidAppHash(block.BlockHeader.Hash, hex.EncodeToString(appHash))
return txResults, typesCons.ErrInvalidAppHash(block.BlockHeader.Hash, hex.EncodeToString(appHash))
}

return nil
return txResults, nil
}

func (m *consensusModule) validateQuorumCertificate(qc *typesCons.QuorumCertificate) error {
Expand Down
2 changes: 2 additions & 0 deletions consensus/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type consensusModule struct {
Round uint64
Step typesCons.HotstuffStep
Block *typesCons.Block // The current block being proposed / voted on; it has not been committed to finality
// TODO(#315): Move the statefulness of `TxResult` to the persistence module
TxResults []modules.TxResult // The current block applied transaction results / voted on; it has not been committed to finality

highPrepareQC *typesCons.QuorumCertificate // Highest QC for which replica voted PRECOMMIT
lockedQC *typesCons.QuorumCertificate // Highest QC for which replica voted COMMIT
Expand Down
24 changes: 17 additions & 7 deletions persistence/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package persistence
import (
"encoding/binary"
"encoding/hex"
"log"

"github.com/pokt-network/pocket/persistence/kvstore"
"github.com/pokt-network/pocket/persistence/types"
"github.com/pokt-network/pocket/shared/modules"
)

// OPTIMIZE(team): get from blockstore or keep in memory
Expand Down Expand Up @@ -40,13 +40,23 @@ func (p PostgresContext) GetHeight() (int64, error) {
}

func (p PostgresContext) TransactionExists(transactionHash string) (bool, error) {
log.Println("TODO: TransactionExists not implemented")
return false, nil
hash, err := hex.DecodeString(transactionHash)
if err != nil {
return false, err
}
res, err := p.txIndexer.GetByHash(hash)
if res == nil {
// check for not found
if err != nil && err.Error() == kvstore.BadgerKeyNotFoundError {
return false, nil
}
return false, err
}
return true, err
}

func (p PostgresContext) StoreTransaction(transactionProtoBytes []byte) error {
log.Println("TODO: StoreTransaction not implemented")
return nil
func (p PostgresContext) StoreTransaction(txResult modules.TxResult) error {
return p.txIndexer.Index(txResult)
}

func (p PostgresContext) StoreBlock(blockProtoBytes []byte) error {
Expand Down
2 changes: 2 additions & 0 deletions persistence/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/pokt-network/pocket/persistence/indexer"
"log"

"github.com/pokt-network/pocket/persistence/types"
Expand Down Expand Up @@ -41,6 +42,7 @@ type PostgresContext struct {
conn *pgx.Conn
tx pgx.Tx
blockstore kvstore.KVStore
txIndexer indexer.TxIndexer
}

func (pg *PostgresContext) GetCtxAndTx() (context.Context, pgx.Tx, error) {
Expand Down
3 changes: 3 additions & 0 deletions persistence/docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Made `PersistenceModule` struct unexported
- Updated tests and mocks
- Removed some cross-module dependencies
- Added `TxIndexer` sub-package (previously in Utility Module)
- Added `TxIndexer` to both `PersistenceModule` and `PersistenceContext`
- Implemented `TransactionExists` and `StoreTransaction`

## [0.0.0.6] - 2022-09-30

Expand Down
File renamed without changes.
Loading

0 comments on commit 54f244b

Please sign in to comment.