Skip to content

Commit

Permalink
txn: Check whether the primary matches when handling check_txn_status…
Browse files Browse the repository at this point in the history
… requests (tikv#14637)

close tikv#14636, ref pingcap/tidb#42937

Makes TiKV support checking whether the lock is primary when handling check_txn_status.

Signed-off-by: MyonKeminta <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Signed-off-by: lidezhu <[email protected]>
  • Loading branch information
2 people authored and lidezhu committed Apr 27, 2023
1 parent 2d6afc0 commit c326e53
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions components/error_code/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,7 @@ define_error_codes!(
ASSERTION_FAILED => ("AssertionFailed", "", ""),
LOCK_IF_EXISTS_FAILED => ("LockIfExistsFailed", "", ""),

PRIMARY_MISMATCH => ("PrimaryMismatch", "", ""),

UNKNOWN => ("Unknown", "", "")
);
5 changes: 5 additions & 0 deletions etc/error_code.toml
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,11 @@ error = '''
KV:Storage:LockIfExistsFailed
'''

["KV:Storage:PrimaryMismatch"]
error = '''
KV:Storage:PrimaryMismatch
'''

["KV:Storage:Unknown"]
error = '''
KV:Storage:Unknown
Expand Down
7 changes: 7 additions & 0 deletions src/storage/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,13 @@ pub fn extract_key_error(err: &Error) -> kvrpcpb::KeyError {
assertion_failed.set_existing_commit_ts(existing_commit_ts.into_inner());
key_error.set_assertion_failed(assertion_failed);
}
Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(
box MvccErrorInner::PrimaryMismatch(lock_info),
))))) => {
let mut primary_mismatch = kvrpcpb::PrimaryMismatch::default();
primary_mismatch.set_lock_info(lock_info.clone());
key_error.set_primary_mismatch(primary_mismatch);
}
_ => {
error!(?*err; "txn aborts");
key_error.set_abort(format!("{:?}", err));
Expand Down
10 changes: 9 additions & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7910,6 +7910,7 @@ mod tests {
false,
false,
false,
true,
Context::default(),
),
expect_fail_callback(tx.clone(), 0, |e| match e {
Expand All @@ -7936,6 +7937,7 @@ mod tests {
true,
false,
false,
true,
Context::default(),
),
expect_value_callback(tx.clone(), 0, LockNotExist),
Expand Down Expand Up @@ -7993,6 +7995,7 @@ mod tests {
true,
false,
false,
true,
Context::default(),
),
expect_value_callback(
Expand Down Expand Up @@ -8038,6 +8041,7 @@ mod tests {
true,
false,
false,
true,
Context::default(),
),
expect_value_callback(tx.clone(), 0, committed(ts(20, 0))),
Expand All @@ -8049,7 +8053,7 @@ mod tests {
.sched_txn_command(
commands::Prewrite::with_lock_ttl(
vec![Mutation::make_put(k.clone(), v)],
k.as_encoded().to_vec(),
k.to_raw().unwrap(),
ts(25, 0),
100,
),
Expand All @@ -8069,6 +8073,7 @@ mod tests {
true,
false,
false,
true,
Context::default(),
),
expect_value_callback(tx.clone(), 0, TtlExpire),
Expand Down Expand Up @@ -9411,6 +9416,7 @@ mod tests {
false,
false,
false,
true,
Context::default(),
),
expect_value_callback(
Expand Down Expand Up @@ -9447,6 +9453,7 @@ mod tests {
false,
false,
false,
true,
Context::default(),
),
expect_value_callback(tx.clone(), 0, TxnStatus::TtlExpire),
Expand Down Expand Up @@ -9840,6 +9847,7 @@ mod tests {
true,
false,
false,
true,
Default::default(),
),
expect_ok_callback(tx.clone(), 0),
Expand Down
5 changes: 5 additions & 0 deletions src/storage/mvcc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ pub enum ErrorInner {
)]
LockIfExistsFailed { start_ts: TimeStamp, key: Vec<u8> },

#[error("check_txn_status sent to secondary lock, current lock: {0:?}")]
PrimaryMismatch(kvproto::kvrpcpb::LockInfo),

#[error("{0:?}")]
Other(#[from] Box<dyn error::Error + Sync + Send>),
}
Expand Down Expand Up @@ -298,6 +301,7 @@ impl ErrorInner {
key: key.clone(),
})
}
ErrorInner::PrimaryMismatch(l) => Some(ErrorInner::PrimaryMismatch(l.clone())),
ErrorInner::Io(_) | ErrorInner::Other(_) => None,
}
}
Expand Down Expand Up @@ -400,6 +404,7 @@ impl ErrorCodeExt for Error {
ErrorInner::CommitTsTooLarge { .. } => error_code::storage::COMMIT_TS_TOO_LARGE,
ErrorInner::AssertionFailed { .. } => error_code::storage::ASSERTION_FAILED,
ErrorInner::LockIfExistsFailed { .. } => error_code::storage::LOCK_IF_EXISTS_FAILED,
ErrorInner::PrimaryMismatch(_) => error_code::storage::PRIMARY_MISMATCH,
ErrorInner::Other(_) => error_code::storage::UNKNOWN,
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/storage/txn/actions/check_txn_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@ pub fn check_txn_status_lock_exists(
caller_start_ts: TimeStamp,
force_sync_commit: bool,
resolving_pessimistic_lock: bool,
verify_is_primary: bool,
) -> Result<(TxnStatus, Option<ReleasedLock>)> {
if verify_is_primary && !primary_key.is_encoded_from(&lock.primary) {
// Return the current lock info to tell the client what the actual primary is.
return Err(
ErrorInner::PrimaryMismatch(lock.into_lock_info(primary_key.into_raw()?)).into(),
);
}

// Never rollback or push forward min_commit_ts in check_txn_status if it's
// using async commit. Rollback of async-commit locks are done during
// ResolveLock.
Expand Down
88 changes: 72 additions & 16 deletions src/storage/txn/commands/check_txn_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ command! {
// lock, the transaction status could not be decided if the primary lock is pessimistic too and
// it's still uncertain.
resolving_pessimistic_lock: bool,
// Whether it's needed to check wheter the lock on the key (if any) is the primary lock.
// This is for handling some corner cases when pessimistic transactions changes its primary
// (see https://github.com/pingcap/tidb/issues/42937 for details).
// Must be set to true, unless the client is old version that doesn't support this behavior.
verify_is_primary: bool,
}
}

Expand Down Expand Up @@ -107,6 +112,7 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for CheckTxnStatus {
self.caller_start_ts,
self.force_sync_commit,
self.resolving_pessimistic_lock,
self.verify_is_primary,
)?,
l => (
check_txn_status_missing_lock(
Expand Down Expand Up @@ -145,16 +151,18 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for CheckTxnStatus {
#[cfg(test)]
pub mod tests {
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::{Context, PrewriteRequestPessimisticAction::*};
use kvproto::kvrpcpb::{self, Context, LockInfo, PrewriteRequestPessimisticAction::*};
use tikv_util::deadline::Deadline;
use txn_types::{Key, WriteType};

use super::{TxnStatus::*, *};
use crate::storage::{
kv::Engine,
lock_manager::MockLockManager,
mvcc,
mvcc::tests::*,
txn::{
self,
commands::{pessimistic_rollback, WriteCommand, WriteContext},
scheduler::DEFAULT_EXECUTION_DURATION_LIMIT,
tests::*,
Expand Down Expand Up @@ -188,6 +196,7 @@ pub mod tests {
rollback_if_not_exist,
force_sync_commit,
resolving_pessimistic_lock,
verify_is_primary: true,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
let result = command
Expand Down Expand Up @@ -220,7 +229,7 @@ pub mod tests {
rollback_if_not_exist: bool,
force_sync_commit: bool,
resolving_pessimistic_lock: bool,
) {
) -> txn::Error {
let ctx = Context::default();
let snapshot = engine.snapshot(Default::default()).unwrap();
let current_ts = current_ts.into();
Expand All @@ -235,23 +244,28 @@ pub mod tests {
rollback_if_not_exist,
force_sync_commit,
resolving_pessimistic_lock,
verify_is_primary: true,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
assert!(
command
.process_write(
snapshot,
WriteContext {
lock_mgr: &MockLockManager::new(),
concurrency_manager: cm,
extra_op: Default::default(),
statistics: &mut Default::default(),
async_apply_prewrite: false,
raw_ext: None,
},
command
.process_write(
snapshot,
WriteContext {
lock_mgr: &MockLockManager::new(),
concurrency_manager: cm,
extra_op: Default::default(),
statistics: &mut Default::default(),
async_apply_prewrite: false,
raw_ext: None,
},
)
.map(|r| {
panic!(
"expected check_txn_status fail but succeeded with result: {:?}",
r.pr
)
.is_err()
);
})
.unwrap_err()
}

fn committed(commit_ts: impl Into<TimeStamp>) -> impl FnOnce(TxnStatus) -> bool {
Expand Down Expand Up @@ -1188,4 +1202,46 @@ pub mod tests {
assert!(rollback.last_change_ts.is_zero());
assert_eq!(rollback.versions_to_last_change, 0);
}

#[test]
fn test_verify_is_primary() {
let mut engine = TestEngineBuilder::new().build().unwrap();

let check_lock = |l: LockInfo, key: &'_ [u8], primary: &'_ [u8], lock_type| {
assert_eq!(&l.key, key);
assert_eq!(l.lock_type, lock_type);
assert_eq!(&l.primary_lock, primary);
};

let check_error = |e, key: &'_ [u8], primary: &'_ [u8], lock_type| match e {
txn::Error(box txn::ErrorInner::Mvcc(mvcc::Error(
box mvcc::ErrorInner::PrimaryMismatch(lock_info),
))) => {
check_lock(lock_info, key, primary, lock_type);
}
e => panic!("unexpected error: {:?}", e),
};

must_acquire_pessimistic_lock(&mut engine, b"k1", b"k2", 1, 1);
let e = must_err(&mut engine, b"k1", 1, 1, 0, true, false, true);
check_error(e, b"k1", b"k2", kvrpcpb::Op::PessimisticLock);
let lock = must_pessimistic_locked(&mut engine, b"k1", 1, 1);
check_lock(
lock.into_lock_info(b"k1".to_vec()),
b"k1",
b"k2",
kvrpcpb::Op::PessimisticLock,
);

must_pessimistic_prewrite_put(&mut engine, b"k1", b"v1", b"k2", 1, 1, DoPessimisticCheck);
let e = must_err(&mut engine, b"k1", 1, 1, 0, true, false, true);
check_error(e, b"k1", b"k2", kvrpcpb::Op::Put);
let lock = must_locked(&mut engine, b"k1", 1);
check_lock(
lock.into_lock_info(b"k1".to_vec()),
b"k1",
b"k2",
kvrpcpb::Op::Put,
);
}
}
1 change: 1 addition & 0 deletions src/storage/txn/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ impl From<CheckTxnStatusRequest> for TypedCommand<TxnStatus> {
req.get_rollback_if_not_exist(),
req.get_force_sync_commit(),
req.get_resolving_pessimistic_lock(),
req.get_verify_is_primary(),
req.take_context(),
)
}
Expand Down

0 comments on commit c326e53

Please sign in to comment.