Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISS: Add snapshot hash to checkpoints #12

Merged
merged 7 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/grpctransport/grpctransport.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions pkg/grpctransport/grpctransport_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 52 additions & 21 deletions pkg/iss/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ SPDX-License-Identifier: Apache-2.0
package iss

import (
"bytes"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/pb/isspb"
t "github.com/filecoin-project/mir/pkg/types"
)

// checkpointTracker represents the state associated with a single instance of the checkpoint protocol
// (establishing a single stable checkpoint).
type checkpointTracker struct {
logging.Logger

// Epoch to which this checkpoint belongs.
// It is always the epoch the checkpoint's associated sequence number (seqNr) is part of.
Expand All @@ -38,16 +42,24 @@ type checkpointTracker struct {
// Application snapshot data associated with this checkpoint.
appSnapshot []byte

// Hash of the application snapshot data associated with this checkpoint.
appSnapshotHash []byte

// Set of nodes from which any Checkpoint message has been received.
// This is necessary for ignoring all but the first message a node sends, regardless of the snapshot hash.
confirmations map[t.NodeID]struct{}

// Set of Checkpoint messages that were received ahead of time.
pendingMessages map[t.NodeID]*isspb.Checkpoint
}

// newCheckpointTracker allocates and returns a new instance of a checkpointTracker associated with sequence number sn.
func newCheckpointTracker(sn t.SeqNr) *checkpointTracker {
func newCheckpointTracker(sn t.SeqNr, logger logging.Logger) *checkpointTracker {
return &checkpointTracker{
seqNr: sn,
confirmations: make(map[t.NodeID]struct{}),
Logger: logger,
seqNr: sn,
confirmations: make(map[t.NodeID]struct{}),
pendingMessages: make(map[t.NodeID]*isspb.Checkpoint),
// the epoch and membership fields will be set later by iss.startCheckpoint
// the appSnapshot field will be set by ProcessAppSnapshot
}
Expand All @@ -60,7 +72,8 @@ func (iss *ISS) getCheckpointTracker(sn t.SeqNr) *checkpointTracker {

// If no checkpoint tracker with sequence number sn exists, create a new one.
if _, ok := iss.checkpoints[sn]; !ok {
iss.checkpoints[sn] = newCheckpointTracker(sn)
logger := logging.Decorate(iss.logger, "CT: ", "sn", sn)
iss.checkpoints[sn] = newCheckpointTracker(sn, logger)
}

// Look up and return checkpoint tracker.
Expand Down Expand Up @@ -90,50 +103,67 @@ func (ct *checkpointTracker) Start(epoch t.EpochNr, membership []t.NodeID) *even
// ProcessAppSnapshot stores the application snapshot in the tracker and returns
// an event list holding a single hash request for that snapshot.
// The request is tagged with an origin derived from the tracker's sequence number.
func (ct *checkpointTracker) ProcessAppSnapshot(snapshot []byte) *events.EventList {

	// Remember the snapshot data.
	ct.appSnapshot = snapshot

	// Ask for the snapshot's hash; the snapshot is the only chunk of data to be hashed.
	data := [][]byte{snapshot}
	origin := AppSnapshotHashOrigin(ct.seqNr)
	request := events.HashRequest(data, origin)

	result := &events.EventList{}
	return result.PushBack(request)
}

func (ct *checkpointTracker) ProcessAppSnapshotHash(snapshotHash []byte) *events.EventList {

// Save the received snapshot hash
ct.appSnapshotHash = snapshotHash

// Write Checkpoint to WAL
walEvent := events.WALAppend(PersistCheckpointEvent(ct.seqNr, ct.appSnapshot), t.WALRetIndex(ct.epoch))
persistEvent := PersistCheckpointEvent(ct.seqNr, ct.appSnapshot, ct.appSnapshotHash)
walEvent := events.WALAppend(persistEvent, t.WALRetIndex(ct.epoch))

// Send a checkpoint message to all nodes after persisting checkpoint to the WAL.
// TODO: Add hash of the snapshot
// TODO: Add signature.
// TODO: Implement checkpoint message retransmission.
walEvent.FollowUp(events.SendMessage(CheckpointMessage(ct.epoch, ct.seqNr), ct.membership))
m := CheckpointMessage(ct.epoch, ct.seqNr, ct.appSnapshotHash)
walEvent.FollowUp(events.SendMessage(m, ct.membership))

// If the app snapshot was the last thing missing for the checkpoint to become stable,
// also produce the necessary events.
if ct.stable() {
walEvent.FollowUps(ct.announceStable().Slice())
// Apply pending Checkpoint messages
for s, m := range ct.pendingMessages {
walEvent.FollowUps(ct.applyMessage(m, s).Slice())
}

// Return resulting WALEvent (with the SendMessage event appended).
return (&events.EventList{}).PushBack(walEvent)
}

func (ct *checkpointTracker) applyMessage(chkpMsg *isspb.Checkpoint, source t.NodeID) *events.EventList {
func (ct *checkpointTracker) applyMessage(msg *isspb.Checkpoint, source t.NodeID) *events.EventList {

// If checkpoint is already stable, ignore message.
if ct.stable() {
return &events.EventList{}
}

// TODO: Check signature of the sender.

// TODO: Distinguish messages by snapshot hash,
// separately keeping the set of nodes from which a Checkpoint message has been received.
// Check snapshot hash
if ct.appSnapshotHash == nil {
// The message is received too early, put it aside
ct.pendingMessages[source] = msg
return &events.EventList{}
} else if !bytes.Equal(ct.appSnapshotHash, msg.AppSnapshotHash) {
// Snapshot hash mismatch
ct.Log(logging.LevelWarn, "Ignoring Checkpoint message. Mismatching app snapshot hash.", "source", source)
return &events.EventList{}
}

// Ignore duplicate messages (regardless of snapshot hash).
// Ignore duplicate messages.
if _, ok := ct.confirmations[source]; ok {
return &events.EventList{}
}

// TODO: Check signature of the sender.

// TODO: Only accept messages from nodes in membership.
// This might be more tricky than it seems, especially when the membership is not yet initialized.

// Note the reception of a Checkpoint message from node `source`.
// TODO: take the snapshot hash into account. Separate data structures will be needed for that.
ct.confirmations[source] = struct{}{}

// If, after having applied this message, the checkpoint became stable, produce the necessary events.
Expand All @@ -151,8 +181,9 @@ func (ct *checkpointTracker) stable() bool {
func (ct *checkpointTracker) announceStable() *events.EventList {
// Create a stable checkpoint object.
stableCheckpoint := &isspb.StableCheckpoint{
Epoch: ct.epoch.Pb(),
Sn: ct.seqNr.Pb(),
Epoch: ct.epoch.Pb(),
Sn: ct.seqNr.Pb(),
AppSnapshotHash: ct.appSnapshotHash,
}

// First persist the checkpoint in the WAL, then announce it to the protocol.
Expand Down
9 changes: 9 additions & 0 deletions pkg/iss/iss.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,9 @@ func (iss *ISS) applyHashResult(result *eventpb.HashResult) *events.EventList {
case *isspb.ISSHashOrigin_LogEntrySn:
// Hash originates from delivering a CommitLogEntry.
return iss.applyLogEntryHashResult(result.Digest, t.SeqNr(origin.LogEntrySn))
case *isspb.ISSHashOrigin_AppSnapshotSn:
// Hash originates from delivering an event of the application creating a state snapshot
return iss.applyAppSnapshotHashResult(result.Digest, t.SeqNr(origin.AppSnapshotSn))
default:
panic(fmt.Sprintf("unknown origin of hash result: %T", origin))
}
Expand Down Expand Up @@ -466,6 +469,12 @@ func (iss *ISS) applyLogEntryHashResult(digest []byte, logEntrySN t.SeqNr) *even

}

// applyAppSnapshotHashResult handles the digest computed over an application state snapshot.
// The digest is handed to the checkpoint tracker associated with sequence number appSnapshotSN,
// and the events produced by that tracker are returned.
func (iss *ISS) applyAppSnapshotHashResult(digest []byte, appSnapshotSN t.SeqNr) *events.EventList {
	tracker := iss.getCheckpointTracker(appSnapshotSN)
	return tracker.ProcessAppSnapshotHash(digest)
}

// applySBEvent applies an event triggered by or addressed to an orderer (i.e., instance of Sequenced Broadcast),
// if that event belongs to the current epoch.
// TODO: Update this comment when the TODO below is addressed.
Expand Down
18 changes: 12 additions & 6 deletions pkg/iss/protobufs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ func HashOrigin(origin *isspb.ISSHashOrigin) *eventpb.HashOrigin {
return &eventpb.HashOrigin{Type: &eventpb.HashOrigin_Iss{origin}}
}

func PersistCheckpointEvent(sn t.SeqNr, appSnapshot []byte) *eventpb.Event {
func PersistCheckpointEvent(sn t.SeqNr, appSnapshot, appSnapshotHash []byte) *eventpb.Event {
return Event(&isspb.ISSEvent{Type: &isspb.ISSEvent_PersistCheckpoint{PersistCheckpoint: &isspb.PersistCheckpoint{
Sn: sn.Pb(),
AppSnapshot: appSnapshot,
Sn: sn.Pb(),
AppSnapshot: appSnapshot,
AppSnapshotHash: appSnapshotHash,
}}})
}

Expand Down Expand Up @@ -79,6 +80,10 @@ func SBHashOrigin(epoch t.EpochNr, instance t.SBInstanceID, origin *isspb.SBInst
}}})
}

// AppSnapshotHashOrigin builds the hash origin identifying a hash request
// as concerning the application snapshot with sequence number seqNr.
func AppSnapshotHashOrigin(seqNr t.SeqNr) *eventpb.HashOrigin {
	origin := &isspb.ISSHashOrigin{
		Type: &isspb.ISSHashOrigin_AppSnapshotSn{AppSnapshotSn: seqNr.Pb()},
	}
	return HashOrigin(origin)
}

// ------------------------------------------------------------
// SB Instance Events

Expand Down Expand Up @@ -168,10 +173,11 @@ func SBMessage(epoch t.EpochNr, instance t.SBInstanceID, msg *isspb.SBInstanceMe
}}})
}

func CheckpointMessage(epoch t.EpochNr, sn t.SeqNr) *messagepb.Message {
func CheckpointMessage(epoch t.EpochNr, sn t.SeqNr, appSnapshotHash []byte) *messagepb.Message {
return Message(&isspb.ISSMessage{Type: &isspb.ISSMessage_Checkpoint{Checkpoint: &isspb.Checkpoint{
Epoch: epoch.Pb(),
Sn: sn.Pb(),
Epoch: epoch.Pb(),
Sn: sn.Pb(),
AppSnapshotHash: appSnapshotHash,
}}})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/pb/eventpb/eventpb.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading