From 1f883dc61c331fd8647cec8ff98c2d2e1b440a63 Mon Sep 17 00:00:00 2001 From: frrist Date: Tue, 2 Aug 2022 14:09:21 -0700 Subject: [PATCH 1/9] feat: implement vm message extraction --- chain/indexer/integrated/processor/state.go | 3 + chain/indexer/tasktype/table_tasks.go | 22 +++- lens/util/repo.go | 94 ++++++++++++++ model/messages/vm.go | 65 ++++++++++ schemas/v1/8_vm_messages.go | 33 +++++ storage/sql.go | 1 + tasks/messageexecutions/vm/task.go | 129 ++++++++++++++++++++ 7 files changed, 346 insertions(+), 1 deletion(-) create mode 100644 model/messages/vm.go create mode 100644 schemas/v1/8_vm_messages.go create mode 100644 tasks/messageexecutions/vm/task.go diff --git a/chain/indexer/integrated/processor/state.go b/chain/indexer/integrated/processor/state.go index f18e782d6..16e84b861 100644 --- a/chain/indexer/integrated/processor/state.go +++ b/chain/indexer/integrated/processor/state.go @@ -22,6 +22,7 @@ import ( poweractors "github.com/filecoin-project/lily/chain/actors/builtin/power" rewardactors "github.com/filecoin-project/lily/chain/actors/builtin/reward" verifregactors "github.com/filecoin-project/lily/chain/actors/builtin/verifreg" + "github.com/filecoin-project/lily/tasks/messageexecutions/vm" "github.com/filecoin-project/lily/tasks" // actor tasks @@ -557,6 +558,8 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces out.TipsetsProcessors[t] = gasecontask.NewTask(api) case tasktype.MultisigApproval: out.TipsetsProcessors[t] = msapprovaltask.NewTask(api) + case tasktype.VmMessage: + out.TipsetsProcessors[t] = vm.NewTask(api) // // Blocks diff --git a/chain/indexer/tasktype/table_tasks.go b/chain/indexer/tasktype/table_tasks.go index deccd34b9..02a4278ff 100644 --- a/chain/indexer/tasktype/table_tasks.go +++ b/chain/indexer/tasktype/table_tasks.go @@ -24,6 +24,7 @@ const ( ParsedMessage = "parsed_message" InternalMessage = "internal_messages" InternalParsedMessage = "internal_parsed_messages" + VmMessage = "vm_messages" MultisigTransaction = "multisig_transaction" ChainPower = "chain_power" PowerActorClaim = "power_actor_claim" @@ -62,6 +63,7 @@ var AllTableTasks = []string{ ParsedMessage, InternalMessage, InternalParsedMessage, + VmMessage, MultisigTransaction, ChainPower, PowerActorClaim, @@ -100,6 +102,7 @@ var TableLookup = map[string]struct{}{ ParsedMessage: {}, InternalMessage: {}, InternalParsedMessage: {}, + VmMessage: {}, MultisigTransaction: {}, ChainPower: {}, PowerActorClaim: {}, @@ -138,6 +141,7 @@ var TableComment = map[string]string{ ParsedMessage: ``, InternalMessage: ``, InternalParsedMessage: ``, + VmMessage: ``, MultisigTransaction: ``, ChainPower: ``, PowerActorClaim: ``, @@ -178,8 +182,9 @@ var TableFieldComments = map[string]map[string]string{ "DealID": "Identifier for the deal.", "EndEpoch": "The epoch at which this deal with end.", "Height": "Epoch at which this deal proposal was added or changed.", + "IsString": "Related to FIP: https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0027.md", "IsVerified": "Deal is with a verified provider.", - "Label": "An arbitrary client chosen label to apply to the deal.", + "Label": "An arbitrary client chosen label to apply to the deal. The value is base64 encoded before persisting.", "PaddedPieceSize": "The piece size in bytes with padding.", "PieceCID": "CID of a sector piece. A Piece is an object that represents a whole or part of a File.", "ProviderCollateral": "The amount of FIL (in attoFIL) the provider has pledged as collateral. The Provider deal collateral is only slashed when a sector is terminated before the deal expires.", @@ -197,6 +202,21 @@ var TableFieldComments = map[string]map[string]string{ ParsedMessage: {}, InternalMessage: {}, InternalParsedMessage: {}, + VmMessage: { + "ActorCode": "ActorCode of To (receiver)", + "Cid": "Cid of the message.", + "ExitCode": "ExitCode of message execution.", + "From": "From sender of message.", + "GasUsed": "GasUsed by message.", + "Height": "Height message was executed at.", + "Method": "Method called on To (receiver)", + "Params": "Params contained in message.", + "Returns": "Return value of message.", + "Source": "On-chain message triggering the message.", + "StateRoot": "StateRoot message was applied to.", + "To": "To receiver of message.", + "Value": "Value attoFIL contained in message.", + }, MultisigTransaction: { "To": "Transaction State", }, diff --git a/lens/util/repo.go b/lens/util/repo.go index d160883cb..cbf6fe589 100644 --- a/lens/util/repo.go +++ b/lens/util/repo.go @@ -7,6 +7,7 @@ import ( "fmt" "reflect" "strings" + "time" "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/exitcode" @@ -282,6 +283,34 @@ func ParseParams(params []byte, method abi.MethodNum, actCode cid.Cid) (string, return string(b), m.Name, err } +func ParseReturn(ret []byte, method abi.MethodNum, actCode cid.Cid) (string, string, error) { + m, found := ActorRegistry.Methods[actCode][method] + if !found { + return "", "", fmt.Errorf("unknown method %d for actor %s", method, actCode) + } + + // if the actor method doesn't expect returns don't parse them + if m.Ret == reflect.TypeOf(new(abi.EmptyValue)) { + return "", m.Name, nil + } + + p := reflect.New(m.Ret.Elem()).Interface().(cbg.CBORUnmarshaler) + if err := p.UnmarshalCBOR(bytes.NewReader(ret)); err != nil { + actorName := builtin.ActorNameByCode(actCode) + return "", m.Name, fmt.Errorf("cbor decode into %s %s:(%s.%d) failed: %v", m.Name, actorName, actCode, method, err) + } + + b, err := MarshalWithOverrides(p, map[reflect.Type]marshaller{ + reflect.TypeOf(bitfield.BitField{}): bitfieldCountMarshaller, + }) + if err != nil { + return "", "", fmt.Errorf("failed to parse message return method: %d, actor code: %s, return: %s: %w", method, actCode, string(ret), err) + } + + return string(b), m.Name, err + +} + func MethodAndParamsForMessage(m *types.Message, destCode cid.Cid) (string, string, error) { // Method is optional, zero means a plain value transfer if m.Method == 0 { @@ -308,6 +337,71 @@ func MethodAndParamsForMessage(m *types.Message, destCode cid.Cid) (string, stri return method, params, nil } +type MessageParamsReturn struct { + MethodName string + Params string + Return string +} + +func MethodParamsReturnForMessage(m *MessageTrace, destCode cid.Cid) (*MessageParamsReturn, error) { + // Method is optional, zero means a plain value transfer + if m.Message.Method == 0 { + return &MessageParamsReturn{ + MethodName: "Send", + Params: "", + Return: "", + }, nil + } + + if !destCode.Defined() { + return nil, fmt.Errorf("missing actor code") + } + + params, method, err := ParseParams(m.Message.Params, m.Message.Method, destCode) + if err != nil { + log.Warnf("failed to parse parameters of message %s: %v", m.Message.Cid(), err) + return nil, fmt.Errorf("unknown method for actor type %s method %d: %w", destCode.String(), int64(m.Message.Method), err) + } + ret, method, err := ParseReturn(m.Receipt.Return, m.Message.Method, destCode) + if err != nil { + log.Warnf("failed to parse return of message %s: %v", m.Message.Cid(), err) + return nil, fmt.Errorf("unknown method for actor type %s method %d: %w", destCode.String(), int64(m.Message.Method), err) + } + + return &MessageParamsReturn{ + MethodName: method, + Params: params, + Return: ret, + }, nil +} + +func walkExecutionTrace(et *types.ExecutionTrace, trace *[]*MessageTrace) { + for _, sub := range et.Subcalls { + *trace = append(*trace, &MessageTrace{ + Message: sub.Msg, + Receipt: sub.MsgRct, + Error: sub.Error, + Duration: sub.Duration, + GasCharge: sub.GasCharges, + }) + walkExecutionTrace(&sub, trace) //nolint:scopelint,gosec + } +} + +type MessageTrace struct { + Message *types.Message + Receipt *types.MessageReceipt + Error string + Duration time.Duration + GasCharge []*types.GasTrace +} + +func GetChildMessagesOf(m *lens.MessageExecution) []*MessageTrace { + var out []*MessageTrace + walkExecutionTrace(&m.Ret.ExecutionTrace, &out) + return out +} + func ActorNameAndFamilyFromCode(c cid.Cid) (name string, family string, err error) { if !c.Defined() { return "", "", fmt.Errorf("cannot derive actor name from undefined CID") diff --git a/model/messages/vm.go b/model/messages/vm.go new file mode 100644 index 000000000..ec5757651 --- /dev/null +++ b/model/messages/vm.go @@ -0,0 +1,65 @@ +package messages + +import ( + "context" + + "go.opencensus.io/tag" + + "github.com/filecoin-project/lily/metrics" + "github.com/filecoin-project/lily/model" +) + +type VmMessage struct { + tableName struct{} `pg:"vm_messages"` + + // Height message was executed at. + Height int64 `pg:",pk,notnull,use_zero"` + // StateRoot message was applied to. + StateRoot string `pg:",pk,notnull"` + // Cid of the message. + Cid string `pg:",pk,notnull"` + // On-chain message triggering the message. + Source string `pg:",pk,notnull"` + + // From sender of message. + From string `pg:",notnull"` + // To receiver of message. + To string `pg:",notnull"` + // Value attoFIL contained in message. + Value string `pg:"type:numeric,notnull"` + // Method called on To (receiver) + Method uint64 `pg:",use_zero"` + // ActorCode of To (receiver) + ActorCode string `pg:",notnull"` + // ExitCode of message execution. + ExitCode int64 `pg:",use_zero"` + // GasUsed by message. + GasUsed int64 `pg:",use_zero"` + // Params contained in message. + Params string `pg:",type:jsonb"` + // Return value of message. + Returns string `pg:",type:jsonb"` +} + +func (v *VmMessage) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "vm_messages")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + + metrics.RecordCount(ctx, metrics.PersistModel, 1) + return s.PersistModel(ctx, v) +} + +type VmMessageList []*VmMessage + +func (vl VmMessageList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { + if len(vl) == 0 { + return nil + } + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "vm_messages")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + + metrics.RecordCount(ctx, metrics.PersistModel, len(vl)) + return s.PersistModel(ctx, vl) +} diff --git a/schemas/v1/8_vm_messages.go b/schemas/v1/8_vm_messages.go new file mode 100644 index 000000000..190353980 --- /dev/null +++ b/schemas/v1/8_vm_messages.go @@ -0,0 +1,33 @@ +package v1 + +func init() { + patches.Register( + 8, + ` + CREATE TABLE {{ .SchemaName | default "public"}}.vm_messages ( + height bigint NOT NULL, + state_root text NOT NULL, + cid text NOT NULL, + source text, + "from" text NOT NULL, + "to" text NOT NULL, + value numeric NOT NULL, + method bigint NOT NULL, + actor_code text NOT NULL, + exit_code bigint NOT NULL, + gas_used bigint NOT NULL, + params jsonb, + returns jsonb +); +ALTER TABLE ONLY {{ .SchemaName | default "public"}}.vm_messages ADD CONSTRAINT vm_messages_pkey PRIMARY KEY (height, state_root, cid, source); +CREATE INDEX vm_messages_height_idx ON {{ .SchemaName | default "public"}}.vm_messages USING btree (height DESC); +CREATE INDEX vm_messages_state_root_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (state_root); +CREATE INDEX vm_messages_cid_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (cid); +CREATE INDEX vm_messages_source_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (source); +CREATE INDEX vm_messages_from_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("from"); +CREATE INDEX vm_messages_to_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("to"); +CREATE INDEX vm_messages_method_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (method); +CREATE INDEX vm_messages_actor_code_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (actor_code); +`, + ) +} diff --git a/storage/sql.go b/storage/sql.go index 1c428f9cc..2d5d2eb7e 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -60,6 +60,7 @@ var Models = []interface{}{ (*messages.ParsedMessage)(nil), (*messages.InternalMessage)(nil), (*messages.InternalParsedMessage)(nil), + (*messages.VmMessage)(nil), (*multisig.MultisigTransaction)(nil), diff --git a/tasks/messageexecutions/vm/task.go b/tasks/messageexecutions/vm/task.go new file mode 100644 index 000000000..2107951f4 --- /dev/null +++ b/tasks/messageexecutions/vm/task.go @@ -0,0 +1,129 @@ +package vm + +import ( + "context" + "fmt" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "golang.org/x/sync/errgroup" + + "github.com/filecoin-project/lily/lens" + "github.com/filecoin-project/lily/lens/util" + "github.com/filecoin-project/lily/model" + messagemodel "github.com/filecoin-project/lily/model/messages" + visormodel "github.com/filecoin-project/lily/model/visor" + tasks "github.com/filecoin-project/lily/tasks" + messages "github.com/filecoin-project/lily/tasks/messages" +) + +type Task struct { + node tasks.DataSource +} + +func NewTask(node tasks.DataSource) *Task { + return &Task{node: node} +} + +func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, executed *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { + ctx, span := otel.Tracer("").Start(ctx, "ProcessTipSets") + if span.IsRecording() { + span.SetAttributes( + attribute.String("current", current.String()), + attribute.Int64("current_height", int64(current.Height())), + attribute.String("executed", executed.String()), + attribute.Int64("executed_height", int64(executed.Height())), + attribute.String("processor", "internal_parsed_message"), + ) + } + defer span.End() + + // execute in parallel as both operations are slow + grp, _ := errgroup.WithContext(ctx) + var mex []*lens.MessageExecution + grp.Go(func() error { + var err error + mex, err = t.node.MessageExecutions(ctx, current, executed) + if err != nil { + return fmt.Errorf("getting messages executions for tipset: %w", err) + } + return nil + }) + + var getActorCode func(a address.Address) (cid.Cid, bool) + grp.Go(func() error { + var err error + getActorCode, err = util.MakeGetActorCodeFunc(ctx, t.node.Store(), current, executed) + if err != nil { + return fmt.Errorf("failed to make actor code query function: %w", err) + } + return nil + }) + + report := &visormodel.ProcessingReport{ + Height: int64(current.Height()), + StateRoot: current.ParentState().String(), + } + + // if either fail, report error and bail + if err := grp.Wait(); err != nil { + report.ErrorsDetected = err + return nil, report, nil + } + + var ( + vmMessageResults = make(messagemodel.VmMessageList, 0, len(mex)) + errorsDetected = make([]*messages.MessageError, 0) + ) + for _, parentMsg := range mex { + select { + case <-ctx.Done(): + return nil, nil, fmt.Errorf("context done: %w", ctx.Err()) + default: + } + + for _, child := range util.GetChildMessagesOf(parentMsg) { + // Cid() computes a CID, so only call it once + childCid := child.Message.Cid() + toCode, ok := getActorCode(child.Message.To) + if !ok { + errorsDetected = append(errorsDetected, &messages.MessageError{ + Cid: parentMsg.Cid, + Error: fmt.Errorf("failed to get to actor code for message: %s", childCid).Error(), + }) + continue + } + meta, err := util.MethodParamsReturnForMessage(child, toCode) + if err != nil { + errorsDetected = append(errorsDetected, &messages.MessageError{ + Cid: parentMsg.Cid, + Error: fmt.Errorf("failed get child message (%s) metadata: %w", childCid, err).Error(), + }) + continue + } + vmMessageResults = append(vmMessageResults, &messagemodel.VmMessage{ + Height: int64(parentMsg.Height), + StateRoot: parentMsg.StateRoot.String(), + Source: parentMsg.Cid.String(), + Cid: childCid.String(), + From: child.Message.From.String(), + To: child.Message.To.String(), + Value: child.Message.Value.String(), + GasUsed: child.Receipt.GasUsed, + ExitCode: int64(child.Receipt.ExitCode), + ActorCode: toCode.String(), + Method: uint64(child.Message.Method), + Params: meta.Params, + Returns: meta.Return, + }) + } + } + + if len(errorsDetected) != 0 { + report.ErrorsDetected = errorsDetected + } + return vmMessageResults, report, nil +} From db929fe7c9ee534f7f324f4228ee0417fd16c7ea Mon Sep 17 00:00:00 2001 From: frrist Date: Tue, 2 Aug 2022 16:14:08 -0700 Subject: [PATCH 2/9] test: update tests and linting --- .../integrated/processor/state_internal_test.go | 4 +++- chain/indexer/integrated/processor/state_test.go | 2 +- chain/indexer/tasktype/tasks.go | 1 + chain/indexer/tasktype/tasks_test.go | 5 ++--- lens/util/repo.go | 2 +- model/messages/vm.go | 10 +++++----- storage/sql.go | 2 +- tasks/messageexecutions/vm/task.go | 4 ++-- 8 files changed, 16 insertions(+), 14 deletions(-) diff --git a/chain/indexer/integrated/processor/state_internal_test.go b/chain/indexer/integrated/processor/state_internal_test.go index 5eb65184e..5ea49712e 100644 --- a/chain/indexer/integrated/processor/state_internal_test.go +++ b/chain/indexer/integrated/processor/state_internal_test.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/lily/chain/actors/builtin/reward" "github.com/filecoin-project/lily/chain/actors/builtin/verifreg" "github.com/filecoin-project/lily/chain/indexer/tasktype" + "github.com/filecoin-project/lily/tasks/messageexecutions/vm" "github.com/filecoin-project/lily/tasks/actorstate" inittask "github.com/filecoin-project/lily/tasks/actorstate/init_" @@ -48,7 +49,7 @@ func TestNewProcessor(t *testing.T) { require.Equal(t, t.Name(), proc.name) require.Len(t, proc.actorProcessors, 21) require.Len(t, proc.tipsetProcessors, 5) - require.Len(t, proc.tipsetsProcessors, 9) + require.Len(t, proc.tipsetsProcessors, 10) require.Len(t, proc.builtinProcessors, 1) require.Equal(t, message.NewTask(nil), proc.tipsetsProcessors[tasktype.Message]) @@ -60,6 +61,7 @@ func TestNewProcessor(t *testing.T) { require.Equal(t, internalparsedmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.InternalParsedMessage]) require.Equal(t, gaseconomy.NewTask(nil), proc.tipsetsProcessors[tasktype.MessageGasEconomy]) require.Equal(t, msapprovals.NewTask(nil), proc.tipsetsProcessors[tasktype.MultisigApproval]) + require.Equal(t, vm.NewTask(nil), proc.tipsetsProcessors[tasktype.VmMessage]) require.Equal(t, headers.NewTask(), proc.tipsetProcessors[tasktype.BlockHeader]) require.Equal(t, parents.NewTask(), proc.tipsetProcessors[tasktype.BlockParent]) diff --git a/chain/indexer/integrated/processor/state_test.go b/chain/indexer/integrated/processor/state_test.go index dac45dee2..38b6970dc 100644 --- a/chain/indexer/integrated/processor/state_test.go +++ b/chain/indexer/integrated/processor/state_test.go @@ -347,6 +347,6 @@ func TestMakeProcessorsAllTasks(t *testing.T) { require.NoError(t, err) require.Len(t, proc.ActorProcessors, 21) require.Len(t, proc.TipsetProcessors, 5) - require.Len(t, proc.TipsetsProcessors, 9) + require.Len(t, proc.TipsetsProcessors, 10) require.Len(t, proc.ReportProcessors, 1) } diff --git a/chain/indexer/tasktype/tasks.go b/chain/indexer/tasktype/tasks.go index b5202a961..ca14db7cc 100644 --- a/chain/indexer/tasktype/tasks.go +++ b/chain/indexer/tasktype/tasks.go @@ -79,6 +79,7 @@ var TaskLookup = map[string][]string{ ImplicitMessageTask: { InternalMessage, InternalParsedMessage, + VmMessage, }, ChainConsensusTask: { ChainConsensus, diff --git a/chain/indexer/tasktype/tasks_test.go b/chain/indexer/tasktype/tasks_test.go index 4badbb1b1..5491b38df 100644 --- a/chain/indexer/tasktype/tasks_test.go +++ b/chain/indexer/tasktype/tasks_test.go @@ -66,7 +66,7 @@ func TestMakeTaskNamesAlias(t *testing.T) { }, { taskAlias: tasktype.ImplicitMessageTask, - tasks: []string{tasktype.InternalMessage, tasktype.InternalParsedMessage}, + tasks: []string{tasktype.InternalMessage, tasktype.InternalParsedMessage, tasktype.VmMessage}, }, { taskAlias: tasktype.ChainConsensusTask, @@ -99,9 +99,8 @@ func TestMakeAllTaskAliasNames(t *testing.T) { require.Len(t, actual, len(storage.Models)) } -const TotalTableTasks = 35 - func TestMakeAllTaskNames(t *testing.T) { + const TotalTableTasks = 36 actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks) require.NoError(t, err) // if this test fails it means a new task name was added, update the above test diff --git a/lens/util/repo.go b/lens/util/repo.go index cbf6fe589..83d63de5e 100644 --- a/lens/util/repo.go +++ b/lens/util/repo.go @@ -357,7 +357,7 @@ func MethodParamsReturnForMessage(m *MessageTrace, destCode cid.Cid) (*MessagePa return nil, fmt.Errorf("missing actor code") } - params, method, err := ParseParams(m.Message.Params, m.Message.Method, destCode) + params, _, err := ParseParams(m.Message.Params, m.Message.Method, destCode) if err != nil { log.Warnf("failed to parse parameters of message %s: %v", m.Message.Cid(), err) return nil, fmt.Errorf("unknown method for actor type %s method %d: %w", destCode.String(), int64(m.Message.Method), err) diff --git a/model/messages/vm.go b/model/messages/vm.go index ec5757651..5673c7de0 100644 --- a/model/messages/vm.go +++ b/model/messages/vm.go @@ -9,8 +9,8 @@ import ( "github.com/filecoin-project/lily/model" ) -type VmMessage struct { - tableName struct{} `pg:"vm_messages"` +type VMMessage struct { + tableName struct{} `pg:"vm_messages"` // nolint: structcheck // Height message was executed at. Height int64 `pg:",pk,notnull,use_zero"` @@ -41,7 +41,7 @@ type VmMessage struct { Returns string `pg:",type:jsonb"` } -func (v *VmMessage) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { +func (v *VMMessage) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "vm_messages")) stop := metrics.Timer(ctx, metrics.PersistDuration) defer stop() @@ -50,9 +50,9 @@ func (v *VmMessage) Persist(ctx context.Context, s model.StorageBatch, version m return s.PersistModel(ctx, v) } -type VmMessageList []*VmMessage +type VMMessageList []*VMMessage -func (vl VmMessageList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { +func (vl VMMessageList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { if len(vl) == 0 { return nil } diff --git a/storage/sql.go b/storage/sql.go index 2d5d2eb7e..5560de893 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -60,7 +60,7 @@ var Models = []interface{}{ (*messages.ParsedMessage)(nil), (*messages.InternalMessage)(nil), (*messages.InternalParsedMessage)(nil), - (*messages.VmMessage)(nil), + (*messages.VMMessage)(nil), (*multisig.MultisigTransaction)(nil), diff --git a/tasks/messageexecutions/vm/task.go b/tasks/messageexecutions/vm/task.go index 2107951f4..ca02ac76f 100644 --- a/tasks/messageexecutions/vm/task.go +++ b/tasks/messageexecutions/vm/task.go @@ -75,7 +75,7 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut } var ( - vmMessageResults = make(messagemodel.VmMessageList, 0, len(mex)) + vmMessageResults = make(messagemodel.VMMessageList, 0, len(mex)) errorsDetected = make([]*messages.MessageError, 0) ) for _, parentMsg := range mex { @@ -104,7 +104,7 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut }) continue } - vmMessageResults = append(vmMessageResults, &messagemodel.VmMessage{ + vmMessageResults = append(vmMessageResults, &messagemodel.VMMessage{ Height: int64(parentMsg.Height), StateRoot: parentMsg.StateRoot.String(), Source: parentMsg.Cid.String(), From 3436263ff92da88f86cc13c0744024c3f29e3104 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 3 Aug 2022 11:10:51 -0700 Subject: [PATCH 3/9] refactor: vm_messages schema indicies; add table comments --- schemas/v1/8_vm_messages.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/schemas/v1/8_vm_messages.go b/schemas/v1/8_vm_messages.go index 190353980..9709cac48 100644 --- a/schemas/v1/8_vm_messages.go +++ b/schemas/v1/8_vm_messages.go @@ -20,14 +20,27 @@ func init() { returns jsonb ); ALTER TABLE ONLY {{ .SchemaName | default "public"}}.vm_messages ADD CONSTRAINT vm_messages_pkey PRIMARY KEY (height, state_root, cid, source); -CREATE INDEX vm_messages_height_idx ON {{ .SchemaName | default "public"}}.vm_messages USING btree (height DESC); -CREATE INDEX vm_messages_state_root_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (state_root); +CREATE INDEX vm_messages_height_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BRIN (height); CREATE INDEX vm_messages_cid_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (cid); CREATE INDEX vm_messages_source_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (source); CREATE INDEX vm_messages_from_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("from"); CREATE INDEX vm_messages_to_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("to"); -CREATE INDEX vm_messages_method_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (method); -CREATE INDEX vm_messages_actor_code_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (actor_code); +CREATE INDEX vm_messages_actor_code_method_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (actor_code, method); + + +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.height IS 'Height message was executed at.'; +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.state_root IS 'CID of the parent state root at which this message was executed.'; +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.cid IS 'CID of the message (note this CID does not appear on chain).'; +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.source IS 'CID of the on-chain message that caused this message to be sent.'; +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages."from" IS 'Address of the actor that sent the message.'; +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages."to" IS 'Address of the actor that received the message.'; +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.value IS 'Amount of FIL (in attoFIL) transferred by this message.'; +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.method IS 'The method number invoked on the recipient actor. Only unique to the actor the method is being invoked on. A method number of 0 is a plain token transfer - no method execution'; +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.actor_code IS 'The CID of the actor that received the message.'; +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.exit_code IS 'The exit code that was returned as a result of executing the message.'; +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.gas_used IS 'A measure of the amount of resources (or units of gas) consumed, in order to execute a message.'; +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.params IS 'Message parameters parsed and serialized as a JSON object.'; +COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.returns IS 'Result returned from executing a message parsed and serialized as a JSON object.'; `, ) } From e29c59512ed092fc10786956d3541f1e15765256 Mon Sep 17 00:00:00 2001 From: frrist Date: Fri, 5 Aug 2022 13:03:04 -0700 Subject: [PATCH 4/9] fix: MarketDealProposal description --- chain/indexer/tasktype/table_tasks.go | 2 +- model/actors/market/dealproposal.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/chain/indexer/tasktype/table_tasks.go b/chain/indexer/tasktype/table_tasks.go index 02a4278ff..146217f43 100644 --- a/chain/indexer/tasktype/table_tasks.go +++ b/chain/indexer/tasktype/table_tasks.go @@ -182,7 +182,7 @@ var TableFieldComments = map[string]map[string]string{ "DealID": "Identifier for the deal.", "EndEpoch": "The epoch at which this deal with end.", "Height": "Epoch at which this deal proposal was added or changed.", - "IsString": "Related to FIP: https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0027.md", + "IsString": "When true Label contains a valid UTF-8 string encoded in base64. When false Label contains raw bytes encoded in base64. Related to FIP: https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0027.md", "IsVerified": "Deal is with a verified provider.", "Label": "An arbitrary client chosen label to apply to the deal. The value is base64 encoded before persisting.", "PaddedPieceSize": "The piece size in bytes with padding.", diff --git a/model/actors/market/dealproposal.go b/model/actors/market/dealproposal.go index 704ca07c6..829da0b0b 100644 --- a/model/actors/market/dealproposal.go +++ b/model/actors/market/dealproposal.go @@ -49,8 +49,7 @@ type MarketDealProposal struct { // An arbitrary client chosen label to apply to the deal. The value is base64 encoded before persisting. Label string - // When true Label contains a valid UTF-8 string encoded in base64. When false Label contains raw bytes encoded in base64. - // Related to FIP: https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0027.md + // When true Label contains a valid UTF-8 string encoded in base64. When false Label contains raw bytes encoded in base64. Related to FIP: https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0027.md IsString bool } From 59f8217047e9dac84bf9760f4ddd6825db993629 Mon Sep 17 00:00:00 2001 From: frrist Date: Fri, 5 Aug 2022 13:04:00 -0700 Subject: [PATCH 5/9] fix: VmMessage description --- chain/indexer/tasktype/table_tasks.go | 4 ++-- model/messages/vm.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/chain/indexer/tasktype/table_tasks.go b/chain/indexer/tasktype/table_tasks.go index 146217f43..30b583d46 100644 --- a/chain/indexer/tasktype/table_tasks.go +++ b/chain/indexer/tasktype/table_tasks.go @@ -203,13 +203,13 @@ var TableFieldComments = map[string]map[string]string{ InternalMessage: {}, InternalParsedMessage: {}, VmMessage: { - "ActorCode": "ActorCode of To (receiver)", + "ActorCode": "ActorCode of To (receiver).", "Cid": "Cid of the message.", "ExitCode": "ExitCode of message execution.", "From": "From sender of message.", "GasUsed": "GasUsed by message.", "Height": "Height message was executed at.", - "Method": "Method called on To (receiver)", + "Method": "Method called on To (receiver).", "Params": "Params contained in message.", "Returns": "Return value of message.", "Source": "On-chain message triggering the message.", diff --git a/model/messages/vm.go b/model/messages/vm.go index 5673c7de0..d3f56cd41 100644 --- a/model/messages/vm.go +++ b/model/messages/vm.go @@ -27,9 +27,9 @@ type VMMessage struct { To string `pg:",notnull"` // Value attoFIL contained in message. Value string `pg:"type:numeric,notnull"` - // Method called on To (receiver) + // Method called on To (receiver). Method uint64 `pg:",use_zero"` - // ActorCode of To (receiver) + // ActorCode of To (receiver). ActorCode string `pg:",notnull"` // ExitCode of message execution. ExitCode int64 `pg:",use_zero"` From 6e88926e5f1a402bfb13a70710b794f3b0a8f671 Mon Sep 17 00:00:00 2001 From: frrist Date: Fri, 5 Aug 2022 13:04:46 -0700 Subject: [PATCH 6/9] fix: VmMessage go-pg struct tags - use notnull --- model/messages/vm.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/model/messages/vm.go b/model/messages/vm.go index d3f56cd41..94a1ca8f5 100644 --- a/model/messages/vm.go +++ b/model/messages/vm.go @@ -28,13 +28,13 @@ type VMMessage struct { // Value attoFIL contained in message. Value string `pg:"type:numeric,notnull"` // Method called on To (receiver). - Method uint64 `pg:",use_zero"` + Method uint64 `pg:",notnull,use_zero"` // ActorCode of To (receiver). ActorCode string `pg:",notnull"` // ExitCode of message execution. - ExitCode int64 `pg:",use_zero"` + ExitCode int64 `pg:",notnull,use_zero"` // GasUsed by message. - GasUsed int64 `pg:",use_zero"` + GasUsed int64 `pg:",notnull,use_zero"` // Params contained in message. Params string `pg:",type:jsonb"` // Return value of message. From ac53c8c380036db6904c9c80e7c4afe647f78c7f Mon Sep 17 00:00:00 2001 From: frrist Date: Fri, 5 Aug 2022 13:11:55 -0700 Subject: [PATCH 7/9] fix: index using btree on vmmessage height --- schemas/v1/8_vm_messages.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schemas/v1/8_vm_messages.go b/schemas/v1/8_vm_messages.go index 9709cac48..810de2517 100644 --- a/schemas/v1/8_vm_messages.go +++ b/schemas/v1/8_vm_messages.go @@ -20,7 +20,7 @@ func init() { returns jsonb ); ALTER TABLE ONLY {{ .SchemaName | default "public"}}.vm_messages ADD CONSTRAINT vm_messages_pkey PRIMARY KEY (height, state_root, cid, source); -CREATE INDEX vm_messages_height_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BRIN (height); +CREATE INDEX vm_messages_height_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (height); CREATE INDEX vm_messages_cid_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (cid); CREATE INDEX vm_messages_source_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (source); CREATE INDEX vm_messages_from_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("from"); From b6b937f3321d11ce7dd4c8e2b00780093f7514ae Mon Sep 17 00:00:00 2001 From: frrist Date: Tue, 9 Aug 2022 13:36:35 -0700 Subject: [PATCH 8/9] fix: remove redundant indices as they are already PK's --- schemas/v1/8_vm_messages.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/schemas/v1/8_vm_messages.go b/schemas/v1/8_vm_messages.go index 810de2517..d671e0a9d 100644 --- a/schemas/v1/8_vm_messages.go +++ b/schemas/v1/8_vm_messages.go @@ -21,8 +21,6 @@ func init() { ); ALTER TABLE ONLY {{ .SchemaName | default "public"}}.vm_messages ADD CONSTRAINT vm_messages_pkey PRIMARY KEY (height, state_root, cid, source); CREATE INDEX vm_messages_height_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (height); -CREATE INDEX vm_messages_cid_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (cid); -CREATE INDEX vm_messages_source_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (source); CREATE INDEX vm_messages_from_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("from"); CREATE INDEX vm_messages_to_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("to"); CREATE INDEX vm_messages_actor_code_method_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (actor_code, method); From 9290f148b5a3b232b8326b3a11152246e0c50c23 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 10 Aug 2022 12:24:44 -0700 Subject: [PATCH 9/9] fix: trace processor name --- tasks/messageexecutions/vm/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tasks/messageexecutions/vm/task.go b/tasks/messageexecutions/vm/task.go index ca02ac76f..e01780be2 100644 --- a/tasks/messageexecutions/vm/task.go +++ b/tasks/messageexecutions/vm/task.go @@ -36,7 +36,7 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut attribute.Int64("current_height", int64(current.Height())), attribute.String("executed", executed.String()), attribute.Int64("executed_height", int64(executed.Height())), - attribute.String("processor", "internal_parsed_message"), + attribute.String("processor", "vm_messages"), ) } defer span.End()