Skip to content

Commit

Permalink
Use factory module to reconfigure avail. layer
Browse files Browse the repository at this point in the history
Signed-off-by: Matej Pavlovic <[email protected]>
  • Loading branch information
matejpavlovic committed Aug 22, 2022
1 parent 5858184 commit ace0efa
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 52 deletions.
7 changes: 6 additions & 1 deletion pkg/deploytest/fakeapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
type FakeApp struct {
ProtocolModule t.ModuleID

// Stores the current ISS epoch.
currentEpoch t.EpochNr

Membership map[t.NodeID]t.NodeAddress

// The state of the FakeApp only consists of a counter of processed requests.
Expand All @@ -37,6 +40,7 @@ type FakeApp struct {
func NewFakeApp(protocolModule t.ModuleID, membership map[t.NodeID]t.NodeAddress) *FakeApp {
return &FakeApp{
ProtocolModule: protocolModule,
currentEpoch: 0,
Membership: membership,
delivered: make(map[t.ClientID]map[t.ReqNo]struct{}),
RequestsProcessed: 0,
Expand Down Expand Up @@ -71,6 +75,7 @@ func (fa *FakeApp) ApplyEvent(event *eventpb.Event) (*events.EventList, error) {
return nil, fmt.Errorf("app restore state error: %w", err)
}
case *eventpb.Event_NewEpoch:
fa.currentEpoch = t.EpochNr(e.NewEpoch.EpochNr)
return events.ListOf(events.NewConfig(fa.ProtocolModule, fa.Membership)), nil
default:
return nil, fmt.Errorf("unexpected type of App event: %T", event.Type)
Expand Down Expand Up @@ -99,7 +104,7 @@ func (fa *FakeApp) ApplyDeliver(deliver *eventpb.Deliver) (*events.EventList, er
}

return events.ListOf(availabilityevents.RequestTransactions(
"availability",
t.ModuleID("availability").Then(t.ModuleID(fmt.Sprintf("%v", fa.currentEpoch))),
deliver.Cert,
&availabilitypb.RequestTransactionsOrigin{
Module: "app",
Expand Down
7 changes: 4 additions & 3 deletions pkg/factorymodule/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package factorymodule

import (
"fmt"

"google.golang.org/protobuf/proto"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/messagebuffer"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
"github.com/filecoin-project/mir/pkg/pb/factorymodulepb"
t "github.com/filecoin-project/mir/pkg/types"
"google.golang.org/protobuf/proto"
)

// TODO: Add support for active modules as well.
Expand Down Expand Up @@ -117,9 +119,8 @@ func (fm *FactoryModule) applyNewModule(newModule *factorymodulepb.NewModule) (*
fm.messageBuffer.Iterate(func(_ t.NodeID, msg proto.Message) messagebuffer.Applicable {
if t.ModuleID(msg.(*eventpb.Event).DestModule) == id {
return messagebuffer.Current
} else {
return messagebuffer.Future
}
return messagebuffer.Future
}, func(_ t.NodeID, msg proto.Message) {
bufferedMessages.PushBack(msg.(*eventpb.Event))
})
Expand Down
45 changes: 40 additions & 5 deletions pkg/iss/iss.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ import (

"github.com/filecoin-project/mir/pkg/contextstore"
"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/factorymodule"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/messagebuffer"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/pb/availabilitypb"
"github.com/filecoin-project/mir/pkg/pb/availabilitypb/mscpb"
"github.com/filecoin-project/mir/pkg/pb/commonpb"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
"github.com/filecoin-project/mir/pkg/pb/factorymodulepb"
"github.com/filecoin-project/mir/pkg/pb/isspb"
"github.com/filecoin-project/mir/pkg/pb/messagepb"
"github.com/filecoin-project/mir/pkg/pb/requestpb"
Expand Down Expand Up @@ -346,9 +349,17 @@ func (iss *ISS) ImplementsModule() {}
// This event is only expected to be applied once at startup,
// after all the events stored in the WAL have been applied and before any other event has been applied.
func (iss *ISS) applyInit() (*events.EventList, error) {
eventsOut := events.EmptyList()

// Initialize the availability modules.
for epochNr, configuration := range iss.configurations {
eventsOut.PushBack(iss.initAvailabilityModule(t.EpochNr(epochNr), t.MembershipPb(configuration)))
}

// Trigger an Init event at all orderers.
return iss.initOrderers(), nil
eventsOut.PushBackList(iss.initOrderers())

return eventsOut, nil
}

// applyHashResult applies the HashResult event to the state of the ISS protocol state machine.
Expand Down Expand Up @@ -481,21 +492,28 @@ func (iss *ISS) applyAppSnapshot(appSnapshot *eventpb.AppSnapshot) (*events.Even
}

func (iss *ISS) applyNewConfig(config *eventpb.NewConfig) (*events.EventList, error) {
eventsOut := events.EmptyList()

// Convenience variable.
newMembership := t.Membership(config.Membership)

// Initialize the new availability module
eventsOut.PushBack(iss.initAvailabilityModule(t.EpochNr(len(iss.configurations)), config.Membership))

iss.logger.Log(logging.LevelDebug, "Adding configuration",
"forEpoch", len(iss.configurations),
"currentEpoch", iss.epoch.Nr,
"newConfigNodes", maputil.GetSortedKeys(newMembership))

// Save the new configuration.
iss.configurations = append(iss.configurations, newMembership)

// Advance to the next epoch if this configuration was the last missing bit.
if iss.epochFinished() {
return iss.advanceEpoch(), nil
eventsOut.PushBackList(iss.advanceEpoch())
}

return events.EmptyList(), nil
return eventsOut, nil
}

// applyLogEntryHashResult applies the event of receiving the digest of a delivered CommitLogEntry.
Expand Down Expand Up @@ -795,6 +813,7 @@ func (iss *ISS) applyStableCheckpointSigVerResult(signaturesOK bool, chkp *isspb
iss.configurations = make([]map[t.NodeID]t.NodeAddress, len(chkp.Snapshot.Configuration.Memberships))
for i, m := range chkp.Snapshot.Configuration.Memberships {
iss.configurations[i] = t.Membership(m)
eventsOut.PushBack(iss.initAvailabilityModule(t.EpochNr(i), m))
}

// TODO: Check if all the configurations are present in the checkpoint.
Expand All @@ -821,6 +840,17 @@ func (iss *ISS) applyStableCheckpointSigVerResult(signaturesOK bool, chkp *isspb
return eventsOut
}

func (iss *ISS) initAvailabilityModule(epochNr t.EpochNr, membership *commonpb.Membership) *eventpb.Event {
return factorymodule.FactoryNewModule(
iss.moduleConfig.Availability,
iss.moduleConfig.Availability.Then(t.ModuleID(fmt.Sprintf("%v", epochNr))),
t.RetentionIndex(epochNr),
&factorymodulepb.GeneratorParams{Type: &factorymodulepb.GeneratorParams_MultisigCollector{
MultisigCollector: &mscpb.InstanceParams{Membership: membership},
}},
)
}

// applySBMessage applies a message destined for an orderer (i.e. a Sequenced Broadcast implementation).
func (iss *ISS) applySBMessage(message *isspb.SBMessage, from t.NodeID) *events.EventList {

Expand Down Expand Up @@ -932,7 +962,11 @@ func (iss *ISS) initEpoch(newEpoch t.EpochNr, membership []t.NodeID) {
sbInst := newPbftInstance(
iss.ownID,
seg,
newPBFTConfig(iss.config, membership),
newPBFTConfig(
iss.config,
membership,
iss.moduleConfig.Availability.Then(t.ModuleID(fmt.Sprintf("%v", epoch.Nr))),
),
&sbEventService{moduleConfig: iss.moduleConfig, epoch: newEpoch, instance: t.SBInstanceNr(i)},
logging.Decorate(iss.logger, "PBFT: ", "epoch", newEpoch, "instance", i))

Expand Down Expand Up @@ -1149,11 +1183,12 @@ func membershipSet(membership []t.NodeID) map[t.NodeID]struct{} {
}

// Returns a configuration of a new PBFT instance based on the current ISS configuration.
func newPBFTConfig(issParams *ModuleParams, membership []t.NodeID) *PBFTConfig {
func newPBFTConfig(issParams *ModuleParams, membership []t.NodeID, availabilityModuleID t.ModuleID) *PBFTConfig {

// Return a new PBFT configuration with selected values from the ISS configuration.
return &PBFTConfig{
Membership: membership,
AvailabilityModuleID: availabilityModuleID,
MaxProposeDelay: issParams.MaxProposeDelay,
MsgBufCapacity: issParams.MsgBufCapacity,
DoneResendPeriod: issParams.PBFTDoneResendPeriod,
Expand Down
51 changes: 34 additions & 17 deletions pkg/iss/iss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ import (
"github.com/filecoin-project/mir"
"github.com/filecoin-project/mir/pkg/availability/multisigcollector"
"github.com/filecoin-project/mir/pkg/deploytest"
"github.com/filecoin-project/mir/pkg/factorymodule"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
"github.com/filecoin-project/mir/pkg/pb/factorymodulepb"
"github.com/filecoin-project/mir/pkg/testsim"
t "github.com/filecoin-project/mir/pkg/types"
"github.com/filecoin-project/mir/pkg/util/maputil"
)

const (
Expand Down Expand Up @@ -415,31 +418,45 @@ func newDeployment(conf *TestConfig) (*deploytest.Deployment, error) {
)

// Instantiate the availability layer.
multisigCollector, err := multisigcollector.NewModule(
&multisigcollector.ModuleConfig{
Self: "availability",
Mempool: "mempool",
Net: "net",
Crypto: "crypto",
},
&multisigcollector.ModuleParams{
InstanceUID: []byte("chat multisig collector"),
AllNodes: nodeIDs,
F: (len(nodeIDs) - 1) / 2,
},
nodeID,
mscFactory := factorymodule.New(
"availability",
factorymodule.DefaultParams(
func(mscID t.ModuleID, params *factorymodulepb.GeneratorParams) (modules.PassiveModule, error) {

m := params.Type.(*factorymodulepb.GeneratorParams_MultisigCollector).MultisigCollector.Membership
mscNodeIDs := maputil.GetSortedKeys(t.Membership(m))

// Instantiate the availability layer.
multisigCollector, err := multisigcollector.NewModule(
&multisigcollector.ModuleConfig{
Self: mscID,
Mempool: "mempool",
Net: "net",
Crypto: "crypto",
},
&multisigcollector.ModuleParams{
InstanceUID: []byte(mscID),
AllNodes: mscNodeIDs,
F: (len(mscNodeIDs) - 1) / 2,
},
nodeID,
)
if err != nil {
return nil, err
}
return multisigCollector, nil
},
),
logger,
)
if err != nil {
return nil, fmt.Errorf("error instantiating availability module: %w", err)
}

modulesWithDefaults, err := DefaultModules(map[t.ModuleID]modules.Module{
"app": fakeApp,
"crypto": cryptoSystem.Module(nodeID),
"iss": issProtocol,
"net": transport,
"mempool": mempool,
"availability": multisigCollector,
"availability": mscFactory,
})
if err != nil {
return nil, fmt.Errorf("error initializing the Mir modules: %w", err)
Expand Down
6 changes: 6 additions & 0 deletions pkg/iss/pbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ func (pbft *pbftInstance) Segment() *segment {
return pbft.segment
}

// AvailabilityModuleID returns the ID of the availability module
// from which this SB instance gets its availability certificates.
func (pbft *pbftInstance) AvailabilityModuleID() t.ModuleID {
return pbft.config.AvailabilityModuleID
}

// ============================================================
// General protocol logic (other specific parts in separate files)
// ============================================================
Expand Down
3 changes: 3 additions & 0 deletions pkg/iss/pbftconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type PBFTConfig struct {
// Must not be empty.
Membership []t.NodeID

// ID of the availability module from which this PBFT instance gets its availability certificates.
AvailabilityModuleID t.ModuleID

// The maximum time duration between two proposals of new certificatees during normal operation.
// This parameter caps the waiting time in order to bound latency.
// When MaxProposeDelay has elapsed since the last proposal,
Expand Down
19 changes: 13 additions & 6 deletions pkg/iss/sbinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type sbInstance interface {

// Segment returns the segment assigned to this SB instance.
Segment() *segment

// AvailabilityModuleID returns the ID of the availability module
// from which this SB instance gets its availability certificates.
AvailabilityModuleID() t.ModuleID
}

// ============================================================
Expand Down Expand Up @@ -90,12 +94,15 @@ func (iss *ISS) applySBInstDeliver(instance sbInstance, deliver *isspb.SBDeliver
// that the orderer will propose.
// To this end, applySBInstCertRequest requests a new certificate from the availability layer.
func (iss *ISS) applySBInstCertRequest(instance sbInstance) *events.EventList {
return events.ListOf(availabilityevents.RequestCert(iss.moduleConfig.Availability, &availabilitypb.RequestCertOrigin{
Module: iss.moduleConfig.Self.Pb(),
Type: &availabilitypb.RequestCertOrigin_ContextStore{ContextStore: &contextstorepb.Origin{
ItemID: iss.contextStore.Store(instance).Pb(),
}},
}))
return events.ListOf(availabilityevents.RequestCert(
instance.AvailabilityModuleID(),
&availabilitypb.RequestCertOrigin{
Module: iss.moduleConfig.Self.Pb(),
Type: &availabilitypb.RequestCertOrigin_ContextStore{ContextStore: &contextstorepb.Origin{
ItemID: iss.contextStore.Store(instance).Pb(),
}},
},
))
}

func (iss *ISS) applyNewCert(newCert *availabilitypb.NewCert) (*events.EventList, error) {
Expand Down
18 changes: 16 additions & 2 deletions samples/chat-demo/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"fmt"
"strings"

"github.com/multiformats/go-multiaddr"

availabilityevents "github.com/filecoin-project/mir/pkg/availability/events"
"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/modules"
Expand All @@ -27,8 +29,10 @@ import (
"github.com/filecoin-project/mir/pkg/pb/eventpb"
t "github.com/filecoin-project/mir/pkg/types"
"github.com/filecoin-project/mir/pkg/util/maputil"
)

"github.com/multiformats/go-multiaddr"
const (
availabilityModuleID = t.ModuleID("availability")
)

// ChatApp and its methods implement the application logic of the small chat demo application
Expand All @@ -41,6 +45,9 @@ type ChatApp struct {
// to which each delivered request appends one message.
messages []string

// Stores the current ISS epoch.
currentEpoch t.EpochNr

// Stores the next membership to be submitted to the Node on the next NewEpoch event.
newMembership map[t.NodeID]t.NodeAddress

Expand All @@ -58,6 +65,7 @@ func NewChatApp(initialMembership map[t.NodeID]t.NodeAddress, transport net.Tran

return &ChatApp{
messages: make([]string, 0),
currentEpoch: 0,
newMembership: initialMembership,
transport: transport,
}
Expand Down Expand Up @@ -111,7 +119,7 @@ func (chat *ChatApp) ApplyDeliver(deliver *eventpb.Deliver) (*events.EventList,
}

return events.ListOf(availabilityevents.RequestTransactions(
"availability",
availabilityModuleID.Then(t.ModuleID(fmt.Sprintf("%v", chat.currentEpoch))),
deliver.Cert,
&availabilitypb.RequestTransactionsOrigin{
Module: "app",
Expand Down Expand Up @@ -193,6 +201,9 @@ func (chat *ChatApp) applyConfigMsg(configMsg string) {

func (chat *ChatApp) applyNewEpoch(newEpoch *eventpb.NewEpoch) (*events.EventList, error) {

// Update current epoch number.
chat.currentEpoch = t.EpochNr(newEpoch.EpochNr)

// Create network connections to all nodes in the new membership.
var nodeAddrs map[t.NodeID]t.NodeAddress
var err error
Expand Down Expand Up @@ -234,6 +245,9 @@ func (chat *ChatApp) applyRestoreState(snapshot *commonpb.StateSnapshot) (*event
// Restore chat messages
chat.restoreChat(snapshot.AppData)

// Restore epoch number
chat.currentEpoch = t.EpochNr(snapshot.Configuration.EpochNr)

// Restore configuration
if err := chat.restoreConfiguration(snapshot.Configuration); err != nil {
return nil, err
Expand Down
Loading

0 comments on commit ace0efa

Please sign in to comment.