Skip to content

Commit

Permalink
fix type checking for new interface
Browse files Browse the repository at this point in the history
Signed-off-by: iosmanthus <[email protected]>
  • Loading branch information
iosmanthus committed Jun 21, 2022
1 parent 503047f commit 1f314ee
Show file tree
Hide file tree
Showing 16 changed files with 687 additions and 347 deletions.
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
//! # })}
//! ```

#![feature(specialization)]
#![feature(explicit_generic_args_with_impl_trait)]
#[macro_use]
pub mod request;
#[macro_use]
Expand Down
96 changes: 57 additions & 39 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use crate::{
compat::stream_fn,
kv::codec,
pd::{retry::RetryClientTrait, RetryClient},
region::{RegionId, RegionVerId, RegionWithLeader},
region_cache::RegionCache,
store::RegionStore,
BoundRange, Config, Key, Result, SecurityManager, Timestamp,
};
use std::{collections::HashMap, sync::Arc, thread};
use std::marker::PhantomData;

use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
use grpcio::{EnvBuilder, Environment};
use slog::Logger;
use std::{collections::HashMap, sync::Arc, thread};
use tokio::sync::RwLock;

use tikv_client_pd::Cluster;
use tikv_client_proto::{kvrpcpb, metapb};
use tikv_client_store::{KvClient, KvConnect, TikvConnect};
use tokio::sync::RwLock;

use crate::{
BoundRange,
compat::stream_fn,
Config,
Key,
kv::codec,
pd::{retry::RetryClientTrait, RetryClient},
region::{RegionId, RegionVerId, RegionWithLeader}, region_cache::RegionCache, Result, SecurityManager, store::RegionStore, Timestamp,
};
use crate::request::request_codec::RequestCodec;

const CQ_COUNT: usize = 1;
const CLIENT_PREFIX: &str = "tikv-client";
Expand All @@ -42,6 +47,7 @@ const CLIENT_PREFIX: &str = "tikv-client";
#[async_trait]
pub trait PdClient: Send + Sync + 'static {
type KvClient: KvClient + Send + Sync + 'static;
type RequestCodec: RequestCodec;

/// In transactional API, `region` is decoded (keys in raw format).
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore>;
Expand Down Expand Up @@ -69,11 +75,11 @@ pub trait PdClient: Send + Sync + 'static {

fn group_keys_by_region<K, K2>(
self: Arc<Self>,
keys: impl Iterator<Item = K> + Send + Sync + 'static,
keys: impl Iterator<Item=K> + Send + Sync + 'static,
) -> BoxStream<'static, Result<(RegionId, Vec<K2>)>>
where
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
K2: Send + Sync + 'static,
where
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
K2: Send + Sync + 'static,
{
let keys = keys.peekable();
stream_fn(keys, move |mut keys| {
Expand All @@ -95,7 +101,7 @@ pub trait PdClient: Send + Sync + 'static {
}
}
})
.boxed()
.boxed()
}

/// Returns a Stream which iterates over the contexts for each region covered by range.
Expand Down Expand Up @@ -126,7 +132,7 @@ pub trait PdClient: Send + Sync + 'static {
Ok(Some((Some(region_end), store)))
}
})
.boxed()
.boxed()
}

/// Returns a Stream which iterates over the contexts for ranges in the same region.
Expand Down Expand Up @@ -190,7 +196,7 @@ pub trait PdClient: Send + Sync + 'static {
}
}
})
.boxed()
.boxed()
}

fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result<RegionWithLeader> {
Expand All @@ -204,22 +210,27 @@ pub trait PdClient: Send + Sync + 'static {
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;

async fn invalidate_region_cache(&self, ver_id: RegionVerId);

fn get_request_codec(&self) -> Self::RequestCodec;
}

/// This client converts requests for the logical TiKV cluster into requests
/// for a single TiKV store using PD and internal logic.
pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
pub struct PdRpcClient<C, KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
pd: Arc<RetryClient<Cl>>,
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
enable_codec: bool,
region_cache: RegionCache<RetryClient<Cl>>,
logger: Logger,
// TODO: change to a real codec.
_phantom: PhantomData<C>,
}

#[async_trait]
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
impl<C: RequestCodec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<C, KvC> {
type KvClient = KvC::KvClient;
type RequestCodec = C;

async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
let store_id = region.get_store_id()?;
Expand Down Expand Up @@ -260,15 +271,19 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
async fn invalidate_region_cache(&self, ver_id: RegionVerId) {
self.region_cache.invalidate_region_cache(ver_id).await
}

fn get_request_codec(&self) -> Self::RequestCodec {
todo!()
}
}

impl PdRpcClient<TikvConnect, Cluster> {
impl<C> PdRpcClient<C, TikvConnect, Cluster> {
pub async fn connect(
pd_endpoints: &[String],
config: Config,
enable_codec: bool,
logger: Logger,
) -> Result<PdRpcClient> {
) -> Result<PdRpcClient<C, TikvConnect, Cluster>> {
PdRpcClient::new(
config.clone(),
|env, security_mgr| TikvConnect::new(env, security_mgr, config.timeout),
Expand All @@ -278,7 +293,7 @@ impl PdRpcClient<TikvConnect, Cluster> {
enable_codec,
logger,
)
.await
.await
}
}

Expand All @@ -291,18 +306,18 @@ fn thread_name(prefix: &str) -> String {
.unwrap_or_else(|| prefix.to_owned())
}

impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
impl<C, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<C, KvC, Cl> {
pub async fn new<PdFut, MakeKvC, MakePd>(
config: Config,
kv_connect: MakeKvC,
pd: MakePd,
enable_codec: bool,
logger: Logger,
) -> Result<PdRpcClient<KvC, Cl>>
where
PdFut: Future<Output = Result<RetryClient<Cl>>>,
MakeKvC: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> KvC,
MakePd: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> PdFut,
) -> Result<PdRpcClient<C, KvC, Cl>>
where
PdFut: Future<Output=Result<RetryClient<Cl>>>,
MakeKvC: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> KvC,
MakePd: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> PdFut,
{
let env = Arc::new(
EnvBuilder::new()
Expand All @@ -312,7 +327,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
);
let security_mgr = Arc::new(
if let (Some(ca_path), Some(cert_path), Some(key_path)) =
(&config.ca_path, &config.cert_path, &config.key_path)
(&config.ca_path, &config.cert_path, &config.key_path)
{
SecurityManager::load(ca_path, cert_path, key_path)?
} else {
Expand All @@ -329,6 +344,8 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
enable_codec,
region_cache: RegionCache::new(pd),
logger,
// TODO
_phantom: PhantomData,
})
}

Expand All @@ -352,10 +369,11 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {

#[cfg(test)]
pub mod test {
use super::*;
use futures::{executor, executor::block_on};

use crate::mock::*;

use futures::{executor, executor::block_on};
use super::*;

#[tokio::test]
async fn test_kv_client_caching() {
Expand Down Expand Up @@ -396,7 +414,7 @@ pub mod test {
vec![1].into(),
vec![2].into(),
vec![3].into(),
vec![5, 2].into()
vec![5, 2].into(),
]
);
assert_eq!(
Expand Down Expand Up @@ -458,36 +476,36 @@ pub mod test {
vec![
kvrpcpb::KeyRange {
start_key: k1.clone(),
end_key: k2.clone()
end_key: k2.clone(),
},
kvrpcpb::KeyRange {
start_key: k1,
end_key: k_split.clone()
}
end_key: k_split.clone(),
},
]
);
assert_eq!(ranges2.0, 2);
assert_eq!(
ranges2.1,
vec![kvrpcpb::KeyRange {
start_key: k_split.clone(),
end_key: k3
end_key: k3,
}]
);
assert_eq!(ranges3.0, 1);
assert_eq!(
ranges3.1,
vec![kvrpcpb::KeyRange {
start_key: k2,
end_key: k_split.clone()
end_key: k_split.clone(),
}]
);
assert_eq!(ranges4.0, 2);
assert_eq!(
ranges4.1,
vec![kvrpcpb::KeyRange {
start_key: k_split,
end_key: k4
end_key: k4,
}]
);
assert!(stream.next().is_none());
Expand Down
Loading

0 comments on commit 1f314ee

Please sign in to comment.