Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(task): add chain economics processing #94

Merged
merged 2 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions commands/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/filecoin-project/sentinel-visor/schedule"
"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
"github.com/filecoin-project/sentinel-visor/tasks/chain"
"github.com/filecoin-project/sentinel-visor/tasks/indexer"
"github.com/filecoin-project/sentinel-visor/tasks/message"
"github.com/filecoin-project/sentinel-visor/tasks/views"
Expand Down Expand Up @@ -161,6 +162,28 @@ var Run = &cli.Command{
Usage: "Refresh frequency for chain visualization views (0 = disables refresh)",
EnvVars: []string{"VISOR_CHAINVIS_REFRESH"},
},

&cli.IntFlag{
Name: "chaineconomics-workers",
Aliases: []string{"cew"},
Value: 5,
Usage: "Number of chain economics processors to start",
EnvVars: []string{"VISOR_CHAINECONOMICS_WORKERS"},
},
&cli.IntFlag{
Name: "chaineconomics-batch",
Aliases: []string{"ceb"},
Value: 50, // chain economics processing is quite fast
Usage: "Batch size for the chain economics processor",
EnvVars: []string{"VISOR_CHAINECONOMICS_BATCH"},
},
&cli.DurationFlag{
Name: "chaineconomics-lease",
Aliases: []string{"cel"},
Value: time.Minute * 15,
Usage: "Lease time for the chain economics processor",
EnvVars: []string{"VISOR_CHAINECONOMICS_LEASE"},
},
},
Action: func(cctx *cli.Context) error {
// Validate flags
Expand Down Expand Up @@ -266,6 +289,16 @@ var Run = &cli.Command{
})
}

// Add several chain economics tasks to read gas outputs from indexed messages
for i := 0; i < cctx.Int("chaineconomics-workers"); i++ {
scheduler.Add(schedule.TaskConfig{
Name: fmt.Sprintf("ChainEconomicsProcessor%03d", i),
Task: chain.NewChainEconomicsProcessor(rctx.db, rctx.api, cctx.Duration("chaineconomics-lease"), cctx.Int("chaineconomics-batch"), heightFrom, heightTo),
RestartOnFailure: true,
RestartOnCompletion: true,
})
}

// Include optional refresher for Chain Visualization views
// Zero duration will cause ChainVisRefresher to exit and should not restart
scheduler.Add(schedule.TaskConfig{
Expand Down
7 changes: 7 additions & 0 deletions lens/lotus/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,10 @@ func (aw *APIWrapper) StateReadState(ctx context.Context, actor address.Address,
func (aw *APIWrapper) ComputeGasOutputs(gasUsed, gasLimit int64, baseFee, feeCap, gasPremium abi.TokenAmount) vm.GasOutputs {
return vm.ComputeGasOutputs(gasUsed, gasLimit, baseFee, feeCap, gasPremium)
}

// TODO: Rename to StateVMCirculatingSupplyInternal following Lotus release containing commit 83624a8858d1983a28e0cd523c5ec353a5bbdaee
func (aw *APIWrapper) StateCirculatingSupply(ctx context.Context, tsk types.TipSetKey) (api.CirculatingSupply, error) {
ctx, span := global.Tracer("").Start(ctx, "Lotus.StateCirculatingSupply")
defer span.End()
return aw.FullNode.StateCirculatingSupply(ctx, tsk)
}
52 changes: 52 additions & 0 deletions model/chain/economics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package chain

import (
"context"

"github.com/go-pg/pg/v10"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/label"
"golang.org/x/xerrors"
)

type ChainEconomics struct {
tableName struct{} `pg:"chain_economics"`
ParentStateRoot string `pg:",notnull"`
CirculatingFil string `pg:",notnull"`
VestedFil string `pg:",notnull"`
MinedFil string `pg:",notnull"`
BurntFil string `pg:",notnull"`
LockedFil string `pg:",notnull"`
}

func (c *ChainEconomics) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
if _, err := tx.ModelContext(ctx, c).
OnConflict("do nothing").
Insert(); err != nil {
return xerrors.Errorf("persisting chain economics: %w", err)
}
return nil
}

type ChainEconomicsList []*ChainEconomics

func (l ChainEconomicsList) Persist(ctx context.Context, db *pg.DB) error {
return db.RunInTransaction(ctx, func(tx *pg.Tx) error {
return l.PersistWithTx(ctx, tx)
})
}

func (l ChainEconomicsList) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
if len(l) == 0 {
return nil
}
ctx, span := global.Tracer("").Start(ctx, "ChainEconomicsList.PersistWithTx", trace.WithAttributes(label.Int("count", len(l))))
defer span.End()
if _, err := tx.ModelContext(ctx, &l).
OnConflict("do nothing").
Insert(); err != nil {
return xerrors.Errorf("persisting derived gas outputs: %w", err)
}
return nil
}
11 changes: 11 additions & 0 deletions model/visor/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ type ProcessingTipSet struct {

// MessageErrorsDetected contains any error encountered when reading the tipset's messages
MessageErrorsDetected string

// Chain economics processing

// EconomicsClaimedUntil marks the tipset as claimed for chain economics processing until the set time
EconomicsClaimedUntil time.Time

// EconomicsCompletedAt is the time the tipset was read from the chain and its chain economics read
EconomicsCompletedAt time.Time

// EconomicsErrorsDetected contains any error encountered when reading the tipset's chain economics
EconomicsErrorsDetected string
}

func (p *ProcessingTipSet) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
Expand Down
27 changes: 27 additions & 0 deletions storage/migrations/8_chain_economics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package migrations

import (
"github.com/go-pg/migrations/v8"
)

// Schema version 8 adds columns for chain_economics processing

func init() {
up := batch(`
-- chain_economics table already exists in chainwatch schema (schema version 1)


ALTER TABLE public.visor_processing_tipsets ADD COLUMN IF NOT EXISTS economics_claimed_until timestamptz;
ALTER TABLE public.visor_processing_tipsets ADD COLUMN IF NOT EXISTS economics_completed_at timestamptz;
ALTER TABLE public.visor_processing_tipsets ADD COLUMN IF NOT EXISTS economics_errors_detected text;

SELECT 1;
`)

down := batch(`
ALTER TABLE public.visor_processing_tipsets DROP COLUMN IF EXISTS economics_claimed_until;
ALTER TABLE public.visor_processing_tipsets DROP COLUMN IF EXISTS economics_completed_at;
ALTER TABLE public.visor_processing_tipsets DROP COLUMN IF EXISTS economics_errors_detected;
`)
migrations.MustRegisterTx(up, down)
}
69 changes: 62 additions & 7 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/sentinel-visor/model/actors/power"
"github.com/filecoin-project/sentinel-visor/model/actors/reward"
"github.com/filecoin-project/sentinel-visor/model/blocks"
"github.com/filecoin-project/sentinel-visor/model/chain"
"github.com/filecoin-project/sentinel-visor/model/derived"
"github.com/filecoin-project/sentinel-visor/model/messages"
"github.com/filecoin-project/sentinel-visor/model/visor"
Expand Down Expand Up @@ -58,6 +59,7 @@ var models = []interface{}{
(*visor.ProcessingMessage)(nil),

(*derived.GasOutputs)(nil),
(*chain.ChainEconomics)(nil),
}

var log = logging.Logger("storage")
Expand All @@ -67,8 +69,10 @@ var (
SchemaLock AdvisoryLock = 1
)

var ErrSchemaTooOld = errors.New("database schema is too old and requires migration")
var ErrSchemaTooNew = errors.New("database schema is too new for this version of visor")
var (
ErrSchemaTooOld = errors.New("database schema is too old and requires migration")
ErrSchemaTooNew = errors.New("database schema is too new for this version of visor")
)

func NewDatabase(ctx context.Context, url string, poolSize int) (*Database, error) {
opt, err := pg.ParseURL(url)
Expand Down Expand Up @@ -119,7 +123,6 @@ func (d *Database) Connect(ctx context.Context) error {
d.DB = db
return nil
}

}

func connect(ctx context.Context, opt *pg.Options) (*pg.DB, error) {
Expand Down Expand Up @@ -277,7 +280,6 @@ WITH leased AS (
)
SELECT tip_set,height FROM leased;
`, claimUntil, d.Clock.Now(), minHeight, maxHeight, batchSize)

if err != nil {
return err
}
Expand Down Expand Up @@ -358,7 +360,6 @@ WITH leased AS (
RETURNING a.head, a.code, a.nonce, a.balance, a.address, a.parent_state_root, a.tip_set, a.parent_tip_set)
SELECT head, code, nonce, balance, address, parent_state_root, tip_set, parent_tip_set from leased;
`, claimUntil, d.Clock.Now(), minHeight, maxHeight, pg.In(codes), batchSize)

if err != nil {
return err
}
Expand Down Expand Up @@ -414,7 +415,6 @@ WITH leased AS (
)
SELECT tip_set,height FROM leased;
`, claimUntil, d.Clock.Now(), minHeight, maxHeight, batchSize)

if err != nil {
return err
}
Expand Down Expand Up @@ -483,7 +483,6 @@ WITH leased AS (
)
SELECT * FROM leased;
`, claimUntil, d.Clock.Now(), minHeight, maxHeight, batchSize)

if err != nil {
return err
}
Expand Down Expand Up @@ -514,3 +513,59 @@ func (d *Database) MarkGasOutputsMessagesComplete(ctx context.Context, cid strin

return nil
}

// LeaseTipSetEconomics leases a set of tipsets containing chain economics to process. minHeight and maxHeight define an inclusive range of heights to process.
// TODO: refactor all the tipset leasing methods into a more general function
func (d *Database) LeaseTipSetEconomics(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) (visor.ProcessingTipSetList, error) {
var tipsets visor.ProcessingTipSetList

if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error {
_, err := tx.QueryContext(ctx, &tipsets, `
WITH leased AS (
UPDATE visor_processing_tipsets
SET economics_claimed_until = ?
FROM (
SELECT *
FROM visor_processing_tipsets
WHERE economics_completed_at IS null AND
(economics_claimed_until IS null OR economics_claimed_until < ?) AND
height >= ? AND height <= ?
ORDER BY height DESC
LIMIT ?
FOR UPDATE SKIP LOCKED
) candidates
WHERE visor_processing_tipsets.tip_set = candidates.tip_set AND visor_processing_tipsets.height = candidates.height
RETURNING visor_processing_tipsets.tip_set, visor_processing_tipsets.height
)
SELECT tip_set,height FROM leased;
`, claimUntil, d.Clock.Now(), minHeight, maxHeight, batchSize)
if err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return tipsets, nil
}

func (d *Database) MarkTipSetEconomicsComplete(ctx context.Context, tipset string, height int64, completedAt time.Time, errorsDetected string) error {
if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error {
_, err := tx.ExecContext(ctx, `
UPDATE visor_processing_tipsets
SET economics_claimed_until = null,
economics_completed_at = ?,
economics_errors_detected = ?
WHERE tip_set = ? AND height = ?
`, completedAt, useNullIfEmpty(errorsDetected), tipset, height)
if err != nil {
return err
}

return nil
}); err != nil {
return err
}

return nil
}
Loading