Skip to content

Commit

Permalink
fix: allow tasks to be retired with block not found
Browse files Browse the repository at this point in the history
- fixes #1014
  • Loading branch information
frrist committed Jul 1, 2022
1 parent f150334 commit af2060d
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 16 deletions.
10 changes: 2 additions & 8 deletions chain/indexer/distributed/queue/tasks/gapfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/types"
"github.com/hibiken/asynq"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -85,18 +83,14 @@ func (gh *AsynqGapFillTipSetTaskHandler) HandleGapFillTipSetTask(ctx context.Con
}
span := trace.SpanFromContext(ctx)
if span.IsRecording() {
span.SetAttributes(attribute.String("taskID", t.ResultWriter().TaskID()))
span.SetAttributes(attribute.String("taskID", taskID))
span.SetAttributes(p.Attributes()...)
}
}

success, err := gh.indexer.TipSet(ctx, p.TipSet, indexer.WithTasks(p.Tasks))
if err != nil {
if strings.Contains(err.Error(), blockstore.ErrNotFound.Error()) {
log.Errorw("failed to index tipset for gap fill", zap.Inline(p), "error", err)
// return SkipRetry to prevent the task from being retried since nodes do not contain the block
return fmt.Errorf("indexing tipset for gap fill %s.(%d) taskID %s: Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), taskID, err, asynq.SkipRetry)
}
log.Errorw("failed to index tipset for gap fill", "taskID", taskID, zap.Inline(p), "error", err)
return err
}
if !success {
Expand Down
10 changes: 2 additions & 8 deletions chain/indexer/distributed/queue/tasks/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/types"
"github.com/hibiken/asynq"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -86,18 +84,14 @@ func (ih *AsynqTipSetTaskHandler) HandleIndexTipSetTask(ctx context.Context, t *
}
span := trace.SpanFromContext(ctx)
if span.IsRecording() {
span.SetAttributes(attribute.String("taskID", t.ResultWriter().TaskID()))
span.SetAttributes(attribute.String("taskID", taskID))
span.SetAttributes(p.Attributes()...)
}
}

success, err := ih.indexer.TipSet(ctx, p.TipSet, indexer.WithTasks(p.Tasks))
if err != nil {
if strings.Contains(err.Error(), blockstore.ErrNotFound.Error()) {
log.Errorw("failed to index tipset", zap.Inline(p), "error", err)
// return SkipRetry to prevent the task from being retried since nodes do not contain the block
return fmt.Errorf("indexing tipset %s.(%d) taskID %s: Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), taskID, err, asynq.SkipRetry)
}
log.Errorw("failed to index tipset", "taskID", taskID, zap.Inline(p), "error", err)
return err
}
if !success {
Expand Down

0 comments on commit af2060d

Please sign in to comment.