Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new option for walk config #1212

Merged
merged 4 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 32 additions & 17 deletions chain/walk/walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,30 @@ import (

var log = logging.Logger("lily/chain/walk")

func NewWalker(obs indexer.Indexer, node lens.API, name string, tasks []string, minHeight, maxHeight int64, r *schedule.Reporter) *Walker {
func NewWalker(obs indexer.Indexer, node lens.API, name string, tasks []string, minHeight, maxHeight int64, r *schedule.Reporter, stopOnError bool) *Walker {
return &Walker{
node: node,
obs: obs,
name: name,
tasks: tasks,
minHeight: minHeight,
maxHeight: maxHeight,
report: r,
node: node,
obs: obs,
name: name,
tasks: tasks,
minHeight: minHeight,
maxHeight: maxHeight,
report: r,
stopOnError: stopOnError,
}
}

// Walker is a job that indexes blocks by walking the chain history.
type Walker struct {
node lens.API
obs indexer.Indexer
name string
tasks []string
minHeight int64 // limit persisting to tipsets equal to or above this height
maxHeight int64 // limit persisting to tipsets equal to or below this height}
done chan struct{}
report *schedule.Reporter
node lens.API
obs indexer.Indexer
name string
tasks []string
minHeight int64 // limit persisting to tipsets equal to or above this height
maxHeight int64 // limit persisting to tipsets equal to or below this height}
done chan struct{}
report *schedule.Reporter
stopOnError bool
}

// Run starts walking the chain history and continues until the context is done or
Expand Down Expand Up @@ -92,6 +94,7 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet)
defer span.End()

var err error
errs := []error{}
for int64(ts.Height()) >= c.minHeight && ts.Height() != 0 {
select {
case <-ctx.Done():
Expand All @@ -102,7 +105,15 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet)
c.report.UpdateCurrentHeight(int64(ts.Height()))
if success, err := c.obs.TipSet(ctx, ts, indexer.WithIndexerType(indexer.Walk), indexer.WithTasks(c.tasks)); err != nil {
span.RecordError(err)
return fmt.Errorf("notify tipset: %w", err)
err := fmt.Errorf("index tipset, height: %v, error: %v", ts.Height().String(), err)
log.Errorf("%v", err)
// collect error
errs = append(errs, err)

// return an error only if the "stopOnError" flag is set to true.
if c.stopOnError {
return err
}
} else if !success {
log.Errorw("walk incomplete", "height", ts.Height(), "tipset", ts.Key().String(), "reporter", c.name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above so it looks consistent.

}
Expand All @@ -115,5 +126,9 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet)
}
}

if len(errs) > 0 {
return fmt.Errorf("errors: %v", errs)
}

return nil
}
2 changes: 1 addition & 1 deletion chain/walk/walker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestWalker(t *testing.T) {

t.Logf("initializing indexer")
reporter := &schedule.Reporter{}
idx := NewWalker(im, nodeAPI, t.Name(), []string{tasktype.BlocksTask}, 0, int64(head.Height()), reporter)
idx := NewWalker(im, nodeAPI, t.Name(), []string{tasktype.BlocksTask}, 0, int64(head.Height()), reporter, false)

t.Logf("indexing chain")
err = idx.WalkChain(ctx, nodeAPI, head)
Expand Down
1 change: 1 addition & 0 deletions commands/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var JobRunCmd = &cli.Command{
RunRestartDelayFlag,
RunRestartFailure,
RunRestartCompletion,
StopOnError,
},
Subcommands: []*cli.Command{
WalkCmd,
Expand Down
10 changes: 10 additions & 0 deletions commands/job/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type runOpts struct {

RestartCompletion bool
RestartFailure bool
StopOnError bool
}

func (r runOpts) ParseJobConfig(kind string) lily.LilyJobConfig {
Expand All @@ -35,6 +36,7 @@ func (r runOpts) ParseJobConfig(kind string) lily.LilyJobConfig {
RestartOnFailure: RunFlags.RestartFailure,
RestartOnCompletion: RunFlags.RestartCompletion,
RestartDelay: RunFlags.RestartDelay,
StopOnError: RunFlags.StopOnError,
}
}

Expand Down Expand Up @@ -96,6 +98,14 @@ var RunRestartFailure = &cli.BoolFlag{
Destination: &RunFlags.RestartFailure,
}

var StopOnError = &cli.BoolFlag{
Name: "stop-on-error",
Usage: "Stop the job if it encounters an error.",
EnvVars: []string{"LILY_JOB_STOP_ON_ERROR"},
Value: false,
Destination: &RunFlags.StopOnError,
}

type notifyOps struct {
queue string
}
Expand Down
2 changes: 2 additions & 0 deletions lens/lily/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ type LilyJobConfig struct {
RestartOnFailure bool
// RestartOnCompletion when true will restart the job when it completes.
RestartOnCompletion bool
// RestartOnCompletion when true will restart the job when it completes.
StopOnError bool
// RestartDelay configures how long to wait before restarting the job.
RestartDelay time.Duration
// Storage is the name of the storage system the job will use, may be empty.
Expand Down
4 changes: 2 additions & 2 deletions lens/lily/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (m *LilyNodeAPI) LilyWalk(_ context.Context, cfg *LilyWalkConfig) (*schedul
RestartOnFailure: cfg.JobConfig.RestartOnFailure,
RestartOnCompletion: cfg.JobConfig.RestartOnCompletion,
RestartDelay: cfg.JobConfig.RestartDelay,
Job: walk.NewWalker(idx, m, cfg.JobConfig.Name, cfg.JobConfig.Tasks, cfg.From, cfg.To, reporter),
Job: walk.NewWalker(idx, m, cfg.JobConfig.Name, cfg.JobConfig.Tasks, cfg.From, cfg.To, reporter, cfg.JobConfig.StopOnError),
Reporter: reporter,
}

Expand Down Expand Up @@ -356,7 +356,7 @@ func (m *LilyNodeAPI) LilyWalkNotify(_ context.Context, cfg *LilyWalkNotifyConfi
RestartOnFailure: cfg.WalkConfig.JobConfig.RestartOnFailure,
RestartOnCompletion: cfg.WalkConfig.JobConfig.RestartOnCompletion,
RestartDelay: cfg.WalkConfig.JobConfig.RestartDelay,
Job: walk.NewWalker(idx, m, cfg.WalkConfig.JobConfig.Name, cfg.WalkConfig.JobConfig.Tasks, cfg.WalkConfig.From, cfg.WalkConfig.To, reporter),
Job: walk.NewWalker(idx, m, cfg.WalkConfig.JobConfig.Name, cfg.WalkConfig.JobConfig.Tasks, cfg.WalkConfig.From, cfg.WalkConfig.To, reporter, cfg.WalkConfig.JobConfig.StopOnError),
Reporter: reporter,
}
res := m.Scheduler.Submit(jobConfig)
Expand Down