From f9ce49dc2dcfedacc958f1d7ae2d43821c3e2fd1 Mon Sep 17 00:00:00 2001 From: Terry Date: Wed, 24 May 2023 14:44:05 +0800 Subject: [PATCH 1/4] Add new option for walk config --- chain/walk/walker.go | 39 ++++++++++++++++++++++----------------- chain/walk/walker_test.go | 2 +- commands/job/job.go | 1 + commands/job/options.go | 10 ++++++++++ lens/lily/api.go | 2 ++ lens/lily/impl.go | 4 ++-- 6 files changed, 38 insertions(+), 20 deletions(-) diff --git a/chain/walk/walker.go b/chain/walk/walker.go index 177cef046..9beb1f24b 100644 --- a/chain/walk/walker.go +++ b/chain/walk/walker.go @@ -17,28 +17,30 @@ import ( var log = logging.Logger("lily/chain/walk") -func NewWalker(obs indexer.Indexer, node lens.API, name string, tasks []string, minHeight, maxHeight int64, r *schedule.Reporter) *Walker { +func NewWalker(obs indexer.Indexer, node lens.API, name string, tasks []string, minHeight, maxHeight int64, r *schedule.Reporter, stopOnFatalError bool) *Walker { return &Walker{ - node: node, - obs: obs, - name: name, - tasks: tasks, - minHeight: minHeight, - maxHeight: maxHeight, - report: r, + node: node, + obs: obs, + name: name, + tasks: tasks, + minHeight: minHeight, + maxHeight: maxHeight, + report: r, + stopOnFatalError: stopOnFatalError, } } // Walker is a job that indexes blocks by walking the chain history. type Walker struct { - node lens.API - obs indexer.Indexer - name string - tasks []string - minHeight int64 // limit persisting to tipsets equal to or above this height - maxHeight int64 // limit persisting to tipsets equal to or below this height} - done chan struct{} - report *schedule.Reporter + node lens.API + obs indexer.Indexer + name string + tasks []string + minHeight int64 // limit persisting to tipsets equal to or above this height + maxHeight int64 // limit persisting to tipsets equal to or below this height} + done chan struct{} + report *schedule.Reporter + stopOnFatalError bool } // Run starts walking the chain history and continues until the context is done or @@ -102,7 +104,10 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet) c.report.UpdateCurrentHeight(int64(ts.Height())) if success, err := c.obs.TipSet(ctx, ts, indexer.WithIndexerType(indexer.Walk), indexer.WithTasks(c.tasks)); err != nil { span.RecordError(err) - return fmt.Errorf("notify tipset: %w", err) + log.Errorf("notify tipset: %w", err) + if c.stopOnFatalError { + return err + } } else if !success { log.Errorw("walk incomplete", "height", ts.Height(), "tipset", ts.Key().String(), "reporter", c.name) } diff --git a/chain/walk/walker_test.go b/chain/walk/walker_test.go index fdac4de4f..4a41d4f47 100644 --- a/chain/walk/walker_test.go +++ b/chain/walk/walker_test.go @@ -56,7 +56,7 @@ func TestWalker(t *testing.T) { t.Logf("initializing indexer") reporter := &schedule.Reporter{} - idx := NewWalker(im, nodeAPI, t.Name(), []string{tasktype.BlocksTask}, 0, int64(head.Height()), reporter) + idx := NewWalker(im, nodeAPI, t.Name(), []string{tasktype.BlocksTask}, 0, int64(head.Height()), reporter, false) t.Logf("indexing chain") err = idx.WalkChain(ctx, nodeAPI, head) diff --git a/commands/job/job.go b/commands/job/job.go index 4fc38b4ae..5fc39cd19 100644 --- a/commands/job/job.go +++ b/commands/job/job.go @@ -35,6 +35,7 @@ var JobRunCmd = &cli.Command{ RunRestartDelayFlag, RunRestartFailure, RunRestartCompletion, + StopOnFatalError, }, Subcommands: []*cli.Command{ WalkCmd, diff --git a/commands/job/options.go b/commands/job/options.go index 2388f882c..8bfe8b726 100644 --- a/commands/job/options.go +++ b/commands/job/options.go @@ -21,6 +21,7 @@ type runOpts struct { RestartCompletion bool RestartFailure bool + StopOnFatalError bool } func (r runOpts) ParseJobConfig(kind string) lily.LilyJobConfig { @@ -35,6 +36,7 @@ func (r runOpts) ParseJobConfig(kind string) lily.LilyJobConfig { RestartOnFailure: RunFlags.RestartFailure, RestartOnCompletion: RunFlags.RestartCompletion, RestartDelay: RunFlags.RestartDelay, + StopOnFatalError: RunFlags.StopOnFatalError, } } @@ -96,6 +98,14 @@ var RunRestartFailure = &cli.BoolFlag{ Destination: &RunFlags.RestartFailure, } +var StopOnFatalError = &cli.BoolFlag{ + Name: "stop-on-fatal-error", + Usage: "Stop the job if it get error.", + EnvVars: []string{"LILY_JOB_STOP_ON_FATAL_ERROR"}, + Value: false, + Destination: &RunFlags.StopOnFatalError, +} + type notifyOps struct { queue string } diff --git a/lens/lily/api.go b/lens/lily/api.go index 21de4cd78..d35b3e00f 100644 --- a/lens/lily/api.go +++ b/lens/lily/api.go @@ -94,6 +94,8 @@ type LilyJobConfig struct { RestartOnFailure bool // RestartOnCompletion when true will restart the job when it completes. RestartOnCompletion bool + // RestartOnCompletion when true will restart the job when it completes. + StopOnFatalError bool // RestartDelay configures how long to wait before restarting the job. RestartDelay time.Duration // Storage is the name of the storage system the job will use, may be empty. diff --git a/lens/lily/impl.go b/lens/lily/impl.go index 98f3da6ae..28efc417c 100644 --- a/lens/lily/impl.go +++ b/lens/lily/impl.go @@ -328,7 +328,7 @@ func (m *LilyNodeAPI) LilyWalk(_ context.Context, cfg *LilyWalkConfig) (*schedul RestartOnFailure: cfg.JobConfig.RestartOnFailure, RestartOnCompletion: cfg.JobConfig.RestartOnCompletion, RestartDelay: cfg.JobConfig.RestartDelay, - Job: walk.NewWalker(idx, m, cfg.JobConfig.Name, cfg.JobConfig.Tasks, cfg.From, cfg.To, reporter), + Job: walk.NewWalker(idx, m, cfg.JobConfig.Name, cfg.JobConfig.Tasks, cfg.From, cfg.To, reporter, cfg.JobConfig.StopOnFatalError), Reporter: reporter, } @@ -356,7 +356,7 @@ func (m *LilyNodeAPI) LilyWalkNotify(_ context.Context, cfg *LilyWalkNotifyConfi RestartOnFailure: cfg.WalkConfig.JobConfig.RestartOnFailure, RestartOnCompletion: cfg.WalkConfig.JobConfig.RestartOnCompletion, RestartDelay: cfg.WalkConfig.JobConfig.RestartDelay, - Job: walk.NewWalker(idx, m, cfg.WalkConfig.JobConfig.Name, cfg.WalkConfig.JobConfig.Tasks, cfg.WalkConfig.From, cfg.WalkConfig.To, reporter), + Job: walk.NewWalker(idx, m, cfg.WalkConfig.JobConfig.Name, cfg.WalkConfig.JobConfig.Tasks, cfg.WalkConfig.From, cfg.WalkConfig.To, reporter, cfg.WalkConfig.JobConfig.StopOnFatalError), Reporter: reporter, } res := m.Scheduler.Submit(jobConfig) From 15854bb946dabf3d9165b8d7a29aa75b30e4d51d Mon Sep 17 00:00:00 2001 From: Terry Date: Wed, 24 May 2023 16:41:49 +0800 Subject: [PATCH 2/4] collect the errors --- chain/walk/walker.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/chain/walk/walker.go b/chain/walk/walker.go index 9beb1f24b..8b1685737 100644 --- a/chain/walk/walker.go +++ b/chain/walk/walker.go @@ -94,6 +94,7 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet) defer span.End() var err error + errs := []error{} for int64(ts.Height()) >= c.minHeight && ts.Height() != 0 { select { case <-ctx.Done(): @@ -105,6 +106,10 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet) if success, err := c.obs.TipSet(ctx, ts, indexer.WithIndexerType(indexer.Walk), indexer.WithTasks(c.tasks)); err != nil { span.RecordError(err) log.Errorf("notify tipset: %w", err) + // collect error + errs = append(errs, err) + + // return an error only if the "stopOnFatalError" flag is set to true. if c.stopOnFatalError { return err } @@ -120,5 +125,9 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet) } } + if len(errs) > 0 { + return fmt.Errorf("errors: %v", errs) + } + return nil } From 0120e3e61c1ffb21c447820545dc17a596b68714 Mon Sep 17 00:00:00 2001 From: Terry Date: Thu, 25 May 2023 12:29:17 +0800 Subject: [PATCH 3/4] refine the argument name and log message --- chain/walk/walker.go | 43 +++++++++++++++++++++-------------------- commands/job/job.go | 2 +- commands/job/options.go | 12 ++++++------ lens/lily/api.go | 2 +- lens/lily/impl.go | 4 ++-- 5 files changed, 32 insertions(+), 31 deletions(-) diff --git a/chain/walk/walker.go b/chain/walk/walker.go index 8b1685737..9751c08b3 100644 --- a/chain/walk/walker.go +++ b/chain/walk/walker.go @@ -17,30 +17,30 @@ import ( var log = logging.Logger("lily/chain/walk") -func NewWalker(obs indexer.Indexer, node lens.API, name string, tasks []string, minHeight, maxHeight int64, r *schedule.Reporter, stopOnFatalError bool) *Walker { +func NewWalker(obs indexer.Indexer, node lens.API, name string, tasks []string, minHeight, maxHeight int64, r *schedule.Reporter, stopOnError bool) *Walker { return &Walker{ - node: node, - obs: obs, - name: name, - tasks: tasks, - minHeight: minHeight, - maxHeight: maxHeight, - report: r, - stopOnFatalError: stopOnFatalError, + node: node, + obs: obs, + name: name, + tasks: tasks, + minHeight: minHeight, + maxHeight: maxHeight, + report: r, + stopOnError: stopOnError, } } // Walker is a job that indexes blocks by walking the chain history. type Walker struct { - node lens.API - obs indexer.Indexer - name string - tasks []string - minHeight int64 // limit persisting to tipsets equal to or above this height - maxHeight int64 // limit persisting to tipsets equal to or below this height} - done chan struct{} - report *schedule.Reporter - stopOnFatalError bool + node lens.API + obs indexer.Indexer + name string + tasks []string + minHeight int64 // limit persisting to tipsets equal to or above this height + maxHeight int64 // limit persisting to tipsets equal to or below this height} + done chan struct{} + report *schedule.Reporter + stopOnError bool } // Run starts walking the chain history and continues until the context is done or @@ -105,12 +105,13 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet) c.report.UpdateCurrentHeight(int64(ts.Height())) if success, err := c.obs.TipSet(ctx, ts, indexer.WithIndexerType(indexer.Walk), indexer.WithTasks(c.tasks)); err != nil { span.RecordError(err) - log.Errorf("notify tipset: %w", err) + err := fmt.Errorf("index tipset, height: %v, error: %v", ts.Height().String(), err) + log.Errorf("%v", err) // collect error errs = append(errs, err) - // return an error only if the "stopOnFatalError" flag is set to true. - if c.stopOnFatalError { + // return an error only if the "stopOnError" flag is set to true. + if c.stopOnError { return err } } else if !success { diff --git a/commands/job/job.go b/commands/job/job.go index 5fc39cd19..3bdf44e98 100644 --- a/commands/job/job.go +++ b/commands/job/job.go @@ -35,7 +35,7 @@ var JobRunCmd = &cli.Command{ RunRestartDelayFlag, RunRestartFailure, RunRestartCompletion, - StopOnFatalError, + StopOnError, }, Subcommands: []*cli.Command{ WalkCmd, diff --git a/commands/job/options.go b/commands/job/options.go index 8bfe8b726..6e9aab341 100644 --- a/commands/job/options.go +++ b/commands/job/options.go @@ -21,7 +21,7 @@ type runOpts struct { RestartCompletion bool RestartFailure bool - StopOnFatalError bool + StopOnError bool } func (r runOpts) ParseJobConfig(kind string) lily.LilyJobConfig { @@ -36,7 +36,7 @@ func (r runOpts) ParseJobConfig(kind string) lily.LilyJobConfig { RestartOnFailure: RunFlags.RestartFailure, RestartOnCompletion: RunFlags.RestartCompletion, RestartDelay: RunFlags.RestartDelay, - StopOnFatalError: RunFlags.StopOnFatalError, + StopOnError: RunFlags.StopOnError, } } @@ -98,12 +98,12 @@ var RunRestartFailure = &cli.BoolFlag{ Destination: &RunFlags.RestartFailure, } -var StopOnFatalError = &cli.BoolFlag{ - Name: "stop-on-fatal-error", +var StopOnError = &cli.BoolFlag{ + Name: "stop-on-error", Usage: "Stop the job if it get error.", - EnvVars: []string{"LILY_JOB_STOP_ON_FATAL_ERROR"}, + EnvVars: []string{"LILY_JOB_STOP_ON_ERROR"}, Value: false, - Destination: &RunFlags.StopOnFatalError, + Destination: &RunFlags.StopOnError, } type notifyOps struct { diff --git a/lens/lily/api.go b/lens/lily/api.go index d35b3e00f..4ba47651b 100644 --- a/lens/lily/api.go +++ b/lens/lily/api.go @@ -95,7 +95,7 @@ type LilyJobConfig struct { // RestartOnCompletion when true will restart the job when it completes. RestartOnCompletion bool // RestartOnCompletion when true will restart the job when it completes. - StopOnFatalError bool + StopOnError bool // RestartDelay configures how long to wait before restarting the job. RestartDelay time.Duration // Storage is the name of the storage system the job will use, may be empty. diff --git a/lens/lily/impl.go b/lens/lily/impl.go index 28efc417c..372ecf6bd 100644 --- a/lens/lily/impl.go +++ b/lens/lily/impl.go @@ -328,7 +328,7 @@ func (m *LilyNodeAPI) LilyWalk(_ context.Context, cfg *LilyWalkConfig) (*schedul RestartOnFailure: cfg.JobConfig.RestartOnFailure, RestartOnCompletion: cfg.JobConfig.RestartOnCompletion, RestartDelay: cfg.JobConfig.RestartDelay, - Job: walk.NewWalker(idx, m, cfg.JobConfig.Name, cfg.JobConfig.Tasks, cfg.From, cfg.To, reporter, cfg.JobConfig.StopOnFatalError), + Job: walk.NewWalker(idx, m, cfg.JobConfig.Name, cfg.JobConfig.Tasks, cfg.From, cfg.To, reporter, cfg.JobConfig.StopOnError), Reporter: reporter, } @@ -356,7 +356,7 @@ func (m *LilyNodeAPI) LilyWalkNotify(_ context.Context, cfg *LilyWalkNotifyConfi RestartOnFailure: cfg.WalkConfig.JobConfig.RestartOnFailure, RestartOnCompletion: cfg.WalkConfig.JobConfig.RestartOnCompletion, RestartDelay: cfg.WalkConfig.JobConfig.RestartDelay, - Job: walk.NewWalker(idx, m, cfg.WalkConfig.JobConfig.Name, cfg.WalkConfig.JobConfig.Tasks, cfg.WalkConfig.From, cfg.WalkConfig.To, reporter, cfg.WalkConfig.JobConfig.StopOnFatalError), + Job: walk.NewWalker(idx, m, cfg.WalkConfig.JobConfig.Name, cfg.WalkConfig.JobConfig.Tasks, cfg.WalkConfig.From, cfg.WalkConfig.To, reporter, cfg.WalkConfig.JobConfig.StopOnError), Reporter: reporter, } res := m.Scheduler.Submit(jobConfig) From f6049c28f7acf65ac48043b72bc2c3b9f10d5c47 Mon Sep 17 00:00:00 2001 From: Terry Date: Fri, 26 May 2023 10:14:36 +0800 Subject: [PATCH 4/4] Refine the message --- commands/job/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands/job/options.go b/commands/job/options.go index 6e9aab341..4e23793f6 100644 --- a/commands/job/options.go +++ b/commands/job/options.go @@ -100,7 +100,7 @@ var RunRestartFailure = &cli.BoolFlag{ var StopOnError = &cli.BoolFlag{ Name: "stop-on-error", - Usage: "Stop the job if it get error.", + Usage: "Stop the job if it encounters an error.", EnvVars: []string{"LILY_JOB_STOP_ON_ERROR"}, Value: false, Destination: &RunFlags.StopOnError,