diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 752731acd222..80b08dd065d7 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -14,6 +14,7 @@ import ( "bytes" "context" "fmt" + "os" "strings" "time" @@ -98,6 +99,9 @@ type Registry struct { planFn planHookMaker metrics Metrics + // if non-empty, indicates path to file that prevents any job adoptions. + preventAdoptionFile string + mu struct { syncutil.Mutex // epoch is present to support older nodes that are not using @@ -132,6 +136,10 @@ type Registry struct { // back into the sql package. There's maybe a better way that I'm unaware of. type planHookMaker func(opName, user string) (interface{}, func()) +// PreventAdoptionFile is the name of the file which, if present in the first +// on-disk store, will prevent the adoption of background jobs by that node. +const PreventAdoptionFile = "DISABLE_STARTING_BACKGROUND_JOBS" + // MakeRegistry creates a new Registry. planFn is a wrapper around // sql.newInternalPlanner. It returns a sql.PlanHookState, but must be // coerced into that in the Resumer functions. @@ -145,16 +153,18 @@ func MakeRegistry( settings *cluster.Settings, histogramWindowInterval time.Duration, planFn planHookMaker, + preventAdoptionFile string, ) *Registry { r := &Registry{ - ac: ac, - stopper: stopper, - clock: clock, - db: db, - ex: ex, - nodeID: nodeID, - settings: settings, - planFn: planFn, + ac: ac, + stopper: stopper, + clock: clock, + db: db, + ex: ex, + nodeID: nodeID, + settings: settings, + planFn: planFn, + preventAdoptionFile: preventAdoptionFile, } r.mu.epoch = 1 r.mu.jobs = make(map[int64]context.CancelFunc) @@ -376,6 +386,10 @@ func (r *Registry) Start( case <-stopper.ShouldStop(): return case <-time.After(adoptInterval): + if r.adoptionDisabled(ctx) { + r.cancelAll(ctx) + continue + } if err := r.maybeAdoptJob(ctx, nl); err != nil { log.Errorf(ctx, "error while adopting jobs: %s", err) } @@ -784,6 +798,20 @@ func (r *Registry) resume( return errCh, nil } +func (r *Registry) adoptionDisabled(ctx context.Context) bool { + if r.preventAdoptionFile != "" { + if _, err := os.Stat(r.preventAdoptionFile); err != nil { + if !os.IsNotExist(err) { + log.Warning(ctx, "error checking if job adoption is currently disabled", err) + } + return false + } + log.Warningf(ctx, "job adoption is currently disabled by existence of %s", r.preventAdoptionFile) + return true + } + return false +} + func (r *Registry) maybeAdoptJob(ctx context.Context, nl NodeLiveness) error { const stmt = ` SELECT id, payload, progress IS NULL, status diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go index 3569df770dfe..4acda7c0650d 100644 --- a/pkg/jobs/registry_external_test.go +++ b/pkg/jobs/registry_external_test.go @@ -88,7 +88,7 @@ func TestRegistryResumeExpiredLease(t *testing.T) { nodeID.Reset(id) r := jobs.MakeRegistry( ac, s.Stopper(), clock, db, s.InternalExecutor().(sqlutil.InternalExecutor), - nodeID, s.ClusterSettings(), server.DefaultHistogramWindowInterval, jobs.FakePHS, + nodeID, s.ClusterSettings(), server.DefaultHistogramWindowInterval, jobs.FakePHS, "", ) if err := r.Start(ctx, s.Stopper(), nodeLiveness, cancelInterval, adoptInterval); err != nil { t.Fatal(err) diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index 05de9554eb34..3f7ccdf03fed 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -53,7 +53,7 @@ func TestRegistryCancelation(t *testing.T) { clock := hlc.NewClock(mClock.UnixNano, time.Nanosecond) registry := MakeRegistry( log.AmbientContext{}, stopper, clock, db, nil /* ex */, FakeNodeID, cluster.NoSettings, - histogramWindowInterval, FakePHS) + histogramWindowInterval, FakePHS, "") const nodeCount = 1 nodeLiveness := NewFakeNodeLiveness(nodeCount) diff --git a/pkg/server/server.go b/pkg/server/server.go index ab125db20cc1..c611f8e5eccb 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -558,6 +558,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { s.ClusterSettings(), s.nodeLiveness, internalExecutor) s.sessionRegistry = sql.NewSessionRegistry() + var jobAdoptionStopFile string + for _, spec := range s.cfg.Stores.Specs { + if !spec.InMemory && spec.Path != "" { + jobAdoptionStopFile = filepath.Join(spec.Path, jobs.PreventAdoptionFile) + break + } + } s.jobRegistry = jobs.MakeRegistry( s.cfg.AmbientCtx, s.stopper, @@ -572,6 +579,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { // in sql/jobs/registry.go on planHookMaker. return sql.NewInternalPlanner(opName, nil, user, &sql.MemoryMetrics{}, &execCfg) }, + jobAdoptionStopFile, ) s.registry.AddMetricStruct(s.jobRegistry.MetricsStruct())