Skip to content

Commit

Permalink
gc: Fix GcKeys task doesn't work when called with multiple keys (tikv…
Browse files Browse the repository at this point in the history
…#11248)

* fix gc keys doesn't work

Signed-off-by: Connor1996 <[email protected]>

* close tikv#11217 simplify rangekey

Signed-off-by: Connor1996 <[email protected]>

* improve test

Signed-off-by: Connor1996 <[email protected]>

* fix clippy

Signed-off-by: Connor1996 <[email protected]>

* add metrics

Signed-off-by: Connor1996 <[email protected]>

* use closed interval for end key

Signed-off-by: Connor1996 <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
Signed-off-by: 5kbpers <[email protected]>
  • Loading branch information
2 people authored and 5kbpers committed Nov 19, 2021
1 parent 2e32a8a commit 621cce2
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 64 deletions.
2 changes: 1 addition & 1 deletion components/raftstore/src/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use self::dispatcher::{
};
pub use self::error::{Error, Result};
pub use self::region_info_accessor::{
Callback as RegionInfoCallback, RegionCollector, RegionInfo, RegionInfoAccessor,
Callback as RegionInfoCallback, RangeKey, RegionCollector, RegionInfo, RegionInfoAccessor,
RegionInfoProvider, SeekRegionCallback,
};
pub use self::split_check::{
Expand Down
90 changes: 70 additions & 20 deletions components/raftstore/src/coprocessor/region_info_accessor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::collections::BTreeMap;
use std::collections::Bound::{Excluded, Included, Unbounded};
use std::collections::Bound::{Excluded, Unbounded};
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::sync::{mpsc, Mutex};
use std::time::Duration;
Expand All @@ -13,7 +13,6 @@ use super::{
};
use collections::HashMap;
use engine_traits::KvEngine;
use keys::{data_end_key, data_key};
use kvproto::metapb::Region;
use raft::StateRole;
use tikv_util::worker::{Builder as WorkerBuilder, Runnable, RunnableWithTimer, Scheduler, Worker};
Expand Down Expand Up @@ -67,7 +66,30 @@ impl RegionInfo {
}

type RegionsMap = HashMap<u64, RegionInfo>;
type RegionRangesMap = BTreeMap<Vec<u8>, u64>;
type RegionRangesMap = BTreeMap<RangeKey, u64>;

// RangeKey is a wrapper used to unify the comparsion between region start key
// and region end key. Region end key is special as empty stands for the infinite,
// so we need to take special care for cases where the end key is empty.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub enum RangeKey {
Finite(Vec<u8>),
Infinite,
}

impl RangeKey {
pub fn from_start_key(key: Vec<u8>) -> Self {
RangeKey::Finite(key)
}

pub fn from_end_key(key: Vec<u8>) -> Self {
if key.is_empty() {
RangeKey::Infinite
} else {
RangeKey::Finite(key)
}
}
}

pub type Callback<T> = Box<dyn FnOnce(T) + Send>;
pub type SeekRegionCallback = Box<dyn FnOnce(&mut dyn Iterator<Item = &RegionInfo>) + Send>;
Expand Down Expand Up @@ -186,7 +208,7 @@ impl RegionCollector {
}

pub fn create_region(&mut self, region: Region, role: StateRole) {
let end_key = data_end_key(region.get_end_key());
let end_key = RangeKey::from_end_key(region.get_end_key().to_vec());
let region_id = region.get_id();

// Create new region
Expand All @@ -212,13 +234,13 @@ impl RegionCollector {
if old_region.get_end_key() != region.get_end_key() {
// The region's end_key has changed.
// Remove the old entry in `self.region_ranges`.
let old_end_key = data_end_key(old_region.get_end_key());
let old_end_key = RangeKey::from_end_key(old_region.get_end_key().to_vec());

let old_id = self.region_ranges.remove(&old_end_key).unwrap();
assert_eq!(old_id, region.get_id());

// Insert new entry to `region_ranges`.
let end_key = data_end_key(region.get_end_key());
let end_key = RangeKey::from_end_key(region.get_end_key().to_vec());
assert!(
self.region_ranges
.insert(end_key, region.get_id())
Expand Down Expand Up @@ -262,7 +284,7 @@ impl RegionCollector {
let removed_region = removed_region_info.region;
assert_eq!(removed_region.get_id(), region.get_id());

let end_key = data_end_key(removed_region.get_end_key());
let end_key = RangeKey::from_end_key(removed_region.get_end_key().to_vec());

let removed_id = self.region_ranges.remove(&end_key).unwrap();
assert_eq!(removed_id, region.get_id());
Expand Down Expand Up @@ -323,10 +345,10 @@ impl RegionCollector {

let mut stale_regions_in_range = vec![];

for (key, id) in self
.region_ranges
.range((Excluded(data_key(region.get_start_key())), Unbounded))
{
for (key, id) in self.region_ranges.range((
Excluded(RangeKey::from_start_key(region.get_start_key().to_vec())),
Unbounded,
)) {
if *id == region.get_id() {
continue;
}
Expand Down Expand Up @@ -363,10 +385,9 @@ impl RegionCollector {
}

pub fn handle_seek_region(&self, from_key: Vec<u8>, callback: SeekRegionCallback) {
let from_key = data_key(&from_key);
let mut iter = self
.region_ranges
.range((Excluded(from_key), Unbounded))
.range((Excluded(RangeKey::from_start_key(from_key)), Unbounded))
.map(|(_, region_id)| &self.regions[region_id]);
callback(&mut iter)
}
Expand All @@ -375,18 +396,23 @@ impl RegionCollector {
callback(self.regions.get(&region_id).cloned());
}

// It returns the regions covered by [start_key, end_key]
pub fn handle_get_regions_in_range(
&self,
start_key: Vec<u8>,
end_key: Vec<u8>,
callback: Callback<Vec<Region>>,
) {
let end_key = RangeKey::from_end_key(end_key);
let mut regions = vec![];
for (_, region_id) in self
.region_ranges
.range((Included(start_key), Included(end_key)))
.range((Excluded(RangeKey::from_start_key(start_key)), Unbounded))
{
let region_info = &self.regions[region_id];
if RangeKey::from_start_key(region_info.region.get_start_key().to_vec()) > end_key {
break;
}
regions.push(region_info.region.clone());
}
callback(regions);
Expand Down Expand Up @@ -648,7 +674,7 @@ mod tests {
fn check_collection(c: &RegionCollector, regions: &[(Region, StateRole)]) {
let region_ranges: Vec<_> = regions
.iter()
.map(|(r, _)| (data_end_key(r.get_end_key()), r.get_id()))
.map(|(r, _)| (RangeKey::from_end_key(r.get_end_key().to_vec()), r.get_id()))
.collect();

let mut is_regions_equal = c.regions.len() == regions.len();
Expand Down Expand Up @@ -713,7 +739,7 @@ mod tests {

assert_eq!(&c.regions[&region.get_id()].region, region);
assert_eq!(
c.region_ranges[&data_end_key(region.get_end_key())],
c.region_ranges[&RangeKey::from_end_key(region.get_end_key().to_vec())],
region.get_id()
);
}
Expand All @@ -732,11 +758,12 @@ mod tests {
if let Some(r) = c.regions.get(&region.get_id()) {
assert_eq!(r.region, *region);
assert_eq!(
c.region_ranges[&data_end_key(region.get_end_key())],
c.region_ranges[&RangeKey::from_end_key(region.get_end_key().to_vec())],
region.get_id()
);
} else {
let another_region_id = c.region_ranges[&data_end_key(region.get_end_key())];
let another_region_id =
c.region_ranges[&RangeKey::from_end_key(region.get_end_key().to_vec())];
let version = c.regions[&another_region_id]
.region
.get_region_epoch()
Expand All @@ -749,7 +776,7 @@ mod tests {
if old_end_key.as_slice() != region.get_end_key() {
assert!(
c.region_ranges
.get(&data_end_key(&old_end_key))
.get(&RangeKey::from_end_key(old_end_key))
.map_or(true, |id| *id != region.get_id())
);
}
Expand All @@ -768,7 +795,7 @@ mod tests {
if let Some(end_key) = end_key {
assert!(
c.region_ranges
.get(&data_end_key(&end_key))
.get(&RangeKey::from_end_key(end_key))
.map_or(true, |r| *r != id)
);
}
Expand All @@ -785,6 +812,29 @@ mod tests {
}
}

#[test]
#[allow(clippy::many_single_char_names)]
fn test_range_key() {
let a = RangeKey::from_start_key(b"".to_vec());
let b = RangeKey::from_start_key(b"".to_vec());
let c = RangeKey::from_end_key(b"a".to_vec());
let d = RangeKey::from_start_key(b"a".to_vec());
let e = RangeKey::from_start_key(b"d".to_vec());
let f = RangeKey::from_end_key(b"f".to_vec());
let g = RangeKey::from_end_key(b"u".to_vec());
let h = RangeKey::from_end_key(b"".to_vec());

assert!(a == b);
assert!(a < c);
assert!(a != h);
assert!(c == d);
assert!(d < e);
assert!(e < f);
assert!(f < g);
assert!(g < h);
assert!(h > g);
}

#[test]
fn test_ignore_invalid_version() {
let mut c = RegionCollector::new();
Expand Down
10 changes: 5 additions & 5 deletions components/raftstore/src/store/compaction_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use engine_traits::{
CfName, SstPartitioner, SstPartitionerContext, SstPartitionerFactory, SstPartitionerRequest,
SstPartitionerResult, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE,
};
use keys::data_end_key;
use keys::{data_end_key, origin_key};
use lazy_static::lazy_static;
use tikv_util::warn;

Expand Down Expand Up @@ -91,10 +91,10 @@ pub struct CompactionGuardGenerator<P: RegionInfoProvider> {

impl<P: RegionInfoProvider> CompactionGuardGenerator<P> {
fn initialize(&mut self) {
self.use_guard = match self
.provider
.get_regions_in_range(&self.smallest_key, &self.largest_key)
{
self.use_guard = match self.provider.get_regions_in_range(
origin_key(&self.smallest_key),
origin_key(&self.largest_key),
) {
Ok(regions) => {
// The regions returned from region_info_provider should have been sorted,
// but we sort it again just in case.
Expand Down
16 changes: 8 additions & 8 deletions metrics/grafana/tikv_details.json
Original file line number Diff line number Diff line change
Expand Up @@ -14957,10 +14957,10 @@
"step": 4
},
{
"expr": "sum(rate(tikv_gc_compaction_deleted{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m]))",
"expr": "sum(rate(tikv_gc_compaction_filter_skip{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m]))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "deleted",
"legendFormat": "skip",
"refId": "B"
},
{
Expand All @@ -14971,24 +14971,24 @@
"refId": "C"
},
{
"expr": "sum(rate(tikv_gc_compaction_mvcc_delete_skip_older{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m]))",
"expr": "sum(rate(tikv_gc_compaction_filter_orphan_versions{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m]))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "skip-delete-filtered",
"legendFormat": "orphan-versions",
"refId": "D"
},
{
"expr": "sum(rate(tikv_gc_compaction_filter_next{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m]))",
"expr": "sum(rate(tikv_gc_compaction_filter_perform{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m]))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "iter-next",
"legendFormat": "perform",
"refId": "E"
},
{
"expr": "sum(rate(tikv_gc_compaction_filter_seek{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m]))",
"expr": "sum(rate(tikv_gc_compaction_filter_mvcc_deletion_handled{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m]))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "iter-seek",
"legendFormat": "mvcc-deletion-handled",
"refId": "F"
},
{
Expand Down
6 changes: 6 additions & 0 deletions src/server/gc_worker/compaction_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ lazy_static! {
"Compaction filter orphan versions for default CF",
&["tag"]
).unwrap();

pub static ref GC_COMPACTION_FILTER_MVCC_DELETION_HANDLED: IntCounter = register_int_counter!(
"tikv_gc_compaction_filter_mvcc_deletion_handled",
"MVCC deletion from compaction filter handled"
)
.unwrap();
}

pub trait CompactionFilterInitializer {
Expand Down
3 changes: 2 additions & 1 deletion src/server/gc_worker/gc_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ use crate::server::metrics::*;
use raftstore::coprocessor::RegionInfoProvider;
use raftstore::store::util::find_peer;

use super::compaction_filter::is_compaction_filter_allowed;
use super::config::GcWorkerConfigManager;
use super::gc_worker::{sync_gc, GcSafePointProvider, GcTask};
use super::{is_compaction_filter_allowed, Result};
use super::Result;

const POLL_SAFE_POINT_INTERVAL_SECS: u64 = 10;

Expand Down
Loading

0 comments on commit 621cce2

Please sign in to comment.