From 710e731f1bba243e4cd3c77cc659934f08e08d20 Mon Sep 17 00:00:00 2001 From: Ian Davis Date: Tue, 13 Oct 2020 14:50:06 +0100 Subject: [PATCH] feat: add visor processing stats table Add a task that writes summary stats to a new table visor_processing_stats. For each tipset processor (message discovery, state change, chain economics) and for each message processor (currently only gas outputs) and for each actor state change it writes the following metrics: - total number of items that have been discovered for processing - total number of items that have been processed - total number of items that have been processed but reported an error - total number of items that are currently being processed - highest epoch that has been processed - highest epoch that has not been processed Stats are written every minute and only a single global task is used. --- .gitignore | 1 + commands/run.go | 20 +++ commands/setup.go | 7 +- model/visor/stats.go | 51 ++++++++ storage/migrations/9_processing_stats.go | 47 +++++++ storage/sql.go | 2 + tasks/stats/processing.go | 151 +++++++++++++++++++++++ 7 files changed, 276 insertions(+), 3 deletions(-) create mode 100644 model/visor/stats.go create mode 100644 storage/migrations/9_processing_stats.go create mode 100644 tasks/stats/processing.go diff --git a/.gitignore b/.gitignore index b579e8c51..3d23c963b 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ # vendor/ .idea visor +!model/visor sentinel-visor build/.* diff --git a/commands/run.go b/commands/run.go index 63321cee6..fe620e324 100644 --- a/commands/run.go +++ b/commands/run.go @@ -17,6 +17,7 @@ import ( "github.com/filecoin-project/sentinel-visor/tasks/chain" "github.com/filecoin-project/sentinel-visor/tasks/indexer" "github.com/filecoin-project/sentinel-visor/tasks/message" + "github.com/filecoin-project/sentinel-visor/tasks/stats" "github.com/filecoin-project/sentinel-visor/tasks/views" ) @@ -163,6 +164,14 @@ var Run = &cli.Command{ EnvVars: []string{"VISOR_CHAINVIS_REFRESH"}, }, + &cli.DurationFlag{ + Name: "processingstats-refresh-rate", + Aliases: []string{"psr"}, + Value: 0, + Usage: "Refresh frequency for processing stats (0 = disables refresh)", + EnvVars: []string{"VISOR_PROCESSINGSTATS_REFRESH"}, + }, + &cli.IntFlag{ Name: "chaineconomics-workers", Aliases: []string{"cew"}, @@ -309,6 +318,17 @@ var Run = &cli.Command{ RestartOnCompletion: false, }) + // Include optional refresher for processing stats + if cctx.Duration("processingstats-refresh-rate") != 0 { + scheduler.Add(schedule.TaskConfig{ + Name: "ProcessingStatsRefresher", + Locker: NewGlobalSingleton(ProcessingStatsRefresherLockID, rctx.db), + Task: stats.NewProcessingStatsRefresher(rctx.db, cctx.Duration("processingstats-refresh-rate")), + RestartOnFailure: true, + RestartOnCompletion: true, + }) + } + // Start the scheduler and wait for it to complete or to be cancelled. err = scheduler.Run(ctx) if !errors.Is(err, context.Canceled) { diff --git a/commands/setup.go b/commands/setup.go index c750f2d2b..cc1386a51 100644 --- a/commands/setup.go +++ b/commands/setup.go @@ -173,9 +173,10 @@ func setupLogging(cctx *cli.Context) error { } const ( - ChainHeadIndexerLockID = 98981111 - ChainHistoryIndexerLockID = 98981112 - ChainVisRefresherLockID = 98981113 + ChainHeadIndexerLockID = 98981111 + ChainHistoryIndexerLockID = 98981112 + ChainVisRefresherLockID = 98981113 + ProcessingStatsRefresherLockID = 98981114 ) func NewGlobalSingleton(id int64, d *storage.Database) *GlobalSingleton { diff --git a/model/visor/stats.go b/model/visor/stats.go new file mode 100644 index 000000000..0d9bec463 --- /dev/null +++ b/model/visor/stats.go @@ -0,0 +1,51 @@ +package visor + +import ( + "context" + "fmt" + "time" + + "github.com/go-pg/pg/v10" + "go.opentelemetry.io/otel/api/global" + "go.opentelemetry.io/otel/api/trace" + "go.opentelemetry.io/otel/label" +) + +type ProcessingStat struct { + tableName struct{} `pg:"visor_processing_stats"` + + // RecordedAt is the time the measurement was recorded in the database + RecordedAt time.Time `pg:",pk,notnull"` + + // Measure is the name of the measurement, e.g. `messages_completed_count` + Measure string `pg:",pk,notnull"` + + // Value is the value of the measurement + Value int64 `pg:",use_zero,notnull"` +} + +func (s *ProcessingStat) PersistWithTx(ctx context.Context, tx *pg.Tx) error { + if _, err := tx.ModelContext(ctx, s). + OnConflict("do nothing"). + Insert(); err != nil { + return fmt.Errorf("persisting processing stat: %w", err) + } + return nil +} + +type ProcessingStatList []*ProcessingStat + +func (l ProcessingStatList) PersistWithTx(ctx context.Context, tx *pg.Tx) error { + if len(l) == 0 { + return nil + } + ctx, span := global.Tracer("").Start(ctx, "ProcessingStatList.PersistWithTx", trace.WithAttributes(label.Int("count", len(l)))) + defer span.End() + + if _, err := tx.ModelContext(ctx, &l). + OnConflict("do nothing"). + Insert(); err != nil { + return fmt.Errorf("persisting processing stats: %w", err) + } + return nil +} diff --git a/storage/migrations/9_processing_stats.go b/storage/migrations/9_processing_stats.go new file mode 100644 index 000000000..855d2315b --- /dev/null +++ b/storage/migrations/9_processing_stats.go @@ -0,0 +1,47 @@ +package migrations + +import ( + "github.com/go-pg/migrations/v8" +) + +// Schema version 9 adds a table for processing stats and indexes over the visor processing tables + +func init() { + up := batch(` +CREATE TABLE IF NOT EXISTS public.visor_processing_stats ( + "recorded_at" timestamptz NOT NULL, + "measure" text NOT NULL, + "value" bigint NOT NULL, + PRIMARY KEY ("recorded_at","measure") +); + +CREATE INDEX IF NOT EXISTS "visor_processing_tipsets_statechange_idx" ON public.visor_processing_tipsets USING BTREE (statechange_completed_at, statechange_claimed_until); +CREATE INDEX IF NOT EXISTS "visor_processing_tipsets_message_idx" ON public.visor_processing_tipsets USING BTREE (message_completed_at, message_claimed_until); +CREATE INDEX IF NOT EXISTS "visor_processing_tipsets_economics_idx" ON public.visor_processing_tipsets USING BTREE (economics_completed_at, economics_claimed_until); +CREATE INDEX IF NOT EXISTS "visor_processing_tipsets_height_idx" ON public.visor_processing_tipsets USING BTREE (height DESC); + +CREATE INDEX IF NOT EXISTS "visor_processing_messages_gas_outputs_idx" ON public.visor_processing_messages USING BTREE (gas_outputs_completed_at, gas_outputs_claimed_until); +CREATE INDEX IF NOT EXISTS "visor_processing_messages_height_idx" ON public.visor_processing_messages USING BTREE (height DESC); + +CREATE INDEX IF NOT EXISTS "visor_processing_actors_completed_idx" ON public.visor_processing_actors USING BTREE (completed_at, claimed_until); +CREATE INDEX IF NOT EXISTS "visor_processing_actors_code_idx" ON public.visor_processing_actors USING BTREE (code); +CREATE INDEX IF NOT EXISTS "visor_processing_actors_height_idx" ON public.visor_processing_actors USING BTREE (height DESC); +`) + + down := batch(` +DROP TABLE IF EXISTS public.visor_processing_stats; + +DROP INDEX IF EXISTS visor_processing_tipsets_statechange_idx; +DROP INDEX IF EXISTS visor_processing_tipsets_message_idx; +DROP INDEX IF EXISTS visor_processing_tipsets_economics_idx; +DROP INDEX IF EXISTS visor_processing_tipsets_height_idx; + +DROP INDEX IF EXISTS visor_processing_messages_gas_outputs_idx; +DROP INDEX IF EXISTS visor_processing_messages_height_idx; + +DROP INDEX IF EXISTS visor_processing_actors_completed_idx; +DROP INDEX IF EXISTS visor_processing_actors_code_idx; +DROP INDEX IF EXISTS visor_processing_actors_height_idx; +`) + migrations.MustRegisterTx(up, down) +} diff --git a/storage/sql.go b/storage/sql.go index f960ff6e3..99e032968 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -58,6 +58,8 @@ var models = []interface{}{ (*visor.ProcessingActor)(nil), (*visor.ProcessingMessage)(nil), + (*visor.ProcessingStat)(nil), + (*derived.GasOutputs)(nil), (*chain.ChainEconomics)(nil), } diff --git a/tasks/stats/processing.go b/tasks/stats/processing.go new file mode 100644 index 000000000..6894211b6 --- /dev/null +++ b/tasks/stats/processing.go @@ -0,0 +1,151 @@ +package stats + +import ( + "context" + "fmt" + "strings" + "time" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/sentinel-visor/storage" + "github.com/filecoin-project/sentinel-visor/wait" +) + +var statsInsert = `INSERT INTO visor_processing_stats SELECT date_trunc('minute', NOW()), measure, value FROM ( %s ) stats ON CONFLICT DO NOTHING;` + +var statsTipsetsTemplate = ` +-- total number of tipsets that have been discovered for processing +SELECT 'tipsets_%[1]s_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_tipsets + +UNION + +-- total number of tipsets that have been processed +SELECT 'tipsets_%[1]s_completed_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_tipsets WHERE %[1]s_completed_at IS NOT NULL + +UNION + +-- total number of tipsets that have been processed but reported an error +SELECT 'tipsets_%[1]s_errors_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_tipsets WHERE %[1]s_completed_at IS NOT NULL AND %[1]s_errors_detected IS NOT NULL + +UNION + +-- total number of tipsets that are currently being processed +SELECT 'tipsets_%[1]s_claimed_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_tipsets WHERE %[1]s_claimed_until IS NOT NULL + +UNION + +-- highest epoch that has been processed +SELECT 'tipsets_%[1]s_completed_height_max' AS measure, COALESCE(max(height),0) AS value FROM visor_processing_tipsets WHERE %[1]s_completed_at IS NOT NULL AND %[1]s_errors_detected IS NULL + +UNION + +-- highest epoch that has not been processed +SELECT 'tipsets_%[1]s_incomplete_height_max' AS measure, COALESCE(max(height),0) AS value FROM visor_processing_tipsets WHERE %[1]s_completed_at IS NULL +` + +var statsMessagesTemplate = ` +-- total number of messages that have been discovered for processing +SELECT 'messages_%[1]s_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_messages + +UNION + +-- total number of messages that have been processed +SELECT 'messages_%[1]s_completed_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_messages WHERE %[1]s_completed_at IS NOT NULL + +UNION + +-- total number of messages that have been processed but reported an error +SELECT 'messages_%[1]s_errors_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_messages WHERE %[1]s_completed_at IS NOT NULL AND %[1]s_errors_detected IS NOT NULL + +UNION + +-- total number of messages that are currently being processed +SELECT 'messages_%[1]s_claimed_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_messages WHERE %[1]s_claimed_until IS NOT NULL + +UNION + +-- highest epoch that has been processed +SELECT 'messages_%[1]s_completed_height_max' AS measure, COALESCE(max(height),0) AS value FROM visor_processing_messages WHERE %[1]s_completed_at IS NOT NULL AND %[1]s_errors_detected IS NULL + +UNION + +-- highest epoch that has not been processed +SELECT 'messages_%[1]s_incomplete_height_max' AS measure, COALESCE(max(height),0) AS value FROM visor_processing_messages WHERE %[1]s_completed_at IS NULL +` + +var statsActors = ` +-- total number of actors of each type that have been discovered for processing +SELECT concat('actors_', code, '_count') AS measure, COALESCE(count(*),0) AS value FROM visor_processing_actors GROUP BY code + +UNION + +-- total number of actors of each type that have been processed +SELECT concat('actors_', code, '_completed_count') AS measure, COALESCE(count(*),0) AS value FROM visor_processing_actors WHERE completed_at IS NOT NULL GROUP BY code + +UNION + +-- total number of actors of each type that have been processed but reported an error +SELECT concat('actors_', code, '_errors_count') AS measure, COALESCE(count(*),0) AS value FROM visor_processing_actors WHERE completed_at IS NOT NULL AND errors_detected IS NOT NULL GROUP BY code + +UNION + +-- total number of actors of each type that have are currently being processed +SELECT concat('actors_', code, '_claimed_count') AS measure, COALESCE(count(*),0) AS value FROM visor_processing_actors WHERE claimed_until IS NOT NULL GROUP BY code + +UNION + +-- highest epoch that has been processed +SELECT concat('actors_', code, '_completed_height_max') AS measure, COALESCE(max(height),0) AS value FROM visor_processing_actors WHERE completed_at IS NOT NULL AND errors_detected IS NULL GROUP BY code + +UNION + +-- highest epoch that has not been processed +SELECT concat('actors_', code, '_incomplete_height_max') AS measure, COALESCE(max(height),0) AS value FROM visor_processing_actors WHERE completed_at IS NULL GROUP BY code +` + +func NewProcessingStatsRefresher(d *storage.Database, refreshRate time.Duration) *ProcessingStatsRefresher { + return &ProcessingStatsRefresher{ + db: d, + refreshRate: refreshRate, + } +} + +// ProcessingStatsRefresher is a task which periodically collects summaries of processing tables used by visor +type ProcessingStatsRefresher struct { + db *storage.Database + refreshRate time.Duration +} + +// Run starts regularly refreshing until context is done or an error occurs +func (r *ProcessingStatsRefresher) Run(ctx context.Context) error { + if r.refreshRate == 0 { + return nil + } + return wait.RepeatUntil(ctx, r.refreshRate, r.collectStats) +} + +func (r *ProcessingStatsRefresher) collectStats(ctx context.Context) (bool, error) { + subQueries := []string{statsActors} + + tipsetTaskTypes := []string{"message", "statechange", "economics"} + + for _, taskType := range tipsetTaskTypes { + subQueries = append(subQueries, fmt.Sprintf(statsTipsetsTemplate, taskType)) + } + + messageTaskTypes := []string{"gas_outputs"} + + for _, taskType := range messageTaskTypes { + subQueries = append(subQueries, fmt.Sprintf(statsMessagesTemplate, taskType)) + } + + subQuery := strings.Join(subQueries, " UNION ") + + _, err := r.db.DB.ExecContext(ctx, fmt.Sprintf(statsInsert, subQuery)) + if err != nil { + return true, xerrors.Errorf("refresh: %w", err) + } + + return false, nil +}