From ed417ce8e1079f21ddc56d2b9c7cb900a0902d33 Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Mon, 30 Sep 2024 03:35:18 +0000
Subject: [PATCH] chore: apply suggestions from CR

---
 src/common/meta/src/key.rs                    | 16 ++---
 src/common/meta/src/rpc/router.rs             | 27 ++++----
 src/datanode/src/alive_keeper.rs              |  4 +-
 .../src/heartbeat/handler/downgrade_region.rs |  2 +-
 src/datanode/src/region_server.rs             |  4 +-
 .../src/handler/region_lease_handler.rs       |  4 +-
 .../downgrade_leader_region.rs                |  8 +--
 .../rollback_downgraded_region.rs             | 10 +--
 .../upgrade_candidate_region.rs               | 14 ++---
 src/meta-srv/src/region/lease_keeper.rs       |  6 +-
 src/mito2/src/compaction/compactor.rs         |  4 +-
 src/mito2/src/engine.rs                       |  2 +-
 src/mito2/src/engine/basic_test.rs            | 35 -----------
 src/mito2/src/engine/catchup_test.rs          |  2 +-
 src/mito2/src/engine/set_role_state_test.rs   | 42 +++++++++++++
 src/mito2/src/error.rs                        | 16 ++++-
 src/mito2/src/flush.rs                        | 15 +++--
 src/mito2/src/region.rs                       | 61 ++++++++++---------
 src/mito2/src/region/opener.rs                |  2 +-
 src/mito2/src/worker.rs                       |  2 +-
 src/mito2/src/worker/handle_catchup.rs        |  2 +-
 src/mito2/src/worker/handle_flush.rs          | 14 +++--
 src/mito2/src/worker/handle_manifest.rs       |  7 +--
 src/store-api/src/region_engine.rs            |  2 +-
 24 files changed, 166 insertions(+), 135 deletions(-)

diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs
index f2e3884a27e..fc953a67643 100644
--- a/src/common/meta/src/key.rs
+++ b/src/common/meta/src/key.rs
@@ -140,7 +140,7 @@ use crate::key::table_route::TableRouteKey;
 use crate::key::txn_helper::TxnOpGetResponseSet;
 use crate::kv_backend::txn::{Txn, TxnOp};
 use crate::kv_backend::KvBackendRef;
-use crate::rpc::router::{region_distribution, RegionRoute, RegionState};
+use crate::rpc::router::{region_distribution, LeaderState, RegionRoute};
 use crate::rpc::store::BatchDeleteRequest;
 use crate::DatanodeId;

@@ -1126,14 +1126,14 @@ impl TableMetadataManager {
         next_region_route_status: F,
     ) -> Result<()>
     where
-        F: Fn(&RegionRoute) -> Option<Option<RegionState>>,
+        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
     {
         let mut new_region_routes = current_table_route_value.region_routes()?.clone();

         let mut updated = 0;
         for route in &mut new_region_routes {
             if let Some(status) = next_region_route_status(route) {
-                if route.set_leader_status(status) {
+                if route.set_leader_state(status) {
                     updated += 1;
                 }
             }
@@ -1280,7 +1280,7 @@ mod tests {
     use crate::key::{DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue};
     use crate::kv_backend::memory::MemoryKvBackend;
     use crate::peer::Peer;
-    use crate::rpc::router::{region_distribution, Region, RegionRoute, RegionState};
+    use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute};

     #[test]
     fn test_deserialized_value_with_bytes() {
@@ -1715,7 +1715,7 @@ mod tests {
                     attrs: BTreeMap::new(),
                 },
                 leader_peer: Some(Peer::new(datanode, "a2")),
-                leader_state: Some(RegionState::Downgrading),
+                leader_state: Some(LeaderState::Downgrading),
                 follower_peers: vec![],
                 leader_down_since: Some(current_time_millis()),
             },
@@ -1753,7 +1753,7 @@ mod tests {
                 if region_route.leader_state.is_some() {
                     None
                 } else {
-                    Some(Some(RegionState::Downgrading))
+                    Some(Some(LeaderState::Downgrading))
                 }
             })
             .await
@@ -1769,7 +1769,7 @@ mod tests {
         assert_eq!(
             updated_route_value.region_routes().unwrap()[0].leader_state,
-            Some(RegionState::Downgrading)
+            Some(LeaderState::Downgrading)
         );

         assert!(updated_route_value.region_routes().unwrap()[0]
             .leader_down_since
@@ -1778,7 +1778,7 @@ mod tests {
         assert_eq!(
             updated_route_value.region_routes().unwrap()[1].leader_state,
-            Some(RegionState::Downgrading)
+            Some(LeaderState::Downgrading)
         );

         assert!(updated_route_value.region_routes().unwrap()[1]
             .leader_down_since
diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs
index 9e405f0c0bc..8f58b0368ea 100644
--- a/src/common/meta/src/rpc/router.rs
+++ b/src/common/meta/src/rpc/router.rs
@@ -111,7 +111,7 @@ pub fn convert_to_region_peer_map(
 /// Returns the HashMap<[RegionNumber], [RegionStatus]>;
 pub fn convert_to_region_leader_status_map(
     region_routes: &[RegionRoute],
-) -> HashMap<RegionNumber, RegionState> {
+) -> HashMap<RegionNumber, LeaderState> {
     region_routes
         .iter()
         .filter_map(|x| {
@@ -264,7 +264,7 @@ pub struct RegionRoute {
         alias = "leader_status",
         skip_serializing_if = "Option::is_none"
     )]
-    pub leader_state: Option<RegionState>,
+    pub leader_state: Option<LeaderState>,
     /// The start time when the leader is in `Downgraded` status.
     #[serde(default)]
     #[builder(default = "self.default_leader_down_since()")]
@@ -274,17 +274,17 @@ impl RegionRouteBuilder {
     fn default_leader_down_since(&self) -> Option<i64> {
         match self.leader_state {
-            Some(Some(RegionState::Downgrading)) => Some(current_time_millis()),
+            Some(Some(LeaderState::Downgrading)) => Some(current_time_millis()),
             _ => None,
         }
     }
 }

-/// The State of the [`Region`].
+/// The State of the [`Region`] Leader.
 /// TODO(dennis): It's better to add more fine-grained statuses such as `PENDING` etc.
 #[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
 #[strum(serialize_all = "UPPERCASE")]
-pub enum RegionState {
+pub enum LeaderState {
     /// The following cases in which the [`Region`] will be downgraded.
     ///
     /// - The [`Region`] may be unavailable (e.g., Crashed, Network disconnected).
@@ -301,7 +301,7 @@ impl RegionRoute {
     /// - The [`Region`] was planned to migrate to another [`Peer`].
     ///
     pub fn is_leader_downgrading(&self) -> bool {
-        matches!(self.leader_state, Some(RegionState::Downgrading))
+        matches!(self.leader_state, Some(LeaderState::Downgrading))
     }

     /// Marks the Leader [`Region`] as [`RegionState::Downgrading`].
     ///
     /// # Context:
     ///
     /// - During the [`Region`] Failover Procedure.
     /// - Migrating a [`Region`].
     ///
-    /// TODO(weny): Update comments
-    /// **Notes:** Meta Server will stop renewing the lease for the downgrading [`Region`].
+    /// **Notes:** Meta Server will renew a special lease (`Downgrading`) for the downgrading [`Region`].
+    ///
+    /// A downgrading region will reject any write requests, and only allows the memtable to be flushed to object storage.
     ///
     pub fn downgrade_leader(&mut self) {
         self.leader_down_since = Some(current_time_millis());
-        self.leader_state = Some(RegionState::Downgrading)
+        self.leader_state = Some(LeaderState::Downgrading)
     }

     /// Returns how long since the leader is in `Downgraded` status.
             .map(|start| current_time_millis() - start)
     }

-    /// Sets the leader status.
+    /// Sets the leader state.
     ///
     /// Returns true if updated.
-    pub fn set_leader_status(&mut self, status: Option<RegionState>) -> bool {
+    pub fn set_leader_state(&mut self, status: Option<LeaderState>) -> bool {
         let updated = self.leader_state != status;

         match (status, updated) {
-            (Some(RegionState::Downgrading), true) => {
+            (Some(LeaderState::Downgrading), true) => {
                 self.leader_down_since = Some(current_time_millis());
             }
-            (Some(RegionState::Downgrading), false) => {
+            (Some(LeaderState::Downgrading), false) => {
                 // Do nothing if leader is still in `Downgraded` status.
             }
             _ => {
diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs
index a1c1788827c..a0ea2c0188b 100644
--- a/src/datanode/src/alive_keeper.rs
+++ b/src/datanode/src/alive_keeper.rs
@@ -132,7 +132,7 @@ impl RegionAliveKeeper {
                 let _ = self
                     .region_server
                     .set_region_role(region_id, RegionRole::Follower);
-                error!(e; "Failed to close staled region {}, convert region to readonly.",region_id);
+                error!(e; "Failed to close stale region {}, converting region to follower.", region_id);
             }
         }
     }
@@ -401,7 +401,7 @@ impl CountdownTask {
                 }
             }
             () = &mut countdown => {
-                warn!("The region {region_id} lease is expired, convert region to readonly.");
+                warn!("The region {region_id} lease is expired, converting region to follower.");
                 let _ = self.region_server.set_region_role(self.region_id, RegionRole::Follower);
                 // resets the countdown.
                 let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
diff --git a/src/datanode/src/heartbeat/handler/downgrade_region.rs b/src/datanode/src/heartbeat/handler/downgrade_region.rs
index 06d7d051c0b..4608e8d9d76 100644
--- a/src/datanode/src/heartbeat/handler/downgrade_region.rs
+++ b/src/datanode/src/heartbeat/handler/downgrade_region.rs
@@ -94,7 +94,7 @@ impl HandlerContext {
                 });
             }
             Err(err) => {
-                warn!("Failed to set region to downgrading: {err:?}");
+                warn!(err; "Failed to convert region to downgrading leader");
                 return InstructionReply::DowngradeRegion(DowngradeRegionReply {
                     last_entry_id: None,
                     exists: true,
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 19f48221c29..4fb30a8b444 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -301,7 +301,7 @@ impl RegionServer {
     /// Converts region to follower gracefully.
     ///
     /// After the call returns,
-    /// the engine ensures no more write or flush operations will succeed in the region.
+    /// the engine ensures that no **further** write or flush operations will succeed in this region.
     pub async fn become_follower_gracefully(
         &self,
         region_id: RegionId,
@@ -318,7 +318,7 @@ impl RegionServer {
     /// Set region to downgrading gracefully.
     ///
     /// After the call returns,
-    /// the engine ensures no more write operations will succeed in the region.
+    /// the engine ensures that no **further** write operations will succeed in this region.
     pub async fn downgrade_region_gracefully(
         &self,
         region_id: RegionId,
diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs
index 5b1cebc33c9..de491da3715 100644
--- a/src/meta-srv/src/handler/region_lease_handler.rs
+++ b/src/meta-srv/src/handler/region_lease_handler.rs
@@ -111,7 +111,7 @@ mod test {
     use common_meta::kv_backend::memory::MemoryKvBackend;
     use common_meta::peer::Peer;
     use common_meta::region_keeper::MemoryRegionKeeper;
-    use common_meta::rpc::router::{Region, RegionRoute, RegionState};
+    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
     use store_api::region_engine::RegionRole;
     use store_api::storage::RegionId;

@@ -297,7 +297,7 @@ mod test {
             region: Region::new_test(region_id),
             leader_peer: Some(peer.clone()),
             follower_peers: vec![follower_peer.clone()],
-            leader_state: Some(RegionState::Downgrading),
+            leader_state: Some(LeaderState::Downgrading),
             leader_down_since: Some(1),
         },
         RegionRoute {
diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs
index 7e26a93c4e5..d8bad44871d 100644
--- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs
@@ -13,7 +13,7 @@
 // limitations under the License.

 use common_error::ext::BoxedError;
-use common_meta::rpc::router::RegionState;
+use common_meta::rpc::router::LeaderState;
 use snafu::ResultExt;

 use crate::error::{self, Result};
@@ -53,7 +53,7 @@ impl UpdateMetadata {
                     .as_ref()
                     .is_some_and(|leader_peer| leader_peer.id == from_peer_id)
                 {
-                    Some(Some(RegionState::Downgrading))
+                    Some(Some(LeaderState::Downgrading))
                 } else {
                     None
                 }
@@ -81,7 +81,7 @@ mod tests {

     use common_meta::key::test_utils::new_test_table_info;
     use common_meta::peer::Peer;
-    use common_meta::rpc::router::{Region, RegionRoute, RegionState};
+    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
     use store_api::storage::RegionId;

     use crate::error::Error;
@@ -155,7 +155,7 @@ mod tests {
         table_metadata_manager
             .update_leader_region_status(table_id, &original_table_route, |route| {
                 if route.region.id == RegionId::new(1024, 2) {
-                    Some(Some(RegionState::Downgrading))
+                    Some(Some(LeaderState::Downgrading))
                 } else {
                     None
                 }
diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs
index 110b1cfc24b..0d568ab7b0b 100644
--- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs
@@ -65,7 +65,7 @@ mod tests {

     use common_meta::key::test_utils::new_test_table_info;
     use common_meta::peer::Peer;
-    use common_meta::rpc::router::{Region, RegionRoute, RegionState};
+    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
     use store_api::storage::RegionId;

     use crate::error::Error;
@@ -110,13 +110,13 @@ mod tests {
         RegionRoute {
             region: Region::new_test(RegionId::new(1024, 1)),
             leader_peer: Some(from_peer.clone()),
-            leader_state: Some(RegionState::Downgrading),
+            leader_state: Some(LeaderState::Downgrading),
             ..Default::default()
         },
         RegionRoute {
             region: Region::new_test(RegionId::new(1024, 2)),
             leader_peer: Some(Peer::empty(4)),
-            leader_state: Some(RegionState::Downgrading),
+            leader_state: Some(LeaderState::Downgrading),
             ..Default::default()
         },
         RegionRoute {
@@ -207,13 +207,13 @@ mod tests {
         RegionRoute {
             region: Region::new_test(RegionId::new(1024, 1)),
             leader_peer: Some(from_peer.clone()),
-            leader_state: Some(RegionState::Downgrading),
+            leader_state: Some(LeaderState::Downgrading),
             ..Default::default()
         },
         RegionRoute {
             region: Region::new_test(RegionId::new(1024, 2)),
             leader_peer: Some(Peer::empty(4)),
-            leader_state: Some(RegionState::Downgrading),
+            leader_state: Some(LeaderState::Downgrading),
             ..Default::default()
         },
         RegionRoute {
diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs
index a71fb385e19..b710a0e1f3e 100644
--- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs
@@ -43,7 +43,7 @@ impl UpdateMetadata {
             .context(error::RegionRouteNotFoundSnafu { region_id })?;

         // Removes downgraded status.
-        region_route.set_leader_status(None);
+        region_route.set_leader_state(None);

         let candidate = &ctx.persistent_ctx.to_peer;
         let expected_old_leader = &ctx.persistent_ctx.from_peer;
@@ -190,7 +190,7 @@ mod tests {
     use common_meta::key::test_utils::new_test_table_info;
     use common_meta::peer::Peer;
     use common_meta::region_keeper::MemoryRegionKeeper;
-    use common_meta::rpc::router::{Region, RegionRoute, RegionState};
+    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
     use common_time::util::current_time_millis;
     use store_api::storage::RegionId;

@@ -286,7 +286,7 @@ mod tests {
         region: Region::new_test(RegionId::new(1024, 1)),
         leader_peer: Some(Peer::empty(1)),
         follower_peers: vec![Peer::empty(2), Peer::empty(3)],
-        leader_state: Some(RegionState::Downgrading),
+        leader_state: Some(LeaderState::Downgrading),
         leader_down_since: Some(current_time_millis()),
     }];

@@ -319,13 +319,13 @@ mod tests {
         region: Region::new_test(RegionId::new(table_id, 1)),
         leader_peer: Some(Peer::empty(1)),
         follower_peers: vec![Peer::empty(5), Peer::empty(3)],
-        leader_state: Some(RegionState::Downgrading),
+        leader_state: Some(LeaderState::Downgrading),
         leader_down_since: Some(current_time_millis()),
     },
     RegionRoute {
         region: Region::new_test(RegionId::new(table_id, 2)),
         leader_peer: Some(Peer::empty(4)),
-        leader_state: Some(RegionState::Downgrading),
+        leader_state: Some(LeaderState::Downgrading),
         ..Default::default()
     },
 ];
@@ -430,7 +430,7 @@ mod tests {
         region: Region::new_test(RegionId::new(1024, 1)),
         leader_peer: Some(candidate_peer),
         follower_peers: vec![Peer::empty(2), Peer::empty(3)],
-        leader_state: Some(RegionState::Downgrading),
+        leader_state: Some(LeaderState::Downgrading),
         leader_down_since: None,
     }];

@@ -455,7 +455,7 @@ mod tests {
     let region_routes = vec![RegionRoute {
         region: Region::new_test(RegionId::new(table_id, 1)),
         leader_peer: Some(Peer::empty(1)),
-        leader_state: Some(RegionState::Downgrading),
+        leader_state: Some(LeaderState::Downgrading),
         ..Default::default()
     }];

diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs
index 9843be1543a..194f3710c85 100644
--- a/src/meta-srv/src/region/lease_keeper.rs
+++ b/src/meta-srv/src/region/lease_keeper.rs
@@ -220,7 +220,7 @@ mod tests {
     use common_meta::kv_backend::memory::MemoryKvBackend;
     use common_meta::peer::Peer;
     use common_meta::region_keeper::MemoryRegionKeeper;
-    use common_meta::rpc::router::{Region, RegionRouteBuilder, RegionState};
+    use common_meta::rpc::router::{LeaderState, Region, RegionRouteBuilder};
     use store_api::region_engine::RegionRole;
     use store_api::storage::RegionId;
     use table::metadata::RawTableInfo;
@@ -265,7 +265,7 @@ mod tests {
             Some((region_id, RegionRole::Follower))
         );

-        region_route.leader_state = Some(RegionState::Downgrading);
+        region_route.leader_state = Some(LeaderState::Downgrading);

         // The downgraded leader region on the datanode.
         assert_eq!(
             renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
@@ -492,7 +492,7 @@ mod tests {
             .region(Region::new_test(region_id))
             .leader_peer(Peer::empty(leader_peer_id))
             .follower_peers(vec![Peer::empty(follower_peer_id)])
-            .leader_state(RegionState::Downgrading)
+            .leader_state(LeaderState::Downgrading)
             .build()
             .unwrap();

diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index d457eef3c91..d919633ba96 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -19,7 +19,7 @@ use api::v1::region::compact_request;
 use common_telemetry::info;
 use object_store::manager::ObjectStoreManagerRef;
 use serde::{Deserialize, Serialize};
-use smallvec::{smallvec, SmallVec};
+use smallvec::SmallVec;
 use snafu::{OptionExt, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::RegionId;
@@ -381,7 +381,7 @@ impl Compactor for DefaultCompactor {
         // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
         compaction_region
             .manifest_ctx
-            .update_manifest(smallvec![RegionLeaderState::Writable], action_list)
+            .update_manifest(RegionLeaderState::Writable, action_list)
             .await?;

         Ok(edit)
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index aa44aa15580..ed8cc929090 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -443,7 +443,7 @@ impl EngineInner {
             .get_region(region_id)
             .context(RegionNotFoundSnafu { region_id })?;

-        region.set_region_role_state(role);
+        region.set_role(role);
         Ok(())
     }

diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs
index 66008da5060..533b6a2ea1e 100644
--- a/src/mito2/src/engine/basic_test.rs
+++ b/src/mito2/src/engine/basic_test.rs
@@ -88,41 +88,6 @@ async fn test_write_to_region() {
     put_rows(&engine, region_id, rows).await;
 }

-#[tokio::test]
-async fn test_write_downgrading_region() {
-    let mut env = TestEnv::with_prefix("write-to-downgrading-region");
-    let engine = env.create_engine(MitoConfig::default()).await;
-
-    let region_id = RegionId::new(1, 1);
-    let request = CreateRequestBuilder::new().build();
-
-    let column_schemas = rows_schema(&request);
-    engine
-        .handle_request(region_id, RegionRequest::Create(request))
-        .await
-        .unwrap();
-    let result = engine
-        .set_region_role_state_gracefully(region_id, SettableRegionRoleState::DowngradingLeader)
-        .await
-        .unwrap();
-    assert_eq!(
-        SetRegionRoleStateResponse::Success {
-            last_entry_id: Some(0)
-        },
-        result
-    );
-
-    let rows = Rows {
-        schema: column_schemas,
-        rows: build_rows(0, 42),
-    };
-    let err = engine
-        .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
-        .await
-        .unwrap_err();
-    assert_eq!(err.status_code(), StatusCode::RegionNotReady)
-}
-
 #[apply(multiple_log_store_factories)]

 async fn test_region_replay(factory: Option) {
diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs
index 881e4c57194..a9de0d6008f 100644
--- a/src/mito2/src/engine/catchup_test.rs
+++ b/src/mito2/src/engine/catchup_test.rs
@@ -247,7 +247,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option,
+                expect: RegionLeaderState,
+                #[snafu(implicit)]
+                location: Location,
+            },
+
+    #[snafu(display(
+        "Region {} is in {:?} state, expect: Leader or Leader(Downgrading)",
+        region_id,
+        state
+    ))]
+    FlushableRegionState {
+        region_id: RegionId,
+        state: RegionRoleState,
         #[snafu(implicit)]
         location: Location,
     },
@@ -956,6 +967,7 @@ impl ErrorExt for Error {
             CompatReader { .. } => StatusCode::Unexpected,
             InvalidRegionRequest { source, .. } => source.status_code(),
             RegionLeaderState { .. } => StatusCode::RegionNotReady,
+            FlushableRegionState { .. } => StatusCode::RegionNotReady,
             JsonOptions { .. } => StatusCode::InvalidArguments,
             EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
             ArrowReader { .. } => StatusCode::StorageUnavailable,
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 68721a0bdcb..506b1917e20 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -19,7 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;

 use common_telemetry::{debug, error, info};
-use smallvec::{smallvec, SmallVec};
+use smallvec::SmallVec;
 use snafu::ResultExt;
 use store_api::storage::RegionId;
 use strum::IntoStaticStr;
@@ -195,6 +195,8 @@ pub enum FlushReason {
     Alter,
     /// Flush periodically.
     Periodically,
+    /// Flush memtable during downgrading state.
+    Downgrading,
 }

 impl FlushReason {
@@ -407,14 +409,17 @@ impl RegionFlushTask {
         info!("Applying {edit:?} to region {}", self.region_id);

         let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
+
+        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
+            RegionLeaderState::Downgrading
+        } else {
+            RegionLeaderState::Writable
+        };
         // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
         // add a cleanup job to remove them later.
         let version = self
             .manifest_ctx
-            .update_manifest(
-                smallvec![RegionLeaderState::Writable, RegionLeaderState::Downgrading],
-                action_list,
-            )
+            .update_manifest(expected_state, action_list)
             .await?;
         info!(
             "Successfully update manifest version to {version}, region: {}",
diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs
index 7c7da747362..b05daf3da07 100644
--- a/src/mito2/src/region.rs
+++ b/src/mito2/src/region.rs
@@ -25,7 +25,6 @@ use std::sync::{Arc, RwLock};

 use common_telemetry::{error, info, warn};
 use crossbeam_utils::atomic::AtomicCell;
-use smallvec::{smallvec, SmallVec};
 use snafu::{ensure, OptionExt};
 use store_api::logstore::provider::Provider;
 use store_api::manifest::ManifestVersion;
@@ -34,7 +33,10 @@ use store_api::region_engine::{RegionRole, RegionStatistic, SettableRegionRoleSt
 use store_api::storage::RegionId;

 use crate::access_layer::AccessLayerRef;
-use crate::error::{RegionLeaderStateSnafu, RegionNotFoundSnafu, RegionTruncatedSnafu, Result};
+use crate::error::{
+    FlushableRegionStateSnafu, RegionLeaderStateSnafu, RegionNotFoundSnafu, RegionTruncatedSnafu,
+    Result,
+};
 use crate::manifest::action::{RegionMetaAction, RegionMetaActionList};
 use crate::manifest::manager::RegionManifestManager;
 use crate::memtable::MemtableBuilderRef;
@@ -191,6 +193,14 @@ impl MitoRegion {
         )
     }

+    /// Returns whether the region is downgrading.
+    pub(crate) fn is_downgrading(&self) -> bool {
+        matches!(
+            self.manifest_ctx.state.load(),
+            RegionRoleState::Leader(RegionLeaderState::Downgrading)
+        )
+    }
+
     /// Returns whether the region is readonly.
     pub(crate) fn is_follower(&self) -> bool {
         self.manifest_ctx.state.load() == RegionRoleState::Follower
     }

@@ -202,9 +212,8 @@ impl MitoRegion {
     }

     /// Sets the region role state.
-    pub(crate) fn set_region_role_state(&self, next_role: RegionRole) {
-        self.manifest_ctx
-            .set_role_role_state(next_role, self.region_id);
+    pub(crate) fn set_role(&self, next_role: RegionRole) {
+        self.manifest_ctx.set_role(next_role, self.region_id);
     }

     /// Sets the altering state.
@@ -248,7 +257,7 @@ impl MitoRegion {
         let _manager = self.manifest_ctx.manifest_manager.write().await;
         // We acquires the write lock of the manifest manager to ensure that no one is updating the manifest.
         // Then we change the state.
-        self.set_region_role_state(state.into());
+        self.set_role(state.into());
     }

     /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
@@ -300,7 +309,7 @@ impl MitoRegion {
                 RegionLeaderStateSnafu {
                     region_id: self.region_id,
                     state: actual,
-                    expect: smallvec![expect],
+                    expect,
                 }
                 .build()
             })?;
@@ -341,7 +350,7 @@ impl ManifestContext {
     /// Updates the manifest if current state is `expect_state`.
     pub(crate) async fn update_manifest(
         &self,
-        expect_states: SmallVec<[RegionLeaderState; 2]>,
+        expect_state: RegionLeaderState,
         action_list: RegionMetaActionList,
     ) -> Result {
         // Acquires the write lock of the manifest manager.
         let mut manager = self.manifest_manager.write().await;
         // Gets current manifest.
         let manifest = manager.manifest();
         // Checks state inside the lock. This is to ensure that we won't update the manifest
         // after `set_readonly_gracefully()` is called.
         let current_state = self.state.load();
-        let expected = expect_states
-            .iter()
-            .any(|expect_state| current_state == RegionRoleState::Leader(*expect_state));
         ensure!(
-            expected,
+            current_state == RegionRoleState::Leader(expect_state),
             RegionLeaderStateSnafu {
                 region_id: manifest.metadata.region_id,
                 state: current_state,
-                expect: expect_states,
+                expect: expect_state,
             }
         );

@@ -405,7 +411,7 @@ impl ManifestContext {

         if self.state.load() == RegionRoleState::Follower {
             warn!(
-                "Region {} becomes follower while updating manifest which may cause inconsistency",
+                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                 manifest.metadata.region_id
             );
         }
@@ -434,7 +440,7 @@ impl ManifestContext {
     /// - Leader -> Downgrading Leader
     ///
     /// ```
-    pub(crate) fn set_role_role_state(&self, next_role: RegionRole, region_id: RegionId) {
+    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
         match next_role {
             RegionRole::Follower => {
                 self.state.store(RegionRoleState::Follower);
@@ -471,13 +477,13 @@ impl ManifestContext {
                     RegionRoleState::Leader(RegionLeaderState::Downgrading),
                 ) {
                     Ok(state) => info!(
-                        "Convert region {} to Leader(Downgrading), previous role state: {:?}",
+                        "Convert region {} to downgrading leader, previous role state: {:?}",
                         region_id, state
                     ),
                     Err(state) => {
                         if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                             warn!(
-                                "Failed to convert region {} to Leader(Downgrading), current role state: {:?}",
+                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                 region_id, state
                             )
                         }
@@ -534,7 +540,7 @@ impl RegionMap {
             RegionLeaderStateSnafu {
                 region_id,
                 state: region.state(),
-                expect: smallvec![RegionLeaderState::Writable],
+                expect: RegionLeaderState::Writable,
             }
         );
         Ok(region)
     }

@@ -566,10 +572,9 @@ impl RegionMap {
             .context(RegionNotFoundSnafu { region_id })?;
         ensure!(
             region.is_flushable(),
-            RegionLeaderStateSnafu {
+            FlushableRegionStateSnafu {
                 region_id,
                 state: region.state(),
-                expect: smallvec![RegionLeaderState::Writable, RegionLeaderState::Downgrading],
             }
         );
         Ok(region)
     }
@@ -706,41 +711,41 @@ mod tests {
         let region_id = RegionId::new(1024, 0);

         // Leader -> Follower
-        manifest_ctx.set_role_role_state(RegionRole::Follower, region_id);
+        manifest_ctx.set_role(RegionRole::Follower, region_id);
         assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

         // Follower -> Leader
-        manifest_ctx.set_role_role_state(RegionRole::Leader, region_id);
+        manifest_ctx.set_role(RegionRole::Leader, region_id);
         assert_eq!(
             manifest_ctx.state.load(),
             RegionRoleState::Leader(RegionLeaderState::Writable)
         );

         // Leader -> Downgrading Leader
-        manifest_ctx.set_role_role_state(RegionRole::DowngradingLeader, region_id);
+        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
         assert_eq!(
             manifest_ctx.state.load(),
             RegionRoleState::Leader(RegionLeaderState::Downgrading)
         );

         // Downgrading Leader -> Follower
-        manifest_ctx.set_role_role_state(RegionRole::Follower, region_id);
+        manifest_ctx.set_role(RegionRole::Follower, region_id);
         assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

         // Can't downgrade from follower (Follower -> Downgrading Leader)
-        manifest_ctx.set_role_role_state(RegionRole::DowngradingLeader, region_id);
+        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
         assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

         // Set region role to Downgrading Leader
-        manifest_ctx.set_role_role_state(RegionRole::Leader, region_id);
-        manifest_ctx.set_role_role_state(RegionRole::DowngradingLeader, region_id);
+        manifest_ctx.set_role(RegionRole::Leader, region_id);
+        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
         assert_eq!(
             manifest_ctx.state.load(),
             RegionRoleState::Leader(RegionLeaderState::Downgrading)
         );

         // Downgrading Leader -> Leader
-        manifest_ctx.set_role_role_state(RegionRole::Leader, region_id);
+        manifest_ctx.set_role(RegionRole::Leader, region_id);
         assert_eq!(
             manifest_ctx.state.load(),
             RegionRoleState::Leader(RegionLeaderState::Writable)
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 41a6ad123f3..b2a76490cc2 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -173,7 +173,7 @@ impl RegionOpener {
                 &expect.primary_key,
             )?;
             // To keep consistency with Create behavior, set the opened Region to RegionRole::Leader.
-            region.set_region_role_state(RegionRole::Leader);
+            region.set_role(RegionRole::Leader);
             return Ok(region);
         }
         Ok(None) => {
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index fd694e63742..e790ed08c1a 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -839,7 +839,7 @@ impl RegionWorkerLoop {
         }
     }

-    /// Handles `set_state_gracefully`.
+    /// Handles `set_region_role_state_gracefully`.
     async fn set_role_state_gracefully(
         &mut self,
         region_id: RegionId,
diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs
index 3381426d9d3..cacd563ed78 100644
--- a/src/mito2/src/worker/handle_catchup.rs
+++ b/src/mito2/src/worker/handle_catchup.rs
@@ -114,7 +114,7 @@ impl RegionWorkerLoop {
         }

         if request.set_writable {
-            region.set_region_role_state(RegionRole::Leader);
+            region.set_role(RegionRole::Leader);
         }

         Ok(0)
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index 5db38a95cb9..b2bc5fd2e86 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -40,12 +40,14 @@ impl RegionWorkerLoop {
             return;
         };

-        let mut task = self.new_flush_task(
-            &region,
-            FlushReason::Manual,
-            request.row_group_size,
-            self.config.clone(),
-        );
+        let reason = if region.is_downgrading() {
+            FlushReason::Downgrading
+        } else {
+            FlushReason::Manual
+        };
+
+        let mut task =
+            self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
         task.push_sender(sender);

         if let Err(e) = self
             .flush_scheduler
diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs
index 1c43186387b..e97b30afec7 100644
--- a/src/mito2/src/worker/handle_manifest.rs
+++ b/src/mito2/src/worker/handle_manifest.rs
@@ -19,7 +19,6 @@ use std::collections::{HashMap, VecDeque};

 use common_telemetry::{info, warn};
-use smallvec::smallvec;
 use store_api::storage::RegionId;

 use crate::cache::file_cache::{FileType, IndexKey};
@@ -200,7 +199,7 @@ impl RegionWorkerLoop {
             RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));

         let result = manifest_ctx
-            .update_manifest(smallvec![RegionLeaderState::Truncating], action_list)
+            .update_manifest(RegionLeaderState::Truncating, action_list)
             .await
             .map(|_| ());

@@ -243,7 +242,7 @@ impl RegionWorkerLoop {
         let result = region
             .manifest_ctx
-            .update_manifest(smallvec![RegionLeaderState::Altering], action_list)
+            .update_manifest(RegionLeaderState::Altering, action_list)
             .await
             .map(|_| ());
         let notify = WorkerRequest::Background {
@@ -341,7 +340,7 @@ async fn edit_region(
     let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
     region
         .manifest_ctx
-        .update_manifest(smallvec![RegionLeaderState::Editing], action_list)
+        .update_manifest(RegionLeaderState::Editing, action_list)
         .await
         .map(|_| ())
 }
diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs
index ec5568cf6d9..850b9ad3e2d 100644
--- a/src/store-api/src/region_engine.rs
+++ b/src/store-api/src/region_engine.rs
@@ -120,7 +120,7 @@ pub enum RegionRole {
     // Leader is downgrading to follower.
     //
     // This state is used to prevent new write requests.
-    DowngradingLeader = 2,
+    DowngradingLeader,
 }

 impl Display for RegionRole {
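
The role handling renamed above (`set_role_role_state` -> `set_role`) is easiest to follow as a small state machine. The sketch below is a minimal, self-contained model of the transition rules that the updated region.rs tests exercise; `Role`, `State`, and `ManifestCtx` are simplified stand-ins for the patch's `RegionRole`, `RegionRoleState`, and `ManifestContext`, and a `Mutex` stands in for the `AtomicCell` mito2 actually uses:

use std::sync::Mutex;

// Stand-in for `RegionRole` (the requested role).
#[derive(Clone, Copy, PartialEq, Debug)]
enum Role {
    Follower,
    Leader,
    DowngradingLeader,
}

// Stand-in for `RegionRoleState` (the resulting state).
#[derive(Clone, Copy, PartialEq, Debug)]
enum State {
    Follower,
    LeaderWritable,
    LeaderDowngrading,
}

struct ManifestCtx {
    state: Mutex<State>,
}

impl ManifestCtx {
    // Mirrors the transition rules of the renamed `set_role`:
    // Follower and Leader are unconditional stores, while
    // DowngradingLeader only succeeds from a writable leader.
    fn set_role(&self, next: Role) {
        let mut state = self.state.lock().unwrap();
        match next {
            Role::Follower => *state = State::Follower,
            Role::Leader => *state = State::LeaderWritable,
            Role::DowngradingLeader => {
                if *state == State::LeaderWritable {
                    *state = State::LeaderDowngrading;
                }
            }
        }
    }

    fn state(&self) -> State {
        *self.state.lock().unwrap()
    }
}

fn main() {
    let ctx = ManifestCtx { state: Mutex::new(State::LeaderWritable) };

    // Leader -> Downgrading Leader
    ctx.set_role(Role::DowngradingLeader);
    assert_eq!(ctx.state(), State::LeaderDowngrading);

    // Downgrading Leader -> Follower
    ctx.set_role(Role::Follower);
    assert_eq!(ctx.state(), State::Follower);

    // A follower cannot be downgraded directly.
    ctx.set_role(Role::DowngradingLeader);
    assert_eq!(ctx.state(), State::Follower);

    // Downgrading Leader -> Leader restores writability.
    ctx.set_role(Role::Leader);
    ctx.set_role(Role::DowngradingLeader);
    ctx.set_role(Role::Leader);
    assert_eq!(ctx.state(), State::LeaderWritable);
}

The one guarded transition is `DowngradingLeader`, which mirrors the `compare_exchange` in the patch: only a writable leader can begin downgrading, so a follower (or an already-downgrading leader) is left untouched.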
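
`RegionRoute::set_leader_state` (renamed from `set_leader_status`) keeps `leader_down_since` consistent with the leader state. A minimal sketch of that bookkeeping, assuming the elided `_ =>` arm clears the timestamp, with `now_millis` standing in for `common_time::util::current_time_millis`:

use std::time::{SystemTime, UNIX_EPOCH};

// Stand-in for the renamed `LeaderState` enum.
#[derive(Clone, Copy, PartialEq, Debug)]
enum LeaderState {
    Downgrading,
}

#[derive(Default)]
struct RegionRoute {
    leader_state: Option<LeaderState>,
    leader_down_since: Option<i64>,
}

fn now_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as i64
}

impl RegionRoute {
    // Returns true if the state actually changed.
    fn set_leader_state(&mut self, state: Option<LeaderState>) -> bool {
        let updated = self.leader_state != state;
        match (state, updated) {
            // Newly entering `Downgrading`: record when it started.
            (Some(LeaderState::Downgrading), true) => {
                self.leader_down_since = Some(now_millis());
            }
            // Still downgrading: keep the original start time.
            (Some(LeaderState::Downgrading), false) => {}
            // Any other transition clears the timestamp (assumed arm).
            _ => self.leader_down_since = None,
        }
        self.leader_state = state;
        updated
    }
}

fn main() {
    let mut route = RegionRoute::default();
    assert!(route.set_leader_state(Some(LeaderState::Downgrading)));
    let started = route.leader_down_since;
    assert!(!route.set_leader_state(Some(LeaderState::Downgrading)));
    assert_eq!(route.leader_down_since, started); // unchanged while downgrading
    assert!(route.set_leader_state(None));
    assert_eq!(route.leader_down_since, None);
}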
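
The flush-side change is why `update_manifest` could drop its `SmallVec` of expected states: every caller now resolves up front the single `RegionLeaderState` it must observe, and a flush issued while the region is downgrading commits its edit against `Leader(Downgrading)` instead of failing. A compact sketch of that selection and check, using simplified stand-ins for `FlushReason`, `RegionLeaderState`, and `RegionRoleState`:

#[derive(Clone, Copy, PartialEq, Debug)]
enum FlushReason {
    Manual,
    Downgrading,
}

#[derive(Clone, Copy, PartialEq, Debug)]
enum LeaderState {
    Writable,
    Downgrading,
}

#[derive(Clone, Copy, PartialEq, Debug)]
enum RoleState {
    Follower,
    Leader(LeaderState),
}

// Mirrors the `expected_state` selection added to the flush task.
fn expected_state(reason: FlushReason) -> LeaderState {
    if matches!(reason, FlushReason::Downgrading) {
        LeaderState::Downgrading
    } else {
        LeaderState::Writable
    }
}

// Mirrors the single-state check now performed inside `update_manifest`.
fn check_state(current: RoleState, expect: LeaderState) -> Result<(), String> {
    if current == RoleState::Leader(expect) {
        Ok(())
    } else {
        Err(format!("region is in {current:?} state, expect: {expect:?}"))
    }
}

fn main() {
    // A flush triggered during downgrade commits against Leader(Downgrading).
    let expect = expected_state(FlushReason::Downgrading);
    assert!(check_state(RoleState::Leader(LeaderState::Downgrading), expect).is_ok());

    // Any other flush still requires a writable leader.
    let expect = expected_state(FlushReason::Manual);
    assert!(check_state(RoleState::Leader(LeaderState::Downgrading), expect).is_err());
    assert!(check_state(RoleState::Follower, expect).is_err());
}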