From be2bbce215b1f37e8ead8cc7966d624d0d00d8de Mon Sep 17 00:00:00 2001 From: Sajjad Rizvi Date: Mon, 31 May 2021 16:31:55 -0400 Subject: [PATCH] sql/gcjob: retry failed GC jobs In the previous implementation, failed GC jobs were not being retried regardless whether the failure is permanent or transient. As a result, a GC job's failure risked orphaned data, which cannot be reclaimed. This patch adds a mechanism to retry failed GC jobs that are not permanent. No limit is set on the number of retries. For the time being, the failure type is determined based on the failure categorization of schema-change jobs. This behavior is expected to change once exponential backoff mechanism is implemented for failed jobs (#44594). This is a backport of #65910. Release note: None Fixes: #65000 --- pkg/sql/gcjob/gc_job.go | 16 ++++++++++- pkg/sql/gcjob_test/BUILD.bazel | 40 ++++++++++++++++++++++++++ pkg/sql/gcjob_test/gc_job_test.go | 47 +++++++++++++++++++++++++++++++ pkg/sql/schema_changer.go | 8 +++--- pkg/sql/type_change.go | 4 +-- 5 files changed, 108 insertions(+), 7 deletions(-) create mode 100644 pkg/sql/gcjob_test/BUILD.bazel diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index 40f9c3d53196..16272ba8904e 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -83,7 +83,12 @@ func performGC( // Resume is part of the jobs.Resumer interface. func (r schemaChangeGCResumer) Resume( ctx context.Context, phs interface{}, _ chan<- tree.Datums, -) error { +) (err error) { + defer func() { + if err != nil && !r.isPermanentGCError(err) { + err = errors.Mark(err, jobs.NewRetryJobError("gc")) + } + }() p := phs.(sql.PlanHookState) // TODO(pbardea): Wait for no versions. execCfg := p.ExecCfg() @@ -195,6 +200,15 @@ func (r schemaChangeGCResumer) OnFailOrCancel(context.Context, interface{}) erro return nil } +// isPermanentGCError returns true if the error is a permanent job failure, +// which indicates that the failed GC job cannot be retried. +func (r *schemaChangeGCResumer) isPermanentGCError(err error) bool { + // Currently we classify errors based on Schema Change function to backport + // to 20.2 and 21.1. This functionality should be changed once #44594 is + // implemented. + return sql.IsPermanentSchemaChangeError(err) +} + func init() { createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &schemaChangeGCResumer{ diff --git a/pkg/sql/gcjob_test/BUILD.bazel b/pkg/sql/gcjob_test/BUILD.bazel new file mode 100644 index 000000000000..6f5fc6746042 --- /dev/null +++ b/pkg/sql/gcjob_test/BUILD.bazel @@ -0,0 +1,40 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "gcjob_test_test", + size = "medium", + srcs = [ + "gc_job_test.go", + "main_test.go", + ], + deps = [ + "//pkg/base", + "//pkg/jobs", + "//pkg/jobs/jobspb", + "//pkg/keys", + "//pkg/kv", + "//pkg/kv/kvserver", + "//pkg/roachpb", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql", + "//pkg/sql/catalog/catalogkeys", + "//pkg/sql/catalog/catalogkv", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/tabledesc", + "//pkg/sql/gcjob", + "//pkg/testutils", + "//pkg/testutils/jobutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/skip", + "//pkg/testutils/sqlutils", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/sql/gcjob_test/gc_job_test.go b/pkg/sql/gcjob_test/gc_job_test.go index cbd5f4931944..7fc24ff7e5b9 100644 --- a/pkg/sql/gcjob_test/gc_job_test.go +++ b/pkg/sql/gcjob_test/gc_job_test.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "strconv" + "sync/atomic" "testing" "time" @@ -22,6 +23,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" @@ -32,7 +35,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -311,3 +316,45 @@ SELECT job_id, status, running_status return nil }) } + +func TestGCJobRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + var failed atomic.Value + failed.Store(false) + params := base.TestServerArgs{} + params.Knobs.Store = &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error { + _, ok := request.GetArg(roachpb.ClearRange) + if !ok { + return nil + } + if failed.Load().(bool) { + return nil + } + failed.Store(true) + return roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{ + Timestamp: hlc.Timestamp{}, + Threshold: hlc.Timestamp{}, + }) + }, + } + s, db, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + tdb := sqlutils.MakeSQLRunner(db) + tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") + tdb.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = 1;") + tdb.Exec(t, "DROP TABLE foo CASCADE;") + var jobID int64 + tdb.QueryRow(t, ` +SELECT job_id + FROM [SHOW JOBS] + WHERE job_type = 'SCHEMA CHANGE GC' AND description LIKE '%foo%';`, + ).Scan(&jobID) + var status jobs.Status + tdb.QueryRow(t, + "SELECT status FROM [SHOW JOB WHEN COMPLETE $1]", jobID, + ).Scan(&status) + require.Equal(t, jobs.StatusSucceeded, status) +} diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 04332e72752a..111488b44d6f 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -130,11 +130,11 @@ func NewSchemaChangerForTesting( } } -// isPermanentSchemaChangeError returns true if the error results in +// IsPermanentSchemaChangeError returns true if the error results in // a permanent failure of a schema change. This function is a allowlist // instead of a blocklist: only known safe errors are confirmed to not be // permanent errors. Anything unknown is assumed to be permanent. -func isPermanentSchemaChangeError(err error) bool { +func IsPermanentSchemaChangeError(err error) bool { if err == nil { return false } @@ -2124,7 +2124,7 @@ func (r schemaChangeResumer) Resume( descID, mutationID, ) return nil - case !isPermanentSchemaChangeError(scErr): + case !IsPermanentSchemaChangeError(scErr): // Check if the error is on a allowlist of errors we should retry on, // including the schema change not having the first mutation in line. log.Warningf(ctx, "error while running schema change, retrying: %v", scErr) @@ -2284,7 +2284,7 @@ func (r schemaChangeResumer) OnFailOrCancel(ctx context.Context, phs interface{} // We check for this case so that we can just return the error without // wrapping it in a retry error. return rollbackErr - case !isPermanentSchemaChangeError(rollbackErr): + case !IsPermanentSchemaChangeError(rollbackErr): // Check if the error is on a allowlist of errors we should retry on, and // have the job registry retry. return jobs.NewRetryJobError(rollbackErr.Error()) diff --git a/pkg/sql/type_change.go b/pkg/sql/type_change.go index 12c3107a1521..f9856145d496 100644 --- a/pkg/sql/type_change.go +++ b/pkg/sql/type_change.go @@ -281,7 +281,7 @@ func (t *typeSchemaChanger) execWithRetry(ctx context.Context) error { t.typeID, ) return nil - case !isPermanentSchemaChangeError(tcErr): + case !IsPermanentSchemaChangeError(tcErr): // If this isn't a permanent error, then retry. log.Infof(ctx, "retrying type schema change due to retriable error %v", tcErr) default: @@ -351,7 +351,7 @@ func (t *typeChangeResumer) OnFailOrCancel(ctx context.Context, phs interface{}) "descriptor %d not found for type change job; assuming it was dropped, and exiting", tc.typeID, ) - case !isPermanentSchemaChangeError(rollbackErr): + case !IsPermanentSchemaChangeError(rollbackErr): return jobs.NewRetryJobError(rollbackErr.Error()) default: return rollbackErr