Commit: [Cherry-pick tidb-7.5] Check kill signals (tikv#1158)
* feat: check kill signal in 2pc committer (tikv#1084)

Signed-off-by: ekexium <[email protected]>
Co-authored-by: disksing <[email protected]>

* ErrQueryInterrupted with parameters (tikv#1124)

* feat: ErrQueryInterrupted with parameters

Signed-off-by: ekexium <[email protected]>

* Revert "Revert "fix: check kill signal against 0 (tikv#1102)" (tikv#1129)"

This reverts commit 3480b5e.

Signed-off-by: ekexium <[email protected]>

---------

Signed-off-by: ekexium <[email protected]>
Co-authored-by: cfzjywxk <[email protected]>

---------

Signed-off-by: ekexium <[email protected]>
Co-authored-by: disksing <[email protected]>
Co-authored-by: cfzjywxk <[email protected]>
3 people authored and zeminzhou committed Feb 28, 2024
1 parent f560929 commit c084e8f
Showing 8 changed files with 66 additions and 29 deletions.
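Taken together, the changed files wire a TiDB-side kill flag through the client: kv.Variables carries a *uint32, the backoff and 2PC paths poll it, and a nonzero value aborts the request with a signal-carrying error. Below is a minimal end-to-end sketch of that flow; it is not part of this commit, the killer goroutine is a hypothetical stand-in for TiDB's sqlkiller, and only APIs visible in this diff (SetVars, kv.NewVariables) plus the public txnkv client are assumed.

package example

import (
	"context"
	"sync/atomic"
	"time"

	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/txnkv"
)

// killableCommit demonstrates the flow this commit enables: the kill flag is
// shared with the committer via Variables, and any goroutine can request
// interruption by storing a nonzero signal value into it.
func killableCommit(client *txnkv.Client) error {
	txn, err := client.Begin()
	if err != nil {
		return err
	}
	var killed uint32
	txn.SetVars(kv.NewVariables(&killed)) // committer and backoff now poll this flag

	go func() {
		// Hypothetical stand-in for TiDB's sqlkiller: any nonzero value
		// means "killed"; the value encodes the reason.
		time.Sleep(100 * time.Millisecond)
		atomic.StoreUint32(&killed, 1)
	}()

	if err := txn.Set([]byte("k"), []byte("v")); err != nil {
		return err
	}
	// If the flag is set while 2PC runs, Commit fails with
	// "query interrupted by signal 1".
	return txn.Commit(context.Background())
}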
15 changes: 12 additions & 3 deletions error/error.go
@@ -64,13 +64,14 @@ var (
      // ErrTiFlashServerTimeout is the error when tiflash server is timeout.
      ErrTiFlashServerTimeout = errors.New("tiflash server timeout")
      // ErrQueryInterrupted is the error when the query is interrupted.
-     ErrQueryInterrupted = errors.New("query interruppted")
+     // This is deprecated. Keep it only to pass CI :-(. We can remove this later.
+     ErrQueryInterrupted = errors.New("query interrupted")
      // ErrTiKVStaleCommand is the error that the command is stale in tikv.
      ErrTiKVStaleCommand = errors.New("tikv stale command")
      // ErrTiKVMaxTimestampNotSynced is the error that tikv's max timestamp is not synced.
      ErrTiKVMaxTimestampNotSynced = errors.New("tikv max timestamp not synced")
      // ErrLockAcquireFailAndNoWaitSet is the error that acquire the lock failed while no wait is setted.
-     ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is setted")
+     ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is set")
      // ErrResolveLockTimeout is the error that resolve lock timeout.
      ErrResolveLockTimeout = errors.New("resolve lock timeout")
      // ErrLockWaitTimeout is the error that wait for the lock is timeout.
@@ -96,11 +97,19 @@ var (
      // ErrIsWitness is the error when a request is send to a witness.
      ErrIsWitness = errors.New("peer is witness")
      // ErrUnknown is the unknow error.
-     ErrUnknown = errors.New("unknow")
+     ErrUnknown = errors.New("unknown")
      // ErrResultUndetermined is the error when execution result is unknown.
      ErrResultUndetermined = errors.New("execution result undetermined")
  )

+ type ErrQueryInterruptedWithSignal struct {
+     Signal uint32
+ }
+
+ func (e ErrQueryInterruptedWithSignal) Error() string {
+     return fmt.Sprintf("query interrupted by signal %d", e.Signal)
+ }
+
  // MismatchClusterID represents the message that the cluster ID of the PD client does not match the PD.
  const MismatchClusterID = "mismatch cluster id"

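Since the new ErrQueryInterruptedWithSignal is a plain struct with a value receiver, callers can recover the signal value from a returned error chain. A small sketch, assuming the error wrapped by errors.WithStack (pingcap/errors) can be unwrapped by the standard library's errors.As:

package example

import (
	"errors"

	tikverr "github.com/tikv/client-go/v2/error"
)

// signalFromErr recovers the kill signal carried by an error, if any.
// Assumes the wrapping error supports Go 1.13-style unwrapping.
func signalFromErr(err error) (uint32, bool) {
	var e tikverr.ErrQueryInterruptedWithSignal
	if errors.As(err, &e) {
		return e.Signal, true
	}
	return 0, false
}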
10 changes: 10 additions & 0 deletions integration_tests/2pc_test.go
@@ -2502,3 +2502,13 @@ func (s *testCommitterSuite) TestExtractKeyExistsErr() {
      s.True(txn.GetMemBuffer().TryLock())
      txn.GetMemBuffer().Unlock()
  }
+
+ func (s *testCommitterSuite) TestKillSignal() {
+     txn := s.begin()
+     err := txn.Set([]byte("key"), []byte("value"))
+     s.Nil(err)
+     var killed uint32 = 2
+     txn.SetVars(kv.NewVariables(&killed))
+     err = txn.Commit(context.Background())
+     s.ErrorContains(err, "query interrupted")
+ }
5 changes: 2 additions & 3 deletions internal/locate/region_request.go
@@ -1483,9 +1483,8 @@ func (s *RegionRequestSender) SendReqCtx(
      }

      // recheck whether the session/query is killed during the Next()
-     boVars := bo.GetVars()
-     if boVars != nil && boVars.Killed != nil && atomic.LoadUint32(boVars.Killed) == 1 {
-         return nil, nil, retryTimes, errors.WithStack(tikverr.ErrQueryInterrupted)
+     if err2 := bo.CheckKilled(); err2 != nil {
+         return nil, nil, retryTimes, err2
      }
      if val, err := util.EvalFailpoint("mockRetrySendReqToRegion"); err == nil {
          if val.(bool) {
21 changes: 17 additions & 4 deletions internal/retry/backoff.go
@@ -217,10 +217,9 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err error) error {
          atomic.AddInt64(&detail.BackoffCount, 1)
      }

-     if b.vars != nil && b.vars.Killed != nil {
-         if atomic.LoadUint32(b.vars.Killed) == 1 {
-             return errors.WithStack(tikverr.ErrQueryInterrupted)
-         }
+     err2 := b.CheckKilled()
+     if err2 != nil {
+         return err2
      }

      var startTs interface{}
@@ -382,3 +381,17 @@ func (b *Backoffer) longestSleepCfg() (*Config, int) {
      }
      return nil, 0
  }
+
+ func (b *Backoffer) CheckKilled() error {
+     if b.vars != nil && b.vars.Killed != nil {
+         killed := atomic.LoadUint32(b.vars.Killed)
+         if killed != 0 {
+             logutil.BgLogger().Info(
+                 "backoff stops because a killed signal is received",
+                 zap.Uint32("signal", killed),
+             )
+             return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: killed})
+         }
+     }
+     return nil
+ }
4 changes: 4 additions & 0 deletions kv/variables.go
@@ -44,6 +44,10 @@ type Variables struct {

      // Pointer to SessionVars.Killed
      // Killed is a flag to indicate that this query is killed.
+     // This is an enum value rather than a boolean. See sqlkiller.go
+     // in TiDB for its definition.
+     // When its value is 0, the query is not killed.
+     // When its value is nonzero, the query is killed, and the value indicates the concrete reason.
      Killed *uint32
  }

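Note that this commit also moves responsibility for clearing the flag out of client-go (see the deletions in pessimistic.go and txn.go further down): the client only reads the flag, and the upper layer resets it between queries. A hedged sketch of what that upper-layer reset might look like; the helper name is hypothetical:

package example

import (
	"sync/atomic"

	"github.com/tikv/client-go/v2/kv"
)

// resetKillFlag is a hypothetical upper-layer helper: after a query finishes
// (or its interruption has been observed), clear the flag so the next query
// on the same session is not spuriously killed.
func resetKillFlag(vars *kv.Variables) {
	if vars != nil && vars.Killed != nil {
		atomic.StoreUint32(vars.Killed, 0)
	}
}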
22 changes: 21 additions & 1 deletion txnkv/transaction/2pc.go
@@ -1048,7 +1048,27 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action
  }

  // doActionOnBatches does action to batches in parallel.
- func (c *twoPhaseCommitter) doActionOnBatches(bo *retry.Backoffer, action twoPhaseCommitAction, batches []batchMutations) error {
+ func (c *twoPhaseCommitter) doActionOnBatches(
+     bo *retry.Backoffer, action twoPhaseCommitAction,
+     batches []batchMutations,
+ ) error {
+     // killSignal should never be nil for TiDB
+     if c.txn != nil && c.txn.vars != nil && c.txn.vars.Killed != nil {
+         // Do not reset the killed flag here. Let the upper layer reset the flag.
+         // Before it resets, any request is considered valid to be killed.
+         status := atomic.LoadUint32(c.txn.vars.Killed)
+         if status != 0 {
+             logutil.BgLogger().Info(
+                 "query is killed", zap.Uint32(
+                     "signal",
+                     status,
+                 ),
+             )
+             // TODO: There might be various signals besides a query interruption,
+             // but we are unable to differentiate them, because the definition is in TiDB.
+             return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: status})
+         }
+     }
      if len(batches) == 0 {
          return nil
      }
12 changes: 0 additions & 12 deletions txnkv/transaction/pessimistic.go
@@ -214,18 +214,6 @@ func (action actionPessimisticLock) handleSingleBatch(
                  return nil
              }
          }
-
-         // Handle the killed flag when waiting for the pessimistic lock.
-         // When a txn runs into LockKeys() and backoff here, it has no chance to call
-         // executor.Next() and check the killed flag.
-         if action.Killed != nil {
-             // Do not reset the killed flag here!
-             // actionPessimisticLock runs on each region parallelly, we have to consider that
-             // the error may be dropped.
-             if atomic.LoadUint32(action.Killed) == 1 {
-                 return errors.WithStack(tikverr.ErrQueryInterrupted)
-             }
-         }
      }
  }

6 changes: 0 additions & 6 deletions txnkv/transaction/txn.go
@@ -1111,12 +1111,6 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
          lockCtx.Stats.Mu.BackoffTypes = append(lockCtx.Stats.Mu.BackoffTypes, bo.GetTypes()...)
          lockCtx.Stats.Mu.Unlock()
      }
-     if lockCtx.Killed != nil {
-         // If the kill signal is received during waiting for pessimisticLock,
-         // pessimisticLockKeys would handle the error but it doesn't reset the flag.
-         // We need to reset the killed flag here.
-         atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0)
-     }
      if txn.IsInAggressiveLockingMode() {
          if txn.aggressiveLockingContext.maxLockedWithConflictTS < lockCtx.MaxLockedWithConflictTS {
              txn.aggressiveLockingContext.maxLockedWithConflictTS = lockCtx.MaxLockedWithConflictTS
