Commit
cherry pick tikv#9081 to release-4.0
Signed-off-by: ti-srebot <[email protected]>
sleepymole authored and ti-srebot committed Dec 4, 2020
1 parent 0092501 commit b823810
Showing 7 changed files with 305 additions and 18 deletions.
51 changes: 48 additions & 3 deletions components/raftstore/src/store/fsm/peer.rs
@@ -4,6 +4,8 @@ use std::borrow::Cow;
use std::collections::Bound::{Excluded, Included, Unbounded};
use std::collections::VecDeque;
use std::iter::Iterator;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Instant;
use std::{cmp, u64};

@@ -1916,8 +1918,19 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
let region_id = derived.get_id();
meta.set_region(&self.ctx.coprocessor_host, derived, &mut self.fsm.peer);
self.fsm.peer.post_split();

// Roughly estimate the size and keys for new regions.
let new_region_count = regions.len() as u64;
let estimated_size = self.fsm.peer.approximate_size.map(|x| x / new_region_count);
let estimated_keys = self.fsm.peer.approximate_keys.map(|x| x / new_region_count);
// It's not correct anymore, so set it to None to let split checker update it.
self.fsm.peer.approximate_size = None;
self.fsm.peer.approximate_keys = None;

let is_leader = self.fsm.peer.is_leader();
if is_leader {
self.fsm.peer.approximate_size = estimated_size;
self.fsm.peer.approximate_keys = estimated_keys;
self.fsm.peer.heartbeat_pd(self.ctx);
// Notify pd immediately to let it update the region meta.
info!(
@@ -1939,14 +1952,26 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
"err" => %e,
);
}
if let Err(e) = self.ctx.split_check_scheduler.schedule(
SplitCheckTask::GetRegionApproximateSizeAndKeys {
region: self.fsm.peer.region().clone(),
pending_tasks: Arc::new(AtomicU64::new(1)),
cb: Box::new(move |_, _| {}),
},
) {
error!(
"failed to schedule split check task";
"region_id" => self.fsm.region_id(),
"peer_id" => self.fsm.peer_id(),
"err" => ?e,
);
}
}

let last_key = enc_end_key(regions.last().unwrap());
if meta.region_ranges.remove(&last_key).is_none() {
panic!("{} original region should exists", self.fsm.peer.tag);
}
// It's not correct anymore, so set it to None to let split checker update it.
self.fsm.peer.approximate_size = None;
let last_region_id = regions.last().unwrap().get_id();
for new_region in regions {
let new_region_id = new_region.get_id();
@@ -2011,13 +2036,15 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
new_peer.has_ready |= campaigned;

if is_leader {
new_peer.peer.approximate_size = estimated_size;
new_peer.peer.approximate_keys = estimated_keys;
// The new peer is likely to become leader, send a heartbeat immediately to reduce
// client query miss.
new_peer.peer.heartbeat_pd(self.ctx);
}

new_peer.peer.activate(self.ctx);
meta.regions.insert(new_region_id, new_region);
meta.regions.insert(new_region_id, new_region.clone());
meta.readers
.insert(new_region_id, ReadDelegate::from_peer(new_peer.get_peer()));
if last_region_id == new_region_id {
@@ -2046,6 +2073,24 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
}
}
}

if is_leader {
// The size and keys for new region may be far from the real value.
// So we let split checker to update it immediately.
if let Err(e) = self.ctx.split_check_scheduler.schedule(
SplitCheckTask::GetRegionApproximateSizeAndKeys {
region: new_region,
pending_tasks: Arc::new(AtomicU64::new(1)),
cb: Box::new(move |_, _| {}),
},
) {
error!(
"failed to schedule split check task";
"region_id" => new_region_id,
"err" => ?e,
);
}
}
}
}

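The hunks above seed each new region produced by a split with a rough estimate — the parent's approximate size and keys divided evenly by the number of resulting regions — apply that estimate only on the leader, and then schedule a GetRegionApproximateSizeAndKeys split-check task so the real values are recomputed off the raftstore thread. A minimal sketch of the estimation step follows; the standalone function name and signature are illustrative, not part of the patch.

// Hedged sketch: each new region receives an equal share of the parent's
// approximate statistics; `None` means the value is unknown and must be
// refreshed by the split checker.
fn estimate_split_stats(
    approximate_size: Option<u64>,
    approximate_keys: Option<u64>,
    new_region_count: u64,
) -> (Option<u64>, Option<u64>) {
    let size = approximate_size.map(|x| x / new_region_count);
    let keys = approximate_keys.map(|x| x / new_region_count);
    (size, keys)
}

For example, a parent region with an approximate size of 96 MiB that splits into 3 regions would report roughly 32 MiB per new region until the split checker refreshes the numbers.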
6 changes: 6 additions & 0 deletions components/raftstore/src/store/fsm/store.rs
@@ -1176,10 +1176,13 @@ impl RaftBatchSystem {
builder: RaftPollerBuilder<T, C>,
auto_split_controller: AutoSplitController,
) -> Result<()> {
<<<<<<< HEAD
builder.snap_mgr.init()?;

let engines = builder.engines.clone();
let snap_mgr = builder.snap_mgr.clone();
=======
>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
let cfg = builder.cfg.value().clone();
let store = builder.store.clone();
let pd_client = builder.pd_client.clone();
@@ -1261,7 +1264,10 @@ impl RaftBatchSystem {
store.get_id(),
Arc::clone(&pd_client),
self.router.clone(),
<<<<<<< HEAD
Arc::clone(&engines.kv),
=======
>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
workers.pd_worker.scheduler(),
cfg.pd_store_heartbeat_tick_interval.as_secs(),
auto_split_controller,
125 changes: 122 additions & 3 deletions components/raftstore/src/store/peer.rs
@@ -36,8 +36,16 @@ use uuid::Uuid;
use crate::coprocessor::{CoprocessorHost, RegionChangeEvent};
use crate::store::fsm::apply::CatchUpLogs;
use crate::store::fsm::store::PollContext;
<<<<<<< HEAD
use crate::store::fsm::{
apply, Apply, ApplyMetrics, ApplyTask, GroupState, Proposal, RegionProposal,
=======
use crate::store::fsm::{apply, Apply, ApplyMetrics, ApplyTask, GroupState, Proposal};
use crate::store::worker::{HeartbeatTask, ReadDelegate, ReadExecutor, ReadProgress, RegionTask};
use crate::store::{
Callback, Config, GlobalReplicationState, PdTask, ReadIndexContext, ReadResponse,
SplitCheckTask,
>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
};
use crate::store::worker::{ReadDelegate, ReadProgress, RegionTask};
use crate::store::{Callback, Config, PdTask, ReadResponse, RegionSnapshot};
@@ -407,7 +415,16 @@ pub struct Peer {

pub txn_extra_op: Arc<AtomicCell<TxnExtraOp>>,
/// Check whether this proposal can be proposed based on its epoch
<<<<<<< HEAD
cmd_epoch_checker: CmdEpochChecker,
=======
cmd_epoch_checker: CmdEpochChecker<EK::Snapshot>,

/// The number of pending pd heartbeat tasks. Pd heartbeat task may be blocked by
/// reading rocksdb. To avoid unnecessary io operations, we always let the later
/// task run when there are more than 1 pending tasks.
pub pending_pd_heartbeat_tasks: Arc<AtomicU64>,
>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
}

impl Peer {
@@ -491,6 +508,7 @@ impl Peer {
check_stale_peers: vec![],
txn_extra_op: Arc::new(AtomicCell::new(TxnExtraOp::Noop)),
cmd_epoch_checker: Default::default(),
pending_pd_heartbeat_tasks: Arc::new(AtomicU64::new(0)),
};

// If this region has only one peer and I am the one, campaign directly.
@@ -2819,25 +2837,126 @@ impl Peer {
None
}

<<<<<<< HEAD
pub fn heartbeat_pd<T, C>(&mut self, ctx: &PollContext<T, C>) {
let task = PdTask::Heartbeat {
=======
fn region_replication_status(&mut self) -> Option<RegionReplicationStatus> {
if self.replication_mode_version == 0 {
return None;
}
let mut status = RegionReplicationStatus::default();
status.state_id = self.replication_mode_version;
let state = if !self.replication_sync {
if self.dr_auto_sync_state != DrAutoSyncState::Async {
let res = self.raft_group.raft.check_group_commit_consistent();
if Some(true) != res {
let mut buffer: SmallVec<[(u64, u64, u64); 5]> = SmallVec::new();
if self.get_store().applied_index_term() >= self.term() {
let progress = self.raft_group.raft.prs();
for (id, p) in progress.iter() {
if !progress.conf().voters().contains(*id) {
continue;
}
buffer.push((*id, p.commit_group_id, p.matched));
}
};
info!(
"still not reach integrity over label";
"status" => ?res,
"region_id" => self.region_id,
"peer_id" => self.peer.id,
"progress" => ?buffer
);
} else {
self.replication_sync = true;
}
match res {
Some(true) => RegionReplicationState::IntegrityOverLabel,
Some(false) => RegionReplicationState::SimpleMajority,
None => RegionReplicationState::Unknown,
}
} else {
RegionReplicationState::SimpleMajority
}
} else {
RegionReplicationState::IntegrityOverLabel
};
status.set_state(state);
Some(status)
}

pub fn is_region_size_or_keys_none(&self) -> bool {
fail_point!("region_size_or_keys_none", |_| true);
self.approximate_size.is_none() || self.approximate_keys.is_none()
}

pub fn heartbeat_pd<T>(&mut self, ctx: &PollContext<EK, ER, T>) {
let task = PdTask::Heartbeat(HeartbeatTask {
>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
term: self.term(),
region: self.region().clone(),
peer: self.peer.clone(),
down_peers: self.collect_down_peers(ctx.cfg.max_peer_down_duration.0),
pending_peers: self.collect_pending_peers(ctx),
written_bytes: self.peer_stat.written_bytes,
written_keys: self.peer_stat.written_keys,
<<<<<<< HEAD
approximate_size: self.approximate_size,
approximate_keys: self.approximate_keys,
=======
approximate_size: self.approximate_size.unwrap_or_default(),
approximate_keys: self.approximate_keys.unwrap_or_default(),
replication_status: self.region_replication_status(),
});
if !self.is_region_size_or_keys_none() {
if let Err(e) = ctx.pd_scheduler.schedule(task) {
error!(
"failed to notify pd";
"region_id" => self.region_id,
"peer_id" => self.peer.get_id(),
"err" => ?e,
);
}
return;
}

if self.pending_pd_heartbeat_tasks.load(Ordering::SeqCst) > 2 {
return;
}
let region_id = self.region_id;
let peer_id = self.peer.get_id();
let scheduler = ctx.pd_scheduler.clone();
let split_check_task = SplitCheckTask::GetRegionApproximateSizeAndKeys {
region: self.region().clone(),
pending_tasks: self.pending_pd_heartbeat_tasks.clone(),
cb: Box::new(move |size: u64, keys: u64| {
if let PdTask::Heartbeat(mut h) = task {
h.approximate_size = size;
h.approximate_keys = keys;
if let Err(e) = scheduler.schedule(PdTask::Heartbeat(h)) {
error!(
"failed to notify pd";
"region_id" => region_id,
"peer_id" => peer_id,
"err" => ?e,
);
}
}
}),
>>>>>>> e8e008dd5... raftstore: move get_region_approximate_size to split check worker (#9081)
};
if let Err(e) = ctx.pd_scheduler.schedule(task) {
self.pending_pd_heartbeat_tasks
.fetch_add(1, Ordering::SeqCst);
if let Err(e) = ctx.split_check_scheduler.schedule(split_check_task) {
error!(
"failed to notify pd";
"region_id" => self.region_id,
"peer_id" => self.peer.get_id(),
"region_id" => region_id,
"peer_id" => peer_id,
"err" => ?e,
);
self.pending_pd_heartbeat_tasks
.fetch_sub(1, Ordering::SeqCst);
}
}

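In the new heartbeat_pd path above, the heartbeat is sent directly when approximate_size and approximate_keys are already known; otherwise the peer asks the split-check worker to compute them and re-submit the heartbeat from a callback, while pending_pd_heartbeat_tasks caps how many such refreshes can be queued at once. Below is a stripped-down sketch of that gating pattern; the function and the schedule_refresh closure are illustrative stand-ins for scheduling SplitCheckTask::GetRegionApproximateSizeAndKeys, not the patch's actual API.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Hedged sketch of the pending-task gate: skip the refresh when too many are
// already queued, count the new one, and undo the count if scheduling fails.
fn maybe_schedule_refresh<E>(
    pending: &Arc<AtomicU64>,
    schedule_refresh: impl FnOnce() -> Result<(), E>,
) {
    if pending.load(Ordering::SeqCst) > 2 {
        // Enough size/keys refreshes are in flight; let them carry the heartbeat.
        return;
    }
    pending.fetch_add(1, Ordering::SeqCst);
    if schedule_refresh().is_err() {
        pending.fetch_sub(1, Ordering::SeqCst);
    }
}

The deferred heartbeat itself is completed by the cb: Box::new(move |size, keys| { ... }) closure in the hunk, which fills in the freshly computed values before re-scheduling PdTask::Heartbeat.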
4 changes: 3 additions & 1 deletion components/raftstore/src/store/worker/mod.rs
@@ -17,7 +17,9 @@ pub use self::cleanup::{Runner as CleanupRunner, Task as CleanupTask};
pub use self::cleanup_sst::{Runner as CleanupSSTRunner, Task as CleanupSSTTask};
pub use self::compact::{Runner as CompactRunner, Task as CompactTask};
pub use self::consistency_check::{Runner as ConsistencyCheckRunner, Task as ConsistencyCheckTask};
pub use self::pd::{FlowStatistics, FlowStatsReporter, Runner as PdRunner, Task as PdTask};
pub use self::pd::{
FlowStatistics, FlowStatsReporter, HeartbeatTask, Runner as PdRunner, Task as PdTask,
};
pub use self::raftlog_gc::{Runner as RaftlogGcRunner, Task as RaftlogGcTask};
pub use self::read::{LocalReader, Progress as ReadProgress, ReadDelegate};
pub use self::region::{
(Diffs for the remaining 3 changed files were not loaded.)
