Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Consensus] Add MinBlockTime to delay mempool reaping #924

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8608bce
[WIP] Add MinBlockTime
red-0ne Jul 21, 2023
4a25b62
[Consensus] Feat: Configurable min block production time
red-0ne Jul 25, 2023
1028ae9
[Consensus] Refactor: decouple timer registration from subscription
red-0ne Jul 26, 2023
a7fd743
address review comments
red-0ne Jul 31, 2023
867e841
[Docs] Update development docs to warn to not use the changelog hook …
h5law Jul 24, 2023
0d448c9
[IBC] chore: Rename FlushAllEntries => FlushCachesToStore (#934)
h5law Jul 24, 2023
accccfc
[Utility] Feat: add client-side session cache (#888)
adshmh Jul 25, 2023
3165b8d
[IBC] Clone `cosmos/ics23` protobuf definitions into IBC repo (#922)
h5law Jul 26, 2023
990321e
[CLI] Consistent config/flag parsing & common helpers (#891)
bryanchriswhite Jul 26, 2023
21d7024
[IBC] Change Events to not have a Height field and use uint64 in quer…
h5law Jul 26, 2023
c67fa14
[IBC] Add ICS-02 Client Interfaces (#932)
h5law Jul 26, 2023
db8d8d6
[Persistence] Adds `node` subcommand to CLI (#935)
dylanlott Jul 26, 2023
74a5816
[IBC] chore: enable IBC module in k8s validators (#941)
h5law Jul 27, 2023
950ccc3
[Utility] Use TreeStore as source of truth (#937)
h5law Jul 27, 2023
d3bf0ad
[IBC] Enable validators and thus IBC host creation in K8s (#942)
h5law Jul 28, 2023
c903ca1
[Utility] Create trustless_relay_validation.md (#938)
adshmh Jul 31, 2023
298b08f
[Persistence] Adds atomic Update for TreeStore (#861)
dylanlott Jul 31, 2023
a68af5c
[chore] Replaces multierr usage with go native errors package (#939)
dylanlott Jul 31, 2023
0941549
hack: 😴 sleep enough for cli debug subcommands to broadcast (#954)
0xBigBoss Jul 31, 2023
50f8846
DevLog 12 (#957)
Olshansk Aug 1, 2023
e0e9fd4
[Utility] servicer signs relays (#952)
adshmh Aug 1, 2023
2a226cc
[LocalNet] Fix metrics scraping (#940)
okdas Aug 1, 2023
6c7599e
prevent sending to closed channels
red-0ne Aug 2, 2023
92ece19
disable block preparation delay when manual mode is on
red-0ne Aug 4, 2023
fef4217
[E2E Test] Utilities for State Sync Test (#874)
Olshansk Aug 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions consensus/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Implement minimum block production pace by delaying block preparation
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

## [0.0.0.54] - 2023-06-13

- Fix tests
Expand Down
97 changes: 97 additions & 0 deletions consensus/e2e_tests/pacemaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,103 @@ func forcePacemakerTimeout(t *testing.T, clockMock *clock.Mock, paceMakerTimeout
advanceTime(t, clockMock, paceMakerTimeout+10*time.Millisecond)
}

// TODO: Add more tests for minBlockTime behavior:
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// 1. Block preparation triggers ASAP if conditions are met AFTER minBlockTime has triggered.
// 2. Block preparation is always discarded if a new one with better QC is received within minBlockTime.
// 3. Mempool reaped is the one present at minBlockTime or later.
// 4. Successive blocks timings are at least minBlockTime apart.
func TestPacemakerMinBlockTime(t *testing.T) {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// Test preparation
clockMock := clock.NewMock()
timeReminder(t, clockMock, time.Second)

// UnitTestNet configs
paceMakerTimeoutMsec := uint64(300000)
consensusMessageTimeout := time.Duration(paceMakerTimeoutMsec / 5) // Must be smaller than pacemaker timeout because we expect a deterministic number of consensus messages.
paceMakerMinBlockTimeMsec := uint64(5000) // Make sure it is larger than the consensus message timeout
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
runtimeMgrs := GenerateNodeRuntimeMgrs(t, numValidators, clockMock)
for _, runtimeConfig := range runtimeMgrs {
consCfg := runtimeConfig.GetConfig().Consensus.PacemakerConfig
consCfg.TimeoutMsec = paceMakerTimeoutMsec
consCfg.MinBlockTimeMsec = paceMakerMinBlockTimeMsec
}
buses := GenerateBuses(t, runtimeMgrs)

// Create & start test pocket nodes
eventsChannel := make(modules.EventsChannel, 100)
pocketNodes := CreateTestConsensusPocketNodes(t, buses, eventsChannel)
err := StartAllTestPocketNodes(t, pocketNodes)
require.NoError(t, err)

replicas := IdToNodeMapping{}
// First round ever has leaderId=2 ((height+round+step-1)%numValidators)+1
// See: consensus/leader_election/module.go#electNextLeaderDeterministicRoundRobin
leaderId := typesCons.NodeId(2)
leader := IdToNodeMapping{}
numReplicas := len(pocketNodes) - 1

// Debug message to start consensus by triggering next view
// Get leader out of replica set
for id, pocketNode := range pocketNodes {
TriggerNextView(t, pocketNode)
if id == leaderId {
leader[id] = pocketNode
} else {
replicas[id] = pocketNode
}
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

// Right after triggering the next view
// Consensus started and all nodes are at NewRound step
step := typesCons.HotstuffStep(pocketNode.GetBus().GetConsensusModule().CurrentStep())
require.Equal(t, consensus.NewRound, step)
}

newRoundMessages, err := WaitForNetworkConsensusEvents(
t, clockMock, eventsChannel, typesCons.HotstuffStep(consensus.NewRound), typesCons.HotstuffMessageType(consensus.NewRound),
numReplicas, // We want new round messages from replicas only
consensusMessageTimeout, false,
)
require.NoError(t, err)

// Broadcast new round messages to leader to build a block
broadcastMessages(t, newRoundMessages, leader)

var step typesCons.HotstuffStep
var pivotTime = 1 * time.Millisecond // Min time it takes to switch from NewRound to Prepare step
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

// Give go routines time to trigger
advanceTime(t, clockMock, 0)

// We get consensus module from leader to get its POV
leaderConsensusModule := leader[leaderId].GetBus().GetConsensusModule()
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

// Make sure all nodes are aligned to the same leader
for _, pocketNode := range pocketNodes {
nodeLeader := pocketNode.GetBus().GetConsensusModule().GetLeaderForView(1, 0, uint8(consensus.NewRound))
require.Equal(t, typesCons.NodeId(nodeLeader), leaderId)
}

// Timer is blocking the proposal step
step = typesCons.HotstuffStep(leaderConsensusModule.CurrentStep())
require.Equal(t, consensus.NewRound, step)

// Advance time right before minBlockTime triggers
advanceTime(t, clockMock, time.Duration(paceMakerMinBlockTimeMsec*uint64(time.Millisecond))-pivotTime)
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

// Should still be blocking proposal step
step = typesCons.HotstuffStep(leaderConsensusModule.CurrentStep())
require.Equal(t, consensus.NewRound, step)

// Advance time just enough to trigger minBlockTime
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
advanceTime(t, clockMock, pivotTime)
step = typesCons.HotstuffStep(leaderConsensusModule.CurrentStep())

// Time advanced by minBlockTime
require.Equal(t, uint64(clockMock.Now().UnixMilli()), paceMakerMinBlockTimeMsec)
// Leader is at proposal step
require.Equal(t, consensus.Prepare, step)
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO: Implement these tests and use them as a starting point for new ones. Consider using ChatGPT to help you out :)

func TestPacemakerDifferentHeightsCatchup(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions consensus/e2e_tests/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,8 @@ func advanceTime(t *testing.T, clck *clock.Mock, duration time.Duration) {
clck.Add(duration)
t.Logf("[⌚ CLOCK ⏩] advanced by %v", duration)
logTime(t, clck)
// Give goroutines a chance to run
clck.Add(0)
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}

// sleep pauses the goroutine for the given duration on the mock clock and logs what just happened.
Expand Down
7 changes: 5 additions & 2 deletions consensus/hotstuff_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusM
return
}

// DISCUSS: Do we need to pause for `MinBlockFreqMSec` here to let more transactions or should we stick with optimistic responsiveness?

if err := m.didReceiveEnoughMessageForStep(NewRound); err != nil {
m.logger.Info().Fields(hotstuffMsgToLoggingFields(msg)).Msgf("⏳ Waiting ⏳for more messages; %s", err.Error())
return
Expand Down Expand Up @@ -64,6 +62,11 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusM
// TODO: Add test to make sure same block is not applied twice if round is interrupted after being 'Applied'.
// TODO: Add more unit tests for these checks...
if m.shouldPrepareNewBlock(highPrepareQC) {
doPrepare := <-m.paceMaker.ProcessDelayedBlockPrepare(m.height)
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
if !doPrepare {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
m.logger.Info().Msg("skip prepare new block")
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
return
}
block, err := m.prepareBlock(highPrepareQC)
if err != nil {
m.logger.Error().Err(err).Msg(typesCons.ErrPrepareBlock.Error())
Expand Down
109 changes: 106 additions & 3 deletions consensus/pacemaker/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pacemaker
import (
"context"
"fmt"
"sync"
"time"

consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry"
Expand Down Expand Up @@ -38,6 +39,7 @@ type Pacemaker interface {
PacemakerDebug

ShouldHandleMessage(message *typesCons.HotstuffMessage) (bool, error)
ProcessDelayedBlockPrepare(height uint64) chan bool

RestartTimer()
NewHeight()
Expand All @@ -48,9 +50,10 @@ type pacemaker struct {
base_modules.IntegrableModule
base_modules.InterruptableModule

pacemakerCfg *configs.PacemakerConfig
roundTimeout time.Duration
roundCancelFunc context.CancelFunc
pacemakerCfg *configs.PacemakerConfig
roundTimeout time.Duration
roundCancelFunc context.CancelFunc
latestPrepareRequest latestPrepareRequest

// Only used for development and debugging.
debug pacemakerDebug
Expand All @@ -60,6 +63,16 @@ type pacemaker struct {
logPrefix string
}

// Structure to handle delaying block preparation (reaping the block mempool)
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
type latestPrepareRequest struct {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
m sync.Mutex
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
ch chan bool
cancelFunc context.CancelFunc
blockProposed bool
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
deadlinePassed bool
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
delayedHeight uint64
}

func CreatePacemaker(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return new(pacemaker).Create(bus, options...)
}
Expand All @@ -85,6 +98,14 @@ func (*pacemaker) Create(bus modules.Bus, options ...modules.ModuleOption) (modu
debugTimeBetweenStepsMsec: m.pacemakerCfg.GetDebugTimeBetweenStepsMsec(),
quorumCertificate: nil,
}
m.latestPrepareRequest = latestPrepareRequest{
m: sync.Mutex{},
ch: nil,
cancelFunc: nil,
blockProposed: false,
deadlinePassed: false,
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
delayedHeight: 0,
}

return m, nil
}
Expand Down Expand Up @@ -171,6 +192,7 @@ func (m *pacemaker) ShouldHandleMessage(msg *typesCons.HotstuffMessage) (bool, e

func (m *pacemaker) RestartTimer() {
// NOTE: Not deferring a cancel call because this function is asynchronous.
// DISCUSS: Should we have a lock to manipulate m.roundCancelFunc?
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
if m.roundCancelFunc != nil {
m.roundCancelFunc()
}
Expand Down Expand Up @@ -243,6 +265,87 @@ func (m *pacemaker) NewHeight() {
)
}

// This is called each time there is a NewRound message received by the leader from replicas
// With the introduction of MinBlockTimeMsec delay, multiple concurrent calls may happen
// It makes sure that:
// - Block proposal is made by only one of the possible `HotstuffLeaderMessageHandler.HandleNewRoundMessage()` concurrent (because delayed) calls
// - If the timer expires, the first call to this method will trigger the block proposal
// - If a late message is received after a block is proposed by another call, the late message is discarded
// - Reads and affectations to pacemaker.latestPrepareRequest state are protected by a mutex
func (m *pacemaker) ProcessDelayedBlockPrepare(currentHeight uint64) chan bool {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
m.latestPrepareRequest.m.Lock()
defer m.latestPrepareRequest.m.Unlock()

// Prepare channel for the the current request
ch := make(chan bool)

// First time to build a block for current height, cancel previous timer if any
if m.latestPrepareRequest.delayedHeight < currentHeight {
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
// Discard the request building the old height
if m.latestPrepareRequest.ch != nil {
m.latestPrepareRequest.ch <- false
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}
// Discard the timer for the old height
if m.latestPrepareRequest.cancelFunc != nil {
m.latestPrepareRequest.cancelFunc()
}

m.latestPrepareRequest.ch = ch
m.latestPrepareRequest.blockProposed = false
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
m.latestPrepareRequest.deadlinePassed = false
m.latestPrepareRequest.delayedHeight = currentHeight

// DISCUSS: This may be the the wrong time/place to start the timer,
// this means that its starts after the first NewRound message satisfying quorum is received
// We may start it when first NewRound message ever is received
minBlockTime := time.Duration(m.pacemakerCfg.MinBlockTimeMsec * uint64(time.Millisecond))
ctx, cancel := context.WithCancel(context.TODO())
m.latestPrepareRequest.cancelFunc = cancel

go func() {
select {
case <-ctx.Done():
return
case <-m.GetBus().GetRuntimeMgr().GetClock().After(minBlockTime):
m.latestPrepareRequest.m.Lock()
defer m.latestPrepareRequest.m.Unlock()

// After the timeout, if there was any candidate request waiting for a signal, tell it to build the block
if m.latestPrepareRequest.ch != nil {
m.latestPrepareRequest.ch <- true
m.latestPrepareRequest.blockProposed = true
}

// From now on, build the block ASAP
m.latestPrepareRequest.deadlinePassed = true
}
}()

return ch
}

if m.latestPrepareRequest.blockProposed {
go func() { ch <- false }()

return ch
}

if m.latestPrepareRequest.ch != nil {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
m.latestPrepareRequest.ch <- false
}

m.latestPrepareRequest.ch = ch

if m.latestPrepareRequest.deadlinePassed {
go func() {
ch <- true
m.latestPrepareRequest.blockProposed = true
}()
}

return ch
}

func (m *pacemaker) startNextView(qc *typesCons.QuorumCertificate, forceNextView bool) {
defer m.RestartTimer()

Expand Down
1 change: 1 addition & 0 deletions runtime/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func NewDefaultConfig(options ...func(*Config)) *Config {
TimeoutMsec: defaults.DefaultPacemakerTimeoutMsec,
Manual: defaults.DefaultPacemakerManual,
DebugTimeBetweenStepsMsec: defaults.DefaultPacemakerDebugTimeBetweenStepsMsec,
MinBlockTimeMsec: defaults.DefaultPacemakerMinBlockTimeMsec,
},
},
Utility: &UtilityConfig{
Expand Down
1 change: 1 addition & 0 deletions runtime/configs/proto/consensus_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ message PacemakerConfig {
uint64 timeout_msec = 1;
bool manual = 2;
uint64 debug_time_between_steps_msec = 3;
uint64 min_block_time_msec = 4;
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions runtime/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
DefaultPacemakerTimeoutMsec = uint64(10000)
DefaultPacemakerManual = true
DefaultPacemakerDebugTimeBetweenStepsMsec = uint64(1000)
DefaultPacemakerMinBlockTimeMsec = uint64(5000)
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// utility
DefaultUtilityMaxMempoolTransactionBytes = uint64(1024 ^ 3) // 1GB V0 defaults
DefaultUtilityMaxMempoolTransactions = uint32(9000)
Expand Down
2 changes: 2 additions & 0 deletions runtime/docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Add a new MinBlockTimeMsec config field to consensus
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

## [0.0.0.44] - 2023-06-26

- Add a new ServiceConfig field to servicer config
Expand Down
1 change: 1 addition & 0 deletions runtime/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1775,6 +1775,7 @@ func TestNewManagerFromReaders(t *testing.T) {
TimeoutMsec: 10000,
Manual: true,
DebugTimeBetweenStepsMsec: 1000,
MinBlockTimeMsec: 5000,
},
ServerModeEnabled: true,
},
Expand Down
Loading