Skip to content

Commit

Permalink
*: add a variable to control the back off time and disable txn auto r…
Browse files Browse the repository at this point in the history
…etry by default (#10266)
  • Loading branch information
jackysp authored May 8, 2019
1 parent 1c4ebee commit d8589df
Show file tree
Hide file tree
Showing 27 changed files with 109 additions and 58 deletions.
1 change: 1 addition & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,7 @@ func (s *testDBSuite8) TestColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
s.tk.MustExec("create table t2 (c1 int, c2 int, c3 int)")
s.tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
s.testAddColumn(c)
s.testDropColumn(c)
s.tk.MustExec("drop table t2")
Expand Down
11 changes: 11 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,17 @@ func (s *testSuite2) TestSetVar(c *C) {
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("1"))
tk.MustExec("set tidb_wait_table_split_finish = 0")
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("0"))

tk.MustExec("set session tidb_back_off_weight = 3")
tk.MustQuery("select @@session.tidb_back_off_weight;").Check(testkit.Rows("3"))
tk.MustExec("set session tidb_back_off_weight = 20")
tk.MustQuery("select @@session.tidb_back_off_weight;").Check(testkit.Rows("20"))
_, err = tk.Exec("set session tidb_back_off_weight = -1")
c.Assert(err, NotNil)
_, err = tk.Exec("set global tidb_back_off_weight = 0")
c.Assert(err, NotNil)
tk.MustExec("set global tidb_back_off_weight = 10")
tk.MustQuery("select @@global.tidb_back_off_weight;").Check(testkit.Rows("10"))
}

func (s *testSuite2) TestSetCharset(c *C) {
Expand Down
1 change: 1 addition & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2508,6 +2508,7 @@ func (s *testSuite4) TestAutoIDInRetry(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t (id int not null auto_increment primary key)")

tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin")
tk.MustExec("insert into t values ()")
tk.MustExec("insert into t values (),()")
Expand Down
5 changes: 5 additions & 0 deletions kv/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type Variables struct {
// BackoffLockFast specifies the LockFast backoff base duration in milliseconds.
BackoffLockFast int

// BackOffWeight specifies the weight of the max back off time duration.
BackOffWeight int

// Hook is used for test to verify the variable take effect.
Hook func(name string, vars *Variables)
}
Expand All @@ -26,6 +29,7 @@ type Variables struct {
func NewVariables() *Variables {
return &Variables{
BackoffLockFast: DefBackoffLockFast,
BackOffWeight: DefBackOffWeight,
}
}

Expand All @@ -35,4 +39,5 @@ var DefaultVars = NewVariables()
// Default values
const (
DefBackoffLockFast = 100
DefBackOffWeight = 2
)
1 change: 1 addition & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,7 @@ func runTestConcurrentUpdate(c *C) {
dbt.mustExec("drop table if exists test2")
dbt.mustExec("create table test2 (a int, b int)")
dbt.mustExec("insert test2 values (1, 1)")
dbt.mustExec("set @@tidb_disable_txn_auto_retry = 0")

txn1, err := dbt.db.Begin()
c.Assert(err, IsNil)
Expand Down
5 changes: 3 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ func (s *session) doCommit(ctx context.Context) error {

// mockCommitError and mockGetTSErrorInRetry use to test PR #8743.
failpoint.Inject("mockCommitError", func(val failpoint.Value) {
if val.(bool) && mockCommitErrorOnce {
mockCommitErrorOnce = false
if val.(bool) && kv.IsMockCommitErrorEnable() {
kv.MockCommitErrorDisable()
failpoint.Return(kv.ErrRetryable)
}
})
Expand Down Expand Up @@ -1632,6 +1632,7 @@ var builtinGlobalVariable = []string{
variable.TiDBHashAggPartialConcurrency,
variable.TiDBHashAggFinalConcurrency,
variable.TiDBBackoffLockFast,
variable.TiDBBackOffWeight,
variable.TiDBConstraintCheckInPlace,
variable.TiDBDDLReorgWorkerCount,
variable.TiDBDDLReorgBatchSize,
Expand Down
4 changes: 2 additions & 2 deletions session/session_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ func (s *testSessionSuite) TestGetTSFailDirtyState(c *C) {
func (s *testSessionSuite) TestGetTSFailDirtyStateInretry(c *C) {
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/mockCommitError"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/mockGetTSErrorInRetry"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockGetTSErrorInRetry"), IsNil)
}()

tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t (id int)")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/mockCommitError", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/mockGetTSErrorInRetry", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockGetTSErrorInRetry", `return(true)`), IsNil)
tk.MustExec("insert into t values (2)")
tk.MustQuery(`select * from t`).Check(testkit.Rows("2"))
}
Expand Down
25 changes: 25 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func (s *testSessionSuite) TestRowLock(c *C) {
tk.MustExec("insert t values (12, 2, 3)")
tk.MustExec("insert t values (13, 2, 3)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("update t set c2=21 where c1=11")

Expand Down Expand Up @@ -507,6 +508,7 @@ func (s *testSessionSuite) TestRetryResetStmtCtx(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table retrytxn (a int unique, b int)")
tk.MustExec("insert retrytxn values (1, 1)")
tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin")
tk.MustExec("update retrytxn set b = b + 1 where a = 1")

Expand Down Expand Up @@ -665,6 +667,7 @@ func (s *testSessionSuite) TestRetryPreparedStmt(c *C) {
tk.MustExec("create table t (c1 int, c2 int, c3 int)")
tk.MustExec("insert t values (11, 2, 3)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("update t set c2=? where c1=11;", 21)

Expand Down Expand Up @@ -881,6 +884,7 @@ func (s *testSessionSuite) TestAutoIncrementWithRetry(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("create table t (c2 int, c1 int not null auto_increment, PRIMARY KEY (c1))")
tk.MustExec("insert into t (c2) values (1), (2), (3), (4), (5)")

Expand Down Expand Up @@ -1308,6 +1312,9 @@ func (s *testSessionSuite) TestRetry(c *C) {
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk3 := testkit.NewTestKitWithInit(c, s.store)
tk3.MustExec("SET SESSION autocommit=0;")
tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk2.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk3.MustExec("set @@tidb_disable_txn_auto_retry = 0")

var wg sync.WaitGroup
wg.Add(3)
Expand Down Expand Up @@ -1449,6 +1456,7 @@ func (s *testSessionSuite) TestResetCtx(c *C) {

tk.MustExec("create table t (i int auto_increment not null key);")
tk.MustExec("insert into t values (1);")
tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin;")
tk.MustExec("insert into t values (10);")
tk.MustExec("update t set i = i + row_count();")
Expand Down Expand Up @@ -1480,6 +1488,8 @@ func (s *testSessionSuite) TestUnique(c *C) {
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec(`CREATE TABLE test ( id int(11) UNSIGNED NOT NULL AUTO_INCREMENT, val int UNIQUE, PRIMARY KEY (id)); `)
tk.MustExec("begin;")
tk.MustExec("insert into test(id, val) values(1, 1);")
Expand Down Expand Up @@ -1764,6 +1774,7 @@ func (s *testSchemaSuite) TestSchemaCheckerSQL(c *C) {
tk.MustExec(`insert into t values(1, 1);`)

// The schema version is out of date in the first transaction, but the SQL can be retried.
tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec(`begin;`)
tk1.MustExec(`alter table t add index idx(c);`)
tk.MustExec(`insert into t values(2, 2);`)
Expand Down Expand Up @@ -1808,6 +1819,7 @@ func (s *testSchemaSuite) TestPrepareStmtCommitWhenSchemaChanged(c *C) {
tk1.MustExec("execute stmt using @a, @a")
tk1.MustExec("commit")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk.MustExec("alter table t drop column b")
tk1.MustExec("execute stmt using @a, @a")
Expand All @@ -1820,6 +1832,7 @@ func (s *testSchemaSuite) TestCommitWhenSchemaChanged(c *C) {
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t (a int, b int)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("insert into t values (1, 1)")

Expand All @@ -1837,6 +1850,7 @@ func (s *testSchemaSuite) TestRetrySchemaChange(c *C) {
tk.MustExec("create table t (a int primary key, b int)")
tk.MustExec("insert into t values (1, 1)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("update t set b = 5 where a = 1")

Expand Down Expand Up @@ -1867,6 +1881,7 @@ func (s *testSchemaSuite) TestRetryMissingUnionScan(c *C) {
tk.MustExec("create table t (a int primary key, b int unique, c int)")
tk.MustExec("insert into t values (1, 1, 1)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("update t set b = 1, c = 2 where b = 2")
tk1.MustExec("update t set b = 1, c = 2 where a = 1")
Expand Down Expand Up @@ -2320,9 +2335,12 @@ func (s *testSessionSuite) TestKVVars(c *C) {
tk.MustExec("insert kvvars values (1, 1)")
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk2.MustExec("set @@tidb_backoff_lock_fast = 1")
tk2.MustExec("set @@tidb_back_off_weight = 100")
backoffVal := new(int64)
backOffWeightVal := new(int32)
tk2.Se.GetSessionVars().KVVars.Hook = func(name string, vars *kv.Variables) {
atomic.StoreInt64(backoffVal, int64(vars.BackoffLockFast))
atomic.StoreInt32(backOffWeightVal, int32(vars.BackOffWeight))
}
wg := new(sync.WaitGroup)
wg.Add(2)
Expand All @@ -2345,7 +2363,14 @@ func (s *testSessionSuite) TestKVVars(c *C) {
wg.Done()
}()
wg.Wait()
for {
tk2.MustQuery("select * from kvvars")
if atomic.LoadInt32(backOffWeightVal) != 0 {
break
}
}
c.Assert(atomic.LoadInt64(backoffVal), Equals, int64(1))
c.Assert(atomic.LoadInt32(backOffWeightVal), Equals, int32(100))
}

func (s *testSessionSuite) TestCommitRetryCount(c *C) {
Expand Down
14 changes: 0 additions & 14 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,25 +346,11 @@ type txnFuture struct {
mockFail bool
}

// mockGetTSErrorInRetryOnce use to make sure gofail mockGetTSErrorInRetry only mock get TS error once.
var mockGetTSErrorInRetryOnce = true

func (tf *txnFuture) wait() (kv.Transaction, error) {
if tf.mockFail {
return nil, errors.New("mock get timestamp fail")
}

// mockGetTSErrorInRetry should wait mockCommitErrorOnce first, then will run into retry() logic.
// Then mockGetTSErrorInRetry will return retryable error when first retry.
// Before PR #8743, we don't cleanup txn after meet error such as error like: PD server timeout[try again later]
// This may cause duplicate data to be written.
failpoint.Inject("mockGetTSErrorInRetry", func(val failpoint.Value) {
if val.(bool) && mockGetTSErrorInRetryOnce && !mockCommitErrorOnce {
mockGetTSErrorInRetryOnce = false
failpoint.Return(nil, errors.Errorf("PD server timeout[try again later]"))
}
})

startTS, err := tf.future.Wait()
if err == nil {
return tf.store.BeginWithStartTS(startTS)
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.IndexSerialScanConcurrency = tidbOptPositiveInt32(val, DefIndexSerialScanConcurrency)
case TiDBBackoffLockFast:
s.KVVars.BackoffLockFast = tidbOptPositiveInt32(val, kv.DefBackoffLockFast)
case TiDBBackOffWeight:
s.KVVars.BackOffWeight = tidbOptPositiveInt32(val, kv.DefBackOffWeight)
case TiDBConstraintCheckInPlace:
s.ConstraintCheckInPlace = TiDBOptOn(val)
case TiDBBatchInsert:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBHashAggPartialConcurrency, strconv.Itoa(DefTiDBHashAggPartialConcurrency)},
{ScopeGlobal | ScopeSession, TiDBHashAggFinalConcurrency, strconv.Itoa(DefTiDBHashAggFinalConcurrency)},
{ScopeGlobal | ScopeSession, TiDBBackoffLockFast, strconv.Itoa(kv.DefBackoffLockFast)},
{ScopeGlobal | ScopeSession, TiDBBackOffWeight, strconv.Itoa(kv.DefBackOffWeight)},
{ScopeGlobal | ScopeSession, TiDBRetryLimit, strconv.Itoa(DefTiDBRetryLimit)},
{ScopeGlobal | ScopeSession, TiDBDisableTxnAutoRetry, BoolToIntStr(DefTiDBDisableTxnAutoRetry)},
{ScopeGlobal | ScopeSession, TiDBConstraintCheckInPlace, BoolToIntStr(DefTiDBConstraintCheckInPlace)},
Expand Down
8 changes: 7 additions & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ const (
// tidb_backoff_lock_fast is used for tikv backoff base time in milliseconds.
TiDBBackoffLockFast = "tidb_backoff_lock_fast"

// tidb_back_off_weight is used to control the max back off time in TiDB.
// The default maximum back off time is a small value.
// BackOffWeight could multiply it to let the user adjust the maximum time for retrying.
// Only positive integers can be accepted, which means that the maximum back off time can only grow.
TiDBBackOffWeight = "tidb_back_off_weight"

// tidb_ddl_reorg_worker_cnt defines the count of ddl reorg workers.
TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt"

Expand Down Expand Up @@ -300,7 +306,7 @@ const (
DefTiDBMemQuotaDistSQL = 32 << 30 // 32GB.
DefTiDBGeneralLog = 0
DefTiDBRetryLimit = 10
DefTiDBDisableTxnAutoRetry = false
DefTiDBDisableTxnAutoRetry = true
DefTiDBConstraintCheckInPlace = false
DefTiDBHashJoinConcurrency = 5
DefTiDBProjectionConcurrency = 4
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
TiDBHashAggFinalConcurrency,
TiDBDistSQLScanConcurrency,
TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount,
TiDBBackoffLockFast,
TiDBBackoffLockFast, TiDBBackOffWeight,
TiDBDMLBatchSize, TiDBOptimizerSelectivityLevel:
v, err := strconv.Atoi(value)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/util"
)

func TestT(t *testing.T) {
Expand Down Expand Up @@ -513,7 +512,7 @@ func (s *testMockTiKVSuite) TestDeleteRange(c *C) {

func (s *testMockTiKVSuite) mustWriteWriteConflict(c *C, errs []error, i int) {
c.Assert(errs[i], NotNil)
c.Assert(strings.Contains(errs[i].Error(), util.WriteConflictMarker), IsTrue)
c.Assert(strings.Contains(errs[i].Error(), writeConflictMarker), IsTrue)
}

func (s *testMockTiKVSuite) TestRC(c *C) {
Expand Down
5 changes: 3 additions & 2 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
)

Expand All @@ -35,6 +34,8 @@ const (
typeRollback
)

const writeConflictMarker = "write conflict"

type mvccValue struct {
valueType mvccValueType
startTS uint64
Expand Down Expand Up @@ -256,7 +257,7 @@ func (e *mvccEntry) Get(ts uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, err
func (e *mvccEntry) Prewrite(mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64) error {
if len(e.values) > 0 {
if e.values[0].commitTS >= startTS {
return ErrRetryable(util.WriteConflictMarker)
return ErrRetryable(writeConflictMarker)
}
}
if e.lock != nil {
Expand Down
3 changes: 1 addition & 2 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/goleveldb/leveldb/util"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -572,7 +571,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
}
// Note that it's a write conflict here, even if the value is a rollback one.
if ok && dec1.value.commitTS >= startTS {
return ErrRetryable(tidbutil.WriteConflictMarker)
return ErrRetryable(writeConflictMarker)
}

op := mutation.GetOp()
Expand Down
5 changes: 2 additions & 3 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/logutil"
Expand All @@ -40,7 +39,7 @@ const (
type brokenStore struct{}

func (s *brokenStore) Open(schema string) (kv.Storage, error) {
return nil, errors.New("try again later")
return nil, kv.ErrRetryable
}

func TestT(t *testing.T) {
Expand Down Expand Up @@ -662,5 +661,5 @@ func (s *testKVSuite) TestRetryOpenStore(c *C) {
}
c.Assert(err, NotNil)
elapse := time.Since(begin)
c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second))
c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second), Commentf("elapse: %s", elapse))
}
Loading

0 comments on commit d8589df

Please sign in to comment.