Skip to content

Commit

Permalink
Frrist/receipt events schema (#1132)
Browse files Browse the repository at this point in the history
* feat: implement actor event task and schema

- track the value of EventRoots contained in message receipts.
  • Loading branch information
frrist authored and Terry committed Mar 3, 2023
1 parent 77eda09 commit 6d31ee0
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ testfull: build
# testshort runs tests that don't require external dependencies such as postgres or redis
.PHONY: testshort
testshort:
go test -short ./... -v
go test -short ./...


.PHONY: lily
Expand Down
3 changes: 3 additions & 0 deletions chain/indexer/integrated/processor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
rewardactors "github.com/filecoin-project/lily/chain/actors/builtin/reward"
verifregactors "github.com/filecoin-project/lily/chain/actors/builtin/verifreg"
"github.com/filecoin-project/lily/tasks/messageexecutions/vm"
"github.com/filecoin-project/lily/tasks/messages/actorevent"

"github.com/filecoin-project/lily/tasks"
// actor tasks
Expand Down Expand Up @@ -593,6 +594,8 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
out.TipsetsProcessors[t] = msapprovaltask.NewTask(api)
case tasktype.VMMessage:
out.TipsetsProcessors[t] = vm.NewTask(api)
case tasktype.ActorEvent:
out.TipsetsProcessors[t] = actorevent.NewTask(api)

//
// Blocks
Expand Down
4 changes: 3 additions & 1 deletion chain/indexer/integrated/processor/state_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/lily/chain/indexer/tasktype"
datacaptask "github.com/filecoin-project/lily/tasks/actorstate/datacap"
"github.com/filecoin-project/lily/tasks/messageexecutions/vm"
"github.com/filecoin-project/lily/tasks/messages/actorevent"
"github.com/filecoin-project/lily/tasks/messages/blockmessage"
"github.com/filecoin-project/lily/tasks/messages/gaseconomy"
"github.com/filecoin-project/lily/tasks/messages/message"
Expand Down Expand Up @@ -51,7 +52,7 @@ func TestNewProcessor(t *testing.T) {
require.Equal(t, t.Name(), proc.name)
require.Len(t, proc.actorProcessors, 23)
require.Len(t, proc.tipsetProcessors, 8)
require.Len(t, proc.tipsetsProcessors, 7)
require.Len(t, proc.tipsetsProcessors, 8)
require.Len(t, proc.builtinProcessors, 1)

require.Equal(t, gasoutput.NewTask(nil), proc.tipsetsProcessors[tasktype.GasOutputs])
Expand All @@ -61,6 +62,7 @@ func TestNewProcessor(t *testing.T) {
require.Equal(t, internalparsedmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.InternalParsedMessage])
require.Equal(t, msapprovals.NewTask(nil), proc.tipsetsProcessors[tasktype.MultisigApproval])
require.Equal(t, vm.NewTask(nil), proc.tipsetsProcessors[tasktype.VMMessage])
require.Equal(t, actorevent.NewTask(nil), proc.tipsetsProcessors[tasktype.ActorEvent])

require.Equal(t, message.NewTask(nil), proc.tipsetProcessors[tasktype.Message])
require.Equal(t, blockmessage.NewTask(nil), proc.tipsetProcessors[tasktype.BlockMessage])
Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/integrated/processor/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,6 @@ func TestMakeProcessorsAllTasks(t *testing.T) {
require.NoError(t, err)
require.Len(t, proc.ActorProcessors, 23)
require.Len(t, proc.TipsetProcessors, 8)
require.Len(t, proc.TipsetsProcessors, 7)
require.Len(t, proc.TipsetsProcessors, 8)
require.Len(t, proc.ReportProcessors, 1)
}
5 changes: 5 additions & 0 deletions chain/indexer/tasktype/table_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
InternalMessage = "internal_messages"
InternalParsedMessage = "internal_parsed_messages"
VMMessage = "vm_messages"
ActorEvent = "actor_events"
MultisigTransaction = "multisig_transaction"
ChainPower = "chain_power"
PowerActorClaim = "power_actor_claim"
Expand Down Expand Up @@ -68,6 +69,7 @@ var AllTableTasks = []string{
InternalMessage,
InternalParsedMessage,
VMMessage,
ActorEvent,
MultisigTransaction,
ChainPower,
PowerActorClaim,
Expand Down Expand Up @@ -109,6 +111,7 @@ var TableLookup = map[string]struct{}{
InternalMessage: {},
InternalParsedMessage: {},
VMMessage: {},
ActorEvent: {},
MultisigTransaction: {},
ChainPower: {},
PowerActorClaim: {},
Expand Down Expand Up @@ -150,6 +153,7 @@ var TableComment = map[string]string{
InternalMessage: ``,
InternalParsedMessage: ``,
VMMessage: ``,
ActorEvent: ``,
MultisigTransaction: ``,
ChainPower: ``,
PowerActorClaim: ``,
Expand Down Expand Up @@ -227,6 +231,7 @@ var TableFieldComments = map[string]map[string]string{
"To": "To receiver of message.",
"Value": "Value attoFIL contained in message.",
},
ActorEvent: {},
MultisigTransaction: {
"To": "Transaction State",
},
Expand Down
1 change: 1 addition & 0 deletions chain/indexer/tasktype/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ var TaskLookup = map[string][]string{
GasOutputs,
MessageGasEconomy,
BlockMessage,
ActorEvent,
},
ChainEconomicsTask: {
ChainEconomics,
Expand Down
4 changes: 2 additions & 2 deletions chain/indexer/tasktype/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestMakeTaskNamesAlias(t *testing.T) {
},
{
taskAlias: tasktype.MessagesTask,
tasks: []string{tasktype.Message, tasktype.ParsedMessage, tasktype.Receipt, tasktype.GasOutputs, tasktype.MessageGasEconomy, tasktype.BlockMessage},
tasks: []string{tasktype.Message, tasktype.ParsedMessage, tasktype.Receipt, tasktype.GasOutputs, tasktype.MessageGasEconomy, tasktype.BlockMessage, tasktype.ActorEvent},
},
{
taskAlias: tasktype.ChainEconomicsTask,
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestMakeAllTaskAliasNames(t *testing.T) {
}

func TestMakeAllTaskNames(t *testing.T) {
const TotalTableTasks = 38
const TotalTableTasks = 39
actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks)
require.NoError(t, err)
// if this test fails it means a new task name was added, update the above test
Expand Down
47 changes: 47 additions & 0 deletions model/messages/actorevent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package messages

import (
"context"

"go.opencensus.io/tag"

"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/model"
)

type ActorEvent struct {
tableName struct{} `pg:"actor_events"`

Height int64 `pg:",pk,notnull,use_zero"`
StateRoot string `pg:",pk,notnull"`
MessageCid string `pg:",pk,notnull"`
EventIndex int64 `pg:",pk,notnull,use_zero"`

Emitter string `pg:",notnull"`
Flags []byte `pg:",notnull"`
Key string `pg:",notnull"`
Value []byte `pg:",notnull"`
}

func (a *ActorEvent) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "actor_events"))
stop := metrics.Timer(ctx, metrics.PersistDuration)
defer stop()

metrics.RecordCount(ctx, metrics.PersistModel, 1)
return s.PersistModel(ctx, a)
}

type ActorEventList []*ActorEvent

func (al ActorEventList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
if len(al) == 0 {
return nil
}
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "actor_events"))
stop := metrics.Timer(ctx, metrics.PersistDuration)
defer stop()

metrics.RecordCount(ctx, metrics.PersistModel, len(al))
return s.PersistModel(ctx, al)
}
22 changes: 22 additions & 0 deletions schemas/v1/13_actor_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package v1

func init() {
patches.Register(
13,
`
CREATE TABLE IF NOT EXISTS {{ .SchemaName | default "public"}}.actor_events (
height bigint NOT NULL,
state_root text NOT NULL,
event_index bigint NOT NULL,
message_cid text NOT NULL,
emitter text NOT NULL,
flags bytea NOT NULL,
key text NOT NULL,
value bytea NOT NULL,
PRIMARY KEY ("height", "state_root", "event_index", "message_cid")
);
`,
)
}
1 change: 1 addition & 0 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ var Models = []interface{}{
(*messages.InternalMessage)(nil),
(*messages.InternalParsedMessage)(nil),
(*messages.VMMessage)(nil),
(*messages.ActorEvent)(nil),

(*multisig.MultisigTransaction)(nil),

Expand Down
142 changes: 142 additions & 0 deletions tasks/messages/actorevent/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package actorevent

import (
"bytes"
"context"
"fmt"
"math"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/xerrors"

"github.com/filecoin-project/lily/model"
messagemodel "github.com/filecoin-project/lily/model/messages"
visormodel "github.com/filecoin-project/lily/model/visor"
"github.com/filecoin-project/lily/tasks"
"github.com/filecoin-project/lily/tasks/messages"
)

type Task struct {
node tasks.DataSource
}

func NewTask(node tasks.DataSource) *Task {
return &Task{
node: node,
}
}

func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, executed *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
ctx, span := otel.Tracer("").Start(ctx, "ProcessTipSets")
if span.IsRecording() {
span.SetAttributes(
attribute.String("current", current.String()),
attribute.Int64("current_height", int64(current.Height())),
attribute.String("executed", executed.String()),
attribute.Int64("executed_height", int64(executed.Height())),
attribute.String("processor", "actor_events"),
)
}
defer span.End()

report := &visormodel.ProcessingReport{
Height: int64(current.Height()),
StateRoot: current.ParentState().String(),
}

blkMsgRect, err := t.node.TipSetMessageReceipts(ctx, current, executed)
if err != nil {
report.ErrorsDetected = fmt.Errorf("getting tipset message receipet: %w", err)
return nil, report, nil
}

var (
out = make(messagemodel.ActorEventList, 0, len(blkMsgRect))
errorsDetected = make([]*messages.MessageError, 0, len(blkMsgRect))
msgsSeen = make(map[cid.Cid]bool, len(blkMsgRect))
)

for _, m := range blkMsgRect {
select {
case <-ctx.Done():
return nil, nil, fmt.Errorf("context done: %w", ctx.Err())
default:
}

itr, err := m.Iterator()
if err != nil {
return nil, nil, err
}

for itr.HasNext() {
msg, _, rec := itr.Next()
if msgsSeen[msg.Cid()] {
continue
}
msgsSeen[msg.Cid()] = true

if rec.EventsRoot == nil {
continue
}

evtArr, err := amt.LoadAMT(ctx, t.node.Store(), *rec.EventsRoot, amt.UseTreeBitWidth(types.EventAMTBitwidth))
if err != nil {
report.ErrorsDetected = fmt.Errorf("loading actor events amt (%s): %w", *rec.EventsRoot, err)
return nil, report, nil
}
var evt types.Event
err = evtArr.ForEach(ctx, func(evtIdx uint64, deferred *cbg.Deferred) error {
if evtIdx > math.MaxInt {
return xerrors.Errorf("too many events")
}
if err := evt.UnmarshalCBOR(bytes.NewReader(deferred.Raw)); err != nil {
return err
}

emitter, err := address.NewIDAddress(uint64(evt.Emitter))
if err != nil {
errorsDetected = append(errorsDetected, &messages.MessageError{
Cid: msg.Cid(),
Error: fmt.Sprintf("failed to make ID address from event emitter (%s): %s", evt.Emitter, err),
})
return err
}
for _, e := range evt.Entries {
out = append(out, &messagemodel.ActorEvent{
Height: int64(current.Height()),
StateRoot: current.ParentState().String(),
MessageCid: msg.Cid().String(),
EventIndex: int64(evtIdx),
Emitter: emitter.String(),
Flags: []byte{e.Flags},
Key: e.Key,
Value: e.Value,
})
}
return nil
})

if err != nil {
errorsDetected = append(errorsDetected, &messages.MessageError{
Cid: msg.Cid(),
Error: fmt.Sprintf("loading actor events amt (%s): %s", *rec.EventsRoot, err),
})
continue
}
}
}

if len(errorsDetected) != 0 {
report.ErrorsDetected = errorsDetected
}

return model.PersistableList{
out,
}, report, nil
}

0 comments on commit 6d31ee0

Please sign in to comment.