Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add a variable to control the back off time and disable txn auto retry by default #10266

Merged
merged 7 commits into from
May 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,7 @@ func (s *testDBSuite8) TestColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
s.tk.MustExec("create table t2 (c1 int, c2 int, c3 int)")
s.tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
s.testAddColumn(c)
s.testDropColumn(c)
s.tk.MustExec("drop table t2")
Expand Down
11 changes: 11 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,17 @@ func (s *testSuite2) TestSetVar(c *C) {
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("1"))
tk.MustExec("set tidb_wait_table_split_finish = 0")
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("0"))

tk.MustExec("set session tidb_back_off_weight = 3")
tk.MustQuery("select @@session.tidb_back_off_weight;").Check(testkit.Rows("3"))
tk.MustExec("set session tidb_back_off_weight = 20")
tk.MustQuery("select @@session.tidb_back_off_weight;").Check(testkit.Rows("20"))
_, err = tk.Exec("set session tidb_back_off_weight = -1")
c.Assert(err, NotNil)
_, err = tk.Exec("set global tidb_back_off_weight = 0")
c.Assert(err, NotNil)
tk.MustExec("set global tidb_back_off_weight = 10")
tk.MustQuery("select @@global.tidb_back_off_weight;").Check(testkit.Rows("10"))
}

func (s *testSuite2) TestSetCharset(c *C) {
Expand Down
1 change: 1 addition & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2508,6 +2508,7 @@ func (s *testSuite4) TestAutoIDInRetry(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t (id int not null auto_increment primary key)")

tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin")
tk.MustExec("insert into t values ()")
tk.MustExec("insert into t values (),()")
Expand Down
5 changes: 5 additions & 0 deletions kv/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type Variables struct {
// BackoffLockFast specifies the LockFast backoff base duration in milliseconds.
BackoffLockFast int

// BackOffWeight specifies the weight of the max back off time duration.
BackOffWeight int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

backoff weight is a little confusion, how about directly set the max backoff time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are many kinds of backoffs. Maybe set them separately in the future.


// Hook is used for test to verify the variable take effect.
Hook func(name string, vars *Variables)
}
Expand All @@ -26,6 +29,7 @@ type Variables struct {
func NewVariables() *Variables {
return &Variables{
BackoffLockFast: DefBackoffLockFast,
BackOffWeight: DefBackOffWeight,
}
}

Expand All @@ -35,4 +39,5 @@ var DefaultVars = NewVariables()
// Default values
const (
DefBackoffLockFast = 100
DefBackOffWeight = 2
)
1 change: 1 addition & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,7 @@ func runTestConcurrentUpdate(c *C) {
dbt.mustExec("drop table if exists test2")
dbt.mustExec("create table test2 (a int, b int)")
dbt.mustExec("insert test2 values (1, 1)")
dbt.mustExec("set @@tidb_disable_txn_auto_retry = 0")

txn1, err := dbt.db.Begin()
c.Assert(err, IsNil)
Expand Down
5 changes: 3 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ func (s *session) doCommit(ctx context.Context) error {

// mockCommitError and mockGetTSErrorInRetry use to test PR #8743.
failpoint.Inject("mockCommitError", func(val failpoint.Value) {
if val.(bool) && mockCommitErrorOnce {
mockCommitErrorOnce = false
if val.(bool) && kv.IsMockCommitErrorEnable() {
kv.MockCommitErrorDisable()
failpoint.Return(kv.ErrRetryable)
}
})
Expand Down Expand Up @@ -1632,6 +1632,7 @@ var builtinGlobalVariable = []string{
variable.TiDBHashAggPartialConcurrency,
variable.TiDBHashAggFinalConcurrency,
variable.TiDBBackoffLockFast,
variable.TiDBBackOffWeight,
variable.TiDBConstraintCheckInPlace,
variable.TiDBDDLReorgWorkerCount,
variable.TiDBDDLReorgBatchSize,
Expand Down
4 changes: 2 additions & 2 deletions session/session_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ func (s *testSessionSuite) TestGetTSFailDirtyState(c *C) {
func (s *testSessionSuite) TestGetTSFailDirtyStateInretry(c *C) {
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/mockCommitError"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/mockGetTSErrorInRetry"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockGetTSErrorInRetry"), IsNil)
}()

tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t (id int)")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/mockCommitError", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/mockGetTSErrorInRetry", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockGetTSErrorInRetry", `return(true)`), IsNil)
tk.MustExec("insert into t values (2)")
tk.MustQuery(`select * from t`).Check(testkit.Rows("2"))
}
Expand Down
25 changes: 25 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func (s *testSessionSuite) TestRowLock(c *C) {
tk.MustExec("insert t values (12, 2, 3)")
tk.MustExec("insert t values (13, 2, 3)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("update t set c2=21 where c1=11")

Expand Down Expand Up @@ -507,6 +508,7 @@ func (s *testSessionSuite) TestRetryResetStmtCtx(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table retrytxn (a int unique, b int)")
tk.MustExec("insert retrytxn values (1, 1)")
tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin")
tk.MustExec("update retrytxn set b = b + 1 where a = 1")

Expand Down Expand Up @@ -665,6 +667,7 @@ func (s *testSessionSuite) TestRetryPreparedStmt(c *C) {
tk.MustExec("create table t (c1 int, c2 int, c3 int)")
tk.MustExec("insert t values (11, 2, 3)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("update t set c2=? where c1=11;", 21)

Expand Down Expand Up @@ -881,6 +884,7 @@ func (s *testSessionSuite) TestAutoIncrementWithRetry(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("create table t (c2 int, c1 int not null auto_increment, PRIMARY KEY (c1))")
tk.MustExec("insert into t (c2) values (1), (2), (3), (4), (5)")

Expand Down Expand Up @@ -1308,6 +1312,9 @@ func (s *testSessionSuite) TestRetry(c *C) {
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk3 := testkit.NewTestKitWithInit(c, s.store)
tk3.MustExec("SET SESSION autocommit=0;")
tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk2.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk3.MustExec("set @@tidb_disable_txn_auto_retry = 0")

var wg sync.WaitGroup
wg.Add(3)
Expand Down Expand Up @@ -1449,6 +1456,7 @@ func (s *testSessionSuite) TestResetCtx(c *C) {

tk.MustExec("create table t (i int auto_increment not null key);")
tk.MustExec("insert into t values (1);")
tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin;")
tk.MustExec("insert into t values (10);")
tk.MustExec("update t set i = i + row_count();")
Expand Down Expand Up @@ -1480,6 +1488,8 @@ func (s *testSessionSuite) TestUnique(c *C) {
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec(`CREATE TABLE test ( id int(11) UNSIGNED NOT NULL AUTO_INCREMENT, val int UNIQUE, PRIMARY KEY (id)); `)
tk.MustExec("begin;")
tk.MustExec("insert into test(id, val) values(1, 1);")
Expand Down Expand Up @@ -1764,6 +1774,7 @@ func (s *testSchemaSuite) TestSchemaCheckerSQL(c *C) {
tk.MustExec(`insert into t values(1, 1);`)

// The schema version is out of date in the first transaction, but the SQL can be retried.
tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec(`begin;`)
tk1.MustExec(`alter table t add index idx(c);`)
tk.MustExec(`insert into t values(2, 2);`)
Expand Down Expand Up @@ -1808,6 +1819,7 @@ func (s *testSchemaSuite) TestPrepareStmtCommitWhenSchemaChanged(c *C) {
tk1.MustExec("execute stmt using @a, @a")
tk1.MustExec("commit")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk.MustExec("alter table t drop column b")
tk1.MustExec("execute stmt using @a, @a")
Expand All @@ -1820,6 +1832,7 @@ func (s *testSchemaSuite) TestCommitWhenSchemaChanged(c *C) {
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t (a int, b int)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("insert into t values (1, 1)")

Expand All @@ -1837,6 +1850,7 @@ func (s *testSchemaSuite) TestRetrySchemaChange(c *C) {
tk.MustExec("create table t (a int primary key, b int)")
tk.MustExec("insert into t values (1, 1)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("update t set b = 5 where a = 1")

Expand Down Expand Up @@ -1867,6 +1881,7 @@ func (s *testSchemaSuite) TestRetryMissingUnionScan(c *C) {
tk.MustExec("create table t (a int primary key, b int unique, c int)")
tk.MustExec("insert into t values (1, 1, 1)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("update t set b = 1, c = 2 where b = 2")
tk1.MustExec("update t set b = 1, c = 2 where a = 1")
Expand Down Expand Up @@ -2320,9 +2335,12 @@ func (s *testSessionSuite) TestKVVars(c *C) {
tk.MustExec("insert kvvars values (1, 1)")
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk2.MustExec("set @@tidb_backoff_lock_fast = 1")
tk2.MustExec("set @@tidb_back_off_weight = 100")
backoffVal := new(int64)
backOffWeightVal := new(int32)
tk2.Se.GetSessionVars().KVVars.Hook = func(name string, vars *kv.Variables) {
atomic.StoreInt64(backoffVal, int64(vars.BackoffLockFast))
atomic.StoreInt32(backOffWeightVal, int32(vars.BackOffWeight))
}
wg := new(sync.WaitGroup)
wg.Add(2)
Expand All @@ -2345,7 +2363,14 @@ func (s *testSessionSuite) TestKVVars(c *C) {
wg.Done()
}()
wg.Wait()
for {
tk2.MustQuery("select * from kvvars")
if atomic.LoadInt32(backOffWeightVal) != 0 {
break
}
}
c.Assert(atomic.LoadInt64(backoffVal), Equals, int64(1))
c.Assert(atomic.LoadInt32(backOffWeightVal), Equals, int32(100))
}

func (s *testSessionSuite) TestCommitRetryCount(c *C) {
Expand Down
14 changes: 0 additions & 14 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,25 +346,11 @@ type txnFuture struct {
mockFail bool
}

// mockGetTSErrorInRetryOnce use to make sure gofail mockGetTSErrorInRetry only mock get TS error once.
var mockGetTSErrorInRetryOnce = true

func (tf *txnFuture) wait() (kv.Transaction, error) {
if tf.mockFail {
return nil, errors.New("mock get timestamp fail")
}

// mockGetTSErrorInRetry should wait mockCommitErrorOnce first, then will run into retry() logic.
// Then mockGetTSErrorInRetry will return retryable error when first retry.
// Before PR #8743, we don't cleanup txn after meet error such as error like: PD server timeout[try again later]
// This may cause duplicate data to be written.
failpoint.Inject("mockGetTSErrorInRetry", func(val failpoint.Value) {
if val.(bool) && mockGetTSErrorInRetryOnce && !mockCommitErrorOnce {
mockGetTSErrorInRetryOnce = false
failpoint.Return(nil, errors.Errorf("PD server timeout[try again later]"))
}
})

startTS, err := tf.future.Wait()
if err == nil {
return tf.store.BeginWithStartTS(startTS)
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.IndexSerialScanConcurrency = tidbOptPositiveInt32(val, DefIndexSerialScanConcurrency)
case TiDBBackoffLockFast:
s.KVVars.BackoffLockFast = tidbOptPositiveInt32(val, kv.DefBackoffLockFast)
case TiDBBackOffWeight:
s.KVVars.BackOffWeight = tidbOptPositiveInt32(val, kv.DefBackOffWeight)
case TiDBConstraintCheckInPlace:
s.ConstraintCheckInPlace = TiDBOptOn(val)
case TiDBBatchInsert:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBHashAggPartialConcurrency, strconv.Itoa(DefTiDBHashAggPartialConcurrency)},
{ScopeGlobal | ScopeSession, TiDBHashAggFinalConcurrency, strconv.Itoa(DefTiDBHashAggFinalConcurrency)},
{ScopeGlobal | ScopeSession, TiDBBackoffLockFast, strconv.Itoa(kv.DefBackoffLockFast)},
{ScopeGlobal | ScopeSession, TiDBBackOffWeight, strconv.Itoa(kv.DefBackOffWeight)},
{ScopeGlobal | ScopeSession, TiDBRetryLimit, strconv.Itoa(DefTiDBRetryLimit)},
{ScopeGlobal | ScopeSession, TiDBDisableTxnAutoRetry, BoolToIntStr(DefTiDBDisableTxnAutoRetry)},
{ScopeGlobal | ScopeSession, TiDBConstraintCheckInPlace, BoolToIntStr(DefTiDBConstraintCheckInPlace)},
Expand Down
8 changes: 7 additions & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ const (
// tidb_backoff_lock_fast is used for tikv backoff base time in milliseconds.
TiDBBackoffLockFast = "tidb_backoff_lock_fast"

// tidb_back_off_weight is used to control the max back off time in TiDB.
// The default maximum back off time is a small value.
// BackOffWeight could multiply it to let the user adjust the maximum time for retrying.
// Only positive integers can be accepted, which means that the maximum back off time can only grow.
TiDBBackOffWeight = "tidb_back_off_weight"

// tidb_ddl_reorg_worker_cnt defines the count of ddl reorg workers.
TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt"

Expand Down Expand Up @@ -300,7 +306,7 @@ const (
DefTiDBMemQuotaDistSQL = 32 << 30 // 32GB.
DefTiDBGeneralLog = 0
DefTiDBRetryLimit = 10
DefTiDBDisableTxnAutoRetry = false
DefTiDBDisableTxnAutoRetry = true
DefTiDBConstraintCheckInPlace = false
DefTiDBHashJoinConcurrency = 5
DefTiDBProjectionConcurrency = 4
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
TiDBHashAggFinalConcurrency,
TiDBDistSQLScanConcurrency,
TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount,
TiDBBackoffLockFast,
TiDBBackoffLockFast, TiDBBackOffWeight,
TiDBDMLBatchSize, TiDBOptimizerSelectivityLevel:
v, err := strconv.Atoi(value)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/util"
)

func TestT(t *testing.T) {
Expand Down Expand Up @@ -513,7 +512,7 @@ func (s *testMockTiKVSuite) TestDeleteRange(c *C) {

func (s *testMockTiKVSuite) mustWriteWriteConflict(c *C, errs []error, i int) {
c.Assert(errs[i], NotNil)
c.Assert(strings.Contains(errs[i].Error(), util.WriteConflictMarker), IsTrue)
c.Assert(strings.Contains(errs[i].Error(), writeConflictMarker), IsTrue)
}

func (s *testMockTiKVSuite) TestRC(c *C) {
Expand Down
5 changes: 3 additions & 2 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
)

Expand All @@ -35,6 +34,8 @@ const (
typeRollback
)

const writeConflictMarker = "write conflict"

type mvccValue struct {
valueType mvccValueType
startTS uint64
Expand Down Expand Up @@ -256,7 +257,7 @@ func (e *mvccEntry) Get(ts uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, err
func (e *mvccEntry) Prewrite(mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64) error {
if len(e.values) > 0 {
if e.values[0].commitTS >= startTS {
return ErrRetryable(util.WriteConflictMarker)
return ErrRetryable(writeConflictMarker)
}
}
if e.lock != nil {
Expand Down
3 changes: 1 addition & 2 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/goleveldb/leveldb/util"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -572,7 +571,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
}
// Note that it's a write conflict here, even if the value is a rollback one.
if ok && dec1.value.commitTS >= startTS {
return ErrRetryable(tidbutil.WriteConflictMarker)
return ErrRetryable(writeConflictMarker)
}

op := mutation.GetOp()
Expand Down
5 changes: 2 additions & 3 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/logutil"
Expand All @@ -40,7 +39,7 @@ const (
type brokenStore struct{}

func (s *brokenStore) Open(schema string) (kv.Storage, error) {
return nil, errors.New("try again later")
return nil, kv.ErrRetryable
}

func TestT(t *testing.T) {
Expand Down Expand Up @@ -662,5 +661,5 @@ func (s *testKVSuite) TestRetryOpenStore(c *C) {
}
c.Assert(err, NotNil)
elapse := time.Since(begin)
c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second))
c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second), Commentf("elapse: %s", elapse))
}
Loading