diff --git a/chain/actor.go b/chain/actor.go index 9bce56af1..2e9fbfb73 100644 --- a/chain/actor.go +++ b/chain/actor.go @@ -8,9 +8,11 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" + "go.opencensus.io/tag" "golang.org/x/xerrors" "github.com/filecoin-project/sentinel-visor/lens" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" visormodel "github.com/filecoin-project/sentinel-visor/model/visor" "github.com/filecoin-project/sentinel-visor/tasks/actorstate" @@ -119,6 +121,8 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe } func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pts *types.TipSet, addrStr string, act types.Actor, results chan *ActorStateResult) { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ActorCode, actorstate.ActorNameByCode(act.Code))) + res := &ActorStateResult{ Code: act.Code, Head: act.Head, diff --git a/chain/indexer.go b/chain/indexer.go index 2cfb5b886..db0ed3c05 100644 --- a/chain/indexer.go +++ b/chain/indexer.go @@ -121,6 +121,8 @@ func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, n // TipSet is called when a new tipset has been discovered func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Name, t.name)) + var cancel func() var tctx context.Context // cancellable context for the task if t.window > 0 { @@ -285,7 +287,10 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error { for task, p := range taskOutputs { go func(task string, p model.Persistable) { defer wg.Done() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, task)) + if err := t.storage.PersistBatch(ctx, p); err != nil { + stats.Record(ctx, metrics.PersistFailure.M(1)) ll.Errorw("persistence failed", "task", task, "error", err) return } @@ -307,6 +312,7 @@ func (t *TipSetIndexer) runProcessor(ctx context.Context, p TipSetProcessor, nam data, report, err := p.ProcessTipSet(ctx, ts) if err != nil { + stats.Record(ctx, metrics.ProcessingFailure.M(1)) results <- &TaskResult{ Task: name, Error: err, @@ -321,8 +327,14 @@ func (t *TipSetIndexer) runProcessor(ctx context.Context, p TipSetProcessor, nam } func (t *TipSetIndexer) runActorProcessor(ctx context.Context, p ActorProcessor, name string, ts, pts *types.TipSet, actors map[string]types.Actor, results chan *TaskResult) { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name)) + stats.Record(ctx, metrics.TipsetHeight.M(int64(ts.Height()))) + stop := metrics.Timer(ctx, metrics.ProcessingDuration) + defer stop() + data, report, err := p.ProcessActors(ctx, ts, pts, actors) if err != nil { + stats.Record(ctx, metrics.ProcessingFailure.M(1)) results <- &TaskResult{ Task: name, Error: err, diff --git a/chain/walker.go b/chain/walker.go index b48f1596c..50e5ff653 100644 --- a/chain/walker.go +++ b/chain/walker.go @@ -2,16 +2,15 @@ package chain import ( "context" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/chain/types" - "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" "golang.org/x/xerrors" "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/metrics" ) func NewWalker(obs TipSetObserver, opener lens.APIOpener, minHeight, maxHeight int64) *Walker { @@ -69,8 +68,6 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet) ctx, span := global.Tracer("").Start(ctx, "Walker.WalkChain", trace.WithAttributes(label.Int64("height", c.maxHeight))) defer span.End() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, "indexhistoryblock")) - log.Debugw("found tipset", "height", ts.Height()) if err := c.obs.TipSet(ctx, ts); err != nil { return xerrors.Errorf("notify tipset: %w", err) diff --git a/chain/watcher.go b/chain/watcher.go index 9620f62b7..bdfa37843 100644 --- a/chain/watcher.go +++ b/chain/watcher.go @@ -2,7 +2,7 @@ package chain import ( "context" - "go.opencensus.io/tag" + "go.opentelemetry.io/otel/api/global" "golang.org/x/xerrors" @@ -10,7 +10,6 @@ import ( store "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/metrics" ) // NewWatcher creates a new Watcher. confidence sets the number of tipsets that will be held @@ -67,8 +66,6 @@ func (c *Watcher) index(ctx context.Context, headEvents []*lotus_api.HeadChange) ctx, span := global.Tracer("").Start(ctx, "Watcher.index") defer span.End() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, "indexheadblock")) - for _, ch := range headEvents { switch ch.Type { case store.HCCurrent: diff --git a/lens/lotus/api.go b/lens/lotus/api.go index a59d82988..e519ddfab 100644 --- a/lens/lotus/api.go +++ b/lens/lotus/api.go @@ -78,6 +78,11 @@ func (aw *APIWrapper) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) func (aw *APIWrapper) StateGetReceipt(ctx context.Context, bcid cid.Cid, tsk types.TipSetKey) (*types.MessageReceipt, error) { ctx, span := global.Tracer("").Start(ctx, "Lotus.StateGetReceipt") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.API, "StateGetReceipt")) + stop := metrics.Timer(ctx, metrics.LensRequestDuration) + defer stop() + return aw.FullNode.StateGetReceipt(ctx, bcid, tsk) } @@ -186,12 +191,21 @@ func (aw *APIWrapper) StateReadState(ctx context.Context, actor address.Address, func (aw *APIWrapper) StateVMCirculatingSupplyInternal(ctx context.Context, tsk types.TipSetKey) (api.CirculatingSupply, error) { ctx, span := global.Tracer("").Start(ctx, "Lotus.StateCirculatingSupply") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.API, "StateVMCirculatingSupplyInternal")) + stop := metrics.Timer(ctx, metrics.LensRequestDuration) + defer stop() + return aw.FullNode.StateVMCirculatingSupplyInternal(ctx, tsk) } // GetExecutedMessagesForTipset returns a list of messages sent as part of pts (parent) with receipts found in ts (child). // No attempt at deduplication of messages is made. func (aw *APIWrapper) GetExecutedMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) ([]*lens.ExecutedMessage, error) { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.API, "GetExecutedMessagesForTipset")) + stop := metrics.Timer(ctx, metrics.LensRequestDuration) + defer stop() + if !types.CidArrsEqual(ts.Parents().Cids(), pts.Cids()) { return nil, xerrors.Errorf("child tipset (%s) is not on the same chain as parent (%s)", ts.Key(), pts.Key()) } diff --git a/lens/lotus/cachestore.go b/lens/lotus/cachestore.go index 3a67ed35e..f11a7b56f 100644 --- a/lens/lotus/cachestore.go +++ b/lens/lotus/cachestore.go @@ -6,9 +6,11 @@ import ( "fmt" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/sentinel-visor/metrics" lru "github.com/hashicorp/golang-lru" "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "golang.org/x/xerrors" ) @@ -34,6 +36,7 @@ func (cs *CacheCtxStore) Context() context.Context { func (cs *CacheCtxStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { ctx, span := global.Tracer("").Start(ctx, "CacheCtxStore.Get") defer span.End() + cu, ok := out.(cbg.CBORUnmarshaler) if !ok { return fmt.Errorf("out parameter does not implement CBORUnmarshaler") @@ -45,6 +48,10 @@ func (cs *CacheCtxStore) Get(ctx context.Context, c cid.Cid, out interface{}) er return cu.UnmarshalCBOR(bytes.NewReader(v.([]byte))) } + ctx, _ = tag.New(ctx, tag.Upsert(metrics.API, "ChainReadObj")) + stop := metrics.Timer(ctx, metrics.LensRequestDuration) + defer stop() + // miss :( raw, err := cs.api.ChainReadObj(ctx, c) if err != nil { diff --git a/metrics/metrics.go b/metrics/metrics.go index 23a6bed14..c8795d2ce 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -12,87 +12,79 @@ import ( var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 30000, 50000, 100000, 200000, 500000, 1000000, 2000000, 5000000, 10000000, 10000000) var ( - Error, _ = tag.NewKey("error") - TaskType, _ = tag.NewKey("task") + TaskType, _ = tag.NewKey("task") // name of task processor + Name, _ = tag.NewKey("name") // name of running instance of visor + Table, _ = tag.NewKey("table") // name of table data is persisted for ConnState, _ = tag.NewKey("conn_state") - State, _ = tag.NewKey("state") - API, _ = tag.NewKey("api") + API, _ = tag.NewKey("api") // name of method on lotus api + ActorCode, _ = tag.NewKey("actor_code") // human readable code of actor being processed ) var ( - ProcessingDuration = stats.Float64("processing_duration_ms", "Time taken to process a single item", stats.UnitMilliseconds) - PersistDuration = stats.Float64("persist_duration_ms", "Duration of a models persist operation", stats.UnitMilliseconds) - BatchSelectionDuration = stats.Float64("batch_selection_duration_ms", "Time taken to select a batch of work", stats.UnitMilliseconds) - CompletionDuration = stats.Float64("completion_duration_ms", "Time taken to mark an item as completed", stats.UnitMilliseconds) - DBConns = stats.Int64("db_conns", "Database connections held", stats.UnitDimensionless) - HistoricalIndexerHeight = stats.Int64("historical_sync_height", "Sync height of the historical indexer", stats.UnitDimensionless) - EpochsToSync = stats.Int64("epochs_to_sync", "Epochs yet to sync", stats.UnitDimensionless) - LensRequestDuration = stats.Float64("lens_request_duration_ms", "Duration of lotus api requets", stats.UnitMilliseconds) - TipsetHeight = stats.Int64("tipset_height", "The height of the tipset being processed", stats.UnitDimensionless) + ProcessingDuration = stats.Float64("processing_duration_ms", "Time taken to process a single item", stats.UnitMilliseconds) + PersistDuration = stats.Float64("persist_duration_ms", "Duration of a models persist operation", stats.UnitMilliseconds) + DBConns = stats.Int64("db_conns", "Database connections held", stats.UnitDimensionless) + LensRequestDuration = stats.Float64("lens_request_duration_ms", "Duration of lotus api requets", stats.UnitMilliseconds) + TipsetHeight = stats.Int64("tipset_height", "The height of the tipset being processed", stats.UnitDimensionless) + ProcessingFailure = stats.Int64("processing_failure", "Number of processing failures", stats.UnitDimensionless) + PersistFailure = stats.Int64("persist_failure", "Number of persistence failures", stats.UnitDimensionless) ) var ( ProcessingDurationView = &view.View{ Measure: ProcessingDuration, Aggregation: defaultMillisecondsDistribution, - TagKeys: []tag.Key{TaskType}, + TagKeys: []tag.Key{TaskType, ActorCode}, } PersistDurationView = &view.View{ Measure: PersistDuration, Aggregation: defaultMillisecondsDistribution, - TagKeys: []tag.Key{TaskType}, - } - BatchSelectionDurationView = &view.View{ - Measure: BatchSelectionDuration, - Aggregation: defaultMillisecondsDistribution, - TagKeys: []tag.Key{TaskType}, - } - CompletionDurationView = &view.View{ - Measure: CompletionDuration, - Aggregation: defaultMillisecondsDistribution, - TagKeys: []tag.Key{TaskType}, + TagKeys: []tag.Key{TaskType, Table, ActorCode}, } DBConnsView = &view.View{ Measure: DBConns, Aggregation: view.Count(), TagKeys: []tag.Key{ConnState}, } - HistoricalIndexerHeightView = &view.View{ - Measure: HistoricalIndexerHeight, - Aggregation: view.Sum(), - } - EpochsToSyncView = &view.View{ - Measure: EpochsToSync, - Aggregation: view.LastValue(), - } LensRequestDurationView = &view.View{ Measure: LensRequestDuration, Aggregation: defaultMillisecondsDistribution, - TagKeys: []tag.Key{TaskType, API}, + TagKeys: []tag.Key{TaskType, API, ActorCode}, } LensRequestTotal = &view.View{ Name: "lens_request_total", Measure: LensRequestDuration, Aggregation: view.Count(), - TagKeys: []tag.Key{TaskType, API}, + TagKeys: []tag.Key{TaskType, API, ActorCode}, } TipsetHeightView = &view.View{ Measure: TipsetHeight, Aggregation: view.LastValue(), TagKeys: []tag.Key{TaskType}, } + ProcessingFailureTotalView = &view.View{ + Name: ProcessingFailure.Name() + "_total", + Measure: ProcessingFailure, + Aggregation: view.Sum(), + TagKeys: []tag.Key{TaskType, ActorCode}, + } + PersistFailureTotalView = &view.View{ + Name: PersistFailure.Name() + "_total", + Measure: PersistFailure, + Aggregation: view.Sum(), + TagKeys: []tag.Key{TaskType, Table, ActorCode}, + } ) var DefaultViews = []*view.View{ ProcessingDurationView, PersistDurationView, - BatchSelectionDurationView, - CompletionDurationView, DBConnsView, - HistoricalIndexerHeightView, - EpochsToSyncView, LensRequestDurationView, + LensRequestTotal, TipsetHeightView, + ProcessingFailureTotalView, + PersistFailureTotalView, } // SinceInMilliseconds returns the duration of time since the provide time as a float64. diff --git a/model/actors/common/actors.go b/model/actors/common/actors.go index 6527babda..958d5bdb8 100644 --- a/model/actors/common/actors.go +++ b/model/actors/common/actors.go @@ -3,10 +3,12 @@ package common import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -23,6 +25,11 @@ type Actor struct { func (a *Actor) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "Actor.Persist") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "actors")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, a) } @@ -34,6 +41,10 @@ func (actors ActorList) Persist(ctx context.Context, s model.StorageBatch) error ctx, span := global.Tracer("").Start(ctx, "ActorList.Persist", trace.WithAttributes(label.Int("count", len(actors)))) defer span.End() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "actors")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + if len(actors) == 0 { return nil } @@ -51,6 +62,11 @@ type ActorState struct { func (as *ActorState) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "ActorState.Persist") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "actor_states")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, as) } @@ -62,6 +78,10 @@ func (states ActorStateList) Persist(ctx context.Context, s model.StorageBatch) ctx, span := global.Tracer("").Start(ctx, "ActorStateList.Persist", trace.WithAttributes(label.Int("count", len(states)))) defer span.End() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "actor_states")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + if len(states) == 0 { return nil } diff --git a/model/actors/init/idaddress.go b/model/actors/init/idaddress.go index a139a6580..0564d7148 100644 --- a/model/actors/init/idaddress.go +++ b/model/actors/init/idaddress.go @@ -3,10 +3,12 @@ package init import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -16,11 +18,24 @@ type IdAddress struct { StateRoot string `pg:",pk,notnull"` } +func (ia *IdAddress) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "id_addresses")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + + return s.PersistModel(ctx, ia) +} + type IdAddressList []*IdAddress func (ias IdAddressList) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "IdAddressList.PersistWithTx", trace.WithAttributes(label.Int("count", len(ias)))) defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "id_addresses")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + for _, ia := range ias { if err := s.PersistModel(ctx, ia); err != nil { return err diff --git a/model/actors/market/dealproposal.go b/model/actors/market/dealproposal.go index 9b98bbf59..6b79a065f 100644 --- a/model/actors/market/dealproposal.go +++ b/model/actors/market/dealproposal.go @@ -3,10 +3,13 @@ package market import ( "context" - "github.com/filecoin-project/sentinel-visor/model" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + + "github.com/filecoin-project/sentinel-visor/metrics" + "github.com/filecoin-project/sentinel-visor/model" ) type MarketDealProposal struct { @@ -32,6 +35,10 @@ type MarketDealProposal struct { } func (dp *MarketDealProposal) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "market_deal_proposals")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, dp) } @@ -40,5 +47,10 @@ type MarketDealProposals []*MarketDealProposal func (dps MarketDealProposals) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MarketDealProposals.Persist", trace.WithAttributes(label.Int("count", len(dps)))) defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "market_deal_proposals")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, dps) } diff --git a/model/actors/market/dealstate.go b/model/actors/market/dealstate.go index 3432cae70..d4a332f63 100644 --- a/model/actors/market/dealstate.go +++ b/model/actors/market/dealstate.go @@ -3,10 +3,13 @@ package market import ( "context" - "github.com/filecoin-project/sentinel-visor/model" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + + "github.com/filecoin-project/sentinel-visor/metrics" + "github.com/filecoin-project/sentinel-visor/model" ) type MarketDealState struct { @@ -20,6 +23,10 @@ type MarketDealState struct { } func (ds *MarketDealState) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "market_deal_states")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, ds) } @@ -28,5 +35,10 @@ type MarketDealStates []*MarketDealState func (dss MarketDealStates) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MarketDealStates.PersistWithTx", trace.WithAttributes(label.Int("count", len(dss)))) defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "market_deal_states")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, dss) } diff --git a/model/actors/miner/currentdeadline.go b/model/actors/miner/currentdeadline.go index e08136ac8..d0dc4e83d 100644 --- a/model/actors/miner/currentdeadline.go +++ b/model/actors/miner/currentdeadline.go @@ -3,8 +3,10 @@ package miner import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -24,6 +26,11 @@ type MinerCurrentDeadlineInfo struct { func (m *MinerCurrentDeadlineInfo) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MinerCurrentDeadlineInfo.Persist") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_current_deadline_infos")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, m) } @@ -32,6 +39,11 @@ type MinerCurrentDeadlineInfoList []*MinerCurrentDeadlineInfo func (ml MinerCurrentDeadlineInfoList) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MinerCurrentDeadlineInfoList.Persist") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_current_deadline_infos")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + if len(ml) == 0 { return nil } diff --git a/model/actors/miner/feedebt.go b/model/actors/miner/feedebt.go index 5fb251add..b284ec066 100644 --- a/model/actors/miner/feedebt.go +++ b/model/actors/miner/feedebt.go @@ -3,8 +3,10 @@ package miner import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -19,6 +21,11 @@ type MinerFeeDebt struct { func (m *MinerFeeDebt) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MinerFeeDebt.Persist") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_fee_debts")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, m) } @@ -27,6 +34,11 @@ type MinerFeeDebtList []*MinerFeeDebt func (ml MinerFeeDebtList) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MinerFeeDebtList.Persist") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_fee_debts")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + if len(ml) == 0 { return nil } diff --git a/model/actors/miner/lockedfunds.go b/model/actors/miner/lockedfunds.go index 63254df5e..d71fb5a11 100644 --- a/model/actors/miner/lockedfunds.go +++ b/model/actors/miner/lockedfunds.go @@ -3,8 +3,10 @@ package miner import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -21,6 +23,11 @@ type MinerLockedFund struct { func (m *MinerLockedFund) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MinerLockedFund.Persist") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_locked_funds")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, m) } @@ -29,6 +36,11 @@ type MinerLockedFundsList []*MinerLockedFund func (ml MinerLockedFundsList) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MinerLockedFundsList.Persist") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_locked_funds")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + if len(ml) == 0 { return nil } diff --git a/model/actors/miner/minerinfo.go b/model/actors/miner/minerinfo.go index 70e27dabc..0d213495f 100644 --- a/model/actors/miner/minerinfo.go +++ b/model/actors/miner/minerinfo.go @@ -3,8 +3,10 @@ package miner import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -29,6 +31,11 @@ type MinerInfo struct { func (m *MinerInfo) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MinerInfoModel.Persist") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_infos")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, m) } @@ -37,6 +44,11 @@ type MinerInfoList []*MinerInfo func (ml MinerInfoList) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MinerInfoList.Persist") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_infos")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + if len(ml) == 0 { return nil } diff --git a/model/actors/miner/precommit.go b/model/actors/miner/precommit.go index e0c14819f..7afab61ec 100644 --- a/model/actors/miner/precommit.go +++ b/model/actors/miner/precommit.go @@ -3,10 +3,12 @@ package miner import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -32,6 +34,10 @@ type MinerPreCommitInfo struct { } func (mpi *MinerPreCommitInfo) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_pre_commit_infos")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, mpi) } @@ -40,6 +46,11 @@ type MinerPreCommitInfoList []*MinerPreCommitInfo func (ml MinerPreCommitInfoList) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MinerPreCommitInfoList.Persist", trace.WithAttributes(label.Int("count", len(ml)))) defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_pre_commit_infos")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + if len(ml) == 0 { return nil } diff --git a/model/actors/miner/sector.go b/model/actors/miner/sector.go index 75e084c71..50e55c520 100644 --- a/model/actors/miner/sector.go +++ b/model/actors/miner/sector.go @@ -3,10 +3,12 @@ package miner import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -30,6 +32,10 @@ type MinerSectorInfo struct { } func (msi *MinerSectorInfo) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_sector_infos")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, msi) } @@ -38,6 +44,11 @@ type MinerSectorInfoList []*MinerSectorInfo func (ml MinerSectorInfoList) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MinerSectorInfoList.Persist", trace.WithAttributes(label.Int("count", len(ml)))) defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_sector_infos")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + if len(ml) == 0 { return nil } diff --git a/model/actors/miner/sectordeals.go b/model/actors/miner/sectordeals.go index b3730e72d..a1bb1535d 100644 --- a/model/actors/miner/sectordeals.go +++ b/model/actors/miner/sectordeals.go @@ -3,10 +3,12 @@ package miner import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -18,6 +20,10 @@ type MinerSectorDeal struct { } func (ds *MinerSectorDeal) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_sector_deals")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, ds) } @@ -26,6 +32,11 @@ type MinerSectorDealList []*MinerSectorDeal func (ml MinerSectorDealList) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MinerSectorDealList.Persist", trace.WithAttributes(label.Int("count", len(ml)))) defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_sector_deals")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + if len(ml) == 0 { return nil } diff --git a/model/actors/miner/sectorevents.go b/model/actors/miner/sectorevents.go index 4e5099d31..77e3602c5 100644 --- a/model/actors/miner/sectorevents.go +++ b/model/actors/miner/sectorevents.go @@ -3,10 +3,12 @@ package miner import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -37,12 +39,24 @@ type MinerSectorEvent struct { Event string `pg:"type:miner_sector_event_type" pg:",pk,notnull"` // nolint: staticcheck } +func (mse *MinerSectorEvent) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_sector_events")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + + return s.PersistModel(ctx, mse) +} + type MinerSectorEventList []*MinerSectorEvent func (l MinerSectorEventList) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "MinerSectorEventList.Persist", trace.WithAttributes(label.Int("count", len(l)))) defer span.End() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_sector_events")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + if len(l) == 0 { return nil } diff --git a/model/actors/miner/sectorposts.go b/model/actors/miner/sectorposts.go index 9958bb0b8..bebb1bf07 100644 --- a/model/actors/miner/sectorposts.go +++ b/model/actors/miner/sectorposts.go @@ -3,10 +3,12 @@ package miner import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -21,6 +23,10 @@ type MinerSectorPost struct { type MinerSectorPostList []*MinerSectorPost func (msp *MinerSectorPost) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_sector_posts")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, msp) } @@ -30,5 +36,10 @@ func (ml MinerSectorPostList) Persist(ctx context.Context, s model.StorageBatch) if len(ml) == 0 { return nil } + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_sector_posts")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, ml) } diff --git a/model/actors/multisig/transactions.go b/model/actors/multisig/transactions.go index b08aa2ae1..507699473 100644 --- a/model/actors/multisig/transactions.go +++ b/model/actors/multisig/transactions.go @@ -3,7 +3,9 @@ package multisig import ( "context" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" + "go.opencensus.io/tag" ) type MultisigTransaction struct { @@ -21,11 +23,19 @@ type MultisigTransaction struct { } func (m *MultisigTransaction) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "multisig_transactions")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, m) } type MultisigTransactionList []*MultisigTransaction func (ml MultisigTransactionList) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "multisig_transactions")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, ml) } diff --git a/model/actors/power/chainpower.go b/model/actors/power/chainpower.go index 4cf9cb9a0..ce180e410 100644 --- a/model/actors/power/chainpower.go +++ b/model/actors/power/chainpower.go @@ -3,10 +3,12 @@ package power import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -32,6 +34,11 @@ type ChainPower struct { func (cp *ChainPower) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "ChainPower.PersistWithTx") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "chain_powers")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, cp) } @@ -44,6 +51,10 @@ func (cpl ChainPowerList) Persist(ctx context.Context, s model.StorageBatch) err ctx, span := global.Tracer("").Start(ctx, "ChainPowerList.PersistWithTx", trace.WithAttributes(label.Int("count", len(cpl)))) defer span.End() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "chain_powers")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + if len(cpl) == 0 { return nil } diff --git a/model/actors/power/claimedpower.go b/model/actors/power/claimedpower.go index 983c94d0e..b90afe340 100644 --- a/model/actors/power/claimedpower.go +++ b/model/actors/power/claimedpower.go @@ -3,8 +3,10 @@ package power import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -19,6 +21,11 @@ type PowerActorClaim struct { func (p *PowerActorClaim) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "PowerActorClaim.Persist") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "power_actor_claims")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, p) } @@ -27,6 +34,11 @@ type PowerActorClaimList []*PowerActorClaim func (pl PowerActorClaimList) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "PowerActorClaimList.Persist") defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "power_actor_claims")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + if len(pl) == 0 { return nil } diff --git a/model/actors/reward/chainreward.go b/model/actors/reward/chainreward.go index f8e08ffb5..fc0657d15 100644 --- a/model/actors/reward/chainreward.go +++ b/model/actors/reward/chainreward.go @@ -3,6 +3,7 @@ package reward import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "github.com/filecoin-project/sentinel-visor/metrics" @@ -28,6 +29,7 @@ func (r *ChainReward) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "ChainReward.Persist") defer span.End() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "chain_rewards")) stop := metrics.Timer(ctx, metrics.PersistDuration) defer stop() diff --git a/model/blocks/drand.go b/model/blocks/drand.go index 16df2aef4..3bbfa8c2f 100644 --- a/model/blocks/drand.go +++ b/model/blocks/drand.go @@ -4,10 +4,12 @@ import ( "context" "github.com/filecoin-project/lotus/chain/types" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -28,6 +30,10 @@ type DrandBlockEntrie struct { } func (dbe *DrandBlockEntrie) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "drand_block_entries")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, dbe) } @@ -39,5 +45,10 @@ func (dbes DrandBlockEntries) Persist(ctx context.Context, s model.StorageBatch) } ctx, span := global.Tracer("").Start(ctx, "DrandBlockEntries.Persist", trace.WithAttributes(label.Int("count", len(dbes)))) defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "drand_block_entries")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, dbes) } diff --git a/model/blocks/header.go b/model/blocks/header.go index 56756ebd6..995d75571 100644 --- a/model/blocks/header.go +++ b/model/blocks/header.go @@ -4,10 +4,12 @@ import ( "context" "github.com/filecoin-project/lotus/chain/types" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -39,6 +41,10 @@ func NewBlockHeader(bh *types.BlockHeader) *BlockHeader { } func (bh *BlockHeader) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "block_headers")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, bh) } @@ -50,5 +56,10 @@ func (bhl BlockHeaders) Persist(ctx context.Context, s model.StorageBatch) error } ctx, span := global.Tracer("").Start(ctx, "BlockHeaders.Persist", trace.WithAttributes(label.Int("count", len(bhl)))) defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "block_headers")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, bhl) } diff --git a/model/blocks/parent.go b/model/blocks/parent.go index 41ec51438..2d1f8088f 100644 --- a/model/blocks/parent.go +++ b/model/blocks/parent.go @@ -4,10 +4,12 @@ import ( "context" "github.com/filecoin-project/lotus/chain/types" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -18,6 +20,10 @@ type BlockParent struct { } func (bp *BlockParent) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "block_parents")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, bp) } @@ -41,5 +47,10 @@ func (bps BlockParents) Persist(ctx context.Context, s model.StorageBatch) error } ctx, span := global.Tracer("").Start(ctx, "BlockParents.Persist", trace.WithAttributes(label.Int("count", len(bps)))) defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "block_parents")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, bps) } diff --git a/model/chain/economics.go b/model/chain/economics.go index 0586fa572..4beb813f6 100644 --- a/model/chain/economics.go +++ b/model/chain/economics.go @@ -3,10 +3,12 @@ package chain import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -21,6 +23,10 @@ type ChainEconomics struct { } func (c *ChainEconomics) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "chain_economics")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, c) } @@ -32,5 +38,10 @@ func (l ChainEconomicsList) Persist(ctx context.Context, s model.StorageBatch) e } ctx, span := global.Tracer("").Start(ctx, "ChainEconomicsList.Persist", trace.WithAttributes(label.Int("count", len(l)))) defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "chain_economics")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, l) } diff --git a/model/derived/gasoutputs.go b/model/derived/gasoutputs.go index 41685b315..a96b57737 100644 --- a/model/derived/gasoutputs.go +++ b/model/derived/gasoutputs.go @@ -3,10 +3,12 @@ package derived import ( "context" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -38,6 +40,10 @@ type GasOutputs struct { } func (g *GasOutputs) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "derived_gas_outputs")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, g) } @@ -49,5 +55,10 @@ func (l GasOutputsList) Persist(ctx context.Context, s model.StorageBatch) error } ctx, span := global.Tracer("").Start(ctx, "GasOutputsList.Persist", trace.WithAttributes(label.Int("count", len(l)))) defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "derived_gas_outputs")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, l) } diff --git a/model/messages/blockmessage.go b/model/messages/blockmessage.go index 556fcb99f..9034499cb 100644 --- a/model/messages/blockmessage.go +++ b/model/messages/blockmessage.go @@ -19,6 +19,10 @@ type BlockMessage struct { } func (bm *BlockMessage) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "block_messages")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, bm) } @@ -31,7 +35,7 @@ func (bms BlockMessages) Persist(ctx context.Context, s model.StorageBatch) erro ctx, span := global.Tracer("").Start(ctx, "BlockMessages.Persist", trace.WithAttributes(label.Int("count", len(bms)))) defer span.End() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, "message/blockmessage")) + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "block_messages")) stop := metrics.Timer(ctx, metrics.PersistDuration) defer stop() diff --git a/model/messages/gaseconomy.go b/model/messages/gaseconomy.go index f08937d61..fe541237b 100644 --- a/model/messages/gaseconomy.go +++ b/model/messages/gaseconomy.go @@ -3,6 +3,9 @@ package messages import ( "context" + "go.opencensus.io/tag" + + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -23,5 +26,9 @@ type MessageGasEconomy struct { } func (g *MessageGasEconomy) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "message_gas_economy")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, g) } diff --git a/model/messages/message.go b/model/messages/message.go index ad7676ef6..1c341232e 100644 --- a/model/messages/message.go +++ b/model/messages/message.go @@ -29,6 +29,10 @@ type Message struct { } func (m *Message) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "messages")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, m) } @@ -41,7 +45,7 @@ func (ms Messages) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "Messages.Persist", trace.WithAttributes(label.Int("count", len(ms)))) defer span.End() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, "message/message")) + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "messages")) stop := metrics.Timer(ctx, metrics.PersistDuration) defer stop() diff --git a/model/messages/parsedmessage.go b/model/messages/parsedmessage.go index 411abd7f1..8e6f129ef 100644 --- a/model/messages/parsedmessage.go +++ b/model/messages/parsedmessage.go @@ -24,6 +24,10 @@ type ParsedMessage struct { } func (pm *ParsedMessage) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "parsed_messages")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, pm) } @@ -36,7 +40,7 @@ func (pms ParsedMessages) Persist(ctx context.Context, s model.StorageBatch) err ctx, span := global.Tracer("").Start(ctx, "ParsedMessages.Persist", trace.WithAttributes(label.Int("count", len(pms)))) defer span.End() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, "message/parsed")) + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "parsed_messages")) stop := metrics.Timer(ctx, metrics.PersistDuration) defer stop() diff --git a/model/messages/receipt.go b/model/messages/receipt.go index e9378128a..9389762d7 100644 --- a/model/messages/receipt.go +++ b/model/messages/receipt.go @@ -23,6 +23,10 @@ type Receipt struct { } func (r *Receipt) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "receipts")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, r) } @@ -35,7 +39,7 @@ func (rs Receipts) Persist(ctx context.Context, s model.StorageBatch) error { ctx, span := global.Tracer("").Start(ctx, "Receipts.Persist", trace.WithAttributes(label.Int("count", len(rs)))) defer span.End() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, "message/receipt")) + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "receipts")) stop := metrics.Timer(ctx, metrics.PersistDuration) defer stop() diff --git a/model/visor/report.go b/model/visor/report.go index 79fddec8f..2119918bd 100644 --- a/model/visor/report.go +++ b/model/visor/report.go @@ -4,10 +4,12 @@ import ( "context" "time" + "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/label" + "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" ) @@ -38,6 +40,10 @@ type ProcessingReport struct { } func (p *ProcessingReport) Persist(ctx context.Context, s model.StorageBatch) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "visor_processing_reports")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, p) } @@ -50,5 +56,9 @@ func (pl ProcessingReportList) Persist(ctx context.Context, s model.StorageBatch ctx, span := global.Tracer("").Start(ctx, "ProcessingReportList.Persist", trace.WithAttributes(label.Int("count", len(pl)))) defer span.End() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "visor_processing_reports")) + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + return s.PersistModel(ctx, pl) }