diff --git a/Makefile b/Makefile index 8b1454af0..9a3fb20fb 100644 --- a/Makefile +++ b/Makefile @@ -88,7 +88,7 @@ testfull: build # testshort runs tests that don't require external dependencies such as postgres or redis .PHONY: testshort testshort: - go test -short ./... -v + go test -short ./... .PHONY: lily diff --git a/chain/indexer/integrated/processor/state.go b/chain/indexer/integrated/processor/state.go index e7be9ba95..494a72a0c 100644 --- a/chain/indexer/integrated/processor/state.go +++ b/chain/indexer/integrated/processor/state.go @@ -24,6 +24,7 @@ import ( rewardactors "github.com/filecoin-project/lily/chain/actors/builtin/reward" verifregactors "github.com/filecoin-project/lily/chain/actors/builtin/verifreg" "github.com/filecoin-project/lily/tasks/messageexecutions/vm" + "github.com/filecoin-project/lily/tasks/messages/actorevent" "github.com/filecoin-project/lily/tasks" // actor tasks @@ -601,6 +602,8 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces out.TipsetsProcessors[t] = msapprovaltask.NewTask(api) case tasktype.VMMessage: out.TipsetsProcessors[t] = vm.NewTask(api) + case tasktype.ActorEvent: + out.TipsetsProcessors[t] = actorevent.NewTask(api) // // Blocks diff --git a/chain/indexer/integrated/processor/state_internal_test.go b/chain/indexer/integrated/processor/state_internal_test.go index a175c7f72..e50bea0e0 100644 --- a/chain/indexer/integrated/processor/state_internal_test.go +++ b/chain/indexer/integrated/processor/state_internal_test.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/lily/chain/indexer/tasktype" datacaptask "github.com/filecoin-project/lily/tasks/actorstate/datacap" "github.com/filecoin-project/lily/tasks/messageexecutions/vm" + "github.com/filecoin-project/lily/tasks/messages/actorevent" "github.com/filecoin-project/lily/tasks/messages/blockmessage" "github.com/filecoin-project/lily/tasks/messages/gaseconomy" "github.com/filecoin-project/lily/tasks/messages/message" @@ -51,7 +52,7 @@ func TestNewProcessor(t *testing.T) { require.Equal(t, t.Name(), proc.name) require.Len(t, proc.actorProcessors, 23) require.Len(t, proc.tipsetProcessors, 8) - require.Len(t, proc.tipsetsProcessors, 7) + require.Len(t, proc.tipsetsProcessors, 8) require.Len(t, proc.builtinProcessors, 1) require.Equal(t, gasoutput.NewTask(nil), proc.tipsetsProcessors[tasktype.GasOutputs]) @@ -61,6 +62,7 @@ func TestNewProcessor(t *testing.T) { require.Equal(t, internalparsedmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.InternalParsedMessage]) require.Equal(t, msapprovals.NewTask(nil), proc.tipsetsProcessors[tasktype.MultisigApproval]) require.Equal(t, vm.NewTask(nil), proc.tipsetsProcessors[tasktype.VMMessage]) + require.Equal(t, actorevent.NewTask(nil), proc.tipsetsProcessors[tasktype.ActorEvent]) require.Equal(t, message.NewTask(nil), proc.tipsetProcessors[tasktype.Message]) require.Equal(t, blockmessage.NewTask(nil), proc.tipsetProcessors[tasktype.BlockMessage]) diff --git a/chain/indexer/integrated/processor/state_test.go b/chain/indexer/integrated/processor/state_test.go index 3a212f303..5557c9ed1 100644 --- a/chain/indexer/integrated/processor/state_test.go +++ b/chain/indexer/integrated/processor/state_test.go @@ -397,6 +397,6 @@ func TestMakeProcessorsAllTasks(t *testing.T) { require.NoError(t, err) require.Len(t, proc.ActorProcessors, 23) require.Len(t, proc.TipsetProcessors, 8) - require.Len(t, proc.TipsetsProcessors, 7) + require.Len(t, proc.TipsetsProcessors, 8) require.Len(t, proc.ReportProcessors, 1) } diff --git a/chain/indexer/tasktype/table_tasks.go b/chain/indexer/tasktype/table_tasks.go index 309dfe341..78dc1bdb9 100644 --- a/chain/indexer/tasktype/table_tasks.go +++ b/chain/indexer/tasktype/table_tasks.go @@ -27,6 +27,7 @@ const ( InternalMessage = "internal_messages" InternalParsedMessage = "internal_parsed_messages" VMMessage = "vm_messages" + ActorEvent = "actor_events" MultisigTransaction = "multisig_transaction" ChainPower = "chain_power" PowerActorClaim = "power_actor_claim" @@ -68,6 +69,7 @@ var AllTableTasks = []string{ InternalMessage, InternalParsedMessage, VMMessage, + ActorEvent, MultisigTransaction, ChainPower, PowerActorClaim, @@ -109,6 +111,7 @@ var TableLookup = map[string]struct{}{ InternalMessage: {}, InternalParsedMessage: {}, VMMessage: {}, + ActorEvent: {}, MultisigTransaction: {}, ChainPower: {}, PowerActorClaim: {}, @@ -150,6 +153,7 @@ var TableComment = map[string]string{ InternalMessage: ``, InternalParsedMessage: ``, VMMessage: ``, + ActorEvent: ``, MultisigTransaction: ``, ChainPower: ``, PowerActorClaim: ``, @@ -227,6 +231,7 @@ var TableFieldComments = map[string]map[string]string{ "To": "To receiver of message.", "Value": "Value attoFIL contained in message.", }, + ActorEvent: {}, MultisigTransaction: { "To": "Transaction State", }, diff --git a/chain/indexer/tasktype/tasks.go b/chain/indexer/tasktype/tasks.go index 9228d6b24..eb5b1fd56 100644 --- a/chain/indexer/tasktype/tasks.go +++ b/chain/indexer/tasktype/tasks.go @@ -71,6 +71,7 @@ var TaskLookup = map[string][]string{ GasOutputs, MessageGasEconomy, BlockMessage, + ActorEvent, }, ChainEconomicsTask: { ChainEconomics, diff --git a/chain/indexer/tasktype/tasks_test.go b/chain/indexer/tasktype/tasks_test.go index ea868b033..969d7ff95 100644 --- a/chain/indexer/tasktype/tasks_test.go +++ b/chain/indexer/tasktype/tasks_test.go @@ -55,7 +55,7 @@ func TestMakeTaskNamesAlias(t *testing.T) { }, { taskAlias: tasktype.MessagesTask, - tasks: []string{tasktype.Message, tasktype.ParsedMessage, tasktype.Receipt, tasktype.GasOutputs, tasktype.MessageGasEconomy, tasktype.BlockMessage}, + tasks: []string{tasktype.Message, tasktype.ParsedMessage, tasktype.Receipt, tasktype.GasOutputs, tasktype.MessageGasEconomy, tasktype.BlockMessage, tasktype.ActorEvent}, }, { taskAlias: tasktype.ChainEconomicsTask, @@ -101,7 +101,7 @@ func TestMakeAllTaskAliasNames(t *testing.T) { } func TestMakeAllTaskNames(t *testing.T) { - const TotalTableTasks = 38 + const TotalTableTasks = 39 actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks) require.NoError(t, err) // if this test fails it means a new task name was added, update the above test diff --git a/model/messages/actorevent.go b/model/messages/actorevent.go new file mode 100644 index 000000000..41e2261d1 --- /dev/null +++ b/model/messages/actorevent.go @@ -0,0 +1,47 @@ +package messages + +import ( + "context" + + "go.opencensus.io/tag" + + "github.com/filecoin-project/lily/metrics" + "github.com/filecoin-project/lily/model" +) + +type ActorEvent struct { + tableName struct{} `pg:"actor_events"` + + Height int64 `pg:",pk,notnull,use_zero"` + StateRoot string `pg:",pk,notnull"` + MessageCid string `pg:",pk,notnull"` + EventIndex int64 `pg:",pk,notnull,use_zero"` + + Emitter string `pg:",notnull"` + Flags []byte `pg:",notnull"` + Key string `pg:",notnull"` + Value []byte `pg:",notnull"` +} + +func (a *ActorEvent) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "actor_events")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + + metrics.RecordCount(ctx, metrics.PersistModel, 1) + return s.PersistModel(ctx, a) +} + +type ActorEventList []*ActorEvent + +func (al ActorEventList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { + if len(al) == 0 { + return nil + } + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "actor_events")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + + metrics.RecordCount(ctx, metrics.PersistModel, len(al)) + return s.PersistModel(ctx, al) +} diff --git a/schemas/v1/13_actor_events.go b/schemas/v1/13_actor_events.go new file mode 100644 index 000000000..a734ad79e --- /dev/null +++ b/schemas/v1/13_actor_events.go @@ -0,0 +1,22 @@ +package v1 + +func init() { + patches.Register( + 13, + ` + CREATE TABLE IF NOT EXISTS {{ .SchemaName | default "public"}}.actor_events ( + height bigint NOT NULL, + state_root text NOT NULL, + event_index bigint NOT NULL, + message_cid text NOT NULL, + + emitter text NOT NULL, + flags bytea NOT NULL, + key text NOT NULL, + value bytea NOT NULL, + + PRIMARY KEY ("height", "state_root", "event_index", "message_cid") + ); +`, + ) +} diff --git a/storage/sql.go b/storage/sql.go index 163acee67..00f821ca5 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -65,6 +65,7 @@ var Models = []interface{}{ (*messages.InternalMessage)(nil), (*messages.InternalParsedMessage)(nil), (*messages.VMMessage)(nil), + (*messages.ActorEvent)(nil), (*multisig.MultisigTransaction)(nil), diff --git a/tasks/messages/actorevent/task.go b/tasks/messages/actorevent/task.go new file mode 100644 index 000000000..21062bbb4 --- /dev/null +++ b/tasks/messages/actorevent/task.go @@ -0,0 +1,142 @@ +package actorevent + +import ( + "bytes" + "context" + "fmt" + "math" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-amt-ipld/v4" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lily/model" + messagemodel "github.com/filecoin-project/lily/model/messages" + visormodel "github.com/filecoin-project/lily/model/visor" + "github.com/filecoin-project/lily/tasks" + "github.com/filecoin-project/lily/tasks/messages" +) + +type Task struct { + node tasks.DataSource +} + +func NewTask(node tasks.DataSource) *Task { + return &Task{ + node: node, + } +} + +func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, executed *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { + ctx, span := otel.Tracer("").Start(ctx, "ProcessTipSets") + if span.IsRecording() { + span.SetAttributes( + attribute.String("current", current.String()), + attribute.Int64("current_height", int64(current.Height())), + attribute.String("executed", executed.String()), + attribute.Int64("executed_height", int64(executed.Height())), + attribute.String("processor", "actor_events"), + ) + } + defer span.End() + + report := &visormodel.ProcessingReport{ + Height: int64(current.Height()), + StateRoot: current.ParentState().String(), + } + + blkMsgRect, err := t.node.TipSetMessageReceipts(ctx, current, executed) + if err != nil { + report.ErrorsDetected = fmt.Errorf("getting tipset message receipet: %w", err) + return nil, report, nil + } + + var ( + out = make(messagemodel.ActorEventList, 0, len(blkMsgRect)) + errorsDetected = make([]*messages.MessageError, 0, len(blkMsgRect)) + msgsSeen = make(map[cid.Cid]bool, len(blkMsgRect)) + ) + + for _, m := range blkMsgRect { + select { + case <-ctx.Done(): + return nil, nil, fmt.Errorf("context done: %w", ctx.Err()) + default: + } + + itr, err := m.Iterator() + if err != nil { + return nil, nil, err + } + + for itr.HasNext() { + msg, _, rec := itr.Next() + if msgsSeen[msg.Cid()] { + continue + } + msgsSeen[msg.Cid()] = true + + if rec.EventsRoot == nil { + continue + } + + evtArr, err := amt.LoadAMT(ctx, t.node.Store(), *rec.EventsRoot, amt.UseTreeBitWidth(types.EventAMTBitwidth)) + if err != nil { + report.ErrorsDetected = fmt.Errorf("loading actor events amt (%s): %w", *rec.EventsRoot, err) + return nil, report, nil + } + var evt types.Event + err = evtArr.ForEach(ctx, func(evtIdx uint64, deferred *cbg.Deferred) error { + if evtIdx > math.MaxInt { + return xerrors.Errorf("too many events") + } + if err := evt.UnmarshalCBOR(bytes.NewReader(deferred.Raw)); err != nil { + return err + } + + emitter, err := address.NewIDAddress(uint64(evt.Emitter)) + if err != nil { + errorsDetected = append(errorsDetected, &messages.MessageError{ + Cid: msg.Cid(), + Error: fmt.Sprintf("failed to make ID address from event emitter (%s): %s", evt.Emitter, err), + }) + return err + } + for _, e := range evt.Entries { + out = append(out, &messagemodel.ActorEvent{ + Height: int64(current.Height()), + StateRoot: current.ParentState().String(), + MessageCid: msg.Cid().String(), + EventIndex: int64(evtIdx), + Emitter: emitter.String(), + Flags: []byte{e.Flags}, + Key: e.Key, + Value: e.Value, + }) + } + return nil + }) + + if err != nil { + errorsDetected = append(errorsDetected, &messages.MessageError{ + Cid: msg.Cid(), + Error: fmt.Sprintf("loading actor events amt (%s): %s", *rec.EventsRoot, err), + }) + continue + } + } + } + + if len(errorsDetected) != 0 { + report.ErrorsDetected = errorsDetected + } + + return model.PersistableList{ + out, + }, report, nil +}