From 767e9304c546c001e3a18f32afafc804d38b088f Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Mon, 17 Jun 2024 14:08:33 -0700 Subject: [PATCH 1/6] fixing the shard issue with batch_put Signed-off-by: limbooverlambda --- src/kv/kvpair.rs | 2 +- src/raw/client.rs | 30 +++++++++++++ src/raw/requests.rs | 100 ++++++++++++++++++++++++++++++++++++++++++-- src/store/mod.rs | 2 +- 4 files changed, 129 insertions(+), 5 deletions(-) diff --git a/src/kv/kvpair.rs b/src/kv/kvpair.rs index cfc6ee1c..f609f230 100644 --- a/src/kv/kvpair.rs +++ b/src/kv/kvpair.rs @@ -25,7 +25,7 @@ use crate::proto::kvrpcpb; /// /// Many functions which accept a `KvPair` accept an `Into`, which means all of the above /// types (Like a `(Key, Value)`) can be passed directly to those functions. -#[derive(Default, Clone, Eq, PartialEq)] +#[derive(Default, Clone, Eq, PartialEq, Hash)] #[cfg_attr(test, derive(Arbitrary))] pub struct KvPair(pub Key, pub Value); diff --git a/src/raw/client.rs b/src/raw/client.rs index 71d40b2a..e885b4ec 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -876,6 +876,36 @@ mod tests { use crate::proto::kvrpcpb; use crate::Result; + #[tokio::test] + async fn test_batch_put_with_ttl() -> Result<()> { + let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( + move |req: &dyn Any| { + if let Some(_) = req.downcast_ref::() { + let resp = kvrpcpb::RawBatchPutResponse { + ..Default::default() + }; + Ok(Box::new(resp) as Box) + } else { + unreachable!() + } + }, + ))); + let client = Client { + rpc: pd_client, + cf: Some(ColumnFamily::Default), + backoff: DEFAULT_REGION_BACKOFF, + atomic: false, + keyspace: Keyspace::Enable { keyspace_id: 0 }, + }; + let pairs = vec![ + KvPair(vec![11].into(), vec![12].into()), + KvPair(vec![11].into(), vec![12].into()), + ]; + let ttls = vec![0, 0]; + assert!(client.batch_put_with_ttl(pairs, ttls).await.is_ok()); + Ok(()) + } + #[tokio::test] async fn test_raw_coprocessor() -> Result<()> { let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 201ac657..f6e876d4 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -1,12 +1,14 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::any::Any; +use std::collections::HashMap; use std::ops::Range; use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use futures::stream::BoxStream; +use futures::StreamExt; use tonic::transport::Channel; use super::RawRpcRequest; @@ -190,23 +192,44 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest { } impl Shardable for kvrpcpb::RawBatchPutRequest { - type Shard = Vec; + type Shard = Vec<(kvrpcpb::KvPair, u64)>; fn shards( &self, pd_client: &Arc, ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { + // Maintain a map of the pair and its associated ttl + let kvs = self.pairs.clone(); + let kv_pair = kvs.into_iter().map(Into::::into); + let kv_ttl = kv_pair.zip(self.ttls.clone()).collect::>(); let mut pairs = self.pairs.clone(); pairs.sort_by(|a, b| a.key.cmp(&b.key)); store_stream_for_keys( pairs.into_iter().map(Into::::into), pd_client.clone(), ) + .map(move |r| { + let s = r.map(|(kv, store)| { + let kv_ttls = kv + .into_iter() + .map(|k: KvPair| { + let kv: kvrpcpb::KvPair = k.clone().into(); + let ttl = *kv_ttl.get(&k).unwrap(); + (kv, ttl) + }) + .collect::>(); + (kv_ttls, store) + }); + s + }) + .boxed() } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { + let (pairs, ttls) = shard.into_iter().unzip(); self.set_leader(&store.region_with_leader)?; - self.pairs = shard; + self.pairs = pairs; + self.ttls = ttls; Ok(()) } } @@ -531,21 +554,34 @@ impl_raw_rpc_request!(RawDeleteRangeRequest); impl_raw_rpc_request!(RawCasRequest); impl HasLocks for kvrpcpb::RawGetResponse {} + impl HasLocks for kvrpcpb::RawBatchGetResponse {} + impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {} + impl HasLocks for kvrpcpb::RawPutResponse {} + impl HasLocks for kvrpcpb::RawBatchPutResponse {} + impl HasLocks for kvrpcpb::RawDeleteResponse {} + impl HasLocks for kvrpcpb::RawBatchDeleteResponse {} + impl HasLocks for kvrpcpb::RawScanResponse {} + impl HasLocks for kvrpcpb::RawBatchScanResponse {} + impl HasLocks for kvrpcpb::RawDeleteRangeResponse {} + impl HasLocks for kvrpcpb::RawCasResponse {} + impl HasLocks for kvrpcpb::RawCoprocessorResponse {} #[cfg(test)] mod test { use std::any::Any; + use std::ops::Deref; + use std::sync::Mutex; use super::*; use crate::backoff::DEFAULT_REGION_BACKOFF; @@ -555,7 +591,7 @@ mod test { use crate::proto::kvrpcpb; use crate::request::Keyspace; use crate::request::Plan; - use crate::Key; + #[rstest::rstest] #[case(Keyspace::Disable)] @@ -600,4 +636,62 @@ mod test { assert_eq!(scan.len(), 49); // FIXME test the keys returned. } + + #[tokio::test] + async fn test_raw_batch_put() -> Result<()> { + let region1_kvs = vec![KvPair(vec![9].into(), vec![12].into())]; + let region1_ttls = vec![0]; + let region2_kvs = vec![ + KvPair(vec![11].into(), vec![12].into()), + KvPair( + "FFF".to_string().as_bytes().to_vec().into(), + vec![12].into(), + ), + ]; + let region2_ttls = vec![0, 1]; + + let expected_map = HashMap::from([ + (region1_kvs.clone(), region1_ttls.clone()), + (region2_kvs.clone(), region2_ttls.clone()), + ]); + + let pairs: Vec = [region1_kvs, region2_kvs] + .concat() + .into_iter() + .map(|kv| kv.into()) + .collect(); + let ttls = [region1_ttls, region2_ttls].concat(); + let cf = ColumnFamily::Default; + + let actual_map: Arc, Vec>>> = + Arc::new(Mutex::new(HashMap::new())); + let fut_actual_map = actual_map.clone(); + let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( + move |req: &dyn Any| { + let req: &kvrpcpb::RawBatchPutRequest = req.downcast_ref().unwrap(); + let kv_pair = req + .pairs + .clone() + .into_iter() + .map(|p| p.into()) + .collect::>(); + let ttls = req.ttls.clone(); + fut_actual_map.lock().unwrap().insert(kv_pair, ttls); + let resp = kvrpcpb::RawBatchPutResponse::default(); + Ok(Box::new(resp) as Box) + }, + ))); + + let batch_put_request = + new_raw_batch_put_request(pairs.clone(), ttls.clone(), Some(cf), false); + let keyspace = Keyspace::Enable { keyspace_id: 0 }; + let plan = crate::request::PlanBuilder::new(client, keyspace, batch_put_request) + .resolve_lock(OPTIMISTIC_BACKOFF, keyspace) + .retry_multi_region(DEFAULT_REGION_BACKOFF) + .plan(); + let _ = plan.execute().await; + assert_eq!(actual_map.lock().unwrap().len(), 2); + assert_eq!(actual_map.lock().unwrap().deref().clone(), expected_map); + Ok(()) + } } diff --git a/src/store/mod.rs b/src/store/mod.rs index f21373b4..b8381de3 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -36,7 +36,7 @@ pub struct RegionStore { pub struct Store { pub client: Arc, } - +#[allow(dead_code)] /// Maps keys to a stream of stores. `key_data` must be sorted in increasing order pub fn store_stream_for_keys( key_data: impl Iterator + Send + Sync + 'static, From d6bd8a4876487cf0abae7339fc2d89bc69eac1fe Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Wed, 26 Jun 2024 00:26:03 -0700 Subject: [PATCH 2/6] PR feedback Signed-off-by: limbooverlambda --- src/kv/key.rs | 15 ++++++++++++++ src/kv/mod.rs | 1 + src/raw/client.rs | 6 +++--- src/raw/requests.rs | 50 ++++++++++++++++----------------------------- 4 files changed, 37 insertions(+), 35 deletions(-) diff --git a/src/kv/key.rs b/src/kv/key.rs index 1b4f0606..ae6f364a 100644 --- a/src/kv/key.rs +++ b/src/kv/key.rs @@ -17,6 +17,7 @@ use super::HexRepr; use crate::kv::codec::BytesEncoder; use crate::kv::codec::{self}; use crate::proto::kvrpcpb; +use crate::proto::kvrpcpb::KvPair; const _PROPTEST_KEY_MAX: usize = 1024 * 2; // 2 KB @@ -80,6 +81,20 @@ impl AsRef for kvrpcpb::Mutation { } } +pub struct KvPairTTL(pub KvPair, pub u64); + +impl AsRef for KvPairTTL { + fn as_ref(&self) -> &Key { + self.0.key.as_ref() + } +} + +impl From for (KvPair, u64) { + fn from(value: KvPairTTL) -> Self { + (value.0.clone(), value.1) + } +} + impl Key { /// The empty key. pub const EMPTY: Self = Key(Vec::new()); diff --git a/src/kv/mod.rs b/src/kv/mod.rs index 489110e6..c48f476c 100644 --- a/src/kv/mod.rs +++ b/src/kv/mod.rs @@ -11,6 +11,7 @@ mod value; pub use bound_range::BoundRange; pub use bound_range::IntoOwnedRange; pub use key::Key; +pub use key::KvPairTTL; pub use kvpair::KvPair; pub use value::Value; diff --git a/src/raw/client.rs b/src/raw/client.rs index e885b4ec..da84eb97 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -880,7 +880,7 @@ mod tests { async fn test_batch_put_with_ttl() -> Result<()> { let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( move |req: &dyn Any| { - if let Some(_) = req.downcast_ref::() { + if req.downcast_ref::().is_some() { let resp = kvrpcpb::RawBatchPutResponse { ..Default::default() }; @@ -898,8 +898,8 @@ mod tests { keyspace: Keyspace::Enable { keyspace_id: 0 }, }; let pairs = vec![ - KvPair(vec![11].into(), vec![12].into()), - KvPair(vec![11].into(), vec![12].into()), + KvPair(vec![11].into(), vec![12]), + KvPair(vec![11].into(), vec![12]), ]; let ttls = vec![0, 0]; assert!(client.batch_put_with_ttl(pairs, ttls).await.is_ok()); diff --git a/src/raw/requests.rs b/src/raw/requests.rs index f6e876d4..df99e933 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -1,7 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::any::Any; -use std::collections::HashMap; use std::ops::Range; use std::sync::Arc; use std::time::Duration; @@ -9,10 +8,12 @@ use std::time::Duration; use async_trait::async_trait; use futures::stream::BoxStream; use futures::StreamExt; + use tonic::transport::Channel; use super::RawRpcRequest; use crate::collect_single; +use crate::kv::KvPairTTL; use crate::pd::PdClient; use crate::proto::kvrpcpb; use crate::proto::metapb; @@ -198,31 +199,19 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { &self, pd_client: &Arc, ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { - // Maintain a map of the pair and its associated ttl let kvs = self.pairs.clone(); - let kv_pair = kvs.into_iter().map(Into::::into); - let kv_ttl = kv_pair.zip(self.ttls.clone()).collect::>(); - let mut pairs = self.pairs.clone(); - pairs.sort_by(|a, b| a.key.cmp(&b.key)); - store_stream_for_keys( - pairs.into_iter().map(Into::::into), - pd_client.clone(), - ) - .map(move |r| { - let s = r.map(|(kv, store)| { - let kv_ttls = kv - .into_iter() - .map(|k: KvPair| { - let kv: kvrpcpb::KvPair = k.clone().into(); - let ttl = *kv_ttl.get(&k).unwrap(); - (kv, ttl) - }) - .collect::>(); - (kv_ttls, store) - }); - s - }) - .boxed() + let ttls = self.ttls.clone(); + let mut kv_ttl: Vec = kvs + .iter() + .zip(ttls) + .map(|(kv, ttl)| KvPairTTL(kv.clone(), ttl)) + .collect(); + kv_ttl.sort_by(|a, b| a.0.key.clone().cmp(&b.0.key)); + store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()) + .map(move |r| { + r.map(|(kv, store): (Vec<(kvrpcpb::KvPair, u64)>, RegionStore)| (kv, store)) + }) + .boxed() } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { @@ -580,6 +569,7 @@ impl HasLocks for kvrpcpb::RawCoprocessorResponse {} #[cfg(test)] mod test { use std::any::Any; + use std::collections::HashMap; use std::ops::Deref; use std::sync::Mutex; @@ -592,7 +582,6 @@ mod test { use crate::request::Keyspace; use crate::request::Plan; - #[rstest::rstest] #[case(Keyspace::Disable)] #[case(Keyspace::Enable { keyspace_id: 0 })] @@ -639,14 +628,11 @@ mod test { #[tokio::test] async fn test_raw_batch_put() -> Result<()> { - let region1_kvs = vec![KvPair(vec![9].into(), vec![12].into())]; + let region1_kvs = vec![KvPair(vec![9].into(), vec![12])]; let region1_ttls = vec![0]; let region2_kvs = vec![ - KvPair(vec![11].into(), vec![12].into()), - KvPair( - "FFF".to_string().as_bytes().to_vec().into(), - vec![12].into(), - ), + KvPair(vec![11].into(), vec![12]), + KvPair("FFF".to_string().as_bytes().to_vec().into(), vec![12]), ]; let region2_ttls = vec![0, 1]; From 463024ce4d0c8e4c6ab3dbcf11f1c2fb5057e8a3 Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Wed, 26 Jun 2024 01:27:48 -0700 Subject: [PATCH 3/6] more make check fixes Signed-off-by: limbooverlambda --- src/kv/key.rs | 1 - src/kv/mod.rs | 1 - src/raw/client.rs | 1 - src/transaction/requests.rs | 2 +- 4 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/kv/key.rs b/src/kv/key.rs index ae6f364a..8ca48829 100644 --- a/src/kv/key.rs +++ b/src/kv/key.rs @@ -2,7 +2,6 @@ use std::fmt; use std::ops::Bound; -use std::u8; #[allow(unused_imports)] #[cfg(test)] diff --git a/src/kv/mod.rs b/src/kv/mod.rs index c48f476c..41da842e 100644 --- a/src/kv/mod.rs +++ b/src/kv/mod.rs @@ -1,6 +1,5 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::fmt; -use std::u8; mod bound_range; pub mod codec; diff --git a/src/raw/client.rs b/src/raw/client.rs index da84eb97..76d40b65 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -3,7 +3,6 @@ use core::ops::Range; use std::str::FromStr; use std::sync::Arc; -use std::u32; use futures::StreamExt; use log::debug; diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 4f0a6174..231c9e5a 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -252,7 +252,7 @@ pub fn new_prewrite_request( req.start_version = start_version; req.lock_ttl = lock_ttl; // FIXME: Lite resolve lock is currently disabled - req.txn_size = std::u64::MAX; + req.txn_size = u64::MAX; req } From 390e7c7317deae6592773c765a94160ca09a118d Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Wed, 26 Jun 2024 01:36:27 -0700 Subject: [PATCH 4/6] removing redundant map Signed-off-by: limbooverlambda --- src/raw/requests.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/raw/requests.rs b/src/raw/requests.rs index df99e933..63389f2d 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -207,11 +207,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { .map(|(kv, ttl)| KvPairTTL(kv.clone(), ttl)) .collect(); kv_ttl.sort_by(|a, b| a.0.key.clone().cmp(&b.0.key)); - store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()) - .map(move |r| { - r.map(|(kv, store): (Vec<(kvrpcpb::KvPair, u64)>, RegionStore)| (kv, store)) - }) - .boxed() + store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()).boxed() } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { From 9d7699486474a26a84e14864c296c3fcc2ae071c Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Sun, 30 Jun 2024 11:44:19 -0700 Subject: [PATCH 5/6] more PR feedback Signed-off-by: limbooverlambda --- src/kv/key.rs | 2 +- src/raw/requests.rs | 10 ++++------ src/store/mod.rs | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/kv/key.rs b/src/kv/key.rs index 8ca48829..fa19d3f6 100644 --- a/src/kv/key.rs +++ b/src/kv/key.rs @@ -90,7 +90,7 @@ impl AsRef for KvPairTTL { impl From for (KvPair, u64) { fn from(value: KvPairTTL) -> Self { - (value.0.clone(), value.1) + (value.0, value.1) } } diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 63389f2d..a349fe86 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -7,7 +7,6 @@ use std::time::Duration; use async_trait::async_trait; use futures::stream::BoxStream; -use futures::StreamExt; use tonic::transport::Channel; @@ -202,12 +201,12 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { let kvs = self.pairs.clone(); let ttls = self.ttls.clone(); let mut kv_ttl: Vec = kvs - .iter() + .into_iter() .zip(ttls) - .map(|(kv, ttl)| KvPairTTL(kv.clone(), ttl)) + .map(|(kv, ttl)| KvPairTTL(kv, ttl)) .collect(); kv_ttl.sort_by(|a, b| a.0.key.clone().cmp(&b.0.key)); - store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()).boxed() + store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()) } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { @@ -672,8 +671,7 @@ mod test { .retry_multi_region(DEFAULT_REGION_BACKOFF) .plan(); let _ = plan.execute().await; - assert_eq!(actual_map.lock().unwrap().len(), 2); - assert_eq!(actual_map.lock().unwrap().deref().clone(), expected_map); + assert_eq!(actual_map.lock().unwrap().deref(), &expected_map); Ok(()) } } diff --git a/src/store/mod.rs b/src/store/mod.rs index b8381de3..f21373b4 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -36,7 +36,7 @@ pub struct RegionStore { pub struct Store { pub client: Arc, } -#[allow(dead_code)] + /// Maps keys to a stream of stores. `key_data` must be sorted in increasing order pub fn store_stream_for_keys( key_data: impl Iterator + Send + Sync + 'static, From 74133f9fa9b74a8c81eea85628e79a0a0228fe52 Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Sun, 30 Jun 2024 18:27:21 -0700 Subject: [PATCH 6/6] slight formatting change and remove another redundant clone Signed-off-by: limbooverlambda --- src/raw/requests.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/raw/requests.rs b/src/raw/requests.rs index a349fe86..4422c883 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -1,15 +1,5 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use std::any::Any; -use std::ops::Range; -use std::sync::Arc; -use std::time::Duration; - -use async_trait::async_trait; -use futures::stream::BoxStream; - -use tonic::transport::Channel; - use super::RawRpcRequest; use crate::collect_single; use crate::kv::KvPairTTL; @@ -43,6 +33,13 @@ use crate::Key; use crate::KvPair; use crate::Result; use crate::Value; +use async_trait::async_trait; +use futures::stream::BoxStream; +use std::any::Any; +use std::ops::Range; +use std::sync::Arc; +use std::time::Duration; +use tonic::transport::Channel; pub fn new_raw_get_request(key: Vec, cf: Option) -> kvrpcpb::RawGetRequest { let mut req = kvrpcpb::RawGetRequest::default(); @@ -205,7 +202,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { .zip(ttls) .map(|(kv, ttl)| KvPairTTL(kv, ttl)) .collect(); - kv_ttl.sort_by(|a, b| a.0.key.clone().cmp(&b.0.key)); + kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key)); store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()) }