Skip to content

Commit

Permalink
*: Support API v2 (part 1) (tikv#415)
Browse files Browse the repository at this point in the history
* API v2 part1

Signed-off-by: Ping Yu <[email protected]>

* inplace encoding

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* export proto

Signed-off-by: Ping Yu <[email protected]>

* fix set_context

Signed-off-by: Ping Yu <[email protected]>

* add Codec parameter to Transaction & Snapshot

Signed-off-by: Ping Yu <[email protected]>

---------

Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu authored Aug 30, 2023
1 parent abf22ba commit 4b0e844
Show file tree
Hide file tree
Showing 15 changed files with 314 additions and 90 deletions.
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@

pub mod backoff;
#[doc(hidden)]
pub mod proto; // export `proto` to enable user customized codec
#[doc(hidden)]
pub mod raw;
pub mod request;
#[doc(hidden)]
Expand All @@ -104,7 +106,6 @@ mod compat;
mod config;
mod kv;
mod pd;
mod proto;
mod region;
mod region_cache;
mod stats;
Expand Down Expand Up @@ -145,6 +146,8 @@ pub use crate::raw::Client as RawClient;
#[doc(inline)]
pub use crate::raw::ColumnFamily;
#[doc(inline)]
pub use crate::request::codec;
#[doc(inline)]
pub use crate::request::RetryOptions;
#[doc(inline)]
pub use crate::timestamp::Timestamp;
Expand Down
21 changes: 19 additions & 2 deletions src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::proto::metapb::RegionEpoch;
use crate::proto::metapb::{self};
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::request::codec::ApiV1TxnCodec;
use crate::store::KvClient;
use crate::store::KvConnect;
use crate::store::RegionStore;
Expand All @@ -30,7 +31,7 @@ use crate::Timestamp;

/// Create a `PdRpcClient` with it's internals replaced with mocks so that the
/// client can be tested without doing any RPC calls.
pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
pub async fn pd_rpc_client() -> PdRpcClient<ApiV1TxnCodec, MockKvConnect, MockCluster> {
let config = Config::default();
PdRpcClient::new(
config.clone(),
Expand All @@ -43,6 +44,7 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
))
},
false,
Some(ApiV1TxnCodec::default()),
)
.await
.unwrap()
Expand Down Expand Up @@ -71,9 +73,18 @@ pub struct MockKvConnect;

pub struct MockCluster;

#[derive(new)]
pub struct MockPdClient {
client: MockKvClient,
codec: ApiV1TxnCodec,
}

impl MockPdClient {
pub fn new(client: MockKvClient) -> MockPdClient {
MockPdClient {
client,
codec: ApiV1TxnCodec::default(),
}
}
}

#[async_trait]
Expand Down Expand Up @@ -102,6 +113,7 @@ impl MockPdClient {
pub fn default() -> MockPdClient {
MockPdClient {
client: MockKvClient::default(),
codec: ApiV1TxnCodec::default(),
}
}

Expand Down Expand Up @@ -165,6 +177,7 @@ impl MockPdClient {

#[async_trait]
impl PdClient for MockPdClient {
type Codec = ApiV1TxnCodec;
type KvClient = MockKvClient;

async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
Expand Down Expand Up @@ -210,4 +223,8 @@ impl PdClient for MockPdClient {
}

async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}

fn get_codec(&self) -> &Self::Codec {
&self.codec
}
}
63 changes: 46 additions & 17 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::region::RegionId;
use crate::region::RegionVerId;
use crate::region::RegionWithLeader;
use crate::region_cache::RegionCache;
use crate::request::codec::{ApiV1TxnCodec, Codec};
use crate::store::KvClient;
use crate::store::KvConnect;
use crate::store::RegionStore;
Expand Down Expand Up @@ -50,6 +51,7 @@ use crate::Timestamp;
/// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff.
#[async_trait]
pub trait PdClient: Send + Sync + 'static {
type Codec: Codec;
type KvClient: KvClient + Send + Sync + 'static;

/// In transactional API, `region` is decoded (keys in raw format).
Expand Down Expand Up @@ -189,8 +191,11 @@ pub trait PdClient: Send + Sync + 'static {
.boxed()
}

fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result<RegionWithLeader> {
if enable_codec {
fn decode_region(
mut region: RegionWithLeader,
enable_mvcc_codec: bool,
) -> Result<RegionWithLeader> {
if enable_mvcc_codec {
codec::decode_bytes_in_place(&mut region.region.start_key, false)?;
codec::decode_bytes_in_place(&mut region.region.end_key, false)?;
}
Expand All @@ -200,20 +205,30 @@ pub trait PdClient: Send + Sync + 'static {
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;

async fn invalidate_region_cache(&self, ver_id: RegionVerId);

/// Get the codec carried by `PdClient`.
/// The purpose of carrying the codec is to avoid passing it on so many calling paths.
fn get_codec(&self) -> &Self::Codec;
}

/// This client converts requests for the logical TiKV cluster into requests
/// for a single TiKV store using PD and internal logic.
pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
pub struct PdRpcClient<
Cod: Codec = ApiV1TxnCodec,
KvC: KvConnect + Send + Sync + 'static = TikvConnect,
Cl = Cluster,
> {
pd: Arc<RetryClient<Cl>>,
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
enable_codec: bool,
enable_mvcc_codec: bool,
region_cache: RegionCache<RetryClient<Cl>>,
codec: Option<Cod>,
}

#[async_trait]
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<Cod, KvC> {
type Codec = Cod;
type KvClient = KvC::KvClient;

async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
Expand All @@ -224,20 +239,20 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
}

async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
let enable_codec = self.enable_codec;
let key = if enable_codec {
let enable_mvcc_codec = self.enable_mvcc_codec;
let key = if enable_mvcc_codec {
key.to_encoded()
} else {
key.clone()
};

let region = self.region_cache.get_region_by_key(&key).await?;
Self::decode_region(region, enable_codec)
Self::decode_region(region, enable_mvcc_codec)
}

async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader> {
let region = self.region_cache.get_region_by_id(id).await?;
Self::decode_region(region, self.enable_codec)
Self::decode_region(region, self.enable_mvcc_codec)
}

async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
Expand All @@ -255,31 +270,40 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
async fn invalidate_region_cache(&self, ver_id: RegionVerId) {
self.region_cache.invalidate_region_cache(ver_id).await
}

fn get_codec(&self) -> &Self::Codec {
self.codec
.as_ref()
.unwrap_or_else(|| panic!("codec not set"))
}
}

impl PdRpcClient<TikvConnect, Cluster> {
impl<Cod: Codec> PdRpcClient<Cod, TikvConnect, Cluster> {
pub async fn connect(
pd_endpoints: &[String],
config: Config,
enable_codec: bool,
) -> Result<PdRpcClient> {
enable_mvcc_codec: bool, // TODO: infer from `codec`.
codec: Option<Cod>,
) -> Result<PdRpcClient<Cod>> {
PdRpcClient::new(
config.clone(),
|security_mgr| TikvConnect::new(security_mgr, config.timeout),
|security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout),
enable_codec,
enable_mvcc_codec,
codec,
)
.await
}
}

impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<Cod, KvC, Cl> {
pub async fn new<PdFut, MakeKvC, MakePd>(
config: Config,
kv_connect: MakeKvC,
pd: MakePd,
enable_codec: bool,
) -> Result<PdRpcClient<KvC, Cl>>
enable_mvcc_codec: bool,
codec: Option<Cod>,
) -> Result<PdRpcClient<Cod, KvC, Cl>>
where
PdFut: Future<Output = Result<RetryClient<Cl>>>,
MakeKvC: FnOnce(Arc<SecurityManager>) -> KvC,
Expand All @@ -301,8 +325,9 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
pd: pd.clone(),
kv_client_cache,
kv_connect: kv_connect(security_mgr),
enable_codec,
enable_mvcc_codec,
region_cache: RegionCache::new(pd),
codec,
})
}

Expand All @@ -322,6 +347,10 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
Err(e) => Err(e),
}
}

pub fn set_codec(&mut self, codec: Cod) {
self.codec = Some(codec);
}
}

fn make_key_range(start_key: Vec<u8>, end_key: Vec<u8>) -> kvrpcpb::KeyRange {
Expand Down
Loading

0 comments on commit 4b0e844

Please sign in to comment.