Skip to content

Commit

Permalink
[Core] KISS 1 - Finite State Machine [Merge me first] - (Issue: #499) (
Browse files Browse the repository at this point in the history
…#520)

## Description

This PR has been extracted from #491 and is, hopefully, more digestible
from a code-review and scope point of view.

In a nutshell: 
- It introduces the a Finite State Machine that is meant to govern the
internal state, transitions and events
- It includes a refactoring of our module initialization pattern using
functional options
(https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis)
- It improves the module registration with a first class... you might
have guessed it: `ModulesRegistry`
- It reduces boilerplate code making our modules more DRY with the
introduction of `base_modules` that provide basic functionality (that
can still be customized/extended when needed)

## Issue

Fixes #499 

## Type of change

Please mark the relevant option(s):

- [x] New feature, functionality or library
- [ ] Bug fix
- [x] Code health or cleanup
- [x] Major breaking change
- [ ] Documentation
- [ ] Other <!-- add details here if it a different type of change -->

## List of changes

In a nutshell: 
- It introduces the a Finite State Machine that is meant to govern the
internal state, transitions and events
- It includes a refactoring of our module initialization pattern using
functional options
(https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis)
- It improves the module registration with a first class... you might
have guessed it: `ModulesRegistry`
- It reduces boilerplate code making our modules more DRY with the
introduction of `base_modules` that provide basic functionality (that
can still be customized/extended when needed)

## Testing

- [x] `make develop_test`
- [x]
[LocalNet](https://github.com/pokt-network/pocket/blob/main/docs/development/README.md)
w/ all of the steps outlined in the `README`


## Required Checklist

- [x] I have performed a self-review of my own code
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have tested my changes using the available tooling
- [x] I have updated the corresponding CHANGELOG

### If Applicable Checklist

- [ ] I have updated the corresponding README(s); local and/or global
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added, or updated,
[mermaid.js](https://mermaid-js.github.io) diagrams in the corresponding
README(s)
- [ ] I have added, or updated, documentation and
[mermaid.js](https://mermaid-js.github.io) diagrams in `shared/docs/*`
if I updated `shared/*`README(s)

---------

Co-authored-by: Daniel Olshansky <[email protected]>
  • Loading branch information
deblasis and Olshansk authored Feb 17, 2023
1 parent 77c08ce commit d7060b1
Show file tree
Hide file tree
Showing 59 changed files with 779 additions and 323 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ generate_cli_commands_docs: ## (Re)generates the CLI commands docs (this is mean
cd app/client/cli/docgen && go run .
echo "CLI commands docs generated in ${cli_docs_dir}"

.PHONY: generate_node_state_machine_diagram
generate_node_state_machine_diagram: ## (Re)generates the Node State Machine diagram
go run ./state_machine/visualizer/main.go
echo "Node State Machine diagram generated in state_machine/docs/state-machine.diagram.md"

.PHONY: test_all
test_all: ## Run all go unit tests
go test -p 1 -count=1 ./...
Expand Down
2 changes: 1 addition & 1 deletion app/client/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.14] - 2023-02-16
## [0.0.0.14] - 2023-02-17

- Introduced logical switch to handle parsing of the debug private keys from a local file OR from Kubernetes secret (PR #517)
- Bugfix for `Stake` command. Address erroneously sent instead of the PublicKey. (PR #518)
Expand Down
2 changes: 1 addition & 1 deletion build/docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.15] - 2023-02-16
## [0.0.0.15] - 2023-02-17

- Added manifests to handle `Roles`, `RoleBindings` and `ServiceAccounts` and referenced them in the `Tiltfile`
- Updated `cli-client.yaml` to bind the `debug-client-account` `ServiceAccount` that has permissions to read the private keys from the `Secret`
Expand Down
4 changes: 3 additions & 1 deletion consensus/debugging.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ func (m *consensusModule) GetNodeState() typesCons.ConsensusNodeState {

func (m *consensusModule) resetToGenesis(_ *messaging.DebugMessage) error {
m.logger.Debug().Msg(typesCons.DebugResetToGenesis)
m.height = 0

m.SetHeight(0)
m.ResetForNewHeight()
m.clearLeader()
m.clearMessagesPool()
m.GetBus().GetUtilityModule().GetMempool().Clear()
if err := m.GetBus().GetPersistenceModule().HandleDebugMessage(&messaging.DebugMessage{
Action: messaging.DebugMessageAction_DEBUG_PERSISTENCE_RESET_TO_GENESIS,
Message: nil,
Expand Down
8 changes: 7 additions & 1 deletion consensus/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.29] - 2023-02-17

- Modules embed `base_modules.IntegratableModule` and `base_modules.InterruptableModule` for DRYness
- Updated modules `Create` to accept generic options
- `resetToGenesis` clears the utility mempool as well
- Publishing `ConsensusNewHeightEvent` on new height

## [0.0.0.28] - 2023-02-14

- Add a few `nolint` comments to fix the code on main

## [0.0.0.27] - 2023-02-09

- Add `state_sync` submodule, with `state_sync` struct
- Implement state sync server to advertise blocks and metadata
- Create new `state_sync_handler.go` source file that handles `StateSyncMessage`s sent to the `Consensus` module
Expand Down
29 changes: 24 additions & 5 deletions consensus/e2e_tests/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,9 @@ func CreateTestConsensusPocketNode(
) *shared.Node {
// persistence is a dependency of consensus, so we need to create it first
persistenceMock := basePersistenceMock(t, eventsChannel, bus)
err := (bus).RegisterModule(persistenceMock)
require.NoError(t, err)
bus.RegisterModule(persistenceMock)

_, err = consensus.Create(bus)
_, err := consensus.Create(bus)
require.NoError(t, err)

runtimeMgr := (bus).GetRuntimeMgr()
Expand All @@ -114,16 +113,17 @@ func CreateTestConsensusPocketNode(
telemetryMock := baseTelemetryMock(t, eventsChannel)
loggerMock := baseLoggerMock(t, eventsChannel)
rpcMock := baseRpcMock(t, eventsChannel)
stateMachineMock := baseStateMachineMock(t, eventsChannel)

for _, module := range []modules.Module{
stateMachineMock,
p2pMock,
utilityMock,
telemetryMock,
loggerMock,
rpcMock,
} {
err = (bus).RegisterModule(module)
require.NoError(t, err)
bus.RegisterModule(module)
}

require.NoError(t, err)
Expand Down Expand Up @@ -423,6 +423,7 @@ func baseP2PMock(t *testing.T, eventsChannel modules.EventsChannel) *mockModules
}).
AnyTimes()
p2pMock.EXPECT().GetModuleName().Return(modules.P2PModuleName).AnyTimes()
p2pMock.EXPECT().HandleEvent(gomock.Any()).Return(nil).AnyTimes()

return p2pMock
}
Expand Down Expand Up @@ -494,6 +495,16 @@ func baseRpcMock(t *testing.T, _ modules.EventsChannel) *mockModules.MockRPCModu
return rpcMock
}

func baseStateMachineMock(t *testing.T, _ modules.EventsChannel) *mockModules.MockStateMachineModule {
ctrl := gomock.NewController(t)
stateMachineMock := mockModules.NewMockStateMachineModule(ctrl)
stateMachineMock.EXPECT().Start().Return(nil).AnyTimes()
stateMachineMock.EXPECT().SetBus(gomock.Any()).Return().AnyTimes()
stateMachineMock.EXPECT().GetModuleName().Return(modules.StateMachineModuleName).AnyTimes()

return stateMachineMock
}

func baseTelemetryTimeSeriesAgentMock(t *testing.T) *mockModules.MockTimeSeriesAgent {
ctrl := gomock.NewController(t)
timeSeriesAgentMock := mockModules.NewMockTimeSeriesAgent(ctrl)
Expand Down Expand Up @@ -521,6 +532,14 @@ func baseLoggerMock(t *testing.T, _ modules.EventsChannel) *mockModules.MockLogg

func logTime(t *testing.T, clck *clock.Mock) {
t.Helper()
defer func() {
// this is to recover from a panic that could happen if the goroutine tries to log after the test has finished
// cause of the panic: https://github.com/golang/go/blob/135c470b2277e1c9514ba8a5478408fea0dee8a2/src/testing/testing.go#L1003
//
// spotted for the first time in our CI: https://github.com/pokt-network/pocket/actions/runs/4198025819/jobs/7281103860#step:8:1118
//nolint:errcheck // ignoring completely
recover()
}()
t.Logf("[⌚ CLOCK ⌚] the time is: %v ms from UNIX Epoch [%v]", clck.Now().UTC().UnixMilli(), clck.Now().UTC())
}

Expand Down
14 changes: 14 additions & 0 deletions consensus/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package consensus

import (
"github.com/pokt-network/pocket/shared/messaging"
)

// publishNewHeightEvent publishes a new height event to the bus so that other interested IntegratableModules can react to it if necessary
func (m *consensusModule) publishNewHeightEvent(height uint64) {
newHeightEvent, err := messaging.PackMessage(&messaging.ConsensusNewHeightEvent{Height: height})
if err != nil {
m.logger.Fatal().Err(err).Msg("Failed to pack consensus new height event")
}
m.GetBus().PublishEventToBus(newHeightEvent)
}
34 changes: 9 additions & 25 deletions consensus/leader_election/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package leader_election

import (
typesCons "github.com/pokt-network/pocket/consensus/types"
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
)

type LeaderElectionModule interface {
Expand All @@ -14,45 +14,29 @@ type LeaderElectionModule interface {
var _ LeaderElectionModule = &leaderElectionModule{}

type leaderElectionModule struct {
bus modules.Bus
base_modules.IntegratableModule
base_modules.InterruptableModule
}

func Create(bus modules.Bus) (modules.Module, error) {
return new(leaderElectionModule).Create(bus)
}

func (*leaderElectionModule) Create(bus modules.Bus) (modules.Module, error) {
func (*leaderElectionModule) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
m := &leaderElectionModule{}
if err := bus.RegisterModule(m); err != nil {
return nil, err
}
return m, nil
}

func (m *leaderElectionModule) Start() error {
// TODO(olshansky): Use persistence to create leader election module.
return nil
}
for _, option := range options {
option(m)
}

func (m *leaderElectionModule) Stop() error {
return nil
bus.RegisterModule(m)
return m, nil
}

func (m *leaderElectionModule) GetModuleName() string {
return modules.LeaderElectionModuleName
}

func (m *leaderElectionModule) SetBus(pocketBus modules.Bus) {
m.bus = pocketBus
}

func (m *leaderElectionModule) GetBus() modules.Bus {
if m.bus == nil {
logger.Global.Fatal().Msg("PocketBus is not initialized")
}
return m.bus
}

func (m *leaderElectionModule) ElectNextLeader(message *typesCons.HotstuffMessage) (typesCons.NodeId, error) {
nodeId, err := m.electNextLeaderDeterministicRoundRobin(message)
if err != nil {
Expand Down
27 changes: 13 additions & 14 deletions consensus/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
coreTypes "github.com/pokt-network/pocket/shared/core/types"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
"google.golang.org/protobuf/types/known/anypb"
)

Expand All @@ -30,7 +31,8 @@ var (
)

type consensusModule struct {
bus modules.Bus
base_modules.IntegratableModule

privateKey cryptoPocket.Ed25519PrivateKey

consCfg *configs.ConsensusConfig
Expand Down Expand Up @@ -90,6 +92,7 @@ type ConsensusDebugModule interface {

func (m *consensusModule) SetHeight(height uint64) {
m.height = height
m.publishNewHeightEvent(height)
}

func (m *consensusModule) SetRound(round uint64) {
Expand Down Expand Up @@ -136,11 +139,11 @@ func (m *consensusModule) ClearLeaderMessagesPool() {
m.clearMessagesPool()
}

func Create(bus modules.Bus) (modules.Module, error) {
return new(consensusModule).Create(bus)
func Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return new(consensusModule).Create(bus, options...)
}

func (*consensusModule) Create(bus modules.Bus) (modules.Module, error) {
func (*consensusModule) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
leaderElectionMod, err := leader_election.Create(bus)
if err != nil {
return nil, err
Expand Down Expand Up @@ -179,10 +182,13 @@ func (*consensusModule) Create(bus modules.Bus) (modules.Module, error) {

hotstuffMempool: make(map[typesCons.HotstuffStep]*hotstuffFIFOMempool),
}
if err := bus.RegisterModule(m); err != nil {
return nil, err

for _, option := range options {
option(m)
}

bus.RegisterModule(m)

runtimeMgr := bus.GetRuntimeMgr()

consensusCfg := runtimeMgr.GetConfig().Consensus
Expand Down Expand Up @@ -259,15 +265,8 @@ func (m *consensusModule) GetModuleName() string {
return modules.ConsensusModuleName
}

func (m *consensusModule) GetBus() modules.Bus {
if m.bus == nil {
logger.Global.Fatal().Msg("PocketBus is not initialized")
}
return m.bus
}

func (m *consensusModule) SetBus(pocketBus modules.Bus) {
m.bus = pocketBus
m.IntegratableModule.SetBus(pocketBus)
if m.paceMaker != nil {
m.paceMaker.SetBus(pocketBus)
}
Expand Down
32 changes: 11 additions & 21 deletions consensus/pacemaker/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pokt-network/pocket/runtime/configs"
"github.com/pokt-network/pocket/shared/codec"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
"google.golang.org/protobuf/types/known/anypb"
)

Expand Down Expand Up @@ -44,7 +45,9 @@ type Pacemaker interface {
}

type pacemaker struct {
bus modules.Bus
base_modules.IntegratableModule
base_modules.InterruptableModule

pacemakerCfg *configs.PacemakerConfig
stepCancelFunc context.CancelFunc

Expand All @@ -56,20 +59,21 @@ type pacemaker struct {
logPrefix string
}

func CreatePacemaker(bus modules.Bus) (modules.Module, error) {
var m pacemaker
return m.Create(bus)
func CreatePacemaker(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return new(pacemaker).Create(bus, options...)
}

func (*pacemaker) Create(bus modules.Bus) (modules.Module, error) {
func (*pacemaker) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
m := &pacemaker{
logPrefix: defaultLogPrefix,
}

if err := bus.RegisterModule(m); err != nil {
return nil, err
for _, option := range options {
option(m)
}

bus.RegisterModule(m)

runtimeMgr := bus.GetRuntimeMgr()
cfg := runtimeMgr.GetConfig()

Expand All @@ -88,25 +92,11 @@ func (m *pacemaker) Start() error {
m.RestartTimer()
return nil
}
func (*pacemaker) Stop() error {
return nil
}

func (*pacemaker) GetModuleName() string {
return pacemakerModuleName
}

func (m *pacemaker) SetBus(pocketBus modules.Bus) {
m.bus = pocketBus
}

func (m *pacemaker) GetBus() modules.Bus {
if m.bus == nil {
log.Fatalf("PocketBus is not initialized")
}
return m.bus
}

func (m *pacemaker) SetLogPrefix(logPrefix string) {
m.logPrefix = logPrefix
}
Expand Down
13 changes: 7 additions & 6 deletions consensus/state_sync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,21 @@ type stateSync struct {
logPrefix string
}

func CreateStateSync(bus modules.Bus) (modules.Module, error) {
var m stateSync
return m.Create(bus)
func CreateStateSync(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return new(stateSync).Create(bus, options...)
}

func (*stateSync) Create(bus modules.Bus) (modules.Module, error) {
func (*stateSync) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
m := &stateSync{
logPrefix: DefaultLogPrefix,
}

if err := bus.RegisterModule(m); err != nil {
return nil, err
for _, option := range options {
option(m)
}

bus.RegisterModule(m)

// when node is starting, it is in sync mode, as it might need to bootstrap to the latest state
m.currentMode = Sync
m.serverMode = false
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/jackc/pgconn v1.13.0
github.com/jordanorelli/lexnum v0.0.0-20141216151731-460eeb125754
github.com/labstack/echo/v4 v4.9.1
github.com/looplab/fsm v1.0.1
github.com/manifoldco/promptui v0.9.0
github.com/mitchellh/mapstructure v1.5.0
github.com/quasilyte/go-ruleguard/dsl v0.3.21
Expand Down
Loading

0 comments on commit d7060b1

Please sign in to comment.