Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep resending critical messages in ISS #122

Merged
merged 1 commit into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions pkg/iss/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package iss

import (
"bytes"
"github.com/filecoin-project/mir/pkg/pb/eventpb"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
Expand Down Expand Up @@ -57,15 +58,25 @@ type checkpointTracker struct {

// Set of Checkpoint messages that were received ahead of time.
pendingMessages map[t.NodeID]*isspb.Checkpoint

// Time interval for repeated retransmission of checkpoint messages.
resendPeriod t.TimeDuration
}

// newCheckpointTracker allocates and returns a new instance of a checkpointTracker associated with sequence number sn.
func newCheckpointTracker(ownID t.NodeID, sn t.SeqNr, epoch t.EpochNr, logger logging.Logger) *checkpointTracker {
func newCheckpointTracker(
ownID t.NodeID,
sn t.SeqNr,
epoch t.EpochNr,
resendPeriod t.TimeDuration,
logger logging.Logger,
) *checkpointTracker {
return &checkpointTracker{
Logger: logger,
ownID: ownID,
seqNr: sn,
epoch: epoch,
resendPeriod: resendPeriod,
signatures: make(map[t.NodeID][]byte),
confirmations: make(map[t.NodeID]struct{}),
pendingMessages: make(map[t.NodeID]*isspb.Checkpoint),
Expand Down Expand Up @@ -125,9 +136,13 @@ func (ct *checkpointTracker) ProcessCheckpointSignResult(signature []byte) *even
walEvent := events.WALAppend(walModuleName, persistEvent, t.WALRetIndex(ct.epoch))

// Send a checkpoint message to all nodes after persisting checkpoint to the WAL.
// TODO: Implement checkpoint message retransmission.
m := CheckpointMessage(ct.epoch, ct.seqNr, ct.appSnapshotHash, signature)
walEvent.FollowUp(events.SendMessage(netModuleName, m, ct.membership))
walEvent.FollowUp(events.TimerRepeat(
"timer",
[]*eventpb.Event{events.SendMessage(netModuleName, m, ct.membership)},
ct.resendPeriod,
t.TimerRetIndex(ct.epoch)),
)

// Apply pending Checkpoint messages
for s, m := range ct.pendingMessages {
Expand Down
4 changes: 4 additions & 0 deletions pkg/iss/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type Config struct {
// and, if so, sends them the latest state.
CatchUpTimerPeriod time.Duration

// Time interval for repeated retransmission of checkpoint messages.
CheckpointResendPeriod time.Duration

// View change timeout for the PBFT sub-protocol, in ticks.
// TODO: Separate this in a sub-group of the ISS config, maybe even use a field of type PBFTConfig in Config.
PBFTDoneResendPeriod time.Duration
Expand Down Expand Up @@ -194,6 +197,7 @@ func DefaultConfig(membership []t.NodeID) *Config {
CatchUpTimerPeriod: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything
PBFTDoneResendPeriod: maxProposeDelay,
PBFTCatchUpDelay: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything
CheckpointResendPeriod: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything
PBFTViewChangeBatchTimeout: 4 * maxProposeDelay,
PBFTViewChangeSegmentTimeout: 2 * time.Duration(segmentLength) * maxProposeDelay,
PBFTViewChangeResendPeriod: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything
Expand Down
9 changes: 7 additions & 2 deletions pkg/iss/iss.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,8 +784,13 @@ func (iss *ISS) initEpoch(newEpoch t.EpochNr) {
epoch := &epochInfo{
Nr: newEpoch,
Membership: iss.config.Membership, // TODO: Make a proper copy once reconfiguration is supported.
Checkpoint: newCheckpointTracker(iss.ownID, iss.nextDeliveredSN, newEpoch,
logging.Decorate(iss.logger, "CT: ", "epoch", newEpoch)),
Checkpoint: newCheckpointTracker(
iss.ownID,
iss.nextDeliveredSN,
newEpoch,
t.TimeDuration(iss.config.CheckpointResendPeriod),
logging.Decorate(iss.logger, "CT: ", "epoch", newEpoch),
),
}
iss.epochs[newEpoch] = epoch
iss.epoch = epoch
Expand Down
10 changes: 8 additions & 2 deletions pkg/iss/pbftgoodcase.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ func (pbft *pbftInstance) propose(batch *requestpb.Batch) *events.EventList {
preprepare := pbftPreprepareMsg(sn, pbft.view, batch, false)

// Create a Preprepare message send Event.
// No need for periodic re-transmission.
// In the worst case, dropping of these messages may result in a view change, but will not compromise correctness.
msgSendEvent := pbft.eventService.SendMessage(
PbftPreprepareSBMessage(preprepare),
pbft.segment.Membership,
Expand Down Expand Up @@ -234,7 +236,9 @@ func (pbft *pbftInstance) sendPrepare(prepare *isspbftpb.Prepare) *events.EventL
// Create persist event.
persistEvent := pbft.eventService.WALAppend(PbftPersistPrepare(prepare))

// Append send event as a follow-up
// Append send event as a follow-up.
// No need for periodic re-transmission.
// In the worst case, dropping of these messages may result in a view change, but will not compromise correctness.
persistEvent.FollowUp(pbft.eventService.SendMessage(
PbftPrepareSBMessage(prepare),
pbft.segment.Membership,
Expand Down Expand Up @@ -279,7 +283,9 @@ func (pbft *pbftInstance) sendCommit(commit *isspbftpb.Commit) *events.EventList
// Create persist event.
persistEvent := pbft.eventService.WALAppend(PbftPersistCommit(commit))

// Append send event as a follow-up
// Append send event as a follow-up.
// No need for periodic re-transmission.
// In the worst case, dropping of these messages may result in a view change, but will not compromise correctness.
persistEvent.FollowUp(pbft.eventService.SendMessage(
PbftCommitSBMessage(commit),
pbft.segment.Membership,
Expand Down
1 change: 1 addition & 0 deletions pkg/iss/pbftsegmentchkp.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func (pbft *pbftInstance) applyMsgCatchUpRequest(
if preprepare := pbft.lookUpPreprepare(t.SeqNr(catchUpReq.Sn), catchUpReq.Digest); preprepare != nil {

// If the requested Preprepare message is available, send it to the originator of the request.
// No need for periodic re-transmission. The requester will re-transmit the request if needed.
return events.ListOf(pbft.eventService.SendMessage(PbftCatchUpResponseSBMessage(preprepare), []t.NodeID{from}))
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/iss/pbftviewchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ func (pbft *pbftInstance) applyEmptyPreprepareHashResult(digests [][]byte, view
return pbft.sendNewView(view, state)
}

pbft.logger.Log(logging.LevelDebug, "Some Preprepares missing. Asking for retransmission.")
// If some Preprepares for re-proposing are still missing, fetch them from other nodes.
pbft.logger.Log(logging.LevelDebug, "Some Preprepares missing. Asking for retransmission.")
return state.askForMissingPreprepares(pbft.eventService)
}

Expand All @@ -228,6 +228,9 @@ func (pbft *pbftInstance) applyMsgPreprepareRequest(
if preprepare := pbft.lookUpPreprepare(t.SeqNr(preprepareRequest.Sn), preprepareRequest.Digest); preprepare != nil {

// If the requested Preprepare message is available, send it to the originator of the request.
// No need for periodic re-transmission.
// In the worst case, dropping of these messages may result in another view change,
// but will not compromise correctness.
return events.ListOf(
pbft.eventService.SendMessage(PbftMissingPreprepareSBMessage(preprepare), []t.NodeID{from}),
)
Expand Down Expand Up @@ -313,6 +316,8 @@ func (pbft *pbftInstance) sendNewView(view t.PBFTViewNr, vcState *pbftViewChange
})

// Construct, persist and send the NewView message.
// No need for periodic re-transmission.
// In the worst case, dropping of these messages may result in a view change, but will not compromise correctness.
newView := pbftNewViewMsg(view, viewChangeSenders, signedViewChanges, preprepareSeqNrs, preprepares)
persistEvent := pbft.eventService.WALAppend(PbftPersistNewView(newView))
persistEvent.FollowUp(pbft.eventService.SendMessage(PbftNewViewSBMessage(newView), pbft.segment.Membership))
Expand Down
10 changes: 8 additions & 2 deletions pkg/iss/pbftviewchangestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,16 @@ func (vcState *pbftViewChangeState) SetLocalPreprepares(pbft *pbftInstance, view
}
}

// askForMissingPreprepares requests the Preprepare messages that are part of a new view.
// The new primary might have received a prepare certificate from other nodes in the ViewChange messages they sent
// and thus the new primary has to re-propose the corresponding batch by including the corresponding Preprepare message
// in the NewView message. However, the new primary might not have all the corresponding Preprepare messages,
// in which case it calls this function.
// Note that the requests for missing Preprepare messages need not necessarily be periodically re-transmitted.
// If they are dropped, the new primary will simply never send a NewView message
// and will be succeeded by another primary after another view change.
func (vcState *pbftViewChangeState) askForMissingPreprepares(eventService *sbEventService) *events.EventList {

// TODO: Do this periodically, not just once. Messages might get lost!

eventsOut := events.EmptyList()
for sn, digest := range vcState.reproposals {
if len(digest) > 0 && vcState.preprepares[sn] == nil {
Expand Down