Skip to content

Commit

Permalink
feat: introduce major schema versioning and version 1 schema
Browse files Browse the repository at this point in the history
* feat: enable models to be version aware

* Use a structured version

* Move schema migrations to new versioned package

* Add base SQL for schemas and update README

* Fix some spelling errors caught by lint

* Appease the lint god

* Incorporate pr review feedback

Create v1 schema and adjust hypertable parameters

Use numeric type to represent numbers

add integer now function to each hypertable

multisig tx param field is nullable

Add Height to IDAddress and ChainEconomics

Fix ups to ensure schema v1 matches models

Some lint fixes

Ensure older schema version can be installed in new database

Add add FilReserveDisbursed for chain economics

Support backwards compatibility with v0 models

Fix genesis test

Support custom postgresql schema names

Explicitly check whether database has initialized version tables

Fix ChainReward.NewReward to be notnull

Convert value column in parsed_messages to numeric

Add actor_family colum to derived_gas_outputs

Add internal_messages and internal_parsed_messages models

Make InternalParsedMessage.Method a string to match schema

Remove extension comment

Only set base schema search path if non-default

Fix typo

Add version number to initial version tables

Fix template typo

Revert adding version number to initial version tables

Remove extension creation

Remove owner statements from v1 schema

Use fixed statediff

Use merged version in statediff repo

tweak logger name
  • Loading branch information
iand committed Jun 10, 2021
1 parent 91a73b9 commit 28961a0
Show file tree
Hide file tree
Showing 34 changed files with 2,571 additions and 141 deletions.
2 changes: 1 addition & 1 deletion chain/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
MultisigApprovalsTask = "msapprovals" // task that extracts multisig actor approvals
)

var log = logging.Logger("chain")
var log = logging.Logger("visor/chain")

var _ TipSetObserver = (*TipSetIndexer)(nil)

Expand Down
8 changes: 2 additions & 6 deletions chain/walker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/filecoin-project/sentinel-visor/chain/actors/builtin"
"github.com/raulk/clock"

apitest "github.com/filecoin-project/lotus/api/test"
nodetest "github.com/filecoin-project/lotus/node/test"
Expand Down Expand Up @@ -55,11 +54,8 @@ func TestWalker(t *testing.T) {
cids := bhs.Cids()
rounds := bhs.Rounds()

strg := &storage.Database{
DB: db,
Clock: clock.NewMock(),
Upsert: false,
}
strg, err := storage.NewDatabaseFromDB(ctx, db, "public")
require.NoError(t, err, "NewDatabaseFromDB")

tsIndexer, err := NewTipSetIndexer(opener, strg, builtin.EpochDurationSeconds*time.Second, t.Name(), []string{BlocksTask})
require.NoError(t, err, "NewTipSetIndexer")
Expand Down
8 changes: 2 additions & 6 deletions chain/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/filecoin-project/sentinel-visor/chain/actors/builtin"
"github.com/raulk/clock"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
Expand Down Expand Up @@ -65,11 +64,8 @@ func TestWatcher(t *testing.T) {

apitest.MineUntilBlock(ctx, t, node, sn[0], nil)

strg := &storage.Database{
DB: db,
Clock: clock.NewMock(),
Upsert: false,
}
strg, err := storage.NewDatabaseFromDB(ctx, db, "public")
require.NoError(t, err, "NewDatabaseFromDB")

tsIndexer, err := NewTipSetIndexer(opener, strg, builtin.EpochDurationSeconds*time.Second, t.Name(), []string{BlocksTask})
require.NoError(t, err, "NewTipSetIndexer")
Expand Down
3 changes: 2 additions & 1 deletion commands/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var MigrateCmd = &cli.Command{

ctx := cctx.Context

db, err := storage.NewDatabase(ctx, cctx.String("db"), cctx.Int("db-pool-size"), cctx.String("name"), false)
db, err := storage.NewDatabase(ctx, cctx.String("db"), cctx.Int("db-pool-size"), cctx.String("name"), cctx.String("schema"), false)
if err != nil {
return xerrors.Errorf("connect database: %w", err)
}
Expand Down Expand Up @@ -62,6 +62,7 @@ var MigrateCmd = &cli.Command{
return xerrors.Errorf("verify schema: %w", err)
}

log.Infof("database schema is supported by this version of visor")
return nil
},
}
6 changes: 6 additions & 0 deletions commands/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ var dbConnectFlags = []cli.Flag{
Value: defaultName,
Usage: "A name that helps to identify this instance of visor.",
},
&cli.StringFlag{
Name: "schema",
EnvVars: []string{"VISOR_SCHEMA"},
Value: "public",
Usage: "The name of the postgresql schema that holds the objects used by this instance of visor.",
},
}

var dbBehaviourFlags = []cli.Flag{
Expand Down
2 changes: 1 addition & 1 deletion commands/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var log = logging.Logger("visor")

func setupDatabase(cctx *cli.Context) (*storage.Database, error) {
ctx := cctx.Context
db, err := storage.NewDatabase(ctx, cctx.String("db"), cctx.Int("db-pool-size"), cctx.String("name"), cctx.Bool("db-allow-upsert"))
db, err := storage.NewDatabase(ctx, cctx.String("db"), cctx.Int("db-pool-size"), cctx.String("name"), cctx.String("schema"), cctx.Bool("db-allow-upsert"))
if err != nil {
return nil, xerrors.Errorf("new database: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type PgStorageConf struct {
URLEnv string // name of an environment variable that contains the database URL
URL string // URL used to connect to postgresql if URLEnv is not set
ApplicationName string
SchemaName string
PoolSize int
AllowUpsert bool
}
Expand Down Expand Up @@ -83,13 +84,15 @@ func SampleConf() *Conf {
PoolSize: 20,
ApplicationName: "visor",
AllowUpsert: false,
SchemaName: "public",
},
// this second database is only here to give an example to the user
"Database2": {
URL: "postgres://postgres:password@localhost:5432/postgres",
PoolSize: 10,
ApplicationName: "visor",
AllowUpsert: false,
SchemaName: "public",
},
},

Expand Down
48 changes: 43 additions & 5 deletions model/actors/init/idaddress.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,56 @@ import (
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/label"
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/metrics"
"github.com/filecoin-project/sentinel-visor/model"
)

type IdAddress struct {
Height int64 `pg:",pk,notnull,use_zero"`
ID string `pg:",pk,notnull"`
Address string `pg:",pk,notnull"`
StateRoot string `pg:",pk,notnull"`
}

type IdAddressV0 struct {
tableName struct{} `pg:"id_addresses"` // nolint: structcheck,unused
ID string `pg:",pk,notnull"`
Address string `pg:",pk,notnull"`
StateRoot string `pg:",pk,notnull"`
}

func (ia *IdAddress) AsVersion(version model.Version) (interface{}, bool) {
switch version.Major {
case 0:
if ia == nil {
return (*IdAddressV0)(nil), true
}

return &IdAddressV0{
ID: ia.ID,
Address: ia.Address,
StateRoot: ia.StateRoot,
}, true
case 1:
return ia, true
default:
return nil, false
}
}

func (ia *IdAddress) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "id_addresses"))
stop := metrics.Timer(ctx, metrics.PersistDuration)
defer stop()

return s.PersistModel(ctx, ia)
m, ok := ia.AsVersion(version)
if !ok {
return xerrors.Errorf("IdAddress not supported for schema version %s", version)
}

return s.PersistModel(ctx, m)
}

type IdAddressList []*IdAddress
Expand All @@ -36,10 +69,15 @@ func (ias IdAddressList) Persist(ctx context.Context, s model.StorageBatch, vers
stop := metrics.Timer(ctx, metrics.PersistDuration)
defer stop()

for _, ia := range ias {
if err := s.PersistModel(ctx, ia); err != nil {
return err
if version.Major != 1 {
// Support older versions, but in a non-optimal way
for _, m := range ias {
if err := m.Persist(ctx, s, version); err != nil {
return err
}
}
return nil
}
return nil

return s.PersistModel(ctx, ias)
}
48 changes: 47 additions & 1 deletion model/actors/miner/feedebt.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"go.opencensus.io/tag"
"go.opentelemetry.io/otel/api/global"
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/metrics"
"github.com/filecoin-project/sentinel-visor/model"
Expand All @@ -15,9 +16,38 @@ type MinerFeeDebt struct {
MinerID string `pg:",pk,notnull"`
StateRoot string `pg:",pk,notnull"`

FeeDebt string `pg:"type:numeric,notnull"`
}

type MinerFeeDebtV0 struct {
tableName struct{} `pg:"miner_fee_debts"` // nolint: structcheck,unused
Height int64 `pg:",pk,notnull,use_zero"`
MinerID string `pg:",pk,notnull"`
StateRoot string `pg:",pk,notnull"`

FeeDebt string `pg:",notnull"`
}

func (m *MinerFeeDebt) AsVersion(version model.Version) (interface{}, bool) {
switch version.Major {
case 0:
if m == nil {
return (*MinerFeeDebtV0)(nil), true
}

return &MinerFeeDebtV0{
Height: m.Height,
MinerID: m.MinerID,
StateRoot: m.StateRoot,
FeeDebt: m.FeeDebt,
}, true
case 1:
return m, true
default:
return nil, false
}
}

func (m *MinerFeeDebt) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
ctx, span := global.Tracer("").Start(ctx, "MinerFeeDebt.Persist")
defer span.End()
Expand All @@ -26,7 +56,12 @@ func (m *MinerFeeDebt) Persist(ctx context.Context, s model.StorageBatch, versio
stop := metrics.Timer(ctx, metrics.PersistDuration)
defer stop()

return s.PersistModel(ctx, m)
vm, ok := m.AsVersion(version)
if !ok {
return xerrors.Errorf("MinerFeeDebt not supported for schema version %s", version)
}

return s.PersistModel(ctx, vm)
}

type MinerFeeDebtList []*MinerFeeDebt
Expand All @@ -42,5 +77,16 @@ func (ml MinerFeeDebtList) Persist(ctx context.Context, s model.StorageBatch, ve
if len(ml) == 0 {
return nil
}

if version.Major != 1 {
// Support older versions, but in a non-optimal way
for _, m := range ml {
if err := m.Persist(ctx, s, version); err != nil {
return err
}
}
return nil
}

return s.PersistModel(ctx, ml)
}
52 changes: 51 additions & 1 deletion model/actors/miner/lockedfunds.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"go.opencensus.io/tag"
"go.opentelemetry.io/otel/api/global"
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/metrics"
"github.com/filecoin-project/sentinel-visor/model"
Expand All @@ -15,11 +16,44 @@ type MinerLockedFund struct {
MinerID string `pg:",pk,notnull"`
StateRoot string `pg:",pk,notnull"`

LockedFunds string `pg:"type:numeric,notnull"`
InitialPledge string `pg:"type:numeric,notnull"`
PreCommitDeposits string `pg:"type:numeric,notnull"`
}

type MinerLockedFundV0 struct {
tableName struct{} `pg:"miner_locked_funds"` // nolint: structcheck,unused
Height int64 `pg:",pk,notnull,use_zero"`
MinerID string `pg:",pk,notnull"`
StateRoot string `pg:",pk,notnull"`

LockedFunds string `pg:",notnull"`
InitialPledge string `pg:",notnull"`
PreCommitDeposits string `pg:",notnull"`
}

func (m *MinerLockedFund) AsVersion(version model.Version) (interface{}, bool) {
switch version.Major {
case 0:
if m == nil {
return (*MinerLockedFundV0)(nil), true
}

return &MinerLockedFundV0{
Height: m.Height,
MinerID: m.MinerID,
StateRoot: m.StateRoot,
LockedFunds: m.LockedFunds,
InitialPledge: m.InitialPledge,
PreCommitDeposits: m.PreCommitDeposits,
}, true
case 1:
return m, true
default:
return nil, false
}
}

func (m *MinerLockedFund) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
ctx, span := global.Tracer("").Start(ctx, "MinerLockedFund.Persist")
defer span.End()
Expand All @@ -28,7 +62,12 @@ func (m *MinerLockedFund) Persist(ctx context.Context, s model.StorageBatch, ver
stop := metrics.Timer(ctx, metrics.PersistDuration)
defer stop()

return s.PersistModel(ctx, m)
vm, ok := m.AsVersion(version)
if !ok {
return xerrors.Errorf("MinerLockedFund not supported for schema version %s", version)
}

return s.PersistModel(ctx, vm)
}

type MinerLockedFundsList []*MinerLockedFund
Expand All @@ -44,5 +83,16 @@ func (ml MinerLockedFundsList) Persist(ctx context.Context, s model.StorageBatch
if len(ml) == 0 {
return nil
}

if version.Major != 1 {
// Support older versions, but in a non-optimal way
for _, m := range ml {
if err := m.Persist(ctx, s, version); err != nil {
return err
}
}
return nil
}

return s.PersistModel(ctx, ml)
}
Loading

0 comments on commit 28961a0

Please sign in to comment.