Skip to content

Commit

Permalink
perf: ensure processing updates always include height in criteria (#192)
Browse files Browse the repository at this point in the history
  • Loading branch information
iand authored Nov 2, 2020
1 parent 8fccb47 commit 3f2e31c
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 17 deletions.
5 changes: 5 additions & 0 deletions model/derived/gasoutputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,8 @@ func (l GasOutputsList) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
}
return nil
}

type ProcessingGasOutputs struct {
Height int64
GasOutputs
}
14 changes: 7 additions & 7 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,14 +435,14 @@ func (d *Database) FindActors(ctx context.Context, claimUntil time.Time, batchSi
return actors, nil
}

func (d *Database) MarkActorComplete(ctx context.Context, head string, code string, completedAt time.Time, errorsDetected string) error {
func (d *Database) MarkActorComplete(ctx context.Context, height int64, head string, code string, completedAt time.Time, errorsDetected string) error {
if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error {
_, err := tx.ExecContext(ctx, `
UPDATE visor_processing_actors
SET claimed_until = null,
completed_at = ?,
errors_detected = ?
WHERE head = ? AND code = ?
WHERE height = ? AND head = ? AND code = ?
`, completedAt, useNullIfEmpty(errorsDetected), head, code)
if err != nil {
return err
Expand Down Expand Up @@ -521,8 +521,8 @@ func useNullIfEmpty(s string) *string {
}

// LeaseGasOutputsMessages leases a set of messages that have receipts for gas output processing. minHeight and maxHeight define an inclusive range of heights to process.
func (d *Database) LeaseGasOutputsMessages(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) (derived.GasOutputsList, error) {
var list derived.GasOutputsList
func (d *Database) LeaseGasOutputsMessages(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) ([]*derived.ProcessingGasOutputs, error) {
var list []*derived.ProcessingGasOutputs

if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error {
_, err := tx.QueryContext(ctx, &list, `
Expand Down Expand Up @@ -561,15 +561,15 @@ SELECT * FROM leased;
return list, nil
}

func (d *Database) MarkGasOutputsMessagesComplete(ctx context.Context, cid string, completedAt time.Time, errorsDetected string) error {
func (d *Database) MarkGasOutputsMessagesComplete(ctx context.Context, height int64, cid string, completedAt time.Time, errorsDetected string) error {
if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error {
_, err := tx.ExecContext(ctx, `
UPDATE visor_processing_messages
SET gas_outputs_claimed_until = null,
gas_outputs_completed_at = ?,
gas_outputs_errors_detected = ?
WHERE cid = ?
`, completedAt, useNullIfEmpty(errorsDetected), cid)
WHERE height = ? AND cid = ?
`, completedAt, useNullIfEmpty(errorsDetected), height, cid)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions storage/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func TestMarkActorComplete(t *testing.T) {

t.Run("with error message", func(t *testing.T) {
completedAt := testutil.KnownTime.Add(time.Minute * 1)
err = d.MarkActorComplete(ctx, "head1", "codeB", completedAt, "message")
err = d.MarkActorComplete(ctx, 1, "head1", "codeB", completedAt, "message")
require.NoError(t, err)

// Check the database contains the updated row
Expand All @@ -341,7 +341,7 @@ func TestMarkActorComplete(t *testing.T) {

t.Run("without error message", func(t *testing.T) {
completedAt := testutil.KnownTime.Add(time.Minute * 2)
err = d.MarkActorComplete(ctx, "head1", "codeB", completedAt, "")
err = d.MarkActorComplete(ctx, 1, "head1", "codeB", completedAt, "")
require.NoError(t, err)

// Check the database contains the updated row with a null errors_detected column
Expand Down Expand Up @@ -713,7 +713,7 @@ func TestMarkGasOutputsMessagesComplete(t *testing.T) {

t.Run("with error message", func(t *testing.T) {
completedAt := testutil.KnownTime.Add(time.Minute * 1)
err = d.MarkGasOutputsMessagesComplete(ctx, "cid1", completedAt, "message")
err = d.MarkGasOutputsMessagesComplete(ctx, 1, "cid1", completedAt, "message")
require.NoError(t, err)

// Check the database contains the updated row
Expand All @@ -725,7 +725,7 @@ func TestMarkGasOutputsMessagesComplete(t *testing.T) {

t.Run("without error message", func(t *testing.T) {
completedAt := testutil.KnownTime.Add(time.Minute * 2)
err = d.MarkGasOutputsMessagesComplete(ctx, "cid1", completedAt, "")
err = d.MarkGasOutputsMessagesComplete(ctx, 1, "cid1", completedAt, "")
require.NoError(t, err)

// Check the database contains the updated row with a null errors_detected column
Expand Down
6 changes: 3 additions & 3 deletions tasks/actorstate/actorstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,22 +267,22 @@ func (p *ActorStateProcessor) processBatch(ctx context.Context, node lens.API) (
info, err := NewActorInfo(actor)
if err != nil {
errorLog.Errorw("unmarshal actor", "error", err.Error())
if err := p.storage.MarkActorComplete(ctx, actor.Head, actor.Code, p.clock.Now(), err.Error()); err != nil {
if err := p.storage.MarkActorComplete(ctx, actor.Height, actor.Head, actor.Code, p.clock.Now(), err.Error()); err != nil {
errorLog.Errorw("failed to mark actor complete", "error", err.Error())
}
continue
}

if err := p.processActor(ctx, node, info); err != nil {
errorLog.Errorw("process actor", "error", err.Error())
if err := p.storage.MarkActorComplete(ctx, actor.Head, actor.Code, p.clock.Now(), err.Error()); err != nil {
if err := p.storage.MarkActorComplete(ctx, actor.Height, actor.Head, actor.Code, p.clock.Now(), err.Error()); err != nil {
errorLog.Errorw("failed to mark actor complete", "error", err.Error())
}

return false, xerrors.Errorf("process actor: %w", err)
}

if err := p.storage.MarkActorComplete(ctx, actor.Head, actor.Code, p.clock.Now(), ""); err != nil {
if err := p.storage.MarkActorComplete(ctx, actor.Height, actor.Head, actor.Code, p.clock.Now(), ""); err != nil {
errorLog.Errorw("failed to mark actor complete", "error", err.Error())
}
}
Expand Down
6 changes: 3 additions & 3 deletions tasks/message/gasoutputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,16 @@ func (p *GasOutputsProcessor) processBatch(ctx context.Context, node lens.API) (

errorLog := log.With("cid", item.Cid)

if err := p.processItem(ctx, node, item); err != nil {
if err := p.processItem(ctx, node, &item.GasOutputs); err != nil {
// Any errors are likely to be problems using the lens, mark this tipset as failed and exit this batch
errorLog.Errorw("failed to process message", "error", err.Error())
if err := p.storage.MarkGasOutputsMessagesComplete(ctx, item.Cid, p.clock.Now(), err.Error()); err != nil {
if err := p.storage.MarkGasOutputsMessagesComplete(ctx, item.Height, item.Cid, p.clock.Now(), err.Error()); err != nil {
errorLog.Errorw("failed to mark message complete", "error", err.Error())
}
return false, xerrors.Errorf("process item: %w", err)
}

if err := p.storage.MarkGasOutputsMessagesComplete(ctx, item.Cid, p.clock.Now(), ""); err != nil {
if err := p.storage.MarkGasOutputsMessagesComplete(ctx, item.Height, item.Cid, p.clock.Now(), ""); err != nil {
errorLog.Errorw("failed to mark message complete", "error", err.Error())
}
}
Expand Down

0 comments on commit 3f2e31c

Please sign in to comment.