Skip to content

Commit

Permalink
feat: add leader kv store cache for metadata (#1853)
Browse files Browse the repository at this point in the history
* feat: add leader kv store cache for metadata

* refactor: create cache internal

* fix: race condition

* fix: race condition on read
  • Loading branch information
fengjiachun authored Jul 4, 2023
1 parent 2ef84f6 commit 20f2fc4
Show file tree
Hide file tree
Showing 8 changed files with 536 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/meta-srv/src/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub const LEASE_SECS: i64 = 5;
pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 / 2;
pub const ELECTION_KEY: &str = "__meta_srv_election";

#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum LeaderChangeMessage {
Elected(Arc<LeaderKey>),
StepDown(Arc<LeaderKey>),
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler/on_leader_start_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ impl HeartbeatHandler for OnLeaderStartHandler {
if election.in_infancy() {
ctx.is_infancy = true;
ctx.reset_in_memory();
ctx.reset_leader_cached_kv_store();
}
}
Ok(())
Expand Down
4 changes: 4 additions & 0 deletions src/meta-srv/src/handler/persist_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,16 @@ mod tests {
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::keys::StatKey;
use crate::sequence::Sequence;
use crate::service::store::cached_kv::LeaderCachedKvStore;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::memory::MemStore;

#[tokio::test]
async fn test_handle_datanode_stats() {
let in_memory = Arc::new(MemStore::new());
let kv_store = Arc::new(MemStore::new());
let leader_cached_kv_store =
Arc::new(LeaderCachedKvStore::with_always_leader(kv_store.clone()));
let seq = Sequence::new("test_seq", 0, 10, kv_store.clone());
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
Expand All @@ -166,6 +169,7 @@ mod tests {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_store,
leader_cached_kv_store,
meta_peer_client,
mailbox,
election: None,
Expand Down
4 changes: 4 additions & 0 deletions src/meta-srv/src/handler/response_header_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ mod tests {
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{Context, HeartbeatMailbox, Pushers};
use crate::sequence::Sequence;
use crate::service::store::cached_kv::LeaderCachedKvStore;
use crate::service::store::memory::MemStore;

#[tokio::test]
async fn test_handle_heartbeat_resp_header() {
let in_memory = Arc::new(MemStore::new());
let kv_store = Arc::new(MemStore::new());
let leader_cached_kv_store =
Arc::new(LeaderCachedKvStore::with_always_leader(kv_store.clone()));
let seq = Sequence::new("test_seq", 0, 10, kv_store.clone());
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
Expand All @@ -75,6 +78,7 @@ mod tests {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_store,
leader_cached_kv_store,
meta_peer_client,
mailbox,
election: None,
Expand Down
40 changes: 32 additions & 8 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct Context {
pub server_addr: String,
pub in_memory: ResettableKvStoreRef,
pub kv_store: KvStoreRef,
pub leader_cached_kv_store: ResettableKvStoreRef,
pub meta_peer_client: MetaPeerClientRef,
pub mailbox: MailboxRef,
pub election: Option<ElectionRef>,
Expand All @@ -94,6 +95,10 @@ impl Context {
pub fn reset_in_memory(&self) {
self.in_memory.reset();
}

pub fn reset_leader_cached_kv_store(&self) {
self.leader_cached_kv_store.reset();
}
}

pub struct LeaderValue(pub String);
Expand All @@ -120,6 +125,7 @@ pub struct MetaSrv {
// store some data that will not be persisted.
in_memory: ResettableKvStoreRef,
kv_store: KvStoreRef,
leader_cached_kv_store: ResettableKvStoreRef,
table_id_sequence: SequenceRef,
meta_peer_client: MetaPeerClientRef,
selector: SelectorRef,
Expand All @@ -146,20 +152,30 @@ impl MetaSrv {

if let Some(election) = self.election() {
let procedure_manager = self.procedure_manager.clone();
let in_memory = self.in_memory.clone();
let leader_cached_kv_store = self.leader_cached_kv_store.clone();
let mut rx = election.subscribe_leader_change();
let _handle = common_runtime::spawn_bg(async move {
loop {
match rx.recv().await {
Ok(msg) => match msg {
LeaderChangeMessage::Elected(_) => {
if let Err(e) = procedure_manager.recover().await {
error!("Failed to recover procedures, error: {e}");
Ok(msg) => {
in_memory.reset();
leader_cached_kv_store.reset();
info!(
"Leader's cache has bean cleared on leader change: {:?}",
msg
);
match msg {
LeaderChangeMessage::Elected(_) => {
if let Err(e) = procedure_manager.recover().await {
error!("Failed to recover procedures, error: {e}");
}
}
LeaderChangeMessage::StepDown(leader) => {
error!("Leader :{:?} step down", leader);
}
}
LeaderChangeMessage::StepDown(leader) => {
error!("Leader :{:?} step down", leader);
}
},
}
Err(RecvError::Closed) => {
error!("Not expected, is leader election loop still running?");
break;
Expand Down Expand Up @@ -219,6 +235,11 @@ impl MetaSrv {
self.kv_store.clone()
}

#[inline]
pub fn leader_cached_kv_store(&self) -> ResettableKvStoreRef {
self.leader_cached_kv_store.clone()
}

#[inline]
pub fn meta_peer_client(&self) -> MetaPeerClientRef {
self.meta_peer_client.clone()
Expand Down Expand Up @@ -254,6 +275,7 @@ impl MetaSrv {
self.mailbox.clone()
}

#[inline]
pub fn procedure_manager(&self) -> &ProcedureManagerRef {
&self.procedure_manager
}
Expand All @@ -263,6 +285,7 @@ impl MetaSrv {
let server_addr = self.options().server_addr.clone();
let in_memory = self.in_memory();
let kv_store = self.kv_store();
let leader_cached_kv_store = self.leader_cached_kv_store();
let meta_peer_client = self.meta_peer_client();
let mailbox = self.mailbox();
let election = self.election();
Expand All @@ -271,6 +294,7 @@ impl MetaSrv {
server_addr,
in_memory,
kv_store,
leader_cached_kv_store,
meta_peer_client,
mailbox,
election,
Expand Down
24 changes: 19 additions & 5 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::state_store::MetaStateStore;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::sequence::Sequence;
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore};
use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
use crate::service::store::memory::MemStore;

Expand Down Expand Up @@ -131,6 +132,10 @@ impl MetaSrvBuilder {

let kv_store = kv_store.unwrap_or_else(|| Arc::new(MemStore::default()));
let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemStore::default()));
let leader_cached_kv_store = Arc::new(LeaderCachedKvStore::new(
Arc::new(CheckLeaderByElection(election.clone())),
kv_store.clone(),
));
let meta_peer_client = meta_peer_client.unwrap_or_else(|| {
MetaPeerClientBuilder::default()
.election(election.clone())
Expand All @@ -146,6 +151,9 @@ impl MetaSrvBuilder {
let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence);
let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone()));
let metadata_service = metadata_service
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone())));
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));

let handler_group = match handler_group {
Expand Down Expand Up @@ -202,16 +210,12 @@ impl MetaSrvBuilder {
}
};

let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone()));

let metadata_service = metadata_service
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone())));

Ok(MetaSrv {
started,
options,
in_memory,
kv_store,
leader_cached_kv_store,
meta_peer_client,
table_id_sequence,
selector,
Expand All @@ -230,3 +234,13 @@ impl Default for MetaSrvBuilder {
Self::new()
}
}

struct CheckLeaderByElection(Option<ElectionRef>);

impl CheckLeader for CheckLeaderByElection {
fn check(&self) -> bool {
self.0
.as_ref()
.map_or(false, |election| election.is_leader())
}
}
1 change: 1 addition & 0 deletions src/meta-srv/src/service/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod cached_kv;
pub mod etcd;
pub(crate) mod etcd_util;
pub mod ext;
Expand Down
Loading

0 comments on commit 20f2fc4

Please sign in to comment.