Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Availability module integration #192

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/mir/pkg/pb/commonpb"
"github.com/filecoin-project/mir/pkg/pb/contextstorepb"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
"github.com/filecoin-project/mir/pkg/pb/factorymodulepb"
"github.com/filecoin-project/mir/pkg/pb/messagepb"
"github.com/filecoin-project/mir/pkg/pb/requestpb"
t "github.com/filecoin-project/mir/pkg/types"
Expand Down Expand Up @@ -377,3 +378,10 @@ func NewConfig(destModule t.ModuleID, membership map[t.NodeID]t.NodeAddress) *ev
}},
}
}

func Factory(destModule t.ModuleID, evt *factorymodulepb.Factory) *eventpb.Event {
return &eventpb.Event{
Type: &eventpb.Event_Factory{Factory: evt},
DestModule: destModule.Pb(),
}
}
130 changes: 130 additions & 0 deletions pkg/factorymodule/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package factorymodule

import (
"fmt"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
"github.com/filecoin-project/mir/pkg/pb/factorymodulepb"
t "github.com/filecoin-project/mir/pkg/types"
)

// TODO: Add support for active modules as well.

type moduleGenerator func(id t.ModuleID, params *factorymodulepb.GeneratorParams) modules.PassiveModule
xosmig marked this conversation as resolved.
Show resolved Hide resolved

type FactoryModule struct {
ownID t.ModuleID
generator moduleGenerator

submodules map[t.ModuleID]modules.PassiveModule
moduleRetention map[t.RetentionIndex][]t.ModuleID
retIdx t.RetentionIndex

logger logging.Logger
}

func NewFactoryModule(id t.ModuleID, generator moduleGenerator, logger logging.Logger) *FactoryModule {
return &FactoryModule{
ownID: id,
generator: generator,

submodules: make(map[t.ModuleID]modules.PassiveModule),
moduleRetention: make(map[t.RetentionIndex][]t.ModuleID),
retIdx: 0,

logger: logger,
}
}

func (fm *FactoryModule) ImplementsModule() {
}
xosmig marked this conversation as resolved.
Show resolved Hide resolved

func (fm *FactoryModule) ApplyEvents(evts *events.EventList) (*events.EventList, error) {
// TODO: Perform event processing in parallel (applyEvent will need to be made thread-safe).
// The idea is to have one internal thread per submodule, distribute the events to them through channels,
// and wait until all are processed.
return modules.ApplyEventsSequentially(evts, fm.applyEvent)
}

func (fm *FactoryModule) applyEvent(event *eventpb.Event) (*events.EventList, error) {

if t.ModuleID(event.DestModule) == fm.ownID {
switch e := event.Type.(type) {
case *eventpb.Event_Factory:
switch e := e.Factory.Type.(type) {
case *factorymodulepb.Factory_NewModule:
return fm.applyNewModule(e.NewModule)
case *factorymodulepb.Factory_GarbageCollect:
return fm.applyGarbageCollect(e.GarbageCollect)
default:
return nil, fmt.Errorf("unsupported factory event subtype: %T", e)
}
default:
return nil, fmt.Errorf("unsupported event type for factory module: %T", e)
}
}
return fm.forwardEvent(event)
}

func (fm *FactoryModule) applyNewModule(newModule *factorymodulepb.NewModule) (*events.EventList, error) {

// Convenience variables
id := t.ModuleID(newModule.ModuleId)
retIdx := t.RetentionIndex(newModule.RetentionIndex)

// The new module's ID must have the factory's ID as a prefix.
if id.Top() != fm.ownID {
return nil, fmt.Errorf("submodule (%v) must have the factory's ID (%v) as a prefix", id, fm.ownID)
}

// Skip creation of submodules that should have been already garbage-collected.
if retIdx < fm.retIdx {
fm.logger.Log(logging.LevelWarn, "Ignoring new module instantiation with low retention index.",
"moduleID", id, "currentRetIdx", fm.retIdx, "moduleRetIdx", retIdx)
return events.EmptyList(), nil
}

// Create new instance of the submodule and assign it to its retention index.
fm.submodules[id] = fm.generator(id, newModule.Params)
fm.moduleRetention[retIdx] = append(fm.moduleRetention[retIdx], id)

// Initialize new submodule and return the resulting events.
return fm.submodules[id].ApplyEvents(events.ListOf(events.Init(id)))
}

func (fm *FactoryModule) applyGarbageCollect(gc *factorymodulepb.GarbageCollect) (*events.EventList, error) {
// While the new retention index is larger than the current one
for t.RetentionIndex(gc.RetentionIndex) > fm.retIdx {

// Delete all modules associated with the current retention index.
for _, mID := range fm.moduleRetention[fm.retIdx] {
delete(fm.submodules, mID)
xosmig marked this conversation as resolved.
Show resolved Hide resolved
}

// Increase current retention index.
delete(fm.moduleRetention, fm.retIdx)
fm.retIdx++
}

return events.EmptyList(), nil
}

func (fm *FactoryModule) forwardEvent(event *eventpb.Event) (*events.EventList, error) {

// Convenience variable.
mID := t.ModuleID(event.DestModule)

var submodule modules.PassiveModule
var ok bool
if submodule, ok = fm.submodules[mID]; !ok {
fm.logger.Log(logging.LevelWarn, "Ignoring submodule event. Destination module not found.", "moduleID", mID)
return events.EmptyList(), nil
xosmig marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO: This might be inefficient. Try to not forward events one by one.
// Especially once parallel processing is supported.
return submodule.ApplyEvents(events.ListOf(event))
}
216 changes: 216 additions & 0 deletions pkg/factorymodule/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package factorymodule

import (
"fmt"
"sort"
"testing"

"github.com/stretchr/testify/assert"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
"github.com/filecoin-project/mir/pkg/pb/factorymodulepb"
tp "github.com/filecoin-project/mir/pkg/types"
"github.com/filecoin-project/mir/pkg/util/maputil"
"github.com/filecoin-project/mir/pkg/util/testlogger"
)

const (
echoFactoryID = tp.ModuleID("echoFactory")
)

type echoModule struct {
t *testing.T
id tp.ModuleID
prefix string
}

func (em *echoModule) ImplementsModule() {
}
matejpavlovic marked this conversation as resolved.
Show resolved Hide resolved

func (em *echoModule) ApplyEvents(evts *events.EventList) (*events.EventList, error) {
return modules.ApplyEventsSequentially(evts, em.applyEvent)
}

func (em *echoModule) applyEvent(event *eventpb.Event) (*events.EventList, error) {

// Convenience variable
destModuleID := tp.ModuleID(event.DestModule)

assert.Equal(em.t, em.id, destModuleID)
switch e := event.Type.(type) {
case *eventpb.Event_Init:
return events.ListOf(events.TestingString(destModuleID.Top(), string(em.id)+" Init")), nil
case *eventpb.Event_TestingString:
return events.ListOf(events.TestingString(destModuleID.Top(), em.prefix+e.TestingString.GetValue())), nil
default:
return nil, fmt.Errorf("unknown echo module event type: %T", e)
}
}

func newEchoFactory(t *testing.T, logger logging.Logger) *FactoryModule {
return NewFactoryModule(
echoFactoryID,
func(id tp.ModuleID, params *factorymodulepb.GeneratorParams) modules.PassiveModule {
return &echoModule{
t: t,
id: id,
prefix: params.Type.(*factorymodulepb.GeneratorParams_EchoTestModule).EchoTestModule.Prefix,
}
},
logger)
}

func TestFactoryModule(t *testing.T) {
var echoFactory modules.PassiveModule
logger := testlogger.NewTestLogger()

testCases := map[string]func(t *testing.T){

"00 Instantiate": func(t *testing.T) {
echoFactory = newEchoFactory(t, logger)
evOut, err := echoFactory.ApplyEvents(events.ListOf(FactoryNewModule(
echoFactoryID,
echoFactoryID.Then("inst0"),
0,
FactoryEchoModuleParams("Inst 0: "),
)))
assert.NoError(t, err)
assert.Equal(t, 1, evOut.Len())
assert.Equal(t, echoFactoryID.Pb(), evOut.Slice()[0].DestModule)
assert.Equal(t,
string(echoFactoryID.Then("inst0"))+" Init",
evOut.Slice()[0].Type.(*eventpb.Event_TestingString).TestingString.GetValue(),
)
},

"01 Invoke": func(t *testing.T) {
evOut, err := echoFactory.ApplyEvents(events.ListOf(events.TestingString(
echoFactoryID.Then("inst0"),
"Hi!"),
))
assert.NoError(t, err)
assert.Equal(t, 1, evOut.Len())
assert.Equal(t,
"Inst 0: Hi!",
evOut.Slice()[0].Type.(*eventpb.Event_TestingString).TestingString.GetValue(),
)
},

"02 Instantiate many": func(t *testing.T) {
for i := 1; i <= 5; i++ {
evOut, err := echoFactory.ApplyEvents(events.ListOf(FactoryNewModule(
echoFactoryID,
echoFactoryID.Then(tp.ModuleID(fmt.Sprintf("inst%d", i))),
tp.RetentionIndex(i),
FactoryEchoModuleParams(fmt.Sprintf("Inst %d: ", i)),
)))
assert.NoError(t, err)
assert.Equal(t, 1, evOut.Len())
assert.Equal(t, echoFactoryID.Pb(), evOut.Slice()[0].DestModule)
assert.Equal(t,
string(echoFactoryID.Then(tp.ModuleID(fmt.Sprintf("inst%d", i))))+" Init",
evOut.Slice()[0].Type.(*eventpb.Event_TestingString).TestingString.GetValue(),
)
}
},

"03 Invoke many": func(t *testing.T) {
evList := events.EmptyList()
for i := 5; i >= 0; i-- {
evList.PushBack(events.TestingString(
echoFactoryID.Then(tp.ModuleID(fmt.Sprintf("inst%d", i))),
"Hi!"),
)
}
evOut, err := echoFactory.ApplyEvents(evList)
assert.NoError(t, err)
assert.Equal(t, 6, evOut.Len())

sortedOutput := evOut.Slice()

sort.Slice(sortedOutput, func(i, j int) bool {
return sortedOutput[i].Type.(*eventpb.Event_TestingString).TestingString.GetValue() <
sortedOutput[j].Type.(*eventpb.Event_TestingString).TestingString.GetValue()
})

for i := 0; i <= 5; i++ {
assert.Equal(t, echoFactoryID.Pb(), sortedOutput[0].DestModule)
assert.Equal(t,
fmt.Sprintf("Inst %d: Hi!", i),
sortedOutput[i].Type.(*eventpb.Event_TestingString).TestingString.GetValue(),
)
}
},

"04 Wrong event type": func(t *testing.T) {
wrongEvent := events.TestingUint(echoFactoryID, 42)
evOut, err := echoFactory.ApplyEvents(events.ListOf(wrongEvent))
if assert.Error(t, err) {
assert.Equal(t, fmt.Errorf("unsupported event type for factory module: %T", wrongEvent.Type), err)
}
assert.Nil(t, evOut)
},

"05 Wrong submodule ID": func(t *testing.T) {
wrongEvent := events.TestingUint(echoFactoryID.Then("non-existent-module"), 42)
evOut, err := echoFactory.ApplyEvents(events.ListOf(wrongEvent))
assert.NoError(t, err)
assert.Equal(t, 0, evOut.Len())
logger.CheckFirstEntry(t, logging.LevelWarn, "Ignoring submodule event. Destination module not found.",
"moduleID", echoFactoryID.Then("non-existent-module"))
logger.CheckEmpty(t)
},

"06 Garbage-collect some": func(t *testing.T) {
evOut, err := echoFactory.ApplyEvents(events.ListOf(FactoryGarbageCollect(
echoFactoryID,
3,
)))
assert.NoError(t, err)
assert.Equal(t, 0, evOut.Len())
},

"07 Invoke garbage-collected": func(t *testing.T) {
evList := events.EmptyList()
for i := 5; i >= 0; i-- {
evList.PushBack(events.TestingString(
echoFactoryID.Then(tp.ModuleID(fmt.Sprintf("inst%d", i))),
"Hi!"),
)
}
evOut, err := echoFactory.ApplyEvents(evList)
assert.NoError(t, err)
assert.Equal(t, 3, evOut.Len())

sortedOutput := evOut.Slice()

sort.Slice(sortedOutput, func(i, j int) bool {
return sortedOutput[i].Type.(*eventpb.Event_TestingString).TestingString.GetValue() <
sortedOutput[j].Type.(*eventpb.Event_TestingString).TestingString.GetValue()
})

for i := 0; i < 3; i++ {
logger.CheckAnyEntry(t, logging.LevelWarn, "Ignoring submodule event. Destination module not found.",
"moduleID", echoFactoryID.Then(tp.ModuleID(fmt.Sprintf("inst%d", i))))
}

for i := 3; i <= 5; i++ {
assert.Equal(t, echoFactoryID.Pb(), sortedOutput[i-3].DestModule)
assert.Equal(t,
fmt.Sprintf("Inst %d: Hi!", i),
sortedOutput[i-3].Type.(*eventpb.Event_TestingString).TestingString.GetValue(),
)
}

logger.CheckEmpty(t)
},
}

maputil.IterateSorted(testCases, func(testName string, testFunc func(t *testing.T)) bool {
t.Run(testName, testFunc)
return true
})
}
Loading