From af2060d4b45816c5cc242171243cb3dca5cba99c Mon Sep 17 00:00:00 2001 From: frrist Date: Fri, 1 Jul 2022 10:17:20 -0700 Subject: [PATCH] fix: allow tasks to be retired with block not found - fixes #1014 --- chain/indexer/distributed/queue/tasks/gapfill.go | 10 ++-------- chain/indexer/distributed/queue/tasks/index.go | 10 ++-------- 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/chain/indexer/distributed/queue/tasks/gapfill.go b/chain/indexer/distributed/queue/tasks/gapfill.go index 401eb45cb..2c9b7e787 100644 --- a/chain/indexer/distributed/queue/tasks/gapfill.go +++ b/chain/indexer/distributed/queue/tasks/gapfill.go @@ -4,9 +4,7 @@ import ( "context" "encoding/json" "fmt" - "strings" - "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/chain/types" "github.com/hibiken/asynq" "go.opentelemetry.io/otel/attribute" @@ -85,18 +83,14 @@ func (gh *AsynqGapFillTipSetTaskHandler) HandleGapFillTipSetTask(ctx context.Con } span := trace.SpanFromContext(ctx) if span.IsRecording() { - span.SetAttributes(attribute.String("taskID", t.ResultWriter().TaskID())) + span.SetAttributes(attribute.String("taskID", taskID)) span.SetAttributes(p.Attributes()...) } } success, err := gh.indexer.TipSet(ctx, p.TipSet, indexer.WithTasks(p.Tasks)) if err != nil { - if strings.Contains(err.Error(), blockstore.ErrNotFound.Error()) { - log.Errorw("failed to index tipset for gap fill", zap.Inline(p), "error", err) - // return SkipRetry to prevent the task from being retried since nodes do not contain the block - return fmt.Errorf("indexing tipset for gap fill %s.(%d) taskID %s: Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), taskID, err, asynq.SkipRetry) - } + log.Errorw("failed to index tipset for gap fill", "taskID", taskID, zap.Inline(p), "error", err) return err } if !success { diff --git a/chain/indexer/distributed/queue/tasks/index.go b/chain/indexer/distributed/queue/tasks/index.go index 3608e5be0..b90848f03 100644 --- a/chain/indexer/distributed/queue/tasks/index.go +++ b/chain/indexer/distributed/queue/tasks/index.go @@ -4,9 +4,7 @@ import ( "context" "encoding/json" "fmt" - "strings" - "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/chain/types" "github.com/hibiken/asynq" logging "github.com/ipfs/go-log/v2" @@ -86,18 +84,14 @@ func (ih *AsynqTipSetTaskHandler) HandleIndexTipSetTask(ctx context.Context, t * } span := trace.SpanFromContext(ctx) if span.IsRecording() { - span.SetAttributes(attribute.String("taskID", t.ResultWriter().TaskID())) + span.SetAttributes(attribute.String("taskID", taskID)) span.SetAttributes(p.Attributes()...) } } success, err := ih.indexer.TipSet(ctx, p.TipSet, indexer.WithTasks(p.Tasks)) if err != nil { - if strings.Contains(err.Error(), blockstore.ErrNotFound.Error()) { - log.Errorw("failed to index tipset", zap.Inline(p), "error", err) - // return SkipRetry to prevent the task from being retried since nodes do not contain the block - return fmt.Errorf("indexing tipset %s.(%d) taskID %s: Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), taskID, err, asynq.SkipRetry) - } + log.Errorw("failed to index tipset", "taskID", taskID, zap.Inline(p), "error", err) return err } if !success {