diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs
index 7c9059a8ea6..d534f5e4058 100644
--- a/components/raftstore/src/store/fsm/peer.rs
+++ b/components/raftstore/src/store/fsm/peer.rs
@@ -4,6 +4,8 @@
 use std::borrow::Cow;
 use std::collections::Bound::{Excluded, Included, Unbounded};
 use std::collections::VecDeque;
 use std::iter::Iterator;
+use std::sync::atomic::AtomicU64;
+use std::sync::Arc;
 use std::time::Instant;
 use std::{cmp, u64};
@@ -1916,8 +1918,19 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
         let region_id = derived.get_id();
         meta.set_region(&self.ctx.coprocessor_host, derived, &mut self.fsm.peer);
         self.fsm.peer.post_split();
+
+        // Roughly estimate the size and keys for new regions.
+        let new_region_count = regions.len() as u64;
+        let estimated_size = self.fsm.peer.approximate_size.map(|x| x / new_region_count);
+        let estimated_keys = self.fsm.peer.approximate_keys.map(|x| x / new_region_count);
+        // It's not correct anymore, so set it to None to let split checker update it.
+        self.fsm.peer.approximate_size = None;
+        self.fsm.peer.approximate_keys = None;
+
         let is_leader = self.fsm.peer.is_leader();
         if is_leader {
+            self.fsm.peer.approximate_size = estimated_size;
+            self.fsm.peer.approximate_keys = estimated_keys;
             self.fsm.peer.heartbeat_pd(self.ctx);
             // Notify pd immediately to let it update the region meta.
             info!(
@@ -1939,14 +1952,26 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
                     "err" => %e,
                 );
             }
+            if let Err(e) = self.ctx.split_check_scheduler.schedule(
+                SplitCheckTask::GetRegionApproximateSizeAndKeys {
+                    region: self.fsm.peer.region().clone(),
+                    pending_tasks: Arc::new(AtomicU64::new(1)),
+                    cb: Box::new(move |_, _| {}),
+                },
+            ) {
+                error!(
+                    "failed to schedule split check task";
+                    "region_id" => self.fsm.region_id(),
+                    "peer_id" => self.fsm.peer_id(),
+                    "err" => ?e,
+                );
+            }
         }
 
         let last_key = enc_end_key(regions.last().unwrap());
         if meta.region_ranges.remove(&last_key).is_none() {
             panic!("{} original region should exists", self.fsm.peer.tag);
         }
-        // It's not correct anymore, so set it to None to let split checker update it.
-        self.fsm.peer.approximate_size = None;
         let last_region_id = regions.last().unwrap().get_id();
         for new_region in regions {
             let new_region_id = new_region.get_id();
@@ -2011,13 +2036,15 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
             new_peer.has_ready |= campaigned;
 
             if is_leader {
+                new_peer.peer.approximate_size = estimated_size;
+                new_peer.peer.approximate_keys = estimated_keys;
                 // The new peer is likely to become leader, send a heartbeat immediately to reduce
                 // client query miss.
                 new_peer.peer.heartbeat_pd(self.ctx);
             }
 
             new_peer.peer.activate(self.ctx);
-            meta.regions.insert(new_region_id, new_region);
+            meta.regions.insert(new_region_id, new_region.clone());
             meta.readers
                 .insert(new_region_id, ReadDelegate::from_peer(new_peer.get_peer()));
             if last_region_id == new_region_id {
@@ -2046,6 +2073,24 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
                     }
                 }
             }
+
+            if is_leader {
+                // The size and keys for the new region may be far from the real values,
+                // so let the split checker update them immediately.
+                if let Err(e) = self.ctx.split_check_scheduler.schedule(
+                    SplitCheckTask::GetRegionApproximateSizeAndKeys {
+                        region: new_region,
+                        pending_tasks: Arc::new(AtomicU64::new(1)),
+                        cb: Box::new(move |_, _| {}),
+                    },
+                ) {
+                    error!(
+                        "failed to schedule split check task";
+                        "region_id" => new_region_id,
+                        "err" => ?e,
+                    );
+                }
+            }
         }
     }
 
diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs
index aec55e6eb8a..7c04e70f490 100644
--- a/components/raftstore/src/store/fsm/store.rs
+++ b/components/raftstore/src/store/fsm/store.rs
@@ -1176,10 +1176,13 @@ impl RaftBatchSystem {
         builder: RaftPollerBuilder<T, C>,
         auto_split_controller: AutoSplitController,
     ) -> Result<()> {
+<<<<<<< HEAD
         builder.snap_mgr.init()?;
 
         let engines = builder.engines.clone();
         let snap_mgr = builder.snap_mgr.clone();
+=======
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
         let cfg = builder.cfg.value().clone();
         let store = builder.store.clone();
         let pd_client = builder.pd_client.clone();
@@ -1261,7 +1264,10 @@ impl RaftBatchSystem {
             store.get_id(),
             Arc::clone(&pd_client),
             self.router.clone(),
+<<<<<<< HEAD
             Arc::clone(&engines.kv),
+=======
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
             workers.pd_worker.scheduler(),
             cfg.pd_store_heartbeat_tick_interval.as_secs(),
             auto_split_controller,
diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs
index 3fc31dfb3df..ece99479d33 100644
--- a/components/raftstore/src/store/peer.rs
+++ b/components/raftstore/src/store/peer.rs
@@ -36,8 +36,16 @@ use uuid::Uuid;
 use crate::coprocessor::{CoprocessorHost, RegionChangeEvent};
 use crate::store::fsm::apply::CatchUpLogs;
 use crate::store::fsm::store::PollContext;
+<<<<<<< HEAD
 use crate::store::fsm::{
     apply, Apply, ApplyMetrics, ApplyTask, GroupState, Proposal, RegionProposal,
+=======
+use crate::store::fsm::{apply, Apply, ApplyMetrics, ApplyTask, GroupState, Proposal};
+use crate::store::worker::{HeartbeatTask, ReadDelegate, ReadExecutor, ReadProgress, RegionTask};
+use crate::store::{
+    Callback, Config, GlobalReplicationState, PdTask, ReadIndexContext, ReadResponse,
+    SplitCheckTask,
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
 };
 use crate::store::worker::{ReadDelegate, ReadProgress, RegionTask};
 use crate::store::{Callback, Config, PdTask, ReadResponse, RegionSnapshot};
@@ -407,7 +415,16 @@ pub struct Peer {
     pub txn_extra_op: Arc<AtomicCell<TxnExtraOp>>,
 
     /// Check whether this proposal can be proposed based on its epoch
+<<<<<<< HEAD
     cmd_epoch_checker: CmdEpochChecker,
+=======
+    cmd_epoch_checker: CmdEpochChecker,
+
+    /// The number of pending PD heartbeat tasks. A PD heartbeat task may be blocked by
+    /// reading RocksDB. To avoid unnecessary IO operations, only the latest task is run
+    /// when more than one task is pending.
+    pub pending_pd_heartbeat_tasks: Arc<AtomicU64>,
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
 }
 
 impl Peer {
@@ -491,6 +508,7 @@ impl Peer {
             check_stale_peers: vec![],
             txn_extra_op: Arc::new(AtomicCell::new(TxnExtraOp::Noop)),
             cmd_epoch_checker: Default::default(),
+            pending_pd_heartbeat_tasks: Arc::new(AtomicU64::new(0)),
         };
 
         // If this region has only one peer and I am the one, campaign directly.
@@ -2819,8 +2837,63 @@ impl Peer {
         None
     }
 
+<<<<<<< HEAD
     pub fn heartbeat_pd<T, C>(&mut self, ctx: &PollContext<T, C>) {
         let task = PdTask::Heartbeat {
+=======
+    fn region_replication_status(&mut self) -> Option<RegionReplicationStatus> {
+        if self.replication_mode_version == 0 {
+            return None;
+        }
+        let mut status = RegionReplicationStatus::default();
+        status.state_id = self.replication_mode_version;
+        let state = if !self.replication_sync {
+            if self.dr_auto_sync_state != DrAutoSyncState::Async {
+                let res = self.raft_group.raft.check_group_commit_consistent();
+                if Some(true) != res {
+                    let mut buffer: SmallVec<[(u64, u64, u64); 5]> = SmallVec::new();
+                    if self.get_store().applied_index_term() >= self.term() {
+                        let progress = self.raft_group.raft.prs();
+                        for (id, p) in progress.iter() {
+                            if !progress.conf().voters().contains(*id) {
+                                continue;
+                            }
+                            buffer.push((*id, p.commit_group_id, p.matched));
+                        }
+                    };
+                    info!(
+                        "still not reach integrity over label";
+                        "status" => ?res,
+                        "region_id" => self.region_id,
+                        "peer_id" => self.peer.id,
+                        "progress" => ?buffer
+                    );
+                } else {
+                    self.replication_sync = true;
+                }
+                match res {
+                    Some(true) => RegionReplicationState::IntegrityOverLabel,
+                    Some(false) => RegionReplicationState::SimpleMajority,
+                    None => RegionReplicationState::Unknown,
+                }
+            } else {
+                RegionReplicationState::SimpleMajority
+            }
+        } else {
+            RegionReplicationState::IntegrityOverLabel
+        };
+        status.set_state(state);
+        Some(status)
+    }
+
+    pub fn is_region_size_or_keys_none(&self) -> bool {
+        fail_point!("region_size_or_keys_none", |_| true);
+        self.approximate_size.is_none() || self.approximate_keys.is_none()
+    }
+
+    pub fn heartbeat_pd<T, C>(&mut self, ctx: &PollContext<T, C>) {
+        let task = PdTask::Heartbeat(HeartbeatTask {
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
             term: self.term(),
             region: self.region().clone(),
             peer: self.peer.clone(),
@@ -2828,16 +2901,62 @@ impl Peer {
             pending_peers: self.collect_pending_peers(ctx),
             written_bytes: self.peer_stat.written_bytes,
             written_keys: self.peer_stat.written_keys,
+<<<<<<< HEAD
             approximate_size: self.approximate_size,
             approximate_keys: self.approximate_keys,
+=======
+            approximate_size: self.approximate_size.unwrap_or_default(),
+            approximate_keys: self.approximate_keys.unwrap_or_default(),
+            replication_status: self.region_replication_status(),
+        });
+        if !self.is_region_size_or_keys_none() {
+            if let Err(e) = ctx.pd_scheduler.schedule(task) {
+                error!(
+                    "failed to notify pd";
+                    "region_id" => self.region_id,
+                    "peer_id" => self.peer.get_id(),
+                    "err" => ?e,
+                );
+            }
+            return;
+        }
+
+        if self.pending_pd_heartbeat_tasks.load(Ordering::SeqCst) > 2 {
+            return;
+        }
+        let region_id = self.region_id;
+        let peer_id = self.peer.get_id();
+        let scheduler = ctx.pd_scheduler.clone();
+        let split_check_task = SplitCheckTask::GetRegionApproximateSizeAndKeys {
+            region: self.region().clone(),
+            pending_tasks: self.pending_pd_heartbeat_tasks.clone(),
+            cb: Box::new(move |size: u64, keys: u64| {
+                if let PdTask::Heartbeat(mut h) = task {
+                    h.approximate_size = size;
+                    h.approximate_keys = keys;
+                    if let Err(e) = scheduler.schedule(PdTask::Heartbeat(h)) {
+                        error!(
+                            "failed to notify pd";
+                            "region_id" => region_id,
+                            "peer_id" => peer_id,
+                            "err" => ?e,
+                        );
+                    }
+                }
+            }),
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
         };
-        if let Err(e) = ctx.pd_scheduler.schedule(task) {
+        self.pending_pd_heartbeat_tasks
+            .fetch_add(1, Ordering::SeqCst);
+        if let Err(e) = ctx.split_check_scheduler.schedule(split_check_task) {
             error!(
                 "failed to notify pd";
-                "region_id" => self.region_id,
-                "peer_id" => self.peer.get_id(),
+                "region_id" => region_id,
+                "peer_id" => peer_id,
                 "err" => ?e,
             );
+            self.pending_pd_heartbeat_tasks
+                .fetch_sub(1, Ordering::SeqCst);
         }
     }
 
diff --git a/components/raftstore/src/store/worker/mod.rs b/components/raftstore/src/store/worker/mod.rs
index 1f5844c5d56..caf2ff21e33 100644
--- a/components/raftstore/src/store/worker/mod.rs
+++ b/components/raftstore/src/store/worker/mod.rs
@@ -17,7 +17,9 @@ pub use self::cleanup::{Runner as CleanupRunner, Task as CleanupTask};
 pub use self::cleanup_sst::{Runner as CleanupSSTRunner, Task as CleanupSSTTask};
 pub use self::compact::{Runner as CompactRunner, Task as CompactTask};
 pub use self::consistency_check::{Runner as ConsistencyCheckRunner, Task as ConsistencyCheckTask};
-pub use self::pd::{FlowStatistics, FlowStatsReporter, Runner as PdRunner, Task as PdTask};
+pub use self::pd::{
+    FlowStatistics, FlowStatsReporter, HeartbeatTask, Runner as PdRunner, Task as PdTask,
+};
 pub use self::raftlog_gc::{Runner as RaftlogGcRunner, Task as RaftlogGcTask};
 pub use self::read::{LocalReader, Progress as ReadProgress, ReadDelegate};
 pub use self::region::{
diff --git a/components/raftstore/src/store/worker/pd.rs b/components/raftstore/src/store/worker/pd.rs
index f6f529fbcee..2e12a4c3c29 100644
--- a/components/raftstore/src/store/worker/pd.rs
+++ b/components/raftstore/src/store/worker/pd.rs
@@ -21,7 +21,6 @@ use kvproto::raft_serverpb::RaftMessage;
 use prometheus::local::LocalHistogram;
 use raft::eraftpb::ConfChangeType;
 
-use crate::coprocessor::{get_region_approximate_keys, get_region_approximate_size};
 use crate::store::cmd_resp::new_error;
 use crate::store::metrics::*;
 use crate::store::util::is_epoch_stale;
@@ -68,6 +67,19 @@ impl FlowStatsReporter for Scheduler<Task> {
     }
 }
 
+pub struct HeartbeatTask {
+    pub term: u64,
+    pub region: metapb::Region,
+    pub peer: metapb::Peer,
+    pub down_peers: Vec<pdpb::PeerStats>,
+    pub pending_peers: Vec<metapb::Peer>,
+    pub written_bytes: u64,
+    pub written_keys: u64,
+    pub approximate_size: u64,
+    pub approximate_keys: u64,
+    pub replication_status: Option<RegionReplicationStatus>,
+}
+
 /// Uses an asynchronous thread to tell PD something.
 pub enum Task {
     AskSplit {
@@ -89,6 +101,7 @@
     AutoSplit {
         split_infos: Vec<SplitInfo>,
     },
+<<<<<<< HEAD
     Heartbeat {
         term: u64,
         region: metapb::Region,
@@ -100,6 +113,9 @@
         approximate_size: Option<u64>,
        approximate_keys: Option<u64>,
     },
+=======
+    Heartbeat(HeartbeatTask),
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
     StoreHeartbeat {
         stats: pdpb::StoreStats,
         store_info: StoreInfo,
     },
@@ -206,6 +222,7 @@ impl Display for Task {
                 region.get_id(),
                 KeysInfoFormatter(split_keys.iter())
             ),
+<<<<<<< HEAD
             Task::Heartbeat {
                 ref region,
                 ref peer,
@@ -215,6 +232,14 @@
                 "heartbeat for region {:?}, leader {}",
                 region,
                 peer.get_id()
+=======
+            Task::Heartbeat(ref hb_task) => write!(
+                f,
+                "heartbeat for region {:?}, leader {}, replication status {:?}",
+                hb_task.region,
+                hb_task.peer.get_id(),
+                hb_task.replication_status
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
             ),
             Task::StoreHeartbeat { ref stats, .. } => {
                 write!(f, "store heartbeat stats: {:?}", stats)
@@ -389,8 +414,12 @@ impl StatsMonitor {
 pub struct Runner<T: PdClient> {
     store_id: u64,
     pd_client: Arc<T>,
+<<<<<<< HEAD
     router: RaftRouter,
     db: Arc<DB>,
+=======
+    router: RaftRouter,
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
     region_peers: HashMap<u64, PeerStat>,
     store_stat: StoreStat,
     is_hb_receiver_scheduled: bool,
@@ -410,10 +439,16 @@ impl Runner {
     pub fn new(
         store_id: u64,
         pd_client: Arc<T>,
+<<<<<<< HEAD
         router: RaftRouter,
         db: Arc<DB>,
         scheduler: Scheduler<Task>,
         store_heartbeat_interval: u64,
+=======
+        router: RaftRouter,
+        scheduler: Scheduler<Task>,
+        store_heartbeat_interval: Duration,
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
         auto_split_controller: AutoSplitController,
     ) -> Runner<T> {
         let interval = Duration::from_secs(store_heartbeat_interval) / Self::INTERVAL_DIVISOR;
@@ -426,7 +461,6 @@ impl Runner {
             store_id,
             pd_client,
             router,
-            db,
             is_hb_receiver_scheduled: false,
             region_peers: HashMap::default(),
             store_stat: StoreStat::default(),
@@ -971,6 +1005,7 @@ impl Runnable for Runner {
                     }
                 }
             }
+<<<<<<< HEAD
             Task::Heartbeat {
                 term,
                 region,
@@ -988,6 +1023,9 @@
                 let approximate_keys = approximate_keys.unwrap_or_else(|| {
                     get_region_approximate_keys(self.db.c(), &region, 0).unwrap_or_default()
                 });
+=======
+            Task::Heartbeat(hb_task) => {
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
                 let (
                     read_bytes_delta,
                     read_keys_delta,
@@ -997,15 +1035,15 @@
                     written_bytes_delta,
                     written_keys_delta,
                     last_report_ts,
                 ) = {
                     let peer_stat = self
                         .region_peers
-                        .entry(region.get_id())
+                        .entry(hb_task.region.get_id())
                         .or_insert_with(PeerStat::default);
                     let read_bytes_delta = peer_stat.read_bytes - peer_stat.last_read_bytes;
                     let read_keys_delta = peer_stat.read_keys - peer_stat.last_read_keys;
-                    let written_bytes_delta = written_bytes - peer_stat.last_written_bytes;
-                    let written_keys_delta = written_keys - peer_stat.last_written_keys;
+                    let written_bytes_delta = hb_task.written_bytes - peer_stat.last_written_bytes;
+                    let written_keys_delta = hb_task.written_keys - peer_stat.last_written_keys;
                     let mut last_report_ts = peer_stat.last_report_ts;
-                    peer_stat.last_written_bytes = written_bytes;
-                    peer_stat.last_written_keys = written_keys;
+                    peer_stat.last_written_bytes = hb_task.written_bytes;
+                    peer_stat.last_written_keys = hb_task.written_keys;
                     peer_stat.last_read_bytes = peer_stat.read_bytes;
                     peer_stat.last_read_keys = peer_stat.read_keys;
                     peer_stat.last_report_ts = UnixSecs::now();
@@ -1021,21 +1059,31 @@
                     )
                 };
                 self.handle_heartbeat(
+<<<<<<< HEAD
                     handle,
                     term,
                     region,
                     peer,
+=======
+                    hb_task.term,
+                    hb_task.region,
+                    hb_task.peer,
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
                     RegionStat {
-                        down_peers,
-                        pending_peers,
+                        down_peers: hb_task.down_peers,
+                        pending_peers: hb_task.pending_peers,
                         written_bytes: written_bytes_delta,
                         written_keys: written_keys_delta,
                         read_bytes: read_bytes_delta,
                         read_keys: read_keys_delta,
-                        approximate_size,
-                        approximate_keys,
+                        approximate_size: hb_task.approximate_size,
+                        approximate_keys: hb_task.approximate_keys,
                         last_report_ts,
                     },
+<<<<<<< HEAD
+=======
+                    hb_task.replication_status,
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
                 )
             }
             Task::StoreHeartbeat { stats, store_info } => {
diff --git a/components/raftstore/src/store/worker/split_check.rs b/components/raftstore/src/store/worker/split_check.rs
index 1a3a8b53b54..fb4da187148 100644
--- a/components/raftstore/src/store/worker/split_check.rs
+++ b/components/raftstore/src/store/worker/split_check.rs
@@ -4,6 +4,10 @@ use std::cmp::Ordering;
 use std::collections::BinaryHeap;
 use std::fmt::{self, Display, Formatter};
 use std::mem;
+<<<<<<< HEAD
+=======
+use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
+>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
 use std::sync::Arc;
 
 use engine::DB;
@@ -16,6 +20,7 @@ use kvproto::pdpb::CheckPolicy;
 use crate::coprocessor::Config;
 use crate::coprocessor::CoprocessorHost;
 use crate::coprocessor::SplitCheckerHost;
+use crate::coprocessor::{get_region_approximate_keys, get_region_approximate_size};
 use crate::store::{Callback, CasualMessage, CasualRouter};
 use crate::Result;
 use configuration::{ConfigChange, Configuration};
@@ -131,6 +136,11 @@
     ChangeConfig(ConfigChange),
     #[cfg(any(test, feature = "testexport"))]
     Validate(Box<dyn FnOnce(&Config) + Send>),
+    GetRegionApproximateSizeAndKeys {
+        region: Region,
+        pending_tasks: Arc<AtomicU64>,
+        cb: Box<dyn FnOnce(u64, u64) + Send>,
+    },
 }
 
 impl Task {
@@ -157,6 +167,11 @@
             Task::ChangeConfig(_) => write!(f, "[split check worker] Change Config Task"),
             #[cfg(any(test, feature = "testexport"))]
             Task::Validate(_) => write!(f, "[split check worker] Validate config"),
+            Task::GetRegionApproximateSizeAndKeys { region, .. } => write!(
+                f,
+                "[split check worker] Get region approximate size and keys for region {}",
+                region.get_id()
+            ),
         }
     }
 }
@@ -320,6 +335,28 @@ impl<S: CasualRouter> Runnable<Task> for Runner<S> {
             Task::ChangeConfig(c) => self.change_cfg(c),
             #[cfg(any(test, feature = "testexport"))]
             Task::Validate(f) => f(&self.cfg),
+            Task::GetRegionApproximateSizeAndKeys {
+                region,
+                pending_tasks,
+                cb,
+            } => {
+                if pending_tasks.fetch_sub(1, AtomicOrdering::SeqCst) > 1 {
+                    return;
+                }
+                let size =
+                    get_region_approximate_size(&self.engine, &region, 0).unwrap_or_default();
+                let keys =
+                    get_region_approximate_keys(&self.engine, &region, 0).unwrap_or_default();
+                let _ = self.router.send(
+                    region.get_id(),
+                    CasualMessage::RegionApproximateSize { size },
+                );
+                let _ = self.router.send(
+                    region.get_id(),
+                    CasualMessage::RegionApproximateKeys { keys },
+                );
+                cb(size, keys);
+            }
         }
     }
 }
diff --git a/tests/integrations/raftstore/test_region_heartbeat.rs b/tests/integrations/raftstore/test_region_heartbeat.rs
index ef248624d9f..2f9da37d05e 100644
--- a/tests/integrations/raftstore/test_region_heartbeat.rs
+++ b/tests/integrations/raftstore/test_region_heartbeat.rs
@@ -204,3 +204,33 @@ fn test_region_heartbeat_term() {
     }
     panic!("reported term should be updated");
 }
+
+#[test]
+fn test_region_heartbeat_when_size_or_keys_is_none() {
+    let mut cluster = new_server_cluster(0, 3);
+    cluster.cfg.raft_store.pd_heartbeat_tick_interval = ReadableDuration::millis(50);
+    cluster.run();
+
+    fail::cfg("region_size_or_keys_none", "return").unwrap();
+    for i in 0..100 {
+        let (k, v) = (format!("k{}", i), format!("v{}", i));
+        cluster.must_put(k.as_bytes(), v.as_bytes());
+    }
+
+    let region_id = cluster.get_region_id(b"");
+    for _ in 0..10 {
+        sleep_ms(100);
+        let size = cluster
+            .pd_client
+            .get_region_approximate_size(region_id)
+            .unwrap_or_default();
+        let keys = cluster
+            .pd_client
+            .get_region_approximate_keys(region_id)
+            .unwrap_or_default();
+        if size > 0 || keys > 0 {
+            return;
+        }
+    }
+    panic!("reported region keys should be updated");
+}
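
Reviewer note (not part of the patch): the mechanism that is easy to miss in the conflicted hunks is the pending-task counter shared between `Peer::heartbeat_pd` and the split check worker. The sender bumps `pending_pd_heartbeat_tasks` once per scheduled `GetRegionApproximateSizeAndKeys` task; the worker decrements it and bails out unless it is handling the newest task, so at most one RocksDB size/keys scan runs per region no matter how many heartbeats queued up while the worker was busy. The sketch below is a standalone illustration of that counter protocol only; the `SizeTask` struct, the channel, and the worker thread are made up for the example and are not TiKV APIs.

```rust
// Minimal sketch of the "only run the latest pending task" pattern used above.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

struct SizeTask {
    region_id: u64,
    pending: Arc<AtomicU64>,
}

fn main() {
    let (tx, rx) = mpsc::channel::<SizeTask>();
    let pending = Arc::new(AtomicU64::new(0));

    // "Peer" side: schedule several heartbeats before the worker catches up,
    // mirroring pending_pd_heartbeat_tasks.fetch_add(1, SeqCst) in heartbeat_pd.
    for _ in 0..5 {
        pending.fetch_add(1, Ordering::SeqCst);
        tx.send(SizeTask {
            region_id: 1,
            pending: Arc::clone(&pending),
        })
        .unwrap();
    }
    drop(tx);

    // "Split check worker" side: decrement first and skip stale tasks,
    // mirroring `if pending_tasks.fetch_sub(1, SeqCst) > 1 { return; }`.
    let worker = thread::spawn(move || {
        for task in rx {
            if task.pending.fetch_sub(1, Ordering::SeqCst) > 1 {
                continue; // a newer task is already queued; skip this stale one
            }
            // Stand-in for the expensive get_region_approximate_size/keys scan.
            println!("computing approximate size/keys for region {}", task.region_id);
        }
    });
    worker.join().unwrap();
}
```

Running the sketch prints the "computing" line exactly once even though five tasks were queued, which is the same stale-task skipping the patch relies on in `split_check.rs` to keep heartbeat-triggered scans from piling up.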