Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cancel fap on TiFlash's side #363

Merged
merged 49 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
34c343c
init
CalvinNeo Dec 1, 2023
82ef2a4
make fn_apply_fap_snapshot return
CalvinNeo Dec 6, 2023
eeb1905
fix
CalvinNeo Dec 6, 2023
fbaa6b0
fix
CalvinNeo Dec 19, 2023
41b9fe1
z
CalvinNeo Dec 22, 2023
1b06859
Merge remote-tracking branch 'upstream/raftstore-proxy' into support-…
CalvinNeo Jan 2, 2024
fc24d44
fix restart
CalvinNeo Jan 4, 2024
a7c7249
fix snapshot fap
CalvinNeo Jan 4, 2024
8fab8d9
z
CalvinNeo Jan 4, 2024
32f0b0a
opt for lock
CalvinNeo Jan 4, 2024
45f51e0
fix
CalvinNeo Jan 4, 2024
c3fa44b
z
CalvinNeo Jan 4, 2024
a19b89b
fix test
CalvinNeo Jan 4, 2024
a4a4a67
f
CalvinNeo Jan 5, 2024
928009d
support assert exist
CalvinNeo Jan 5, 2024
ad98e92
clippy
CalvinNeo Jan 5, 2024
4f43390
fmt
CalvinNeo Jan 5, 2024
8c14105
fmt
CalvinNeo Jan 5, 2024
77ee5dc
f
CalvinNeo Jan 5, 2024
4096e6d
f
CalvinNeo Jan 5, 2024
1eaec8a
f
CalvinNeo Jan 5, 2024
99a00ea
z
CalvinNeo Jan 5, 2024
b947128
f
CalvinNeo Jan 5, 2024
34f63b4
z
CalvinNeo Jan 5, 2024
bc25a9d
fix
CalvinNeo Jan 8, 2024
1c602bc
comment
CalvinNeo Jan 8, 2024
620c002
z
CalvinNeo Jan 8, 2024
0b907a0
z
CalvinNeo Jan 8, 2024
04c814e
fix
CalvinNeo Jan 8, 2024
cca278e
fixed
CalvinNeo Jan 8, 2024
a290296
introduces some new ffis
CalvinNeo Jan 9, 2024
626894c
has problems
CalvinNeo Jan 9, 2024
1eab01a
Revert "has problems"
CalvinNeo Jan 9, 2024
0314cdc
fix tests
CalvinNeo Jan 9, 2024
a66895f
f
CalvinNeo Jan 9, 2024
cb12635
f
CalvinNeo Jan 9, 2024
328bde7
make test work
CalvinNeo Jan 9, 2024
f9a6b5d
fix timeout
CalvinNeo Jan 9, 2024
d863c23
f
CalvinNeo Jan 9, 2024
f17eae0
support cancel
CalvinNeo Jan 10, 2024
93ec946
support cancel2
CalvinNeo Jan 10, 2024
0124951
tests
CalvinNeo Jan 10, 2024
0128eaf
fix cancel
CalvinNeo Jan 10, 2024
f56d87e
fix
CalvinNeo Jan 10, 2024
d4712f0
fix2
CalvinNeo Jan 10, 2024
cc067f1
a
CalvinNeo Jan 10, 2024
4d493a3
z
CalvinNeo Jan 10, 2024
d7a5504
fix test
CalvinNeo Jan 10, 2024
753ec8e
fix test
CalvinNeo Jan 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 133 additions & 60 deletions proxy_components/engine_store_ffi/src/core/fast_add_peer.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright 2024 TiKV Project Authors. Licensed under Apache-2.0.

use crate::{
core::{common::*, ProxyForwarder},
fatal,
};

impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
pub fn pre_apply_snapshot_for_fap_snapshot(
&self,
ob_region: &Region,
peer_id: u64,
snap_key: &store::SnapKey,
) -> bool {
let region_id = ob_region.get_id();
let mut should_skip = false;
#[allow(clippy::collapsible_if)]
if self.packed_envs.engine_store_cfg.enable_fast_add_peer {
if self.get_cached_manager().access_cached_region_info_mut(
region_id,
|info: MapEntry<u64, Arc<CachedRegionInfo>>| match info {
MapEntry::Occupied(_) => {
if !self.engine_store_server_helper.kvstore_region_exist(region_id) {
if self.engine_store_server_helper.query_fap_snapshot_state(region_id, peer_id) == proxy_ffi::interfaces_ffi::FapSnapshotState::Persisted {
info!("fast path: prehandle first snapshot skipped {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
);
should_skip = true;
}
}
}
MapEntry::Vacant(_) => {
// It won't go here because cached region info is inited after restart and on the first fap message.
let pstate = self.engine_store_server_helper.query_fap_snapshot_state(region_id, peer_id);
if pstate == proxy_ffi::interfaces_ffi::FapSnapshotState::Persisted {
// We have a fap snapshot now. skip
info!("fast path: prehandle first snapshot skipped after restart {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
);
should_skip = true;
} else {
info!("fast path: prehandle first snapshot no skipped after restart {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
"state" => ?pstate,
"inited" => false,
"should_skip" => should_skip,
);
}
}
},
).is_err() {
fatal!("pre_apply_snapshot_for_fap_snapshot poisoned")
};
}
should_skip
}

pub fn post_apply_snapshot_for_fap_snapshot(
&self,
ob_region: &Region,
peer_id: u64,
snap_key: &store::SnapKey,
) -> bool {
let region_id = ob_region.get_id();
let try_apply_fap_snapshot = |c: Arc<CachedRegionInfo>, restarted: bool| {
info!("fast path: start applying first snapshot {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
);
// Even if the feature is not enabled, the snapshot could still be a previously
// generated fap snapshot. So we have to also handle this snapshot,
// to prevent error data.
let current_enabled = self.packed_envs.engine_store_cfg.enable_fast_add_peer;
let snapshot_sent_time = c.snapshot_inflight.load(Ordering::SeqCst);
let fap_start_time = c.fast_add_peer_start.load(Ordering::SeqCst);
let current = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();

let assert_exist = if !restarted {
snapshot_sent_time != 0
} else {
false
};
if !self
.engine_store_server_helper
.apply_fap_snapshot(region_id, peer_id, assert_exist)
{
// This is not a fap snapshot.
info!("fast path: this is not fap snapshot {}:{} {}, goto tikv snapshot", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
"cost_snapshot" => current.as_millis() - snapshot_sent_time,
"cost_total" => current.as_millis() - fap_start_time,
"current_enabled" => current_enabled,
"from_restart" => restarted,
);
c.snapshot_inflight.store(0, Ordering::SeqCst);
c.fast_add_peer_start.store(0, Ordering::SeqCst);
c.inited_or_fallback.store(true, Ordering::SeqCst);
return false;
}
info!("fast path: finished applied first snapshot {}:{} {}, recover MsgAppend", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
"cost_snapshot" => current.as_millis() - snapshot_sent_time,
"cost_total" => current.as_millis() - fap_start_time,
"current_enabled" => current_enabled,
"from_restart" => restarted,
);
c.snapshot_inflight.store(0, Ordering::SeqCst);
c.fast_add_peer_start.store(0, Ordering::SeqCst);
c.inited_or_fallback.store(true, Ordering::SeqCst);
true
};

// We should handle fap snapshot even if enable_fast_add_peer is false.
// However, if enable_unips, by no means can we handle fap snapshot.
#[allow(unused_mut)]
let mut should_check_fap_snapshot = self.packed_envs.engine_store_cfg.enable_unips;
#[allow(clippy::redundant_closure_call)]
(|| {
fail::fail_point!("post_apply_snapshot_allow_no_unips", |_| {
// UniPS can't provide a snapshot currently
should_check_fap_snapshot = true;
});
})();

let mut applied_fap = false;
#[allow(clippy::collapsible_if)]
if should_check_fap_snapshot {
let mut maybe_cached_info: Option<Arc<CachedRegionInfo>> = None;
if self
.get_cached_manager()
.access_cached_region_info_mut(
region_id,
|info: MapEntry<u64, Arc<CachedRegionInfo>>| match info {
MapEntry::Occupied(o) => {
maybe_cached_info = Some(o.get().clone());
let already_existed = self.engine_store_server_helper.kvstore_region_exist(region_id);
debug!("fast path: check should apply fap snapshot {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
"inited_or_fallback" => o.get().inited_or_fallback.load(Ordering::SeqCst),
"snapshot_inflight" => o.get().snapshot_inflight.load(Ordering::SeqCst),
"already_existed" => already_existed,
);
if !already_existed {
// May be a fap snapshot, try to apply.
applied_fap = try_apply_fap_snapshot(o.get().clone(), false);
}
}
MapEntry::Vacant(_) => {
// It won't go here because cached region info is inited after restart and on the first fap message.
info!("fast path: check should apply fap snapshot noexist {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
);
assert!(self.is_initialized(region_id));
let o = Arc::new(CachedRegionInfo::default());
applied_fap = try_apply_fap_snapshot(o, true);
}
},
)
.is_err()
{
fatal!("poisoned");
}
}
applied_fap
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

mod command;
mod fap_snapshot;
mod region;
mod snapshot;

pub use command::*;
pub use fap_snapshot::*;
pub use region::*;
pub use snapshot::*;
36 changes: 28 additions & 8 deletions proxy_components/engine_store_ffi/src/core/forward_raft/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
)
}

pub fn on_region_changed(&self, ob_region: &Region, e: RegionChangeEvent, _: StateRole) {
pub fn on_region_changed(&self, ob_region: &Region, e: RegionChangeEvent, r: StateRole) {
let region_id = ob_region.get_id();
if e == RegionChangeEvent::Destroy {
info!(
Expand All @@ -23,6 +23,27 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
self.get_cached_manager()
.remove_cached_region_info(region_id);
}
} else if e == RegionChangeEvent::Create {
// This could happen when restore.
let f = |info: MapEntry<u64, Arc<CachedRegionInfo>>| match info {
MapEntry::Occupied(_) => {}
MapEntry::Vacant(v) => {
info!("{}{}:{}, peer created(region event)",
self.store_id, region_id, 0;
"region_id" => region_id,
"role" => ?r,
);
let c = CachedRegionInfo::default();
c.replicated_or_created.store(true, Ordering::SeqCst);
c.inited_or_fallback
.store(self.is_initialized(region_id), Ordering::SeqCst);
v.insert(Arc::new(c));
}
};
// TODO remove unwrap
self.get_cached_manager()
.access_cached_region_info_mut(region_id, f)
.unwrap();
}
}

Expand Down Expand Up @@ -85,7 +106,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
let f = |info: MapEntry<u64, Arc<CachedRegionInfo>>| match info {
MapEntry::Occupied(mut o) => {
// Note the region info may be registered by maybe_fast_path_tick
info!("{}{}:{} {}, peer created again",
info!("{}{}:{} {}, peer changed",
if is_fap_enabled {"fast path: ongoing "} else {" "},
self.store_id, region_id, 0;
"region_id" => region_id,
Expand All @@ -100,19 +121,18 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
}
}
MapEntry::Vacant(v) => {
info!("{}{}:{} {}, peer created",
info!("{}{}:{} {}, peer created(role event)",
if is_fap_enabled {"fast path: ongoing "} else {" "},
self.store_id, region_id, r.peer_id;
"region_id" => region_id,
"leader_id" => r.leader_id,
"role" => ?r.state,
"is_replicated" => is_replicated,
);
if is_replicated {
let c = CachedRegionInfo::default();
c.replicated_or_created.store(true, Ordering::SeqCst);
v.insert(Arc::new(c));
}
let c = CachedRegionInfo::default();
c.replicated_or_created.store(true, Ordering::SeqCst);
c.inited_or_fallback.store(r.initialized, Ordering::SeqCst);
v.insert(Arc::new(c));
}
};
// TODO remove unwrap
Expand Down
Loading
Loading