Skip to content

Commit

Permalink
fix TxnApiV1 test
Browse files Browse the repository at this point in the history
Signed-off-by: iosmanthus <[email protected]>
  • Loading branch information
iosmanthus committed Jun 22, 2022
1 parent 222908e commit 605aed8
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 139 deletions.
20 changes: 13 additions & 7 deletions examples/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
mod common;

use crate::common::parse_args;
use tikv_client::{BoundRange, Config, Key, KvPair, TransactionClient as Client, Value};
use tikv_client::{
request::request_codec::{RequestCodec, TxnApiV1},
BoundRange, Config, Key, KvPair, TransactionClient as Client, Value,
};

async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
async fn puts<C: RequestCodec>(
client: &Client<C>,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) {
let mut txn = client
.begin_optimistic()
.await
Expand All @@ -17,7 +23,7 @@ async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>
txn.commit().await.expect("Could not commit transaction");
}

async fn get(client: &Client, key: Key) -> Option<Value> {
async fn get<C: RequestCodec>(client: &Client<C>, key: Key) -> Option<Value> {
let mut txn = client
.begin_optimistic()
.await
Expand All @@ -29,7 +35,7 @@ async fn get(client: &Client, key: Key) -> Option<Value> {
res
}

async fn key_exists(client: &Client, key: Key) -> bool {
async fn key_exists<C: RequestCodec>(client: &Client<C>, key: Key) -> bool {
let mut txn = client
.begin_optimistic()
.await
Expand All @@ -44,7 +50,7 @@ async fn key_exists(client: &Client, key: Key) -> bool {
res
}

async fn scan(client: &Client, range: impl Into<BoundRange>, limit: u32) {
async fn scan<C: RequestCodec>(client: &Client<C>, range: impl Into<BoundRange>, limit: u32) {
let mut txn = client
.begin_optimistic()
.await
Expand All @@ -56,7 +62,7 @@ async fn scan(client: &Client, range: impl Into<BoundRange>, limit: u32) {
txn.commit().await.expect("Could not commit transaction");
}

async fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
async fn dels<C: RequestCodec>(client: &Client<C>, keys: impl IntoIterator<Item = Key>) {
let mut txn = client
.begin_optimistic()
.await
Expand All @@ -81,7 +87,7 @@ async fn main() {
Config::default()
};

let txn = Client::new_with_config(args.pd, config, None)
let txn = Client::new_with_config(args.pd, config, TxnApiV1, None)
.await
.expect("Could not connect to tikv");

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
//! # })}
//! ```

#![feature(specialization)]
#![feature(min_specialization)]
#![feature(explicit_generic_args_with_impl_trait)]
#[macro_use]
pub mod request;
Expand Down
32 changes: 6 additions & 26 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::{collections::HashMap, marker::PhantomData, sync::Arc, thread};
use std::{collections::HashMap, sync::Arc, thread};

use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
Expand All @@ -14,7 +14,6 @@ use tikv_client_store::{KvClient, KvConnect, TikvConnect};

use crate::{
compat::stream_fn,
kv::codec,
pd::{retry::RetryClientTrait, RetryClient},
region::{RegionId, RegionVerId, RegionWithLeader},
region_cache::RegionCache,
Expand Down Expand Up @@ -198,14 +197,6 @@ pub trait PdClient: Send + Sync + 'static {
.boxed()
}

fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result<RegionWithLeader> {
if enable_codec {
codec::decode_bytes_in_place(region.region.mut_start_key(), false)?;
codec::decode_bytes_in_place(region.region.mut_end_key(), false)?;
}
Ok(region)
}

async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;

async fn invalidate_region_cache(&self, ver_id: RegionVerId);
Expand All @@ -219,9 +210,8 @@ pub struct PdRpcClient<C, KvC: KvConnect + Send + Sync + 'static = TikvConnect,
pd: Arc<RetryClient<Cl>>,
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
region_cache: RegionCache<RetryClient<Cl>>,
region_cache: RegionCache<C, RetryClient<Cl>>,
logger: Logger,
codec: C,
}

#[async_trait]
Expand All @@ -237,20 +227,11 @@ impl<C: RequestCodec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpc
}

async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
let enable_codec = self.enable_codec;
let key = if enable_codec {
key.to_encoded()
} else {
key.clone()
};

let region = self.region_cache.get_region_by_key(&key).await?;
Self::decode_region(region, enable_codec)
self.region_cache.get_region_by_key(&key).await
}

async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader> {
let region = self.region_cache.get_region_by_id(id).await?;
Self::decode_region(region, self.enable_codec)
self.region_cache.get_region_by_id(id).await
}

async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
Expand All @@ -270,7 +251,7 @@ impl<C: RequestCodec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpc
}

fn get_request_codec(&self) -> Self::RequestCodec {
todo!()
self.region_cache.get_request_codec()
}
}

Expand Down Expand Up @@ -338,9 +319,8 @@ impl<C, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<C, KvC, Cl> {
pd: pd.clone(),
kv_client_cache,
kv_connect: kv_connect(env, security_mgr),
region_cache: RegionCache::new(pd),
region_cache: RegionCache::new(codec, pd),
logger,
codec
})
}

Expand Down
22 changes: 11 additions & 11 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ use tikv_client_common::Error;
use tikv_client_proto::metapb;

use crate::{
Backoff,
backoff::DEFAULT_REGION_BACKOFF,
BoundRange,
ColumnFamily,
config::Config,
Key, KvPair, pd::{PdClient, PdRpcClient}, raw::lowering::*, request::{Collect, CollectSingle, Plan, request_codec::RequestCodec}, Result, Value,
pd::{PdClient, PdRpcClient},
raw::lowering::*,
request::{request_codec::RequestCodec, Collect, CollectSingle, Plan},
Backoff, BoundRange, ColumnFamily, Key, KvPair, Result, Value,
};

const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
Expand Down Expand Up @@ -215,7 +215,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
/// ```
pub async fn batch_get(
&self,
keys: impl IntoIterator<Item=impl Into<Key>>,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw batch_get request");
let request =
Expand Down Expand Up @@ -277,7 +277,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
/// ```
pub async fn batch_put(
&self,
pairs: impl IntoIterator<Item=impl Into<KvPair>>,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_put request");
let request = new_raw_batch_put_request::<C>(
Expand Down Expand Up @@ -339,7 +339,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub async fn batch_delete(&self, keys: impl IntoIterator<Item=impl Into<Key>>) -> Result<()> {
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
debug!(self.logger, "invoking raw batch_delete request");
self.assert_non_atomic()?;
let request =
Expand Down Expand Up @@ -465,7 +465,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
/// ```
pub async fn batch_scan(
&self,
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw batch_scan request");
Expand Down Expand Up @@ -497,7 +497,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
/// ```
pub async fn batch_scan_keys(
&self,
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
) -> Result<Vec<Key>> {
debug!(self.logger, "invoking raw batch_scan_keys request");
Expand Down Expand Up @@ -547,7 +547,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
&self,
copr_name: impl Into<String>,
copr_version_req: impl Into<String>,
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
let copr_version_req = copr_version_req.into();
Expand Down Expand Up @@ -593,7 +593,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {

async fn batch_scan_inner(
&self,
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
key_only: bool,
) -> Result<Vec<KvPair>> {
Expand Down
7 changes: 3 additions & 4 deletions src/raw/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
//! types (i.e., the types from the client crate) and converts these to the types used in the
//! generated protobuf code, then calls the low-level ctor functions in the requests module.

use std::{iter::Iterator, marker::PhantomData, ops::Range, sync::Arc};
use std::{iter::Iterator, ops::Range, sync::Arc};

use tikv_client_proto::{kvrpcpb, metapb};

use crate::{
raw::requests,
request::{request_codec::RequestCodec, KvRequest},
BoundRange, ColumnFamily, Key, KvPair, Value,
raw::requests, request::request_codec::RequestCodec, BoundRange, ColumnFamily, Key, KvPair,
Value,
};

pub fn new_raw_get_request<C: RequestCodec>(
Expand Down
5 changes: 2 additions & 3 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ use crate::{
collect_first,
pd::PdClient,
request::{
plan::ResponseWithShard,
request_codec::{RawApiV1, RequestCodec},
Collect, CollectSingle, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey,
plan::ResponseWithShard, request_codec::RequestCodec, Collect, CollectSingle,
DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey,
},
store::{store_stream_for_keys, store_stream_for_ranges, RegionStore},
transaction::HasLocks,
Expand Down
64 changes: 42 additions & 22 deletions src/region_cache.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use crate::{
pd::{RetryClient, RetryClientTrait},
region::{RegionId, RegionVerId, RegionWithLeader, StoreId},
Key, Result,
};
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
};

use tokio::sync::{Notify, RwLock};

use tikv_client_common::Error;
use tikv_client_pd::Cluster;
use tikv_client_proto::metapb::{self, Store};
use tokio::sync::{Notify, RwLock};

use crate::{
pd::{RetryClient, RetryClientTrait},
region::{RegionId, RegionVerId, RegionWithLeader, StoreId},
request::request_codec::RequestCodec,
Key, Result,
};

const MAX_RETRY_WAITING_CONCURRENT_REQUEST: usize = 4;

Expand Down Expand Up @@ -44,23 +48,25 @@ impl RegionCacheMap {
}
}

pub struct RegionCache<Client = RetryClient<Cluster>> {
pub struct RegionCache<C, Client = RetryClient<Cluster>> {
region_cache: RwLock<RegionCacheMap>,
store_cache: RwLock<HashMap<StoreId, Store>>,
inner_client: Arc<Client>,
codec: C,
}

impl<Client> RegionCache<Client> {
pub fn new(inner_client: Arc<Client>) -> RegionCache<Client> {
impl<C, Client> RegionCache<C, Client> {
pub fn new(codec: C, inner_client: Arc<Client>) -> Self {
RegionCache {
region_cache: RwLock::new(RegionCacheMap::new()),
store_cache: RwLock::new(HashMap::new()),
inner_client,
codec,
}
}
}

impl<C: RetryClientTrait> RegionCache<C> {
impl<C: RequestCodec, R: RetryClientTrait> RegionCache<C, R> {
// Retrieve cache entry by key. If there's no entry, query PD and update cache.
pub async fn get_region_by_key(&self, key: &Key) -> Result<RegionWithLeader> {
let region_cache_guard = self.region_cache.read().await;
Expand Down Expand Up @@ -126,9 +132,14 @@ impl<C: RetryClientTrait> RegionCache<C> {

/// Force read through (query from PD) and update cache
pub async fn read_through_region_by_key(&self, key: Key) -> Result<RegionWithLeader> {
let region = self.inner_client.clone().get_region(key.into()).await?;
self.add_region(region.clone()).await;
Ok(region)
let mut r = self
.inner_client
.clone()
.get_region(self.codec.encode_pd_query(key).into())
.await?;
r.region = self.codec.decode_region(r.region)?;
self.add_region(r.clone()).await;
Ok(r)
}

/// Force read through (query from PD) and update cache
Expand All @@ -140,7 +151,8 @@ impl<C: RetryClientTrait> RegionCache<C> {
region_cache_guard.on_my_way_id.insert(id, notify.clone());
}

let region = self.inner_client.clone().get_region_by_id(id).await?;
let mut region = self.inner_client.clone().get_region_by_id(id).await?;
region.region = self.codec.decode_region(region.region)?;
self.add_region(region.clone()).await;

// notify others
Expand Down Expand Up @@ -226,27 +238,35 @@ impl<C: RetryClientTrait> RegionCache<C> {
cache.key_to_ver_id.remove(&start_key);
}
}

pub fn get_request_codec(&self) -> C {
self.codec.clone()
}
}

#[cfg(test)]
mod test {
use super::RegionCache;
use crate::{
pd::RetryClientTrait,
region::{RegionId, RegionWithLeader},
Key, Result,
};
use async_trait::async_trait;
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::{
atomic::{AtomicU64, Ordering::SeqCst},
Arc,
},
};

use async_trait::async_trait;
use tokio::sync::Mutex;

use tikv_client_common::Error;
use tikv_client_proto::metapb;
use tokio::sync::Mutex;

use crate::{
pd::RetryClientTrait,
region::{RegionId, RegionWithLeader},
Key, Result,
};

use super::RegionCache;

#[derive(Default)]
struct MockRetryClient {
Expand Down
Loading

0 comments on commit 605aed8

Please sign in to comment.