This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

New Eudico ordering interface #214

Merged
merged 5 commits on Jun 27, 2022
9 changes: 8 additions & 1 deletion chain/consensus/common/types.go
@@ -1,7 +1,7 @@
package common

const (
ConfigMessageType = 0 // Mir config request
ConfigMessageType = 0 // Mir specific config message
SignedMessageType = 1 // Eudico signed message
CrossMessageType = 2 // Eudico cross-message
RegistrationMessageType = 3 // Tendermint specific message type
@@ -20,3 +20,10 @@ func NewCrossMessageBytes(msg, opaque []byte) []byte {
payload = append(payload, CrossMessageType)
return payload
}

func NewConfigMessageBytes(msg, opaque []byte) []byte {
var payload []byte
payload = append(msg, opaque...)
payload = append(payload, ConfigMessageType)
return payload
}
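
The helpers above frame a payload as msg || opaque || typeByte, and the parseTx function added to manager.go (further down in this diff) dispatches on that trailing byte. Below is a minimal, self-contained sketch of the round trip; it is not part of this PR, and the helper names encodeWithType and splitType are illustrative only.

package main

import "fmt"

const (
	ConfigMessageType = 0 // trailing type byte values, mirroring common/types.go
	SignedMessageType = 1
	CrossMessageType  = 2
)

// encodeWithType frames msg||opaque and appends the type byte (illustrative helper).
func encodeWithType(msg, opaque []byte, msgType byte) []byte {
	payload := append([]byte{}, msg...)
	payload = append(payload, opaque...)
	return append(payload, msgType)
}

// splitType strips the trailing type byte so the body can be decoded (illustrative helper).
func splitType(tx []byte) (body []byte, msgType byte, err error) {
	if len(tx) <= 2 {
		return nil, 0, fmt.Errorf("tx len %d is too small", len(tx))
	}
	return tx[:len(tx)-1], tx[len(tx)-1], nil
}

func main() {
	tx := encodeWithType([]byte("serialized-signed-message"), nil, SignedMessageType)
	body, msgType, err := splitType(tx)
	if err != nil {
		panic(err)
	}
	fmt.Printf("type=%d body=%q\n", msgType, body) // type=1 body="serialized-signed-message"
}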
11 changes: 6 additions & 5 deletions chain/consensus/mir/app.go
@@ -54,14 +54,15 @@ func (app *Application) ApplyEvent(event *eventpb.Event) (*events.EventList, err
return &events.EventList{}, nil
}

func (app *Application) ApplyBatch(batch *requestpb.Batch) error {
var block []Tx
// ApplyBatch sends a batch consisting of data only to Eudico.
func (app *Application) ApplyBatch(in *requestpb.Batch) error {
var out []Tx

for _, req := range batch.Requests {
block = append(block, req.Req.Data)
for _, req := range in.Requests {
out = append(out, req.Req.Data)
}

app.ChainNotify <- block
app.ChainNotify <- out

return nil
}
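
For context, the change above keeps ApplyBatch as a thin producer: it copies each request's raw data out of the delivered Mir batch and hands the whole slice to Eudico over the ChainNotify channel, which the mining loop in mine.go reads as mirHead. A simplified, self-contained sketch of that handoff follows; the types are stand-ins for the requestpb and Tx types, not the repository code.

package main

import "fmt"

type Tx = []byte

// request and batch are simplified stand-ins for the requestpb types.
type request struct{ Data []byte }
type batch struct{ Requests []request }

type application struct {
	ChainNotify chan []Tx // consumed by the mining loop (mirHead in mine.go)
}

// applyBatch mirrors the shape of Application.ApplyBatch: extract the data, notify the chain.
func (app *application) applyBatch(in batch) {
	var out []Tx
	for _, req := range in.Requests {
		out = append(out, req.Data)
	}
	app.ChainNotify <- out
}

func main() {
	app := &application{ChainNotify: make(chan []Tx, 1)}
	app.applyBatch(batch{Requests: []request{{Data: []byte("tx-1")}, {Data: []byte("tx-2")}}})
	fmt.Println("batch delivered with", len(<-app.ChainNotify), "transactions")
}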
181 changes: 122 additions & 59 deletions chain/consensus/mir/manager.go
@@ -16,13 +16,13 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/mir"
mirCrypto "github.com/filecoin-project/mir/pkg/crypto"
mircrypto "github.com/filecoin-project/mir/pkg/crypto"
"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/grpctransport"
"github.com/filecoin-project/mir/pkg/iss"
mirLogging "github.com/filecoin-project/mir/pkg/logging"
mirlogging "github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/pb/requestpb"
mirrequest "github.com/filecoin-project/mir/pkg/pb/requestpb"
"github.com/filecoin-project/mir/pkg/simplewal"
t "github.com/filecoin-project/mir/pkg/types"
)
@@ -42,15 +42,15 @@ func newMirLogger(logger *logging.ZapEventLogger) *mirLogger {
}

// Log logs a message with additional context.
func (m *mirLogger) Log(level mirLogging.LogLevel, text string, args ...interface{}) {
func (m *mirLogger) Log(level mirlogging.LogLevel, text string, args ...interface{}) {
switch level {
case mirLogging.LevelError:
case mirlogging.LevelError:
m.logger.Errorw(text, "error", args)
case mirLogging.LevelInfo:
case mirlogging.LevelInfo:
m.logger.Infow(text, "info", args)
case mirLogging.LevelWarn:
case mirlogging.LevelWarn:
m.logger.Warnw(text, "warn", args)
case mirLogging.LevelDebug:
case mirlogging.LevelDebug:
m.logger.Debugw(text, "debug", args)
}
}
@@ -155,7 +155,7 @@ func NewManager(ctx context.Context, addr address.Address, api v1api.FullNode) (
"net": net,
"iss": issProtocol,
"app": app,
"crypto": mirCrypto.New(cryptoManager),
"crypto": mircrypto.New(cryptoManager),
},
nil)
if err != nil {
@@ -211,95 +211,158 @@ func (m *Manager) Stop() {
m.Net.Stop()
}

func (m *Manager) SubmitRequests(ctx context.Context, reqRefs []*RequestRef) {
if len(reqRefs) == 0 {
func (m *Manager) SubmitRequests(ctx context.Context, requests []*mirrequest.Request) {
if len(requests) == 0 {
return
}
var reqs []*requestpb.Request
for _, ref := range reqRefs {
reqs = append(reqs, events.ClientRequest(ref.ClientID, ref.ReqNo, ref.Hash))
}
e := events.NewClientRequests("iss", reqs)
e := events.NewClientRequests("iss", requests)
if err := m.MirNode.InjectEvents(ctx, (&events.EventList{}).PushBack(e)); err != nil {
log.Errorf("failed to submit requests to Mir: %s", err)
}
log.Infof("submitted %d requests to Mir", len(requests))
}

// GetMessagesByHashes gets requests from the cache and extracts Filecoin messages.
func (m *Manager) GetMessagesByHashes(blockRequestHashes []Tx) (msgs []*types.SignedMessage, crossMsgs []*types.Message) {
log.Infof("received a block with %d hashes", len(blockRequestHashes))
for _, h := range blockRequestHashes {
req, found := m.Pool.getDel(string(h))
if !found {
log.Errorf("unable to find a request with %v hash", h)
func parseTx(tx []byte) (interface{}, error) {
ln := len(tx)
// This is very simple input validation to be protected against invalid messages.
if ln <= 2 {
return nil, fmt.Errorf("mir tx len %d is too small", ln)
}

var err error
var msg interface{}

lastByte := tx[ln-1]
switch lastByte {
case common.SignedMessageType:
msg, err = types.DecodeSignedMessage(tx[:ln-1])
case common.CrossMessageType:
msg, err = types.DecodeUnverifiedCrossMessage(tx[:ln-1])
case common.ConfigMessageType:
return nil, fmt.Errorf("config message is not supported")
default:
err = fmt.Errorf("unknown message type %d", lastByte)
}

if err != nil {
return nil, err
}

return msg, nil
}

// GetMessages extracts Filecoin messages from a Mir batch.
func (m *Manager) GetMessages(batch []Tx) (msgs []*types.SignedMessage, crossMsgs []*types.Message) {
log.Infof("received a block with %d messages", len(msgs))
for _, tx := range batch {

input, err := parseTx(tx)
if err != nil {
log.Error("unable to decode a message in Mir block:", err)
continue
}

switch msg := req.(type) {
switch msg := input.(type) {
case *types.SignedMessage:
h := msg.Cid()
found := m.Pool.deleteRequest(h.String())
if !found {
log.Errorf("unable to find a request with %v hash", h)
continue
}
msgs = append(msgs, msg)
log.Infof("got message (%s, %d) from cache", msg.Message.To, msg.Message.Nonce)
log.Infof("got message: to=%s, nonce= %d", msg.Message.To, msg.Message.Nonce)
case *types.UnverifiedCrossMsg:
h := msg.Cid()
found := m.Pool.deleteRequest(h.String())
if !found {
log.Errorf("unable to find a request with %v hash", h)
continue
}
crossMsgs = append(crossMsgs, msg.Message)
log.Infof("got cross-message (%s, %d) from cache", msg.Message.To, msg.Message.Nonce)
log.Infof("got cross-message: to=%s, nonce= %d", msg.Message.To, msg.Message.Nonce)
default:
log.Error("got unknown type request in a block")
}
}
return
}

// AddSignedMessages adds signed messages into the request cache.
func (m *Manager) AddSignedMessages(dst []*RequestRef, msgs []*types.SignedMessage) ([]*RequestRef, error) {
func (m *Manager) GetRequests(msgs []*types.SignedMessage, crossMsgs []*types.UnverifiedCrossMsg) (
requests []*mirrequest.Request,
) {
requests = append(requests, m.batchSignedMessages(msgs)...)
requests = append(requests, m.batchCrossMessages(crossMsgs)...)
return
}

// BatchPushSignedMessages pushes signed messages into the request pool and sends them to Mir.
func (m *Manager) batchSignedMessages(msgs []*types.SignedMessage) (
requests []*mirrequest.Request,
) {
for _, msg := range msgs {
hash := msg.Cid()
clientID := newMirID(m.SubnetID.String(), msg.Message.From.String())
nonce := msg.Message.Nonce
r := RequestRef{
ClientID: t.ClientID(clientID),
ReqNo: t.ReqNo(nonce),
Type: common.SignedMessageType,
Hash: hash.Bytes(),
if !m.Pool.isTargetRequest(clientID, nonce) {
continue
}
alreadyExist := m.Pool.addIfNotExist(clientID, string(r.Hash), msg)
if !alreadyExist {
log.Infof("added message %s to cache", hash.Bytes())
dst = append(dst, &r)

msgBytes, err := msg.Serialize()
if err != nil {
log.Error("unable to serialize message:", err)
continue
}
}
data := common.NewSignedMessageBytes(msgBytes, nil)

return dst, nil
}
r := &mirrequest.Request{
ClientId: clientID,
ReqNo: nonce,
Data: data,
}

// AddCrossMessages adds cross messages into the request cache.
func (m *Manager) AddCrossMessages(dst []*RequestRef, msgs []*types.UnverifiedCrossMsg) ([]*RequestRef, error) {
for _, msg := range msgs {
hash := msg.Cid()
m.Pool.addRequest(msg.Cid().String(), r)

requests = append(requests, r)
}
return requests
}

// batchCrossMessages batches cross messages into the request pool and sends them to Mir.
func (m *Manager) batchCrossMessages(crossMsgs []*types.UnverifiedCrossMsg) (
requests []*mirrequest.Request,
) {
for _, msg := range crossMsgs {
msn, err := msg.Message.From.Subnet()
if err != nil {
log.Error("unable to get subnet from message:", err)
continue
}
clientID := newMirID(msn.String(), msg.Message.From.String())
nonce := msg.Message.Nonce
r := RequestRef{
ClientID: t.ClientID(clientID),
ReqNo: t.ReqNo(nonce),
Type: common.CrossMessageType,
Hash: hash.Bytes(),
if !m.Pool.isTargetRequest(clientID, nonce) {
continue
}

msgBytes, err := msg.Serialize()
if err != nil {
log.Error("unable to serialize cross-message:", err)
continue
}
alreadyExist := m.Pool.addIfNotExist(clientID, string(r.Hash), msg)
if !alreadyExist {
log.Infof("added cross-message %s to cache", hash.Bytes())
dst = append(dst, &r)

data := common.NewCrossMessageBytes(msgBytes, nil)
r := &mirrequest.Request{
ClientId: clientID,
ReqNo: nonce,
Data: data,
}
m.Pool.addRequest(msg.Cid().String(), r)
requests = append(requests, r)
}

return dst, nil
return requests
}

// GetRequest gets the request from the cache by the key.
func (m *Manager) GetRequest(h string) (Request, bool) {
return m.Pool.getRequest(h)
// ID prints Manager ID.
func (m *Manager) ID() string {
addr := m.Addr.String()
return fmt.Sprintf("%v:%v", m.SubnetID, addr[len(addr)-6:])
}
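
The new ordering interface above replaces the hash-based RequestRef flow with full mirrequest.Request objects: batchSignedMessages derives the client ID from the subnet and sender address, uses the message nonce as the request number, and frames the serialized message with the trailing type byte. A simplified, self-contained sketch of that mapping follows; the request and signedMessage types and the toRequest helper are illustrative stand-ins, not the PR's code.

package main

import "fmt"

const SignedMessageType = 1 // trailing type byte, as in common/types.go

// request mirrors the mirrequest.Request fields used by the manager (illustrative).
type request struct {
	ClientId string
	ReqNo    uint64
	Data     []byte
}

// signedMessage is a stand-in for types.SignedMessage (illustrative).
type signedMessage struct {
	From  string
	Nonce uint64
	Raw   []byte // pretend CBOR-serialized message bytes
}

// newMirID mimics the subnet:address client identifier used in manager.go.
func newMirID(subnet, addr string) string { return subnet + ":" + addr }

// toRequest shows the shape of the mapping done in batchSignedMessages.
func toRequest(subnet string, msg signedMessage) *request {
	data := append(append([]byte{}, msg.Raw...), SignedMessageType) // frame with the type byte
	return &request{
		ClientId: newMirID(subnet, msg.From),
		ReqNo:    msg.Nonce,
		Data:     data,
	}
}

func main() {
	msg := signedMessage{From: "t1abc", Nonce: 7, Raw: []byte("cbor-bytes")}
	r := toRequest("/root/t01", msg)
	fmt.Printf("client=%s reqNo=%d dataLen=%d\n", r.ClientId, r.ReqNo, len(r.Data))
}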
26 changes: 4 additions & 22 deletions chain/consensus/mir/mine.go
@@ -41,7 +41,7 @@ func Mine(ctx context.Context, addr address.Address, api v1api.FullNode) error {
if err != nil {
return fmt.Errorf("unable to create a manager: %w", err)
}
log := logging.FromContext(ctx, log).With("miner", m.MirID)
log := logging.FromContext(ctx, log).With("miner", m.ID())

log.Infof("Miner info:\n\twallet - %s\n\tnetwork - %s\n\tsubnet - %s\n\tMir ID - %s\n\tvalidators - %v",
m.Addr, m.NetName, m.SubnetID, m.MirID, m.Validators)
@@ -70,8 +70,8 @@ func Mine(ctx context.Context, addr address.Address, api v1api.FullNode) error {
return nil
case err := <-mirErrors:
return fmt.Errorf("miner consensus error: %w", err)
case hashes := <-mirHead:
msgs, crossMsgs := m.GetMessagesByHashes(hashes)
case batch := <-mirHead:
msgs, crossMsgs := m.GetMessages(batch)
log.With("epoch", nextEpoch).
Infof("try to create a block: msgs - %d, crossMsgs - %d", len(msgs), len(crossMsgs))

@@ -117,32 +117,14 @@ func Mine(ctx context.Context, addr address.Address, api v1api.FullNode) error {
log.With("epoch", nextEpoch).
Errorw("unable to select messages from mempool", "error", err)
}
log.With("epoch", nextEpoch).
Infof("retrieved %d msgs from mempool", len(msgs))

crossMsgs, err := api.GetUnverifiedCrossMsgsPool(ctx, m.SubnetID, base.Height()+1)
if err != nil {
log.With("epoch", nextEpoch).
Errorw("unable to select cross-messages from mempool", "error", err)
}
log.With("epoch", nextEpoch).
Infof("retrieved %d crossmsgs from mempool", len(crossMsgs))

var refs []*RequestRef

refs, err = m.AddSignedMessages(refs, msgs)
if err != nil {
log.With("epoch", nextEpoch).
Errorw("unable to push messages", "error", err)
}

refs, err = m.AddCrossMessages(refs, crossMsgs)
if err != nil {
log.With("epoch", nextEpoch).
Errorw("unable to push cross-messages", "error", err)
}

m.SubmitRequests(ctx, refs)
Contributor review comment:
I realized that this is probably called a lot, as it is in the default branch of the select statement. It looks like it creates a sort of busy-wait loop, always burning up one CPU no matter what.
It's not a big deal for now though, as I expect this branch to become part of the request pool (mempool wrapper) module, where we'll take this into account.

This is only a comment, no direct action required, so please mark it as resolved yourself when you read it.

m.SubmitRequests(ctx, m.GetRequests(msgs, crossMsgs))
}
}
}
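
To illustrate the contributor's point above: a select with a default case never blocks, so the mining loop spins and keeps one CPU busy even when there is nothing to do. One common mitigation, shown here as a generic Go sketch and not as a change made in this PR, is to poll the mempool from a ticker case instead of a default branch; the 100ms interval is an arbitrary assumption.

package main

import (
	"context"
	"fmt"
	"time"
)

func mineLoop(ctx context.Context, mirHead <-chan []byte) {
	ticker := time.NewTicker(100 * time.Millisecond) // polling interval is an assumption
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case batch := <-mirHead:
			fmt.Printf("got batch of %d bytes\n", len(batch))
		case <-ticker.C:
			// pull messages from the mempool and submit them to Mir here,
			// instead of doing it in a non-blocking default branch
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	mineLoop(ctx, make(chan []byte))
}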