Skip to content

Commit

Permalink
Support on_empty_cmd observer in Proxy (#121)
Browse files Browse the repository at this point in the history
* f

Signed-off-by: CalvinNeo <[email protected]>

* fix

Signed-off-by: CalvinNeo <[email protected]>

* add engine_tiflash with this version of engine_rocks

Signed-off-by: CalvinNeo <[email protected]>

* fmt

Signed-off-by: CalvinNeo <[email protected]>

* add engine_tiflash actually

Signed-off-by: CalvinNeo <[email protected]>

* fix

Signed-off-by: CalvinNeo <[email protected]>

* fix some tests

Signed-off-by: CalvinNeo <[email protected]>

* f

Signed-off-by: CalvinNeo <[email protected]>

* it runs if we do_write every time

Signed-off-by: CalvinNeo <[email protected]>

* add ensure_no_common_unrecognized_keys to config checker

Signed-off-by: CalvinNeo <[email protected]>

* reorg tests

Signed-off-by: CalvinNeo <[email protected]>

* raftstore: Implement coprocessor observer on_empty_cmd (tikv#12851)

ref tikv#12849

Support new observers on_empty_cmd.

Signed-off-by: CalvinNeo <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>

* fix tests

Signed-off-by: CalvinNeo <[email protected]>

* move proxy-related code in component/server into proxy_server

Signed-off-by: CalvinNeo <[email protected]>

* move code in component/server/src/setup.rs into proxy_server

Signed-off-by: CalvinNeo <[email protected]>

* support new ffis in mock-engine-store and new-mock-engine-store

Signed-off-by: CalvinNeo <[email protected]>

* add disclaimer

Signed-off-by: CalvinNeo <[email protected]>

* lengthen some test time

Signed-off-by: CalvinNeo <[email protected]>

* f

Signed-off-by: CalvinNeo <[email protected]>

* fmt

Signed-off-by: CalvinNeo <[email protected]>

* fix tests

Signed-off-by: CalvinNeo <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
CalvinNeo and ti-chi-bot authored Jul 14, 2022
1 parent b33b1eb commit fbd8e6f
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 52 deletions.
15 changes: 7 additions & 8 deletions components/proxy_server/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,14 +1185,13 @@ impl<ER: RaftEngine> TiKvServer<ER> {
}
let importer = Arc::new(importer);

// TODO(tiflash) Register TiFlash observer
// let tiflash_ob = engine_store_ffi::observer::TiFlashObserver::new(
// node.id(),
// self.engines.as_ref().unwrap().engines.kv.clone(),
// importer.clone(),
// self.proxy_config.snap_handle_pool_size,
// );
// tiflash_ob.register_to(self.coprocessor_host.as_mut().unwrap());
let tiflash_ob = engine_store_ffi::observer::TiFlashObserver::new(
node.id(),
self.engines.as_ref().unwrap().engines.kv.clone(),
importer.clone(),
self.proxy_config.snap_handle_pool_size,
);
tiflash_ob.register_to(self.coprocessor_host.as_mut().unwrap());

let split_check_runner = SplitCheckRunner::new(
engines.engines.kv.clone(),
Expand Down
38 changes: 29 additions & 9 deletions components/raftstore/src/engine_store_ffi/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,19 @@ impl TiFlashObserver {
) -> Self {
let engine_store_server_helper =
gen_engine_store_server_helper(engine.engine_store_server_helper);
let snap_pool = Builder::new(tikv_util::thd_name!("region-task"))
.max_thread_count(snap_handle_pool_size)
.build_future_pool();
// TODO(tiflash) start thread pool
// let snap_pool = Builder::new(tikv_util::thd_name!("region-task"))
// .max_thread_count(snap_handle_pool_size)
// .build_future_pool();
TiFlashObserver {
peer_id,
engine_store_server_helper,
engine,
sst_importer,
pre_handle_snapshot_ctx: Arc::new(Mutex::new(PrehandleContext::default())),
snap_handle_pool_size,
apply_snap_pool: Some(Arc::new(snap_pool)),
// apply_snap_pool: Some(Arc::new(snap_pool)),
apply_snap_pool: None,
}
}

Expand All @@ -129,10 +131,10 @@ impl TiFlashObserver {
&self,
coprocessor_host: &mut CoprocessorHost<E>,
) {
// coprocessor_host.registry.register_query_observer(
// TIFLASH_OBSERVER_PRIORITY,
// BoxQueryObserver::new(self.clone()),
// );
coprocessor_host.registry.register_query_observer(
TIFLASH_OBSERVER_PRIORITY,
BoxQueryObserver::new(self.clone()),
);
// coprocessor_host.registry.register_admin_observer(
// TIFLASH_OBSERVER_PRIORITY,
// BoxAdminObserver::new(self.clone()),
Expand All @@ -154,6 +156,24 @@ impl TiFlashObserver {

impl Coprocessor for TiFlashObserver {
    /// Shuts down the snapshot-apply thread pool, if one has been started.
    fn stop(&self) {
        // TODO(tiflash): the thread pool is not started yet, so the constructor
        // leaves `apply_snap_pool` as `None`. Unwrapping it unconditionally
        // would panic here, so only shut the pool down when it exists.
        if let Some(pool) = self.apply_snap_pool.as_ref() {
            pool.shutdown();
        }
    }
}

impl QueryObserver for TiFlashObserver {
    /// Forwards an empty raft command to the engine store.
    ///
    /// The command carries no writes, but its header still reports the
    /// region id together with the `index` and `term` of the entry.
    /// When the `on_empty_cmd_normal` fail point is active, the callback
    /// returns early and nothing is forwarded.
    fn on_empty_cmd(&self, ob_ctx: &mut ObserverContext<'_>, index: u64, term: u64) {
        fail::fail_point!("on_empty_cmd_normal", |_| {});
        debug!("encounter empty cmd, maybe due to leadership change";
            "region" => ?ob_ctx.region(),
            "index" => index,
            "term" => term,
        );
        // There is nothing to write, yet a (dummy) write batch must still be
        // handed over so that the update is forwarded.
        let empty_cmds = WriteCmds::new();
        let header = RaftCmdHeader::new(ob_ctx.region().get_id(), index, term);
        self.engine_store_server_helper
            .handle_write_raft_cmd(&empty_cmds, header);
    }
}
9 changes: 0 additions & 9 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1076,15 +1076,6 @@ where
// read index during confchange, or other situations.
apply_ctx.host.on_empty_cmd(&self.region, index, term);

{
// hacked by solotzg.
let cmds = WriteCmds::new();
apply_ctx.engine_store_server_helper.handle_write_raft_cmd(
&cmds,
RaftCmdHeader::new(self.region.get_id(), index, term),
);
}

self.apply_state.set_applied_index(index);
self.applied_index_term = term;
assert!(term > 0);
Expand Down
4 changes: 2 additions & 2 deletions components/test_raftstore/src/pd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ impl PdCluster {
Ok(self.base_id.fetch_add(1, Ordering::Relaxed) as u64)
}

fn put_store(&mut self, store: metapb::Store) -> Result<()> {
pub fn put_store(&mut self, store: metapb::Store) -> Result<()> {
let store_id = store.get_id();
// There is a race between put_store and handle_region_heartbeat_response. If store id is
// 0, it means it's a placeholder created by latter, we just need to update the meta.
Expand Down Expand Up @@ -459,7 +459,7 @@ impl PdCluster {
.map(|(_, region)| region.clone())
}

fn get_region_by_id(&self, region_id: u64) -> Result<Option<metapb::Region>> {
pub fn get_region_by_id(&self, region_id: u64) -> Result<Option<metapb::Region>> {
Ok(self
.region_id_keys
.get(&region_id)
Expand Down
22 changes: 20 additions & 2 deletions mock-engine-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,10 @@ impl EngineStoreServerWrap {
// Currently in tests, we don't handle commands like BatchSplit,
// and sometimes we don't bootstrap region 1,
// so it is normal if we find no region.
warn!("region {} not found, create for {}", region_id, node_id);
warn!(
"region {} not found when admin, create for {}",
region_id, node_id
);
let new_region = v.insert(Default::default());
assert!((*self.engine_store_server).kvstore.contains_key(&region_id));
do_handle_admin_raft_cmd(new_region, &mut (*self.engine_store_server))
Expand Down Expand Up @@ -504,7 +507,7 @@ impl EngineStoreServerWrap {
do_handle_write_raft_cmd(o.get_mut())
}
std::collections::hash_map::Entry::Vacant(v) => {
warn!("region {} not found", region_id);
warn!("region {} not found when write", region_id);
do_handle_write_raft_cmd(v.insert(Default::default()))
}
}
Expand Down Expand Up @@ -928,12 +931,27 @@ unsafe extern "C" fn ffi_handle_ingest_sst(
header: ffi_interfaces::RaftCmdHeader,
) -> ffi_interfaces::EngineStoreApplyRes {
let store = into_engine_store_server_wrap(arg1);
let node_id = (*store.engine_store_server).id;
let proxy_helper = &mut *(store.maybe_proxy_helper.unwrap());
debug!("ingest sst with len {}", snaps.len);

let region_id = header.region_id;
let kvstore = &mut (*store.engine_store_server).kvstore;
let kv = &mut (*store.engine_store_server).engines.as_mut().unwrap().kv;

match kvstore.entry(region_id) {
std::collections::hash_map::Entry::Occupied(mut o) => {}
std::collections::hash_map::Entry::Vacant(v) => {
// When we remove the hacked code in handle_raft_entry_normal during migration,
// some tests in handle_raft_entry_normal may fail, since it could observe an empty cmd,
// thus creating the region.
warn!(
"region {} not found when ingest, create for {}",
region_id, node_id
);
let new_region = v.insert(Default::default());
}
}
let region = kvstore.get_mut(&region_id).unwrap();

let index = header.index;
Expand Down
15 changes: 15 additions & 0 deletions new-mock-engine-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,12 +938,27 @@ unsafe extern "C" fn ffi_handle_ingest_sst(
header: ffi_interfaces::RaftCmdHeader,
) -> ffi_interfaces::EngineStoreApplyRes {
let store = into_engine_store_server_wrap(arg1);
let node_id = (*store.engine_store_server).id;
let proxy_helper = &mut *(store.maybe_proxy_helper.unwrap());
debug!("ingest sst with len {}", snaps.len);

let region_id = header.region_id;
let kvstore = &mut (*store.engine_store_server).kvstore;
let kv = &mut (*store.engine_store_server).engines.as_mut().unwrap().kv;

match kvstore.entry(region_id) {
std::collections::hash_map::Entry::Occupied(mut o) => {}
std::collections::hash_map::Entry::Vacant(v) => {
// When we remove the hacked code in handle_raft_entry_normal during migration,
// some tests in handle_raft_entry_normal may fail, since it could observe an empty cmd,
// thus creating the region.
warn!(
"region {} not found when ingest, create for {}",
region_id, node_id
);
let new_region = v.insert(Default::default());
}
}
let region = kvstore.get_mut(&region_id).unwrap();

let index = header.index;
Expand Down
15 changes: 7 additions & 8 deletions new-mock-engine-store/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,13 @@ impl Simulator<TiFlashEngine> for NodeCluster {
f(node_id, &mut coprocessor_host);
}

// TODO(tiflash) Register TiFlash observer
// let tiflash_ob = engine_store_ffi::observer::TiFlashObserver::new(
// node_id,
// engines.kv.clone(),
// importer.clone(),
// cfg.snap_handle_pool_size,
// );
// tiflash_ob.register_to(&mut coprocessor_host);
let tiflash_ob = engine_store_ffi::observer::TiFlashObserver::new(
node_id,
engines.kv.clone(),
importer.clone(),
cfg.proxy_cfg.snap_handle_pool_size,
);
tiflash_ob.register_to(&mut coprocessor_host);

let cm = ConcurrencyManager::new(1.into());
self.concurrency_managers.insert(node_id, cm.clone());
Expand Down
2 changes: 2 additions & 0 deletions tests/proxy/ingest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

use crate::proxy::*;
63 changes: 61 additions & 2 deletions tests/proxy/normal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::{
collections::HashMap,
Expand Down Expand Up @@ -51,7 +51,7 @@ use tikv_util::{
HandyRwLock,
};

use crate::proxy::new_mock_cluster;
use crate::proxy::*;

#[test]
fn test_config() {
Expand Down Expand Up @@ -114,3 +114,62 @@ fn test_store_setup() {

cluster.shutdown();
}

/// Checks that empty raft commands produced by leadership changes are
/// forwarded to the engine store, and that nothing is forwarded while the
/// `on_empty_cmd_normal` fail point is active.
#[test]
fn test_empty_cmd() {
    /// Removes the named fail point when dropped, so that a failing assertion
    /// cannot leave it configured for other tests in the same process.
    struct FailPointGuard(&'static str);

    impl Drop for FailPointGuard {
        fn drop(&mut self) {
            fail::remove(self.0);
        }
    }

    const EMPTY_CMD_FAIL_POINT: &str = "on_empty_cmd_normal";

    // Gives the cluster time to apply the entries produced by a leader transfer.
    let wait_for_apply = || std::thread::sleep(std::time::Duration::from_secs(2));

    let (mut cluster, _pd_client) = new_mock_cluster(0, 3);
    // Disable compact log
    cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000);
    cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000);
    cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000);
    cluster.cfg.raft_store.raft_log_gc_threshold = 1000;

    let _ = cluster.run();

    cluster.must_put(b"k1", b"v1");
    let region = cluster.get_region(b"k1");
    let region_id = region.get_id();
    let eng_ids = cluster
        .engines
        .iter()
        .map(|(id, _)| *id)
        .collect::<Vec<_>>();
    let peer_1 = find_peer(&region, eng_ids[0]).cloned().unwrap();
    let peer_2 = find_peer(&region, eng_ids[1]).cloned().unwrap();
    cluster.must_transfer_leader(region_id, peer_1.clone());
    wait_for_apply();

    check_key(&cluster, b"k1", b"v1", Some(true), None, None);
    let prev_states = collect_all_states(&cluster, region_id);

    // We need to forward the empty cmd generated by the leadership change to TiFlash.
    cluster.must_transfer_leader(region_id, peer_2);
    wait_for_apply();

    let new_states = collect_all_states(&cluster, region_id);
    for (store_id, old) in &prev_states {
        let new = &new_states[store_id];
        assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state);
        assert_ne!(old.in_memory_applied_term, new.in_memory_applied_term);
    }

    wait_for_apply();
    fail::cfg(EMPTY_CMD_FAIL_POINT, "return").unwrap();
    let fail_point_guard = FailPointGuard(EMPTY_CMD_FAIL_POINT);

    // With the fail point active the empty cmd is dropped by the observer, so
    // the engine store states must stay untouched by the next leader transfer.
    let prev_states = new_states;
    cluster.must_transfer_leader(region_id, peer_1);
    wait_for_apply();

    let new_states = collect_all_states(&cluster, region_id);
    for (store_id, old) in &prev_states {
        let new = &new_states[store_id];
        assert_eq!(old.in_memory_apply_state, new.in_memory_apply_state);
        assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term);
    }

    // Remove the fail point before shutting the cluster down, as before.
    drop(fail_point_guard);

    cluster.shutdown();
}
27 changes: 15 additions & 12 deletions tests/proxy/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::{
collections::HashMap,
Expand Down Expand Up @@ -49,7 +49,10 @@ use tikv_util::{
};

// TODO Need refactor if moved to raft-engine
fn get_region_local_state(engine: &engine_rocks::RocksEngine, region_id: u64) -> RegionLocalState {
pub fn get_region_local_state(
engine: &engine_rocks::RocksEngine,
region_id: u64,
) -> RegionLocalState {
let region_state_key = keys::region_state_key(region_id);
let region_state = match engine.get_msg_cf::<RegionLocalState>(CF_RAFT, &region_state_key) {
Ok(Some(s)) => s,
Expand All @@ -59,7 +62,7 @@ fn get_region_local_state(engine: &engine_rocks::RocksEngine, region_id: u64) ->
}

// TODO Need refactor if moved to raft-engine
fn get_apply_state(engine: &engine_rocks::RocksEngine, region_id: u64) -> RaftApplyState {
pub fn get_apply_state(engine: &engine_rocks::RocksEngine, region_id: u64) -> RaftApplyState {
let apply_state_key = keys::apply_state_key(region_id);
let apply_state = match engine.get_msg_cf::<RaftApplyState>(CF_RAFT, &apply_state_key) {
Ok(Some(s)) => s,
Expand All @@ -84,15 +87,15 @@ pub fn new_verify_hash_request(hash: Vec<u8>, index: u64) -> AdminRequest {
req
}

struct States {
in_memory_apply_state: RaftApplyState,
in_memory_applied_term: u64,
in_disk_apply_state: RaftApplyState,
in_disk_region_state: RegionLocalState,
ident: StoreIdent,
pub struct States {
pub in_memory_apply_state: RaftApplyState,
pub in_memory_applied_term: u64,
pub in_disk_apply_state: RaftApplyState,
pub in_disk_region_state: RegionLocalState,
pub ident: StoreIdent,
}

fn iter_ffi_helpers(
pub fn iter_ffi_helpers(
cluster: &Cluster<NodeCluster>,
store_ids: Option<Vec<u64>>,
f: &mut dyn FnMut(u64, &engine_rocks::RocksEngine, &mut FFIHelperSet) -> (),
Expand All @@ -110,7 +113,7 @@ fn iter_ffi_helpers(
}
}

fn collect_all_states(cluster: &mut Cluster<NodeCluster>, region_id: u64) -> HashMap<u64, States> {
pub fn collect_all_states(cluster: &Cluster<NodeCluster>, region_id: u64) -> HashMap<u64, States> {
let mut prev_state: HashMap<u64, States> = HashMap::default();
iter_ffi_helpers(
cluster,
Expand Down Expand Up @@ -220,7 +223,7 @@ pub fn check_key(
}
}

fn get_valid_compact_index(states: &HashMap<u64, States>) -> (u64, u64) {
pub fn get_valid_compact_index(states: &HashMap<u64, States>) -> (u64, u64) {
states
.iter()
.map(|(_, s)| {
Expand Down

0 comments on commit fbd8e6f

Please sign in to comment.