From 1f314ee77657131e509554ee7b127e7a94851050 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Tue, 21 Jun 2022 20:48:39 +0800 Subject: [PATCH] fix type checking for new interface Signed-off-by: iosmanthus --- src/lib.rs | 2 + src/pd/client.rs | 96 +++++++----- src/raw/client.rs | 34 +++-- src/raw/lowering.rs | 61 ++++---- src/raw/requests.rs | 154 ++++++++++++++++---- src/request/mod.rs | 40 +++-- src/request/plan.rs | 71 +++++---- src/request/plan_builder.rs | 69 +++++---- src/request/request_codec.rs | 22 +++ src/request/shard.rs | 2 +- src/transaction/client.rs | 44 +++--- src/transaction/lock.rs | 13 +- src/transaction/lowering.rs | 53 +++---- src/transaction/requests.rs | 259 +++++++++++++++++++++++++-------- src/transaction/snapshot.rs | 7 +- src/transaction/transaction.rs | 107 +++++++------- 16 files changed, 687 insertions(+), 347 deletions(-) create mode 100644 src/request/request_codec.rs diff --git a/src/lib.rs b/src/lib.rs index e94785b0..7e90b420 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -90,6 +90,8 @@ //! # })} //! ``` +#![feature(specialization)] +#![feature(explicit_generic_args_with_impl_trait)] #[macro_use] pub mod request; #[macro_use] diff --git a/src/pd/client.rs b/src/pd/client.rs index 8bccbe32..8a886247 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -1,23 +1,28 @@ // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. -use crate::{ - compat::stream_fn, - kv::codec, - pd::{retry::RetryClientTrait, RetryClient}, - region::{RegionId, RegionVerId, RegionWithLeader}, - region_cache::RegionCache, - store::RegionStore, - BoundRange, Config, Key, Result, SecurityManager, Timestamp, -}; +use std::{collections::HashMap, sync::Arc, thread}; +use std::marker::PhantomData; + use async_trait::async_trait; use futures::{prelude::*, stream::BoxStream}; use grpcio::{EnvBuilder, Environment}; use slog::Logger; -use std::{collections::HashMap, sync::Arc, thread}; +use tokio::sync::RwLock; + use tikv_client_pd::Cluster; use tikv_client_proto::{kvrpcpb, metapb}; use tikv_client_store::{KvClient, KvConnect, TikvConnect}; -use tokio::sync::RwLock; + +use crate::{ + BoundRange, + compat::stream_fn, + Config, + Key, + kv::codec, + pd::{retry::RetryClientTrait, RetryClient}, + region::{RegionId, RegionVerId, RegionWithLeader}, region_cache::RegionCache, Result, SecurityManager, store::RegionStore, Timestamp, +}; +use crate::request::request_codec::RequestCodec; const CQ_COUNT: usize = 1; const CLIENT_PREFIX: &str = "tikv-client"; @@ -42,6 +47,7 @@ const CLIENT_PREFIX: &str = "tikv-client"; #[async_trait] pub trait PdClient: Send + Sync + 'static { type KvClient: KvClient + Send + Sync + 'static; + type RequestCodec: RequestCodec; /// In transactional API, `region` is decoded (keys in raw format). async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result; @@ -69,11 +75,11 @@ pub trait PdClient: Send + Sync + 'static { fn group_keys_by_region( self: Arc, - keys: impl Iterator + Send + Sync + 'static, + keys: impl Iterator + Send + Sync + 'static, ) -> BoxStream<'static, Result<(RegionId, Vec)>> - where - K: AsRef + Into + Send + Sync + 'static, - K2: Send + Sync + 'static, + where + K: AsRef + Into + Send + Sync + 'static, + K2: Send + Sync + 'static, { let keys = keys.peekable(); stream_fn(keys, move |mut keys| { @@ -95,7 +101,7 @@ pub trait PdClient: Send + Sync + 'static { } } }) - .boxed() + .boxed() } /// Returns a Stream which iterates over the contexts for each region covered by range. 
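The `group_keys_by_region` body above batches a sorted key iterator into per-region chunks: take one key, look up its region, then keep peeking and pulling keys while they still fall inside that region's range. A minimal synchronous sketch of the same grouping idea, using a hypothetical `Region` stand-in rather than the crate's `RegionWithLeader` and async stream machinery:

// Hypothetical region descriptor; the real client looks regions up through PD.
struct Region {
    id: u64,
    start: Vec<u8>,
    end: Vec<u8>, // empty means unbounded
}

impl Region {
    // Half-open range check mirroring region key ranges: start <= key < end.
    fn contains(&self, key: &[u8]) -> bool {
        key >= self.start.as_slice() && (self.end.is_empty() || key < self.end.as_slice())
    }
}

// Group consecutive keys by the region containing them, like the
// `stream_fn` + `peekable` loop above but without the async stream.
fn group_keys(
    keys: impl Iterator<Item = Vec<u8>>,
    region_for: impl Fn(&[u8]) -> Region,
) -> Vec<(u64, Vec<Vec<u8>>)> {
    let mut keys = keys.peekable();
    let mut groups = Vec::new();
    while let Some(first) = keys.next() {
        let region = region_for(&first);
        let mut batch = vec![first];
        // Pull keys while they still fall inside the current region.
        while keys.peek().map_or(false, |k| region.contains(k)) {
            batch.push(keys.next().unwrap());
        }
        groups.push((region.id, batch));
    }
    groups
}

This sketch assumes the keys arrive sorted; otherwise a single forward pass could emit several batches for the same region.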
@@ -126,7 +132,7 @@ pub trait PdClient: Send + Sync + 'static { Ok(Some((Some(region_end), store))) } }) - .boxed() + .boxed() } /// Returns a Stream which iterates over the contexts for ranges in the same region. @@ -190,7 +196,7 @@ pub trait PdClient: Send + Sync + 'static { } } }) - .boxed() + .boxed() } fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result { @@ -204,22 +210,27 @@ pub trait PdClient: Send + Sync + 'static { async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>; async fn invalidate_region_cache(&self, ver_id: RegionVerId); + + fn get_request_codec(&self) -> Self::RequestCodec; } /// This client converts requests for the logical TiKV cluster into requests /// for a single TiKV store using PD and internal logic. -pub struct PdRpcClient { +pub struct PdRpcClient { pd: Arc>, kv_connect: KvC, kv_client_cache: Arc>>, enable_codec: bool, region_cache: RegionCache>, logger: Logger, + // TODO: change to a real codec. + _phantom: PhantomData, } #[async_trait] -impl PdClient for PdRpcClient { +impl PdClient for PdRpcClient { type KvClient = KvC::KvClient; + type RequestCodec = C; async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result { let store_id = region.get_store_id()?; @@ -260,15 +271,19 @@ impl PdClient for PdRpcClient { async fn invalidate_region_cache(&self, ver_id: RegionVerId) { self.region_cache.invalidate_region_cache(ver_id).await } + + fn get_request_codec(&self) -> Self::RequestCodec { + todo!() + } } -impl PdRpcClient { +impl PdRpcClient { pub async fn connect( pd_endpoints: &[String], config: Config, enable_codec: bool, logger: Logger, - ) -> Result { + ) -> Result> { PdRpcClient::new( config.clone(), |env, security_mgr| TikvConnect::new(env, security_mgr, config.timeout), @@ -278,7 +293,7 @@ impl PdRpcClient { enable_codec, logger, ) - .await + .await } } @@ -291,18 +306,18 @@ fn thread_name(prefix: &str) -> String { .unwrap_or_else(|| prefix.to_owned()) } -impl PdRpcClient { +impl PdRpcClient { pub async fn new( config: Config, kv_connect: MakeKvC, pd: MakePd, enable_codec: bool, logger: Logger, - ) -> Result> - where - PdFut: Future>>, - MakeKvC: FnOnce(Arc, Arc) -> KvC, - MakePd: FnOnce(Arc, Arc) -> PdFut, + ) -> Result> + where + PdFut: Future>>, + MakeKvC: FnOnce(Arc, Arc) -> KvC, + MakePd: FnOnce(Arc, Arc) -> PdFut, { let env = Arc::new( EnvBuilder::new() @@ -312,7 +327,7 @@ impl PdRpcClient { ); let security_mgr = Arc::new( if let (Some(ca_path), Some(cert_path), Some(key_path)) = - (&config.ca_path, &config.cert_path, &config.key_path) + (&config.ca_path, &config.cert_path, &config.key_path) { SecurityManager::load(ca_path, cert_path, key_path)? 
} else { @@ -329,6 +344,8 @@ impl PdRpcClient { enable_codec, region_cache: RegionCache::new(pd), logger, + // TODO + _phantom: PhantomData, }) } @@ -352,10 +369,11 @@ impl PdRpcClient { #[cfg(test)] pub mod test { - use super::*; + use futures::{executor, executor::block_on}; + use crate::mock::*; - use futures::{executor, executor::block_on}; + use super::*; #[tokio::test] async fn test_kv_client_caching() { @@ -396,7 +414,7 @@ pub mod test { vec![1].into(), vec![2].into(), vec![3].into(), - vec![5, 2].into() + vec![5, 2].into(), ] ); assert_eq!( @@ -458,12 +476,12 @@ pub mod test { vec![ kvrpcpb::KeyRange { start_key: k1.clone(), - end_key: k2.clone() + end_key: k2.clone(), }, kvrpcpb::KeyRange { start_key: k1, - end_key: k_split.clone() - } + end_key: k_split.clone(), + }, ] ); assert_eq!(ranges2.0, 2); @@ -471,7 +489,7 @@ pub mod test { ranges2.1, vec![kvrpcpb::KeyRange { start_key: k_split.clone(), - end_key: k3 + end_key: k3, }] ); assert_eq!(ranges3.0, 1); @@ -479,7 +497,7 @@ pub mod test { ranges3.1, vec![kvrpcpb::KeyRange { start_key: k2, - end_key: k_split.clone() + end_key: k_split.clone(), }] ); assert_eq!(ranges4.0, 2); @@ -487,7 +505,7 @@ pub mod test { ranges4.1, vec![kvrpcpb::KeyRange { start_key: k_split, - end_key: k4 + end_key: k4, }] ); assert!(stream.next().is_none()); diff --git a/src/raw/client.rs b/src/raw/client.rs index 3252e408..430386d3 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -2,6 +2,7 @@ use core::ops::Range; use std::{str::FromStr, sync::Arc, u32}; +use std::marker::PhantomData; use slog::{Drain, Logger}; use tikv_client_common::Error; @@ -15,6 +16,7 @@ use crate::{ request::{Collect, CollectSingle, Plan}, Backoff, BoundRange, ColumnFamily, Key, KvPair, Result, Value, }; +use crate::request::request_codec::RequestCodec; const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; @@ -26,15 +28,16 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; /// The returned results of raw request methods are [`Future`](std::future::Future)s that must be /// awaited to execute. #[derive(Clone)] -pub struct Client { +pub struct Client> { rpc: Arc, cf: Option, /// Whether to use the [`atomic mode`](Client::with_atomic_for_cas). atomic: bool, logger: Logger, + _phantom: std::marker::PhantomData, } -impl Client { +impl Client> { /// Create a raw [`Client`] and connect to the TiKV cluster. /// /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for @@ -103,6 +106,7 @@ impl Client { cf: None, atomic: false, logger, + _phantom: PhantomData }) } @@ -137,6 +141,7 @@ impl Client { cf: Some(cf), atomic: self.atomic, logger: self.logger.clone(), + _phantom: PhantomData } } @@ -154,11 +159,12 @@ impl Client { cf: self.cf.clone(), atomic: true, logger: self.logger.clone(), + _phantom: PhantomData } } } -impl Client { +impl Client { /// Create a new 'get' request. 
/// /// Once resolved this request will result in the fetching of the value associated with the @@ -179,7 +185,7 @@ impl Client { /// ``` pub async fn get(&self, key: impl Into) -> Result> { debug!(self.logger, "invoking raw get request"); - let request = new_raw_get_request(key.into(), self.cf.clone()); + let request = new_raw_get_request::(key.into(), self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) @@ -211,7 +217,7 @@ impl Client { keys: impl IntoIterator>, ) -> Result> { debug!(self.logger, "invoking raw batch_get request"); - let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone()); + let request = new_raw_batch_get_request::(keys.into_iter().map(Into::into), self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) @@ -239,7 +245,7 @@ impl Client { /// ``` pub async fn put(&self, key: impl Into, value: impl Into) -> Result<()> { debug!(self.logger, "invoking raw put request"); - let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic); + let request = new_raw_put_request::(key.into(), value.into(), self.cf.clone(), self.atomic); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) @@ -271,7 +277,7 @@ impl Client { pairs: impl IntoIterator>, ) -> Result<()> { debug!(self.logger, "invoking raw batch_put request"); - let request = new_raw_batch_put_request( + let request = new_raw_batch_put_request::( pairs.into_iter().map(Into::into), self.cf.clone(), self.atomic, @@ -303,7 +309,7 @@ impl Client { /// ``` pub async fn delete(&self, key: impl Into) -> Result<()> { debug!(self.logger, "invoking raw delete request"); - let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic); + let request = new_raw_delete_request::(key.into(), self.cf.clone(), self.atomic); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) @@ -334,7 +340,7 @@ impl Client { debug!(self.logger, "invoking raw batch_delete request"); self.assert_non_atomic()?; let request = - new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone()); + new_raw_batch_delete_request::(keys.into_iter().map(Into::into), self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(DEFAULT_REGION_BACKOFF) .extract_error() @@ -369,7 +375,7 @@ impl Client { ) -> Result<()> { debug!(self.logger, "invoking raw delete_range request"); self.assert_non_atomic()?; - let request = new_raw_delete_range_request(range.into(), self.cf.clone()); + let request = new_raw_delete_range_request::(range.into(), self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(backoff) .extract_error() @@ -520,7 +526,7 @@ impl Client { ) -> Result<(Option, bool)> { debug!(self.logger, "invoking raw compare_and_swap request"); self.assert_atomic()?; - let req = new_cas_request( + let req = new_cas_request::( key.into(), new_value.into(), previous_value.into(), @@ -543,7 +549,7 @@ impl Client { ) -> Result, Vec>)>> { let copr_version_req = copr_version_req.into(); semver::VersionReq::from_str(&copr_version_req)?; - let req = new_raw_coprocessor_request( + let req = new_raw_coprocessor_request::( 
copr_name.into(), copr_version_req, ranges.into_iter().map(Into::into), @@ -570,7 +576,7 @@ impl Client { }); } - let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone()); + let request = new_raw_scan_request::(range.into(), limit, key_only, self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) @@ -595,7 +601,7 @@ impl Client { }); } - let request = new_raw_batch_scan_request( + let request = new_raw_batch_scan_request::( ranges.into_iter().map(Into::into), each_limit, key_only, diff --git a/src/raw/lowering.rs b/src/raw/lowering.rs index 32327a80..d9ae6de4 100644 --- a/src/raw/lowering.rs +++ b/src/raw/lowering.rs @@ -5,70 +5,73 @@ //! generated protobuf code, then calls the low-level ctor functions in the requests module. use std::{iter::Iterator, ops::Range, sync::Arc}; +use std::marker::PhantomData; use tikv_client_proto::{kvrpcpb, metapb}; -use crate::{raw::requests, BoundRange, ColumnFamily, Key, KvPair, Value}; +use crate::{BoundRange, ColumnFamily, Key, KvPair, raw::requests, Value}; +use crate::request::KvRequest; +use crate::request::request_codec::RequestCodec; -pub fn new_raw_get_request(key: Key, cf: Option) -> kvrpcpb::RawGetRequest { - requests::new_raw_get_request(key.into(), cf) +pub fn new_raw_get_request(key: Key, cf: Option) -> kvrpcpb::RawGetRequest { + requests::new_raw_get_request::(key.into(), cf) } -pub fn new_raw_batch_get_request( - keys: impl Iterator, +pub fn new_raw_batch_get_request( + keys: impl Iterator, cf: Option, ) -> kvrpcpb::RawBatchGetRequest { - requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf) + requests::new_raw_batch_get_request::(keys.map(Into::into).collect(), cf) } -pub fn new_raw_put_request( +pub fn new_raw_put_request( key: Key, value: Value, cf: Option, atomic: bool, ) -> kvrpcpb::RawPutRequest { - requests::new_raw_put_request(key.into(), value, cf, atomic) + requests::new_raw_put_request::(key.into(), value, cf, atomic) } -pub fn new_raw_batch_put_request( - pairs: impl Iterator, +pub fn new_raw_batch_put_request( + pairs: impl Iterator, cf: Option, atomic: bool, ) -> kvrpcpb::RawBatchPutRequest { - requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic) + requests::new_raw_batch_put_request::(pairs.map(Into::into).collect(), cf, atomic) } -pub fn new_raw_delete_request( +pub fn new_raw_delete_request( key: Key, cf: Option, atomic: bool, ) -> kvrpcpb::RawDeleteRequest { - requests::new_raw_delete_request(key.into(), cf, atomic) + requests::new_raw_delete_request::(key.into(), cf, atomic) } -pub fn new_raw_batch_delete_request( - keys: impl Iterator, +pub fn new_raw_batch_delete_request( + keys: impl Iterator, cf: Option, ) -> kvrpcpb::RawBatchDeleteRequest { - requests::new_raw_batch_delete_request(keys.map(Into::into).collect(), cf) + requests::new_raw_batch_delete_request::(keys.map(Into::into).collect(), cf) } -pub fn new_raw_delete_range_request( +pub fn new_raw_delete_range_request( range: BoundRange, cf: Option, ) -> kvrpcpb::RawDeleteRangeRequest { let (start_key, end_key) = range.into_keys(); - requests::new_raw_delete_range_request(start_key.into(), end_key.unwrap_or_default().into(), cf) + requests::new_raw_delete_range_request::(start_key.into(), end_key.unwrap_or_default().into(), cf) } -pub fn new_raw_scan_request( +pub fn new_raw_scan_request( range: BoundRange, limit: u32, key_only: bool, cf: Option, -) -> kvrpcpb::RawScanRequest { +) -> 
kvrpcpb::RawScanRequest { let (start_key, end_key) = range.into_keys(); - requests::new_raw_scan_request( + requests::new_raw_scan_request::( start_key.into(), end_key.unwrap_or_default().into(), limit, @@ -77,31 +80,31 @@ pub fn new_raw_scan_request( ) } -pub fn new_raw_batch_scan_request( - ranges: impl Iterator, +pub fn new_raw_batch_scan_request( + ranges: impl Iterator, each_limit: u32, key_only: bool, cf: Option, ) -> kvrpcpb::RawBatchScanRequest { - requests::new_raw_batch_scan_request(ranges.map(Into::into).collect(), each_limit, key_only, cf) + requests::new_raw_batch_scan_request::(ranges.map(Into::into).collect(), each_limit, key_only, cf) } -pub fn new_cas_request( +pub fn new_cas_request( key: Key, value: Value, previous_value: Option, cf: Option, ) -> kvrpcpb::RawCasRequest { - requests::new_cas_request(key.into(), value, previous_value, cf) + requests::new_cas_request::(key.into(), value, previous_value, cf) } -pub fn new_raw_coprocessor_request( +pub fn new_raw_coprocessor_request( copr_name: String, copr_version_req: String, - ranges: impl Iterator, + ranges: impl Iterator, request_builder: impl Fn(metapb::Region, Vec>) -> Vec + Send + Sync + 'static, ) -> requests::RawCoprocessorRequest { - requests::new_raw_coprocessor_request( + requests::new_raw_coprocessor_request::( copr_name, copr_version_req, ranges.map(Into::into).collect(), diff --git a/src/raw/requests.rs b/src/raw/requests.rs index bd678aaf..5b0fa52e 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -1,6 +1,7 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::{any::Any, ops::Range, sync::Arc}; +use std::borrow::Cow; use async_trait::async_trait; use futures::stream::BoxStream; @@ -21,8 +22,9 @@ use crate::{ util::iter::FlatMapOkIterExt, ColumnFamily, Key, KvPair, Result, Value, }; +use crate::request::request_codec::{RawApiV1, RequestCodec}; -pub fn new_raw_get_request(key: Vec, cf: Option) -> kvrpcpb::RawGetRequest { +pub fn new_raw_get_request(key: Vec, cf: Option) -> kvrpcpb::RawGetRequest where kvrpcpb::RawGetRequest: KvRequest { let mut req = kvrpcpb::RawGetRequest::default(); req.set_key(key); req.maybe_set_cf(cf); @@ -30,8 +32,16 @@ pub fn new_raw_get_request(key: Vec, cf: Option) -> kvrpcpb::R req } -impl KvRequest for kvrpcpb::RawGetRequest { +impl KvRequest for kvrpcpb::RawGetRequest { type Response = kvrpcpb::RawGetResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } shardable_key!(kvrpcpb::RawGetRequest); @@ -56,10 +66,10 @@ impl Process for DefaultProcessor { } } -pub fn new_raw_batch_get_request( +pub fn new_raw_batch_get_request( keys: Vec>, cf: Option, -) -> kvrpcpb::RawBatchGetRequest { +) -> kvrpcpb::RawBatchGetRequest where kvrpcpb::RawBatchGetRequest: KvRequest{ let mut req = kvrpcpb::RawBatchGetRequest::default(); req.set_keys(keys); req.maybe_set_cf(cf); @@ -67,8 +77,16 @@ pub fn new_raw_batch_get_request( req } -impl KvRequest for kvrpcpb::RawBatchGetRequest { +impl KvRequest for kvrpcpb::RawBatchGetRequest { type Response = kvrpcpb::RawBatchGetResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } shardable_keys!(kvrpcpb::RawBatchGetRequest); @@ -84,12 +102,12 @@ impl Merge for Collect { } } -pub fn new_raw_put_request( +pub fn new_raw_put_request( key: Vec, value: Vec, cf: Option, atomic: bool, -) -> kvrpcpb::RawPutRequest { 
+) -> kvrpcpb::RawPutRequest where kvrpcpb::RawPutRequest: KvRequest { let mut req = kvrpcpb::RawPutRequest::default(); req.set_key(key); req.set_value(value); @@ -99,8 +117,16 @@ pub fn new_raw_put_request( req } -impl KvRequest for kvrpcpb::RawPutRequest { +impl KvRequest for kvrpcpb::RawPutRequest { type Response = kvrpcpb::RawPutResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } shardable_key!(kvrpcpb::RawPutRequest); @@ -111,11 +137,11 @@ impl SingleKey for kvrpcpb::RawPutRequest { } } -pub fn new_raw_batch_put_request( +pub fn new_raw_batch_put_request( pairs: Vec, cf: Option, atomic: bool, -) -> kvrpcpb::RawBatchPutRequest { +) -> kvrpcpb::RawBatchPutRequest where kvrpcpb::RawBatchPutRequest: KvRequest { let mut req = kvrpcpb::RawBatchPutRequest::default(); req.set_pairs(pairs); req.maybe_set_cf(cf); @@ -124,8 +150,16 @@ pub fn new_raw_batch_put_request( req } -impl KvRequest for kvrpcpb::RawBatchPutRequest { +impl KvRequest for kvrpcpb::RawBatchPutRequest { type Response = kvrpcpb::RawBatchPutResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } impl Shardable for kvrpcpb::RawBatchPutRequest { @@ -150,11 +184,11 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { } } -pub fn new_raw_delete_request( +pub fn new_raw_delete_request( key: Vec, cf: Option, atomic: bool, -) -> kvrpcpb::RawDeleteRequest { +) -> kvrpcpb::RawDeleteRequest where kvrpcpb::RawDeleteRequest: KvRequest { let mut req = kvrpcpb::RawDeleteRequest::default(); req.set_key(key); req.maybe_set_cf(cf); @@ -163,8 +197,16 @@ pub fn new_raw_delete_request( req } -impl KvRequest for kvrpcpb::RawDeleteRequest { +impl KvRequest for kvrpcpb::RawDeleteRequest { type Response = kvrpcpb::RawDeleteResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } shardable_key!(kvrpcpb::RawDeleteRequest); @@ -175,10 +217,10 @@ impl SingleKey for kvrpcpb::RawDeleteRequest { } } -pub fn new_raw_batch_delete_request( +pub fn new_raw_batch_delete_request( keys: Vec>, cf: Option, -) -> kvrpcpb::RawBatchDeleteRequest { +) -> kvrpcpb::RawBatchDeleteRequest where kvrpcpb::RawBatchDeleteRequest: KvRequest { let mut req = kvrpcpb::RawBatchDeleteRequest::default(); req.set_keys(keys); req.maybe_set_cf(cf); @@ -186,17 +228,25 @@ pub fn new_raw_batch_delete_request( req } -impl KvRequest for kvrpcpb::RawBatchDeleteRequest { +impl KvRequest for kvrpcpb::RawBatchDeleteRequest { type Response = kvrpcpb::RawBatchDeleteResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } shardable_keys!(kvrpcpb::RawBatchDeleteRequest); -pub fn new_raw_delete_range_request( +pub fn new_raw_delete_range_request( start_key: Vec, end_key: Vec, cf: Option, -) -> kvrpcpb::RawDeleteRangeRequest { +) -> kvrpcpb::RawDeleteRangeRequest where kvrpcpb::RawDeleteRangeRequest: KvRequest { let mut req = kvrpcpb::RawDeleteRangeRequest::default(); req.set_start_key(start_key); req.set_end_key(end_key); @@ -205,19 +255,27 @@ pub fn new_raw_delete_range_request( req } -impl KvRequest for kvrpcpb::RawDeleteRangeRequest { +impl KvRequest for kvrpcpb::RawDeleteRangeRequest { type Response = kvrpcpb::RawDeleteRangeResponse; + + fn encode_request(&self, 
_codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } shardable_range!(kvrpcpb::RawDeleteRangeRequest); -pub fn new_raw_scan_request( +pub fn new_raw_scan_request( start_key: Vec, end_key: Vec, limit: u32, key_only: bool, cf: Option, -) -> kvrpcpb::RawScanRequest { +) -> kvrpcpb::RawScanRequest where kvrpcpb::RawScanRequest: KvRequest { let mut req = kvrpcpb::RawScanRequest::default(); req.set_start_key(start_key); req.set_end_key(end_key); @@ -228,8 +286,16 @@ pub fn new_raw_scan_request( req } -impl KvRequest for kvrpcpb::RawScanRequest { +impl KvRequest for kvrpcpb::RawScanRequest { type Response = kvrpcpb::RawScanResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } shardable_range!(kvrpcpb::RawScanRequest); @@ -245,12 +311,12 @@ impl Merge for Collect { } } -pub fn new_raw_batch_scan_request( +pub fn new_raw_batch_scan_request( ranges: Vec, each_limit: u32, key_only: bool, cf: Option, -) -> kvrpcpb::RawBatchScanRequest { +) -> kvrpcpb::RawBatchScanRequest where kvrpcpb::RawBatchScanRequest: KvRequest { let mut req = kvrpcpb::RawBatchScanRequest::default(); req.set_ranges(ranges); req.set_each_limit(each_limit); @@ -260,8 +326,16 @@ pub fn new_raw_batch_scan_request( req } -impl KvRequest for kvrpcpb::RawBatchScanRequest { +impl KvRequest for kvrpcpb::RawBatchScanRequest { type Response = kvrpcpb::RawBatchScanResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } impl Shardable for kvrpcpb::RawBatchScanRequest { @@ -292,12 +366,12 @@ impl Merge for Collect { } } -pub fn new_cas_request( +pub fn new_cas_request( key: Vec, value: Vec, previous_value: Option>, cf: Option, -) -> kvrpcpb::RawCasRequest { +) -> kvrpcpb::RawCasRequest where kvrpcpb::RawCasRequest: KvRequest { let mut req = kvrpcpb::RawCasRequest::default(); req.set_key(key); req.set_value(value); @@ -309,8 +383,16 @@ pub fn new_cas_request( req } -impl KvRequest for kvrpcpb::RawCasRequest { +impl KvRequest for kvrpcpb::RawCasRequest { type Response = kvrpcpb::RawCasResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } shardable_key!(kvrpcpb::RawCasRequest); @@ -337,12 +419,12 @@ impl Process for DefaultProcessor { type RawCoprocessorRequestDataBuilder = Arc) -> Vec + Send + Sync>; -pub fn new_raw_coprocessor_request( +pub fn new_raw_coprocessor_request( copr_name: String, copr_version_req: String, ranges: Vec, data_builder: RawCoprocessorRequestDataBuilder, -) -> RawCoprocessorRequest { +) -> RawCoprocessorRequest where RawCoprocessorRequest: KvRequest { let mut inner = kvrpcpb::RawCoprocessorRequest::default(); inner.set_copr_name(copr_name); inner.set_copr_version_req(copr_version_req); @@ -378,8 +460,16 @@ impl Request for RawCoprocessorRequest { } } -impl KvRequest for RawCoprocessorRequest { +impl KvRequest for RawCoprocessorRequest { type Response = kvrpcpb::RawCoprocessorResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } impl Shardable for RawCoprocessorRequest { diff --git a/src/request/mod.rs b/src/request/mod.rs index 959ff5e7..f18f9eb2 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ 
-1,12 +1,17 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use std::borrow::Cow; + +use async_trait::async_trait; +use derive_new::new; + +use tikv_client_store::{HasKeyErrors, HasRegionError, Request}; + use crate::{ backoff::{Backoff, DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF, PESSIMISTIC_BACKOFF}, transaction::HasLocks, }; -use async_trait::async_trait; -use derive_new::new; -use tikv_client_store::{HasKeyErrors, Request}; +use crate::pd::PdClient; pub use self::{ plan::{ @@ -22,12 +27,15 @@ pub mod plan; mod plan_builder; #[macro_use] mod shard; +pub mod request_codec; /// Abstracts any request sent to a TiKV server. #[async_trait] -pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static { +pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static { /// The expected response to the request. - type Response: HasKeyErrors + HasLocks + Clone + Send + 'static; + type Response: HasKeyErrors + HasLocks + HasRegionError + Clone + Send + 'static; + fn encode_request(&self, _codec: &C) -> Cow; + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> crate::Result; } #[derive(Clone, Debug, new, Eq, PartialEq)] @@ -63,22 +71,26 @@ impl RetryOptions { #[cfg(test)] mod test { - use super::*; - use crate::{ - mock::{MockKvClient, MockPdClient}, - store::store_stream_for_keys, - transaction::lowering::new_commit_request, - Error, Key, Result, - }; - use grpcio::CallOption; use std::{ any::Any, iter, - sync::{atomic::AtomicUsize, Arc}, + sync::{Arc, atomic::AtomicUsize}, }; + + use grpcio::CallOption; + use tikv_client_proto::{kvrpcpb, pdpb::Timestamp, tikvpb::TikvClient}; use tikv_client_store::HasRegionError; + use crate::{ Error, Key, mock::{MockKvClient, MockPdClient}, Result, store::store_stream_for_keys, transaction::lowering::new_commit_request, }; + + use super::*; + #[tokio::test] async fn test_region_retry() { #[derive(Clone)] diff --git a/src/request/plan.rs b/src/request/plan.rs index ce785262..e0941702 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -5,20 +5,22 @@ use std::{marker::PhantomData, sync::Arc}; use async_recursion::async_recursion; use async_trait::async_trait; use futures::{future::try_join_all, prelude::*}; +use tokio::sync::Semaphore; + use tikv_client_proto::{errorpb, errorpb::EpochNotMatch, kvrpcpb}; use tikv_client_store::{HasKeyErrors, HasRegionError, HasRegionErrors, KvClient}; -use tokio::sync::Semaphore; use crate::{ backoff::Backoff, + Error, pd::PdClient, request::{KvRequest, Shardable}, + Result, stats::tikv_stats, store::RegionStore, - transaction::{resolve_locks, HasLocks}, - util::iter::FlatMapOkIterExt, - Error, Result, + transaction::{HasLocks, resolve_locks}, util::iter::FlatMapOkIterExt, }; +use crate::request::request_codec::RequestCodec; /// A plan for how to execute a request. A user builds up a plan with various /// options, then executes it. @@ -33,27 +35,39 @@ pub trait Plan: Sized + Clone + Sync + Send + 'static { /// The simplest plan which just dispatches a request to a specific kv server.
#[derive(Clone)] -pub struct Dispatch { +pub struct Dispatch> { pub request: Req, pub kv_client: Option>, + codec: C, +} + +impl> Dispatch { + pub fn new(request: Req, kv_client: Option>, codec: C) -> Self { + Self { + request, + kv_client, + codec, + } + } } #[async_trait] -impl Plan for Dispatch { +impl> Plan for Dispatch { type Result = Req::Response; async fn execute(&self) -> Result { + let req = self.request.encode_request(&self.codec); let stats = tikv_stats(self.request.label()); let result = self .kv_client .as_ref() .expect("Unreachable: kv_client has not been initialised in Dispatch") - .dispatch(&self.request) + .dispatch(req.as_ref()) .await; let result = stats.done(result); - result.map(|r| { - *r.downcast() - .expect("Downcast failed: request and response type mismatch") + result.and_then(|r| { + req.decode_response(&self.codec, *r.downcast() + .expect("Downcast failed: request and response type mismatch")) }) } } @@ -72,8 +86,8 @@ pub struct RetryableMultiRegion { } impl RetryableMultiRegion -where - P::Result: HasKeyErrors + HasRegionError, + where + P::Result: HasKeyErrors + HasRegionError, { // A plan may involve multiple shards #[async_recursion] @@ -153,7 +167,7 @@ where permits, preserve_region_results, ) - .await + .await } None => Err(Error::RegionError(e)), } @@ -205,7 +219,7 @@ where region_store, e.take_epoch_not_match(), ) - .await + .await } else if e.has_stale_command() || e.has_region_not_found() { pd_client.invalidate_region_cache(ver_id).await; Ok(false) @@ -277,8 +291,8 @@ impl Clone for RetryableMultiRegion { #[async_trait] impl Plan for RetryableMultiRegion -where - P::Result: HasKeyErrors + HasRegionError, + where + P::Result: HasKeyErrors + HasRegionError, { type Result = Vec>; @@ -294,7 +308,7 @@ where concurrency_permits.clone(), self.preserve_region_results, ) - .await + .await } } @@ -313,8 +327,8 @@ pub struct MergeResponse> { } #[async_trait] -impl>>, M: Merge> Plan - for MergeResponse +impl>>, M: Merge> Plan +for MergeResponse { type Result = M::Out; @@ -408,8 +422,8 @@ impl Clone for ResolveLock { #[async_trait] impl Plan for ResolveLock -where - P::Result: HasLocks, + where + P::Result: HasLocks, { type Result = P::Result; @@ -464,8 +478,8 @@ impl Clone for ExtractError

<P> { #[async_trait] impl<P: Plan> Plan for ExtractError<P> -where - P::Result: HasKeyErrors + HasRegionErrors, + where + P::Result: HasKeyErrors + HasRegionErrors, { type Result = P::Result; @@ -504,8 +518,8 @@ impl<P: Plan> Clone for PreserveShard<P> { #[async_trait] impl<P> Plan for PreserveShard<P>

-where - P: Plan + Shardable, + where + P: Plan + Shardable, { type Result = ResponseWithShard; @@ -544,11 +558,14 @@ impl HasRegionError for ResponseWithShard { @@ -25,19 +29,24 @@ pub struct PlanBuilder { /// Used to ensure that a plan has a designated target or targets, a target is /// a particular TiKV server. pub trait PlanBuilderPhase {} + pub struct NoTarget; + impl PlanBuilderPhase for NoTarget {} + pub struct Targetted; + impl PlanBuilderPhase for Targetted {} -impl PlanBuilder, NoTarget> { +impl PlanBuilder, NoTarget> where + C: RequestCodec, + Req: KvRequest, + PdC: PdClient { pub fn new(pd_client: Arc, request: Req) -> Self { + let codec = pd_client.get_request_codec().clone(); PlanBuilder { pd_client, - plan: Dispatch { - request, - kv_client: None, - }, + plan: Dispatch::new(request, None, codec), phantom: PhantomData, } } @@ -54,8 +63,8 @@ impl PlanBuilder { impl PlanBuilder { /// If there is a lock error, then resolve the lock and retry the request. pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder, Ph> - where - P::Result: HasLocks, + where + P::Result: HasLocks, { PlanBuilder { pd_client: self.pd_client.clone(), @@ -71,9 +80,9 @@ impl PlanBuilder { /// Merge the results of a request. Usually used where a request is sent to multiple regions /// to combine the responses from each region. pub fn merge>(self, merge: M) -> PlanBuilder, Ph> - where - In: Clone + Send + Sync + 'static, - P: Plan>>, + where + In: Clone + Send + Sync + 'static, + P: Plan>>, { PlanBuilder { pd_client: self.pd_client.clone(), @@ -90,9 +99,9 @@ impl PlanBuilder { /// to a single region because post-processing can be incorporated in the merge step for /// multi-region requests). pub fn post_process_default(self) -> PlanBuilder, Ph> - where - P: Plan, - DefaultProcessor: Process, + where + P: Plan, + DefaultProcessor: Process, { PlanBuilder { pd_client: self.pd_client.clone(), @@ -106,8 +115,8 @@ impl PlanBuilder { } impl PlanBuilder -where - P::Result: HasKeyErrors + HasRegionError, + where + P::Result: HasKeyErrors + HasRegionError, { /// Split the request into shards sending a request to the region of each shard. pub fn retry_multi_region( @@ -144,11 +153,11 @@ where } } -impl PlanBuilder, NoTarget> { +impl + SingleKey> PlanBuilder, NoTarget> { /// Target the request at a single region. *Note*: a single region plan /// cannot automatically retry on region errors. It's only used for requests /// that target a specific region but not keys (e.g. ResolveLockRequest). - pub async fn single_region(self) -> Result, Targetted>> { + pub async fn single_region(self) -> Result, Targetted>> { let key = self.plan.request.key(); // TODO: retry when region error occurred let store = self.pd_client.clone().store_for_key(key.into()).await?; @@ -156,19 +165,19 @@ impl PlanBuilder, NoTa } } -impl PlanBuilder, NoTarget> { +impl> PlanBuilder, NoTarget> { /// Target the request at a single region; caller supplies the store to target.
pub async fn single_region_with_store( self, store: RegionStore, - ) -> Result, Targetted>> { + ) -> Result, Targetted>> { set_single_region_store(self.plan, store, self.pd_client) } } impl PlanBuilder -where - P::Result: HasKeyErrors, + where + P::Result: HasKeyErrors, { pub fn preserve_shard(self) -> PlanBuilder, NoTarget> { PlanBuilder { @@ -183,8 +192,8 @@ where } impl PlanBuilder -where - P::Result: HasKeyErrors + HasRegionErrors, + where + P::Result: HasKeyErrors + HasRegionErrors, { pub fn extract_error(self) -> PlanBuilder, Targetted> { PlanBuilder { @@ -195,11 +204,11 @@ where } } -fn set_single_region_store( - mut plan: Dispatch, +fn set_single_region_store>( + mut plan: Dispatch, store: RegionStore, pd_client: Arc, -) -> Result, Targetted>> { +) -> Result, Targetted>> { plan.request .set_context(store.region_with_leader.context()?); plan.kv_client = Some(store.client); diff --git a/src/request/request_codec.rs b/src/request/request_codec.rs new file mode 100644 index 00000000..a90922fa --- /dev/null +++ b/src/request/request_codec.rs @@ -0,0 +1,22 @@ +use tikv_client_proto::metapb::Region; + +use crate::Result; + +pub trait RequestCodec: Sized + Clone + Sync + Send + 'static { + fn encode_key(&self, key: Vec) -> Vec { key } + fn decode_key(&self, key: Vec) -> Result> { Ok(key) } + fn encode_range(&self, start: Vec, end: Vec) -> (Vec, Vec) { (start, end) } + fn encode_pd_query(&self, key: Vec) -> Vec { key } + fn decode_region(&self, region: Region) -> Result { Ok(region) } +} + +#[derive(Clone)] +pub struct RawApiV1; + +impl RequestCodec for RawApiV1 {} + + +#[derive(Clone)] +pub struct TxnApiV1; + +impl RequestCodec for TxnApiV1 {} \ No newline at end of file diff --git a/src/request/shard.rs b/src/request/shard.rs index cdf3f198..bbe48de2 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -38,7 +38,7 @@ pub trait Shardable { fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()>; } -impl Shardable for Dispatch { +impl + Shardable> Shardable for Dispatch { type Shard = Req::Shard; fn shards( diff --git a/src/transaction/client.rs b/src/transaction/client.rs index fee8e251..a65fd2a0 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -1,18 +1,24 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use super::{requests::new_scan_lock_request, resolve_locks}; +use std::{mem, sync::Arc}; +use std::marker::PhantomData; + +use slog::{Drain, Logger}; + +use tikv_client_proto::{kvrpcpb, pdpb::Timestamp}; + use crate::{ backoff::{DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF}, config::Config, pd::{PdClient, PdRpcClient}, request::Plan, + Result, timestamp::TimestampExt, transaction::{Snapshot, Transaction, TransactionOptions}, - Result, }; -use slog::{Drain, Logger}; -use std::{mem, sync::Arc}; -use tikv_client_proto::{kvrpcpb, pdpb::Timestamp}; +use crate::request::request_codec::RequestCodec; + +use super::{requests::new_scan_lock_request, resolve_locks}; // FIXME: cargo-culted value const SCAN_LOCK_BATCH_SIZE: u32 = 1024; @@ -33,12 +39,16 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024; /// /// The returned results of transactional requests are [`Future`](std::future::Future)s that must be /// awaited to execute. -pub struct Client { - pd: Arc, +pub struct Client { + pd: Arc>, logger: Logger, + _phantom: PhantomData, } -impl Client { +impl Client + where + C: RequestCodec, +{ /// Create a transactional [`Client`] and connect to the TiKV cluster. 
/// /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for @@ -59,7 +69,7 @@ impl Client { pub async fn new>( pd_endpoints: Vec, logger: Option, - ) -> Result { + ) -> Result> { // debug!(self.logger, "creating transactional client"); Self::new_with_config(pd_endpoints, Config::default(), logger).await } @@ -90,7 +100,7 @@ impl Client { pd_endpoints: Vec, config: Config, optional_logger: Option, - ) -> Result { + ) -> Result> { let logger = optional_logger.unwrap_or_else(|| { let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); Logger::root( @@ -104,7 +114,7 @@ impl Client { debug!(logger, "creating new transactional client"); let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true, logger.clone()).await?); - Ok(Client { pd, logger }) + Ok(Client { pd, logger, _phantom: PhantomData }) } /// Creates a new optimistic [`Transaction`]. @@ -129,7 +139,7 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_optimistic(&self) -> Result { + pub async fn begin_optimistic(&self) -> Result>> { debug!(self.logger, "creating new optimistic transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic())) @@ -154,7 +164,7 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_pessimistic(&self) -> Result { + pub async fn begin_pessimistic(&self) -> Result>> { debug!(self.logger, "creating new pessimistic transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic())) @@ -179,14 +189,14 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_with_options(&self, options: TransactionOptions) -> Result { + pub async fn begin_with_options(&self, options: TransactionOptions) -> Result>> { debug!(self.logger, "creating new customized transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, options)) } /// Create a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp). 
- pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot { + pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot> { debug!(self.logger, "creating new snapshot"); let logger = self.logger.new(o!("child" => 1)); Snapshot::new(self.new_transaction(timestamp, options.read_only()), logger) @@ -230,7 +240,7 @@ impl Client { let mut locks: Vec = vec![]; let mut start_key = vec![]; loop { - let req = new_scan_lock_request( + let req = new_scan_lock_request::( mem::take(&mut start_key), safepoint.version(), SCAN_LOCK_BATCH_SIZE, @@ -267,7 +277,7 @@ impl Client { Ok(res) } - fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction { + fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction> { let logger = self.logger.new(o!("child" => 1)); Transaction::new(timestamp, self.pd.clone(), options, logger) } diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 871effc6..01a07db9 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -15,6 +15,7 @@ use std::{ sync::Arc, }; use tikv_client_proto::{kvrpcpb, pdpb::Timestamp}; +use crate::request::request_codec::RequestCodec; const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; @@ -25,9 +26,9 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; /// the key. We first use `CleanupRequest` to let the status of the primary lock converge and get /// its status (committed or rolled back). Then, we use the status of its primary lock to determine /// the status of the other keys in the same transaction. -pub async fn resolve_locks( +pub async fn resolve_locks( locks: Vec, - pd_client: Arc, + pd_client: Arc, ) -> Result { debug!("resolving locks"); let ts = pd_client.clone().get_timestamp().await?; @@ -61,7 +62,7 @@ pub async fn resolve_locks( let commit_version = match commit_versions.get(&lock.lock_version) { Some(&commit_version) => commit_version, None => { - let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version); + let request = requests::new_cleanup_request::(lock.primary_lock, lock.lock_version); let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(DEFAULT_REGION_BACKOFF) @@ -89,11 +90,11 @@ pub async fn resolve_locks( Ok(!has_live_locks) } -async fn resolve_lock_with_retry( +async fn resolve_lock_with_retry( #[allow(clippy::ptr_arg)] key: &Vec, start_version: u64, commit_version: u64, - pd_client: Arc, + pd_client: Arc, ) -> Result { debug!("resolving locks with retry"); // FIXME: Add backoff @@ -102,7 +103,7 @@ async fn resolve_lock_with_retry( debug!("resolving locks: attempt {}", (i + 1)); let store = pd_client.clone().store_for_key(key.into()).await?; let ver_id = store.region_with_leader.ver_id(); - let request = requests::new_resolve_lock_request(start_version, commit_version); + let request = requests::new_resolve_lock_request::(start_version, commit_version); // The only place where single-region is used let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) .single_region_with_store(store) diff --git a/src/transaction/lowering.rs b/src/transaction/lowering.rs index 413c6bf8..4477de46 100644 --- a/src/transaction/lowering.rs +++ b/src/transaction/lowering.rs @@ -6,26 +6,27 @@ use crate::{timestamp::TimestampExt, transaction::requests, BoundRange, Key}; use std::iter::Iterator; use tikv_client_proto::{kvrpcpb, pdpb::Timestamp}; +use crate::request::request_codec::RequestCodec; -pub fn 
new_get_request(key: Key, timestamp: Timestamp) -> kvrpcpb::GetRequest { - requests::new_get_request(key.into(), timestamp.version()) +pub fn new_get_request(key: Key, timestamp: Timestamp) -> kvrpcpb::GetRequest { + requests::new_get_request::(key.into(), timestamp.version()) } -pub fn new_batch_get_request( +pub fn new_batch_get_request( keys: impl Iterator, timestamp: Timestamp, ) -> kvrpcpb::BatchGetRequest { - requests::new_batch_get_request(keys.map(Into::into).collect(), timestamp.version()) + requests::new_batch_get_request::(keys.map(Into::into).collect(), timestamp.version()) } -pub fn new_scan_request( +pub fn new_scan_request( range: BoundRange, timestamp: Timestamp, limit: u32, key_only: bool, ) -> kvrpcpb::ScanRequest { let (start_key, end_key) = range.into_keys(); - requests::new_scan_request( + requests::new_scan_request::( start_key.into(), end_key.unwrap_or_default().into(), timestamp.version(), @@ -34,24 +35,24 @@ pub fn new_scan_request( ) } -pub fn new_resolve_lock_request( +pub fn new_resolve_lock_request( start_version: Timestamp, commit_version: Timestamp, ) -> kvrpcpb::ResolveLockRequest { - requests::new_resolve_lock_request(start_version.version(), commit_version.version()) + requests::new_resolve_lock_request::(start_version.version(), commit_version.version()) } -pub fn new_cleanup_request(key: Key, start_version: Timestamp) -> kvrpcpb::CleanupRequest { - requests::new_cleanup_request(key.into(), start_version.version()) +pub fn new_cleanup_request(key: Key, start_version: Timestamp) -> kvrpcpb::CleanupRequest { + requests::new_cleanup_request::(key.into(), start_version.version()) } -pub fn new_prewrite_request( +pub fn new_prewrite_request( mutations: Vec, primary_lock: Key, start_version: Timestamp, lock_ttl: u64, ) -> kvrpcpb::PrewriteRequest { - requests::new_prewrite_request( + requests::new_prewrite_request::( mutations, primary_lock.into(), start_version.version(), @@ -59,14 +60,14 @@ pub fn new_prewrite_request( ) } -pub fn new_pessimistic_prewrite_request( +pub fn new_pessimistic_prewrite_request( mutations: Vec, primary_lock: Key, start_version: Timestamp, lock_ttl: u64, for_update_ts: Timestamp, ) -> kvrpcpb::PrewriteRequest { - requests::new_pessimistic_prewrite_request( + requests::new_pessimistic_prewrite_request::( mutations, primary_lock.into(), start_version.version(), @@ -75,31 +76,31 @@ pub fn new_pessimistic_prewrite_request( ) } -pub fn new_commit_request( +pub fn new_commit_request( keys: impl Iterator, start_version: Timestamp, commit_version: Timestamp, ) -> kvrpcpb::CommitRequest { - requests::new_commit_request( + requests::new_commit_request::( keys.map(Into::into).collect(), start_version.version(), commit_version.version(), ) } -pub fn new_batch_rollback_request( +pub fn new_batch_rollback_request( keys: impl Iterator, start_version: Timestamp, ) -> kvrpcpb::BatchRollbackRequest { - requests::new_batch_rollback_request(keys.map(Into::into).collect(), start_version.version()) + requests::new_batch_rollback_request::(keys.map(Into::into).collect(), start_version.version()) } -pub fn new_pessimistic_rollback_request( +pub fn new_pessimistic_rollback_request( keys: impl Iterator, start_version: Timestamp, for_update_ts: Timestamp, ) -> kvrpcpb::PessimisticRollbackRequest { - requests::new_pessimistic_rollback_request( + requests::new_pessimistic_rollback_request::( keys.map(Into::into).collect(), start_version.version(), for_update_ts.version(), @@ -132,7 +133,7 @@ impl PessimisticLock for (Key, kvrpcpb::Assertion) { } } -pub fn 
new_pessimistic_lock_request( +pub fn new_pessimistic_lock_request( locks: impl Iterator, primary_lock: Key, start_version: Timestamp, @@ -140,7 +141,7 @@ pub fn new_pessimistic_lock_request( for_update_ts: Timestamp, need_value: bool, ) -> kvrpcpb::PessimisticLockRequest { - requests::new_pessimistic_lock_request( + requests::new_pessimistic_lock_request::( locks .map(|pl| { let mut mutation = kvrpcpb::Mutation::default(); @@ -158,18 +159,18 @@ pub fn new_pessimistic_lock_request( ) } -pub fn new_scan_lock_request( +pub fn new_scan_lock_request( start_key: Key, safepoint: Timestamp, limit: u32, ) -> kvrpcpb::ScanLockRequest { - requests::new_scan_lock_request(start_key.into(), safepoint.version(), limit) + requests::new_scan_lock_request::(start_key.into(), safepoint.version(), limit) } -pub fn new_heart_beat_request( +pub fn new_heart_beat_request( start_ts: Timestamp, primary_lock: Key, ttl: u64, ) -> kvrpcpb::TxnHeartBeatRequest { - requests::new_heart_beat_request(start_ts.version(), primary_lock.into(), ttl) + requests::new_heart_beat_request::(start_ts.version(), primary_lock.into(), ttl) } diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index e1d05df3..3cd5440c 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -1,26 +1,31 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. +use std::{collections::HashMap, iter, sync::Arc}; +use std::borrow::Cow; + +use either::Either; +use futures::stream::BoxStream; + +use tikv_client_common::Error::PessimisticLockError; +use tikv_client_proto::{ + kvrpcpb::{self, TxnHeartBeatResponse}, + pdpb::Timestamp, +}; +use tikv_client_store::HasRegionErrors; use crate::{ collect_first, + Key, + KvPair, pd::PdClient, request::{ Collect, CollectSingle, CollectWithShard, DefaultProcessor, KvRequest, Merge, Process, ResponseWithShard, Shardable, SingleKey, }, - store::{store_stream_for_keys, store_stream_for_range_by_start_key, RegionStore}, - timestamp::TimestampExt, - transaction::HasLocks, - util::iter::FlatMapOkIterExt, - Key, KvPair, Result, Value, -}; -use either::Either; -use futures::stream::BoxStream; -use std::{collections::HashMap, iter, sync::Arc}; -use tikv_client_common::Error::PessimisticLockError; -use tikv_client_proto::{ - kvrpcpb::{self, TxnHeartBeatResponse}, - pdpb::Timestamp, + Result, + store::{RegionStore, store_stream_for_keys, store_stream_for_range_by_start_key}, + timestamp::TimestampExt, transaction::HasLocks, util::iter::FlatMapOkIterExt, Value, }; +use crate::request::request_codec::{RequestCodec, TxnApiV1}; // implement HasLocks for a response type that has a `pairs` field, // where locks can be extracted from both the `pairs` and `error` fields @@ -63,15 +68,25 @@ macro_rules! 
error_locks { }; } -pub fn new_get_request(key: Vec, timestamp: u64) -> kvrpcpb::GetRequest { +pub fn new_get_request(key: Vec, timestamp: u64) -> kvrpcpb::GetRequest + where kvrpcpb::GetRequest: KvRequest +{ let mut req = kvrpcpb::GetRequest::default(); req.set_key(key); req.set_version(timestamp); req } -impl KvRequest for kvrpcpb::GetRequest { +impl KvRequest for kvrpcpb::GetRequest { type Response = kvrpcpb::GetResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } shardable_key!(kvrpcpb::GetRequest); @@ -95,15 +110,25 @@ impl Process for DefaultProcessor { } } -pub fn new_batch_get_request(keys: Vec>, timestamp: u64) -> kvrpcpb::BatchGetRequest { +pub fn new_batch_get_request(keys: Vec>, timestamp: u64) -> kvrpcpb::BatchGetRequest + where kvrpcpb::BatchGetRequest: KvRequest +{ let mut req = kvrpcpb::BatchGetRequest::default(); req.set_keys(keys); req.set_version(timestamp); req } -impl KvRequest for kvrpcpb::BatchGetRequest { +impl KvRequest for kvrpcpb::BatchGetRequest { type Response = kvrpcpb::BatchGetResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } shardable_keys!(kvrpcpb::BatchGetRequest); @@ -119,13 +144,15 @@ impl Merge for Collect { } } -pub fn new_scan_request( +pub fn new_scan_request( start_key: Vec, end_key: Vec, timestamp: u64, limit: u32, key_only: bool, -) -> kvrpcpb::ScanRequest { +) -> kvrpcpb::ScanRequest + where kvrpcpb::ScanRequest: KvRequest +{ let mut req = kvrpcpb::ScanRequest::default(); req.set_start_key(start_key); req.set_end_key(end_key); @@ -135,8 +162,16 @@ pub fn new_scan_request( req } -impl KvRequest for kvrpcpb::ScanRequest { +impl KvRequest for kvrpcpb::ScanRequest { type Response = kvrpcpb::ScanResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } shardable_range!(kvrpcpb::ScanRequest); @@ -152,10 +187,12 @@ impl Merge for Collect { } } -pub fn new_resolve_lock_request( +pub fn new_resolve_lock_request( start_version: u64, commit_version: u64, -) -> kvrpcpb::ResolveLockRequest { +) -> kvrpcpb::ResolveLockRequest where + kvrpcpb::ResolveLockRequest: KvRequest, +{ let mut req = kvrpcpb::ResolveLockRequest::default(); req.set_start_version(start_version); req.set_commit_version(commit_version); @@ -167,11 +204,33 @@ pub fn new_resolve_lock_request( // region without keys. So it's not Shardable. And we don't automatically retry // on its region errors (in the Plan level). The region error must be manually // handled (in the upper level). 
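The concrete `KvRequest<TxnApiV1>` impl plus the blanket `default` impl below are what the new `#![feature(specialization)]` in lib.rs enables: one codec gets dedicated handling while every other codec falls back to the `default` items. A minimal nightly-only sketch of that override pattern, with illustrative names rather than the crate's real types:

#![feature(specialization)] // crate-root attribute, nightly only, as in lib.rs above

trait Encode<C> {
    fn encode(&self, codec: &C) -> Vec<u8>;
}

struct RawCodec; // hypothetical codecs standing in for RawApiV1 / TxnApiV1
struct TxnCodec;
struct Req(Vec<u8>);

// Blanket impl: `default` items may be overridden by more specific impls.
impl<C> Encode<C> for Req {
    default fn encode(&self, _codec: &C) -> Vec<u8> {
        self.0.clone()
    }
}

// Specialized impl for one concrete codec overrides the blanket default.
impl Encode<TxnCodec> for Req {
    fn encode(&self, _codec: &TxnCodec) -> Vec<u8> {
        let mut out = b"txn:".to_vec();
        out.extend_from_slice(&self.0);
        out
    }
}

fn main() {
    let req = Req(b"key".to_vec());
    assert_eq!(req.encode(&RawCodec), b"key".to_vec()); // default impl
    assert_eq!(req.encode(&TxnCodec), b"txn:key".to_vec()); // specialized impl
}

The patch additionally relies on `default type Response`, which also requires the full (not minimal) specialization feature.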
-impl KvRequest for kvrpcpb::ResolveLockRequest { +impl KvRequest for kvrpcpb::ResolveLockRequest { type Response = kvrpcpb::ResolveLockResponse; + + fn encode_request(&self, _codec: &TxnApiV1) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &TxnApiV1, _resp: Self::Response) -> Result { + todo!() + } +} + +impl KvRequest for kvrpcpb::ResolveLockRequest { + default type Response = kvrpcpb::ResolveLockResponse; + + default fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + default fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } -pub fn new_cleanup_request(key: Vec, start_version: u64) -> kvrpcpb::CleanupRequest { +pub fn new_cleanup_request(key: Vec, start_version: u64) -> kvrpcpb::CleanupRequest + where kvrpcpb::CleanupRequest: KvRequest +{ let mut req = kvrpcpb::CleanupRequest::default(); req.set_key(key); req.set_start_version(start_version); @@ -179,8 +238,16 @@ pub fn new_cleanup_request(key: Vec, start_version: u64) -> kvrpcpb::Cleanup req } -impl KvRequest for kvrpcpb::CleanupRequest { +impl KvRequest for kvrpcpb::CleanupRequest { type Response = kvrpcpb::CleanupResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } shardable_key!(kvrpcpb::CleanupRequest); @@ -199,12 +266,12 @@ impl Process for DefaultProcessor { } } -pub fn new_prewrite_request( +pub fn new_prewrite_request( mutations: Vec, primary_lock: Vec, start_version: u64, lock_ttl: u64, -) -> kvrpcpb::PrewriteRequest { +) -> kvrpcpb::PrewriteRequest where kvrpcpb::PrewriteRequest: KvRequest { let mut req = kvrpcpb::PrewriteRequest::default(); req.set_mutations(mutations); req.set_primary_lock(primary_lock); @@ -216,13 +283,13 @@ pub fn new_prewrite_request( req } -pub fn new_pessimistic_prewrite_request( +pub fn new_pessimistic_prewrite_request( mutations: Vec, primary_lock: Vec, start_version: u64, lock_ttl: u64, for_update_ts: u64, -) -> kvrpcpb::PrewriteRequest { +) -> kvrpcpb::PrewriteRequest where kvrpcpb::PrewriteRequest: KvRequest { let len = mutations.len(); let mut req = new_prewrite_request(mutations, primary_lock, start_version, lock_ttl); req.set_for_update_ts(for_update_ts); @@ -230,8 +297,16 @@ pub fn new_pessimistic_prewrite_request( req } -impl KvRequest for kvrpcpb::PrewriteRequest { +impl KvRequest for kvrpcpb::PrewriteRequest { type Response = kvrpcpb::PrewriteResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + todo!() + } } impl Shardable for kvrpcpb::PrewriteRequest { @@ -264,29 +339,37 @@ impl Shardable for kvrpcpb::PrewriteRequest { } } -pub fn new_commit_request( +pub fn new_commit_request( keys: Vec>, start_version: u64, commit_version: u64, -) -> kvrpcpb::CommitRequest { - let mut req = kvrpcpb::CommitRequest::default(); - req.set_keys(keys); - req.set_start_version(start_version); - req.set_commit_version(commit_version); +) -> kvrpcpb::CommitRequest where kvrpcpb::CommitRequest: KvRequest { +let mut req = kvrpcpb::CommitRequest::default(); +req.set_keys(keys); +req.set_start_version(start_version); +req.set_commit_version(commit_version); - req +req } -impl KvRequest for kvrpcpb::CommitRequest { +impl KvRequest for kvrpcpb::CommitRequest { type Response = kvrpcpb::CommitResponse; + + fn encode_request(&self, _codec: &C) -> Cow { + todo!() + } + + fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result { + 
+        todo!()
+    }
 }
 
 shardable_keys!(kvrpcpb::CommitRequest);
 
-pub fn new_batch_rollback_request(
+pub fn new_batch_rollback_request<C: RequestCodec>(
     keys: Vec<Vec<u8>>,
     start_version: u64,
-) -> kvrpcpb::BatchRollbackRequest {
+) -> kvrpcpb::BatchRollbackRequest where kvrpcpb::BatchRollbackRequest: KvRequest<C> {
     let mut req = kvrpcpb::BatchRollbackRequest::default();
     req.set_keys(keys);
     req.set_start_version(start_version);
@@ -294,17 +377,25 @@ pub fn new_batch_rollback_request(
     req
 }
 
-impl KvRequest for kvrpcpb::BatchRollbackRequest {
+impl<C: RequestCodec> KvRequest<C> for kvrpcpb::BatchRollbackRequest {
     type Response = kvrpcpb::BatchRollbackResponse;
+
+    fn encode_request(&self, _codec: &C) -> Cow<Self> {
+        todo!()
+    }
+
+    fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result<Self::Response> {
+        todo!()
+    }
 }
 
 shardable_keys!(kvrpcpb::BatchRollbackRequest);
 
-pub fn new_pessimistic_rollback_request(
+pub fn new_pessimistic_rollback_request<C: RequestCodec>(
     keys: Vec<Vec<u8>>,
     start_version: u64,
     for_update_ts: u64,
-) -> kvrpcpb::PessimisticRollbackRequest {
+) -> kvrpcpb::PessimisticRollbackRequest where kvrpcpb::PessimisticRollbackRequest: KvRequest<C> {
     let mut req = kvrpcpb::PessimisticRollbackRequest::default();
     req.set_keys(keys);
     req.set_start_version(start_version);
@@ -313,20 +404,28 @@ pub fn new_pessimistic_rollback_request(
     req
 }
 
-impl KvRequest for kvrpcpb::PessimisticRollbackRequest {
+impl<C: RequestCodec> KvRequest<C> for kvrpcpb::PessimisticRollbackRequest {
     type Response = kvrpcpb::PessimisticRollbackResponse;
+
+    fn encode_request(&self, _codec: &C) -> Cow<Self> {
+        todo!()
+    }
+
+    fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result<Self::Response> {
+        todo!()
+    }
 }
 
 shardable_keys!(kvrpcpb::PessimisticRollbackRequest);
 
-pub fn new_pessimistic_lock_request(
+pub fn new_pessimistic_lock_request<C: RequestCodec>(
     mutations: Vec<kvrpcpb::Mutation>,
     primary_lock: Vec<u8>,
     start_version: u64,
     lock_ttl: u64,
     for_update_ts: u64,
     need_value: bool,
-) -> kvrpcpb::PessimisticLockRequest {
+) -> kvrpcpb::PessimisticLockRequest where kvrpcpb::PessimisticLockRequest: KvRequest<C> {
     let mut req = kvrpcpb::PessimisticLockRequest::default();
     req.set_mutations(mutations);
     req.set_primary_lock(primary_lock);
@@ -344,8 +443,16 @@ pub fn new_pessimistic_lock_request(
     req
 }
 
-impl KvRequest for kvrpcpb::PessimisticLockRequest {
+impl<C: RequestCodec> KvRequest<C> for kvrpcpb::PessimisticLockRequest {
     type Response = kvrpcpb::PessimisticLockResponse;
+
+    fn encode_request(&self, _codec: &C) -> Cow<Self> {
+        todo!()
+    }
+
+    fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result<Self::Response> {
+        todo!()
+    }
 }
 
 impl Shardable for kvrpcpb::PessimisticLockRequest {
@@ -370,7 +477,7 @@ impl Shardable for kvrpcpb::PessimisticLockRequest {
 
 // PessimisticLockResponse returns values that preserve the order of the keys in the
 // request, so the kvpair result should be produced by zipping the keys in the request
 // with the values in the response.
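The zipping described in the comment above, as a standalone illustration; plain byte-vector tuples stand in for the crate's `KvPair`, and the positional alignment is exactly what lets the merge step skip any key lookup:

```rust
// Pair each requested key with the value at the same index of the response.
fn zip_keys_with_values(keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Vec<(Vec<u8>, Vec<u8>)> {
    // `zip` stops at the shorter iterator, so a truncated response cannot panic here.
    keys.into_iter().zip(values).collect()
}
```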
 impl Merge<ResponseWithShard<kvrpcpb::PessimisticLockResponse, Vec<kvrpcpb::Mutation>>>
-    for CollectWithShard
+for CollectWithShard
 {
     type Out = Vec<KvPair>;
@@ -430,11 +537,11 @@ impl Merge<ResponseWithShard<kvrpcpb::PessimisticLockResponse, Vec<kvrpcpb::Mutation>>>
     }
 }
 
-pub fn new_scan_lock_request(
+pub fn new_scan_lock_request<C: RequestCodec>(
     start_key: Vec<u8>,
     safepoint: u64,
     limit: u32,
-) -> kvrpcpb::ScanLockRequest {
+) -> kvrpcpb::ScanLockRequest where kvrpcpb::ScanLockRequest: KvRequest<C> {
     let mut req = kvrpcpb::ScanLockRequest::default();
     req.set_start_key(start_key);
     req.set_max_version(safepoint);
@@ -442,8 +549,16 @@ pub fn new_scan_lock_request(
     req
 }
 
-impl KvRequest for kvrpcpb::ScanLockRequest {
+impl<C: RequestCodec> KvRequest<C> for kvrpcpb::ScanLockRequest {
     type Response = kvrpcpb::ScanLockResponse;
+
+    fn encode_request(&self, _codec: &C) -> Cow<Self> {
+        todo!()
+    }
+
+    fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result<Self::Response> {
+        todo!()
+    }
 }
 
 impl Shardable for kvrpcpb::ScanLockRequest {
@@ -474,11 +589,11 @@ impl Merge<kvrpcpb::ScanLockResponse> for Collect {
     }
 }
 
-pub fn new_heart_beat_request(
+pub fn new_heart_beat_request<C: RequestCodec>(
     start_ts: u64,
     primary_lock: Vec<u8>,
     ttl: u64,
-) -> kvrpcpb::TxnHeartBeatRequest {
+) -> kvrpcpb::TxnHeartBeatRequest where kvrpcpb::TxnHeartBeatRequest: KvRequest<C> {
     let mut req = kvrpcpb::TxnHeartBeatRequest::default();
     req.set_start_version(start_ts);
     req.set_primary_lock(primary_lock);
@@ -486,8 +601,16 @@ pub fn new_heart_beat_request(
     req
 }
 
-impl KvRequest for kvrpcpb::TxnHeartBeatRequest {
+impl<C: RequestCodec> KvRequest<C> for kvrpcpb::TxnHeartBeatRequest {
     type Response = kvrpcpb::TxnHeartBeatResponse;
+
+    fn encode_request(&self, _codec: &C) -> Cow<Self> {
+        todo!()
+    }
+
+    fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result<Self::Response> {
+        todo!()
+    }
 }
 
 impl Shardable for kvrpcpb::TxnHeartBeatRequest {
@@ -524,8 +647,16 @@ impl Process<kvrpcpb::TxnHeartBeatResponse> for DefaultProcessor {
     }
 }
 
-impl KvRequest for kvrpcpb::CheckTxnStatusRequest {
+impl<C: RequestCodec> KvRequest<C> for kvrpcpb::CheckTxnStatusRequest {
     type Response = kvrpcpb::CheckTxnStatusResponse;
+
+    fn encode_request(&self, _codec: &C) -> Cow<Self> {
+        todo!()
+    }
+
+    fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result<Self::Response> {
+        todo!()
+    }
 }
 
 impl Shardable for kvrpcpb::CheckTxnStatusRequest {
@@ -593,8 +724,16 @@ impl From<(u64, u64, Option<kvrpcpb::LockInfo>)> for TransactionStatusKind {
     }
 }
 
-impl KvRequest for kvrpcpb::CheckSecondaryLocksRequest {
+impl<C: RequestCodec> KvRequest<C> for kvrpcpb::CheckSecondaryLocksRequest {
     type Response = kvrpcpb::CheckSecondaryLocksResponse;
+
+    fn encode_request(&self, _codec: &C) -> Cow<Self> {
+        todo!()
+    }
+
+    fn decode_response(&self, _codec: &C, _resp: Self::Response) -> Result<Self::Response> {
+        todo!()
+    }
 }
 
 shardable_keys!(kvrpcpb::CheckSecondaryLocksRequest);
@@ -644,6 +783,7 @@ error_locks!(kvrpcpb::CheckTxnStatusResponse);
 error_locks!(kvrpcpb::CheckSecondaryLocksResponse);
 
 impl HasLocks for kvrpcpb::CleanupResponse {}
+
 impl HasLocks for kvrpcpb::ScanLockResponse {}
 
 impl HasLocks for kvrpcpb::PessimisticRollbackResponse {
@@ -675,12 +815,13 @@ impl HasLocks for kvrpcpb::PrewriteResponse {
 
 #[cfg(test)]
 mod tests {
+    use tikv_client_common::Error::{PessimisticLockError, ResolveLockError};
+    use tikv_client_proto::kvrpcpb;
+
     use crate::{
-        request::{plan::Merge, CollectWithShard, ResponseWithShard},
         KvPair,
+        request::{CollectWithShard, plan::Merge, ResponseWithShard},
     };
-    use tikv_client_common::Error::{PessimisticLockError, ResolveLockError};
-    use tikv_client_proto::kvrpcpb;
 
     #[tokio::test]
     async fn test_merge_pessimistic_lock_response() {
diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs
index d08ad6da..08ba2981 100644
--- a/src/transaction/snapshot.rs
+++ b/src/transaction/snapshot.rs
@@ -5,6 +5,7 @@ use derive_new::new;
 use futures::stream::BoxStream;
 use slog::Logger;
 use std::ops::RangeBounds;
+use crate::pd::PdClient;
 
 /// A read-only transaction which reads at the given timestamp.
 ///
@@ -14,12 +15,12 @@ use std::ops::RangeBounds;
 ///
 /// See the [Transaction](struct@crate::Transaction) docs for more information on the methods.
 #[derive(new)]
-pub struct Snapshot {
-    transaction: Transaction,
+pub struct Snapshot<PdC: PdClient> {
+    transaction: Transaction<PdC>,
     logger: Logger,
 }
 
-impl Snapshot {
+impl<PdC: PdClient> Snapshot<PdC> {
     /// Get the value associated with the given key.
     pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
         debug!(self.logger, "invoking get request on snapshot");
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index f067f668..b3e4a9f0 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -1,23 +1,27 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
-use crate::{
-    backoff::{Backoff, DEFAULT_REGION_BACKOFF},
-    pd::{PdClient, PdRpcClient},
-    request::{
-        Collect, CollectError, CollectSingle, CollectWithShard, Plan, PlanBuilder, RetryOptions,
-    },
-    timestamp::TimestampExt,
-    transaction::{buffer::Buffer, lowering::*},
-    BoundRange, Error, Key, KvPair, Result, Value,
-};
+use std::{iter, ops::RangeBounds, sync::Arc, time::Instant};
+
 use derive_new::new;
 use fail::fail_point;
 use futures::{prelude::*, stream::BoxStream};
 use slog::Logger;
-use std::{iter, ops::RangeBounds, sync::Arc, time::Instant};
-use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
 use tokio::{sync::RwLock, time::Duration};
+use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
+
+use crate::{
+    backoff::{Backoff, DEFAULT_REGION_BACKOFF},
+    BoundRange,
+    Error,
+    Key,
+    KvPair,
+    pd::{PdClient, PdRpcClient}, request::{
+        Collect, CollectError, CollectSingle, CollectWithShard, Plan, PlanBuilder, RetryOptions,
+    }, Result, timestamp::TimestampExt, transaction::{buffer::Buffer, lowering::*}, Value,
+};
+use crate::request::KvRequest;
 
 /// An undo-able set of actions on the dataset.
 ///
 /// Create a transaction using a [`TransactionClient`](crate::TransactionClient), then run actions
@@ -58,7 +62,7 @@ use tokio::{sync::RwLock, time::Duration};
 /// txn.commit().await.unwrap();
 /// # });
 /// ```
-pub struct Transaction<PdC: PdClient = PdRpcClient> {
+pub struct Transaction<PdC: PdClient> {
     status: Arc<RwLock<TransactionStatus>>,
     timestamp: Timestamp,
     buffer: Buffer,
@@ -121,7 +125,7 @@ impl<PdC: PdClient> Transaction<PdC> {
 
         self.buffer
             .get_or_else(key, |key| async move {
-                let request = new_get_request(key, timestamp);
+                let request = new_get_request::<PdC::RequestCodec>(key, timestamp);
                 let plan = PlanBuilder::new(rpc, request)
                     .resolve_lock(retry_options.lock_backoff)
                     .retry_multi_region(DEFAULT_REGION_BACKOFF)
@@ -242,8 +246,8 @@ impl<PdC: PdClient> Transaction<PdC> {
     /// ```
     pub async fn batch_get(
         &mut self,
         keys: impl IntoIterator<Item = impl Into<Key>>,
     ) -> Result<impl Iterator<Item = KvPair>> {
         debug!(self.logger, "invoking transactional batch_get request");
         self.check_allow_operation().await?;
         let timestamp = self.timestamp.clone();
@@ -252,7 +256,7 @@ impl<PdC: PdClient> Transaction<PdC> {
 
         self.buffer
             .batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| async move {
-                let request = new_batch_get_request(keys, timestamp);
+                let request = new_batch_get_request::<PdC::RequestCodec>(keys, timestamp);
                 let plan = PlanBuilder::new(rpc, request)
                     .resolve_lock(retry_options.lock_backoff)
                     .retry_multi_region(retry_options.region_backoff)
@@ -294,7 +298,7 @@ impl<PdC: PdClient> Transaction<PdC> {
     /// ```
     pub async fn batch_get_for_update(
         &mut self,
         keys: impl IntoIterator<Item = impl Into<Key>>,
     ) -> Result<Vec<KvPair>> {
         debug!(
             self.logger,
@@ -342,7 +346,7 @@ impl<PdC: PdClient> Transaction<PdC> {
         &mut self,
         range: impl Into<BoundRange>,
         limit: u32,
     ) -> Result<impl Iterator<Item = KvPair>> {
         debug!(self.logger, "invoking transactional scan request");
         self.scan_inner(range, limit, false).await
     }
@@ -378,7 +382,7 @@ impl<PdC: PdClient> Transaction<PdC> {
         &mut self,
         range: impl Into<BoundRange>,
         limit: u32,
     ) -> Result<impl Iterator<Item = Key>> {
         debug!(self.logger, "invoking transactional scan_keys request");
         Ok(self
             .scan_inner(range, limit, true)
@@ -453,7 +457,7 @@ impl<PdC: PdClient> Transaction<PdC> {
                 iter::once((key.clone(), kvrpcpb::Assertion::NotExist)),
                 false,
             )
-            .await?;
+                .await?;
         }
         self.buffer.insert(key, value.into());
         Ok(())
@@ -513,7 +517,7 @@ impl<PdC: PdClient> Transaction<PdC> {
     /// ```
     pub async fn lock_keys(
         &mut self,
         keys: impl IntoIterator<Item = impl Into<Key>>,
     ) -> Result<()> {
         debug!(self.logger, "invoking transactional lock_keys request");
         self.check_allow_operation().await?;
@@ -578,8 +582,8 @@ impl<PdC: PdClient> Transaction<PdC> {
             self.start_instant,
             self.logger.new(o!("child" => 1)),
         )
-        .commit()
-        .await;
+            .commit()
+            .await;
 
         if res.is_ok() {
             let mut status = self.status.write().await;
@@ -635,8 +639,8 @@ impl<PdC: PdClient> Transaction<PdC> {
             self.start_instant,
             self.logger.new(o!("child" => 1)),
         )
-        .rollback()
-        .await;
+            .rollback()
+            .await;
 
         if res.is_ok() {
             let mut status = self.status.write().await;
@@ -661,7 +665,7 @@ impl<PdC: PdClient> Transaction<PdC> {
             Some(k) => k,
             None => return Err(Error::NoPrimaryKey),
         };
-        let request = new_heart_beat_request(
+        let request = new_heart_beat_request::<PdC::RequestCodec>(
             self.timestamp.clone(),
             primary_key,
             self.start_instant.elapsed().as_millis() as u64 + MAX_TTL,
@@ -680,7 +684,7 @@ impl<PdC: PdClient> Transaction<PdC> {
         range: impl Into<BoundRange>,
         limit: u32,
         key_only: bool,
     ) -> Result<impl Iterator<Item = KvPair>> {
         self.check_allow_operation().await?;
         let timestamp = self.timestamp.clone();
         let rpc = self.rpc.clone();
@@ -692,7 +696,7 @@ impl<PdC: PdClient> Transaction<PdC> {
             limit,
             !key_only,
             move |new_range, new_limit| async move {
-                let request = new_scan_request(new_range, timestamp, new_limit, key_only);
+                let request = new_scan_request::<PdC::RequestCodec>(new_range, timestamp, new_limit, key_only);
                 let plan = PlanBuilder::new(rpc, request)
                     .resolve_lock(retry_options.lock_backoff)
                     .retry_multi_region(retry_options.region_backoff)
@@ -717,7 +721,7 @@ impl<PdC: PdClient> Transaction<PdC> {
     /// Only valid for pessimistic transactions, panics if called on an optimistic transaction.
     async fn pessimistic_lock(
         &mut self,
         keys: impl IntoIterator<Item = (Key, kvrpcpb::Assertion)>,
         need_value: bool,
     ) -> Result<Vec<KvPair>> {
         debug!(self.logger, "acquiring pessimistic lock");
@@ -740,7 +744,7 @@ impl<PdC: PdClient> Transaction<PdC> {
             .unwrap_or_else(|| first_key.clone());
         let for_update_ts = self.rpc.clone().get_timestamp().await?;
         self.options.push_for_update_ts(for_update_ts.clone());
-        let request = new_pessimistic_lock_request(
+        let request = new_pessimistic_lock_request::<PdC::RequestCodec>(
             keys.clone().into_iter(),
             primary_lock,
             self.timestamp.clone(),
@@ -786,7 +790,7 @@ impl<PdC: PdClient> Transaction<PdC> {
     /// Rollback pessimistic lock
     async fn pessimistic_lock_rollback(
         &mut self,
         keys: impl Iterator<Item = Key>,
         start_version: Timestamp,
         for_update_ts: Timestamp,
     ) -> Result<()> {
@@ -797,7 +801,7 @@ impl<PdC: PdClient> Transaction<PdC> {
             return Ok(());
         }
 
-        let req = new_pessimistic_rollback_request(
+        let req = new_pessimistic_rollback_request::<PdC::RequestCodec>(
             keys.clone().into_iter(),
             start_version,
             for_update_ts,
@@ -867,7 +871,7 @@ impl<PdC: PdClient> Transaction<PdC> {
                     break;
                 }
             }
-            let request = new_heart_beat_request(
+            let request = new_heart_beat_request::<PdC::RequestCodec>(
                 start_ts.clone(),
                 primary_key.clone(),
                 start_instant.elapsed().as_millis() as u64 + MAX_TTL,
@@ -1099,7 +1103,7 @@ impl HeartbeatOption {
 /// The committer implements `prewrite`, `commit` and `rollback` functions.
 #[allow(clippy::too_many_arguments)]
 #[derive(new)]
-struct Committer<PdC: PdClient = PdRpcClient> {
+struct Committer<PdC: PdClient> {
     primary_key: Option<Key>,
     mutations: Vec<kvrpcpb::Mutation>,
     start_version: Timestamp,
@@ -1154,13 +1158,13 @@ impl<PdC: PdClient> Committer<PdC> {
         let elapsed = self.start_instant.elapsed().as_millis() as u64;
         let lock_ttl = self.calc_txn_lock_ttl();
         let mut request = match &self.options.kind {
-            TransactionKind::Optimistic => new_prewrite_request(
+            TransactionKind::Optimistic => new_prewrite_request::<PdC::RequestCodec>(
                 self.mutations.clone(),
                 primary_lock,
                 self.start_version.clone(),
                 lock_ttl + elapsed,
             ),
-            TransactionKind::Pessimistic(for_update_ts) => new_pessimistic_prewrite_request(
+            TransactionKind::Pessimistic(for_update_ts) => new_pessimistic_prewrite_request::<PdC::RequestCodec>(
                 self.mutations.clone(),
                 primary_lock,
                 self.start_version.clone(),
@@ -1214,7 +1218,7 @@ impl<PdC: PdClient> Committer<PdC> {
         debug!(self.logger, "committing primary");
         let primary_key = self.primary_key.clone().into_iter();
         let commit_version = self.rpc.clone().get_timestamp().await?;
-        let req = new_commit_request(
+        let req = new_commit_request::<PdC::RequestCodec>(
             primary_key,
             self.start_version.clone(),
             commit_version.clone(),
@@ -1246,7 +1250,7 @@ impl<PdC: PdClient> Committer<PdC> {
 
         let req = if self.options.async_commit {
             let keys = mutations.map(|m| m.key.into());
-            new_commit_request(keys, self.start_version, commit_version)
+            new_commit_request::<PdC::RequestCodec>(keys, self.start_version, commit_version)
         } else if primary_only {
             return Ok(());
         } else {
             let keys = mutations
                 .map(|m| m.key.into())
                 .filter(|key| &primary_key != key);
-            new_commit_request(keys, self.start_version, commit_version)
+            new_commit_request::<PdC::RequestCodec>(keys, self.start_version, commit_version)
         };
         let plan = PlanBuilder::new(self.rpc, req)
             .resolve_lock(self.options.retry_options.lock_backoff)
@@ -1276,7 +1280,7 @@ impl<PdC: PdClient> Committer<PdC> {
             .map(|mutation| mutation.key.into());
         match self.options.kind {
             TransactionKind::Optimistic => {
-                let req = new_batch_rollback_request(keys, self.start_version);
+                let req =
+                    new_batch_rollback_request::<PdC::RequestCodec>(keys, self.start_version);
                 let plan = PlanBuilder::new(self.rpc, req)
                     .resolve_lock(self.options.retry_options.lock_backoff)
                     .retry_multi_region(self.options.retry_options.region_backoff)
@@ -1285,7 +1289,7 @@ impl<PdC: PdClient> Committer<PdC> {
                 plan.execute().await?;
             }
             TransactionKind::Pessimistic(for_update_ts) => {
-                let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts);
+                let req = new_pessimistic_rollback_request::<PdC::RequestCodec>(keys, self.start_version, for_update_ts);
                 let plan = PlanBuilder::new(self.rpc, req)
                     .resolve_lock(self.options.retry_options.lock_backoff)
                     .retry_multi_region(self.options.retry_options.region_backoff)
@@ -1328,24 +1332,27 @@ enum TransactionStatus {
 
 #[cfg(test)]
 mod tests {
-    use crate::{
-        mock::{MockKvClient, MockPdClient},
-        transaction::HeartbeatOption,
-        Transaction, TransactionOptions,
-    };
-    use fail::FailScenario;
-    use slog::{Drain, Logger};
     use std::{
         any::Any,
         io,
         sync::{
-            atomic::{AtomicUsize, Ordering},
             Arc,
+            atomic::{AtomicUsize, Ordering},
         },
         time::Duration,
     };
+
+    use fail::FailScenario;
+    use slog::{Drain, Logger};
+    use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
+
+    use crate::{
+        mock::{MockKvClient, MockPdClient},
+        Transaction,
+        transaction::HeartbeatOption, TransactionOptions,
+    };
+
     #[tokio::test]
     async fn test_optimistic_heartbeat() -> Result<(), io::Error> {
         let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
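For review context, a hypothetical call site showing how the codec parameter is pinned once these constructors are generic. `TxnApiV1` is the concrete codec named by the `ResolveLockRequest` specialization earlier in this patch, and the signature shape follows `new_scan_lock_request` in `src/transaction/requests.rs`:

```rust
// The turbofish is mandatory: `C` appears only in the `where` clause, never in
// an argument or the return type, so inference has nothing to work with.
fn build_scan_lock(start_key: Vec<u8>, safepoint: u64) -> kvrpcpb::ScanLockRequest {
    new_scan_lock_request::<TxnApiV1>(start_key, safepoint, 128)
}
```

On the lowering helpers that take `impl Iterator` arguments (`new_commit_request` and friends), the same turbofish is only legal because `lib.rs` now enables the nightly `explicit_generic_args_with_impl_trait` feature.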