Skip to content

Commit

Permalink
feat: add new task for fevm receipt (#1208)
Browse files Browse the repository at this point in the history
* Add new task: fevm receipt

* Add new schema

* Add the cache for actor

* Refine the logic in cache and placeholder checking

---------

Co-authored-by: Terry <[email protected]>
  • Loading branch information
Terryhung and Terry committed May 22, 2023
1 parent 4aa0e37 commit 8814fc4
Show file tree
Hide file tree
Showing 17 changed files with 304 additions and 37 deletions.
80 changes: 46 additions & 34 deletions chain/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,51 +40,32 @@ var (
executedTsCacheSize int
diffPreCommitCacheSize int
diffSectorCacheSize int
actorCacheSize int

tipsetMessageReceiptSizeEnv = "LILY_TIPSET_MSG_RECEIPT_CACHE_SIZE"
executedTsCacheSizeEnv = "LILY_EXECUTED_TS_CACHE_SIZE"
diffPreCommitCacheSizeEnv = "LILY_DIFF_PRECOMMIT_CACHE_SIZE"
diffSectorCacheSizeEnv = "LILY_DIFF_SECTORS_CACHE_SIZE"
actorCacheSizeEnv = "LILY_ACTOR_CACHE_SIZE"
)

func init() {
tipsetMessageReceiptCacheSize = 4
executedTsCacheSize = 4
diffPreCommitCacheSize = 500
diffSectorCacheSize = 500
if s := os.Getenv(tipsetMessageReceiptSizeEnv); s != "" {
v, err := strconv.ParseInt(s, 10, 64)
if err == nil {
tipsetMessageReceiptCacheSize = int(v)
} else {
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, tipsetMessageReceiptSizeEnv, tipsetMessageReceiptCacheSize, err)
}
}
if s := os.Getenv(executedTsCacheSizeEnv); s != "" {
func getCacheSizeFromEnv(env string, defaultValue int) int {
if s := os.Getenv(env); s != "" {
v, err := strconv.ParseInt(s, 10, 64)
if err == nil {
executedTsCacheSize = int(v)
} else {
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, executedTsCacheSizeEnv, executedTsCacheSize, err)
}
}
if s := os.Getenv(diffPreCommitCacheSizeEnv); s != "" {
v, err := strconv.ParseInt(s, 10, 64)
if err == nil {
diffPreCommitCacheSize = int(v)
} else {
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, diffPreCommitCacheSizeEnv, diffPreCommitCacheSize, err)
}
}
if s := os.Getenv(diffSectorCacheSizeEnv); s != "" {
v, err := strconv.ParseInt(s, 10, 64)
if err == nil {
diffSectorCacheSize = int(v)
} else {
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, diffSectorCacheSizeEnv, diffSectorCacheSize, err)
return int(v)
}
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, env, defaultValue, err)
}
return defaultValue
}

func init() {
tipsetMessageReceiptCacheSize = getCacheSizeFromEnv(tipsetMessageReceiptSizeEnv, 4)
executedTsCacheSize = getCacheSizeFromEnv(executedTsCacheSizeEnv, 4)
diffPreCommitCacheSize = getCacheSizeFromEnv(diffPreCommitCacheSizeEnv, 500)
diffSectorCacheSize = getCacheSizeFromEnv(diffSectorCacheSizeEnv, 500)
actorCacheSize = getCacheSizeFromEnv(actorCacheSizeEnv, 1000)
}

var _ tasks.DataSource = (*DataSource)(nil)
Expand Down Expand Up @@ -117,6 +98,11 @@ func NewDataSource(node lens.API) (*DataSource, error) {
return nil, err
}

t.actorCache, err = lru.New(actorCacheSize)
if err != nil {
return nil, err
}

return t, nil
}

Expand All @@ -134,6 +120,8 @@ type DataSource struct {

diffPreCommitCache *lru.Cache
diffPreCommitGroup singleflight.Group

actorCache *lru.Cache
}

func (t *DataSource) MessageReceiptEvents(ctx context.Context, root cid.Cid) ([]types.Event, error) {
Expand All @@ -148,10 +136,18 @@ func (t *DataSource) TipSetBlockMessages(ctx context.Context, ts *types.TipSet)
return t.node.MessagesForTipSetBlocks(ctx, ts)
}

func (t *DataSource) ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) {
return t.node.ChainGetMessagesInTipset(ctx, tsk)
}

func (t *DataSource) EthGetBlockByHash(ctx context.Context, blkHash ethtypes.EthHash, fullTxInfo bool) (ethtypes.EthBlock, error) {
return t.node.EthGetBlockByHash(ctx, blkHash, fullTxInfo)
}

func (t *DataSource) EthGetTransactionReceipt(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error) {
return t.node.EthGetTransactionReceipt(ctx, txHash)
}

// TipSetMessageReceipts returns the blocks and messages in `pts` and their corresponding receipts from `ts` matching block order in tipset (`pts`).
// TODO replace with lotus chainstore method when https://github.com/filecoin-project/lotus/pull/9186 lands
func (t *DataSource) TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) {
Expand Down Expand Up @@ -192,13 +188,29 @@ func (t *DataSource) Store() adt.Store {
}

func (t *DataSource) Actor(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) {
metrics.RecordInc(ctx, metrics.DataSourceActorCacheRead)
ctx, span := otel.Tracer("").Start(ctx, "DataSource.Actor")
if span.IsRecording() {
span.SetAttributes(attribute.String("tipset", tsk.String()))
span.SetAttributes(attribute.String("address", addr.String()))
}
defer span.End()
return t.node.StateGetActor(ctx, addr, tsk)

key, keyErr := asKey(addr, tsk)
if keyErr == nil {
value, found := t.actorCache.Get(key)
if found {
metrics.RecordInc(ctx, metrics.DataSourceActorCacheHit)
return value.(*types.Actor), nil
}
}

act, err := t.node.StateGetActor(ctx, addr, tsk)
if err == nil && keyErr == nil {
t.actorCache.Add(key, act)
}

return act, err
}

func (t *DataSource) MinerPower(ctx context.Context, addr address.Address, ts *types.TipSet) (*api.MinerPower, error) {
Expand Down
3 changes: 3 additions & 0 deletions chain/indexer/integrated/processor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (

// fevm task
fevmblockheadertask "github.com/filecoin-project/lily/tasks/fevm/blockheader"
fevmreceipttask "github.com/filecoin-project/lily/tasks/fevm/receipt"
fevmactorstatstask "github.com/filecoin-project/lily/tasks/fevmactorstats"

"github.com/filecoin-project/lily/chain/indexer/tasktype"
Expand Down Expand Up @@ -644,6 +645,8 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
out.TipsetProcessors[t] = fevmactorstatstask.NewTask(api)
case tasktype.FEVMBlockHeader:
out.TipsetsProcessors[t] = fevmblockheadertask.NewTask(api)
case tasktype.FEVMReceipt:
out.TipsetsProcessors[t] = fevmreceipttask.NewTask(api)

case BuiltinTaskName:
out.ReportProcessors[t] = indexertask.NewTask(api)
Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/integrated/processor/state_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestNewProcessor(t *testing.T) {
require.Equal(t, t.Name(), proc.name)
require.Len(t, proc.actorProcessors, 24)
require.Len(t, proc.tipsetProcessors, 10)
require.Len(t, proc.tipsetsProcessors, 10)
require.Len(t, proc.tipsetsProcessors, 11)
require.Len(t, proc.builtinProcessors, 1)

require.Equal(t, gasoutput.NewTask(nil), proc.tipsetsProcessors[tasktype.GasOutputs])
Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/integrated/processor/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,6 @@ func TestMakeProcessorsAllTasks(t *testing.T) {
require.NoError(t, err)
require.Len(t, proc.ActorProcessors, 24)
require.Len(t, proc.TipsetProcessors, 10)
require.Len(t, proc.TipsetsProcessors, 10)
require.Len(t, proc.TipsetsProcessors, 11)
require.Len(t, proc.ReportProcessors, 1)
}
5 changes: 5 additions & 0 deletions chain/indexer/tasktype/table_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
VerifiedRegistryClaim = "verified_registry_claim"
FEVMActorStats = "fevm_actor_stats"
FEVMBlockHeader = "fevm_block_header"
FEVMReceipt = "fevm_receipt"
)

var AllTableTasks = []string{
Expand Down Expand Up @@ -93,6 +94,7 @@ var AllTableTasks = []string{
VerifiedRegistryClaim,
FEVMActorStats,
FEVMBlockHeader,
FEVMReceipt,
}

var TableLookup = map[string]struct{}{
Expand Down Expand Up @@ -140,6 +142,7 @@ var TableLookup = map[string]struct{}{
VerifiedRegistryClaim: {},
FEVMActorStats: {},
FEVMBlockHeader: {},
FEVMReceipt: {},
}

var TableComment = map[string]string{
Expand Down Expand Up @@ -187,6 +190,7 @@ var TableComment = map[string]string{
VerifiedRegistryClaim: ``,
FEVMActorStats: ``,
FEVMBlockHeader: ``,
FEVMReceipt: ``,
}

var TableFieldComments = map[string]map[string]string{
Expand Down Expand Up @@ -291,4 +295,5 @@ var TableFieldComments = map[string]map[string]string{
VerifiedRegistryClaim: {},
FEVMActorStats: {},
FEVMBlockHeader: {},
FEVMReceipt: {},
}
1 change: 1 addition & 0 deletions chain/indexer/tasktype/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ var TaskLookup = map[string][]string{
FEVMTask: {
FEVMActorStats,
FEVMBlockHeader,
FEVMReceipt,
},
}

Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/tasktype/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestMakeAllTaskAliasNames(t *testing.T) {
}

func TestMakeAllTaskNames(t *testing.T) {
const TotalTableTasks = 44
const TotalTableTasks = 45
actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks)
require.NoError(t, err)
// if this test fails it means a new task name was added, update the above test
Expand Down
2 changes: 2 additions & 0 deletions lens/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type ChainAPI interface {

ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error)
ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error)
ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error)

ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi.TokenAmount, error)

Expand Down Expand Up @@ -78,6 +79,7 @@ type VMAPI interface {

type EthModuleAPI interface {
EthGetBlockByHash(ctx context.Context, blkHash ethtypes.EthHash, fullTxInfo bool) (ethtypes.EthBlock, error)
EthGetTransactionReceipt(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error)
}

type MessageExecution struct {
Expand Down
2 changes: 2 additions & 0 deletions lens/lily/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type LilyAPI interface {
ChainPrune(ctx context.Context, opts api.PruneOpts) error //perm:read
ChainHotGC(ctx context.Context, opts api.HotGCOpts) error //perm:read
EthGetBlockByHash(ctx context.Context, blkHash ethtypes.EthHash, fullTxInfo bool) (ethtypes.EthBlock, error) //perm:read
EthGetTransactionReceipt(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error) //perm:read
ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) //perm:read

// trigger graceful shutdown
Shutdown(context.Context) error
Expand Down
8 changes: 8 additions & 0 deletions lens/lily/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,14 @@ func (m *LilyNodeAPI) EthGetBlockByHash(ctx context.Context, blkHash ethtypes.Et
return m.EthModuleAPI.EthGetBlockByHash(ctx, blkHash, fullTxInfo)
}

func (m *LilyNodeAPI) EthGetTransactionReceipt(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error) {
return m.EthModuleAPI.EthGetTransactionReceipt(ctx, txHash)
}

func (m *LilyNodeAPI) ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) {
return m.ChainAPI.ChainGetMessagesInTipset(ctx, tsk)
}

// MessagesForTipSetBlocks returns messages stored in the blocks of the specified tipset, messages may be duplicated
// across the returned set of BlockMessages.
func (m *LilyNodeAPI) MessagesForTipSetBlocks(ctx context.Context, ts *types.TipSet) ([]*lens.BlockMessages, error) {
Expand Down
10 changes: 10 additions & 0 deletions lens/lily/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type LilyAPIStruct struct {
ChainPrune func(ctx context.Context, opts api.PruneOpts) error `perm:"read"`
ChainHotGC func(ctx context.Context, opts api.HotGCOpts) error `perm:"read"`
EthGetBlockByHash func(ctx context.Context, blkHash ethtypes.EthHash, fullTxInfo bool) (ethtypes.EthBlock, error) `perm:"read"`
EthGetTransactionReceipt func(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error) `perm:"read"`
ChainGetMessagesInTipset func(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) `perm:"read"`

LogList func(context.Context) ([]string, error) `perm:"read"`
LogSetLevel func(context.Context, string, string) error `perm:"read"`
Expand Down Expand Up @@ -274,3 +276,11 @@ func (s *LilyAPIStruct) LilyGapFillNotify(ctx context.Context, cfg *LilyGapFillN
func (s *LilyAPIStruct) EthGetBlockByHash(ctx context.Context, blkHash ethtypes.EthHash, fullTxInfo bool) (ethtypes.EthBlock, error) {
return s.Internal.EthGetBlockByHash(ctx, blkHash, fullTxInfo)
}

func (s *LilyAPIStruct) EthGetTransactionReceipt(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error) {
return s.Internal.EthGetTransactionReceipt(ctx, txHash)
}

func (s *LilyAPIStruct) ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) {
return s.Internal.ChainGetMessagesInTipset(ctx, tsk)
}
2 changes: 2 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ var (
DataSourceMessageExecutionRead = stats.Int64("data_source_message_execution_read", "Number of reads for message executions", stats.UnitDimensionless)
DataSourceMessageExecutionCacheHit = stats.Int64("data_source_message_execution_cache_hit", "Number of cache hits for message executions", stats.UnitDimensionless)
DataSourceActorStateChangesDuration = stats.Float64("data_source_actor_state_change_ms", "Time take to collect actors whose state changed", stats.UnitMilliseconds)
DataSourceActorCacheRead = stats.Int64("data_source_actor_read", "Number of reads for message executions", stats.UnitDimensionless)
DataSourceActorCacheHit = stats.Int64("data_source_actor_cache_hit", "Number of cache hits for message executions", stats.UnitDimensionless)

// Distributed Indexer

Expand Down
51 changes: 51 additions & 0 deletions model/fevm/receipt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package fevm

import (
"context"

"go.opencensus.io/tag"

"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/model"
)

type FEVMReceipt struct {
tableName struct{} `pg:"fevm_receipt"` // nolint: structcheck

// Height message was executed at.
Height int64 `pg:",pk,notnull,use_zero"`

// Message CID
Message string `pg:",use_zero"`

TransactionHash string `pg:",notnull"`
TransactionIndex uint64 `pg:",use_zero"`
BlockHash string `pg:",notnull"`
BlockNumber uint64 `pg:",use_zero"`
From string `pg:",notnull"`
To string `pg:",notnull"`
ContractAddress string `pg:",notnull"`
Status uint64 `pg:",use_zero"`
CumulativeGasUsed uint64 `pg:",use_zero"`
GasUsed uint64 `pg:",use_zero"`
EffectiveGasPrice int64 `pg:",use_zero"`
LogsBloom string `pg:",notnull"`
Logs string `pg:",type:jsonb"`
}

func (f *FEVMReceipt) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "fevm_receipt"))
metrics.RecordCount(ctx, metrics.PersistModel, 1)
return s.PersistModel(ctx, f)
}

type FEVMReceiptList []*FEVMReceipt

func (f FEVMReceiptList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
if len(f) == 0 {
return nil
}
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "fevm_receipt"))
metrics.RecordCount(ctx, metrics.PersistModel, len(f))
return s.PersistModel(ctx, f)
}
27 changes: 27 additions & 0 deletions schemas/v1/23_fevm_receipt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package v1

func init() {
patches.Register(
23,
`
CREATE TABLE IF NOT EXISTS {{ .SchemaName | default "public"}}.fevm_receipt (
height BIGINT NOT NULL,
logs jsonb,
transaction_hash TEXT,
transaction_index BIGINT,
block_hash TEXT,
block_number BIGINT,
"from" TEXT,
"to" TEXT,
contract_address TEXT,
status BIGINT,
cumulative_gas_used BIGINT,
gas_used BIGINT,
effective_gas_price BIGINT,
logs_bloom TEXT,
message TEXT,
PRIMARY KEY(height, transaction_hash)
);
`,
)
}
1 change: 1 addition & 0 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ var Models = []interface{}{

(*fevm.FEVMActorStats)(nil),
(*fevm.FEVMBlockHeader)(nil),
(*fevm.FEVMReceipt)(nil),
}

var log = logging.Logger("lily/storage")
Expand Down
2 changes: 2 additions & 0 deletions tasks/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ type DataSource interface {
ShouldBurnFn(ctx context.Context, ts *types.TipSet) (lens.ShouldBurnFn, error)

EthGetBlockByHash(ctx context.Context, blkHash ethtypes.EthHash, fullTxInfo bool) (ethtypes.EthBlock, error)
EthGetTransactionReceipt(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error)
ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error)
}
Loading

0 comments on commit 8814fc4

Please sign in to comment.