Skip to content

Commit

Permalink
raftstore-v2: persist applied index after ingset sst (#15538)
Browse files Browse the repository at this point in the history
close #15461

Signed-off-by: glorv <[email protected]>

Co-authored-by: tonyxuqqi <[email protected]>
  • Loading branch information
glorv and tonyxuqqi authored Sep 13, 2023
1 parent b75f559 commit 063c9cd
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 30 deletions.
6 changes: 6 additions & 0 deletions components/raftstore-v2/src/operation/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,11 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
if is_leader {
self.retry_pending_prepare_merge(ctx, apply_res.applied_index);
}
if !apply_res.sst_applied_index.is_empty() {
self.storage_mut()
.apply_trace_mut()
.on_sst_ingested(&apply_res.sst_applied_index);
}
self.on_data_modified(apply_res.modifications);
self.handle_read_on_apply(
ctx,
Expand Down Expand Up @@ -866,6 +871,7 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
apply_res.modifications = *self.modifications_mut();
apply_res.metrics = mem::take(&mut self.metrics);
apply_res.bucket_stat = self.buckets.clone();
apply_res.sst_applied_index = self.take_sst_applied_index();
let written_bytes = apply_res.metrics.written_bytes;

let skip_report = || -> bool {
Expand Down
12 changes: 10 additions & 2 deletions components/raftstore-v2/src/operation/command/write/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use collections::HashMap;
use crossbeam::channel::TrySendError;
use engine_traits::{data_cf_offset, KvEngine, RaftEngine};
use engine_traits::{data_cf_offset, KvEngine, RaftEngine, DATA_CFS_LEN};
use kvproto::import_sstpb::SstMeta;
use raftstore::{
store::{check_sst_for_ingestion, metrics::PEER_WRITE_CMD_COUNTER, util},
Expand All @@ -16,7 +16,7 @@ use crate::{
batch::StoreContext,
fsm::{ApplyResReporter, Store, StoreFsmDelegate},
raft::{Apply, Peer},
router::{PeerMsg, StoreTick},
router::{PeerMsg, SstApplyIndex, StoreTick},
worker::tablet,
};

Expand Down Expand Up @@ -107,10 +107,12 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
#[inline]
pub fn apply_ingest(&mut self, index: u64, ssts: Vec<SstMeta>) -> Result<()> {
fail::fail_point!("on_apply_ingest");
PEER_WRITE_CMD_COUNTER.ingest_sst.inc();
let mut infos = Vec::with_capacity(ssts.len());
let mut size: i64 = 0;
let mut keys: u64 = 0;
let mut cf_indexes = [u64::MAX; DATA_CFS_LEN];
for sst in &ssts {
// This may not be enough as ingest sst may not trigger flush at all.
let off = data_cf_offset(sst.get_cf_name());
Expand Down Expand Up @@ -138,6 +140,7 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
slog_panic!(self.logger, "corrupted sst"; "sst" => ?sst, "error" => ?e);
}
}
cf_indexes[off] = index;
}
if !infos.is_empty() {
// Unlike v1, we can't batch ssts accross regions.
Expand All @@ -154,6 +157,11 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
self.metrics.size_diff_hint += size;
self.metrics.written_bytes += size as u64;
self.metrics.written_keys += keys;
for (cf_index, index) in cf_indexes.into_iter().enumerate() {
if index != u64::MAX {
self.push_sst_applied_index(SstApplyIndex { cf_index, index });
}
}
Ok(())
}
}
Loading

0 comments on commit 063c9cd

Please sign in to comment.