Skip to content

Commit

Permalink
feat: txn for meta (#1828)
Browse files Browse the repository at this point in the history
* feat: txn for meta kvstore

* feat: txn

* chore: add unit test

* chore: more test

* chore: more test

* Update src/meta-srv/src/service/store/memory.rs

Co-authored-by: LFC <[email protected]>

* chore: by cr

---------

Co-authored-by: LFC <[email protected]>
  • Loading branch information
fengjiachun and MichaelScofield authored Jun 26, 2023
1 parent 034564f commit 78b0799
Show file tree
Hide file tree
Showing 10 changed files with 758 additions and 27 deletions.
4 changes: 4 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ pub enum Error {
source: common_meta::error::Error,
},

#[snafu(display("Etcd txn got an error: {err_msg}"))]
EtcdTxnOpResponse { err_msg: String, location: Location },

// this error is used for custom error mapping
// please do not delete it
#[snafu(display("Other error, source: {}", source))]
Expand Down Expand Up @@ -437,6 +440,7 @@ impl ErrorExt for Error {
| Error::InvalidTxnResult { .. }
| Error::InvalidUtf8Value { .. }
| Error::UnexpectedInstructionReply { .. }
| Error::EtcdTxnOpResponse { .. }
| Error::Unexpected { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidCatalogValue { source, .. } => source.status_code(),
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog";
pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema";
pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request";
pub(crate) const METRIC_META_TXN_REQUEST: &str = "meta.txn_request";
pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request";
pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num";
pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute";
3 changes: 3 additions & 0 deletions src/meta-srv/src/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ mod tests {
use super::*;
use crate::service::store::kv::KvStore;
use crate::service::store::memory::MemStore;
use crate::service::store::txn::TxnService;

#[tokio::test]
async fn test_sequence() {
Expand Down Expand Up @@ -199,6 +200,8 @@ mod tests {
async fn test_sequence_force_quit() {
struct Noop;

impl TxnService for Noop {}

#[async_trait::async_trait]
impl KvStore for Noop {
async fn range(
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/service/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
// limitations under the License.

pub mod etcd;
pub(crate) mod etcd_util;
pub mod ext;
pub mod kv;
pub mod memory;
pub mod txn;

use api::v1::meta::{
store_server, BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
Expand Down
50 changes: 25 additions & 25 deletions src/meta-srv/src/service/store/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ use etcd_client::{

use crate::error;
use crate::error::Result;
use crate::metrics::METRIC_META_KV_REQUEST;
use crate::metrics::{METRIC_META_KV_REQUEST, METRIC_META_TXN_REQUEST};
use crate::service::store::etcd_util::KvPair;
use crate::service::store::kv::{KvStore, KvStoreRef};
use crate::service::store::txn::TxnService;

// Maximum number of operations permitted in a transaction.
// The etcd default configuration's `--max-txn-ops` is 128.
Expand Down Expand Up @@ -463,6 +465,28 @@ impl KvStore for EtcdStore {
}
}

#[async_trait::async_trait]
impl TxnService for EtcdStore {
async fn txn(
&self,
txn: crate::service::store::txn::Txn,
) -> Result<crate::service::store::txn::TxnResponse> {
let _timer = timer!(
METRIC_META_TXN_REQUEST,
&[("target", "etcd".to_string()), ("op", "txn".to_string()),]
);

let etcd_txn: Txn = txn.into();
let txn_res = self
.client
.kv_client()
.txn(etcd_txn)
.await
.context(error::EtcdFailedSnafu)?;
txn_res.try_into()
}
}

struct Get {
cluster_id: u64,
key: Vec<u8>,
Expand Down Expand Up @@ -704,30 +728,6 @@ impl TryFrom<MoveValueRequest> for MoveValue {
}
}

struct KvPair<'a>(&'a etcd_client::KeyValue);

impl<'a> KvPair<'a> {
/// Creates a `KvPair` from etcd KeyValue
#[inline]
fn new(kv: &'a etcd_client::KeyValue) -> Self {
Self(kv)
}

#[inline]
fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue {
KeyValue::from(KvPair::new(kv))
}
}

impl<'a> From<KvPair<'a>> for KeyValue {
fn from(kv: KvPair<'a>) -> Self {
Self {
key: kv.0.key().to_vec(),
value: kv.0.value().to_vec(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
39 changes: 39 additions & 0 deletions src/meta-srv/src/service/store/etcd_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::meta::KeyValue;

pub struct KvPair<'a>(&'a etcd_client::KeyValue);

impl<'a> KvPair<'a> {
/// Creates a `KvPair` from etcd KeyValue
#[inline]
pub fn new(kv: &'a etcd_client::KeyValue) -> Self {
Self(kv)
}

#[inline]
pub fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue {
KeyValue::from(KvPair::new(kv))
}
}

impl<'a> From<KvPair<'a>> for KeyValue {
fn from(kv: KvPair<'a>) -> Self {
Self {
key: kv.0.key().to_vec(),
value: kv.0.value().to_vec(),
}
}
}
3 changes: 2 additions & 1 deletion src/meta-srv/src/service/store/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ use api::v1::meta::{
};

use crate::error::Result;
use crate::service::store::txn::TxnService;

pub type KvStoreRef = Arc<dyn KvStore>;
pub type ResettableKvStoreRef = Arc<dyn ResettableKvStore>;

#[async_trait::async_trait]
pub trait KvStore: Send + Sync {
pub trait KvStore: TxnService {
async fn range(&self, req: RangeRequest) -> Result<RangeResponse>;

async fn put(&self, req: PutRequest) -> Result<PutResponse>;
Expand Down
72 changes: 71 additions & 1 deletion src/meta-srv/src/service/store/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ use parking_lot::RwLock;

use super::ext::KvStoreExt;
use crate::error::Result;
use crate::metrics::METRIC_META_KV_REQUEST;
use crate::metrics::{METRIC_META_KV_REQUEST, METRIC_META_TXN_REQUEST};
use crate::service::store::kv::{KvStore, ResettableKvStore};
use crate::service::store::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse, TxnService};

pub struct MemStore {
inner: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
Expand Down Expand Up @@ -119,6 +120,7 @@ impl KvStore for MemStore {
} = req;

let mut memory = self.inner.write();

let prev_value = memory.insert(key.clone(), value);
let prev_kv = if prev_kv {
prev_value.map(|value| KeyValue { key, value })
Expand Down Expand Up @@ -164,6 +166,7 @@ impl KvStore for MemStore {
} = req;

let mut memory = self.inner.write();

let prev_kvs = if prev_kv {
kvs.into_iter()
.map(|kv| (kv.key.clone(), memory.insert(kv.key, kv.value)))
Expand Down Expand Up @@ -198,6 +201,7 @@ impl KvStore for MemStore {
} = req;

let mut memory = self.inner.write();

let prev_kvs = if prev_kv {
keys.into_iter()
.filter_map(|key| memory.remove(&key).map(|value| KeyValue { key, value }))
Expand Down Expand Up @@ -330,6 +334,72 @@ impl KvStore for MemStore {
}
}

#[async_trait::async_trait]
impl TxnService for MemStore {
async fn txn(&self, txn: Txn) -> Result<TxnResponse> {
let _timer = timer!(
METRIC_META_TXN_REQUEST,
&[("target", "memory".to_string()), ("op", "txn".to_string()),]
);

let TxnRequest {
compare,
success,
failure,
} = txn.into();

let mut memory = self.inner.write();

let succeeded = compare
.iter()
.all(|x| x.compare_with_value(memory.get(&x.key)));

let do_txn = |txn_op| match txn_op {
TxnOp::Put(key, value) => {
let prev_value = memory.insert(key.clone(), value);
let prev_kv = prev_value.map(|value| KeyValue { key, value });
let put_res = PutResponse {
prev_kv,
..Default::default()
};
TxnOpResponse::ResponsePut(put_res)
}
TxnOp::Get(key) => {
let value = memory.get(&key);
let kv = value.map(|value| KeyValue {
key,
value: value.clone(),
});
let get_res = RangeResponse {
kvs: kv.map(|kv| vec![kv]).unwrap_or(vec![]),
..Default::default()
};
TxnOpResponse::ResponseGet(get_res)
}
TxnOp::Delete(key) => {
let prev_value = memory.remove(&key);
let prev_kv = prev_value.map(|value| KeyValue { key, value });
let delete_res = DeleteRangeResponse {
prev_kvs: prev_kv.map(|kv| vec![kv]).unwrap_or(vec![]),
..Default::default()
};
TxnOpResponse::ResponseDelete(delete_res)
}
};

let responses: Vec<_> = if succeeded {
success.into_iter().map(do_txn).collect()
} else {
failure.into_iter().map(do_txn).collect()
};

Ok(TxnResponse {
succeeded,
responses,
})
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU8, Ordering};
Expand Down
Loading

0 comments on commit 78b0799

Please sign in to comment.