Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement vm message extraction #1027

Merged
merged 9 commits into from
Aug 10, 2022
Merged
3 changes: 3 additions & 0 deletions chain/indexer/integrated/processor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
poweractors "github.com/filecoin-project/lily/chain/actors/builtin/power"
rewardactors "github.com/filecoin-project/lily/chain/actors/builtin/reward"
verifregactors "github.com/filecoin-project/lily/chain/actors/builtin/verifreg"
"github.com/filecoin-project/lily/tasks/messageexecutions/vm"

"github.com/filecoin-project/lily/tasks"
// actor tasks
Expand Down Expand Up @@ -557,6 +558,8 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
out.TipsetsProcessors[t] = gasecontask.NewTask(api)
case tasktype.MultisigApproval:
out.TipsetsProcessors[t] = msapprovaltask.NewTask(api)
case tasktype.VmMessage:
out.TipsetsProcessors[t] = vm.NewTask(api)

//
// Blocks
Expand Down
4 changes: 3 additions & 1 deletion chain/indexer/integrated/processor/state_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/lily/chain/actors/builtin/reward"
"github.com/filecoin-project/lily/chain/actors/builtin/verifreg"
"github.com/filecoin-project/lily/chain/indexer/tasktype"
"github.com/filecoin-project/lily/tasks/messageexecutions/vm"

"github.com/filecoin-project/lily/tasks/actorstate"
inittask "github.com/filecoin-project/lily/tasks/actorstate/init_"
Expand Down Expand Up @@ -48,7 +49,7 @@ func TestNewProcessor(t *testing.T) {
require.Equal(t, t.Name(), proc.name)
require.Len(t, proc.actorProcessors, 21)
require.Len(t, proc.tipsetProcessors, 5)
require.Len(t, proc.tipsetsProcessors, 9)
require.Len(t, proc.tipsetsProcessors, 10)
require.Len(t, proc.builtinProcessors, 1)

require.Equal(t, message.NewTask(nil), proc.tipsetsProcessors[tasktype.Message])
Expand All @@ -60,6 +61,7 @@ func TestNewProcessor(t *testing.T) {
require.Equal(t, internalparsedmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.InternalParsedMessage])
require.Equal(t, gaseconomy.NewTask(nil), proc.tipsetsProcessors[tasktype.MessageGasEconomy])
require.Equal(t, msapprovals.NewTask(nil), proc.tipsetsProcessors[tasktype.MultisigApproval])
require.Equal(t, vm.NewTask(nil), proc.tipsetsProcessors[tasktype.VmMessage])

require.Equal(t, headers.NewTask(), proc.tipsetProcessors[tasktype.BlockHeader])
require.Equal(t, parents.NewTask(), proc.tipsetProcessors[tasktype.BlockParent])
Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/integrated/processor/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,6 @@ func TestMakeProcessorsAllTasks(t *testing.T) {
require.NoError(t, err)
require.Len(t, proc.ActorProcessors, 21)
require.Len(t, proc.TipsetProcessors, 5)
require.Len(t, proc.TipsetsProcessors, 9)
require.Len(t, proc.TipsetsProcessors, 10)
require.Len(t, proc.ReportProcessors, 1)
}
22 changes: 21 additions & 1 deletion chain/indexer/tasktype/table_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
ParsedMessage = "parsed_message"
InternalMessage = "internal_messages"
InternalParsedMessage = "internal_parsed_messages"
VmMessage = "vm_messages"
MultisigTransaction = "multisig_transaction"
ChainPower = "chain_power"
PowerActorClaim = "power_actor_claim"
Expand Down Expand Up @@ -62,6 +63,7 @@ var AllTableTasks = []string{
ParsedMessage,
InternalMessage,
InternalParsedMessage,
VmMessage,
MultisigTransaction,
ChainPower,
PowerActorClaim,
Expand Down Expand Up @@ -100,6 +102,7 @@ var TableLookup = map[string]struct{}{
ParsedMessage: {},
InternalMessage: {},
InternalParsedMessage: {},
VmMessage: {},
MultisigTransaction: {},
ChainPower: {},
PowerActorClaim: {},
Expand Down Expand Up @@ -138,6 +141,7 @@ var TableComment = map[string]string{
ParsedMessage: ``,
InternalMessage: ``,
InternalParsedMessage: ``,
VmMessage: ``,
MultisigTransaction: ``,
ChainPower: ``,
PowerActorClaim: ``,
Expand Down Expand Up @@ -178,8 +182,9 @@ var TableFieldComments = map[string]map[string]string{
"DealID": "Identifier for the deal.",
"EndEpoch": "The epoch at which this deal with end.",
"Height": "Epoch at which this deal proposal was added or changed.",
"IsString": "When true Label contains a valid UTF-8 string encoded in base64. When false Label contains raw bytes encoded in base64. Related to FIP: https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0027.md",
"IsVerified": "Deal is with a verified provider.",
"Label": "An arbitrary client chosen label to apply to the deal.",
"Label": "An arbitrary client chosen label to apply to the deal. The value is base64 encoded before persisting.",
"PaddedPieceSize": "The piece size in bytes with padding.",
"PieceCID": "CID of a sector piece. A Piece is an object that represents a whole or part of a File.",
"ProviderCollateral": "The amount of FIL (in attoFIL) the provider has pledged as collateral. The Provider deal collateral is only slashed when a sector is terminated before the deal expires.",
Expand All @@ -197,6 +202,21 @@ var TableFieldComments = map[string]map[string]string{
ParsedMessage: {},
InternalMessage: {},
InternalParsedMessage: {},
VmMessage: {
"ActorCode": "ActorCode of To (receiver).",
"Cid": "Cid of the message.",
"ExitCode": "ExitCode of message execution.",
"From": "From sender of message.",
"GasUsed": "GasUsed by message.",
"Height": "Height message was executed at.",
"Method": "Method called on To (receiver).",
"Params": "Params contained in message.",
"Returns": "Return value of message.",
"Source": "On-chain message triggering the message.",
"StateRoot": "StateRoot message was applied to.",
"To": "To receiver of message.",
"Value": "Value attoFIL contained in message.",
},
MultisigTransaction: {
"To": "Transaction State",
},
Expand Down
1 change: 1 addition & 0 deletions chain/indexer/tasktype/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var TaskLookup = map[string][]string{
ImplicitMessageTask: {
InternalMessage,
InternalParsedMessage,
VmMessage,
},
ChainConsensusTask: {
ChainConsensus,
Expand Down
5 changes: 2 additions & 3 deletions chain/indexer/tasktype/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestMakeTaskNamesAlias(t *testing.T) {
},
{
taskAlias: tasktype.ImplicitMessageTask,
tasks: []string{tasktype.InternalMessage, tasktype.InternalParsedMessage},
tasks: []string{tasktype.InternalMessage, tasktype.InternalParsedMessage, tasktype.VmMessage},
},
{
taskAlias: tasktype.ChainConsensusTask,
Expand Down Expand Up @@ -99,9 +99,8 @@ func TestMakeAllTaskAliasNames(t *testing.T) {
require.Len(t, actual, len(storage.Models))
}

const TotalTableTasks = 35

func TestMakeAllTaskNames(t *testing.T) {
const TotalTableTasks = 36
actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks)
require.NoError(t, err)
// if this test fails it means a new task name was added, update the above test
Expand Down
94 changes: 94 additions & 0 deletions lens/util/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"reflect"
"strings"
"time"

"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/exitcode"
Expand Down Expand Up @@ -282,6 +283,34 @@ func ParseParams(params []byte, method abi.MethodNum, actCode cid.Cid) (string,
return string(b), m.Name, err
}

// ParseReturn decodes the CBOR-encoded return value ret of calling method on
// an actor whose code is actCode, and re-serializes it via MarshalWithOverrides.
//
// The method is looked up in ActorRegistry.Methods; an unknown actor/method
// pair yields an error. When the method's registered return type is
// *abi.EmptyValue nothing is decoded and an empty string is returned together
// with the method name.
//
// Returns, in order: the serialized return value (presumably JSON — confirm
// against MarshalWithOverrides), the registered method name, and an error.
// Decode and marshal failures wrap the underlying error so callers can inspect
// it with errors.Is / errors.As.
func ParseReturn(ret []byte, method abi.MethodNum, actCode cid.Cid) (string, string, error) {
	m, found := ActorRegistry.Methods[actCode][method]
	if !found {
		return "", "", fmt.Errorf("unknown method %d for actor %s", method, actCode)
	}

	// if the actor method doesn't expect returns don't parse them
	if m.Ret == reflect.TypeOf(new(abi.EmptyValue)) {
		return "", m.Name, nil
	}

	// m.Ret is a pointer type; allocate a fresh value of the pointed-to type to decode into.
	p := reflect.New(m.Ret.Elem()).Interface().(cbg.CBORUnmarshaler)
	if err := p.UnmarshalCBOR(bytes.NewReader(ret)); err != nil {
		actorName := builtin.ActorNameByCode(actCode)
		return "", m.Name, fmt.Errorf("cbor decode into %s %s:(%s.%d) failed: %w", m.Name, actorName, actCode, method, err)
	}

	// Bitfields get a dedicated marshaller instead of the default encoding.
	b, err := MarshalWithOverrides(p, map[reflect.Type]marshaller{
		reflect.TypeOf(bitfield.BitField{}): bitfieldCountMarshaller,
	})
	if err != nil {
		return "", "", fmt.Errorf("failed to parse message return method: %d, actor code: %s, return: %s: %w", method, actCode, string(ret), err)
	}

	return string(b), m.Name, nil
}

func MethodAndParamsForMessage(m *types.Message, destCode cid.Cid) (string, string, error) {
// Method is optional, zero means a plain value transfer
if m.Method == 0 {
Expand All @@ -308,6 +337,71 @@ func MethodAndParamsForMessage(m *types.Message, destCode cid.Cid) (string, stri
return method, params, nil
}

// MessageParamsReturn bundles the parsed, string-encoded pieces of a message
// execution as produced by MethodParamsReturnForMessage.
type MessageParamsReturn struct {
	// MethodName is the name of the invoked method; "Send" when the method number is 0.
	MethodName string
	// Params is the output of ParseParams for the message; empty for a plain send.
	Params string
	// Return is the output of ParseReturn for the message receipt; empty for a plain send.
	Return string
}

// MethodParamsReturnForMessage resolves the method name of the traced message m
// and parses both its parameters and its receipt's return value, using destCode
// as the actor code of the receiver.
//
// A method number of 0 is reported as "Send" with empty params and return,
// without consulting destCode. Otherwise destCode must be defined, and any
// failure to parse the params or the return is logged as a warning and
// returned wrapped.
func MethodParamsReturnForMessage(m *MessageTrace, destCode cid.Cid) (*MessageParamsReturn, error) {
	msg := m.Message

	// Method is optional, zero means a plain value transfer
	if msg.Method == 0 {
		return &MessageParamsReturn{MethodName: "Send"}, nil
	}

	if !destCode.Defined() {
		return nil, fmt.Errorf("missing actor code")
	}

	// Both parse failures are reported to the caller with the same wrapping.
	wrap := func(cause error) error {
		return fmt.Errorf("unknown method for actor type %s method %d: %w", destCode.String(), int64(msg.Method), cause)
	}

	parsedParams, _, err := ParseParams(msg.Params, msg.Method, destCode)
	if err != nil {
		log.Warnf("failed to parse parameters of message %s: %v", msg.Cid(), err)
		return nil, wrap(err)
	}

	parsedReturn, methodName, err := ParseReturn(m.Receipt.Return, msg.Method, destCode)
	if err != nil {
		log.Warnf("failed to parse return of message %s: %v", msg.Cid(), err)
		return nil, wrap(err)
	}

	result := &MessageParamsReturn{
		MethodName: methodName,
		Params:     parsedParams,
		Return:     parsedReturn,
	}
	return result, nil
}

// walkExecutionTrace appends a MessageTrace for every subcall reachable from
// et to *trace. Each subcall is appended before its own subcalls are visited,
// so the result is in depth-first, pre-order sequence. The message carried by
// et itself is not appended.
func walkExecutionTrace(et *types.ExecutionTrace, trace *[]*MessageTrace) {
	for i := range et.Subcalls {
		// Index into the slice instead of ranging by value: this avoids copying
		// each ExecutionTrace and avoids taking the address of the loop variable.
		sub := &et.Subcalls[i]
		*trace = append(*trace, &MessageTrace{
			Message:   sub.Msg,
			Receipt:   sub.MsgRct,
			Error:     sub.Error,
			Duration:  sub.Duration,
			GasCharge: sub.GasCharges,
		})
		walkExecutionTrace(sub, trace)
	}
}

// MessageTrace is a flattened record of one subcall taken from an execution
// trace; walkExecutionTrace fills each field from the corresponding field of
// the subcall.
type MessageTrace struct {
	// Message is the subcall's message (its Msg field).
	Message *types.Message
	// Receipt is the subcall's message receipt (its MsgRct field).
	Receipt *types.MessageReceipt
	// Error is the subcall's Error string.
	Error string
	// Duration is the subcall's Duration value.
	Duration time.Duration
	// GasCharge holds the subcall's GasCharges.
	GasCharge []*types.GasTrace
}

// GetChildMessagesOf flattens the execution trace attached to m into a list of
// MessageTrace values, one per subcall, in the order walkExecutionTrace visits
// them. The result is nil when the trace has no subcalls.
func GetChildMessagesOf(m *lens.MessageExecution) []*MessageTrace {
	var children []*MessageTrace
	root := &m.Ret.ExecutionTrace
	walkExecutionTrace(root, &children)
	return children
}

func ActorNameAndFamilyFromCode(c cid.Cid) (name string, family string, err error) {
if !c.Defined() {
return "", "", fmt.Errorf("cannot derive actor name from undefined CID")
Expand Down
3 changes: 1 addition & 2 deletions model/actors/market/dealproposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ type MarketDealProposal struct {
// An arbitrary client chosen label to apply to the deal. The value is base64 encoded before persisting.
Label string

// When true Label contains a valid UTF-8 string encoded in base64. When false Label contains raw bytes encoded in base64.
// Related to FIP: https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0027.md
// When true Label contains a valid UTF-8 string encoded in base64. When false Label contains raw bytes encoded in base64. Related to FIP: https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0027.md
IsString bool
}

Expand Down
65 changes: 65 additions & 0 deletions model/messages/vm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package messages

import (
"context"

"go.opencensus.io/tag"

"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/model"
)

type VMMessage struct {
tableName struct{} `pg:"vm_messages"` // nolint: structcheck

// Height message was executed at.
Height int64 `pg:",pk,notnull,use_zero"`
// StateRoot message was applied to.
StateRoot string `pg:",pk,notnull"`
// Cid of the message.
Cid string `pg:",pk,notnull"`
// On-chain message triggering the message.
Source string `pg:",pk,notnull"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need source to be included in the primary key. cid should have enough uniqueness within a height/stateroot.

Suggested change
Source string `pg:",pk,notnull"`
Source string `pg:",notnull"`

Copy link
Member Author

@frrist frrist Aug 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Source must remain a primary key as VM message CIDs can conflict at the same height/stateroot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't cid the CID of the intermediate (internal) message? In other words, one source has many cids? If so, then the index should go on the column with the higher cardinality (between cid and source) which I expect to be the intermediate message CIDs and I understand that to be the cid column.

Copy link
Member Author

@frrist frrist Aug 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't cid the CID of the intermediate (internal) message, In other words, one source has many cids?

Yes correct.

Are you proposing we drop source as a primary key?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Just remove from the PK to reduce the size of that index. We can use the standalone hash index for source if we ever need to condition on that column. It should reduce the size of the PK index without any performance cost.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. After some discussion with @frrist, it seems that the cid is not unique and so source is required in the PK after all. 😢

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably recommend the removal of the hash-based source index. I think both indices are redundant and if the PK index isn't getting a trim, let's drop the other.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on dropping the source hash index esp if it's part of the PK.


// From sender of message.
From string `pg:",notnull"`
// To receiver of message.
To string `pg:",notnull"`
// Value attoFIL contained in message.
Value string `pg:"type:numeric,notnull"`
// Method called on To (receiver).
Method uint64 `pg:",notnull,use_zero"`
// ActorCode of To (receiver).
ActorCode string `pg:",notnull"`
// ExitCode of message execution.
ExitCode int64 `pg:",notnull,use_zero"`
// GasUsed by message.
GasUsed int64 `pg:",notnull,use_zero"`
// Params contained in message.
Params string `pg:",type:jsonb"`
// Return value of message.
Returns string `pg:",type:jsonb"`
}

// Persist writes this single VMMessage through the storage batch s. The call
// is tagged with the "vm_messages" table name, timed under
// metrics.PersistDuration and counted as one persisted model. The version
// argument is not used.
func (v *VMMessage) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
	taggedCtx, _ := tag.New(ctx, tag.Upsert(metrics.Table, "vm_messages"))
	// The timer starts here; the function it returns runs when Persist returns.
	defer metrics.Timer(taggedCtx, metrics.PersistDuration)()

	metrics.RecordCount(taggedCtx, metrics.PersistModel, 1)
	return s.PersistModel(taggedCtx, v)
}

// VMMessageList is a batch of VMMessage models persisted together.
type VMMessageList []*VMMessage

// Persist writes the whole list through the storage batch s in one call. An
// empty list is a no-op that records no metrics. Otherwise the call is tagged
// with the "vm_messages" table name, timed under metrics.PersistDuration and
// counted with the number of entries. The version argument is not used.
func (vl VMMessageList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
	count := len(vl)
	if count == 0 {
		return nil
	}

	taggedCtx, _ := tag.New(ctx, tag.Upsert(metrics.Table, "vm_messages"))
	// The timer starts here; the function it returns runs when Persist returns.
	defer metrics.Timer(taggedCtx, metrics.PersistDuration)()

	metrics.RecordCount(taggedCtx, metrics.PersistModel, count)
	return s.PersistModel(taggedCtx, vl)
}
46 changes: 46 additions & 0 deletions schemas/v1/8_vm_messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package v1

func init() {
patches.Register(
8,
`
CREATE TABLE {{ .SchemaName | default "public"}}.vm_messages (
height bigint NOT NULL,
state_root text NOT NULL,
cid text NOT NULL,
source text,
"from" text NOT NULL,
"to" text NOT NULL,
value numeric NOT NULL,
method bigint NOT NULL,
actor_code text NOT NULL,
exit_code bigint NOT NULL,
gas_used bigint NOT NULL,
params jsonb,
returns jsonb
);
ALTER TABLE ONLY {{ .SchemaName | default "public"}}.vm_messages ADD CONSTRAINT vm_messages_pkey PRIMARY KEY (height, state_root, cid, source);
frrist marked this conversation as resolved.
Show resolved Hide resolved
CREATE INDEX vm_messages_height_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (height);
CREATE INDEX vm_messages_cid_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (cid);
CREATE INDEX vm_messages_source_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (source);
CREATE INDEX vm_messages_from_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("from");
CREATE INDEX vm_messages_to_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("to");
CREATE INDEX vm_messages_actor_code_method_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (actor_code, method);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to do a write-optimized schema, I'd recommend to get rid of several of these indices since all these indices need to be updated whenever an insert or update happens.

I propose the following:

Suggested change
CREATE INDEX vm_messages_height_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (height);
CREATE INDEX vm_messages_cid_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (cid);
CREATE INDEX vm_messages_source_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH (source);
CREATE INDEX vm_messages_from_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("from");
CREATE INDEX vm_messages_to_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("to");
CREATE INDEX vm_messages_actor_code_method_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (actor_code, method);
CREATE INDEX vm_messages_height_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (height);
CREATE INDEX vm_messages_from_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("from");
CREATE INDEX vm_messages_to_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("to");

Regarding:

CREATE INDEX vm_messages_actor_code_method_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (actor_code, method);

This only makes sense if we have queries that have a filter that's where actor_code = x and method = b or where actor_code = x . This wouldn't work well with filters: where method = b and actor_code = x or where method = b.

Therefore, I'd recommend based on Mike's comment here:

CREATE INDEX vm_messages_actor_code_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (actor_code);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only makes sense if we have queries that have a filter that's where actor_code = x and method = b

This will be a common query since the method number depends on the actor code. i.e. method numbers only have meaning when compared with the actor they were called on.



COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.height IS 'Height message was executed at.';
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.state_root IS 'CID of the parent state root at which this message was executed.';
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.cid IS 'CID of the message (note this CID does not appear on chain).';
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.source IS 'CID of the on-chain message that caused this message to be sent.';
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages."from" IS 'Address of the actor that sent the message.';
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages."to" IS 'Address of the actor that received the message.';
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.value IS 'Amount of FIL (in attoFIL) transferred by this message.';
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.method IS 'The method number invoked on the recipient actor. Only unique to the actor the method is being invoked on. A method number of 0 is a plain token transfer - no method execution';
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.actor_code IS 'The CID of the actor that received the message.';
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.exit_code IS 'The exit code that was returned as a result of executing the message.';
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.gas_used IS 'A measure of the amount of resources (or units of gas) consumed, in order to execute a message.';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious: Do you know what resources this would be other than gas?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reusing the description of gas here that we use elsewhere, e.g.: https://github.com/filecoin-project/lily/blob/v0.10.0/schemas/v1/base.go#L391

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspected it was copypasta. Just wondering if there is anything other than gas that it would have been referring to; I couldn't think of anything.

COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.params IS 'Message parameters parsed and serialized as a JSON object.';
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.returns IS 'Result returned from executing a message parsed and serialized as a JSON object.';
`,
)
}
1 change: 1 addition & 0 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var Models = []interface{}{
(*messages.ParsedMessage)(nil),
(*messages.InternalMessage)(nil),
(*messages.InternalParsedMessage)(nil),
(*messages.VMMessage)(nil),

(*multisig.MultisigTransaction)(nil),

Expand Down
Loading