Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Availability module integration #192

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions cmd/mircat/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func debuggerNode(id t.NodeID, membership map[t.NodeID]t.NodeAddress) (*mir.Node
// Instantiate an ISS protocol module with the default configuration.
// TODO: The initial app state must be involved here. Otherwise checkpoint hashes might not match.
issConfig := iss.DefaultConfig(membership)
protocol, err := iss.New(id, issConfig, iss.InitialStateSnapshot([]byte{}, issConfig), logging.Decorate(logger, "ISS: "))
protocol, err := iss.New(id, iss.DefaultModuleConfig(), issConfig, iss.InitialStateSnapshot([]byte{}, issConfig), logging.Decorate(logger, "ISS: "))
if err != nil {
return nil, fmt.Errorf("could not instantiate protocol module: %w", err)
}
Expand All @@ -132,12 +132,8 @@ func debuggerNode(id t.NodeID, membership map[t.NodeID]t.NodeAddress) (*mir.Node
modulesWithDefaults, err := iss.DefaultModules(map[t.ModuleID]modules.Module{
"net": modules.NullActive{},
"crypto": mirCrypto.New(&mirCrypto.DummyCrypto{DummySig: []byte{0}}),
"app": &deploytest.FakeApp{
ProtocolModule: "iss",
Membership: nil,
RequestsProcessed: 0,
},
"iss": protocol,
"app": deploytest.NewFakeApp("iss", nil),
"iss": protocol,
})
if err != nil {
panic(fmt.Errorf("error initializing the Mir modules: %w", err))
Expand Down
91 changes: 82 additions & 9 deletions pkg/deploytest/fakeapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,41 @@ import (
"encoding/binary"
"fmt"

availabilityevents "github.com/filecoin-project/mir/pkg/availability/events"
"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/pb/availabilitypb"
"github.com/filecoin-project/mir/pkg/pb/commonpb"
"github.com/filecoin-project/mir/pkg/pb/contextstorepb"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
"github.com/filecoin-project/mir/pkg/pb/requestpb"
t "github.com/filecoin-project/mir/pkg/types"
)

// FakeApp represents a dummy stub application used for testing only.
type FakeApp struct {
ProtocolModule t.ModuleID

// Stores the current ISS epoch.
currentEpoch t.EpochNr

Membership map[t.NodeID]t.NodeAddress

// The state of the FakeApp only consists of a counter of processed requests.
RequestsProcessed uint64

// Map of delivered requests that is used to filter duplicates.
// TODO: Implement compaction (client watermarks) so that this map does not grow indefinitely.
delivered map[t.ClientID]map[t.ReqNo]struct{}
}

func NewFakeApp(protocolModule t.ModuleID, membership map[t.NodeID]t.NodeAddress) *FakeApp {
return &FakeApp{
ProtocolModule: protocolModule,
currentEpoch: 0,
Membership: membership,
delivered: make(map[t.ClientID]map[t.ReqNo]struct{}),
RequestsProcessed: 0,
}
}

func (fa *FakeApp) ApplyEvents(eventsIn *events.EventList) (*events.EventList, error) {
Expand All @@ -37,8 +56,13 @@ func (fa *FakeApp) ApplyEvent(event *eventpb.Event) (*events.EventList, error) {
case *eventpb.Event_Init:
// no actions on init
case *eventpb.Event_Deliver:
if err := fa.ApplyBatch(e.Deliver.Batch); err != nil {
return nil, fmt.Errorf("app batch delivery error: %w", err)
return fa.ApplyDeliver(e.Deliver)
case *eventpb.Event_Availability:
switch e := e.Availability.Type.(type) {
case *availabilitypb.Event_ProvideTransactions:
return fa.applyProvideTransactions(e.ProvideTransactions)
default:
return nil, fmt.Errorf("unexpected availability event type: %T", e)
}
case *eventpb.Event_AppSnapshotRequest:
return events.ListOf(events.AppSnapshotResponse(
Expand All @@ -51,6 +75,7 @@ func (fa *FakeApp) ApplyEvent(event *eventpb.Event) (*events.EventList, error) {
return nil, fmt.Errorf("app restore state error: %w", err)
}
case *eventpb.Event_NewEpoch:
fa.currentEpoch = t.EpochNr(e.NewEpoch.EpochNr)
return events.ListOf(events.NewConfig(fa.ProtocolModule, fa.Membership)), nil
default:
return nil, fmt.Errorf("unexpected type of App event: %T", event.Type)
Expand All @@ -62,13 +87,61 @@ func (fa *FakeApp) ApplyEvent(event *eventpb.Event) (*events.EventList, error) {
// The ImplementsModule method only serves the purpose of indicating that this is a Module and must not be called.
func (fa *FakeApp) ImplementsModule() {}

// ApplyBatch applies a batch of requests
func (fa *FakeApp) ApplyBatch(batch *requestpb.Batch) error {
for _, req := range batch.Requests {
fa.RequestsProcessed++
fmt.Printf("Received request: \"%s\". Processed requests: %d\n", string(req.Req.Data), fa.RequestsProcessed)
// ApplyDeliver applies a batch of requests to the state of the application.
func (fa *FakeApp) ApplyDeliver(deliver *eventpb.Deliver) (*events.EventList, error) {

// Skip padding certificates. Deliver events with nil certificates are considered noops.
if deliver.Cert.Type == nil {
return events.EmptyList(), nil
}
return nil

switch c := deliver.Cert.Type.(type) {
case *availabilitypb.Cert_Msc:

if len(c.Msc.BatchId) == 0 {
fmt.Println("Received empty batch availability certificate.")
return events.EmptyList(), nil
}

return events.ListOf(availabilityevents.RequestTransactions(
t.ModuleID("availability").Then(t.ModuleID(fmt.Sprintf("%v", fa.currentEpoch))),
deliver.Cert,
&availabilitypb.RequestTransactionsOrigin{
Module: "app",
Type: &availabilitypb.RequestTransactionsOrigin_ContextStore{
ContextStore: &contextstorepb.Origin{ItemID: 0},
},
},
)), nil

default:
return nil, fmt.Errorf("unknown availability certificate type: %T", deliver.Cert.Type)
}
}

// ApplyBatch applies a batch of transactions.
func (fa *FakeApp) applyProvideTransactions(ptx *availabilitypb.ProvideTransactions) (*events.EventList, error) {

for _, req := range ptx.Txs {

// Convenience variables
clID := t.ClientID(req.ClientId)
reqNo := t.ReqNo(req.ReqNo)

// Only process request if it has not yet been delivered.
// TODO: Make this more efficient by compacting the delivered set.
_, ok := fa.delivered[clID]
if !ok {
fa.delivered[clID] = make(map[t.ReqNo]struct{})
}
if _, ok := fa.delivered[clID][reqNo]; !ok {
fa.delivered[clID][reqNo] = struct{}{}
fa.RequestsProcessed++
fmt.Printf("Received request: \"%s\". Processed requests: %d\n", string(req.Data), fa.RequestsProcessed)
}

}
return events.EmptyList(), nil
}

func (fa *FakeApp) Snapshot() []byte {
Expand Down
6 changes: 3 additions & 3 deletions pkg/deploytest/simulation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (n *SimNode) SendEvents(proc *testsim.Process, eventList *events.EventList)

it := eventList.Iterator()
for e := it.Next(); e != nil; e = it.Next() {
m := t.ModuleID(e.DestModule)
m := t.ModuleID(e.DestModule).Top()
if eventsMap[m] == nil {
eventsMap[m] = events.EmptyList()
moduleIDs = append(moduleIDs, m)
Expand All @@ -81,7 +81,7 @@ func (n *SimNode) SendEvents(proc *testsim.Process, eventList *events.EventList)
})

for _, m := range moduleIDs {
proc.Send(n.moduleChans[m], eventsMap[m])
proc.Send(n.moduleChans[m.Top()], eventsMap[m.Top()])
proc.Yield() // wait until the receiver blocks
}
}
Expand All @@ -107,7 +107,7 @@ func (n *SimNode) WrapModules(mods modules.Modules) modules.Modules {
// WrapModule wraps the module to be used in simulation.
func (n *SimNode) WrapModule(id t.ModuleID, m modules.Module) modules.Module {
moduleChan := testsim.NewChan()
n.moduleChans[id] = moduleChan
n.moduleChans[id.Top()] = moduleChan

switch m := m.(type) {
case modules.ActiveModule:
Expand Down
10 changes: 5 additions & 5 deletions pkg/deploytest/testreplica.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,17 @@ func (tr *TestReplica) Run(ctx context.Context) error {
}
tr.Modules["wal"] = wal

modules := tr.Modules
mod := tr.Modules
if tr.Sim != nil {
modules["timer"] = NewSimTimerModule(tr.Sim)
modules = tr.Sim.WrapModules(modules)
mod["timer"] = NewSimTimerModule(tr.Sim)
mod = tr.Sim.WrapModules(mod)
}

// Create the mir node for this replica.
node, err := mir.NewNode(
tr.ID,
tr.Config,
modules,
mod,
wal,
interceptor,
)
Expand All @@ -118,7 +118,7 @@ func (tr *TestReplica) Run(ctx context.Context) error {
}

// Create a RequestReceiver for request coming over the network.
requestReceiver := requestreceiver.NewRequestReceiver(node, "iss", logging.Decorate(tr.Config.Logger, "ReqRec: "))
requestReceiver := requestreceiver.NewRequestReceiver(node, tr.FakeRequestsDestModule, logging.Decorate(tr.Config.Logger, "ReqRec: "))

// TODO: do not assume that node IDs are integers.
p, err := strconv.Atoi(tr.ID.Pb())
Expand Down
15 changes: 12 additions & 3 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/filecoin-project/mir/pkg/contextstore"
"github.com/filecoin-project/mir/pkg/pb/availabilitypb"
"github.com/filecoin-project/mir/pkg/pb/commonpb"
"github.com/filecoin-project/mir/pkg/pb/contextstorepb"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
"github.com/filecoin-project/mir/pkg/pb/factorymodulepb"
"github.com/filecoin-project/mir/pkg/pb/messagepb"
"github.com/filecoin-project/mir/pkg/pb/requestpb"
t "github.com/filecoin-project/mir/pkg/types"
Expand Down Expand Up @@ -264,10 +266,10 @@ func WALEntry(persistedEvent *eventpb.Event, retentionIndex t.RetentionIndex) *e
}

// Deliver returns an event of delivering a request batch to the application in sequence number order.
func Deliver(destModule t.ModuleID, sn t.SeqNr, batch *requestpb.Batch) *eventpb.Event {
func Deliver(destModule t.ModuleID, sn t.SeqNr, cert *availabilitypb.Cert) *eventpb.Event {
return &eventpb.Event{DestModule: destModule.Pb(), Type: &eventpb.Event_Deliver{Deliver: &eventpb.Deliver{
Sn: sn.Pb(),
Batch: batch,
Sn: sn.Pb(),
Cert: cert,
}}}
}

Expand Down Expand Up @@ -377,3 +379,10 @@ func NewConfig(destModule t.ModuleID, membership map[t.NodeID]t.NodeAddress) *ev
}},
}
}

func Factory(destModule t.ModuleID, evt *factorymodulepb.Factory) *eventpb.Event {
return &eventpb.Event{
Type: &eventpb.Event_Factory{Factory: evt},
DestModule: destModule.Pb(),
}
}
42 changes: 42 additions & 0 deletions pkg/factorymodule/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package events

import (
"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
"github.com/filecoin-project/mir/pkg/pb/factorymodulepb"
t "github.com/filecoin-project/mir/pkg/types"
)

func NewModule(
dest t.ModuleID,
id t.ModuleID,
retentionIndex t.RetentionIndex,
params *factorymodulepb.GeneratorParams,
) *eventpb.Event {
return events.Factory(dest, &factorymodulepb.Factory{Type: &factorymodulepb.Factory_NewModule{
NewModule: &factorymodulepb.NewModule{
ModuleId: id.Pb(),
RetentionIndex: retentionIndex.Pb(),
Params: params,
},
}})
}

func GarbageCollect(
dest t.ModuleID,
retentionIndex t.RetentionIndex,
) *eventpb.Event {
return events.Factory(dest, &factorymodulepb.Factory{Type: &factorymodulepb.Factory_GarbageCollect{
GarbageCollect: &factorymodulepb.GarbageCollect{
RetentionIndex: retentionIndex.Pb(),
},
}})
}

func EchoModuleParams(prefix string) *factorymodulepb.GeneratorParams {
return &factorymodulepb.GeneratorParams{Type: &factorymodulepb.GeneratorParams_EchoTestModule{
EchoTestModule: &factorymodulepb.EchoModuleParams{
Prefix: prefix,
},
}}
}
Loading