Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TxIndexer] Integration of transaction indexer (issue-#168) #302

Merged
merged 11 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,16 @@ protogen_clean:
## Generate go structures for all of the protobufs
protogen_local: go_protoc-go-inject-tag
$(eval proto_dir = ".")
protoc --go_opt=paths=source_relative -I=./shared/debug/proto --go_out=./shared/debug ./shared/debug/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./shared/codec/proto --go_out=./shared/codec ./shared/codec/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./utility/indexer/proto --go_out=./utility/indexer/ ./utility/indexer/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./persistence/proto --go_out=./persistence/types ./persistence/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./shared/debug/proto --go_out=./shared/debug ./shared/debug/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./shared/codec/proto --go_out=./shared/codec ./shared/codec/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./persistence/indexer/proto --go_out=./persistence/indexer/ ./persistence/indexer/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./persistence/proto --go_out=./persistence/types ./persistence/proto/*.proto --experimental_allow_proto3_optional
protoc-go-inject-tag -input="./persistence/types/*.pb.go"
protoc --go_opt=paths=source_relative -I=./utility/types/proto --go_out=./utility/types ./utility/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./consensus/types/proto --go_out=./consensus/types ./consensus/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./p2p/raintree/types/proto --go_out=./p2p/types ./p2p/raintree/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./p2p/types/proto --go_out=./p2p/types ./p2p/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./telemetry/proto --go_out=./telemetry ./telemetry/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./utility/types/proto --go_out=./utility/types ./utility/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./consensus/types/proto --go_out=./consensus/types ./consensus/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./p2p/raintree/types/proto --go_out=./p2p/types ./p2p/raintree/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./p2p/types/proto --go_out=./p2p/types ./p2p/types/proto/*.proto --experimental_allow_proto3_optional
protoc --go_opt=paths=source_relative -I=./telemetry/proto --go_out=./telemetry ./telemetry/proto/*.proto --experimental_allow_proto3_optional
echo "View generated proto files by running: make protogen_show"

.PHONY: protogen_docker_m1
Expand Down
2 changes: 2 additions & 0 deletions consensus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

## [0.0.0.6] - 2022-10-12
- Stores transactions alongside blocks during `commit`
- Added current block `[]TxResult` to the module

### [#235](https://github.com/pokt-network/pocket/pull/235) Config and genesis handling

Expand Down
9 changes: 9 additions & 0 deletions consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,20 @@ func (m *consensusModule) commitBlock(block *typesCons.Block) error {
return nil
}

// IMPROVE(#285): ensure these persistence operations are atomic, we can't commit block without
// committing the transactions and metadata at the same time
func (m *consensusModule) storeBlock(block *typesCons.Block, blockProtoBytes []byte) error {
store := m.utilityContext.GetPersistenceContext()
// Store in KV Store
if err := store.StoreBlock(blockProtoBytes); err != nil {
return err
}
// Store transactions in Indexer
for _, txResult := range m.TxResults {
if err := store.StoreTransaction(txResult); err != nil {
andrewnguyen22 marked this conversation as resolved.
Show resolved Hide resolved
return err
}
}

// Store in SQL Store
header := block.BlockHeader
Expand All @@ -57,6 +65,7 @@ func (m *consensusModule) storeBlock(block *typesCons.Block, blockProtoBytes []b
}

// TODO: Add unit tests specific to block validation
// IMPROVE: (olshansky) rename to provide clarity of operation. ValidateBasic() is typically a stateless check not stateful
func (m *consensusModule) validateBlockBasic(block *typesCons.Block) error {
if block == nil && m.Step != NewRound {
return typesCons.ErrNilBlock
Expand Down
4 changes: 2 additions & 2 deletions consensus/consensus_tests/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,11 @@ func baseUtilityContextMock(t *testing.T) *modulesMock.MockUtilityContext {

utilityContextMock.EXPECT().
GetProposalTransactions(gomock.Any(), maxTxBytes, gomock.AssignableToTypeOf(emptyByzValidators)).
Return(make([][]byte, 0), nil).
Return(make([][]byte, 0), nil, nil).
AnyTimes()
utilityContextMock.EXPECT().
ApplyBlock(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(appHash, nil).
Return(appHash, nil, nil).
AnyTimes()
utilityContextMock.EXPECT().CommitPersistenceContext().Return(nil).AnyTimes()
utilityContextMock.EXPECT().ReleaseContext().Return().AnyTimes()
Expand Down
1 change: 1 addition & 0 deletions consensus/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (m *consensusModule) isOptimisticThresholdMet(n int) error {
func (m *consensusModule) resetForNewHeight() {
m.Round = 0
m.Block = nil
m.TxResults = nil
m.highPrepareQC = nil
m.lockedQC = nil
}
Expand Down
24 changes: 15 additions & 9 deletions consensus/hotstuff_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consensus

import (
"encoding/hex"
"github.com/pokt-network/pocket/shared/modules"
"unsafe"

consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry"
Expand Down Expand Up @@ -53,22 +54,27 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusM
// TODO: Add more unit tests for these checks...
if m.shouldPrepareNewBlock(highPrepareQC) {
// Leader prepares a new block if `highPrepareQC` is not applicable
block, err := m.prepareAndApplyBlock()
block, txResults, err := m.prepareAndApplyBlock()
if err != nil {
m.nodeLogError(typesCons.ErrPrepareBlock.Error(), err)
m.paceMaker.InterruptRound()
return
}
m.Block = block
m.TxResults = txResults
} else {
// DISCUSS: Do we need to call `validateProposal` here?
// Leader acts like a replica if `highPrepareQC` is not `nil`
if err := m.applyBlock(highPrepareQC.Block); err != nil {
// TODO(olshansky): Add test to make sure same block is not applied twice if round is interrrupted.
// been 'Applied'
txResults, err := m.applyBlock(highPrepareQC.Block)
if err != nil {
m.nodeLogError(typesCons.ErrApplyBlock.Error(), err)
m.paceMaker.InterruptRound()
return
}
m.Block = highPrepareQC.Block
m.TxResults = txResults
}

m.Step = Prepare
Expand Down Expand Up @@ -331,9 +337,9 @@ func (m *consensusModule) tempIndexHotstuffMessage(msg *typesCons.HotstuffMessag

// This is a helper function intended to be called by a leader/validator during a view change
// to prepare a new block that is applied to the new underlying context.
func (m *consensusModule) prepareAndApplyBlock() (*typesCons.Block, error) {
func (m *consensusModule) prepareAndApplyBlock() (*typesCons.Block, []modules.TxResult, error) {
if m.isReplica() {
return nil, typesCons.ErrReplicaPrepareBlock
return nil, nil, typesCons.ErrReplicaPrepareBlock
}

// TECHDEBT: Retrieve this from consensus consensus config
Expand All @@ -343,16 +349,16 @@ func (m *consensusModule) prepareAndApplyBlock() (*typesCons.Block, error) {
lastByzValidators := make([][]byte, 0)

// Reap the mempool for transactions to be applied in this block
txs, err := m.utilityContext.GetProposalTransactions(m.privateKey.Address(), maxTxBytes, lastByzValidators)
txs, _, err := m.utilityContext.GetProposalTransactions(m.privateKey.Address(), maxTxBytes, lastByzValidators)
if err != nil {
return nil, err
return nil, nil, err
}

// OPTIMIZE: Determine if we can avoid the `ApplyBlock` call here
// Apply all the transactions in the block
appHash, err := m.utilityContext.ApplyBlock(int64(m.Height), m.privateKey.Address(), txs, lastByzValidators)
appHash, txResults, err := m.utilityContext.ApplyBlock(int64(m.Height), m.privateKey.Address(), txs, lastByzValidators)
if err != nil {
return nil, err
return nil, nil, err
}

// Construct the block
Expand All @@ -369,7 +375,7 @@ func (m *consensusModule) prepareAndApplyBlock() (*typesCons.Block, error) {
Transactions: txs,
}

return block, nil
return block, txResults, nil
}

// Return true if this node, the leader, should prepare a new block
Expand Down
16 changes: 9 additions & 7 deletions consensus/hotstuff_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consensus
import (
"encoding/hex"
"fmt"
"github.com/pokt-network/pocket/shared/modules"

consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry"
"github.com/pokt-network/pocket/consensus/types"
Expand Down Expand Up @@ -60,13 +61,14 @@ func (handler *HotstuffReplicaMessageHandler) HandlePrepareMessage(m *consensusM
}

block := msg.GetBlock()
if err := m.applyBlock(block); err != nil {
txResults, err := m.applyBlock(block)
if err != nil {
m.nodeLogError(typesCons.ErrApplyBlock.Error(), err)
m.paceMaker.InterruptRound()
return
}
m.Block = block

m.TxResults = txResults
m.Step = PreCommit

prepareVoteMessage, err := CreateVoteMessage(m.Height, m.Round, Prepare, m.Block, m.privateKey)
Expand Down Expand Up @@ -226,22 +228,22 @@ func (m *consensusModule) validateProposal(msg *typesCons.HotstuffMessage) error
}

// This helper applies the block metadata to the utility & persistence layers
func (m *consensusModule) applyBlock(block *typesCons.Block) error {
func (m *consensusModule) applyBlock(block *typesCons.Block) ([]modules.TxResult, error) {
// TECHDEBT: Retrieve this from persistence
lastByzValidators := make([][]byte, 0)

// Apply all the transactions in the block and get the appHash
appHash, err := m.utilityContext.ApplyBlock(int64(m.Height), block.BlockHeader.ProposerAddress, block.Transactions, lastByzValidators)
appHash, txResults, err := m.utilityContext.ApplyBlock(int64(m.Height), block.BlockHeader.ProposerAddress, block.Transactions, lastByzValidators)
if err != nil {
return err
return txResults, err
}

// CONSOLIDATE: Terminology of `blockHash`, `appHash` and `stateHash`
if block.BlockHeader.Hash != hex.EncodeToString(appHash) {
return typesCons.ErrInvalidAppHash(block.BlockHeader.Hash, hex.EncodeToString(appHash))
return txResults, typesCons.ErrInvalidAppHash(block.BlockHeader.Hash, hex.EncodeToString(appHash))
}

return nil
return txResults, nil
}

func (m *consensusModule) validateQuorumCertificate(qc *typesCons.QuorumCertificate) error {
Expand Down
2 changes: 2 additions & 0 deletions consensus/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type consensusModule struct {
Round uint64
Step typesCons.HotstuffStep
Block *typesCons.Block // The current block being proposed / voted on; it has not been committed to finality
// TODO(#315): Move the statefulness of `TxResult` to the persistence module
TxResults []modules.TxResult // The current block applied transaction results / voted on; it has not been committed to finality
andrewnguyen22 marked this conversation as resolved.
Show resolved Hide resolved

highPrepareQC *typesCons.QuorumCertificate // Highest QC for which replica voted PRECOMMIT
lockedQC *typesCons.QuorumCertificate // Highest QC for which replica voted COMMIT
Expand Down
24 changes: 17 additions & 7 deletions persistence/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package persistence
import (
"encoding/binary"
"encoding/hex"
"log"

"github.com/pokt-network/pocket/persistence/kvstore"
"github.com/pokt-network/pocket/persistence/types"
"github.com/pokt-network/pocket/shared/modules"
)

// OPTIMIZE(team): get from blockstore or keep in memory
Expand Down Expand Up @@ -40,13 +40,23 @@ func (p PostgresContext) GetHeight() (int64, error) {
}

func (p PostgresContext) TransactionExists(transactionHash string) (bool, error) {
log.Println("TODO: TransactionExists not implemented")
return false, nil
hash, err := hex.DecodeString(transactionHash)
if err != nil {
return false, err
}
res, err := p.txIndexer.GetByHash(hash)
if res == nil {
// check for not found
if err != nil && err.Error() == kvstore.BadgerKeyNotFoundError {
return false, nil
}
return false, err
}
return true, err
}

func (p PostgresContext) StoreTransaction(transactionProtoBytes []byte) error {
log.Println("TODO: StoreTransaction not implemented")
return nil
func (p PostgresContext) StoreTransaction(txResult modules.TxResult) error {
return p.txIndexer.Index(txResult)
}

func (p PostgresContext) StoreBlock(blockProtoBytes []byte) error {
Expand Down
2 changes: 2 additions & 0 deletions persistence/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/pokt-network/pocket/persistence/indexer"
"log"

"github.com/pokt-network/pocket/persistence/types"
Expand Down Expand Up @@ -41,6 +42,7 @@ type PostgresContext struct {
conn *pgx.Conn
tx pgx.Tx
blockstore kvstore.KVStore
txIndexer indexer.TxIndexer
}

func (pg *PostgresContext) GetCtxAndTx() (context.Context, pgx.Tx, error) {
Expand Down
3 changes: 3 additions & 0 deletions persistence/docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Made `PersistenceModule` struct unexported
- Updated tests and mocks
- Removed some cross-module dependencies
- Added `TxIndexer` sub-package (previously in Utility Module)
- Added `TxIndexer` to both `PersistenceModule` and `PersistenceContext`
- Implemented `TransactionExists` and `StoreTransaction`

## [0.0.0.6] - 2022-09-30

Expand Down
File renamed without changes.
Loading