Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve metric reporting #376

Merged
merged 1 commit into from
Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions chain/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
"go.opencensus.io/tag"
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/lens"
"github.com/filecoin-project/sentinel-visor/metrics"
"github.com/filecoin-project/sentinel-visor/model"
visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
Expand Down Expand Up @@ -119,6 +121,8 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe
}

func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pts *types.TipSet, addrStr string, act types.Actor, results chan *ActorStateResult) {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ActorCode, actorstate.ActorNameByCode(act.Code)))

res := &ActorStateResult{
Code: act.Code,
Head: act.Head,
Expand Down
12 changes: 12 additions & 0 deletions chain/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, n

// TipSet is called when a new tipset has been discovered
func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Name, t.name))

var cancel func()
var tctx context.Context // cancellable context for the task
if t.window > 0 {
Expand Down Expand Up @@ -285,7 +287,10 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
for task, p := range taskOutputs {
go func(task string, p model.Persistable) {
defer wg.Done()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, task))

if err := t.storage.PersistBatch(ctx, p); err != nil {
stats.Record(ctx, metrics.PersistFailure.M(1))
ll.Errorw("persistence failed", "task", task, "error", err)
return
}
Expand All @@ -307,6 +312,7 @@ func (t *TipSetIndexer) runProcessor(ctx context.Context, p TipSetProcessor, nam

data, report, err := p.ProcessTipSet(ctx, ts)
if err != nil {
stats.Record(ctx, metrics.ProcessingFailure.M(1))
results <- &TaskResult{
Task: name,
Error: err,
Expand All @@ -321,8 +327,14 @@ func (t *TipSetIndexer) runProcessor(ctx context.Context, p TipSetProcessor, nam
}

func (t *TipSetIndexer) runActorProcessor(ctx context.Context, p ActorProcessor, name string, ts, pts *types.TipSet, actors map[string]types.Actor, results chan *TaskResult) {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name))
stats.Record(ctx, metrics.TipsetHeight.M(int64(ts.Height())))
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
defer stop()

data, report, err := p.ProcessActors(ctx, ts, pts, actors)
if err != nil {
stats.Record(ctx, metrics.ProcessingFailure.M(1))
results <- &TaskResult{
Task: name,
Error: err,
Expand Down
5 changes: 1 addition & 4 deletions chain/walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ package chain

import (
"context"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/label"
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/lens"
"github.com/filecoin-project/sentinel-visor/metrics"
)

func NewWalker(obs TipSetObserver, opener lens.APIOpener, minHeight, maxHeight int64) *Walker {
Expand Down Expand Up @@ -69,8 +68,6 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet)
ctx, span := global.Tracer("").Start(ctx, "Walker.WalkChain", trace.WithAttributes(label.Int64("height", c.maxHeight)))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, "indexhistoryblock"))

log.Debugw("found tipset", "height", ts.Height())
if err := c.obs.TipSet(ctx, ts); err != nil {
return xerrors.Errorf("notify tipset: %w", err)
Expand Down
5 changes: 1 addition & 4 deletions chain/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package chain

import (
"context"
"go.opencensus.io/tag"

"go.opentelemetry.io/otel/api/global"
"golang.org/x/xerrors"

lotus_api "github.com/filecoin-project/lotus/api"
store "github.com/filecoin-project/lotus/chain/store"

"github.com/filecoin-project/sentinel-visor/lens"
"github.com/filecoin-project/sentinel-visor/metrics"
)

// NewWatcher creates a new Watcher. confidence sets the number of tipsets that will be held
Expand Down Expand Up @@ -67,8 +66,6 @@ func (c *Watcher) index(ctx context.Context, headEvents []*lotus_api.HeadChange)
ctx, span := global.Tracer("").Start(ctx, "Watcher.index")
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, "indexheadblock"))

for _, ch := range headEvents {
switch ch.Type {
case store.HCCurrent:
Expand Down
14 changes: 14 additions & 0 deletions lens/lotus/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (aw *APIWrapper) ChainGetParentMessages(ctx context.Context, bcid cid.Cid)
// StateGetReceipt returns the message receipt for the given message cid as
// found in the tipset identified by tsk, delegating to the wrapped full node.
// The call is traced and its duration is recorded against the lens request
// duration metric, tagged with the API method name.
func (aw *APIWrapper) StateGetReceipt(ctx context.Context, bcid cid.Cid, tsk types.TipSetKey) (*types.MessageReceipt, error) {
	ctx, span := global.Tracer("").Start(ctx, "Lotus.StateGetReceipt")
	defer span.End()

	// Attribute the request metric to this API method via a context tag.
	ctx, _ = tag.New(ctx, tag.Upsert(metrics.API, "StateGetReceipt"))
	stopTimer := metrics.Timer(ctx, metrics.LensRequestDuration)
	defer stopTimer()

	return aw.FullNode.StateGetReceipt(ctx, bcid, tsk)
}

Expand Down Expand Up @@ -186,12 +191,21 @@ func (aw *APIWrapper) StateReadState(ctx context.Context, actor address.Address,
// StateVMCirculatingSupplyInternal reports the circulating supply details for
// the tipset identified by tsk, delegating to the wrapped full node. The call
// is traced and its duration is recorded against the lens request duration
// metric, tagged with the API method name.
func (aw *APIWrapper) StateVMCirculatingSupplyInternal(ctx context.Context, tsk types.TipSetKey) (api.CirculatingSupply, error) {
	ctx, span := global.Tracer("").Start(ctx, "Lotus.StateCirculatingSupply")
	defer span.End()

	// Attribute the request metric to this API method via a context tag.
	ctx, _ = tag.New(ctx, tag.Upsert(metrics.API, "StateVMCirculatingSupplyInternal"))
	stopTimer := metrics.Timer(ctx, metrics.LensRequestDuration)
	defer stopTimer()

	return aw.FullNode.StateVMCirculatingSupplyInternal(ctx, tsk)
}

// GetExecutedMessagesForTipset returns a list of messages sent as part of pts (parent) with receipts found in ts (child).
// No attempt at deduplication of messages is made.
func (aw *APIWrapper) GetExecutedMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) ([]*lens.ExecutedMessage, error) {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.API, "GetExecutedMessagesForTipset"))
stop := metrics.Timer(ctx, metrics.LensRequestDuration)
defer stop()

if !types.CidArrsEqual(ts.Parents().Cids(), pts.Cids()) {
return nil, xerrors.Errorf("child tipset (%s) is not on the same chain as parent (%s)", ts.Key(), pts.Key())
}
Expand Down
7 changes: 7 additions & 0 deletions lens/lotus/cachestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"fmt"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/sentinel-visor/metrics"
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/api/global"
"golang.org/x/xerrors"
)
Expand All @@ -34,6 +36,7 @@ func (cs *CacheCtxStore) Context() context.Context {
func (cs *CacheCtxStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
ctx, span := global.Tracer("").Start(ctx, "CacheCtxStore.Get")
defer span.End()

cu, ok := out.(cbg.CBORUnmarshaler)
if !ok {
return fmt.Errorf("out parameter does not implement CBORUnmarshaler")
Expand All @@ -45,6 +48,10 @@ func (cs *CacheCtxStore) Get(ctx context.Context, c cid.Cid, out interface{}) er
return cu.UnmarshalCBOR(bytes.NewReader(v.([]byte)))
}

ctx, _ = tag.New(ctx, tag.Upsert(metrics.API, "ChainReadObj"))
stop := metrics.Timer(ctx, metrics.LensRequestDuration)
defer stop()

// miss :(
raw, err := cs.api.ChainReadObj(ctx, c)
if err != nil {
Expand Down
70 changes: 31 additions & 39 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,87 +12,79 @@ import (
var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 30000, 50000, 100000, 200000, 500000, 1000000, 2000000, 5000000, 10000000, 10000000)

var (
Error, _ = tag.NewKey("error")
TaskType, _ = tag.NewKey("task")
TaskType, _ = tag.NewKey("task") // name of task processor
Name, _ = tag.NewKey("name") // name of running instance of visor
Table, _ = tag.NewKey("table") // name of table data is persisted for
ConnState, _ = tag.NewKey("conn_state")
State, _ = tag.NewKey("state")
API, _ = tag.NewKey("api")
API, _ = tag.NewKey("api") // name of method on lotus api
ActorCode, _ = tag.NewKey("actor_code") // human readable code of actor being processed
)

var (
ProcessingDuration = stats.Float64("processing_duration_ms", "Time taken to process a single item", stats.UnitMilliseconds)
PersistDuration = stats.Float64("persist_duration_ms", "Duration of a models persist operation", stats.UnitMilliseconds)
BatchSelectionDuration = stats.Float64("batch_selection_duration_ms", "Time taken to select a batch of work", stats.UnitMilliseconds)
CompletionDuration = stats.Float64("completion_duration_ms", "Time taken to mark an item as completed", stats.UnitMilliseconds)
DBConns = stats.Int64("db_conns", "Database connections held", stats.UnitDimensionless)
HistoricalIndexerHeight = stats.Int64("historical_sync_height", "Sync height of the historical indexer", stats.UnitDimensionless)
EpochsToSync = stats.Int64("epochs_to_sync", "Epochs yet to sync", stats.UnitDimensionless)
LensRequestDuration = stats.Float64("lens_request_duration_ms", "Duration of lotus api requets", stats.UnitMilliseconds)
TipsetHeight = stats.Int64("tipset_height", "The height of the tipset being processed", stats.UnitDimensionless)
ProcessingDuration = stats.Float64("processing_duration_ms", "Time taken to process a single item", stats.UnitMilliseconds)
PersistDuration = stats.Float64("persist_duration_ms", "Duration of a models persist operation", stats.UnitMilliseconds)
DBConns = stats.Int64("db_conns", "Database connections held", stats.UnitDimensionless)
LensRequestDuration = stats.Float64("lens_request_duration_ms", "Duration of lotus api requets", stats.UnitMilliseconds)
TipsetHeight = stats.Int64("tipset_height", "The height of the tipset being processed", stats.UnitDimensionless)
ProcessingFailure = stats.Int64("processing_failure", "Number of processing failures", stats.UnitDimensionless)
PersistFailure = stats.Int64("persist_failure", "Number of persistence failures", stats.UnitDimensionless)
)

var (
ProcessingDurationView = &view.View{
Measure: ProcessingDuration,
Aggregation: defaultMillisecondsDistribution,
TagKeys: []tag.Key{TaskType},
TagKeys: []tag.Key{TaskType, ActorCode},
}
PersistDurationView = &view.View{
Measure: PersistDuration,
Aggregation: defaultMillisecondsDistribution,
TagKeys: []tag.Key{TaskType},
}
BatchSelectionDurationView = &view.View{
Measure: BatchSelectionDuration,
Aggregation: defaultMillisecondsDistribution,
TagKeys: []tag.Key{TaskType},
}
CompletionDurationView = &view.View{
Measure: CompletionDuration,
Aggregation: defaultMillisecondsDistribution,
TagKeys: []tag.Key{TaskType},
TagKeys: []tag.Key{TaskType, Table, ActorCode},
}
DBConnsView = &view.View{
Measure: DBConns,
Aggregation: view.Count(),
TagKeys: []tag.Key{ConnState},
}
HistoricalIndexerHeightView = &view.View{
Measure: HistoricalIndexerHeight,
Aggregation: view.Sum(),
}
EpochsToSyncView = &view.View{
Measure: EpochsToSync,
Aggregation: view.LastValue(),
}
LensRequestDurationView = &view.View{
Measure: LensRequestDuration,
Aggregation: defaultMillisecondsDistribution,
TagKeys: []tag.Key{TaskType, API},
TagKeys: []tag.Key{TaskType, API, ActorCode},
}
LensRequestTotal = &view.View{
Name: "lens_request_total",
Measure: LensRequestDuration,
Aggregation: view.Count(),
TagKeys: []tag.Key{TaskType, API},
TagKeys: []tag.Key{TaskType, API, ActorCode},
}
TipsetHeightView = &view.View{
Measure: TipsetHeight,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{TaskType},
}
ProcessingFailureTotalView = &view.View{
Name: ProcessingFailure.Name() + "_total",
Measure: ProcessingFailure,
Aggregation: view.Sum(),
TagKeys: []tag.Key{TaskType, ActorCode},
}
PersistFailureTotalView = &view.View{
Name: PersistFailure.Name() + "_total",
Measure: PersistFailure,
Aggregation: view.Sum(),
TagKeys: []tag.Key{TaskType, Table, ActorCode},
}
)

var DefaultViews = []*view.View{
ProcessingDurationView,
PersistDurationView,
BatchSelectionDurationView,
CompletionDurationView,
DBConnsView,
HistoricalIndexerHeightView,
EpochsToSyncView,
LensRequestDurationView,
LensRequestTotal,
TipsetHeightView,
ProcessingFailureTotalView,
PersistFailureTotalView,
}

// SinceInMilliseconds returns the duration of time since the provide time as a float64.
Expand Down
20 changes: 20 additions & 0 deletions model/actors/common/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package common
import (
"context"

"go.opencensus.io/tag"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/label"

"github.com/filecoin-project/sentinel-visor/metrics"
"github.com/filecoin-project/sentinel-visor/model"
)

Expand All @@ -23,6 +25,11 @@ type Actor struct {
// Persist writes this actor row to storage via the supplied batch. The
// operation is traced, tagged with the destination table name, and its
// duration is recorded against the persist duration metric.
func (a *Actor) Persist(ctx context.Context, s model.StorageBatch) error {
	ctx, span := global.Tracer("").Start(ctx, "Actor.Persist")
	defer span.End()

	// Tag the metric with the table being written so durations can be
	// broken down per table.
	ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "actors"))
	stopTimer := metrics.Timer(ctx, metrics.PersistDuration)
	defer stopTimer()

	return s.PersistModel(ctx, a)
}

Expand All @@ -34,6 +41,10 @@ func (actors ActorList) Persist(ctx context.Context, s model.StorageBatch) error
ctx, span := global.Tracer("").Start(ctx, "ActorList.Persist", trace.WithAttributes(label.Int("count", len(actors))))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "actors"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be tagging metrics for list persistence the same as individual persistence? Seems this would record duration for a batch and individuals just the same and might make the data unreliable. (If you change this, it appears this is the same for all the models.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it matters. The list could be a single item and we would want that to be treated identically to the individual case. We're aiming to measure how long visor is taking to persist all the data it extracts, not necessarily per-item timings.

stop := metrics.Timer(ctx, metrics.PersistDuration)
defer stop()

if len(actors) == 0 {
return nil
}
Expand All @@ -51,6 +62,11 @@ type ActorState struct {
// Persist writes this actor state row to storage via the supplied batch. The
// operation is traced, tagged with the destination table name, and its
// duration is recorded against the persist duration metric.
func (as *ActorState) Persist(ctx context.Context, s model.StorageBatch) error {
	ctx, span := global.Tracer("").Start(ctx, "ActorState.Persist")
	defer span.End()

	// Tag the metric with the table being written so durations can be
	// broken down per table.
	ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "actor_states"))
	stopTimer := metrics.Timer(ctx, metrics.PersistDuration)
	defer stopTimer()

	return s.PersistModel(ctx, as)
}

Expand All @@ -62,6 +78,10 @@ func (states ActorStateList) Persist(ctx context.Context, s model.StorageBatch)
ctx, span := global.Tracer("").Start(ctx, "ActorStateList.Persist", trace.WithAttributes(label.Int("count", len(states))))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "actor_states"))
stop := metrics.Timer(ctx, metrics.PersistDuration)
defer stop()

if len(states) == 0 {
return nil
}
Expand Down
15 changes: 15 additions & 0 deletions model/actors/init/idaddress.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package init
import (
"context"

"go.opencensus.io/tag"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/label"

"github.com/filecoin-project/sentinel-visor/metrics"
"github.com/filecoin-project/sentinel-visor/model"
)

Expand All @@ -16,11 +18,24 @@ type IdAddress struct {
StateRoot string `pg:",pk,notnull"`
}

// Persist writes this id address mapping to storage via the supplied batch.
// The operation is tagged with the destination table name and its duration is
// recorded against the persist duration metric.
func (ia *IdAddress) Persist(ctx context.Context, s model.StorageBatch) error {
	// Start a span for consistency with the other model Persist methods
	// (Actor.Persist, ActorState.Persist, IdAddressList.Persist), which all
	// trace their persistence work.
	ctx, span := global.Tracer("").Start(ctx, "IdAddress.Persist")
	defer span.End()

	ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "id_addresses"))
	stop := metrics.Timer(ctx, metrics.PersistDuration)
	defer stop()

	return s.PersistModel(ctx, ia)
}

type IdAddressList []*IdAddress

func (ias IdAddressList) Persist(ctx context.Context, s model.StorageBatch) error {
ctx, span := global.Tracer("").Start(ctx, "IdAddressList.PersistWithTx", trace.WithAttributes(label.Int("count", len(ias))))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "id_addresses"))
stop := metrics.Timer(ctx, metrics.PersistDuration)
defer stop()

for _, ia := range ias {
if err := s.PersistModel(ctx, ia); err != nil {
return err
Expand Down
Loading