diff --git a/ddl/db_test.go b/ddl/db_test.go index 5c10247ba7f76..576dd61fb0524 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -1280,6 +1280,7 @@ func (s *testDBSuite8) TestColumn(c *C) { s.tk = testkit.NewTestKit(c, s.store) s.tk.MustExec("use " + s.schemaName) s.tk.MustExec("create table t2 (c1 int, c2 int, c3 int)") + s.tk.MustExec("set @@tidb_disable_txn_auto_retry = 0") s.testAddColumn(c) s.testDropColumn(c) s.tk.MustExec("drop table t2") diff --git a/executor/set_test.go b/executor/set_test.go index 0a0796c1eb61f..3c09f977b0721 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -346,6 +346,17 @@ func (s *testSuite2) TestSetVar(c *C) { tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("1")) tk.MustExec("set tidb_wait_table_split_finish = 0") tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("0")) + + tk.MustExec("set session tidb_back_off_weight = 3") + tk.MustQuery("select @@session.tidb_back_off_weight;").Check(testkit.Rows("3")) + tk.MustExec("set session tidb_back_off_weight = 20") + tk.MustQuery("select @@session.tidb_back_off_weight;").Check(testkit.Rows("20")) + _, err = tk.Exec("set session tidb_back_off_weight = -1") + c.Assert(err, NotNil) + _, err = tk.Exec("set global tidb_back_off_weight = 0") + c.Assert(err, NotNil) + tk.MustExec("set global tidb_back_off_weight = 10") + tk.MustQuery("select @@global.tidb_back_off_weight;").Check(testkit.Rows("10")) } func (s *testSuite2) TestSetCharset(c *C) { diff --git a/executor/write_test.go b/executor/write_test.go index 798fec9133615..ed2aa01b48bd0 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -2508,6 +2508,7 @@ func (s *testSuite4) TestAutoIDInRetry(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t (id int not null auto_increment primary key)") + tk.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk.MustExec("begin") tk.MustExec("insert into t values ()") tk.MustExec("insert into t values (),()") diff --git a/kv/variables.go b/kv/variables.go index 7d83aadf7f2f4..373421826dacb 100644 --- a/kv/variables.go +++ b/kv/variables.go @@ -18,6 +18,9 @@ type Variables struct { // BackoffLockFast specifies the LockFast backoff base duration in milliseconds. BackoffLockFast int + // BackOffWeight specifies the weight of the max back off time duration. + BackOffWeight int + // Hook is used for test to verify the variable take effect. Hook func(name string, vars *Variables) } @@ -26,6 +29,7 @@ type Variables struct { func NewVariables() *Variables { return &Variables{ BackoffLockFast: DefBackoffLockFast, + BackOffWeight: DefBackOffWeight, } } @@ -35,4 +39,5 @@ var DefaultVars = NewVariables() // Default values const ( DefBackoffLockFast = 100 + DefBackOffWeight = 2 ) diff --git a/server/server_test.go b/server/server_test.go index 8271c2391b584..f23d2b0c793ed 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -682,6 +682,7 @@ func runTestConcurrentUpdate(c *C) { dbt.mustExec("drop table if exists test2") dbt.mustExec("create table test2 (a int, b int)") dbt.mustExec("insert test2 values (1, 1)") + dbt.mustExec("set @@tidb_disable_txn_auto_retry = 0") txn1, err := dbt.db.Begin() c.Assert(err, IsNil) diff --git a/session/session.go b/session/session.go index 5ea7497547262..97f073c239130 100644 --- a/session/session.go +++ b/session/session.go @@ -356,8 +356,8 @@ func (s *session) doCommit(ctx context.Context) error { // mockCommitError and mockGetTSErrorInRetry use to test PR #8743. failpoint.Inject("mockCommitError", func(val failpoint.Value) { - if val.(bool) && mockCommitErrorOnce { - mockCommitErrorOnce = false + if val.(bool) && kv.IsMockCommitErrorEnable() { + kv.MockCommitErrorDisable() failpoint.Return(kv.ErrRetryable) } }) @@ -1632,6 +1632,7 @@ var builtinGlobalVariable = []string{ variable.TiDBHashAggPartialConcurrency, variable.TiDBHashAggFinalConcurrency, variable.TiDBBackoffLockFast, + variable.TiDBBackOffWeight, variable.TiDBConstraintCheckInPlace, variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize, diff --git a/session/session_fail_test.go b/session/session_fail_test.go index 3c9c91259948b..9761d809aa872 100644 --- a/session/session_fail_test.go +++ b/session/session_fail_test.go @@ -95,14 +95,14 @@ func (s *testSessionSuite) TestGetTSFailDirtyState(c *C) { func (s *testSessionSuite) TestGetTSFailDirtyStateInretry(c *C) { defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/mockCommitError"), IsNil) - c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/mockGetTSErrorInRetry"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockGetTSErrorInRetry"), IsNil) }() tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t (id int)") c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/mockCommitError", `return(true)`), IsNil) - c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/mockGetTSErrorInRetry", `return(true)`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockGetTSErrorInRetry", `return(true)`), IsNil) tk.MustExec("insert into t values (2)") tk.MustQuery(`select * from t`).Check(testkit.Rows("2")) } diff --git a/session/session_test.go b/session/session_test.go index b2ebc0b15f78e..155e10fdc4106 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -293,6 +293,7 @@ func (s *testSessionSuite) TestRowLock(c *C) { tk.MustExec("insert t values (12, 2, 3)") tk.MustExec("insert t values (13, 2, 3)") + tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk1.MustExec("begin") tk1.MustExec("update t set c2=21 where c1=11") @@ -507,6 +508,7 @@ func (s *testSessionSuite) TestRetryResetStmtCtx(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table retrytxn (a int unique, b int)") tk.MustExec("insert retrytxn values (1, 1)") + tk.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk.MustExec("begin") tk.MustExec("update retrytxn set b = b + 1 where a = 1") @@ -665,6 +667,7 @@ func (s *testSessionSuite) TestRetryPreparedStmt(c *C) { tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") + tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk1.MustExec("begin") tk1.MustExec("update t set c2=? where c1=11;", 21) @@ -881,6 +884,7 @@ func (s *testSessionSuite) TestAutoIncrementWithRetry(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk1 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk.MustExec("create table t (c2 int, c1 int not null auto_increment, PRIMARY KEY (c1))") tk.MustExec("insert into t (c2) values (1), (2), (3), (4), (5)") @@ -1308,6 +1312,9 @@ func (s *testSessionSuite) TestRetry(c *C) { tk2 := testkit.NewTestKitWithInit(c, s.store) tk3 := testkit.NewTestKitWithInit(c, s.store) tk3.MustExec("SET SESSION autocommit=0;") + tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0") + tk2.MustExec("set @@tidb_disable_txn_auto_retry = 0") + tk3.MustExec("set @@tidb_disable_txn_auto_retry = 0") var wg sync.WaitGroup wg.Add(3) @@ -1449,6 +1456,7 @@ func (s *testSessionSuite) TestResetCtx(c *C) { tk.MustExec("create table t (i int auto_increment not null key);") tk.MustExec("insert into t values (1);") + tk.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk.MustExec("begin;") tk.MustExec("insert into t values (10);") tk.MustExec("update t set i = i + row_count();") @@ -1480,6 +1488,8 @@ func (s *testSessionSuite) TestUnique(c *C) { tk1 := testkit.NewTestKitWithInit(c, s.store) tk2 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("set @@tidb_disable_txn_auto_retry = 0") + tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk.MustExec(`CREATE TABLE test ( id int(11) UNSIGNED NOT NULL AUTO_INCREMENT, val int UNIQUE, PRIMARY KEY (id)); `) tk.MustExec("begin;") tk.MustExec("insert into test(id, val) values(1, 1);") @@ -1764,6 +1774,7 @@ func (s *testSchemaSuite) TestSchemaCheckerSQL(c *C) { tk.MustExec(`insert into t values(1, 1);`) // The schema version is out of date in the first transaction, but the SQL can be retried. + tk.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk.MustExec(`begin;`) tk1.MustExec(`alter table t add index idx(c);`) tk.MustExec(`insert into t values(2, 2);`) @@ -1808,6 +1819,7 @@ func (s *testSchemaSuite) TestPrepareStmtCommitWhenSchemaChanged(c *C) { tk1.MustExec("execute stmt using @a, @a") tk1.MustExec("commit") + tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk1.MustExec("begin") tk.MustExec("alter table t drop column b") tk1.MustExec("execute stmt using @a, @a") @@ -1820,6 +1832,7 @@ func (s *testSchemaSuite) TestCommitWhenSchemaChanged(c *C) { tk1 := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t (a int, b int)") + tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk1.MustExec("begin") tk1.MustExec("insert into t values (1, 1)") @@ -1837,6 +1850,7 @@ func (s *testSchemaSuite) TestRetrySchemaChange(c *C) { tk.MustExec("create table t (a int primary key, b int)") tk.MustExec("insert into t values (1, 1)") + tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk1.MustExec("begin") tk1.MustExec("update t set b = 5 where a = 1") @@ -1867,6 +1881,7 @@ func (s *testSchemaSuite) TestRetryMissingUnionScan(c *C) { tk.MustExec("create table t (a int primary key, b int unique, c int)") tk.MustExec("insert into t values (1, 1, 1)") + tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk1.MustExec("begin") tk1.MustExec("update t set b = 1, c = 2 where b = 2") tk1.MustExec("update t set b = 1, c = 2 where a = 1") @@ -2320,9 +2335,12 @@ func (s *testSessionSuite) TestKVVars(c *C) { tk.MustExec("insert kvvars values (1, 1)") tk2 := testkit.NewTestKitWithInit(c, s.store) tk2.MustExec("set @@tidb_backoff_lock_fast = 1") + tk2.MustExec("set @@tidb_back_off_weight = 100") backoffVal := new(int64) + backOffWeightVal := new(int32) tk2.Se.GetSessionVars().KVVars.Hook = func(name string, vars *kv.Variables) { atomic.StoreInt64(backoffVal, int64(vars.BackoffLockFast)) + atomic.StoreInt32(backOffWeightVal, int32(vars.BackOffWeight)) } wg := new(sync.WaitGroup) wg.Add(2) @@ -2345,7 +2363,14 @@ func (s *testSessionSuite) TestKVVars(c *C) { wg.Done() }() wg.Wait() + for { + tk2.MustQuery("select * from kvvars") + if atomic.LoadInt32(backOffWeightVal) != 0 { + break + } + } c.Assert(atomic.LoadInt64(backoffVal), Equals, int64(1)) + c.Assert(atomic.LoadInt32(backOffWeightVal), Equals, int32(100)) } func (s *testSessionSuite) TestCommitRetryCount(c *C) { diff --git a/session/txn.go b/session/txn.go index df802741310ae..b646e5df00d11 100644 --- a/session/txn.go +++ b/session/txn.go @@ -346,25 +346,11 @@ type txnFuture struct { mockFail bool } -// mockGetTSErrorInRetryOnce use to make sure gofail mockGetTSErrorInRetry only mock get TS error once. -var mockGetTSErrorInRetryOnce = true - func (tf *txnFuture) wait() (kv.Transaction, error) { if tf.mockFail { return nil, errors.New("mock get timestamp fail") } - // mockGetTSErrorInRetry should wait mockCommitErrorOnce first, then will run into retry() logic. - // Then mockGetTSErrorInRetry will return retryable error when first retry. - // Before PR #8743, we don't cleanup txn after meet error such as error like: PD server timeout[try again later] - // This may cause duplicate data to be written. - failpoint.Inject("mockGetTSErrorInRetry", func(val failpoint.Value) { - if val.(bool) && mockGetTSErrorInRetryOnce && !mockCommitErrorOnce { - mockGetTSErrorInRetryOnce = false - failpoint.Return(nil, errors.Errorf("PD server timeout[try again later]")) - } - }) - startTS, err := tf.future.Wait() if err == nil { return tf.store.BeginWithStartTS(startTS) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 81a58c6243489..2cd81c4a86a1e 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -699,6 +699,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.IndexSerialScanConcurrency = tidbOptPositiveInt32(val, DefIndexSerialScanConcurrency) case TiDBBackoffLockFast: s.KVVars.BackoffLockFast = tidbOptPositiveInt32(val, kv.DefBackoffLockFast) + case TiDBBackOffWeight: + s.KVVars.BackOffWeight = tidbOptPositiveInt32(val, kv.DefBackOffWeight) case TiDBConstraintCheckInPlace: s.ConstraintCheckInPlace = TiDBOptOn(val) case TiDBBatchInsert: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 2a88b23393b71..fd32280801da8 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -670,6 +670,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, TiDBHashAggPartialConcurrency, strconv.Itoa(DefTiDBHashAggPartialConcurrency)}, {ScopeGlobal | ScopeSession, TiDBHashAggFinalConcurrency, strconv.Itoa(DefTiDBHashAggFinalConcurrency)}, {ScopeGlobal | ScopeSession, TiDBBackoffLockFast, strconv.Itoa(kv.DefBackoffLockFast)}, + {ScopeGlobal | ScopeSession, TiDBBackOffWeight, strconv.Itoa(kv.DefBackOffWeight)}, {ScopeGlobal | ScopeSession, TiDBRetryLimit, strconv.Itoa(DefTiDBRetryLimit)}, {ScopeGlobal | ScopeSession, TiDBDisableTxnAutoRetry, BoolToIntStr(DefTiDBDisableTxnAutoRetry)}, {ScopeGlobal | ScopeSession, TiDBConstraintCheckInPlace, BoolToIntStr(DefTiDBConstraintCheckInPlace)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index f16aec3d5cc4e..6f249d5f9dc8b 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -219,6 +219,12 @@ const ( // tidb_backoff_lock_fast is used for tikv backoff base time in milliseconds. TiDBBackoffLockFast = "tidb_backoff_lock_fast" + // tidb_back_off_weight is used to control the max back off time in TiDB. + // The default maximum back off time is a small value. + // BackOffWeight could multiply it to let the user adjust the maximum time for retrying. + // Only positive integers can be accepted, which means that the maximum back off time can only grow. + TiDBBackOffWeight = "tidb_back_off_weight" + // tidb_ddl_reorg_worker_cnt defines the count of ddl reorg workers. TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt" @@ -300,7 +306,7 @@ const ( DefTiDBMemQuotaDistSQL = 32 << 30 // 32GB. DefTiDBGeneralLog = 0 DefTiDBRetryLimit = 10 - DefTiDBDisableTxnAutoRetry = false + DefTiDBDisableTxnAutoRetry = true DefTiDBConstraintCheckInPlace = false DefTiDBHashJoinConcurrency = 5 DefTiDBProjectionConcurrency = 4 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index a9e49e323daae..33ec0b7ab5356 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -388,7 +388,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, TiDBHashAggFinalConcurrency, TiDBDistSQLScanConcurrency, TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount, - TiDBBackoffLockFast, + TiDBBackoffLockFast, TiDBBackOffWeight, TiDBDMLBatchSize, TiDBOptimizerSelectivityLevel: v, err := strconv.Atoi(value) if err != nil { diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index 1e1d66ff0152a..78dd9a46f298f 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -20,7 +20,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/util" ) func TestT(t *testing.T) { @@ -513,7 +512,7 @@ func (s *testMockTiKVSuite) TestDeleteRange(c *C) { func (s *testMockTiKVSuite) mustWriteWriteConflict(c *C, errs []error, i int) { c.Assert(errs[i], NotNil) - c.Assert(strings.Contains(errs[i].Error(), util.WriteConflictMarker), IsTrue) + c.Assert(strings.Contains(errs[i].Error(), writeConflictMarker), IsTrue) } func (s *testMockTiKVSuite) TestRC(c *C) { diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index 33e5126a995c7..9dc992f1e0134 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -23,7 +23,6 @@ import ( "github.com/google/btree" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/codec" ) @@ -35,6 +34,8 @@ const ( typeRollback ) +const writeConflictMarker = "write conflict" + type mvccValue struct { valueType mvccValueType startTS uint64 @@ -256,7 +257,7 @@ func (e *mvccEntry) Get(ts uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, err func (e *mvccEntry) Prewrite(mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64) error { if len(e.values) > 0 { if e.values[0].commitTS >= startTS { - return ErrRetryable(util.WriteConflictMarker) + return ErrRetryable(writeConflictMarker) } } if e.lock != nil { diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index e4cfe78a349ed..8d33ffe3af3aa 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/goleveldb/leveldb/util" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser/terror" - tidbutil "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -572,7 +571,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu } // Note that it's a write conflict here, even if the value is a rollback one. if ok && dec1.value.commitTS >= startTS { - return ErrRetryable(tidbutil.WriteConflictMarker) + return ErrRetryable(writeConflictMarker) } op := mutation.GetOp() diff --git a/store/store_test.go b/store/store_test.go index f4d2e1ee6d1c0..38b4a9d358086 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -24,7 +24,6 @@ import ( "time" . "github.com/pingcap/check" - "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util/logutil" @@ -40,7 +39,7 @@ const ( type brokenStore struct{} func (s *brokenStore) Open(schema string) (kv.Storage, error) { - return nil, errors.New("try again later") + return nil, kv.ErrRetryable } func TestT(t *testing.T) { @@ -662,5 +661,5 @@ func (s *testKVSuite) TestRetryOpenStore(c *C) { } c.Assert(err, NotNil) elapse := time.Since(begin) - c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second)) + c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second), Commentf("elapse: %s", elapse)) } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index c078bac7f1c8e..f4029af0dccff 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -587,7 +587,7 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er if keyErr := commitResp.GetError(); keyErr != nil { c.mu.RLock() defer c.mu.RUnlock() - err = errors.Errorf("conn%d 2PC commit failed: %v", c.connID, keyErr.String()) + err = extractKeyErr(keyErr) if c.mu.committed { // No secondary key could be rolled back after it's primary key is committed. // There must be a serious bug somewhere. @@ -600,7 +600,7 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er logutil.Logger(context.Background()).Debug("2PC failed commit primary key", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) - return errors.Annotate(err, txnRetryableMark) + return err } c.mu.Lock() @@ -672,6 +672,9 @@ func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) err return errors.Trace(err) } +// mockGetTSErrorInRetryOnce use to make sure gofail mockGetTSErrorInRetry only mock get TS error once. +var mockGetTSErrorInRetryOnce = true + // execute executes the two-phase commit protocol. func (c *twoPhaseCommitter) execute(ctx context.Context) error { defer func() { @@ -718,6 +721,17 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { return errors.Trace(err) } + // mockGetTSErrorInRetry should wait MockCommitErrorOnce first, then will run into retry() logic. + // Then mockGetTSErrorInRetry will return retryable error when first retry. + // Before PR #8743, we don't cleanup txn after meet error such as error like: PD server timeout[try again later] + // This may cause duplicate data to be written. + failpoint.Inject("mockGetTSErrorInRetry", func(val failpoint.Value) { + if val.(bool) && !kv.IsMockCommitErrorEnable() && mockGetTSErrorInRetryOnce { + mockGetTSErrorInRetryOnce = false + failpoint.Return(errors.Errorf("PD server timeout[try again later]")) + } + }) + start = time.Now() commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars)) if err != nil { @@ -749,7 +763,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { if c.store.oracle.IsExpired(c.startTS, c.maxTxnTimeUse) { err = errors.Errorf("conn%d txn takes too much time, txnStartTS: %d, comm: %d", c.connID, c.startTS, c.commitTS) - return errors.Annotate(err, txnRetryableMark) + return err } start = time.Now() diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 9d4a0d9f4fc8d..24a0e788ed8a5 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/store/mockstore/mocktikv" "github.com/pingcap/tidb/store/tikv/tikvrpc" - "github.com/pingcap/tidb/util" ) type testCommitterSuite struct { @@ -224,7 +223,7 @@ func (s *testCommitterSuite) TestContextCancelRetryable(c *C) { c.Assert(err, IsNil) err = txn2.Commit(context.Background()) c.Assert(err, NotNil) - c.Assert(strings.Contains(err.Error(), txnRetryableMark), IsTrue) + c.Assert(strings.Contains(err.Error(), txnRetryableMark), IsTrue, Commentf("err: %s", err)) } func (s *testCommitterSuite) mustGetRegionID(c *C, key []byte) uint64 { @@ -338,7 +337,7 @@ func (s *testCommitterSuite) TestCommitBeforePrewrite(c *C) { c.Assert(err, IsNil) err = commiter.prewriteKeys(NewBackoffer(ctx, prewriteMaxBackoff), commiter.keys) c.Assert(err, NotNil) - errMsgMustContain(c, err, util.WriteConflictMarker) + errMsgMustContain(c, err, "write conflict") } func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) { diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 693ae62c247bb..eaa9f68ad45c5 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -181,7 +181,7 @@ func (t backoffType) TError() error { case BoTxnLock, boTxnLockFast: return ErrResolveLockTimeout case BoPDRPC: - return ErrPDServerTimeout.GenWithStackByArgs(txnRetryableMark) + return ErrPDServerTimeout case BoRegionMiss, BoUpdateLeader: return ErrRegionUnavailable case boServerBusy: @@ -241,6 +241,11 @@ func (b *Backoffer) WithVars(vars *kv.Variables) *Backoffer { if vars != nil { b.vars = vars } + // maxSleep is the max sleep time in millisecond. + // When it is multiplied by BackOffWeight, it should not be greater than MaxInt32. + if math.MaxInt32/b.vars.BackOffWeight >= b.maxSleep { + b.maxSleep *= b.vars.BackOffWeight + } return b } diff --git a/store/tikv/error.go b/store/tikv/error.go index e4cbd79605d6f..a368acac56c0f 100644 --- a/store/tikv/error.go +++ b/store/tikv/error.go @@ -37,11 +37,11 @@ const txnRetryableMark = "[try again later]" // MySQL error instances. var ( - ErrTiKVServerTimeout = terror.ClassTiKV.New(mysql.ErrTiKVServerTimeout, mysql.MySQLErrName[mysql.ErrTiKVServerTimeout]+" "+txnRetryableMark) - ErrResolveLockTimeout = terror.ClassTiKV.New(mysql.ErrResolveLockTimeout, mysql.MySQLErrName[mysql.ErrResolveLockTimeout]+" "+txnRetryableMark) - ErrPDServerTimeout = terror.ClassTiKV.New(mysql.ErrPDServerTimeout, mysql.MySQLErrName[mysql.ErrPDServerTimeout]+" %v") - ErrRegionUnavailable = terror.ClassTiKV.New(mysql.ErrRegionUnavailable, mysql.MySQLErrName[mysql.ErrRegionUnavailable]+" "+txnRetryableMark) - ErrTiKVServerBusy = terror.ClassTiKV.New(mysql.ErrTiKVServerBusy, mysql.MySQLErrName[mysql.ErrTiKVServerBusy]+" "+txnRetryableMark) + ErrTiKVServerTimeout = terror.ClassTiKV.New(mysql.ErrTiKVServerTimeout, mysql.MySQLErrName[mysql.ErrTiKVServerTimeout]) + ErrResolveLockTimeout = terror.ClassTiKV.New(mysql.ErrResolveLockTimeout, mysql.MySQLErrName[mysql.ErrResolveLockTimeout]) + ErrPDServerTimeout = terror.ClassTiKV.New(mysql.ErrPDServerTimeout, mysql.MySQLErrName[mysql.ErrPDServerTimeout]) + ErrRegionUnavailable = terror.ClassTiKV.New(mysql.ErrRegionUnavailable, mysql.MySQLErrName[mysql.ErrRegionUnavailable]) + ErrTiKVServerBusy = terror.ClassTiKV.New(mysql.ErrTiKVServerBusy, mysql.MySQLErrName[mysql.ErrTiKVServerBusy]) ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly]) ) diff --git a/store/tikv/kv.go b/store/tikv/kv.go index cf6a3e920d7f5..6b8dff0722441 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -81,9 +81,6 @@ func (d Driver) Open(path string) (kv.Storage, error) { }) if err != nil { - if strings.Contains(err.Error(), "i/o timeout") { - return nil, errors.Annotate(err, txnRetryableMark) - } return nil, errors.Trace(err) } diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index bbc4c62b56d82..115d19c41bec7 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -16,7 +16,6 @@ package tikv import ( "context" "net" - "strings" "sync" "time" @@ -74,7 +73,6 @@ func (s *testRegionRequestSuite) TestOnSendFailedWithStoreRestart(c *C) { s.cluster.StopStore(s.store) _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) c.Assert(err, NotNil) - c.Assert(strings.Contains(err.Error(), "try again later"), IsTrue) // start store. s.cluster.StartStore(s.store) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index a0cf8367996f4..76138f2747728 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -308,27 +307,31 @@ func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) { if locked := keyErr.GetLocked(); locked != nil { return NewLock(locked), nil } + return nil, extractKeyErr(keyErr) +} + +func extractKeyErr(keyErr *pb.KeyError) error { if keyErr.Conflict != nil { err := errors.New(conflictToString(keyErr.Conflict)) - return nil, errors.Annotate(err, txnRetryableMark) + return errors.Annotate(err, txnRetryableMark) } if keyErr.Retryable != "" { err := errors.Errorf("tikv restarts txn: %s", keyErr.GetRetryable()) logutil.Logger(context.Background()).Debug("error", zap.Error(err)) - return nil, errors.Annotate(err, txnRetryableMark) + return errors.Annotate(err, txnRetryableMark) } if keyErr.Abort != "" { err := errors.Errorf("tikv aborts txn: %s", keyErr.GetAbort()) logutil.Logger(context.Background()).Warn("error", zap.Error(err)) - return nil, errors.Trace(err) + return errors.Trace(err) } - return nil, errors.Errorf("unexpected KeyError: %s", keyErr.String()) + return errors.Errorf("unexpected KeyError: %s", keyErr.String()) } func conflictToString(conflict *pb.WriteConflict) string { var buf bytes.Buffer - _, err := fmt.Fprintf(&buf, "%s txnStartTS=%d, conflictTS=%d, key=", - util.WriteConflictMarker, conflict.StartTs, conflict.ConflictTs) + _, err := fmt.Fprintf(&buf, "txnStartTS=%d, conflictTS=%d, key=", + conflict.StartTs, conflict.ConflictTs) if err != nil { logutil.Logger(context.Background()).Error("error", zap.Error(err)) } diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 1e217b41f4844..ad6286b2607f2 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -154,6 +154,6 @@ func (s *testSnapshotSuite) TestWriteConflictPrettyFormat(c *C) { Primary: []byte{116, 128, 0, 0, 0, 0, 0, 1, 155, 95, 105, 128, 0, 0, 0, 0, 0, 0, 1, 1, 82, 87, 48, 49, 0, 0, 0, 0, 251, 1, 55, 54, 56, 50, 50, 49, 49, 48, 255, 57, 0, 0, 0, 0, 0, 0, 0, 248, 1, 0, 0, 0, 0, 0, 0, 0, 0, 247}, } - expectedStr := `[write conflict] txnStartTS=399402937522847774, conflictTS=399402937719455772, key={tableID=411, indexID=1, indexValues={RW01, 768221109, , }} primary={tableID=411, indexID=1, indexValues={RW01, 768221109, , }}` + expectedStr := `txnStartTS=399402937522847774, conflictTS=399402937719455772, key={tableID=411, indexID=1, indexValues={RW01, 768221109, , }} primary={tableID=411, indexID=1, indexValues={RW01, 768221109, , }}` c.Assert(conflictToString(conflict), Equals, expectedStr) } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 9d1c335b8fcbf..026e3535cce04 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -286,7 +285,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { } defer txn.store.txnLatches.UnLock(lock) if lock.IsStale() { - err = errors.Errorf("%s txnStartTS %d is stale", util.WriteConflictMarker, txn.startTS) + err = errors.Errorf("txnStartTS %d is stale", txn.startTS) return errors.Annotate(err, txnRetryableMark) } err = committer.executeAndWriteFinishBinlog(ctx) diff --git a/util/misc.go b/util/misc.go index 5b6a935b39301..15ecad2b7e296 100644 --- a/util/misc.go +++ b/util/misc.go @@ -33,8 +33,6 @@ const ( RetryInterval uint64 = 500 // GCTimeFormat is the format that gc_worker used to store times. GCTimeFormat = "20060102-15:04:05 -0700" - // WriteConflictMarker is used when transaction writing is conflicted. - WriteConflictMarker = "[write conflict]" ) // RunWithRetry will run the f with backoff and retry.