From dd34500282353d910bdc57c0282fe6ca602c1b90 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Fri, 1 Sep 2023 16:11:25 +0800 Subject: [PATCH] transaction: Add batch_mutate interface (#418) Signed-off-by: Ping Yu --- src/transaction/buffer.rs | 9 +++ src/transaction/transaction.rs | 50 +++++++++++++ tests/common/mod.rs | 11 ++- tests/integration_tests.rs | 124 ++++++++++++++++++++++++++++++++- 4 files changed, 192 insertions(+), 2 deletions(-) diff --git a/src/transaction/buffer.rs b/src/transaction/buffer.rs index b2f19717..5933dff8 100644 --- a/src/transaction/buffer.rs +++ b/src/transaction/buffer.rs @@ -244,6 +244,15 @@ impl Buffer { } } + pub(crate) fn mutate(&mut self, m: kvrpcpb::Mutation) { + let op = kvrpcpb::Op::from_i32(m.op).unwrap(); + match op { + kvrpcpb::Op::Put => self.put(m.key.into(), m.value), + kvrpcpb::Op::Del => self.delete(m.key.into()), + _ => unimplemented!("only put and delete are supported in mutate"), + }; + } + /// Converts the buffered mutations to the proto buffer version pub fn to_proto_mutations(&self) -> Vec { self.entry_map diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 9317e171..671d6140 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -522,6 +522,56 @@ impl> Transaction { Ok(()) } + /// Batch mutate the database. + /// + /// Only `Put` and `Delete` are supported. + /// + /// # Examples + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, TransactionClient, proto::kvrpcpb}; + /// # use futures::prelude::*; + /// # futures::executor::block_on(async { + /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); + /// let mutations = vec![ + /// kvrpcpb::Mutation { + /// op: kvrpcpb::Op::Del.into(), + /// key: b"k0".to_vec(), + /// ..Default::default() + /// }, + /// kvrpcpb::Mutation { + /// op: kvrpcpb::Op::Put.into(), + /// key: b"k1".to_vec(), + /// value: b"v1".to_vec(), + /// ..Default::default() + /// }, + /// ]; + /// txn.batch_mutate(mutations).await.unwrap(); + /// txn.commit().await.unwrap(); + /// # }); + /// ``` + pub async fn batch_mutate( + &mut self, + mutations: impl IntoIterator, + ) -> Result<()> { + debug!("invoking transactional batch mutate request"); + self.check_allow_operation().await?; + if self.is_pessimistic() { + let mutations: Vec = mutations.into_iter().collect(); + self.pessimistic_lock(mutations.iter().map(|m| Key::from(m.key.clone())), false) + .await?; + for m in mutations { + self.buffer.mutate(m); + } + } else { + for m in mutations.into_iter() { + self.buffer.mutate(m); + } + } + Ok(()) + } + /// Lock the given keys without mutating their values. /// /// In optimistic mode, write conflicts are not checked until commit. diff --git a/tests/common/mod.rs b/tests/common/mod.rs index d5d5f06e..4d63dd56 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -10,12 +10,12 @@ use std::time::Duration; use log::info; use log::warn; use rand::Rng; -use tikv_client::ColumnFamily; use tikv_client::Key; use tikv_client::RawClient; use tikv_client::Result; use tikv_client::Transaction; use tikv_client::TransactionClient; +use tikv_client::{ColumnFamily, Snapshot, TransactionOptions}; use tokio::time::sleep; const ENV_PD_ADDRS: &str = "PD_ADDRS"; @@ -147,6 +147,15 @@ pub fn gen_u32_keys(num: u32, rng: &mut impl Rng) -> HashSet> { set } +pub async fn snapshot(client: &TransactionClient, is_pessimistic: bool) -> Result { + let options = if is_pessimistic { + TransactionOptions::new_pessimistic() + } else { + TransactionOptions::new_optimistic() + }; + Ok(client.snapshot(client.current_timestamp().await?, options)) +} + /// Copied from https://github.com/tikv/tikv/blob/d86a449d7f5b656cef28576f166e73291f501d77/components/tikv_util/src/macros.rs#L55 /// Simulates Go's defer. /// diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 829368cf..850f3554 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -21,8 +21,9 @@ use rand::seq::IteratorRandom; use rand::thread_rng; use rand::Rng; use serial_test::serial; +use tikv_client::backoff::DEFAULT_REGION_BACKOFF; +use tikv_client::proto::kvrpcpb; use tikv_client::transaction::HeartbeatOption; -use tikv_client::BoundRange; use tikv_client::Error; use tikv_client::Key; use tikv_client::KvPair; @@ -31,6 +32,7 @@ use tikv_client::Result; use tikv_client::TransactionClient; use tikv_client::TransactionOptions; use tikv_client::Value; +use tikv_client::{Backoff, BoundRange, RetryOptions, Transaction}; // Parameters used in test const NUM_PEOPLE: u32 = 100; @@ -1078,3 +1080,123 @@ async fn txn_key_exists() -> Result<()> { t3.commit().await?; Ok(()) } + +#[tokio::test] +#[serial] +async fn txn_batch_mutate_optimistic() -> Result<()> { + init().await?; + let client = TransactionClient::new(pd_addrs()).await?; + + // Put k0 + { + let mut txn = client.begin_optimistic().await?; + txn.put(b"k0".to_vec(), b"v0".to_vec()).await?; + txn.commit().await?; + } + // Delete k0 and put k1, k2 + do_mutate(false).await.unwrap(); + // Read and verify + verify_mutate(false).await; + Ok(()) +} + +#[tokio::test] +#[serial] +async fn txn_batch_mutate_pessimistic() -> Result<()> { + init().await?; + let client = TransactionClient::new(pd_addrs()).await?; + + // Put k0 + { + let mut txn = client.begin_pessimistic().await?; + txn.put(b"k0".to_vec(), b"v0".to_vec()).await?; + txn.commit().await?; + } + // txn1 lock k0, to verify pessimistic locking. + let mut txn1 = client.begin_pessimistic().await?; + txn1.put(b"k0".to_vec(), b"vv".to_vec()).await?; + + // txn2 is blocked by txn1, then timeout. + let txn2_handle = tokio::spawn(do_mutate(true)); + assert!(matches!( + txn2_handle.await?.unwrap_err(), + Error::PessimisticLockError { .. } + )); + + let txn3_handle = tokio::spawn(do_mutate(true)); + // txn1 rollback to release lock. + txn1.rollback().await?; + txn3_handle.await?.unwrap(); + + // Read and verify + verify_mutate(true).await; + Ok(()) +} + +async fn begin_mutate(client: &TransactionClient, is_pessimistic: bool) -> Result { + if is_pessimistic { + let options = TransactionOptions::new_pessimistic().retry_options(RetryOptions { + region_backoff: DEFAULT_REGION_BACKOFF, + lock_backoff: Backoff::no_jitter_backoff(500, 500, 2), + }); + client.begin_with_options(options).await + } else { + client.begin_optimistic().await + } +} + +async fn do_mutate(is_pessimistic: bool) -> Result<()> { + let client = TransactionClient::new(pd_addrs()).await.unwrap(); + let mut txn = begin_mutate(&client, is_pessimistic).await.unwrap(); + + let mutations = vec![ + kvrpcpb::Mutation { + op: kvrpcpb::Op::Del.into(), + key: b"k0".to_vec(), + ..Default::default() + }, + kvrpcpb::Mutation { + op: kvrpcpb::Op::Put.into(), + key: b"k1".to_vec(), + value: b"v1".to_vec(), + ..Default::default() + }, + kvrpcpb::Mutation { + op: kvrpcpb::Op::Put.into(), + key: b"k2".to_vec(), + value: b"v2".to_vec(), + ..Default::default() + }, + ]; + + match txn.batch_mutate(mutations).await { + Ok(()) => { + txn.commit().await?; + Ok(()) + } + Err(err) => { + let _ = txn.rollback().await; + Err(err) + } + } +} + +async fn verify_mutate(is_pessimistic: bool) { + let client = TransactionClient::new(pd_addrs()).await.unwrap(); + let mut snapshot = snapshot(&client, is_pessimistic).await.unwrap(); + let res: HashMap = snapshot + .batch_get(vec!["k0".to_owned(), "k1".to_owned(), "k2".to_owned()]) + .await + .unwrap() + .map(|pair| (pair.0, pair.1)) + .collect(); + assert_eq!(res.len(), 2); + assert_eq!( + res.get(&Key::from("k1".to_owned())), + Some(Value::from("v1".to_owned())).as_ref() + ); + assert_eq!( + res.get(&Key::from("k2".to_owned())), + Some(Value::from("v2".to_owned())).as_ref() + ); +}