Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(debug): Process actor by head without persistance #86

Merged
merged 1 commit into from
Oct 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions commands/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package commands

import (
"os"

"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
)

var Debug = &cli.Command{
Name: "debug",
Usage: "Execute individual tasks without persisting them to the database",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "actor-head",
Usage: "Process task in visor_processing_actors by head",
},
},
Action: func(cctx *cli.Context) error {
if err := setupLogging(cctx); err != nil {
return xerrors.Errorf("setup logging: %w", err)
}

tcloser, err := setupTracing(cctx)
if err != nil {
return xerrors.Errorf("setup tracing: %w", err)
}
defer tcloser()

ctx, rctx, err := setupStorageAndAPI(cctx)
if err != nil {
return xerrors.Errorf("setup storage and api: %w", err)
}
defer func() {
rctx.closer()
if err := rctx.db.Close(ctx); err != nil {
log.Errorw("close database", "error", err)
}
}()

p, err := actorstate.NewActorStateProcessor(rctx.db, rctx.api, 0, 0, 0, 0, actorstate.SupportedActorCodes())
if err != nil {
return err
}

return p.Debug(ctx, cctx.String("actor-head"), os.Stdout)
},
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func main() {
Commands: []*cli.Command{
commands.Migrate,
commands.Run,
commands.Debug,
},
}

Expand Down
20 changes: 20 additions & 0 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,26 @@ func (d *Database) MarkStateChangeComplete(ctx context.Context, tsk string, heig
return nil
}

// GetActorByHead returns an actor without a lease by its CID
func (d *Database) GetActorByHead(ctx context.Context, head string) (*visor.ProcessingActor, error) {
if len(head) == 0 {
return nil, xerrors.Errorf("lookup actor head was empty")
}

d.DB.AddQueryHook(pgext.DebugHook{
Verbose: true, // Print all queries.
})

a := new(visor.ProcessingActor)
if err := d.DB.ModelContext(ctx, a).
Where("head = ?", head).
Limit(1).
Select(); err != nil {
return nil, err
}
return a, nil
}

// LeaseActors leases a set of actors to process. minHeight and maxHeight define an inclusive range of heights to process.
func (d *Database) LeaseActors(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64, codes []string) (visor.ProcessingActorList, error) {
var actors visor.ProcessingActorList
Expand Down
63 changes: 63 additions & 0 deletions tasks/actorstate/actorstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package actorstate

import (
"context"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -121,6 +125,65 @@ type ActorStateProcessor struct {
clock clock.Clock
}

func trackDuration(topic string, w io.Writer) func() {
t := time.Now()
return func() {
w.Write([]byte(fmt.Sprintf("** %s finished in %s\n", topic, time.Since(t))))
}
}

// Debug runs an individual actor and returns result
func (p *ActorStateProcessor) Debug(ctx context.Context, head string, writer io.Writer) error {
defer trackDuration("debug total", writer)()
printActorDuration := trackDuration("get actor", writer)
actor, err := p.storage.GetActorByHead(ctx, head)
if err != nil {
return xerrors.Errorf("get actor by head: %w", err)
}
printActorDuration()

printNewActorDuration := trackDuration("new actor info", writer)
info, err := NewActorInfo(actor)
if err != nil {
return xerrors.Errorf("new actor info: %w", err)
}
printNewActorDuration()
defer trackDuration("debug actor", writer)()
if err := p.debugActor(ctx, info, writer); err != nil {
return xerrors.Errorf("debug actor: %w", err)
}
return nil
}

func (p *ActorStateProcessor) debugActor(ctx context.Context, info ActorInfo, writer io.Writer) error {
// extract actor state
extractor, exists := p.extractors[info.Actor.Code]
if !exists {
return xerrors.Errorf("no extractor defined for actor code %q", info.Actor.Code.String())
}

data, err := extractor.Extract(ctx, info, p.node)
if err != nil {
return xerrors.Errorf("extract actor state: %w", err)
}

dm, err := json.MarshalIndent(data, " ", " ")
if err != nil {
return xerrors.Errorf("marshaling actor state: %w", err)
}

var result strings.Builder
header := "** ActorProcessorResult:\n"
result.Grow(len(header) + len(dm))
if _, err := result.WriteString(header); err != nil {
return xerrors.Errorf("writing actor state: %w", err)
}
result.Write(dm)
fmt.Fprint(writer, result.String())

return nil
}

// Run starts processing batches of actors and blocks until the context is done or
// an error occurs.
func (p *ActorStateProcessor) Run(ctx context.Context) error {
Expand Down