-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add new option for walk config #1212
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,28 +17,30 @@ import ( | |
|
||
var log = logging.Logger("lily/chain/walk") | ||
|
||
func NewWalker(obs indexer.Indexer, node lens.API, name string, tasks []string, minHeight, maxHeight int64, r *schedule.Reporter) *Walker { | ||
func NewWalker(obs indexer.Indexer, node lens.API, name string, tasks []string, minHeight, maxHeight int64, r *schedule.Reporter, stopOnFatalError bool) *Walker { | ||
return &Walker{ | ||
node: node, | ||
obs: obs, | ||
name: name, | ||
tasks: tasks, | ||
minHeight: minHeight, | ||
maxHeight: maxHeight, | ||
report: r, | ||
node: node, | ||
obs: obs, | ||
name: name, | ||
tasks: tasks, | ||
minHeight: minHeight, | ||
maxHeight: maxHeight, | ||
report: r, | ||
stopOnFatalError: stopOnFatalError, | ||
} | ||
} | ||
|
||
// Walker is a job that indexes blocks by walking the chain history. | ||
type Walker struct { | ||
node lens.API | ||
obs indexer.Indexer | ||
name string | ||
tasks []string | ||
minHeight int64 // limit persisting to tipsets equal to or above this height | ||
maxHeight int64 // limit persisting to tipsets equal to or below this height} | ||
done chan struct{} | ||
report *schedule.Reporter | ||
node lens.API | ||
obs indexer.Indexer | ||
name string | ||
tasks []string | ||
minHeight int64 // limit persisting to tipsets equal to or above this height | ||
maxHeight int64 // limit persisting to tipsets equal to or below this height} | ||
done chan struct{} | ||
report *schedule.Reporter | ||
stopOnFatalError bool | ||
} | ||
|
||
// Run starts walking the chain history and continues until the context is done or | ||
|
@@ -92,6 +94,7 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet) | |
defer span.End() | ||
|
||
var err error | ||
errs := []error{} | ||
for int64(ts.Height()) >= c.minHeight && ts.Height() != 0 { | ||
select { | ||
case <-ctx.Done(): | ||
|
@@ -102,7 +105,14 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet) | |
c.report.UpdateCurrentHeight(int64(ts.Height())) | ||
if success, err := c.obs.TipSet(ctx, ts, indexer.WithIndexerType(indexer.Walk), indexer.WithTasks(c.tasks)); err != nil { | ||
span.RecordError(err) | ||
return fmt.Errorf("notify tipset: %w", err) | ||
log.Errorf("notify tipset: %w", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's improve the log message. |
||
// collect error | ||
errs = append(errs, err) | ||
|
||
// return an error only if the "stopOnFatalError" flag is set to true. | ||
if c.stopOnFatalError { | ||
return err | ||
} | ||
} else if !success { | ||
log.Errorw("walk incomplete", "height", ts.Height(), "tipset", ts.Key().String(), "reporter", c.name) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above so it looks consistent. |
||
} | ||
|
@@ -115,5 +125,9 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet) | |
} | ||
} | ||
|
||
if len(errs) > 0 { | ||
return fmt.Errorf("errors: %v", errs) | ||
} | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ type runOpts struct { | |
|
||
RestartCompletion bool | ||
RestartFailure bool | ||
StopOnFatalError bool | ||
} | ||
|
||
func (r runOpts) ParseJobConfig(kind string) lily.LilyJobConfig { | ||
|
@@ -35,6 +36,7 @@ func (r runOpts) ParseJobConfig(kind string) lily.LilyJobConfig { | |
RestartOnFailure: RunFlags.RestartFailure, | ||
RestartOnCompletion: RunFlags.RestartCompletion, | ||
RestartDelay: RunFlags.RestartDelay, | ||
StopOnFatalError: RunFlags.StopOnFatalError, | ||
} | ||
} | ||
|
||
|
@@ -96,6 +98,14 @@ var RunRestartFailure = &cli.BoolFlag{ | |
Destination: &RunFlags.RestartFailure, | ||
} | ||
|
||
var StopOnFatalError = &cli.BoolFlag{ | ||
Name: "stop-on-fatal-error", | ||
Usage: "Stop the job if it get error.", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stop the job if it encounters an error. |
||
EnvVars: []string{"LILY_JOB_STOP_ON_FATAL_ERROR"}, | ||
Value: false, | ||
Destination: &RunFlags.StopOnFatalError, | ||
} | ||
|
||
type notifyOps struct { | ||
queue string | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just call it
stopOnError
so less verbose everywhere?