Skip to content

Commit

Permalink
raftstore-v2: limit the flush times during server stop (#15511)
Browse files Browse the repository at this point in the history
ref #15461

limit the flush times during server stop

Signed-off-by: SpadeA-Tang <[email protected]>
  • Loading branch information
SpadeA-Tang authored Sep 5, 2023
1 parent 640143a commit 02061be
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 3 deletions.
2 changes: 1 addition & 1 deletion components/engine_traits/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl PersistenceListener {
///
/// `largest_seqno` should be the largest seqno of the generated file.
pub fn on_flush_completed(&self, cf: &str, largest_seqno: u64, file_no: u64) {
fail_point!("on_flush_completed");
fail_point!("on_flush_completed", |_| {});
// Maybe we should hook the compaction to avoid the file is compacted before
// being recorded.
let offset = data_cf_offset(cf);
Expand Down
15 changes: 13 additions & 2 deletions components/raftstore-v2/src/operation/ready/apply_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use kvproto::{
use raftstore::store::{
util, ReadTask, TabletSnapManager, WriteTask, RAFT_INIT_LOG_INDEX, RAFT_INIT_LOG_TERM,
};
use slog::{info, trace, Logger};
use slog::{info, trace, warn, Logger};
use tikv_util::{box_err, slog_panic, worker::Scheduler};

use crate::{
Expand Down Expand Up @@ -619,7 +619,18 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
// flush the oldest cf one by one until we are under the replay count threshold
loop {
let replay_count = self.storage().estimate_replay_count();
if replay_count < flush_threshold {
if replay_count < flush_threshold || tried_count == 3 {
// Ideally, the replay count should be 0 after three flush_oldest_cf. If not,
// there may exist bug, but it's not desireable to block here, so we at most try
// three times.
if replay_count >= flush_threshold && tried_count == 3 {
warn!(
self.logger,
"after three flush_oldest_cf, the expected replay count still exceeds the threshold";
"replay_count" => replay_count,
"threshold" => flush_threshold,
);
}
if flushed {
let admin_flush = self.storage_mut().apply_trace_mut().admin.flushed;
let (_, _, tablet_index) = ctx
Expand Down
30 changes: 30 additions & 0 deletions tests/integrations/raftstore/test_bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,36 @@ fn test_flush_before_stop() {
.unwrap();
}

// test flush_before_close will not flush forever
#[test]
fn test_flush_before_stop2() {
use test_raftstore_v2::*;

let mut cluster = new_server_cluster(0, 3);
cluster.run();

fail::cfg("flush_before_cluse_threshold", "return(10)").unwrap();
fail::cfg("on_flush_completed", "return").unwrap();

for i in 0..20 {
let key = format!("k{:03}", i);
cluster.must_put_cf(CF_WRITE, key.as_bytes(), b"val");
cluster.must_put_cf(CF_LOCK, key.as_bytes(), b"val");
}

let router = cluster.get_router(1).unwrap();
let raft_engine = cluster.get_raft_engine(1);

let (tx, rx) = sync_channel(1);
let msg = PeerMsg::FlushBeforeClose { tx };
router.force_send(1, msg).unwrap();

rx.recv().unwrap();

let admin_flush = raft_engine.get_flushed_index(1, CF_RAFT).unwrap().unwrap();
assert!(admin_flush < 10);
}

// We cannot use a flushed index to call `maybe_advance_admin_flushed`
// consider a case:
// 1. lock `k` with index 6
Expand Down

0 comments on commit 02061be

Please sign in to comment.