Skip to content

Commit

Permalink
sync prepare/rollback merge
Browse files Browse the repository at this point in the history
Signed-off-by: 5kbpers <[email protected]>
  • Loading branch information
5kbpers committed Oct 13, 2022
1 parent ae2c430 commit 32200fe
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
3 changes: 1 addition & 2 deletions components/raftstore/src/store/recover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,7 @@ impl<EK: KvEngine, ER: RaftEngine> Recovery<EK, ER> {
let target_peer = target_region
.get_peers()
.iter()
.filter(|p| p.store_id == store_id)
.next()
.find(|p| p.store_id == store_id)
.cloned()
.unwrap();
let _ = self.router.force_send(
Expand Down
24 changes: 18 additions & 6 deletions components/raftstore/src/store/worker/seqno_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ impl<S: Snapshot> fmt::Display for Task<S> {
}

enum HandleExecResResult {
RegionStateChanged(RegionLocalState),
ConfChange(RegionLocalState),
PrepareMerge(RegionLocalState),
RollbackMerge(RegionLocalState),
SplitRegion {
new_created_regions: Vec<RegionLocalState>,
region_state: RegionLocalState,
Expand Down Expand Up @@ -196,8 +198,8 @@ impl<EK: KvEngine, ER: RaftEngine> Runner<EK, ER> {

for result in self.handle_exec_res(res.region_id, seqno.get_number(), &res.exec_res) {
match result {
HandleExecResResult::RegionStateChanged(region_state) => {
info!("region state changed"; "region_id" => res.region_id, "region_state" => ?region_state, "apply_state" => ?res.apply_state);
HandleExecResResult::ConfChange(region_state) => {
info!("region conf changed"; "region_id" => res.region_id, "region_state" => ?region_state, "apply_state" => ?res.apply_state);
relation.set_region_state(region_state);
}
HandleExecResResult::SplitRegion {
Expand Down Expand Up @@ -226,6 +228,16 @@ impl<EK: KvEngine, ER: RaftEngine> Runner<EK, ER> {
// restart.
sync_relation_with_current_batch.push(res.region_id);
}
HandleExecResResult::PrepareMerge(region_state) => {
info!("prepare merge"; "region_id" => res.region_id, "region_state" => ?region_state, "apply_state" => ?res.apply_state);
relation.set_region_state(region_state);
sync_relation_with_current_batch.push(res.region_id);
}
HandleExecResResult::RollbackMerge(region_state) => {
info!("rollback merge"; "region_id" => res.region_id, "region_state" => ?region_state, "apply_state" => ?res.apply_state);
relation.set_region_state(region_state);
sync_relation_with_current_batch.push(res.region_id);
}
HandleExecResResult::CommitMerge {
target_state,
source_relation,
Expand Down Expand Up @@ -420,7 +432,7 @@ impl<EK: KvEngine, ER: RaftEngine> Runner<EK, ER> {
let mut state = RegionLocalState::default();
state.set_region(cp.region.clone());
state.set_state(PeerState::Normal);
results.push(HandleExecResResult::RegionStateChanged(state));
results.push(HandleExecResResult::ConfChange(state));
};
}
ExecResult::SplitRegion {
Expand Down Expand Up @@ -458,7 +470,7 @@ impl<EK: KvEngine, ER: RaftEngine> Runner<EK, ER> {
state.set_region(region.clone());
state.set_state(PeerState::Merging);
state.set_merge_state(merge_state.clone());
results.push(HandleExecResResult::RegionStateChanged(state));
results.push(HandleExecResResult::PrepareMerge(state));
}
ExecResult::CommitMerge {
region,
Expand Down Expand Up @@ -500,7 +512,7 @@ impl<EK: KvEngine, ER: RaftEngine> Runner<EK, ER> {
let mut state = RegionLocalState::default();
state.set_region(region.clone());
state.set_state(PeerState::Normal);
results.push(HandleExecResResult::RegionStateChanged(state));
results.push(HandleExecResResult::RollbackMerge(state));
}
ExecResult::DeleteRange { .. }
| ExecResult::IngestSst { .. }
Expand Down

0 comments on commit 32200fe

Please sign in to comment.