feat: add visor processing stats table #96

Merged · 1 commit · Oct 14, 2020
1 change: 1 addition & 0 deletions .gitignore
@@ -15,6 +15,7 @@
# vendor/
.idea
visor
!model/visor
sentinel-visor

build/.*
20 changes: 20 additions & 0 deletions commands/run.go
@@ -17,6 +17,7 @@ import (
"github.com/filecoin-project/sentinel-visor/tasks/chain"
"github.com/filecoin-project/sentinel-visor/tasks/indexer"
"github.com/filecoin-project/sentinel-visor/tasks/message"
"github.com/filecoin-project/sentinel-visor/tasks/stats"
"github.com/filecoin-project/sentinel-visor/tasks/views"
)

@@ -163,6 +164,14 @@ var Run = &cli.Command{
EnvVars: []string{"VISOR_CHAINVIS_REFRESH"},
},

&cli.DurationFlag{
Name: "processingstats-refresh-rate",
Aliases: []string{"psr"},
Value: 0,
Usage: "Refresh frequency for processing stats (0 = disables refresh)",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this in seconds?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a Go duration, so it takes values like 30s and 15m

EnvVars: []string{"VISOR_PROCESSINGSTATS_REFRESH"},
},
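For illustration only (not part of this PR), the flag accepts values in the standard Go duration format parsed by time.ParseDuration, and the same format applies when setting VISOR_PROCESSINGSTATS_REFRESH in the environment:

package main

import (
	"fmt"
	"time"
)

func main() {
	// "30s" and "15m" are valid Go duration strings; 0 disables the refresher.
	d, err := time.ParseDuration("15m")
	if err != nil {
		panic(err)
	}
	fmt.Println(d) // prints "15m0s"
}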

&cli.IntFlag{
	Name:    "chaineconomics-workers",
	Aliases: []string{"cew"},
@@ -309,6 +318,17 @@ var Run = &cli.Command{
		RestartOnCompletion: false,
	})

	// Include optional refresher for processing stats
	if cctx.Duration("processingstats-refresh-rate") != 0 {
		scheduler.Add(schedule.TaskConfig{
			Name:                "ProcessingStatsRefresher",
			Locker:              NewGlobalSingleton(ProcessingStatsRefresherLockID, rctx.db),
			Task:                stats.NewProcessingStatsRefresher(rctx.db, cctx.Duration("processingstats-refresh-rate")),
			RestartOnFailure:    true,
			RestartOnCompletion: true,
		})
	}

	// Start the scheduler and wait for it to complete or to be cancelled.
	err = scheduler.Run(ctx)
	if !errors.Is(err, context.Canceled) {
7 changes: 4 additions & 3 deletions commands/setup.go
@@ -173,9 +173,10 @@ func setupLogging(cctx *cli.Context) error {
}

const (
	ChainHeadIndexerLockID         = 98981111
	ChainHistoryIndexerLockID      = 98981112
	ChainVisRefresherLockID        = 98981113
	ProcessingStatsRefresherLockID = 98981114
)

func NewGlobalSingleton(id int64, d *storage.Database) *GlobalSingleton {
51 changes: 51 additions & 0 deletions model/visor/stats.go
@@ -0,0 +1,51 @@
package visor

import (
	"context"
	"fmt"
	"time"

	"github.com/go-pg/pg/v10"
	"go.opentelemetry.io/otel/api/global"
	"go.opentelemetry.io/otel/api/trace"
	"go.opentelemetry.io/otel/label"
)

type ProcessingStat struct {
	tableName struct{} `pg:"visor_processing_stats"`

	// RecordedAt is the time the measurement was recorded in the database
	RecordedAt time.Time `pg:",pk,notnull"`

	// Measure is the name of the measurement, e.g. `messages_completed_count`
	Measure string `pg:",pk,notnull"`

	// Value is the value of the measurement
	Value int64 `pg:",use_zero,notnull"`
}

func (s *ProcessingStat) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
	if _, err := tx.ModelContext(ctx, s).
		OnConflict("do nothing").
		Insert(); err != nil {
		return fmt.Errorf("persisting processing stat: %w", err)
	}
	return nil
}

type ProcessingStatList []*ProcessingStat

func (l ProcessingStatList) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
	if len(l) == 0 {
		return nil
	}
	ctx, span := global.Tracer("").Start(ctx, "ProcessingStatList.PersistWithTx", trace.WithAttributes(label.Int("count", len(l))))
	defer span.End()

	if _, err := tx.ModelContext(ctx, &l).
		OnConflict("do nothing").
		Insert(); err != nil {
		return fmt.Errorf("persisting processing stats: %w", err)
	}
	return nil
}
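As a usage sketch (not part of this PR), a single measurement could be persisted inside a go-pg transaction as shown below; the db handle, function name, and chosen values are illustrative assumptions:

package example

import (
	"context"
	"time"

	"github.com/go-pg/pg/v10"

	"github.com/filecoin-project/sentinel-visor/model/visor"
)

// recordStat is illustrative only: it persists one measurement using the model above.
func recordStat(ctx context.Context, db *pg.DB) error {
	stat := &visor.ProcessingStat{
		RecordedAt: time.Now().UTC().Truncate(time.Minute),
		Measure:    "messages_completed_count", // example measure name from the model's comment
		Value:      42,
	}
	return db.RunInTransaction(ctx, func(tx *pg.Tx) error {
		return stat.PersistWithTx(ctx, tx)
	})
}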
47 changes: 47 additions & 0 deletions storage/migrations/9_processing_stats.go
@@ -0,0 +1,47 @@
package migrations

import (
"github.com/go-pg/migrations/v8"
)

// Schema version 9 adds a table for processing stats and indexes over the visor processing tables

func init() {
up := batch(`
CREATE TABLE IF NOT EXISTS public.visor_processing_stats (
"recorded_at" timestamptz NOT NULL,
"measure" text NOT NULL,
"value" bigint NOT NULL,
PRIMARY KEY ("recorded_at","measure")
);

CREATE INDEX IF NOT EXISTS "visor_processing_tipsets_statechange_idx" ON public.visor_processing_tipsets USING BTREE (statechange_completed_at, statechange_claimed_until);
CREATE INDEX IF NOT EXISTS "visor_processing_tipsets_message_idx" ON public.visor_processing_tipsets USING BTREE (message_completed_at, message_claimed_until);
CREATE INDEX IF NOT EXISTS "visor_processing_tipsets_economics_idx" ON public.visor_processing_tipsets USING BTREE (economics_completed_at, economics_claimed_until);
CREATE INDEX IF NOT EXISTS "visor_processing_tipsets_height_idx" ON public.visor_processing_tipsets USING BTREE (height DESC);

CREATE INDEX IF NOT EXISTS "visor_processing_messages_gas_outputs_idx" ON public.visor_processing_messages USING BTREE (gas_outputs_completed_at, gas_outputs_claimed_until);
CREATE INDEX IF NOT EXISTS "visor_processing_messages_height_idx" ON public.visor_processing_messages USING BTREE (height DESC);

CREATE INDEX IF NOT EXISTS "visor_processing_actors_completed_idx" ON public.visor_processing_actors USING BTREE (completed_at, claimed_until);
CREATE INDEX IF NOT EXISTS "visor_processing_actors_code_idx" ON public.visor_processing_actors USING BTREE (code);
CREATE INDEX IF NOT EXISTS "visor_processing_actors_height_idx" ON public.visor_processing_actors USING BTREE (height DESC);
`)

down := batch(`
DROP TABLE IF EXISTS public.visor_processing_stats;

DROP INDEX IF EXISTS visor_processing_tipsets_statechange_idx;
DROP INDEX IF EXISTS visor_processing_tipsets_message_idx;
DROP INDEX IF EXISTS visor_processing_tipsets_economics_idx;
DROP INDEX IF EXISTS visor_processing_tipsets_height_idx;

DROP INDEX IF EXISTS visor_processing_messages_gas_outputs_idx;
DROP INDEX IF EXISTS visor_processing_messages_height_idx;

DROP INDEX IF EXISTS visor_processing_actors_completed_idx;
DROP INDEX IF EXISTS visor_processing_actors_code_idx;
DROP INDEX IF EXISTS visor_processing_actors_height_idx;
`)
migrations.MustRegisterTx(up, down)
}
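Once the migration has run, the new table can be read back through the model added in model/visor/stats.go; a minimal sketch (the helper name and one-hour window are illustrative, not part of this PR):

package example

import (
	"context"
	"time"

	"github.com/go-pg/pg/v10"

	"github.com/filecoin-project/sentinel-visor/model/visor"
)

// recentStats is illustrative only: it fetches measurements recorded in the
// last hour from visor_processing_stats, most recent first.
func recentStats(ctx context.Context, db *pg.DB) ([]*visor.ProcessingStat, error) {
	var stats []*visor.ProcessingStat
	err := db.ModelContext(ctx, &stats).
		Where("recorded_at > ?", time.Now().Add(-time.Hour)).
		Order("recorded_at DESC").
		Select()
	return stats, err
}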
2 changes: 2 additions & 0 deletions storage/sql.go
@@ -58,6 +58,8 @@ var models = []interface{}{
	(*visor.ProcessingActor)(nil),
	(*visor.ProcessingMessage)(nil),

	(*visor.ProcessingStat)(nil),

	(*derived.GasOutputs)(nil),
	(*chain.ChainEconomics)(nil),
}
151 changes: 151 additions & 0 deletions tasks/stats/processing.go
@@ -0,0 +1,151 @@
package stats

import (
	"context"
	"fmt"
	"strings"
	"time"

	"golang.org/x/xerrors"

	"github.com/filecoin-project/sentinel-visor/storage"
	"github.com/filecoin-project/sentinel-visor/wait"
)

var statsInsert = `INSERT INTO visor_processing_stats SELECT date_trunc('minute', NOW()), measure, value FROM ( %s ) stats ON CONFLICT DO NOTHING;`

var statsTipsetsTemplate = `
-- total number of tipsets that have been discovered for processing
SELECT 'tipsets_%[1]s_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_tipsets

UNION

-- total number of tipsets that have been processed
SELECT 'tipsets_%[1]s_completed_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_tipsets WHERE %[1]s_completed_at IS NOT NULL

UNION

-- total number of tipsets that have been processed but reported an error
SELECT 'tipsets_%[1]s_errors_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_tipsets WHERE %[1]s_completed_at IS NOT NULL AND %[1]s_errors_detected IS NOT NULL

UNION

-- total number of tipsets that are currently being processed
SELECT 'tipsets_%[1]s_claimed_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_tipsets WHERE %[1]s_claimed_until IS NOT NULL

UNION

-- highest epoch that has been processed
SELECT 'tipsets_%[1]s_completed_height_max' AS measure, COALESCE(max(height),0) AS value FROM visor_processing_tipsets WHERE %[1]s_completed_at IS NOT NULL AND %[1]s_errors_detected IS NULL

UNION

-- highest epoch that has not been processed
SELECT 'tipsets_%[1]s_incomplete_height_max' AS measure, COALESCE(max(height),0) AS value FROM visor_processing_tipsets WHERE %[1]s_completed_at IS NULL
`

var statsMessagesTemplate = `
-- total number of messages that have been discovered for processing
SELECT 'messages_%[1]s_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_messages

UNION

-- total number of messages that have been processed
SELECT 'messages_%[1]s_completed_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_messages WHERE %[1]s_completed_at IS NOT NULL

UNION

-- total number of messages that have been processed but reported an error
SELECT 'messages_%[1]s_errors_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_messages WHERE %[1]s_completed_at IS NOT NULL AND %[1]s_errors_detected IS NOT NULL

UNION

-- total number of messages that are currently being processed
SELECT 'messages_%[1]s_claimed_count' AS measure, COALESCE(count(*),0) AS value FROM visor_processing_messages WHERE %[1]s_claimed_until IS NOT NULL

UNION

-- highest epoch that has been processed
SELECT 'messages_%[1]s_completed_height_max' AS measure, COALESCE(max(height),0) AS value FROM visor_processing_messages WHERE %[1]s_completed_at IS NOT NULL AND %[1]s_errors_detected IS NULL

UNION

-- highest epoch that has not been processed
SELECT 'messages_%[1]s_incomplete_height_max' AS measure, COALESCE(max(height),0) AS value FROM visor_processing_messages WHERE %[1]s_completed_at IS NULL
`

var statsActors = `
-- total number of actors of each type that have been discovered for processing
SELECT concat('actors_', code, '_count') AS measure, COALESCE(count(*),0) AS value FROM visor_processing_actors GROUP BY code

UNION

-- total number of actors of each type that have been processed
SELECT concat('actors_', code, '_completed_count') AS measure, COALESCE(count(*),0) AS value FROM visor_processing_actors WHERE completed_at IS NOT NULL GROUP BY code

UNION

-- total number of actors of each type that have been processed but reported an error
SELECT concat('actors_', code, '_errors_count') AS measure, COALESCE(count(*),0) AS value FROM visor_processing_actors WHERE completed_at IS NOT NULL AND errors_detected IS NOT NULL GROUP BY code

UNION

-- total number of actors of each type that are currently being processed
SELECT concat('actors_', code, '_claimed_count') AS measure, COALESCE(count(*),0) AS value FROM visor_processing_actors WHERE claimed_until IS NOT NULL GROUP BY code

UNION

-- highest epoch that has been processed
SELECT concat('actors_', code, '_completed_height_max') AS measure, COALESCE(max(height),0) AS value FROM visor_processing_actors WHERE completed_at IS NOT NULL AND errors_detected IS NULL GROUP BY code

UNION

-- highest epoch that has not been processed
SELECT concat('actors_', code, '_incomplete_height_max') AS measure, COALESCE(max(height),0) AS value FROM visor_processing_actors WHERE completed_at IS NULL GROUP BY code
`

func NewProcessingStatsRefresher(d *storage.Database, refreshRate time.Duration) *ProcessingStatsRefresher {
	return &ProcessingStatsRefresher{
		db:          d,
		refreshRate: refreshRate,
	}
}

// ProcessingStatsRefresher is a task which periodically collects summaries of the processing tables used by visor
type ProcessingStatsRefresher struct {
	db          *storage.Database
	refreshRate time.Duration
}

// Run refreshes the stats at the configured rate until the context is done or an error occurs
func (r *ProcessingStatsRefresher) Run(ctx context.Context) error {
	if r.refreshRate == 0 {
		return nil
	}
	return wait.RepeatUntil(ctx, r.refreshRate, r.collectStats)
}

func (r *ProcessingStatsRefresher) collectStats(ctx context.Context) (bool, error) {
	subQueries := []string{statsActors}

	tipsetTaskTypes := []string{"message", "statechange", "economics"}

	for _, taskType := range tipsetTaskTypes {
		subQueries = append(subQueries, fmt.Sprintf(statsTipsetsTemplate, taskType))
	}

	messageTaskTypes := []string{"gas_outputs"}

	for _, taskType := range messageTaskTypes {
		subQueries = append(subQueries, fmt.Sprintf(statsMessagesTemplate, taskType))
	}

	subQuery := strings.Join(subQueries, " UNION ")

	_, err := r.db.DB.ExecContext(ctx, fmt.Sprintf(statsInsert, subQuery))
	if err != nil {
		return true, xerrors.Errorf("refresh: %w", err)
	}

	return false, nil
}
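To make the query construction concrete, here is a small sketch (not part of the diff) of how a single task type expands into the final statement; the wrapper shape quoted in the comment is taken from statsInsert above:

package stats

import "fmt"

// exampleQuery is illustrative only: expanding the tipset template for the
// "message" task type produces measures such as 'tipsets_message_count' and
// 'tipsets_message_completed_count'; statsInsert then wraps the combined
// sub-queries as:
//
//	INSERT INTO visor_processing_stats
//	SELECT date_trunc('minute', NOW()), measure, value FROM ( ... ) stats
//	ON CONFLICT DO NOTHING;
func exampleQuery() string {
	return fmt.Sprintf(statsInsert, fmt.Sprintf(statsTipsetsTemplate, "message"))
}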