feat: save node lease into memory (#1841)
* feat: lease secs = 5

* feat: set lease data into memory of leader

* fix: ignore stale heartbeat

* Update src/meta-srv/src/election.rs

Co-authored-by: LFC <[email protected]>

---------

Co-authored-by: LFC <[email protected]>
fengjiachun and MichaelScofield authored Jun 28, 2023
1 parent f287d31 commit bc33fdc
Showing 18 changed files with 142 additions and 132 deletions.
13 changes: 1 addition & 12 deletions src/meta-srv/src/bootstrap.rs
@@ -30,7 +30,6 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio_stream::wrappers::TcpListenerStream;
 use tonic::transport::server::Router;
 
-use crate::cluster::MetaPeerClientBuilder;
 use crate::election::etcd::EtcdElection;
 use crate::lock::etcd::EtcdLock;
 use crate::lock::memory::MemLock;
@@ -172,17 +171,8 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
 
     let in_memory = Arc::new(MemStore::default()) as ResettableKvStoreRef;
 
-    let meta_peer_client = MetaPeerClientBuilder::default()
-        .election(election.clone())
-        .in_memory(in_memory.clone())
-        .build()
-        // Safety: all required fields set at initialization
-        .unwrap();
-
     let selector = match opts.selector {
-        SelectorType::LoadBased => Arc::new(LoadBasedSelector {
-            meta_peer_client: meta_peer_client.clone(),
-        }) as SelectorRef,
+        SelectorType::LoadBased => Arc::new(LoadBasedSelector) as SelectorRef,
         SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
     };
 
@@ -192,7 +182,6 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
         .in_memory(in_memory)
         .selector(selector)
         .election(election)
-        .meta_peer_client(meta_peer_client)
         .lock(lock)
         .build()
         .await
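
With `MetaPeerClient` moved out of bootstrap, both selector variants are now constructed as plain unit structs behind a shared trait object. A minimal, self-contained sketch of that pattern — the trait and type names below are illustrative stand-ins, not the actual meta-srv definitions:

```rust
use std::sync::Arc;

// Illustrative stand-ins for the real selector trait and variants.
trait Selector {
    fn name(&self) -> &'static str;
}

// Unit structs: no per-instance state (e.g. a peer client) is carried anymore.
struct LoadBasedSelector;
struct LeaseBasedSelector;

impl Selector for LoadBasedSelector {
    fn name(&self) -> &'static str {
        "load-based"
    }
}

impl Selector for LeaseBasedSelector {
    fn name(&self) -> &'static str {
        "lease-based"
    }
}

type SelectorRef = Arc<dyn Selector + Send + Sync>;

fn build_selector(load_based: bool) -> SelectorRef {
    if load_based {
        Arc::new(LoadBasedSelector) as SelectorRef
    } else {
        Arc::new(LeaseBasedSelector) as SelectorRef
    }
}

fn main() {
    assert_eq!(build_selector(true).name(), "load-based");
}
```
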
5 changes: 4 additions & 1 deletion src/meta-srv/src/cluster.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::sync::Arc;
 use std::time::Duration;
 
 use api::v1::meta::cluster_client::ClusterClient;
@@ -30,7 +31,9 @@ use crate::metasrv::ElectionRef;
 use crate::service::store::kv::ResettableKvStoreRef;
 use crate::{error, util};
 
-#[derive(Builder, Clone)]
+pub type MetaPeerClientRef = Arc<MetaPeerClient>;
+
+#[derive(Builder)]
 pub struct MetaPeerClient {
     election: Option<ElectionRef>,
     in_memory: ResettableKvStoreRef,
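
Replacing `#[derive(Builder, Clone)]` with a `MetaPeerClientRef = Arc<MetaPeerClient>` alias means the client is built once and shared by reference counting instead of being copied. A rough sketch of the idea, with a placeholder field in place of the real struct members:

```rust
use std::sync::Arc;

// Placeholder for the real MetaPeerClient; the field is illustrative only.
struct MetaPeerClient {
    server_addr: String,
}

// Shared handle: cloning the Arc is cheap, and every holder sees one instance.
type MetaPeerClientRef = Arc<MetaPeerClient>;

fn main() {
    let client: MetaPeerClientRef = Arc::new(MetaPeerClient {
        server_addr: "127.0.0.1:3002".to_string(),
    });

    let for_handler = Arc::clone(&client); // bumps the refcount, no deep copy
    assert_eq!(Arc::strong_count(&client), 2);
    assert_eq!(for_handler.server_addr, client.server_addr);
}
```
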
5 changes: 3 additions & 2 deletions src/meta-srv/src/election.rs
@@ -21,8 +21,9 @@ use tokio::sync::broadcast::Receiver;
 
 use crate::error::Result;
 
-pub const LEASE_SECS: i64 = 3;
-pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 * 2 / 3;
+pub const LEASE_SECS: i64 = 5;
+// In a lease, there are two opportunities for renewal.
+pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 / 2;
 pub const ELECTION_KEY: &str = "__meta_srv_election";
 
 #[derive(Clone)]
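
To make the new constants concrete: a 5-second lease renewed every 5 / 2 = 2 seconds gets two keep-alive attempts (at t=2s and t=4s) before expiry, whereas the old 3-second lease with its 2-second period left only one. A standalone sketch of that arithmetic:

```rust
const LEASE_SECS: i64 = 5;
// Integer division: 5 / 2 = 2 seconds between keep-alives.
const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 / 2;

fn main() {
    // Renewal attempts that land strictly inside one lease term.
    let attempts = (LEASE_SECS as u64 - 1) / KEEP_ALIVE_PERIOD_SECS;
    // Old config: 3s lease, 3 * 2 / 3 = 2s period -> one shot at t=2s.
    // New config: 5s lease, 2s period -> attempts at t=2s and t=4s.
    assert_eq!(attempts, 2);
    println!("{LEASE_SECS}s lease, renew every {KEEP_ALIVE_PERIOD_SECS}s: {attempts} chances");
}
```
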
4 changes: 0 additions & 4 deletions src/meta-srv/src/error.rs
@@ -221,9 +221,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("MetaSrv has no meta peer client"))]
-    NoMetaPeerClient { location: Location },
-
     #[snafu(display("Invalid http body, source: {}", source))]
     InvalidHttpBody {
         source: http::Error,
@@ -399,7 +396,6 @@ impl ErrorExt for Error {
             | Error::Range { .. }
             | Error::ResponseHeaderNotFound { .. }
             | Error::IsNotLeader { .. }
-            | Error::NoMetaPeerClient { .. }
             | Error::InvalidHttpBody { .. }
             | Error::Lock { .. }
             | Error::Unlock { .. }
74 changes: 26 additions & 48 deletions src/meta-srv/src/handler/keep_lease_handler.rs
@@ -12,46 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::meta::{BatchPutRequest, HeartbeatRequest, KeyValue, Role};
+use api::v1::meta::{HeartbeatRequest, PutRequest, Role};
 use common_telemetry::{trace, warn};
 use common_time::util as time_util;
-use tokio::sync::mpsc::{self, Sender};
 
 use crate::error::Result;
 use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
 use crate::keys::{LeaseKey, LeaseValue};
 use crate::metasrv::Context;
-use crate::service::store::kv::KvStoreRef;
 
-pub struct KeepLeaseHandler {
-    tx: Sender<KeyValue>,
-}
-
-impl KeepLeaseHandler {
-    pub fn new(kv_store: KvStoreRef) -> Self {
-        let (tx, mut rx) = mpsc::channel(1024);
-        let _handle = common_runtime::spawn_bg(async move {
-            while let Some(kv) = rx.recv().await {
-                let mut kvs = vec![kv];
-
-                while let Ok(kv) = rx.try_recv() {
-                    kvs.push(kv);
-                }
-
-                let batch_put = BatchPutRequest {
-                    kvs,
-                    ..Default::default()
-                };
-
-                if let Err(err) = kv_store.batch_put(batch_put).await {
-                    warn!("Failed to write lease KVs, {err}");
-                }
-            }
-        });
-
-        Self { tx }
-    }
-}
+#[derive(Default)]
+pub struct KeepLeaseHandler;
 
 #[async_trait::async_trait]
 impl HeartbeatHandler for KeepLeaseHandler {
@@ -62,28 +33,35 @@ impl HeartbeatHandler for KeepLeaseHandler {
     async fn handle(
         &self,
         req: &HeartbeatRequest,
-        _ctx: &mut Context,
+        ctx: &mut Context,
         _acc: &mut HeartbeatAccumulator,
     ) -> Result<()> {
         let HeartbeatRequest { header, peer, .. } = req;
-        if let Some(peer) = &peer {
-            let key = LeaseKey {
-                cluster_id: header.as_ref().map_or(0, |h| h.cluster_id),
-                node_id: peer.id,
-            };
-            let value = LeaseValue {
-                timestamp_millis: time_util::current_time_millis(),
-                node_addr: peer.addr.clone(),
-            };
+        let Some(peer) = &peer else { return Ok(()); };
+
+        let key = LeaseKey {
+            cluster_id: header.as_ref().map_or(0, |h| h.cluster_id),
+            node_id: peer.id,
+        };
+        let value = LeaseValue {
+            timestamp_millis: time_util::current_time_millis(),
+            node_addr: peer.addr.clone(),
+        };
 
-            trace!("Receive a heartbeat: {key:?}, {value:?}");
+        trace!("Receive a heartbeat: {key:?}, {value:?}");
 
-            let key = key.try_into()?;
-            let value = value.try_into()?;
+        let key = key.try_into()?;
+        let value = value.try_into()?;
+        let put_req = PutRequest {
+            key,
+            value,
+            ..Default::default()
+        };
+
+        let res = ctx.in_memory.put(put_req).await;
 
-            if let Err(err) = self.tx.send(KeyValue { key, value }).await {
-                warn!("Failed to send lease KV to writer, peer: {peer:?}, {err}");
-            }
-        }
+        if let Err(err) = res {
+            warn!("Failed to update lease KV, peer: {peer:?}, {err}");
+        }
 
         Ok(())
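
The rewritten handler is stateless: instead of funneling lease KVs through an mpsc channel into a background batch writer against the backend store, each heartbeat now puts a single entry straight into the leader's in-memory store. A simplified, std-only sketch of that flow, where a `HashMap` stands in for `ctx.in_memory` and the key/value types are reduced to their essentials:

```rust
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Debug, Hash, PartialEq, Eq)]
struct LeaseKey {
    cluster_id: u64,
    node_id: u64,
}

#[derive(Debug)]
struct LeaseValue {
    timestamp_millis: i64,
    node_addr: String,
}

fn current_time_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before epoch")
        .as_millis() as i64
}

/// Per-heartbeat lease refresh: overwrite the node's entry with a fresh timestamp.
fn keep_lease(
    in_memory: &mut HashMap<LeaseKey, LeaseValue>,
    cluster_id: u64,
    node_id: u64,
    node_addr: &str,
) {
    let key = LeaseKey { cluster_id, node_id };
    let value = LeaseValue {
        timestamp_millis: current_time_millis(),
        node_addr: node_addr.to_string(),
    };
    in_memory.insert(key, value);
}

fn main() {
    let mut in_memory = HashMap::new();
    keep_lease(&mut in_memory, 0, 42, "127.0.0.1:4001");
    keep_lease(&mut in_memory, 0, 42, "127.0.0.1:4001"); // refresh, not append
    assert_eq!(in_memory.len(), 1);
}
```
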
31 changes: 24 additions & 7 deletions src/meta-srv/src/handler/persist_stats_handler.rs
@@ -12,7 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::cmp::Ordering;
+
 use api::v1::meta::{HeartbeatRequest, PutRequest, Role};
+use common_telemetry::warn;
 use dashmap::DashMap;
 
 use crate::error::Result;
@@ -90,13 +93,18 @@ impl HeartbeatHandler for PersistStatsHandler {
         let epoch_stats = entry.value_mut();
 
         let refresh = if let Some(epoch) = epoch_stats.epoch() {
-            // This node may have been redeployed.
-            if current_stat.node_epoch > epoch {
-                epoch_stats.set_epoch(current_stat.node_epoch);
-                epoch_stats.clear();
-                true
-            } else {
-                false
+            match current_stat.node_epoch.cmp(&epoch) {
+                Ordering::Greater => {
+                    // This node may have been redeployed.
+                    epoch_stats.set_epoch(current_stat.node_epoch);
+                    epoch_stats.clear();
+                    true
+                }
+                Ordering::Less => {
+                    warn!("Ignore stale heartbeat: {:?}", current_stat);
+                    false
+                }
+                Ordering::Equal => false,
             }
         } else {
             epoch_stats.set_epoch(current_stat.node_epoch);
@@ -134,6 +142,7 @@ mod tests {
     use std::sync::Arc;
 
     use super::*;
+    use crate::cluster::MetaPeerClientBuilder;
     use crate::handler::{HeartbeatMailbox, Pushers};
     use crate::keys::StatKey;
     use crate::sequence::Sequence;
@@ -146,10 +155,18 @@
         let kv_store = Arc::new(MemStore::new());
         let seq = Sequence::new("test_seq", 0, 10, kv_store.clone());
         let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
+        let meta_peer_client = MetaPeerClientBuilder::default()
+            .election(None)
+            .in_memory(in_memory.clone())
+            .build()
+            .map(Arc::new)
+            // Safety: all required fields set at initialization
+            .unwrap();
         let ctx = Context {
             server_addr: "127.0.0.1:0000".to_string(),
             in_memory,
             kv_store,
+            meta_peer_client,
             mailbox,
             election: None,
             skip_all: Arc::new(AtomicBool::new(false)),
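
The stale-heartbeat fix replaces a bare greater-than check with a three-way comparison, so an epoch older than the cached one is logged and ignored rather than silently treated like a current one. A cut-down sketch of the decision logic (the surrounding stats state is elided, and `eprintln!` stands in for the `warn!` macro):

```rust
use std::cmp::Ordering;

/// Whether the cached stats should be reset for an incoming heartbeat,
/// given the epoch currently cached for the node.
fn should_refresh(cached_epoch: u64, incoming_epoch: u64) -> bool {
    match incoming_epoch.cmp(&cached_epoch) {
        // The node was restarted/redeployed with a newer epoch: reset its stats.
        Ordering::Greater => true,
        // Out-of-order delivery of an old heartbeat: ignore it.
        Ordering::Less => {
            eprintln!("Ignore stale heartbeat: epoch {incoming_epoch} < {cached_epoch}");
            false
        }
        // Same epoch: keep accumulating, nothing to reset.
        Ordering::Equal => false,
    }
}

fn main() {
    assert!(should_refresh(3, 4)); // redeployed node
    assert!(!should_refresh(3, 2)); // stale, ignored
    assert!(!should_refresh(3, 3)); // steady state
}
```
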
9 changes: 9 additions & 0 deletions src/meta-srv/src/handler/response_header_handler.rs
@@ -53,6 +53,7 @@ mod tests {
     use api::v1::meta::{HeartbeatResponse, RequestHeader};
 
     use super::*;
+    use crate::cluster::MetaPeerClientBuilder;
     use crate::handler::{Context, HeartbeatMailbox, Pushers};
     use crate::sequence::Sequence;
     use crate::service::store::memory::MemStore;
@@ -63,10 +64,18 @@
         let kv_store = Arc::new(MemStore::new());
         let seq = Sequence::new("test_seq", 0, 10, kv_store.clone());
         let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
+        let meta_peer_client = MetaPeerClientBuilder::default()
+            .election(None)
+            .in_memory(in_memory.clone())
+            .build()
+            .map(Arc::new)
+            // Safety: all required fields set at initialization
+            .unwrap();
         let mut ctx = Context {
             server_addr: "127.0.0.1:0000".to_string(),
             in_memory,
             kv_store,
+            meta_peer_client,
             mailbox,
             election: None,
             skip_all: Arc::new(AtomicBool::new(false)),
18 changes: 5 additions & 13 deletions src/meta-srv/src/lease.rs
@@ -14,45 +14,37 @@
 
 use std::collections::HashMap;
 
-use api::v1::meta::RangeRequest;
 use common_time::util as time_util;
 
+use crate::cluster::MetaPeerClientRef;
 use crate::error::Result;
 use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX};
-use crate::service::store::kv::KvStoreRef;
 use crate::util;
 
 pub async fn alive_datanodes(
     cluster_id: u64,
-    kv_store: &KvStoreRef,
+    meta_peer_client: &MetaPeerClientRef,
     lease_secs: i64,
 ) -> Result<HashMap<LeaseKey, LeaseValue>> {
     let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
         time_util::current_time_millis() - v.timestamp_millis < lease_secs * 1000
     };
 
-    filter_datanodes(cluster_id, kv_store, lease_filter).await
+    filter_datanodes(cluster_id, meta_peer_client, lease_filter).await
 }
 
 pub async fn filter_datanodes<P>(
     cluster_id: u64,
-    kv_store: &KvStoreRef,
+    meta_peer_client: &MetaPeerClientRef,
    predicate: P,
 ) -> Result<HashMap<LeaseKey, LeaseValue>>
 where
     P: Fn(&LeaseKey, &LeaseValue) -> bool,
 {
     let key = get_lease_prefix(cluster_id);
     let range_end = util::get_prefix_end_key(&key);
-    let req = RangeRequest {
-        key,
-        range_end,
-        ..Default::default()
-    };
 
-    let res = kv_store.range(req).await?;
-
-    let kvs = res.kvs;
+    let kvs = meta_peer_client.range(key, range_end).await?;
     let mut lease_kvs = HashMap::new();
     for kv in kvs {
         let lease_key: LeaseKey = kv.key.try_into()?;
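
`alive_datanodes` counts a datanode as alive only while its latest heartbeat is younger than the lease window. A minimal sketch of that predicate, with a trimmed-down `LeaseValue` and a fixed "now" in place of the clock and the store scan:

```rust
// Trimmed-down stand-in for keys::LeaseValue.
struct LeaseValue {
    timestamp_millis: i64,
}

// Same check as the lease_filter closure: heartbeat age under the lease window.
fn alive(now_millis: i64, v: &LeaseValue, lease_secs: i64) -> bool {
    now_millis - v.timestamp_millis < lease_secs * 1000
}

fn main() {
    let now = 100_000;
    let fresh = LeaseValue { timestamp_millis: now - 3_000 }; // 3s-old heartbeat
    let stale = LeaseValue { timestamp_millis: now - 6_000 }; // 6s-old heartbeat

    // With LEASE_SECS = 5, a 3s-old heartbeat is alive; a 6s-old one is not.
    assert!(alive(now, &fresh, 5));
    assert!(!alive(now, &stale, 5));
}
```
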